Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f784121
[WIP] Getting started on Pub/Sub.
May 25, 2017
c8a9fd6
stuff
May 25, 2017
9b09b8f
WIP
May 25, 2017
8f0832e
wip
lukesneeringer May 25, 2017
02d7658
WIP
May 30, 2017
3a3ebe9
Merge branch 'pubsub' of github.com:GoogleCloudPlatform/google-cloud-…
May 30, 2017
503d11f
WIP, fixing bugs.
May 30, 2017
3b21b93
WIP
May 30, 2017
1e879a1
wip
May 31, 2017
4bf0552
wip
May 31, 2017
c6dc098
wip
May 31, 2017
13821a7
wip
May 31, 2017
c2d7af8
wip
Jun 1, 2017
4c0bf84
Merge branch 'public-master' into pubsub
Jun 1, 2017
783e9dd
Merge branch 'pubsub' into pubsub-publisher
Jun 2, 2017
7dd719f
Clean up a couple small things.
Jun 2, 2017
c1042ac
A couple more small fixes.
Jun 2, 2017
ccaa865
WIP
Jun 2, 2017
f2ee4d4
Rework based on @jonparrott concurrency ideas.
Jun 2, 2017
12d5546
Refactor the batching implementation.
Jun 2, 2017
e99d959
Remove unrelated files.
Jun 2, 2017
4774f2a
wip
lukesneeringer Jun 3, 2017
1a53c37
Honor size and message count limits.
lukesneeringer Jun 15, 2017
9a6b7cb
Update publisher to be thread-based.
Jul 20, 2017
b0f03a8
Merge branch 'public-master' into pubsub-publisher
Aug 21, 2017
dfe52ef
Merge branch 'pubsub' into pubsub-publisher
Aug 21, 2017
f553fd6
Add better Batch docstring.
Aug 21, 2017
356749a
Improve the max latency thread comments.
Aug 21, 2017
8242c9d
Collapse property docstrings.
Aug 21, 2017
db87dab
More @jonparrott feedback.
Aug 21, 2017
58072b8
Remove the client as a public item in the base batch.
Aug 21, 2017
8f67488
Remove the rejection batch.
Aug 21, 2017
a86d9b7
Lock batch acquisition.
Aug 21, 2017
df18615
Alter exception superclass.
Aug 21, 2017
5f0549b
Inherit from google.api.core.future.Future.
Aug 21, 2017
101d9ca
Move to @jonparrott's Future interface.
Aug 21, 2017
e6e58bb
Move Future off into its own module.
Aug 21, 2017
9fd490c
Add is not None.
Aug 21, 2017
469400e
Feedback.
Aug 22, 2017
31c0c81
Use concurrent.futures.TimeoutError where possible.
Aug 22, 2017
04ac278
Add a lock on commit to ensure no duplication.
Aug 22, 2017
37ee908
Move to an Event.
Aug 22, 2017
12d44be
Make _names private.
Aug 22, 2017
6fe1309
Pubsub subscriber (#3637)
lukesneeringer Aug 24, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Clean up a couple small things.
  • Loading branch information
Luke Sneeringer committed Jun 2, 2017
commit 7dd719f4c68943221b2d6f6c7e2fbc0bc3b9bbf8
68 changes: 35 additions & 33 deletions pubsub/google/cloud/pubsub_v1/publisher/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ class Batch(object):
message to be published is received; subsequent messages are added to
that batch until the process of actual publishing _starts_.

Once this occurs, any new messages sent to ``publish`` open a new batch.
Once this occurs, any new messages sent to :meth:`publish` open a new
batch.

If you are using this library, you most likely do not need to instantiate
batch objects directly; they will be created for you. If you want to
change the actual batching settings, see the ``batching`` argument on
:class:`google.cloud.pubsub_v1.PublisherClient`.
:class:`~.pubsub_v1.PublisherClient`.

Args:
client (:class:`google.cloud.pubsub_v1.PublisherClient`): The
publisher client used to create this batch. Batch settings are
inferred from this.
client (~.pubsub_v1.PublisherClient): The publisher client used to
create this batch.
topic (str): The topic. The format for this is
``projects/{project}/topics/{topic}``.
settings (:class:`google.cloud.pubsub_v1.types.Batching`): The
settings for batch publishing. These should be considered
immutable once the batch has been opened.
settings (~.pubsub_v1.types.Batching): The settings for batch
publishing. These should be considered immutable once the batch
has been opened.
autocommit (bool): Whether to autocommit the batch when the time
has elapsed. Defaults to True unless ``settings.max_latency`` is
inf.
Expand All @@ -62,20 +62,20 @@ def __init__(self, client, topic, settings, autocommit=True):
# Create a namespace that is owned by the client manager; this

This comment was marked as spam.

# is necessary to be able to have these values be communicable between
# processes.
self._ = self.manager.Namespace()
self._.futures = self.manager.list()
self._.messages = self.manager.list()
self._.message_ids = self.manager.dict()
self._.settings = settings
self._.status = 'accepting messages'
self._.topic = topic
self._shared = self.manager.Namespace()
self._shared.futures = self.manager.list()
self._shared.messages = self.manager.list()
self._shared.message_ids = self.manager.dict()
self._shared.settings = settings
self._shared.status = 'accepting messages'
self._shared.topic = topic

# This is purely internal tracking.
self._process = None

# Continually monitor the thread until it is time to commit the
# batch, or the batch is explicitly committed.
if autocommit and self._.settings.max_latency < float('inf'):
if autocommit and self._shared.settings.max_latency < float('inf'):
self._process = self._client.thread_class(target=self.monitor)
self._process.start()

Expand All @@ -84,10 +84,9 @@ def client(self):
"""Return the client that created this batch.

Returns:
:class:~`pubsub_v1.client.Client`: The client that created this
batch.
~.pubsub_v1.client.Client: The client that created this batch.
"""
return self._client
return self._sharedclient

@property
def manager(self):
Expand All @@ -107,7 +106,7 @@ def status(self):
str: The status of this batch. All statuses are human-readable,
all-lowercase strings.
"""
return self._.status
return self._shared.status

def commit(self):
"""Actually publish all of the messages on the active batch.
Expand All @@ -117,12 +116,15 @@ def commit(self):
completion.
"""
# Update the status.
self._.status = 'in-flight'
self._shared.status = 'in-flight'

# Begin the request to publish these messages.
if len(self._.messages) == 0:
if len(self._shared.messages) == 0:
raise Exception('Empty queue')
response = self._client.api.publish(self._.topic, self._.messages)
response = self._client.api.publish(
self._shared.topic,
self._shared.messages,
)

# FIXME (lukesneeringer): Check for failures; retry.

This comment was marked as spam.


Expand All @@ -131,17 +133,17 @@ def commit(self):

# Sanity check: If the number of message IDs is not equal to the
# number of futures I have, then something went wrong.
if len(response.message_ids) != len(self._.futures):
if len(response.message_ids) != len(self._shared.futures):
raise exceptions.PublishError(
'Some messages were not successfully published.',
)

# Iterate over the futures on the queue and return the response IDs.
# We are trusting that there is a 1:1 mapping, and raise an exception
# if not.
self._.status = 'success'
for message_id, fut in zip(response.message_ids, self._.futures):
self._.message_ids[hash(fut)] = message_id
self._shared.status = 'success'
for message_id, fut in zip(response.message_ids, self._shared.futures):
self._shared.message_ids[hash(fut)] = message_id
fut._trigger()

def monitor(self):
Expand All @@ -154,11 +156,11 @@ def monitor(self):
# in a separate thread.
#
# Sleep for however long we should be waiting.
time.sleep(self._.settings.max_latency)
time.sleep(self._shared.settings.max_latency)

# If, in the intervening period, the batch started to be committed,
# then no-op at this point.
if self._.status != 'accepting messages':
if self._shared.status != 'accepting messages':
return

# Commit.
Expand Down Expand Up @@ -194,8 +196,8 @@ def publish(self, data, **attrs):
``attrs`` are not either a ``str`` or ``bytes``.

Returns:
Future: An object conforming to the ``concurrent.futures.Future``
interface.
~.pubsub_v1.publisher.future.Future: An object conforming to the
:class:`concurrent.futures.Future` interface.
"""
# Sanity check: Is the data being sent as a bytestring?

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

# If it is literally anything else, complain loudly about it.
Expand All @@ -214,14 +216,14 @@ def publish(self, data, **attrs):
'be sent as text strings.')

# Store the actual message in the batch's message queue.
self._.messages.append(
self._shared.messages.append(
types.PubsubMessage(data=data, attributes=attrs),
)

# Return a Future. That future needs to be aware of the status
# of this batch.
f = future.Future(self._)
self._.futures.append(f)
self._shared.futures.append(f)
return f


Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def publish(self, topic, data, **attrs):
>>> response = client.publish(topic, data, username='guido')

Args:
topic (:class:~`pubsub_v1.types.Topic`): The topic to publish
topic (~.pubsub_v1.types.Topic): The topic to publish
messages to.
data (bytes): A bytestring representing the message body. This
must be a bytestring (a text string will raise TypeError).
Expand Down