diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f66ec617a..e33097f0d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Increment the: ## [Unreleased] * [EXPORTER] Jaeger Exporter - Populate Span Links ([#1251](https://github.com/open-telemetry/opentelemetry-cpp/pull/1251)) +* [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https://github.com/open-telemetry/opentelemetry-cpp/pull/1209)) ## [1.2.0] 2022-01-31 diff --git a/api/include/opentelemetry/common/timestamp.h b/api/include/opentelemetry/common/timestamp.h index 54e7b7aa6c..da8765b9bc 100644 --- a/api/include/opentelemetry/common/timestamp.h +++ b/api/include/opentelemetry/common/timestamp.h @@ -169,5 +169,39 @@ class SteadyTimestamp private: int64_t nanos_since_epoch_; }; + +class DurationUtil +{ +public: + template + static std::chrono::duration AdjustWaitForTimeout( + std::chrono::duration timeout, + std::chrono::duration indefinite_value) noexcept + { + // Do not call now() when this duration is max value, now() may have a expensive cost. + if (timeout == std::chrono::duration::max()) + { + return indefinite_value; + } + + // std::future::wait_for, std::this_thread::sleep_for, and std::condition_variable::wait_for + // may use steady_clock or system_clock.We need make sure now() + timeout do not overflow. + auto max_timeout = std::chrono::duration_cast>( + std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now()); + if (timeout >= max_timeout) + { + return indefinite_value; + } + max_timeout = std::chrono::duration_cast>( + std::chrono::system_clock::time_point::max() - std::chrono::system_clock::now()); + if (timeout >= max_timeout) + { + return indefinite_value; + } + + return timeout; + } +}; + } // namespace common OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index ea58807e96..88de284831 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -89,6 +89,17 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor const opentelemetry::nostd::span> &records) noexcept override; + /** + * Exports a vector of log records to the Elasticsearch instance asynchronously. + * @param records A list of log records to send to Elasticsearch. + * @param result_callback callback function accepting ExportResult as argument + */ + void Export( + const opentelemetry::nostd::span> + &records, + std::function &&result_callback) noexcept + override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index a5a66ebe01..6a511d197f 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -110,6 +110,89 @@ class ResponseHandler : public http_client::EventHandler bool console_debug_ = false; }; +/** + * This class handles the async response message from the Elasticsearch request + */ +class AsyncResponseHandler : public http_client::EventHandler +{ +public: + /** + * Creates a response handler, that by default doesn't display to console + */ + AsyncResponseHandler( + std::shared_ptr session, + std::function &&result_callback, + bool console_debug = false) + : console_debug_{console_debug}, + session_{std::move(session)}, + result_callback_{std::move(result_callback)} + {} + + /** + * Cleans up the session in the destructor. + */ + ~AsyncResponseHandler() { session_->FinishSession(); } + + /** + * Automatically called when the response is received + */ + void OnResponse(http_client::Response &response) noexcept override + { + + // Store the body of the request + body_ = std::string(response.GetBody().begin(), response.GetBody().end()); + if (body_.find("\"failed\" : 0") == std::string::npos) + { + OTEL_INTERNAL_LOG_ERROR( + "[ES Trace Exporter] Logs were not written to Elasticsearch correctly, response body: " + << body_); + result_callback_(sdk::common::ExportResult::kFailure); + } + else + { + result_callback_(sdk::common::ExportResult::kSuccess); + } + } + + // Callback method when an http event occurs + void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override + { + // If any failure event occurs, release the condition variable to unblock main thread + switch (state) + { + case http_client::SessionState::ConnectFailed: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Connection to elasticsearch failed"); + break; + case http_client::SessionState::SendFailed: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request failed to be sent to elasticsearch"); + + break; + case http_client::SessionState::TimedOut: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request to elasticsearch timed out"); + + break; + case http_client::SessionState::NetworkError: + OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Network error to elasticsearch"); + break; + default: + break; + } + result_callback_(sdk::common::ExportResult::kFailure); + } + +private: + // Stores the session object for the request + std::shared_ptr session_; + // Callback to call to on receiving events + std::function result_callback_; + + // A string to store the response body + std::string body_ = ""; + + // Whether to print the results from the callback + bool console_debug_ = false; +}; + ElasticsearchLogExporter::ElasticsearchLogExporter() : options_{ElasticsearchExporterOptions()}, http_client_{new ext::http::client::curl::HttpClient()} @@ -162,8 +245,8 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export( request->SetBody(body_vec); // Send the request - std::unique_ptr handler(new ResponseHandler(options_.console_debug_)); - session->SendRequest(*handler); + auto handler = std::make_shared(options_.console_debug_); + session->SendRequest(handler); // Wait for the response to be received if (options_.console_debug_) @@ -198,6 +281,51 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +void ElasticsearchLogExporter::Export( + const opentelemetry::nostd::span> + &records, + std::function &&result_callback) noexcept +{ + // Return failure if this exporter has been shutdown + if (isShutdown()) + { + OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting " + << records.size() << " log(s) failed, exporter is shutdown"); + return; + } + + // Create a connection to the ElasticSearch instance + auto session = http_client_->CreateSession(options_.host_ + std::to_string(options_.port_)); + auto request = session->CreateRequest(); + + // Populate the request with headers and methods + request->SetUri(options_.index_ + "/_bulk?pretty"); + request->SetMethod(http_client::Method::Post); + request->AddHeader("Content-Type", "application/json"); + request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_)); + + // Create the request body + std::string body = ""; + for (auto &record : records) + { + // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified + // in URI + body += "{\"index\" : {}}\n"; + + // Add the context of the Recordable + auto json_record = std::unique_ptr( + static_cast(record.release())); + body += json_record->GetJSON().dump() + "\n"; + } + std::vector body_vec(body.begin(), body.end()); + request->SetBody(body_vec); + + // Send the request + auto handler = std::make_shared(session, std::move(result_callback), + options_.console_debug_); + session->SendRequest(handler); +} + bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h index 284bab2cab..c6d057b4ba 100644 --- a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h +++ b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h @@ -61,6 +61,15 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + void Export(const nostd::span> &spans, + std::function + &&result_callback) noexcept override; + /** * Shutdown the exporter. * @param timeout an option timeout, default to max. diff --git a/exporters/jaeger/src/jaeger_exporter.cc b/exporters/jaeger/src/jaeger_exporter.cc index c07f2f0100..f35b5bccd7 100644 --- a/exporters/jaeger/src/jaeger_exporter.cc +++ b/exporters/jaeger/src/jaeger_exporter.cc @@ -70,6 +70,15 @@ sdk_common::ExportResult JaegerExporter::Export( return sdk_common::ExportResult::kSuccess; } +void JaegerExporter::Export( + const nostd::span> &spans, + std::function &&result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); +} + void JaegerExporter::InitializeEndpoint() { if (options_.transport_format == TransportFormat::kThriftUdpCompact) diff --git a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h index 3e47ccb177..ee9d9e9876 100644 --- a/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h +++ b/exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h @@ -64,6 +64,20 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte return sdk::common::ExportResult::kSuccess; } + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + void Export(const nostd::span> &spans, + std::function + &&result_callback) noexcept override + { + OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); + } + /** * @param timeout an optional value containing the timeout of the exporter * note: passing custom timeout values is not currently supported for this exporter diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h index ad1d54a215..deb8645926 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h @@ -39,6 +39,13 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter const opentelemetry::nostd::span> &records) noexcept override; + /** + * Exports a span of logs sent from the processor asynchronously. + */ + void Export( + const opentelemetry::nostd::span> &records, + std::function &&result_callback) noexcept; + /** * Marks the OStream Log Exporter as shut down. */ diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h index 8122b6777a..0044a29e47 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h @@ -38,6 +38,12 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter const opentelemetry::nostd::span> &spans) noexcept override; + void Export( + const opentelemetry::nostd::span> + &spans, + std::function &&result_callback) noexcept + override; + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; diff --git a/exporters/ostream/src/log_exporter.cc b/exporters/ostream/src/log_exporter.cc index ef103bb6b2..46f88b46d4 100644 --- a/exporters/ostream/src/log_exporter.cc +++ b/exporters/ostream/src/log_exporter.cc @@ -180,7 +180,16 @@ sdk::common::ExportResult OStreamLogExporter::Export( return sdk::common::ExportResult::kSuccess; } -bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept +void OStreamLogExporter::Export( + const opentelemetry::nostd::span> &records, + std::function &&result_callback) noexcept +{ + // Do not have async support + auto result = Export(records); + result_callback(result); +} + +bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; diff --git a/exporters/ostream/src/span_exporter.cc b/exporters/ostream/src/span_exporter.cc index dea72f57f8..d566a66f93 100644 --- a/exporters/ostream/src/span_exporter.cc +++ b/exporters/ostream/src/span_exporter.cc @@ -96,6 +96,14 @@ sdk::common::ExportResult OStreamSpanExporter::Export( return sdk::common::ExportResult::kSuccess; } +void OStreamSpanExporter::Export( + const opentelemetry::nostd::span> &spans, + std::function &&result_callback) noexcept +{ + auto result = Export(spans); + result_callback(result); +} + bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index a28e6fca85..df02f2e215 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -52,6 +52,15 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter sdk::common::ExportResult Export( const nostd::span> &spans) noexcept override; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export(const nostd::span> &spans, + std::function + &&result_callback) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h index a8aeda85b8..13aba881bc 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_exporter.h @@ -55,6 +55,16 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; + /** + * Exports a vector of log records asynchronously. + * @param records A list of log records. + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &records, + std::function &&result_callback) noexcept + override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 1a199bed48..a4b90bc13a 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -11,14 +11,21 @@ #include "opentelemetry/common/spin_lock_mutex.h" #include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/common/exporter_utils.h" #include "opentelemetry/exporters/otlp/otlp_environment.h" +#include #include +#include +#include +#include +#include #include #include #include +#include OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -71,20 +78,25 @@ struct OtlpHttpClientOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultHeaders(); + // Concurrent sessions + std::size_t concurrent_sessions = 8; + inline OtlpHttpClientOptions(nostd::string_view input_url, HttpRequestContentType input_content_type, JsonBytesMappingKind input_json_bytes_mapping, bool input_use_json_name, bool input_console_debug, std::chrono::system_clock::duration input_timeout, - const OtlpHeaders &input_http_headers) + const OtlpHeaders &input_http_headers, + std::size_t input_concurrent_sessions = 8) : url(input_url), content_type(input_content_type), json_bytes_mapping(input_json_bytes_mapping), use_json_name(input_use_json_name), console_debug(input_console_debug), timeout(input_timeout), - http_headers(input_http_headers) + http_headers(input_http_headers), + concurrent_sessions(input_concurrent_sessions) {} }; @@ -99,13 +111,25 @@ class OtlpHttpClient */ explicit OtlpHttpClient(OtlpHttpClientOptions &&options); + ~OtlpHttpClient(); + /** - * Export + * Sync export * @param message message to export, it should be ExportTraceServiceRequest, * ExportMetricsServiceRequest or ExportLogsServiceRequest */ sdk::common::ExportResult Export(const google::protobuf::Message &message) noexcept; + /** + * Async export + * @param message message to export, it should be ExportTraceServiceRequest, + * ExportMetricsServiceRequest or ExportLogsServiceRequest + * @param result_callback callback to call when the exporting is done + */ + void Export( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept; + /** * Shut down the HTTP client. * @param timeout an optional timeout, the default timeout of 0 means that no @@ -114,19 +138,68 @@ class OtlpHttpClient */ bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept; + /** + * @brief Release the lifetime of specify session. + * + * @param session the session to release + */ + void ReleaseSession(const opentelemetry::ext::http::client::Session &session) noexcept; + private: - // Stores if this HTTP client had its Shutdown() method called - bool is_shutdown_ = false; + struct HttpSessionData + { + std::shared_ptr session; + std::shared_ptr event_handle; + + inline HttpSessionData() = default; + + inline explicit HttpSessionData( + std::shared_ptr &&input_session, + std::shared_ptr &&input_handle) + { + session.swap(input_session); + event_handle.swap(input_handle); + } + + inline explicit HttpSessionData(HttpSessionData &&other) + { + session.swap(other.session); + event_handle.swap(other.event_handle); + } + + inline HttpSessionData &operator=(HttpSessionData &&other) noexcept + { + session.swap(other.session); + event_handle.swap(other.event_handle); + return *this; + } + }; - // The configuration options associated with this HTTP client. - const OtlpHttpClientOptions options_; + /** + * @brief Create a Session object or return a error result + * + * @param message The message to send + */ + nostd::variant createSession( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept; + + /** + * Add http session and hold it's lifetime. + * @param session_data the session to add + */ + void addSession(HttpSessionData &&session_data) noexcept; + + /** + * @brief Real delete all sessions and event handles. + * @note This function is called in the same thread where we create sessions and handles + * + * @return return true if there are more sessions to delete + */ + bool cleanupGCSessions() noexcept; - // Object that stores the HTTP sessions that have been created - std::shared_ptr http_client_; - // Cached parsed URI - std::string http_uri_; - mutable opentelemetry::common::SpinLockMutex lock_; bool isShutdown() const noexcept; + // For testing friend class OtlpHttpExporterTestPeer; friend class OtlpHttpLogExporterTestPeer; @@ -138,6 +211,29 @@ class OtlpHttpClient */ OtlpHttpClient(OtlpHttpClientOptions &&options, std::shared_ptr http_client); + + // Stores if this HTTP client had its Shutdown() method called + bool is_shutdown_; + + // The configuration options associated with this HTTP client. + const OtlpHttpClientOptions options_; + + // Object that stores the HTTP sessions that have been created + std::shared_ptr http_client_; + + // Cached parsed URI + std::string http_uri_; + + // Running sessions and event handles + std::unordered_map + running_sessions_; + // Sessions and event handles that are waiting to be deleted + std::list gc_sessions_; + // Lock for running_sessions_, gc_sessions_ and http_client_ + std::recursive_mutex session_manager_lock_; + // Condition variable and mutex to control the concurrency count of running sessions + std::mutex session_waker_lock_; + std::condition_variable session_waker_; }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 3e6a521194..3e2a783eff 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -11,6 +11,7 @@ #include "opentelemetry/exporters/otlp/otlp_environment.h" #include +#include #include #include @@ -50,6 +51,9 @@ struct OtlpHttpExporterOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultHeaders(); + + // Concurrent sessions + std::size_t concurrent_sessions = 8; }; /** @@ -82,6 +86,16 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &spans, + std::function &&result_callback) noexcept + override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h index d330e62be4..320a9bb963 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h @@ -11,6 +11,7 @@ # include "opentelemetry/exporters/otlp/otlp_environment.h" # include +# include # include # include @@ -50,6 +51,9 @@ struct OtlpHttpLogExporterOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultLogHeaders(); + + // Concurrent sessions + std::size_t concurrent_sessions = 8; }; /** @@ -83,6 +87,16 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter const nostd::span> &records) noexcept override; + /** + * Exports a vector of log records asynchronously. + * @param records A list of log records. + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &records, + std::function &&result_callback) noexcept + override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 32f4a60a52..2192533f70 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -143,6 +143,16 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } +void OtlpGrpcExporter::Export( + const nostd::span> &spans, + std::function &&result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN( + "[OTLP TRACE GRPC Exporter] async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); +} + bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_grpc_log_exporter.cc b/exporters/otlp/src/otlp_grpc_log_exporter.cc index 38bfb0a5bb..06ee5de7ba 100644 --- a/exporters/otlp/src/otlp_grpc_log_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_exporter.cc @@ -161,6 +161,16 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export( return sdk::common::ExportResult::kSuccess; } +void OtlpGrpcLogExporter::Export( + const nostd::span> &logs, + std::function &&result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN( + "[OTLP LOG GRPC Exporter] async not supported. Making sync interface call"); + auto status = Export(logs); + result_callback(status); +} + bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 544f74ca7c..191939a609 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -14,7 +14,6 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" -#include #include "google/protobuf/message.h" #include "google/protobuf/reflection.h" #include "google/protobuf/stubs/common.h" @@ -35,9 +34,11 @@ LIBPROTOBUF_EXPORT void Base64Escape(StringPiece src, std::string *dest); #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" +#include "opentelemetry/common/timestamp.h" #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk_config.h" +#include #include #include #include @@ -70,7 +71,12 @@ class ResponseHandler : public http_client::EventHandler /** * Creates a response handler, that by default doesn't display to console */ - ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {} + ResponseHandler(std::function &&callback, + bool console_debug = false) + : result_callback_{std::move(callback)}, console_debug_{console_debug} + { + stoping_.store(false); + } /** * Automatically called when the response is received, store the body into a string and notify any @@ -100,20 +106,15 @@ class ResponseHandler : public http_client::EventHandler // Set the response_received_ flag to true and notify any threads waiting on this result response_received_ = true; - stop_waiting_ = true; } - cv_.notify_all(); - } - /**resource - * A method the user calls to block their thread until the response is received. The longest - * duration is the timeout of the request, set by SetTimeoutMs() - */ - bool waitForResponse() - { - std::unique_lock lk(mutex_); - cv_.wait(lk, [this] { return stop_waiting_; }); - return response_received_; + { + bool expected = false; + if (stoping_.compare_exchange_strong(expected, true, std::memory_order_release)) + { + Unbind(sdk::common::ExportResult::kSuccess); + } + } } /** @@ -130,7 +131,8 @@ class ResponseHandler : public http_client::EventHandler void OnEvent(http_client::SessionState state, opentelemetry::nostd::string_view reason) noexcept override { - // need to modify stop_waiting_ under lock before calling notify_all + // need to modify stoping_ under lock before calling notify_all + bool need_stop = false; switch (state) { case http_client::SessionState::CreateFailed: @@ -140,8 +142,7 @@ class ResponseHandler : public http_client::EventHandler case http_client::SessionState::TimedOut: case http_client::SessionState::NetworkError: case http_client::SessionState::Cancelled: { - std::unique_lock lk(mutex_); - stop_waiting_ = true; + need_stop = true; } break; @@ -152,10 +153,16 @@ class ResponseHandler : public http_client::EventHandler // If any failure event occurs, release the condition variable to unblock main thread switch (state) { - case http_client::SessionState::CreateFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: session create failed"); - cv_.notify_all(); - break; + case http_client::SessionState::CreateFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: session create failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::Created: if (console_debug_) @@ -178,10 +185,16 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::ConnectFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: connection failed"); - cv_.notify_all(); - break; + case http_client::SessionState::ConnectFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: connection failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::Connected: if (console_debug_) @@ -197,10 +210,16 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::SendFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request send failed"); - cv_.notify_all(); - break; + case http_client::SessionState::SendFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: request send failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::Response: if (console_debug_) @@ -209,20 +228,38 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::SSLHandshakeFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: SSL handshake failed"); - cv_.notify_all(); - break; + case http_client::SessionState::SSLHandshakeFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: SSL handshake failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; - case http_client::SessionState::TimedOut: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request time out"); - cv_.notify_all(); - break; + case http_client::SessionState::TimedOut: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: request time out."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; - case http_client::SessionState::NetworkError: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: network error"); - cv_.notify_all(); - break; + case http_client::SessionState::NetworkError: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: network error."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::ReadError: if (console_debug_) @@ -238,23 +275,68 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::Cancelled: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: (manually) cancelled\n"); - cv_.notify_all(); - break; + case http_client::SessionState::Cancelled: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: (manually) cancelled."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; default: break; } + + if (need_stop) + { + bool expected = false; + if (stoping_.compare_exchange_strong(expected, true, std::memory_order_release)) + { + Unbind(sdk::common::ExportResult::kFailure); + } + } } + void Unbind(sdk::common::ExportResult result) + { + // ReleaseSession may destroy this object, so we need to move owner and session into stack + // first. + OtlpHttpClient *owner = owner_; + const opentelemetry::ext::http::client::Session *session = session_; + + owner_ = nullptr; + session_ = nullptr; + + if (nullptr != owner && nullptr != session) + { + // Release the session at last + owner->ReleaseSession(*session); + + if (result_callback_) + { + result_callback_(result); + } + } + } + + void Bind(OtlpHttpClient *owner, + const opentelemetry::ext::http::client::Session &session) noexcept + { + session_ = &session; + owner_ = owner; + }; + private: // Define a condition variable and mutex - std::condition_variable cv_; std::mutex mutex_; + OtlpHttpClient *owner_ = nullptr; + const opentelemetry::ext::http::client::Session *session_ = nullptr; // Whether notify has been called - bool stop_waiting_ = false; + std::atomic stoping_; // Whether the response has been received bool response_received_ = false; @@ -262,6 +344,9 @@ class ResponseHandler : public http_client::EventHandler // A string to store the response body std::string body_ = ""; + // Result callback when in async mode + std::function result_callback_; + // Whether to print the results from the callback bool console_debug_ = false; }; @@ -562,31 +647,185 @@ void ConvertListFieldToJson(nlohmann::json &value, } // namespace OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options) - : options_(options), http_client_(http_client::HttpClientFactory::Create()) + : is_shutdown_(false), options_(options), http_client_(http_client::HttpClientFactory::Create()) {} +OtlpHttpClient::~OtlpHttpClient() +{ + if (!isShutdown()) + { + Shutdown(); + } + + // Wait for all the sessions to finish + std::unique_lock lock(session_waker_lock_); + session_waker_.wait(lock, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.empty(); + }); + + // And then remove all session datas + while (cleanupGCSessions()) + ; +} + OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options, std::shared_ptr http_client) - : options_(options), http_client_(http_client) + : is_shutdown_(false), options_(options), http_client_(http_client) {} // ----------------------------- HTTP Client methods ------------------------------ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( const google::protobuf::Message &message) noexcept { - // Return failure if this exporter has been shutdown - if (isShutdown()) + opentelemetry::sdk::common::ExportResult result = + opentelemetry::sdk::common::ExportResult::kSuccess; + auto session = + createSession(message, [&result](opentelemetry::sdk::common::ExportResult export_result) { + result = export_result; + return export_result == opentelemetry::sdk::common::ExportResult::kSuccess; + }); + + if (opentelemetry::nostd::holds_alternative(session)) { - const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown"; - if (options_.console_debug) + return opentelemetry::nostd::get(session); + } + + // Wait for the response to be received + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG( + "[OTLP HTTP Client] DEBUG: Waiting for response from " + << options_.url << " (timeout = " + << std::chrono::duration_cast(options_.timeout).count() + << " milliseconds)"); + } + + addSession(std::move(opentelemetry::nostd::get(session))); + + // Wait for any session to finish if there are to many sessions + std::unique_lock lock(session_waker_lock_); + bool wait_successful = session_waker_.wait_for(lock, options_.timeout, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.size() <= 0; + }); + + cleanupGCSessions(); + + // If an error occurred with the HTTP request + if (!wait_successful) + { + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + return result; +} + +void OtlpHttpClient::Export( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept +{ + auto session = createSession(message, std::move(result_callback)); + if (opentelemetry::nostd::holds_alternative(session)) + { + if (result_callback) { - std::cerr << error_message << std::endl; + result_callback(opentelemetry::nostd::get(session)); } - OTEL_INTERNAL_LOG_ERROR(error_message); + return; + } - return opentelemetry::sdk::common::ExportResult::kFailure; + addSession(std::move(opentelemetry::nostd::get(session))); + + // Wait for the response to be received + if (options_.console_debug) + { + OTEL_INTERNAL_LOG_DEBUG( + "[OTLP HTTP Client] DEBUG: Waiting for response from " + << options_.url << " (timeout = " + << std::chrono::duration_cast(options_.timeout).count() + << " milliseconds)"); + } + + // Wait for any session to finish if there are to many sessions + std::unique_lock lock(session_waker_lock_); + session_waker_.wait_for(lock, options_.timeout, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.size() <= options_.concurrent_sessions; + }); + + cleanupGCSessions(); +} + +bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept +{ + { + std::lock_guard guard{session_manager_lock_}; + is_shutdown_ = true; + + // Shutdown the session manager + http_client_->CancelAllSessions(); + http_client_->FinishAllSessions(); + } + + // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented + // in type 'long int' here. So we reset timeout to meet signed long int limit here. + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + + // Wait for all the sessions to finish + std::unique_lock lock(session_waker_lock_); + if (timeout <= std::chrono::microseconds::zero()) + { + session_waker_.wait(lock, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.empty(); + }); + } + else + { + session_waker_.wait_for(lock, timeout, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.empty(); + }); + } + + while (cleanupGCSessions()) + ; + return true; +} + +void OtlpHttpClient::ReleaseSession( + const opentelemetry::ext::http::client::Session &session) noexcept +{ + bool has_session = false; + + { + std::lock_guard guard{session_manager_lock_}; + + auto session_iter = running_sessions_.find(&session); + if (session_iter != running_sessions_.end()) + { + // Move session and handle into gc list, and they will be destroyed later + gc_sessions_.emplace_back(std::move(session_iter->second)); + running_sessions_.erase(session_iter); + + has_session = true; + } + } + + if (has_session) + { + session_waker_.notify_all(); } +} +opentelemetry::nostd::variant +OtlpHttpClient::createSession( + const google::protobuf::Message &message, + std::function &&result_callback) noexcept +{ // Parse uri and store it to cache if (http_uri_.empty()) { @@ -654,6 +893,20 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( } // Send the request + std::lock_guard guard{session_manager_lock_}; + // Return failure if this exporter has been shutdown + if (isShutdown()) + { + const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown"; + if (options_.console_debug) + { + std::cerr << error_message << std::endl; + } + OTEL_INTERNAL_LOG_ERROR(error_message); + + return opentelemetry::sdk::common::ExportResult::kFailure; + } + auto session = http_client_->CreateSession(options_.url); auto request = session->CreateRequest(); @@ -668,50 +921,51 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( request->ReplaceHeader("Content-Type", content_type); // Send the request - std::unique_ptr handler(new ResponseHandler(options_.console_debug)); - session->SendRequest(*handler); + return HttpSessionData{ + std::move(session), + std::shared_ptr{ + new ResponseHandler(std::move(result_callback), options_.console_debug)}}; +} - // Wait for the response to be received - if (options_.console_debug) +void OtlpHttpClient::addSession(HttpSessionData &&session_data) noexcept +{ + if (!session_data.session || !session_data.event_handle) { - OTEL_INTERNAL_LOG_DEBUG( - "[OTLP HTTP Client] DEBUG: Waiting for response from " - << options_.url << " (timeout = " - << std::chrono::duration_cast(options_.timeout).count() - << " milliseconds)"); + return; } - bool write_successful = handler->waitForResponse(); - // End the session - session->FinishSession(); + opentelemetry::ext::http::client::Session *key = session_data.session.get(); + ResponseHandler *handle = static_cast(session_data.event_handle.get()); - // If an error occurred with the HTTP request - if (!write_successful) - { - // TODO: retry logic - return opentelemetry::sdk::common::ExportResult::kFailure; - } + handle->Bind(this, *key); + + HttpSessionData &store_session_data = running_sessions_[key]; + store_session_data = std::move(session_data); - return opentelemetry::sdk::common::ExportResult::kSuccess; + // Send request after the session is added + key->SendRequest(store_session_data.event_handle); } -bool OtlpHttpClient::Shutdown(std::chrono::microseconds) noexcept +bool OtlpHttpClient::cleanupGCSessions() noexcept { + std::lock_guard guard{session_manager_lock_}; + std::list gc_sessions; + gc_sessions_.swap(gc_sessions); + + for (auto &session_data : gc_sessions) { - const std::lock_guard locked(lock_); - is_shutdown_ = true; + // FinishSession must be called with same thread and before the session is destroyed + if (session_data.session) + { + session_data.session->FinishSession(); + } } - // Shutdown the session manager - http_client_->CancelAllSessions(); - http_client_->FinishAllSessions(); - - return true; + return !gc_sessions_.empty(); } bool OtlpHttpClient::isShutdown() const noexcept { - const std::lock_guard locked(lock_); return is_shutdown_; } diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 92155dd00d..3f38443adc 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -11,6 +11,8 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" +#include "opentelemetry/sdk/common/global_log_handler.h" + namespace nostd = opentelemetry::nostd; OPENTELEMETRY_BEGIN_NAMESPACE @@ -29,7 +31,8 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) options.use_json_name, options.console_debug, options.timeout, - options.http_headers))) + options.http_headers, + options.concurrent_sessions))) {} OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr http_client) @@ -56,6 +59,20 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( return http_client_->Export(service_request); } +void OtlpHttpExporter::Export( + const nostd::span> &spans, + std::function &&result_callback) noexcept +{ + if (spans.empty()) + { + return; + } + + proto::collector::trace::v1::ExportTraceServiceRequest service_request; + OtlpRecordableUtils::PopulateRequest(spans, &service_request); + http_client_->Export(service_request, std::move(result_callback)); +} + bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index 436c77beaa..51fb7a96a5 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -13,6 +13,8 @@ # include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" +# include "opentelemetry/sdk/common/global_log_handler.h" + namespace nostd = opentelemetry::nostd; OPENTELEMETRY_BEGIN_NAMESPACE @@ -31,7 +33,8 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &optio options.use_json_name, options.console_debug, options.timeout, - options.http_headers))) + options.http_headers, + options.concurrent_sessions))) {} OtlpHttpLogExporter::OtlpHttpLogExporter(std::unique_ptr http_client) @@ -57,6 +60,19 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export( return http_client_->Export(service_request); } +void OtlpHttpLogExporter::Export( + const nostd::span> &logs, + std::function &&result_callback) noexcept +{ + if (logs.empty()) + { + return; + } + proto::collector::logs::v1::ExportLogsServiceRequest service_request; + OtlpRecordableUtils::PopulateRequest(logs, &service_request); + http_client_->Export(service_request, std::move(result_callback)); +} + bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index ef0b5a509e..8f38685963 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -3,6 +3,9 @@ #ifndef HAVE_CPP_STDLIB +# include +# include + # include "opentelemetry/exporters/otlp/otlp_http_exporter.h" # include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" @@ -81,164 +84,218 @@ class OtlpHttpExporterTestPeer : public ::testing::Test auto http_client = http_client::HttpClientFactory::CreateNoSend(); return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; } + + void ExportJsonIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}}; + resource_attributes["bool_value"] = true; + resource_attributes["int32_value"] = static_cast(1); + resource_attributes["uint32_value"] = static_cast(2); + resource_attributes["int64_value"] = static_cast(0x1100000000LL); + resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); + resource_attributes["double_value"] = static_cast(3.1); + resource_attributes["vec_bool_value"] = std::vector{true, false, true}; + resource_attributes["vec_int32_value"] = std::vector{1, 2}; + resource_attributes["vec_uint32_value"] = std::vector{3, 4}; + resource_attributes["vec_int64_value"] = std::vector{5, 6}; + resource_attributes["vec_uint64_value"] = std::vector{7, 8}; + resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; + resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; + auto resource = resource::Resource::Create(resource_attributes); + + auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); + processor_opts.max_export_batch_size = 5; + processor_opts.max_queue_size = 5; + processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); + processor_opts.is_export_async = is_async; + auto processor = std::unique_ptr( + new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); + auto provider = nostd::shared_ptr( + new sdk::trace::TracerProvider(std::move(processor), resource)); + + std::string report_trace_id; + + char trace_id_hex[2 * trace_api::TraceId::kSize] = {0}; + auto tracer = provider->GetTracer("test"); + auto parent_span = tracer->StartSpan("Test parent span"); + + trace_api::StartSpanOptions child_span_opts = {}; + child_span_opts.parent = parent_span->GetContext(); + + auto child_span = tracer->StartSpan("Test child span", child_span_opts); + + nostd::get(child_span_opts.parent) + .trace_id() + .ToLowerBase16(MakeSpan(trace_id_hex)); + report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, is_async]( + std::shared_ptr callback) { + auto check_json = + nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); + auto resource_span = *check_json["resource_spans"].begin(); + auto instrumentation_library_span = + *resource_span["instrumentation_library_spans"].begin(); + auto span = *instrumentation_library_span["spans"].begin(); + auto received_trace_id = span["trace_id"].get(); + EXPECT_EQ(received_trace_id, report_trace_id); + + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + child_span->End(); + parent_span->End(); + + static_cast(provider.get())->ForceFlush(); + } + + void ExportBinaryIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}}; + resource_attributes["bool_value"] = true; + resource_attributes["int32_value"] = static_cast(1); + resource_attributes["uint32_value"] = static_cast(2); + resource_attributes["int64_value"] = static_cast(0x1100000000LL); + resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); + resource_attributes["double_value"] = static_cast(3.1); + resource_attributes["vec_bool_value"] = std::vector{true, false, true}; + resource_attributes["vec_int32_value"] = std::vector{1, 2}; + resource_attributes["vec_uint32_value"] = std::vector{3, 4}; + resource_attributes["vec_int64_value"] = std::vector{5, 6}; + resource_attributes["vec_uint64_value"] = std::vector{7, 8}; + resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; + resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; + auto resource = resource::Resource::Create(resource_attributes); + + auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); + processor_opts.max_export_batch_size = 5; + processor_opts.max_queue_size = 5; + processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); + processor_opts.is_export_async = is_async; + + auto processor = std::unique_ptr( + new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); + auto provider = nostd::shared_ptr( + new sdk::trace::TracerProvider(std::move(processor), resource)); + + std::string report_trace_id; + + uint8_t trace_id_binary[trace_api::TraceId::kSize] = {0}; + auto tracer = provider->GetTracer("test"); + auto parent_span = tracer->StartSpan("Test parent span"); + + trace_api::StartSpanOptions child_span_opts = {}; + child_span_opts.parent = parent_span->GetContext(); + + auto child_span = tracer->StartSpan("Test child span", child_span_opts); + nostd::get(child_span_opts.parent) + .trace_id() + .CopyBytesTo(MakeSpan(trace_id_binary)); + report_trace_id.assign(reinterpret_cast(trace_id_binary), sizeof(trace_id_binary)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, is_async]( + std::shared_ptr callback) { + opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body; + request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], + static_cast(mock_session->GetRequest()->body_.size())); + auto received_trace_id = + request_body.resource_spans(0).instrumentation_library_spans(0).spans(0).trace_id(); + EXPECT_EQ(received_trace_id, report_trace_id); + + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + child_span->End(); + parent_span->End(); + + static_cast(provider.get())->ForceFlush(); + } }; // Create spans, let processor call Export() -TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTest) +TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestSync) { - auto mock_otlp_client = - OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}}; - resource_attributes["bool_value"] = true; - resource_attributes["int32_value"] = static_cast(1); - resource_attributes["uint32_value"] = static_cast(2); - resource_attributes["int64_value"] = static_cast(0x1100000000LL); - resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); - resource_attributes["double_value"] = static_cast(3.1); - resource_attributes["vec_bool_value"] = std::vector{true, false, true}; - resource_attributes["vec_int32_value"] = std::vector{1, 2}; - resource_attributes["vec_uint32_value"] = std::vector{3, 4}; - resource_attributes["vec_int64_value"] = std::vector{5, 6}; - resource_attributes["vec_uint64_value"] = std::vector{7, 8}; - resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; - resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; - auto resource = resource::Resource::Create(resource_attributes); - - auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); - processor_opts.max_export_batch_size = 5; - processor_opts.max_queue_size = 5; - processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); - auto processor = std::unique_ptr( - new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); - auto provider = nostd::shared_ptr( - new sdk::trace::TracerProvider(std::move(processor), resource)); - - std::string report_trace_id; - - char trace_id_hex[2 * trace_api::TraceId::kSize] = {0}; - auto tracer = provider->GetTracer("test"); - auto parent_span = tracer->StartSpan("Test parent span"); - - trace_api::StartSpanOptions child_span_opts = {}; - child_span_opts.parent = parent_span->GetContext(); - - auto child_span = tracer->StartSpan("Test child span", child_span_opts); - - nostd::get(child_span_opts.parent) - .trace_id() - .ToLowerBase16(MakeSpan(trace_id_hex)); - report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, - report_trace_id](opentelemetry::ext::http::client::EventHandler &callback) { - auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); - auto resource_span = *check_json["resource_spans"].begin(); - auto instrumentation_library_span = *resource_span["instrumentation_library_spans"].begin(); - auto span = *instrumentation_library_span["spans"].begin(); - auto received_trace_id = span["trace_id"].get(); - EXPECT_EQ(received_trace_id, report_trace_id); - - auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); - ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); - if (custom_header != mock_session->GetRequest()->headers_.end()) - { - EXPECT_EQ("Custom-Header-Value", custom_header->second); - } - // let the otlp_http_client to continue - http_client::nosend::Response response; - callback.OnResponse(response); - }); - - child_span->End(); - parent_span->End(); + ExportJsonIntegrationTest(false); +} + +TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestAsync) +{ + ExportJsonIntegrationTest(true); } // Create spans, let processor call Export() -TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTest) +TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestSync) +{ + ExportBinaryIntegrationTest(false); +} + +TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestAsync) { - auto mock_otlp_client = - OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}}; - resource_attributes["bool_value"] = true; - resource_attributes["int32_value"] = static_cast(1); - resource_attributes["uint32_value"] = static_cast(2); - resource_attributes["int64_value"] = static_cast(0x1100000000LL); - resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); - resource_attributes["double_value"] = static_cast(3.1); - resource_attributes["vec_bool_value"] = std::vector{true, false, true}; - resource_attributes["vec_int32_value"] = std::vector{1, 2}; - resource_attributes["vec_uint32_value"] = std::vector{3, 4}; - resource_attributes["vec_int64_value"] = std::vector{5, 6}; - resource_attributes["vec_uint64_value"] = std::vector{7, 8}; - resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; - resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; - auto resource = resource::Resource::Create(resource_attributes); - - auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); - processor_opts.max_export_batch_size = 5; - processor_opts.max_queue_size = 5; - processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); - - auto processor = std::unique_ptr( - new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); - auto provider = nostd::shared_ptr( - new sdk::trace::TracerProvider(std::move(processor), resource)); - - std::string report_trace_id; - - uint8_t trace_id_binary[trace_api::TraceId::kSize] = {0}; - auto tracer = provider->GetTracer("test"); - auto parent_span = tracer->StartSpan("Test parent span"); - - trace_api::StartSpanOptions child_span_opts = {}; - child_span_opts.parent = parent_span->GetContext(); - - auto child_span = tracer->StartSpan("Test child span", child_span_opts); - nostd::get(child_span_opts.parent) - .trace_id() - .CopyBytesTo(MakeSpan(trace_id_binary)); - report_trace_id.assign(reinterpret_cast(trace_id_binary), sizeof(trace_id_binary)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, - report_trace_id](opentelemetry::ext::http::client::EventHandler &callback) { - opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body; - request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], - static_cast(mock_session->GetRequest()->body_.size())); - auto received_trace_id = - request_body.resource_spans(0).instrumentation_library_spans(0).spans(0).trace_id(); - EXPECT_EQ(received_trace_id, report_trace_id); - - auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); - ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); - if (custom_header != mock_session->GetRequest()->headers_.end()) - { - EXPECT_EQ("Custom-Header-Value", custom_header->second); - } - // let the otlp_http_client to continue - http_client::nosend::Response response; - callback.OnResponse(response); - }); - - child_span->End(); - parent_span->End(); + ExportBinaryIntegrationTest(true); } // Test exporter configuration options diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index ffd1a9a0f3..50034ed12d 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -4,6 +4,9 @@ #ifndef HAVE_CPP_STDLIB # ifdef ENABLE_LOGS_PREVIEW +# include +# include + # include "opentelemetry/exporters/otlp/otlp_http_log_exporter.h" # include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" @@ -82,180 +85,241 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test auto http_client = http_client::HttpClientFactory::CreateNoSend(); return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; } + + void ExportJsonIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + bool attribute_storage_bool_value[] = {true, false, true}; + int32_t attribute_storage_int32_value[] = {1, 2}; + uint32_t attribute_storage_uint32_value[] = {3, 4}; + int64_t attribute_storage_int64_value[] = {5, 6}; + uint64_t attribute_storage_uint64_value[] = {7, 8}; + double attribute_storage_double_value[] = {3.2, 3.3}; + std::string attribute_storage_string_value[] = {"vector", "string"}; + + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + provider->AddProcessor( + std::unique_ptr(new sdk::logs::BatchLogProcessor( + std::move(exporter), 5, std::chrono::milliseconds(256), 5, is_async))); + + std::string report_trace_id; + std::string report_span_id; + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + char trace_id_hex[2 * opentelemetry::trace::TraceId::kSize] = {0}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + char span_id_hex[2 * opentelemetry::trace::SpanId::kSize] = {0}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; + auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); + + trace_id.ToLowerBase16(MakeSpan(trace_id_hex)); + report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); + + span_id.ToLowerBase16(MakeSpan(span_id_hex)); + report_span_id.assign(span_id_hex, sizeof(span_id_hex)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, report_span_id, is_async]( + std::shared_ptr callback) { + auto check_json = + nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); + auto resource_logs = *check_json["resource_logs"].begin(); + auto instrumentation_library_span = + *resource_logs["instrumentation_library_logs"].begin(); + auto log = *instrumentation_library_span["logs"].begin(); + auto received_trace_id = log["trace_id"].get(); + auto received_span_id = log["span_id"].get(); + EXPECT_EQ(received_trace_id, report_trace_id); + EXPECT_EQ(received_span_id, report_span_id); + EXPECT_EQ("Log name", log["name"].get()); + EXPECT_EQ("Log message", log["body"]["string_value"].get()); + EXPECT_LE(15, log["attributes"].size()); + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); + } + + void ExportBinaryIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + bool attribute_storage_bool_value[] = {true, false, true}; + int32_t attribute_storage_int32_value[] = {1, 2}; + uint32_t attribute_storage_uint32_value[] = {3, 4}; + int64_t attribute_storage_int64_value[] = {5, 6}; + uint64_t attribute_storage_uint64_value[] = {7, 8}; + double attribute_storage_double_value[] = {3.2, 3.3}; + std::string attribute_storage_string_value[] = {"vector", "string"}; + + sdk::logs::BatchLogProcessorOptions processor_options; + processor_options.max_export_batch_size = 5; + processor_options.max_queue_size = 5; + processor_options.schedule_delay_millis = std::chrono::milliseconds(256); + processor_options.is_export_async = is_async; + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + provider->AddProcessor(std::unique_ptr( + new sdk::logs::BatchLogProcessor(std::move(exporter), processor_options))); + + std::string report_trace_id; + std::string report_span_id; + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; + auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); + + report_trace_id.assign(reinterpret_cast(trace_id_bin), sizeof(trace_id_bin)); + report_span_id.assign(reinterpret_cast(span_id_bin), sizeof(span_id_bin)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, report_span_id, is_async]( + std::shared_ptr callback) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; + request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], + static_cast(mock_session->GetRequest()->body_.size())); + auto received_log = request_body.resource_logs(0).instrumentation_library_logs(0).logs(0); + EXPECT_EQ(received_log.trace_id(), report_trace_id); + EXPECT_EQ(received_log.span_id(), report_span_id); + EXPECT_EQ("Log name", received_log.name()); + EXPECT_EQ("Log message", received_log.body().string_value()); + EXPECT_LE(15, received_log.attributes_size()); + bool check_service_name = false; + for (auto &attribute : received_log.attributes()) + { + if ("service.name" == attribute.key()) + { + check_service_name = true; + EXPECT_EQ("unit_test_service", attribute.value().string_value()); + } + } + ASSERT_TRUE(check_service_name); + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); + } }; // Create log records, let processor call Export() -TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) +TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestSync) +{ + ExportJsonIntegrationTest(false); +} + +TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestAsync) { - auto mock_otlp_client = - OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - bool attribute_storage_bool_value[] = {true, false, true}; - int32_t attribute_storage_int32_value[] = {1, 2}; - uint32_t attribute_storage_uint32_value[] = {3, 4}; - int64_t attribute_storage_int64_value[] = {5, 6}; - uint64_t attribute_storage_uint64_value[] = {7, 8}; - double attribute_storage_double_value[] = {3.2, 3.3}; - std::string attribute_storage_string_value[] = {"vector", "string"}; - - auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); - provider->AddProcessor(std::unique_ptr( - new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5))); - - std::string report_trace_id; - std::string report_span_id; - uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; - char trace_id_hex[2 * opentelemetry::trace::TraceId::kSize] = {0}; - opentelemetry::trace::TraceId trace_id{trace_id_bin}; - uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', - '3', '2', '1', '0'}; - char span_id_hex[2 * opentelemetry::trace::SpanId::kSize] = {0}; - opentelemetry::trace::SpanId span_id{span_id_bin}; - - const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; - auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); - logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", - {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}, - {"bool_value", true}, - {"int32_value", static_cast(1)}, - {"uint32_value", static_cast(2)}, - {"int64_value", static_cast(0x1100000000LL)}, - {"uint64_value", static_cast(0x1200000000ULL)}, - {"double_value", static_cast(3.1)}, - {"vec_bool_value", attribute_storage_bool_value}, - {"vec_int32_value", attribute_storage_int32_value}, - {"vec_uint32_value", attribute_storage_uint32_value}, - {"vec_int64_value", attribute_storage_int64_value}, - {"vec_uint64_value", attribute_storage_uint64_value}, - {"vec_double_value", attribute_storage_double_value}, - {"vec_string_value", attribute_storage_string_value}}, - trace_id, span_id, - opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, - std::chrono::system_clock::now()); - - trace_id.ToLowerBase16(MakeSpan(trace_id_hex)); - report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); - - span_id.ToLowerBase16(MakeSpan(span_id_hex)); - report_span_id.assign(span_id_hex, sizeof(span_id_hex)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, - report_span_id](opentelemetry::ext::http::client::EventHandler &callback) { - auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); - auto resource_logs = *check_json["resource_logs"].begin(); - auto instrumentation_library_span = *resource_logs["instrumentation_library_logs"].begin(); - auto log = *instrumentation_library_span["logs"].begin(); - auto received_trace_id = log["trace_id"].get(); - auto received_span_id = log["span_id"].get(); - EXPECT_EQ(received_trace_id, report_trace_id); - EXPECT_EQ(received_span_id, report_span_id); - EXPECT_EQ("Log name", log["name"].get()); - EXPECT_EQ("Log message", log["body"]["string_value"].get()); - EXPECT_LE(15, log["attributes"].size()); - auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); - ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); - if (custom_header != mock_session->GetRequest()->headers_.end()) - { - EXPECT_EQ("Custom-Header-Value", custom_header->second); - } - // let the otlp_http_client to continue - http_client::nosend::Response response; - callback.OnResponse(response); - }); + ExportJsonIntegrationTest(true); } // Create log records, let processor call Export() -TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) +TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestSync) { - auto mock_otlp_client = - OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - bool attribute_storage_bool_value[] = {true, false, true}; - int32_t attribute_storage_int32_value[] = {1, 2}; - uint32_t attribute_storage_uint32_value[] = {3, 4}; - int64_t attribute_storage_int64_value[] = {5, 6}; - uint64_t attribute_storage_uint64_value[] = {7, 8}; - double attribute_storage_double_value[] = {3.2, 3.3}; - std::string attribute_storage_string_value[] = {"vector", "string"}; - - auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); - provider->AddProcessor(std::unique_ptr( - new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5))); - - std::string report_trace_id; - std::string report_span_id; - uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; - opentelemetry::trace::TraceId trace_id{trace_id_bin}; - uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', - '3', '2', '1', '0'}; - opentelemetry::trace::SpanId span_id{span_id_bin}; - - const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; - auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); - logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", - {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}, - {"bool_value", true}, - {"int32_value", static_cast(1)}, - {"uint32_value", static_cast(2)}, - {"int64_value", static_cast(0x1100000000LL)}, - {"uint64_value", static_cast(0x1200000000ULL)}, - {"double_value", static_cast(3.1)}, - {"vec_bool_value", attribute_storage_bool_value}, - {"vec_int32_value", attribute_storage_int32_value}, - {"vec_uint32_value", attribute_storage_uint32_value}, - {"vec_int64_value", attribute_storage_int64_value}, - {"vec_uint64_value", attribute_storage_uint64_value}, - {"vec_double_value", attribute_storage_double_value}, - {"vec_string_value", attribute_storage_string_value}}, - trace_id, span_id, - opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, - std::chrono::system_clock::now()); - - report_trace_id.assign(reinterpret_cast(trace_id_bin), sizeof(trace_id_bin)); - report_span_id.assign(reinterpret_cast(span_id_bin), sizeof(span_id_bin)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, - report_span_id](opentelemetry::ext::http::client::EventHandler &callback) { - opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; - request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], - static_cast(mock_session->GetRequest()->body_.size())); - auto received_log = request_body.resource_logs(0).instrumentation_library_logs(0).logs(0); - EXPECT_EQ(received_log.trace_id(), report_trace_id); - EXPECT_EQ(received_log.span_id(), report_span_id); - EXPECT_EQ("Log name", received_log.name()); - EXPECT_EQ("Log message", received_log.body().string_value()); - EXPECT_LE(15, received_log.attributes_size()); - bool check_service_name = false; - for (auto &attribute : received_log.attributes()) - { - if ("service.name" == attribute.key()) - { - check_service_name = true; - EXPECT_EQ("unit_test_service", attribute.value().string_value()); - } - } - ASSERT_TRUE(check_service_name); - http_client::nosend::Response response; - callback.OnResponse(response); - }); + ExportBinaryIntegrationTest(false); +} + +TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestAsync) +{ + ExportBinaryIntegrationTest(true); } // Test exporter configuration options diff --git a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h index ae0e8173f9..28f09deaba 100644 --- a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h +++ b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h @@ -78,6 +78,15 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Export asynchronosly a batch of span recordables in JSON format + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + void Export(const nostd::span> &spans, + std::function + &&result_callback) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, default to max. diff --git a/exporters/zipkin/src/zipkin_exporter.cc b/exporters/zipkin/src/zipkin_exporter.cc index 240144599f..202c048cca 100644 --- a/exporters/zipkin/src/zipkin_exporter.cc +++ b/exporters/zipkin/src/zipkin_exporter.cc @@ -93,6 +93,15 @@ sdk::common::ExportResult ZipkinExporter::Export( return sdk::common::ExportResult::kSuccess; } +void ZipkinExporter::Export( + const nostd::span> &spans, + std::function &&result_callback) noexcept +{ + OTEL_INTERNAL_LOG_WARN("[ZIPKIN EXPORTER] async not supported. Making sync interface call"); + auto status = Export(spans); + result_callback(status); +} + void ZipkinExporter::InitializeLocalEndpoint() { if (options_.service_name.length()) diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h index 9f2f05f3f0..ac263193bc 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -143,19 +143,19 @@ class Session : public opentelemetry::ext::http::client::Session } virtual void SendRequest( - opentelemetry::ext::http::client::EventHandler &callback) noexcept override + std::shared_ptr callback) noexcept override { is_session_active_ = true; std::string url = host_ + std::string(http_request_->uri_); - auto callback_ptr = &callback; + auto callback_ptr = callback.get(); curl_operation_.reset(new HttpOperation( http_request_->method_, url, callback_ptr, RequestMode::Async, http_request_->headers_, http_request_->body_, false, http_request_->timeout_ms_)); - curl_operation_->SendAsync([this, callback_ptr](HttpOperation &operation) { + curl_operation_->SendAsync([this, callback](HttpOperation &operation) { if (operation.WasAborted()) { // Manually cancelled - callback_ptr->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, ""); + callback->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, ""); } if (operation.GetResponseCode() >= CURL_LAST) @@ -165,7 +165,7 @@ class Session : public opentelemetry::ext::http::client::Session response->headers_ = operation.GetResponseHeaders(); response->body_ = operation.GetResponseBody(); response->status_code_ = operation.GetResponseCode(); - callback_ptr->OnResponse(*response); + callback->OnResponse(*response); } is_session_active_ = false; }); @@ -280,18 +280,30 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient bool CancelAllSessions() noexcept override { - for (auto &session : sessions_) + // CancelSession may change sessions_, we can not change a container while iterating it. + while (!sessions_.empty()) { - session.second->CancelSession(); + std::map> sessions; + sessions.swap(sessions_); + for (auto &session : sessions) + { + session.second->CancelSession(); + } } return true; } bool FinishAllSessions() noexcept override { - for (auto &session : sessions_) + // FinishSession may change sessions_, we can not change a container while iterating it. + while (!sessions_.empty()) { - session.second->FinishSession(); + std::map> sessions; + sessions.swap(sessions_); + for (auto &session : sessions) + { + session.second->FinishSession(); + } } return true; } diff --git a/ext/include/opentelemetry/ext/http/client/http_client.h b/ext/include/opentelemetry/ext/http/client/http_client.h index 308335e492..e939962653 100644 --- a/ext/include/opentelemetry/ext/http/client/http_client.h +++ b/ext/include/opentelemetry/ext/http/client/http_client.h @@ -212,7 +212,7 @@ class Session public: virtual std::shared_ptr CreateRequest() noexcept = 0; - virtual void SendRequest(EventHandler &) noexcept = 0; + virtual void SendRequest(std::shared_ptr) noexcept = 0; virtual bool IsSessionActive() noexcept = 0; diff --git a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h index 02433d75ce..405381c285 100644 --- a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h +++ b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h @@ -93,6 +93,19 @@ class Response : public opentelemetry::ext::http::client::Response return status_code_; } + void Finish(opentelemetry::ext::http::client::EventHandler &callback) noexcept + { + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Created, ""); + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Connecting, ""); + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Connected, ""); + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Sending, ""); + + // let the otlp_http_client to continue + callback.OnResponse(*this); + + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Response, ""); + } + public: Headers headers_; opentelemetry::ext::http::client::Body body_; @@ -121,7 +134,7 @@ class Session : public opentelemetry::ext::http::client::Session MOCK_METHOD(void, SendRequest, - (opentelemetry::ext::http::client::EventHandler &), + (std::shared_ptr), (noexcept, override)); virtual bool CancelSession() noexcept override; diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc index f8d248bae4..7085a1da33 100644 --- a/ext/test/http/curl_http_test.cc +++ b/ext/test/http/curl_http_test.cc @@ -196,12 +196,11 @@ TEST_F(BasicCurlHttpTests, SendGetRequest) auto session = session_manager->CreateSession("http://127.0.0.1:19000"); auto request = session->CreateRequest(); request->SetUri("get/"); - GetEventHandler *handler = new GetEventHandler(); - session->SendRequest(*handler); + auto handler = std::make_shared(); + session->SendRequest(handler); ASSERT_TRUE(waitForRequests(30, 1)); session->FinishSession(); ASSERT_TRUE(handler->is_called_); - delete handler; } TEST_F(BasicCurlHttpTests, SendPostRequest) @@ -219,16 +218,14 @@ TEST_F(BasicCurlHttpTests, SendPostRequest) http_client::Body body = {b, b + strlen(b)}; request->SetBody(body); request->AddHeader("Content-Type", "text/plain"); - PostEventHandler *handler = new PostEventHandler(); - session->SendRequest(*handler); + auto handler = std::make_shared(); + session->SendRequest(handler); ASSERT_TRUE(waitForRequests(30, 1)); session->FinishSession(); ASSERT_TRUE(handler->is_called_); session_manager->CancelAllSessions(); session_manager->FinishAllSessions(); - - delete handler; } TEST_F(BasicCurlHttpTests, RequestTimeout) @@ -240,11 +237,10 @@ TEST_F(BasicCurlHttpTests, RequestTimeout) auto session = session_manager->CreateSession("222.222.222.200:19000"); // Non Existing address auto request = session->CreateRequest(); request->SetUri("get/"); - GetEventHandler *handler = new GetEventHandler(); - session->SendRequest(*handler); + auto handler = std::make_shared(); + session->SendRequest(handler); session->FinishSession(); ASSERT_FALSE(handler->is_called_); - delete handler; } TEST_F(BasicCurlHttpTests, CurlHttpOperations) diff --git a/ext/test/w3c_tracecontext_test/main.cc b/ext/test/w3c_tracecontext_test/main.cc index 79aa4c9169..ca54475540 100644 --- a/ext/test/w3c_tracecontext_test/main.cc +++ b/ext/test/w3c_tracecontext_test/main.cc @@ -100,7 +100,7 @@ class NoopEventHandler : public http_client::EventHandler // Sends an HTTP POST request to the given url, with the given body. void send_request(curl::HttpClient &client, const std::string &url, const std::string &body) { - static std::unique_ptr handler(new NoopEventHandler()); + static std::shared_ptr handler(new NoopEventHandler()); auto request_span = get_tracer()->StartSpan(__func__); trace_api::Scope scope(request_span); @@ -126,7 +126,7 @@ void send_request(curl::HttpClient &client, const std::string &url, const std::s request->AddHeader(hdr.first, hdr.second); } - session->SendRequest(*handler); + session->SendRequest(handler); session->FinishSession(); } diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 1b6d443c8a..ab0f717a5b 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -10,6 +10,7 @@ # include # include +# include # include OPENTELEMETRY_BEGIN_NAMESPACE @@ -19,6 +20,33 @@ namespace sdk namespace logs { +/** + * Struct to hold batch SpanProcessor options. + */ +struct BatchLogProcessorOptions +{ + /** + * The maximum buffer/queue size. After the size is reached, spans are + * dropped. + */ + size_t max_queue_size = 2048; + + /* The time interval between two consecutive exports. */ + std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000); + + /** + * The maximum batch size of every export. It must be smaller or + * equal to max_queue_size. + */ + size_t max_export_batch_size = 512; + + /** + * Determines whether the export happens asynchronously. + * Default implementation is synchronous. + */ + bool is_export_async = false; +}; + /** * This is an implementation of the LogProcessor which creates batches of finished logs and passes * the export-friendly log data representations to the configured LogExporter. @@ -41,7 +69,18 @@ class BatchLogProcessor : public LogProcessor std::unique_ptr &&exporter, const size_t max_queue_size = 2048, const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), - const size_t max_export_batch_size = 512); + const size_t max_export_batch_size = 512, + const bool is_export_async = false); + + /** + * Creates a batch log processor by configuring the specified exporter and other parameters + * as per the official, language-agnostic opentelemetry specs. + * + * @param exporter - The backend exporter to pass the logs to + * @param options - The batch SpanProcessor options. + */ + explicit BatchLogProcessor(std::unique_ptr &&exporter, + const BatchLogProcessorOptions &options); /** Makes a new recordable **/ std::unique_ptr MakeRecordable() noexcept override; @@ -98,6 +137,32 @@ class BatchLogProcessor : public LogProcessor */ void DrainQueue(); + /* In case of async export, wait and notify for shutdown to be completed.*/ + void WaitForShutdownCompletion(); + + struct SynchronizationData + { + /* Synchronization primitives */ + std::condition_variable cv_, force_flush_cv_, async_shutdown_cv_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_m_, async_shutdown_m_; + + /* Important boolean flags to handle the workflow of the processor */ + std::atomic is_shutdown_; + std::atomic is_force_flush_; + std::atomic is_force_flush_notified_; + std::atomic is_async_shutdown_notified_; + }; + + /** + * @brief Notify completion of shutdown and force flush. This may be called from the any thread at + * any time + * + * @param notify_force_flush Flag to indicate whether to notify force flush completion. + * @param synchronization_data Synchronization data to be notified. + */ + static void NotifyCompletion(bool notify_force_flush, + const std::shared_ptr &synchronization_data); + /* The configured backend log exporter */ std::unique_ptr exporter_; @@ -105,18 +170,12 @@ class BatchLogProcessor : public LogProcessor const size_t max_queue_size_; const std::chrono::milliseconds scheduled_delay_millis_; const size_t max_export_batch_size_; - - /* Synchronization primitives */ - std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; + const bool is_export_async_; /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; - /* Important boolean flags to handle the workflow of the processor */ - std::atomic is_shutdown_{false}; - std::atomic is_force_flush_{false}; - std::atomic is_force_flush_notified_{false}; + std::shared_ptr synchronization_data_; /* The background worker thread */ std::thread worker_thread_; @@ -125,4 +184,4 @@ class BatchLogProcessor : public LogProcessor } // namespace logs } // namespace sdk OPENTELEMETRY_END_NAMESPACE -#endif +#endif \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index 85c58e9f12..ee3bac92b4 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -46,6 +46,15 @@ class LogExporter virtual sdk::common::ExportResult Export( const nostd::span> &records) noexcept = 0; + /** + * Exports asynchronously the batch of log records to their export destination + * @param records a span of unique pointers to log records + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &records, + std::function &&result_callback) noexcept = 0; + /** * Marks the exporter as ShutDown and cleans up any resources as required. * Shutdown should be called only once for each Exporter instance. diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index cc3aec47b2..28fcca78a6 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -28,7 +28,8 @@ class SimpleLogProcessor : public LogProcessor { public: - explicit SimpleLogProcessor(std::unique_ptr &&exporter); + explicit SimpleLogProcessor(std::unique_ptr &&exporter, + bool is_export_async = false); virtual ~SimpleLogProcessor() = default; std::unique_ptr MakeRecordable() noexcept override; @@ -48,6 +49,7 @@ class SimpleLogProcessor : public LogProcessor opentelemetry::common::SpinLockMutex lock_; // The atomic boolean flag to ensure the ShutDown() function is only called once std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; + bool is_export_async_ = false; }; } // namespace logs } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index d25ff2d950..53b6a61ab5 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -37,6 +37,12 @@ struct BatchSpanProcessorOptions * equal to max_queue_size. */ size_t max_export_batch_size = 512; + + /** + * Determines whether the export happens asynchronously. + * Default implementation is synchronous. + */ + bool is_export_async = false; }; /** @@ -129,6 +135,32 @@ class BatchSpanProcessor : public SpanProcessor */ void DrainQueue(); + /* In case of async export, wait and notify for shutdown to be completed.*/ + void WaitForShutdownCompletion(); + + struct SynchronizationData + { + /* Synchronization primitives */ + std::condition_variable cv_, force_flush_cv_, async_shutdown_cv_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_m_, async_shutdown_m_; + + /* Important boolean flags to handle the workflow of the processor */ + std::atomic is_shutdown_; + std::atomic is_force_flush_; + std::atomic is_force_flush_notified_; + std::atomic is_async_shutdown_notified_; + }; + + /** + * @brief Notify completion of shutdown and force flush. This may be called from the any thread at + * any time + * + * @param notify_force_flush Flag to indicate whether to notify force flush completion. + * @param synchronization_data Synchronization data to be notified. + */ + static void NotifyCompletion(bool notify_force_flush, + const std::shared_ptr &synchronization_data); + /* The configured backend exporter */ std::unique_ptr exporter_; @@ -136,18 +168,12 @@ class BatchSpanProcessor : public SpanProcessor const size_t max_queue_size_; const std::chrono::milliseconds schedule_delay_millis_; const size_t max_export_batch_size_; - - /* Synchronization primitives */ - std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; + const bool is_export_async_; /* The buffer/queue to which the ended spans are added */ common::CircularBuffer buffer_; - /* Important boolean flags to handle the workflow of the processor */ - std::atomic is_shutdown_{false}; - std::atomic is_force_flush_{false}; - std::atomic is_force_flush_notified_{false}; + std::shared_ptr synchronization_data_; /* The background worker thread */ std::thread worker_thread_; diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h index 5826b5f454..749fd897fb 100644 --- a/sdk/include/opentelemetry/sdk/trace/exporter.h +++ b/sdk/include/opentelemetry/sdk/trace/exporter.h @@ -42,6 +42,15 @@ class SpanExporter const nostd::span> &spans) noexcept = 0; + /** + * Exports a batch of span recordables asynchronously. + * @param spans a span of unique pointers to span recordables + * @param result_callback callback function accepting ExportResult as argument + */ + virtual void Export( + const nostd::span> &spans, + std::function &&result_callback) noexcept = 0; + /** * Shut down the exporter. * @param timeout an optional timeout. diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h index accc685965..982a432e0c 100644 --- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h @@ -31,8 +31,9 @@ class SimpleSpanProcessor : public SpanProcessor * Initialize a simple span processor. * @param exporter the exporter used by the span processor */ - explicit SimpleSpanProcessor(std::unique_ptr &&exporter) noexcept - : exporter_(std::move(exporter)) + explicit SimpleSpanProcessor(std::unique_ptr &&exporter, + bool is_export_async = false) noexcept + : exporter_(std::move(exporter)), is_export_async_(is_export_async) {} std::unique_ptr MakeRecordable() noexcept override @@ -48,10 +49,21 @@ class SimpleSpanProcessor : public SpanProcessor { nostd::span> batch(&span, 1); const std::lock_guard locked(lock_); - if (exporter_->Export(batch) == sdk::common::ExportResult::kFailure) + if (is_export_async_ == false) { - /* Once it is defined how the SDK does logging, an error should be - * logged in this case. */ + if (exporter_->Export(batch) == sdk::common::ExportResult::kFailure) + { + /* Once it is defined how the SDK does logging, an error should be + * logged in this case. */ + } + } + else + { + exporter_->Export(batch, [](sdk::common::ExportResult result) { + /* Log the result + */ + return true; + }); } } @@ -78,6 +90,7 @@ class SimpleSpanProcessor : public SpanProcessor std::unique_ptr exporter_; opentelemetry::common::SpinLockMutex lock_; std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; + bool is_export_async_ = false; }; } // namespace trace } // namespace sdk diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 9b20705b0a..2606911aba 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -16,14 +16,39 @@ namespace logs BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, const size_t max_queue_size, const std::chrono::milliseconds scheduled_delay_millis, - const size_t max_export_batch_size) + const size_t max_export_batch_size, + const bool is_export_async) : exporter_(std::move(exporter)), max_queue_size_(max_queue_size), scheduled_delay_millis_(scheduled_delay_millis), max_export_batch_size_(max_export_batch_size), + is_export_async_(is_export_async), buffer_(max_queue_size_), + synchronization_data_(std::make_shared()), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) -{} +{ + synchronization_data_->is_shutdown_.store(false); + synchronization_data_->is_force_flush_.store(false); + synchronization_data_->is_force_flush_notified_.store(false); + synchronization_data_->is_async_shutdown_notified_.store(false); +} + +BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, + const BatchLogProcessorOptions &options) + : exporter_(std::move(exporter)), + max_queue_size_(options.max_queue_size), + scheduled_delay_millis_(options.schedule_delay_millis), + max_export_batch_size_(options.max_export_batch_size), + is_export_async_(options.is_export_async), + buffer_(options.max_queue_size), + synchronization_data_(std::make_shared()), + worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) +{ + synchronization_data_->is_shutdown_.store(false); + synchronization_data_->is_force_flush_.store(false); + synchronization_data_->is_force_flush_notified_.store(false); + synchronization_data_->is_async_shutdown_notified_.store(false); +} std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept { @@ -32,7 +57,7 @@ std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept { - if (is_shutdown_.load() == true) + if (synchronization_data_->is_shutdown_.load() == true) { return; } @@ -44,39 +69,52 @@ void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept // If the queue gets at least half full a preemptive notification is // sent to the worker thread to start a new export cycle. - if (buffer_.size() >= max_queue_size_ / 2) + size_t buffer_size = buffer_.size(); + if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_) { // signal the worker thread - cv_.notify_one(); + synchronization_data_->cv_.notify_one(); } } bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept { - if (is_shutdown_.load() == true) + if (synchronization_data_->is_shutdown_.load() == true) { return false; } - is_force_flush_ = true; + // Now wait for the worker thread to signal back from the Export method + std::unique_lock lk(synchronization_data_->force_flush_cv_m_); - // Keep attempting to wake up the worker thread - while (is_force_flush_.load() == true) + synchronization_data_->is_force_flush_notified_.store(false, std::memory_order_release); + auto break_condition = [this]() { + if (synchronization_data_->is_shutdown_.load() == true) + { + return true; + } + + // Wake up the worker thread once. + if (synchronization_data_->is_force_flush_.exchange(true) == false) + { + synchronization_data_->cv_.notify_one(); + } + + return synchronization_data_->is_force_flush_notified_.load(std::memory_order_acquire); + }; + + // Fix timeout to meet requirement of wait_for + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout <= std::chrono::microseconds::zero()) { - cv_.notify_one(); + synchronization_data_->force_flush_cv_.wait(lk, break_condition); + return true; } - - // Now wait for the worker thread to signal back from the Export method - std::unique_lock lk(force_flush_cv_m_); - while (is_force_flush_notified_.load() == false) + else { - force_flush_cv_.wait(lk); + return synchronization_data_->force_flush_cv_.wait_for(lk, timeout, break_condition); } - - // Notify the worker thread - is_force_flush_notified_ = false; - - return true; } void BatchLogProcessor::DoBackgroundWork() @@ -86,25 +124,20 @@ void BatchLogProcessor::DoBackgroundWork() while (true) { // Wait for `timeout` milliseconds - std::unique_lock lk(cv_m_); - cv_.wait_for(lk, timeout); + std::unique_lock lk(synchronization_data_->cv_m_); + synchronization_data_->cv_.wait_for(lk, timeout); - if (is_shutdown_.load() == true) + if (synchronization_data_->is_shutdown_.load() == true) { + // Break loop if another thread call ForceFlush DrainQueue(); return; } - bool was_force_flush_called = is_force_flush_.load(); + bool was_force_flush_called = synchronization_data_->is_force_flush_.exchange(false); // Check if this export was the result of a force flush. - if (was_force_flush_called == true) - { - // Since this export was the result of a force flush, signal the - // main thread that the worker thread has been notified - is_force_flush_ = false; - } - else + if (!was_force_flush_called) { // If the buffer was empty during the entire `timeout` time interval, // go back to waiting. If this was a spurious wake-up, we export only if @@ -128,67 +161,154 @@ void BatchLogProcessor::DoBackgroundWork() void BatchLogProcessor::Export(const bool was_force_flush_called) { - std::vector> records_arr; + bool notify_force_completion = true; + do + { + std::vector> records_arr; + size_t num_records_to_export; + + if (was_force_flush_called == true) + { + num_records_to_export = buffer_.size(); + } + else + { + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } + + if (num_records_to_export == 0) + { + break; + } - size_t num_records_to_export; + buffer_.Consume(num_records_to_export, + [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); + + if (is_export_async_ == false) + { + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size())); + } + else + { + notify_force_completion = false; + std::weak_ptr synchronization_data_watcher = synchronization_data_; + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size()), + [was_force_flush_called, synchronization_data_watcher](sdk::common::ExportResult result) { + // TODO: Print result + if (synchronization_data_watcher.expired()) + { + return true; + } + + NotifyCompletion(was_force_flush_called, synchronization_data_watcher.lock()); + return true; + }); + } + } while (was_force_flush_called); - if (was_force_flush_called == true) + if (notify_force_completion) { - num_records_to_export = buffer_.size(); + NotifyCompletion(was_force_flush_called, synchronization_data_); } - else +} + +void BatchLogProcessor::WaitForShutdownCompletion() +{ + // Since async export is invoked due to shutdown, need to wait + // for async thread to complete. + if (is_export_async_) { - num_records_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + std::unique_lock lk(synchronization_data_->async_shutdown_m_); + while (synchronization_data_->is_async_shutdown_notified_.load() == false) + { + synchronization_data_->async_shutdown_cv_.wait(lk); + } } +} - buffer_.Consume(num_records_to_export, - [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - records_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; - }); - }); +void BatchLogProcessor::NotifyCompletion( + bool notify_force_flush, + const std::shared_ptr &synchronization_data) +{ + if (!synchronization_data) + { + return; + } - exporter_->Export( - nostd::span>(records_arr.data(), records_arr.size())); + if (notify_force_flush && synchronization_data->is_force_flush_notified_.exchange( + true, std::memory_order_acq_rel) == false) + { + synchronization_data->force_flush_cv_.notify_all(); + } - // Notify the main thread in case this export was the result of a force flush. - if (was_force_flush_called == true) + // Notify the thread which is waiting on shutdown to complete. + if (synchronization_data->is_shutdown_.load() == true) { - is_force_flush_notified_ = true; - while (is_force_flush_notified_.load() == true) - { - force_flush_cv_.notify_one(); - } + synchronization_data->is_async_shutdown_notified_.store(true); + synchronization_data->async_shutdown_cv_.notify_all(); } } void BatchLogProcessor::DrainQueue() { - while (buffer_.empty() == false) + while (true) { - Export(false); + bool was_force_flush_called = synchronization_data_->is_force_flush_.exchange(false); + if (buffer_.empty() && !was_force_flush_called) + { + break; + } + + Export(was_force_flush_called); + + // Since async export is invoked due to shutdown, need to wait + // for async thread to complete. + WaitForShutdownCompletion(); } } bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { - std::lock_guard shutdown_guard{shutdown_m_}; - bool already_shutdown = is_shutdown_.exchange(true); + auto start_time = std::chrono::system_clock::now(); + + std::lock_guard shutdown_guard{synchronization_data_->shutdown_m_}; + bool already_shutdown = synchronization_data_->is_shutdown_.exchange(true); if (worker_thread_.joinable()) { - cv_.notify_one(); + synchronization_data_->cv_.notify_one(); worker_thread_.join(); } + auto worker_end_time = std::chrono::system_clock::now(); + auto offset = std::chrono::duration_cast(worker_end_time - start_time); + + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout > offset && timeout > std::chrono::microseconds::zero()) + { + timeout -= offset; + } + else + { + // Some module use zero as indefinite timeout.So we can not reset timeout to zero here + timeout = std::chrono::microseconds(1); + } + // Should only shutdown exporter ONCE. if (!already_shutdown && exporter_ != nullptr) { - return exporter_->Shutdown(); + return exporter_->Shutdown(timeout); } return true; @@ -196,7 +316,7 @@ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept BatchLogProcessor::~BatchLogProcessor() { - if (is_shutdown_.load() == false) + if (synchronization_data_->is_shutdown_.load() == false) { Shutdown(); } diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index 6e2fde9f14..e16a5e631e 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -16,8 +16,9 @@ namespace logs * Initialize a simple log processor. * @param exporter the configured exporter where log records are sent */ -SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter) - : exporter_(std::move(exporter)) +SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter, + bool is_export_async) + : exporter_(std::move(exporter)), is_export_async_(is_export_async) {} std::unique_ptr SimpleLogProcessor::MakeRecordable() noexcept @@ -35,9 +36,20 @@ void SimpleLogProcessor::OnReceive(std::unique_ptr &&record) noexcep // Get lock to ensure Export() is never called concurrently const std::lock_guard locked(lock_); - if (exporter_->Export(batch) != sdk::common::ExportResult::kSuccess) + if (is_export_async_ == false) { - /* Alert user of the failed export */ + if (exporter_->Export(batch) != sdk::common::ExportResult::kSuccess) + { + /* Alert user of the failed export */ + } + } + else + { + exporter_->Export(batch, [](sdk::common::ExportResult result) { + /* Log the result + */ + return true; + }); } } /** diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index 0ab042b9ab..87bb503891 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -14,15 +14,23 @@ namespace sdk { namespace trace { + BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, const BatchSpanProcessorOptions &options) : exporter_(std::move(exporter)), max_queue_size_(options.max_queue_size), schedule_delay_millis_(options.schedule_delay_millis), max_export_batch_size_(options.max_export_batch_size), + is_export_async_(options.is_export_async), buffer_(max_queue_size_), + synchronization_data_(std::make_shared()), worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this) -{} +{ + synchronization_data_->is_shutdown_.store(false); + synchronization_data_->is_force_flush_.store(false); + synchronization_data_->is_force_flush_notified_.store(false); + synchronization_data_->is_async_shutdown_notified_.store(false); +} std::unique_ptr BatchSpanProcessor::MakeRecordable() noexcept { @@ -36,7 +44,7 @@ void BatchSpanProcessor::OnStart(Recordable &, const SpanContext &) noexcept void BatchSpanProcessor::OnEnd(std::unique_ptr &&span) noexcept { - if (is_shutdown_.load() == true) + if (synchronization_data_->is_shutdown_.load() == true) { return; } @@ -48,39 +56,52 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr &&span) noexcept // If the queue gets at least half full a preemptive notification is // sent to the worker thread to start a new export cycle. - if (buffer_.size() >= max_queue_size_ / 2) + size_t buffer_size = buffer_.size(); + if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_) { // signal the worker thread - cv_.notify_one(); + synchronization_data_->cv_.notify_one(); } } bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept { - if (is_shutdown_.load() == true) + if (synchronization_data_->is_shutdown_.load() == true) { return false; } - is_force_flush_ = true; + // Now wait for the worker thread to signal back from the Export method + std::unique_lock lk(synchronization_data_->force_flush_cv_m_); - // Keep attempting to wake up the worker thread - while (is_force_flush_.load() == true) + synchronization_data_->is_force_flush_notified_.store(false, std::memory_order_release); + auto break_condition = [this]() { + if (synchronization_data_->is_shutdown_.load() == true) + { + return true; + } + + // Wake up the worker thread once. + if (synchronization_data_->is_force_flush_.exchange(true) == false) + { + synchronization_data_->cv_.notify_one(); + } + + return synchronization_data_->is_force_flush_notified_.load(std::memory_order_acquire); + }; + + // Fix timeout to meet requirement of wait_for + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout <= std::chrono::microseconds::zero()) { - cv_.notify_one(); + synchronization_data_->force_flush_cv_.wait(lk, break_condition); + return true; } - - // Now wait for the worker thread to signal back from the Export method - std::unique_lock lk(force_flush_cv_m_); - while (is_force_flush_notified_.load() == false) + else { - force_flush_cv_.wait(lk); + return synchronization_data_->force_flush_cv_.wait_for(lk, timeout, break_condition); } - - // Notify the worker thread - is_force_flush_notified_ = false; - - return true; } void BatchSpanProcessor::DoBackgroundWork() @@ -90,25 +111,20 @@ void BatchSpanProcessor::DoBackgroundWork() while (true) { // Wait for `timeout` milliseconds - std::unique_lock lk(cv_m_); - cv_.wait_for(lk, timeout); + std::unique_lock lk(synchronization_data_->cv_m_); + synchronization_data_->cv_.wait_for(lk, timeout); - if (is_shutdown_.load() == true) + if (synchronization_data_->is_shutdown_.load() == true) { + // Break loop if another thread call ForceFlush DrainQueue(); return; } - bool was_force_flush_called = is_force_flush_.load(); + bool was_force_flush_called = synchronization_data_->is_force_flush_.exchange(false); // Check if this export was the result of a force flush. - if (was_force_flush_called == true) - { - // Since this export was the result of a force flush, signal the - // main thread that the worker thread has been notified - is_force_flush_ = false; - } - else + if (!was_force_flush_called) { // If the buffer was empty during the entire `timeout` time interval, // go back to waiting. If this was a spurious wake-up, we export only if @@ -133,66 +149,153 @@ void BatchSpanProcessor::DoBackgroundWork() void BatchSpanProcessor::Export(const bool was_force_flush_called) { - std::vector> spans_arr; + bool notify_force_completion = true; + do + { + std::vector> spans_arr; + size_t num_spans_to_export; + + if (was_force_flush_called == true) + { + num_spans_to_export = buffer_.size(); + } + else + { + num_spans_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } + + if (num_spans_to_export == 0) + { + break; + } + buffer_.Consume(num_spans_to_export, + [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + spans_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); - size_t num_spans_to_export; + /* Call the sync Export when force flush was called, even if + is_export_async_ is true. + */ + if (is_export_async_ == false) + { + exporter_->Export( + nostd::span>(spans_arr.data(), spans_arr.size())); + } + else + { + notify_force_completion = false; + std::weak_ptr synchronization_data_watcher = synchronization_data_; + exporter_->Export( + nostd::span>(spans_arr.data(), spans_arr.size()), + [was_force_flush_called, synchronization_data_watcher](sdk::common::ExportResult result) { + // TODO: Print result + if (synchronization_data_watcher.expired()) + { + return true; + } + + NotifyCompletion(was_force_flush_called, synchronization_data_watcher.lock()); + return true; + }); + } + } while (was_force_flush_called); - if (was_force_flush_called == true) + if (notify_force_completion) { - num_spans_to_export = buffer_.size(); + NotifyCompletion(was_force_flush_called, synchronization_data_); } - else +} + +void BatchSpanProcessor::WaitForShutdownCompletion() +{ + // Since async export is invoked due to shutdown, need to wait + // for async thread to complete. + if (is_export_async_) { - num_spans_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + std::unique_lock lk(synchronization_data_->async_shutdown_m_); + while (synchronization_data_->is_async_shutdown_notified_.load() == false) + { + synchronization_data_->async_shutdown_cv_.wait(lk); + } } +} - buffer_.Consume(num_spans_to_export, - [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - spans_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; - }); - }); +void BatchSpanProcessor::NotifyCompletion( + bool notify_force_flush, + const std::shared_ptr &synchronization_data) +{ + if (!synchronization_data) + { + return; + } - exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); + if (notify_force_flush && synchronization_data->is_force_flush_notified_.exchange( + true, std::memory_order_acq_rel) == false) + { + synchronization_data->force_flush_cv_.notify_all(); + } - // Notify the main thread in case this export was the result of a force flush. - if (was_force_flush_called == true) + // Notify the thread which is waiting on shutdown to complete. + if (synchronization_data->is_shutdown_.load() == true) { - is_force_flush_notified_ = true; - while (is_force_flush_notified_.load() == true) - { - force_flush_cv_.notify_one(); - } + synchronization_data->is_async_shutdown_notified_.store(true); + synchronization_data->async_shutdown_cv_.notify_all(); } } void BatchSpanProcessor::DrainQueue() { - while (buffer_.empty() == false) + while (true) { - Export(false); + bool was_force_flush_called = synchronization_data_->is_force_flush_.exchange(false); + if (buffer_.empty() && !was_force_flush_called) + { + break; + } + + Export(was_force_flush_called); + WaitForShutdownCompletion(); } } bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { - std::lock_guard shutdown_guard{shutdown_m_}; - bool already_shutdown = is_shutdown_.exchange(true); + auto start_time = std::chrono::system_clock::now(); + std::lock_guard shutdown_guard{synchronization_data_->shutdown_m_}; + bool already_shutdown = synchronization_data_->is_shutdown_.exchange(true); if (worker_thread_.joinable()) { - cv_.notify_one(); + synchronization_data_->cv_.notify_one(); worker_thread_.join(); } + auto worker_end_time = std::chrono::system_clock::now(); + auto offset = std::chrono::duration_cast(worker_end_time - start_time); + + // Fix timeout to meet requirement of wait_for + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout > offset && timeout > std::chrono::microseconds::zero()) + { + timeout -= offset; + } + else + { + // Some module use zero as indefinite timeout.So we can not reset timeout to zero here + timeout = std::chrono::microseconds(1); + } + // Should only shutdown exporter ONCE. if (!already_shutdown && exporter_ != nullptr) { - return exporter_->Shutdown(); + return exporter_->Shutdown(timeout); } return true; @@ -200,7 +303,7 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept BatchSpanProcessor::~BatchSpanProcessor() { - if (is_shutdown_.load() == false) + if (synchronization_data_->is_shutdown_.load() == false) { Shutdown(); } diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index df503cb2aa..cfed951bcb 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -55,6 +55,20 @@ class MockLogExporter final : public LogExporter return ExportResult::kSuccess; } + void Export(const opentelemetry::nostd::span> &records, + std::function + &&result_callback) noexcept override + { + // We should keep the order of test records + auto result = Export(records); + auto th = std::thread( + [result](std::function &&result_callback) { + result_callback(result); + }, + std::move(result_callback)); + th.detach(); + } + // toggles the boolean flag marking this exporter as shut down bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override @@ -86,12 +100,13 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), const size_t max_queue_size = 2048, - const size_t max_export_batch_size = 512) + const size_t max_export_batch_size = 512, + const bool is_export_async = false) { - return std::shared_ptr( - new BatchLogProcessor(std::unique_ptr(new MockLogExporter( - logs_received, is_shutdown, is_export_completed, export_delay)), - max_queue_size, scheduled_delay_millis, max_export_batch_size)); + return std::shared_ptr(new BatchLogProcessor( + std::unique_ptr( + new MockLogExporter(logs_received, is_shutdown, is_export_completed, export_delay)), + max_queue_size, scheduled_delay_millis, max_export_batch_size, is_export_async)); } }; @@ -133,6 +148,53 @@ TEST_F(BatchLogProcessorTest, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } +TEST_F(BatchLogProcessorTest, TestAsyncShutdown) +{ + // initialize a batch log processor with the test exporter + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(5000); + const size_t max_export_batch_size = 512; + const size_t max_queue_size = 2048; + const bool is_export_async = true; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis, max_queue_size, + max_export_batch_size, is_export_async); + + // Create a few test log records and send them to the processor + const int num_logs = 3; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + // Test that shutting down the processor will first wait for the + // current batch of logs to be sent to the log exporter + // by checking the number of logs sent and the names of the logs sent + EXPECT_EQ(true, batch_processor->Shutdown()); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + EXPECT_EQ(num_logs, logs_received->size()); + + // Assume logs are received by exporter in same order as sent by processor + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Also check that the processor is shut down at the end + EXPECT_TRUE(is_shutdown->load()); +} + TEST_F(BatchLogProcessorTest, TestForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false)); @@ -174,6 +236,57 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) } } +TEST_F(BatchLogProcessorTest, TestAsyncForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_export_completed(new std::atomic(false)); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(5000); + const size_t max_export_batch_size = 512; + const size_t max_queue_size = 2048; + const bool is_export_async = true; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis, max_queue_size, + max_export_batch_size, is_export_async); + + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Create some more logs to make sure that the processor still works + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs * 2, logs_received->size()); + for (int i = 0; i < num_logs * 2; ++i) + { + EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetName()); + } +} + TEST_F(BatchLogProcessorTest, TestManyLogsLoss) { /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index 0bb6ba2667..e0c02203fa 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -53,6 +53,15 @@ class TestExporter final : public LogExporter return ExportResult::kSuccess; } + // Dummy Async Export implementation + void Export(const nostd::span> &records, + std::function + &&result_callback) noexcept override + { + auto result = Export(records); + result_callback(result); + } + // Increment the shutdown counter everytime this method is called bool Shutdown(std::chrono::microseconds timeout) noexcept override { @@ -137,6 +146,13 @@ class FailShutDownExporter final : public LogExporter return ExportResult::kSuccess; } + void Export(const nostd::span> &records, + std::function + &&result_callback) noexcept override + { + result_callback(ExportResult::kSuccess); + } + bool Shutdown(std::chrono::microseconds timeout) noexcept override { return false; } }; diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index 0e6f9c35aa..33d86a865f 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -56,6 +56,20 @@ class MockSpanExporter final : public sdk::trace::SpanExporter return sdk::common::ExportResult::kSuccess; } + void Export(const nostd::span> &records, + std::function + &&result_callback) noexcept override + { + // We should keep the order of test records + auto result = Export(records); + auto th = std::thread( + [result](std::function &&result_callback) { + result_callback(result); + }, + std::move(result_callback)); + th.detach(); + } + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override { @@ -131,6 +145,94 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) EXPECT_TRUE(is_shutdown->load()); } +TEST_F(BatchSpanProcessorTestPeer, TestAsyncShutdown) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + + sdk::trace::BatchSpanProcessorOptions options{}; + options.is_export_async = true; + + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + options)); + const int num_spans = 3; + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + EXPECT_TRUE(batch_processor->Shutdown()); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); + + EXPECT_EQ(num_spans, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } + + EXPECT_TRUE(is_shutdown->load()); +} + +TEST_F(BatchSpanProcessorTestPeer, TestAsyncForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> spans_received( + new std::vector>); + + sdk::trace::BatchSpanProcessorOptions options{}; + options.is_export_async = true; + + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr(new MockSpanExporter(spans_received, is_shutdown)), + options)); + const int num_spans = 2048; + + auto test_spans = GetTestSpans(batch_processor, num_spans); + + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(test_spans->at(i))); + } + + // Give some time to export + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_spans, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); + } + + // Create some more spans to make sure that the processor still works + auto more_test_spans = GetTestSpans(batch_processor, num_spans); + for (int i = 0; i < num_spans; ++i) + { + batch_processor->OnEnd(std::move(more_test_spans->at(i))); + } + + // Give some time to export the spans + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_spans * 2, spans_received->size()); + for (int i = 0; i < num_spans; ++i) + { + EXPECT_EQ("Span " + std::to_string(i % num_spans), + spans_received->at(num_spans + i)->GetName()); + } +} + TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false)); diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc index 9398b922a5..6c8a00ead2 100644 --- a/sdk/test/trace/simple_processor_test.cc +++ b/sdk/test/trace/simple_processor_test.cc @@ -51,6 +51,13 @@ class RecordShutdownExporter final : public SpanExporter return ExportResult::kSuccess; } + void Export(const opentelemetry::nostd::span> &spans, + std::function + &&result_callback) noexcept override + { + result_callback(ExportResult::kSuccess); + } + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override {