Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions sentry_sdk/integrations/huey.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
except ImportError:
raise DidNotEnable("Huey is not installed")

try:
from huey.api import chord as HueyChord
from huey.api import group as HueyGroup
except ImportError:
HueyChord = None
HueyGroup = None


HUEY_CONTROL_FLOW_EXCEPTIONS = (CancelExecution, RetryTask, TaskLockedException)

Expand All @@ -53,22 +60,37 @@ def patch_enqueue() -> None:

@ensure_integration_enabled(HueyIntegration, old_enqueue)
def _sentry_enqueue(
self: "Huey", task: "Task"
self: "Huey", item: "Union[Task, HueyGroup, HueyChord]"
) -> "Optional[Union[Result, ResultGroup]]":
if HueyChord is not None and isinstance(item, HueyChord):
span_name = "Huey Chord"
elif HueyGroup is not None and isinstance(item, HueyGroup):
span_name = "Huey Task Group"
else:
span_name = item.name

with sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_HUEY,
name=task.name,
name=span_name,
origin=HueyIntegration.origin,
):
if not isinstance(task, PeriodicTask):
if (
not isinstance(item, PeriodicTask)
and not (HueyGroup is not None and isinstance(item, HueyGroup))
and not (HueyChord is not None and isinstance(item, HueyChord))
):
# Attach trace propagation data to task kwargs. We do
# not do this for periodic tasks, as these don't
# really have an originating transaction.
task.kwargs["sentry_headers"] = {
# Additionally, we do not do this for Huey groups or chords, as enqueue will
# recursively call this method for each task within the list, resulting
# in the trace propagation data being attached to each task individually
# (which we want)
item.kwargs["sentry_headers"] = {
BAGGAGE_HEADER_NAME: get_baggage(),
SENTRY_TRACE_HEADER_NAME: get_traceparent(),
}
return old_enqueue(self, task)
return old_enqueue(self, item)
Comment thread
ericapisani marked this conversation as resolved.

Huey.enqueue = _sentry_enqueue

Expand Down
117 changes: 117 additions & 0 deletions tests/integrations/huey/test_huey.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

HUEY_VERSION = parse_version(HUEY_VERSION)

try:
from huey.api import chord, group
except ImportError:
chord = None
group = None


@pytest.fixture
def init_huey(sentry_init):
Expand Down Expand Up @@ -222,3 +228,114 @@ def propagated_trace_task():
(event,) = events

assert event["contexts"]["trace"]["origin"] == "auto.queue.huey"


@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="group was added in 3.0")
def test_huey_enqueue_group(init_huey, capture_events):
huey = init_huey()

events = capture_events()

@huey.task()
def task1():
pass

@huey.task()
def task2():
pass

with start_transaction() as transaction:
huey.enqueue(group([task1.s(), task2.s()]))

for _ in range(2):
task = huey.dequeue()
huey.execute(task)

assert len(events) == 3

# Assert enqueue spans were successfully recorded
producer_event = events[0]
assert producer_event["type"] == "transaction"
assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert producer_event["contexts"]["trace"]["origin"] == "manual"

spans = producer_event["spans"]
assert len(spans) == 3
assert spans[0]["op"] == "queue.submit.huey"
assert spans[0]["description"] == "Huey Task Group"
assert spans[1]["op"] == "queue.submit.huey"
assert spans[1]["description"] == "task1"
assert spans[2]["op"] == "queue.submit.huey"
assert spans[2]["description"] == "task2"

# Consumer transaction assertions (one per task)
consumer_events = events[1:]
for _, (consumer_event, expected_name) in enumerate(
zip(consumer_events, ["task1", "task2"])
):
assert consumer_event["type"] == "transaction"
assert consumer_event["transaction"] == expected_name
assert consumer_event["transaction_info"] == {"source": "task"}
assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey"
assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey"
assert consumer_event["contexts"]["trace"]["status"] == "ok"
assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert "huey_task_id" in consumer_event["tags"]
assert consumer_event["tags"]["huey_task_retry"] is False


@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="chord was added in 3.0")
def test_huey_enqueue_chord(init_huey, capture_events):
huey = init_huey()

events = capture_events()

@huey.task()
def task1():
pass

@huey.task()
def task2(results):
pass

with start_transaction() as transaction:
huey.enqueue(chord([task1.s()], task2.s()))

for _ in range(2):
task = huey.dequeue()
huey.execute(task)

assert len(events) == 3

# Enqueue spans
producer_event = events[0]
assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert producer_event["contexts"]["trace"]["origin"] == "manual"

spans = producer_event["spans"]
assert len(spans) == 2
assert spans[0]["op"] == "queue.submit.huey"
assert spans[0]["description"] == "Huey Chord"
assert spans[1]["op"] == "queue.submit.huey"
assert spans[1]["description"] == "task1"

task1_event = events[1]
# Confirm the first task enqueued the chord callback
task1_spans = task1_event["spans"]
assert len(task1_spans) == 1
assert task1_spans[0]["op"] == "queue.submit.huey"
assert task1_spans[0]["description"] == "task2"

consumer_events = events[1:]
for _, (consumer_event, expected_name) in enumerate(
zip(consumer_events, ["task1", "task2"])
):
assert consumer_event["type"] == "transaction"
assert consumer_event["transaction"] == expected_name
assert consumer_event["transaction_info"] == {"source": "task"}
assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey"
assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey"
assert consumer_event["contexts"]["trace"]["status"] == "ok"
assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert "huey_task_id" in consumer_event["tags"]
assert consumer_event["tags"]["huey_task_retry"] is False
Loading