Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,8 @@ class BeamModulePlugin implements Plugin<Project> {
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
def netty_version = "4.1.124.Final"
// [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk, consistent with: google_cloud_platform_libraries_bom
def opentelemetry_version = "1.51.0"
def opentelemetry_sdk_version = "1.56.0"
def opentelemetry_contrib_version = "1.52.0"
def postgres_version = "42.2.16"
// [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom
def protobuf_version = "4.33.2"
Expand Down Expand Up @@ -856,7 +857,12 @@ class BeamModulePlugin implements Plugin<Project> {
netty_transport : "io.netty:netty-transport:$netty_version",
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_sdk_version-alpha", // alpha required by extensions
opentelemetry_context : "io.opentelemetry:opentelemetry-context", // google_cloud_platform_libraries_bom sets version
opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_contrib_version-alpha",
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk", // google_cloud_platform_libraries_bom sets version
opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp", // google_cloud_platform_libraries_bom sets version
opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure", // google_cloud_platform_libraries_bom sets version
postgres : "org.postgresql:postgresql:$postgres_version",
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp)
element.getPaneInfo(),
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain()));
element.causedByDrain(),
element.getOpenTelemetryContext()));
}

@Override
Expand All @@ -474,7 +475,8 @@ public <T> void outputWindowedValue(
paneInfo,
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain()));
element.causedByDrain(),
element.getOpenTelemetryContext()));
}

private void noteOutput() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ public String getErrorContext() {
read.getPaneInfo(),
read.getRecordId(),
read.getRecordOffset(),
CausedByDrain.CAUSED_BY_DRAIN);
CausedByDrain.CAUSED_BY_DRAIN,
read.getOpenTelemetryContext());
}
elementAndRestriction = KV.of(read, restrictionState.read());
watermarkEstimatorStateT = watermarkEstimatorState.read();
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ dependencies {
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
implementation library.java.google_cloud_logging
permitUnusedDeclared library.java.google_cloud_logging // BEAM-11761
implementation library.java.opentelemetry_context
implementation library.java.hamcrest
implementation library.java.jackson_annotations
implementation library.java.jackson_core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -1404,6 +1405,11 @@ public PaneInfo getPaneInfo() {
return null;
}

@Override
public @Nullable Context getOpenTelemetryContext() {
return null;
}

@Override
public @Nullable Long getRecordOffset() {
return null;
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ dependencies {
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.joda_time
implementation library.java.opentelemetry_context
implementation library.java.slf4j_api
implementation library.java.vendored_grpc_1_69_0
implementation library.java.error_prone_annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,28 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
@SuppressWarnings("unchecked")
T result =
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
// todo #37030 parse context from previous stage
return WindowedValues.of(
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
result,
timestampMillis,
windows,
paneInfo,
null,
null,
drainingValueFromUpstream,
null);
} else {
notifyElementRead(data.available() + metadata.available());
// todo #37030 parse context from previous stage
return WindowedValues.of(
decode(valueCoder, data),
timestampMillis,
windows,
paneInfo,
null,
null,
drainingValueFromUpstream);
drainingValueFromUpstream,
null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Iterable<TimerData> timersIterable() {
InputStream inputStream = message.getData().newInput();
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
return WindowedValues.of(
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream, null);
} catch (RuntimeException | IOException e) {
if (!skipUndecodableElements) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import io.opentelemetry.context.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -65,6 +66,11 @@ public CausedByDrain causedByDrain() {
return CausedByDrain.NORMAL;
}

@Override
public @Nullable Context getOpenTelemetryContext() {
return null;
}

@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
Expand Down
1 change: 1 addition & 0 deletions runners/spark/spark_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ dependencies {
implementation library.java.jackson_annotations
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.opentelemetry_context
implementation library.java.commons_lang3
implementation library.java.args4j
implementation project(path: ":model:fn-execution", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.spark.util;

import io.opentelemetry.context.Context;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -116,6 +117,11 @@ public PaneInfo getPaneInfo() {
return null;
}

@Override
public @Nullable Context getOpenTelemetryContext() {
return null;
}

@Override
public CausedByDrain causedByDrain() {
return CausedByDrain.NORMAL;
Expand Down
8 changes: 4 additions & 4 deletions sdks/java/container/license_scripts/dep_urls_java.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ org.eclipse.jgit:
license: "https://www.eclipse.org/org/documents/edl-v10.html"
type: "Eclipse Distribution License - v1.0"
opentelemetry-bom:
'1.51.0':
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.51.0/LICENSE"
'1.56.0':
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.56.0/LICENSE"
type: "Apache License 2.0"
opentelemetry-bom-alpha:
'1.51.0-alpha':
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.51.0/LICENSE"
'1.56.0-alpha':
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.56.0/LICENSE"
type: "Apache License 2.0"
zstd-jni:
'1.5.2-5':
Expand Down
1 change: 1 addition & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ dependencies {
shadow library.java.jackson_databind
shadow platform(library.java.opentelemetry_bom)
shadow library.java.opentelemetry_api
shadow library.java.opentelemetry_context
shadow library.java.slf4j_api
shadow library.java.snappy_java
shadow library.java.joda_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWind
PaneInfo.NO_FIRING,
null,
null,
CausedByDrain.NORMAL));
CausedByDrain.NORMAL,
null));
}
};
}
Expand Down Expand Up @@ -646,7 +647,8 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
element.getPaneInfo(),
null,
null,
CausedByDrain.NORMAL));
CausedByDrain.NORMAL,
null));
}

