-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathred_line.py
More file actions
222 lines (172 loc) · 6.75 KB
/
red_line.py
File metadata and controls
222 lines (172 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import os
import tweepy
import time
import datetime
import threading
import logging
import logging.handlers as handlers
def connect_postgres():
    '''
    Open the Postgres database and gather the Twitter secrets from the environment.

    The connection string is read from the DATABASE_URL environment variable
    (a KeyError is raised if it is unset). The Data table is created when it
    does not exist yet and is seeded with a single all-zero row when it is empty.
    The four OAuth secrets are read with os.environ.get, so a missing secret
    comes back as None.

    Return: tuple (CONN, CUR, CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY,
    ACCESS_SECRET) of args to be used to start the bot
    '''
    # imported here so psycopg2 is only required when running against postgres
    import psycopg2

    connection = psycopg2.connect(os.environ['DATABASE_URL'], sslmode='require')
    cursor = connection.cursor()

    cursor.execute('CREATE TABLE IF NOT EXISTS Data (Last_Tweet BIGINT, Incidents_This_Month INTEGER, Incidents_Last_Month INTEGER)')
    cursor.execute('SELECT Last_Tweet FROM Data')
    table_is_empty = cursor.fetchone() is None
    if table_is_empty:
        # the table was just created: seed the single bookkeeping row
        cursor.execute('INSERT INTO Data VALUES (0, 0, 0)')
    connection.commit()

    secret_names = ('CONSUMER_KEY', 'CONSUMER_SECRET', 'ACCESS_KEY', 'ACCESS_SECRET')
    secrets = tuple(os.environ.get(name) for name in secret_names)
    return (connection, cursor) + secrets
def connect_sqlite():
    '''
    Connect to the local sqlite database (data.db) and get secrets.

    Mirrors connect_postgres: the Data table is created when missing and
    seeded with a single all-zero row when empty, so the bot can start from a
    database that only contains the OAuth table.

    Return: tuple (CONN, CUR, CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY,
    ACCESS_SECRET) of args to be used to start the bot
    Raises: RuntimeError if the OAuth table holds no row;
    sqlite3.OperationalError if the OAuth table does not exist.
    '''
    # imported here so sqlite3 is only loaded when running locally
    import sqlite3

    # CheckDay runs its queries from a second thread using this same
    # connection/cursor; sqlite3 refuses that by default, so the same-thread
    # check is disabled. NOTE(review): sqlite3 leaves serialising concurrent
    # writes to the caller when this flag is False.
    CONN = sqlite3.connect('data.db', check_same_thread=False)
    CUR = CONN.cursor()

    CUR.execute('CREATE TABLE IF NOT EXISTS Data (Last_Tweet BIGINT, Incidents_This_Month INTEGER, Incidents_Last_Month INTEGER)')
    CUR.execute('SELECT Last_Tweet FROM Data')
    if CUR.fetchone() is None:
        # if table was just created
        CUR.execute('INSERT INTO Data VALUES (0, 0, 0)')
        CONN.commit()

    # one query for all four secrets instead of one query per column
    secrets = CUR.execute(
        'SELECT Consumer_Key, Consumer_Secret, Access_Key, Access_Secret FROM OAuth'
    ).fetchone()
    if secrets is None:
        raise RuntimeError('OAuth table in data.db is empty; cannot load Twitter credentials.')
    CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY, ACCESS_SECRET = secrets
    return (CONN, CUR, CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY, ACCESS_SECRET)
class RedLine:
    '''
    Bot that polls the CTA Twitter timeline and retweets Red Line alerts.

    The ID of the newest tweet already processed is kept in
    self.last_tweet_id and mirrored in the Last_Tweet column of the Data
    table, so a restart resumes where the previous run stopped.
    '''

    def __init__(self, CONN, CUR, CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY, ACCESS_SECRET):
        '''
        Create our API object
        Args:
            CONN: open database connection (committed after every update).
            CUR: cursor on CONN used for all queries.
            CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY, ACCESS_SECRET:
                Twitter OAuth credentials.
        '''
        self.CONN = CONN
        self.CUR = CUR
        self.CONSUMER_KEY = CONSUMER_KEY
        self.CONSUMER_SECRET = CONSUMER_SECRET
        self.ACCESS_KEY = ACCESS_KEY
        self.ACCESS_SECRET = ACCESS_SECRET
        self.AUTH = tweepy.OAuthHandler(self.CONSUMER_KEY, self.CONSUMER_SECRET)
        self.AUTH.set_access_token(self.ACCESS_KEY, self.ACCESS_SECRET)
        self.API = tweepy.API(self.AUTH)
        self.CTA_ID = self.API.get_user('CTA').id
        # resume from the last tweet recorded in the DB (0 means "never ran")
        self.CUR.execute('SELECT Last_Tweet FROM Data')
        self.last_tweet_id = self.CUR.fetchone()[0]

    def scan_for_tweets(self):
        '''
        Scan all tweets made since the most recent processed tweet, or the
        10 most recent tweets when nothing has been processed yet, then keep
        polling once a minute. Never returns.
        '''
        logging.info('Starting ...')
        print('Starting ... \n')
        if self.last_tweet_id == 0:
            logging.info('Getting 10 most recent tweets as starting point.')
            self._poll_once()
        while True:
            self._poll_once()
            logging.debug('Most recent status: %s', self.last_tweet_id)
            logging.debug('Sleeping ... ')
            time.sleep(60)

    def _poll_once(self):
        '''
        Fetch up to 10 CTA tweets newer than last_tweet_id, process them and
        persist the new high-water mark. Does nothing when no tweets come back.
        '''
        query = {'id': self.CTA_ID, 'count': 10}
        if self.last_tweet_id:
            # no since_id on the very first run: just take the latest tweets
            query['since_id'] = self.last_tweet_id
        fetched = self.API.user_timeline(**query)
        if not fetched:
            return
        # The timeline arrives newest-first; order by ID so tweets are handled
        # oldest-first and the last element really is the most recent one.
        # (Taking fetched[-1] directly would store the OLDEST new tweet and
        # make the next poll fetch the newer ones again.)
        ordered = sorted(fetched, key=lambda tweet: tweet.id)
        self.last_tweet_id = ordered[-1].id
        self.check_if_red_line(ordered)
        # push most recent tweet ID to db in case we lose connection
        self.CUR.execute("UPDATE Data SET Last_Tweet = {0}".format(int(self.last_tweet_id)))
        self.CONN.commit()

    def check_if_red_line(self, tweet_list: list):
        '''
        Retweet every non-reply tweet that contains the word 'red' and a '['
        character, and count each successful retweet as an incident.
        Args:
            tweet_list: List of tweets to check.
        '''
        for tweet in tweet_list:
            # we don't want replies to be processed
            if tweet.in_reply_to_status_id is not None:
                continue
            tweet_text = tweet.text.lower()
            if 'red' not in tweet_text.split() or '[' not in tweet_text:
                continue
            logging.info('Retweeting status %s.', tweet.id)
            print('Retweeting ' + 'https://twitter.com/cta/status/' + str(tweet.id) + '\n')
            try:
                self.API.retweet(tweet.id)
                self._increment_incident_tally()
            except tweepy.error.TweepError as e:
                # a failed retweet is logged and skipped; it is not counted
                logging.error('ERROR RETWEETING: %s', str(e))

    def _increment_incident_tally(self):
        '''
        Increment the Incidents_This_Month tally stored in DB.
        The change is not committed here; _poll_once commits it together
        with the Last_Tweet update.
        '''
        logging.debug('Incrementing Incidents_This_Month ...')
        self.CUR.execute('UPDATE Data SET Incidents_This_Month = Incidents_This_Month + 1')
class CheckDay(threading.Thread):
    '''
    Background thread that tweets the monthly incident tally on the 1st of
    each month and then rolls the counters over in the Data table.
    '''

    def __init__(self, red, CONN, CUR):
        '''
        Args:
            red: RedLine instance whose API object is used to post the tally.
            CONN: database connection, committed after the roll-over.
            CUR: cursor used for the tally queries.
                NOTE(review): this is the same cursor the polling loop uses
                on the main thread - confirm that sharing it is safe.
        '''
        self.RED = red
        self.CONN = CONN
        self.CUR = CUR
        threading.Thread.__init__(self)

    def run(self):
        '''
        Check if it is the 1st of the month every 24 hours.
        '''
        last_reported = None
        while True:
            logging.debug("Checking if it's the 1st of the month ... ")
            now = datetime.datetime.now()
            period = (now.year, now.month)
            # remember the month already reported so a second wake-up on the
            # same 1st cannot tweet again and zero out last month's figure
            if now.day == 1 and period != last_reported:
                self._tweet_monthly_tally(now)
                last_reported = period
            # 24 hours (the previous value, 84600, was only 23.5 hours)
            time.sleep(86400)

    def _tweet_monthly_tally(self, now):
        '''
        Tweet the tally for the month that just ended and roll the counters.
        Args:
            now: datetime of the check; the month before it is the one reported.
        '''
        logging.info('Tweeting the monthly tally ...')
        self.CUR.execute('SELECT Incidents_This_Month, Incidents_Last_Month FROM Data')
        incidents_this_month, incidents_last_month = self.CUR.fetchone()
        delta = incidents_this_month - incidents_last_month
        # the counter covers the month that just finished, not the month of `now`
        month_end = now.replace(day=1) - datetime.timedelta(days=1)
        month_name = month_end.strftime("%B")
        if delta > 0:
            message = "There were {0} incidents in {1}. That's up {2} from last month.".format(incidents_this_month, month_name, abs(delta))
        elif delta < 0:
            message = "There were {0} incidents in {1}. That's down {2} from last month.".format(incidents_this_month, month_name, abs(delta))
        else:
            message = "There were {0} incidents in {1}. That's the same as last month.".format(incidents_this_month, month_name)
        self.RED.API.update_status(message)
        # set this month's incident total to be last month's and reset the
        # counter for this month; both right-hand sides see the old row values
        self.CUR.execute('UPDATE Data SET Incidents_Last_Month = Incidents_This_Month, Incidents_This_Month = 0')
        self.CONN.commit()
def check_environment():
    '''
    Tell whether the bot is running in the cloud or on a local machine.

    Return: 'CLOUD' when the DATABASE_URL environment variable is set
    (as it is on heroku), otherwise 'LOCAL'
    '''
    running_on_heroku = 'DATABASE_URL' in os.environ
    return 'CLOUD' if running_on_heroku else 'LOCAL'
################ Setup Logger ################
log_name = 'log/log.log'
# the file handler opens its file right away and does not create missing
# directories, so make sure log/ exists before attaching it
os.makedirs(os.path.dirname(log_name), exist_ok=True)
logger = logging.getLogger('')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('[%(levelname)s] %(asctime)s: %(message)s')
# rotate the log file every night; rotated files get a MMDDYYYY suffix
log_handler = handlers.TimedRotatingFileHandler(log_name, when="midnight", interval=1)
log_handler.setLevel(logging.INFO)
log_handler.suffix = "%m%d%Y"
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
#############################################
#### Check environment and connect to DB ####
environment = check_environment()
if environment == 'CLOUD':
    args = connect_postgres()
else:
    # check_environment only returns 'CLOUD' or 'LOCAL'
    args = connect_sqlite()
################### Start ###################
(CONN, CUR, CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY, ACCESS_SECRET) = args
red = RedLine(CONN, CUR, CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY, ACCESS_SECRET)
# thread to check the day
check_day = CheckDay(red, CONN, CUR)
check_day.start()
# start loop (blocks forever)
red.scan_for_tweets()
#############################################