Skip to content
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@

## New Features / Improvements

* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)).
* (Python) Added a pipeline option `--experiments=pip_no_build_isolation` to disable build isolation when installing dependencies in the runtime environment ([#37331](https://github.com/apache/beam/issues/37331)).
* (Go) Added OrderedListState support to the Go SDK stateful DoFn API ([#37629](https://github.com/apache/beam/issues/37629)).
* Added support for large pipeline options via a file (Python) ([#37370](https://github.com/apache/beam/issues/37370)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added support for inferring schemas from dataclasses (Python) ([#22085](https://github.com/apache/beam/issues/22085)). The default coder for non-frozen dataclasses that are type-hinted (or set via `with_output_type`) changed to RowCoder. To preserve the old behavior (fast primitive coder), explicitly register the type with FastPrimitiveCoder.
* Updated the minimum Go version to 1.26.1 ([#37897](https://github.com/apache/beam/issues/37897)).

Expand Down
93 changes: 93 additions & 0 deletions sdks/go/examples/ordered_list_state/ordered_list_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// ordered_list_state is a toy pipeline demonstrating the use of OrderedListState.
// It creates keyed elements with timestamps, stores them in ordered list state,
// and reads back sub-ranges to emit summaries per key.
package main

import (
"context"
"flag"
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

// eventLogFn is a stateful DoFn that accumulates timestamped events per key
// using OrderedListState and emits a summary of the events seen so far.
type eventLogFn struct {
// Events is the ordered list state holding the string events; ProcessElement
// adds each event under its int64 input value as the sort key.
Events state.OrderedList[string]
}

// ProcessElement records one event for key in the ordered list state and
// emits a one-line summary of everything stored for that key so far.
//
// The input value ts is used both as the sort key and to build the event
// payload ("event@<ts>"). The summary reports the number of entries returned
// by Read and the last entry in that slice. Any error reported while writing
// to or reading from state is returned and fails the element.
func (fn *eventLogFn) ProcessElement(p state.Provider, key string, ts int64, emit func(string)) error {
	// Store an event using the input value as the sort key. A failed write must
	// surface as an error instead of being silently dropped.
	event := fmt.Sprintf("event@%d", ts)
	if err := fn.Events.Add(p, ts, event); err != nil {
		return err
	}

	// Read all events accumulated so far for this key.
	entries, ok, err := fn.Events.Read(p)
	if err != nil {
		return err
	}
	// Nothing to summarize; the length check also keeps the index below from
	// panicking should Read ever report ok with an empty slice.
	if !ok || len(entries) == 0 {
		return nil
	}

	latest := entries[len(entries)-1]
	emit(fmt.Sprintf("key=%s count=%d latest=%s (sort_key=%d)", key, len(entries), latest.Value, latest.SortKey))
	return nil
}

// init registers everything this example hands to Beam: the keying function,
// the string emitter used by the DoFn, and the stateful DoFn itself.
func init() {
	register.Function1x2(toKeyed)
	register.Emitter1[string]()
	register.DoFn4x1[state.Provider, string, int64, func(string), error](&eventLogFn{})
}

// toKeyed maps an integer to a KV pair of (key, timestamp).
//
// The key is "user-<i%3>" and the timestamp is i*1000. The multiplication is
// performed in int64 so it cannot overflow on platforms where int is 32 bits.
func toKeyed(i int) (string, int64) {
	return fmt.Sprintf("user-%d", i%3), int64(i) * 1000
}

// main builds and runs the example pipeline: it creates ten integers, keys
// them with toKeyed, feeds the pairs through the stateful eventLogFn, and
// prints the emitted summaries.
func main() {
	flag.Parse()
	beam.Init()

	pipeline, root := beam.NewPipelineWithRoot()

	// Create a small, fixed set of integer inputs.
	inputs := beam.CreateList(root, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})

	// Turn every integer into a (key, timestamp) pair.
	keyed := beam.ParDo(root, toKeyed, inputs)

	// Run the stateful DoFn backed by OrderedListState over the keyed input.
	statefulFn := &eventLogFn{
		Events: state.MakeOrderedListState[string]("events"),
	}
	summaries := beam.ParDo(root, statefulFn, keyed)

	debug.Print(root, summaries)

	ctx := context.Background()
	err := beamx.Run(ctx, pipeline)
	if err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
32 changes: 32 additions & 0 deletions sdks/go/examples/snippets/04transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,38 @@ func combineState(s beam.Scope, input beam.PCollection) beam.PCollection {
return combined
}

// [START ordered_list_state]

// orderedListStateFn tracks timestamped events per key and reads a sub-range.
type orderedListStateFn struct {
// Events is the ordered list state of string events; ProcessElement adds each
// event under a millisecond wall-clock timestamp as the sort key.
Events state.OrderedList[string]
}

// ProcessElement adds event to the ordered list state under the current
// wall-clock time in milliseconds, emits every entry from the last hour as
// "<value>@<sortKey>", and then clears entries older than one hour.
//
// Any error reported by the state operations (Add, ReadRange, ClearRange) is
// returned so a failed state access fails the element instead of being
// silently ignored.
func (s *orderedListStateFn) ProcessElement(p state.Provider, key string, event string, emit func(string)) error {
	// Add the event with the current timestamp as the sort key.
	now := time.Now().UnixMilli()
	if err := s.Events.Add(p, now, event); err != nil {
		return err
	}

	// Read a sub-range of events (e.g. the last hour). The upper bound is
	// now+1 so the event just added under sort key now falls inside the range.
	oneHourAgo := now - time.Hour.Milliseconds()
	entries, ok, err := s.Events.ReadRange(p, oneHourAgo, now+1)
	if err != nil {
		return err
	}
	if ok {
		for _, e := range entries {
			emit(fmt.Sprintf("%s@%d", e.Value, e.SortKey))
		}
	}

	// Clear events older than one hour.
	return s.Events.ClearRange(p, 0, oneHourAgo)
}

// [END ordered_list_state]

// [START event_time_timer]

type eventTimerDoFn struct {
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,10 +1368,10 @@ func validateState(fn *DoFn, numIn mainInputs) error {
"unique per DoFn", k, orig, s)
}
t := s.StateType()
if t != state.TypeValue && t != state.TypeBag && t != state.TypeCombining && t != state.TypeSet && t != state.TypeMap {
if t != state.TypeValue && t != state.TypeBag && t != state.TypeCombining && t != state.TypeSet && t != state.TypeMap && t != state.TypeOrderedList {
err := errors.Errorf("Unrecognized state type %v for state %v", t, s)
return errors.SetTopLevelMsgf(err, "Unrecognized state type %v for state %v. Currently the only supported state"+
"types are state.Value, state.Combining, state.Bag, state.Set, and state.Map", t, s)
"types are state.Value, state.Combining, state.Bag, state.Set, state.Map, and state.OrderedList", t, s)
}
stateKeys[k] = s
}
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type StateReader interface {
OpenMultimapKeysUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)
// OpenMultimapKeysUserStateClearer opens a byte stream for clearing all keys of user multimap state.
OpenMultimapKeysUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
// OpenOrderedListUserStateReader opens a byte stream for reading user ordered list state in the range [start, end).
OpenOrderedListUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.ReadCloser, error)
// OpenOrderedListUserStateAppender opens a byte stream for appending user ordered list state.
OpenOrderedListUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
// OpenOrderedListUserStateClearer opens a byte stream for clearing user ordered list state in the range [start, end).
OpenOrderedListUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.Writer, error)
// GetSideInputCache returns the SideInputCache being used at the harness level.
GetSideInputCache() SideCache
}
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/sideinput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ func (t *testStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context,
return nil, nil
}

// OpenOrderedListUserStateReader for the testStateReader is a no-op: it ignores
// all of its arguments and returns a nil reader together with a nil error.
func (t *testStateReader) OpenOrderedListUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.ReadCloser, error) {
return nil, nil
}

// OpenOrderedListUserStateAppender for the testStateReader is a no-op: it
// ignores all of its arguments and returns a nil writer together with a nil error.
func (t *testStateReader) OpenOrderedListUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error) {
return nil, nil
}

// OpenOrderedListUserStateClearer for the testStateReader is a no-op: it
// ignores all of its arguments and returns a nil writer together with a nil error.
func (t *testStateReader) OpenOrderedListUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.Writer, error) {
return nil, nil
}

// GetSideInputCache for the testStateReader returns a pointer to a newly
// allocated, zero-value testSideCache on every call.
func (t *testStateReader) GetSideInputCache() SideCache {
return &testSideCache{}
}
Expand Down
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,8 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
kcID = ms.KeyCoderId
} else if ss := spec.GetSetSpec(); ss != nil {
kcID = ss.ElementCoderId
} else if ols := spec.GetOrderedListSpec(); ols != nil {
cID = ols.ElementCoderId
} else {
return nil, errors.Errorf("Unrecognized state type %v", spec)
}
Expand Down
150 changes: 150 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/userstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"fmt"
"io"
"math"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"google.golang.org/protobuf/encoding/protowire"
)

