Skip to content

Commit e643dd2

Browse files
authored
Handle multiple crowdtangle dashboards in same pipeline, fetch and process in parallel (#200)
* get map of dashboard name -> id before pipeline start. attached dashboard id to encapsulated post instead of doing dashboard name -> id lookup in insert_post_dashboards * fix crowdtangle db_functions import * log Dashboard Names -> IDs * write_crowdtangle_results_to_database.py from master * post merge cleanup * Handle multiple dashboards in FetchCrowdTangle PTransform, to config multiple dashboards add config var DASHBOARD_CONFIG_SECTION_NAMES which should be a comma separated list of config sections to read dashboard config data from * var reference and other lint cleanups * bump minet dep version due to second-level dep issue (quenouille) * fix minet version * minet now returns objects as casanova.namedrecord, so convert it to dict before adding annotation. * Lots of changes to handle new post datastructure format from minet v0.52.8 CrowdTangleAPIClient * fix keyword arg for ExpandedLinkRecord * add necessary PostRecord namedtuple arg * fix make_statistics_record keyword arg * fix arg reference in make_statistics_record * import itertools * add default False for account_verified * check media and links values instead of just key presence * log ProcessCrowdTanglePosts.process input args for debugging * move crowdtangle fetch out of PTransform into own DoFn * move crowdtangle fetch out of PTransform into own DoFn * see if parallelism works with FlatMap * undo accidental uncapitalization of code block * remove unneeded code * remove more unneeded code * move date parsing out of dashboard parsing loop, move max_results_to_fetch to be dashboard_specific, simplify ref to config_section with new var instead of lookup each time * more cleanup * go back to version of minet currently in use in prod, and undo changes to post datastruct processing * undo unrelated changes in write_crowdtangle_results_to_database
1 parent 7cbcb75 commit e643dd2

File tree

4 files changed

+73
-56
lines changed

4 files changed

+73
-56
lines changed

crowdtangle/EXAMPLE.cfg

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,23 @@ PORT=<database port>
1313
# END_DATE=2021-01-02
1414
# If used, API query uses Today - DAYS_IN_PAST_TO_SYNC for start_date
1515
DAYS_IN_PAST_TO_SYNC=7
16+
# Comma separated list of config section names to read dashboard configurations from.
17+
DASHBOARD_CONFIG_SECTION_NAMES=DASHBOARD_1,DASHBOARD_2
18+
19+
[DASHBOARD_1]
1620
API_TOKEN=<crowdtangle API token>
1721
# Dashboard name can be any string. This is used to track which dashboards a post comes from (potentially multiple)
18-
DASHBOARD_NAME=<dashboard name,
22+
DASHBOARD_NAME=<dashboard name>
1923
# Limit on number of results to fetch from API. If not specified no limit used.
2024
# MAX_RESULTS_TO_FETCH=10000000
2125
# Comma separated list of crowdtangle list ID(s). Leave empty to get posts from all lists (associated to the API token)
2226
# LIST_IDS=
27+
28+
[DASHBOARD_2]
29+
API_TOKEN=<crowdtangle API token>
30+
# Dashboard name can be any string. This is used to track which dashboards a post comes from (potentially multiple)
31+
DASHBOARD_NAME=<dashboard name>
32+
# Limit on number of results to fetch from API. If not specified no limit used.
33+
# MAX_RESULTS_TO_FETCH=10000000
34+
# Comma separated list of crowdtangle list ID(s). Leave empty to get posts from all lists (associated to the API token)
35+
LIST_IDS=2,3

crowdtangle/fetch_crowdtangle.py

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,16 @@
77
from minet.crowdtangle.exceptions import CrowdTangleError
88

99

10-
FetchCrowdTangleArgs = namedtuple('FetchCrowdTangleArgs', ['start_date',
10+
FetchCrowdTangleArgs = namedtuple('FetchCrowdTangleArgs', ['api_token',
11+
'start_date',
1112
'end_date',
1213
'list_ids',
1314
'dashboard_id',
1415
'max_results_to_fetch'])
1516

16-
1717
class FetchCrowdTangle(PTransform):
18-
def __init__(self, *args, api_token=None, crowdtangle_client=None, **kwargs):
19-
super().__init__(*args, **kwargs)
20-
if api_token and crowdtangle_client:
21-
raise ValueError('api_token and crowdtangle_client args are mutually exclusive.')
22-
self._api_token = api_token
23-
self._crowdtangle_client = crowdtangle_client
24-
25-
def get_crowdtangle_client(self):
26-
"""Returns the CrowdTangleAPIClient provided in the constructor, or creates a new client
27-
from API token stores in GCP secrets manager.
28-
29-
This is neccessary because CrowdTangleAPIClient hangs when pickled and then depickled (which
30-
Apache Beam does sometimes for PTransform objects)
31-
"""
32-
if self._crowdtangle_client:
33-
return self._crowdtangle_client
34-
35-
return CrowdTangleAPIClient(token=self._api_token)
36-
3718
def fetch(self, input_args):
19+
logging.info('in FetchCrowdTangle.fetch input_args: %s', input_args)
3820
try:
3921
start_date = input_args.start_date
4022
except KeyError as e:
@@ -59,7 +41,7 @@ def fetch(self, input_args):
5941
logging.info('Querying CrowdTangle. %s', query_info_message)
6042
num_posts = 0
6143
try:
62-
crowdtangle_client = self.get_crowdtangle_client()
44+
crowdtangle_client = CrowdTangleAPIClient(token=input_args.api_token)
6345
for post in crowdtangle_client.posts(start_date=start_date, end_date=end_date,
6446
partition_strategy=partition_strategy,
6547
sort_by=sort_by, format=format_val,
@@ -82,5 +64,5 @@ def expand(self, p):
8264
(if encountered)
8365
"""
8466
return (
85-
p | "Fetch CrowdTangle results" >> beam.FlatMap(self.fetch).with_outputs('api_results',
67+
p | "Fetch CrowdTangle results" >> beam.ParDo(self.fetch).with_outputs('api_results',
8668
'errors'))

crowdtangle/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
setuptools
22
apache-beam>=2.28.0
33
minet==0.47.0
4-
psycopg2==2.8.6
4+
quenouille==0.6.6 # required due to interface change that breaks minet
5+
psycopg2<3,>=2.8.6

crowdtangle/run_fetch_crowdtangle.py

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,64 @@
11
import argparse
2+
import configparser
23
import datetime
34
import logging
45

6+
from typing import Sequence
7+
58
import apache_beam as beam
69
from apache_beam.options.pipeline_options import PipelineOptions
710
from apache_beam.options.pipeline_options import SetupOptions
811

12+
import config_utils
913
from crowdtangle import fetch_crowdtangle
1014
from crowdtangle import process_crowdtangle_posts
1115
from crowdtangle import write_crowdtangle_results_to_database
12-
13-
import config_utils
1416
from crowdtangle import db_functions
1517

18+
19+
def get_dashboards_fetch_args(config: configparser.ConfigParser,
20+
database_connection_params: config_utils.DatabaseConnectionParams) -> Sequence[fetch_crowdtangle.FetchCrowdTangleArgs]:
21+
"""Gets list of config section names from ['CROWDTANGLE']['DASHBOARD_CONFIG_SECTION_NAMES'],
22+
parses API_TOKEN, DASHBOARD_NAME, LIST_IDS from those named config sections, and returns
23+
FetchCrowdTangleArgs for each named config section.
24+
"""
25+
dashboard_config_section_names = (
26+
config['CROWDTANGLE']['DASHBOARD_CONFIG_SECTION_NAMES'].split(','))
27+
28+
with config_utils.get_database_connection(database_connection_params) as db_connection:
29+
db_interface = db_functions.CrowdTangleDBInterface(db_connection)
30+
dashboard_name_to_id = db_interface.all_dashboards_name_to_id()
31+
logging.info('Dashboard Names -> IDs: %s', dashboard_name_to_id)
32+
33+
if 'DAYS_IN_PAST_TO_SYNC' in config['CROWDTANGLE']:
34+
start_date = (datetime.date.today() -
35+
datetime.timedelta(days=config['CROWDTANGLE'].getint('DAYS_IN_PAST_TO_SYNC'))
36+
).isoformat()
37+
end_date = None
38+
else:
39+
start_date = config['CROWDTANGLE'].get('START_DATE')
40+
end_date = config['CROWDTANGLE'].get('END_DATE', None)
41+
42+
fetch_args_list = []
43+
for config_section_name in dashboard_config_section_names:
44+
config_section = config[config_section_name]
45+
api_token = config_section.get('API_TOKEN')
46+
dashboard_name = config_section.get('DASHBOARD_NAME')
47+
max_results_to_fetch = config_section.getint('MAX_RESULTS_TO_FETCH', None)
48+
list_ids = config_section.get('LIST_IDS', None)
49+
if list_ids:
50+
list_ids = list_ids.split(',')
51+
52+
fetch_args_list.append(fetch_crowdtangle.FetchCrowdTangleArgs(
53+
api_token=api_token,
54+
list_ids=list_ids,
55+
start_date=start_date,
56+
end_date=end_date,
57+
dashboard_id=dashboard_name_to_id[dashboard_name],
58+
max_results_to_fetch=max_results_to_fetch))
59+
return fetch_args_list
60+
61+
1662
def run(argv=None, save_main_session=True):
1763
"""Main entry point; defines and runs the wordcount pipeline."""
1864
parser = argparse.ArgumentParser()
@@ -29,39 +75,14 @@ def run(argv=None, save_main_session=True):
2975
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
3076

3177
config = config_utils.get_config(known_args.config_path)
32-
max_results_to_fetch = config['CROWDTANGLE'].getint('MAX_RESULTS_TO_FETCH', None)
33-
if 'DAYS_IN_PAST_TO_SYNC' in config['CROWDTANGLE']:
34-
start_date = (datetime.date.today() -
35-
datetime.timedelta(days=config['CROWDTANGLE'].getint('DAYS_IN_PAST_TO_SYNC'))
36-
).isoformat()
37-
end_date = None
38-
else:
39-
start_date = config['CROWDTANGLE'].get('START_DATE')
40-
end_date = config['CROWDTANGLE'].get('END_DATE', None)
41-
api_token = config['CROWDTANGLE'].get('API_TOKEN')
42-
list_ids = config['CROWDTANGLE'].get('LIST_IDS', None)
43-
dashboard_name = config['CROWDTANGLE'].get('DASHBOARD_NAME')
44-
if list_ids:
45-
list_ids = list_ids.split(',')
46-
4778
database_connection_params = config_utils.get_database_connection_params_from_config(config)
48-
with config_utils.get_database_connection(database_connection_params) as db_connection:
49-
db_interface = db_functions.CrowdTangleDBInterface(db_connection)
50-
dashboard_name_to_id = db_interface.all_dashboards_name_to_id()
51-
logging.info('Dashboard Names -> IDs: %s', dashboard_name_to_id)
52-
53-
fetch_crowdtangle_args = fetch_crowdtangle.FetchCrowdTangleArgs(
54-
list_ids=list_ids,
55-
start_date=start_date,
56-
end_date=end_date,
57-
dashboard_id=dashboard_name_to_id[dashboard_name],
58-
max_results_to_fetch=max_results_to_fetch)
79+
fetch_args_list = get_dashboards_fetch_args(config, database_connection_params)
5980

60-
logging.info('About to start crowdtangle fetch pipline with args: %s', fetch_crowdtangle_args)
81+
logging.info('About to start crowdtangle fetch pipline with args: %s', fetch_args_list)
6182
with beam.Pipeline(options=pipeline_options) as pipeline:
6283
results, errors = (
63-
pipeline | beam.Create([fetch_crowdtangle_args])
64-
| 'Fetch CrowdTangle results' >> fetch_crowdtangle.FetchCrowdTangle(api_token=api_token)
84+
pipeline | beam.Create(fetch_args_list)
85+
| 'Fetch CrowdTangle results' >> fetch_crowdtangle.FetchCrowdTangle()
6586
)
6687

6788
processed_results = (

0 commit comments

Comments
 (0)