-
Notifications
You must be signed in to change notification settings - Fork 417
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(celery): stop closing prerun_span too soon to account for Celery …
…chains scenario (#11498) We've made a few changes to handle celery context recently, including: #10676 In particular the goal of #10676 was to handle a scenario where a long running task may run into an exception, preventing it from closing. Unfortunately, this scenario did not account for cases where tasks are chained and may not close until later. See: #11479 and #11624 With this PR, the sample app in #11479 would attach the celery specific span back to the root span. I also need to add tests for the chains scenario. Related to AIDM-494 ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) (cherry picked from commit e8aab65)
- Loading branch information
1 parent
7a275e6
commit 4f6a19b
Showing
6 changed files
with
85 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
4 changes: 4 additions & 0 deletions
4
releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
--- | ||
fixes: | ||
- | | ||
tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun got closed too soon leading to span tags being missing. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from tasks import fn_a | ||
from tasks import fn_b | ||
|
||
|
||
(fn_a.si() | fn_b.si()).delay() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from celery import Celery | ||
|
||
|
||
app = Celery("tasks") | ||
|
||
|
||
@app.task(name="tests.contrib.celery.tasks.fn_a") | ||
def fn_a(): | ||
return "a" | ||
|
||
|
||
@app.task(name="tests.contrib.celery.tasks.fn_b") | ||
def fn_b(): | ||
return "b" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import os | ||
import re | ||
import subprocess | ||
import time | ||
|
||
from celery import Celery | ||
|
||
|
||
# Ensure that when we call Celery chains, the root span has celery specific span tags | ||
# The test_integration.py setup doesn't perfectly mimic the condition of a worker process running. | ||
# This test runs the worker as a side so we can check the tracer logs afterwards to ensure expected span results. | ||
# See https://github.com/DataDog/dd-trace-py/issues/11479 | ||
def test_task_chain_task_call_task(): | ||
app = Celery("tasks") | ||
|
||
celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo" | ||
celery_task_runner_cmd = "ddtrace-run python run_tasks.py" | ||
|
||
# The commands need to run from the directory where this test file lives | ||
current_directory = str(os.path.dirname(__file__)) | ||
|
||
worker_process = subprocess.Popen( | ||
celery_worker_cmd.split(), | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
preexec_fn=os.setsid, | ||
close_fds=True, | ||
cwd=current_directory, | ||
) | ||
|
||
max_wait_time = 10 | ||
waited_so_far = 0 | ||
# {app.control.inspect().active() returns {'celery@uniquename1': []} when the worker is running} | ||
while app.control.inspect().active() is None and waited_so_far < max_wait_time: | ||
time.sleep(1) | ||
waited_so_far += 1 | ||
|
||
# The task should only run after the Celery worker has sufficient time to start up | ||
task_runner_process = subprocess.Popen( | ||
celery_task_runner_cmd.split(), | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
preexec_fn=os.setsid, | ||
close_fds=True, | ||
cwd=current_directory, | ||
) | ||
|
||
task_runner_process.wait() | ||
# Kill the process so it starts to send traces to the Trace Agent | ||
worker_process.kill() | ||
worker_logs = worker_process.stderr.read() | ||
|
||
# Check that the root span was created with one of the Celery specific tags, such as celery.correlation_id | ||
# Some versions of python seem to require escaping when using `re.search`: | ||
old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*" | ||
new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*" | ||
|
||
pattern_exists = ( | ||
re.search(old_pattern_match, str(worker_logs)) is not None | ||
or re.search(new_pattern_match, str(worker_logs)) is not None | ||
) | ||
assert pattern_exists is not None |