Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partial websocket implementation and video feed code #1

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "src/main-protos/protos"]
path = src/main-protos/protos
url = https://github.com/pioneers/protos.git
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"@types/node": "^12.0.0",
"@types/react": "^17.0.0",
"@types/react-dom": "^17.0.0",
"buffer": "^6.0.3",
"protobufjs": "^6.11.2",
"react": "^17.0.2",
"react-dom": "^17.0.2",
"react-scripts": "4.0.3",
Expand Down
45 changes: 28 additions & 17 deletions src/App.tsx
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
import React from 'react';
import logo from './logo.svg';
import React, { useEffect } from 'react';
// import {VideoFeed} from './video-feed/VideoFeed';
import { useRuntimeConnection } from './hooks';
import './App.css';

function App() {
export const App = () => {
const runtimeConnection = useRuntimeConnection();

useEffect(() => {
runtimeConnection.connect('127.0.0.1')
}, [runtimeConnection]);

// const ws = new WebSocket('ws://127.0.0.1:5000');

// ws.addEventListener('close', (ev: Event) => console.log('Connection closed', ev));

// ws.onopen = ((ev: Event) => {
// console.log('web socket open', ev);
// ws.send(Buffer.from([1]));
// setInterval(() => ws.send('testing'), 1000);
// });

// ws.addEventListener('message', (message: MessageEvent<string | Blob>) => {
// console.log('received data: %s', message.data.toString());
// });

// ws.addEventListener('error', (ev: Event) => console.log('error', ev));

return (
<div className="App">
<header className="App-header">
<img src={logo} className="App-logo" alt="logo" />
<p>
Edit <code>src/App.tsx</code> and save to reload.
</p>
<a
className="App-link"
href="https://reactjs.org"
target="_blank"
rel="noopener noreferrer"
>
Learn React
</a>
</header>
<p>Dawn is the best software team.</p>
{/* <VideoFeed webSocket={ws}> */}
</div>
);
}
Expand Down
280 changes: 280 additions & 0 deletions src/connections/RuntimeConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
import { Logger } from '../utils';
import * as protos from '../main-protos/protos';
import { Buffer } from 'buffer';

/**
* Define port constants, which must match with Runtime
*/
const DEFAULT_CONNECTION_PORT = 5000;

/**
* Runtime IP Address used for TCP and UDP connections
*/
const DEFAULT_CONNECTION_IP = '192.168.0.0';

/**
* Define message ID constants, which must match with Runtime
*/
enum MsgType {
RUN_MODE = 0,
START_POS = 1,
LOG = 2,
DEVICE_DATA = 3,
// 4 reserved for some Shepherd msg type
INPUTS = 5,
TIME_STAMPS = 6
}

interface Packet {
type: MsgType;
length: number;
payload: Buffer;
}

/** Given a data buffer, read as many TCP Packets as possible.
* If there are leftover bytes, return them so that they can be used in the next cycle of data.
*/
function readPackets(data: Buffer, previousLeftoverBytes?: Buffer): { leftoverBytes?: Buffer; processedTCPPackets: Packet[] } {
const HEADER_NUM_BYTES = 3;

const bytesToRead = Buffer.concat([previousLeftoverBytes ?? new Uint8Array(), data]);
const processedTCPPackets: Packet[] = [];

let leftoverBytes;
let currentPos = 0;

while (currentPos < bytesToRead.length) {
let header: Buffer;
let msgType: number;
let msgLength: number;
let payload: Buffer;

if (currentPos + HEADER_NUM_BYTES <= bytesToRead.length) {
// Have enough bytes to read in 3 byte header
header = bytesToRead.slice(currentPos, currentPos + HEADER_NUM_BYTES);
msgType = header[0];
msgLength = (header[2] << 8) | header[1];
} else {
// Don't have enough bytes to read 3 byte header so we save the bytes for the next data cycle
leftoverBytes = bytesToRead.slice(currentPos);

return {
leftoverBytes,
processedTCPPackets
};
}

currentPos += HEADER_NUM_BYTES;

if (currentPos + msgLength <= bytesToRead.length) {
// Have enough bytes to read entire payload from 1 TCP packet
payload = bytesToRead.slice(currentPos, currentPos + msgLength);
} else {
// Don't have enough bytes to read entire payload
leftoverBytes = bytesToRead.slice(currentPos);

return {
// Note: Need to save header so we know how many bytes to read for this packet in the next data cycle
leftoverBytes: Buffer.concat([header, leftoverBytes]),
processedTCPPackets
};
}

const newTCPPacket = { type: msgType, length: msgLength, payload };
processedTCPPackets.push(newTCPPacket);

currentPos += msgLength;
}

return {
leftoverBytes,
processedTCPPackets
};
}

/**
* Create TCP packet header and prepend to
* payload to send to Runtime.
*/
function createPacket(payload: unknown, messageType: MsgType): Buffer {
let encodedPayload: Uint8Array;

switch (messageType) {
case MsgType.DEVICE_DATA:
encodedPayload = protos.DevData.encode(protos.DevData.create(payload as protos.IDevData)).finish();
break;
case MsgType.RUN_MODE:
encodedPayload = protos.RunMode.encode(protos.RunMode.create(payload as protos.IRunMode)).finish();
break;
case MsgType.START_POS:
encodedPayload = protos.StartPos.encode(protos.StartPos.create(payload as protos.IStartPos)).finish();
break;
case MsgType.TIME_STAMPS:
encodedPayload = protos.TimeStamps.encode(protos.TimeStamps.create(payload as protos.ITimeStamps)).finish();
break;
case MsgType.INPUTS:
encodedPayload = protos.UserInputs.encode(
protos.UserInputs.create({ inputs: payload as protos.Input[] } as protos.IUserInputs)
).finish();
break;
default:
console.log('ERROR: trying to create TCP Packet with unknown message type');
encodedPayload = new Uint8Array();
break;
}

const msgLength = Buffer.byteLength(encodedPayload);
const msgLengthArr = new Uint8Array([msgLength & 0x00ff, msgLength & 0xff00]); // Assuming little-endian byte order, since runs on x64
const msgTypeArr = new Uint8Array([messageType]);

return Buffer.concat([Buffer.from(msgTypeArr.buffer), Buffer.from(msgLengthArr.buffer), Buffer.from(encodedPayload.buffer)], msgLength + 3);
}

export class RuntimeConnection {
currentIp: string = DEFAULT_CONNECTION_IP;
loggerName: string = 'RuntimeConnection';
logger: Logger = new Logger(this.loggerName);
socket: WebSocket | undefined;
leftoverBytes: Buffer | undefined;
isConnecting: boolean = false;
socketReady: boolean = false;

constructor() {
this.tick();
}

private openNewConnection() {
this.isConnecting = true;

const ip = this.currentIp;

if (ip.includes(':')) {
// ip most likely already includes port, so no need to use `DEFAULT_CONNECTION_PORT`
this.socket = new WebSocket(`ws://${ip}`);
} else {
this.socket = new WebSocket(`ws://${ip}:${DEFAULT_CONNECTION_PORT}`);
}

this.socket.addEventListener('open', () => {
this.logger.log('connected');
this.socketReady = true;
this.socket!.send(new Uint8Array([1])); // Runtime needs first byte to be 1 to recognize client as Dawn (instead of Shepherd)
});

this.socket.addEventListener('end', () => {
this.logger.log('Runtime disconnected');
});

this.socket.addEventListener('error', (ev: Event) => {
this.logger.log(`Encountered error -- ${ev}`);
});

/**
* Runtime TCP Message Handler.
* TODO: Distinguish between challenge outputs and console logs
* when using payload to update console
*/
this.socket.addEventListener('message', async (message: MessageEvent<Blob>) => {
// this.logger.log('Received message');
const dataArrayBuffer = await message.data.arrayBuffer();
const { leftoverBytes, processedTCPPackets } = readPackets(Buffer.from(dataArrayBuffer), this.leftoverBytes);

for (const packet of processedTCPPackets) {
let decoded;

switch (packet.type) {
case MsgType.TIME_STAMPS:
decoded = protos.TimeStamps.decode(packet.payload);
const oneWayLatency = (Date.now() - Number(decoded.dawnTimestamp)) / 2;
this.logger.log(`${this.loggerName}: oneWayLatency -- ${oneWayLatency} msec`);

// TODO: use `oneWayLatency` in UI
break;

case MsgType.DEVICE_DATA:
try {
const sensorData: protos.Device[] = protos.DevData.decode(packet.payload).devices;
this.logger.log(`sensorData -- ${JSON.stringify(sensorData)}`)
} catch (err) {
this.logger.log(err);
}
break;

default:
this.logger.log(`Unsupported received message type: ${packet.type}`)
}
}

this.leftoverBytes = leftoverBytes;
});

this.isConnecting = false;
}

private tick = () => {
console.log('socket connected', this.socketReady);
console.log('socket is connecting', this.isConnecting);
console.log('current ip', this.currentIp);
console.log('\n');

if (this.socket === undefined && !this.isConnecting) {
this.openNewConnection();
}

setTimeout(this.tick, 5000);
}

public connect = (newIp: string) => {
if (newIp === this.currentIp) {
// Same ip, no need to reconnect
return;
}

if (this.socketReady) {
this.logger.log(`Closed existing connection to connect to new ip: ${newIp}`);
// Close existing connected socket to open new connection with new ip
this.socket?.close();
this.socket = undefined;
}

this.currentIp = newIp;
this.openNewConnection();
}

/**
* Initiates latency check by sending first packet to Runtime
*/
public initiateLatencyCheck = (data: protos.ITimeStamps) => {
const message = createPacket(data, MsgType.TIME_STAMPS);
this.socket?.send(message);
};

public sendRunMode = (runModeData: protos.IRunMode) => {
if (!this.socketReady) {
return;
}

const message = createPacket(runModeData, MsgType.RUN_MODE);
this.socket?.send(message);
this.logger.log(`Sent run mode data -- ${JSON.stringify(runModeData)}\n`);
};

public sendInputs = (data: protos.Input[], source: protos.Source) => {
if (data.length === 0) {
data.push(
protos.Input.create({
connected: false,
source
})
);
}
const message = createPacket(data, MsgType.INPUTS);
this.socket?.send(message);
};

close = () => {
this.logger.log('Closed socket connection');
this.socket?.close();
this.socketReady = false;
};
}
1 change: 1 addition & 0 deletions src/connections/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { RuntimeConnection } from './RuntimeConnection';
1 change: 1 addition & 0 deletions src/hooks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { useRuntimeConnection } from './useRuntimeConnection';
20 changes: 20 additions & 0 deletions src/hooks/useRuntimeConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { RuntimeConnection } from '../connections';
import { useEffect, useState } from 'react';

export const useRuntimeConnection = () => {
const [runtimeConnection, _setRuntimeConnection] = useState(new RuntimeConnection());

useEffect(() => {
setInterval(() => runtimeConnection.sendRunMode({ mode: 2 }), 5000);

return () => {
runtimeConnection.close();
}
}, [runtimeConnection]);

const connect = (newIp: string) => {
runtimeConnection.connect(newIp);
}

return { connect };
}
Loading