Skip to content

Commit cb8a25f

Browse files
vringar (Stefan Zabka)
authored and committed
Added mode parameter to PySparkS3Dataset
This parameter allows for filtering out VisitIds that are part of `incompleted_visits`, or that had a command with a command_status other than "ok", since users probably shouldn't consider them for analysis. This filtering functionality is extracted into the `TableFilter` class so it can be reused by other Datasets.
1 parent 33bb9a2 commit cb8a25f

File tree

4 files changed

+108
-25
lines changed

4 files changed

+108
-25
lines changed

openwpm_utils/crawlhistory.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import pyspark.sql.functions as F
2+
from pyspark.sql.types import StringType
3+
4+
def _had_status(status):
    # True when the collected `command_status` array contains `status`.
    return F.array_contains("command_status", status)


# Collapse a visit's list of command statuses into the single most severe one.
# Conditions are evaluated from worst to best, so the first bad status found
# wins; "ok" is the fallback when no bad status is present in the array.
reduce_to_worst_command_status = (
    F.when(_had_status("critical"), "critical")
    .when(_had_status("error"), "error")
    .when(_had_status("neterror"), "neterror")
    .when(_had_status("timeout"), "timeout")
    .otherwise("ok")
    .alias("worst_status")
)
12+
13+
14+
# Collapse a visit's list of command statuses into the single LEAST severe
# one.  Conditions are evaluated from best to worst, so "ok" wins whenever it
# is present; "critical" is the fallback when nothing better matched.
reduce_to_best_command_status = F.when(
    F.array_contains("command_status", "ok"), "ok"
)
for _status in ("timeout", "neterror", "error"):
    reduce_to_best_command_status = reduce_to_best_command_status.when(
        F.array_contains("command_status", _status), _status
    )
reduce_to_best_command_status = (
    reduce_to_best_command_status.otherwise("critical").alias("best_status")
)
22+
23+
24+
def get_worst_status_per_visit_id(crawl_history):
    """Aggregate `crawl_history` per visit_id and add column `worst_status`.

    Returns one row per visit_id with the collected list of that visit's
    command statuses plus the single most severe status among them.
    """
    statuses_per_visit = crawl_history.groupBy("visit_id").agg(
        F.collect_list("command_status").alias("command_status")
    )
    return statuses_per_visit.withColumn(
        "worst_status", reduce_to_worst_command_status
    )

openwpm_utils/dataquality.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when
1+
import pyspark.sql.functions as F
22
from pyspark.mllib.stat import Statistics
3+
from pyspark.sql.dataframe import DataFrame
4+
from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when
5+
6+
from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
37

48

59
def count_not_null(c, nan_as_null=False):
@@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True):
5357
"\nNumber of records with visit_id == -1: %d"
5458
% df.where(df.visit_id == -1).count()
5559
)
60+
61+
62+
class TableFilter:
    """Splits OpenWPM tables into rows from successful vs. failed visits.

    A visit_id is considered bad when it appears in `incomplete_visits`
    or when any of its commands finished with a status other than "ok".
    """

    def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None:
        self._incomplete_visit_ids = incomplete_visits.select("visit_id")
        self._failed_visit_ids = (
            get_worst_status_per_visit_id(crawl_history)
            .where(F.col("worst_status") != "ok")
            .select("visit_id")
        )
        # BUG FIX: join against ONE deduplicated set of bad ids.  Previously
        # dirty_table unioned two separate inner joins, so a visit_id present
        # in both the failed AND the incomplete set emitted its rows twice,
        # and clean/dirty were not an exact partition of the input table.
        self._bad_visit_ids = (
            self._failed_visit_ids.union(self._incomplete_visit_ids).distinct()
        )

    def clean_table(self, table: DataFrame) -> DataFrame:
        """Rows of `table` whose visit_id is neither failed nor incomplete."""
        return table.join(self._bad_visit_ids, "visit_id", how="leftanti")

    def dirty_table(self, table: DataFrame) -> DataFrame:
        """Rows of `table` whose visit_id is failed or incomplete (each row once)."""
        return table.join(self._bad_visit_ids, "visit_id", how="inner")

openwpm_utils/s3.py

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
import gzip
2+
from typing import List
23

34
import boto3
45
import jsbeautifier
56
import pyarrow.parquet as pq
7+
import pyspark.sql.functions as F
68
import s3fs
79
from botocore.exceptions import ClientError
810
from pyarrow.filesystem import S3FSWrapper # noqa
9-
from pyspark.sql import SQLContext
11+
from pyspark import SparkContext
12+
from pyspark.sql import DataFrame, SQLContext
1013

