Skip to content

Simplify ServiceBusConsumer supervisor: remove double-loop, fix version, revert logging scope creep#4884

Merged
marrobi merged 2 commits into
copilot/fix-4464from
copilot/fix-remove-receive-messages-loop
Mar 11, 2026
Merged

Simplify ServiceBusConsumer supervisor: remove double-loop, fix version, revert logging scope creep#4884
marrobi merged 2 commits into
copilot/fix-4464from
copilot/fix-remove-receive-messages-loop

Conversation

Copilot AI commented Mar 10, 2026

Copy link
Copy Markdown
Contributor

The initial service bus monitoring implementation had two nested restart mechanisms (_receive_messages_loop + supervisor_with_heartbeat_check), an incorrect version bump, unrelated logging format changes, and heartbeat updates placed at the wrong point in the polling loop.

What is being addressed

  • _receive_messages_loop + supervisor created redundant, overlapping restart logic — the inner loop's while True meant the supervisor's task-done detection rarely triggered
  • Version was bumped from 0.25.140.26.1, skipping 0.26.0 — this is a patch fix, not a minor release
  • WorkerIdFilter, LOG_FORMAT, and apply_worker_id_to_logger in services/logging.py were unrelated scope creep changing log format globally
  • update_heartbeat() was called at the top of each polling iteration, so a hang during message processing wouldn't be detected until after the next poll started

How is this addressed

  • Remove _receive_messages_loop: supervisor now creates tasks directly from self.receive_messages() and owns all backoff/restart logic. A task_just_started flag ensures backoff resets only after the task has survived a full heartbeat cycle — not immediately on creation (which would have silently broken exponential backoff):
# Backoff resets only when task was already running at start of iteration
elif not task_just_started:
    self._restart_delay = RESTART_DELAY_SECONDS
  • Fix version: 0.26.10.25.15
  • Revert services/logging.py to main branch state — removes import os, LOG_FORMAT, WorkerIdFilter, apply_worker_id_to_logger, and the handler/filter setup in initialize_logging()
  • Move update_heartbeat() to the end of the try block in both consumers, so heartbeat reflects a completed polling cycle rather than a started one
  • Simplify tests: removed all _receive_messages_loop tests, backoff is now exercised through the supervisor, consolidated duplicates between the two test files
Original prompt

Apply the following review changes to the existing code on the copilot/fix-4464 branch:

1. Remove _receive_messages_loop — simplify to single supervisor layer

In api_app/service_bus/service_bus_consumer.py, remove the _receive_messages_loop() method entirely. The supervisor (supervisor_with_heartbeat_check) should create tasks directly from self.receive_messages() instead of from self._receive_messages_loop(). The supervisor itself handles backoff on task failure. Here's the simplified approach:

import asyncio
import time

from services.logging import logger

# Configuration constants for monitoring intervals
HEARTBEAT_CHECK_INTERVAL_SECONDS = 60
HEARTBEAT_STALENESS_THRESHOLD_SECONDS = 300
RESTART_DELAY_SECONDS = 5
MAX_RESTART_DELAY_SECONDS = 300
SUPERVISOR_ERROR_DELAY_SECONDS = 30


class ServiceBusConsumer:

    def __init__(self, consumer_name: str):
        self.service_name = consumer_name.replace('_', ' ').title()
        self._last_heartbeat: float = time.monotonic()
        self._restart_delay: float = RESTART_DELAY_SECONDS
        logger.info(f"Initializing {self.service_name}")

    def update_heartbeat(self):
        self._last_heartbeat = time.monotonic()

    def check_heartbeat(self, max_age_seconds: int = HEARTBEAT_STALENESS_THRESHOLD_SECONDS) -> bool:
        age = time.monotonic() - self._last_heartbeat
        if age > max_age_seconds:
            logger.warning(f"{self.service_name} heartbeat is {age:.1f}s old (threshold: {max_age_seconds}s)")
            return False
        return True

    async def supervisor_with_heartbeat_check(self):
        task = None
        try:
            while True:
                try:
                    if task is None or task.done():
                        if task and task.done():
                            try:
                                await task
                            except Exception as e:
                                logger.exception(f"{self.service_name} task failed: {e}")
                            await asyncio.sleep(self._restart_delay)
                            self._restart_delay = min(self._restart_delay * 2, MAX_RESTART_DELAY_SECONDS)

                        logger.info(f"Starting {self.service_name} task...")
                        task = asyncio.create_task(self.receive_messages())
                        self.update_heartbeat()
                        self._restart_delay = RESTART_DELAY_SECONDS

                    await asyncio.sleep(HEARTBEAT_CHECK_INTERVAL_SECONDS)

                    if not self.check_heartbeat():
                        logger.warning(f"{self.service_name} heartbeat stale, restarting...")
                        task.cancel()
                        try:
                            await task
                        except asyncio.CancelledError:
                            pass
                        task = None
                except Exception as e:
                    logger.exception(f"{self.service_name} supervisor error: {e}")
                    await asyncio.sleep(SUPERVISOR_ERROR_DELAY_SECONDS)
        finally:
            if task and not task.done():
                logger.info(f"Cleaning up {self.service_name} task...")
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

    async def receive_messages(self):
        raise NotImplementedError("Subclasses must implement receive_messages()")

2. Fix the version number

In api_app/_version.py, change the version from "0.26.1" to "0.25.15". The original version was 0.25.14 and this is a bug fix, so it should be a patch increment.

3. Revert logging format changes (scope creep)

In api_app/services/logging.py, revert ALL changes introduced by this PR. Specifically:

  • Remove the import os addition
  • Remove the LOG_FORMAT constant
  • Remove the WorkerIdFilter class
  • Remove the apply_worker_id_to_logger function
  • Revert initialize_logging() to its original form — remove the log_format=LOG_FORMAT parameter from LoggingInstrumentor().instrument(), remove the "Set up a handler if none exists" block, and remove the "Apply worker ID filter" call.

The file should be restored to its state on the main branch. The logging format changes are unrelated to the service bus monitoring fix.

4. Move update_heartbeat() to end of loop iteration in both consumers

In api_app/service_bus/deployment_status_updater.py and api_app/service_bus/airlock_request_status_update.py:

  • Move the self.update_heartbeat() call from the top of the try block inside the while loop to the bottom of the try block (end of each successful iteration), so that heartbeat reflects "I just completed a polling cycle" rather than "I just started one". This ensures hangs during message processing are detected.

5. Simplify tests to match simplified supervisor

Update `api_app/tests_ma/test_servic...

This pull request was created from Copilot chat.


💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more Copilot coding agent tips in the docs.

…ert logging, move heartbeat, simplify tests

Co-authored-by: marrobi <17089773+marrobi@users.noreply.github.com>
Copilot AI changed the title [WIP] Remove _receive_messages_loop method and simplify supervision Simplify ServiceBusConsumer supervisor: remove double-loop, fix version, revert logging scope creep Mar 10, 2026
@marrobi marrobi marked this pull request as ready for review March 11, 2026 11:55
@marrobi marrobi requested a review from a team as a code owner March 11, 2026 11:55
@marrobi marrobi merged commit de60163 into copilot/fix-4464 Mar 11, 2026
1 check passed
@marrobi marrobi deleted the copilot/fix-remove-receive-messages-loop branch March 11, 2026 11:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants