-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtweets.py
More file actions
179 lines (144 loc) · 6.47 KB
/
tweets.py
File metadata and controls
179 lines (144 loc) · 6.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from news import *
import tweepy # python package for accessing Tweet streaming API
from tweepy import API
from tweepy import Stream
import json
import logging
import pandas as pd
import configparser
import requests
import geocoder
from datetime import date, timedelta
import sys
# import urllib.parse
# Load API credentials from a local INI file (not checked in).
# NOTE(review): config.read() silently returns an empty parser when
# config.ini is missing, so the subscript below would raise KeyError — confirm
# the file is guaranteed to exist at the working directory.
config = configparser.ConfigParser()
config.read('config.ini')
# Twitter (Tweepy) OAuth 1.0a credentials.
access_token = config['twitterAuth']['access_token']
access_token_secret = config['twitterAuth']['access_token_secret']
consumer_key = config['twitterAuth']['consumer_key']
consumer_secret = config['twitterAuth']['consumer_secret']
# News API key consumed by the project-local News class (from news import *).
news_api_key = config['newsAuth']['api_key']
# instantiate News class
news = News(news_api_key)
# get all news - takes about 30 seconds
# NOTE(review): network call at import time; every import of this module
# triggers a full news fetch — consider moving behind __main__.
news.get_all_news()
class Tweets():
    """Thin wrapper around the Tweepy REST API.

    Authenticates with the supplied OAuth credentials and searches the
    previous 7 days of tweets for news-derived keywords, appending each
    first matching tweet to tweets.json.
    """

    def __init__(self, consumer_key, consumer_secret, access_token, access_token_secret, logger=logging):
        # BUG FIX: logging.basicConfig() returns None, so the original
        # self.logger was always None and the injected `logger` was ignored.
        # Configure logging once, then keep the injected logger.
        logging.basicConfig(filename='tweets.log', filemode='w',
                            format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logger
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret
        # NOTE(review): requires the location as the first CLI argument;
        # raises IndexError when the script is run without one — confirm
        # callers always pass it.
        self.location = sys.argv[1]  # user location as argument variable
        # object with latitude & longitude of user location
        self.geo = geocoder.osm(self.location)

    def tweepy_auth(self):
        """Authorize the Tweepy API.

        Returns:
            tweepy.API: the authenticated client (also stored on self.api).
        Raises:
            Exception: re-raised from verify_credentials() on auth failure.
        """
        self.auth = tweepy.OAuthHandler(self.consumer_key, self.consumer_secret)
        self.auth.set_access_token(self.access_token, self.access_token_secret)
        # create API object; wait_on_rate_limit makes Tweepy sleep instead of
        # erroring when Twitter rate limits are hit
        self.api = API(self.auth, wait_on_rate_limit=True,
                       user_agent=get_random_ua('Chrome'))
        try:
            self.api.verify_credentials()
            logging.info("Tweepy API Authenticated")
            print('Tweepy authentication successful')
        except Exception as e:
            logging.error(f"Error during Tweepy authentication: {e}")
            raise e
        return self.api

    def get_tweets(self, news_keywords, news_instance):  # TODO add stream listening stuff to params
        # Delegates to tweet_search, which writes matches to tweets.json
        # as a side effect.
        self.tweet_search(news_keywords)
        # stream_tweets = TwitterStreamListener.on_status(listener, tweet_stream)

    def tweet_search(self, news_keywords):
        """Search for tweets within previous 7 days.

        Inputs:
            news_keywords: iterable of keyword tuples
        Side effects:
            appends the first matching tweet per keyword (as JSON)
            to tweets.json
        """
        api = self.api
        # unpack keyword tuples
        print('Searching for tweets matching keywords')
        for keys in news_keywords:
            keywords = list(keys)  # TODO add itertools combinations
            for word in keywords:
                try:
                    result = api.search_tweets(q=str(
                        word) + " -filter:retweets", lang='en')
                    # result[0] raises IndexError when nothing matched;
                    # handled below alongside TypeError.
                    status = result[0]
                    # status._json is already a plain dict — the original
                    # dumps/loads round-trip (guarded by asserts that vanish
                    # under -O) was a no-op and has been removed.
                    tweet = status._json
                except (TypeError, IndexError) as e:
                    # BUG FIX: the original called logging('Error: ', e),
                    # which raises TypeError because a module is not callable.
                    logging.error('Error: keyword not found in tweet search: %s', e)
                    print('Error: keyword not found in tweet search')
                    break
                else:
                    # write tweets to json file
                    with open("tweets.json", "a") as f:
                        json.dump(tweet, f)
                    logging.info('Tweet search successful')
                    print('Tweet search by keyword was successful')
        # TODO add tweet unpacking & cleaning?
        # TODO put tweets into db

    def clean_tweets(self, tweets):
        # use slang.txt
        # https://www.geeksforgeeks.org/python-efficient-text-data-cleaning/
        pass
# define stream listener class
class TwitterStreamListener(tweepy.Stream):
    """Streams tweets, flattens nested fields, and accumulates a DataFrame.

    Each received status is appended to tweets.json and to an in-memory
    list that is re-rendered as self.tweets_df after every tweet.
    """

    def __init__(self, api=None):
        # BUG FIX: super(tweepy.Stream, self).__init__() skipped
        # tweepy.Stream's own __init__ entirely, leaving the stream
        # unconfigured. Pass the module-level credentials through instead.
        super().__init__(consumer_key, consumer_secret,
                         access_token, access_token_secret)
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret
        self.api = api
        self.num_tweets = 0          # tweets received so far this run
        self.tweet_list = []         # flattened tweet dicts

    def on_status(self, status):
        """Handle one streamed tweet.

        Persists the tweet, flattens nested fields, rebuilds tweets_df,
        and returns False once the per-run cap is reached so the stream
        disconnects.
        """
        tweet_obj = status._json
        # BUG FIX: the original opened tweets.json in "w" mode per tweet,
        # overwriting every previously streamed tweet; append instead,
        # one JSON object per line.
        with open("tweets.json", "a") as f:
            json.dump(tweet_obj, f)
            f.write("\n")
        # flatten nested fields (the original needlessly re-read the file
        # it had just written; work on the in-memory dict directly)
        if 'quoted_status' in tweet_obj:
            quoted = tweet_obj['quoted_status']
            # extended_tweet only exists for >140-char tweets; fall back to
            # the plain text field rather than KeyError-ing.
            tweet_obj['quote_tweet'] = quoted.get(
                'extended_tweet', {}).get('full_text', quoted.get('text'))
        if 'user' in tweet_obj:
            tweet_obj['location'] = tweet_obj['user']['location']
        # BUG FIX: append the flattened dict, not the raw Status object —
        # otherwise the DataFrame columns below never line up with the data.
        self.tweet_list.append(tweet_obj)
        self.num_tweets += 1
        # flatten accumulated tweets to a dataframe
        self.tweets_df = pd.DataFrame(
            self.tweet_list,
            columns=["tweet_id", "publishedAt", "userID", "text", "location"])
        # BUG FIX: the rate-cap check was dead code placed after an
        # unconditional return. Returning False tells tweepy.Stream to
        # disconnect once the cap is hit.
        # 450 ~ assumed max stream rate for the Twitter API client —
        # TODO confirm against current API limits.
        return self.num_tweets < 450
# keywords = dict(news.all_news_df["keywords"])
#print(keywords)
# Script entry: build the client from the module-level credentials and
# authenticate. NOTE(review): runs on import (no __main__ guard) and
# Tweets.__init__ reads sys.argv[1] — importing this module without a
# CLI argument will raise IndexError.
t = Tweets(consumer_key, consumer_secret, access_token, access_token_secret)
auth = t.tweepy_auth()
# search_df = t.tweet_search(keywords)