-
Notifications
You must be signed in to change notification settings - Fork 4.5k
python nexmark query 0-2 improvements #12365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
03c8cc9
9ede1d6
ff865ef
5b5f6ce
d6002dc
1fbf968
37add17
1110ec9
0129cd6
7420cb4
e2ae823
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
|
|
||
| """Result of WinningBid transform.""" | ||
| from apache_beam.coders import coder_impl | ||
| from apache_beam.coders.coders import FastCoder | ||
| from apache_beam.testing.benchmarks.nexmark import nexmark_util | ||
| from apache_beam.testing.benchmarks.nexmark.models import nexmark_model | ||
|
|
||
|
|
||
class AuctionBidCoder(FastCoder):
  """Coder for ``AuctionBid`` values, backed by ``AuctionBidCoderImpl``."""
  def _create_impl(self):
    """Build the ``AuctionBidCoderImpl`` that does the stream encoding."""
    impl = AuctionBidCoderImpl()
    return impl

  def is_deterministic(self) -> bool:
    """Report this coder as deterministic; the answer is always ``True``."""
    return True
|
|
||
|
|
||
class AuctionBid(object):
  """An auction paired with a bid; the result of the WinningBid transform.

  Attributes:
    auction: the auction half of the pair.
    bid: the bid half of the pair.
  """
  # Coder instance shared by all values of this class.
  CODER = AuctionBidCoder()

  def __init__(self, auction, bid):
    """Store the given auction and bid on the instance."""
    self.auction, self.bid = auction, bid

  def __repr__(self):
    """Return whatever ``nexmark_util.model_to_json`` produces for self."""
    json_text = nexmark_util.model_to_json(self)
    return json_text
|
|
||
|
|
||
class AuctionBidCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``AuctionBid``.

  An ``AuctionBid`` is written as its auction followed by its bid. Each part
  is delegated to the matching ``nexmark_model`` coder implementation and is
  always encoded with ``nested=True``; decoding reads the parts back in the
  same order.
  """
  _auction_coder_impl = nexmark_model.AuctionCoderImpl()
  # Renamed from ``_bid_coder_Impl`` so both private attributes follow the
  # same lower_snake_case spelling.
  _bid_coder_impl = nexmark_model.BidCoderImpl()

  def encode_to_stream(self, value: AuctionBid, stream, nested):
    """Write ``value.auction`` and then ``value.bid`` to ``stream``.

    Args:
      value: the ``AuctionBid`` to encode.
      stream: output stream handed on to the component coders.
      nested: accepted for interface compatibility but not consulted; both
        components are always written with ``nested=True``.
    """
    self._auction_coder_impl.encode_to_stream(value.auction, stream, True)
    self._bid_coder_impl.encode_to_stream(value.bid, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read an auction and then a bid from ``stream``.

    Args:
      stream: input stream handed on to the component coders.
      nested: accepted for interface compatibility but not consulted.

    Returns:
      A new ``AuctionBid`` built from the two decoded components.
    """
    # Read order must mirror the write order in encode_to_stream.
    auction = self._auction_coder_impl.decode_from_stream(stream, True)
    bid = self._bid_coder_impl.decode_from_stream(stream, True)
    return AuctionBid(auction, bid)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
|
|
||
| """Result of Query2.""" | ||
| from apache_beam.coders import coder_impl | ||
| from apache_beam.coders.coders import FastCoder | ||
| from apache_beam.testing.benchmarks.nexmark import nexmark_util | ||
|
|
||
|
|
||
class AuctionPriceCoder(FastCoder):
  """Coder for ``AuctionPrice`` values, backed by ``AuctionPriceCoderImpl``."""
  def _create_impl(self):
    """Build the ``AuctionPriceCoderImpl`` that does the stream encoding."""
    impl = AuctionPriceCoderImpl()
    return impl

  def is_deterministic(self) -> bool:
    """Report this coder as deterministic; the answer is always ``True``."""
    return True
