From 9eedc47b27778089c386d6b177f594c01716ccf6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 4 Jan 2016 13:46:05 -0800 Subject: [PATCH 1/6] Add tests to ensure DESCRIBE statement returns columns and rows. Add feature to interpolate table name in queries if "{table}" is in query string. --- riak/tests/test_timeseries.py | 47 ++++++++++++++++++++++++++++++++ riak/transports/pbc/transport.py | 7 ++++- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/riak/tests/test_timeseries.py b/riak/tests/test_timeseries.py index b0b28423..8261f170 100644 --- a/riak/tests/test_timeseries.py +++ b/riak/tests/test_timeseries.py @@ -213,6 +213,29 @@ def validate_data(self, ts_obj): self.assertEqual(row[3], 'wind') self.assertIsNone(row[4]) + def test_query_that_returns_table_description(self): + fmt = 'DESCRIBE {table}' + query = fmt.format(table=table_name) + ts_obj = self.client.ts_query('GeoCheckin', query) + self.assertIsNotNone(ts_obj) + self.assertGreater(len(ts_obj.columns), 0) + self.assertGreater(len(ts_obj.rows), 0) + + def test_query_that_returns_table_description_using_interpolation(self): + query = 'Describe {table}' + ts_obj = self.client.ts_query('GeoCheckin', query) + self.assertIsNotNone(ts_obj) + self.assertGreater(len(ts_obj.columns), 0) + self.assertGreater(len(ts_obj.rows), 0) + + def test_query_description_via_table(self): + query = 'describe {table}' + table = Table(self.client, 'GeoCheckin') + ts_obj = table.query(query) + self.assertIsNotNone(ts_obj) + self.assertGreater(len(ts_obj.columns), 0) + self.assertGreater(len(ts_obj.rows), 0) + def test_query_that_returns_no_data(self): fmt = """ select * from {table} where @@ -225,6 +248,17 @@ def test_query_that_returns_no_data(self): self.assertEqual(len(ts_obj.columns), 0) self.assertEqual(len(ts_obj.rows), 0) + def test_query_that_returns_no_data_using_interpolation(self): + query = """ + select * from {table} where + time > 0 and time < 10 and + geohash = 'hash1' and + user = 'user1' + """ + ts_obj = self.client.ts_query('GeoCheckin', query) + self.assertEqual(len(ts_obj.columns), 0) + self.assertEqual(len(ts_obj.rows), 0) + def test_query_that_matches_some_data(self): fmt = """ select * from {table} where @@ -239,6 +273,19 @@ def test_query_that_matches_some_data(self): ts_obj = self.client.ts_query('GeoCheckin', query) self.validate_data(ts_obj) + def test_query_that_matches_some_data_using_interpolation(self): + fmt = """ + select * from {table} where + time > {t1} and time < {t2} and + geohash = 'hash1' and + user = 'user2' + """ + query = fmt.format( + t1=self.tenMinsAgoMsec, + t2=self.nowMsec) + ts_obj = self.client.ts_query('GeoCheckin', query) + self.validate_data(ts_obj) + def test_query_that_matches_more_data(self): fmt = """ select * from {table} where diff --git a/riak/transports/pbc/transport.py b/riak/transports/pbc/transport.py index 3ad5ae1c..9960cabb 100644 --- a/riak/transports/pbc/transport.py +++ b/riak/transports/pbc/transport.py @@ -213,7 +213,12 @@ def ts_delete(self, table, key): def ts_query(self, table, query, interpolations=None): req = riak.pb.riak_ts_pb2.TsQueryReq() - req.query.base = str_to_bytes(query) + + q = query + if '{table}' in q: + q = q.format(table=table.name) + + req.query.base = str_to_bytes(q) msg_code, ts_query_resp = self._request( riak.pb.messages.MSG_CODE_TS_QUERY_REQ, req, From fa12c25b6e4be47f7a636e141d14077815dbd394 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 4 Jan 2016 14:30:16 -0800 Subject: [PATCH 2/6] Add methods to retrieve a table description via client object and table object --- docs/client.rst | 2 ++ riak/client/operations.py | 19 +++++++++++++++++++ riak/table.py | 8 ++++++++ riak/tests/test_timeseries.py | 15 ++++++++++++++- riak/transports/pbc/transport.py | 4 ++++ riak/transports/transport.py | 18 ++++++++++++++++++ 6 files changed, 65 insertions(+), 1 deletion(-) diff --git a/docs/client.rst b/docs/client.rst index ae2f8e54..f014afd9 100644 --- a/docs/client.rst +++ b/docs/client.rst @@ -127,10 +127,12 @@ Key-level Operations Timeseries Operations -------------------- +.. automethod:: RiakClient.ts_describe .. automethod:: RiakClient.ts_get .. automethod:: RiakClient.ts_put .. automethod:: RiakClient.ts_delete .. automethod:: RiakClient.ts_query +.. automethod:: RiakClient.ts_stream_keys ---------------- Query Operations diff --git a/riak/client/operations.py b/riak/client/operations.py index aaecae7d..d3541b3c 100644 --- a/riak/client/operations.py +++ b/riak/client/operations.py @@ -536,6 +536,25 @@ def put(self, transport, robj, w=None, dw=None, pw=None, return_body=None, if_none_match=if_none_match, timeout=timeout) + @retryable + def ts_describe(self, transport, table): + """ + ts_describe(table) + + Retrieve a time series table description from the Riak cluster. + + .. note:: This request is automatically retried :attr:`retries` + times if it fails due to network error. + + :param table: The timeseries table. + :type table: string or :class:`Table ` + :rtype: :class:`TsObject ` + """ + t = table + if isinstance(t, string_types): + t = Table(self, table) + return transport.ts_describe(t) + @retryable def ts_get(self, transport, table, key): """ diff --git a/riak/table.py b/riak/table.py index c477a32b..d026bf18 100644 --- a/riak/table.py +++ b/riak/table.py @@ -49,6 +49,14 @@ def new(self, rows, columns=None): return TsObject(self._client, self, rows, columns) + def describe(self): + """ + Retrieves a timeseries table's description. + + :rtype: :class:`TsObject ` + """ + return self._client.ts_describe(self) + def get(self, key): """ Gets a value from a timeseries table. diff --git a/riak/tests/test_timeseries.py b/riak/tests/test_timeseries.py index 8261f170..5560512d 100644 --- a/riak/tests/test_timeseries.py +++ b/riak/tests/test_timeseries.py @@ -236,6 +236,19 @@ def test_query_description_via_table(self): self.assertGreater(len(ts_obj.columns), 0) self.assertGreater(len(ts_obj.rows), 0) + def test_get_description(self): + ts_obj = self.client.ts_describe('GeoCheckin') + self.assertIsNotNone(ts_obj) + self.assertGreater(len(ts_obj.columns), 0) + self.assertGreater(len(ts_obj.rows), 0) + + def test_get_description_via_table(self): + table = Table(self.client, 'GeoCheckin') + ts_obj = table.describe() + self.assertIsNotNone(ts_obj) + self.assertGreater(len(ts_obj.columns), 0) + self.assertGreater(len(ts_obj.rows), 0) + def test_query_that_returns_no_data(self): fmt = """ select * from {table} where @@ -275,7 +288,7 @@ def test_query_that_matches_some_data(self): def test_query_that_matches_some_data_using_interpolation(self): fmt = """ - select * from {table} where + select * from {{table}} where time > {t1} and time < {t2} and geohash = 'hash1' and user = 'user2' diff --git a/riak/transports/pbc/transport.py b/riak/transports/pbc/transport.py index 9960cabb..53df2181 100644 --- a/riak/transports/pbc/transport.py +++ b/riak/transports/pbc/transport.py @@ -173,6 +173,10 @@ def put(self, robj, w=None, dw=None, pw=None, return_body=True, return robj + def ts_describe(self, table): + query = 'DESCRIBE {table}'.format(table=table.name) + return self.ts_query(table, query) + def ts_get(self, table, key): req = riak.pb.riak_ts_pb2.TsGetReq() self._encode_timeseries_keyreq(table, key, req) diff --git a/riak/transports/transport.py b/riak/transports/transport.py index f9fcae6d..4f33168c 100644 --- a/riak/transports/transport.py +++ b/riak/transports/transport.py @@ -92,12 +92,30 @@ def delete(self, robj, rw=None, r=None, w=None, dw=None, pr=None, """ raise NotImplementedError + def ts_describe(self, table): + """ + Retrieves a timeseries table description. + """ + raise NotImplementedError + + def ts_get(self, table, key): + """ + Retrieves a timeseries object. + """ + raise NotImplementedError + def ts_put(self, tsobj): """ Stores a timeseries object. """ raise NotImplementedError + def ts_delete(self, table, key): + """ + Deletes a timeseries object. + """ + raise NotImplementedError + def ts_query(self, table, query, interpolations=None): """ Query timeseries data. From a3929635b3b88cb88543bbebc26c515bdfa5bba5 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 5 Jan 2016 10:06:47 -0800 Subject: [PATCH 3/6] Add test to create a TS table via the query interface --- riak/tests/test_timeseries.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/riak/tests/test_timeseries.py b/riak/tests/test_timeseries.py index 5560512d..cef6406b 100644 --- a/riak/tests/test_timeseries.py +++ b/riak/tests/test_timeseries.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- import datetime import platform +import random +import string import riak.pb.riak_ts_pb2 from riak import RiakError @@ -27,7 +29,6 @@ ts1 = ts0 + fiveMins -@unittest.skipUnless(RUN_TIMESERIES, 'RUN_TIMESERIES is 0') class TimeseriesUnitTests(unittest.TestCase): def setUp(self): self.c = RiakPbcCodec() @@ -213,6 +214,22 @@ def validate_data(self, ts_obj): self.assertEqual(row[3], 'wind') self.assertIsNone(row[4]) + def test_query_that_creates_table_using_interpolation(self): + table = ''.join( + [random.choice(string.ascii_letters + string.digits) + for n in range(32)]) + query = """CREATE TABLE {table} ( + geohash varchar not null, + user varchar not null, + time timestamp not null, + weather varchar not null, + temperature double, + PRIMARY KEY((geohash, user, quantum(time, 15, m)), + geohash, user, time)) + """ + ts_obj = self.client.ts_query(table, query) + self.assertIsNotNone(ts_obj) + def test_query_that_returns_table_description(self): fmt = 'DESCRIBE {table}' query = fmt.format(table=table_name) From 44721f8273cf6c3d3be3466382b15e491fb2553b Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 5 Jan 2016 16:56:28 -0800 Subject: [PATCH 4/6] Validate the rows and columns returned from a DESCRIBE query --- riak/tests/test_timeseries.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/riak/tests/test_timeseries.py b/riak/tests/test_timeseries.py index cef6406b..d819d146 100644 --- a/riak/tests/test_timeseries.py +++ b/riak/tests/test_timeseries.py @@ -235,36 +235,36 @@ def test_query_that_returns_table_description(self): query = fmt.format(table=table_name) ts_obj = self.client.ts_query('GeoCheckin', query) self.assertIsNotNone(ts_obj) - self.assertGreater(len(ts_obj.columns), 0) - self.assertGreater(len(ts_obj.rows), 0) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) def test_query_that_returns_table_description_using_interpolation(self): query = 'Describe {table}' ts_obj = self.client.ts_query('GeoCheckin', query) self.assertIsNotNone(ts_obj) - self.assertGreater(len(ts_obj.columns), 0) - self.assertGreater(len(ts_obj.rows), 0) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) def test_query_description_via_table(self): query = 'describe {table}' table = Table(self.client, 'GeoCheckin') ts_obj = table.query(query) self.assertIsNotNone(ts_obj) - self.assertGreater(len(ts_obj.columns), 0) - self.assertGreater(len(ts_obj.rows), 0) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) def test_get_description(self): ts_obj = self.client.ts_describe('GeoCheckin') self.assertIsNotNone(ts_obj) - self.assertGreater(len(ts_obj.columns), 0) - self.assertGreater(len(ts_obj.rows), 0) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) def test_get_description_via_table(self): table = Table(self.client, 'GeoCheckin') ts_obj = table.describe() self.assertIsNotNone(ts_obj) - self.assertGreater(len(ts_obj.columns), 0) - self.assertGreater(len(ts_obj.rows), 0) + self.assertEqual(len(ts_obj.columns), 5) + self.assertEqual(len(ts_obj.rows), 5) def test_query_that_returns_no_data(self): fmt = """ From f33059d74856b4599ba7b92942b8ac3298a853fb Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 8 Jan 2016 12:32:52 -0800 Subject: [PATCH 5/6] Ensure table name starts with alpha char --- riak/tests/test_timeseries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/riak/tests/test_timeseries.py b/riak/tests/test_timeseries.py index d819d146..21a0d3b9 100644 --- a/riak/tests/test_timeseries.py +++ b/riak/tests/test_timeseries.py @@ -218,7 +218,7 @@ def test_query_that_creates_table_using_interpolation(self): table = ''.join( [random.choice(string.ascii_letters + string.digits) for n in range(32)]) - query = """CREATE TABLE {table} ( + query = """CREATE TABLE test-{table} ( geohash varchar not null, user varchar not null, time timestamp not null, From 6f169a7bd58fa0de3d8152c0239f34dde6ac2691 Mon Sep 17 00:00:00 2001 From: Brett Hazen Date: Mon, 11 Jan 2016 16:58:46 +0000 Subject: [PATCH 6/6] Add convenience Makefile target configure_timeseries in buildbot --- buildbot/Makefile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/buildbot/Makefile b/buildbot/Makefile index a32f3910..fd0ee0ed 100644 --- a/buildbot/Makefile +++ b/buildbot/Makefile @@ -13,6 +13,9 @@ preconfigure: configure: @../setup.py configure --riak-admin=$(RIAK_ADMIN) +configure_timeseries: + @../setup.py setup_timeseries --riak-admin=$(RIAK_ADMIN) + compile: @echo NO-OP