1
1
import * as Y from 'yjs'
2
- import * as redis from 'redis'
2
+ import { createClient , defineScript , commandOptions } from 'redis'
3
3
import * as map from 'lib0/map'
4
4
import * as decoding from 'lib0/decoding'
5
5
import * as awarenessProtocol from 'y-protocols/awareness'
@@ -97,7 +97,6 @@ export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwar
97
97
}
98
98
99
99
export class Api {
100
- /** @type {import('@redis/client').RedisClientType<any, any, any> & { addMessage: (key: string, message: Buffer) => Promise<any>, xDelIfEmpty: (key: string) => Promise<any> } } */
101
100
redis
102
101
/**
103
102
* @param {import('./storage.js').AbstractStorage } store
@@ -140,12 +139,11 @@ export class Api {
140
139
redis.call("EXPIRE", KEYS[1], ${ ROOM_STREAM_TTL } )
141
140
`
142
141
143
- /** @type {import('@redis/client').RedisClientType & { addMessage: (key: string, message: Buffer) => Promise<any>, xDelIfEmpty: (key: string) => Promise<any> } } */
144
- this . redis = redis . createClient ( {
142
+ this . redis = createClient ( {
145
143
url,
146
144
// scripting: https://github.com/redis/node-redis/#lua-scripts
147
145
scripts : {
148
- addMessage : redis . defineScript ( {
146
+ addMessage : defineScript ( {
149
147
NUMBER_OF_KEYS : 1 ,
150
148
SCRIPT : addScript ,
151
149
/**
@@ -162,7 +160,7 @@ export class Api {
162
160
return x
163
161
}
164
162
} ) ,
165
- xDelIfEmpty : redis . defineScript ( {
163
+ xDelIfEmpty : defineScript ( {
166
164
NUMBER_OF_KEYS : 1 ,
167
165
SCRIPT : `
168
166
if redis.call("XLEN", KEYS[1]) == 0 then
@@ -196,7 +194,7 @@ export class Api {
196
194
return [ ]
197
195
}
198
196
const reads = await this . redis . xRead (
199
- redis . commandOptions ( { returnBuffers : true } ) ,
197
+ commandOptions ( { returnBuffers : true } ) ,
200
198
streams ,
201
199
{ BLOCK : 1000 , COUNT : 1000 }
202
200
)
@@ -244,7 +242,7 @@ export class Api {
244
242
* @param {string } docid
245
243
*/
246
244
async getDoc ( room , docid ) {
247
- const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( redis . commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
245
+ const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
248
246
const docMessages = ms . get ( room ) ?. get ( docid ) || null
249
247
if ( docMessages ?. messages ) logApi ( `processing messages of length: ${ docMessages ?. messages . length } in room: ${ room } ` )
250
248
const docstate = await this . store . retrieveDoc ( room , docid )
@@ -292,7 +290,7 @@ export class Api {
292
290
* @param {string } docid
293
291
*/
294
292
async getRedisLastId ( room , docid ) {
295
- const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( redis . commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
293
+ const ms = extractMessagesFromStreamReply ( await this . redis . xRead ( commandOptions ( { returnBuffers : true } ) , { key : computeRedisRoomStreamName ( room , docid , this . prefix ) , id : '0' } ) , this . prefix )
296
294
const docMessages = ms . get ( room ) ?. get ( docid ) || null
297
295
return docMessages ?. lastId . toString ( ) || '0'
298
296
}
@@ -342,7 +340,6 @@ export class Api {
342
340
const streamlen = await this . redis . xLen ( task . stream )
343
341
if ( streamlen === 0 ) {
344
342
await this . redis . multi ( )
345
- // @ts -expect-error custom script on multi
346
343
. xDelIfEmpty ( task . stream )
347
344
. xAck ( this . redisWorkerStreamName , this . redisWorkerGroupName , task . id )
348
345
. xDel ( this . redisWorkerStreamName , task . id )
0 commit comments