From e609dbaa8b6c672bec0d15ce8d6d1af5c7f18c83 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 19 Jun 2026 14:55:37 +0200 Subject: [PATCH] ref(params): Remove JSON parameter encoding ref STREAM-1190 Co-Authored-By: Claude Opus 4.8 --- clients/python/pyproject.toml | 1 - .../python/src/taskbroker_client/constants.py | 13 -- clients/python/src/taskbroker_client/task.py | 46 +----- .../src/taskbroker_client/worker/client.py | 4 +- .../taskbroker_client/worker/workerchild.py | 24 +--- clients/python/tests/test_app.py | 3 +- clients/python/tests/test_registry.py | 10 +- clients/python/tests/test_task.py | 8 +- clients/python/tests/worker/test_worker.py | 100 +------------ src/kafka/deserialize_activation.rs | 135 +++++++----------- src/test_utils.rs | 18 ++- src/upkeep.rs | 22 ++- uv.lock | 2 - 13 files changed, 93 insertions(+), 293 deletions(-) diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index 35a44cd6..2d320b8a 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -12,7 +12,6 @@ dependencies = [ "grpcio>=1.67.1", "grpcio-health-checking>=1.67.1", "msgpack>=1.0.0", - "orjson>=3.10.10", "protobuf>=5.28.3", "redis>=3.4.1", "redis-py-cluster>=2.1.0", diff --git a/clients/python/src/taskbroker_client/constants.py b/clients/python/src/taskbroker_client/constants.py index 66f113d5..817a70fb 100644 --- a/clients/python/src/taskbroker_client/constants.py +++ b/clients/python/src/taskbroker_client/constants.py @@ -85,16 +85,3 @@ class CompressionType(Enum): ZSTD = "zstd" PLAINTEXT = "plaintext" - - -class ParametersFormat(Enum): - """ - How the producer populates the legacy `parameters` (JSON) and new - `parameters_bytes` (msgpack) fields on TaskActivation. - - Set via env var `TASKBROKER_CLIENT_PARAMETERS_FORMAT`. Default BOTH. - """ - - BOTH = "both" - JSON = "json" - MSGPACK = "msgpack" diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 69cb1b8a..da604436 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -1,9 +1,7 @@ from __future__ import annotations -import base64 import datetime import inspect -import os import time from collections.abc import Callable, Collection, Mapping, MutableMapping from functools import update_wrapper @@ -11,7 +9,6 @@ from uuid import uuid4 import msgpack -import orjson import sentry_sdk import zstandard as zstd from google.protobuf.timestamp_pb2 import Timestamp @@ -25,22 +22,9 @@ DEFAULT_PROCESSING_DEADLINE, MAX_PARAMETER_BYTES_BEFORE_COMPRESSION, CompressionType, - ParametersFormat, ) from taskbroker_client.retry import Retry - -def _get_parameters_format() -> ParametersFormat: - raw = os.environ.get("TASKBROKER_CLIENT_PARAMETERS_FORMAT", ParametersFormat.BOTH.value) - try: - return ParametersFormat(raw.lower()) - except ValueError: - raise ValueError( - f"Invalid TASKBROKER_CLIENT_PARAMETERS_FORMAT={raw!r}. " - f"Expected one of: {', '.join(f.value for f in ParametersFormat)}" - ) - - if TYPE_CHECKING: from taskbroker_client.registry import TaskNamespace @@ -270,38 +254,18 @@ def create_activation( f"The `{key}` header value is of type {type(value)}" ) - parameters_format = _get_parameters_format() data = {"args": args, "kwargs": kwargs} - - msgpack_bytes = ( - msgpack.packb(data, use_bin_type=True) - if parameters_format in (ParametersFormat.BOTH, ParametersFormat.MSGPACK) - else b"" - ) - # JSON can't encode some values msgpack can (e.g. raw bytes). In - # JSON-only mode we surface the TypeError; in BOTH mode we silently - # skip the legacy field so msgpack-aware workers can still run. - json_bytes: bytes | None = None - if parameters_format in (ParametersFormat.BOTH, ParametersFormat.JSON): - try: - json_bytes = orjson.dumps(data) - except TypeError: - if parameters_format == ParametersFormat.JSON: - raise + msgpack_bytes = msgpack.packb(data, use_bin_type=True) should_compress = ( self.compression_type == CompressionType.ZSTD - or (len(msgpack_bytes) + len(json_bytes or b"")) - > MAX_PARAMETER_BYTES_BEFORE_COMPRESSION + or len(msgpack_bytes) > MAX_PARAMETER_BYTES_BEFORE_COMPRESSION ) if should_compress: headers["compression-type"] = CompressionType.ZSTD.value start_time = time.perf_counter() - parameters_bytes_val = zstd.compress(msgpack_bytes) if msgpack_bytes else b"" - parameters_str = ( - base64.b64encode(zstd.compress(json_bytes)).decode("utf8") if json_bytes else "" - ) + parameters_bytes_val = zstd.compress(msgpack_bytes) elapsed = time.perf_counter() - start_time metric_tags = { @@ -311,7 +275,7 @@ def create_activation( } self.namespace.metrics.distribution( "taskworker.producer.compressed_parameters_size", - len(parameters_bytes_val) or len(parameters_str), + len(parameters_bytes_val), tags=metric_tags, ) self.namespace.metrics.distribution( @@ -321,7 +285,6 @@ def create_activation( ) else: parameters_bytes_val = msgpack_bytes - parameters_str = json_bytes.decode("utf8") if json_bytes else "" return TaskActivation( id=uuid4().hex, @@ -329,7 +292,6 @@ def create_activation( namespace=self._namespace.name, taskname=self.name, headers=headers, - parameters=parameters_str, parameters_bytes=parameters_bytes_val, retry_state=self._create_retry_state(), received_at=received_at, diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index baf609d6..46896763 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -1,5 +1,6 @@ import hashlib import hmac +import json import logging import random import threading @@ -10,7 +11,6 @@ from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple, Union import grpc -import orjson from google.protobuf.message import Message from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( FetchNextTask, @@ -130,7 +130,7 @@ def parse_rpc_secret_list(rpc_secret: str | None) -> list[str] | None: return None # Try to parse the provided secret - parsed = orjson.loads(rpc_secret) + parsed = json.loads(rpc_secret) if not isinstance(parsed, list) or len(parsed) == 0: # If the secret string is not a list with at least one element, it is invalid diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index a63634e4..52b1941c 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -1,6 +1,5 @@ from __future__ import annotations -import base64 import contextlib import logging import queue @@ -15,7 +14,6 @@ # XXX: Don't import any modules that will import django here, do those within child_process import msgpack -import orjson import sentry_sdk import zstandard as zstd from arroyo.backends.abstract import ProducerFuture @@ -94,24 +92,10 @@ def load_parameters(activation: TaskActivation) -> dict[str, Any]: headers = dict(activation.headers) compression_type = headers.get("compression-type", None) - # Prefer new msgpack field - if activation.parameters_bytes: - data = activation.parameters_bytes - if compression_type == CompressionType.ZSTD.value: - data = zstd.decompress(data) - return msgpack.unpackb(data, raw=False) - - # Legacy JSON fallback - data_str = activation.parameters - if not compression_type or compression_type == CompressionType.PLAINTEXT.value: - return orjson.loads(data_str) - elif compression_type == CompressionType.ZSTD.value: - return orjson.loads(zstd.decompress(base64.b64decode(data_str))) - else: - logger.error( - "Unsupported compression type: %s. Continuing with plaintext.", compression_type - ) - return orjson.loads(data_str) + data = activation.parameters_bytes + if compression_type == CompressionType.ZSTD.value: + data = zstd.decompress(data) + return msgpack.unpackb(data, raw=False) def status_name(status: TaskActivationStatus.ValueType) -> str: diff --git a/clients/python/tests/test_app.py b/clients/python/tests/test_app.py index 48b42ec5..91eafe6c 100644 --- a/clients/python/tests/test_app.py +++ b/clients/python/tests/test_app.py @@ -1,3 +1,4 @@ +import msgpack import pytest from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation @@ -42,7 +43,7 @@ def test_should_attempt_at_most_once() -> None: id="111", taskname="examples.simple_task", namespace="examples", - parameters='{"args": [], "kwargs": {}}', + parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), processing_deadline_duration=2, ) at_most = StubAtMostOnce() diff --git a/clients/python/tests/test_registry.py b/clients/python/tests/test_registry.py index c0a661ab..ab21e7ec 100644 --- a/clients/python/tests/test_registry.py +++ b/clients/python/tests/test_registry.py @@ -1,9 +1,7 @@ -import base64 from concurrent.futures import Future from unittest.mock import Mock import msgpack -import orjson import pytest import zstandard as zstd from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( @@ -179,9 +177,7 @@ def simple_task_with_compression(param: str) -> None: actual_params = msgpack.unpackb(decompressed_data, raw=False) assert actual_params == expected_params - - legacy_decompressed = zstd.decompress(base64.b64decode(activation.parameters.encode("utf-8"))) - assert orjson.loads(legacy_decompressed) == expected_params + assert activation.parameters == "" def test_namespace_send_task_with_auto_compression() -> None: @@ -211,9 +207,7 @@ def simple_task_with_compression(param: str) -> None: actual_params = msgpack.unpackb(decompressed_data, raw=False) assert actual_params == expected_params - - legacy_decompressed = zstd.decompress(base64.b64decode(activation.parameters.encode("utf-8"))) - assert orjson.loads(legacy_decompressed) == expected_params + assert activation.parameters == "" def test_namespace_send_task_with_retry() -> None: diff --git a/clients/python/tests/test_task.py b/clients/python/tests/test_task.py index 2ea584d4..166cf7f2 100644 --- a/clients/python/tests/test_task.py +++ b/clients/python/tests/test_task.py @@ -5,7 +5,6 @@ from unittest.mock import patch import msgpack -import orjson import pytest import sentry_sdk from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( @@ -81,7 +80,7 @@ def test_func(*args: Any, **kwargs: Any) -> None: assert activation.expires == 10 expected_params = {"args": ["arg2"], "kwargs": {"org_id": 2}} assert msgpack.unpackb(activation.parameters_bytes, raw=False) == expected_params - assert orjson.loads(activation.parameters) == expected_params + assert activation.parameters == "" def test_apply_async_countdown(task_namespace: TaskNamespace) -> None: @@ -102,7 +101,7 @@ def test_func(*args: Any, **kwargs: Any) -> None: assert activation.delay == 600 expected_params = {"args": ["arg2"], "kwargs": {"org_id": 2}} assert msgpack.unpackb(activation.parameters_bytes, raw=False) == expected_params - assert orjson.loads(activation.parameters) == expected_params + assert activation.parameters == "" def test_delay_immediate_mode(task_namespace: TaskNamespace) -> None: @@ -274,8 +273,7 @@ def with_parameters(one: str, two: int, org_id: int) -> None: assert params["args"] == ["one", 22] assert params["kwargs"] == {"org_id": 99} - json_params = orjson.loads(activation.parameters) - assert json_params == params + assert activation.parameters == "" def test_create_activation_tracing(task_namespace: TaskNamespace) -> None: diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 65e565ed..2d35a903 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -1,4 +1,3 @@ -import base64 import contextlib import os import queue @@ -15,7 +14,6 @@ import grpc import msgpack -import orjson import pytest import zstandard as zstd from arroyo.backends.kafka import KafkaPayload @@ -166,44 +164,6 @@ ), ) -# Legacy fixture using the old JSON parameters field for backward compat testing -LEGACY_JSON_TASK = InflightTaskActivation( - host="localhost:50051", - receive_timestamp=0, - activation=TaskActivation( - id="legacy_json_123", - taskname="examples.simple_task", - namespace="examples", - parameters='{"args": ["legacy_arg"], "kwargs": {}}', - processing_deadline_duration=2, - ), -) - -# Legacy compressed fixture using base64+zstd in the old parameters field -LEGACY_COMPRESSED_TASK = InflightTaskActivation( - host="localhost:50051", - receive_timestamp=0, - activation=TaskActivation( - id="legacy_compressed_123", - taskname="examples.simple_task", - namespace="examples", - parameters=base64.b64encode( - zstd.compress( - orjson.dumps( - { - "args": ["test_arg1", "test_arg2"], - "kwargs": {"test_key": "test_value", "number": 42}, - } - ) - ) - ).decode("utf8"), - headers={ - "compression-type": CompressionType.ZSTD.value, - }, - processing_deadline_duration=2, - ), -) - # Task with Retry logic, expected exceptions to silence reporting RETRY_TASK_WITH_SILENCED_TIMEOUT = InflightTaskActivation( host="localhost:50051", @@ -825,7 +785,9 @@ def test_child_process_remove_start_time_kwargs() -> None: id="6789", taskname="examples.will_retry", namespace="examples", - parameters='{"args": ["stuff"], "kwargs": {"__start_time": 123}}', + parameters_bytes=msgpack.packb( + {"args": ["stuff"], "kwargs": {"__start_time": 123}}, use_bin_type=True + ), processing_deadline_duration=100000, ), ) @@ -886,7 +848,7 @@ def test_child_process_retry_task_max_attempts( id="6789", taskname="examples.will_retry", namespace="examples", - parameters='{"args": ["raise"], "kwargs": {}}', + parameters_bytes=msgpack.packb({"args": ["raise"], "kwargs": {}}, use_bin_type=True), processing_deadline_duration=100000, retry_state=RetryState( attempts=2, @@ -1104,7 +1066,7 @@ def test_child_process_terminate_task(mock_logger: mock.Mock) -> None: id="111", taskname="examples.timed", namespace="examples", - parameters='{"args": [3], "kwargs": {}}', + parameters_bytes=msgpack.packb({"args": [3], "kwargs": {}}, use_bin_type=True), processing_deadline_duration=1, ), ) @@ -1161,54 +1123,6 @@ def test_child_process_decompression(mock_capture_checkin: mock.MagicMock) -> No assert mock_capture_checkin.call_count == 0 -@mock.patch("sentry_sdk.crons.api.capture_checkin") -def test_child_process_legacy_json_parameters(mock_capture_checkin: mock.MagicMock) -> None: - """Test backward compat: worker can handle legacy JSON parameters field.""" - todo: queue.Queue[InflightTaskActivation] = queue.Queue() - processed: queue.Queue[ProcessingResult] = queue.Queue() - shutdown = Event() - - todo.put(LEGACY_JSON_TASK) - child_process( - "examples.app:app", - todo, - processed, - shutdown, - max_task_count=1, - processing_pool_name="test", - process_type="fork", - ) - - assert todo.empty() - result = processed.get() - assert result.task_id == LEGACY_JSON_TASK.activation.id - assert result.status == TASK_ACTIVATION_STATUS_COMPLETE - - -@mock.patch("sentry_sdk.crons.api.capture_checkin") -def test_child_process_legacy_compressed_parameters(mock_capture_checkin: mock.MagicMock) -> None: - """Test backward compat: worker can handle legacy base64+zstd compressed JSON parameters.""" - todo: queue.Queue[InflightTaskActivation] = queue.Queue() - processed: queue.Queue[ProcessingResult] = queue.Queue() - shutdown = Event() - - todo.put(LEGACY_COMPRESSED_TASK) - child_process( - "examples.app:app", - todo, - processed, - shutdown, - max_task_count=1, - processing_pool_name="test", - process_type="fork", - ) - - assert todo.empty() - result = processed.get() - assert result.task_id == LEGACY_COMPRESSED_TASK.activation.id - assert result.status == TASK_ACTIVATION_STATUS_COMPLETE - - def test_child_process_context_hooks() -> None: """Context hooks' on_execute is called with activation headers during task execution.""" executed_headers: list[dict[str, str]] = [] @@ -1234,7 +1148,7 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag id="hook-test", taskname="examples.simple_task", namespace="examples", - parameters='{"args": [], "kwargs": {}}', + parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), headers={"x-viewer-org": "42", "x-viewer-user": "7"}, processing_deadline_duration=5, ), @@ -1628,7 +1542,7 @@ def deliver_sigterm() -> None: # id="failed-future-retry", # taskname="examples.will_retry", # namespace="examples", -# parameters=orjson.dumps({"args": ["noop"], "kwargs": {}}).decode("utf8"), +# parameters_bytes=msgpack.packb({"args": ["noop"], "kwargs": {}}, use_bin_type=True), # processing_deadline_duration=2, # retry_state=RetryState( # attempts=0, diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 50655858..11b3bf97 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -118,7 +118,6 @@ pub fn new( #[cfg(test)] mod tests { - use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -126,10 +125,9 @@ mod tests { use prost::Message as _; use rdkafka::Timestamp; use rdkafka::message::OwnedMessage; - use sentry_protos::taskbroker::v1::TaskActivation; use crate::store::activation::ActivationStatus; - use crate::test_utils::generate_unique_namespace; + use crate::test_utils::{TaskActivationBuilder, generate_unique_namespace}; use super::{Config, DeserializeActivationConfig, new}; @@ -140,25 +138,18 @@ mod tests { let now = Utc::now(); let the_past = now - Duration::from_secs(60 * 10); - #[allow(deprecated)] - let activation = TaskActivation { - id: "id_0".into(), - application: Some("sentry".to_string()), - namespace: generate_unique_namespace(), - taskname: "taskname".into(), - parameters: "{}".into(), - parameters_bytes: vec![], - headers: HashMap::new(), + let activation = TaskActivationBuilder::new() + .id("id_0") + .application("sentry") + .namespace(generate_unique_namespace()) + .taskname("taskname") // not used when the activation doesn't have expires. - received_at: Some(prost_types::Timestamp { + .received_at(prost_types::Timestamp { seconds: the_past.timestamp(), nanos: 0, - }), - retry_state: None, - processing_deadline_duration: 10, - expires: None, - delay: None, - }; + }) + .processing_deadline_duration(10) + .build(); let message = OwnedMessage::new( Some(activation.encode_to_vec()), None, @@ -185,25 +176,19 @@ mod tests { let now = Utc::now(); let the_past = now - Duration::from_secs(60 * 10); - #[allow(deprecated)] - let activation = TaskActivation { - id: "id_0".into(), - application: Some("sentry".to_string()), - namespace: generate_unique_namespace(), - taskname: "taskname".into(), - parameters: "{}".into(), - parameters_bytes: vec![], - headers: HashMap::new(), + let activation = TaskActivationBuilder::new() + .id("id_0") + .application("sentry") + .namespace(generate_unique_namespace()) + .taskname("taskname") // used because the activation has expires - received_at: Some(prost_types::Timestamp { + .received_at(prost_types::Timestamp { seconds: the_past.timestamp(), nanos: 0, - }), - retry_state: None, - processing_deadline_duration: 10, - expires: Some(100), - delay: None, - }; + }) + .processing_deadline_duration(10) + .expires(100) + .build(); let message = OwnedMessage::new( Some(activation.encode_to_vec()), None, @@ -231,25 +216,19 @@ mod tests { let now = Utc::now(); let the_past = now - Duration::from_secs(60 * 10); - #[allow(deprecated)] - let activation = TaskActivation { - id: "id_0".into(), - application: Some("sentry".to_string()), - namespace: generate_unique_namespace(), - taskname: "taskname".into(), - parameters: "{}".into(), - parameters_bytes: vec![], - headers: HashMap::new(), - // used because the activation has expires - received_at: Some(prost_types::Timestamp { + let activation = TaskActivationBuilder::new() + .id("id_0") + .application("sentry") + .namespace(generate_unique_namespace()) + .taskname("taskname") + // used because the activation has delay + .received_at(prost_types::Timestamp { seconds: the_past.timestamp(), nanos: 0, - }), - retry_state: None, - processing_deadline_duration: 10, - expires: None, - delay: Some(100), - }; + }) + .processing_deadline_duration(10) + .delay(100) + .build(); let message = OwnedMessage::new( Some(activation.encode_to_vec()), None, @@ -277,25 +256,19 @@ mod tests { let deserializer = new(DeserializeActivationConfig::from_config(&config)); let now = Utc::now(); - #[allow(deprecated)] - let activation = TaskActivation { - id: "id_0".into(), - application: Some("sentry".to_string()), - namespace: generate_unique_namespace(), - taskname: "taskname".into(), - parameters: "{}".into(), - parameters_bytes: vec![], - headers: HashMap::new(), + let activation = TaskActivationBuilder::new() + .id("id_0") + .application("sentry") + .namespace(generate_unique_namespace()) + .taskname("taskname") // used because the activation has delay - received_at: Some(prost_types::Timestamp { + .received_at(prost_types::Timestamp { seconds: now.timestamp(), nanos: 0, - }), - retry_state: None, - processing_deadline_duration: 10, - expires: None, - delay: Some(100), - }; + }) + .processing_deadline_duration(10) + .delay(100) + .build(); let message = OwnedMessage::new( Some(activation.encode_to_vec()), None, @@ -324,25 +297,19 @@ mod tests { let now = Utc::now(); let delay_sec = config.max_delayed_task_allowed_sec * 2; - #[allow(deprecated)] - let activation = TaskActivation { - id: "id_0".into(), - application: Some("sentry".to_string()), - namespace: generate_unique_namespace(), - taskname: "taskname".into(), - parameters: "{}".into(), - parameters_bytes: vec![], - headers: HashMap::new(), + let activation = TaskActivationBuilder::new() + .id("id_0") + .application("sentry") + .namespace(generate_unique_namespace()) + .taskname("taskname") // used because the activation has delay - received_at: Some(prost_types::Timestamp { + .received_at(prost_types::Timestamp { seconds: now.timestamp(), nanos: 0, - }), - retry_state: None, - processing_deadline_duration: 10, - expires: None, - delay: Some(delay_sec), - }; + }) + .processing_deadline_duration(10) + .delay(delay_sec) + .build(); let message = OwnedMessage::new( Some(activation.encode_to_vec()), None, diff --git a/src/test_utils.rs b/src/test_utils.rs index 48dda5ed..30ef3eba 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -23,13 +23,15 @@ use crate::store::adapters::postgres::{self, PostgresStore}; use crate::store::adapters::sqlite::SqliteStore; use crate::store::traits::ActivationStore; +/// msgpack encoding of an empty map (`{}`), used as default task parameters in tests. +pub const EMPTY_MSGPACK_MAP: &[u8] = &[0x80]; + /// Builder for `TaskActivation`. We cannot generate a builder automatically because `TaskActivation` is defined in `sentry-protos`. pub struct TaskActivationBuilder { pub id: Option, pub application: Option, pub namespace: Option, pub taskname: Option, - pub parameters: Option, pub parameters_bytes: Option>, pub headers: Option>, pub received_at: Option, @@ -46,7 +48,6 @@ impl TaskActivationBuilder { application: None, namespace: None, taskname: None, - parameters: None, parameters_bytes: None, headers: None, received_at: None, @@ -77,12 +78,6 @@ impl TaskActivationBuilder { self } - #[allow(deprecated)] - pub fn parameters>(mut self, parameters: T) -> Self { - self.parameters = Some(parameters.into()); - self - } - pub fn parameters_bytes(mut self, parameters_bytes: Vec) -> Self { self.parameters_bytes = Some(parameters_bytes); self @@ -125,8 +120,11 @@ impl TaskActivationBuilder { application: Some(self.application.expect("application is required")), namespace: self.namespace.expect("namespace is required"), taskname: self.taskname.expect("taskname is required"), - parameters: self.parameters.unwrap_or_else(|| "{}".to_string()), - parameters_bytes: self.parameters_bytes.unwrap_or_default(), + parameters: String::new(), + // Default to an empty msgpack map (`{}`) when no parameters are provided. + parameters_bytes: self + .parameters_bytes + .unwrap_or_else(|| EMPTY_MSGPACK_MAP.to_vec()), headers: self.headers.unwrap_or_default(), processing_deadline_duration: self.processing_deadline_duration.unwrap_or(0), received_at: self.received_at, diff --git a/src/upkeep.rs b/src/upkeep.rs index f5d8a012..fab354bc 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -743,10 +743,8 @@ mod tests { activation.received_at.unwrap().nanos as u32, ) .expect(""); - { - #![allow(deprecated)] - activation.parameters = r#"{"a":"b"}"#.into(); - } + // msgpack-encoded `{"a": "b"}` + activation.parameters_bytes = vec![0x81, 0xa1, b'a', 0xa1, b'b']; activation.delay = Some(30); records[0].status = ActivationStatus::Retry; records[0].delay_until = Some(Utc::now() + Duration::from_secs(30)); @@ -785,10 +783,10 @@ mod tests { let activation_to_check = TaskActivation::decode(&records[0].activation as &[u8]).unwrap(); assert_eq!(activation.taskname, activation_to_check.taskname); assert_eq!(activation.namespace, activation_to_check.namespace); - { - #![allow(deprecated)] - assert_eq!(activation.parameters, activation_to_check.parameters); - } + assert_eq!( + activation.parameters_bytes, + activation_to_check.parameters_bytes + ); // received_at should be set be later than the original activation assert!( activation.received_at.unwrap().seconds @@ -1121,10 +1119,10 @@ mod tests { let activation_to_check = TaskActivation::decode(&records[0].activation as &[u8]).unwrap(); assert_eq!(activation.id, activation_to_check.id); // DLQ should retain parameters of original task - { - #![allow(deprecated)] - assert_eq!(activation.parameters, activation_to_check.parameters); - } + assert_eq!( + activation.parameters_bytes, + activation_to_check.parameters_bytes + ); } #[tokio::test] diff --git a/uv.lock b/uv.lock index 7e1bd95d..9967edb3 100644 --- a/uv.lock +++ b/uv.lock @@ -775,7 +775,6 @@ dependencies = [ { name = "grpcio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpcio-health-checking", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "msgpack", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "orjson", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "redis", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "redis-py-cluster", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -819,7 +818,6 @@ requires-dist = [ { name = "grpcio", specifier = ">=1.67.1" }, { name = "grpcio-health-checking", specifier = ">=1.67.1" }, { name = "msgpack", specifier = ">=1.0.0" }, - { name = "orjson", specifier = ">=3.10.10" }, { name = "protobuf", specifier = ">=5.28.3" }, { name = "redis", specifier = ">=3.4.1" }, { name = "redis-py-cluster", specifier = ">=2.1.0" },