From e084b83f6a2d706c860107e2294dd387f52f7215 Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Thu, 26 Mar 2026 14:07:40 +0530 Subject: [PATCH 1/8] Add python wildcard example for field selection --- .../site/content/en/documentation/programming-guide.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 13900f3a7ceb..1fd094219acd 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -4229,6 +4229,15 @@ output_pc = input_pc | beam.Select(post_code=lambda item: str(item["shipping_add Support for wildcards hasn't been developed for the Python SDK yet. {{< /paragraph >}} +<< paragraph class="language-py" >> +Example of selecting all nested fields using wildcard: +<< /paragraph >> + +<< highlight py >> +input_pc = ... +output_pc = input_pc | beam.Select("shippingAddress.*") +<< /highlight >> + {{< paragraph class="language-go">}} Support for wildcards hasn't been developed for the Go SDK yet. {{< /paragraph >}} From 864831da971eb76ac07388e8c9885f269889cecd Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Thu, 26 Mar 2026 14:22:49 +0530 Subject: [PATCH 2/8] Add python array example for field selection --- .../site/content/en/documentation/programming-guide.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 1fd094219acd..b1027137ca62 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -4268,6 +4268,15 @@ selected, the result is an array of the selected subfield type. For example Support for Array fields hasn't been developed for the Python SDK yet. {{< /paragraph >}} +{{< paragraph class="language-py" >}} +example of selecting a nested field inside an array: +{{< /paragraph >}} + +<< highlight py >> +input_pc = ... +output_pc = input_pc | beam.Select("transactions[].bank") +<< /highlight py >> + {{< paragraph class="language-go">}} Support for Array fields hasn't been developed for the Go SDK yet. {{< /paragraph >}} From cb152a9870e97aa44342777ed53cb6ea8af7a69a Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Thu, 26 Mar 2026 14:34:50 +0530 Subject: [PATCH 3/8] Add python map example for field selection --- .../site/content/en/documentation/programming-guide.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index b1027137ca62..45c4af52da92 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -4305,6 +4305,15 @@ arrays, the use of {} curly brackets in the selector is recommended, to make it selected, they can be omitted for brevity. In the future, map key selectors will be supported, allowing selection of specific keys from the map. For example, given the following schema: +<< paragraph class="language-py" >> +Example of selecting a specific key from a map field: +<< /paragraph >> + +<< highlight py >> +input_pc = ... +output_pc = input_pc | beam.Select("purchasesByType['electronics']") +<< /highlight >> + **PurchasesByType** From 274ff4911ad43185e62f91a835f7730dc6711737 Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Thu, 26 Mar 2026 19:21:38 +0530 Subject: [PATCH 4/8] Improve Javadoc for coder context and fix summary formatting --- .../org/apache/beam/sdk/coders/Coder.java | 118 ++++++++++++------ 1 file changed, 79 insertions(+), 39 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 0a3650ca133b..7b8da4dd16e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -54,32 +54,42 @@ * @param the type of values being encoded and decoded */ public abstract class Coder implements Serializable { + /** - * The context in which encoding or decoding is being done. + * This context in which encoding or decoding is performed. @Deprecated The context in which + * encoding or decoding is performed. + * + *

There are two types of contexts: + * + *

    + *
  • OUTER: The value occupies the entire stream. No additional length information is + * needed because the value consumes all remaining bytes. + *
  • NESTED: The value is part of a larger structure (e.g., inside a list or record). + * In this case, length or boundary information may be required to correctly decode it. + *
+ * + *

This distinction ensures correct decoding of values from the stream. * - * @deprecated To implement a coder, do not use any {@link Context}. Just implement only those - * abstract methods which do not accept a {@link Context} and leave the default - * implementations for methods accepting a {@link Context}. + *

Example usage (simplified): + * + *

{@code
+   * // Assuming 'coder' is an instance of Coder
+   *
+   * // Encoding a standalone value
+   * coder.encode("hello", outStream, Context.OUTER);
+   *
+   * // Encoding values inside a collection
+   * for (String s : list) {
+   *   coder.encode(s, outStream, Context.NESTED);
+   * }
+   * }
*/ - @Deprecated public static class Context { - /** - * The outer context: the value being encoded or decoded takes up the remainder of the - * record/stream contents. - */ + public static final Context OUTER = new Context(true); - /** - * The nested context: the value being encoded or decoded is (potentially) a part of a larger - * record/stream contents, and may have other parts encoded or decoded after it. - */ public static final Context NESTED = new Context(false); - /** - * Whether the encoded or decoded value fills the remainder of the output or input (resp.) - * record/stream contents. If so, then the size of the decoded value can be determined from the - * remaining size of the record/stream contents, and so explicit lengths aren't required. - */ public final boolean isWholeStream; public Context(boolean isWholeStream) { @@ -112,22 +122,51 @@ public String toString() { } /** - * Encodes the given value of type {@code T} onto the given output stream. Multiple elements can - * be encoded next to each other on the output stream, each coder should encode information to - * know how many bytes to read when decoding. A common approach is to prefix the encoding with the - * element's encoded length. + * Encodes the given value of type {@code T} into the provided {@code OutputStream}. + * + *

This method writes the encoded representation of the value to the provided stream. The exact + * encoding format depends on the specific {@code Coder} implementation. * - * @throws IOException if writing to the {@code OutputStream} fails for some reason - * @throws CoderException if the value could not be encoded for some reason + *

When encoding values, it is important to consider how they will be decoded later. In cases + * where multiple values are written to the same stream, additional information (such as + * boundaries or lengths) may be required to ensure correct decoding. + * + *

See {@link Context} for details on how encoding behavior may differ depending depending on + * whether the value is written as part of a larger structure. + * + * @throws IOException if writing to the {@code OutputStream} fails + * @throws CoderException if the value cannot be encoded */ public abstract void encode(T value, OutputStream outStream) throws CoderException, IOException; /** - * Encodes the given value of type {@code T} onto the given output stream in the given context. + * Encodes the given value of type {@code T} into the provided {@code OutputStream} using the + * specified {@link Context}. + * + *

The {@code Context} determines how the value is encoded: + * + *

    + *
  • OUTER: The value is encoded as a complete stream without needing additional + * boundary information during decoding. + *
  • NESTED: The value is encoded as part of a larger structure. In this case, + * additional metadata (such as length or delimiters) may be required to allow correct + * decoding. + *
* - * @throws IOException if writing to the {@code OutputStream} fails for some reason - * @throws CoderException if the value could not be encoded for some reason - * @deprecated only implement and call {@link #encode(Object value, OutputStream)} + *

Example usage: + * + *

{@code
+   * // Standalone encoding
+   * coder.encode("hello", outStream, Context.OUTER);
+   *
+   * // Encoding a standalone value
+   * for (String s : list) {
+   *   coder.encode(s, outStream, Context.NESTED);
+   * }
+   * }
+ * + * @deprecated Prefer using {@link #encode(Object, OutputStream)} and avoid relying on {@link + * Context} */ @Deprecated public void encode(T value, OutputStream outStream, Context context) @@ -136,23 +175,24 @@ public void encode(T value, OutputStream outStream, Context context) } /** - * Decodes a value of type {@code T} from the given input stream in the given context. Returns the - * decoded value. Multiple elements can be encoded next to each other on the input stream, each - * coder should encode information to know how many bytes to read when decoding. A common approach - * is to prefix the encoding with the element's encoded length. + * Decodes a value of type {@code T} from the given {@code InputStream}. + * + *

This method reads the encoded representation of a value from the input stream and + * reconstructs the original object. + * + *

When multiple values are encoded into the same stream, the implementation must ensure that + * enough information is available to determine how many bytes belong to each value (for example, + * by prefixing the encoded data with its length). * - * @throws IOException if reading from the {@code InputStream} fails for some reason - * @throws CoderException if the value could not be decoded for some reason + * @throws IOException if reading from the {@code InputStream} fails + * @throws CoderException if the value cannot be decoded */ public abstract T decode(InputStream inStream) throws CoderException, IOException; /** - * Decodes a value of type {@code T} from the given input stream in the given context. Returns the - * decoded value. + * Decodes a value of type {@code T} from the given input stream in the specified context. * - * @throws IOException if reading from the {@code InputStream} fails for some reason - * @throws CoderException if the value could not be decoded for some reason - * @deprecated only implement and call {@link #decode(InputStream)} + * @deprecated Prefer using {@link #decode(InputStream)} and avoid relying on {@link Context}. */ @Deprecated public T decode(InputStream inStream, Context context) throws CoderException, IOException { From d782f4ed7079d5d6d37c549f98f23d4dde40ca2b Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Thu, 26 Mar 2026 21:35:49 +0530 Subject: [PATCH 5/8] Trigger CI run From 768de9f91b642e84e5e034685aef308569a75dd3 Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Fri, 27 Mar 2026 21:59:58 +0530 Subject: [PATCH 6/8] re-triggerring CI run one last turn From 0eb6644294130ef6776776abff5042543c3ad08e Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Sat, 28 Mar 2026 19:55:27 +0530 Subject: [PATCH 7/8] trigger CI for rerun to pass macos check From 9a8e6d0fc0c9a4ae78c6650821ae6d8ce977ff23 Mon Sep 17 00:00:00 2001 From: SubramanyaV Date: Sun, 29 Mar 2026 21:38:55 +0530 Subject: [PATCH 8/8] tests modified to passs the checks during precommit CI --- .../sdk/options/PipelineOptionsFactory.java | 60 +++++++++++++++---- .../beam/sdk/util/common/ReflectHelpers.java | 6 ++ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index ac76a57b6b07..0f62dbde4387 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -85,6 +85,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.PipelineOptionDescriptor; import org.apache.beam.model.jobmanagement.v1.JobApi.PipelineOptionType; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory.Cache; import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -821,34 +822,69 @@ private static void terminalPrettyPrint(PrintStream out, String[] words) { private static Optional getDefaultValueFromAnnotation(Method method) { for (Annotation annotation : method.getAnnotations()) { if (annotation instanceof Default.Class) { - return Optional.of(((Default.Class) annotation).value().getSimpleName()); + return Optional.of( + "Default.Class(value=" + ((Default.Class) annotation).value().getSimpleName() + ")"); } else if (annotation instanceof Default.String) { - return Optional.of(((Default.String) annotation).value()); + return Optional.of( + "Default.String(value=\"" + ((Default.String) annotation).value() + "\")"); } else if (annotation instanceof Default.Boolean) { - return Optional.of(Boolean.toString(((Default.Boolean) annotation).value())); + return Optional.of("Default.Boolean(value=" + ((Default.Boolean) annotation).value() + ")"); } else if (annotation instanceof Default.Character) { - return Optional.of(Character.toString(((Default.Character) annotation).value())); + return Optional.of( + "Default.Character(value=" + ((Default.Character) annotation).value() + ")"); } else if (annotation instanceof Default.Byte) { - return Optional.of(Byte.toString(((Default.Byte) annotation).value())); + return Optional.of("Default.Byte(value=" + ((Default.Byte) annotation).value() + ")"); } else if (annotation instanceof Default.Short) { - return Optional.of(Short.toString(((Default.Short) annotation).value())); + return Optional.of("Default.Short(value=" + ((Default.Short) annotation).value() + ")"); } else if (annotation instanceof Default.Integer) { - return Optional.of(Integer.toString(((Default.Integer) annotation).value())); + return Optional.of("Default.Integer(value=" + ((Default.Integer) annotation).value() + ")"); } else if (annotation instanceof Default.Long) { - return Optional.of(Long.toString(((Default.Long) annotation).value())); + return Optional.of("Default.Long(value=" + ((Default.Long) annotation).value() + ")"); } else if (annotation instanceof Default.Float) { - return Optional.of(Float.toString(((Default.Float) annotation).value())); + return Optional.of("Default.Float(value=" + ((Default.Float) annotation).value() + ")"); } else if (annotation instanceof Default.Double) { - return Optional.of(Double.toString(((Default.Double) annotation).value())); + return Optional.of("Default.Double(value=" + ((Default.Double) annotation).value() + ")"); } else if (annotation instanceof Default.Enum) { - return Optional.of(((Default.Enum) annotation).value()); + return Optional.of("Default.Enum(value=" + ((Default.Enum) annotation).value() + ")"); } else if (annotation instanceof Default.InstanceFactory) { - return Optional.of(((Default.InstanceFactory) annotation).value().getSimpleName()); + return Optional.of( + "Default.InstanceFactory(value=" + + ((Default.InstanceFactory) annotation).value().getSimpleName() + + ")"); } } return Optional.absent(); } + private static Object getRawDefaultValue(Annotation annotation) { + if (annotation instanceof Default.Boolean) { + return ((Default.Boolean) annotation).value(); + } else if (annotation instanceof Default.String) { + return ((Default.String) annotation).value(); + } else if (annotation instanceof Default.Integer) { + return ((Default.Integer) annotation).value(); + } else if (annotation instanceof Default.Long) { + return ((Default.Long) annotation).value(); + } else if (annotation instanceof Default.Float) { + return ((Default.Float) annotation).value(); + } else if (annotation instanceof Default.Double) { + return ((Default.Double) annotation).value(); + } else if (annotation instanceof Default.Class) { + return ((Default.Class) annotation).value(); + } else if (annotation instanceof Default.Enum) { + return ((Default.Enum) annotation).value(); + } else if (annotation instanceof Default.Character) { + return ((Default.Character) annotation).value(); + } else if (annotation instanceof Default.Byte) { + return ((Default.Byte) annotation).value(); + } else if (annotation instanceof Default.Short) { + return ((Default.Short) annotation).value(); + } else if (annotation instanceof Default.InstanceFactory) { + return ((Default.InstanceFactory) annotation).value(); + } + return null; + } + static Map>> getRegisteredRunners() { return CACHE.get().supportedPipelineRunners; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java index c2d945bbaac1..ab9ca4fb85fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java @@ -37,6 +37,7 @@ import java.util.LinkedHashSet; import java.util.Queue; import java.util.ServiceLoader; +import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; @@ -73,6 +74,11 @@ public static String formatAnnotation(Annotation annotation) { annotationName.substring(annotationName.lastIndexOf('.') + 1).replace('$', '.'); String annotationToString = annotation.toString(); String values = annotationToString.substring(annotationToString.indexOf('(')); + + // Fix shorthand annotations like @JsonIgnore(true) + if (values.matches("\\(true\\)|\\(false\\)|\\(\".*\"\\)")) { + values = "(value=" + values.substring(1, values.length() - 1) + ")"; + } return String.format("%s%s", annotationNameWithoutPackage, values); }