@@ -21,6 +21,7 @@ const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max
2121const REVALIDATE_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-server-revalidate-timeout' ) || '60000' )
2222const WORKER_DISABLED = env . getConf ( 'y-worker-disabled' ) === 'true'
2323const DEFAULT_CLEAR_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-default-clear-timeout' ) || '30000' )
24+ const WORKER_HEALTH_CHECK_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-worker-health-check-interval' ) || '5000' )
2425
2526process . on ( 'SIGINT' , function ( ) {
2627 // calling .shutdown allows your process to exit normally
@@ -146,6 +147,26 @@ export class YSocketIO {
146147 * @readonly
147148 */
148149 awaitingCleanupNamespace = new Map ( )
150+ /**
151+ * @type {boolean }
152+ * @private
153+ */
154+ workerReady = false
155+ /**
156+ * @type {number | null }
157+ * @private
158+ */
159+ workerLastHeartbeat = null
160+ /**
161+ * @type {{ promise: Promise<boolean>, resolve: (result: boolean) => void } | null }
162+ * @private
163+ */
164+ workerHeartbeatContext = null
165+ /**
166+ * @type {NodeJS.Timeout | null }
167+ * @private
168+ */
169+ persistWorkerHealthCheckTimeout = null
149170
150171 /**
151172 * YSocketIO constructor.
@@ -169,20 +190,22 @@ export class YSocketIO {
169190 *
170191 * It also starts socket connection listeners.
171192 * @param {import('../storage.js').AbstractStorage } store
172- * @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }= } opts
193+ * @param {{ redisPrefix?: string, redisUrl?: string, getPersistWorker?: () => import('worker_threads').Worker }= } opts
173194 * @public
174195 */
175- async initialize ( store , { redisUrl, redisPrefix = 'y' , persistWorker } = { } ) {
196+ async initialize ( store , { redisUrl, redisPrefix = 'y' , getPersistWorker } = { } ) {
176197 const { enableAwareness } = this . configuration
177198 const [ client , subscriber ] = await promise . all ( [
178199 api . createApiClient ( store , { redisUrl, redisPrefix, enableAwareness } ) ,
179200 createSubscriber ( store , { redisUrl, redisPrefix, enableAwareness } )
180201 ] )
181202 this . client = client
182203 this . subscriber = subscriber
183- if ( persistWorker ) {
184- this . client . persistWorker = persistWorker
204+ if ( getPersistWorker ) {
205+ this . getPersistWorker = getPersistWorker
206+ this . persistWorker = getPersistWorker ( )
185207 this . registerPersistWorkerResolve ( )
208+ this . registerPersistWorkerHealthCheck ( )
186209 }
187210
188211 this . nsp = this . io . of ( / ^ \/ y j s \| .* $ / )
@@ -518,16 +541,15 @@ export class YSocketIO {
518541 const doc = this . namespaceDocMap . get ( namespace ) ?. ydoc
519542 logSocketIO ( `trying to persist ${ namespace } ` )
520543 if ( ! doc ) return
521- if ( this . client . persistWorker ) {
544+ if ( this . persistWorker && this . workerReady ) {
522545 /** @type {ReturnType<typeof promiseWithResolvers<void>> } */
523546 const { promise, resolve } = promiseWithResolvers ( )
524- assert ( this . client ?. persistWorker )
525547 this . awaitingPersistMap . set ( namespace , { promise, resolve } )
526548
527549 const docState = Y . encodeStateAsUpdateV2 ( doc )
528550 const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
529551 buf . set ( docState )
530- this . client . persistWorker . postMessage ( {
552+ this . persistWorker . postMessage ( {
531553 room : namespace ,
532554 docstate : buf
533555 } )
@@ -627,6 +649,9 @@ export class YSocketIO {
627649
628650 destroy ( ) {
629651 try {
652+ if ( this . persistWorkerHealthCheckTimeout ) {
653+ clearInterval ( this . persistWorkerHealthCheckTimeout )
654+ }
630655 this . subscriber ?. destroy ( )
631656 return this . client ?. destroy ( )
632657 } catch ( e ) {
@@ -635,9 +660,13 @@ export class YSocketIO {
635660 }
636661
637662 registerPersistWorkerResolve ( ) {
638- if ( ! this . client ?. persistWorker ) return
639- this . client . persistWorker . on ( 'message' , ( { event, room } ) => {
663+ if ( ! this . persistWorker ) return
664+ this . persistWorker . on ( 'message' , ( { event, room } ) => {
640665 if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. resolve ( )
666+ if ( event === 'pong' && this . workerHeartbeatContext ) {
667+ this . workerHeartbeatContext . resolve ( true )
668+ }
669+ this . workerReady = true
641670 } )
642671 }
643672
@@ -677,4 +706,58 @@ export class YSocketIO {
677706 this . namespaceDocMap . delete ( namespace )
678707 this . namespacePersistentMap . delete ( namespace )
679708 }
709+
710+ async waitUntilWorkerReady ( ) {
711+ if ( ! this . persistWorker || this . workerReady ) return
712+ /** @type {ReturnType<typeof promiseWithResolvers<void>> } */
713+ const { promise, resolve } = promiseWithResolvers ( )
714+ const timer = setInterval ( ( ) => {
715+ if ( ! this . workerReady ) return
716+ clearInterval ( timer )
717+ resolve ( )
718+ } , 100 )
719+ await promise
720+ }
721+
722+ registerPersistWorkerHealthCheck ( ) {
723+ this . persistWorkerHealthCheckTimeout = setTimeout ( async ( ) => {
724+ const workerHealth = await this . workerHealthCheck ( )
725+ if ( ! workerHealth ) {
726+ logSocketIO ( 'worker thread is unhealthy, recreating' )
727+ assert ( this . getPersistWorker )
728+ this . workerReady = false
729+ await this . persistWorker ?. removeAllListeners ( ) . terminate ( )
730+ this . persistWorker = this . getPersistWorker ( )
731+ this . registerPersistWorkerResolve ( )
732+ await this . waitUntilWorkerReady ( )
733+ }
734+ this . registerPersistWorkerHealthCheck ( )
735+ } , WORKER_HEALTH_CHECK_INTERVAL )
736+ }
737+
738+ async workerHealthCheck ( ) {
739+ if ( ! this . persistWorker || this . workerHeartbeatContext ) return null
740+ if (
741+ this . workerLastHeartbeat &&
742+ Date . now ( ) - this . workerLastHeartbeat < WORKER_HEALTH_CHECK_INTERVAL * 2
743+ ) {
744+ return true
745+ }
746+
747+ /** @type {ReturnType<typeof promiseWithResolvers<boolean>> } */
748+ const { promise : heartbeatPromise , resolve } = promiseWithResolvers ( )
749+ this . workerHeartbeatContext = { promise : heartbeatPromise , resolve }
750+ const now = performance . now ( )
751+ this . persistWorker . postMessage ( { event : 'ping' } )
752+ const health = await Promise . race ( [
753+ heartbeatPromise ,
754+ promise . wait ( 3000 ) . then ( ( ) => false )
755+ ] )
756+ this . workerHeartbeatContext = null
757+ if ( health ) {
758+ logSocketIO ( `worker health check: responded in ${ performance . now ( ) - now } ms` )
759+ this . workerLastHeartbeat = Date . now ( )
760+ }
761+ return health
762+ }
680763}
0 commit comments