Skip to content

Commit

Permalink
[y sweet/client] support provider.on() (#301)
Browse files Browse the repository at this point in the history
When we [reworked the provider to support
reconnections](#292), we lost
the `on`/`off`/`once` methods from the `Observer` base class that used
to exist on `YSweetProvider`. It turns out those methods are often used
by third-party libraries. This PR adds support for those back in.
  • Loading branch information
rolyatmax authored Oct 4, 2024
1 parent b6326c1 commit e1082a8
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 31 deletions.
4 changes: 2 additions & 2 deletions examples/nextjs/src/app/(demos)/slate/SlateEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export function SlateEditor() {
}, [yDoc])

useEffect(() => {
provider.observable.on('sync', setConnected)
return () => provider.observable.off('sync', setConnected)
provider.on('sync', setConnected)
return () => provider.off('sync', setConnected)
}, [provider])

if (!connected) return 'Loading...'
Expand Down
10 changes: 5 additions & 5 deletions examples/nextjs/src/app/tldraw/useYjsStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,18 @@ export function useYjsStore() {
return
}

provider.observable.off('sync', handleSync)
provider.off('sync', handleSync)

if (status === 'connected') {
if (hasConnectedBefore) return
hasConnectedBefore = true
provider.observable.on('sync', handleSync)
unsubs.push(() => provider.observable.off('sync', handleSync))
provider.on('sync', handleSync)
unsubs.push(() => provider.off('sync', handleSync))
}
}

provider.observable.on('status', handleStatusChange)
unsubs.push(() => provider.observable.off('status', handleStatusChange))
provider.on('status', handleStatusChange)
unsubs.push(() => provider.off('status', handleStatusChange))

