Closed
Changes from 1 commit
Commits
105 commits
f784121
[WIP] Getting started on Pub/Sub.
May 25, 2017
c8a9fd6
stuff
May 25, 2017
9b09b8f
WIP
May 25, 2017
8f0832e
wip
lukesneeringer May 25, 2017
02d7658
WIP
May 30, 2017
3a3ebe9
Merge branch 'pubsub' of github.com:GoogleCloudPlatform/google-cloud-…
May 30, 2017
503d11f
WIP, fixing bugs.
May 30, 2017
3b21b93
WIP
May 30, 2017
1e879a1
wip
May 31, 2017
4bf0552
wip
May 31, 2017
c6dc098
wip
May 31, 2017
13821a7
wip
May 31, 2017
c2d7af8
wip
Jun 1, 2017
4c0bf84
Merge branch 'public-master' into pubsub
Jun 1, 2017
98c1c6c
Add GAPIC/proto in the base branch to remove them from comparison.
Jun 1, 2017
0771416
Do GAPIC changeout on base branch.
Jun 1, 2017
1d05d81
Merge branch 'public-master' into pubsub
Jun 2, 2017
783e9dd
Merge branch 'pubsub' into pubsub-publisher
Jun 2, 2017
7dd719f
Clean up a couple small things.
Jun 2, 2017
c1042ac
A couple more small fixes.
Jun 2, 2017
ccaa865
WIP
Jun 2, 2017
f2ee4d4
Rework based on @jonparrott concurrency ideas.
Jun 2, 2017
12d5546
Refactor the batching implementation.
Jun 2, 2017
e99d959
Remove unrelated files.
Jun 2, 2017
4774f2a
wip
lukesneeringer Jun 3, 2017
0b55ff5
wip
lukesneeringer Jun 3, 2017
210ef3b
Wrote some docs; not much else.
lukesneeringer Jun 3, 2017
7cd6156
subscriber wip
lukesneeringer Jun 4, 2017
fc4ead3
WIP
Jun 5, 2017
890de3a
wip
Jun 5, 2017
12ace0e
wip
Jun 7, 2017
7305000
wip
lukesneeringer Jun 14, 2017
1a53c37
Honor size and message count limits.
lukesneeringer Jun 15, 2017
e5a27ae
Fix a couple minor lint issues.
Jun 27, 2017
d50a22e
Adapting a subscriber that will work.
Jun 27, 2017
14f200a
WIP
Jun 27, 2017
6a7e846
Implement lease management.
Jun 28, 2017
3bb130b
WIP
Jun 29, 2017
f97dc23
WIP
Jun 29, 2017
303436c
WIP
Jul 5, 2017
933d2f3
WIP
Jul 13, 2017
1df0ccf
WIP
Jul 13, 2017
acb4534
WIP
Jul 13, 2017
2fb2785
Update subscriber client config to be sane.
Jul 13, 2017
ef178e9
Start adding unit tests.
Jul 18, 2017
147ad18
Beginning work on unit tests.
Jul 19, 2017
9a6b7cb
Update publisher to be thread-based.
Jul 20, 2017
c96367a
Merge branch 'pubsub-publisher' into pubsub-subscriber
Jul 20, 2017
9c701e3
Publisher tests complete.
Jul 25, 2017
de38b83
subscriber/client.py tests
Jul 26, 2017
faeaa8e
Consumer tests
Jul 26, 2017
d467719
Fix minor linting error.
Jul 26, 2017
c821d33
Histogram tests
Jul 26, 2017
ed750b2
Minor fix based on Max feedback.
Jul 26, 2017
216310c
starting on helper thread tests
Jul 26, 2017
a1fd287
Add tests for helper_threads.
Jul 27, 2017
32701e1
Almost done with unit tests.
Jul 27, 2017
34272ad
Full coverage.
lukesneeringer Jul 27, 2017
e1c7c84
Do not send policy across the concurrency boundary.
Jul 31, 2017
2b21f48
Shift flow control to the policy class.
Jul 31, 2017
7f4b91c
Move the request queue to using keyword arguments.
Jul 31, 2017
3852805
Can has flow control.
Aug 1, 2017
b697be2
Merge branch 'public-master' into pubsub-subscriber
Aug 1, 2017
b964c11
Start working on docs.
Aug 2, 2017
81b37f4
Subscription fixes.
Aug 3, 2017
5784d4d
Change batch time, add gRPC time logging.
Aug 3, 2017
97d8431
Unit test fix.
Aug 3, 2017
cb7dc05
Minor RST fixes (thanks @jonparrott).
Aug 4, 2017
6994465
Remove the ignore in .flake8.
Aug 4, 2017
eae7e14
Set gRPC limit to 20MB + 1
Aug 9, 2017
6afcd2a
Suppress not-working grpc options.
lukesneeringer Aug 15, 2017
e8c0a78
Merge branch 'public-master' into pubsub-subscriber
Aug 18, 2017
5398487
Merge branch 'pubsub-subscriber' into pubsub-docs
Aug 18, 2017
8c7c30e
Narrative publishing docs.
Aug 18, 2017
8b502ed
Fix RST misformatting.
Aug 21, 2017
965b2ba
Subscriber docs, 50% complete.
Aug 21, 2017
3cc1c09
Manual layer documentation
Aug 21, 2017
41cfc08
Merge branch 'public-master' into pubsub-subscriber
Aug 21, 2017
0de9cd1
Merge branch 'pubsub-subscriber' into pubsub-docs
Aug 21, 2017
13532b3
Doc updates.
Aug 21, 2017
b0f03a8
Merge branch 'public-master' into pubsub-publisher
Aug 21, 2017
dd096e1
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
570f48d
Merge branch 'public-master' into pubsub
Aug 21, 2017
dfe52ef
Merge branch 'pubsub' into pubsub-publisher
Aug 21, 2017
f553fd6
Add better Batch docstring.
Aug 21, 2017
356749a
Improve the max latency thread comments.
Aug 21, 2017
8242c9d
Collapse property docstrings.
Aug 21, 2017
db87dab
More @jonparrott feedback.
Aug 21, 2017
58072b8
Remove the client as a public item in the base batch.
Aug 21, 2017
8f67488
Remove the rejection batch.
Aug 21, 2017
a86d9b7
Lock batch acquisition.
Aug 21, 2017
df18615
Alter exception superclass.
Aug 21, 2017
5f0549b
Inherit from google.api.core.future.Future.
Aug 21, 2017
b76d363
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
101d9ca
Move to @jonparrott's Future interface.
Aug 21, 2017
760bef6
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
f196b5e
Fix some tests to match new futures.
Aug 21, 2017
e6e58bb
Move Future off into its own module.
Aug 21, 2017
5dbfd0a
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
9fd490c
Add is not None.
Aug 21, 2017
ee144aa
Move the future tests to match the code.
Aug 21, 2017
40ea1e6
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
8cb8f98
Fix a publish failure test.
Aug 21, 2017
47678c3
Fix final test.
Aug 21, 2017
d2a130d
Merge branch 'pubsub-subscriber' into pubsub-docs
Aug 21, 2017
Can has flow control.
Luke Sneeringer committed Aug 1, 2017
commit 3852805e8b9115f491cfdc60a3eb9b3f729df8b2
5 changes: 3 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/consumer.py
@@ -200,10 +200,11 @@ def _request_generator_thread(self):
         """
         # First, yield the initial request. This occurs on every new
         # connection, fundamentally including a resumed connection.
+        initial_request = self._policy.get_initial_request(ack_queue=True)
         _LOGGER.debug('Sending initial request: {initial_request}'.format(
-            initial_request=self._policy.initial_request,
+            initial_request=initial_request,
         ))
-        yield self._policy.initial_request
+        yield initial_request

         # Now yield each of the items on the request queue, and block if there
         # are none. This can and must block to keep the stream open.
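
For context on the change above: the generator now builds a fresh initial request every time the stream is (re)opened, so acks queued while the connection was paused ride along on the first message of the new stream. A minimal sketch of that pattern (the function name and stop sentinel are illustrative; the real consumer uses its own helper-thread machinery that is not part of this diff):

_STOP = object()  # hypothetical sentinel; the real consumer uses its own


def request_generator(policy, request_queue):
    # A fresh initial request on every (re)connect; with ack_queue=True it
    # also carries any acks that were queued while the stream was paused.
    yield policy.get_initial_request(ack_queue=True)

    # Then relay whatever the subscriber machinery puts on the local queue,
    # blocking while the queue is empty so the stream stays open.
    while True:
        request = request_queue.get()
        if request is _STOP:
            return
        yield request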
8 changes: 5 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/message.py
@@ -134,9 +134,9 @@ def ack(self):
         time_to_ack = math.ceil(time.time() - self._received_timestamp)
         self._request_queue.put(('ack', {
             'ack_id': self._ack_id,
+            'byte_size': self.size,
             'time_to_ack': time_to_ack,
         }))
-        self.drop()

     def drop(self):
         """Release the message from lease management.
@@ -196,5 +196,7 @@ def nack(self):

         This will cause the message to be re-delivered to the subscription.
         """
-        self.modify_ack_deadline(seconds=0)
-        self.drop()
+        self._request_queue.put(('nack', {
+            'ack_id': self._ack_id,
+            'byte_size': self.size,
+        }))
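
The ('ack', ...) and ('nack', ...) tuples that Message now puts on its request queue are handled on the policy side, where the byte_size value feeds the flow control accounting added below. The dispatcher itself is not part of this diff; a hypothetical minimal version, matching the policy method signatures in base.py, could look like:

import queue


def dispatch_requests(policy, request_queue):
    # Hypothetical sketch: drain (action, kwargs) tuples placed on the queue
    # by Message.ack()/nack()/drop() and call the matching policy method.
    # The real subscriber runs an equivalent loop on a helper thread.
    while True:
        try:
            action, kwargs = request_queue.get(timeout=1)
        except queue.Empty:
            continue
        if action == 'ack':
            policy.ack(**kwargs)    # ack_id, time_to_ack, byte_size
        elif action == 'nack':
            policy.nack(**kwargs)   # ack_id, byte_size
        elif action == 'drop':
            policy.drop(**kwargs)   # ack_id, byte_size
        elif action == 'lease':
            policy.lease(**kwargs)  # ack_id, byte_size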
168 changes: 135 additions & 33 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from __future__ import absolute_import
+from __future__ import absolute_import, division

 import abc
 import logging
@@ -69,10 +69,15 @@ def __init__(self, client, subscription,
         self._consumer = consumer.Consumer(self)
         self._ack_deadline = 10
         self._last_histogram_size = 0
-        self._bytes = 0
         self.flow_control = flow_control
         self.histogram = histogram.Histogram(data=histogram_data)
+
+        # These are for internal flow control tracking.
+        # They should not need to be used by subclasses.
+        self._bytes = 0
+        self._ack_on_resume = set()
+        self._paused = False

     @property
     def ack_deadline(self):
         """Return the appropriate ack deadline.
@@ -92,18 +97,6 @@ def ack_deadline(self):
         self._ack_deadline = self.histogram.percentile(percent=99)
         return self._ack_deadline

-    @property
-    def initial_request(self):
-        """Return the initial request.
-
-        This defines the initial request that must always be sent to Pub/Sub
-        immediately upon opening the subscription.
-        """
-        return types.StreamingPullRequest(
-            stream_ack_deadline_seconds=self.histogram.percentile(99),
-            subscription=self.subscription,
-        )
-
     @property
     def managed_ack_ids(self):
         """Return the ack IDs currently being managed by the policy.
@@ -124,19 +117,54 @@ def subscription(self):
         """
         return self._subscription

-    def ack(self, ack_id, time_to_ack=None):
+    @property
+    def _load(self):
+        """Return the current load.
+
+        The load is represented as a float, where 1.0 represents having
+        hit one of the flow control limits, and values between 0.0 and 1.0
+        represent how close we are to them. (0.5 means we have exactly half
+        of what the flow control setting allows, for example.)
+
+        There are (currently) two flow control settings; this property
+        computes how close the subscriber is to each of them, and returns
+        whichever value is higher. (It does not matter that we have lots of
+        running room on setting A if setting B is over.)
+
+        Returns:
+            float: The load value.
+        """
+        return max([
+            len(self.managed_ack_ids) / self.flow_control.max_messages,
+            self._bytes / self.flow_control.max_bytes,
+        ])
+
+    def ack(self, ack_id, time_to_ack=None, byte_size=None):
         """Acknowledge the message corresponding to the given ack_id.

         Args:
             ack_id (str): The ack ID.
             time_to_ack (int): The time it took to ack the message, measured
                 from when it was received from the subscription. This is used
                 to improve the automatic ack timing.
+            byte_size (int): The size of the PubSub message, in bytes.
         """
         # If we got timing information, add it to the histogram.
         if time_to_ack is not None:
             self.histogram.add(int(time_to_ack))
-        request = types.StreamingPullRequest(ack_ids=[ack_id])
-        self._consumer.send_request(request)
+
+        # Send the request to ack the message.
+        # However, if the consumer is inactive, then queue the ack_id here
+        # instead; it will be acked as part of the initial request when the
+        # consumer is started again.
+        if self._consumer.active:
+            request = types.StreamingPullRequest(ack_ids=[ack_id])
+            self._consumer.send_request(request)
+        else:
+            self._ack_on_resume.add(ack_id)
+
+        # Remove the message from lease management.
+        self.drop(ack_id=ack_id, byte_size=byte_size)

     def call_rpc(self, request_generator):
         """Invoke the Pub/Sub streaming pull RPC.
@@ -155,22 +183,89 @@ def drop(self, ack_id, byte_size):
             ack_id (str): The ack ID.
             byte_size (int): The size of the PubSub message, in bytes.
         """
+        # Remove the ack ID from lease management, and decrement the
+        # byte counter.
         if ack_id in self.managed_ack_ids:
             self.managed_ack_ids.remove(ack_id)
             self._bytes -= byte_size
             self._bytes = min([self._bytes, 0])

+        # If we have been paused by flow control, check and see if we are
+        # back within our limits.
+        #
+        # In order to not thrash too much, require us to have passed below
+        # the resume threshold (80% by default) of each flow control setting
+        # before restarting.
+        if self._paused and self._load < self.flow_control.resume_threshold:
+            self._paused = False
+            self.open(self._callback)
+
+    def get_initial_request(self, ack_queue=False):
+        """Return the initial request.
+
+        This defines the initial request that must always be sent to Pub/Sub
+        immediately upon opening the subscription.
+
+        Args:
+            ack_queue (bool): Whether to include any acks that were sent
+                while the connection was paused.
+
+        Returns:
+            ~.pubsub_v1.types.StreamingPullRequest: A request suitable
+                for being the first request on the stream (and not suitable
+                for any other purpose).
+
+        .. note::
+            If ``ack_queue`` is set to True, this includes the ack_ids, but
+            also clears the internal set.
+
+            This means that calls to :meth:`get_initial_request` with
+            ``ack_queue`` set to True are not idempotent.
+        """
+        # Any ack IDs that are under lease management and not being acked
+        # need to have their deadline extended immediately.
+        ack_ids = set()
+        lease_ids = self.managed_ack_ids
+        if ack_queue:
+            ack_ids = self._ack_on_resume
+            lease_ids = lease_ids.difference(ack_ids)
+
+        # Put the request together.
+        request = types.StreamingPullRequest(
+            ack_ids=list(ack_ids),
+            modify_deadline_ack_ids=list(lease_ids),
+            modify_deadline_seconds=[self.ack_deadline] * len(lease_ids),
+            stream_ack_deadline_seconds=self.histogram.percentile(99),
+            subscription=self.subscription,
+        )
+
+        # Clear the ack_ids set.
+        # Note: If `ack_queue` is False, this just ends up being a no-op,
+        # since the set is just an empty set.
+        ack_ids.clear()
+
+        # Return the initial request.
+        return request

     def lease(self, ack_id, byte_size):
         """Add the given ack ID to lease management.

         Args:
             ack_id (str): The ack ID.
             byte_size (int): The size of the PubSub message, in bytes.
         """
+        # Add the ack ID to the set of managed ack IDs, and increment
+        # the size counter.
         if ack_id not in self.managed_ack_ids:
             self.managed_ack_ids.add(ack_id)
             self._bytes += byte_size

+        # Sanity check: Do we have too many things in our inventory?
+        # If we do, we need to stop the stream.
+        if self._load >= 1.0:
+            self._paused = True
+            self.close()
+
     def maintain_leases(self):
         """Maintain all of the leases being managed by the policy.

@@ -202,7 +297,7 @@ def maintain_leases(self):
             # it is more efficient to make a single request.
             ack_ids = list(self.managed_ack_ids)
             logger.debug('Renewing lease for %d ack IDs.' % len(ack_ids))
-            if len(ack_ids) > 0:
+            if len(ack_ids) > 0 and self._consumer.active:
                 request = types.StreamingPullRequest(
                     modify_deadline_ack_ids=ack_ids,
                     modify_deadline_seconds=[p99] * len(ack_ids),
@@ -233,13 +328,33 @@ def modify_ack_deadline(self, ack_id, seconds):
         )
         self._consumer.send_request(request)

-    def nack(self, ack_id):
+    def nack(self, ack_id, byte_size=None):
         """Explicitly deny receipt of a message.

         Args:
             ack_id (str): The ack ID.
+            byte_size (int): The size of the PubSub message, in bytes.
         """
-        return self.modify_ack_deadline(ack_id=ack_id, seconds=0)
+        self.modify_ack_deadline(ack_id=ack_id, seconds=0)
+        self.drop(ack_id=ack_id, byte_size=byte_size)
+
+    @abc.abstractmethod
+    def close(self):
+        """Close the existing connection."""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def on_exception(self, exception):
+        """Called when a gRPC exception occurs.
+
+        If this method does nothing, then the stream is re-started. If this
+        raises an exception, it will stop the consumer thread.
+        This is executed on the response consumer helper thread.
+
+        Args:
+            exception (Exception): The exception raised by the RPC.
+        """
+        raise NotImplementedError

     @abc.abstractmethod
     def on_response(self, response):
@@ -262,19 +377,6 @@ def on_response(self, response):
         """
         raise NotImplementedError

-    @abc.abstractmethod
-    def on_exception(self, exception):
-        """Called when a gRPC exception occurs.
-
-        If this method does nothing, then the stream is re-started. If this
-        raises an exception, it will stop the consumer thread.
-        This is executed on the response consumer helper thread.
-
-        Args:
-            exception (Exception): The exception raised by the RPC.
-        """
-        raise NotImplementedError
-
     @abc.abstractmethod
     def open(self, callback):
         """Open a streaming pull connection and begin receiving messages.
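
Taken together, lease(), drop(), and _load implement the pause/resume cycle: lease() closes the stream once either flow control limit is hit, and drop() reopens it once the load falls back below resume_threshold. A standalone sketch of that cycle (illustrative only: the class and the numbers are invented, and the byte counter is clamped at zero with max(), which appears to be what the min([...]) call in drop() intends):

from __future__ import division  # same reason the diff adds it: float ratios

import collections

FlowControl = collections.namedtuple(
    'FlowControl', ['max_bytes', 'max_messages', 'resume_threshold'])


class ToyPolicy(object):
    """Stand-in for the real policy's flow control bookkeeping."""

    def __init__(self, flow_control):
        self.flow_control = flow_control
        self.managed_ack_ids = set()
        self._bytes = 0
        self._paused = False

    @property
    def _load(self):
        return max([
            len(self.managed_ack_ids) / self.flow_control.max_messages,
            self._bytes / self.flow_control.max_bytes,
        ])

    def lease(self, ack_id, byte_size):
        if ack_id not in self.managed_ack_ids:
            self.managed_ack_ids.add(ack_id)
            self._bytes += byte_size
        if self._load >= 1.0:
            self._paused = True  # the real policy calls self.close() here

    def drop(self, ack_id, byte_size):
        if ack_id in self.managed_ack_ids:
            self.managed_ack_ids.remove(ack_id)
            self._bytes = max(self._bytes - byte_size, 0)
        if self._paused and self._load < self.flow_control.resume_threshold:
            self._paused = False  # the real policy calls self.open(callback) here


policy = ToyPolicy(FlowControl(max_bytes=10 ** 6, max_messages=10,
                               resume_threshold=0.8))
for i in range(10):
    policy.lease('ack-%d' % i, byte_size=1000)
assert policy._paused          # hit the max_messages limit
for i in range(3):
    policy.drop('ack-%d' % i, byte_size=1000)
assert not policy._paused      # back below 80% of the limit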
3 changes: 2 additions & 1 deletion pubsub/google/cloud/pubsub_v1/types.py
@@ -44,11 +44,12 @@
 # these settings can be altered to tweak Pub/Sub behavior.
 # The defaults should be fine for most use cases.
 FlowControl = collections.namedtuple('FlowControl',
-    ['max_bytes', 'max_messages'],
+    ['max_bytes', 'max_messages', 'resume_threshold'],
 )
 FlowControl.__new__.__defaults__ = (
     psutil.virtual_memory().total * 0.2, # max_bytes: 20% of total RAM
     float('inf'), # max_messages: no limit
+    0.8, # resume_threshold: 80%
 )

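
With the new field, callers can tune when the subscriber pauses and resumes the stream; left unset, the defaults above are 20% of system RAM for max_bytes, no message limit, and a 0.8 resume threshold. A minimal usage sketch (the entry point that accepts flow_control is outside this diff, so the closing comment is illustrative):

from google.cloud.pubsub_v1 import types

flow_control = types.FlowControl(
    max_bytes=100 * 1024 * 1024,  # pause once ~100 MB of messages are leased
    max_messages=500,             # or once 500 messages are outstanding
    resume_threshold=0.8,         # resume below 80% of whichever limit was hit
)

# Illustrative only: this is handed to whatever entry point constructs the
# policy, e.g. a subscribe() call or the policy constructor's flow_control
# argument shown in base.py above.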
15 changes: 8 additions & 7 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
@@ -57,9 +57,9 @@ def test_ack():
             msg.ack()
             put.assert_called_once_with(('ack', {
                 'ack_id': 'bogus_ack_id',
+                'byte_size': 25,
                 'time_to_ack': mock.ANY,
             }))
-            drop.assert_called_once_with()


 def test_drop():
@@ -93,9 +93,10 @@ def test_modify_ack_deadline():


 def test_nack():
-    msg = create_message(b'foo')
-    with mock.patch.object(message.Message, 'modify_ack_deadline') as mad:
-        with mock.patch.object(message.Message, 'drop') as drop:
-            msg.nack()
-            mad.assert_called_once_with(seconds=0)
-            drop.assert_called_once_with()
+    msg = create_message(b'foo', ack_id='bogus_id')
+    with mock.patch.object(msg._request_queue, 'put') as put:
+        msg.nack()
+        put.assert_called_once_with(('nack', {
+            'ack_id': 'bogus_id',
+            'byte_size': 25,
+        }))