diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py new file mode 100644 index 000000000000..59bc9b29c3a6 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_bid.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Result of WinningBid transform.""" +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.testing.benchmarks.nexmark import nexmark_util +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + + +class AuctionBidCoder(FastCoder): + def _create_impl(self): + return AuctionBidCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + +class AuctionBid(object): + CODER = AuctionBidCoder() + + def __init__(self, auction, bid): + self.auction = auction + self.bid = bid + + def __repr__(self): + return nexmark_util.model_to_json(self) + + +class AuctionBidCoderImpl(coder_impl.StreamCoderImpl): + _auction_coder_impl = nexmark_model.AuctionCoderImpl() + _bid_coder_Impl = nexmark_model.BidCoderImpl() + + def encode_to_stream(self, value: AuctionBid, stream, nested): + self._auction_coder_impl.encode_to_stream(value.auction, stream, True) + self._bid_coder_Impl.encode_to_stream(value.bid, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._auction_coder_impl.decode_from_stream(stream, True) + bid = self._bid_coder_Impl.decode_from_stream(stream, True) + return AuctionBid(auction, bid) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py new file mode 100644 index 000000000000..a9b41e877e0d --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/auction_price.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Result of Query2.""" +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.testing.benchmarks.nexmark import nexmark_util + + +class AuctionPriceCoder(FastCoder): + def _create_impl(self): + return AuctionPriceCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + +class AuctionPrice(object): + CODER = AuctionPriceCoder() + + def __init__(self, auction, price): + self.auction = auction + self.price = price + + def __repr__(self): + return nexmark_util.model_to_json(self) + + +class AuctionPriceCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + + def encode_to_stream(self, value: AuctionPrice, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + + def decode_from_stream(self, stream, nested): + + auction = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + return AuctionPrice(auction, price) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py new file mode 100644 index 000000000000..5d583bca27f8 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/field_name.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" Field names for de-serializing json representation of Models +""" + + +class FieldNames: + ID = 'id' + NAME = 'name' + EMAIL_ADDRESS = 'emailAddress' + CREDIT_CARD = 'creditCard' + CITY = "city" + STATE = "state" + DATE_TIME = "dateTime" + EXTRA = "extra" + ITEM_NAME = "itemName" + DESCRIPTION = "description" + INITIAL_BID = "initialBid" + RESERVE = "reserve" + EXPIRES = "expires" + SELLER = "seller" + CATEGORY = "category" + AUCTION = "auction" + BIDDER = "bidder" + PRICE = "price" diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py index 2cd7bf0acf2e..921ba54d5a5c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py @@ -26,31 +26,55 @@ - The bid on an item for auction (Bid). """ +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.coders.coders import StrUtf8Coder +from apache_beam.testing.benchmarks.nexmark import nexmark_util + + +class PersonCoder(FastCoder): + def _create_impl(self): + return PersonCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True class Person(object): "Author of an auction or a bid." + CODER = PersonCoder() def __init__( - self, id, name, email, credit_card, city, state, timestamp, extra=None): + self, id, name, email, credit_card, city, state, date_time, extra=None): self.id = id self.name = name - self.email = email # key - self.credit_card = credit_card + self.emailAddress = email # key + self.creditCard = credit_card self.city = city self.state = state - self.timestamp = timestamp + self.dateTime = date_time self.extra = extra def __repr__(self): - return 'Person({id}, {email})'.format( - **{ - 'id': self.id, 'email': self.email - }) + return nexmark_util.model_to_json(self) + + +class AuctionCoder(FastCoder): + def to_type_hint(self): + pass + + def _create_impl(self): + return AuctionCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True class Auction(object): "Item for auction." + CODER = AuctionCoder() def __init__( self, @@ -59,41 +83,134 @@ def __init__( description, initial_bid, reserve_price, - timestamp, + date_time, expires, seller, category, extra=None): self.id = id - self.item_name = item_name # key + self.itemName = item_name # key self.description = description - self.initial_bid = initial_bid - self.reserve_price = reserve_price - self.timestamp = timestamp + self.initialBid = initial_bid + self.reserve = reserve_price + self.dateTime = date_time self.expires = expires self.seller = seller self.category = category self.extra = extra def __repr__(self): - return 'Auction({id}, {item_name})'.format( - **{ - 'id': self.id, 'item_name': self.item_name - }) + return nexmark_util.model_to_json(self) + + +class BidCoder(FastCoder): + def _create_impl(self): + return BidCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True class Bid(object): "A bid for an item for auction." + CODER = BidCoder() - def __init__(self, auction, bidder, price, timestamp, extra=None): + def __init__(self, auction, bidder, price, date_time, extra=None): self.auction = auction # key self.bidder = bidder self.price = price - self.timestamp = timestamp + self.dateTime = date_time self.extra = extra def __repr__(self): - return 'Bid({auction}, {bidder}, {price})'.format( - **{ - 'auction': self.auction, 'bidder': self.bidder, 'price': self.price - }) + return nexmark_util.model_to_json(self) + + +class AuctionCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Auction, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.itemName, stream, True) + self._str_coder_impl.encode_to_stream(value.description, stream, True) + self._int_coder_impl.encode_to_stream(value.initialBid, stream, True) + self._int_coder_impl.encode_to_stream(value.reserve, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._time_coder_impl.encode_to_stream(value.expires, stream, True) + self._int_coder_impl.encode_to_stream(value.seller, stream, True) + self._int_coder_impl.encode_to_stream(value.category, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + item_name = self._str_coder_impl.decode_from_stream(stream, True) + description = self._str_coder_impl.decode_from_stream(stream, True) + initial_bid = self._int_coder_impl.decode_from_stream(stream, True) + reserve = self._int_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + expires = self._time_coder_impl.decode_from_stream(stream, True) + seller = self._int_coder_impl.decode_from_stream(stream, True) + category = self._int_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Auction( + id, + item_name, + description, + initial_bid, + reserve, + date_time, + expires, + seller, + category, + extra) + + +class BidCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Bid, stream, nested): + self._int_coder_impl.encode_to_stream(value.auction, stream, True) + self._int_coder_impl.encode_to_stream(value.bidder, stream, True) + self._int_coder_impl.encode_to_stream(value.price, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + auction = self._int_coder_impl.decode_from_stream(stream, True) + bidder = self._int_coder_impl.decode_from_stream(stream, True) + price = self._int_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Bid(auction, bidder, price, date_time, extra) + + +class PersonCoderImpl(coder_impl.StreamCoderImpl): + _int_coder_impl = coder_impl.VarIntCoderImpl() + _str_coder_impl = StrUtf8Coder().get_impl() + _time_coder_impl = coder_impl.TimestampCoderImpl() + + def encode_to_stream(self, value: Person, stream, nested): + self._int_coder_impl.encode_to_stream(value.id, stream, True) + self._str_coder_impl.encode_to_stream(value.name, stream, True) + self._str_coder_impl.encode_to_stream(value.emailAddress, stream, True) + self._str_coder_impl.encode_to_stream(value.creditCard, stream, True) + self._str_coder_impl.encode_to_stream(value.city, stream, True) + self._str_coder_impl.encode_to_stream(value.state, stream, True) + self._time_coder_impl.encode_to_stream(value.dateTime, stream, True) + self._str_coder_impl.encode_to_stream(value.extra, stream, True) + + def decode_from_stream(self, stream, nested): + id = self._int_coder_impl.decode_from_stream(stream, True) + name = self._str_coder_impl.decode_from_stream(stream, True) + email = self._str_coder_impl.decode_from_stream(stream, True) + credit_card = self._str_coder_impl.decode_from_stream(stream, True) + city = self._str_coder_impl.decode_from_stream(stream, True) + state = self._str_coder_impl.decode_from_stream(stream, True) + date_time = self._time_coder_impl.decode_from_stream(stream, True) + extra = self._str_coder_impl.decode_from_stream(stream, True) + return Person(id, name, email, credit_card, city, state, date_time, extra) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py index 79032ad483b0..d7332d67fba5 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py @@ -67,6 +67,7 @@ import sys import uuid +from apache_beam.transforms import window from google.cloud import pubsub import apache_beam as beam @@ -75,10 +76,12 @@ from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TestOptions +from apache_beam.testing.benchmarks.nexmark import nexmark_util from apache_beam.testing.benchmarks.nexmark.nexmark_util import Command from apache_beam.testing.benchmarks.nexmark.queries import query0 from apache_beam.testing.benchmarks.nexmark.queries import query1 from apache_beam.testing.benchmarks.nexmark.queries import query2 +from apache_beam.testing.benchmarks.nexmark.queries import query9 class NexmarkLauncher(object): @@ -110,7 +113,7 @@ def parse_args(self): type=int, action='append', required=True, - choices=[0, 1, 2], + choices=[0, 1, 2, 9], help='Query to run') parser.add_argument( @@ -193,15 +196,27 @@ def generate_events(self): else: raw_events = self.pipeline | 'ReadPubSub' >> beam.io.ReadFromPubSub( topic=topic.full_name) - + raw_events = ( + raw_events + | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEvnetFn()) + | 'timestamping' >> + beam.Map(lambda e: window.TimestampedValue(e, e.dateTime))) return raw_events def run_query(self, query, query_args, query_errors): try: self.parse_args() self.pipeline = beam.Pipeline(options=self.pipeline_options) - raw_events = self.generate_events() - query.load(raw_events, query_args) + nexmark_util.setup_coder() + events = self.generate_events() + output = query.load(events, query_args) + # print results + ( + output | beam.Map(repr) + | beam.io.WriteToText( + "py-query-result", file_name_suffix='.json', num_shards=1)) + output | nexmark_util.CountAndLog() + # end of results output result = self.pipeline.run() job_duration = ( self.pipeline_options.view_as(TestOptions).wait_until_finish_duration) @@ -228,6 +243,7 @@ def run(self): 0: query0, 1: query1, 2: query2, # TODO(mariagh): Add more queries. + 9: query9 } # TODO(mariagh): Move to a config file. @@ -263,4 +279,4 @@ def run(self): if __name__ == '__main__': launcher = NexmarkLauncher() launcher.run() - launcher.cleanup() + # launcher.cleanup() diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py index 9079596a1a2a..9a3b7ae1ec8b 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py @@ -39,9 +39,15 @@ import logging import threading +import json import apache_beam as beam +from apache_beam.transforms import window +from apache_beam.utils.timestamp import Timestamp from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.models import auction_price +from apache_beam.testing.benchmarks.nexmark.models.field_name import FieldNames _LOGGER = logging.getLogger(__name__) @@ -68,6 +74,18 @@ def thread_target(): thread.join(timeout) +def setup_coder(): + beam.coders.registry.register_coder( + nexmark_model.Auction, nexmark_model.AuctionCoder) + beam.coders.registry.register_coder( + nexmark_model.Person, nexmark_model.PersonCoder) + beam.coders.registry.register_coder(nexmark_model.Bid, nexmark_model.BidCoder) + beam.coders.registry.register_coder( + auction_bid.AuctionBid, auction_bid.AuctionBidCoder) + beam.coders.registry.register_coder( + auction_price.AuctionPrice, auction_price.AuctionPriceCoder) + + class ParseEventFn(beam.DoFn): """Parses the raw event info into a Python objects. @@ -103,6 +121,108 @@ def process(self, elem): yield event +class ParseJsonEvnetFn(beam.DoFn): + """Parses the raw event info into a Python objects. + + Each event line has the following format: + + person: {id,name,email,credit_card,city, \ + state,timestamp,extra} + auction: {id,item_name, description,initial_bid, \ + reserve_price,timestamp,expires,seller,category,extra} + bid: {auction,bidder,price,timestamp,extra} + + For example: + + {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\ + "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\ + "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["} + + {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\ + "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\ + "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"} + + {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\ + "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"} + """ + def process(self, elem): + json_dict = json.loads(elem) + if type(json_dict[FieldNames.DATE_TIME]) is dict: + json_dict[FieldNames.DATE_TIME] = json_dict[ + FieldNames.DATE_TIME]['millis'] + if FieldNames.NAME in json_dict: + yield nexmark_model.Person( + json_dict[FieldNames.ID], + json_dict[FieldNames.NAME], + json_dict[FieldNames.EMAIL_ADDRESS], + json_dict[FieldNames.CREDIT_CARD], + json_dict[FieldNames.CITY], + json_dict[FieldNames.STATE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) + elif FieldNames.ITEM_NAME in json_dict: + if type(json_dict[FieldNames.EXPIRES]) is dict: + json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis'] + yield nexmark_model.Auction( + json_dict[FieldNames.ID], + json_dict[FieldNames.ITEM_NAME], + json_dict[FieldNames.DESCRIPTION], + json_dict[FieldNames.INITIAL_BID], + json_dict[FieldNames.RESERVE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + millis_to_timestamp(json_dict[FieldNames.EXPIRES]), + json_dict[FieldNames.SELLER], + json_dict[FieldNames.CATEGORY], + json_dict[FieldNames.EXTRA]) + elif FieldNames.AUCTION in json_dict: + yield nexmark_model.Bid( + json_dict[FieldNames.AUCTION], + json_dict[FieldNames.BIDDER], + json_dict[FieldNames.PRICE], + millis_to_timestamp(json_dict[FieldNames.DATE_TIME]), + json_dict[FieldNames.EXTRA]) + else: + raise ValueError('Invalid event: %s.' % str(json_dict)) + + +class CountAndLog(beam.PTransform): + def expand(self, pcoll): + return ( + pcoll + | 'window' >> beam.WindowInto(window.GlobalWindows()) + | "Count" >> beam.combiners.Count.Globally() + | "Log" >> beam.Map(log_count_info)) + + +def log_count_info(count): + logging.info('Query resulted in %d results', count) + return count + + def display(elm): logging.debug(elm) return elm + + +def model_to_json(model): + return json.dumps(construct_json_dict(model), separators=(',', ':')) + + +def construct_json_dict(model): + return {k: unnest_to_json(v) for k, v in model.__dict__.items()} + + +def unnest_to_json(cand): + if isinstance(cand, Timestamp): + return cand.micros // 1000 + elif isinstance( + cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)): + return construct_json_dict(cand) + else: + return cand + + +def millis_to_timestamp(millis: int): + second = millis // 1000 + micro_second = millis % 1000 * 1000 + return Timestamp(second, micro_second) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py new file mode 100644 index 000000000000..e6e581a6ba6c --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/nexmark_query_util.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities for working with NEXmark data stream.""" + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model + +AUCTION_TAG = 'auctions' +BID_TAG = 'bids' +PERSON_TAG = 'person' + + +def is_bid(event): + return isinstance(event, nexmark_model.Bid) + + +def is_auction(event): + return isinstance(event, nexmark_model.Auction) + + +class JustBids(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsBid" >> beam.Filter(is_bid) + + +class JustAuctions(beam.PTransform): + def expand(self, pcoll): + return pcoll | "IsAuction" >> beam.Filter(is_auction) + + +class AuctionByIdFn(beam.DoFn): + def process(self, element): + yield element.id, element + + +class BidByAuctionIdFn(beam.DoFn): + def process(self, element): + yield element.auction, element + + +def auction_or_bid(event): + return isinstance(event, (nexmark_model.Auction, nexmark_model.Bid)) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py index c2c0d509c0f4..681c8cd4fc52 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py @@ -30,8 +30,17 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -def load(raw_events, query_args=None): - return raw_events | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) # pylint: disable=expression-not-assigned +class round_tripFn(beam.DoFn): + def process(self, element): + coder = element.CODER + byte_value = coder.encode(element) + recon = coder.decode(byte_value) + yield recon + + +def load(events, query_args=None): + return ( + events + | 'serialization_and_deserialization' >> beam.ParDo(round_tripFn())) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py index 95f3f162fbdf..a807259596c8 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py @@ -29,20 +29,16 @@ import apache_beam as beam from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util def load(raw_events, query_args=None): return ( raw_events - | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInBids' >> - beam.Filter(lambda event: isinstance(event, nexmark_model.Bid)) + | nexmark_query_util.JustBids() | 'ConvertToEuro' >> beam.Map( lambda bid: nexmark_model.Bid( bid.auction, - bid.bidder, (float(bid.price) * 89) // 100, - bid.timestamp, - bid.extra)) - | 'DisplayQuery1' >> beam.Map(display)) # pylint: disable=expression-not-assigned + bid.bidder, (bid.price * 89) // 100, + bid.dateTime, + bid.extra))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py index 51504a600dce..edd6ec809c25 100644 --- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py @@ -29,17 +29,15 @@ from __future__ import absolute_import import apache_beam as beam -from apache_beam.testing.benchmarks.nexmark.models import nexmark_model -from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn -from apache_beam.testing.benchmarks.nexmark.nexmark_util import display +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.testing.benchmarks.nexmark.models import auction_price -def load(raw_events, metadata=None): +def load(events, metadata=None): return ( - raw_events - | 'ParseEventFn' >> beam.ParDo(ParseEventFn()) - | 'FilterInAuctionsWithSelectedId' >> beam.Filter( - lambda event: ( - isinstance(event, nexmark_model.Auction) and event.id == metadata. - get('auction_id'))) - | 'DisplayQuery2' >> beam.Map(display)) # pylint: disable=expression-not-assigned + events + | nexmark_query_util.JustBids() + | 'filter_by_skip' >> + beam.Filter(lambda bid: bid.auction % metadata.get('auction_skip') == 0) + | 'project' >> + beam.Map(lambda bid: auction_price.AuctionPrice(bid.auction, bid.price))) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py new file mode 100644 index 000000000000..7ac6d01689db --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util +from apache_beam.testing.benchmarks.nexmark.queries import winning_bids + + +def load(events, metadata=None): + return ( + events + | beam.Filter(nexmark_query_util.auction_or_bid) + | winning_bids.WinningBids()) diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py new file mode 100644 index 000000000000..7faa9b4403bd --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +import apache_beam as beam +from apache_beam.coders import coder_impl +from apache_beam.coders.coders import FastCoder +from apache_beam.transforms.window import WindowFn +from apache_beam.transforms.window import IntervalWindow +from apache_beam.testing.benchmarks.nexmark.models import nexmark_model +from apache_beam.testing.benchmarks.nexmark.models import auction_bid +from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util + + +class AuctionOrBidWindow(IntervalWindow): + """Windows for open auctions and bids.""" + def __init__(self, start, end, auction_id, is_auction_window): + super(AuctionOrBidWindow, self).__init__(start, end) + self.auction = auction_id + self.is_auction_window = is_auction_window + + @staticmethod + def for_auction(timestamp, auction: nexmark_model.Auction): + return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True) + + @staticmethod + def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid): + return AuctionOrBidWindow( + timestamp, timestamp + expected_duration_micro * 2, bid.auction, False) + + def is_auction_window_fn(self): + return self.is_auction_window + + def __str__(self): + return ( + 'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' % + (self.start, self.end, self.auction, self.is_auction_window)) + + +class AuctionOrBidWindowCoder(FastCoder): + def _create_impl(self): + return AuctionOrBidWindowCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + +class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl): + _super_coder_impl = coder_impl.IntervalWindowCoderImpl() + _id_coder_impl = coder_impl.VarIntCoderImpl() + _bool_coder_impl = coder_impl.BooleanCoderImpl() + + def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested): + self._super_coder_impl.encode_to_stream(value, stream, True) + self._id_coder_impl.encode_to_stream(value.auction, stream, True) + self._bool_coder_impl.encode_to_stream( + value.is_auction_window, stream, True) + + def decode_from_stream(self, stream, nested): + super_window = self._super_coder_impl.decode_from_stream(stream, True) + auction = self._id_coder_impl.decode_from_stream(stream, True) + is_auction = self._bool_coder_impl.decode_from_stream(stream, True) + return AuctionOrBidWindow( + super_window.start, super_window.end, auction, is_auction) + + +class AuctionOrBidWindowFn(WindowFn): + def __init__(self, expected_duration_micro): + self.expected_duration = expected_duration_micro + + def assign(self, assign_context): + event = assign_context.element + if isinstance(event, nexmark_model.Auction): + return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)] + elif isinstance(event, nexmark_model.Bid): + return [ + AuctionOrBidWindow.for_bid( + self.expected_duration, assign_context.timestamp, event) + ] + else: + raise ValueError( + '%s can only assign windows to auctions and bids, but received %s' % + (self.__class__.__name__, event)) + + def merge(self, merge_context): + id_to_auction = {} + id_to_bid = {} + for window in merge_context.windows: + if window.is_auction_window_fn(): + id_to_auction[window.auction] = window + else: + if window.auction in id_to_bid: + bid_windows = id_to_bid[window.auction] + else: + bid_windows = [] + id_to_bid[window.auction] = bid_windows + bid_windows.append(window) + + for auction, auction_window in id_to_auction.items(): + bid_window_list = id_to_bid.get(auction) + if bid_window_list is not None: + to_merge = [] + for bid_window in bid_window_list: + if bid_window.start < auction_window.end: + to_merge.append(bid_window) + if len(to_merge) > 0: + to_merge.append(auction_window) + merge_context.merge(to_merge, auction_window) + + def get_window_coder(self): + return AuctionOrBidWindowCoder() + + def get_transformed_output_time(self, window, input_timestamp): + return window.max_timestamp() + + +class JoinAuctionBidFn(beam.DoFn): + @staticmethod + def higher_bid(bid, other): + if bid.price > other.price: + return True + elif bid.price < other.price: + return False + else: + return bid.dateTime < other.dateTime + + def process(self, element): + _, group = element + auctions = group[nexmark_query_util.AUCTION_TAG] + auction = auctions[0] if auctions else None + if auction is None: + return + best_bid = None + for bid in group[nexmark_query_util.BID_TAG]: + if bid.price < auction.reserve: + continue + if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid): + best_bid = bid + if best_bid is None: + return + yield auction_bid.AuctionBid(auction, best_bid) + + +class WinningBids(beam.PTransform): + def __init__(self): + #TODO: change this to be calculated by event generation + expected_duration = 16667000 + self.auction_or_bid_windowFn = AuctionOrBidWindowFn(expected_duration) + + def expand(self, pcoll): + events = pcoll | beam.WindowInto(self.auction_or_bid_windowFn) + + auction_by_id = ( + events + | nexmark_query_util.JustAuctions() + | 'auction_by_id' >> beam.ParDo(nexmark_query_util.AuctionByIdFn())) + bids_by_auction_id = ( + events + | nexmark_query_util.JustBids() + | 'bid_by_auction' >> beam.ParDo(nexmark_query_util.BidByAuctionIdFn())) + + return ({ + nexmark_query_util.AUCTION_TAG: auction_by_id, + nexmark_query_util.BID_TAG: bids_by_auction_id + } + | beam.CoGroupByKey() + | beam.ParDo(JoinAuctionBidFn()))