Skip to content

Commit

Permalink
More responsive to network outages (#360)
Browse files Browse the repository at this point in the history
This PR introduces a number of improvements that make `YSweetProvider`
more responsive to network conditions.

The main issue this attempts to address is that browsers are
surprisingly inconsistent in how long it takes to terminate a WebSocket
connection when the network is lost. I have observed the timeout taking
anywhere from 10s (Firefox) to 95s (Chrome) when using a VPN. (Without
the VPN it is a bit better, presumably because the VPN makes the network
status more opaque to the browser.)

## Passive / active connection monitoring

This PR introduces passive connection monitoring that switches into an
active heartbeat when message volume is low.

When the server is sending the client lots of messages (e.g. frequent
awareness updates), we know the connection is alive, and don't have to
send any sort of heartbeat to confirm it.

When we haven't received a message for a certain time threshold
(`MAX_TIMEOUT_BETWEEN_HEARTBEATS`), we don't know whether it's because
the connection was lost or just a lull in events, so we start to send an
active heartbeat. This uses the same mechanism on the server introduced
for detecting local unsaved changes: we simply re-send the latest
`localVersion` (formerly `lastSyncSent`) instead of incrementing it
(since the underlying data hasn't changed).

## Browser connection status monitoring

Browsers also provide a mechanism for being notified about
online/offline state. It's not entirely reliable, but we can use it as
an additional signal to accelerate reconnection. The way it works is:

- When we receive and `offline` event, we immediately send a heartbeat,
which means we only have to wait
`MAX_TIMEOUT_WITHOUT_RECEIVING_HEARTBEAT` instead of up to
`MAX_TIMEOUT_BETWEEN_HEARTBEATS +
MAX_TIMEOUT_WITHOUT_RECEIVING_HEARTBEAT` before showing as offline.
- When we receive an `online` event, if we are currently waiting to
retry a network operation (either obtaining credentials or connecting to
a websocket), we bypass the wait and go directly to making the request.

---------

Co-authored-by: Jake Lazaroff <[email protected]>
  • Loading branch information
paulgb and jakelazaroff authored Dec 16, 2024
1 parent b49add3 commit 80fec8d
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 25 deletions.
152 changes: 127 additions & 25 deletions js-pkg/client/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as encoding from 'lib0/encoding'
import * as awarenessProtocol from 'y-protocols/awareness'
import * as syncProtocol from 'y-protocols/sync'
import * as Y from 'yjs'
import { Sleeper } from './sleeper'
import {
EVENT_CONNECTION_CLOSE,
EVENT_CONNECTION_ERROR,
Expand All @@ -18,7 +19,16 @@ const MESSAGE_SYNC_STATUS = 102

const RETRIES_BEFORE_TOKEN_REFRESH = 3
const DELAY_MS_BEFORE_RECONNECT = 500
const DELAY_MS_BEFORE_RETRY_TOKEN_REFRESH = 3000
const DELAY_MS_BEFORE_RETRY_TOKEN_REFRESH = 3_000

/** Amount of time without receiving any message that we should send a MESSAGE_SYNC_STATUS message. */
const MAX_TIMEOUT_BETWEEN_HEARTBEATS = 2_000

/**
* Amount of time after sending a MESSAGE_SYNC_STATUS message that we should close the connection
* unless any message has been received.
**/
const MAX_TIMEOUT_WITHOUT_RECEIVING_HEARTBEAT = 3_000

// Note: These should not conflict with y-websocket's events, defined in `ws-status.ts`.
export const EVENT_LOCAL_CHANGES = 'local-changes'
Expand Down Expand Up @@ -73,10 +83,6 @@ export type YSweetProviderParams = {
initialClientToken?: ClientToken
}

async function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}

async function getClientToken(authEndpoint: AuthEndpoint, roomname: string): Promise<ClientToken> {
if (typeof authEndpoint === 'function') {
return await authEndpoint()
Expand Down Expand Up @@ -110,22 +116,24 @@ export class YSweetProvider {
/** Current client token. */
public clientToken: ClientToken | null = null

/** Whether the local document has unsynced changes. */
public hasLocalChanges: boolean = true

/** Connection status. */
public status: YSweetStatus = STATUS_OFFLINE

private websocket: WebSocket | null = null
private WebSocketPolyfill: WebSocketPolyfillType
private listeners: Map<YSweetEvent | YWebsocketEvent, Set<EventListener>> = new Map()

private lastSyncSent: number = 0
private lastSyncAcked: number = -1
private localVersion: number = 0
private ackedVersion: number = -1

/** Whether a (re)connect loop is currently running. This acts as a lock to prevent two concurrent connect loops. */
private isConnecting: boolean = false

private heartbeatHandle: ReturnType<typeof setTimeout> | null = null
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | null = null

private reconnectSleeper: Sleeper | null = null

constructor(
private authEndpoint: AuthEndpoint,
private docId: string,
Expand All @@ -143,27 +151,100 @@ export class YSweetProvider {
this.awareness.on('update', this.handleAwarenessUpdate.bind(this))
this.WebSocketPolyfill = extraOptions.WebSocketPolyfill || WebSocket

this.online = this.online.bind(this)
this.offline = this.offline.bind(this)
if (typeof window !== 'undefined') {
window.addEventListener('offline', this.offline)
window.addEventListener('online', this.online)
}

doc.on('update', this.update.bind(this))

if (extraOptions.connect !== false) {
this.connect()
}
}

private offline() {
// When the browser indicates that we are offline, we immediately
// probe the connection status.
// This accelerates the process of discovering we are offline, but
// doesn't mean we entirely trust the browser, since it can be wrong
// (e.g. in the case that the connection is over localhost).
this.checkSync()
}

private online() {
if (this.reconnectSleeper) {
this.reconnectSleeper.wake()
}
}

private clearHeartbeat() {
if (this.heartbeatHandle) {
clearTimeout(this.heartbeatHandle)
this.heartbeatHandle = null
}
}

private resetHeartbeat() {
this.clearHeartbeat()
this.heartbeatHandle = setTimeout(() => {
this.checkSync()
this.heartbeatHandle = null
}, MAX_TIMEOUT_BETWEEN_HEARTBEATS)
}

private clearConnectionTimeout() {
if (this.connectionTimeoutHandle) {
clearTimeout(this.connectionTimeoutHandle)
this.connectionTimeoutHandle = null
}
}

private setConnectionTimeout() {
if (this.connectionTimeoutHandle) {
return
}
this.connectionTimeoutHandle = setTimeout(() => {
if (this.websocket) {
this.websocket.close()
this.setStatus(STATUS_ERROR)
this.connect()
}
this.connectionTimeoutHandle = null
}, MAX_TIMEOUT_WITHOUT_RECEIVING_HEARTBEAT)
}

private send(message: Uint8Array) {
if (this.websocket?.readyState === this.WebSocketPolyfill.OPEN) {
this.websocket.send(message)
}
}

private updateSyncedState() {
let hasLocalChanges = this.lastSyncAcked !== this.lastSyncSent
if (hasLocalChanges === this.hasLocalChanges) {
return
private incrementLocalVersion() {
// We need to increment the local version before we emit, so that event
// listeners see the right hasLocalChanges value.
let emit = !this.hasLocalChanges
this.localVersion += 1

if (emit) {
this.emit(EVENT_LOCAL_CHANGES, true)
}
}

private updateAckedVersion(version: number) {
// The version _should_ never go backwards, but we guard for that in case it does.
version = Math.max(version, this.ackedVersion)

this.hasLocalChanges = hasLocalChanges
this.emit(EVENT_LOCAL_CHANGES, hasLocalChanges)
// We need to increment the local version before we emit, so that event
// listeners see the right hasLocalChanges value.
let emit = this.hasLocalChanges && version === this.localVersion
this.ackedVersion = version

if (emit) {
this.emit(EVENT_LOCAL_CHANGES, false)
}
}

private setStatus(status: YSweetStatus) {
Expand All @@ -182,22 +263,21 @@ export class YSweetProvider {
syncProtocol.writeUpdate(encoder, update)
this.send(encoding.toUint8Array(encoder))

this.incrementLocalVersion()
this.checkSync()
}
}

private checkSync() {
this.lastSyncSent += 1
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, MESSAGE_SYNC_STATUS)

const versionEncoder = encoding.createEncoder()
encoding.writeVarUint(versionEncoder, this.lastSyncSent)
encoding.writeVarUint(versionEncoder, this.localVersion)
encoding.writeVarUint8Array(encoder, encoding.toUint8Array(versionEncoder))

this.send(encoding.toUint8Array(encoder))

this.updateSyncedState()
this.setConnectionTimeout()
}

private async ensureClientToken(): Promise<ClientToken> {
Expand Down Expand Up @@ -257,7 +337,8 @@ export class YSweetProvider {
} catch (e) {
console.warn('Failed to get client token', e)
this.setStatus(STATUS_ERROR)
await sleep(DELAY_MS_BEFORE_RETRY_TOKEN_REFRESH)
this.reconnectSleeper = new Sleeper(DELAY_MS_BEFORE_RETRY_TOKEN_REFRESH)
await this.reconnectSleeper.sleep()
continue
}

Expand All @@ -266,7 +347,8 @@ export class YSweetProvider {
break
}

await sleep(DELAY_MS_BEFORE_RECONNECT)
this.reconnectSleeper = new Sleeper(DELAY_MS_BEFORE_RECONNECT)
await this.reconnectSleeper.sleep()
}

// Delete the current client token to force a token refresh on the next attempt.
Expand Down Expand Up @@ -368,9 +450,13 @@ export class YSweetProvider {
this.syncStep1()
this.checkSync()
this.broadcastAwareness()
this.resetHeartbeat()
}

private receiveMessage(event: MessageEvent) {
this.clearConnectionTimeout()
this.resetHeartbeat()

let message: Uint8Array = new Uint8Array(event.data)
const decoder = decoding.createDecoder(message)
const messageType = decoding.readVarUint(decoder)
Expand All @@ -387,8 +473,8 @@ export class YSweetProvider {
case MESSAGE_SYNC_STATUS:
let lastSyncBytes = decoding.readVarUint8Array(decoder)
let d2 = decoding.createDecoder(lastSyncBytes)
this.lastSyncAcked = decoding.readVarUint(d2)
this.updateSyncedState()
let ackedVersion = decoding.readVarUint(d2)
this.updateAckedVersion(ackedVersion)
break
default:
break
Expand All @@ -398,6 +484,8 @@ export class YSweetProvider {
private websocketClose(event: CloseEvent) {
this.emit(EVENT_CONNECTION_CLOSE, event)
this.setStatus(STATUS_ERROR)
this.clearHeartbeat()
this.clearConnectionTimeout()
this.connect()

// Remove all awareness states except for our own.
Expand All @@ -413,6 +501,8 @@ export class YSweetProvider {
private websocketError(event: Event) {
this.emit(EVENT_CONNECTION_ERROR, event)
this.setStatus(STATUS_ERROR)
this.clearHeartbeat()
this.clearConnectionTimeout()

this.connect()
}
Expand All @@ -436,7 +526,7 @@ export class YSweetProvider {
awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients),
)

this.websocket?.send(encoding.toUint8Array(encoder))
this.send(encoding.toUint8Array(encoder))
}

public destroy() {
Expand All @@ -445,6 +535,11 @@ export class YSweetProvider {
}

awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'window unload')

if (typeof window !== 'undefined') {
window.removeEventListener('offline', this.offline)
window.removeEventListener('online', this.online)
}
}

private _on(
Expand Down Expand Up @@ -481,6 +576,13 @@ export class YSweetProvider {
}
}

/**
* Whether the document has local changes.
*/
get hasLocalChanges() {
return this.ackedVersion !== this.localVersion
}

/**
* Whether the provider should attempt to connect.
*
Expand Down
33 changes: 33 additions & 0 deletions js-pkg/client/src/sleeper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* A timeout that can be woken up prematurely by calling `wake()`.
*/
export class Sleeper {
resolve?: () => void
reject?: () => void
promise: Promise<void>

constructor(timeout: number) {
this.promise = new Promise<void>((resolve, reject) => {
setTimeout(() => {
resolve()
}, timeout)

this.resolve = resolve
this.reject = reject
})
}

/**
* Sleeps until the timeout has completed (or has been woken).
*/
async sleep() {
await this.promise
}

/**
* Wakes up the timeout if it has not completed yet.
*/
wake() {
this.resolve && this.resolve()
}
}

0 comments on commit 80fec8d

Please sign in to comment.