Skip to content

Commit 33bb9a2

Browse files
author
Stefan Zabka
committed
Removed collect_content from PySparkS3Dataset
Downloading files via the SparkContext was much slower than downloading via boto (which is what S3Dataset does). So now both classes use the same method, as PySparkS3Dataset inherits from S3Dataset.
1 parent 3dadb31 commit 33bb9a2

File tree

1 file changed

+40
-62
lines changed

1 file changed

+40
-62
lines changed

openwpm_utils/s3.py

Lines changed: 40 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -8,68 +8,7 @@
88
from pyarrow.filesystem import S3FSWrapper # noqa
99
from pyspark.sql import SQLContext
1010

11-
12-
class PySparkS3Dataset(object):
    def __init__(self, spark_context, s3_directory,
                 s3_bucket='openwpm-crawls'):
        """Helper class to load OpenWPM datasets from S3 using PySpark

        Parameters
        ----------
        spark_context
            Spark context. In databricks, this is available via the `sc`
            variable.
        s3_directory : string
            Directory within the S3 bucket in which the dataset is saved.
        s3_bucket : string, optional
            The bucket name on S3. Defaults to `openwpm-crawls`.
        """
        self._s3_bucket = s3_bucket
        self._s3_directory = s3_directory
        self._spark_context = spark_context
        self._sql_context = SQLContext(spark_context)
        # Templates with an escaped %s placeholder, filled in per table /
        # per content hash by the read_* methods below.
        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
            s3_bucket, s3_directory)
        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
            s3_bucket, s3_directory)

    def read_table(self, table_name, columns=None):
        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.

        Parameters
        ----------
        table_name : string
            OpenWPM table to read
        columns : list of strings
            The set of columns to filter the parquet dataset by
        """
        df = self._sql_context.read.parquet(self._s3_table_loc % table_name)
        return df if columns is None else df.select(columns)

    def read_content(self, content_hash):
        """Read the content corresponding to `content_hash`.

        NOTE: This can only be run in the driver process since it requires
        access to the spark context
        """
        location = self._s3_content_loc % content_hash
        return self._spark_context.textFile(location)

    def collect_content(self, content_hash, beautify=False):
        """Collect content for `content_hash` to driver

        NOTE: This can only be run in the driver process since it requires
        access to the spark context
        """
        raw = ''.join(self.read_content(content_hash).collect())
        return jsbeautifier.beautify(raw) if beautify else raw
72-
class S3Dataset(object):
11+
class S3Dataset:
7312
def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
7413
"""Helper class to load OpenWPM datasets from S3 using pandas
7514
@@ -134,3 +73,42 @@ def collect_content(self, content_hash, beautify=False):
13473
except IndexError:
13574
pass
13675
return content
76+
77+
class PySparkS3Dataset(S3Dataset):
    def __init__(self, spark_context, s3_directory,
                 s3_bucket='openwpm-crawls'):
        """Helper class to load OpenWPM datasets from S3 using PySpark

        Parameters
        ----------
        spark_context
            Spark context. In databricks, this is available via the `sc`
            variable.
        s3_directory : string
            Directory within the S3 bucket in which the dataset is saved.
        s3_bucket : string, optional
            The bucket name on S3. Defaults to `openwpm-crawls`.
        """
        # Initialize the boto-based state set up by S3Dataset.__init__ so
        # that inherited methods (e.g. collect_content) work correctly.
        super().__init__(s3_directory, s3_bucket=s3_bucket)
        self._spark_context = spark_context
        self._sql_context = SQLContext(spark_context)
        # Templates with an escaped %s placeholder, filled in per table name
        # by read_table.
        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
            s3_bucket, s3_directory)
        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
            s3_bucket, s3_directory)

    def read_table(self, table_name, columns=None):
        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.

        Parameters
        ----------
        table_name : string
            OpenWPM table to read
        columns : list of strings
            The set of columns to filter the parquet dataset by
        """
        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
        if columns is not None:
            return table.select(columns)
        return table

0 commit comments

Comments
 (0)