Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions buildbot/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ preconfigure:
configure:
@../setup.py configure --riak-admin=$(RIAK_ADMIN)

configure_timeseries:
@../setup.py setup_timeseries --riak-admin=$(RIAK_ADMIN)

compile:
@echo NO-OP

Expand Down
2 changes: 2 additions & 0 deletions docs/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,12 @@ Key-level Operations
Timeseries Operations
--------------------

.. automethod:: RiakClient.ts_describe
.. automethod:: RiakClient.ts_get
.. automethod:: RiakClient.ts_put
.. automethod:: RiakClient.ts_delete
.. automethod:: RiakClient.ts_query
.. automethod:: RiakClient.ts_stream_keys

----------------
Query Operations
Expand Down
19 changes: 19 additions & 0 deletions riak/client/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,25 @@ def put(self, transport, robj, w=None, dw=None, pw=None, return_body=None,
if_none_match=if_none_match,
timeout=timeout)

@retryable
def ts_describe(self, transport, table):
"""
ts_describe(table)

Retrieve a time series table description from the Riak cluster.

.. note:: This request is automatically retried :attr:`retries`
times if it fails due to network error.

:param table: The timeseries table.
:type table: string or :class:`Table <riak.table.Table>`
:rtype: :class:`TsObject <riak.ts_object.TsObject>`
"""
t = table
if isinstance(t, string_types):
t = Table(self, table)
return transport.ts_describe(t)

@retryable
def ts_get(self, transport, table, key):
"""
Expand Down
8 changes: 8 additions & 0 deletions riak/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ def new(self, rows, columns=None):

return TsObject(self._client, self, rows, columns)

def describe(self):
"""
Retrieves a timeseries table's description.

:rtype: :class:`TsObject <riak.ts_object.TsObject>`
"""
return self._client.ts_describe(self)

def get(self, key):
"""
Gets a value from a timeseries table.
Expand Down
79 changes: 78 additions & 1 deletion riak/tests/test_timeseries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import datetime
import platform
import random
import string
import riak.pb.riak_ts_pb2

from riak import RiakError
Expand All @@ -27,7 +29,6 @@
ts1 = ts0 + fiveMins


@unittest.skipUnless(RUN_TIMESERIES, 'RUN_TIMESERIES is 0')
class TimeseriesUnitTests(unittest.TestCase):
def setUp(self):
self.c = RiakPbcCodec()
Expand Down Expand Up @@ -213,6 +214,58 @@ def validate_data(self, ts_obj):
self.assertEqual(row[3], 'wind')
self.assertIsNone(row[4])

