|
| 1 | +from mrjob.job import MRJob |
| 2 | +from mrjob.step import MRStep |
| 3 | +# from mrjob.protocol import TextValueProtocol |
| 4 | +import datetime as dt |
| 5 | + |
class RevisionTimeline(MRJob):
    """Build a creation-relative revision timeline for each article.

    The single map/reduce step groups revisions by article and, for every
    article whose earliest revision predates ``_CREATION_CUTOFF``, emits the
    creation time, the revision count, and each revision's offset from the
    creation time.
    """

    # Layout of the revision timestamps carried in the input records.
    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    # Articles first revised on or after this instant are dropped.  The
    # original author's note says this ensures each article has at least a
    # five-year history, which presumes the dataset runs to roughly 2008
    # -- TODO confirm against the input data.
    _CREATION_CUTOFF = dt.datetime(2003, 1, 1)

    def mapperGroupRevisions(self, _, line):
        """Parse one revision record and key it by article.

        Args:
            _: Ignored input key.
            line: One revision record whose fields are separated by the
                ``\\x1e`` character.  The first field is space-separated and
                is read at positions 1 (article id), 3 (article name),
                4 (timestamp), 5 (user name) and 6 (user id).  Fields 11 and
                12 are expected to hold an integer as their second
                space-separated token (minor flag and revision length
                respectively).  NOTE(review): the exact record layout is
                inferred from the indices used here; verify against the
                input data.

        Yields:
            ``([article_id, article_name],
            [timestamp_str, revision_length, minor_flag, user_name, user_id])``

        Raises:
            IndexError: If the record has fewer fields than expected.
            ValueError: If the length or minor-flag token is not an integer.
        """
        record = line.split('\x1e')
        article_info = record[0].split(' ')

        article_id = article_info[1]
        article_name = article_info[3]
        revision_datetime_str = article_info[4]
        user_name = article_info[5]
        user_id = article_info[6]

        revision_length = int(record[12].split(' ')[1])
        minor_flag = int(record[11].split(' ')[1])

        yield (
            [article_id, article_name],
            [revision_datetime_str, revision_length, minor_flag,
             user_name, user_id],
        )

    def reducerCreateTimeline(self, key, revisions):
        """Emit the normalized revision timeline of one article.

        Args:
            key: ``[article_id, article_name]`` as produced by the mapper.
            revisions: Iterable of
                ``[timestamp_str, revision_length, minor_flag, user_name,
                user_id]`` entries for that article.

        Yields:
            At most one pair ``(key, [creation_time, num_revisions,
            timeline])`` where ``creation_time`` is formatted as
            ``'%Y-%m-%d %H:%M:%S'`` and each timeline entry is
            ``[days, seconds, revision_length, minor_flag, user_name,
            user_id]`` with ``days``/``seconds`` taken from the timedelta
            between the revision and the earliest revision.  Entries keep
            the order in which the revisions were received.  Nothing is
            yielded when the earliest revision is not before
            ``_CREATION_CUTOFF``.

        Raises:
            ValueError: If a timestamp does not match ``_TIMESTAMP_FORMAT``.
        """
        # Parse every timestamp exactly once and keep it next to its record.
        stamped = [
            (dt.datetime.strptime(rev[0], self._TIMESTAMP_FORMAT), rev)
            for rev in revisions
        ]
        if not stamped:
            return

        # The earliest revision is treated as the article's creation time.
        creation_datetime = min(when for when, _ in stamped)
        if creation_datetime >= self._CREATION_CUTOFF:
            return

        normalized_revision_timeline = []
        for when, rev in stamped:
            time_since_creation = when - creation_datetime
            normalized_revision_timeline.append([
                time_since_creation.days,
                time_since_creation.seconds,
                rev[1],
                rev[2],
                rev[3],
                rev[4],
            ])

        yield key, [
            creation_datetime.strftime('%Y-%m-%d %H:%M:%S'),
            len(stamped),
            normalized_revision_timeline,
        ]

    def steps(self):
        """Return the job's single map/reduce step."""
        return [
            MRStep(mapper=self.mapperGroupRevisions,
                   reducer=self.reducerCreateTimeline)
        ]

# Launch the job only when this file is executed as a script, not on import.
if __name__ == "__main__":
    RevisionTimeline.run()
0 commit comments