AssertionError on running tasks in parallel #7656

@deepanshu-zluri

Description

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

  1. A main flow that uses the sequential task runner calls a subflow; the subflow only calls .map on a task to create parallel task runs.
  2. A few of those task runs crash with an AssertionError.

[Screenshot attached: 2022-11-25, 10:57:50 AM]

Reproduction

from prefect import task, flow
from prefect import get_run_logger
import time
from prefect.task_runners import SequentialTaskRunner


@task(name='run_scheduler')
def run_scheduler(event):
    scheduler_output = list(range(1, 300))
    time.sleep(150)  # Sleep to make the task long-running
    return scheduler_output


@task(name='run_executor', tags=['spends_executor'])
def run_executor(scheduler_output):
    time.sleep(150)  # Sleep to make the task long-running
    executor_output = f"printing just the executor input {scheduler_output}"
    return executor_output


@flow(task_runner=SequentialTaskRunner())
def spends_flow(event):
    logger = get_run_logger()
    logger.info(event)

    # Call the scheduler task
    scheduler_output = run_scheduler(event)

    # If the output is empty, log it; otherwise call the subflow,
    # which creates parallel task runs
    if len(scheduler_output) < 1:
        logger.info("no elements in array")
    else:
        spends_executor(scheduler_output)
    logger.info('flow completed')


@flow
def spends_executor(scheduler_output):
    run_executor.map(scheduler_output)


if __name__ == "__main__":
    event = "test"
    spends_flow(event)
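
One way to check whether the crash depends on how many task runs are mapped at once is to feed the inputs to the subflow in smaller batches. The helper below is a minimal sketch in plain Python: `chunked` and the batch size of 50 are hypothetical, and the idea that smaller batches avoid the error is an untested assumption, not a confirmed fix.

```python
from itertools import islice


def chunked(items, size):
    """Yield consecutive lists of at most `size` elements from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Same input shape as run_scheduler in the reproduction: 299 integers.
scheduler_output = list(range(1, 300))

# Hypothetical batch size; each batch could be passed to run_executor.map
# in turn instead of mapping over all 299 inputs in one call.
batches = list(chunked(scheduler_output, 50))
print(len(batches), len(batches[-1]))
```

If the error disappears with small batches and returns with large ones, that would point toward contention on the shared HTTP connection pool rather than the task logic.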

Error

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1247, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "flows/test_calculate_spends_flows/test_spends_flow.py", line 16, in run_executor
    output = run_deployment(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 197, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 131, in run_deployment
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/orion.py", line 1443, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1757, in get
    return await self.request(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/base.py", line 160, in send
    await super().send(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 221, in handle_async_request
    await self._attempt_to_acquire_connection(status)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 160, in _attempt_to_acquire_connection
    status.set_connection(connection)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 22, in set_connection
    assert self.connection is None
AssertionError
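
To see which package the exception is actually raised in, the frames of the traceback can be grouped by package. The snippet below is a minimal sketch: `frame_packages` is a hypothetical helper, the sample is an abbreviated excerpt of the traceback above, and the path rules (anything under `site-packages/` is a third-party package, anything else under `/lib/python` is the standard library) are simplifying assumptions.

```python
import re
from collections import Counter

# Matches the 'File "...", line N, in func' lines of a CPython traceback.
FRAME_RE = re.compile(r'File "(?P<path>[^"]+)", line (?P<line>\d+), in (?P<func>\S+)')


def frame_packages(traceback_text):
    """Return the package of each frame, outermost call first."""
    packages = []
    for match in FRAME_RE.finditer(traceback_text):
        path = match.group("path")
        if "site-packages/" in path:
            # The first path component after site-packages/ is the package name.
            package = path.split("site-packages/", 1)[1].split("/", 1)[0]
        elif "/lib/python" in path:
            package = "stdlib"
        else:
            package = "user code"
        packages.append(package)
    return packages


# Abbreviated excerpt of the traceback above (not the full frame list).
SAMPLE = '''
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1247, in orchestrate_task_run
  File "flows/test_calculate_spends_flows/test_spends_flow.py", line 16, in run_executor
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1757, in get
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 22, in set_connection
'''

packages = frame_packages(SAMPLE)
print(Counter(packages))
print("raised in:", packages[-1])
```

For this excerpt the innermost frame belongs to `httpcore`, which matches the failing `assert self.connection is None` line in `connection_pool.py`.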

Versions

Agent
Version:             2.6.6
API version:         0.8.3
Python version:      3.9.15
Git commit:          87767cda
Built:               Thu, Nov 3, 2022 1:15 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Self-hosted Orion server
Version:             2.6.0
API version:         0.8.2
Python version:      3.9.14
Git commit:          96f09a51
Built:               Thu, Oct 13, 2022 3:21 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted
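
The two version blocks above differ (agent 2.6.6 vs. server 2.6.0, API 0.8.3 vs. 0.8.2). The sketch below simply makes that skew explicit; `parse_version` and `describe_skew` are hypothetical helpers, and nothing in this report establishes that the skew is related to the AssertionError.

```python
def parse_version(text):
    """Turn a dotted version such as '2.6.6' into a tuple of ints."""
    return tuple(int(part) for part in text.strip().split("."))


def describe_skew(client, server):
    """List the fields where the client and server versions differ."""
    notes = []
    for key in ("version", "api"):
        client_v = parse_version(client[key])
        server_v = parse_version(server[key])
        if client_v != server_v:
            direction = "newer" if client_v > server_v else "older"
            notes.append(
                f"client {key} {client[key]} is {direction} than server {server[key]}"
            )
    return notes


# Values copied from the Versions section above.
agent = {"version": "2.6.6", "api": "0.8.3"}
orion_server = {"version": "2.6.0", "api": "0.8.2"}

skew = describe_skew(agent, orion_server)
for note in skew:
    print(note)
```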

Additional context

No response

Metadata

Labels

  • bug: Something isn't working
  • upstream dependency: An upstream issue caused by a bug in one of our dependencies
