From 03c8cc9c456e5400ac93f7acb50136ab90bb23d7 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Fri, 24 Jul 2020 05:04:55 +0000 Subject: [PATCH 01/16] changed parser and serialization code to use the same json format to represent models, added auctionprice model and corrected the behavior of query2, corrected the behavior of query 0 and 1 to align with the nexmark specification. created fieldname file to map the fieldname with string literals. created nexmark_query_util to put transforms that gets reused across different queries. --- .../nexmark/models/auction_price.py | 29 ++++++++ .../benchmarks/nexmark/models/field_name.py | 40 +++++++++++ .../nexmark/models/nexmark_model.py | 38 ++++------ .../benchmarks/nexmark/nexmark_util.py | 71 +++++++++++++++++++ .../nexmark/queries/nexmark_query_util.py | 32 +++++++++ .../benchmarks/nexmark/queries/query0.py | 7 +- .../benchmarks/nexmark/queries/query1.py | 14 ++-- .../benchmarks/nexmark/queries/query2.py | 25 ++++--- 8 files changed, 210 insertions(+), 46 deletions(-) create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py new file mode 100644 index 000000000000..fc09331c312d --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Result of Query2.""" +import json + + +class AuctionPrice(object): + + def __init__(self, auction, price): + self.auction = auction + self.price = price + + def __repr__(self): + return json.dumps(self.__dict__) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py new file mode 100644 index 000000000000..bb41fd6d523f --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +""" Field names for de-serializing json representation of Models +""" + + +class FieldName: + id = 'id' + name = 'name' + emailAddress = 'emailAddress' + creditCard = 'creditCard' + city = "city" + state = "state" + dateTime = "dateTime" + extra = "extra" + itemName = "itemName" + description = "description" + initialBid = "initialBid" + reserve = "reserve" + expires = "expires" + seller = "seller" + category = "category" + auction = "auction" + bidder = "bidder" + price = "price" diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 2cd7bf0acf2e..14d4f974ca4d 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,27 +26,25 @@ - The bid on an item for auction (Bid). """ +import json class Person(object): "Author of an auction or a bid." def __init__( - self, id, name, email, credit_card, city, state, timestamp, extra=None): + self, id, name, email, credit_card, city, state, date_time, extra=None): self.id = id self.name = name - self.email = email # key - self.credit_card = credit_card + self.emailAddress = email # key + self.creditCard = credit_card self.city = city self.state = state - self.timestamp = timestamp + self.dateTime = date_time self.extra = extra def __repr__(self): - return 'Person({id}, {email})'.format( - **{ - 'id': self.id, 'email': self.email - }) + return json.dumps(self.__dict__) class Auction(object): @@ -59,41 +57,35 @@ def __init__( description, initial_bid, reserve_price, - timestamp, + date_time, expires, seller, category, extra=None): self.id = id - self.item_name = item_name # key + self.itemName = item_name # key self.description = description - self.initial_bid = initial_bid - self.reserve_price = reserve_price - self.timestamp = timestamp + self.initialBid = initial_bid + self.reserve = 
reserve_price + self.dateTime = date_time self.expires = expires self.seller = seller self.category = category self.extra = extra def __repr__(self): - return 'Auction({id}, {item_name})'.format( - **{ - 'id': self.id, 'item_name': self.item_name - }) + return json.dumps(self.__dict__) class Bid(object): "A bid for an item for auction." - def __init__(self, auction, bidder, price, timestamp, extra=None): + def __init__(self, auction, bidder, price, date_time, extra=None): self.auction = auction # key self.bidder = bidder self.price = price - self.timestamp = timestamp + self.dateTime = date_time self.extra = extra def __repr__(self): - return 'Bid({auction}, {bidder}, {price})'.format( - **{ - 'auction': self.auction, 'bidder': self.bidder, 'price': self.price - }) + return json.dumps(self.__dict__) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 9079596a1a2a..8da5eecccaf6 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -39,9 +39,11 @@ import logging import threading +import json import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldName _LOGGER = logging.getLogger(__name__) @@ -103,6 +105,75 @@ def process(self, elem): yield event +class ParseJsonEvnetFn(beam.DoFn): + """Parses the raw event info into a Python objects. 
+ + Each event line has the following format: + + person: {id,name,email,credit_card,city, \ + state,timestamp,extra} + auction: {id,item_name, description,initial_bid, \ + reserve_price,timestamp,expires,seller,category,extra} + bid: {auction,bidder,price,timestamp,extra} + + For example: + + {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\ + "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\ + "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["} + + {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ + "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ + "expires":1528098840451,"seller":1000,"category":13,"extra":"zcurskupiz"} + + {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ + "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} + """ + def process(self, elem): + json_dict = json.loads(elem) + if type(json_dict[FieldName.dateTime]) is dict: + json_dict[FieldName.dateTime] = json_dict[FieldName.dateTime]['millis'] + if FieldName.name in json_dict: + yield nexmark_model.Person(json_dict[FieldName.id], + json_dict[FieldName.name], + json_dict[FieldName.emailAddress], + json_dict[FieldName.creditCard], + json_dict[FieldName.city], + json_dict[FieldName.state], + json_dict[FieldName.dateTime], + json_dict[FieldName.extra]) + elif FieldName.itemName in json_dict: + yield nexmark_model.Auction(json_dict[FieldName.id], + json_dict[FieldName.itemName], + json_dict[FieldName.description], + json_dict[FieldName.initialBid], + json_dict[FieldName.reserve], + json_dict[FieldName.dateTime], + json_dict[FieldName.expires], + json_dict[FieldName.seller], + json_dict[FieldName.category], + json_dict[FieldName.extra]) + elif FieldName.auction in json_dict: + yield nexmark_model.Bid(json_dict[FieldName.auction], + json_dict[FieldName.bidder], + json_dict[FieldName.price], + json_dict[FieldName.dateTime], + json_dict[FieldName.extra]) + else: + raise ValueError('Invalid 
event: %s.' % str(json_dict)) + + +class CountAndLog(beam.PTransform): + def expand(self, pcoll): + return (pcoll | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) + + +def log_count_info(count): + logging.info('Query resulted in %d results', count) + return count + + def display(elm): logging.debug(elm) return elm diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py new file mode 100644 index 000000000000..8724d81a33d9 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Utilities for working with NEXmark data stream.""" + +import logging + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + + +def is_bid(event): + return isinstance(event, nexmark_model.Bid) + + +class JustBids(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsBid" >> beam.Filter(is_bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index c2c0d509c0f4..2245a78485ce 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -30,8 +30,9 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn +from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn -def load(raw_events, query_args=None): - return raw_events | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) # pylint: disable=expression-not-assigned +def load(events, query_args=None): + return (events | 'serialization' >> beam.Map(repr) + | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py index 95f3f162fbdf..a807259596c8 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py @@ -29,20 +29,16 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(raw_events, query_args=None): return ( raw_events - 
| 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInBids' >> - beam.Filter(lambda event: isinstance(event, nexmark_model.Bid)) + | nexmark_query_util.JustBids() | 'ConvertToEuro' >> beam.Map( lambda bid: nexmark_model.Bid( bid.auction, - bid.bidder, (float(bid.price) * 89) // 100, - bid.timestamp, - bid.extra)) - | 'DisplayQuery1' >> beam.Map(display)) # pylint: disable=expression-not-assigned + bid.bidder, (bid.price * 89) // 100, + bid.dateTime, + bid.extra))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index 51504a600dce..a104aa0c4857 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -29,17 +29,20 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.testing.benchmarks.nexmark.models import auction_price -def load(raw_events, metadata=None): +def load(events, metadata=None): return ( - raw_events - | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInAuctionsWithSelectedId' >> beam.Filter( - lambda event: ( - isinstance(event, nexmark_model.Auction) and event.id == metadata. 
- get('auction_id'))) - | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned + events + | nexmark_query_util.JustBids() + | beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) + | 'project' >> beam.Map( + lambda bid: auction_price.AuctionPrice( + bid.auction, bid.price))) + # | 'FilterInAuctionsWithSelectedId' >> beam.Filter( + # lambda event: ( + # isinstance(event, nexmark_model.Auction) and event.id == metadata. + # get('auction_id'))) + # | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned From ff865ef882be9d8a06f7f4a494b0cad501bbeafd Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Fri, 24 Jul 2020 17:33:18 +0000 Subject: [PATCH 02/16] changed repr and aligned code style. Changed repr for all models to eliminate spaces in their json string representation; aligned code style for some files --- .../nexmark/models/auction_price.py | 3 +- .../benchmarks/nexmark/models/field_name.py | 36 +++++----- .../nexmark/models/nexmark_model.py | 6 +- .../benchmarks/nexmark/nexmark_util.py | 66 ++++++++++--------- .../nexmark/queries/nexmark_query_util.py | 6 +- .../benchmarks/nexmark/queries/query0.py | 6 +- .../benchmarks/nexmark/queries/query2.py | 13 ++-- 7 files changed, 70 insertions(+), 66 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index fc09331c312d..7cb60ec1bd54 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -20,10 +20,9 @@ class AuctionPrice(object): - def __init__(self, auction, price): self.auction = auction self.price = price def __repr__(self): - return json.dumps(self.__dict__) + return json.dumps(self.__dict__, separators=(',', ':')) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py index bb41fd6d523f..c564d8689cc7 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -20,21 +20,21 @@ class FieldName: - id = 'id' - name = 'name' - emailAddress = 'emailAddress' - creditCard = 'creditCard' - city = "city" - state = "state" - dateTime = "dateTime" - extra = "extra" - itemName = "itemName" - description = "description" - initialBid = "initialBid" - reserve = "reserve" - expires = "expires" - seller = "seller" - category = "category" - auction = "auction" - bidder = "bidder" - price = "price" + ID = 'id' + NAME = 'name' + EMAIL_ADDRESS = 'emailAddress' + CREDIT_CARD = 'creditCard' + CITY = "city" + STATE = "state" + DATE_TIME = "dateTime" + EXTRA = "extra" + ITEM_NAME = "itemName" + DESCRIPTION = "description" + INITIAL_BID = "initialBid" + RESERVE = "reserve" + EXPIRES = "expires" + SELLER = "seller" + CATEGORY = "category" + AUCTION = "auction" + BIDDER = "bidder" + PRICE = "price" diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 14d4f974ca4d..94a98cdd0a26 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -44,7 +44,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__) + return json.dumps(self.__dict__, separators=(',', ':')) class Auction(object): @@ -74,7 +74,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__) + return json.dumps(self.__dict__, separators=(',', ':')) class Bid(object): @@ -88,4 +88,4 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.extra = extra def __repr__(self): - return json.dumps(self.__dict__) + 
return json.dumps(self.__dict__, separators=(',', ':')) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 8da5eecccaf6..f2b2fbd33084 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -124,49 +124,53 @@ class ParseJsonEvnetFn(beam.DoFn): {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ - "expires":1528098840451,"seller":1000,"category":13,"extra":"zcurskupiz"} + "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"} {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} """ def process(self, elem): json_dict = json.loads(elem) - if type(json_dict[FieldName.dateTime]) is dict: - json_dict[FieldName.dateTime] = json_dict[FieldName.dateTime]['millis'] - if FieldName.name in json_dict: - yield nexmark_model.Person(json_dict[FieldName.id], - json_dict[FieldName.name], - json_dict[FieldName.emailAddress], - json_dict[FieldName.creditCard], - json_dict[FieldName.city], - json_dict[FieldName.state], - json_dict[FieldName.dateTime], - json_dict[FieldName.extra]) - elif FieldName.itemName in json_dict: - yield nexmark_model.Auction(json_dict[FieldName.id], - json_dict[FieldName.itemName], - json_dict[FieldName.description], - json_dict[FieldName.initialBid], - json_dict[FieldName.reserve], - json_dict[FieldName.dateTime], - json_dict[FieldName.expires], - json_dict[FieldName.seller], - json_dict[FieldName.category], - json_dict[FieldName.extra]) - elif FieldName.auction in json_dict: - yield nexmark_model.Bid(json_dict[FieldName.auction], - json_dict[FieldName.bidder], - json_dict[FieldName.price], - json_dict[FieldName.dateTime], - json_dict[FieldName.extra]) + if 
type(json_dict[FieldName.DATE_TIME]) is dict: + json_dict[FieldName.DATE_TIME] = json_dict[FieldName.DATE_TIME]['millis'] + if FieldName.NAME in json_dict: + yield nexmark_model.Person( + json_dict[FieldName.ID], + json_dict[FieldName.NAME], + json_dict[FieldName.EMAIL_ADDRESS], + json_dict[FieldName.CREDIT_CARD], + json_dict[FieldName.CITY], + json_dict[FieldName.STATE], + json_dict[FieldName.DATE_TIME], + json_dict[FieldName.EXTRA]) + elif FieldName.ITEM_NAME in json_dict: + yield nexmark_model.Auction( + json_dict[FieldName.ID], + json_dict[FieldName.ITEM_NAME], + json_dict[FieldName.DESCRIPTION], + json_dict[FieldName.INITIAL_BID], + json_dict[FieldName.RESERVE], + json_dict[FieldName.DATE_TIME], + json_dict[FieldName.EXPIRES], + json_dict[FieldName.SELLER], + json_dict[FieldName.CATEGORY], + json_dict[FieldName.EXTRA]) + elif FieldName.AUCTION in json_dict: + yield nexmark_model.Bid( + json_dict[FieldName.AUCTION], + json_dict[FieldName.BIDDER], + json_dict[FieldName.PRICE], + json_dict[FieldName.DATE_TIME], + json_dict[FieldName.EXTRA]) else: raise ValueError('Invalid event: %s.' 
% str(json_dict)) class CountAndLog(beam.PTransform): def expand(self, pcoll): - return (pcoll | "Count" >> beam.combiners.Count.Globally() - | "Log" >> beam.Map(log_count_info)) + return ( + pcoll | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) def log_count_info(count): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index 8724d81a33d9..f07fd8eaccfc 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -17,7 +17,6 @@ """Utilities for working with NEXmark data stream.""" -import logging import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model @@ -27,6 +26,11 @@ def is_bid(event): return isinstance(event, nexmark_model.Bid) +class ToString(beam.PTransform): + def expand(self, pcoll): + return pcoll | beam.Map(repr) + + class JustBids(beam.PTransform): def expand(self, pcoll): return pcoll | "IsBid" >> beam.Filter(is_bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 2245a78485ce..918926def747 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -31,8 +31,10 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(events, query_args=None): - return (events | 'serialization' >> beam.Map(repr) - | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) + return ( + events | 'serialization' >> nexmark_query_util.ToString() + | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) diff 
--git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index a104aa0c4857..edd6ec809c25 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -37,12 +37,7 @@ def load(events, metadata=None): return ( events | nexmark_query_util.JustBids() - | beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) - | 'project' >> beam.Map( - lambda bid: auction_price.AuctionPrice( - bid.auction, bid.price))) - # | 'FilterInAuctionsWithSelectedId' >> beam.Filter( - # lambda event: ( - # isinstance(event, nexmark_model.Auction) and event.id == metadata. - # get('auction_id'))) - # | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned + | 'filter_by_skip' >> + beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) + | 'project' >> + beam.Map(lambda bid: auction_price.AuctionPrice(bid.auction, bid.price))) From 5b5f6ce6af834e1832b315661d162b8b2d51370f Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Tue, 28 Jul 2020 07:54:17 +0000 Subject: [PATCH 03/16] refactoring code to have a better code practice --- .../nexmark/models/auction_price.py | 4 +- .../benchmarks/nexmark/models/field_name.py | 2 +- .../nexmark/models/nexmark_model.py | 8 +-- .../benchmarks/nexmark/nexmark_util.py | 71 +++++++++++-------- .../nexmark/queries/nexmark_query_util.py | 9 ++- .../benchmarks/nexmark/queries/query0.py | 3 +- 6 files changed, 54 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index 7cb60ec1bd54..851e7c0a559f 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -16,7 +16,7 @@ # 
"""Result of Query2.""" -import json +from apache_beam.testing.benchmarks.nexmark import nexmark_util class AuctionPrice(object): @@ -25,4 +25,4 @@ def __init__(self, auction, price): self.price = price def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py index c564d8689cc7..5d583bca27f8 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -19,7 +19,7 @@ """ -class FieldName: +class FieldNames: ID = 'id' NAME = 'name' EMAIL_ADDRESS = 'emailAddress' diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 94a98cdd0a26..9a5048301177 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,7 +26,7 @@ - The bid on an item for auction (Bid). 
""" -import json +from apache_beam.testing.benchmarks.nexmark import nexmark_util class Person(object): @@ -44,7 +44,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) class Auction(object): @@ -74,7 +74,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) class Bid(object): @@ -88,4 +88,4 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.extra = extra def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index f2b2fbd33084..26529818fa4d 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -42,8 +42,9 @@ import json import apache_beam as beam +from apache_beam.utils.timestamp import Timestamp from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldName +from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames _LOGGER = logging.getLogger(__name__) @@ -131,37 +132,39 @@ class ParseJsonEvnetFn(beam.DoFn): """ def process(self, elem): json_dict = json.loads(elem) - if type(json_dict[FieldName.DATE_TIME]) is dict: - json_dict[FieldName.DATE_TIME] = json_dict[FieldName.DATE_TIME]['millis'] - if FieldName.NAME in json_dict: + if type(json_dict[FieldNames.DATE_TIME]) is dict: + json_dict[FieldNames.DATE_TIME] = json_dict[FieldNames.DATE_TIME]['millis'] + if FieldNames.NAME in json_dict: yield nexmark_model.Person( - json_dict[FieldName.ID], - json_dict[FieldName.NAME], - json_dict[FieldName.EMAIL_ADDRESS], - 
json_dict[FieldName.CREDIT_CARD], - json_dict[FieldName.CITY], - json_dict[FieldName.STATE], - json_dict[FieldName.DATE_TIME], - json_dict[FieldName.EXTRA]) - elif FieldName.ITEM_NAME in json_dict: + json_dict[FieldNames.ID], + json_dict[FieldNames.NAME], + json_dict[FieldNames.EMAIL_ADDRESS], + json_dict[FieldNames.CREDIT_CARD], + json_dict[FieldNames.CITY], + json_dict[FieldNames.STATE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) + elif FieldNames.ITEM_NAME in json_dict: + if type(json_dict[FieldNames.EXPIRES]) is dict: + json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis'] yield nexmark_model.Auction( - json_dict[FieldName.ID], - json_dict[FieldName.ITEM_NAME], - json_dict[FieldName.DESCRIPTION], - json_dict[FieldName.INITIAL_BID], - json_dict[FieldName.RESERVE], - json_dict[FieldName.DATE_TIME], - json_dict[FieldName.EXPIRES], - json_dict[FieldName.SELLER], - json_dict[FieldName.CATEGORY], - json_dict[FieldName.EXTRA]) - elif FieldName.AUCTION in json_dict: + json_dict[FieldNames.ID], + json_dict[FieldNames.ITEM_NAME], + json_dict[FieldNames.DESCRIPTION], + json_dict[FieldNames.INITIAL_BID], + json_dict[FieldNames.RESERVE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + millis_to_timestamp(json_dict[FieldNames.EXPIRES]), + json_dict[FieldNames.SELLER], + json_dict[FieldNames.CATEGORY], + json_dict[FieldNames.EXTRA]) + elif FieldNames.AUCTION in json_dict: yield nexmark_model.Bid( - json_dict[FieldName.AUCTION], - json_dict[FieldName.BIDDER], - json_dict[FieldName.PRICE], - json_dict[FieldName.DATE_TIME], - json_dict[FieldName.EXTRA]) + json_dict[FieldNames.AUCTION], + json_dict[FieldNames.BIDDER], + json_dict[FieldNames.PRICE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) else: raise ValueError('Invalid event: %s.' 
% str(json_dict)) @@ -181,3 +184,13 @@ def log_count_info(count): def display(elm): logging.debug(elm) return elm + + +def model_to_json(model): + return json.dumps(model.__dict__, separators=(',', ':')) + + +def millis_to_timestamp(millis: int): + second = millis // 1000 + micro_second = millis % 1000 * 1000 + return Timestamp(second, micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index f07fd8eaccfc..dece9b04b5f6 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -26,11 +26,10 @@ def is_bid(event): return isinstance(event, nexmark_model.Bid) -class ToString(beam.PTransform): - def expand(self, pcoll): - return pcoll | beam.Map(repr) - - class JustBids(beam.PTransform): def expand(self, pcoll): return pcoll | "IsBid" >> beam.Filter(is_bid) + + +def auction_or_bid(event): + return isinstance(event, nexmark_model.Bid) or isinstance(event, nexmark_model.Auction) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 918926def747..8df56069f3c1 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -31,10 +31,9 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn -from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(events, query_args=None): return ( - events | 'serialization' >> nexmark_query_util.ToString() + events | 'serialization' >> beam.Map(repr) | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) From d6002dc3a73874088326f45a7cb084b795fa037d Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: 
Tue, 28 Jul 2020 22:27:01 +0000 Subject: [PATCH 04/16] implemented coder for all models, query9 --- .../benchmarks/nexmark/models/auction_bid.py | 56 ++++++ .../nexmark/models/auction_price.py | 27 +++ .../nexmark/models/nexmark_model.py | 114 ++++++++++++ .../benchmarks/nexmark/nexmark_util.py | 18 +- .../nexmark/queries/nexmark_query_util.py | 24 +++ .../benchmarks/nexmark/queries/query9.py | 27 +++ .../nexmark/queries/winning_bids.py | 172 ++++++++++++++++++ 7 files changed, 435 insertions(+), 3 deletions(-) create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py new file mode 100644 index 000000000000..59bc9b29c3a6 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Result of WinningBid transform.""" +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.testing.benchmarks.nexmark import nexmark_util +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + + +class AuctionBidCoder(FastCoder): + def _create_impl(self): + return AuctionBidCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + +class AuctionBid(object): + CODER = AuctionBidCoder() + + def __init__(self, auction, bid): + self.auction = auction + self.bid = bid + + def __repr__(self): + return nexmark_util.model_to_json(self) + + +class AuctionBidCoderImpl(coder_impl.StreamCoderImpl): + _auction_coder_impl = nexmark_model.AuctionCoderImpl() + _bid_coder_Impl = nexmark_model.BidCoderImpl() + + def encode_to_stream(self, value: AuctionBid, stream, nested): + self._auction_coder_impl.encode_to_stream(value.auction, stream, True) + self._bid_coder_Impl.encode_to_stream(value.bid, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._auction_coder_impl.decode_from_stream(stream, True) + bid = self._bid_coder_Impl.decode_from_stream(stream, True) + return AuctionBid(auction, bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index 851e7c0a559f..7f081be92591 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -16,13 +16,40 @@ # """Result of Query2.""" +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder from apache_beam.testing.benchmarks.nexmark import nexmark_util +class AuctionPriceCoder(FastCoder): + def _create_impl(self): + return AuctionPriceCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class AuctionPrice(object): + 
CODER = AuctionPriceCoder() + def __init__(self, auction, price): self.auction = auction self.price = price def __repr__(self): return nexmark_util.model_to_json(self) + + +class AuctionPriceCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + + def encode_to_stream(self, value: AuctionPrice, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + + def decode_from_stream(self, stream, nested): + + auction = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + return AuctionPrice(auction, price) \ No newline at end of file diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 9a5048301177..44bd19a23ac1 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,11 +26,23 @@ - The bid on an item for auction (Bid). """ +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder from apache_beam.testing.benchmarks.nexmark import nexmark_util +class PersonCoder(FastCoder): + def _create_impl(self): + return PersonCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class Person(object): "Author of an auction or a bid." + CODER = PersonCoder() def __init__( self, id, name, email, credit_card, city, state, date_time, extra=None): @@ -47,8 +59,21 @@ def __repr__(self): return nexmark_util.model_to_json(self) +class AuctionCoder(FastCoder): + def to_type_hint(self): + pass + + def _create_impl(self): + return AuctionCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class Auction(object): "Item for auction." 
+ CODER = AuctionCoder() def __init__( self, @@ -77,8 +102,18 @@ def __repr__(self): return nexmark_util.model_to_json(self) +class BidCoder(FastCoder): + def _create_impl(self): + return BidCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class Bid(object): "A bid for an item for auction." + CODER = BidCoder() def __init__(self, auction, bidder, price, date_time, extra=None): self.auction = auction # key @@ -89,3 +124,82 @@ def __init__(self, auction, bidder, price, date_time, extra=None): def __repr__(self): return nexmark_util.model_to_json(self) + + +class AuctionCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = coder_impl.BytesCoderImpl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Auction, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.itemName, stream, True) + self._str_coder_impl.encode_to_stream(value.description, stream, True) + self._int_coder_impl.encode_to_stream(value.initialBid, stream, True) + self._int_coder_impl.encode_to_stream(value.reserve, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._time_coder_impl.encode_to_stream(value.expires, stream, True) + self._int_coder_impl.encode_to_stream(value.seller, stream, True) + self._int_coder_impl.encode_to_stream(value.category, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + item_name = self._str_coder_impl.decode_from_stream(stream, True) + description = self._str_coder_impl.decode_from_stream(stream, True) + initial_bid = self._int_coder_impl.decode_from_stream(stream, True) + reserve = self._int_coder_impl.decode_from_stream(stream, True) + date_time = 
self._time_coder_impl.decode_from_stream(stream, True) + expires = self._time_coder_impl.decode_from_stream(stream, True) + seller = self._int_coder_impl.decode_from_stream(stream, True) + category = self._int_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Auction(id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra) + + +class BidCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = coder_impl.BytesCoderImpl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Bid, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.bidder, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._int_coder_impl.decode_from_stream(stream, True) + bidder = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Bid(auction, bidder, price, date_time, extra) + + +class PersonCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = coder_impl.BytesCoderImpl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Person, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.name, stream, True) + self._str_coder_impl.encode_to_stream(value.emailAddress, stream, True) + self._str_coder_impl.encode_to_stream(value.creditCard, 
stream, True) + self._str_coder_impl.encode_to_stream(value.city, stream, True) + self._str_coder_impl.encode_to_stream(value.state, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + name = self._str_coder_impl.decode_from_stream(stream, True) + email = self._str_coder_impl.decode_from_stream(stream, True) + credit_card = self._str_coder_impl.decode_from_stream(stream, True) + city = self._str_coder_impl.decode_from_stream(stream, True) + state = self._str_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Person(id, name, email, credit_card, city, state, date_time, extra) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 26529818fa4d..8758e165c9e0 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -42,8 +42,11 @@ import json import apache_beam as beam +from apache_beam.transforms import window from apache_beam.utils.timestamp import Timestamp from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.models import auction_price from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames _LOGGER = logging.getLogger(__name__) @@ -71,6 +74,14 @@ def thread_target(): thread.join(timeout) +def setup_coder(): + beam.coders.registry.register_coder(nexmark_model.Auction, nexmark_model.AuctionCoder) + beam.coders.registry.register_coder(nexmark_model.Person, 
nexmark_model.PersonCoder) + beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder) + beam.coders.registry.register_coder(auction_bid.AuctionBid, auction_bid.AuctionBidCoder) + beam.coders.registry.register_coder(auction_price.AuctionPrice, auction_price.AuctionPriceCoder) + + class ParseEventFn(beam.DoFn): """Parses the raw event info into a Python objects. @@ -171,9 +182,10 @@ def process(self, elem): class CountAndLog(beam.PTransform): def expand(self, pcoll): - return ( - pcoll | "Count" >> beam.combiners.Count.Globally() - | "Log" >> beam.Map(log_count_info)) + return (pcoll + | 'window' >> beam.WindowInto(window.GlobalWindows()) + | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) def log_count_info(count): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index dece9b04b5f6..1c8a4a0ff3ff 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -22,14 +22,38 @@ from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +AUCTION_TAG = 'auctions' +BID_TAG = 'bids' +PERSON_TAG = 'person' + + def is_bid(event): return isinstance(event, nexmark_model.Bid) +def is_auction(event): + return isinstance(event, nexmark_model.Auction) + + class JustBids(beam.PTransform): def expand(self, pcoll): return pcoll | "IsBid" >> beam.Filter(is_bid) +class JustAuctions(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsAuction" >> beam.Filter(is_auction) + + +class AuctionByIdFn(beam.DoFn): + def process(self, element): + yield element.id, element + + +class BidByAuctionIdFn(beam.DoFn): + def process(self, element): + yield element.auction, element + + def auction_or_bid(event): return isinstance(event, nexmark_model.Bid) or isinstance(event, 
nexmark_model.Auction) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py new file mode 100644 index 000000000000..4e97825744c1 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.testing.benchmarks.nexmark.queries import winning_bids + +def load(events, metadata=None): + return (events + | beam.Filter(nexmark_query_util.auction_or_bid) + | winning_bids.WinningBids()) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py new file mode 100644 index 000000000000..0c1001ff8508 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -0,0 +1,172 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.transforms.window import WindowFn +from apache_beam.transforms.window import IntervalWindow +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util + + +class AuctionOrBidWindow(IntervalWindow): + """Windows for open auctions and bids.""" + def __init__(self, start, end, auction_id, is_auction_window): + super(AuctionOrBidWindow, self).__init__(start, end) + self.auction = auction_id + self.is_auction_window = is_auction_window + + @staticmethod + def for_auction(timestamp, auction: nexmark_model.Auction): + return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True) + + @staticmethod + def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): + return AuctionOrBidWindow(timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) + + def is_auction_window(self): + return self.is_auction_window + + def __str__(self): + return ('AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' + % 
(self.start, self.end, self.auction, self.is_auction_window)) + + +class AuctionOrBidWindowCoder(FastCoder): + + def _create_impl(self): + return AuctionOrBidWindowCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + +class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): + _super_coder_impl = coder_impl.IntervalWindowCoderImpl() + _id_coder_impl = coder_impl.VarIntCoderImpl() + _bool_coder_impl = coder_impl.BooleanCoderImpl() + + def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested): + self._super_coder_impl.encode_to_stream(value, stream, True) + self._id_coder_impl.encode_to_stream(value.auction, stream, True) + self._bool_coder_impl.encode_to_stream(value.is_auction_window, stream, True) + + def decode_from_stream(self, stream, nested): + super_window = self._super_coder_impl.decode_from_stream(stream, True) + auction = self._id_coder_impl.decode_from_stream(stream, True) + is_auction = self._bool_coder_impl.decode_from_stream(stream, True) + return AuctionOrBidWindow(super_window.start, super_window.end, auction, is_auction) + + +class AuctionOrBidWindowFn(WindowFn): + def __init__(self, expected_duration_micro): + self.expected_duration = expected_duration_micro + + def assign(self, assign_context): + event = assign_context.element + if isinstance(event, nexmark_model.Auction): + return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)] + elif isinstance(event, nexmark_model.Bid): + return [AuctionOrBidWindow.for_bid(self.expected_duration, assign_context.timestamp, event)] + else: + raise ValueError('%s can only assign windows to auctions and bids, but received %s' % (self.__class__.__name__, event)) + + def merge(self, merge_context): + id_to_auction = {} + id_to_bid = {} + for window in merge_context.windows: + if window.is_auction_window(): + id_to_auction[window.auction] = window + else: + if window.auction in id_to_bid: + bid_windows = id_to_bid[window.auction] + else: + bid_windows = 
[] + id_to_bid[window.auction] = bid_windows + bid_windows.append(window) + + for auction, auction_window in id_to_auction.items(): + bid_window = id_to_bid.get(auction) + if bid_window is not None: + to_merge = [] + for bid in bid_window: + if bid.start < auction_window.end: + to_merge.append(bid) + if len(to_merge) > 0: + to_merge.append(auction_window) + merge_context.merge(to_merge, auction_window) + + def get_window_coder(self): + return AuctionOrBidWindowCoder() + + def get_transformed_output_time(self, window, input_timestamp): + return window.max_timestamp() + + +class JoinAuctionBidFn(beam.DoFn): + @staticmethod + def higher_bid(bid, other): + if bid.price > other.price: + return True + elif bid.price < other.price: + return False + else: + return bid.dateTime < other.dateTime + + def process(self, element): + auction_id, group = element + auctions = group[nexmark_query_util.AUCTION_TAG] + auction = auctions[0] if auctions else None + if auction is None: + return + best_bid = None + for bid in group[nexmark_query_util.BID_TAG]: + if bid.price < auction.reserve: + continue + if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid): + best_bid = bid + if best_bid is None: + return + yield auction_bid.AuctionBid(auction, best_bid) + + + +class WinningBids(beam.PTransform): + def __init__(self): + expected_duration = 16667000 #TODO: change this to be calculated by event generation + self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) + + + def expand(self, pcoll): + events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn) + + auction_by_id = (events + | nexmark_query_util.JustAuctions() + | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) + bids_by_auction_id = (events + | nexmark_query_util.JustBids() + | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) + + return ({nexmark_query_util.AUCTION_TAG: auction_by_id, + nexmark_query_util.BID_TAG: bids_by_auction_id} + | beam.CoGroupByKey() + 
| beam.ParDo(JoinAuctionBidFn())) From 1fbf968b80f96f77dbcef4f9246e1b1e34f941bc Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Tue, 28 Jul 2020 23:59:11 +0000 Subject: [PATCH 05/16] yapf style, implemented query0 to use the coder --- .../nexmark/models/nexmark_model.py | 19 +++++++--- .../benchmarks/nexmark/nexmark_util.py | 35 +++++++++++++------ .../nexmark/queries/nexmark_query_util.py | 5 ++- .../benchmarks/nexmark/queries/query0.py | 12 +++++-- 4 files changed, 52 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 44bd19a23ac1..921ba54d5a5c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -28,6 +28,7 @@ """ from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder +from apache_beam.coders.coders import StrUtf8Coder from apache_beam.testing.benchmarks.nexmark import nexmark_util @@ -128,7 +129,7 @@ def __repr__(self): class AuctionCoderImpl(coder_impl.StreamCoderImpl): _int_coder_impl = coder_impl.VarIntCoderImpl() - _str_coder_impl = coder_impl.BytesCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() def encode_to_stream(self, value: Auction, stream, nested): @@ -154,12 +155,22 @@ def decode_from_stream(self, stream, nested): seller = self._int_coder_impl.decode_from_stream(stream, True) category = self._int_coder_impl.decode_from_stream(stream, True) extra = self._str_coder_impl.decode_from_stream(stream, True) - return Auction(id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra) + return Auction( + id, + item_name, + description, + initial_bid, + reserve, + date_time, + expires, + seller, + category, + extra) class BidCoderImpl(coder_impl.StreamCoderImpl): 
_int_coder_impl = coder_impl.VarIntCoderImpl() - _str_coder_impl = coder_impl.BytesCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() def encode_to_stream(self, value: Bid, stream, nested): @@ -180,7 +191,7 @@ def decode_from_stream(self, stream, nested): class PersonCoderImpl(coder_impl.StreamCoderImpl): _int_coder_impl = coder_impl.VarIntCoderImpl() - _str_coder_impl = coder_impl.BytesCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() def encode_to_stream(self, value: Person, stream, nested): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 8758e165c9e0..e582dcc39ebc 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -75,11 +75,15 @@ def thread_target(): def setup_coder(): - beam.coders.registry.register_coder(nexmark_model.Auction, nexmark_model.AuctionCoder) - beam.coders.registry.register_coder(nexmark_model.Person, nexmark_model.PersonCoder) + beam.coders.registry.register_coder( + nexmark_model.Auction, nexmark_model.AuctionCoder) + beam.coders.registry.register_coder( + nexmark_model.Person, nexmark_model.PersonCoder) beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder) - beam.coders.registry.register_coder(auction_bid.AuctionBid, auction_bid.AuctionBidCoder) - beam.coders.registry.register_coder(auction_price.AuctionPrice, auction_price.AuctionPriceCoder) + beam.coders.registry.register_coder( + auction_bid.AuctionBid, auction_bid.AuctionBidCoder) + beam.coders.registry.register_coder( + auction_price.AuctionPrice, auction_price.AuctionPriceCoder) class ParseEventFn(beam.DoFn): @@ -144,7 +148,8 @@ class ParseJsonEvnetFn(beam.DoFn): def process(self, elem): json_dict = json.loads(elem) if 
type(json_dict[FieldNames.DATE_TIME]) is dict: - json_dict[FieldNames.DATE_TIME] = json_dict[FieldNames.DATE_TIME]['millis'] + json_dict[FieldNames.DATE_TIME] = json_dict[ + FieldNames.DATE_TIME]['millis'] if FieldNames.NAME in json_dict: yield nexmark_model.Person( json_dict[FieldNames.ID], @@ -182,10 +187,11 @@ def process(self, elem): class CountAndLog(beam.PTransform): def expand(self, pcoll): - return (pcoll - | 'window' >> beam.WindowInto(window.GlobalWindows()) - | "Count" >> beam.combiners.Count.Globally() - | "Log" >> beam.Map(log_count_info)) + return ( + pcoll + | 'window' >> beam.WindowInto(window.GlobalWindows()) + | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) def log_count_info(count): @@ -199,7 +205,16 @@ def display(elm): def model_to_json(model): - return json.dumps(model.__dict__, separators=(',', ':')) + return json.dumps({k: timestamp_to_int(v) + for k, v in model.__dict__.items()}, + separators=(',', ':')) + + +def timestamp_to_int(cand): + if isinstance(cand, Timestamp): + return cand.micros // 1000 + else: + return cand def millis_to_timestamp(millis: int): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index 1c8a4a0ff3ff..f23d93275526 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -17,11 +17,9 @@ """Utilities for working with NEXmark data stream.""" - import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model - AUCTION_TAG = 'auctions' BID_TAG = 'bids' PERSON_TAG = 'person' @@ -56,4 +54,5 @@ def process(self, element): def auction_or_bid(event): - return isinstance(event, nexmark_model.Bid) or isinstance(event, nexmark_model.Auction) + return isinstance(event, nexmark_model.Bid) or isinstance( + event, 
nexmark_model.Auction) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 8df56069f3c1..27dc34ed9b27 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -33,7 +33,15 @@ from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn +class round_tripFn(beam.DoFn): + def process(self, element): + coder = element.CODER + byte_value = coder.encode(element) + recon = coder.decode(byte_value) + yield recon + + def load(events, query_args=None): return ( - events | 'serialization' >> beam.Map(repr) - | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) + events + | 'serialization_and_deserialization' >> beam.ParDo(round_tripFn())) From 37add17a70ce74955a9b2ef24c50fcc84ac5de07 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Wed, 29 Jul 2020 00:01:11 +0000 Subject: [PATCH 06/16] yapf style change --- .../benchmarks/nexmark/queries/query9.py | 8 +-- .../nexmark/queries/winning_bids.py | 50 +++++++++++-------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py index 4e97825744c1..7ac6d01689db 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py @@ -21,7 +21,9 @@ from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util from apache_beam.testing.benchmarks.nexmark.queries import winning_bids + def load(events, metadata=None): - return (events - | beam.Filter(nexmark_query_util.auction_or_bid) - | winning_bids.WinningBids()) + return ( + events + | beam.Filter(nexmark_query_util.auction_or_bid) + | winning_bids.WinningBids()) diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 0c1001ff8508..5dac0be72b75 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -40,18 +40,19 @@ def for_auction(timestamp, auction: nexmark_model.Auction): @staticmethod def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): - return AuctionOrBidWindow(timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) + return AuctionOrBidWindow( + timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) def is_auction_window(self): return self.is_auction_window def __str__(self): - return ('AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' - % (self.start, self.end, self.auction, self.is_auction_window)) + return ( + 'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' % + (self.start, self.end, self.auction, self.is_auction_window)) class AuctionOrBidWindowCoder(FastCoder): - def _create_impl(self): return AuctionOrBidWindowCoderImpl() @@ -68,13 +69,15 @@ class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested): self._super_coder_impl.encode_to_stream(value, stream, True) self._id_coder_impl.encode_to_stream(value.auction, stream, True) - self._bool_coder_impl.encode_to_stream(value.is_auction_window, stream, True) + self._bool_coder_impl.encode_to_stream( + value.is_auction_window, stream, True) def decode_from_stream(self, stream, nested): super_window = self._super_coder_impl.decode_from_stream(stream, True) auction = self._id_coder_impl.decode_from_stream(stream, True) is_auction = self._bool_coder_impl.decode_from_stream(stream, True) - return AuctionOrBidWindow(super_window.start, super_window.end, auction, is_auction) + return 
AuctionOrBidWindow( + super_window.start, super_window.end, auction, is_auction) class AuctionOrBidWindowFn(WindowFn): @@ -86,9 +89,14 @@ def assign(self, assign_context): if isinstance(event, nexmark_model.Auction): return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)] elif isinstance(event, nexmark_model.Bid): - return [AuctionOrBidWindow.for_bid(self.expected_duration, assign_context.timestamp, event)] + return [ + AuctionOrBidWindow.for_bid( + self.expected_duration, assign_context.timestamp, event) + ] else: - raise ValueError('%s can only assign windows to auctions and bids, but received %s' % (self.__class__.__name__, event)) + raise ValueError( + '%s can only assign windows to auctions and bids, but received %s' % + (self.__class__.__name__, event)) def merge(self, merge_context): id_to_auction = {} @@ -149,24 +157,26 @@ def process(self, element): yield auction_bid.AuctionBid(auction, best_bid) - class WinningBids(beam.PTransform): def __init__(self): - expected_duration = 16667000 #TODO: change this to be calculated by event generation + expected_duration = 16667000 #TODO: change this to be calculated by event generation self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) - def expand(self, pcoll): events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn) - auction_by_id = (events - | nexmark_query_util.JustAuctions() - | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) - bids_by_auction_id = (events - | nexmark_query_util.JustBids() - | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) - - return ({nexmark_query_util.AUCTION_TAG: auction_by_id, - nexmark_query_util.BID_TAG: bids_by_auction_id} + auction_by_id = ( + events + | nexmark_query_util.JustAuctions() + | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) + bids_by_auction_id = ( + events + | nexmark_query_util.JustBids() + | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) + + 
return ({ + nexmark_query_util.AUCTION_TAG: auction_by_id, + nexmark_query_util.BID_TAG: bids_by_auction_id + } | beam.CoGroupByKey() | beam.ParDo(JoinAuctionBidFn())) From 1110ec9fe56f6f1a0233647bd64277da43cb5023 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Wed, 29 Jul 2020 23:53:26 +0000 Subject: [PATCH 07/16] naming changes --- .../testing/benchmarks/nexmark/nexmark_util.py | 13 +++++++++---- .../benchmarks/nexmark/queries/winning_bids.py | 14 +++++++------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index e582dcc39ebc..9a3b7ae1ec8b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -205,14 +205,19 @@ def display(elm): def model_to_json(model): - return json.dumps({k: timestamp_to_int(v) - for k, v in model.__dict__.items()}, - separators=(',', ':')) + return json.dumps(construct_json_dict(model), separators=(',', ':')) -def timestamp_to_int(cand): +def construct_json_dict(model): + return {k: unnest_to_json(v) for k, v in model.__dict__.items()} + + +def unnest_to_json(cand): if isinstance(cand, Timestamp): return cand.micros // 1000 + elif isinstance( + cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): + return construct_json_dict(cand) else: return cand diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 5dac0be72b75..8aef9978187b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -43,7 +43,7 @@ def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): return AuctionOrBidWindow( timestamp, timestamp + expected_duration_micro 
* 2, bid.auction, False) - def is_auction_window(self): + def is_auction_window_fn(self): return self.is_auction_window def __str__(self): @@ -102,7 +102,7 @@ def merge(self, merge_context): id_to_auction = {} id_to_bid = {} for window in merge_context.windows: - if window.is_auction_window(): + if window.is_auction_window_fn(): id_to_auction[window.auction] = window else: if window.auction in id_to_bid: @@ -113,12 +113,12 @@ def merge(self, merge_context): bid_windows.append(window) for auction, auction_window in id_to_auction.items(): - bid_window = id_to_bid.get(auction) - if bid_window is not None: + bid_window_list = id_to_bid.get(auction) + if bid_window_list is not None: to_merge = [] - for bid in bid_window: - if bid.start < auction_window.end: - to_merge.append(bid) + for bid_window in bid_window_list: + if bid_window.start < auction_window.end: + to_merge.append(bid_window) if len(to_merge) > 0: to_merge.append(auction_window) merge_context.merge(to_merge, auction_window) From 0129cd6b4cddc3e263f22f627f004068c99cf87b Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Thu, 30 Jul 2020 00:01:49 +0000 Subject: [PATCH 08/16] nexmark launcher deserialization, timestamping and result counting and printing to file --- .../benchmarks/nexmark/nexmark_launcher.py | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py index 79032ad483b0..d7332d67fba5 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py @@ -67,6 +67,7 @@ import sys import uuid +from apache_beam.transforms import window from google.cloud import pubsub import apache_beam as beam @@ -75,10 +76,12 @@ from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions 
from apache_beam.options.pipeline_options import TestOptions +from apache_beam.testing.benchmarks.nexmark import nexmark_util from apache_beam.testing.benchmarks.nexmark.nexmark_util import Command from apache_beam.testing.benchmarks.nexmark.queries import query0 from apache_beam.testing.benchmarks.nexmark.queries import query1 from apache_beam.testing.benchmarks.nexmark.queries import query2 +from apache_beam.testing.benchmarks.nexmark.queries import query9 class NexmarkLauncher(object): @@ -110,7 +113,7 @@ def parse_args(self): type=int, action='append', required=True, - choices=[0, 1, 2], + choices=[0, 1, 2, 9], help='Query to run') parser.add_argument( @@ -193,15 +196,27 @@ def generate_events(self): else: raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub( topic=topic.full_name) - + raw_events = ( + raw_events + | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn()) + | 'timestamping' >> + beam.Map(lambda e: window.TimestampedValue(e, e.dateTime))) return raw_events def run_query(self, query, query_args, query_errors): try: self.parse_args() self.pipeline = beam.Pipeline(options=self.pipeline_options) - raw_events = self.generate_events() - query.load(raw_events, query_args) + nexmark_util.setup_coder() + events = self.generate_events() + output = query.load(events, query_args) + # print results + ( + output | beam.Map(repr) + | beam.io.WriteToText( + "py-query-result", file_name_suffix='.json', num_shards=1)) + output | nexmark_util.CountAndLog() + # end of results output result = self.pipeline.run() job_duration = ( self.pipeline_options.view_as(TestOptions).wait_until_finish_duration) @@ -228,6 +243,7 @@ def run(self): 0: query0, 1: query1, 2: query2, # TODO(mariagh): Add more queries. + 9: query9 } # TODO(mariagh): Move to a config file. 
@@ -263,4 +279,4 @@ def run(self): if __name__ == '__main__': launcher = NexmarkLauncher() launcher.run() - launcher.cleanup() + # launcher.cleanup() From 7420cb489d6d06ea34752b257d45ade8b95ed36a Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Thu, 30 Jul 2020 19:11:21 +0000 Subject: [PATCH 09/16] pylint style changes --- .../testing/benchmarks/nexmark/models/auction_price.py | 2 +- .../testing/benchmarks/nexmark/queries/nexmark_query_util.py | 3 +-- .../apache_beam/testing/benchmarks/nexmark/queries/query0.py | 1 - .../testing/benchmarks/nexmark/queries/winning_bids.py | 5 +++-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index 7f081be92591..a9b41e877e0d 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -52,4 +52,4 @@ def decode_from_stream(self, stream, nested): auction = self._int_coder_impl.decode_from_stream(stream, True) price = self._int_coder_impl.decode_from_stream(stream, True) - return AuctionPrice(auction, price) \ No newline at end of file + return AuctionPrice(auction, price) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index f23d93275526..e6e581a6ba6c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -54,5 +54,4 @@ def process(self, element): def auction_or_bid(event): - return isinstance(event, nexmark_model.Bid) or isinstance( - event, nexmark_model.Auction) + return isinstance(event, (nexmark_model.Auction, nexmark_model.Bid)) diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 27dc34ed9b27..681c8cd4fc52 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -30,7 +30,6 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn class round_tripFn(beam.DoFn): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 8aef9978187b..7faa9b4403bd 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -141,7 +141,7 @@ def higher_bid(bid, other): return bid.dateTime < other.dateTime def process(self, element): - auction_id, group = element + _, group = element auctions = group[nexmark_query_util.AUCTION_TAG] auction = auctions[0] if auctions else None if auction is None: @@ -159,7 +159,8 @@ def process(self, element): class WinningBids(beam.PTransform): def __init__(self): - expected_duration = 16667000 #TODO: change this to be calculated by event generation + #TODO: change this to be calculated by event generation + expected_duration = 16667000 self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) def expand(self, pcoll): From 07896c96511eb55f07d5bd607a38284a55f7cf76 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Thu, 30 Jul 2020 21:17:38 +0000 Subject: [PATCH 10/16] fieldname changes to comply with python convention --- .../nexmark/models/nexmark_model.py | 28 +++++++++---------- .../benchmarks/nexmark/nexmark_launcher.py | 2 +- .../benchmarks/nexmark/queries/query1.py | 2 +- .../nexmark/queries/winning_bids.py | 2 +- 4 files changed, 17 insertions(+), 17 
deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 921ba54d5a5c..cd325803c08d 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -49,11 +49,11 @@ def __init__( self, id, name, email, credit_card, city, state, date_time, extra=None): self.id = id self.name = name - self.emailAddress = email # key - self.creditCard = credit_card + self.email_address = email # key + self.credit_card = credit_card self.city = city self.state = state - self.dateTime = date_time + self.date_time = date_time self.extra = extra def __repr__(self): @@ -89,11 +89,11 @@ def __init__( category, extra=None): self.id = id - self.itemName = item_name # key + self.item_name = item_name # key self.description = description - self.initialBid = initial_bid + self.initial_bid = initial_bid self.reserve = reserve_price - self.dateTime = date_time + self.date_time = date_time self.expires = expires self.seller = seller self.category = category @@ -120,7 +120,7 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.auction = auction # key self.bidder = bidder self.price = price - self.dateTime = date_time + self.date_time = date_time self.extra = extra def __repr__(self): @@ -134,11 +134,11 @@ class AuctionCoderImpl(coder_impl.StreamCoderImpl): def encode_to_stream(self, value: Auction, stream, nested): self._int_coder_impl.encode_to_stream(value.id, stream, True) - self._str_coder_impl.encode_to_stream(value.itemName, stream, True) + self._str_coder_impl.encode_to_stream(value.item_name, stream, True) self._str_coder_impl.encode_to_stream(value.description, stream, True) - self._int_coder_impl.encode_to_stream(value.initialBid, stream, True) + self._int_coder_impl.encode_to_stream(value.initial_bid, stream, True) 
self._int_coder_impl.encode_to_stream(value.reserve, stream, True) - self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._time_coder_impl.encode_to_stream(value.date_time, stream, True) self._time_coder_impl.encode_to_stream(value.expires, stream, True) self._int_coder_impl.encode_to_stream(value.seller, stream, True) self._int_coder_impl.encode_to_stream(value.category, stream, True) @@ -177,7 +177,7 @@ def encode_to_stream(self, value: Bid, stream, nested): self._int_coder_impl.encode_to_stream(value.auction, stream, True) self._int_coder_impl.encode_to_stream(value.bidder, stream, True) self._int_coder_impl.encode_to_stream(value.price, stream, True) - self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._time_coder_impl.encode_to_stream(value.date_time, stream, True) self._str_coder_impl.encode_to_stream(value.extra, stream, True) def decode_from_stream(self, stream, nested): @@ -197,11 +197,11 @@ class PersonCoderImpl(coder_impl.StreamCoderImpl): def encode_to_stream(self, value: Person, stream, nested): self._int_coder_impl.encode_to_stream(value.id, stream, True) self._str_coder_impl.encode_to_stream(value.name, stream, True) - self._str_coder_impl.encode_to_stream(value.emailAddress, stream, True) - self._str_coder_impl.encode_to_stream(value.creditCard, stream, True) + self._str_coder_impl.encode_to_stream(value.email_address, stream, True) + self._str_coder_impl.encode_to_stream(value.credit_card, stream, True) self._str_coder_impl.encode_to_stream(value.city, stream, True) self._str_coder_impl.encode_to_stream(value.state, stream, True) - self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._time_coder_impl.encode_to_stream(value.date_time, stream, True) self._str_coder_impl.encode_to_stream(value.extra, stream, True) def decode_from_stream(self, stream, nested): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py index d7332d67fba5..24c6da4fe1a1 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py @@ -200,7 +200,7 @@ def generate_events(self): raw_events | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn()) | 'timestamping' >> - beam.Map(lambda e: window.TimestampedValue(e, e.dateTime))) + beam.Map(lambda e: window.TimestampedValue(e, e.date_time))) return raw_events def run_query(self, query, query_args, query_errors): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py index a807259596c8..583dc46c7492 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py @@ -40,5 +40,5 @@ def load(raw_events, query_args=None): lambda bid: nexmark_model.Bid( bid.auction, bid.bidder, (bid.price * 89) // 100, - bid.dateTime, + bid.date_time, bid.extra))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 7faa9b4403bd..53e3a20e8e50 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -138,7 +138,7 @@ def higher_bid(bid, other): elif bid.price < other.price: return False else: - return bid.dateTime < other.dateTime + return bid.date_time < other.date_time def process(self, element): _, group = element From 061041c50777b2960259b837df78ed27cd3de96f Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Sun, 2 Aug 2020 06:33:32 +0000 Subject: [PATCH 11/16] resolve issues brought up in code review --- .../nexmark/models/nexmark_model.py | 3 -- 
.../benchmarks/nexmark/nexmark_launcher.py | 2 +- .../benchmarks/nexmark/nexmark_util.py | 5 ++- .../benchmarks/nexmark/queries/query0.py | 4 +-- .../nexmark/queries/winning_bids.py | 32 +++++++++---------- 5 files changed, 21 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index cd325803c08d..fb6217bec170 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -61,9 +61,6 @@ def __repr__(self): class AuctionCoder(FastCoder): - def to_type_hint(self): - pass - def _create_impl(self): return AuctionCoderImpl() diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py index 24c6da4fe1a1..5ff1654dad60 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py @@ -279,4 +279,4 @@ def run(self): if __name__ == '__main__': launcher = NexmarkLauncher() launcher.run() - # launcher.cleanup() + launcher.cleanup() diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 9a3b7ae1ec8b..603ea0f2584b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -223,6 +223,5 @@ def unnest_to_json(cand): def millis_to_timestamp(millis: int): - second = millis // 1000 - micro_second = millis % 1000 * 1000 - return Timestamp(second, micro_second) + micro_second = millis * 1000 + return Timestamp(micros=micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 681c8cd4fc52..a4c50550a6dc 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -32,7 +32,7 @@ import apache_beam as beam -class round_tripFn(beam.DoFn): +class RoundTripFn(beam.DoFn): def process(self, element): coder = element.CODER byte_value = coder.encode(element) @@ -43,4 +43,4 @@ def process(self, element): def load(events, query_args=None): return ( events - | 'serialization_and_deserialization' >> beam.ParDo(round_tripFn())) + | 'serialization_and_deserialization' >> beam.ParDo(RoundTripFn())) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 53e3a20e8e50..802a07588e23 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -22,6 +22,7 @@ from apache_beam.coders.coders import FastCoder from apache_beam.transforms.window import WindowFn from apache_beam.transforms.window import IntervalWindow +from apache_beam.utils.timestamp import Duration from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models import auction_bid from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util @@ -41,7 +42,10 @@ def for_auction(timestamp, auction: nexmark_model.Auction): @staticmethod def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): return AuctionOrBidWindow( - timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) + timestamp, + timestamp + Duration(micros=expected_duration_micro * 2), + bid.auction, + False) def is_auction_window_fn(self): return self.is_auction_window @@ -99,21 +103,18 @@ def assign(self, assign_context): 
(self.__class__.__name__, event)) def merge(self, merge_context): - id_to_auction = {} - id_to_bid = {} + auction_id_to_auction_window = {} + auction_id_to_bid_window = {} for window in merge_context.windows: if window.is_auction_window_fn(): - id_to_auction[window.auction] = window + auction_id_to_auction_window[window.auction] = window else: - if window.auction in id_to_bid: - bid_windows = id_to_bid[window.auction] - else: - bid_windows = [] - id_to_bid[window.auction] = bid_windows - bid_windows.append(window) - - for auction, auction_window in id_to_auction.items(): - bid_window_list = id_to_bid.get(auction) + if window.auction not in auction_id_to_bid_window: + auction_id_to_bid_window[window.auction] = [] + auction_id_to_bid_window[window.auction].append(window) + + for auction, auction_window in auction_id_to_auction_window.items(): + bid_window_list = auction_id_to_bid_window.get(auction) if bid_window_list is not None: to_merge = [] for bid_window in bid_window_list: @@ -152,9 +153,8 @@ def process(self, element): continue if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid): best_bid = bid - if best_bid is None: - return - yield auction_bid.AuctionBid(auction, best_bid) + if best_bid: + yield auction_bid.AuctionBid(auction, best_bid) class WinningBids(beam.PTransform): From b75b65ff0d0978c6d54b98424d666fc595f1868f Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Mon, 3 Aug 2020 23:07:58 +0000 Subject: [PATCH 12/16] resolve py2.7 lint error for import and python3 type hint --- .../testing/benchmarks/nexmark/models/auction_bid.py | 5 +++-- .../benchmarks/nexmark/models/auction_price.py | 5 +++-- .../benchmarks/nexmark/models/nexmark_model.py | 11 +++++------ .../testing/benchmarks/nexmark/nexmark_util.py | 2 +- .../benchmarks/nexmark/queries/nexmark_query_util.py | 1 + .../benchmarks/nexmark/queries/winning_bids.py | 7 +++---- 6 files changed, 16 insertions(+), 15 deletions(-) diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py index 59bc9b29c3a6..bcb959e9fc1e 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py @@ -16,6 +16,8 @@ # """Result of WinningBid transform.""" +from __future__ import absolute_import + from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder from apache_beam.testing.benchmarks.nexmark import nexmark_util @@ -27,7 +29,6 @@ def _create_impl(self): return AuctionBidCoderImpl() def is_deterministic(self): - # type: () -> bool return True @@ -46,7 +47,7 @@ class AuctionBidCoderImpl(coder_impl.StreamCoderImpl): _auction_coder_impl = nexmark_model.AuctionCoderImpl() _bid_coder_Impl = nexmark_model.BidCoderImpl() - def encode_to_stream(self, value: AuctionBid, stream, nested): + def encode_to_stream(self, value, stream, nested): self._auction_coder_impl.encode_to_stream(value.auction, stream, True) self._bid_coder_Impl.encode_to_stream(value.bid, stream, True) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index a9b41e877e0d..90f79f8dacf2 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -16,6 +16,8 @@ # """Result of Query2.""" +from __future__ import absolute_import + from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder from apache_beam.testing.benchmarks.nexmark import nexmark_util @@ -26,7 +28,6 @@ def _create_impl(self): return AuctionPriceCoderImpl() def is_deterministic(self): - # type: () -> bool return True @@ -44,7 +45,7 @@ def __repr__(self): class AuctionPriceCoderImpl(coder_impl.StreamCoderImpl): 
_int_coder_impl = coder_impl.VarIntCoderImpl() - def encode_to_stream(self, value: AuctionPrice, stream, nested): + def encode_to_stream(self, value, stream, nested): self._int_coder_impl.encode_to_stream(value.auction, stream, True) self._int_coder_impl.encode_to_stream(value.price, stream, True) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index fb6217bec170..f2fd4e0ec388 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,6 +26,8 @@ - The bid on an item for auction (Bid). """ +from __future__ import absolute_import + from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder from apache_beam.coders.coders import StrUtf8Coder @@ -37,7 +39,6 @@ def _create_impl(self): return PersonCoderImpl() def is_deterministic(self): - # type: () -> bool return True @@ -65,7 +66,6 @@ def _create_impl(self): return AuctionCoderImpl() def is_deterministic(self): - # type: () -> bool return True @@ -105,7 +105,6 @@ def _create_impl(self): return BidCoderImpl() def is_deterministic(self): - # type: () -> bool return True @@ -129,7 +128,7 @@ class AuctionCoderImpl(coder_impl.StreamCoderImpl): _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() - def encode_to_stream(self, value: Auction, stream, nested): + def encode_to_stream(self, value, stream, nested): self._int_coder_impl.encode_to_stream(value.id, stream, True) self._str_coder_impl.encode_to_stream(value.item_name, stream, True) self._str_coder_impl.encode_to_stream(value.description, stream, True) @@ -170,7 +169,7 @@ class BidCoderImpl(coder_impl.StreamCoderImpl): _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() - def encode_to_stream(self, value: Bid, stream, nested): 
+ def encode_to_stream(self, value, stream, nested): self._int_coder_impl.encode_to_stream(value.auction, stream, True) self._int_coder_impl.encode_to_stream(value.bidder, stream, True) self._int_coder_impl.encode_to_stream(value.price, stream, True) @@ -191,7 +190,7 @@ class PersonCoderImpl(coder_impl.StreamCoderImpl): _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() - def encode_to_stream(self, value: Person, stream, nested): + def encode_to_stream(self, value, stream, nested): self._int_coder_impl.encode_to_stream(value.id, stream, True) self._str_coder_impl.encode_to_stream(value.name, stream, True) self._str_coder_impl.encode_to_stream(value.email_address, stream, True) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 603ea0f2584b..e61da4b5e30c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -222,6 +222,6 @@ def unnest_to_json(cand): return cand -def millis_to_timestamp(millis: int): +def millis_to_timestamp(millis): micro_second = millis * 1000 return Timestamp(micros=micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index e6e581a6ba6c..3e43e96c275c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -16,6 +16,7 @@ # """Utilities for working with NEXmark data stream.""" +from __future__ import absolute_import import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 802a07588e23..b7ac65291bca 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -36,11 +36,11 @@ def __init__(self, start, end, auction_id, is_auction_window): self.is_auction_window = is_auction_window @staticmethod - def for_auction(timestamp, auction: nexmark_model.Auction): + def for_auction(timestamp, auction): return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True) @staticmethod - def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): + def for_bid(expected_duration_micro, timestamp, bid): return AuctionOrBidWindow( timestamp, timestamp + Duration(micros=expected_duration_micro * 2), @@ -61,7 +61,6 @@ def _create_impl(self): return AuctionOrBidWindowCoderImpl() def is_deterministic(self): - # type: () -> bool return True @@ -70,7 +69,7 @@ class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): _id_coder_impl = coder_impl.VarIntCoderImpl() _bool_coder_impl = coder_impl.BooleanCoderImpl() - def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested): + def encode_to_stream(self, value, stream, nested): self._super_coder_impl.encode_to_stream(value, stream, True) self._id_coder_impl.encode_to_stream(value.auction, stream, True) self._bool_coder_impl.encode_to_stream( From a857f0a6e1cb343e737c5e833ce8ed7af76eb6cf Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Tue, 4 Aug 2020 08:14:33 +0000 Subject: [PATCH 13/16] fix 3.7 lint errors --- .../testing/benchmarks/nexmark/nexmark_launcher.py | 6 +++--- .../testing/benchmarks/nexmark/nexmark_util.py | 8 ++++---- .../testing/benchmarks/nexmark/queries/query2.py | 2 +- .../testing/benchmarks/nexmark/queries/winning_bids.py | 8 ++++---- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py index 5ff1654dad60..67f6bb19b5a9 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py @@ -67,7 +67,6 @@ import sys import uuid -from apache_beam.transforms import window from google.cloud import pubsub import apache_beam as beam @@ -82,6 +81,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import query1 from apache_beam.testing.benchmarks.nexmark.queries import query2 from apache_beam.testing.benchmarks.nexmark.queries import query9 +from apache_beam.transforms import window class NexmarkLauncher(object): @@ -211,11 +211,11 @@ def run_query(self, query, query_args, query_errors): events = self.generate_events() output = query.load(events, query_args) # print results - ( + ( # pylint: disable=expression-not-assigned output | beam.Map(repr) | beam.io.WriteToText( "py-query-result", file_name_suffix='.json', num_shards=1)) - output | nexmark_util.CountAndLog() + output | nexmark_util.CountAndLog() # pylint: disable=expression-not-assigned # end of results output result = self.pipeline.run() job_duration = ( diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index e61da4b5e30c..7e2de637101f 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -37,17 +37,17 @@ from __future__ import absolute_import from __future__ import print_function +import json import logging import threading -import json import apache_beam as beam -from apache_beam.transforms import window -from apache_beam.utils.timestamp import Timestamp -from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models import auction_bid from 
apache_beam.testing.benchmarks.nexmark.models import auction_price +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames +from apache_beam.transforms import window +from apache_beam.utils.timestamp import Timestamp _LOGGER = logging.getLogger(__name__) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index edd6ec809c25..46c9d1bae9e6 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -29,8 +29,8 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util from apache_beam.testing.benchmarks.nexmark.models import auction_price +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(events, metadata=None): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index b7ac65291bca..ffbe5fb9fb5b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -20,12 +20,12 @@ import apache_beam as beam from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder -from apache_beam.transforms.window import WindowFn -from apache_beam.transforms.window import IntervalWindow -from apache_beam.utils.timestamp import Duration -from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util 
+from apache_beam.transforms.window import IntervalWindow +from apache_beam.transforms.window import WindowFn +from apache_beam.utils.timestamp import Duration class AuctionOrBidWindow(IntervalWindow): From f1942f91f6a1dda37d6af61a4b9ef1baa276f093 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Thu, 6 Aug 2020 22:04:23 +0000 Subject: [PATCH 14/16] resolve issues brought up in code review --- .../testing/benchmarks/nexmark/queries/query1.py | 9 ++++++--- .../testing/benchmarks/nexmark/queries/query2.py | 9 ++++----- .../testing/benchmarks/nexmark/queries/query9.py | 5 +++++ .../benchmarks/nexmark/queries/winning_bids.py | 16 ++++++++++++++++ 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py index 583dc46c7492..acb205fe2395 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py @@ -31,14 +31,17 @@ from apache_beam.testing.benchmarks.nexmark.models import nexmark_model from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +USD_TO_EURO = 0.89 -def load(raw_events, query_args=None): + +def load(events, query_args=None): return ( - raw_events + events | nexmark_query_util.JustBids() | 'ConvertToEuro' >> beam.Map( lambda bid: nexmark_model.Bid( bid.auction, - bid.bidder, (bid.price * 89) // 100, + bid.bidder, + bid.price * USD_TO_EURO, bid.date_time, bid.extra))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index 46c9d1bae9e6..112760444869 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -15,12 +15,11 @@ # limitations under the License. 
# -"""Nexmark Query 2: Select auctions by auction id. - -The Nexmark suite is a series of queries (streaming pipelines) performed -on a simulation of auction events. +""" +Query 2: Find bids with specific auction ids and show their bid price -This query selects auctions (items) that have a particular id. +This query selects Bids that have a particular auction id, and outputs their +auction id with bid price. It illustrates a simple filter. """ diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py index 7ac6d01689db..180d8500d062 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py @@ -15,6 +15,11 @@ # limitations under the License. # +""" +Query 9: Winning-bids: extract the most recent of the highest bids +See winning_bids.py for detailed documentation +""" + from __future__ import absolute_import import apache_beam as beam diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index ffbe5fb9fb5b..6cb69db3e263 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -15,6 +15,22 @@ # limitations under the License. # +""" +A transform to find winning bids for each closed auction. In pseudo CQL syntax: + +SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime) +FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] +WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME +GROUP BY A.id + +We will also check that the winning bid is above the auction reserve. Note that +we ignore the auction opening bid value since it has no impact on which bid +eventually wins, if any. 
+ +Our implementation will use a custom windowing function in order to bring bids +and auctions together without requiring global state. +""" + from __future__ import absolute_import import apache_beam as beam From d048b0541d41e7d7a131a5a1a598175a3cac5a6e Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Fri, 7 Aug 2020 21:09:44 +0000 Subject: [PATCH 15/16] better formatting for nexmark_util docs --- .../benchmarks/nexmark/nexmark_util.py | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 7e2de637101f..a77f85c5a896 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -87,22 +87,23 @@ def setup_coder(): class ParseEventFn(beam.DoFn): - """Parses the raw event info into a Python objects. + """ + Original parser for parsing raw event info into Python objects. 
Each event line has the following format: person: ,name,email,credit_card,city, \ - state,timestamp,extra + state,timestamp,extra auction: ,item_name, description,initial_bid, \ - reserve_price,timestamp,expires,seller,category,extra + reserve_price,timestamp,expires,seller,category,extra bid: ,bidder,price,timestamp,extra For example: 'p12345,maria,maria@maria.com,1234-5678-9012-3456, \ - sunnyvale,CA,1528098831536' + sunnyvale,CA,1528098831536' 'a12345,car67,2012 hyundai elantra,15000,20000, \ - 1528098831536,20180630,maria,vehicle' + 1528098831536,20180630,maria,vehicle' 'b12345,maria,20000,1528098831536' """ def process(self, elem): @@ -126,24 +127,24 @@ class ParseJsonEvnetFn(beam.DoFn): Each event line has the following format: - person: {id,name,email,credit_card,city, \ - state,timestamp,extra} + person: {id,name,email,credit_card,city, \ + state,timestamp,extra} auction: {id,item_name, description,initial_bid, \ - reserve_price,timestamp,expires,seller,category,extra} - bid: {auction,bidder,price,timestamp,extra} + reserve_price,timestamp,expires,seller,category,extra} + bid: {auction,bidder,price,timestamp,extra} For example: {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\ - "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\ - "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["} + "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\ + "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["} {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ - "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ - "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"} + "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ + "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"} {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ - "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} + 
"extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} """ def process(self, elem): json_dict = json.loads(elem) From 85e8b28ddda71174e9924f126264dcfb6e764288 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Wed, 12 Aug 2020 18:00:48 +0000 Subject: [PATCH 16/16] exported StreamCoderImpl to fix issue with pydoc --- sdks/python/scripts/generate_pydoc.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh index d1f246c295e3..0091a22182a8 100755 --- a/sdks/python/scripts/generate_pydoc.sh +++ b/sdks/python/scripts/generate_pydoc.sh @@ -176,6 +176,7 @@ ignore_identifiers = [ 'apache_beam.typehints.typehints.TypeConstraint', 'apache_beam.typehints.typehints.validate_composite_type_param()', 'apache_beam.utils.windowed_value._IntervalWindowBase', + 'apache_beam.coders.coder_impl.StreamCoderImpl', # Private classes which are used within the same module 'apache_beam.transforms.external_test.PayloadBase',