Skip to content

Commit ff1e4fe

Browse files
Jon Wayne Parrott authored and landrito committed
Add Future interface to BigQuery jobs (googleapis#3626)
* Add Future interface to BigQuery jobs.
* Make QueryJob return QueryResults from result().
* Deprecate QueryJob.results().
1 parent e3bef45 commit ff1e4fe

File tree

3 files changed

+291
-22
lines changed

3 files changed

+291
-22
lines changed

bigquery/google/cloud/bigquery/job.py

Lines changed: 181 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,14 @@
1414

1515
"""Define API Jobs."""
1616

17+
import collections
18+
import threading
19+
import warnings
20+
1721
import six
22+
from six.moves import http_client
1823

24+
from google.cloud import exceptions
1925
from google.cloud.exceptions import NotFound
2026
from google.cloud._helpers import _datetime_from_microseconds
2127
from google.cloud.bigquery.dataset import Dataset
@@ -27,6 +33,60 @@
2733
from google.cloud.bigquery._helpers import UDFResourcesProperty
2834
from google.cloud.bigquery._helpers import _EnumProperty
2935
from google.cloud.bigquery._helpers import _TypedProperty
36+
import google.cloud.future.base
37+
38+
_DONE_STATE = 'DONE'
39+
_STOPPED_REASON = 'stopped'
40+
41+
_ERROR_REASON_TO_EXCEPTION = {
42+
'accessDenied': http_client.FORBIDDEN,
43+
'backendError': http_client.INTERNAL_SERVER_ERROR,
44+
'billingNotEnabled': http_client.FORBIDDEN,
45+
'billingTierLimitExceeded': http_client.BAD_REQUEST,
46+
'blocked': http_client.FORBIDDEN,
47+
'duplicate': http_client.CONFLICT,
48+
'internalError': http_client.INTERNAL_SERVER_ERROR,
49+
'invalid': http_client.BAD_REQUEST,
50+
'invalidQuery': http_client.BAD_REQUEST,
51+
'notFound': http_client.NOT_FOUND,
52+
'notImplemented': http_client.NOT_IMPLEMENTED,
53+
'quotaExceeded': http_client.FORBIDDEN,
54+
'rateLimitExceeded': http_client.FORBIDDEN,
55+
'resourceInUse': http_client.BAD_REQUEST,
56+
'resourcesExceeded': http_client.BAD_REQUEST,
57+
'responseTooLarge': http_client.FORBIDDEN,
58+
'stopped': http_client.OK,
59+
'tableUnavailable': http_client.BAD_REQUEST,
60+
}
61+
62+
_FakeResponse = collections.namedtuple('_FakeResponse', ['status'])
63+
64+
65+
def _error_result_to_exception(error_result):
    """Build the exception that corresponds to a BigQuery error result.

    The ``reason`` of the error result is looked up in
    ``_ERROR_REASON_TO_EXCEPTION`` to find the matching HTTP status code;
    a missing or unlisted reason falls back to ``500 Internal Server Error``.
    The reasons and their status codes are documented on the
    `troubleshooting errors`_ page.

    .. _troubleshooting errors:
        https://cloud.google.com/bigquery/troubleshooting-errors

    :type error_result: Mapping[str, str]
    :param error_result: The error result from BigQuery.

    :rtype: google.cloud.exceptions.GoogleCloudError
    :returns: The mapped exception.
    """
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        error_result.get('reason'), http_client.INTERNAL_SERVER_ERROR)
    # make_exception expects an httplib2 response object, so hand it a
    # minimal stand-in that only carries the status code.
    response = _FakeResponse(status=status_code)
    message = error_result.get('message', '')
    return exceptions.make_exception(
        response,
        message,
        error_info=error_result,
        use_json=False)
3090

3191

3292
class Compression(_EnumProperty):
@@ -82,16 +142,23 @@ class WriteDisposition(_EnumProperty):
82142
ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)
83143

84144

