This repository has been archived by the owner on May 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 161
/
test_contextvars.py
56 lines (41 loc) · 1.67 KB
/
test_contextvars.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from __future__ import print_function
import random
import asyncio
from opentracing.mocktracer import MockTracer
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
from ..testcase import OpenTracingTestCase
from ..utils import get_logger, stop_loop_when
# Seed the shared random generator. Called without an argument, Python seeds
# it from OS randomness (or the current time), so the randomly chosen sleep
# intervals used below differ from run to run.
random.seed()
# Module-level logger named after this module, obtained through the
# get_logger helper imported from the parent package's utils module.
logger = get_logger(__name__)
class TestAsyncioContextVars(OpenTracingTestCase):
    """Check span parenting across asyncio tasks with ContextVarsScopeManager.

    A 'parent' span is activated inside a main task, three child tasks are
    spawned while it is active, and each child opens its own 'task' span.
    The test then asserts that the three 'task' spans finished before the
    'parent' span, share its trace, and are its children.
    """

    def setUp(self):
        """Create a mock tracer and a private event loop for the test."""
        self.tracer = MockTracer(ContextVarsScopeManager())
        # Use a dedicated loop instead of asyncio.get_event_loop(): that call
        # is deprecated when no loop is current (and fails on recent Python
        # versions), and a shared loop would leak state between tests.
        self.loop = asyncio.new_event_loop()
        # Close the loop even when the test fails part-way through.
        self.addCleanup(self.loop.close)

    def test_main(self):
        """Run the parent/children scenario and verify the recorded spans."""
        # The scenario runs inside a Task so the spawned tasks are created
        # while the 'parent' span is active.
        # NOTE(review): the original comment said the scope manager depends
        # on Task.current_task(); that looks copied from the plain asyncio
        # variant of this test - confirm against ContextVarsScopeManager.
        async def main_task():
            with self.tracer.start_active_span('parent'):
                tasks = self.submit_callbacks()
                await asyncio.gather(*tasks)

        # Keep a reference: the event loop only holds tasks weakly, and the
        # reference lets us surface any exception raised inside the task.
        main = self.loop.create_task(main_task())

        # Poll until all four spans (3 x 'task' + 'parent') are finished,
        # then stop the loop so run_forever() returns.
        stop_loop_when(self.loop,
                       lambda: len(self.tracer.finished_spans()) >= 4)
        self.loop.run_forever()

        # Re-raise an error from the main task instead of letting it vanish
        # and show up only as a confusing span-count mismatch.
        if main.done():
            main.result()

        spans = self.tracer.finished_spans()
        self.assertEqual(len(spans), 4)
        self.assertNamesEqual(spans, ['task', 'task', 'task', 'parent'])

        # The parent finishes last; every earlier span must hang off it.
        parent = spans[-1]
        for child in spans[:3]:
            self.assertSameTrace(child, parent)
            self.assertIsChildOf(child, parent)

    async def task(self, interval):
        """Keep a 'task' span active while sleeping for ``interval`` seconds."""
        logger.info('Starting task')

        with self.tracer.start_active_span('task'):
            await asyncio.sleep(interval)

    def submit_callbacks(self):
        """Schedule three ``task`` coroutines on the loop.

        Each coroutine gets its own random sleep interval between 0.3 and
        0.6 seconds (0.1 plus 200-500 milliseconds).

        Returns:
            The list of created asyncio Tasks, in scheduling order.
        """
        return [
            self.loop.create_task(
                self.task(0.1 + random.randint(200, 500) * 0.001))
            for _ in range(3)
        ]