 import android.os.Process;

+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;

 /**
@@ -48,6 +52,9 @@ public class CacheDispatcher extends Thread {
     /** Used for telling us to die. */
     private volatile boolean mQuit = false;

+    /** Manage list of waiting requests and de-duplicate requests with same cache key. */
+    private final WaitingRequestManager mWaitingRequestManager;
+
     /**
      * Creates a new cache triage dispatcher thread. You must call {@link #start()}
      * in order to begin processing.
@@ -64,6 +71,7 @@ public CacheDispatcher(
         mNetworkQueue = networkQueue;
         mCache = cache;
         mDelivery = delivery;
+        mWaitingRequestManager = new WaitingRequestManager(this);
     }

     /**
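Not part of the diff, just orientation for readers: a minimal sketch of how a CacheDispatcher like the one patched above gets wired up and started. In practice RequestQueue.start() does roughly this internally; DiskBasedCache, ExecutorDelivery, and the PriorityBlockingQueue-backed queues named below are the stock Volley pieces and are assumptions of this sketch, not anything this change adds.

// Orientation-only sketch; RequestQueue normally performs this wiring itself.
import android.os.Handler;
import android.os.Looper;
import com.android.volley.Cache;
import com.android.volley.CacheDispatcher;
import com.android.volley.ExecutorDelivery;
import com.android.volley.Request;
import com.android.volley.ResponseDelivery;
import com.android.volley.toolbox.DiskBasedCache;
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public final class DispatcherWiring {
    static CacheDispatcher startCacheDispatcher(File cacheDir) {
        // Requests are Comparable, so priority queues work as the dispatch queues.
        BlockingQueue<Request<?>> cacheQueue = new PriorityBlockingQueue<>();
        BlockingQueue<Request<?>> networkQueue = new PriorityBlockingQueue<>();
        Cache cache = new DiskBasedCache(cacheDir);
        ResponseDelivery delivery = new ExecutorDelivery(new Handler(Looper.getMainLooper()));

        CacheDispatcher dispatcher = new CacheDispatcher(cacheQueue, networkQueue, cache, delivery);
        dispatcher.start(); // per the javadoc above: nothing is processed until start()
        return dispatcher;
    }
}

Note that the new WaitingRequestManager is created eagerly in the constructor, but no de-duplication happens until start() spins up the thread and run() begins draining the cache queue.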
@@ -101,15 +109,19 @@ public void run() {
                 if (entry == null) {
                     request.addMarker("cache-miss");
                     // Cache miss; send off to the network dispatcher.
-                    mNetworkQueue.put(request);
+                    if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
+                        mNetworkQueue.put(request);
+                    }
                     continue;
                 }

                 // If it is completely expired, just send it to the network.
                 if (entry.isExpired()) {
                     request.addMarker("cache-hit-expired");
                     request.setCacheEntry(entry);
-                    mNetworkQueue.put(request);
+                    if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
+                        mNetworkQueue.put(request);
+                    }
                     continue;
                 }
@@ -128,22 +140,28 @@ public void run() {
                     // refreshing.
                     request.addMarker("cache-hit-refresh-needed");
                     request.setCacheEntry(entry);
-
                     // Mark the response as intermediate.
                     response.intermediate = true;

-                    // Post the intermediate response back to the user and have
-                    // the delivery then forward the request along to the network.
-                    mDelivery.postResponse(request, response, new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                mNetworkQueue.put(request);
-                            } catch (InterruptedException e) {
-                                // Not much we can do about this.
+                    if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
+                        // Post the intermediate response back to the user and have
+                        // the delivery then forward the request along to the network.
+                        mDelivery.postResponse(request, response, new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    mNetworkQueue.put(request);
+                                } catch (InterruptedException e) {
+                                    // Restore the interrupted status
+                                    Thread.currentThread().interrupt();
+                                }
                             }
-                        }
-                    });
+                        });
+                    } else {
+                        // request has been added to list of waiting requests
+                        // to receive the network response from the first request once it returns.
+                        mDelivery.postResponse(request, response);
+                    }
                 }

             } catch (InterruptedException e) {
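Illustration, not part of the diff: what the three maybeAddToWaitingRequests call sites above mean from a caller's point of view. A hedged sketch only; Volley.newRequestQueue, StringRequest, and the default URL-derived cache key are assumptions about the surrounding toolbox, not something shown in this change.

// Illustrative caller-side sketch under the assumptions named above.
import android.content.Context;
import com.android.volley.RequestQueue;
import com.android.volley.Response;
import com.android.volley.toolbox.StringRequest;
import com.android.volley.toolbox.Volley;

public final class DedupExample {
    static void fetchSameUrlTwice(Context context) {
        RequestQueue queue = Volley.newRequestQueue(context);
        String url = "https://example.com/data.json";

        Response.Listener<String> first = response -> { /* update UI */ };
        Response.Listener<String> second = response -> { /* update something else */ };
        Response.ErrorListener onError = error -> { /* handle failure */ };

        queue.add(new StringRequest(url, first, onError));
        // Same cache key while the first request is still in flight: the cache
        // dispatcher parks this one in WaitingRequestManager instead of putting
        // it on the network queue. If the in-flight request returns a usable
        // response, both listeners are posted that response; if it does not,
        // the next waiter is sent to the network instead.
        queue.add(new StringRequest(url, second, onError));
    }
}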
@@ -154,4 +172,112 @@ public void run() {
             }
         }
     }
+
+    private static class WaitingRequestManager implements Request.NetworkRequestCompleteListener {
+
+        /**
+         * Staging area for requests that already have a duplicate request in flight.
+         *
+         * <ul>
+         *     <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
+         *         key.</li>
+         *     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
+         *         is <em>not</em> contained in that list. Is null if no requests are staged.</li>
+         * </ul>
+         */
+        private final Map<String, Queue<Request<?>>> mWaitingRequests = new HashMap<>();
+
+        private final CacheDispatcher mCacheDispatcher;
+
+        WaitingRequestManager(CacheDispatcher cacheDispatcher) {
+            mCacheDispatcher = cacheDispatcher;
+        }
+
+        /** Request received a valid response that can be used by other waiting requests. */
+        @Override
+        public void onResponseReceived(Request<?> request, Response<?> response) {
+            if (response.cacheEntry == null || response.cacheEntry.isExpired()) {
+                onNoUsableResponseReceived(request);
+                return;
+            }
+            String cacheKey = request.getCacheKey();
+            Queue<Request<?>> waitingRequests;
+            synchronized (this) {
+                waitingRequests = mWaitingRequests.remove(cacheKey);
+            }
+            if (waitingRequests != null) {
+                if (VolleyLog.DEBUG) {
+                    VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
+                            waitingRequests.size(), cacheKey);
+                }
+                // Process all queued up requests.
+                for (Request<?> waiting : waitingRequests) {
+                    mCacheDispatcher.mDelivery.postResponse(waiting, response);
+                }
+            }
+        }
+
+        /** No valid response received from network, release waiting requests. */
+        @Override
+        public synchronized void onNoUsableResponseReceived(Request<?> request) {
+            String cacheKey = request.getCacheKey();
+            Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
+            if (waitingRequests != null) {
+                if (VolleyLog.DEBUG) {
+                    VolleyLog.v("%d waiting requests for cacheKey=%s; resend to network",
+                            waitingRequests.size(), cacheKey);
+                }
+                Request<?> nextInLine = waitingRequests.remove();
+                if (nextInLine == null) {
+                    return;
+                }
+                mWaitingRequests.put(cacheKey, waitingRequests);
+                try {
+                    mCacheDispatcher.mNetworkQueue.put(nextInLine);
+                } catch (InterruptedException iex) {
+                    VolleyLog.e("Couldn't add request to queue. %s", iex.toString());
+                    // Restore the interrupted status of the calling thread (i.e. NetworkDispatcher)
+                    Thread.currentThread().interrupt();
+                    // Quit the current CacheDispatcher thread.
+                    mCacheDispatcher.quit();
+                }
+            }
+        }
+
+        /**
+         * For cacheable requests, if a request for the same cache key is already in flight,
+         * add it to a queue to wait for that in-flight request to finish.
+         * @return whether the request was queued. If false, we should continue issuing the request
+         * over the network. If true, we should put the request on hold to be processed when
+         * the in-flight request finishes.
+         */
+        private synchronized boolean maybeAddToWaitingRequests(Request<?> request) {
+            String cacheKey = request.getCacheKey();
+            // Insert request into stage if there's already a request with the same cache key
+            // in flight.
+            if (mWaitingRequests.containsKey(cacheKey)) {
+                // There is already a request in flight. Queue up.
+                Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
+                if (stagedRequests == null) {
+                    stagedRequests = new LinkedList<Request<?>>();
+                }
+                request.addMarker("waiting-for-response");
+                stagedRequests.add(request);
+                mWaitingRequests.put(cacheKey, stagedRequests);
+                if (VolleyLog.DEBUG) {
+                    VolleyLog.d("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
+                }
+                return true;
+            } else {
+                // Insert 'null' queue for this cacheKey, indicating there is now a request in
+                // flight.
+                mWaitingRequests.put(cacheKey, null);
+                request.setNetworkRequestCompleteListener(this);
+                if (VolleyLog.DEBUG) {
+                    VolleyLog.d("new request, sending to network %s", cacheKey);
+                }
+                return false;
+            }
+        }
+    }
 }
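For reference, a sketch of the listener contract that WaitingRequestManager plugs into, reconstructed from the @Override methods and the setNetworkRequestCompleteListener call in this diff. The authoritative definition lives in Request.java in the companion change, so treat the exact names and visibility below as assumptions.

// Hypothetical reconstruction for reading this diff only. The real interface is
// nested in Request (the diff refers to Request.NetworkRequestCompleteListener),
// lives in the com.android.volley package (so Request/Response need no imports),
// and is notified by the network dispatcher when a request completes.
interface NetworkRequestCompleteListener {

    /** A usable response arrived; waiters on the same cache key may reuse it. */
    void onResponseReceived(Request<?> request, Response<?> response);

    /** The request finished without a response that waiters can reuse. */
    void onNoUsableResponseReceived(Request<?> request);
}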