@Override
Expand All @@ -660,7 +662,7 @@ public <T> void outputWindowedValue(
getMutableOutput(tag)
.add(
ValueInSingleWindow.of(
output, timestamp, w, paneInfo, null, null, CausedByDrain.NORMAL));
output, timestamp, w, paneInfo, null, null, CausedByDrain.NORMAL, null));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.values;

import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;

class OpenTelemetryContextPropagator {

private static final TextMapSetter<BeamFnApi.Elements.ElementMetadata.Builder> SETTER =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like errorprone is going to tell you to make this a static method and use a method reference.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't because TextMapSetter is custom interface. I will keep it as is, as Getter is not a lambda.

(carrier, key, value) -> {
if (carrier == null) {
return;
}
if ("traceparent".equals(key)) {
carrier.setTraceparent(value);
} else if ("tracestate".equals(key)) {
carrier.setTracestate(value);
}
};

private static final TextMapGetter<BeamFnApi.Elements.ElementMetadata> GETTER =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

new TextMapGetter<BeamFnApi.Elements.ElementMetadata>() {
@Override
public Iterable<String> keys(BeamFnApi.Elements.ElementMetadata carrier) {
return Lists.newArrayList("traceparent", "tracestate");
}

@Override
public @Nullable String get(
BeamFnApi.Elements.@Nullable ElementMetadata carrier, String key) {
if (carrier == null) {
return null;
}
if ("traceparent".equals(key)) {
return carrier.getTraceparent();
} else if ("tracestate".equals(key)) {
return carrier.getTracestate();
}
return null;
}
};

static void set(Context from, BeamFnApi.Elements.ElementMetadata.Builder builder) {
W3CTraceContextPropagator.getInstance().inject(from, builder, SETTER);
}

static Context read(BeamFnApi.Elements.ElementMetadata from) {
return W3CTraceContextPropagator.getInstance().extract(Context.root(), from, GETTER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.values;

import io.opentelemetry.context.Context;
import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down Expand Up @@ -50,5 +51,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {

OutputBuilder<T> setCausedByDrain(CausedByDrain causedByDrain);

OutputBuilder<T> setOpenTelemetryContext(@Nullable Context openTelemetryContext);

void output();
}
Loading
Loading