Skip to content

Commit

Permalink
Merge pull request #26 from versatica/use-a-single-bidirectional-sock…
Browse files Browse the repository at this point in the history
…et-to-communicate-with-worker

Use a single and bidirectional socket to communicate with the worker (fix Python3 <= 3.10 requirement)
  • Loading branch information
ibc authored Sep 15, 2023
2 parents d275dea + 370c463 commit f8eafc0
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/mediasoup-client-aiortc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ jobs:
# NOTE: Avoid lint:python due to
# https://github.com/versatica/mediasoup-client-aiortc/issues/25
- run: npm run lint:node
- run: npm run test
- run: PYTHON_LOG_TO_STDOUT=true npm run test
33 changes: 7 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,38 +256,19 @@ When sending, `dataChannel.send()` (and hence `dataProducer.send()`) allows pass
## Development
In order to run `npm run lint` task, the following Python dependencies are required:
### Lint task
- `flake8` >= 5.0.4
- `mypy` >= 0.982
In order to run `npm run lint` task, install Python dev dependencies:
```bash
$ npm run install-python-dev-deps
```
### Issue with Python >= 3.11
See https://github.com/versatica/mediasoup-client-aiortc/issues/22.
As a workaround:
1. Install `python@3.10`.
2. Make `PYTHON` environment variable point to it:
```bash
export PYTHON=python3.10
```
3. Make `PIP` environment variable point to `[email protected]`:
```bash
export PIP=pip3.10
```
4. Install deps:
```bash
npm ci
```
5. Run tests:
```bash
npm test
```
### Make Python log to stdout/stderr while running tests
```bash
PYTHON_LOG_TO_STDOUT=true npm run test
```
## Caveats
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
}
]
},
"coveragePathIgnorePatterns": [
"src/tests"
],
"cacheDirectory": ".cache/jest"
},
"dependencies": {
Expand Down
51 changes: 16 additions & 35 deletions src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ export class Channel extends EnhancedEventEmitter
{
// Closed flag.
#closed = false;
// Unix Socket instance for sending messages to the worker process.
readonly #sendSocket: Duplex;
// Unix Socket instance for receiving messages to the worker process.
readonly #recvSocket: Duplex;
// Unix Socket instance for communicating with the worker process.
readonly #socket: Duplex;
// Next id for requests sent to the worker process.
#nextId = 0;
// Map of pending sent requests.
Expand All @@ -38,25 +36,22 @@ export class Channel extends EnhancedEventEmitter

constructor(
{
sendSocket,
recvSocket,
socket,
pid
}:
{
sendSocket: any;
recvSocket: any;
socket: any;
pid: number;
})
{
super();

logger.debug('constructor()');

this.#sendSocket = sendSocket as Duplex;
this.#recvSocket = recvSocket as Duplex;
this.#socket = socket as Duplex;

// Read Channel responses/notifications from the worker.
this.#recvSocket.on('data', (buffer: Buffer) =>
this.#socket.on('data', (buffer: Buffer) =>
{
if (!this.#recvBuffer)
{
Expand Down Expand Up @@ -132,20 +127,12 @@ export class Channel extends EnhancedEventEmitter
}
});

this.#sendSocket.on('end', () => (
logger.debug('send Channel ended by the worker process')
this.#socket.on('end', () => (
logger.debug('Channel ended by the worker process')
));

this.#sendSocket.on('error', (error) => (
logger.error('send Channel error: %s', String(error))
));

this.#recvSocket.on('end', () => (
logger.debug('receive Channel ended by the worker process')
));

this.#recvSocket.on('error', (error) => (
logger.error('receive Channel error: %s', String(error))
this.#socket.on('error', (error) => (
logger.error('Channel error: %s', String(error))
));
}

Expand All @@ -168,20 +155,14 @@ export class Channel extends EnhancedEventEmitter

// Remove event listeners but leave a fake 'error' hander to avoid
// propagation.
this.#sendSocket.removeAllListeners('end');
this.#sendSocket.removeAllListeners('error');
this.#sendSocket.on('error', () => {});

this.#recvSocket.removeAllListeners('end');
this.#recvSocket.removeAllListeners('error');
this.#recvSocket.on('error', () => {});
this.#socket.removeAllListeners('end');
this.#socket.removeAllListeners('error');
this.#socket.on('error', () => {});