85-
class _BaseJob(object):
86-
"""Base class for jobs.
145+
class _AsyncJob(google.cloud.future.base.PollingFuture):
146+
"""Base class for asynchronous jobs.
147+
148+
:type name: str
149+
:param name: the name of the job
87150
88151
:type client: :class:`google.cloud.bigquery.client.Client`
89152
:param client: A client which holds credentials and project configuration
90153
for the dataset (which requires a project).
91154
"""
92-
def __init__(self, client):
155+
def __init__(self, name, client):
    """Initialise the job on top of the polling-future base class.

    :type name: str
    :param name: the name of the job

    :type client: :class:`google.cloud.bigquery.client.Client`
    :param client: A client which holds credentials and project
                   configuration for the dataset (which requires a
                   project).
    """
    super(_AsyncJob, self).__init__()
    self.name = name
    self._client = client
    self._properties = {}
    # Serialises the completion logic in _set_future_result.
    self._completion_lock = threading.Lock()
    # Checked by _set_future_result so the future is resolved only once.
    self._result_set = False
95162

96163
@property
97164
def project(self):
@@ -117,21 +184,6 @@ def _require_client(self, client):
117184
client = self._client
118185
return client
119186

120-
121-
class _AsyncJob(_BaseJob):
122-
"""Base class for asynchronous jobs.
123-
124-
:type name: str
125-
:param name: the name of the job
126-
127-
:type client: :class:`google.cloud.bigquery.client.Client`
128-
:param client: A client which holds credentials and project configuration
129-
for the dataset (which requires a project).
130-
"""
131-
def __init__(self, name, client):
132-
super(_AsyncJob, self).__init__(client)
133-
self.name = name
134-
135187
@property
136188
def job_type(self):
137189
"""Type of job
@@ -273,6 +325,9 @@ def _set_properties(self, api_response):
273325
self._properties.clear()
274326
self._properties.update(cleaned)
275327

328+
# For Future interface
329+
self._set_future_result()
330+
276331
@classmethod
277332
def _get_resource_config(cls, resource):
278333
"""Helper for :meth:`from_api_repr`
@@ -345,7 +400,7 @@ def exists(self, client=None):
345400
return True
346401

347402
def reload(self, client=None):
348-
"""API call: refresh job properties via a GET request
403+
"""API call: refresh job properties via a GET request.
349404
350405
See
351406
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
@@ -371,12 +426,85 @@ def cancel(self, client=None):
371426
``NoneType``
372427
:param client: the client to use. If not passed, falls back to the
373428
``client`` stored on the current dataset.
429+
430+
:rtype: bool
431+
:returns: Boolean indicating that the cancel request was sent.
374432
"""
375433
client = self._require_client(client)
376434

377435
api_response = client._connection.api_request(
378436
method='POST', path='%s/cancel' % (self.path,))
379437
self._set_properties(api_response['job'])
438+
# The Future interface requires that we return True if the *attempt*
439+
# to cancel was successful.
440+
return True
441+
442+
# The following methods implement the PollingFuture interface. Note that
443+
# the methods above are from the pre-Future interface and are left for
444+
# compatibility. The only "overloaded" method is :meth:`cancel`, which
445+
# satisfies both interfaces.
446+
447+
def _set_future_result(self):
    """Resolve the future once the job has reached its final state.

    Nothing happens while the job is not yet ``DONE`` or when the future
    has already been resolved. Otherwise the future is failed with the
    exception mapped from ``error_result`` or, when there is no error
    result, resolved with this job instance.
    """
    # The polling thread and the main thread can both end up here; the
    # lock prevents them from executing the completion logic at the same
    # time.
    with self._completion_lock:
        # An unfinished job has nothing to report yet.
        if self.state != _DONE_STATE:
            return
        # self._result_set is set to True in set_result and set_exception
        # (also when those are invoked directly), so never resolve twice.
        if self._result_set:
            return

        failure = self.error_result
        if failure is None:
            self.set_result(self)
        else:
            self.set_exception(_error_result_to_exception(failure))
