1+ /*
2+ * ====================================================================
3+ * Licensed to the Apache Software Foundation (ASF) under one
4+ * or more contributor license agreements. See the NOTICE file
5+ * distributed with this work for additional information
6+ * regarding copyright ownership. The ASF licenses this file
7+ * to you under the Apache License, Version 2.0 (the
8+ * "License"); you may not use this file except in compliance
9+ * with the License. You may obtain a copy of the License at
10+ *
11+ * http://www.apache.org/licenses/LICENSE-2.0
12+ *
13+ * Unless required by applicable law or agreed to in writing,
14+ * software distributed under the License is distributed on an
15+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+ * KIND, either express or implied. See the License for the
17+ * specific language governing permissions and limitations
18+ * under the License.
19+ * ====================================================================
20+ *
21+ * This software consists of voluntary contributions made by many
22+ * individuals on behalf of the Apache Software Foundation. For more
23+ * information on the Apache Software Foundation, please see
24+ * <http://www.apache.org/>.
25+ *
26+ */
27+ package org .apache .hc .core5 .testing .pool ;
28+
29+ import static org .junit .jupiter .api .Assertions .assertTrue ;
30+ import static org .junit .jupiter .api .Assertions .fail ;
31+
32+ import java .io .IOException ;
33+ import java .util .concurrent .CountDownLatch ;
34+ import java .util .concurrent .ExecutionException ;
35+ import java .util .concurrent .Future ;
36+ import java .util .concurrent .TimeUnit ;
37+
38+ import org .apache .hc .core5 .http .ContentType ;
39+ import org .apache .hc .core5 .http .HttpHost ;
40+ import org .apache .hc .core5 .http .HttpResponse ;
41+ import org .apache .hc .core5 .http .HttpStatus ;
42+ import org .apache .hc .core5 .http .Message ;
43+ import org .apache .hc .core5 .http .Method ;
44+ import org .apache .hc .core5 .http .impl .bootstrap .AsyncRequesterBootstrap ;
45+ import org .apache .hc .core5 .http .impl .bootstrap .HttpAsyncRequester ;
46+ import org .apache .hc .core5 .http .io .SocketConfig ;
47+ import org .apache .hc .core5 .http .io .entity .StringEntity ;
48+ import org .apache .hc .core5 .http .nio .AsyncClientEndpoint ;
49+ import org .apache .hc .core5 .http .nio .entity .StringAsyncEntityConsumer ;
50+ import org .apache .hc .core5 .http .nio .support .BasicRequestProducer ;
51+ import org .apache .hc .core5 .http .nio .support .BasicResponseConsumer ;
52+ import org .apache .hc .core5 .io .CloseMode ;
53+ import org .apache .hc .core5 .pool .PoolConcurrencyPolicy ;
54+ import org .apache .hc .core5 .pool .PoolStats ;
55+ import org .apache .hc .core5 .testing .classic .ClassicTestServer ;
56+ import org .apache .hc .core5 .util .TimeValue ;
57+ import org .apache .hc .core5 .util .Timeout ;
58+ import org .junit .jupiter .params .ParameterizedTest ;
59+ import org .junit .jupiter .params .provider .EnumSource ;
60+
61+ final class TestConnPoolAsyncRequesterIT {
62+
63+ private static final TimeValue WAIT = TimeValue .ofSeconds (2 );
64+
65+ private static final class ServerResources {
66+
67+ private final ClassicTestServer server ;
68+ private final HttpHost target ;
69+ private final CountDownLatch blockArrived ;
70+ private final CountDownLatch blockRelease ;
71+
72+ private ServerResources (
73+ final ClassicTestServer server ,
74+ final HttpHost target ,
75+ final CountDownLatch blockArrived ,
76+ final CountDownLatch blockRelease ) {
77+ this .server = server ;
78+ this .target = target ;
79+ this .blockArrived = blockArrived ;
80+ this .blockRelease = blockRelease ;
81+ }
82+
83+ private void releaseBlocked () {
84+ blockRelease .countDown ();
85+ }
86+ }
87+
88+ private static ServerResources startServer () throws IOException {
89+ final ClassicTestServer server = new ClassicTestServer (SocketConfig .DEFAULT );
90+
91+ final CountDownLatch blockArrived = new CountDownLatch (1 );
92+ final CountDownLatch blockRelease = new CountDownLatch (1 );
93+
94+ server .register ("/ok" , (request , response , context ) -> {
95+ response .setCode (HttpStatus .SC_OK );
96+ response .setEntity (new StringEntity ("OK" , ContentType .TEXT_PLAIN ));
97+ });
98+
99+ server .register ("/block" , (request , response , context ) -> {
100+ blockArrived .countDown ();
101+ try {
102+ blockRelease .await (10 , TimeUnit .SECONDS );
103+ } catch (final InterruptedException ex ) {
104+ Thread .currentThread ().interrupt ();
105+ }
106+ response .setCode (HttpStatus .SC_OK );
107+ response .setEntity (new StringEntity ("OK" , ContentType .TEXT_PLAIN ));
108+ });
109+
110+ server .start ();
111+
112+ final HttpHost target = new HttpHost (
113+ "http" ,
114+ server .getInetAddress ().getHostAddress (),
115+ server .getPort ());
116+
117+ return new ServerResources (server , target , blockArrived , blockRelease );
118+ }
119+
120+ private static HttpAsyncRequester createRequester (final PoolConcurrencyPolicy policy ) {
121+ final HttpAsyncRequester requester = AsyncRequesterBootstrap .bootstrap ()
122+ .setPoolConcurrencyPolicy (policy )
123+ .setDefaultMaxPerRoute (1 )
124+ .setMaxTotal (1 )
125+ .setTimeToLive (Timeout .ofSeconds (30 ))
126+ .create ();
127+ requester .start ();
128+ return requester ;
129+ }
130+
131+ private static boolean awaitPending (
132+ final HttpAsyncRequester requester ,
133+ final HttpHost route ,
134+ final int expectedPending ,
135+ final TimeValue maxWait ) throws InterruptedException {
136+
137+ final long deadline = System .currentTimeMillis () + maxWait .toMilliseconds ();
138+ while (System .currentTimeMillis () < deadline ) {
139+ final PoolStats stats = requester .getStats (route );
140+ if (stats .getPending () >= expectedPending ) {
141+ return true ;
142+ }
143+ Thread .sleep (10 );
144+ }
145+ return false ;
146+ }
147+
148+ private static boolean awaitQuiescent (
149+ final HttpAsyncRequester requester ,
150+ final HttpHost route ,
151+ final TimeValue maxWait ) throws InterruptedException {
152+
153+ final long deadline = System .currentTimeMillis () + maxWait .toMilliseconds ();
154+ while (System .currentTimeMillis () < deadline ) {
155+ final PoolStats total = requester .getTotalStats ();
156+ final PoolStats stats = requester .getStats (route );
157+ if (total .getLeased () == 0 && total .getPending () == 0
158+ && stats .getLeased () == 0 && stats .getPending () == 0 ) {
159+ return true ;
160+ }
161+ Thread .sleep (10 );
162+ }
163+ return false ;
164+ }
165+
166+ @ ParameterizedTest
167+ @ EnumSource (PoolConcurrencyPolicy .class )
168+ @ org .junit .jupiter .api .Timeout (value = 30 , unit = TimeUnit .SECONDS )
169+ void cancelPendingConnectMustNotLeakOrBlockNextConnect (final PoolConcurrencyPolicy policy ) throws Exception {
170+ final ServerResources srv = startServer ();
171+ final HttpAsyncRequester requester = createRequester (policy );
172+
173+ try {
174+ final Future <AsyncClientEndpoint > f1 = requester .connect (srv .target , Timeout .ofSeconds (2 ));
175+ final AsyncClientEndpoint ep1 = f1 .get (2 , TimeUnit .SECONDS );
176+
177+ final Future <AsyncClientEndpoint > f2 = requester .connect (srv .target , Timeout .ofSeconds (30 ));
178+ assertTrue (awaitPending (requester , srv .target , 1 , WAIT ), "Second connect did not become pending" );
179+ assertTrue (f2 .cancel (true ), "Cancel failed" );
180+
181+ ep1 .releaseAndReuse ();
182+
183+ final AsyncClientEndpoint ep3 = requester .connect (srv .target , Timeout .ofSeconds (2 )).get (2 , TimeUnit .SECONDS );
184+ ep3 .releaseAndReuse ();
185+
186+ assertTrue (awaitQuiescent (requester , srv .target , WAIT ), "Pool did not quiesce" );
187+ } finally {
188+ requester .close (CloseMode .IMMEDIATE );
189+ srv .server .shutdown (CloseMode .IMMEDIATE );
190+ }
191+ }
192+
193+ @ ParameterizedTest
194+ @ EnumSource (PoolConcurrencyPolicy .class )
195+ @ org .junit .jupiter .api .Timeout (value = 30 , unit = TimeUnit .SECONDS )
196+ void connectTimeoutWhilePendingMustNotLeak (final PoolConcurrencyPolicy policy ) throws Exception {
197+ final ServerResources srv = startServer ();
198+ final HttpAsyncRequester requester = createRequester (policy );
199+
200+ try {
201+ final AsyncClientEndpoint ep1 = requester .connect (srv .target , Timeout .ofSeconds (2 )).get (2 , TimeUnit .SECONDS );
202+
203+ final Future <AsyncClientEndpoint > f2 = requester .connect (srv .target , Timeout .ofMilliseconds (200 ));
204+ assertTrue (awaitPending (requester , srv .target , 1 , WAIT ), "Second connect did not become pending" );
205+
206+ // Ensure the pending request's deadline is definitely expired before we trigger pool servicing.
207+ Thread .sleep (500 );
208+
209+ // Trigger servicing of pending connects.
210+ ep1 .releaseAndReuse ();
211+
212+ try {
213+ final AsyncClientEndpoint ep2 = f2 .get (2 , TimeUnit .SECONDS );
214+ if (ep2 != null ) {
215+ ep2 .releaseAndDiscard ();
216+ }
217+ fail ("Expected pending connect to fail due to request timeout" );
218+ } catch (final ExecutionException ignore ) {
219+ // expected: deadline timeout / connect request timeout
220+ }
221+
222+ // Pool must still be usable
223+ final AsyncClientEndpoint ep3 = requester .connect (srv .target , Timeout .ofSeconds (2 )).get (2 , TimeUnit .SECONDS );
224+ ep3 .releaseAndReuse ();
225+
226+ assertTrue (awaitQuiescent (requester , srv .target , WAIT ), "Pool did not quiesce" );
227+
228+ } finally {
229+ requester .close (CloseMode .IMMEDIATE );
230+ srv .server .shutdown (CloseMode .IMMEDIATE );
231+ }
232+ }
233+
234+ @ ParameterizedTest
235+ @ EnumSource (PoolConcurrencyPolicy .class )
236+ @ org .junit .jupiter .api .Timeout (value = 30 , unit = TimeUnit .SECONDS )
237+ void closeImmediateMidFlightMustNotHang (final PoolConcurrencyPolicy policy ) throws Exception {
238+ final ServerResources srv = startServer ();
239+ final HttpAsyncRequester requester = createRequester (policy );
240+
241+ try {
242+ final Future <Message <HttpResponse , String >> f = requester .execute (
243+ srv .target ,
244+ new BasicRequestProducer (Method .GET , srv .target , "/block" ),
245+ new BasicResponseConsumer <>(new StringAsyncEntityConsumer ()),
246+ Timeout .ofSeconds (30 ),
247+ null );
248+
249+ assertTrue (srv .blockArrived .await (5 , TimeUnit .SECONDS ), "Server did not receive /block" );
250+
251+ requester .close (CloseMode .IMMEDIATE );
252+ srv .releaseBlocked ();
253+
254+ try {
255+ f .get (5 , TimeUnit .SECONDS );
256+ } catch (final Exception ignore ) {
257+ // Close mid-flight may abort; we only assert: no hang.
258+ }
259+ } finally {
260+ srv .releaseBlocked ();
261+ requester .close (CloseMode .IMMEDIATE );
262+ srv .server .shutdown (CloseMode .IMMEDIATE );
263+ }
264+ }
265+
266+ }
0 commit comments