diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index a393bbeb2..aa3cea705 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -56,8 +56,9 @@ class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): - # By default, we defer to a value of 1 which represents running a connector using the Concurrent CDK engine on only one thread. - SINGLE_THREADED_CONCURRENCY_LEVEL = 1 + # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a state of deadlock + # because it has hit the limit of futures but no partition reader is consuming them. + _LOWEST_SAFE_CONCURRENCY_LEVEL = 2 def __init__( self, @@ -107,8 +108,8 @@ def __init__( concurrency_level // 2, 1 ) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up else: - concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL - initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL + concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL + initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level, diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index dff7abab6..48c8f8c05 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -327,7 +327,7 @@ definitions: additionalProperties: true ConcurrencyLevel: title: Concurrency Level - description: Defines the amount of parallelization for the streams that are being synced. 
The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. + description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will be processed at the same time. Note that a value of 1 could create a deadlock if a stream has a very high number of partitions. type: object required: - default_concurrency