Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
3b20639
Get benchmarks in their own spot to add some for timeseries.
Jan 28, 2016
02f509e
Begin timeseries benchmarks
Jan 28, 2016
2baadfb
No need to limit connections to localhost.
lukebakken Jan 28, 2016
270a856
Put / Get TS benchmark complete.
lukebakken Feb 1, 2016
e99c696
Adding TTB encoding and tests.
lukebakken Feb 1, 2016
2dd27d6
Unit test for tsgetreq ttb encoding complete
lukebakken Feb 2, 2016
b251fa9
Unit test for tsputreq ttb encoding complete
lukebakken Feb 2, 2016
84f81ab
Merge branch 'master' into features/lrb/rts-842-perf-test-t2b-encoding
lukebakken Feb 22, 2016
aabd600
Merge fixes.
lukebakken Feb 22, 2016
c8e7341
Update riak_pb to origin/riak_ts-develop-1.2
Feb 24, 2016
714e7f4
TTB fix
lukebakken Feb 24, 2016
4cddfe5
Fix toggle encoding request
Feb 24, 2016
246ad6b
Add ttb integration test.
lukebakken Feb 24, 2016
c89872d
TTB is ALIVE
lukebakken Feb 24, 2016
ac1afce
TTB encode rows correctly
lukebakken Feb 25, 2016
2db4157
Add args to benchmark program.
lukebakken Feb 25, 2016
c340dd5
bump submodules
lukebakken Mar 21, 2016
fd157fb
Python 3 + TTB
Mar 28, 2016
f917696
Fix setup version check, TTB for Python 2
Mar 28, 2016
77f09de
make lint happy
Mar 28, 2016
80aac21
Add basho-erlastic as a dependency
Mar 28, 2016
5cf97fa
Use Python version comparison that uses ints instead of strings
Mar 28, 2016
65b9e2c
Merge branch 'master' into features/lrb/rts-842-perf-test-t2b-encoding
Mar 29, 2016
4325130
Improve README generation
Mar 29, 2016
8515893
Move transports/pbc dir to transports/tcp
Mar 29, 2016
cb1cffd
Test suite fixes for Windows
lukebakken Mar 29, 2016
ac43d1b
Moving code around to separate codecs from transports.
lukebakken Mar 29, 2016
0bcea33
fix test on PY2
Mar 29, 2016
dd52c94
Rewrite TCP connection class to be much more efficient.
lukebakken Mar 30, 2016
460b015
Moving code to PbufCodec
lukebakken Mar 30, 2016
22ec2a4
Standardize on kwargs for double-star parameter name.
lukebakken Mar 30, 2016
38bd286
Make linters happy.
lukebakken Mar 30, 2016
5dff949
Fix message encoding for Auth and ToggleEncoding PB requests
Mar 30, 2016
3ac26e8
Added Msg named tuple to contain data between codecs and transport la…
lukebakken Mar 30, 2016
86511b4
make linters happy
lukebakken Mar 30, 2016
47eaee1
Move more code into the codecs.
lukebakken Mar 31, 2016
da5c13a
make timeseries tests happy
Mar 31, 2016
642e4b7
All code for PB and TTB encoding / decoding now in dedicated classes.
lukebakken Apr 1, 2016
a2f6b92
Updating for TTB changes server-side.
lukebakken Apr 6, 2016
b946f0f
Continuing to work on TTB changes
lukebakken Apr 7, 2016
1eed9f2
TTB and PBUF timeseries tests working correctly.
lukebakken Apr 7, 2016
7736ae3
Decode column type the same way for ttb and pbuf
lukebakken Apr 7, 2016
051bf5c
Merge branch 'master' into features/lrb/rts-842-perf-test-t2b-encoding
lukebakken Apr 7, 2016
57daac0
More work on TTB encoding
Apr 13, 2016
4eec937
finish up Riak TS 1.3 changes
Apr 14, 2016
2e5b0aa
process rpberrorresp correctly
lukebakken Apr 19, 2016
c6ecbfb
make lint happy
Apr 22, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
README.rst
*.pyc
.python-version
__pycache__/

.tox/

.tox/

docs/_build

.*.swp
.coverage

riak-*/
py-build/
dist/

riak.egg-info/
*.egg
.eggs/