|
|
||
|
|
||
class AuctionPrice(object):
  """An auction/price pair; the result type of Query2.

  Attributes:
    auction: the auction value of the pair.
    price: the price value of the pair.
  """
  # Coder instance shared by all values of this class.
  CODER = AuctionPriceCoder()

  def __init__(self, auction, price):
    """Store the given auction and price on the instance."""
    self.auction, self.price = auction, price

  def __repr__(self):
    """Return whatever ``nexmark_util.model_to_json`` produces for self."""
    json_text = nexmark_util.model_to_json(self)
    return json_text
|
|
||
|
|
||
class AuctionPriceCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``AuctionPrice``.

  Both fields go through the same ``VarIntCoderImpl``: ``auction`` first,
  then ``price``, each with ``nested=True``.
  """
  _int_coder_impl = coder_impl.VarIntCoderImpl()

  def encode_to_stream(self, value: AuctionPrice, stream, nested):
    """Write ``value.auction`` and then ``value.price`` to ``stream``.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    put_int = self._int_coder_impl.encode_to_stream
    put_int(value.auction, stream, True)
    put_int(value.price, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read two ints from ``stream`` and return them as an ``AuctionPrice``.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    get_int = self._int_coder_impl.decode_from_stream
    # Read order must mirror the write order in encode_to_stream.
    auction_value = get_int(stream, True)
    price_value = get_int(stream, True)
    return AuctionPrice(auction_value, price_value)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
|
|
||
| """ Field names for de-serializing json representation of Models | ||
| """ | ||
|
|
||
|
|
||
class FieldNames:
  """JSON key names used when de-serializing the model objects.

  Each constant holds the key string exactly as it is spelled in the JSON
  representation; multi-word keys are camelCase.
  """
  ID = 'id'
  NAME = 'name'
  EMAIL_ADDRESS = 'emailAddress'
  CREDIT_CARD = 'creditCard'
  CITY = 'city'
  STATE = 'state'
  DATE_TIME = 'dateTime'
  EXTRA = 'extra'
  ITEM_NAME = 'itemName'
  DESCRIPTION = 'description'
  INITIAL_BID = 'initialBid'
  RESERVE = 'reserve'
  EXPIRES = 'expires'
  SELLER = 'seller'
  CATEGORY = 'category'
  AUCTION = 'auction'
  BIDDER = 'bidder'
  PRICE = 'price'
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,31 +26,55 @@ | |
| - The bid on an item for auction (Bid). | ||
|
|
||
| """ | ||
| from apache_beam.coders import coder_impl | ||
| from apache_beam.coders.coders import FastCoder | ||
| from apache_beam.coders.coders import StrUtf8Coder | ||
| from apache_beam.testing.benchmarks.nexmark import nexmark_util | ||
|
|
||
|
|
||
class PersonCoder(FastCoder):
  """Coder for ``Person`` values, backed by ``PersonCoderImpl``."""
  def _create_impl(self):
    """Build the ``PersonCoderImpl`` that does the stream encoding."""
    impl = PersonCoderImpl()
    return impl

  def is_deterministic(self) -> bool:
    """Report this coder as deterministic; the answer is always ``True``."""
    return True
|
|
||
|
|
||
| class Person(object): | ||
| "Author of an auction or a bid." | ||
| CODER = PersonCoder() | ||
|
|
||
| def __init__( | ||
| self, id, name, email, credit_card, city, state, timestamp, extra=None): | ||
| self, id, name, email, credit_card, city, state, date_time, extra=None): | ||
| self.id = id | ||
| self.name = name | ||
| self.email = email # key | ||
| self.credit_card = credit_card | ||
| self.emailAddress = email # key | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Python's style convention uses snake_case for fields, so this would normally be `self.email_address`; ditto for the fields below.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I did that because `json.dumps(self.__dict__)` outputs keys that are identical to the attribute names, so using camelCase keeps the JSON lined up with the Java version. I'll try to look for ways to change the names. |
||
| self.creditCard = credit_card | ||
| self.city = city | ||
| self.state = state | ||
| self.timestamp = timestamp | ||
| self.dateTime = date_time | ||
| self.extra = extra | ||
|
|
||
| def __repr__(self): | ||
| return 'Person({id}, {email})'.format( | ||
| **{ | ||
| 'id': self.id, 'email': self.email | ||
| }) | ||
| return nexmark_util.model_to_json(self) | ||
|
|
||
|
|
||
class AuctionCoder(FastCoder):
  """Coder for ``Auction`` values, backed by ``AuctionCoderImpl``."""
  def to_type_hint(self):
    """Return the Python type this coder handles.

    The original body was a bare ``pass``, so callers received ``None``
    rather than a type; ``Auction`` is the class this coder encodes.
    """
    return Auction

  def _create_impl(self):
    """Build the ``AuctionCoderImpl`` that does the stream encoding."""
    return AuctionCoderImpl()

  def is_deterministic(self) -> bool:
    """Report this coder as deterministic; the answer is always ``True``."""
    return True
|
|
||
|
|
||
| class Auction(object): | ||
| "Item for auction." | ||
| CODER = AuctionCoder() | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -59,41 +83,134 @@ def __init__( | |
| description, | ||
| initial_bid, | ||
| reserve_price, | ||
| timestamp, | ||
| date_time, | ||
| expires, | ||
| seller, | ||
| category, | ||
| extra=None): | ||
| self.id = id | ||
| self.item_name = item_name # key | ||
| self.itemName = item_name # key | ||
| self.description = description | ||
| self.initial_bid = initial_bid | ||
| self.reserve_price = reserve_price | ||
| self.timestamp = timestamp | ||
| self.initialBid = initial_bid | ||
| self.reserve = reserve_price | ||
| self.dateTime = date_time | ||
| self.expires = expires | ||
| self.seller = seller | ||
| self.category = category | ||
| self.extra = extra | ||
|
|
||
| def __repr__(self): | ||
| return 'Auction({id}, {item_name})'.format( | ||
| **{ | ||
| 'id': self.id, 'item_name': self.item_name | ||
| }) | ||
| return nexmark_util.model_to_json(self) | ||
|
|
||
|
|
||
class BidCoder(FastCoder):
  """Coder for ``Bid`` values, backed by ``BidCoderImpl``."""
  def _create_impl(self):
    """Build the ``BidCoderImpl`` that does the stream encoding."""
    impl = BidCoderImpl()
    return impl

  def is_deterministic(self) -> bool:
    """Report this coder as deterministic; the answer is always ``True``."""
    return True
|
|
||
|
|
||
| class Bid(object): | ||
| "A bid for an item for auction." | ||
| CODER = BidCoder() | ||
|
|
||
| def __init__(self, auction, bidder, price, timestamp, extra=None): | ||
| def __init__(self, auction, bidder, price, date_time, extra=None): | ||
| self.auction = auction # key | ||
| self.bidder = bidder | ||
| self.price = price | ||
| self.timestamp = timestamp | ||
| self.dateTime = date_time | ||
| self.extra = extra | ||
|
|
||
| def __repr__(self): | ||
| return 'Bid({auction}, {bidder}, {price})'.format( | ||
| **{ | ||
| 'auction': self.auction, 'bidder': self.bidder, 'price': self.price | ||
| }) | ||
| return nexmark_util.model_to_json(self) | ||
|
|
||
|
|
||
class AuctionCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``Auction``.

  Fields are written in a fixed order, each with ``nested=True``:
  id, itemName, description, initialBid, reserve, dateTime, expires, seller,
  category, extra. Ints use the var-int coder, strings the UTF-8 string
  coder, and ``dateTime``/``expires`` the timestamp coder.
  """
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()

  def encode_to_stream(self, value: Auction, stream, nested):
    """Write every field of ``value`` to ``stream`` in the fixed order.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    put_int = self._int_coder_impl.encode_to_stream
    put_str = self._str_coder_impl.encode_to_stream
    put_time = self._time_coder_impl.encode_to_stream

    put_int(value.id, stream, True)
    put_str(value.itemName, stream, True)
    put_str(value.description, stream, True)
    put_int(value.initialBid, stream, True)
    put_int(value.reserve, stream, True)
    put_time(value.dateTime, stream, True)
    put_time(value.expires, stream, True)
    put_int(value.seller, stream, True)
    put_int(value.category, stream, True)
    # NOTE(review): Auction's constructor defaults ``extra`` to None, yet it
    # is written with the string coder -- confirm None never reaches here.
    put_str(value.extra, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read the fields back in write order and build an ``Auction``.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    get_int = self._int_coder_impl.decode_from_stream
    get_str = self._str_coder_impl.decode_from_stream
    get_time = self._time_coder_impl.decode_from_stream

    auction_id = get_int(stream, True)
    item_name = get_str(stream, True)
    description = get_str(stream, True)
    initial_bid = get_int(stream, True)
    reserve = get_int(stream, True)
    date_time = get_time(stream, True)
    expires = get_time(stream, True)
    seller = get_int(stream, True)
    category = get_int(stream, True)
    extra = get_str(stream, True)
    return Auction(
        auction_id,
        item_name,
        description,
        initial_bid,
        reserve,
        date_time,
        expires,
        seller,
        category,
        extra)
|
|
||
|
|
||
class BidCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``Bid``.

  Fields are written in a fixed order, each with ``nested=True``:
  auction, bidder, price (var-ints), dateTime (timestamp), extra (UTF-8
  string).
  """
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()

  def encode_to_stream(self, value: Bid, stream, nested):
    """Write every field of ``value`` to ``stream`` in the fixed order.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    put_int = self._int_coder_impl.encode_to_stream

    put_int(value.auction, stream, True)
    put_int(value.bidder, stream, True)
    put_int(value.price, stream, True)
    self._time_coder_impl.encode_to_stream(value.dateTime, stream, True)
    # NOTE(review): Bid's constructor defaults ``extra`` to None, yet it is
    # written with the string coder -- confirm None never reaches here.
    self._str_coder_impl.encode_to_stream(value.extra, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read the fields back in write order and build a ``Bid``.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    get_int = self._int_coder_impl.decode_from_stream

    auction_id = get_int(stream, True)
    bidder_id = get_int(stream, True)
    price = get_int(stream, True)
    date_time = self._time_coder_impl.decode_from_stream(stream, True)
    extra = self._str_coder_impl.decode_from_stream(stream, True)
    return Bid(auction_id, bidder_id, price, date_time, extra)
|
|
||
|
|
||
class PersonCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``Person``.

  Fields are written in a fixed order, each with ``nested=True``:
  id (var-int), name, emailAddress, creditCard, city, state (UTF-8 strings),
  dateTime (timestamp), extra (UTF-8 string).
  """
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()

  def encode_to_stream(self, value: Person, stream, nested):
    """Write every field of ``value`` to ``stream`` in the fixed order.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    put_str = self._str_coder_impl.encode_to_stream

    self._int_coder_impl.encode_to_stream(value.id, stream, True)
    put_str(value.name, stream, True)
    put_str(value.emailAddress, stream, True)
    put_str(value.creditCard, stream, True)
    put_str(value.city, stream, True)
    put_str(value.state, stream, True)
    self._time_coder_impl.encode_to_stream(value.dateTime, stream, True)
    # NOTE(review): Person's constructor defaults ``extra`` to None, yet it
    # is written with the string coder -- confirm None never reaches here.
    put_str(value.extra, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read the fields back in write order and build a ``Person``.

    ``nested`` is accepted for interface compatibility but not consulted.
    """
    get_str = self._str_coder_impl.decode_from_stream

    person_id = self._int_coder_impl.decode_from_stream(stream, True)
    name = get_str(stream, True)
    email = get_str(stream, True)
    credit_card = get_str(stream, True)
    city = get_str(stream, True)
    state = get_str(stream, True)
    date_time = self._time_coder_impl.decode_from_stream(stream, True)
    extra = get_str(stream, True)
    return Person(
        person_id, name, email, credit_card, city, state, date_time, extra)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? Why not just use a tuple for the result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I never thought about a tuple, but AuctionPrice allows me to define a
`__repr__` method that serializes to JSON.