1
1
import { createNanoEvents } from 'nanoevents'
2
2
3
3
import {
4
- sendConnect ,
5
- sendConnected ,
4
+ connectedMessage ,
6
5
connectMessage ,
7
- connectedMessage
6
+ sendConnect ,
7
+ sendConnected
8
8
} from '../connect/index.js'
9
+ import { debugMessage , sendDebug } from '../debug/index.js'
10
+ import { errorMessage , sendError } from '../error/index.js'
11
+ import { headersMessage , sendHeaders } from '../headers/index.js'
12
+ import { LoguxError } from '../logux-error/index.js'
13
+ import { pingMessage , pongMessage , sendPing } from '../ping/index.js'
9
14
import {
10
- syncedMessage ,
11
- syncMessage ,
15
+ sendSync ,
12
16
sendSynced ,
13
- sendSync
17
+ syncedMessage ,
18
+ syncMessage
14
19
} from '../sync/index.js'
15
- import { sendPing , pingMessage , pongMessage } from '../ping/index.js'
16
- import { sendHeaders , headersMessage } from '../headers/index.js'
17
- import { sendDebug , debugMessage } from '../debug/index.js'
18
- import { sendError , errorMessage } from '../error/index.js'
19
- import { LoguxError } from '../logux-error/index.js'
20
20
21
21
const NOT_TO_THROW = {
22
- 'wrong-subprotocol ' : true ,
22
+ 'timeout ' : true ,
23
23
'wrong-protocol' : true ,
24
- 'timeout ' : true
24
+ 'wrong-subprotocol ' : true
25
25
}
26
26
27
27
const BEFORE_AUTH = [ 'connect' , 'connected' , 'error' , 'debug' , 'headers' ]
@@ -112,10 +112,6 @@ export class BaseNode {
112
112
this . remoteHeaders = { }
113
113
}
114
114
115
- on ( event , listener ) {
116
- return this . emitter . on ( event , listener )
117
- }
118
-
119
115
catch ( listener ) {
120
116
this . throwsError = false
121
117
let unbind = this . on ( 'error' , listener )
@@ -125,18 +121,13 @@ export class BaseNode {
125
121
}
126
122
}
127
123
128
- waitFor ( state ) {
129
- if ( this . state === state ) {
130
- return Promise . resolve ( )
131
- }
132
- return new Promise ( resolve => {
133
- let unbind = this . on ( 'state' , ( ) => {
134
- if ( this . state === state ) {
135
- unbind ( )
136
- resolve ( )
137
- }
138
- } )
139
- } )
124
+ delayPing ( ) {
125
+ if ( ! this . options . ping ) return
126
+ if ( this . pingTimeout ) clearTimeout ( this . pingTimeout )
127
+
128
+ this . pingTimeout = setTimeout ( ( ) => {
129
+ if ( this . connected && this . authenticated ) this . sendPing ( )
130
+ } , this . options . ping )
140
131
}
141
132
142
133
destroy ( ) {
@@ -150,32 +141,78 @@ export class BaseNode {
150
141
this . endTimeout ( )
151
142
}
152
143
153
- setLocalHeaders ( headers ) {
154
- this . localHeaders = headers
155
- if ( this . connected ) {
156
- this . sendHeaders ( headers )
144
+ duilianMessage ( line ) {
145
+ if ( DUILIANS [ line ] ) {
146
+ this . send ( [ 'duilian' , DUILIANS [ line ] ] )
157
147
}
158
148
}
159
149
160
- send ( msg ) {
161
- if ( ! this . connected ) return
162
- this . delayPing ( )
163
- try {
164
- this . connection . send ( msg )
165
- } catch ( e ) {
166
- this . error ( e )
150
+ endTimeout ( ) {
151
+ if ( this . timeouts . length > 0 ) {
152
+ clearTimeout ( this . timeouts . shift ( ) )
167
153
}
168
154
}
169
155
170
- onConnecting ( ) {
171
- this . setState ( 'connecting' )
156
+ error ( err ) {
157
+ this . emitter . emit ( 'error' , err )
158
+ this . connection . disconnect ( 'error' )
159
+ if ( this . throwsError ) {
160
+ throw err
161
+ }
162
+ }
163
+
164
+ async initialize ( ) {
165
+ let [ synced , added ] = await Promise . all ( [
166
+ this . log . store . getLastSynced ( ) ,
167
+ this . log . store . getLastAdded ( )
168
+ ] )
169
+ this . initialized = true
170
+ this . lastSent = synced . sent
171
+ this . lastReceived = synced . received
172
+ this . lastAddedCache = added
173
+ if ( this . connection . connected ) this . onConnect ( )
174
+ }
175
+
176
+ now ( ) {
177
+ return Date . now ( )
178
+ }
179
+
180
+ on ( event , listener ) {
181
+ return this . emitter . on ( event , listener )
182
+ }
183
+
184
+ async onAdd ( action , meta ) {
185
+ if ( ! this . authenticated ) return
186
+ if ( this . lastAddedCache < meta . added ) {
187
+ this . lastAddedCache = meta . added
188
+ }
189
+
190
+ if ( this . received && this . received [ meta . id ] ) {
191
+ delete this . received [ meta . id ]
192
+ return
193
+ }
194
+
195
+ if ( this . options . outFilter ) {
196
+ try {
197
+ let result = await this . options . outFilter ( action , meta )
198
+ if ( result ) syncMappedEvent ( this , action , meta )
199
+ } catch ( e ) {
200
+ this . error ( e )
201
+ }
202
+ } else {
203
+ syncMappedEvent ( this , action , meta )
204
+ }
172
205
}
173
206
174
207
onConnect ( ) {
175
208
this . delayPing ( )
176
209
this . connected = true
177
210
}
178
211
212
+ onConnecting ( ) {
213
+ this . setState ( 'connecting' )
214
+ }
215
+
179
216
onDisconnect ( ) {
180
217
while ( this . timeouts . length > 0 ) {
181
218
this . endTimeout ( )
@@ -199,42 +236,36 @@ export class BaseNode {
199
236
this [ name + 'Message' ] ( ...msg . slice ( 1 ) )
200
237
}
201
238
202
- async onAdd ( action , meta ) {
203
- if ( ! this . authenticated ) return
204
- if ( this . lastAddedCache < meta . added ) {
205
- this . lastAddedCache = meta . added
239
+ send ( msg ) {
240
+ if ( ! this . connected ) return
241
+ this . delayPing ( )
242
+ try {
243
+ this . connection . send ( msg )
244
+ } catch ( e ) {
245
+ this . error ( e )
206
246
}
247
+ }
207
248
208
- if ( this . received && this . received [ meta . id ] ) {
209
- delete this . received [ meta . id ]
210
- return
211
- }
249
+ sendDuilian ( ) {
250
+ this . send ( [ 'duilian' , Object . keys ( DUILIANS ) [ 0 ] ] )
251
+ }
212
252
213
- if ( this . options . outFilter ) {
214
- try {
215
- let result = await this . options . outFilter ( action , meta )
216
- if ( result ) syncMappedEvent ( this , action , meta )
217
- } catch ( e ) {
218
- this . error ( e )
219
- }
220
- } else {
221
- syncMappedEvent ( this , action , meta )
222
- }
253
+ setLastReceived ( value ) {
254
+ if ( this . lastReceived < value ) this . lastReceived = value
255
+ this . log . store . setLastSynced ( { received : value } )
223
256
}
224
257
225
- syncError ( type , options , received ) {
226
- let err = new LoguxError ( type , options , received )
227
- this . emitter . emit ( 'error' , err )
228
- if ( ! NOT_TO_THROW [ type ] && this . throwsError ) {
229
- throw err
258
+ setLastSent ( value ) {
259
+ if ( this . lastSent < value ) {
260
+ this . lastSent = value
261
+ this . log . store . setLastSynced ( { sent : value } )
230
262
}
231
263
}
232
264
233
- error ( err ) {
234
- this . emitter . emit ( 'error' , err )
235
- this . connection . disconnect ( 'error' )
236
- if ( this . throwsError ) {
237
- throw err
265
+ setLocalHeaders ( headers ) {
266
+ this . localHeaders = headers
267
+ if ( this . connected ) {
268
+ this . sendHeaders ( headers )
238
269
}
239
270
}
240
271
@@ -257,19 +288,36 @@ export class BaseNode {
257
288
this . timeouts . push ( timeout )
258
289
}
259
290
260
- endTimeout ( ) {
261
- if ( this . timeouts . length > 0 ) {
262
- clearTimeout ( this . timeouts . shift ( ) )
291
+ syncError ( type , options , received ) {
292
+ let err = new LoguxError ( type , options , received )
293
+ this . emitter . emit ( 'error' , err )
294
+ if ( ! NOT_TO_THROW [ type ] && this . throwsError ) {
295
+ throw err
263
296
}
264
297
}
265
298
266
- delayPing ( ) {
267
- if ( ! this . options . ping ) return
268
- if ( this . pingTimeout ) clearTimeout ( this . pingTimeout )
269
-
270
- this . pingTimeout = setTimeout ( ( ) => {
271
- if ( this . connected && this . authenticated ) this . sendPing ( )
272
- } , this . options . ping )
299
+ async syncSince ( lastSynced ) {
300
+ let data = await this . syncSinceQuery ( lastSynced )
301
+ if ( ! this . connected ) return
302
+ if ( data . entries . length > 0 ) {
303
+ if ( this . options . outMap ) {
304
+ Promise . all (
305
+ data . entries . map ( i => {
306
+ return this . options . outMap ( i [ 0 ] , i [ 1 ] )
307
+ } )
308
+ )
309
+ . then ( changed => {
310
+ this . sendSync ( data . added , changed )
311
+ } )
312
+ . catch ( e => {
313
+ this . error ( e )
314
+ } )
315
+ } else {
316
+ this . sendSync ( data . added , data . entries )
317
+ }
318
+ } else {
319
+ this . setState ( 'synchronized' )
320
+ }
273
321
}
274
322
275
323
async syncSinceQuery ( lastSynced ) {
@@ -309,66 +357,18 @@ export class BaseNode {
309
357
return data
310
358
}
311
359
312
- async syncSince ( lastSynced ) {
313
- let data = await this . syncSinceQuery ( lastSynced )
314
- if ( ! this . connected ) return
315
- if ( data . entries . length > 0 ) {
316
- if ( this . options . outMap ) {
317
- Promise . all (
318
- data . entries . map ( i => {
319
- return this . options . outMap ( i [ 0 ] , i [ 1 ] )
320
- } )
321
- )
322
- . then ( changed => {
323
- this . sendSync ( data . added , changed )
324
- } )
325
- . catch ( e => {
326
- this . error ( e )
327
- } )
328
- } else {
329
- this . sendSync ( data . added , data . entries )
330
- }
331
- } else {
332
- this . setState ( 'synchronized' )
333
- }
334
- }
335
-
336
- setLastSent ( value ) {
337
- if ( this . lastSent < value ) {
338
- this . lastSent = value
339
- this . log . store . setLastSynced ( { sent : value } )
340
- }
341
- }
342
-
343
- setLastReceived ( value ) {
344
- if ( this . lastReceived < value ) this . lastReceived = value
345
- this . log . store . setLastSynced ( { received : value } )
346
- }
347
-
348
- now ( ) {
349
- return Date . now ( )
350
- }
351
-
352
- async initialize ( ) {
353
- let [ synced , added ] = await Promise . all ( [
354
- this . log . store . getLastSynced ( ) ,
355
- this . log . store . getLastAdded ( )
356
- ] )
357
- this . initialized = true
358
- this . lastSent = synced . sent
359
- this . lastReceived = synced . received
360
- this . lastAddedCache = added
361
- if ( this . connection . connected ) this . onConnect ( )
362
- }
363
-
364
- sendDuilian ( ) {
365
- this . send ( [ 'duilian' , Object . keys ( DUILIANS ) [ 0 ] ] )
366
- }
367
-
368
- duilianMessage ( line ) {
369
- if ( DUILIANS [ line ] ) {
370
- this . send ( [ 'duilian' , DUILIANS [ line ] ] )
360
+ waitFor ( state ) {
361
+ if ( this . state === state ) {
362
+ return Promise . resolve ( )
371
363
}
364
+ return new Promise ( resolve => {
365
+ let unbind = this . on ( 'state' , ( ) => {
366
+ if ( this . state === state ) {
367
+ unbind ( )
368
+ resolve ( )
369
+ }
370
+ } )
371
+ } )
372
372
}
373
373
}
374
374
0 commit comments