type stateProvider struct {
Expand All @@ -41,6 +43,7 @@ type stateProvider struct {
blindBagWriteCountsByKey map[string]int // Tracks blind writes to bags before a read.
initialMapValuesByKey map[string]map[string]any
initialMapKeysByKey map[string][]any
initialOrderedListByKey map[string][]any
readersByKey map[string]io.ReadCloser
appendersByKey map[string]io.Writer
clearersByKey map[string]io.Writer
Expand Down Expand Up @@ -466,6 +469,152 @@ func (s *stateProvider) getMultiMapKeyReader(userStateID string) (io.ReadCloser,
return s.readersByKey[userStateID], nil
}

// ReadOrderedListState reads an ordered list state from the State API.
//
// On the first access for userStateID it opens a reader over the sort-key
// range [math.MinInt64, math.MaxInt64), decodes every entry, and caches the
// decoded slice; later calls are served from that cache. It returns the
// initially fetched entries (each element is a state.OrderedListEntry) and
// the transactions recorded against this state so far, or the first error
// hit while opening or decoding the stream.
func (s *stateProvider) ReadOrderedListState(userStateID string) ([]any, []state.Transaction, error) {
	initialValue, ok := s.initialOrderedListByKey[userStateID]
	if !ok {
		rw, err := s.getOrderedListReader(userStateID, math.MinInt64, math.MaxInt64)
		if err != nil {
			return nil, nil, err
		}
		// The reader is opened for this call only and is not stored anywhere
		// else, so it has to be released here.
		defer func() {
			// Best effort: by now the entries are decoded or a decode error
			// is already being returned, so a Close failure adds nothing.
			_ = rw.Close()
		}()

		initialValue = []any{}
		for {
			entry, err := decodeOrderedListEntry(rw, s.codersByKey[userStateID])
			if err == io.EOF {
				// A bare io.EOF marks the clean end of the stream.
				break
			}
			if err != nil {
				return nil, nil, err
			}
			initialValue = append(initialValue, entry)
		}
		s.initialOrderedListByKey[userStateID] = initialValue
	}

	transactions, ok := s.transactionsByKey[userStateID]
	if !ok {
		transactions = []state.Transaction{}
	}

	return initialValue, transactions, nil
}

// WriteOrderedListState writes a single entry to the ordered list state.
// The wire format is: varint(sortKey) || coder_encoded(value).
//
// val.Key names the state, val.MapKey must hold the int64 sort key, and
// val.Val is the value to store. After a successful write the transaction is
// appended to the per-state transaction log. An error is returned if
// val.MapKey is not an int64, if the appender cannot be opened, or if
// encoding/writing the entry fails.
func (s *stateProvider) WriteOrderedListState(val state.Transaction) error {
	// Validate the sort key up front so a malformed transaction yields an
	// error instead of a type-assertion panic.
	sortKey, ok := val.MapKey.(int64)
	if !ok {
		return fmt.Errorf("ordered list state %q: sort key must be int64, got %T", val.Key, val.MapKey)
	}

	ap, err := s.getOrderedListAppender(val.Key)
	if err != nil {
		return err
	}

	if err := encodeOrderedListEntry(sortKey, val.Val, ap, s.codersByKey[val.Key]); err != nil {
		return err
	}

	// append on a missing (nil) slice allocates a new one, so no existence
	// check is needed.
	s.transactionsByKey[val.Key] = append(s.transactionsByKey[val.Key], val)
	return nil
}

// ClearOrderedListState clears entries in a range from the ordered list state.
//
// val.Key names the state and val.MapKey must hold a [2]int64 with the start
// and end sort keys of the range to clear. After a successful clear the
// transaction is appended to the per-state transaction log. An error is
// returned if val.MapKey is not a [2]int64, if the clearer cannot be opened,
// or if the clear request fails.
func (s *stateProvider) ClearOrderedListState(val state.Transaction) error {
	// Validate the range up front so a malformed transaction yields an error
	// instead of a type-assertion panic.
	r, ok := val.MapKey.([2]int64)
	if !ok {
		return fmt.Errorf("ordered list state %q: clear range must be [2]int64, got %T", val.Key, val.MapKey)
	}

	cl, err := s.getOrderedListClearer(val.Key, r[0], r[1])
	if err != nil {
		return err
	}
	// The range is carried by the clearer itself; the empty write only
	// triggers the request.
	if _, err := cl.Write([]byte{}); err != nil {
		return err
	}

	// append on a missing (nil) slice allocates a new one, so no existence
	// check is needed.
	s.transactionsByKey[val.Key] = append(s.transactionsByKey[val.Key], val)
	return nil
}

// getOrderedListReader opens a reader over the ordered list state userStateID
// for the provider's current element key and window, limited to the sort-key
// bounds start and end. On failure it returns a nil reader and the error.
func (s *stateProvider) getOrderedListReader(userStateID string, start, end int64) (io.ReadCloser, error) {
	reader, openErr := s.sr.OpenOrderedListUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window, start, end)
	if openErr == nil {
		return reader, nil
	}
	return nil, openErr
}

// getOrderedListAppender opens a writer that appends to the ordered list state
// userStateID for the provider's current element key and window. On failure
// it returns a nil writer and the error.
func (s *stateProvider) getOrderedListAppender(userStateID string) (io.Writer, error) {
	appender, openErr := s.sr.OpenOrderedListUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window)
	if openErr == nil {
		return appender, nil
	}
	return nil, openErr
}