return () => {
unsubs.forEach((fn) => fn())
Expand Down
60 changes: 43 additions & 17 deletions js-pkg/client/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ const setupWS = (provider: YSweetProvider) => {
}
}
websocket.onerror = (event) => {
provider.observable.emit('connection-error', [event, provider])
provider.emit('connection-error', [event, provider])
}
websocket.onclose = (event) => {
provider.observable.emit('connection-close', [event, provider])
provider.emit('connection-close', [event, provider])
provider.ws = null
provider.wsconnecting = false
if (provider.wsconnected) {
Expand All @@ -131,7 +131,7 @@ const setupWS = (provider: YSweetProvider) => {
),
provider,
)
provider.observable.emit('status', [
provider.emit('status', [
{
status: 'disconnected',
},
Expand Down Expand Up @@ -161,7 +161,7 @@ const setupWS = (provider: YSweetProvider) => {
provider.wsconnecting = false
provider.wsconnected = true
provider.wsUnsuccessfulReconnects = 0
provider.observable.emit('status', [
provider.emit('status', [
{
status: 'connected',
},
Expand All @@ -182,7 +182,7 @@ const setupWS = (provider: YSweetProvider) => {
websocket.send(encoding.toUint8Array(encoderAwarenessState))
}
}
provider.observable.emit('status', [
provider.emit('status', [
{
status: 'connecting',
},
Expand Down Expand Up @@ -224,7 +224,6 @@ export type YSweetProviderParams = {
resyncInterval?: number
maxBackoffTime?: number
disableBc?: boolean
observable?: Observable<string>
}

/**
Expand All @@ -238,14 +237,13 @@ export type YSweetProviderParams = {
* const doc = new Y.Doc()
* const provider = new YSweetProvider('http://localhost:1234', 'my-document-name', doc)
*/
export class YSweetProvider {
export class YSweetProvider extends Observable<string> {
onFailureHandlers: Array<() => void> = []
maxBackoffTime: number
bcChannel: string
url: string
roomname: string
doc: Y.Doc
observable: Observable<string>
_WS: WebSocketPolyfillType
awareness: awarenessProtocol.Awareness
wsconnected: boolean
Expand Down Expand Up @@ -277,7 +275,6 @@ export class YSweetProvider {
* @param opts.resyncInterval - resync interval
* @param opts.maxBackoffTime - maximum backoff time
* @param opts.disableBc - disable broadcast channel
* @param opts.observable - an observable instance to emit events on
*/
constructor(
serverUrl: string,
Expand All @@ -291,15 +288,14 @@ export class YSweetProvider {
resyncInterval = -1,
maxBackoffTime = 2500,
disableBc = false,
observable = new Observable<string>(),
}: YSweetProviderParams = {},
) {
super()
// ensure that url is always ends with /
while (serverUrl[serverUrl.length - 1] === '/') {
serverUrl = serverUrl.slice(0, serverUrl.length - 1)
}
const encodedParams = url.encodeQueryParams(params)
this.observable = observable
this.maxBackoffTime = maxBackoffTime
this.bcChannel = serverUrl + '/' + roomname
this.url = serverUrl + '/' + roomname + (encodedParams.length === 0 ? '' : '?' + encodedParams)
Expand Down Expand Up @@ -407,8 +403,8 @@ export class YSweetProvider {
set synced(state) {
if (this._synced !== state) {
this._synced = state
this.observable.emit('synced', [state])
this.observable.emit('sync', [state])
this.emit('synced', [state])
this.emit('sync', [state])
}
}

Expand All @@ -425,6 +421,7 @@ export class YSweetProvider {
}
this.awareness.off('update', this._awarenessUpdateHandler)
this.doc.off('update', this._updateHandler)
super.destroy()
}

connectBc() {
Expand Down Expand Up @@ -531,9 +528,15 @@ export async function ySweetProviderWrapper(
doc: Y.Doc,
providerParams: YSweetProviderParams = {},
): Promise<YSweetProviderWithClientToken> {
const observable = providerParams.observable ?? new Observable<string>()
// we use an observable that lives outside the provider to store event listeners
// so that we can re-subscribe to events when the provider is re-created
const observable = new Observable<string>()
// keep track of which events have been subscribed to on the local observable
// so we can re-subscribe to them when the provider is re-created
const subscribedEvents = new Set<string>()

const awareness = providerParams.awareness ?? new awarenessProtocol.Awareness(doc)
providerParams = { ...providerParams, observable, awareness }
providerParams = { ...providerParams, awareness }

let _clientToken = await getClientToken(authEndpoint, roomname)
let _provider = new YSweetProvider(_clientToken.url, roomname, doc, {
Expand All @@ -548,10 +551,33 @@ export async function ySweetProviderWrapper(
connect: true,
})
_provider.addOnFailureHandler(recreateProvider)
// the previous provider's destroy() method should have been called before
// recreateProvider() is called, so we don't need to unsubscribe from events
// before re-subscribing to events here
for (const event of subscribedEvents) {
subscribeToEvent(event)
}
}

// for each event that is subscribed to on the local observable, make sure to
// subscribe to it on the provider
function subscribeToEvent(name: string) {
_provider.on(name, (...args: any[]) => observable.emit(name, args))
subscribedEvents.add(name)
}

return {
observable,
on: (name: string, f: (...args: any[]) => void) => {
if (!subscribedEvents.has(name)) subscribeToEvent(name)
observable.on(name, f)
},
once: (name: string, f: (...args: any[]) => void) => {
if (!subscribedEvents.has(name)) subscribeToEvent(name)
observable.once(name, f)
},
off: (name: string, f: (...args: any[]) => void) => {
observable.off(name, f)
},
awareness,
get clientToken() {
return _clientToken
Expand Down Expand Up @@ -619,5 +645,5 @@ export async function ySweetProviderWrapper(
get shouldConnect() {
return _provider.shouldConnect
},
} as YSweetProviderWithClientToken
} as unknown as YSweetProviderWithClientToken
}
2 changes: 1 addition & 1 deletion tests/src/convert.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async function connectToDoc(server: Server, docId: string): Promise<Y.Doc> {
})

await new Promise<void>((resolve, reject) => {
provider.observable.on('synced', resolve)
provider.on('synced', resolve)

setTimeout(() => {
reject('Timed out waiting for sync')
Expand Down
8 changes: 4 additions & 4 deletions tests/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ describe.each(CONFIGURATIONS)(
const provider = await createYjsProvider(doc, docResult.docId, getClientToken, {})

await new Promise((resolve, reject) => {
provider.observable.on('synced', resolve)
provider.observable.on('syncing', reject)
provider.on('synced', resolve)
provider.on('syncing', reject)
})
})

Expand Down Expand Up @@ -227,7 +227,7 @@ describe.each(CONFIGURATIONS)(

await new Promise<void>((resolve, reject) => {
setTimeout(() => reject('Expected to disconnect.'), 1_000)
provider.observable.on('connection-close', () => {
provider.on('connection-close', () => {
resolve()
})
})
Expand All @@ -240,7 +240,7 @@ describe.each(CONFIGURATIONS)(
// Reconnect to the doc.
provider.connect()
await new Promise<void>((resolve, reject) => {
provider.observable.on('status', (event: { status: string }) => {
provider.on('status', (event: { status: string }) => {
if (event.status === 'connected') {
resolve()
} else {
Expand Down
4 changes: 2 additions & 2 deletions tests/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { YSweetProvider } from '@y-sweet/react'

export async function waitForProviderSync(provider: YSweetProvider, timeoutMillis: number = 1_000) {
return new Promise((resolve, reject) => {
provider.observable.on('synced', resolve)
provider.observable.on('syncing', reject)
provider.on('synced', resolve)
provider.on('syncing', reject)

setTimeout(() => reject('Timed out waiting for provider to sync.'), timeoutMillis)
})
Expand Down

0 comments on commit e1082a8

Please sign in to comment.