|
13 | 13 |
|
14 | 14 | ENABLE_LOGGING = False # Set True for test debugging and development |
15 | 15 | CHILD_TIMEOUT_SEC = 20 |
16 | | -NBYTES = 1024*1024*1024 |
17 | | -POOL_SIZE = NBYTES |
18 | | -NCOPIES = 500 |
| 16 | +NBYTES = 64 |
19 | 17 |
|
20 | | -class TestIpcEvent: |
21 | | - """Check the basic usage of IPC-enabled events.""" |
22 | | - |
23 | | - def test_main(self, ipc_device): |
24 | | - log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) |
25 | | - device = ipc_device |
26 | | - stream1 = device.create_stream() |
27 | | - mr_options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) |
28 | | - mr = DeviceMemoryResource(device, options=mr_options) |
29 | | - |
30 | | - # Start the child process. |
31 | | - q_out, q_in = [mp.Queue() for _ in range(2)] |
32 | | - process = mp.Process(target=self.child_main, args=(log, q_out, q_in)) |
33 | | - process.start() |
34 | | - |
35 | | - # Prepare scratch buffers. |
36 | | - target = make_scratch_buffer(device, 0, NBYTES) |
37 | | - ones = make_scratch_buffer(device, 1, NBYTES) |
38 | | - twos = make_scratch_buffer(device, 2, NBYTES) |
39 | | - |
40 | | - # Allocate the buffer and send it to the child. |
41 | | - buffer = mr.allocate(NBYTES, stream=stream1) |
42 | | - log("sending buffer") |
43 | | - q_out.put(buffer) |
44 | | - |
45 | | - e_copy_start, e_copy_end = [device.create_event({"enable_timing": True}) for _ in range(2)] |
46 | | - |
47 | | - # Stream 1: |
48 | | - stream1.record(e_copy_start) |
49 | | - log("begin enqueuing copies on stream1") |
50 | | - for i in range(NCOPIES): |
51 | | - buffer.copy_from(ones, stream=stream1) |
52 | | - if i % 100 == 0: |
53 | | - log(f"{i:>3}/{NCOPIES}") |
54 | | - stream1.record(e_copy_end) |
55 | | - log("done enqueuing copies") |
56 | | - |
57 | | - ipc_event_options = EventOptions(ipc_enabled=True) |
58 | | - e = stream1.record(options=ipc_event_options) |
59 | | - log(f"recorded event ({hex(e.handle)})") |
60 | | - q_out.put(e) |
61 | | - log("sent event") |
62 | | - |
63 | | - # Wait on the child. |
64 | | - log("waiting for copies") |
65 | | - e_copy_end.sync() |
66 | | - parent_done_copying_timestamp = time.time_ns() |
67 | | - log("done copying") |
68 | | - process.join() |
69 | | - assert process.exitcode == 0 |
70 | | - log("done") |
71 | | - |
72 | | - # Finish up. |
73 | | - target.copy_from(buffer, stream=stream1) |
74 | | - stream1.sync() |
75 | | - assert compare_equal_buffers(target, twos) |
76 | | - elapsed_ms = e_copy_end - e_copy_start |
77 | | - log(f"Elapsed time for {NCOPIES} copies: {int(elapsed_ms)} ms") |
78 | | - |
79 | | - # Make sure the child finished enqueuing its work before the copies finished; |
80 | | - # otherwise the test has no meaning. If this trips, adjust NCOPIES and/or |
81 | | - # NBYTES. |
82 | | - child_done_enqueuing_timestamp = q_in.get(timeout=CHILD_TIMEOUT_SEC) |
83 | | - assert child_done_enqueuing_timestamp < parent_done_copying_timestamp |
84 | | - |
85 | | - |
86 | | - def child_main(self, log, q_in, q_out): |
87 | | - log.prefix = " child: " |
88 | | - log("ready") |
89 | | - device = Device() |
90 | | - device.set_current() |
91 | | - stream2 = device.create_stream() |
92 | | - twos = make_scratch_buffer(device, 2, NBYTES) |
93 | | - buffer = q_in.get(timeout=CHILD_TIMEOUT_SEC) |
94 | | - log("got buffer") |
95 | | - e = q_in.get(timeout=CHILD_TIMEOUT_SEC) |
96 | | - log(f"got event ({hex(e.handle)})") |
97 | | - stream2.wait(e) |
98 | | - log("enqueuing copy on stream2") |
99 | | - buffer.copy_from(twos, stream=stream2) |
100 | | - q_out.put(time.time_ns()) # Time when enqueuing finished |
101 | | - log("waiting") |
102 | | - stream2.sync() |
103 | | - log("done") |
104 | | - |
105 | | -class TestIpcEventWithLatch: |
| 18 | +class TestEventIpc: |
106 | 19 | """Check the basic usage of IPC-enabled events with a latch kernel.""" |
107 | 20 |
|
108 | 21 | @skipif_need_cuda_headers # libcu++ |
109 | | - def test_main(self, ipc_device): |
| 22 | + def test_main(self, ipc_device, ipc_memory_resource): |
110 | 23 | log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) |
111 | 24 | device = ipc_device |
| 25 | + mr = ipc_memory_resource |
112 | 26 | stream1 = device.create_stream() |
113 | | - mr_options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) |
114 | | - mr = DeviceMemoryResource(device, options=mr_options) |
115 | 27 |
|
116 | 28 | # Start the child process. |
117 | 29 | q_out, q_in = [mp.Queue() for _ in range(2)] |
|
0 commit comments