11-
class S3Dataset:
12-
def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
14+
from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
15+
from openwpm_utils.dataquality import TableFilter
16+
17+
18+
class S3Dataset(object):
19+
def __init__(self, s3_directory, s3_bucket="openwpm-crawls"):
1320
"""Helper class to load OpenWPM datasets from S3 using pandas
1421
1522
This dataset wrapper is safe to use by spark worker processes, as it
@@ -38,30 +45,33 @@ def read_table(self, table_name, columns=None):
3845
columns : list of strings
3946
The set of columns to filter the parquet dataset by
4047
"""
41-
return pq.ParquetDataset(
42-
self._s3_table_loc % table_name,
43-
filesystem=self._s3fs,
44-
metadata_nthreads=4
45-
).read(use_pandas_metadata=True, columns=columns).to_pandas()
48+
return (
49+
pq.ParquetDataset(
50+
self._s3_table_loc % table_name,
51+
filesystem=self._s3fs,
52+
metadata_nthreads=4,
53+
)
54+
.read(use_pandas_metadata=True, columns=columns)
55+
.to_pandas()
56+
)
4657

4758
def collect_content(self, content_hash, beautify=False):
4859
"""Collect content by directly connecting to S3 via boto3"""
49-
s3 = boto3.client('s3')
60+
s3 = boto3.client("s3")
5061
try:
5162
obj = s3.get_object(
52-
Bucket=self._s3_bucket,
53-
Key=self._content_key % content_hash
63+
Bucket=self._s3_bucket, Key=self._content_key % content_hash
5464
)
5565
body = obj["Body"]
5666
compressed_content = body.read()
5767
body.close()
5868
except ClientError as e:
59-
if e.response['Error']['Code'] != 'NoSuchKey':
69+
if e.response["Error"]["Code"] != "NoSuchKey":
6070
raise
6171
else:
6272
return None
6373

64-
with gzip.GzipFile(fileobj=compressed_content, mode='r') as f:
74+
with gzip.GzipFile(fileobj=compressed_content, mode="r") as f:
6575
content = f.read()
6676

6777
if content is None or content == "":
@@ -74,9 +84,11 @@ def collect_content(self, content_hash, beautify=False):
7484
pass
7585
return content
7686

87+
7788
class PySparkS3Dataset(S3Dataset):
78-
def __init__(self, spark_context, s3_directory,
79-
s3_bucket='openwpm-crawls'):
89+
def __init__(
90+
self, spark_context, s3_directory: str, s3_bucket: str = "openwpm-crawls"
91+
):
8092
"""Helper class to load OpenWPM datasets from S3 using PySpark
8193
8294
Parameters
@@ -89,16 +101,17 @@ def __init__(self, spark_context, s3_directory,
89101
s3_bucket : string, optional
90102
The bucket name on S3. Defaults to `openwpm-crawls`.
91103
"""
92-
self._s3_bucket = s3_bucket
93-
self._s3_directory = s3_directory
104+
super().__init__(s3_directory, s3_bucket)
94105
self._spark_context = spark_context
95106
self._sql_context = SQLContext(spark_context)
96-
self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
97-
s3_bucket, s3_directory)
98-
self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
99-
s3_bucket, s3_directory)
107+
self._s3_table_loc = f"s3a://{self._s3_table_loc}"
108+
incomplete_visits = self.read_table("incomplete_visits", mode="all")
109+
crawl_history = self.read_table("crawl_history", mode="all")
110+
self._filter = TableFilter(incomplete_visits, crawl_history)
100111

101-
def read_table(self, table_name, columns=None):
112+
def read_table(
    self, table_name: str, columns: List[str] = None, mode: str = "successful"
):
    """Read `table_name` from OpenWPM dataset into a pyspark dataframe.

    Parameters
    ----------
    table_name : string
        OpenWPM table to read
    columns : list of strings
        The set of columns to filter the parquet dataset by
    mode : string
        The valid values are "successful", "failed", "all".
        Success is determined per visit_id. A visit_id is failed
        if one of its commands failed or if it's in the interrupted table
    """
    table = self._sql_context.read.parquet(self._s3_table_loc % table_name)

    if mode == "failed":
        table = self._filter.dirty_table(table)
    elif mode == "successful":
        table = self._filter.clean_table(table)
    elif mode != "all":
        # "all" needs no filtering.  BUG FIX: the original message used a
        # JavaScript-style `${mode}` placeholder inside a Python f-string
        # (printing a literal `$`) and the two concatenated fragments were
        # missing a separating space.
        raise AssertionError(
            f"Mode was {mode}, "
            "allowed modes are 'all', 'failed' and 'successful'"
        )

    if columns is not None:
        table = table.select(columns)

    return table

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
name='openwpm-utils',
1212
license='MPL 2.0',
1313
url='https://github.com/mozilla/openwpm-utils',
14-
version='0.2.0',
14+
version='0.3.0',
1515
packages=['openwpm_utils'],
1616

1717
# Dependencies

0 commit comments

Comments
 (0)