Skip to content

Commit d8b9e07

Browse files
committed
Model changes to allow OpenTelemetry context propagation
add nullable
1 parent 8e0736a commit d8b9e07

File tree

19 files changed

+318
-70
lines changed

19 files changed

+318
-70
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,8 @@ class BeamModulePlugin implements Plugin<Project> {
634634
def netty_version = "4.1.124.Final"
635635
// [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk, consistent with: google_cloud_platform_libraries_bom
636636
def opentelemetry_version = "1.51.0"
637+
def opentelemetry_sdk_version = "1.56.0"
638+
def opentelemetry_contrib_version = "1.52.0"
637639
def postgres_version = "42.2.16"
638640
// [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom
639641
def protobuf_version = "4.33.2"
@@ -855,8 +857,13 @@ class BeamModulePlugin implements Plugin<Project> {
855857
netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final",
856858
netty_transport : "io.netty:netty-transport:$netty_version",
857859
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
858-
opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version
859-
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions
860+
opentelemetry_api : "io.opentelemetry:opentelemetry-api:$opentelemetry_sdk_version",
861+
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_sdk_version-alpha", // alpha required by extensions
862+
opentelemetry_context : "io.opentelemetry:opentelemetry-context:$opentelemetry_sdk_version",
863+
opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_contrib_version-alpha",
864+
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk:$opentelemetry_sdk_version",
865+
opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp:$opentelemetry_sdk_version",
866+
opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:$opentelemetry_sdk_version",
860867
postgres : "org.postgresql:postgresql:$postgres_version",
861868
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
862869
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ dependencies {
112112
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
113113
implementation library.java.google_cloud_logging
114114
permitUnusedDeclared library.java.google_cloud_logging // BEAM-11761
115+
implementation library.java.opentelemetry_context
115116
implementation library.java.hamcrest
116117
implementation library.java.jackson_annotations
117118
implementation library.java.jackson_core

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2121

2222
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23+
import io.opentelemetry.context.Context;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.io.OutputStream;
@@ -1404,6 +1405,11 @@ public PaneInfo getPaneInfo() {
14041405
return null;
14051406
}
14061407

1408+
@Override
1409+
public @Nullable Context getOpenTelemetryContext() {
1410+
return null;
1411+
}
1412+
14071413
@Override
14081414
public @Nullable Long getRecordOffset() {
14091415
return null;

runners/google-cloud-dataflow-java/worker/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ dependencies {
212212
implementation library.java.jackson_core
213213
implementation library.java.jackson_databind
214214
implementation library.java.joda_time
215+
implementation library.java.opentelemetry_context
215216
implementation library.java.slf4j_api
216217
implementation library.java.vendored_grpc_1_69_0
217218
implementation library.java.error_prone_annotations

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,18 +155,28 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
155155
@SuppressWarnings("unchecked")
156156
T result =
157157
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
158+
// todo #37030 parse context from previous stage
158159
return WindowedValues.of(
159-
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
160+
result,
161+
timestampMillis,
162+
windows,
163+
paneInfo,
164+
null,
165+
null,
166+
drainingValueFromUpstream,
167+
null);
160168
} else {
161169
notifyElementRead(data.available() + metadata.available());
170+
// todo #37030 parse context from previous stage
162171
return WindowedValues.of(
163172
decode(valueCoder, data),
164173
timestampMillis,
165174
windows,
166175
paneInfo,
167176
null,
168177
null,
169-
drainingValueFromUpstream);
178+
drainingValueFromUpstream,
179+
null);
170180
}
171181
}
172182

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public Iterable<TimerData> timersIterable() {
159159
InputStream inputStream = message.getData().newInput();
160160
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
161161
return WindowedValues.of(
162-
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
162+
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream, null);
163163
} catch (RuntimeException | IOException e) {
164164
if (!skipUndecodableElements) {
165165
throw new RuntimeException(e);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.Objects;
@@ -65,6 +66,11 @@ public CausedByDrain causedByDrain() {
6566
return CausedByDrain.NORMAL;
6667
}
6768

69+
@Override
70+
public @Nullable Context getOpenTelemetryContext() {
71+
return null;
72+
}
73+
6874
@Override
6975
public Iterable<WindowedValue<T>> explodeWindows() {
7076
return Collections.emptyList();

runners/spark/spark_runner.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ dependencies {
167167
implementation library.java.jackson_annotations
168168
implementation library.java.slf4j_api
169169
implementation library.java.joda_time
170+
implementation library.java.opentelemetry_context
170171
implementation library.java.commons_lang3
171172
implementation library.java.args4j
172173
implementation project(path: ":model:fn-execution", configuration: "shadow")

runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.spark.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.io.Serializable;
2122
import java.util.Collection;
2223
import java.util.Collections;
@@ -116,6 +117,11 @@ public PaneInfo getPaneInfo() {
116117
return null;
117118
}
118119

120+
@Override
121+
public @Nullable Context getOpenTelemetryContext() {
122+
return null;
123+
}
124+
119125
@Override
120126
public CausedByDrain causedByDrain() {
121127
return CausedByDrain.NORMAL;

sdks/java/container/license_scripts/dep_urls_java.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ org.eclipse.jgit:
6666
license: "https://www.eclipse.org/org/documents/edl-v10.html"
6767
type: "Eclipse Distribution License - v1.0"
6868
opentelemetry-bom:
69-
'1.51.0':
70-
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.51.0/LICENSE"
69+
'1.56.0':
70+
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.56.0/LICENSE"
7171
type: "Apache License 2.0"
7272
opentelemetry-bom-alpha:
73-
'1.51.0-alpha':
74-
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.51.0/LICENSE"
73+
'1.56.0-alpha':
74+
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.56.0/LICENSE"
7575
type: "Apache License 2.0"
7676
zstd-jni:
7777
'1.5.2-5':

0 commit comments

Comments
 (0)