// Destroy the socket after a while to allow pending incoming messages.
setTimeout(() =>
{
try { this.#sendSocket.destroy(); }
catch (error) {}
try { this.#recvSocket.destroy(); }
try { this.#socket.destroy(); }
catch (error) {}
}, 200);
}
Expand Down Expand Up @@ -210,7 +191,7 @@ export class Channel extends EnhancedEventEmitter

// This may throw if closed or remote side ended.
// Terminate with \r\n since we are expecting for it on the python side.
this.#sendSocket.write(ns);
this.#socket.write(ns);

return new Promise((pResolve, pReject) =>
{
Expand Down Expand Up @@ -286,7 +267,7 @@ export class Channel extends EnhancedEventEmitter
// Terminate with \r\n since we are expecting for it on the python side.
try
{
this.#sendSocket.write(ns);
this.#socket.write(ns);
}
catch (error)
{
Expand Down
9 changes: 3 additions & 6 deletions src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,12 @@ export class Worker extends EnhancedEventEmitter
// fd 0 (stdin) : Just ignore it.
// fd 1 (stdout) : Pipe it for 3rd libraries that log their own stuff.
// fd 2 (stderr) : Same as stdout.
// fd 3 (channel) : Producer Channel fd.
// fd 4 (channel) : Consumer Channel fd.
// fd 3 (channel) : Channel fd.
stdio :
[
'ignore',
PYTHON_LOG_VIA_PIPE ? 'pipe' : 'inherit',
PYTHON_LOG_VIA_PIPE ? 'pipe' : 'inherit',
'pipe',
'pipe'
]
});
Expand All @@ -91,9 +89,8 @@ export class Worker extends EnhancedEventEmitter

this.#channel = new Channel(
{
sendSocket : this.#child.stdio[3],
recvSocket : this.#child.stdio[4],
pid : this.#pid
socket : this.#child.stdio[3],
pid : this.#pid
});

let spawnDone = false;
Expand Down
6 changes: 3 additions & 3 deletions src/tests/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ test('worker.getUserMedia() succeeds', async () =>
players : [],
handlers : []
});
}, 5000);
}, 15000);

test('create a Device with worker.createHandlerFactory() as argument succeeds', () =>
{
Expand Down Expand Up @@ -418,7 +418,7 @@ test('transport.produce() succeeds', async () =>

sendTransport.removeAllListeners('connect');
sendTransport.removeAllListeners('produce');
}, 5000);
}, 15000);

test('transport.consume() succeeds', async () =>
{
Expand Down Expand Up @@ -761,7 +761,7 @@ test('producer.replaceTrack() succeeds', async () =>
trackId : secondAudioProducer.track!.id
}
});
}, 5000);
}, 15000);

test('producer.getStats() succeeds', async () =>
{
Expand Down
19 changes: 5 additions & 14 deletions worker/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ def object_from_string(message_str) -> Optional[Dict[str, Any]]:


class Channel:
def __init__(self, readfd, writefd) -> None:
self._readfd = readfd
self._writefd = writefd
def __init__(self, fd) -> None:
self._fd = fd
self._reader = Optional[StreamReader]
self._writer = Optional[StreamWriter]
self._nsDecoder = pynetstring.Decoder()
Expand All @@ -49,24 +48,16 @@ async def _connect(self) -> None:
"""
Create the sender and receivers
"""
rsock = socket.socket(
socket.AF_UNIX, socket.SOCK_STREAM, 0, self._readfd)
self._reader, writer = await asyncio.open_connection(
sock=rsock)

wsock = socket.socket(
socket.AF_UNIX, socket.SOCK_STREAM, 0, self._writefd)
reader, self._writer = await asyncio.open_connection(
sock=wsock)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0, self._fd)

self._reader, self._writer = await asyncio.open_connection(sock=sock)
self._connected = True

async def close(self) -> None:
if self._writer is not None:
self._writer.close()

if self._reader is not None:
self._reader.close()
# NOTE: For whatever reason I don't remember, we must not close self._reader.

async def receive(self) -> Optional[Dict[str, Any]]:
await self._connect()
Expand Down
7 changes: 3 additions & 4 deletions worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
from handler import Handler
from logger import Logger

# File descriptors to communicate with the Node.js process
READ_FD = 3
WRITE_FD = 4
# File descriptor to communicate with the Node.js process
CHANNEL_FD = 3


if __name__ == "__main__":
Expand Down Expand Up @@ -43,7 +42,7 @@
loop = asyncio.get_event_loop()

# create channel
channel = Channel(READ_FD, WRITE_FD)
channel = Channel(CHANNEL_FD)

def getTrack(playerId: str, kind: str) -> MediaStreamTrack:
player = players[playerId]
Expand Down

0 comments on commit f8eafc0

Please sign in to comment.