Skip to content
Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Increment the:
## [Unreleased]

* [EXPORTER] Jaeger Exporter - Populate Span Links ([#1251](https://github.com/open-telemetry/opentelemetry-cpp/pull/1251))
* [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https://github.com/open-telemetry/opentelemetry-cpp/pull/1209))

## [1.2.0] 2022-01-31

Expand Down
34 changes: 34 additions & 0 deletions api/include/opentelemetry/common/timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,39 @@ class SteadyTimestamp
private:
int64_t nanos_since_epoch_;
};

class DurationUtil
{
public:
template <class Rep, class Period>
static std::chrono::duration<Rep, Period> AdjustWaitForTimeout(
std::chrono::duration<Rep, Period> timeout,
std::chrono::duration<Rep, Period> indefinite_value) noexcept
{
// Do not call now() when this duration is max value, now() may have a expensive cost.
if (timeout == std::chrono::duration<Rep, Period>::max())
{
return indefinite_value;
}

// std::future<T>::wait_for, std::this_thread::sleep_for, and std::condition_variable::wait_for
// may use steady_clock or system_clock.We need make sure now() + timeout do not overflow.
auto max_timeout = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now());
if (timeout >= max_timeout)
{
return indefinite_value;
}
max_timeout = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
std::chrono::system_clock::time_point::max() - std::chrono::system_clock::now());
if (timeout >= max_timeout)
{
return indefinite_value;
}

return timeout;
}
};

} // namespace common
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

/**
* Exports a vector of log records to the Elasticsearch instance asynchronously.
* @param records A list of log records to send to Elasticsearch.
* @param result_callback callback function accepting ExportResult as argument
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
Expand Down
132 changes: 130 additions & 2 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,89 @@ class ResponseHandler : public http_client::EventHandler
bool console_debug_ = false;
};

/**
* This class handles the async response message from the Elasticsearch request
*/
class AsyncResponseHandler : public http_client::EventHandler
{
public:
/**
* Creates a response handler, that by default doesn't display to console
*/
AsyncResponseHandler(
std::shared_ptr<ext::http::client::Session> session,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback,
bool console_debug = false)
: console_debug_{console_debug},
session_{std::move(session)},
result_callback_{std::move(result_callback)}
{}

/**
* Cleans up the session in the destructor.
*/
~AsyncResponseHandler() { session_->FinishSession(); }

/**
* Automatically called when the response is received
*/
void OnResponse(http_client::Response &response) noexcept override
{

// Store the body of the request
body_ = std::string(response.GetBody().begin(), response.GetBody().end());
if (body_.find("\"failed\" : 0") == std::string::npos)
{
OTEL_INTERNAL_LOG_ERROR(
"[ES Trace Exporter] Logs were not written to Elasticsearch correctly, response body: "
<< body_);
result_callback_(sdk::common::ExportResult::kFailure);
}
else
{
result_callback_(sdk::common::ExportResult::kSuccess);
}
}

// Callback method when an http event occurs
void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override
{
// If any failure event occurs, release the condition variable to unblock main thread
switch (state)
{
case http_client::SessionState::ConnectFailed:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Connection to elasticsearch failed");
break;
case http_client::SessionState::SendFailed:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request failed to be sent to elasticsearch");

break;
case http_client::SessionState::TimedOut:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request to elasticsearch timed out");

break;
case http_client::SessionState::NetworkError:
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Network error to elasticsearch");
break;
default:
break;
}
result_callback_(sdk::common::ExportResult::kFailure);
}

private:
// Stores the session object for the request
std::shared_ptr<ext::http::client::Session> session_;
// Callback to call to on receiving events
std::function<bool(opentelemetry::sdk::common::ExportResult)> result_callback_;

// A string to store the response body
std::string body_ = "";

// Whether to print the results from the callback
bool console_debug_ = false;
};

ElasticsearchLogExporter::ElasticsearchLogExporter()
: options_{ElasticsearchExporterOptions()},
http_client_{new ext::http::client::curl::HttpClient()}
Expand Down Expand Up @@ -162,8 +245,8 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
request->SetBody(body_vec);

// Send the request
std::unique_ptr<ResponseHandler> handler(new ResponseHandler(options_.console_debug_));
session->SendRequest(*handler);
auto handler = std::make_shared<ResponseHandler>(options_.console_debug_);
session->SendRequest(handler);

// Wait for the response to be received
if (options_.console_debug_)
Expand Down Expand Up @@ -198,6 +281,51 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

void ElasticsearchLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
// Return failure if this exporter has been shutdown
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting "
<< records.size() << " log(s) failed, exporter is shutdown");
return;
}

// Create a connection to the ElasticSearch instance
auto session = http_client_->CreateSession(options_.host_ + std::to_string(options_.port_));
auto request = session->CreateRequest();

// Populate the request with headers and methods
request->SetUri(options_.index_ + "/_bulk?pretty");
request->SetMethod(http_client::Method::Post);
request->AddHeader("Content-Type", "application/json");
request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_));

// Create the request body
std::string body = "";
for (auto &record : records)
{
// Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified
// in URI
body += "{\"index\" : {}}\n";

// Add the context of the Recordable
auto json_record = std::unique_ptr<ElasticSearchRecordable>(
static_cast<ElasticSearchRecordable *>(record.release()));
body += json_record->GetJSON().dump() + "\n";
}
std::vector<uint8_t> body_vec(body.begin(), body.end());
request->SetBody(body_vec);

// Send the request
auto handler = std::make_shared<AsyncResponseHandler>(session, std::move(result_callback),
options_.console_debug_);
session->SendRequest(handler);
}

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
* @param result_callback callback function accepting ExportResult as argument
*/
void Export(const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;

/**
* Shutdown the exporter.
* @param timeout an option timeout, default to max.
Expand Down
9 changes: 9 additions & 0 deletions exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ sdk_common::ExportResult JaegerExporter::Export(
return sdk_common::ExportResult::kSuccess;
}

void JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call");
auto status = Export(spans);
result_callback(status);
}

void JaegerExporter::InitializeEndpoint()
{
if (options_.transport_format == TransportFormat::kThriftUdpCompact)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
return sdk::common::ExportResult::kSuccess;
}

/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
* @param result_callback callback function accepting ExportResult as argument
*/
void Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override
{
OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call");
auto status = Export(spans);
result_callback(status);
}

/**
* @param timeout an optional value containing the timeout of the exporter
* note: passing custom timeout values is not currently supported for this exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept
override;

/**
* Exports a span of logs sent from the processor asynchronously.
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept;

/**
* Marks the OStream Log Exporter as shut down.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans) noexcept override;

void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;

bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

Expand Down
11 changes: 10 additions & 1 deletion exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,16 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
void OStreamLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
// Do not have async support
auto result = Export(records);
result_callback(result);
}

bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
Expand Down
8 changes: 8 additions & 0 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

void OStreamSpanExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
auto result = Export(spans);
result_callback(result);
}

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept override;

/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
* @param result_callback callback function accepting ExportResult as argument
*/
virtual void Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;

/**
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
* @param result_callback callback function accepting ExportResult as argument
*/
virtual void Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return.
Expand Down
Loading