Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@

## New Features / Improvements

* (Python) Removed the `envoy-data-plane` (and transitive `betterproto`) dependency; `EnvoyRateLimiter` now uses a small vendored protobuf definition instead, resolving dependency conflicts for downstream projects ([#37854](https://github.com/apache/beam/issues/37854)).
* Dataflow Runner v2 has been renamed to Dataflow Portable Runner. Please refer to the Dataflow [public documentation](https://docs.cloud.google.com/dataflow/docs/runner-v2) on when to enable Portable Runner ([#39000](https://github.com/apache/beam/issues/39000)).
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)).
* (Python) Added instrumentation to support off-the-shelf profiling agents when launching Python SDK Harness ([#38853](https://github.com/apache/beam/issues/38853)).
Expand Down
1 change: 1 addition & 0 deletions sdks/python/.yapfignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
apache_beam/coders/proto2_coder_test_messages_pb2.py
apache_beam/io/components/rate_limit_pb2.py
apache_beam/portability/api/*

# Avoid excessive wrapping.
Expand Down
64 changes: 64 additions & 0 deletions sdks/python/apache_beam/io/components/rate_limit.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Minimal, self-contained subset of the Envoy Rate Limit Service protocol,
// used by EnvoyRateLimiter in rate_limiter.py. Only the fields the client
// touches are declared. Field numbers MUST match Envoy's rls.proto and
// ratelimit.proto so this is wire-compatible with a real Envoy RLS server
// (protobuf carries only field numbers and types on the wire, not message or
// package names). This lets Beam use its standard protobuf/grpcio stack
// instead of pulling in envoy-data-plane and its betterproto dependency.
//
// Regenerate rate_limit_pb2.py after editing this file:
// cd sdks/python
// python -m grpc_tools.protoc -I. --python_out=. \
// apache_beam/io/components/rate_limit.proto
// then prepend the Apache license header and remove the generated
// `runtime_version` guard so the module stays compatible with the full
// protobuf runtime range Beam supports (see setup.py).

syntax = "proto3";

package apache_beam.io.components.ratelimit;

import "google/protobuf/duration.proto";

// A single rate-limit descriptor: a list of key/value entries that is carried
// in RateLimitRequest.descriptors. Only the `entries` field is declared here;
// field numbers must stay aligned with Envoy's ratelimit.proto.
message RateLimitDescriptor {
  // One key/value pair of a descriptor.
  message Entry {
    // Descriptor key.
    string key = 1;
    // Descriptor value.
    string value = 2;
  }
  // The key/value entries that make up this descriptor.
  repeated Entry entries = 1;
}

// Request sent by EnvoyRateLimiter to the rate limit service. Field numbers
// must stay aligned with Envoy's rls.proto so the bytes are wire-compatible.
message RateLimitRequest {
  // Rate limit domain the descriptors are evaluated in.
  string domain = 1;
  // Descriptors to check against the configured limits.
  repeated RateLimitDescriptor descriptors = 2;
  // Hit count supplied by the caller for this request.
  // NOTE(review): presumably the number of hits to add toward the limit;
  // confirm exact semantics (including the meaning of 0) in Envoy's rls.proto.
  uint32 hits_addend = 3;
}

// Response returned by the rate limit service. Only the fields the Beam client
// reads are declared; field numbers must stay aligned with Envoy's rls.proto.
message RateLimitResponse {
  // Rate limit decision code. Value names are referenced directly from Python
  // (for example RateLimitResponse.OK), so they must not be renamed.
  enum Code {
    // Zero/default value; no decision is conveyed.
    UNKNOWN = 0;
    // The request is within the limit.
    OK = 1;
    // The request exceeds the limit.
    OVER_LIMIT = 2;
  }
  // Per-descriptor result.
  message DescriptorStatus {
    // Decision for this descriptor.
    Code code = 1;
    // Time remaining until the limit for this descriptor resets.
    // Numbers 2 and 3 are deliberately not declared here; do not renumber
    // this field to close the gap, since numbering mirrors Envoy's rls.proto.
    google.protobuf.Duration duration_until_reset = 4;
  }
  // Overall decision for the request.
  Code overall_code = 1;
  // Per-descriptor statuses.
  // NOTE(review): presumably one per request descriptor, in request order;
  // confirm against Envoy's rls.proto.
  repeated DescriptorStatus statuses = 2;
}
63 changes: 63 additions & 0 deletions sdks/python/apache_beam/io/components/rate_limit_pb2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: apache_beam/io/components/rate_limit.proto
#
# Regenerate with (from sdks/python):
# python -m grpc_tools.protoc -I. --python_out=. \
# apache_beam/io/components/rate_limit.proto
# then re-apply this license header and delete the generated
# `runtime_version` guard so the module works across the full protobuf
# runtime range Beam supports (see setup.py).

"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()

from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2

# Register the serialized file descriptor for rate_limit.proto with the default
# descriptor pool. The bytes literal is protoc output: it encodes the file
# name, package, the duration.proto dependency, and the message/enum
# definitions. Never edit it by hand; regenerate it from the .proto instead.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
    b'\n*apache_beam/io/components/rate_limit.proto\x12#apache_beam.io.components.ratelimit\x1a\x1egoogle/protobuf/duration.proto\"\x8b\x01\n\x13RateLimitDescriptor\x12O\n\x07\x65ntries\x18\x01 \x03(\x0b\x32>.apache_beam.io.components.ratelimit.RateLimitDescriptor.Entry\x1a#\n\x05\x45ntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x86\x01\n\x10RateLimitRequest\x12\x0e\n\x06\x64omain\x18\x01 \x01(\t\x12M\n\x0b\x64\x65scriptors\x18\x02 \x03(\x0b\x32\x38.apache_beam.io.components.ratelimit.RateLimitDescriptor\x12\x13\n\x0bhits_addend\x18\x03 \x01(\r\"\x87\x03\n\x11RateLimitResponse\x12Q\n\x0coverall_code\x18\x01 \x01(\x0e\x32;.apache_beam.io.components.ratelimit.RateLimitResponse.Code\x12Y\n\x08statuses\x18\x02 \x03(\x0b\x32G.apache_beam.io.components.ratelimit.RateLimitResponse.DescriptorStatus\x1a\x96\x01\n\x10\x44\x65scriptorStatus\x12I\n\x04\x63ode\x18\x01 \x01(\x0e\x32;.apache_beam.io.components.ratelimit.RateLimitResponse.Code\x12\x37\n\x14\x64uration_until_reset\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\"+\n\x04\x43ode\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x06\n\x02OK\x10\x01\x12\x0e\n\nOVER_LIMIT\x10\x02\x62\x06proto3'
)

# Build the descriptor objects and message classes from DESCRIPTOR and publish
# them into this module's namespace, so that names such as RateLimitRequest
# become attributes of rate_limit_pb2.
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(
    DESCRIPTOR, 'apache_beam.io.components.rate_limit_pb2', _globals)
# The bookkeeping below runs only when the C descriptor implementation is not
# in use. Each _serialized_start/_serialized_end pair is the byte range of
# that message or enum inside the serialized file descriptor held by
# DESCRIPTOR, so the values must be regenerated together with it.
# NOTE(review): the offset assignments are placed inside this guard, as protoc
# normally emits them; confirm against freshly generated output.
if not _descriptor._USE_C_DESCRIPTORS:
  DESCRIPTOR._loaded_options = None
  _globals['_RATELIMITDESCRIPTOR']._serialized_start = 116
  _globals['_RATELIMITDESCRIPTOR']._serialized_end = 255
  _globals['_RATELIMITDESCRIPTOR_ENTRY']._serialized_start = 220
  _globals['_RATELIMITDESCRIPTOR_ENTRY']._serialized_end = 255
  _globals['_RATELIMITREQUEST']._serialized_start = 258
  _globals['_RATELIMITREQUEST']._serialized_end = 392
  _globals['_RATELIMITRESPONSE']._serialized_start = 395
  _globals['_RATELIMITRESPONSE']._serialized_end = 786
  _globals['_RATELIMITRESPONSE_DESCRIPTORSTATUS']._serialized_start = 591
  _globals['_RATELIMITRESPONSE_DESCRIPTORSTATUS']._serialized_end = 741
  _globals['_RATELIMITRESPONSE_CODE']._serialized_start = 743
  _globals['_RATELIMITRESPONSE_CODE']._serialized_end = 786
# @@protoc_insertion_point(module_scope)
39 changes: 17 additions & 22 deletions sdks/python/apache_beam/io/components/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@
import time

import grpc
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptor
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptorEntry
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitRequest
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponse
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseCode

from apache_beam.io.components import adaptive_throttler
from apache_beam.io.components import rate_limit_pb2
from apache_beam.metrics import Metrics

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -122,19 +118,18 @@ def __init__(
self._lock = threading.Lock()

class RateLimitServiceStub(object):
"""
Wrapper for gRPC stub to be compatible with envoy_data_plane messages.

The envoy-data-plane package uses 'betterproto' which generates async stubs
for 'grpclib'. As Beam uses standard synchronous 'grpcio',
RateLimitServiceStub is a bridge class to use the betterproto Message types
(RateLimitRequest) with a standard grpcio Channel.
"""
Minimal gRPC stub for the Envoy Rate Limit Service ShouldRateLimit method.

The method path is fixed by the Envoy RLS proto, so we bind it by hand
against a standard synchronous grpcio Channel using the protobuf message
types from rate_limit_pb2.
"""
def __init__(self, channel):
self.ShouldRateLimit = channel.unary_unary(
'/envoy.service.ratelimit.v3.RateLimitService/ShouldRateLimit',
request_serializer=RateLimitRequest.SerializeToString,
response_deserializer=RateLimitResponse.FromString,
request_serializer=rate_limit_pb2.RateLimitRequest.SerializeToString,
response_deserializer=rate_limit_pb2.RateLimitResponse.FromString,
)

def init_connection(self):
Expand Down Expand Up @@ -171,10 +166,11 @@ def allow(self, hits_added: int = 1) -> bool:
for d in self.descriptors:
entries = []
for k, v in d.items():
entries.append(RateLimitDescriptorEntry(key=k, value=v))
proto_descriptors.append(RateLimitDescriptor(entries=entries))
entries.append(rate_limit_pb2.RateLimitDescriptor.Entry(key=k, value=v))
proto_descriptors.append(
rate_limit_pb2.RateLimitDescriptor(entries=entries))

request = RateLimitRequest(
request = rate_limit_pb2.RateLimitRequest(
domain=self.domain,
descriptors=proto_descriptors,
hits_addend=hits_added)
Expand Down Expand Up @@ -205,22 +201,21 @@ def allow(self, hits_added: int = 1) -> bool:
e)
time.sleep(_RPC_RETRY_DELAY_SECONDS)

if response.overall_code == RateLimitResponseCode.OK:
if response.overall_code == rate_limit_pb2.RateLimitResponse.OK:
self.requests_allowed.inc()
throttled = True
break
elif response.overall_code == RateLimitResponseCode.OVER_LIMIT:
elif response.overall_code == rate_limit_pb2.RateLimitResponse.OVER_LIMIT:
self.requests_throttled.inc()
# Ratelimit exceeded, sleep for duration until reset and retry
# multiple rules can be set in the RLS config, so we need to find the
# max duration
sleep_s = 0.0
if response.statuses:
for status in response.statuses:
if status.code == RateLimitResponseCode.OVER_LIMIT:
if status.code == rate_limit_pb2.RateLimitResponse.OVER_LIMIT:
dur = status.duration_until_reset
# duration_until_reset is converted to timedelta by betterproto
val = dur.total_seconds()
val = dur.ToTimedelta().total_seconds()
if val > sleep_s:
sleep_s = val

Expand Down
62 changes: 50 additions & 12 deletions sdks/python/apache_beam/io/components/rate_limiter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
#

import unittest
from datetime import timedelta
from unittest import mock

import grpc
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponse
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseCode
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseDescriptorStatus
from google.protobuf.duration_pb2 import Duration

from apache_beam.io.components import rate_limit_pb2
from apache_beam.io.components import rate_limiter

RateLimitResponse = rate_limit_pb2.RateLimitResponse


class EnvoyRateLimiterTest(unittest.TestCase):
def setUp(self):
Expand All @@ -45,7 +45,7 @@ def setUp(self):
def test_allow_success(self, mock_channel):
# Mock successful OK response
mock_stub = mock.Mock()
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
mock_response = RateLimitResponse(overall_code=RateLimitResponse.OK)
mock_stub.ShouldRateLimit.return_value = mock_response

# Inject mock stub
Expand All @@ -60,8 +60,7 @@ def test_allow_success(self, mock_channel):
def test_allow_over_limit_retries_exceeded(self, mock_channel):
# Mock OVER_LIMIT response
mock_stub = mock.Mock()
mock_response = RateLimitResponse(
overall_code=RateLimitResponseCode.OVER_LIMIT)
mock_response = RateLimitResponse(overall_code=RateLimitResponse.OVER_LIMIT)
mock_stub.ShouldRateLimit.return_value = mock_response

self.limiter._stub = mock_stub
Expand All @@ -86,7 +85,7 @@ def test_allow_over_limit_retries_exceeded(self, mock_channel):
def test_allow_rpc_error_retry(self, mock_channel):
# Mock RpcError then Success
mock_stub = mock.Mock()
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
mock_response = RateLimitResponse(overall_code=RateLimitResponse.OK)

# Side effect: Error, Error, Success
error = grpc.RpcError()
Expand Down Expand Up @@ -123,11 +122,11 @@ def test_extract_duration_from_response(self, mock_random, mock_channel):
mock_stub = mock.Mock()

# Valid until 5 seconds
status = RateLimitResponseDescriptorStatus(
code=RateLimitResponseCode.OVER_LIMIT,
duration_until_reset=timedelta(seconds=5))
status = RateLimitResponse.DescriptorStatus(
code=RateLimitResponse.OVER_LIMIT,
duration_until_reset=Duration(seconds=5))
mock_response = RateLimitResponse(
overall_code=RateLimitResponseCode.OVER_LIMIT, statuses=[status])
overall_code=RateLimitResponse.OVER_LIMIT, statuses=[status])

mock_stub.ShouldRateLimit.return_value = mock_response
self.limiter._stub = mock_stub
Expand All @@ -139,5 +138,44 @@ def test_extract_duration_from_response(self, mock_random, mock_channel):
mock_sleep.assert_called_with(5.0)


class RateLimitWireFormatTest(unittest.TestCase):
  """Golden-byte checks for the vendored rate_limit_pb2 messages.

  A real Envoy Rate Limit Service only sees field numbers and wire types;
  message and package names never travel on the wire. The vendored
  definitions therefore have to keep the same numbering as Envoy's rls.proto
  and ratelimit.proto. Mock-based tests cannot notice a renumbered field, so
  the exact serialized bytes are asserted here instead.
  """
  def test_request_wire_layout(self):
    entry = rate_limit_pb2.RateLimitDescriptor.Entry(key='k', value='v')
    descriptor = rate_limit_pb2.RateLimitDescriptor(entries=[entry])
    request = rate_limit_pb2.RateLimitRequest(
        domain='d', descriptors=[descriptor], hits_addend=1)

    # Expected bytes, field by field:
    #   0a 01 64                -> domain=1, LEN, "d"
    #   12 08                   -> descriptors=2, LEN 8
    #     0a 06                 ->   entries=1, LEN 6
    #       0a 01 6b 12 01 76   ->     key=1 "k", value=2 "v"
    #   18 01                   -> hits_addend=3, VARINT 1
    encoded_hex = request.SerializeToString().hex()
    self.assertEqual(encoded_hex, '0a016412080a060a016b1201761801')

  def test_descriptor_status_wire_layout(self):
    response_cls = rate_limit_pb2.RateLimitResponse
    five_seconds = Duration(seconds=5)
    status = response_cls.DescriptorStatus(
        code=response_cls.OVER_LIMIT, duration_until_reset=five_seconds)

    # Expected bytes, field by field:
    #   08 02        -> code=1, VARINT OVER_LIMIT (2)
    #   22 02 08 05  -> duration_until_reset=4, LEN 2, Duration{seconds=1: 5}
    encoded_hex = status.SerializeToString().hex()
    self.assertEqual(encoded_hex, '080222020805')

  def test_response_code_enum_values(self):
    # The numeric values are what travel on the wire, so pin each one.
    response_cls = rate_limit_pb2.RateLimitResponse
    self.assertEqual(int(response_cls.UNKNOWN), 0)
    self.assertEqual(int(response_cls.OK), 1)
    self.assertEqual(int(response_cls.OVER_LIMIT), 2)


# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()
2 changes: 0 additions & 2 deletions sdks/python/container/ml/py310/base_image_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ attrs==26.1.0
backports.tarfile==1.2.0
beartype==0.22.9
beautifulsoup4==4.15.0
betterproto==2.0.0b7
bs4==0.0.2
build==1.5.0
cachetools==6.2.6
Expand All @@ -52,7 +51,6 @@ distro==1.9.0
dnspython==2.8.0
docker==7.1.0
docstring_parser==0.18.0
envoy-data-plane==0.2.6
exceptiongroup==1.3.1
execnet==2.1.2
fastavro==1.12.2
Expand Down
2 changes: 0 additions & 2 deletions sdks/python/container/ml/py310/gpu_image_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ attrs==26.1.0
backports.tarfile==1.2.0
beartype==0.22.9
beautifulsoup4==4.15.0
betterproto==2.0.0b7
blake3==1.0.9
bs4==0.0.2
build==1.5.0
Expand Down Expand Up @@ -67,7 +66,6 @@ docker==7.1.0
docstring_parser==0.18.0
einops==0.8.2
email-validator==2.3.0
envoy-data-plane==0.2.6
exceptiongroup==1.3.1
execnet==2.1.2
fastapi==0.138.1
Expand Down
2 changes: 0 additions & 2 deletions sdks/python/container/ml/py311/base_image_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ attrs==26.1.0
backports.tarfile==1.2.0
beartype==0.22.9
beautifulsoup4==4.15.0
betterproto==2.0.0b6
bs4==0.0.2
build==1.5.0
cachetools==6.2.6
Expand All @@ -51,7 +50,6 @@ distro==1.9.0
dnspython==2.8.0
docker==7.1.0
docstring_parser==0.18.0
envoy_data_plane==1.0.3
execnet==2.1.2
fastavro==1.12.2
fasteners==0.20
Expand Down
Loading
Loading