diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py new file mode 100644 index 000000000000..bcb959e9fc1e --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Result of WinningBid transform.""" +from __future__ import absolute_import + +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.testing.benchmarks.nexmark import nexmark_util +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + + +class AuctionBidCoder(FastCoder): + def _create_impl(self): + return AuctionBidCoderImpl() + + def is_deterministic(self): + return True + + +class AuctionBid(object): + CODER = AuctionBidCoder() + + def __init__(self, auction, bid): + self.auction = auction + self.bid = bid + + def __repr__(self): + return nexmark_util.model_to_json(self) + + +class AuctionBidCoderImpl(coder_impl.StreamCoderImpl): + _auction_coder_impl = nexmark_model.AuctionCoderImpl() + _bid_coder_Impl = nexmark_model.BidCoderImpl() + + def encode_to_stream(self, value, stream, nested): + self._auction_coder_impl.encode_to_stream(value.auction, stream, True) + self._bid_coder_Impl.encode_to_stream(value.bid, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._auction_coder_impl.decode_from_stream(stream, True) + bid = self._bid_coder_Impl.decode_from_stream(stream, True) + return AuctionBid(auction, bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py new file mode 100644 index 000000000000..90f79f8dacf2 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Result of Query2.""" +from __future__ import absolute_import + +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.testing.benchmarks.nexmark import nexmark_util + + +class AuctionPriceCoder(FastCoder): + def _create_impl(self): + return AuctionPriceCoderImpl() + + def is_deterministic(self): + return True + + +class AuctionPrice(object): + CODER = AuctionPriceCoder() + + def __init__(self, auction, price): + self.auction = auction + self.price = price + + def __repr__(self): + return nexmark_util.model_to_json(self) + + +class AuctionPriceCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + + def encode_to_stream(self, value, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + + def decode_from_stream(self, stream, nested): + + auction = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + return AuctionPrice(auction, price) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py new file mode 100644 index 000000000000..5d583bca27f8 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" Field names for de-serializing json representation of Models +""" + + +class FieldNames: + ID = 'id' + NAME = 'name' + EMAIL_ADDRESS = 'emailAddress' + CREDIT_CARD = 'creditCard' + CITY = "city" + STATE = "state" + DATE_TIME = "dateTime" + EXTRA = "extra" + ITEM_NAME = "itemName" + DESCRIPTION = "description" + INITIAL_BID = "initialBid" + RESERVE = "reserve" + EXPIRES = "expires" + SELLER = "seller" + CATEGORY = "category" + AUCTION = "auction" + BIDDER = "bidder" + PRICE = "price" diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 2cd7bf0acf2e..f2fd4e0ec388 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,31 +26,52 @@ - The bid on an item for auction (Bid). """ +from __future__ import absolute_import + +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.coders.coders import StrUtf8Coder +from apache_beam.testing.benchmarks.nexmark import nexmark_util + + +class PersonCoder(FastCoder): + def _create_impl(self): + return PersonCoderImpl() + + def is_deterministic(self): + return True class Person(object): "Author of an auction or a bid." + CODER = PersonCoder() def __init__( - self, id, name, email, credit_card, city, state, timestamp, extra=None): + self, id, name, email, credit_card, city, state, date_time, extra=None): self.id = id self.name = name - self.email = email # key + self.email_address = email # key self.credit_card = credit_card self.city = city self.state = state - self.timestamp = timestamp + self.date_time = date_time self.extra = extra def __repr__(self): - return 'Person({id}, {email})'.format( - **{ - 'id': self.id, 'email': self.email - }) + return nexmark_util.model_to_json(self) + + +class AuctionCoder(FastCoder): + def _create_impl(self): + return AuctionCoderImpl() + + def is_deterministic(self): + return True class Auction(object): "Item for auction." + CODER = AuctionCoder() def __init__( self, @@ -59,7 +80,7 @@ def __init__( description, initial_bid, reserve_price, - timestamp, + date_time, expires, seller, category, @@ -68,32 +89,124 @@ def __init__( self.item_name = item_name # key self.description = description self.initial_bid = initial_bid - self.reserve_price = reserve_price - self.timestamp = timestamp + self.reserve = reserve_price + self.date_time = date_time self.expires = expires self.seller = seller self.category = category self.extra = extra def __repr__(self): - return 'Auction({id}, {item_name})'.format( - **{ - 'id': self.id, 'item_name': self.item_name - }) + return nexmark_util.model_to_json(self) + + +class BidCoder(FastCoder): + def _create_impl(self): + return BidCoderImpl() + + def is_deterministic(self): + return True class Bid(object): "A bid for an item for auction." + CODER = BidCoder() - def __init__(self, auction, bidder, price, timestamp, extra=None): + def __init__(self, auction, bidder, price, date_time, extra=None): self.auction = auction # key self.bidder = bidder self.price = price - self.timestamp = timestamp + self.date_time = date_time self.extra = extra def __repr__(self): - return 'Bid({auction}, {bidder}, {price})'.format( - **{ - 'auction': self.auction, 'bidder': self.bidder, 'price': self.price - }) + return nexmark_util.model_to_json(self) + + +class AuctionCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.item_name, stream, True) + self._str_coder_impl.encode_to_stream(value.description, stream, True) + self._int_coder_impl.encode_to_stream(value.initial_bid, stream, True) + self._int_coder_impl.encode_to_stream(value.reserve, stream, True) + self._time_coder_impl.encode_to_stream(value.date_time, stream, True) + self._time_coder_impl.encode_to_stream(value.expires, stream, True) + self._int_coder_impl.encode_to_stream(value.seller, stream, True) + self._int_coder_impl.encode_to_stream(value.category, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + item_name = self._str_coder_impl.decode_from_stream(stream, True) + description = self._str_coder_impl.decode_from_stream(stream, True) + initial_bid = self._int_coder_impl.decode_from_stream(stream, True) + reserve = self._int_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + expires = self._time_coder_impl.decode_from_stream(stream, True) + seller = self._int_coder_impl.decode_from_stream(stream, True) + category = self._int_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Auction( + id, + item_name, + description, + initial_bid, + reserve, + date_time, + expires, + seller, + category, + extra) + + +class BidCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.bidder, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + self._time_coder_impl.encode_to_stream(value.date_time, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._int_coder_impl.decode_from_stream(stream, True) + bidder = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Bid(auction, bidder, price, date_time, extra) + + +class PersonCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.name, stream, True) + self._str_coder_impl.encode_to_stream(value.email_address, stream, True) + self._str_coder_impl.encode_to_stream(value.credit_card, stream, True) + self._str_coder_impl.encode_to_stream(value.city, stream, True) + self._str_coder_impl.encode_to_stream(value.state, stream, True) + self._time_coder_impl.encode_to_stream(value.date_time, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + name = self._str_coder_impl.decode_from_stream(stream, True) + email = self._str_coder_impl.decode_from_stream(stream, True) + credit_card = self._str_coder_impl.decode_from_stream(stream, True) + city = self._str_coder_impl.decode_from_stream(stream, True) + state = self._str_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Person(id, name, email, credit_card, city, state, date_time, extra) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py index 79032ad483b0..67f6bb19b5a9 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py @@ -75,10 +75,13 @@ from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TestOptions +from apache_beam.testing.benchmarks.nexmark import nexmark_util from apache_beam.testing.benchmarks.nexmark.nexmark_util import Command from apache_beam.testing.benchmarks.nexmark.queries import query0 from apache_beam.testing.benchmarks.nexmark.queries import query1 from apache_beam.testing.benchmarks.nexmark.queries import query2 +from apache_beam.testing.benchmarks.nexmark.queries import query9 +from apache_beam.transforms import window class NexmarkLauncher(object): @@ -110,7 +113,7 @@ def parse_args(self): type=int, action='append', required=True, - choices=[0, 1, 2], + choices=[0, 1, 2, 9], help='Query to run') parser.add_argument( @@ -193,15 +196,27 @@ def generate_events(self): else: raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub( topic=topic.full_name) - + raw_events = ( + raw_events + | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn()) + | 'timestamping' >> + beam.Map(lambda e: window.TimestampedValue(e, e.date_time))) return raw_events def run_query(self, query, query_args, query_errors): try: self.parse_args() self.pipeline = beam.Pipeline(options=self.pipeline_options) - raw_events = self.generate_events() - query.load(raw_events, query_args) + nexmark_util.setup_coder() + events = self.generate_events() + output = query.load(events, query_args) + # print results + ( # pylint: disable=expression-not-assigned + output | beam.Map(repr) + | beam.io.WriteToText( + "py-query-result", file_name_suffix='.json', num_shards=1)) + output | nexmark_util.CountAndLog() # pylint: disable=expression-not-assigned + # end of results output result = self.pipeline.run() job_duration = ( self.pipeline_options.view_as(TestOptions).wait_until_finish_duration) @@ -228,6 +243,7 @@ def run(self): 0: query0, 1: query1, 2: query2, # TODO(mariagh): Add more queries. + 9: query9 } # TODO(mariagh): Move to a config file. diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 9079596a1a2a..a77f85c5a896 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -37,11 +37,17 @@ from __future__ import absolute_import from __future__ import print_function +import json import logging import threading import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.models import auction_price from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames +from apache_beam.transforms import window +from apache_beam.utils.timestamp import Timestamp _LOGGER = logging.getLogger(__name__) @@ -68,23 +74,36 @@ def thread_target(): thread.join(timeout) +def setup_coder(): + beam.coders.registry.register_coder( + nexmark_model.Auction, nexmark_model.AuctionCoder) + beam.coders.registry.register_coder( + nexmark_model.Person, nexmark_model.PersonCoder) + beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder) + beam.coders.registry.register_coder( + auction_bid.AuctionBid, auction_bid.AuctionBidCoder) + beam.coders.registry.register_coder( + auction_price.AuctionPrice, auction_price.AuctionPriceCoder) + + class ParseEventFn(beam.DoFn): - """Parses the raw event info into a Python objects. + """ + Original parser for parsing raw events info into a Python objects. Each event line has the following format: person: ,name,email,credit_card,city, \ - state,timestamp,extra + state,timestamp,extra auction: ,item_name, description,initial_bid, \ - reserve_price,timestamp,expires,seller,category,extra + reserve_price,timestamp,expires,seller,category,extra bid: ,bidder,price,timestamp,extra For example: 'p12345,maria,maria@maria.com,1234-5678-9012-3456, \ - sunnyvale,CA,1528098831536' + sunnyvale,CA,1528098831536' 'a12345,car67,2012 hyundai elantra,15000,20000, \ - 1528098831536,20180630,maria,vehicle' + 1528098831536,20180630,maria,vehicle' 'b12345,maria,20000,1528098831536' """ def process(self, elem): @@ -103,6 +122,107 @@ def process(self, elem): yield event +class ParseJsonEvnetFn(beam.DoFn): + """Parses the raw event info into a Python objects. + + Each event line has the following format: + + person: {id,name,email,credit_card,city, \ + state,timestamp,extra} + auction: {id,item_name, description,initial_bid, \ + reserve_price,timestamp,expires,seller,category,extra} + bid: {auction,bidder,price,timestamp,extra} + + For example: + + {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\ + "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\ + "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["} + + {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ + "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ + "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"} + + {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ + "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} + """ + def process(self, elem): + json_dict = json.loads(elem) + if type(json_dict[FieldNames.DATE_TIME]) is dict: + json_dict[FieldNames.DATE_TIME] = json_dict[ + FieldNames.DATE_TIME]['millis'] + if FieldNames.NAME in json_dict: + yield nexmark_model.Person( + json_dict[FieldNames.ID], + json_dict[FieldNames.NAME], + json_dict[FieldNames.EMAIL_ADDRESS], + json_dict[FieldNames.CREDIT_CARD], + json_dict[FieldNames.CITY], + json_dict[FieldNames.STATE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) + elif FieldNames.ITEM_NAME in json_dict: + if type(json_dict[FieldNames.EXPIRES]) is dict: + json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis'] + yield nexmark_model.Auction( + json_dict[FieldNames.ID], + json_dict[FieldNames.ITEM_NAME], + json_dict[FieldNames.DESCRIPTION], + json_dict[FieldNames.INITIAL_BID], + json_dict[FieldNames.RESERVE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + millis_to_timestamp(json_dict[FieldNames.EXPIRES]), + json_dict[FieldNames.SELLER], + json_dict[FieldNames.CATEGORY], + json_dict[FieldNames.EXTRA]) + elif FieldNames.AUCTION in json_dict: + yield nexmark_model.Bid( + json_dict[FieldNames.AUCTION], + json_dict[FieldNames.BIDDER], + json_dict[FieldNames.PRICE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) + else: + raise ValueError('Invalid event: %s.' % str(json_dict)) + + +class CountAndLog(beam.PTransform): + def expand(self, pcoll): + return ( + pcoll + | 'window' >> beam.WindowInto(window.GlobalWindows()) + | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) + + +def log_count_info(count): + logging.info('Query resulted in %d results', count) + return count + + def display(elm): logging.debug(elm) return elm + + +def model_to_json(model): + return json.dumps(construct_json_dict(model), separators=(',', ':')) + + +def construct_json_dict(model): + return {k: unnest_to_json(v) for k, v in model.__dict__.items()} + + +def unnest_to_json(cand): + if isinstance(cand, Timestamp): + return cand.micros // 1000 + elif isinstance( + cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): + return construct_json_dict(cand) + else: + return cand + + +def millis_to_timestamp(millis): + micro_second = millis * 1000 + return Timestamp(micros=micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py new file mode 100644 index 000000000000..3e43e96c275c --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities for working with NEXmark data stream.""" +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + +AUCTION_TAG = 'auctions' +BID_TAG = 'bids' +PERSON_TAG = 'person' + + +def is_bid(event): + return isinstance(event, nexmark_model.Bid) + + +def is_auction(event): + return isinstance(event, nexmark_model.Auction) + + +class JustBids(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsBid" >> beam.Filter(is_bid) + + +class JustAuctions(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsAuction" >> beam.Filter(is_auction) + + +class AuctionByIdFn(beam.DoFn): + def process(self, element): + yield element.id, element + + +class BidByAuctionIdFn(beam.DoFn): + def process(self, element): + yield element.auction, element + + +def auction_or_bid(event): + return isinstance(event, (nexmark_model.Auction, nexmark_model.Bid)) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index c2c0d509c0f4..a4c50550a6dc 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -30,8 +30,17 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -def load(raw_events, query_args=None): - return raw_events | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) # pylint: disable=expression-not-assigned +class RoundTripFn(beam.DoFn): + def process(self, element): + coder = element.CODER + byte_value = coder.encode(element) + recon = coder.decode(byte_value) + yield recon + + +def load(events, query_args=None): + return ( + events + | 'serialization_and_deserialization' >> beam.ParDo(RoundTripFn())) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py index 95f3f162fbdf..acb205fe2395 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py @@ -29,20 +29,19 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +USD_TO_EURO = 0.89 -def load(raw_events, query_args=None): + +def load(events, query_args=None): return ( - raw_events - | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInBids' >> - beam.Filter(lambda event: isinstance(event, nexmark_model.Bid)) + events + | nexmark_query_util.JustBids() | 'ConvertToEuro' >> beam.Map( lambda bid: nexmark_model.Bid( bid.auction, - bid.bidder, (float(bid.price) * 89) // 100, - bid.timestamp, - bid.extra)) - | 'DisplayQuery1' >> beam.Map(display)) # pylint: disable=expression-not-assigned + bid.bidder, + bid.price * USD_TO_EURO, + bid.date_time, + bid.extra))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index 51504a600dce..112760444869 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -15,12 +15,11 @@ # limitations under the License. # -"""Nexmark Query 2: Select auctions by auction id. - -The Nexmark suite is a series of queries (streaming pipelines) performed -on a simulation of auction events. +""" +Query 2: Find bids with specific auction ids and show their bid price -This query selects auctions (items) that have a particular id. +This query selects Bids that have a particular auctiuon id, and output their +auction id with bid price. It illustrates a simple filter. """ @@ -29,17 +28,15 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.models import auction_price +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util -def load(raw_events, metadata=None): +def load(events, metadata=None): return ( - raw_events - | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInAuctionsWithSelectedId' >> beam.Filter( - lambda event: ( - isinstance(event, nexmark_model.Auction) and event.id == metadata. - get('auction_id'))) - | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned + events + | nexmark_query_util.JustBids() + | 'filter_by_skip' >> + beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) + | 'project' >> + beam.Map(lambda bid: auction_price.AuctionPrice(bid.auction, bid.price))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py new file mode 100644 index 000000000000..180d8500d062 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Query 9: Winning-bids: extract the most recent of the highest bids +See winning_bids.py for detailed documentation +""" + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.testing.benchmarks.nexmark.queries import winning_bids + + +def load(events, metadata=None): + return ( + events + | beam.Filter(nexmark_query_util.auction_or_bid) + | winning_bids.WinningBids()) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py new file mode 100644 index 000000000000..6cb69db3e263 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -0,0 +1,198 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A transform to find winning bids for each closed auction. In pseudo CQL syntax: + +SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime) +FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] +WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME +GROUP BY A.id + +We will also check that the winning bid is above the auction reserve. Note that +we ignore the auction opening bid value since it has no impact on which bid +eventually wins, if any. + +Our implementation will use a custom windowing function in order to bring bids +and auctions together without requiring global state. +""" + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.transforms.window import IntervalWindow +from apache_beam.transforms.window import WindowFn +from apache_beam.utils.timestamp import Duration + + +class AuctionOrBidWindow(IntervalWindow): + """Windows for open auctions and bids.""" + def __init__(self, start, end, auction_id, is_auction_window): + super(AuctionOrBidWindow, self).__init__(start, end) + self.auction = auction_id + self.is_auction_window = is_auction_window + + @staticmethod + def for_auction(timestamp, auction): + return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True) + + @staticmethod + def for_bid(expected_duration_micro, timestamp, bid): + return AuctionOrBidWindow( + timestamp, + timestamp + Duration(micros=expected_duration_micro * 2), + bid.auction, + False) + + def is_auction_window_fn(self): + return self.is_auction_window + + def __str__(self): + return ( + 'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' % + (self.start, self.end, self.auction, self.is_auction_window)) + + +class AuctionOrBidWindowCoder(FastCoder): + def _create_impl(self): + return AuctionOrBidWindowCoderImpl() + + def is_deterministic(self): + return True + + +class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): + _super_coder_impl = coder_impl.IntervalWindowCoderImpl() + _id_coder_impl = coder_impl.VarIntCoderImpl() + _bool_coder_impl = coder_impl.BooleanCoderImpl() + + def encode_to_stream(self, value, stream, nested): + self._super_coder_impl.encode_to_stream(value, stream, True) + self._id_coder_impl.encode_to_stream(value.auction, stream, True) + self._bool_coder_impl.encode_to_stream( + value.is_auction_window, stream, True) + + def decode_from_stream(self, stream, nested): + super_window = self._super_coder_impl.decode_from_stream(stream, True) + auction = self._id_coder_impl.decode_from_stream(stream, True) + is_auction = self._bool_coder_impl.decode_from_stream(stream, True) + return AuctionOrBidWindow( + super_window.start, super_window.end, auction, is_auction) + + +class AuctionOrBidWindowFn(WindowFn): + def __init__(self, expected_duration_micro): + self.expected_duration = expected_duration_micro + + def assign(self, assign_context): + event = assign_context.element + if isinstance(event, nexmark_model.Auction): + return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)] + elif isinstance(event, nexmark_model.Bid): + return [ + AuctionOrBidWindow.for_bid( + self.expected_duration, assign_context.timestamp, event) + ] + else: + raise ValueError( + '%s can only assign windows to auctions and bids, but received %s' % + (self.__class__.__name__, event)) + + def merge(self, merge_context): + auction_id_to_auction_window = {} + auction_id_to_bid_window = {} + for window in merge_context.windows: + if window.is_auction_window_fn(): + auction_id_to_auction_window[window.auction] = window + else: + if window.auction not in auction_id_to_bid_window: + auction_id_to_bid_window[window.auction] = [] + auction_id_to_bid_window[window.auction].append(window) + + for auction, auction_window in auction_id_to_auction_window.items(): + bid_window_list = auction_id_to_bid_window.get(auction) + if bid_window_list is not None: + to_merge = [] + for bid_window in bid_window_list: + if bid_window.start < auction_window.end: + to_merge.append(bid_window) + if len(to_merge) > 0: + to_merge.append(auction_window) + merge_context.merge(to_merge, auction_window) + + def get_window_coder(self): + return AuctionOrBidWindowCoder() + + def get_transformed_output_time(self, window, input_timestamp): + return window.max_timestamp() + + +class JoinAuctionBidFn(beam.DoFn): + @staticmethod + def higher_bid(bid, other): + if bid.price > other.price: + return True + elif bid.price < other.price: + return False + else: + return bid.date_time < other.date_time + + def process(self, element): + _, group = element + auctions = group[nexmark_query_util.AUCTION_TAG] + auction = auctions[0] if auctions else None + if auction is None: + return + best_bid = None + for bid in group[nexmark_query_util.BID_TAG]: + if bid.price < auction.reserve: + continue + if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid): + best_bid = bid + if best_bid: + yield auction_bid.AuctionBid(auction, best_bid) + + +class WinningBids(beam.PTransform): + def __init__(self): + #TODO: change this to be calculated by event generation + expected_duration = 16667000 + self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) + + def expand(self, pcoll): + events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn) + + auction_by_id = ( + events + | nexmark_query_util.JustAuctions() + | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) + bids_by_auction_id = ( + events + | nexmark_query_util.JustBids() + | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) + + return ({ + nexmark_query_util.AUCTION_TAG: auction_by_id, + nexmark_query_util.BID_TAG: bids_by_auction_id + } + | beam.CoGroupByKey() + | beam.ParDo(JoinAuctionBidFn())) diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh index d1f246c295e3..0091a22182a8 100755 --- a/sdks/python/scripts/generate_pydoc.sh +++ b/sdks/python/scripts/generate_pydoc.sh @@ -176,6 +176,7 @@ ignore_identifiers = [ 'apache_beam.typehints.typehints.TypeConstraint', 'apache_beam.typehints.typehints.validate_composite_type_param()', 'apache_beam.utils.windowed_value._IntervalWindowBase', + 'apache_beam.coders.coder_impl.StreamCoderImpl', # Private classes which are used within the same module 'apache_beam.transforms.external_test.PayloadBase',