From 4b59e5d96ceba638fe53d50982ccfa0f01722d8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Fri, 15 Sep 2023 13:05:34 +0200 Subject: [PATCH 1/6] Use a single and bidirectional socket to communicate with the worker ### Rationale - In mediasoup server we used to use also a single bidirectional socket at the beginning, but there was a bug in Node for Windows so we ended spliting it into two (one for sending and one for receiving). - Anyway, **mediasoup-client-aiortc** doesn't support Windowes. - And probably, such a bug in Node for Windows is already fixed (I don't care). --- src/Channel.ts | 51 +++++++++++++++-------------------------------- src/Worker.ts | 9 +++------ worker/channel.py | 16 ++++----------- worker/worker.py | 7 +++---- 4 files changed, 26 insertions(+), 57 deletions(-) diff --git a/src/Channel.ts b/src/Channel.ts index 388d4ab..d35daae 100644 --- a/src/Channel.ts +++ b/src/Channel.ts @@ -25,10 +25,8 @@ export class Channel extends EnhancedEventEmitter { // Closed flag. #closed = false; - // Unix Socket instance for sending messages to the worker process. - readonly #sendSocket: Duplex; - // Unix Socket instance for receiving messages to the worker process. - readonly #recvSocket: Duplex; + // Unix Socket instance for communicating with the worker process. + readonly #socket: Duplex; // Next id for requests sent to the worker process. #nextId = 0; // Map of pending sent requests. @@ -38,13 +36,11 @@ export class Channel extends EnhancedEventEmitter constructor( { - sendSocket, - recvSocket, + socket, pid }: { - sendSocket: any; - recvSocket: any; + socket: any; pid: number; }) { @@ -52,11 +48,10 @@ export class Channel extends EnhancedEventEmitter logger.debug('constructor()'); - this.#sendSocket = sendSocket as Duplex; - this.#recvSocket = recvSocket as Duplex; + this.#socket = socket as Duplex; // Read Channel responses/notifications from the worker. - this.#recvSocket.on('data', (buffer: Buffer) => + this.#socket.on('data', (buffer: Buffer) => { if (!this.#recvBuffer) { @@ -132,20 +127,12 @@ export class Channel extends EnhancedEventEmitter } }); - this.#sendSocket.on('end', () => ( - logger.debug('send Channel ended by the worker process') + this.#socket.on('end', () => ( + logger.debug('Channel ended by the worker process') )); - this.#sendSocket.on('error', (error) => ( - logger.error('send Channel error: %s', String(error)) - )); - - this.#recvSocket.on('end', () => ( - logger.debug('receive Channel ended by the worker process') - )); - - this.#recvSocket.on('error', (error) => ( - logger.error('receive Channel error: %s', String(error)) + this.#socket.on('error', (error) => ( + logger.error('Channel error: %s', String(error)) )); } @@ -168,20 +155,14 @@ export class Channel extends EnhancedEventEmitter // Remove event listeners but leave a fake 'error' hander to avoid // propagation. - this.#sendSocket.removeAllListeners('end'); - this.#sendSocket.removeAllListeners('error'); - this.#sendSocket.on('error', () => {}); - - this.#recvSocket.removeAllListeners('end'); - this.#recvSocket.removeAllListeners('error'); - this.#recvSocket.on('error', () => {}); + this.#socket.removeAllListeners('end'); + this.#socket.removeAllListeners('error'); + this.#socket.on('error', () => {}); // Destroy the socket after a while to allow pending incoming messages. setTimeout(() => { - try { this.#sendSocket.destroy(); } - catch (error) {} - try { this.#recvSocket.destroy(); } + try { this.#socket.destroy(); } catch (error) {} }, 200); } @@ -210,7 +191,7 @@ export class Channel extends EnhancedEventEmitter // This may throw if closed or remote side ended. // Terminate with \r\n since we are expecting for it on the python side. - this.#sendSocket.write(ns); + this.#socket.write(ns); return new Promise((pResolve, pReject) => { @@ -286,7 +267,7 @@ export class Channel extends EnhancedEventEmitter // Terminate with \r\n since we are expecting for it on the python side. try { - this.#sendSocket.write(ns); + this.#socket.write(ns); } catch (error) { diff --git a/src/Worker.ts b/src/Worker.ts index c4de780..4f60f7a 100644 --- a/src/Worker.ts +++ b/src/Worker.ts @@ -75,14 +75,12 @@ export class Worker extends EnhancedEventEmitter // fd 0 (stdin) : Just ignore it. // fd 1 (stdout) : Pipe it for 3rd libraries that log their own stuff. // fd 2 (stderr) : Same as stdout. - // fd 3 (channel) : Producer Channel fd. - // fd 4 (channel) : Consumer Channel fd. + // fd 3 (channel) : Channel fd. stdio : [ 'ignore', PYTHON_LOG_VIA_PIPE ? 'pipe' : 'inherit', PYTHON_LOG_VIA_PIPE ? 'pipe' : 'inherit', - 'pipe', 'pipe' ] }); @@ -91,9 +89,8 @@ export class Worker extends EnhancedEventEmitter this.#channel = new Channel( { - sendSocket : this.#child.stdio[3], - recvSocket : this.#child.stdio[4], - pid : this.#pid + socket : this.#child.stdio[3], + pid : this.#pid }); let spawnDone = false; diff --git a/worker/channel.py b/worker/channel.py index f9d70c1..4f0bf07 100644 --- a/worker/channel.py +++ b/worker/channel.py @@ -34,9 +34,8 @@ def object_from_string(message_str) -> Optional[Dict[str, Any]]: class Channel: - def __init__(self, readfd, writefd) -> None: - self._readfd = readfd - self._writefd = writefd + def __init__(self, fd) -> None: + self._fd = fd self._reader = Optional[StreamReader] self._writer = Optional[StreamWriter] self._nsDecoder = pynetstring.Decoder() @@ -49,16 +48,9 @@ async def _connect(self) -> None: """ Create the sender and receivers """ - rsock = socket.socket( - socket.AF_UNIX, socket.SOCK_STREAM, 0, self._readfd) - self._reader, writer = await asyncio.open_connection( - sock=rsock) - - wsock = socket.socket( - socket.AF_UNIX, socket.SOCK_STREAM, 0, self._writefd) - reader, self._writer = await asyncio.open_connection( - sock=wsock) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0, self._fd) + self._reader, self._writer = await asyncio.open_connection(sock=sock) self._connected = True async def close(self) -> None: diff --git a/worker/worker.py b/worker/worker.py index 97c59f5..de266c4 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -9,9 +9,8 @@ from handler import Handler from logger import Logger -# File descriptors to communicate with the Node.js process -READ_FD = 3 -WRITE_FD = 4 +# File descriptor to communicate with the Node.js process +CHANNEL_FD = 3 if __name__ == "__main__": @@ -43,7 +42,7 @@ loop = asyncio.get_event_loop() # create channel - channel = Channel(READ_FD, WRITE_FD) + channel = Channel(CHANNEL_FD) def getTrack(playerId: str, kind: str) -> MediaStreamTrack: player = players[playerId] From 045d603a1b331d85152b8df155ab022637eb2281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Fri, 15 Sep 2023 13:15:53 +0200 Subject: [PATCH 2/6] test --- worker/channel.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/worker/channel.py b/worker/channel.py index 4f0bf07..5b9fce0 100644 --- a/worker/channel.py +++ b/worker/channel.py @@ -57,8 +57,7 @@ async def close(self) -> None: if self._writer is not None: self._writer.close() - if self._reader is not None: - self._reader.close() + # NOTE: For whatever reason I don't remember, we must not close self._reader. async def receive(self) -> Optional[Dict[str, Any]]: await self._connect() From c82323eb7a689f7c4b5797885cb89ede11961450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Fri, 15 Sep 2023 13:19:30 +0200 Subject: [PATCH 3/6] tests: increase timeout for getUserMedia --- src/tests/test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/test.ts b/src/tests/test.ts index e08be42..6c1e385 100644 --- a/src/tests/test.ts +++ b/src/tests/test.ts @@ -110,7 +110,7 @@ test('worker.getUserMedia() succeeds', async () => players : [], handlers : [] }); -}, 5000); +}, 15000); test('create a Device with worker.createHandlerFactory() as argument succeeds', () => { @@ -418,7 +418,7 @@ test('transport.produce() succeeds', async () => sendTransport.removeAllListeners('connect'); sendTransport.removeAllListeners('produce'); -}, 5000); +}, 15000); test('transport.consume() succeeds', async () => { @@ -761,7 +761,7 @@ test('producer.replaceTrack() succeeds', async () => trackId : secondAudioProducer.track!.id } }); -}, 5000); +}, 15000); test('producer.getStats() succeeds', async () => { From fcf7ee35ce800151edeff7c6d75508a0b4b978ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Fri, 15 Sep 2023 13:27:30 +0200 Subject: [PATCH 4/6] Enable PYTHON_LOG_TO_STDOUT=true in CI tests --- .github/workflows/mediasoup-client-aiortc.yaml | 2 +- README.md | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/mediasoup-client-aiortc.yaml b/.github/workflows/mediasoup-client-aiortc.yaml index 1dbdf82..4b3e936 100644 --- a/.github/workflows/mediasoup-client-aiortc.yaml +++ b/.github/workflows/mediasoup-client-aiortc.yaml @@ -42,4 +42,4 @@ jobs: # NOTE: Avoid lint:python due to # https://github.com/versatica/mediasoup-client-aiortc/issues/25 - run: npm run lint:node - - run: npm run test + - run: PYTHON_LOG_TO_STDOUT=true npm run test diff --git a/README.md b/README.md index e0a63bb..ff29242 100644 --- a/README.md +++ b/README.md @@ -256,15 +256,20 @@ When sending, `dataChannel.send()` (and hence `dataProducer.send()`) allows pass ## Development -In order to run `npm run lint` task, the following Python dependencies are required: +### Lint task -- `flake8` >= 5.0.4 -- `mypy` >= 0.982 +In order to run `npm run lint` task, install Python dev dependencies: ```bash $ npm run install-python-dev-deps ``` +### Make Python log to stdout/stderr while running tests + +```bash +PYTHON_LOG_TO_STDOUT=true npm run test +``` + ### Issue with Python >= 3.11 See https://github.com/versatica/mediasoup-client-aiortc/issues/22. From 92a8da6dd6d037e3b56b99f8266ec03dffe0a55b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Fri, 15 Sep 2023 13:49:06 +0200 Subject: [PATCH 5/6] Remove issue from README --- README.md | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/README.md b/README.md index ff29242..572a983 100644 --- a/README.md +++ b/README.md @@ -270,30 +270,6 @@ $ npm run install-python-dev-deps PYTHON_LOG_TO_STDOUT=true npm run test ``` -### Issue with Python >= 3.11 - -See https://github.com/versatica/mediasoup-client-aiortc/issues/22. - -As a workaround: - -1. Install `python@3.10`. -2. Make `PYTHON` environment variable point to it: - ```bash - export PYTHON=python3.10 - ``` -3. Make `PIP` environment variable point to `pip@3.10`: - ```bash - export PIP=pip3.10 - ``` -4. Install deps: - ```bash - npm ci - ``` -5. Run tests: - ```bash - npm test - ``` - ## Caveats From 370c463d09b798c79e7c93c61e6f7fc5c4f0ceab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Fri, 15 Sep 2023 13:56:35 +0200 Subject: [PATCH 6/6] Jest coveragE: ignore tests/ folder --- package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/package.json b/package.json index 32d8e69..1c79973 100644 --- a/package.json +++ b/package.json @@ -61,6 +61,9 @@ } ] }, + "coveragePathIgnorePatterns": [ + "src/tests" + ], "cacheDirectory": ".cache/jest" }, "dependencies": {