From 4a70aeade81114d4f8596775bb19e6268a9752fc Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 6 Dec 2024 13:20:30 -0500 Subject: [PATCH 1/2] Ensure default concurrency level does not generate deadlock --- .../sources/declarative/concurrent_declarative_source.py | 7 ++++--- .../sources/declarative/declarative_component_schema.yaml | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index a393bbeb2..050cb6dd7 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -56,8 +56,9 @@ class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): - # By default, we defer to a value of 1 which represents running a connector using the Concurrent CDK engine on only one thread. - SINGLE_THREADED_CONCURRENCY_LEVEL = 1 + # By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock + # because it has hit the limit of futures but not partition reader is consuming them. + SINGLE_THREADED_CONCURRENCY_LEVEL = 2 def __init__( self, @@ -108,7 +109,7 @@ def __init__( ) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up else: concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL - initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL + initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL // 2 self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level, diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index dff7abab6..48c8f8c05 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -327,7 +327,7 @@ definitions: additionalProperties: true ConcurrencyLevel: title: Concurrency Level - description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. + description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. Note that a value of 1 could create deadlock if a stream has a very high number of partitions. type: object required: - default_concurrency From 153b25bfb4ad8abd03389638c724ef6582b3bfd3 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 6 Dec 2024 13:24:31 -0500 Subject: [PATCH 2/2] rename variable to align with new meaning --- .../sources/declarative/concurrent_declarative_source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 050cb6dd7..aa3cea705 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -58,7 +58,7 @@ class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): # By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock # because it has hit the limit of futures but not partition reader is consuming them. - SINGLE_THREADED_CONCURRENCY_LEVEL = 2 + _LOWEST_SAFE_CONCURRENCY_LEVEL = 2 def __init__( self, @@ -108,8 +108,8 @@ def __init__( concurrency_level // 2, 1 ) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up else: - concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL - initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL // 2 + concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL + initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level,