From 03c8cc9c456e5400ac93f7acb50136ab90bb23d7 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Fri, 24 Jul 2020 05:04:55 +0000 Subject: [PATCH 1/9] changed parser and serialization code to use the same json format to represent models, added auctionprice model and corrected the behavior of query2, corrected the behavior of query 0 and 1 to align with the nexmark specification. created fieldname file to map the fieldname with string literals. created nexmark_query_util to put transforms that gets reused across different queries. --- .../nexmark/models/auction_price.py | 29 ++++++++ .../benchmarks/nexmark/models/field_name.py | 40 +++++++++++ .../nexmark/models/nexmark_model.py | 38 ++++------ .../benchmarks/nexmark/nexmark_util.py | 71 +++++++++++++++++++ .../nexmark/queries/nexmark_query_util.py | 32 +++++++++ .../benchmarks/nexmark/queries/query0.py | 7 +- .../benchmarks/nexmark/queries/query1.py | 14 ++-- .../benchmarks/nexmark/queries/query2.py | 25 ++++--- 8 files changed, 210 insertions(+), 46 deletions(-) create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py new file mode 100644 index 000000000000..fc09331c312d --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Result of Query2.""" +import json + + +class AuctionPrice(object): + + def __init__(self, auction, price): + self.auction = auction + self.price = price + + def __repr__(self): + return json.dumps(self.__dict__) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py new file mode 100644 index 000000000000..bb41fd6d523f --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" Field names for de-serializing json representation of Models +""" + + +class FieldName: + id = 'id' + name = 'name' + emailAddress = 'emailAddress' + creditCard = 'creditCard' + city = "city" + state = "state" + dateTime = "dateTime" + extra = "extra" + itemName = "itemName" + description = "description" + initialBid = "initialBid" + reserve = "reserve" + expires = "expires" + seller = "seller" + category = "category" + auction = "auction" + bidder = "bidder" + price = "price" diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 2cd7bf0acf2e..14d4f974ca4d 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,27 +26,25 @@ - The bid on an item for auction (Bid). """ +import json class Person(object): "Author of an auction or a bid." def __init__( - self, id, name, email, credit_card, city, state, timestamp, extra=None): + self, id, name, email, credit_card, city, state, date_time, extra=None): self.id = id self.name = name - self.email = email # key - self.credit_card = credit_card + self.emailAddress = email # key + self.creditCard = credit_card self.city = city self.state = state - self.timestamp = timestamp + self.dateTime = date_time self.extra = extra def __repr__(self): - return 'Person({id}, {email})'.format( - **{ - 'id': self.id, 'email': self.email - }) + return json.dumps(self.__dict__) class Auction(object): @@ -59,41 +57,35 @@ def __init__( description, initial_bid, reserve_price, - timestamp, + date_time, expires, seller, category, extra=None): self.id = id - self.item_name = item_name # key + self.itemName = item_name # key self.description = description - self.initial_bid = initial_bid - self.reserve_price = reserve_price - self.timestamp = timestamp + self.initialBid = initial_bid + self.reserve = reserve_price + self.dateTime = date_time self.expires = expires self.seller = seller self.category = category self.extra = extra def __repr__(self): - return 'Auction({id}, {item_name})'.format( - **{ - 'id': self.id, 'item_name': self.item_name - }) + return json.dumps(self.__dict__) class Bid(object): "A bid for an item for auction." - def __init__(self, auction, bidder, price, timestamp, extra=None): + def __init__(self, auction, bidder, price, date_time, extra=None): self.auction = auction # key self.bidder = bidder self.price = price - self.timestamp = timestamp + self.dateTime = date_time self.extra = extra def __repr__(self): - return 'Bid({auction}, {bidder}, {price})'.format( - **{ - 'auction': self.auction, 'bidder': self.bidder, 'price': self.price - }) + return json.dumps(self.__dict__) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 9079596a1a2a..8da5eecccaf6 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -39,9 +39,11 @@ import logging import threading +import json import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldName _LOGGER = logging.getLogger(__name__) @@ -103,6 +105,75 @@ def process(self, elem): yield event +class ParseJsonEvnetFn(beam.DoFn): + """Parses the raw event info into a Python objects. + + Each event line has the following format: + + person: {id,name,email,credit_card,city, \ + state,timestamp,extra} + auction: {id,item_name, description,initial_bid, \ + reserve_price,timestamp,expires,seller,category,extra} + bid: {auction,bidder,price,timestamp,extra} + + For example: + + {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\ + "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\ + "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["} + + {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ + "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ + "expires":1528098840451,"seller":1000,"category":13,"extra":"zcurskupiz"} + + {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ + "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} + """ + def process(self, elem): + json_dict = json.loads(elem) + if type(json_dict[FieldName.dateTime]) is dict: + json_dict[FieldName.dateTime] = json_dict[FieldName.dateTime]['millis'] + if FieldName.name in json_dict: + yield nexmark_model.Person(json_dict[FieldName.id], + json_dict[FieldName.name], + json_dict[FieldName.emailAddress], + json_dict[FieldName.creditCard], + json_dict[FieldName.city], + json_dict[FieldName.state], + json_dict[FieldName.dateTime], + json_dict[FieldName.extra]) + elif FieldName.itemName in json_dict: + yield nexmark_model.Auction(json_dict[FieldName.id], + json_dict[FieldName.itemName], + json_dict[FieldName.description], + json_dict[FieldName.initialBid], + json_dict[FieldName.reserve], + json_dict[FieldName.dateTime], + json_dict[FieldName.expires], + json_dict[FieldName.seller], + json_dict[FieldName.category], + json_dict[FieldName.extra]) + elif FieldName.auction in json_dict: + yield nexmark_model.Bid(json_dict[FieldName.auction], + json_dict[FieldName.bidder], + json_dict[FieldName.price], + json_dict[FieldName.dateTime], + json_dict[FieldName.extra]) + else: + raise ValueError('Invalid event: %s.' % str(json_dict)) + + +class CountAndLog(beam.PTransform): + def expand(self, pcoll): + return (pcoll | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) + + +def log_count_info(count): + logging.info('Query resulted in %d results', count) + return count + + def display(elm): logging.debug(elm) return elm diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py new file mode 100644 index 000000000000..8724d81a33d9 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities for working with NEXmark data stream.""" + +import logging + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + + +def is_bid(event): + return isinstance(event, nexmark_model.Bid) + + +class JustBids(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsBid" >> beam.Filter(is_bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index c2c0d509c0f4..2245a78485ce 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -30,8 +30,9 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn +from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn -def load(raw_events, query_args=None): - return raw_events | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) # pylint: disable=expression-not-assigned +def load(events, query_args=None): + return (events | 'serialization' >> beam.Map(repr) + | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py index 95f3f162fbdf..a807259596c8 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py @@ -29,20 +29,16 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(raw_events, query_args=None): return ( raw_events - | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInBids' >> - beam.Filter(lambda event: isinstance(event, nexmark_model.Bid)) + | nexmark_query_util.JustBids() | 'ConvertToEuro' >> beam.Map( lambda bid: nexmark_model.Bid( bid.auction, - bid.bidder, (float(bid.price) * 89) // 100, - bid.timestamp, - bid.extra)) - | 'DisplayQuery1' >> beam.Map(display)) # pylint: disable=expression-not-assigned + bid.bidder, (bid.price * 89) // 100, + bid.dateTime, + bid.extra))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index 51504a600dce..a104aa0c4857 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -29,17 +29,20 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.testing.benchmarks.nexmark.models import auction_price -def load(raw_events, metadata=None): +def load(events, metadata=None): return ( - raw_events - | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInAuctionsWithSelectedId' >> beam.Filter( - lambda event: ( - isinstance(event, nexmark_model.Auction) and event.id == metadata. - get('auction_id'))) - | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned + events + | nexmark_query_util.JustBids() + | beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) + | 'project' >> beam.Map( + lambda bid: auction_price.AuctionPrice( + bid.auction, bid.price))) + # | 'FilterInAuctionsWithSelectedId' >> beam.Filter( + # lambda event: ( + # isinstance(event, nexmark_model.Auction) and event.id == metadata. + # get('auction_id'))) + # | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned From ff865ef882be9d8a06f7f4a494b0cad501bbeafd Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Fri, 24 Jul 2020 17:33:18 +0000 Subject: [PATCH 2/9] changed repr and aligned code style. Changed repr for all models to eliminate spaces in their json string representation; aligned code style for some files --- .../nexmark/models/auction_price.py | 3 +- .../benchmarks/nexmark/models/field_name.py | 36 +++++----- .../nexmark/models/nexmark_model.py | 6 +- .../benchmarks/nexmark/nexmark_util.py | 66 ++++++++++--------- .../nexmark/queries/nexmark_query_util.py | 6 +- .../benchmarks/nexmark/queries/query0.py | 6 +- .../benchmarks/nexmark/queries/query2.py | 13 ++-- 7 files changed, 70 insertions(+), 66 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index fc09331c312d..7cb60ec1bd54 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -20,10 +20,9 @@ class AuctionPrice(object): - def __init__(self, auction, price): self.auction = auction self.price = price def __repr__(self): - return json.dumps(self.__dict__) + return json.dumps(self.__dict__, separators=(',', ':')) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py index bb41fd6d523f..c564d8689cc7 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -20,21 +20,21 @@ class FieldName: - id = 'id' - name = 'name' - emailAddress = 'emailAddress' - creditCard = 'creditCard' - city = "city" - state = "state" - dateTime = "dateTime" - extra = "extra" - itemName = "itemName" - description = "description" - initialBid = "initialBid" - reserve = "reserve" - expires = "expires" - seller = "seller" - category = "category" - auction = "auction" - bidder = "bidder" - price = "price" + ID = 'id' + NAME = 'name' + EMAIL_ADDRESS = 'emailAddress' + CREDIT_CARD = 'creditCard' + CITY = "city" + STATE = "state" + DATE_TIME = "dateTime" + EXTRA = "extra" + ITEM_NAME = "itemName" + DESCRIPTION = "description" + INITIAL_BID = "initialBid" + RESERVE = "reserve" + EXPIRES = "expires" + SELLER = "seller" + CATEGORY = "category" + AUCTION = "auction" + BIDDER = "bidder" + PRICE = "price" diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 14d4f974ca4d..94a98cdd0a26 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -44,7 +44,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__) + return json.dumps(self.__dict__, separators=(',', ':')) class Auction(object): @@ -74,7 +74,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__) + return json.dumps(self.__dict__, separators=(',', ':')) class Bid(object): @@ -88,4 +88,4 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.extra = extra def __repr__(self): - return json.dumps(self.__dict__) + return json.dumps(self.__dict__, separators=(',', ':')) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 8da5eecccaf6..f2b2fbd33084 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -124,49 +124,53 @@ class ParseJsonEvnetFn(beam.DoFn): {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ - "expires":1528098840451,"seller":1000,"category":13,"extra":"zcurskupiz"} + "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"} {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} """ def process(self, elem): json_dict = json.loads(elem) - if type(json_dict[FieldName.dateTime]) is dict: - json_dict[FieldName.dateTime] = json_dict[FieldName.dateTime]['millis'] - if FieldName.name in json_dict: - yield nexmark_model.Person(json_dict[FieldName.id], - json_dict[FieldName.name], - json_dict[FieldName.emailAddress], - json_dict[FieldName.creditCard], - json_dict[FieldName.city], - json_dict[FieldName.state], - json_dict[FieldName.dateTime], - json_dict[FieldName.extra]) - elif FieldName.itemName in json_dict: - yield nexmark_model.Auction(json_dict[FieldName.id], - json_dict[FieldName.itemName], - json_dict[FieldName.description], - json_dict[FieldName.initialBid], - json_dict[FieldName.reserve], - json_dict[FieldName.dateTime], - json_dict[FieldName.expires], - json_dict[FieldName.seller], - json_dict[FieldName.category], - json_dict[FieldName.extra]) - elif FieldName.auction in json_dict: - yield nexmark_model.Bid(json_dict[FieldName.auction], - json_dict[FieldName.bidder], - json_dict[FieldName.price], - json_dict[FieldName.dateTime], - json_dict[FieldName.extra]) + if type(json_dict[FieldName.DATE_TIME]) is dict: + json_dict[FieldName.DATE_TIME] = json_dict[FieldName.DATE_TIME]['millis'] + if FieldName.NAME in json_dict: + yield nexmark_model.Person( + json_dict[FieldName.ID], + json_dict[FieldName.NAME], + json_dict[FieldName.EMAIL_ADDRESS], + json_dict[FieldName.CREDIT_CARD], + json_dict[FieldName.CITY], + json_dict[FieldName.STATE], + json_dict[FieldName.DATE_TIME], + json_dict[FieldName.EXTRA]) + elif FieldName.ITEM_NAME in json_dict: + yield nexmark_model.Auction( + json_dict[FieldName.ID], + json_dict[FieldName.ITEM_NAME], + json_dict[FieldName.DESCRIPTION], + json_dict[FieldName.INITIAL_BID], + json_dict[FieldName.RESERVE], + json_dict[FieldName.DATE_TIME], + json_dict[FieldName.EXPIRES], + json_dict[FieldName.SELLER], + json_dict[FieldName.CATEGORY], + json_dict[FieldName.EXTRA]) + elif FieldName.AUCTION in json_dict: + yield nexmark_model.Bid( + json_dict[FieldName.AUCTION], + json_dict[FieldName.BIDDER], + json_dict[FieldName.PRICE], + json_dict[FieldName.DATE_TIME], + json_dict[FieldName.EXTRA]) else: raise ValueError('Invalid event: %s.' % str(json_dict)) class CountAndLog(beam.PTransform): def expand(self, pcoll): - return (pcoll | "Count" >> beam.combiners.Count.Globally() - | "Log" >> beam.Map(log_count_info)) + return ( + pcoll | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) def log_count_info(count): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index 8724d81a33d9..f07fd8eaccfc 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -17,7 +17,6 @@ """Utilities for working with NEXmark data stream.""" -import logging import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model @@ -27,6 +26,11 @@ def is_bid(event): return isinstance(event, nexmark_model.Bid) +class ToString(beam.PTransform): + def expand(self, pcoll): + return pcoll | beam.Map(repr) + + class JustBids(beam.PTransform): def expand(self, pcoll): return pcoll | "IsBid" >> beam.Filter(is_bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 2245a78485ce..918926def747 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -31,8 +31,10 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(events, query_args=None): - return (events | 'serialization' >> beam.Map(repr) - | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) + return ( + events | 'serialization' >> nexmark_query_util.ToString() + | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index a104aa0c4857..edd6ec809c25 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -37,12 +37,7 @@ def load(events, metadata=None): return ( events | nexmark_query_util.JustBids() - | beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) - | 'project' >> beam.Map( - lambda bid: auction_price.AuctionPrice( - bid.auction, bid.price))) - # | 'FilterInAuctionsWithSelectedId' >> beam.Filter( - # lambda event: ( - # isinstance(event, nexmark_model.Auction) and event.id == metadata. - # get('auction_id'))) - # | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned + | 'filter_by_skip' >> + beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) + | 'project' >> + beam.Map(lambda bid: auction_price.AuctionPrice(bid.auction, bid.price))) From 5b5f6ce6af834e1832b315661d162b8b2d51370f Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Tue, 28 Jul 2020 07:54:17 +0000 Subject: [PATCH 3/9] refactoring code to have a better code practice --- .../nexmark/models/auction_price.py | 4 +- .../benchmarks/nexmark/models/field_name.py | 2 +- .../nexmark/models/nexmark_model.py | 8 +-- .../benchmarks/nexmark/nexmark_util.py | 71 +++++++++++-------- .../nexmark/queries/nexmark_query_util.py | 9 ++- .../benchmarks/nexmark/queries/query0.py | 3 +- 6 files changed, 54 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index 7cb60ec1bd54..851e7c0a559f 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -16,7 +16,7 @@ # """Result of Query2.""" -import json +from apache_beam.testing.benchmarks.nexmark import nexmark_util class AuctionPrice(object): @@ -25,4 +25,4 @@ def __init__(self, auction, price): self.price = price def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py index c564d8689cc7..5d583bca27f8 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -19,7 +19,7 @@ """ -class FieldName: +class FieldNames: ID = 'id' NAME = 'name' EMAIL_ADDRESS = 'emailAddress' diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 94a98cdd0a26..9a5048301177 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,7 +26,7 @@ - The bid on an item for auction (Bid). """ -import json +from apache_beam.testing.benchmarks.nexmark import nexmark_util class Person(object): @@ -44,7 +44,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) class Auction(object): @@ -74,7 +74,7 @@ def __init__( self.extra = extra def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) class Bid(object): @@ -88,4 +88,4 @@ def __init__(self, auction, bidder, price, date_time, extra=None): self.extra = extra def __repr__(self): - return json.dumps(self.__dict__, separators=(',', ':')) + return nexmark_util.model_to_json(self) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index f2b2fbd33084..26529818fa4d 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -42,8 +42,9 @@ import json import apache_beam as beam +from apache_beam.utils.timestamp import Timestamp from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldName +from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames _LOGGER = logging.getLogger(__name__) @@ -131,37 +132,39 @@ class ParseJsonEvnetFn(beam.DoFn): """ def process(self, elem): json_dict = json.loads(elem) - if type(json_dict[FieldName.DATE_TIME]) is dict: - json_dict[FieldName.DATE_TIME] = json_dict[FieldName.DATE_TIME]['millis'] - if FieldName.NAME in json_dict: + if type(json_dict[FieldNames.DATE_TIME]) is dict: + json_dict[FieldNames.DATE_TIME] = json_dict[FieldNames.DATE_TIME]['millis'] + if FieldNames.NAME in json_dict: yield nexmark_model.Person( - json_dict[FieldName.ID], - json_dict[FieldName.NAME], - json_dict[FieldName.EMAIL_ADDRESS], - json_dict[FieldName.CREDIT_CARD], - json_dict[FieldName.CITY], - json_dict[FieldName.STATE], - json_dict[FieldName.DATE_TIME], - json_dict[FieldName.EXTRA]) - elif FieldName.ITEM_NAME in json_dict: + json_dict[FieldNames.ID], + json_dict[FieldNames.NAME], + json_dict[FieldNames.EMAIL_ADDRESS], + json_dict[FieldNames.CREDIT_CARD], + json_dict[FieldNames.CITY], + json_dict[FieldNames.STATE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) + elif FieldNames.ITEM_NAME in json_dict: + if type(json_dict[FieldNames.EXPIRES]) is dict: + json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis'] yield nexmark_model.Auction( - json_dict[FieldName.ID], - json_dict[FieldName.ITEM_NAME], - json_dict[FieldName.DESCRIPTION], - json_dict[FieldName.INITIAL_BID], - json_dict[FieldName.RESERVE], - json_dict[FieldName.DATE_TIME], - json_dict[FieldName.EXPIRES], - json_dict[FieldName.SELLER], - json_dict[FieldName.CATEGORY], - json_dict[FieldName.EXTRA]) - elif FieldName.AUCTION in json_dict: + json_dict[FieldNames.ID], + json_dict[FieldNames.ITEM_NAME], + json_dict[FieldNames.DESCRIPTION], + json_dict[FieldNames.INITIAL_BID], + json_dict[FieldNames.RESERVE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + millis_to_timestamp(json_dict[FieldNames.EXPIRES]), + json_dict[FieldNames.SELLER], + json_dict[FieldNames.CATEGORY], + json_dict[FieldNames.EXTRA]) + elif FieldNames.AUCTION in json_dict: yield nexmark_model.Bid( - json_dict[FieldName.AUCTION], - json_dict[FieldName.BIDDER], - json_dict[FieldName.PRICE], - json_dict[FieldName.DATE_TIME], - json_dict[FieldName.EXTRA]) + json_dict[FieldNames.AUCTION], + json_dict[FieldNames.BIDDER], + json_dict[FieldNames.PRICE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) else: raise ValueError('Invalid event: %s.' % str(json_dict)) @@ -181,3 +184,13 @@ def log_count_info(count): def display(elm): logging.debug(elm) return elm + + +def model_to_json(model): + return json.dumps(model.__dict__, separators=(',', ':')) + + +def millis_to_timestamp(millis: int): + second = millis // 1000 + micro_second = millis % 1000 * 1000 + return Timestamp(second, micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index f07fd8eaccfc..dece9b04b5f6 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -26,11 +26,10 @@ def is_bid(event): return isinstance(event, nexmark_model.Bid) -class ToString(beam.PTransform): - def expand(self, pcoll): - return pcoll | beam.Map(repr) - - class JustBids(beam.PTransform): def expand(self, pcoll): return pcoll | "IsBid" >> beam.Filter(is_bid) + + +def auction_or_bid(event): + return isinstance(event, nexmark_model.Bid) or isinstance(event, nexmark_model.Auction) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 918926def747..8df56069f3c1 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -31,10 +31,9 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn -from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(events, query_args=None): return ( - events | 'serialization' >> nexmark_query_util.ToString() + events | 'serialization' >> beam.Map(repr) | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) From d6002dc3a73874088326f45a7cb084b795fa037d Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Tue, 28 Jul 2020 22:27:01 +0000 Subject: [PATCH 4/9] implemented coder for all models, query9 --- .../benchmarks/nexmark/models/auction_bid.py | 56 ++++++ .../nexmark/models/auction_price.py | 27 +++ .../nexmark/models/nexmark_model.py | 114 ++++++++++++ .../benchmarks/nexmark/nexmark_util.py | 18 +- .../nexmark/queries/nexmark_query_util.py | 24 +++ .../benchmarks/nexmark/queries/query9.py | 27 +++ .../nexmark/queries/winning_bids.py | 172 ++++++++++++++++++ 7 files changed, 435 insertions(+), 3 deletions(-) create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py create mode 100644 sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py new file mode 100644 index 000000000000..59bc9b29c3a6 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Result of WinningBid transform.""" +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.testing.benchmarks.nexmark import nexmark_util +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + + +class AuctionBidCoder(FastCoder): + def _create_impl(self): + return AuctionBidCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + +class AuctionBid(object): + CODER = AuctionBidCoder() + + def __init__(self, auction, bid): + self.auction = auction + self.bid = bid + + def __repr__(self): + return nexmark_util.model_to_json(self) + + +class AuctionBidCoderImpl(coder_impl.StreamCoderImpl): + _auction_coder_impl = nexmark_model.AuctionCoderImpl() + _bid_coder_Impl = nexmark_model.BidCoderImpl() + + def encode_to_stream(self, value: AuctionBid, stream, nested): + self._auction_coder_impl.encode_to_stream(value.auction, stream, True) + self._bid_coder_Impl.encode_to_stream(value.bid, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._auction_coder_impl.decode_from_stream(stream, True) + bid = self._bid_coder_Impl.decode_from_stream(stream, True) + return AuctionBid(auction, bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index 851e7c0a559f..7f081be92591 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -16,13 +16,40 @@ # """Result of Query2.""" +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder from apache_beam.testing.benchmarks.nexmark import nexmark_util +class AuctionPriceCoder(FastCoder): + def _create_impl(self): + return AuctionPriceCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class AuctionPrice(object): + CODER = AuctionPriceCoder() + def __init__(self, auction, price): self.auction = auction self.price = price def __repr__(self): return nexmark_util.model_to_json(self) + + +class AuctionPriceCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + + def encode_to_stream(self, value: AuctionPrice, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + + def decode_from_stream(self, stream, nested): + + auction = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + return AuctionPrice(auction, price) \ No newline at end of file diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 9a5048301177..44bd19a23ac1 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,11 +26,23 @@ - The bid on an item for auction (Bid). """ +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder from apache_beam.testing.benchmarks.nexmark import nexmark_util +class PersonCoder(FastCoder): + def _create_impl(self): + return PersonCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class Person(object): "Author of an auction or a bid." + CODER = PersonCoder() def __init__( self, id, name, email, credit_card, city, state, date_time, extra=None): @@ -47,8 +59,21 @@ def __repr__(self): return nexmark_util.model_to_json(self) +class AuctionCoder(FastCoder): + def to_type_hint(self): + pass + + def _create_impl(self): + return AuctionCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class Auction(object): "Item for auction." + CODER = AuctionCoder() def __init__( self, @@ -77,8 +102,18 @@ def __repr__(self): return nexmark_util.model_to_json(self) +class BidCoder(FastCoder): + def _create_impl(self): + return BidCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + class Bid(object): "A bid for an item for auction." + CODER = BidCoder() def __init__(self, auction, bidder, price, date_time, extra=None): self.auction = auction # key @@ -89,3 +124,82 @@ def __init__(self, auction, bidder, price, date_time, extra=None): def __repr__(self): return nexmark_util.model_to_json(self) + + +class AuctionCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = coder_impl.BytesCoderImpl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Auction, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.itemName, stream, True) + self._str_coder_impl.encode_to_stream(value.description, stream, True) + self._int_coder_impl.encode_to_stream(value.initialBid, stream, True) + self._int_coder_impl.encode_to_stream(value.reserve, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._time_coder_impl.encode_to_stream(value.expires, stream, True) + self._int_coder_impl.encode_to_stream(value.seller, stream, True) + self._int_coder_impl.encode_to_stream(value.category, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + item_name = self._str_coder_impl.decode_from_stream(stream, True) + description = self._str_coder_impl.decode_from_stream(stream, True) + initial_bid = self._int_coder_impl.decode_from_stream(stream, True) + reserve = self._int_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + expires = self._time_coder_impl.decode_from_stream(stream, True) + seller = self._int_coder_impl.decode_from_stream(stream, True) + category = self._int_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Auction(id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra) + + +class BidCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = coder_impl.BytesCoderImpl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Bid, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.bidder, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._int_coder_impl.decode_from_stream(stream, True) + bidder = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Bid(auction, bidder, price, date_time, extra) + + +class PersonCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = coder_impl.BytesCoderImpl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Person, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.name, stream, True) + self._str_coder_impl.encode_to_stream(value.emailAddress, stream, True) + self._str_coder_impl.encode_to_stream(value.creditCard, stream, True) + self._str_coder_impl.encode_to_stream(value.city, stream, True) + self._str_coder_impl.encode_to_stream(value.state, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + name = self._str_coder_impl.decode_from_stream(stream, True) + email = self._str_coder_impl.decode_from_stream(stream, True) + credit_card = self._str_coder_impl.decode_from_stream(stream, True) + city = self._str_coder_impl.decode_from_stream(stream, True) + state = self._str_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Person(id, name, email, credit_card, city, state, date_time, extra) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 26529818fa4d..8758e165c9e0 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -42,8 +42,11 @@ import json import apache_beam as beam +from apache_beam.transforms import window from apache_beam.utils.timestamp import Timestamp from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.models import auction_price from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames _LOGGER = logging.getLogger(__name__) @@ -71,6 +74,14 @@ def thread_target(): thread.join(timeout) +def setup_coder(): + beam.coders.registry.register_coder(nexmark_model.Auction, nexmark_model.AuctionCoder) + beam.coders.registry.register_coder(nexmark_model.Person, nexmark_model.PersonCoder) + beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder) + beam.coders.registry.register_coder(auction_bid.AuctionBid, auction_bid.AuctionBidCoder) + beam.coders.registry.register_coder(auction_price.AuctionPrice, auction_price.AuctionPriceCoder) + + class ParseEventFn(beam.DoFn): """Parses the raw event info into a Python objects. @@ -171,9 +182,10 @@ def process(self, elem): class CountAndLog(beam.PTransform): def expand(self, pcoll): - return ( - pcoll | "Count" >> beam.combiners.Count.Globally() - | "Log" >> beam.Map(log_count_info)) + return (pcoll + | 'window' >> beam.WindowInto(window.GlobalWindows()) + | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) def log_count_info(count): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index dece9b04b5f6..1c8a4a0ff3ff 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -22,14 +22,38 @@ from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +AUCTION_TAG = 'auctions' +BID_TAG = 'bids' +PERSON_TAG = 'person' + + def is_bid(event): return isinstance(event, nexmark_model.Bid) +def is_auction(event): + return isinstance(event, nexmark_model.Auction) + + class JustBids(beam.PTransform): def expand(self, pcoll): return pcoll | "IsBid" >> beam.Filter(is_bid) +class JustAuctions(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsAuction" >> beam.Filter(is_auction) + + +class AuctionByIdFn(beam.DoFn): + def process(self, element): + yield element.id, element + + +class BidByAuctionIdFn(beam.DoFn): + def process(self, element): + yield element.auction, element + + def auction_or_bid(event): return isinstance(event, nexmark_model.Bid) or isinstance(event, nexmark_model.Auction) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py new file mode 100644 index 000000000000..4e97825744c1 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.testing.benchmarks.nexmark.queries import winning_bids + +def load(events, metadata=None): + return (events + | beam.Filter(nexmark_query_util.auction_or_bid) + | winning_bids.WinningBids()) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py new file mode 100644 index 000000000000..0c1001ff8508 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -0,0 +1,172 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.transforms.window import WindowFn +from apache_beam.transforms.window import IntervalWindow +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util + + +class AuctionOrBidWindow(IntervalWindow): + """Windows for open auctions and bids.""" + def __init__(self, start, end, auction_id, is_auction_window): + super(AuctionOrBidWindow, self).__init__(start, end) + self.auction = auction_id + self.is_auction_window = is_auction_window + + @staticmethod + def for_auction(timestamp, auction: nexmark_model.Auction): + return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True) + + @staticmethod + def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): + return AuctionOrBidWindow(timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) + + def is_auction_window(self): + return self.is_auction_window + + def __str__(self): + return ('AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' + % (self.start, self.end, self.auction, self.is_auction_window)) + + +class AuctionOrBidWindowCoder(FastCoder): + + def _create_impl(self): + return AuctionOrBidWindowCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + +class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): + _super_coder_impl = coder_impl.IntervalWindowCoderImpl() + _id_coder_impl = coder_impl.VarIntCoderImpl() + _bool_coder_impl = coder_impl.BooleanCoderImpl() + + def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested): + self._super_coder_impl.encode_to_stream(value, stream, True) + self._id_coder_impl.encode_to_stream(value.auction, stream, True) + self._bool_coder_impl.encode_to_stream(value.is_auction_window, stream, True) + + def decode_from_stream(self, stream, nested): + super_window = self._super_coder_impl.decode_from_stream(stream, True) + auction = self._id_coder_impl.decode_from_stream(stream, True) + is_auction = self._bool_coder_impl.decode_from_stream(stream, True) + return AuctionOrBidWindow(super_window.start, super_window.end, auction, is_auction) + + +class AuctionOrBidWindowFn(WindowFn): + def __init__(self, expected_duration_micro): + self.expected_duration = expected_duration_micro + + def assign(self, assign_context): + event = assign_context.element + if isinstance(event, nexmark_model.Auction): + return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)] + elif isinstance(event, nexmark_model.Bid): + return [AuctionOrBidWindow.for_bid(self.expected_duration, assign_context.timestamp, event)] + else: + raise ValueError('%s can only assign windows to auctions and bids, but received %s' % (self.__class__.__name__, event)) + + def merge(self, merge_context): + id_to_auction = {} + id_to_bid = {} + for window in merge_context.windows: + if window.is_auction_window(): + id_to_auction[window.auction] = window + else: + if window.auction in id_to_bid: + bid_windows = id_to_bid[window.auction] + else: + bid_windows = [] + id_to_bid[window.auction] = bid_windows + bid_windows.append(window) + + for auction, auction_window in id_to_auction.items(): + bid_window = id_to_bid.get(auction) + if bid_window is not None: + to_merge = [] + for bid in bid_window: + if bid.start < auction_window.end: + to_merge.append(bid) + if len(to_merge) > 0: + to_merge.append(auction_window) + merge_context.merge(to_merge, auction_window) + + def get_window_coder(self): + return AuctionOrBidWindowCoder() + + def get_transformed_output_time(self, window, input_timestamp): + return window.max_timestamp() + + +class JoinAuctionBidFn(beam.DoFn): + @staticmethod + def higher_bid(bid, other): + if bid.price > other.price: + return True + elif bid.price < other.price: + return False + else: + return bid.dateTime < other.dateTime + + def process(self, element): + auction_id, group = element + auctions = group[nexmark_query_util.AUCTION_TAG] + auction = auctions[0] if auctions else None + if auction is None: + return + best_bid = None + for bid in group[nexmark_query_util.BID_TAG]: + if bid.price < auction.reserve: + continue + if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid): + best_bid = bid + if best_bid is None: + return + yield auction_bid.AuctionBid(auction, best_bid) + + + +class WinningBids(beam.PTransform): + def __init__(self): + expected_duration = 16667000 #TODO: change this to be calculated by event generation + self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) + + + def expand(self, pcoll): + events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn) + + auction_by_id = (events + | nexmark_query_util.JustAuctions() + | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) + bids_by_auction_id = (events + | nexmark_query_util.JustBids() + | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) + + return ({nexmark_query_util.AUCTION_TAG: auction_by_id, + nexmark_query_util.BID_TAG: bids_by_auction_id} + | beam.CoGroupByKey() + | beam.ParDo(JoinAuctionBidFn())) From 1fbf968b80f96f77dbcef4f9246e1b1e34f941bc Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Tue, 28 Jul 2020 23:59:11 +0000 Subject: [PATCH 5/9] yapf style, implemented query0 to use the coder --- .../nexmark/models/nexmark_model.py | 19 +++++++--- .../benchmarks/nexmark/nexmark_util.py | 35 +++++++++++++------ .../nexmark/queries/nexmark_query_util.py | 5 ++- .../benchmarks/nexmark/queries/query0.py | 12 +++++-- 4 files changed, 52 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 44bd19a23ac1..921ba54d5a5c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -28,6 +28,7 @@ """ from apache_beam.coders import coder_impl from apache_beam.coders.coders import FastCoder +from apache_beam.coders.coders import StrUtf8Coder from apache_beam.testing.benchmarks.nexmark import nexmark_util @@ -128,7 +129,7 @@ def __repr__(self): class AuctionCoderImpl(coder_impl.StreamCoderImpl): _int_coder_impl = coder_impl.VarIntCoderImpl() - _str_coder_impl = coder_impl.BytesCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() def encode_to_stream(self, value: Auction, stream, nested): @@ -154,12 +155,22 @@ def decode_from_stream(self, stream, nested): seller = self._int_coder_impl.decode_from_stream(stream, True) category = self._int_coder_impl.decode_from_stream(stream, True) extra = self._str_coder_impl.decode_from_stream(stream, True) - return Auction(id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra) + return Auction( + id, + item_name, + description, + initial_bid, + reserve, + date_time, + expires, + seller, + category, + extra) class BidCoderImpl(coder_impl.StreamCoderImpl): _int_coder_impl = coder_impl.VarIntCoderImpl() - _str_coder_impl = coder_impl.BytesCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() def encode_to_stream(self, value: Bid, stream, nested): @@ -180,7 +191,7 @@ def decode_from_stream(self, stream, nested): class PersonCoderImpl(coder_impl.StreamCoderImpl): _int_coder_impl = coder_impl.VarIntCoderImpl() - _str_coder_impl = coder_impl.BytesCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() _time_coder_impl = coder_impl.TimestampCoderImpl() def encode_to_stream(self, value: Person, stream, nested): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 8758e165c9e0..e582dcc39ebc 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -75,11 +75,15 @@ def thread_target(): def setup_coder(): - beam.coders.registry.register_coder(nexmark_model.Auction, nexmark_model.AuctionCoder) - beam.coders.registry.register_coder(nexmark_model.Person, nexmark_model.PersonCoder) + beam.coders.registry.register_coder( + nexmark_model.Auction, nexmark_model.AuctionCoder) + beam.coders.registry.register_coder( + nexmark_model.Person, nexmark_model.PersonCoder) beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder) - beam.coders.registry.register_coder(auction_bid.AuctionBid, auction_bid.AuctionBidCoder) - beam.coders.registry.register_coder(auction_price.AuctionPrice, auction_price.AuctionPriceCoder) + beam.coders.registry.register_coder( + auction_bid.AuctionBid, auction_bid.AuctionBidCoder) + beam.coders.registry.register_coder( + auction_price.AuctionPrice, auction_price.AuctionPriceCoder) class ParseEventFn(beam.DoFn): @@ -144,7 +148,8 @@ class ParseJsonEvnetFn(beam.DoFn): def process(self, elem): json_dict = json.loads(elem) if type(json_dict[FieldNames.DATE_TIME]) is dict: - json_dict[FieldNames.DATE_TIME] = json_dict[FieldNames.DATE_TIME]['millis'] + json_dict[FieldNames.DATE_TIME] = json_dict[ + FieldNames.DATE_TIME]['millis'] if FieldNames.NAME in json_dict: yield nexmark_model.Person( json_dict[FieldNames.ID], @@ -182,10 +187,11 @@ def process(self, elem): class CountAndLog(beam.PTransform): def expand(self, pcoll): - return (pcoll - | 'window' >> beam.WindowInto(window.GlobalWindows()) - | "Count" >> beam.combiners.Count.Globally() - | "Log" >> beam.Map(log_count_info)) + return ( + pcoll + | 'window' >> beam.WindowInto(window.GlobalWindows()) + | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) def log_count_info(count): @@ -199,7 +205,16 @@ def display(elm): def model_to_json(model): - return json.dumps(model.__dict__, separators=(',', ':')) + return json.dumps({k: timestamp_to_int(v) + for k, v in model.__dict__.items()}, + separators=(',', ':')) + + +def timestamp_to_int(cand): + if isinstance(cand, Timestamp): + return cand.micros // 1000 + else: + return cand def millis_to_timestamp(millis: int): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index 1c8a4a0ff3ff..f23d93275526 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -17,11 +17,9 @@ """Utilities for working with NEXmark data stream.""" - import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model - AUCTION_TAG = 'auctions' BID_TAG = 'bids' PERSON_TAG = 'person' @@ -56,4 +54,5 @@ def process(self, element): def auction_or_bid(event): - return isinstance(event, nexmark_model.Bid) or isinstance(event, nexmark_model.Auction) + return isinstance(event, nexmark_model.Bid) or isinstance( + event, nexmark_model.Auction) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 8df56069f3c1..27dc34ed9b27 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -33,7 +33,15 @@ from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn +class round_tripFn(beam.DoFn): + def process(self, element): + coder = element.CODER + byte_value = coder.encode(element) + recon = coder.decode(byte_value) + yield recon + + def load(events, query_args=None): return ( - events | 'serialization' >> beam.Map(repr) - | 'deserialization' >> beam.ParDo(ParseJsonEvnetFn())) + events + | 'serialization_and_deserialization' >> beam.ParDo(round_tripFn())) From 37add17a70ce74955a9b2ef24c50fcc84ac5de07 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Wed, 29 Jul 2020 00:01:11 +0000 Subject: [PATCH 6/9] yapf style change --- .../benchmarks/nexmark/queries/query9.py | 8 +-- .../nexmark/queries/winning_bids.py | 50 +++++++++++-------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py index 4e97825744c1..7ac6d01689db 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py @@ -21,7 +21,9 @@ from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util from apache_beam.testing.benchmarks.nexmark.queries import winning_bids + def load(events, metadata=None): - return (events - | beam.Filter(nexmark_query_util.auction_or_bid) - | winning_bids.WinningBids()) + return ( + events + | beam.Filter(nexmark_query_util.auction_or_bid) + | winning_bids.WinningBids()) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 0c1001ff8508..5dac0be72b75 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -40,18 +40,19 @@ def for_auction(timestamp, auction: nexmark_model.Auction): @staticmethod def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): - return AuctionOrBidWindow(timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) + return AuctionOrBidWindow( + timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) def is_auction_window(self): return self.is_auction_window def __str__(self): - return ('AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' - % (self.start, self.end, self.auction, self.is_auction_window)) + return ( + 'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' % + (self.start, self.end, self.auction, self.is_auction_window)) class AuctionOrBidWindowCoder(FastCoder): - def _create_impl(self): return AuctionOrBidWindowCoderImpl() @@ -68,13 +69,15 @@ class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested): self._super_coder_impl.encode_to_stream(value, stream, True) self._id_coder_impl.encode_to_stream(value.auction, stream, True) - self._bool_coder_impl.encode_to_stream(value.is_auction_window, stream, True) + self._bool_coder_impl.encode_to_stream( + value.is_auction_window, stream, True) def decode_from_stream(self, stream, nested): super_window = self._super_coder_impl.decode_from_stream(stream, True) auction = self._id_coder_impl.decode_from_stream(stream, True) is_auction = self._bool_coder_impl.decode_from_stream(stream, True) - return AuctionOrBidWindow(super_window.start, super_window.end, auction, is_auction) + return AuctionOrBidWindow( + super_window.start, super_window.end, auction, is_auction) class AuctionOrBidWindowFn(WindowFn): @@ -86,9 +89,14 @@ def assign(self, assign_context): if isinstance(event, nexmark_model.Auction): return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)] elif isinstance(event, nexmark_model.Bid): - return [AuctionOrBidWindow.for_bid(self.expected_duration, assign_context.timestamp, event)] + return [ + AuctionOrBidWindow.for_bid( + self.expected_duration, assign_context.timestamp, event) + ] else: - raise ValueError('%s can only assign windows to auctions and bids, but received %s' % (self.__class__.__name__, event)) + raise ValueError( + '%s can only assign windows to auctions and bids, but received %s' % + (self.__class__.__name__, event)) def merge(self, merge_context): id_to_auction = {} @@ -149,24 +157,26 @@ def process(self, element): yield auction_bid.AuctionBid(auction, best_bid) - class WinningBids(beam.PTransform): def __init__(self): - expected_duration = 16667000 #TODO: change this to be calculated by event generation + expected_duration = 16667000 #TODO: change this to be calculated by event generation self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) - def expand(self, pcoll): events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn) - auction_by_id = (events - | nexmark_query_util.JustAuctions() - | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) - bids_by_auction_id = (events - | nexmark_query_util.JustBids() - | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) - - return ({nexmark_query_util.AUCTION_TAG: auction_by_id, - nexmark_query_util.BID_TAG: bids_by_auction_id} + auction_by_id = ( + events + | nexmark_query_util.JustAuctions() + | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) + bids_by_auction_id = ( + events + | nexmark_query_util.JustBids() + | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) + + return ({ + nexmark_query_util.AUCTION_TAG: auction_by_id, + nexmark_query_util.BID_TAG: bids_by_auction_id + } | beam.CoGroupByKey() | beam.ParDo(JoinAuctionBidFn())) From 1110ec9fe56f6f1a0233647bd64277da43cb5023 Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Wed, 29 Jul 2020 23:53:26 +0000 Subject: [PATCH 7/9] naming changes --- .../testing/benchmarks/nexmark/nexmark_util.py | 13 +++++++++---- .../benchmarks/nexmark/queries/winning_bids.py | 14 +++++++------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index e582dcc39ebc..9a3b7ae1ec8b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -205,14 +205,19 @@ def display(elm): def model_to_json(model): - return json.dumps({k: timestamp_to_int(v) - for k, v in model.__dict__.items()}, - separators=(',', ':')) + return json.dumps(construct_json_dict(model), separators=(',', ':')) -def timestamp_to_int(cand): +def construct_json_dict(model): + return {k: unnest_to_json(v) for k, v in model.__dict__.items()} + + +def unnest_to_json(cand): if isinstance(cand, Timestamp): return cand.micros // 1000 + elif isinstance( + cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): + return construct_json_dict(cand) else: return cand diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 5dac0be72b75..8aef9978187b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -43,7 +43,7 @@ def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): return AuctionOrBidWindow( timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) - def is_auction_window(self): + def is_auction_window_fn(self): return self.is_auction_window def __str__(self): @@ -102,7 +102,7 @@ def merge(self, merge_context): id_to_auction = {} id_to_bid = {} for window in merge_context.windows: - if window.is_auction_window(): + if window.is_auction_window_fn(): id_to_auction[window.auction] = window else: if window.auction in id_to_bid: @@ -113,12 +113,12 @@ def merge(self, merge_context): bid_windows.append(window) for auction, auction_window in id_to_auction.items(): - bid_window = id_to_bid.get(auction) - if bid_window is not None: + bid_window_list = id_to_bid.get(auction) + if bid_window_list is not None: to_merge = [] - for bid in bid_window: - if bid.start < auction_window.end: - to_merge.append(bid) + for bid_window in bid_window_list: + if bid_window.start < auction_window.end: + to_merge.append(bid_window) if len(to_merge) > 0: to_merge.append(auction_window) merge_context.merge(to_merge, auction_window) From 0129cd6b4cddc3e263f22f627f004068c99cf87b Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Thu, 30 Jul 2020 00:01:49 +0000 Subject: [PATCH 8/9] nexmark launcher deserialization, timestamping and result counting and printing to file --- .../benchmarks/nexmark/nexmark_launcher.py | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py index 79032ad483b0..d7332d67fba5 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py @@ -67,6 +67,7 @@ import sys import uuid +from apache_beam.transforms import window from google.cloud import pubsub import apache_beam as beam @@ -75,10 +76,12 @@ from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TestOptions +from apache_beam.testing.benchmarks.nexmark import nexmark_util from apache_beam.testing.benchmarks.nexmark.nexmark_util import Command from apache_beam.testing.benchmarks.nexmark.queries import query0 from apache_beam.testing.benchmarks.nexmark.queries import query1 from apache_beam.testing.benchmarks.nexmark.queries import query2 +from apache_beam.testing.benchmarks.nexmark.queries import query9 class NexmarkLauncher(object): @@ -110,7 +113,7 @@ def parse_args(self): type=int, action='append', required=True, - choices=[0, 1, 2], + choices=[0, 1, 2, 9], help='Query to run') parser.add_argument( @@ -193,15 +196,27 @@ def generate_events(self): else: raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub( topic=topic.full_name) - + raw_events = ( + raw_events + | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn()) + | 'timestamping' >> + beam.Map(lambda e: window.TimestampedValue(e, e.dateTime))) return raw_events def run_query(self, query, query_args, query_errors): try: self.parse_args() self.pipeline = beam.Pipeline(options=self.pipeline_options) - raw_events = self.generate_events() - query.load(raw_events, query_args) + nexmark_util.setup_coder() + events = self.generate_events() + output = query.load(events, query_args) + # print results + ( + output | beam.Map(repr) + | beam.io.WriteToText( + "py-query-result", file_name_suffix='.json', num_shards=1)) + output | nexmark_util.CountAndLog() + # end of results output result = self.pipeline.run() job_duration = ( self.pipeline_options.view_as(TestOptions).wait_until_finish_duration) @@ -228,6 +243,7 @@ def run(self): 0: query0, 1: query1, 2: query2, # TODO(mariagh): Add more queries. + 9: query9 } # TODO(mariagh): Move to a config file. @@ -263,4 +279,4 @@ def run(self): if __name__ == '__main__': launcher = NexmarkLauncher() launcher.run() - launcher.cleanup() + # launcher.cleanup() From 7420cb489d6d06ea34752b257d45ade8b95ed36a Mon Sep 17 00:00:00 2001 From: Leiyi Zhang Date: Thu, 30 Jul 2020 19:11:21 +0000 Subject: [PATCH 9/9] pylint style changes --- .../testing/benchmarks/nexmark/models/auction_price.py | 2 +- .../testing/benchmarks/nexmark/queries/nexmark_query_util.py | 3 +-- .../apache_beam/testing/benchmarks/nexmark/queries/query0.py | 1 - .../testing/benchmarks/nexmark/queries/winning_bids.py | 5 +++-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py index 7f081be92591..a9b41e877e0d 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -52,4 +52,4 @@ def decode_from_stream(self, stream, nested): auction = self._int_coder_impl.decode_from_stream(stream, True) price = self._int_coder_impl.decode_from_stream(stream, True) - return AuctionPrice(auction, price) \ No newline at end of file + return AuctionPrice(auction, price) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py index f23d93275526..e6e581a6ba6c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -54,5 +54,4 @@ def process(self, element): def auction_or_bid(event): - return isinstance(event, nexmark_model.Bid) or isinstance( - event, nexmark_model.Auction) + return isinstance(event, (nexmark_model.Auction, nexmark_model.Bid)) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index 27dc34ed9b27..681c8cd4fc52 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -30,7 +30,6 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseJsonEvnetFn class round_tripFn(beam.DoFn): diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py index 8aef9978187b..7faa9b4403bd 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -141,7 +141,7 @@ def higher_bid(bid, other): return bid.dateTime < other.dateTime def process(self, element): - auction_id, group = element + _, group = element auctions = group[nexmark_query_util.AUCTION_TAG] auction = auctions[0] if auctions else None if auction is None: @@ -159,7 +159,8 @@ def process(self, element): class WinningBids(beam.PTransform): def __init__(self): - expected_duration = 16667000 #TODO: change this to be calculated by event generation + #TODO: change this to be calculated by event generation + expected_duration = 16667000 self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) def expand(self, pcoll):