Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
Removing the messagesend and messagerecv classes in favor of using th…
Browse files Browse the repository at this point in the history
…e protobuf messages.
  • Loading branch information
mcottontensor committed Jan 30, 2024
1 parent 801663e commit f8e503c
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 331 deletions.
2 changes: 1 addition & 1 deletion Common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@epicgames-ps/lib-pixelstreamingcommon-ue5.5",
"version": "0.0.6",
"version": "0.0.7",
"description": "Common utilities library for Unreal Engine 5.5 Pixel Streaming",
"main": "build/commonjs/pixelstreamingcommon.js",
"module": "build/es2015/pixelstreamingcommon.js",
Expand Down
6 changes: 6 additions & 0 deletions Common/protobuf/signalling_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ message offer {
string type = 1;
string sdp = 2;
optional string playerId = 3;
optional bool sfu = 4;
}

message peerDataChannelsReady {
Expand Down Expand Up @@ -132,3 +133,8 @@ message subscribe {
message unsubscribe {
string type = 1;
}

message streamerIdChanged {
string type = 1;
string newID = 2;
}
38 changes: 38 additions & 0 deletions Common/src/Messages/message_helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { IMessageType } from "@protobuf-ts/runtime";
import { BaseMessage } from './base_message';
import { Logger } from '../Logger/Logger';
import { MessageRegistry } from './message_registry';

export function createMessage(messageType: IMessageType<BaseMessage>, params?: any) {
const message = messageType.create();
Expand All @@ -9,3 +11,39 @@ export function createMessage(messageType: IMessageType<BaseMessage>, params?: a
}
return message;
}

export function validateMessage(msg: any): IMessageType<BaseMessage> | null {
let valid: boolean = true;

if (!msg.type) {
Logger.Error(Logger.GetStackTrace(), `Parsed message has no type. Rejected. ${JSON.stringify(msg)}`);
return null;
}

const messageType = MessageRegistry[msg.type];
if (!messageType) {
Logger.Error(Logger.GetStackTrace(), `Message is of an unknown type: "${messageType}". Rejected.`);
return null;
}

if (messageType.fields) {
for (let field of messageType.fields) {
if (!field.opt) {
if (!msg.hasOwnProperty(field.name)) {
Logger.Error(Logger.GetStackTrace(), `Message "${msg.type}"" is missing required field "${field.name}". Rejected.`);
valid = false;
}
}
}
}

for (const fieldName in msg) {
const found = messageType.fields.find(field => field.name === fieldName);
if (!found) {
Logger.Error(Logger.GetStackTrace(), `Message "${msg.type}" contains unknown field "${fieldName}". Rejected.`);
valid = false;
}
}

return valid ? messageType : null;
}
81 changes: 80 additions & 1 deletion Common/src/Messages/signalling_messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ export interface offer {
* @generated from protobuf field: optional string playerId = 3;
*/
playerId?: string;
/**
* @generated from protobuf field: optional bool sfu = 4;
*/
sfu?: boolean;
}
/**
* @generated from protobuf message peerDataChannelsReady
Expand Down Expand Up @@ -365,6 +369,19 @@ export interface unsubscribe {
*/
type: string;
}
/**
* @generated from protobuf message streamerIdChanged
*/
export interface streamerIdChanged {
/**
* @generated from protobuf field: string type = 1;
*/
type: string;
/**
* @generated from protobuf field: string newID = 2;
*/
newID: string;
}
// @generated message type with reflection information, may provide speed optimized methods
class base_message$Type extends MessageType<base_message> {
constructor() {
Expand Down Expand Up @@ -1033,7 +1050,8 @@ class offer$Type extends MessageType<offer> {
super("offer", [
{ no: 1, name: "type", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "sdp", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "playerId", kind: "scalar", opt: true, T: 9 /*ScalarType.STRING*/ }
{ no: 3, name: "playerId", kind: "scalar", opt: true, T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "sfu", kind: "scalar", opt: true, T: 8 /*ScalarType.BOOL*/ }
]);
}
create(value?: PartialMessage<offer>): offer {
Expand All @@ -1058,6 +1076,9 @@ class offer$Type extends MessageType<offer> {
case /* optional string playerId */ 3:
message.playerId = reader.string();
break;
case /* optional bool sfu */ 4:
message.sfu = reader.bool();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
Expand All @@ -1079,6 +1100,9 @@ class offer$Type extends MessageType<offer> {
/* optional string playerId = 3; */
if (message.playerId !== undefined)
writer.tag(3, WireType.LengthDelimited).string(message.playerId);
/* optional bool sfu = 4; */
if (message.sfu !== undefined)
writer.tag(4, WireType.Varint).bool(message.sfu);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
Expand Down Expand Up @@ -1772,3 +1796,58 @@ class unsubscribe$Type extends MessageType<unsubscribe> {
* @generated MessageType for protobuf message unsubscribe
*/
export const unsubscribe = new unsubscribe$Type();
// @generated message type with reflection information, may provide speed optimized methods
class streamerIdChanged$Type extends MessageType<streamerIdChanged> {
constructor() {
super("streamerIdChanged", [
{ no: 1, name: "type", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "newID", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<streamerIdChanged>): streamerIdChanged {
const message = globalThis.Object.create((this.messagePrototype!));
message.type = "";
message.newID = "";
if (value !== undefined)
reflectionMergePartial<streamerIdChanged>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: streamerIdChanged): streamerIdChanged {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* string type */ 1:
message.type = reader.string();
break;
case /* string newID */ 2:
message.newID = reader.string();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: streamerIdChanged, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* string type = 1; */
if (message.type !== "")
writer.tag(1, WireType.LengthDelimited).string(message.type);
/* string newID = 2; */
if (message.newID !== "")
writer.tag(2, WireType.LengthDelimited).string(message.newID);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message streamerIdChanged
*/
export const streamerIdChanged = new streamerIdChanged$Type();
7 changes: 3 additions & 4 deletions Common/src/Transport/ITransport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { MessageSend } from '../WebSockets/MessageSend';
import { MessageRecv } from '../WebSockets/MessageReceive';
import { BaseMessage } from '../Messages/base_message';
import { EventEmitter } from 'events';

/**
Expand All @@ -16,12 +15,12 @@ export interface ITransport {
/**
* Called when the protocol wants to send a message over the transport.
*/
sendMessage(msg: MessageSend): void;
sendMessage(msg: BaseMessage): void;

/**
* Callback filled in by the SignallingProtocol and should be called by the transport when a new message arrives.
*/
onMessage: (msg: MessageRecv) => void;
onMessage: (msg: BaseMessage) => void;

/**
* Connect to a given URL.
Expand Down
23 changes: 14 additions & 9 deletions Common/src/Transport/WebSocketTransport.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright Epic Games, Inc. All Rights Reserved.

import { Logger } from '../Logger/Logger';
import { MessageRecv } from '../WebSockets/MessageReceive';
import { MessageSend } from '../WebSockets/MessageSend';
import { ITransport } from './ITransport';
import { EventEmitter } from 'events';
import { BaseMessage } from '../Messages/base_message';
import * as MessageHelpers from '../Messages/message_helpers';

// declare the new method for the websocket interface
declare global {
Expand All @@ -25,11 +25,11 @@ export class WebSocketTransport implements ITransport {
this.events = new EventEmitter();
}

sendMessage(msg: MessageSend): void {
this.webSocket.send(msg.payload());
sendMessage(msg: BaseMessage): void {
this.webSocket.send(JSON.stringify(msg));
}

onMessage: (msg: MessageRecv) => void;
onMessage: (msg: BaseMessage) => void;

/**
* Connect to the signaling server
Expand Down Expand Up @@ -104,17 +104,22 @@ export class WebSocketTransport implements ITransport {
return;
}

const message: MessageRecv = JSON.parse(event.data);
Logger.Log(
Logger.GetStackTrace(),
'received => \n' +
JSON.stringify(JSON.parse(event.data), undefined, 4),
6
);

this.onMessage(message);
// Send to our signalling protocol to handle the incoming message
//this.signallingProtocol.handleMessage(message.type, event.data);
let parsedMessage;
try {
parsedMessage = JSON.parse(event.data);
} catch (e) {
Logger.Error(Logger.GetStackTrace(), `Error parsing message string ${event.data}.\n${e}`);
return;
}

this.onMessage(parsedMessage);
}

/**
Expand Down
97 changes: 0 additions & 97 deletions Common/src/WebSockets/MessageReceive.ts

This file was deleted.

Loading

0 comments on commit f8e503c

Please sign in to comment.