465+
466+
def done(self):
    """Refresh the job if needed and check whether it is complete.

    :rtype: bool
    :returns: True if the job is complete, False otherwise.
    """
    # Do not refresh if the state is already done, as the job will not
    # change once complete.
    needs_refresh = self.state != _DONE_STATE
    if needs_refresh:
        self.reload()
    return self.state == _DONE_STATE
477+
478+
def result(self, timeout=None):
    """Begin the job if it has no state yet, then wait for its result.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: _AsyncJob
    :returns: This instance.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    current_state = self.state
    if current_state is None:
        # No state recorded yet: start the job before waiting on it.
        self.begin()
    # The actual waiting is delegated to the parent class.
    parent = super(_AsyncJob, self)
    return parent.result(timeout=timeout)
495+
496+
def cancelled(self):
    """Check if the job has been cancelled.

    A job is reported as cancelled when it has an ``error_result`` whose
    ``reason`` is ``stopped``. A job without an error result is never
    reported as cancelled.

    This method also satisfies the interface for
    :class:`google.cloud.future.Future`.

    :rtype: bool
    :returns: True if the job's error result carries the ``stopped``
              reason, False otherwise.
    """
    error_result = self.error_result
    if error_result is None:
        # No error recorded, so there is no 'stopped' reason to find.
        return False
    return error_result.get('reason') == _STOPPED_REASON
380508

381509

382510
class _LoadConfiguration(object):
@@ -1127,11 +1255,44 @@ def from_api_repr(cls, resource, client):
11271255
job._set_properties(resource)
11281256
return job
11291257

1130-
def results(self):
1258+
def query_results(self):
    """Build the QueryResults object that is bound to this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: results instance
    """
    # NOTE(review): imported at call time rather than at module level,
    # presumably to avoid a circular import -- confirm.
    from google.cloud.bigquery.query import QueryResults
    bound_results = QueryResults.from_query_job(self)
    return bound_results
1266+
1267+
def results(self):
    """DEPRECATED.

    This method is deprecated. Use :meth:`query_results` or :meth:`result`.

    Construct a QueryResults instance, bound to this job. A
    :class:`DeprecationWarning` is emitted on every call.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.
    """
    # stacklevel=2 attributes the warning to the code calling results(),
    # which is the line that actually needs to be migrated.
    warnings.warn(
        'QueryJob.results() is deprecated. Please use query_results() or '
        'result().', DeprecationWarning, stacklevel=2)
    return self.query_results()
1281+
1282+
def result(self, timeout=None):
    """Start the job, wait for it to complete, and return its results.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    # Waiting is delegated to the parent class; its return value is
    # deliberately discarded.
    parent = super(QueryJob, self)
    parent.result(timeout=timeout)
    # Return a QueryResults instance instead of returning the job.
    return self.query_results()

bigquery/tests/system.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import os
2020
import time
2121
import unittest
22+
import uuid
2223

2324
from google.cloud import bigquery
2425
from google.cloud._helpers import UTC
@@ -1013,6 +1014,15 @@ def test_large_query_w_public_data(self):
10131014
rows = list(iterator)
10141015
self.assertEqual(len(rows), LIMIT)
10151016

1017+
def test_async_query_future(self):
    """Run ``SELECT 1`` through the Future interface and check its row."""
    # A random UUID is used as the job name.
    job_name = str(uuid.uuid4())
    query_job = Config.CLIENT.run_async_query(job_name, 'SELECT 1')
    query_job.use_legacy_sql = False

    # result() returns the query results, from which rows are fetched.
    rows = list(query_job.result().fetch_data())
    self.assertEqual(rows, [(1,)])
1025+
10161026
def test_insert_nested_nested(self):
10171027
# See #2951
10181028
SF = bigquery.SchemaField

0 commit comments

Comments
 (0)