// getOrderedListClearer opens a writer that clears the ordered list state
// userStateID for the provider's current element key and window, limited to
// the sort-key bounds start and end. On failure it returns a nil writer and
// the error.
func (s *stateProvider) getOrderedListClearer(userStateID string, start, end int64) (io.Writer, error) {
	clearer, openErr := s.sr.OpenOrderedListUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window, start, end)
	if openErr == nil {
		return clearer, nil
	}
	return nil, openErr
}

// encodeOrderedListEntry serializes one entry as
// varint(uint64(sortKey)) || coder_encoded(value) and hands it to w.
//
// The entry is assembled in a local buffer first and then written with a
// single w.Write call, so w always receives a complete entry (important when
// w is a stateKeyWriter that sends each Write as a separate gRPC Append
// request). It returns the first encoding or write error, if any.
func encodeOrderedListEntry(sortKey int64, val any, w io.Writer, c *coder.Coder) error {
	var entry bytes.Buffer
	entry.Write(protowire.AppendVarint(nil, uint64(sortKey)))

	elem := FullValue{Elm: val}
	if err := MakeElementEncoder(coder.SkipW(c)).Encode(&elem, &entry); err != nil {
		return err
	}

	if _, err := w.Write(entry.Bytes()); err != nil {
		return err
	}
	return nil
}

