Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f784121
[WIP] Getting started on Pub/Sub.
May 25, 2017
c8a9fd6
stuff
May 25, 2017
9b09b8f
WIP
May 25, 2017
8f0832e
wip
lukesneeringer May 25, 2017
02d7658
WIP
May 30, 2017
3a3ebe9
Merge branch 'pubsub' of github.com:GoogleCloudPlatform/google-cloud-…
May 30, 2017
503d11f
WIP, fixing bugs.
May 30, 2017
3b21b93
WIP
May 30, 2017
1e879a1
wip
May 31, 2017
4bf0552
wip
May 31, 2017
c6dc098
wip
May 31, 2017
13821a7
wip
May 31, 2017
c2d7af8
wip
Jun 1, 2017
4c0bf84
Merge branch 'public-master' into pubsub
Jun 1, 2017
783e9dd
Merge branch 'pubsub' into pubsub-publisher
Jun 2, 2017
7dd719f
Clean up a couple small things.
Jun 2, 2017
c1042ac
A couple more small fixes.
Jun 2, 2017
ccaa865
WIP
Jun 2, 2017
f2ee4d4
Rework based on @jonparrott concurrency ideas.
Jun 2, 2017
12d5546
Refactor the batching implementation.
Jun 2, 2017
e99d959
Remove unrelated files.
Jun 2, 2017
4774f2a
wip
lukesneeringer Jun 3, 2017
1a53c37
Honor size and message count limits.
lukesneeringer Jun 15, 2017
9a6b7cb
Update publisher to be thread-based.
Jul 20, 2017
b0f03a8
Merge branch 'public-master' into pubsub-publisher
Aug 21, 2017
dfe52ef
Merge branch 'pubsub' into pubsub-publisher
Aug 21, 2017
f553fd6
Add better Batch docstring.
Aug 21, 2017
356749a
Improve the max latency thread comments.
Aug 21, 2017
8242c9d
Collapse property docstrings.
Aug 21, 2017
db87dab
More @jonparrott feedback.
Aug 21, 2017
58072b8
Remove the client as a public item in the base batch.
Aug 21, 2017
8f67488
Remove the rejection batch.
Aug 21, 2017
a86d9b7
Lock batch acquisition.
Aug 21, 2017
df18615
Alter exception superclass.
Aug 21, 2017
5f0549b
Inherit from google.api.core.future.Future.
Aug 21, 2017
101d9ca
Move to @jonparrott's Future interface.
Aug 21, 2017
e6e58bb
Move Future off into its own module.
Aug 21, 2017
9fd490c
Add is not None.
Aug 21, 2017
469400e
Feedback.
Aug 22, 2017
31c0c81
Use concurrent.futures.TimeoutError where possible.
Aug 22, 2017
04ac278
Add a lock on commit to ensure no duplication.
Aug 22, 2017
37ee908
Move to an Event.
Aug 22, 2017
12d44be
Make _names private.
Aug 22, 2017
6fe1309
Pubsub subscriber (#3637)
lukesneeringer Aug 24, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
  • Loading branch information
Luke Sneeringer committed May 31, 2017
commit 1e879a1e8df4fbcbe94c665f4fa9888edf2c9217
121 changes: 70 additions & 51 deletions pubsub/google/cloud/pubsub_v1/publisher/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import six

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import future

QueueItem = collections.namedtuple('QueueItem', ['message', 'future'])


class Batch(object):
"""A batch of messages.
Expand Down Expand Up @@ -59,16 +58,24 @@ class Batch(object):
"""
def __init__(self, client, topic, settings, autocommit=True):
self._client = client
self._topic = topic
self._settings = settings
self._messages = queue.Queue()
self._futures = queue.Queue()
self._status = 'accepting messages'
self._message_ids = {}

# Create a namespace that is owned by the client manager; this

This comment was marked as spam.

# is necessary to be able to have these values be communicable between
# processes.
self._ = self.manager.Namespace()

This comment was marked as spam.

This comment was marked as spam.

self._.futures = self.manager.list()
self._.messages = self.manager.list()
self._.message_ids = self.manager.dict()
self._.settings = settings
self._.status = 'accepting messages'

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

self._.topic = topic

# This is purely internal tracking.
self._process = None

# Continually monitor the thread until it is time to commit the
# batch, or the batch is explicitly committed.
if autocommit and self._settings.max_latency < float('inf'):
if autocommit and self._.settings.max_latency < float('inf'):
self._process = self._client.thread_class(target=self.monitor)
self._process.start()

Expand All @@ -82,6 +89,16 @@ def client(self):
"""
return self._client

@property
def manager(self):
    """The multiprocessing manager owned by this batch's client.

    Returns:
        :class:`multiprocessing.Manager`: The manager responsible for
            handling shared memory objects.
    """
    # Delegate to the owning client, which holds the single shared manager.
    owning_client = self._client
    return owning_client.manager

@property
def status(self):
"""Return the status of this batch.
Expand All @@ -90,7 +107,7 @@ def status(self):
str: The status of this batch. All statuses are human-readable,
all-lowercase strings.
"""
return self._status
return self._.status

def commit(self):
"""Actually publish all of the messages on the active batch.
Expand All @@ -99,52 +116,46 @@ def commit(self):
batch on the publisher, and then the batch is discarded upon
completion.
"""
# If this is the active batch on the client right now, remove it.
self._client.batch(self._topic, pop=self)

# Update the status.
self._status = 'in-flight'
self._.status = 'in-flight'

# Begin the request to publish these messages.
response = self._client.api.publish(self._topic, list(self.flush()))
if len(self._.messages) == 0:
raise Exception('Empty queue')
response = self._client.api.publish(self._.topic, self._.messages)

# Sanity check: If the number of message IDs is not equal to the
# number of futures I have, then something went wrong.
if len(response.message_ids) != len(self._.futures):
raise exceptions.PublishError(
'Some messages were not successfully published.',
)

# FIXME (lukesneeringer): How do I check for errors on this?
self._status = 'success'
self._.status = 'success'

# Iterate over the futures on the queue and return the response IDs.
# We are trusting that there is a 1:1 mapping, and raise an exception
# if not.
try:
for message_id in response.message_ids:
future_ = self._futures.get(block=False)
self._message_ids[future_] = message_id
future_._trigger()
except queue.Empty:
raise ValueError('More message IDs came back than messages '
'were published.')

# If the queue of futures is not empty, we did not get enough IDs
# back.
if self._futures.empty():
raise ValueError('Fewer message IDs came back than messages '
'were published.')


def flush(self):
"""Flush the messages off of this queue, one at a time.

This method is called when the batch is committed. Calling it outside
of the context of committing will effectively remove messages
from the batch.

Yields:
:class:~`pubsub_v1.types.PubsubMessage`: A Pub/Sub Message.
for mid, fut in zip(response.message_ids, self._.futures):
self._message_ids[fut] = mid
fut._trigger()

def get_message_id(self, publish_future):
"""Return the message ID corresponding to the given future.

Args:
publish_future (:class:~`future.Future`): The future returned
from a ``publish`` call.

Returns:
str: The message ID.

Raises:
KeyError: If the future is not yet done or there is no message
ID corresponding to it.
"""
try:
while True:
yield self._messages.get(block=False)
except queue.Empty:
raise StopIteration
return self._message_ids[publish_future]

def monitor(self):
"""Commit this batch after sufficient time has elapsed.
Expand All @@ -156,11 +167,11 @@ def monitor(self):
# in a separate thread.
#
# Sleep for however long we should be waiting.
time.sleep(self._settings.max_latency)
time.sleep(self._.settings.max_latency)

# If, in the intervening period, the batch started to be committed,
# then no-op at this point.
if self._status != 'accepting messages':
if self._.status != 'accepting messages':
return

# Commit.
Expand Down Expand Up @@ -216,10 +227,18 @@ def publish(self, data, **attrs):
'be sent as text strings.')

# Store the actual message in the batch's message queue.
self._messages.put(types.PubsubMessage(data=data, attributes=attrs))
self._.messages.append(
types.PubsubMessage(data=data, attributes=attrs),
)

# Return a Future. That future needs to be aware of the status
# of this batch.
f = future.Future(self)
self._futures.put(f)
f = future.Future(self._)
self._.futures.append(f)
return f


# Make a fake batch. This is used by the client to do single-op checks

This comment was marked as spam.

# for batch existence.
FakeBatch = collections.namedtuple('FakeBatch', ['status'])
FAKE = FakeBatch(status='fake')
28 changes: 18 additions & 10 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.batch import Batch
from google.cloud.pubsub_v1.publisher.batch import FAKE


__VERSION__ = pkg_resources.get_distribution('google-cloud-pubsub').version
Expand Down Expand Up @@ -58,7 +59,11 @@ def __init__(self, batching=(), thread_class=multiprocessing.Process,
kwargs['lib_name'] = 'gccl'
kwargs['lib_version'] = __VERSION__
self.api = self._gapic_class(**kwargs)
self.batching = types.Batching(batching)
self.batching = types.Batching(*batching)

# Set the manager, which is responsible for granting shared memory
# objects.
self._manager = multiprocessing.Manager()

# Set the thread class.
self._thread_class = thread_class
Expand All @@ -67,6 +72,16 @@ def __init__(self, batching=(), thread_class=multiprocessing.Process,
# messages. One batch exists for each topic.
self._batches = {}

@property
def manager(self):
    """The shared-memory manager created for this client.

    Returns:
        :class:`multiprocessing.Manager`: The manager responsible for
            handling shared memory objects.
    """
    # Plain accessor over the private attribute set in ``__init__``.
    return self._manager

@property
def thread_class(self):
"""Return the thread class provided at instantiation.
Expand All @@ -76,7 +91,7 @@ def thread_class(self):
"""
return self._thread_class

def batch(self, topic, create=True, pop=None):
def batch(self, topic, create=True):
"""Return the current batch.

This will create a new batch only if no batch currently exists.
Expand All @@ -85,16 +100,13 @@ def batch(self, topic, create=True, pop=None):
topic (str): A string representing the topic.
create (bool): Whether to create a new batch if no batch is
found. Defaults to True.
pop (:class:~`pubsub_v1.batch.Batch`): Pop the batch off
if it is found *and* is the batch that was sent. Defaults
to None (never pop).

Returns:
:class:~`pubsub_v1.batch.Batch` The batch object.
"""
# If there is no matching batch yet, then potentially create one
# and place it on the batches dictionary.
if topic not in self._batches:
if self._batches.get(topic, FAKE).status != 'accepting messages':

This comment was marked as spam.

if not create:
return None
self._batches[topic] = Batch(
Expand All @@ -103,10 +115,6 @@ def batch(self, topic, create=True, pop=None):
topic=topic,
)

# If we are supposed to remove the batch, pop it off and return it.
if pop and self._batches[topic] == pop:
return self._batches.pop(topic)

# Simply return the appropriate batch.
return self._batches[topic]

Expand Down
16 changes: 16 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2017, Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class PublishError(RuntimeError):
    """Error raised when messages are not successfully published.

    Raised by ``Batch.commit`` when the number of message IDs returned
    by the Pub/Sub API does not match the number of futures awaiting
    message IDs (i.e. some messages were not successfully published).
    """
29 changes: 10 additions & 19 deletions pubsub/google/cloud/pubsub_v1/publisher/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class Future(object):
methods in this library.

Args:
batch (:class:~`pubsub_v1.batch.Batch`): The batch object that
is committing this message.
batch (:class:`multiprocessing.Namespace`): Information about the
batch object that is committing this message.
"""
def __init__(self, batch):
self._batch = batch
def __init__(self, batch_info):
self._batch_info = batch_info
self._hash = hash(uuid.uuid4())
self._callbacks = queue.Queue()

Expand Down Expand Up @@ -66,7 +66,7 @@ def done(self):
This still returns True in failure cases; checking `result` or
`exception` is the canonical way to assess success or failure.
"""
return self.batch.status in ('success', 'error')
return self._batch_info.status in ('success', 'error')

def result(self, timeout=None):
"""Return the message ID, or raise an exception.
Expand All @@ -88,7 +88,7 @@ def result(self, timeout=None):
# return an appropriate value.
err = self.exception(timeout=timeout)
if err is None:
return self.batch.get_message_id(self._client_id)
return self._batch_info.message_ids[self]
raise err

def exception(self, timeout=None, _wait=1):
Expand All @@ -102,18 +102,18 @@ def exception(self, timeout=None, _wait=1):
times out and raises TimeoutError.

Raises:
:class:~`pubsub_v1.TimeoutError`: If the request times out.
:exc:`TimeoutError`: If the request times out.

Returns:
:class:`Exception`: The exception raised by the call, if any.
"""
# If the batch completed successfully, this should return None.
if self.batch.status == 'success':
if self.batch_info.status == 'success':
return None

# If this batch had an error, this should return it.
if self.batch.status == 'error':
return self.batch._error
if self.batch_info.status == 'error':
return self.batch_info.error

# If the timeout has been exceeded, raise TimeoutError.
if timeout < 0:
Expand Down Expand Up @@ -151,12 +151,3 @@ def _trigger(self):
callback(self)
except queue.Empty:
return None


class TimeoutError(object):
"""Exception subclass for timeout-related errors.

This exception is only returned by the :class:~`pubsub_v1.future.Future`
class.
"""
pass
35 changes: 35 additions & 0 deletions pubsub/google/cloud/pubsub_v1/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2017, Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

def retry(func, delay=0, count=0, err=None, **kwargs):
    """Attempt to retry a function after the provided delay.

    If there have been too many retries, raise the exception that caused
    the retries instead of calling the function again.

    Args:
        func (callable): The function to retry. It is invoked with
            ``delay``, ``count``, and ``err`` keyword arguments in
            addition to ``**kwargs``.
        delay (int): The period to delay before retrying; specified in
            seconds.
        count (int): The number of previous retries that have occurred.
            If this is >= 5, ``err`` is raised.
        err (Exception): The exception that triggered this retry; raised
            once the retry budget is exhausted.
        **kwargs (dict): Other keyword arguments to pass to the function.

    Returns:
        The return value of ``func``.
    """
    # NOTE(review): no module-level ``import time`` is visible in this
    # file's diff, so import locally to keep the function self-contained.
    import time

    # If there have been too many retries, simply raise the exception.
    if count >= 5:
        raise err

    # Sleep the given delay.
    time.sleep(delay)

    # Try calling the method again. ``err`` is forwarded so that a later
    # exhaustion check raises the original exception rather than ``None``
    # (the original code dropped it). NOTE(review): ``count`` is not
    # incremented here; the callee appears responsible for bumping it —
    # confirm against callers.
    return func(delay=delay, count=count, err=err, **kwargs)
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)
Batching.__new__.__defaults__ = (
1024 * 1024 * 5, # max_bytes: 5 MB
0.001, # max_latency: 1 millisecond
0.25, # max_latency: 0.25 seconds
1000, # max_messages: 1,000
)

Expand Down