Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address review comments
  • Loading branch information
Jon Wayne Parrott committed Jul 19, 2017
commit 5163cc19b30dbf383cd1b298a5f386376373709b
16 changes: 11 additions & 5 deletions core/google/cloud/future/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import abc
import concurrent.futures

This comment was marked as spam.

This comment was marked as spam.

import functools
import operator

import six
import tenacity
Expand Down Expand Up @@ -115,16 +117,20 @@ def _blocking_poll(self, timeout=None):
if self._result_set:

This comment was marked as spam.

This comment was marked as spam.

return

retry_on = tenacity.retry_if_result(lambda result: result is not True)
wait_on = tenacity.wait_exponential(multiplier=1, max=10)
retry_on = tenacity.retry_if_result(
functools.partial(operator.is_not, True))
# Use exponential backoff with jitter.
wait_on = (
tenacity.wait_exponential(multiplier=1, max=10) +
tenacity.wait_random(0, 1))

if timeout is not None:
if timeout is None:
retry = tenacity.retry(retry=retry_on, wait=wait_on)
else:
retry = tenacity.retry(
retry=retry_on,
wait=wait_on,
stop=tenacity.stop_after_delay(timeout))
else:
retry = tenacity.retry(retry=retry_on, wait=wait_on)

try:
retry(self.done)()
Expand Down
55 changes: 26 additions & 29 deletions core/google/cloud/future/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@


class Operation(base.PollingFuture):
"""A Future for interacting with a Google API Long-Running Operation."""
"""A Future for interacting with a Google API Long-Running Operation.

Args:
operation (google.longrunning.operations_pb2.Operation): The
initial operation.
refresh (Callable[[], Operation]): A callable that returns the
latest state of the operation.
cancel (Callable[[], None]), A callable that tries to cancel
the operation.
result_type (type): The protobuf type for the operation's result.
metadata_type (type): The protobuf type for the operation's
metadata.
"""

def __init__(
self, operation, refresh, cancel,
result_type, metadata_type=None):
"""
Args:
operation (google.longrunning.operations_pb2.Operation): The
initial operation.
refresh (Callable[[], Operation]): A callable that returns the
latest state of the operation.
cancel (Callable[[], None]), A callable that tries to cancel
the operation.
result_type (type): The protobuf type for the operation's result.
metadata_type (type): The protobuf type for the operation's
metadata.
"""
super(Operation, self).__init__()
self._operation = operation
self._refresh = refresh
Expand All @@ -56,8 +56,7 @@ def __init__(

@property
def operation(self):
"""google.longrunning.Operation: The current long-running operation
message."""
"""google.longrunning.Operation: The current long-running operation."""
return self._operation

@property
Expand All @@ -70,8 +69,7 @@ def metadata(self):
self._metadata_type, self._operation.metadata)

def _set_result_from_operation(self):
"""Set the result or exception from the current Operation message,
if it is complete."""
"""Set the result or exception from the operation if it is complete."""
# This must be done in a lock to prevent the polling thread
# and main thread from both executing the completion logic
# at the same time.
Expand All @@ -94,7 +92,8 @@ def _set_result_from_operation(self):
self.set_exception(exception)
else:
exception = exceptions.GoogleCloudError(
'Unknown operation error')
'Unexpected state: Long-running operation had neither '
'response nor error set.')

This comment was marked as spam.

self.set_exception(exception)

def _refresh_and_update(self):
Expand All @@ -105,16 +104,14 @@ def _refresh_and_update(self):
self._operation = self._refresh()
self._set_result_from_operation()

return self._operation

def done(self):
"""Checks to see if the operation is complete.

Returns:
bool: True if the operation is complete, False otherwise.
"""
operation = self._refresh_and_update()
return operation.done
self._refresh_and_update()
return self._operation.done

def cancel(self):
"""Attempt to cancel the operation.
Expand All @@ -131,17 +128,17 @@ def cancel(self):

def cancelled(self):
"""True if the operation was cancelled."""
operation = self._refresh_and_update()
return (operation.HasField('error') and
operation.error.code == code_pb2.CANCELLED)
self._refresh_and_update()
return (self._operation.HasField('error') and
self._operation.error.code == code_pb2.CANCELLED)


def _refresh_http(api_request, operation_name):
"""Refresh an operation using a JSON/HTTP client.

Args:
api_request (Callable): A callable used to make an API request. This
should generally be an instance of
should generally be
:meth:`google.cloud._http.Connection.api_request`.
operation_name (str): The name of the operation.

Expand All @@ -159,7 +156,7 @@ def _cancel_http(api_request, operation_name):

Args:
api_request (Callable): A callable used to make an API request. This
should generally be an instance of
should generally be
:meth:`google.cloud._http.Connection.api_request`.
operation_name (str): The name of the operation.
"""
Expand All @@ -174,12 +171,12 @@ def from_http_json(operation, api_request, result_type, **kwargs):
to a given API) vis `HTTP/JSON`_.

.. _HTTP/JSON: https://cloud.google.com/speech/reference/rest/\
v1beta1/operations#Operation
v1beta1/operations#Operation

Args:
operation (dict): Operation as a dictionary.
api_request (Callable): A callable used to make an API request. This

This comment was marked as spam.

This comment was marked as spam.

should generally be an instance of
should generally be
:meth:`google.cloud._http.Connection.api_request`.
result_type (type): The protobuf result type.
kwargs: Keyword args passed into the :class:`Operation` constructor.
Expand Down
2 changes: 1 addition & 1 deletion core/tests/unit/future/test_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_unexpected_result():

exception = future.exception()

assert 'Unknown operation error' in '{!r}'.format(exception)
assert 'Unexpected state' in '{!r}'.format(exception)


def test__refresh_http():
Expand Down