|
1 | 1 | import pyspark.sql.functions as F |
2 | 2 | from pyspark.sql.types import StringType |
| 3 | +import json |
| 4 | + |
3 | 5 |
|
4 | 6 |
|
5 | 7 | def reduce_to_worst_command_status(statuses): |
@@ -46,3 +48,84 @@ def get_worst_status_per_visit_id(crawl_history): |
46 | 48 | ) |
47 | 49 |
|
48 | 50 |
|
# TODO: This needs a name that expresses that we are giving general stats about
# the way the crawl ran and not its specific data
def display_crawl_results(crawl_history, interrupted_visits):
    """
    Analyze crawl_history and interrupted_visits to display general
    success statistics.

    This function should be given all entries in the crawl_history and
    interrupted_visits tables.

    Parameters
    ----------
    crawl_history : pyspark.sql.DataFrame
        One row per executed command; expected columns include
        ``visit_id``, ``command``, ``arguments`` and a status column
        consumed by ``get_worst_status_per_visit_id``.
    interrupted_visits : pyspark.sql.DataFrame
        Rows keyed by ``visit_id`` for visits that were interrupted.
    """
    crawl_history.groupBy("command").count().show()

    # Total number of command sequences = number of distinct visit_ids.
    # BUGFIX: the original bound the grouped DataFrame itself here, so every
    # later float(total_num_command_sequences) raised a TypeError.
    total_num_command_sequences = crawl_history.select("visit_id").distinct().count()
    visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history)

    failed_count = visit_id_and_worst_status.where(
        F.col("worst_status") != "ok"
    ).count()
    print(
        "Percentage of command_sequences that didn't complete successfully %0.2f%%"
        % (failed_count / float(total_num_command_sequences) * 100)
    )

    # One report line per failure category; the three original copies of this
    # count-and-print block were identical apart from the status value.
    for status, label in (
        ("neterror", "neterrors"),
        ("timeout", "timeouts"),
        ("error", "errors"),
    ):
        status_count = visit_id_and_worst_status.where(
            F.col("worst_status") == status
        ).count()
        print(
            "There were a total of %d %s (%0.2f%% of all command_sequences)"
            % (
                status_count,
                label,
                status_count / float(total_num_command_sequences) * 100,
            )
        )

    # BUGFIX: the original used ${...} (JS template-literal syntax) inside
    # f-strings, printing a literal '$', and the two fragments ran together
    # without a separating space. Also avoids counting interrupted_visits twice.
    interrupted_count = interrupted_visits.count()
    print(
        f"A total of {interrupted_count} were interrupted. "
        f"This represents "
        f"{interrupted_count / float(total_num_command_sequences) * 100} % "
        f"of the entire crawl"
    )

    def extract_website_from_arguments(arguments):
        """Given the arguments of a get_command this function returns which website was visited"""
        return json.loads(arguments)["url"]

    udf_extract_website_from_arguments = F.udf(
        extract_website_from_arguments, StringType()
    )

    # Map each visit_id to the website its GetCommand targeted.
    visit_id_to_website = crawl_history.where(
        F.col("command") == "GetCommand"
    ).withColumn("website", udf_extract_website_from_arguments("arguments"))
    visit_id_to_website = visit_id_to_website[["visit_id", "website"]]

    visit_id_website_status = visit_id_and_worst_status.join(
        visit_id_to_website, "visit_id"
    )
    # Websites visited successfully more than once, excluding any visit that
    # was interrupted (left anti-join drops visit_ids present in
    # interrupted_visits), ordered by how often they succeeded.
    multiple_successes = (
        visit_id_website_status.where(F.col("worst_status") == "ok")
        .join(interrupted_visits, "visit_id", how="leftanti")
        .groupBy("website")
        .count()
        .filter("count > 1")
        .orderBy(F.desc("count"))
    )

    print(
        f"There were {multiple_successes.count()} websites that were successfully visited multiple times"
    )
    multiple_successes.groupBy(
        F.col("count").alias("Number of successes")
    ).count().show()
    multiple_successes.filter("count > 2").show()
0 commit comments