import datetime
import itertools
import json
import logging
import re
import traceback
from collections import defaultdict, deque

import lxml.etree as etree
import lxml.html
import pytz
import requests
import scrapelib


class LegistarSession(requests.Session):
def request(self, method, url, **kwargs):
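        """
        Wrap every request so that responses Legistar serves with a
        misleading success status are checked and converted to errors
        (see _check_errors below).
        """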
        response = super().request(method, url, **kwargs)
payload = kwargs.get('data')
self._check_errors(response, payload)
return response
def _check_errors(self, response, payload):
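        """
        Raise an HTTPError for responses that signal failure even when
        Legistar returns them with a success status: redirects to the
        error page, empty bodies, deleted records, and bad year-range
        results.
        """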
if response.url.endswith('Error.aspx'):
response.status_code = 503
raise scrapelib.HTTPError(response)
if not response.text:
if response.request.method.lower() in {'get', 'post'}:
response.status_code = 520
raise scrapelib.HTTPError(response)
if 'This record no longer exists. It might have been deleted.' in response.text:
response.status_code = 410
raise scrapelib.HTTPError(response)
if payload:
self._range_error(response, payload)
def _range_error(self, response, payload):
        '''Legistar intermittently does not return the expected response
        when a time range is selected while searching for events. For
        now, we only handle the 'All' range.
        '''
if self._range_is_all(payload):
expected_range = 'All Years'
page = lxml.html.fromstring(response.text)
returned_range, = page.xpath(
"//input[@id='ctl00_ContentPlaceHolder1_lstYears_Input']")
returned_range = returned_range.value
if returned_range != expected_range:
response.status_code = 520
                # On a retry, the new request would not otherwise contain
                # the correct payload data, because the payload is only
                # refreshed via sessionSecrets; so, we refresh it here.
payload.update(self.sessionSecrets(page))
raise scrapelib.HTTPError(response)
def _range_is_all(self, payload):
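        """
        Return True if the payload requests the 'All' years range.
        """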
range_var = 'ctl00_ContentPlaceHolder1_lstYears_ClientState'
all_range = (range_var in payload and
json.loads(payload[range_var])['value'] == 'All')
return all_range


class LegistarScraper(scrapelib.Scraper, LegistarSession):
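    """
    Base scraper for Legistar's web interface. Subclasses are expected
    to define BASE_URL and TIMEZONE.
    """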
date_format = '%m/%d/%Y'
def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
def lxmlize(self, url, payload=None):
'''
        Get a page and return it as an lxml HTML element tree,
        with links made absolute.
'''
if payload:
response = self.post(url, payload, verify=False)
else:
response = self.get(url, verify=False)
entry = response.text
page = lxml.html.fromstring(entry)
page.make_links_absolute(url)
return page
def pages(self, url, payload=None):
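        """
        Yield each page of a paginated result set. Subsequent pages are
        fetched by posting back the __EVENTTARGET of the 'next page'
        link, along with refreshed session secrets. The search-button
        key is dropped from the payload after the first page, since
        later requests are postbacks rather than new searches.
        """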
page = self.lxmlize(url, payload)
yield page
next_page = page.xpath(
"//a[@class='rgCurrentPage']/following-sibling::a[1]")
if payload and 'ctl00$ContentPlaceHolder1$btnSearch' in payload:
del payload['ctl00$ContentPlaceHolder1$btnSearch']
while len(next_page) > 0:
if payload is None:
payload = {}
payload.update(self.sessionSecrets(page))
event_target = next_page[0].attrib['href'].split("'")[1]
payload['__EVENTTARGET'] = event_target
page = self.lxmlize(url, payload)
yield page
next_page = page.xpath(
"//a[@class='rgCurrentPage']/following-sibling::a[1]")
def parseDetails(self, detail_div):
"""
Parse the data in the top section of a detail page.
"""
detail_query = ".//*[starts-with(@id, 'ctl00_ContentPlaceHolder1_lbl')"\
" or starts-with(@id, 'ctl00_ContentPlaceHolder1_hyp')"\
" or starts-with(@id, 'ctl00_ContentPlaceHolder1_Label')]"
fields = detail_div.xpath(detail_query)
details = {}
for field_key, field in itertools.groupby(fields, fieldKey):
field = list(field)
field_1, field_2 = field[0], field[-1]
key = field_1.text_content().replace(':', '').strip()
if field_2.find('.//a') is not None:
value = []
for link in field_2.xpath('.//a'):
value.append({'label': link.text_content().strip(),
'url': self._get_link_address(link)})
elif 'href' in field_2.attrib:
value = {'label': field_2.text_content().strip(),
'url': self._get_link_address(field_2)}
elif self._parse_detail(key, field_1, field_2):
value = self._parse_detail(key, field_1, field_2)
else:
value = field_2.text_content().strip()
details[key] = value
return details
def parseDataTable(self, table):
"""
        Legistar uses the same kind of data table in a number of
        places. This will yield (data, keys, row) tuples, one per
        table row, where data is a dictionary keyed on the table
        headers.
"""
headers = table.xpath(".//th[starts-with(@class, 'rgHeader')]")
rows = table.xpath(".//tr[@class='rgRow' or @class='rgAltRow']")
keys = []
for header in headers:
            text_content = header.text_content().replace('\xa0', ' ').strip()
inputs = header.xpath('.//input')
if text_content:
keys.append(text_content)
elif len(inputs) > 0:
keys.append(header.xpath('.//input')[0].value)
else:
keys.append(header.xpath('.//img')[0].get('alt'))
for row in rows:
try:
data = defaultdict(lambda: None)
for key, field in zip(keys, row.xpath("./td")):
text_content = self._stringify(field)
if field.find('.//a') is not None:
address = self._get_link_address(field.find('.//a'))
if address:
if key.strip() in ['', 'ics'] and 'View.ashx?M=IC' in address:
key = 'iCalendar'
value = {'url': address}
else:
value = {'label': text_content,
'url': address}
else:
value = text_content
else:
value = text_content
data[key] = value
yield dict(data), keys, row
            except Exception:
                print('Problem parsing row:')
                print(etree.tostring(row))
                print(traceback.format_exc())
                raise
def _get_link_address(self, link):
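        """
        Extract a URL from an anchor element, handling both plain href
        attributes and the JavaScript onclick handlers (radopen,
        window.open, OpenTelerikWindow) that Legistar uses to open
        popup windows.
        """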
url = None
if 'onclick' in link.attrib:
onclick = link.attrib['onclick']
if (onclick is not None and
onclick.startswith(("radopen('",
"window.open",
"OpenTelerikWindow"))):
onclick_path = onclick.split("'")[1]
if not onclick_path.startswith("/"):
onclick_path = "/" + onclick_path
url = self.BASE_URL + onclick_path
elif 'href' in link.attrib:
url = link.attrib['href']
return url
def _parse_detail(self, key, field_1, field_2):
"""
Perform custom parsing on a given key and field from a detail table.
        Available for override by site-specific scraper subclasses.
"""
return None
def _stringify(self, field):
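        """
        Flatten a table cell to text, converting <br> tags to newlines
        and wrapping <em> text in --em-- markers.
        """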
for br in field.xpath("*//br"):
br.tail = "\n" + br.tail if br.tail else "\n"
for em in field.xpath("*//em"):
if em.text:
em.text = "--em--" + em.text + "--em--"
        return field.text_content().replace('\xa0', ' ').strip()
def toTime(self, text):
time = datetime.datetime.strptime(text, self.date_format)
time = pytz.timezone(self.TIMEZONE).localize(time)
return time
def toDate(self, text):
return self.toTime(text).date().isoformat()
def now(self):
return datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
def mdY2Ymd(self, text):
month, day, year = text.split('/')
return "%d-%02d-%02d" % (int(year), int(month), int(day))
def sessionSecrets(self, page):
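        """
        Collect the ASP.NET hidden form fields (__VIEWSTATE and, when
        present, __EVENTVALIDATION) that must accompany any POST back
        to a Legistar page.
        """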
payload = {}
payload['__EVENTARGUMENT'] = None
payload['__VIEWSTATE'] = page.xpath(
"//input[@name='__VIEWSTATE']/@value")[0]
try:
payload['__EVENTVALIDATION'] = page.xpath(
"//input[@name='__EVENTVALIDATION']/@value")[0]
except IndexError:
pass
        return payload
def accept_response(self, response, **kwargs):
if response.status_code == 410:
return True
return super().accept_response(response, **kwargs)


def fieldKey(x):
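    """
    Derive a grouping key from a Legistar control id: take the part
    after the hyp/lbl/Label prefix, drop any Prompt suffix, and strip
    trailing 'X', '2', or '1' characters.
    """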
field_id = x.attrib['id']
field = re.split(r'hyp|lbl|Label', field_id)[-1]
field = field.split('Prompt')[0]
field = field.rstrip('X21')
return field


class LegistarAPIScraper(scrapelib.Scraper):
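    """
    Base scraper for the Legistar web API. Subclasses are expected to
    define BASE_URL and TIMEZONE.
    """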
date_format = '%Y-%m-%dT%H:%M:%S'
time_string_format = '%I:%M %p'
utc_timestamp_format = '%Y-%m-%dT%H:%M:%S.%f'
def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
self.logger = logging.getLogger("legistar")
self.warning = self.logger.warning
def toTime(self, text):
time = datetime.datetime.strptime(text, self.date_format)
time = pytz.timezone(self.TIMEZONE).localize(time)
return time
def to_utc_timestamp(self, text):
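        """
        Parse a UTC timestamp that may or may not include fractional
        seconds.
        """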
try:
time = datetime.datetime.strptime(text, self.utc_timestamp_format)
except ValueError as e:
if 'does not match format' in str(e):
time = datetime.datetime.strptime(text, self.date_format)
else:
raise
time = pytz.timezone('UTC').localize(time)
return time
def search(self, route, item_key, search_conditions):
"""
Base function for searching the Legistar API.
Arguments:
route -- The path to search, i.e. /matters/, /events/, etc
item_key -- The unique id field for the items that you are searching.
This is necessary for proper pagination. examples
might be MatterId or EventId
search_conditions -- a string in the OData format for the
your search conditions http://www.odata.org/documentation/odata-version-3-0/url-conventions/#url5.1.2
It would be nice if we could provide a
friendly search API. Something like https://github.com/tuomur/python-odata
Examples:
# Search for bills introduced after Jan. 1, 2017
search('/matters/', 'MatterId', "MatterIntroDate gt datetime'2017-01-01'")
"""
search_url = self.BASE_URL + route
params = {'$filter': search_conditions}
try:
yield from self.pages(search_url,
params=params,
item_key=item_key)
except requests.HTTPError as e:
if e.response.status_code == 400:
raise ValueError(e.response.json()['Message'])
if not self.accept_response(e.response):
raise
def pages(self, url, params=None, item_key=None):
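        """
        Page through an API result set. The API returns pages of up to
        1000 items, so we advance with the $skip parameter until a
        short page arrives; a bounded deque of recently seen item_key
        values guards against yielding duplicates when items shift
        between pages.
        """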
if params is None:
params = {}
seen = deque([], maxlen=1000)
page_num = 0
response = None
while page_num == 0 or len(response.json()) == 1000:
params['$skip'] = page_num * 1000
response = self.get(url, params=params)
response.raise_for_status()
for item in response.json():
if item[item_key] not in seen:
yield item
seen.append(item[item_key])
page_num += 1
def accept_response(self, response, **kwargs):
"""
This overrides a method that controls whether
the scraper should retry on an error. We don't
want to retry if the API returns a 400, except for
410, which means the record no longer exists.
"""
return response.status_code < 401 or response.status_code == 410
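

# A minimal usage sketch. The endpoint, client name, and timezone below
# are illustrative assumptions, not part of this module; subclasses only
# need to supply the BASE_URL and TIMEZONE attributes that the methods
# above reference.
if __name__ == '__main__':
    class ExampleAPIScraper(LegistarAPIScraper):
        # Hypothetical Legistar API client endpoint and timezone.
        BASE_URL = 'https://webapi.legistar.com/v1/chicago'
        TIMEZONE = 'America/Chicago'

    scraper = ExampleAPIScraper()
    # Search for bills introduced after Jan. 1, 2017, as in the
    # search() docstring, and print the first result's id.
    for matter in scraper.search('/matters/', 'MatterId',
                                 "MatterIntroDate gt datetime'2017-01-01'"):
        print(matter['MatterId'])
        break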