-
Notifications
You must be signed in to change notification settings - Fork 13
Add YMQ Python Tests #322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add YMQ Python Tests #322
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
fb23fc6
Add YMQ Python Tests
magniloquency e904792
Fix no_address test
magniloquency a47f50e
Remove interrupted exception test
magniloquency a3b5d4f
Fix typing, lint
magniloquency bb81105
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 584fb01
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 931a9a6
Fix linter
magniloquency 7b1f497
Merge branch 'main' into ymq-pymod-tests-3
magniloquency a5933e2
Merge branch 'main' into ymq-pymod-tests-3
magniloquency f0448bb
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 442b090
refactor addresses
magniloquency f385550
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 7ca26e2
Merge branch 'main' into ymq-pymod-tests-3
magniloquency b83529d
Fix GIL
magniloquency 9ae0a6a
Apply gxu's patch
magniloquency b5ee3e3
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 0c8aed2
delete assert
magniloquency 897c5a5
Merge branch 'ymq-pymod-tests-3' of https://github.com/magniloquency/…
magniloquency 40846e8
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 885aa3f
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 9668062
remove print include
magniloquency d515c6d
remove cassert include
magniloquency d457b03
Merge branch 'main' into ymq-pymod-tests-3
magniloquency 670e34b
Merge branch 'main' into ymq-pymod-tests-3
magniloquency File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| import asyncio | ||
| import unittest | ||
| from scaler.io.ymq import ymq | ||
| from scaler.io.utility import serialize, deserialize | ||
| from scaler.protocol.python.message import TaskCancel | ||
| from scaler.utility.identifiers import TaskID | ||
|
|
||
|
|
||
| class TestPymodYMQ(unittest.IsolatedAsyncioTestCase): | ||
| async def test_basic(self): | ||
| ctx = ymq.IOContext() | ||
| binder = await ctx.createIOSocket("binder", ymq.IOSocketType.Binder) | ||
| self.assertEqual(binder.identity, "binder") | ||
| self.assertEqual(binder.socket_type, ymq.IOSocketType.Binder) | ||
|
|
||
| connector = await ctx.createIOSocket("connector", ymq.IOSocketType.Connector) | ||
| self.assertEqual(connector.identity, "connector") | ||
| self.assertEqual(connector.socket_type, ymq.IOSocketType.Connector) | ||
|
|
||
| address = "tcp://127.0.0.1:35793" | ||
| await binder.bind(address) | ||
| await connector.connect(address) | ||
|
|
||
| await connector.send(ymq.Message(address=None, payload=b"payload")) | ||
| msg = await binder.recv() | ||
|
|
||
| assert msg.address is not None | ||
| self.assertEqual(msg.address.data, b"connector") | ||
| self.assertEqual(msg.payload.data, b"payload") | ||
|
|
||
| async def test_no_address(self): | ||
| ctx = ymq.IOContext() | ||
| binder = await ctx.createIOSocket("binder", ymq.IOSocketType.Binder) | ||
| connector = await ctx.createIOSocket("connector", ymq.IOSocketType.Connector) | ||
|
|
||
| address = "tcp://127.0.0.1:35794" | ||
| await binder.bind(address) | ||
| await connector.connect(address) | ||
|
|
||
| with self.assertRaises(ymq.YMQException) as exc: | ||
| await binder.send(ymq.Message(address=None, payload=b"payload")) | ||
| self.assertEqual(exc.exception.code, ymq.ErrorCode.BinderSendMessageWithNoAddress) | ||
|
|
||
| async def test_routing(self): | ||
| ctx = ymq.IOContext() | ||
| binder = await ctx.createIOSocket("binder", ymq.IOSocketType.Binder) | ||
| connector1 = await ctx.createIOSocket("connector1", ymq.IOSocketType.Connector) | ||
| connector2 = await ctx.createIOSocket("connector2", ymq.IOSocketType.Connector) | ||
|
|
||
| address = "tcp://127.0.0.1:35795" | ||
| await binder.bind(address) | ||
| await connector1.connect(address) | ||
| await connector2.connect(address) | ||
|
|
||
| await binder.send(ymq.Message(b"connector2", b"2")) | ||
| await binder.send(ymq.Message(b"connector1", b"1")) | ||
|
|
||
| msg1 = await connector1.recv() | ||
| self.assertEqual(msg1.payload.data, b"1") | ||
|
|
||
| msg2 = await connector2.recv() | ||
| self.assertEqual(msg2.payload.data, b"2") | ||
|
|
||
| async def test_pingpong(self): | ||
| ctx = ymq.IOContext() | ||
| binder = await ctx.createIOSocket("binder", ymq.IOSocketType.Binder) | ||
| connector = await ctx.createIOSocket("connector", ymq.IOSocketType.Connector) | ||
|
|
||
| address = "tcp://127.0.0.1:35791" | ||
| await binder.bind(address) | ||
| await connector.connect(address) | ||
|
|
||
| async def binder_routine(binder: ymq.IOSocket, limit: int) -> bool: | ||
| i = 0 | ||
| while i < limit: | ||
| await binder.send(ymq.Message(address=b"connector", payload=f"{i}".encode())) | ||
| msg = await binder.recv() | ||
| assert msg.payload.data is not None | ||
|
|
||
| recv_i = int(msg.payload.data.decode()) | ||
| if recv_i - i > 1: | ||
| return False | ||
| i = recv_i + 1 | ||
| return True | ||
|
|
||
| async def connector_routine(connector: ymq.IOSocket, limit: int) -> bool: | ||
| i = 0 | ||
| while True: | ||
| msg = await connector.recv() | ||
| assert msg.payload.data is not None | ||
| recv_i = int(msg.payload.data.decode()) | ||
| if recv_i - i > 1: | ||
| return False | ||
| i = recv_i + 1 | ||
| await connector.send(ymq.Message(address=None, payload=f"{i}".encode())) | ||
|
|
||
| # when the connector sends `limit - 1`, we're done | ||
| if i >= limit - 1: | ||
| break | ||
| return True | ||
|
|
||
| binder_success, connector_success = await asyncio.gather( | ||
| binder_routine(binder, 100), connector_routine(connector, 100) | ||
| ) | ||
|
|
||
| if not binder_success: | ||
| self.fail("binder failed") | ||
|
|
||
| if not connector_success: | ||
| self.fail("connector failed") | ||
|
|
||
| async def test_big_message(self): | ||
| ctx = ymq.IOContext() | ||
| binder = await ctx.createIOSocket("binder", ymq.IOSocketType.Binder) | ||
| self.assertEqual(binder.identity, "binder") | ||
| self.assertEqual(binder.socket_type, ymq.IOSocketType.Binder) | ||
|
|
||
| connector = await ctx.createIOSocket("connector", ymq.IOSocketType.Connector) | ||
| self.assertEqual(connector.identity, "connector") | ||
| self.assertEqual(connector.socket_type, ymq.IOSocketType.Connector) | ||
|
|
||
| address = "tcp://127.0.0.1:35792" | ||
| await binder.bind(address) | ||
| await connector.connect(address) | ||
|
|
||
| for _ in range(10): | ||
| await connector.send(ymq.Message(address=None, payload=b"." * 500_000_000)) | ||
| msg = await binder.recv() | ||
|
|
||
| assert msg.address is not None | ||
| self.assertEqual(msg.address.data, b"connector") | ||
| self.assertEqual(msg.payload.data, b"." * 500_000_000) | ||
|
|
||
| async def test_buffer_interface(self): | ||
| msg = TaskCancel.new_msg(TaskID.generate_task_id()) | ||
| data = serialize(msg) | ||
|
|
||
| # verify that capnp can deserialize this data | ||
| _ = deserialize(data) | ||
|
|
||
| # this creates a copy of the data | ||
| copy = ymq.Bytes(data) | ||
|
|
||
| # this should deserialize without creating a copy | ||
| # because ymq.Bytes uses the buffer protocol | ||
| deserialized: TaskCancel = deserialize(copy) # type: ignore | ||
| self.assertEqual(deserialized.task_id, msg.task_id) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| import unittest | ||
| from enum import IntEnum | ||
| from scaler.io.ymq import ymq | ||
| import array | ||
|
|
||
|
|
||
| class TestTypes(unittest.TestCase): | ||
| def test_exception(self): | ||
| # type checkers misidentify this as "unnecessary" due to the type hints file | ||
| self.assertTrue(issubclass(ymq.YMQException, Exception)) # type: ignore | ||
|
|
||
| exc = ymq.YMQException(ymq.ErrorCode.CoreBug, "oh no") | ||
| self.assertEqual(exc.args, (ymq.ErrorCode.CoreBug, "oh no")) | ||
| self.assertEqual(exc.code, ymq.ErrorCode.CoreBug) | ||
| self.assertEqual(exc.message, "oh no") | ||
|
|
||
| def test_error_code(self): | ||
| self.assertTrue(issubclass(ymq.ErrorCode, IntEnum)) # type: ignore | ||
| self.assertEqual( | ||
| ymq.ErrorCode.ConfigurationError.explanation(), | ||
| "An error generated by system call that's likely due to mis-configuration", | ||
| ) | ||
|
|
||
| def test_bytes(self): | ||
| b = ymq.Bytes(b"data") | ||
| self.assertEqual(b.len, len(b)) | ||
| self.assertEqual(b.len, 4) | ||
| self.assertEqual(b.data, b"data") | ||
|
|
||
| # would raise an exception if ymq.Bytes didn't support the buffer interface | ||
| m = memoryview(b) | ||
| self.assertTrue(m.obj is b) | ||
| self.assertEqual(m.tobytes(), b"data") | ||
|
|
||
| b = ymq.Bytes() | ||
| self.assertEqual(b.len, 0) | ||
| self.assertTrue(b.data is None) | ||
|
|
||
| b = ymq.Bytes(b"") | ||
| self.assertEqual(b.len, 0) | ||
| self.assertEqual(b.data, b"") | ||
|
|
||
| b = ymq.Bytes(array.array("B", [115, 99, 97, 108, 101, 114])) | ||
| assert b.len == 6 | ||
| assert b.data == b"scaler" | ||
|
|
||
| def test_message(self): | ||
| m = ymq.Message(b"address", b"payload") | ||
| assert m.address is not None | ||
| self.assertEqual(m.address.data, b"address") | ||
| self.assertEqual(m.payload.data, b"payload") | ||
|
|
||
| m = ymq.Message(address=None, payload=ymq.Bytes(b"scaler")) | ||
| self.assertTrue(m.address is None) | ||
| self.assertEqual(m.payload.data, b"scaler") | ||
|
|
||
| m = ymq.Message(b"address", payload=b"payload") | ||
| assert m.address is not None | ||
| self.assertEqual(m.address.data, b"address") | ||
| self.assertEqual(m.payload.data, b"payload") | ||
|
|
||
| def test_io_context(self): | ||
| ctx = ymq.IOContext() | ||
| self.assertEqual(ctx.num_threads, 1) | ||
|
|
||
| ctx = ymq.IOContext(2) | ||
| self.assertEqual(ctx.num_threads, 2) | ||
|
|
||
| ctx = ymq.IOContext(num_threads=3) | ||
| self.assertEqual(ctx.num_threads, 3) | ||
|
|
||
| # TODO: backporting to 3.8 broke this somehow | ||
| # it causes a segmentation fault | ||
| # re-enable once fixed | ||
| @unittest.skip("causes segmentation fault") | ||
| def test_io_socket(self): | ||
| # check that we can't create io socket instances directly | ||
| self.assertRaises(TypeError, lambda: ymq.IOSocket()) # type: ignore | ||
|
|
||
| def test_io_socket_type(self): | ||
| self.assertTrue(issubclass(ymq.IOSocketType, IntEnum)) # type: ignore | ||
|
|
||
| def test_bad_socket_type(self): | ||
| ctx = ymq.IOContext() | ||
|
|
||
| # TODO: should the core reject this? | ||
| socket = ctx.createIOSocket_sync("identity", ymq.IOSocketType.Uninit) | ||
| self.assertEqual(socket.socket_type, ymq.IOSocketType.Uninit) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.