Skip to content

Commit

Permalink
engine.io 쓰듯이 람다 웹소켓을 정의할수 있도록
Browse files Browse the repository at this point in the history
  • Loading branch information
if1live committed Dec 29, 2023
1 parent 34cbba8 commit 8c874fb
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 20 deletions.
35 changes: 35 additions & 0 deletions examples/plain/client_naive.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { setTimeout } from "timers/promises";
import { default as WebSocket } from "ws";

const host = process.env.WS_URL ?? "ws://127.0.0.1:3001";

async function main() {
console.log(`init ${Date.now()}`);

const ws = new WebSocket(`${host}/engine.io/?EIO=4&transport=websocket`);
ws.onopen = () => {
console.log(`open ${Date.now()}`);

ws.send('4ping');
console.log(`ping ${Date.now()}`);
};

ws.onclose = () => {
console.log(`close ${Date.now()}`);
};

ws.onerror = () => {
console.log(`error ${Date.now()}`);
};

/**
* @param {MessageEvent} ev
*/
ws.onmessage = (ev) => {
console.log(`message ${Date.now()} ${ev.data} [${ev.data.constructor.name}]`);
};

await setTimeout(3000);
ws.close();
}
await main();
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ socket.on("open", async () => {
socket.send("message-text");
await setTimeout(10);
}

{
const arr = new Uint8Array(2);
arr[0] = 0x12;
arr[1] = 0x34;
socket.send(arr);
await setTimeout(10);
}

// TODO: 바이너리 패킷은 aws websocket api 제약과 겹쳐서 더 봐야함
// {
// const arr = new Uint8Array(2);
// arr[0] = 0x12;
// arr[1] = 0x34;
// socket.send(arr);
// await setTimeout(10);
// }

await setTimeout(60_000);
socket.close();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"rimraf": "^5.0.5",
"serverless": "^3.38.0",
"serverless-scriptable-plugin": "^1.3.1",
"serverless-standalone": "^0.0.10",
"serverless-standalone": "^0.0.11",
"syncpack": "^12.0.1",
"tsx": "^4.7.0",
"typescript": "^5.3.3",
Expand Down
12 changes: 6 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions src/apps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Packet } from "engine.io-parser";
import { MySocketPolicy } from "./engine/MySocket.js";
import { encodePacketAsync } from "./engine/helpers.js";

// TODO: 핸들러 정책을 어디에서 관리하지?
const socket = new MySocketPolicy();

socket.on("message", async (sock, data) => {
console.log("my_message", sock.id, data);

// TODO: 간단하게 echo 구현
await sock.send(data);

// TODO: ping 야매로 보내기
// TODO: ping은 독립적으로 전송되도록 바뀌어야한다
{
const packet: Packet = { type: "ping" };
const encoded = await encodePacketAsync(packet);
await sock.send(encoded, { wsPreEncoded: encoded });
}
});

socket.on("close", (sock, reason) => {
console.log("my_close", sock.id, reason);
});

socket.on("error", (sock, error) => {
console.log("my_error", sock.id, error);
});

socket.on("heartbeat", (sock) => {
console.log("my_heartbeat", sock.id);
});

export const apps = {
handlers_socket: socket,
};
1 change: 1 addition & 0 deletions src/engine/MyServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class MyServerPolicy {}
114 changes: 114 additions & 0 deletions src/engine/MySocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { ApiGatewayManagementApiClient } from "@aws-sdk/client-apigatewaymanagementapi";
import { Packet } from "engine.io-parser";
import { ConnectionAction } from "../repositories.js";
import * as settings from "../settings.js";
import { encodePacketAsync } from "./helpers.js";

export class MySocket {
constructor(
private readonly connectionId: string,
readonly endpoint: string,
) {}

public get id() {
return this.connectionId;
}

public get client() {
return new ApiGatewayManagementApiClient({
endpoint: this.endpoint,
region: settings.AWS_REGION,
credentials: settings.AWS_CREDENTIALS,
});
}

public async send(data: string, options?: { wsPreEncoded?: string }) {
const wsPreEncoded = options?.wsPreEncoded;
if (typeof wsPreEncoded === "string") {
return await ConnectionAction.post(this.client, this.id, wsPreEncoded);
}

// else...
const packet: Packet = { type: "message", data };
const encoded = await encodePacketAsync(packet);
return await ConnectionAction.post(this.client, this.id, encoded);
}
}

// TODO: buffer는 나중에 신경쓰다. aws websocket api가 binary를 편법으로만 지원해서.
type Fn_Message = (sock: MySocket, data: string) => Promise<void> | void;

type Fn_Close = (sock: MySocket, reason: string) => Promise<void> | void;

type Fn_Error = (sock: MySocket, error: Error) => Promise<void> | void;

type Fn_NoArgs = (sock: MySocket) => Promise<void> | void;

export class MySocketPolicy {
private readonly list_close: Fn_Close[] = [];
private readonly list_message: Fn_Message[] = [];
private readonly list_error: Fn_Error[] = [];
private readonly list_heartbeat: Fn_NoArgs[] = [];

public on(tag: "close", fn: Fn_Close): void;
public on(tag: "message", fn: Fn_Message): void;
public on(tag: "error", fn: Fn_Error): void;
public on(tag: "heartbeat", fn: Fn_NoArgs): void;

public on(tag: string, fn: (...args: any[]) => Promise<void> | void): void {
switch (tag) {
case "close":
this.on_close(fn as any);
break;
case "message":
this.on_message(fn as any);
break;
case "error":
this.on_error(fn as any);
break;
case "heartbeat":
this.on_heartbeat(fn as any);
break;
}
}

private on_message(fn: Fn_Message) {
this.list_message.push(fn);
}

private on_close(fn: Fn_Close) {
this.list_close.push(fn);
}

private on_error(fn: Fn_Error) {
this.list_error.push(fn);
}

private on_heartbeat(fn: Fn_NoArgs) {
this.list_heartbeat.push(fn);
}

async dispatch_message(sock: MySocket, data: string) {
for (const fn of this.list_message) {
await fn(sock, data);
}
}

async dispatch_close(sock: MySocket, reason: string) {
for (const fn of this.list_close) {
await fn(sock, reason);
}
}

async dispatch_error(sock: MySocket, error: Error) {
for (const fn of this.list_error) {
await fn(sock, error);
}
}

async dispatch_heartbeat(sock: MySocket) {
for (const fn of this.list_heartbeat) {
await fn(sock);
}
}
}
15 changes: 15 additions & 0 deletions src/engine/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Packet, RawData, encodePacket } from "engine.io-parser";

export const encodePacketAsync = (
packet: Packet,
supportsBinary?: boolean,
): Promise<RawData> => {
// aws websocket api는 바이너리를 지원하지 않음
const opt_supportsBinary = supportsBinary ?? false;

return new Promise<string>((resolve) => {
encodePacket(packet, opt_supportsBinary, (encoded) => {
resolve(encoded);
});
});
};
14 changes: 14 additions & 0 deletions src/engine/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export interface Handshake {
sid: string;
upgrades: string[];
pingTimeout: number;
pingInterval: number;
maxPayload: number;
}

export const defaultHandshake: Omit<Handshake, "sid"> = {
upgrades: [],
pingInterval: 25000,
pingTimeout: 20000,
maxPayload: 1e6,
};
Loading

0 comments on commit 8c874fb

Please sign in to comment.