diff --git a/buildbot/Makefile b/buildbot/Makefile index a32f3910..fd0ee0ed 100644 --- a/buildbot/Makefile +++ b/buildbot/Makefile @@ -13,6 +13,9 @@ preconfigure: configure: @../setup.py configure --riak-admin=$(RIAK_ADMIN) +configure_timeseries: + @../setup.py setup_timeseries --riak-admin=$(RIAK_ADMIN) + compile: @echo NO-OP diff --git a/docs/client.rst b/docs/client.rst index ae2f8e54..f014afd9 100644 --- a/docs/client.rst +++ b/docs/client.rst @@ -127,10 +127,12 @@ Key-level Operations Timeseries Operations -------------------- +.. automethod:: RiakClient.ts_describe .. automethod:: RiakClient.ts_get .. automethod:: RiakClient.ts_put .. automethod:: RiakClient.ts_delete .. automethod:: RiakClient.ts_query +.. automethod:: RiakClient.ts_stream_keys ---------------- Query Operations diff --git a/riak/client/operations.py b/riak/client/operations.py index aaecae7d..d3541b3c 100644 --- a/riak/client/operations.py +++ b/riak/client/operations.py @@ -536,6 +536,25 @@ def put(self, transport, robj, w=None, dw=None, pw=None, return_body=None, if_none_match=if_none_match, timeout=timeout) + @retryable + def ts_describe(self, transport, table): + """ + ts_describe(table) + + Retrieve a time series table description from the Riak cluster. + + .. note:: This request is automatically retried :attr:`retries` + times if it fails due to network error. + + :param table: The timeseries table. + :type table: string or :class:`Table ` + :rtype: :class:`TsObject ` + """ + t = table + if isinstance(t, string_types): + t = Table(self, table) + return transport.ts_describe(t) + @retryable def ts_get(self, transport, table, key): """ diff --git a/riak/table.py b/riak/table.py index c477a32b..d026bf18 100644 --- a/riak/table.py +++ b/riak/table.py @@ -49,6 +49,14 @@ def new(self, rows, columns=None): return TsObject(self._client, self, rows, columns) + def describe(self): + """ + Retrieves a timeseries table's description. + + :rtype: :class:`TsObject ` + """ + return self._client.ts_describe(self) + def get(self, key): """ Gets a value from a timeseries table. diff --git a/riak/tests/test_timeseries.py b/riak/tests/test_timeseries.py index b0b28423..21a0d3b9 100644 --- a/riak/tests/test_timeseries.py +++ b/riak/tests/test_timeseries.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- import datetime import platform +import random +import string import riak.pb.riak_ts_pb2 from riak import RiakError @@ -27,7 +29,6 @@ ts1 = ts0 + fiveMins -@unittest.skipUnless(RUN_TIMESERIES, 'RUN_TIMESERIES is 0') class TimeseriesUnitTests(unittest.TestCase): def setUp(self): self.c = RiakPbcCodec() @@ -213,6 +214,58 @@ def validate_data(self, ts_obj): self.assertEqual(row[3], 'wind') self.assertIsNone(row[4]) + def test_query_that_creates_table_using_interpolation(self): + table = ''.join( + [random.choice(string.ascii_letters + string.digits) + for n in range(32)]) + query = """CREATE TABLE test-{table} ( + geohash varchar not null, + user varchar not null, + time timestamp not null, + weather varchar not null, + temperature double, + PRIMARY KEY((geohash, user, quantum(time, 15, m)), + geohash, user, time)) + """ + ts_obj = self.client.ts_query(table, query) + self.assertIsNotNone(ts_obj) + + def test_query_that_returns_table_description(self): + fmt = 'DESCRIBE {table}' + query = fmt.format(table=table_name) + ts_obj = self.client.ts_query('GeoCheckin', query) + self.assertIsNotNone(ts_obj) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) + + def test_query_that_returns_table_description_using_interpolation(self): + query = 'Describe {table}' + ts_obj = self.client.ts_query('GeoCheckin', query) + self.assertIsNotNone(ts_obj) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) + + def test_query_description_via_table(self): + query = 'describe {table}' + table = Table(self.client, 'GeoCheckin') + ts_obj = table.query(query) + self.assertIsNotNone(ts_obj) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) + + def test_get_description(self): + ts_obj = self.client.ts_describe('GeoCheckin') + self.assertIsNotNone(ts_obj) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) + + def test_get_description_via_table(self): + table = Table(self.client, 'GeoCheckin') + ts_obj = table.describe() + self.assertIsNotNone(ts_obj) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) + def test_query_that_returns_no_data(self): fmt = """ select * from {table} where @@ -225,6 +278,17 @@ def test_query_that_returns_no_data(self): self.assertEqual(len(ts_obj.columns), 0) self.assertEqual(len(ts_obj.rows), 0) + def test_query_that_returns_no_data_using_interpolation(self): + query = """ + select * from {table} where + time > 0 and time < 10 and + geohash = 'hash1' and + user = 'user1' + """ + ts_obj = self.client.ts_query('GeoCheckin', query) + self.assertEqual(len(ts_obj.columns), 0) + self.assertEqual(len(ts_obj.rows), 0) + def test_query_that_matches_some_data(self): fmt = """ select * from {table} where @@ -239,6 +303,19 @@ def test_query_that_matches_some_data(self): ts_obj = self.client.ts_query('GeoCheckin', query) self.validate_data(ts_obj) + def test_query_that_matches_some_data_using_interpolation(self): + fmt = """ + select * from {{table}} where + time > {t1} and time < {t2} and + geohash = 'hash1' and + user = 'user2' + """ + query = fmt.format( + t1=self.tenMinsAgoMsec, + t2=self.nowMsec) + ts_obj = self.client.ts_query('GeoCheckin', query) + self.validate_data(ts_obj) + def test_query_that_matches_more_data(self): fmt = """ select * from {table} where diff --git a/riak/transports/pbc/transport.py b/riak/transports/pbc/transport.py index 3ad5ae1c..53df2181 100644 --- a/riak/transports/pbc/transport.py +++ b/riak/transports/pbc/transport.py @@ -173,6 +173,10 @@ def put(self, robj, w=None, dw=None, pw=None, return_body=True, return robj + def ts_describe(self, table): + query = 'DESCRIBE {table}'.format(table=table.name) + return self.ts_query(table, query) + def ts_get(self, table, key): req = riak.pb.riak_ts_pb2.TsGetReq() self._encode_timeseries_keyreq(table, key, req) @@ -213,7 +217,12 @@ def ts_delete(self, table, key): def ts_query(self, table, query, interpolations=None): req = riak.pb.riak_ts_pb2.TsQueryReq() - req.query.base = str_to_bytes(query) + + q = query + if '{table}' in q: + q = q.format(table=table.name) + + req.query.base = str_to_bytes(q) msg_code, ts_query_resp = self._request( riak.pb.messages.MSG_CODE_TS_QUERY_REQ, req, diff --git a/riak/transports/transport.py b/riak/transports/transport.py index f9fcae6d..4f33168c 100644 --- a/riak/transports/transport.py +++ b/riak/transports/transport.py @@ -92,12 +92,30 @@ def delete(self, robj, rw=None, r=None, w=None, dw=None, pr=None, """ raise NotImplementedError + def ts_describe(self, table): + """ + Retrieves a timeseries table description. + """ + raise NotImplementedError + + def ts_get(self, table, key): + """ + Retrieves a timeseries object. + """ + raise NotImplementedError + def ts_put(self, tsobj): """ Stores a timeseries object. """ raise NotImplementedError + def ts_delete(self, table, key): + """ + Deletes a timeseries object. + """ + raise NotImplementedError + def ts_query(self, table, query, interpolations=None): """ Query timeseries data.