#*#
*~
*.ps1
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
include docs/*
include riak/erl_src/*
include README.md
include README.rst
include LICENSE
include RELNOTES.md
include version.py
Expand Down
3 changes: 1 addition & 2 deletions buildbot/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ compile:

lint:
@pip install --upgrade pep8 flake8
@cd ..; pep8 --exclude=riak/pb riak *.py
@cd ..; flake8 --exclude=riak/pb riak *.py

test: setup test_normal test_security
Expand All @@ -46,7 +45,7 @@ test_security:
test_timeseries:
@echo "Testing Riak Python Client (timeseries)"
@$(RIAK_ADMIN) security disable
@RIAK_TEST_PROTOCOL='pbc' RUN_YZ=0 RUN_DATATYPES=0 RUN_INDEXES=1 RUN_TIMESERIES=1 ./tox_runner.sh ..
@RIAK_TEST_PROTOCOL='pbc' RIAK_TEST_PB_PORT=8087 RUN_YZ=0 RUN_DATATYPES=0 RUN_INDEXES=1 RUN_TIMESERIES=1 ./tox_runner.sh ..

setup:
./tox_setup.sh
20 changes: 12 additions & 8 deletions docs/advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Transports

.. currentmodule:: riak.transports.transport

.. autoclass:: RiakTransport
.. autoclass:: Transport
:members:
:private-members:

Expand Down Expand Up @@ -124,20 +124,24 @@ HTTP Transport

.. currentmodule:: riak.transports.http

.. autoclass:: RiakHttpPool
.. autoclass:: HttpPool

.. autofunction:: is_retryable

.. autoclass:: RiakHttpTransport
.. autoclass:: HttpTransport
:members:

^^^^^^^^^^^^^^^^^^^^^^^^^^
Protocol Buffers Transport
^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^
TCP Transport
^^^^^^^^^^^^^

.. currentmodule:: riak.transports.tcp

.. currentmodule:: riak.transports.pbc
.. autoclass:: TcpPool

.. autofunction:: is_retryable

.. autoclass:: RiakPbcTransport
.. autoclass:: TcpTransport
:members:

---------
Expand Down
27 changes: 7 additions & 20 deletions riak/benchmark.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
"""
Copyright 2013 Basho Technologies, Inc.

This file is provided to you under the Apache License,
Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain
a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""

from __future__ import print_function

import os
import gc
import sys
import traceback

__all__ = ['measure', 'measure_with_rehearsal']

Expand Down Expand Up @@ -172,5 +157,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
elif exc_type is KeyboardInterrupt:
return False
else:
print("EXCEPTION! %r" % ((exc_type, exc_val, exc_tb),))
return True
msg = "EXCEPTION! type: %r val: %r" % (exc_type, exc_val)
print(msg, file=sys.stderr)
traceback.print_tb(exc_tb)
return True if exc_type is None else False
74 changes: 74 additions & 0 deletions riak/benchmarks/timeseries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import datetime
import random
import sys

import riak.benchmark as benchmark

from multiprocessing import cpu_count
from riak import RiakClient

# logger = logging.getLogger()
# logger.level = logging.DEBUG
# logger.addHandler(logging.StreamHandler(sys.stdout))

# batch sizes 8, 16, 32, 64, 128, 256
if len(sys.argv) != 3:
raise AssertionError(
'first arg is batch size, second arg is true / false'
'for use_ttb')

rowcount = 32768
batchsz = int(sys.argv[1])
if rowcount % batchsz != 0:
raise AssertionError('rowcount must be divisible by batchsz')
use_ttb = sys.argv[2].lower() == 'true'

epoch = datetime.datetime.utcfromtimestamp(0)
onesec = datetime.timedelta(0, 1)

weather = ['typhoon', 'hurricane', 'rain', 'wind', 'snow']
rows = []
for i in range(rowcount):
ts = datetime.datetime(2016, 1, 1, 12, 0, 0) + \
datetime.timedelta(seconds=i)
family_idx = i % batchsz
series_idx = i % batchsz
family = 'hash{:d}'.format(family_idx)
series = 'user{:d}'.format(series_idx)
w = weather[i % len(weather)]
temp = (i % 100) + random.random()
row = [family, series, ts, w, temp]
key = [family, series, ts]
rows.append(row)

print("Benchmarking timeseries:")
print(" Use TTB: {}".format(use_ttb))
print("Batch Size: {}".format(batchsz))
print(" CPUs: {}".format(cpu_count()))
print(" Rows: {}".format(len(rows)))
print()

tbl = 'GeoCheckin'
h = 'riak-test'
n = [
{'host': h, 'pb_port': 10017},
{'host': h, 'pb_port': 10027},
{'host': h, 'pb_port': 10037},
{'host': h, 'pb_port': 10047},
{'host': h, 'pb_port': 10057}
]
client = RiakClient(nodes=n, protocol='pbc',
transport_options={'use_ttb': use_ttb})
table = client.table(tbl)

with benchmark.measure() as b:
for i in (1, 2, 3):
with b.report('populate-%d' % i):
for i in range(0, rowcount, batchsz):
x = i
y = i + batchsz
r = rows[x:y]
ts_obj = table.new(r)
result = ts_obj.store()
if result is not True:
raise AssertionError("expected success")
20 changes: 10 additions & 10 deletions riak/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from riak.mapreduce import RiakMapReduceChain
from riak.resolver import default_resolver
from riak.table import Table
from riak.transports.http import RiakHttpPool
from riak.transports.pbc import RiakPbcPool
from riak.transports.http import HttpPool
from riak.transports.tcp import TcpPool
from riak.security import SecurityCreds
from riak.util import lazy_property, bytes_to_str, str_to_bytes
from six import string_types, PY2
Expand Down Expand Up @@ -68,7 +68,7 @@ class RiakClient(RiakMapReduceChain, RiakClientOperations):
PROTOCOLS = ['http', 'pbc']

def __init__(self, protocol='pbc', transport_options={}, nodes=None,
credentials=None, multiget_pool_size=None, **unused_args):
credentials=None, multiget_pool_size=None, **kwargs):
"""
Construct a new ``RiakClient`` object.

Expand All @@ -88,19 +88,19 @@ def __init__(self, protocol='pbc', transport_options={}, nodes=None,
CPUs in the system
:type multiget_pool_size: int
"""
unused_args = unused_args.copy()
kwargs = kwargs.copy()

if nodes is None:
self.nodes = [self._create_node(unused_args), ]
self.nodes = [self._create_node(kwargs), ]
else:
self.nodes = [self._create_node(n) for n in nodes]

self._multiget_pool_size = multiget_pool_size
self.protocol = protocol or 'pbc'
self._resolver = None
self._credentials = self._create_credentials(credentials)
self._http_pool = RiakHttpPool(self, **transport_options)
self._pb_pool = RiakPbcPool(self, **transport_options)
self._http_pool = HttpPool(self, **transport_options)
self._tcp_pool = TcpPool(self, **transport_options)

if PY2:
self._encoders = {'application/json': default_encoder,
Expand Down Expand Up @@ -167,7 +167,7 @@ def _get_client_id(self):
def _set_client_id(self, client_id):
for http in self._http_pool:
http.client_id = client_id
for pb in self._pb_pool:
for pb in self._tcp_pool:
pb.client_id = client_id

client_id = property(_get_client_id, _set_client_id,
Expand Down Expand Up @@ -298,8 +298,8 @@ def close(self):
"""
if self._http_pool is not None:
self._http_pool.clear()
if self._pb_pool is not None:
self._pb_pool.clear()
if self._tcp_pool is not None:
self._tcp_pool.clear()

def _create_node(self, n):
if isinstance(n, RiakNode):
Expand Down
21 changes: 2 additions & 19 deletions riak/client/multiget.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,9 @@
"""
Copyright 2013 Basho Technologies, Inc.

This file is provided to you under the Apache License,
Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain
a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""

from __future__ import print_function
from collections import namedtuple
from threading import Thread, Lock, Event
from multiprocessing import cpu_count
from six import PY2

if PY2:
from Queue import Queue
else:
Expand Down Expand Up @@ -177,8 +160,8 @@ def multiget(client, keys, **options):
:meth:`RiakBucket.get <riak.bucket.RiakBucket.get>`
:type options: dict
:rtype: list

"""

outq = Queue()

if 'pool' in options:
Expand Down
25 changes: 4 additions & 21 deletions riak/client/transport.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,6 @@
"""
Copyright 2012 Basho Technologies, Inc.

This file is provided to you under the Apache License,
Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain
a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
from contextlib import contextmanager
from riak.transports.pool import BadResource
from riak.transports.pbc import is_retryable as is_pbc_retryable
from riak.transports.tcp import is_retryable as is_pbc_retryable
from riak.transports.http import is_retryable as is_http_retryable
import threading
from six import PY2
Expand Down Expand Up @@ -49,7 +32,7 @@ class RiakClientTransport(object):
# These will be set or redefined by the RiakClient initializer
protocol = 'pbc'
_http_pool = None
_pb_pool = None
_tcp_pool = None
_locals = _client_locals()

def _get_retry_count(self):
Expand Down Expand Up @@ -163,8 +146,8 @@ def _choose_pool(self, protocol=None):
protocol = self.protocol
if protocol == 'http':
pool = self._http_pool
elif protocol == 'pbc':
pool = self._pb_pool
elif protocol == 'tcp' or protocol == 'pbc':
pool = self._tcp_pool
else:
raise ValueError("invalid protocol %s" % protocol)
return pool
Expand Down
25 changes: 25 additions & 0 deletions riak/codecs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import collections

from riak import RiakError

Msg = collections.namedtuple('Msg',
['msg_code', 'data', 'resp_code'],
verbose=False)


class Codec(object):
def parse_msg(self):
raise NotImplementedError('parse_msg not implemented')

def maybe_incorrect_code(self, resp_code, expect=None):
if expect and resp_code != expect:
raise RiakError("unexpected message code: %d, expected %d"
% (resp_code, expect))

def maybe_riak_error(self, err_code, msg_code, data=None):
if msg_code == err_code:
if data is None:
raise RiakError('no error provided!')
return data
else:
return None
Loading