22
22
import static com .linecorp .armeria .common .SessionProtocol .H1C ;
23
23
import static org .assertj .core .api .Assertions .assertThat ;
24
24
import static org .assertj .core .api .Assertions .assertThatThrownBy ;
25
+ import static org .awaitility .Awaitility .await ;
25
26
26
27
import java .time .Duration ;
27
28
import java .util .ArrayList ;
40
41
41
42
import org .apache .thrift .async .AsyncMethodCallback ;
42
43
import org .apache .thrift .transport .TTransportException ;
44
+ import org .junit .jupiter .api .AfterAll ;
43
45
import org .junit .jupiter .api .AfterEach ;
44
46
import org .junit .jupiter .api .Test ;
45
47
import org .junit .jupiter .api .extension .RegisterExtension ;
57
59
import com .linecorp .armeria .client .WebClient ;
58
60
import com .linecorp .armeria .client .brave .BraveClient ;
59
61
import com .linecorp .armeria .client .thrift .ThriftClients ;
62
+ import com .linecorp .armeria .common .AggregatedHttpResponse ;
60
63
import com .linecorp .armeria .common .HttpRequest ;
61
64
import com .linecorp .armeria .common .HttpResponse ;
62
65
import com .linecorp .armeria .common .HttpStatus ;
97
100
class BraveIntegrationTest {
98
101
99
102
private static final String CLIENT_TYPE_HEADER = "x-client-type" ;
103
+ private static final String TIMEOUT_HEADER = "x-timeout" ;
100
104
private static final SpanHandlerImpl spanHandler = new SpanHandlerImpl ();
101
105
102
106
@ RegisterExtension
@@ -130,7 +134,8 @@ protected void configure(ServerBuilder sb) throws Exception {
130
134
sb .service ("/qux" , tHttpDecorate ("service/qux" , (name , resultHandler ) ->
131
135
resultHandler .onComplete ("Hello, " + name + '!' )));
132
136
133
- sb .service ("/pool" , httpDecorate ("service/pool" , new AbstractHttpService () {
137
+ final Tracing servicePoolTracing = newTracing ("service/pool" );
138
+ sb .service ("/pool" , httpDecorate (servicePoolTracing , new AbstractHttpService () {
134
139
@ Override
135
140
protected HttpResponse doGet (ServiceRequestContext ctx , HttpRequest req )
136
141
throws Exception {
@@ -145,7 +150,7 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
145
150
countDownLatch .countDown ();
146
151
countDownLatch .await ();
147
152
}
148
- final Span span = Tracing . currentTracer ().nextSpan ().start ();
153
+ final Span span = servicePoolTracing . tracer ().nextSpan ().start ();
149
154
try (SpanInScope unused =
150
155
Tracing .currentTracer ().withSpanInScope (span )) {
151
156
if (i == 1 ) {
@@ -166,8 +171,8 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
166
171
result -> allAsList (IntStream .range (1 , 3 ).mapToObj (
167
172
i -> executorService .submit (
168
173
RequestContext .current ().makeContextAware (() -> {
169
- final ScopedSpan span = Tracing . currentTracer ()
170
- .startScopedSpan ("aloha" );
174
+ final ScopedSpan span =
175
+ servicePoolTracing . tracer () .startScopedSpan ("aloha" );
171
176
try {
172
177
return null ;
173
178
} finally {
@@ -185,19 +190,30 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
185
190
}
186
191
}));
187
192
188
- sb .service ("/timeout" , tHttpDecorate ("service/timeout" ,
189
- // This service never calls the handler and will timeout.
190
- (name , resultHandler ) -> {}));
193
+ sb .service ("/timeout" ,
194
+ tHttpDecorate ("service/timeout" ,
195
+ // This service never calls the handler and will timeout.
196
+ (name , resultHandler ) -> {
197
+ final ServiceRequestContext ctx = ServiceRequestContext .current ();
198
+ if (ctx .request ().headers ().contains (TIMEOUT_HEADER )) {
199
+ ctx .timeoutNow ();
200
+ }
201
+ }));
191
202
192
203
sb .service ("/http" , (req , ctx ) -> HttpResponse .of (HttpStatus .OK ));
193
204
}
194
205
};
195
206
196
207
@ AfterEach
197
- void shouldHaveNoExtraSpans () {
208
+ void afterEach () {
198
209
assertThat (spanHandler .spans ).isEmpty ();
199
210
}
200
211
212
+ @ AfterAll
213
+ static void afterAll () throws Exception {
214
+ Tracing .current ().close ();
215
+ }
216
+
201
217
private static HttpService tHttpDecorate (String name , AsyncIface asyncIface ) {
202
218
final THttpService service =
203
219
THttpService .builder ()
@@ -213,16 +229,16 @@ private static HttpService tHttpDecorate(String name, AsyncIface asyncIface) {
213
229
};
214
230
}
215
231
216
- private static HttpService httpDecorate (String name , HttpService service ) {
217
- return BraveService .newDecorator (newTracing ( name ) ).apply (service );
232
+ private static HttpService httpDecorate (Tracing tracing , HttpService service ) {
233
+ return BraveService .newDecorator (tracing ).apply (service );
218
234
}
219
235
220
236
private static TestService .AsyncIface newClient (String path ) {
221
237
final ServiceRequestContext ctx = ServiceRequestContext .current ();
222
238
final String braveServiceType = ctx .request ().headers ().get (CLIENT_TYPE_HEADER );
223
239
return ThriftClients .builder (server .httpUri ())
224
240
.path (path )
225
- .addHeader (CLIENT_TYPE_HEADER , "http" )
241
+ .addHeader (CLIENT_TYPE_HEADER , braveServiceType )
226
242
.decorator (BraveClient .newDecorator (newTracing ("client" + path )))
227
243
.build (TestService .AsyncIface .class );
228
244
}
@@ -434,7 +450,9 @@ void testServiceInitiatedTrace(String type) throws Exception {
434
450
435
451
@ Test
436
452
void testSpanInThreadPoolHasSameTraceId () throws Exception {
437
- server .webClient ().get ("pool" ).aggregate ().get ();
453
+ final AggregatedHttpResponse res = server .blockingWebClient ().get ("pool" );
454
+ assertThat (res .contentUtf8 ()).isEqualTo ("Lee" );
455
+ await ().untilAsserted (() -> assertThat (spanHandler .spans ).hasSize (5 ));
438
456
final MutableSpan [] spans = spanHandler .take (5 );
439
457
assertThat (Arrays .stream (spans ).map (MutableSpan ::traceId ).collect (toImmutableSet ())).hasSize (1 );
440
458
assertThat (Arrays .stream (spans ).map (MutableSpan ::parentId )
@@ -445,24 +463,29 @@ void testSpanInThreadPoolHasSameTraceId() throws Exception {
445
463
@ ParameterizedTest
446
464
@ ValueSource (strings = {"http" , "rpc" })
447
465
void testServerTimesOut (String type ) throws Exception {
448
- final TestService .Iface timeoutClient =
449
- ThriftClients .builder (server .httpUri ())
450
- .path ("/timeout" )
451
- .addHeader (CLIENT_TYPE_HEADER , type )
452
- .decorator (BraveClient .newDecorator (newTracing ("client/timeout" )))
453
- .build (TestService .Iface .class );
454
- assertThatThrownBy (() -> timeoutClient .hello ("name" ))
455
- .isInstanceOf (TTransportException .class )
456
- .hasCauseInstanceOf (InvalidResponseHeadersException .class );
457
- final MutableSpan [] spans = spanHandler .take (2 );
458
-
459
- final MutableSpan serverSpan = findSpan (spans , "service/timeout" );
460
- final MutableSpan clientSpan = findSpan (spans , "client/timeout" );
461
-
462
- // Server timed out meaning it did still send a timeout response to the client and we have all
463
- // annotations.
464
- assertThat (serverSpan .annotations ()).hasSize (2 );
465
- assertThat (clientSpan .annotations ()).hasSize (2 );
466
+ try (ClientFactory cf = ClientFactory .builder ().build ()) {
467
+ final TestService .Iface timeoutClient =
468
+ ThriftClients .builder (server .httpUri ())
469
+ .path ("/timeout" )
470
+ .factory (cf )
471
+ .addHeader (CLIENT_TYPE_HEADER , type )
472
+ .addHeader (TIMEOUT_HEADER , true )
473
+ .decorator (BraveClient .newDecorator (newTracing ("client/timeout" )))
474
+ .build (TestService .Iface .class );
475
+ assertThatThrownBy (() -> timeoutClient .hello ("name" ))
476
+ .isInstanceOf (TTransportException .class )
477
+ .hasCauseInstanceOf (InvalidResponseHeadersException .class );
478
+ final MutableSpan [] spans = spanHandler .take (2 );
479
+
480
+ final MutableSpan serverSpan = findSpan (spans , "service/timeout" );
481
+ final MutableSpan clientSpan = findSpan (spans , "client/timeout" );
482
+
483
+ // Server timed out meaning it did still send a timeout response to the client and we have all
484
+ // annotations. A separate client factory is used to guarantee that client span annotations
485
+ // always contain connection related extra annotations.
486
+ assertThat (serverSpan .annotations ()).hasSize (2 );
487
+ assertThat (clientSpan .annotations ()).hasSize (6 );
488
+ }
466
489
}
467
490
468
491
@ ParameterizedTest
@@ -473,7 +496,7 @@ void testHttp2ClientTimesOut(String type) throws Exception {
473
496
.path ("/timeout" )
474
497
.addHeader (CLIENT_TYPE_HEADER , type )
475
498
.decorator (BraveClient .newDecorator (newTracing ("client/timeout" )))
476
- .responseTimeout (Duration .ofSeconds (3 ))
499
+ .responseTimeout (Duration .ofSeconds (1 ))
477
500
.build (TestService .Iface .class );
478
501
testClientTimesOut (timeoutClientClientTimesOut );
479
502
}
@@ -582,8 +605,8 @@ public void onError(Exception exception) {
582
605
}
583
606
}
584
607
585
- private static class SpanHandlerImpl extends SpanHandler {
586
- private final BlockingQueue <MutableSpan > spans = new LinkedBlockingQueue <>();
608
+ static final class SpanHandlerImpl extends SpanHandler {
609
+ final BlockingQueue <MutableSpan > spans = new LinkedBlockingQueue <>();
587
610
588
611
@ Override
589
612
public boolean end (TraceContext context , MutableSpan span , Cause cause ) {
0 commit comments