@@ -70,6 +70,12 @@ function wrapReason(reason) {
70
70
}
71
71
72
72
class MessageHandler {
73
+ #cancelledPageIds = new Set ( ) ;
74
+
75
+ #executorRunning = false ;
76
+
77
+ #queue = [ ] ;
78
+
73
79
constructor ( sourceName , targetName , comObj ) {
74
80
this . sourceName = sourceName ;
75
81
this . targetName = targetName ;
@@ -81,71 +87,111 @@ class MessageHandler {
81
87
this . callbackCapabilities = Object . create ( null ) ;
82
88
this . actionHandler = Object . create ( null ) ;
83
89
84
- this . _onComObjOnMessage = event => {
85
- const data = event . data ;
86
- if ( data . targetName !== this . sourceName ) {
87
- return ;
88
- }
89
- if ( data . stream ) {
90
- this . #processStreamMessage( data ) ;
91
- return ;
92
- }
93
- if ( data . callback ) {
94
- const callbackId = data . callbackId ;
95
- const capability = this . callbackCapabilities [ callbackId ] ;
96
- if ( ! capability ) {
97
- throw new Error ( `Cannot resolve callback ${ callbackId } ` ) ;
90
+ this . _onComObjOnMessage = ( { data } ) => {
91
+ if ( data . targetName === this . sourceName ) {
92
+ // The meesages in the worker queue are processed with a
93
+ // higher priority than the tasks in the event queue.
94
+ // So, postponing the task execution, will ensure that the message
95
+ // queue is drained.
96
+ // If at some point we've a cancelled task (e.g. GetOperatorList),
97
+ // we're able to skip the task execution with the same pageId.
98
+ this . #queue. push ( data ) ;
99
+ if ( data . pageId && data . stream === StreamKind . CANCEL ) {
100
+ this . #cancelledPageIds. add ( data . pageId ) ;
98
101
}
99
- delete this . callbackCapabilities [ callbackId ] ;
100
-
101
- if ( data . callback === CallbackKind . DATA ) {
102
- capability . resolve ( data . data ) ;
103
- } else if ( data . callback === CallbackKind . ERROR ) {
104
- capability . reject ( wrapReason ( data . reason ) ) ;
105
- } else {
106
- throw new Error ( "Unexpected callback case" ) ;
102
+ if ( ! this . #executorRunning) {
103
+ this . #executorRunning = true ;
104
+ this . #postponeExecution( ) ;
107
105
}
108
- return ;
109
106
}
110
- const action = this . actionHandler [ data . action ] ;
111
- if ( ! action ) {
112
- throw new Error ( `Unknown action from worker: ${ data . action } ` ) ;
107
+ } ;
108
+ comObj . addEventListener ( "message" , this . _onComObjOnMessage ) ;
109
+ }
110
+
111
+ #postponeExecution( ) {
112
+ setTimeout ( this . #executor. bind ( this ) , 0 ) ;
113
+ }
114
+
115
+ #executor( ) {
116
+ if ( this . #queue. length === 0 ) {
117
+ this . #cancelledPageIds. clear ( ) ;
118
+ this . #executorRunning = false ;
119
+ return ;
120
+ }
121
+
122
+ const data = this . #queue. shift ( ) ;
123
+
124
+ if ( data . stream ) {
125
+ if (
126
+ data . stream === StreamKind . CANCEL ||
127
+ ! this . #cancelledPageIds. has ( data . pageId )
128
+ ) {
129
+ this . #processStreamMessage( data ) ;
113
130
}
114
- if ( data . callbackId ) {
115
- const cbSourceName = this . sourceName ;
116
- const cbTargetName = data . sourceName ;
131
+ this . #postponeExecution ( ) ;
132
+ return ;
133
+ }
117
134
118
- new Promise ( function ( resolve ) {
119
- resolve ( action ( data . data ) ) ;
120
- } ) . then (
121
- function ( result ) {
122
- comObj . postMessage ( {
123
- sourceName : cbSourceName ,
124
- targetName : cbTargetName ,
125
- callback : CallbackKind . DATA ,
126
- callbackId : data . callbackId ,
127
- data : result ,
128
- } ) ;
129
- } ,
130
- function ( reason ) {
131
- comObj . postMessage ( {
132
- sourceName : cbSourceName ,
133
- targetName : cbTargetName ,
134
- callback : CallbackKind . ERROR ,
135
- callbackId : data . callbackId ,
136
- reason : wrapReason ( reason ) ,
137
- } ) ;
138
- }
139
- ) ;
140
- return ;
135
+ const pageId = data . data ?. pageId ;
136
+ if ( pageId && this . #cancelledPageIds. has ( pageId ) ) {
137
+ this . #postponeExecution( ) ;
138
+ return ;
139
+ }
140
+
141
+ if ( data . callback ) {
142
+ const callbackId = data . callbackId ;
143
+ const capability = this . callbackCapabilities [ callbackId ] ;
144
+ if ( ! capability ) {
145
+ throw new Error ( `Cannot resolve callback ${ callbackId } ` ) ;
141
146
}
142
- if ( data . streamId ) {
143
- this . #createStreamSink( data ) ;
144
- return ;
147
+ delete this . callbackCapabilities [ callbackId ] ;
148
+
149
+ if ( data . callback === CallbackKind . DATA ) {
150
+ capability . resolve ( data . data ) ;
151
+ } else if ( data . callback === CallbackKind . ERROR ) {
152
+ capability . reject ( wrapReason ( data . reason ) ) ;
153
+ } else {
154
+ throw new Error ( "Unexpected callback case" ) ;
145
155
}
156
+ this . #postponeExecution( ) ;
157
+ return ;
158
+ }
159
+ const action = this . actionHandler [ data . action ] ;
160
+ if ( ! action ) {
161
+ throw new Error ( `Unknown action from worker: ${ data . action } ` ) ;
162
+ }
163
+ if ( data . callbackId ) {
164
+ const cbSourceName = this . sourceName ;
165
+ const cbTargetName = data . sourceName ;
166
+
167
+ new Promise ( function ( resolve ) {
168
+ resolve ( action ( data . data ) ) ;
169
+ } ) . then (
170
+ result => {
171
+ this . comObj . postMessage ( {
172
+ sourceName : cbSourceName ,
173
+ targetName : cbTargetName ,
174
+ callback : CallbackKind . DATA ,
175
+ callbackId : data . callbackId ,
176
+ data : result ,
177
+ } ) ;
178
+ } ,
179
+ reason => {
180
+ this . comObj . postMessage ( {
181
+ sourceName : cbSourceName ,
182
+ targetName : cbTargetName ,
183
+ callback : CallbackKind . ERROR ,
184
+ callbackId : data . callbackId ,
185
+ reason : wrapReason ( reason ) ,
186
+ } ) ;
187
+ }
188
+ ) ;
189
+ } else if ( data . streamId ) {
190
+ this . #createStreamSink( data ) ;
191
+ } else {
146
192
action ( data . data ) ;
147
- } ;
148
- comObj . addEventListener ( "message" , this . _onComObjOnMessage ) ;
193
+ }
194
+ this . #postponeExecution ( ) ;
149
195
}
150
196
151
197
on ( actionName , handler ) {
@@ -224,6 +270,7 @@ class MessageHandler {
224
270
sourceName = this . sourceName ,
225
271
targetName = this . targetName ,
226
272
comObj = this . comObj ;
273
+ const pageId = data ?. pageId ;
227
274
228
275
return new ReadableStream (
229
276
{
@@ -276,6 +323,7 @@ class MessageHandler {
276
323
targetName,
277
324
stream : StreamKind . CANCEL ,
278
325
streamId,
326
+ pageId,
279
327
reason : wrapReason ( reason ) ,
280
328
} ) ;
281
329
// Return Promise to signal success or failure.
0 commit comments