diff --git a/README.rst b/README.rst index 9007e3618..f4966c86c 100644 --- a/README.rst +++ b/README.rst @@ -243,6 +243,7 @@ Trace Exporter Stats Exporter -------------- +- `OCAgent`_ - `Prometheus`_ - `Stackdriver`_ diff --git a/contrib/opencensus-ext-google-cloud-clientlibs/tests/test_google_cloud_clientlibs_trace.py b/contrib/opencensus-ext-google-cloud-clientlibs/tests/test_google_cloud_clientlibs_trace.py index aacbcd5a4..fa10ba9f5 100644 --- a/contrib/opencensus-ext-google-cloud-clientlibs/tests/test_google_cloud_clientlibs_trace.py +++ b/contrib/opencensus-ext-google-cloud-clientlibs/tests/test_google_cloud_clientlibs_trace.py @@ -15,11 +15,18 @@ import unittest import mock +import grpc from opencensus.ext.google_cloud_clientlibs import trace class Test_google_cloud_clientlibs_trace(unittest.TestCase): + def setUp(self): + self._insecure_channel_func = getattr(grpc, 'insecure_channel') + + def tearDown(self): + setattr(grpc, 'insecure_channel', self._insecure_channel_func) + def test_trace_integration(self): mock_trace_grpc = mock.Mock() mock_trace_http = mock.Mock() diff --git a/contrib/opencensus-ext-grpc/CHANGELOG.md b/contrib/opencensus-ext-grpc/CHANGELOG.md index b8dd1799c..8aeb3a51c 100644 --- a/contrib/opencensus-ext-grpc/CHANGELOG.md +++ b/contrib/opencensus-ext-grpc/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Create WrappedResponseIterator for intercepted bi-directional rpc stream. 
## 0.1.1 Released 2019-04-08 diff --git a/contrib/opencensus-ext-grpc/opencensus/ext/grpc/client_interceptor.py b/contrib/opencensus-ext-grpc/opencensus/ext/grpc/client_interceptor.py index 945972867..4fa9ccdf5 100644 --- a/contrib/opencensus-ext-grpc/opencensus/ext/grpc/client_interceptor.py +++ b/contrib/opencensus-ext-grpc/opencensus/ext/grpc/client_interceptor.py @@ -181,17 +181,9 @@ def intercept_unary_stream( request_iterator=iter((request,)), grpc_type=oc_grpc.UNARY_STREAM) - response_it = continuation( - new_details, - next(new_request_iterator)) - response_it = grpc_utils.wrap_iter_with_message_events( - request_or_response_iter=response_it, - span=current_span, - message_event_type=time_event.Type.RECEIVED - ) - response_it = grpc_utils.wrap_iter_with_end_span(response_it) - - return response_it + return grpc_utils.WrappedResponseIterator( + continuation(new_details, next(new_request_iterator)), + current_span) def intercept_stream_unary( self, continuation, client_call_details, request_iterator @@ -225,17 +217,8 @@ def intercept_stream_stream( request_iterator=request_iterator, grpc_type=oc_grpc.STREAM_STREAM) - response_it = continuation( - new_details, - new_request_iterator) - response_it = grpc_utils.wrap_iter_with_message_events( - request_or_response_iter=response_it, - span=current_span, - message_event_type=time_event.Type.RECEIVED - ) - response_it = grpc_utils.wrap_iter_with_end_span(response_it) - - return response_it + return grpc_utils.WrappedResponseIterator( + continuation(new_details, new_request_iterator), current_span) def _get_span_name(client_call_details): diff --git a/contrib/opencensus-ext-grpc/opencensus/ext/grpc/utils.py b/contrib/opencensus-ext-grpc/opencensus/ext/grpc/utils.py index 7028d4ffd..b50b0ab22 100644 --- a/contrib/opencensus-ext-grpc/opencensus/ext/grpc/utils.py +++ b/contrib/opencensus-ext-grpc/opencensus/ext/grpc/utils.py @@ -1,7 +1,9 @@ from datetime import datetime -from opencensus.trace import time_event +from 
grpc.framework.foundation import future +from grpc.framework.interfaces.face import face from opencensus.trace import execution_context +from opencensus.trace import time_event def add_message_event(proto_message, span, message_event_type, message_id=1): @@ -43,3 +45,93 @@ def wrap_iter_with_end_span(response_iter): for response in response_iter: yield response execution_context.get_opencensus_tracer().end_span() + + +class WrappedResponseIterator(future.Future, face.Call): + """Wraps the rpc response iterator. + + The grpc.StreamStreamClientInterceptor abstract class states stream + interceptor method should return an object that's both a call (implementing + the response iterator) and a future. Thus, this class is a thin wrapper + around the rpc response to provide the opencensus extension. + + :type iterator: (future.Future, face.Call) + :param iterator: rpc response iterator + + :type span: opencensus.trace.Span + :param span: rpc span + """ + def __init__(self, iterator, span): + self._iterator = iterator + self._span = span + + self._messages_received = 0 + + def add_done_callback(self, fn): + self._iterator.add_done_callback(lambda ignored_callback: fn(self)) + + def __iter__(self): + return self + + def __next__(self): + try: + message = next(self._iterator) + except StopIteration: + execution_context.get_opencensus_tracer().end_span() + raise + + self._messages_received += 1 + add_message_event( + proto_message=message, + span=self._span, + message_event_type=time_event.Type.RECEIVED, + message_id=self._messages_received) + return message + + def next(self): + return self.__next__() + + def cancel(self): + return self._iterator.cancel() + + def is_active(self): + return self._iterator.is_active() + + def cancelled(self): + raise NotImplementedError() # pragma: NO COVER + + def running(self): + raise NotImplementedError() # pragma: NO COVER + + def done(self): + raise NotImplementedError() # pragma: NO COVER + + def result(self, timeout=None): + raise 
NotImplementedError() # pragma: NO COVER + + def exception(self, timeout=None): + raise NotImplementedError() # pragma: NO COVER + + def traceback(self, timeout=None): + raise NotImplementedError() # pragma: NO COVER + + def initial_metadata(self): + raise NotImplementedError() # pragma: NO COVER + + def terminal_metadata(self): + raise NotImplementedError() # pragma: NO COVER + + def code(self): + raise NotImplementedError() # pragma: NO COVER + + def details(self): + raise NotImplementedError() # pragma: NO COVER + + def time_remaining(self): + raise NotImplementedError() # pragma: NO COVER + + def add_abortion_callback(self, abortion_callback): + raise NotImplementedError() # pragma: NO COVER + + def protocol_context(self): + raise NotImplementedError() # pragma: NO COVER diff --git a/contrib/opencensus-ext-grpc/tests/test_client_interceptor.py b/contrib/opencensus-ext-grpc/tests/test_client_interceptor.py index b95bceaca..e45096bb5 100644 --- a/contrib/opencensus-ext-grpc/tests/test_client_interceptor.py +++ b/contrib/opencensus-ext-grpc/tests/test_client_interceptor.py @@ -12,9 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import collections +import mock +import threading import unittest -import mock +from google.api_core import bidi +from google.protobuf import proto_builder +from grpc.framework.foundation import logging_pool +import grpc from opencensus.ext.grpc import client_interceptor from opencensus.trace import execution_context @@ -282,6 +288,108 @@ def test_intercept_stream_stream_not_trace(self): self.assertFalse(mock_tracer.end_span.called) +class TestGrpcInterface(unittest.TestCase): + + def setUp(self): + self._server = _start_server() + self._port = self._server.add_insecure_port('[::]:0') + self._channel = grpc.insecure_channel('localhost:%d' % self._port) + + def tearDown(self): + self._server.stop(None) + self._channel.close() + + def _intercepted_channel(self, tracer=None): + return grpc.intercept_channel( + self._channel, + client_interceptor.OpenCensusClientInterceptor(tracer=tracer)) + + def test_bidi_rpc_stream(self): + event = threading.Event() + + def _helper(request_iterator, context): + counter = 0 + for _ in request_iterator: + counter += 1 + if counter == 2: + event.set() + yield + + self._server.add_generic_rpc_handlers( + (StreamStreamRpcHandler(_helper),)) + self._server.start() + + rpc = bidi.BidiRpc( + self._intercepted_channel().stream_stream( + '', EmptyMessage.SerializeToString), + initial_request=EmptyMessage()) + done_event = threading.Event() + rpc.add_done_callback(lambda _: done_event.set()) + + rpc.open() + rpc.send(EmptyMessage()) + self.assertTrue(event.wait(timeout=1)) + rpc.close() + self.assertTrue(done_event.wait(timeout=1)) + + @mock.patch('opencensus.trace.execution_context.get_opencensus_tracer') + def test_close_span_on_done(self, mock_tracer): + def _helper(request_iterator, context): + for _ in request_iterator: + yield EmptyMessage() + yield + + self._server.add_generic_rpc_handlers( + (StreamStreamRpcHandler(_helper), )) + self._server.start() + + mock_tracer.return_value = mock_tracer + rpc = 
self._intercepted_channel(NoopTracer()).stream_stream( + method='', + request_serializer=EmptyMessage.SerializeToString, + response_deserializer=EmptyMessage.FromString)(iter( + [EmptyMessage()])) + + for resp in rpc: + pass + + self.assertEqual(mock_tracer.end_span.call_count, 1) + + +EmptyMessage = proto_builder.MakeSimpleProtoClass( + collections.OrderedDict([]), + full_name='tests.test_client_interceptor.EmptyMessage') + + +def _start_server(): + """Starts an insecure grpc server.""" + return grpc.server(logging_pool.pool(max_workers=1), + options=(('grpc.so_reuseport', 0), )) + + +class StreamStreamMethodHandler(grpc.RpcMethodHandler): + + def __init__(self, stream_handler_func): + self.request_streaming = True + self.response_streaming = True + self.request_deserializer = None + self.response_serializer = EmptyMessage.SerializeToString + self.unary_unary = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = stream_handler_func + + +class StreamStreamRpcHandler(grpc.GenericRpcHandler): + + def __init__(self, stream_stream_handler): + self._stream_stream_handler = stream_stream_handler + + def service(self, handler_call_details): + resp = StreamStreamMethodHandler(self._stream_stream_handler) + return resp + + class MockTracer(object): def __init__(self, current_span): self.current_span = current_span diff --git a/contrib/opencensus-ext-grpc/tests/test_server_interceptor.py b/contrib/opencensus-ext-grpc/tests/test_server_interceptor.py index 464593e2a..b5f3cad8d 100644 --- a/contrib/opencensus-ext-grpc/tests/test_server_interceptor.py +++ b/contrib/opencensus-ext-grpc/tests/test_server_interceptor.py @@ -18,6 +18,7 @@ from google.rpc import code_pb2 from opencensus.ext.grpc import server_interceptor +from opencensus.ext.grpc import utils as grpc_utils from opencensus.trace import execution_context from opencensus.trace import span as span_module @@ -149,6 +150,17 @@ def test_intercept_handler_exception(self): 
self.assertEqual(current_span.status.code, code_pb2.UNKNOWN) self.assertEqual(current_span.status.message, 'Test') + @mock.patch( + 'opencensus.trace.execution_context.get_opencensus_tracer') + def test_resp_streaming_span_end(self, mock_tracer): + mock_tracer.return_value = mock_tracer + + it = grpc_utils.wrap_iter_with_end_span(iter(['test'])) + for i in it: + pass + + self.assertEqual(mock_tracer.end_span.call_count, 1) + def test__wrap_rpc_behavior_none(self): new_handler = server_interceptor._wrap_rpc_behavior(None, lambda: None) self.assertEqual(new_handler, None) diff --git a/contrib/opencensus-ext-ocagent/CHANGELOG.md b/contrib/opencensus-ext-ocagent/CHANGELOG.md index b14289750..064bf6a17 100644 --- a/contrib/opencensus-ext-ocagent/CHANGELOG.md +++ b/contrib/opencensus-ext-ocagent/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Add stats exporter ## 0.2.0 Released 2019-04-08 diff --git a/contrib/opencensus-ext-ocagent/README.rst b/contrib/opencensus-ext-ocagent/README.rst index 5d74e604d..c2cd222b3 100644 --- a/contrib/opencensus-ext-ocagent/README.rst +++ b/contrib/opencensus-ext-ocagent/README.rst @@ -1,4 +1,4 @@ -OpenCensus OC-Agent Trace Exporter +OpenCensus OC-Agent Exporter ============================================================================ |pypi| @@ -16,6 +16,13 @@ Installation Usage ----- +Stats +~~~~~ + .. 
code:: python - # TBD + from opencensus.ext.ocagent import stats_exporter as ocagent_stats_exporter + + ocagent_stats_exporter.new_stats_exporter( + service_name='service_name', + endpoint='localhost:55678') diff --git a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/stats_exporter/__init__.py b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/stats_exporter/__init__.py new file mode 100644 index 000000000..afb870c4d --- /dev/null +++ b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/stats_exporter/__init__.py @@ -0,0 +1,261 @@ +# Copyright 2018, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from google.api_core import bidi +from opencensus.common.monitored_resource import monitored_resource +from opencensus.ext.ocagent import utils +from opencensus.metrics import transport +from opencensus.metrics.export import metric_descriptor +from opencensus.metrics.export import value +from opencensus.proto.agent.metrics.v1 import metrics_service_pb2 +from opencensus.proto.agent.metrics.v1 import metrics_service_pb2_grpc +from opencensus.proto.metrics.v1 import metrics_pb2 +from opencensus.proto.resource.v1 import resource_pb2 +from opencensus.stats import stats +import grpc + + +class StatsExporter(object): + """Stats exporter for an opencensus metrics grpc service. 
+ + :type rpc_handler: ExportRpcHandler + :param rpc_handler: export rpc handler + """ + + def __init__(self, rpc_handler): + self._rpc_handler = rpc_handler + + def export_metrics(self, metrics): + """ Exports given metrics to target metric service. + """ + metric_protos = [] + for metric in metrics: + metric_protos.append(_get_metric_proto(metric)) + + self._rpc_handler.send( + metrics_service_pb2.ExportMetricsServiceRequest( + metrics=metric_protos)) + + +def _get_metric_proto(metric): + return metrics_pb2.Metric( + metric_descriptor=_get_metric_descriptor_proto(metric.descriptor), + timeseries=_get_time_series_list_proto(metric.time_series)) + + +def _get_time_series_list_proto(series_list): + protos = [] + for series in series_list: + protos.append( + metrics_pb2.TimeSeries( + start_timestamp=utils.proto_ts_from_datetime_str( + series.start_timestamp), + label_values=_get_label_values_proto(series.label_values), + points=_get_points_proto(series.points))) + return protos + + +def _get_points_proto(points): + protos = [] + for point in points: + proto = metrics_pb2.Point( + timestamp=utils.proto_ts_from_datetime(point.timestamp)) + + if isinstance(point.value, value.ValueLong): + proto.int64_value = int(point.value.value) + elif isinstance(point.value, value.ValueDouble): + proto.double_value = float(point.value.value) + elif isinstance(point.value, value.ValueDistribution): + proto.distribution_value.MergeFrom( + metrics_pb2.DistributionValue( + sum=point.value.sum, + count=point.value.count, + sum_of_squared_deviation=point.value. 
+ sum_of_squared_deviation, + bucket_options=_get_bucket_options_proto( + point.value.bucket_options) + if point.value.bucket_options else None, + buckets=_get_buckets_proto(point.value.buckets))) + + # TODO: handle SUMMARY metrics, #567 + else: # pragma: NO COVER + raise TypeError('Unsupported metric type: {}'.format( + type(point.value))) + protos.append(proto) + return protos + + +def _get_bucket_options_proto(bucket_options): + return metrics_pb2.DistributionValue.BucketOptions( + explicit=metrics_pb2.DistributionValue.BucketOptions.Explicit( + bounds=bucket_options.type_.bounds)) + + +def _get_buckets_proto(buckets): + protos = [] + for bucket in buckets: + protos.append( + metrics_pb2.DistributionValue.Bucket( + count=bucket.count, + exemplar=_get_exemplar_proto(bucket.exemplar) + if bucket.exemplar else None)) + return protos + + +def _get_exemplar_proto(exemplar): + return metrics_pb2.DistributionValue.Exemplar( + value=exemplar.value, + timestamp=utils.proto_ts_from_datetime_str(exemplar.timestamp), + attachments=exemplar.attachments) + + +def _get_label_values_proto(label_values): + protos = [] + for label_value in label_values: + protos.append( + metrics_pb2.LabelValue(has_value=label_value.value is not None, + value=label_value.value)) + return protos + + +def _get_metric_descriptor_proto(descriptor): + return metrics_pb2.MetricDescriptor( + name=descriptor.name, + description=descriptor.description, + unit=descriptor.unit, + type=_get_metric_descriptor_type_proto(descriptor.type), + label_keys=_get_label_keys_proto(descriptor.label_keys)) + + +def _get_label_keys_proto(label_keys): + return [ + metrics_pb2.LabelKey(key=l.key, description=l.description) + for l in label_keys + ] + + +def _get_metric_descriptor_type_proto(descriptor_type): + return { + metric_descriptor.MetricDescriptorType.CUMULATIVE_INT64: + metrics_pb2.MetricDescriptor.CUMULATIVE_INT64, + metric_descriptor.MetricDescriptorType.CUMULATIVE_DOUBLE: + 
metrics_pb2.MetricDescriptor.CUMULATIVE_DOUBLE, + metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION: + metrics_pb2.MetricDescriptor.CUMULATIVE_DISTRIBUTION, + metric_descriptor.MetricDescriptorType.GAUGE_INT64: + metrics_pb2.MetricDescriptor.GAUGE_INT64, + metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE: + metrics_pb2.MetricDescriptor.GAUGE_DOUBLE, + metric_descriptor.MetricDescriptorType.GAUGE_DISTRIBUTION: + metrics_pb2.MetricDescriptor.GAUGE_DISTRIBUTION, + metric_descriptor.MetricDescriptorType.SUMMARY: + metrics_pb2.MetricDescriptor.SUMMARY, + }.get(descriptor_type, metrics_pb2.MetricDescriptor.UNSPECIFIED) + + +class ExportRpcHandler(object): + """Manages the rpc to the exporter service. + + :type client: class:`~.metrics_service_pb2_grpc.MetricsServiceStub` + :param client: metrics export client + + :type service_name: str + :param service_name: name of the service + + :type host_name: str + :param host_name: name of the host (machine or host name) + """ + + def __init__(self, client, service_name, host_name=None): + self._initialized = False + self._initial_request = None + self._rpc = bidi.BidiRpc(client.Export, lambda: self._initial_request) + self._node = utils.get_node(service_name, host_name) + self._resource = _get_resource() + + def send(self, request): + """Dispatches incoming request on rpc. + + Initializes rpc if necessary and dispatches incoming request. If a rpc + error is thrown, this function will attempt to recreate the stream and + retry sending given request once. + + :type request: class: + `~.metrics_service_pb2.ExportMetricsServiceRequest` + :param request: incoming export request + """ + if not self._initialized: + self._initialize(request) + return + + try: + self._rpc.send(request) + except grpc.RpcError as e: + logging.info('Found rpc error %s', e, exc_info=True) + # If stream has closed due to error, attempt to reopen with the + # incoming request. 
+ self._initialize(request) + + def _initialize(self, request): + """Initializes the exporter rpc stream.""" + + # Add node information on the first request dispatched on a stream. + request.node.MergeFrom(self._node) + request.resource.MergeFrom(self._resource) + self._initial_request = request + + self._rpc.open() + self._initialized = True + + +def _get_resource(): + instance = monitored_resource.get_instance() + + if instance is not None: + return resource_pb2.Resource(type=instance.get_type(), + labels=instance.get_labels()) + return resource_pb2.Resource(type='global') + + +def _create_stub(endpoint): + return metrics_service_pb2_grpc.MetricsServiceStub( + grpc.insecure_channel(endpoint)) + + +def new_stats_exporter(service_name, + hostname=None, + endpoint=None, + interval=None): + """Create a new worker thread and attach the exporter to it. + + :type endpoint: str + :param endpoint: address of the opencensus service. + + :type service_name: str + :param service_name: name of the service + + :type host_name: str + :param host_name: name of the host (machine or host name) + + :type interval: int or float + :param interval: Seconds between export calls. 
+ """ + endpoint = utils.DEFAULT_ENDPOINT if endpoint is None else endpoint + exporter = StatsExporter( + ExportRpcHandler(_create_stub(endpoint), service_name, hostname)) + + transport.get_exporter_thread(stats.stats, exporter, interval) + return exporter diff --git a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py index cf7b96aeb..afb3f9bd3 100644 --- a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py +++ b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py @@ -14,15 +14,11 @@ """Export opencensus spans to ocagent""" from threading import Lock -import datetime import grpc -import os -import socket from opencensus.common.transports import sync -from opencensus.common.version import __version__ +from opencensus.ext.ocagent import utils as ocagent_utils from opencensus.ext.ocagent.trace_exporter import utils -from opencensus.proto.agent.common.v1 import common_pb2 from opencensus.proto.agent.trace.v1 import trace_service_pb2 from opencensus.proto.agent.trace.v1 import trace_service_pb2_grpc from opencensus.trace import base_exporter @@ -75,20 +71,7 @@ def __init__( self.client = client self.service_name = service_name - self.node = common_pb2.Node( - identifier=common_pb2.ProcessIdentifier( - host_name=socket.gethostname() if host_name is None - else host_name, - pid=os.getpid(), - start_timestamp=utils.proto_ts_from_datetime( - datetime.datetime.utcnow()) - ), - library_info=common_pb2.LibraryInfo( - language=common_pb2.LibraryInfo.Language.Value('PYTHON'), - exporter_version=EXPORTER_VERSION, - core_library_version=__version__ - ), - service_info=common_pb2.ServiceInfo(name=self.service_name)) + self.node = ocagent_utils.get_node(self.service_name, host_name) def emit(self, span_datas): """ @@ -140,8 +123,10 @@ def generate_span_requests(self, span_datas): :returns: List of span export 
requests. """ - pb_spans = [utils.translate_to_trace_proto( - span_data) for span_data in span_datas] + pb_spans = [ + utils.translate_to_trace_proto(span_data) + for span_data in span_datas + ] # TODO: send node once per channel yield trace_service_pb2.ExportTraceServiceRequest( diff --git a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/utils.py b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/utils.py index 08d32f17c..30be8475b 100644 --- a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/utils.py +++ b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/utils.py @@ -14,9 +14,8 @@ """Translates opencensus span data to trace proto""" -from google.protobuf.internal.well_known_types import ParseError -from google.protobuf.timestamp_pb2 import Timestamp from google.protobuf.wrappers_pb2 import BoolValue, UInt32Value +from opencensus.ext.ocagent import utils as ocagent_utils from opencensus.proto.trace.v1 import trace_pb2 @@ -40,8 +39,9 @@ def translate_to_trace_proto(span_data): span_id=hex_str_to_bytes_str(span_data.span_id), parent_span_id=hex_str_to_bytes_str(span_data.parent_span_id) if span_data.parent_span_id is not None else None, - start_time=proto_ts_from_datetime_str(span_data.start_time), - end_time=proto_ts_from_datetime_str(span_data.end_time), + start_time=ocagent_utils.proto_ts_from_datetime_str( + span_data.start_time), + end_time=ocagent_utils.proto_ts_from_datetime_str(span_data.end_time), status=trace_pb2.Status( code=span_data.status.code, message=span_data.status.message) @@ -162,41 +162,6 @@ def hex_str_to_bytes_str(hex_str): return bytes(bytearray.fromhex(hex_str)) -def proto_ts_from_datetime_str(dt): - """Converts string datetime in ISO format to protobuf timestamp. 
- - :type dt: str - :param dt: string with datetime in ISO format - - :rtype: :class:`~google.protobuf.timestamp_pb2.Timestamp` - :returns: protobuf timestamp - """ - - ts = Timestamp() - if (dt is not None): - try: - ts.FromJsonString(dt) - except ParseError: - pass - return ts - - -def proto_ts_from_datetime(dt): - """Converts datetime to protobuf timestamp. - - :type dt: datetime - :param dt: date and time - - :rtype: :class:`~google.protobuf.timestamp_pb2.Timestamp` - :returns: protobuf timestamp - """ - - ts = Timestamp() - if (dt is not None): - ts.FromDatetime(dt) - return ts - - def add_proto_attribute_value( pb_attributes, attribute_key, diff --git a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/utils/__init__.py b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/utils/__init__.py new file mode 100644 index 000000000..1e70037c5 --- /dev/null +++ b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/utils/__init__.py @@ -0,0 +1,64 @@ +import datetime +import os +import socket + +from google.protobuf.internal.well_known_types import ParseError +from google.protobuf.timestamp_pb2 import Timestamp +from opencensus.common.version import __version__ as opencensus_version +from opencensus.proto.agent.common.v1 import common_pb2 + +# Default agent endpoint +DEFAULT_ENDPOINT = 'localhost:55678' + +# OCAgent exporter version +EXPORTER_VERSION = '0.0.1' + + +def get_node(service_name, host_name): + """Generates Node message from params and system information. 
+ """ + return common_pb2.Node( + identifier=common_pb2.ProcessIdentifier( + host_name=socket.gethostname() if host_name is None + else host_name, + pid=os.getpid(), + start_timestamp=proto_ts_from_datetime( + datetime.datetime.utcnow())), + library_info=common_pb2.LibraryInfo( + language=common_pb2.LibraryInfo.Language.Value('PYTHON'), + exporter_version=EXPORTER_VERSION, + core_library_version=opencensus_version), + service_info=common_pb2.ServiceInfo(name=service_name)) + + +def proto_ts_from_datetime(dt): + """Converts datetime to protobuf timestamp. + + :type dt: datetime + :param dt: date and time + + :rtype: :class:`~google.protobuf.timestamp_pb2.Timestamp` + :returns: protobuf timestamp + """ + + ts = Timestamp() + if (dt is not None): + ts.FromDatetime(dt) + return ts + + +def proto_ts_from_datetime_str(dt): + """Converts string datetime in ISO format to protobuf timestamp. + :type dt: str + :param dt: string with datetime in ISO format + :rtype: :class:`~google.protobuf.timestamp_pb2.Timestamp` + :returns: protobuf timestamp + """ + + ts = Timestamp() + if (dt is not None): + try: + ts.FromJsonString(dt) + except ParseError: + pass + return ts diff --git a/contrib/opencensus-ext-ocagent/tests/test_ocagent_utils.py b/contrib/opencensus-ext-ocagent/tests/test_ocagent_utils.py new file mode 100644 index 000000000..b97226091 --- /dev/null +++ b/contrib/opencensus-ext-ocagent/tests/test_ocagent_utils.py @@ -0,0 +1,69 @@ +# Copyright 2018, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +import unittest + +from opencensus.common import utils as common_utils +from opencensus.ext.ocagent import utils + + +class TestUtils(unittest.TestCase): + def test_datetime_str_to_proto_ts_conversion(self): + now = datetime.utcnow() + delta = now - datetime(1970, 1, 1) + expected_seconds = int(delta.total_seconds()) + expected_nanos = delta.microseconds * 1000 + + proto_ts = utils.proto_ts_from_datetime_str( + common_utils.to_iso_str(now)) + self.assertEqual(proto_ts.seconds, int(expected_seconds)) + self.assertEqual(proto_ts.nanos, expected_nanos) + + def test_datetime_str_to_proto_ts_conversion_none(self): + proto_ts = utils.proto_ts_from_datetime_str(None) + self.assertEquals(proto_ts.seconds, 0) + self.assertEquals(proto_ts.nanos, 0) + + def test_datetime_str_to_proto_ts_conversion_empty(self): + proto_ts = utils.proto_ts_from_datetime_str('') + self.assertEquals(proto_ts.seconds, 0) + self.assertEquals(proto_ts.nanos, 0) + + def test_datetime_str_to_proto_ts_conversion_invalid(self): + proto_ts = utils.proto_ts_from_datetime_str('2018 08 22 T 11:53') + self.assertEquals(proto_ts.seconds, 0) + self.assertEquals(proto_ts.nanos, 0) + + def test_datetime_to_proto_ts_conversion_none(self): + proto_ts = utils.proto_ts_from_datetime(None) + self.assertEquals(proto_ts.seconds, 0) + self.assertEquals(proto_ts.nanos, 0) + + def test_datetime_to_proto_ts_conversion(self): + now = datetime.utcnow() + delta = now - datetime(1970, 1, 1) + expected_seconds = int(delta.total_seconds()) + expected_nanos = delta.microseconds * 1000 + + proto_ts = utils.proto_ts_from_datetime(now) + self.assertEqual(proto_ts.seconds, int(expected_seconds)) + self.assertEqual(proto_ts.nanos, expected_nanos) + + def test_datetime_to_proto_ts_conversion_zero(self): + zero = datetime(1970, 1, 1) + + proto_ts = utils.proto_ts_from_datetime(zero) + 
self.assertEqual(proto_ts.seconds, 0) + self.assertEqual(proto_ts.nanos, 0) diff --git a/contrib/opencensus-ext-ocagent/tests/test_stats_exporter.py b/contrib/opencensus-ext-ocagent/tests/test_stats_exporter.py new file mode 100644 index 000000000..b542aceb9 --- /dev/null +++ b/contrib/opencensus-ext-ocagent/tests/test_stats_exporter.py @@ -0,0 +1,471 @@ +# Copyright 2018, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures +from datetime import datetime +import grpc +import mock +import os +import socket +import threading +import time +import unittest + +from google.protobuf import timestamp_pb2 +from opencensus.common import resource +from opencensus.common import utils +from opencensus.common.version import __version__ as opencensus_version +from opencensus.ext.ocagent import stats_exporter as ocagent +from opencensus.metrics import label_value +from opencensus.metrics.export import metric +from opencensus.metrics.export import metric_descriptor +from opencensus.metrics.export import point +from opencensus.metrics.export import time_series +from opencensus.metrics.export import value +from opencensus.proto.agent.common.v1 import common_pb2 +from opencensus.proto.agent.metrics.v1 import metrics_service_pb2 +from opencensus.proto.agent.metrics.v1 import metrics_service_pb2_grpc +from opencensus.proto.metrics.v1 import metrics_pb2 +from opencensus.proto.resource.v1 import resource_pb2 +from opencensus.stats import 
aggregation as aggregation_module +from opencensus.stats import measure as measure_module +from opencensus.stats import metric_utils +from opencensus.stats import stats as stats_module +from opencensus.stats import view as view_module +from opencensus.stats import view_data as view_data_module +from opencensus.tags import tag_key as tag_key_module +from opencensus.tags import tag_map as tag_map_module + +SERVICE_NAME = 'my-service' + +MiB = 1 << 20 +FRONTEND_KEY = tag_key_module.TagKey("my.org/keys/frontend") +FRONTEND_KEY_FLOAT = tag_key_module.TagKey("my.org/keys/frontend-FLOAT") +FRONTEND_KEY_INT = tag_key_module.TagKey("my.org/keys/frontend-INT") +FRONTEND_KEY_STR = tag_key_module.TagKey("my.org/keys/frontend-STR") + +FRONTEND_KEY_CLEAN = "my_org_keys_frontend" +FRONTEND_KEY_FLOAT_CLEAN = "my_org_keys_frontend_FLOAT" +FRONTEND_KEY_INT_CLEAN = "my_org_keys_frontend_INT" +FRONTEND_KEY_STR_CLEAN = "my_org_keys_frontend_STR" + +VIDEO_SIZE_MEASURE = measure_module.MeasureFloat( + "my.org/measure/video_size_test2", "size of processed videos", "By") +VIDEO_SIZE_MEASURE_2 = measure_module.MeasureFloat( + "my.org/measure/video_size_test_2", "size of processed videos", "By") + +VIDEO_SIZE_MEASURE_FLOAT = measure_module.MeasureFloat( + "my.org/measure/video_size_test-float", "size of processed videos-float", + "By") + +VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2" +VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation( + [16.0 * MiB, 256.0 * MiB]) +VIDEO_SIZE_VIEW = view_module.View(VIDEO_SIZE_VIEW_NAME, + "processed video size over time", + [FRONTEND_KEY], VIDEO_SIZE_MEASURE, + VIDEO_SIZE_DISTRIBUTION) + +TEST_TIME = datetime(2018, 12, 25, 1, 2, 3, 4) +TEST_TIME_STR = utils.to_iso_str(TEST_TIME) + + +class TestStatsExporter(unittest.TestCase): + def test_export_view_data(self): + v_data = view_data_module.ViewData(view=VIDEO_SIZE_VIEW, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR) + v_data.record(context=tag_map_module.TagMap(), value=2, 
timestamp=None) + view_data = [v_data] + view_data = [metric_utils.view_data_to_metric(view_data[0], TEST_TIME)] + + handler = mock.Mock(spec=ocagent.ExportRpcHandler) + ocagent.StatsExporter(handler).export_metrics(view_data) + + self.assertEqual( + handler.send.call_args[0][0].metrics[0].metric_descriptor, + metrics_pb2.MetricDescriptor( + name=VIDEO_SIZE_VIEW_NAME, + description='processed video size over time', + unit='By', + type=metrics_pb2.MetricDescriptor.CUMULATIVE_DISTRIBUTION, + label_keys=[metrics_pb2.LabelKey(key=FRONTEND_KEY)])) + + self.assertEqual( + handler.send.call_args[0][0].metrics[0].timeseries[0], + metrics_pb2.TimeSeries( + start_timestamp=timestamp_pb2.Timestamp(seconds=1545699723, + nanos=4000), + label_values=[metrics_pb2.LabelValue(has_value=False)], + points=[ + metrics_pb2.Point( + timestamp=timestamp_pb2.Timestamp(seconds=1545699723, + nanos=4000), + distribution_value=metrics_pb2.DistributionValue( + sum=2, + count=1, + bucket_options=metrics_pb2.DistributionValue. + BucketOptions( + explicit=metrics_pb2.DistributionValue. 
+ BucketOptions.Explicit( + bounds=[16.0 * MiB, 256.0 * MiB])), + buckets=[ + metrics_pb2.DistributionValue.Bucket(count=1), + metrics_pb2.DistributionValue.Bucket(), + metrics_pb2.DistributionValue.Bucket(), + ])) + ])) + + def test_export_with_label_value(self): + view = view_module.View('', '', [FRONTEND_KEY], VIDEO_SIZE_MEASURE, + aggregation_module.SumAggregation()) + v_data = view_data_module.ViewData(view=view, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR) + v_data.record(context=tag_map_module.TagMap({FRONTEND_KEY: + 'test-key'}), + value=2.5, + timestamp=None) + view_data = [v_data] + view_data = [metric_utils.view_data_to_metric(view_data[0], TEST_TIME)] + + handler = mock.Mock(spec=ocagent.ExportRpcHandler) + ocagent.StatsExporter(handler).export_metrics(view_data) + self.assertEquals( + handler.send.call_args[0] + [0].metrics[0].timeseries[0].label_values[0], + metrics_pb2.LabelValue(has_value=True, value='test-key')) + + def test_export_double_point_value(self): + view = view_module.View('', '', [FRONTEND_KEY], VIDEO_SIZE_MEASURE, + aggregation_module.SumAggregation()) + v_data = view_data_module.ViewData(view=view, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR) + v_data.record(context=tag_map_module.TagMap(), + value=2.5, + timestamp=None) + view_data = [v_data] + view_data = [metric_utils.view_data_to_metric(view_data[0], TEST_TIME)] + + handler = mock.Mock(spec=ocagent.ExportRpcHandler) + ocagent.StatsExporter(handler).export_metrics(view_data) + self.assertEquals( + handler.send.call_args[0] + [0].metrics[0].timeseries[0].points[0].double_value, 2.5) + + def test_export_exemplar(self): + metric = _create_metric( + metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION, + points=[ + point.Point(value=_create_distribution_value( + bounds=[1], + buckets=[ + value.Bucket(count=1, + exemplar=value.Exemplar( + value=2.5, + timestamp=TEST_TIME_STR, + attachments={'key1': 'value1'})), + value.Bucket(count=0), + ]), + 
timestamp=datetime.now()) + ]) + + handler = mock.Mock(spec=ocagent.ExportRpcHandler) + ocagent.StatsExporter(handler).export_metrics([metric]) + + self.assertEquals( + handler.send.call_args[0][0].metrics[0].timeseries[0].points[0]. + distribution_value.buckets[0].exemplar, + metrics_pb2.DistributionValue.Exemplar( + value=2.5, + timestamp=timestamp_pb2.Timestamp(seconds=1545699723, + nanos=4000), + attachments={'key1': 'value1'})) + + +def _create_distribution_value(count=1, + sum_=0, + sum_of_squared_deviation=0, + bounds=[], + buckets=[]): + return value.ValueDistribution( + count=count, + sum_=sum_, + sum_of_squared_deviation=sum_of_squared_deviation, + bucket_options=value.BucketOptions(type_=value.Explicit( + bounds=bounds)), + buckets=buckets) + + +def _create_metric(descriptor_type=metric_descriptor.MetricDescriptorType. + CUMULATIVE_INT64, + points=[]): + return metric.Metric( + metric_descriptor.MetricDescriptor('', '', '', descriptor_type, []), [ + time_series.TimeSeries([label_value.LabelValue()], points, + TEST_TIME_STR) + ]) + + +class GenericRpcHandler(metrics_service_pb2_grpc.MetricsServiceServicer): + def __init__(self, func): + self._func = func + + def Export(self, request_iterator, context): + return self._func(request_iterator, context) + + +class TestExportRpcInterface(unittest.TestCase): + def setUp(self): + self._server = _start_server() + self._port = self._server.add_insecure_port('[::]:0') + self._channel = grpc.insecure_channel('localhost:%d' % self._port) + + def tearDown(self): + self._server.stop(None) + self._channel.close() + + def _create_stub(self): + return metrics_service_pb2_grpc.MetricsServiceStub( + channel=self._channel) + + def _add_and_start_service(self, service): + metrics_service_pb2_grpc.add_MetricsServiceServicer_to_server( + service, self._server) + self._server.start() + + def test_rpc_handler_initialization(self): + requests = [] + event = threading.Event() + + def _helper(request_iterator, context): + for request 
in request_iterator: + requests.append(request) + event.set() + yield + + self._add_and_start_service(GenericRpcHandler(_helper)) + + request = metrics_service_pb2.ExportMetricsServiceRequest( + node=common_pb2.Node(service_info=common_pb2.ServiceInfo( + name='test-service'))) + _create_rpc_handler(self._create_stub()).send(request) + + self.assertTrue(event.wait(timeout=1)) + self.assertListEqual(requests, [request]) + + def test_rpc_handler_export_multiple_packets(self): + requests = [] + + event = threading.Event() + + def _helper(request_iterator, context): + # Ensure a stream has not been started before accepting a request. + if len(requests) != 0: + return + + for request in request_iterator: + requests.append(request) + if len(requests) == 2: + event.set() + yield + + self._add_and_start_service(GenericRpcHandler(_helper)) + + r1 = metrics_service_pb2.ExportMetricsServiceRequest( + node=common_pb2.Node(service_info=common_pb2.ServiceInfo( + name='request1'))) + r2 = metrics_service_pb2.ExportMetricsServiceRequest( + node=common_pb2.Node(service_info=common_pb2.ServiceInfo( + name='request2'))) + + handler = _create_rpc_handler(self._create_stub()) + handler.send(r1) + handler.send(r2) + + self.assertTrue(event.wait(timeout=1)) + self.assertListEqual(requests, [r1, r2]) + + def test_rpc_handler_stream_restart_on_error(self): + initialized = [] + requests = [] + + event = threading.Event() + + def _helper(request_iterator, context): + initialized.append(True) + for request in request_iterator: + requests.append(request) + if len(requests) == 2: + event.set() + context.abort(grpc.StatusCode.INTERNAL, '') + + self._add_and_start_service(GenericRpcHandler(_helper)) + + request = metrics_service_pb2.ExportMetricsServiceRequest() + handler = _create_rpc_handler(self._create_stub()) + + handler.send(request) + # Give server time to propagate failure to client + time.sleep(0.1) + handler.send(request) + + self.assertTrue(event.wait(timeout=1)) + 
self.assertEquals(len(initialized), 2) + self.assertEquals(len(requests), 2) + + @mock.patch('opencensus.metrics.transport.get_exporter_thread') + def test_create_stats_exporter_initialization(self, mock_transport): + event = threading.Event() + + def _helper(request_iterator, context): + for request in request_iterator: + event.set() + yield + + self._add_and_start_service(GenericRpcHandler(_helper)) + + exporter = ocagent.new_stats_exporter(SERVICE_NAME, + endpoint='localhost:%s' % + self._port, + interval=0.1) + + self.assertEqual(mock_transport.call_args[0][0], stats_module.stats) + self.assertEqual(mock_transport.call_args[0][1], exporter) + self.assertEqual(mock_transport.call_args[0][2], 0.1) + + exporter.export_metrics([ + _create_metric(points=[ + point.Point(value.ValueLong(1), timestamp=datetime.now()) + ]) + ]) + self.assertTrue(event.wait(timeout=1)) + + @mock.patch('opencensus.metrics.transport.get_exporter_thread') + @mock.patch('grpc.insecure_channel') + def test_create_stats_exporter_with_default_endpoint( + self, mock_channel, _): + ocagent.new_stats_exporter(SERVICE_NAME) + self.assertEquals(mock_channel.call_args[0][0], 'localhost:55678') + + def test_export_node(self): + requests = [] + event = threading.Event() + + def _helper(request_iterator, context): + for request in request_iterator: + requests.append(request) + event.set() + yield + + self._add_and_start_service(GenericRpcHandler(_helper)) + + _create_rpc_handler( + self._create_stub(), service_name=SERVICE_NAME).send( + metrics_service_pb2.ExportMetricsServiceRequest()) + + self.assertTrue(event.wait(timeout=1)) + request = requests[0] + self.assertEqual(request.node.service_info.name, SERVICE_NAME) + self.assertEqual(request.node.library_info.language, 8) + self.assertIsNotNone(request.node.library_info.exporter_version) + self.assertEqual(request.node.library_info.core_library_version, + opencensus_version) + + self.assertEqual(request.node.identifier.host_name, + 
socket.gethostname()) + self.assertEqual(request.node.identifier.pid, os.getpid()) + + self.assertIsNotNone(request.node.identifier.start_timestamp) + self.assertGreater(request.node.identifier.start_timestamp.seconds, 0) + + def test_export_node_with_hostname(self): + requests = [] + event = threading.Event() + + def _helper(request_iterator, context): + for request in request_iterator: + requests.append(request) + event.set() + yield + + self._add_and_start_service(GenericRpcHandler(_helper)) + + ocagent.ExportRpcHandler( + self._create_stub(), + service_name=SERVICE_NAME, + host_name='my host').send( + metrics_service_pb2.ExportMetricsServiceRequest()) + self.assertTrue(event.wait(timeout=1)) + self.assertEqual(requests[0].node.identifier.host_name, 'my host') + + @mock.patch( + 'opencensus.common.monitored_resource.monitored_resource.get_instance') + def test_export_with_resource(self, mock_get_instance): + event = threading.Event() + requests = [] + + def _helper(request_iterator, context): + for r in request_iterator: + requests.append(r) + event.set() + yield + + self._add_and_start_service(GenericRpcHandler(_helper)) + + mock_get_instance.return_value = resource.Resource( + type_='gce_instance', labels={'key1': 'value1'}) + + _create_rpc_handler(self._create_stub()).send( + metrics_service_pb2.ExportMetricsServiceRequest()) + + self.assertTrue(event.wait(timeout=1)) + self.assertEqual( + requests[0].resource, + resource_pb2.Resource(type='gce_instance', + labels={'key1': 'value1'})) + + @mock.patch( + 'opencensus.common.monitored_resource.monitored_resource.get_instance') + def test_export_with_no_resource_found(self, mock_get_instance): + event = threading.Event() + requests = [] + + def _helper(request_iterator, context): + for r in request_iterator: + requests.append(r) + event.set() + yield + + self._add_and_start_service(GenericRpcHandler(_helper)) + + mock_get_instance.return_value = None + + _create_rpc_handler(self._create_stub()).send( + 
metrics_service_pb2.ExportMetricsServiceRequest()) + + self.assertTrue(event.wait(timeout=1)) + self.assertEqual(requests[0].resource, + resource_pb2.Resource(type='global')) + + +def _start_server(): + """Starts an insecure grpc server.""" + return grpc.server(futures.ThreadPoolExecutor(max_workers=1), + options=(('grpc.so_reuseport', 0), )) + + +def _create_rpc_handler(stub, service_name=SERVICE_NAME): + return ocagent.ExportRpcHandler(stub, service_name) diff --git a/contrib/opencensus-ext-ocagent/tests/test_trace_exporter_utils.py b/contrib/opencensus-ext-ocagent/tests/test_trace_exporter_utils.py index e9195a747..5439dd138 100644 --- a/contrib/opencensus-ext-ocagent/tests/test_trace_exporter_utils.py +++ b/contrib/opencensus-ext-ocagent/tests/test_trace_exporter_utils.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime +from datetime import timedelta import codecs -from datetime import datetime, timedelta import unittest -from opencensus.common import utils as common_utils from opencensus.ext.ocagent.trace_exporter import utils from opencensus.proto.trace.v1 import trace_pb2 from opencensus.trace import attributes as attributes_module @@ -526,32 +526,6 @@ def test_set_annotation_without_attributes(self): self.assertEqual(len(pb_event0.annotation.attributes.attribute_map), 0) self.assertEqual(len(pb_event1.annotation.attributes.attribute_map), 0) - def test_datetime_str_to_proto_ts_conversion(self): - now = datetime.utcnow() - delta = now - datetime(1970, 1, 1) - expected_seconds = int(delta.total_seconds()) - expected_nanos = delta.microseconds * 1000 - - proto_ts = utils.proto_ts_from_datetime_str( - common_utils.to_iso_str(now)) - self.assertEqual(proto_ts.seconds, int(expected_seconds)) - self.assertEqual(proto_ts.nanos, expected_nanos) - - def test_datetime_str_to_proto_ts_conversion_none(self): - proto_ts = utils.proto_ts_from_datetime_str(None) - 
self.assertEquals(proto_ts.seconds, 0) - self.assertEquals(proto_ts.nanos, 0) - - def test_datetime_str_to_proto_ts_conversion_empty(self): - proto_ts = utils.proto_ts_from_datetime_str('') - self.assertEquals(proto_ts.seconds, 0) - self.assertEquals(proto_ts.nanos, 0) - - def test_datetime_str_to_proto_ts_conversion_invalid(self): - proto_ts = utils.proto_ts_from_datetime_str('2018 08 22 T 11:53') - self.assertEquals(proto_ts.seconds, 0) - self.assertEquals(proto_ts.nanos, 0) - def test_hex_str_to_proto_bytes_conversion(self): hex_encoder = codecs.getencoder('hex') @@ -559,25 +533,3 @@ def test_hex_str_to_proto_bytes_conversion(self): '00010203040506070a0b0c0d0eff') self.assertEqual( hex_encoder(proto_bytes)[0], b'00010203040506070a0b0c0d0eff') - - def test_datetime_to_proto_ts_conversion_none(self): - proto_ts = utils.proto_ts_from_datetime(None) - self.assertEquals(proto_ts.seconds, 0) - self.assertEquals(proto_ts.nanos, 0) - - def test_datetime_to_proto_ts_conversion(self): - now = datetime.utcnow() - delta = now - datetime(1970, 1, 1) - expected_seconds = int(delta.total_seconds()) - expected_nanos = delta.microseconds * 1000 - - proto_ts = utils.proto_ts_from_datetime(now) - self.assertEqual(proto_ts.seconds, int(expected_seconds)) - self.assertEqual(proto_ts.nanos, expected_nanos) - - def test_datetime_to_proto_ts_conversion_zero(self): - zero = datetime(1970, 1, 1) - - proto_ts = utils.proto_ts_from_datetime(zero) - self.assertEqual(proto_ts.seconds, 0) - self.assertEqual(proto_ts.nanos, 0)