// decodeOrderedListEntry reads varint(sortKey) || coder_encoded(value) from r.
//
// The sort key is read as an unsigned varint and converted back to int64,
// mirroring the uint64 conversion done when encoding. A bare io.EOF is
// returned only when the stream ends before the first byte of an entry, which
// callers use as the clean end-of-stream signal. A stream that ends inside
// the varint or before the value is reported as a wrapped error instead, so a
// truncated entry is never mistaken for the end of the list.
func decodeOrderedListEntry(r io.Reader, c *coder.Coder) (state.OrderedListEntry, error) {
	// Read the varint one byte at a time so nothing past it is consumed.
	var buf [10]byte // max varint size
	n := 0
	for n < len(buf) {
		// io.ReadFull honors the io.Reader contract: it retries (0, nil)
		// reads and keeps a byte that arrives together with io.EOF.
		if _, err := io.ReadFull(r, buf[n:n+1]); err != nil {
			if n == 0 {
				return state.OrderedListEntry{}, err
			}
			return state.OrderedListEntry{}, fmt.Errorf("unexpected error reading varint: %w", err)
		}
		n++
		if buf[n-1]&0x80 == 0 {
			// A clear continuation bit marks the last byte of the varint.
			break
		}
	}
	sortKey, consumed := protowire.ConsumeVarint(buf[:n])
	if consumed < 0 {
		return state.OrderedListEntry{}, fmt.Errorf("invalid varint in ordered list entry")
	}

	fv, err := MakeElementDecoder(coder.SkipW(c)).Decode(r)
	if err == io.EOF {
		// The sort key was already consumed, so running out of data here is
		// a truncated entry rather than a clean end of stream.
		return state.OrderedListEntry{}, fmt.Errorf("ordered list entry truncated after sort key: %w", io.ErrUnexpectedEOF)
	}
	if err != nil {
		return state.OrderedListEntry{}, err
	}
	return state.OrderedListEntry{SortKey: int64(sortKey), Value: fv.Elm}, nil
}

func (s *stateProvider) encodeKey(userStateID string, key any) ([]byte, error) {
fv := FullValue{Elm: key}
enc := MakeElementEncoder(coder.SkipW(s.keyCodersByID[userStateID]))
Expand Down Expand Up @@ -533,6 +682,7 @@ func (s *userStateAdapter) NewStateProvider(ctx context.Context, reader StateRea
blindBagWriteCountsByKey: make(map[string]int),
initialMapValuesByKey: make(map[string]map[string]any),
initialMapKeysByKey: make(map[string][]any),
initialOrderedListByKey: make(map[string][]any),
readersByKey: make(map[string]io.ReadCloser),
appendersByKey: make(map[string]io.Writer),
clearersByKey: make(map[string]io.Writer),
Expand Down
Loading
Loading