Merged
89 commits
18650af
added function for populating shard
daniel-sanche Jul 24, 2023
df05171
Merge branch 'v3' into benchmarks_prod
daniel-sanche Jul 24, 2023
4db25b3
got table population working
daniel-sanche Jul 24, 2023
28cc1f4
finished table population
daniel-sanche Jul 24, 2023
9e97ac0
pulled out table setup fixtures from system tests
daniel-sanche Jul 25, 2023
52550cd
get system tests working
daniel-sanche Jul 25, 2023
a191236
added some basic throughput code
daniel-sanche Jul 25, 2023
31dbbd9
count operations
daniel-sanche Jul 25, 2023
eac206a
added splits to table
daniel-sanche Jul 25, 2023
ea771de
extracted cluster config
daniel-sanche Jul 25, 2023
2eda0ac
changed test name
daniel-sanche Jul 25, 2023
f0649fd
improved benchmark output
daniel-sanche Jul 25, 2023
fbf2636
fixed population to use new format
daniel-sanche Jul 25, 2023
21df0c2
added sharded scan benchmark
daniel-sanche Jul 25, 2023
91ef008
more accurate time tracking
daniel-sanche Jul 25, 2023
f91750f
added script to deploy to GCE
daniel-sanche Jul 26, 2023
0236f85
limit to one qualifier
daniel-sanche Jul 26, 2023
fd94350
added separate test for point reads
daniel-sanche Jul 26, 2023
82ecaf2
made row size configurable
daniel-sanche Jul 26, 2023
a35f985
don't timeout benchmarks at deadline
daniel-sanche Jul 26, 2023
a440d76
added fastapi benchmark
daniel-sanche Jul 26, 2023
d0fcdb9
added driver for fastapi class
daniel-sanche Jul 26, 2023
0bb70ef
delete VM on complete; improved formatting
daniel-sanche Jul 27, 2023
3f3f59a
improved fastapi benchmark
daniel-sanche Jul 27, 2023
5a6ffad
run tests concurrently
daniel-sanche Jul 27, 2023
604071f
saved test_system changes
daniel-sanche Jul 27, 2023
3686ef6
added profiling to point reads
daniel-sanche Jul 27, 2023
c42f596
add duration to run script
daniel-sanche Jul 27, 2023
e2ac258
build async generator manually
daniel-sanche Jul 28, 2023
1228c8a
lazy parse cell data
daniel-sanche Jul 28, 2023
1a3a8e7
added profiling to scans
daniel-sanche Jul 28, 2023
0659161
removed iscoroutine check
daniel-sanche Jul 28, 2023
ce2b733
added legacy benchmark
daniel-sanche Jul 28, 2023
4714e8a
added thread to legacy tests
daniel-sanche Jul 28, 2023
3111543
cut out row adapter
daniel-sanche Jul 28, 2023
c717a0b
lazy load rows
daniel-sanche Jul 28, 2023
b5cb253
simplify generator layers for non-streaming
daniel-sanche Jul 29, 2023
aa4bed4
change pool size
daniel-sanche Jul 29, 2023
613b032
disabled lazy loading (75k rps)
daniel-sanche Jul 29, 2023
8e37967
only check for last_scanned when there are no chunks (81k rps)
daniel-sanche Jul 30, 2023
6775e2b
refactored chunk parsing (80k rps)
daniel-sanche Jul 31, 2023
537f4ff
removed subclass for row
daniel-sanche Jul 31, 2023
58b3fbb
added igor's row merger
daniel-sanche Jul 31, 2023
a7478ab
use stream by default
daniel-sanche Aug 7, 2023
689f072
use our models
daniel-sanche Aug 7, 2023
107b8c5
wrapped in object
daniel-sanche Aug 7, 2023
d76a7dc
updated row merger for acceptance tests
daniel-sanche Aug 8, 2023
1d9a475
improve performance
daniel-sanche Aug 8, 2023
37bc3a0
removed HasField from family
daniel-sanche Aug 8, 2023
ea95dcc
added request revision logic
daniel-sanche Aug 8, 2023
9b60693
added back split cell verifications
daniel-sanche Aug 8, 2023
24de29e
got all acceptance tests passing
daniel-sanche Aug 8, 2023
9c9ba5e
track last yielded key
daniel-sanche Aug 8, 2023
5d49ad8
track remaining count
daniel-sanche Aug 8, 2023
5ecfa3e
added retries
daniel-sanche Aug 8, 2023
13603ea
added exception conversion
daniel-sanche Aug 8, 2023
b40e1cb
moved into single function
daniel-sanche Aug 8, 2023
0f58066
Fixed style issues
daniel-sanche Aug 9, 2023
07341da
Merge branch 'v3' into optimize_read_rows_2
daniel-sanche Aug 9, 2023
90b85b3
fixed row and helpers tests
daniel-sanche Aug 9, 2023
4298e38
fixed some read rows tests
daniel-sanche Aug 9, 2023
62f1bf7
comments and cleanup
daniel-sanche Aug 9, 2023
98510c3
fixed issues with conformance tests
daniel-sanche Aug 9, 2023
4cf4aab
improved proto conversion
daniel-sanche Aug 9, 2023
0f0ccee
fixed read_rows tests
daniel-sanche Aug 10, 2023
42f44da
fixed tests
daniel-sanche Aug 10, 2023
3d1804c
optimizing query class
daniel-sanche Aug 10, 2023
b0882af
keep filter and limit out of proto storage
daniel-sanche Aug 10, 2023
2f2286d
fixed tests
daniel-sanche Aug 10, 2023
fbe298e
skip check when not necessary
daniel-sanche Aug 10, 2023
2a82343
experiment: yield in row batches
daniel-sanche Aug 10, 2023
4e6dd6f
Revert "experiment: yield in row batches"
daniel-sanche Aug 10, 2023
daa7a59
removed benchmarks dir
daniel-sanche Aug 10, 2023
e91d693
fixed type annotation
daniel-sanche Aug 10, 2023
cb28451
added slots to query
daniel-sanche Aug 10, 2023
1dd4d0c
use separate instances for each run
daniel-sanche Aug 14, 2023
6d2dab6
made _ReadRowOperation into slots
daniel-sanche Aug 14, 2023
309405a
clean up style issues
daniel-sanche Aug 14, 2023
e36d70b
removed commented test
daniel-sanche Aug 14, 2023
eb4bcfa
use optimizations from retries
daniel-sanche Aug 15, 2023
f6e8cd2
removed layer of wrapping
daniel-sanche Aug 15, 2023
5037891
fixed tests
daniel-sanche Aug 15, 2023
ea5b4f9
updated retryable stream submodule
daniel-sanche Aug 15, 2023
9456196
updated dependency version
daniel-sanche Aug 16, 2023
b9f8cdf
added todo
daniel-sanche Aug 16, 2023
fa0734c
Merge branch 'v3' into optimize_retries
daniel-sanche Aug 17, 2023
3323a66
moved exception builder into own method
daniel-sanche Aug 17, 2023
76d6df7
updated constraints
daniel-sanche Aug 17, 2023
bf669ad
fixed lint issues
daniel-sanche Aug 17, 2023
optimizing query class
daniel-sanche committed Aug 10, 2023
commit 3d1804c063e8d2cef6ca168b531cb0f16f7cb4f9
169 changes: 78 additions & 91 deletions google/cloud/bigtable/data/read_rows_query.py
@@ -17,7 +17,6 @@
from bisect import bisect_left
from bisect import bisect_right
from collections import defaultdict
from dataclasses import dataclass
from google.cloud.bigtable.data.row_filters import RowFilter

from google.cloud.bigtable_v2.types import RowRange as RowRangePB
@@ -29,27 +28,13 @@
from google.cloud.bigtable.data import ShardedQuery


@dataclass
class _RangePoint:
"""Model class for a point in a row range"""

key: bytes
is_inclusive: bool

def __hash__(self) -> int:
return hash((self.key, self.is_inclusive))

def __eq__(self, other: Any) -> bool:
if not isinstance(other, _RangePoint):
return NotImplemented
return self.key == other.key and self.is_inclusive == other.is_inclusive


class RowRange:
"""
Represents a range of keys in a ReadRowsQuery
"""

__slots__ = ("_pb",)

def __init__(
self,
start_key: str | bytes | None = None,
@@ -95,58 +80,63 @@ def __init__(
if start_key is not None and end_key is not None and start_key > end_key:
raise ValueError("start_key must be less than or equal to end_key")

self._start: _RangePoint | None = (
_RangePoint(start_key, start_is_inclusive)
if start_key is not None
else None
)
self._end: _RangePoint | None = (
_RangePoint(end_key, end_is_inclusive) if end_key is not None else None
)
init_dict = {}
if start_key is not None:
if start_is_inclusive:
init_dict["start_key_closed"] = start_key
else:
init_dict["start_key_open"] = start_key
if end_key is not None:
if end_is_inclusive:
init_dict["end_key_closed"] = end_key
else:
init_dict["end_key_open"] = end_key
self._pb = RowRangePB(**init_dict)

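The rewritten `__init__` above replaces the `_RangePoint` pair with a single `RowRangePB`, mapping inclusive bounds to the `*_key_closed` fields and exclusive bounds to `*_key_open`. A stdlib-only sketch of that mapping (`range_fields` is a hypothetical helper for illustration, not part of the library):

```python
from __future__ import annotations


def range_fields(
    start_key: bytes | None = None,
    end_key: bytes | None = None,
    start_is_inclusive: bool = True,
    end_is_inclusive: bool = False,
) -> dict[str, bytes]:
    """Mirror of the init_dict construction: choose the closed/open
    proto field name for each bound, omitting unbounded sides."""
    fields: dict[str, bytes] = {}
    if start_key is not None:
        key = "start_key_closed" if start_is_inclusive else "start_key_open"
        fields[key] = start_key
    if end_key is not None:
        key = "end_key_closed" if end_is_inclusive else "end_key_open"
        fields[key] = end_key
    return fields
```

Leaving a side out of the dict entirely (rather than storing an empty value) is what lets the later properties treat "field unset" as "unbounded".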
@property
def start_key(self) -> bytes | None:
"""
Returns the start key of the range. If None, the range is unbounded on the left.
"""
return self._start.key if self._start is not None else None
return self._pb.start_key_closed or self._pb.start_key_open or None

@property
def end_key(self) -> bytes | None:
"""
Returns the end key of the range. If None, the range is unbounded on the right.
"""
return self._end.key if self._end is not None else None
return self._pb.end_key_closed or self._pb.end_key_open or None

@property
def start_is_inclusive(self) -> bool:
"""
Returns whether the range is inclusive of the start key.
Returns True if the range is unbounded on the left.
"""
return self._start.is_inclusive if self._start is not None else True
return bool(self._pb.start_key_closed)

@property
def end_is_inclusive(self) -> bool:
"""
Returns whether the range is inclusive of the end key.
Returns True if the range is unbounded on the right.
"""
return self._end.is_inclusive if self._end is not None else True
return bool(self._pb.end_key_closed)

def _to_dict(self) -> dict[str, bytes]:
"""Converts this object to a dictionary"""
output = {}
if self._start is not None:
if self.start_key is not None:
key = "start_key_closed" if self.start_is_inclusive else "start_key_open"
output[key] = self._start.key
if self._end is not None:
output[key] = self.start_key
if self.end_key is not None:
key = "end_key_closed" if self.end_is_inclusive else "end_key_open"
output[key] = self._end.key
output[key] = self.end_key
return output

def __hash__(self) -> int:
return hash((self._start, self._end))
def _to_pb(self) -> RowRangePB:
"""Converts this object to a protobuf"""
return self._pb

@classmethod
def _from_dict(cls, data: dict[str, bytes]) -> RowRange:
@@ -163,30 +153,23 @@ def _from_dict(cls, data: dict[str, bytes]) -> RowRange:
)

@classmethod
def _from_points(
cls, start: _RangePoint | None, end: _RangePoint | None
) -> RowRange:
"""Creates a RowRange from two RangePoints"""
kwargs: dict[str, Any] = {}
if start is not None:
kwargs["start_key"] = start.key
kwargs["start_is_inclusive"] = start.is_inclusive
if end is not None:
kwargs["end_key"] = end.key
kwargs["end_is_inclusive"] = end.is_inclusive
return cls(**kwargs)
def _from_pb(cls, data: RowRangePB) -> RowRange:
"""Creates a RowRange from a protobuf"""
instance = cls()
instance._pb = data
return instance

def __bool__(self) -> bool:
"""
Empty RowRanges (representing a full table scan) are falsy, because
they can be substituted with None. Non-empty RowRanges are truthy.
"""
return self._start is not None or self._end is not None
return bool(self._pb.start_key_closed or self._pb.start_key_open or self._pb.end_key_closed or self._pb.end_key_open)

def __eq__(self, other: Any) -> bool:
if not isinstance(other, RowRange):
return NotImplemented
return self._start == other._start and self._end == other._end
return self._pb == other._pb

def __str__(self) -> str:
"""
@@ -206,7 +189,7 @@ def __repr__(self) -> str:
if self.start_is_inclusive is False:
# only show start_is_inclusive if it is different from the default
args_list.append(f"start_is_inclusive={self.start_is_inclusive}")
if self.end_is_inclusive is True and self._end is not None:
if self.end_is_inclusive is True and self.end_key is not None:
# only show end_is_inclusive if it is different from the default
args_list.append(f"end_is_inclusive={self.end_is_inclusive}")
return f"RowRange({', '.join(args_list)})"
@@ -235,24 +218,30 @@ def __init__(
default: None (no limit)
- row_filter: a RowFilter to apply to the query
"""
self.row_keys: set[bytes] = set()
self.row_ranges: set[RowRange] = set()
if row_ranges is not None:
if isinstance(row_ranges, RowRange):
row_ranges = [row_ranges]
for r in row_ranges:
self.add_range(r)
if row_keys is not None:
if not isinstance(row_keys, list):
row_keys = [row_keys]
for k in row_keys:
self.add_key(k)
self.limit: int | None = limit
self.filter: RowFilter | None = row_filter
row_ranges = row_ranges or []
row_keys = row_keys or []
if not isinstance(row_ranges, list):
row_ranges = [row_ranges]
if not isinstance(row_keys, list):
row_keys = [row_keys]
self._row_set = RowSetPB(row_keys=row_keys, row_ranges=[r._pb for r in row_ranges])
self._pb = ReadRowsRequestPB(
rows=self._row_set,
filter=row_filter._to_pb() if row_filter else None,
rows_limit=limit,
)

@property
def row_keys(self) -> list[bytes]:
return self._pb.rows.row_keys

@property
def row_ranges(self) -> list[RowRange]:
return [RowRange._from_pb(r) for r in self._pb.rows.row_ranges]

@property
def limit(self) -> int | None:
return self._limit
return self._pb.rows_limit or None

@limit.setter
def limit(self, new_limit: int | None):
@@ -270,11 +259,11 @@ def limit(self, new_limit: int | None):
"""
if new_limit is not None and new_limit < 0:
raise ValueError("limit must be >= 0")
self._limit = new_limit
self._pb.rows_limit = new_limit or 0

@property
def filter(self) -> RowFilter | None:
return self._filter
return self._pb.filter

@filter.setter
def filter(self, row_filter: RowFilter | None):
@@ -286,7 +275,7 @@ def filter(self, row_filter: RowFilter | None):
Returns:
- a reference to this query for chaining
"""
self._filter = row_filter
self._pb.filter = row_filter._to_pb() if row_filter else None

def add_key(self, row_key: str | bytes):
"""
@@ -305,7 +294,7 @@ def add_key(self, row_key: str | bytes):
row_key = row_key.encode()
elif not isinstance(row_key, bytes):
raise ValueError("row_key must be string or bytes")
self.row_keys.add(row_key)
self._pb.rows.row_keys.append(row_key)

def add_range(
self,
@@ -317,7 +306,7 @@
Args:
- row_range: a range of row keys to add to this query
"""
self.row_ranges.add(row_range)
self._pb.rows.row_ranges.append(row_range._pb)

def shard(self, shard_keys: RowKeySamples) -> ShardedQuery:
"""
@@ -383,24 +372,24 @@ def _shard_range(
- a list of tuples, containing a segment index and a new sub-range.
"""
# 1. find the index of the segment the start key belongs to
if orig_range._start is None:
if orig_range.start_key is None:
# if range is open on the left, include first segment
start_segment = 0
else:
# use binary search to find the segment the start key belongs to
# bisect method determines how we break ties when the start key matches a split point
# if inclusive, bisect_left to the left segment, otherwise bisect_right
bisect = bisect_left if orig_range._start.is_inclusive else bisect_right
start_segment = bisect(split_points, orig_range._start.key)
bisect = bisect_left if orig_range.start_is_inclusive else bisect_right
start_segment = bisect(split_points, orig_range.start_key)

# 2. find the index of the segment the end key belongs to
if orig_range._end is None:
if orig_range.end_key is None:
# if range is open on the right, include final segment
end_segment = len(split_points)
else:
# use binary search to find the segment the end key belongs to.
end_segment = bisect_left(
split_points, orig_range._end.key, lo=start_segment
split_points, orig_range.end_key, lo=start_segment
)
# note: end_segment will always bisect_left, because split points represent inclusive ends
# whether the end_key includes the split point or not, the result is the same segment
@@ -415,18 +404,22 @@
# 3a. add new range for first segment this_range spans
# first range spans from start_key to the split_point representing the last key in the segment
last_key_in_first_segment = split_points[start_segment]
start_range = RowRange._from_points(
start=orig_range._start,
end=_RangePoint(last_key_in_first_segment, is_inclusive=True),
start_range = RowRange(
start_key=orig_range.start_key,
start_is_inclusive=orig_range.start_is_inclusive,
end_key=last_key_in_first_segment,
end_is_inclusive=True,
)
results.append((start_segment, start_range))
# 3b. add new range for last segment this_range spans
# we start the final range using the end key of the previous segment, with is_inclusive=False
previous_segment = end_segment - 1
last_key_before_segment = split_points[previous_segment]
end_range = RowRange._from_points(
start=_RangePoint(last_key_before_segment, is_inclusive=False),
end=orig_range._end,
end_range = RowRange(
start_key=last_key_before_segment,
start_is_inclusive=False,
end_key=orig_range.end_key,
end_is_inclusive=orig_range.end_is_inclusive,
)
results.append((end_segment, end_range))
# 3c. add new spanning range to all segments other than the first and last
@@ -474,16 +467,10 @@ def _to_pb(self, table) -> ReadRowsRequestPB:
Convert this query into a dictionary that can be used to construct a
ReadRowsRequest protobuf
"""
return ReadRowsRequestPB(
table_name=table.table_name,
app_profile_id=table.app_profile_id,
rows=RowSetPB(
row_keys=list(self.row_keys),
row_ranges=[RowRangePB(r._to_dict()) for r in self.row_ranges],
),
filter=self._filter._to_pb() if self._filter else None,
rows_limit=self.limit,
)
# self._pb.table_name = table.table_name
# if table.app_profile_id:
# self._pb.app_profile_id = table.app_profile_id
return ReadRowsRequestPB(self._pb, table_name=table.table_name, app_profile_id=table.app_profile_id)

def __eq__(self, other):
"""