Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

a simpler implementation of socket interface for different platform #71

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
"eslint-config-airbnb-base": "^15.0.0",
"eslint-config-airbnb-typescript": "^17.1.0",
"eslint-plugin-import": "^2.29.1",
"events": "^3.3.0",
"express": "^4.18.2",
"fake-indexeddb": "^5.0.2",
"jest": "^29.7.0",
Expand Down
43 changes: 43 additions & 0 deletions packages/base/src/event-emitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// There are many where use event emitter, but the platform may not
// natively support it, so we made our own.
export class EventEmitter<E extends string, C extends Function> {
private events: Map<E, C[]> = new Map();

addEventListener(eventName: E, callback: C) {
let list = this.events.get(eventName);
if (!list) {
list = [];
this.events.set(eventName, list);
}
if (list.indexOf(callback) < 0) {
list.push(callback);
}
}

removeEventListener(eventName: E, callback: C) {
if (this.events.has(eventName)) {
const list = this.events.get(eventName);
const i = list!.indexOf(callback);
if (i >= 0) {
list?.splice(i, 1);
}
}
}

emit(eventName: E, ...args: any[]) {
const list = this.events.get(eventName);
if (list) {
list.forEach((cb) => {
cb(...args);
});
}
}

clearListeners(eventName: E) {
this.events.delete(eventName);
}

clearAllListeners() {
this.events.clear();
}
}
81 changes: 25 additions & 56 deletions packages/base/src/socket-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ interface GetterMember {
export type WebSocketEvents = keyof WebSocketEventMap;
type CallbackType = (data?: any) => void;

// fork WebSocket state
// WebSocket state. Enum defs for WebSocket readyState
export enum SocketState {
CONNECTING = 0,
OPEN = 1,
Expand All @@ -36,55 +36,17 @@ export enum SocketState {

const HEARTBEAT_INTERVAL = 5000;

// 封装不同平台的 socket
export abstract class SocketWrapper {
abstract init(url: string): void;
abstract send(data: string): void;
abstract close(data?: {}): void;
abstract getState(): SocketState;
protected events: Record<WebSocketEvents, CallbackType[]> = {
open: [],
close: [],
error: [],
message: [],
};

protected emit(event: WebSocketEvents, data: any) {
this.events[event].forEach((fun) => {
fun(data);
});
// for close and error, clear all listeners or they will be called on next socket instance.
if (event === 'close' || event === 'error') {
this.clearListeners();
}
}

onOpen(fun: (res: { header?: Record<string, string> }) => void) {
this.events.open.push(fun);
}

onClose(fun: (res: { code: number; reason: string }) => void) {
this.events.close.push(fun);
}

onError(fun: (msg: string) => void) {
this.events.error.push(fun);
}

onMessage(fun: (data: string | ArrayBuffer) => void) {
this.events.message.push(fun);
}

protected clearListeners() {
// clear listeners
Object.entries(this.events).forEach(([, funs]) => {
funs.splice(0);
});
}
// socket interface, be consistent with websocket.
export interface ISocket {
send(data: string): void;
close(): void;
addEventListener(event: WebSocketEvents, callback: CallbackType): void;
removeEventListener(event: WebSocketEvents, callback: CallbackType): void;
readonly readyState: SocketState;
}

export abstract class SocketStoreBase {
protected abstract socketWrapper: SocketWrapper;
protected socket: ISocket | null = null;

private socketUrl: string = '';

Expand Down Expand Up @@ -133,35 +95,42 @@ export abstract class SocketStoreBase {
this.addListener('debugger-online', this.handleFlushBuffer);
}

public getSocket() {
return this.socket;
}

// Simple offline listener
abstract onOffline(): void;

// Create socket instance of different platform implementations
abstract createSocket(url: string): ISocket;

public init(url: string) {
try {
if (!url) {
throw Error('WebSocket url cannot be empty');
}
// close existing connection
if (this.socketWrapper.getState() === SocketState.OPEN) {
this.socketWrapper.close();
if (this.socket?.readyState === SocketState.OPEN) {
this.socket.close();
}
this.socketWrapper?.onOpen(() => {
this.socket = this.createSocket(url);
this.socket.addEventListener('open', () => {
this.connectOnline();
});
// Strictly, the onMessage should be called after onOpen. But for some platform(alipay,)
// this may cause some message losing.
this.socketWrapper?.onMessage((evt) => {
this.socket.addEventListener('message', (evt) => {
this.handleMessage(evt);
});
this.socketWrapper?.onClose(() => {
this.socket.addEventListener('close', () => {
this.connectOffline();
});
this.socketWrapper?.onError(() => {
this.socket.addEventListener('error', () => {
// we treat on error the same with on close.
this.connectOffline();
});
this.socketUrl = url;
this.socketWrapper?.init(url);
} catch (e: any) {
psLog.error(e.message);
}
Expand Down Expand Up @@ -195,7 +164,7 @@ export abstract class SocketStoreBase {
this.clearPing();
this.reconnectTimes = 0;
this.reconnectable = false;
this.socketWrapper?.close();
this.socket?.close();
this.messages = [];
Object.entries(this.events).forEach(([, fns]) => {
fns.splice(0);
Expand Down Expand Up @@ -433,7 +402,7 @@ export abstract class SocketStoreBase {
pkMsg.createdAt = Date.now();
pkMsg.requestId = getRandomId();
const dataString = stringifyData(pkMsg);
this.socketWrapper?.send(dataString);
this.socket?.send(dataString);
} catch (e) {
throw Error(`Incompatible: ${(e as Error).message}`);
}
Expand Down
25 changes: 17 additions & 8 deletions packages/base/tests/network/base.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {
SocketState,
SocketStoreBase,
SocketWrapper,
} from 'base/src/socket-base';
import { SocketState, SocketStoreBase, ISocket } from 'base/src/socket-base';
import NetworkProxyBase from 'base/src/network/base';
import RequestItem from 'base/src/request-item';

class PlatformSocketWrapper extends SocketWrapper {
class PlatformSocketImpl implements ISocket {
readyState: SocketState = 0;
constructor(url: string) {
this.init(url);
}
init(url: string): void {
throw new Error('Method not implemented.');
}
Expand All @@ -22,10 +22,19 @@ class PlatformSocketWrapper extends SocketWrapper {
getState(): SocketState {
throw new Error('Method not implemented.');
}
addEventListener(
event: keyof WebSocketEventMap,
callback: (data?: any) => void,
): void {}
removeEventListener(
event: keyof WebSocketEventMap,
callback: (data?: any) => void,
): void {}
}

class PlatformSocket extends SocketStoreBase {
protected socketWrapper: SocketWrapper = new PlatformSocketWrapper();
createSocket(url: string): ISocket {
return new PlatformSocketImpl(url);
}
onOffline(): void {
throw new Error('Method not implemented.');
}
Expand Down
52 changes: 28 additions & 24 deletions packages/mp-base/src/helpers/socket.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,44 @@
import { ROOM_SESSION_KEY } from 'base/src/constants';
import {
SocketStoreBase,
SocketState,
SocketWrapper,
} from 'base/src/socket-base';
import { SocketStoreBase, SocketState, ISocket } from 'base/src/socket-base';
import { getMPSDK, utilAPI } from '../utils';
import { EventEmitter } from 'base/src/event-emitter';

export class MPSocketWrapper extends SocketWrapper {
// Mini program implementation of ISocket
export class MPSocketImpl
extends EventEmitter<'open' | 'close' | 'error' | 'message', Function>
implements ISocket
{
private socketInstance: MPSocket | null = null;

private state: SocketState = 0;
private _state: SocketState = SocketState.CONNECTING;

get readyState() {
return this._state;
}

// some ali-family app only support single socket connection...
public static isSingleSocket = false;

init(url: string) {
this.state = SocketState.CONNECTING;
constructor(url: string) {
super();
this._state = SocketState.CONNECTING;
const mp = getMPSDK();
const closeHandler: SocketOnCloseHandler = (data) => {
this.state = SocketState.CLOSED;
this._state = SocketState.CLOSED;
this.emit('close', data);
};
const openHandler: SocketOnOpenHandler = (data) => {
this.state = SocketState.OPEN;
this._state = SocketState.OPEN;
this.emit('open', data);
};
const errorHandler: SocketOnErrorHandler = (data) => {
this.state = SocketState.CLOSED;
this._state = SocketState.CLOSED;
this.emit('error', data);
};
const messageHandler: SocketOnMessageHandler = (data) => {
this.emit('message', data);
};

if (!MPSocketWrapper.isSingleSocket) {
if (!MPSocketImpl.isSingleSocket) {
this.socketInstance = mp.connectSocket({
url,
multiple: true, // for alipay mp to return a task
Expand All @@ -53,7 +58,7 @@ export class MPSocketWrapper extends SocketWrapper {
}

send(data: string) {
if (MPSocketWrapper.isSingleSocket) {
if (MPSocketImpl.isSingleSocket) {
getMPSDK().sendSocketMessage({ data });
} else {
this.socketInstance?.send({
Expand All @@ -63,27 +68,22 @@ export class MPSocketWrapper extends SocketWrapper {
}

close() {
if (MPSocketWrapper.isSingleSocket) {
if (MPSocketImpl.isSingleSocket) {
getMPSDK().closeSocket({});
} else {
this.socketInstance?.close({});
}
this.state = SocketState.CLOSED;
this.clearListeners();
this._state = SocketState.CLOSED;
this.clearAllListeners();
}

getState(): SocketState {
return this.state;
return this._state;
}
}

export class MPSocketStore extends SocketStoreBase {
// websocket socketInstance
protected socketWrapper = new MPSocketWrapper();

public getSocket() {
return this.socketWrapper;
}

constructor() {
super();
Expand All @@ -94,6 +94,10 @@ export class MPSocketStore extends SocketStoreBase {
onOffline() {
utilAPI.setStorage(ROOM_SESSION_KEY, JSON.stringify({ usable: false }));
}

createSocket(url: string): ISocket {
return new MPSocketImpl(url);
}
}

export default new MPSocketStore();
6 changes: 3 additions & 3 deletions packages/mp-base/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import NetworkPlugin from './plugins/network';
import SystemPlugin from './plugins/system';
import StoragePlugin from './plugins/storage';

import socketStore, { MPSocketWrapper } from './helpers/socket';
import socketStore, { MPSocketImpl } from './helpers/socket';
import Request from './api';

// import './index.less';
Expand Down Expand Up @@ -100,7 +100,7 @@ class PageSpy {
const config = this.config.mergeConfig(init);

if (config.singletonSocket) {
MPSocketWrapper.isSingleSocket = true;
MPSocketImpl.isSingleSocket = true;
}

const mp = getMPSDK();
Expand Down Expand Up @@ -174,7 +174,7 @@ class PageSpy {
mp.onAppShow(() => {
// Mini programe can not detect ws disconnect (before we add heart beat ping pong).
// So we need to refresh the connection.
const state = socketStore.getSocket().getState();
const state = socketStore.getSocket()?.readyState;
if (state === SocketState.CLOSED || state === SocketState.CLOSING) {
this.useOldConnection();
}
Expand Down
Loading
Loading