def test_query_that_creates_table_using_interpolation(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is this interpolation because of the random table names?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because I have {table} in the query string, but do not use fmt to replace it before passing to ts_query. Within the code for queries, it will check for {table} and, if present, insert the table name.

Note that ts_query takes a table parameter than can be a Table object or a string. Either can provide the name of the table so it is convenient to replace the table name for the user.

See my comment here:

https://github.com/basho/riak-python-client/pull/422/files#r48791174

table = ''.join(
[random.choice(string.ascii_letters + string.digits)
for n in range(32)])
query = """CREATE TABLE test-{table} (
geohash varchar not null,
user varchar not null,
time timestamp not null,
weather varchar not null,
temperature double,
PRIMARY KEY((geohash, user, quantum(time, 15, m)),
geohash, user, time))
"""
ts_obj = self.client.ts_query(table, query)
self.assertIsNotNone(ts_obj)

def test_query_that_returns_table_description(self):
fmt = 'DESCRIBE {table}'
query = fmt.format(table=table_name)
ts_obj = self.client.ts_query('GeoCheckin', query)
self.assertIsNotNone(ts_obj)
self.assertEqual(len(ts_obj.columns), 5)
self.assertEqual(len(ts_obj.rows), 5)

def test_query_that_returns_table_description_using_interpolation(self):
query = 'Describe {table}'
ts_obj = self.client.ts_query('GeoCheckin', query)
self.assertIsNotNone(ts_obj)
self.assertEqual(len(ts_obj.columns), 5)
self.assertEqual(len(ts_obj.rows), 5)

def test_query_description_via_table(self):
query = 'describe {table}'
table = Table(self.client, 'GeoCheckin')
ts_obj = table.query(query)
self.assertIsNotNone(ts_obj)
self.assertEqual(len(ts_obj.columns), 5)
self.assertEqual(len(ts_obj.rows), 5)

def test_get_description(self):
ts_obj = self.client.ts_describe('GeoCheckin')
self.assertIsNotNone(ts_obj)
self.assertEqual(len(ts_obj.columns), 5)
self.assertEqual(len(ts_obj.rows), 5)

def test_get_description_via_table(self):
table = Table(self.client, 'GeoCheckin')
ts_obj = table.describe()
self.assertIsNotNone(ts_obj)
self.assertEqual(len(ts_obj.columns), 5)
self.assertEqual(len(ts_obj.rows), 5)

def test_query_that_returns_no_data(self):
fmt = """
select * from {table} where
Expand All @@ -225,6 +278,17 @@ def test_query_that_returns_no_data(self):
self.assertEqual(len(ts_obj.columns), 0)
self.assertEqual(len(ts_obj.rows), 0)

def test_query_that_returns_no_data_using_interpolation(self):
query = """
select * from {table} where
time > 0 and time < 10 and
geohash = 'hash1' and
user = 'user1'
"""
ts_obj = self.client.ts_query('GeoCheckin', query)
self.assertEqual(len(ts_obj.columns), 0)
self.assertEqual(len(ts_obj.rows), 0)

def test_query_that_matches_some_data(self):
fmt = """
select * from {table} where
Expand All @@ -239,6 +303,19 @@ def test_query_that_matches_some_data(self):
ts_obj = self.client.ts_query('GeoCheckin', query)
self.validate_data(ts_obj)

def test_query_that_matches_some_data_using_interpolation(self):
fmt = """
select * from {{table}} where
time > {t1} and time < {t2} and
geohash = 'hash1' and
user = 'user2'
"""
query = fmt.format(
t1=self.tenMinsAgoMsec,
t2=self.nowMsec)
ts_obj = self.client.ts_query('GeoCheckin', query)
self.validate_data(ts_obj)

def test_query_that_matches_more_data(self):
fmt = """
select * from {table} where
Expand Down
11 changes: 10 additions & 1 deletion riak/transports/pbc/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ def put(self, robj, w=None, dw=None, pw=None, return_body=True,

return robj

def ts_describe(self, table):
query = 'DESCRIBE {table}'.format(table=table.name)
return self.ts_query(table, query)

def ts_get(self, table, key):
req = riak.pb.riak_ts_pb2.TsGetReq()
self._encode_timeseries_keyreq(table, key, req)
Expand Down Expand Up @@ -213,7 +217,12 @@ def ts_delete(self, table, key):

def ts_query(self, table, query, interpolations=None):
req = riak.pb.riak_ts_pb2.TsQueryReq()
req.query.base = str_to_bytes(query)

q = query
if '{table}' in q:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new feature that I don't necessarily have to include. It eliminates the redundancy of specifying the table name as the first parameter to ts_query() as well as within the query string. However, if the query string is itself a format string, the user must remember to escape the parameter via {{table}} as you can see in the tests.

q = q.format(table=table.name)

req.query.base = str_to_bytes(q)

msg_code, ts_query_resp = self._request(
riak.pb.messages.MSG_CODE_TS_QUERY_REQ, req,
Expand Down
18 changes: 18 additions & 0 deletions riak/transports/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,30 @@ def delete(self, robj, rw=None, r=None, w=None, dw=None, pr=None,
"""
raise NotImplementedError

def ts_describe(self, table):
"""
Retrieves a timeseries table description.
"""
raise NotImplementedError

def ts_get(self, table, key):
"""
Retrieves a timeseries object.
"""
raise NotImplementedError

def ts_put(self, tsobj):
"""
Stores a timeseries object.
"""
raise NotImplementedError

def ts_delete(self, table, key):
"""
Deletes a timeseries object.
"""
raise NotImplementedError

def ts_query(self, table, query, interpolations=None):
"""
Query timeseries data.
Expand Down