22
22
import static com .linecorp .armeria .common .SessionProtocol .H1C ;
23
23
import static org .assertj .core .api .Assertions .assertThat ;
24
24
import static org .assertj .core .api .Assertions .assertThatThrownBy ;
25
+ import static org .awaitility .Awaitility .await ;
25
26
26
27
import java .time .Duration ;
27
28
import java .util .ArrayList ;
30
31
import java .util .List ;
31
32
import java .util .Map ;
32
33
import java .util .Objects ;
33
- import java .util .concurrent .BlockingDeque ;
34
34
import java .util .concurrent .BlockingQueue ;
35
35
import java .util .concurrent .CompletableFuture ;
36
36
import java .util .concurrent .CountDownLatch ;
37
37
import java .util .concurrent .Executors ;
38
- import java .util .concurrent .LinkedBlockingDeque ;
39
38
import java .util .concurrent .LinkedBlockingQueue ;
40
39
import java .util .concurrent .TimeUnit ;
41
40
import java .util .stream .IntStream ;
42
41
43
42
import org .apache .thrift .async .AsyncMethodCallback ;
44
43
import org .apache .thrift .transport .TTransportException ;
45
- import org .junit .jupiter .api .AfterAll ;
46
44
import org .junit .jupiter .api .AfterEach ;
47
45
import org .junit .jupiter .api .Test ;
48
46
import org .junit .jupiter .api .extension .RegisterExtension ;
100
98
class BraveIntegrationTest {
101
99
102
100
private static final String CLIENT_TYPE_HEADER = "x-client-type" ;
101
+ private static final String TIMEOUT_HEADER = "x-timeout" ;
103
102
private static final SpanHandlerImpl spanHandler = new SpanHandlerImpl ();
104
- private static final BlockingDeque <Tracing > perTestTracings = new LinkedBlockingDeque <>();
105
- private static final BlockingDeque <Tracing > perClassTracings = new LinkedBlockingDeque <>();
106
103
107
104
@ RegisterExtension
108
105
static ServerExtension server = new ServerExtension (true ) {
@@ -150,7 +147,8 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
150
147
countDownLatch .countDown ();
151
148
countDownLatch .await ();
152
149
}
153
- final Span span = Tracing .currentTracer ().nextSpan ().start ();
150
+ final Span span = Tracing .currentTracer ().nextSpan ()
151
+ .name ("aloha1:" + i ).start ();
154
152
try (SpanInScope unused =
155
153
Tracing .currentTracer ().withSpanInScope (span )) {
156
154
if (i == 1 ) {
@@ -171,8 +169,9 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
171
169
result -> allAsList (IntStream .range (1 , 3 ).mapToObj (
172
170
i -> executorService .submit (
173
171
RequestContext .current ().makeContextAware (() -> {
174
- final ScopedSpan span = Tracing .currentTracer ()
175
- .startScopedSpan ("aloha" );
172
+ final ScopedSpan span =
173
+ Tracing .currentTracer ()
174
+ .startScopedSpan ("aloha2:" + i );
176
175
try {
177
176
return null ;
178
177
} finally {
@@ -190,9 +189,15 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
190
189
}
191
190
}));
192
191
193
- sb .service ("/timeout" , tHttpDecorate ("service/timeout" ,
194
- // This service never calls the handler and will timeout.
195
- (name , resultHandler ) -> {}));
192
+ sb .service ("/timeout" ,
193
+ tHttpDecorate ("service/timeout" ,
194
+ // This service never calls the handler and will timeout.
195
+ (name , resultHandler ) -> {
196
+ final ServiceRequestContext ctx = ServiceRequestContext .current ();
197
+ if (ctx .request ().headers ().contains (TIMEOUT_HEADER )) {
198
+ ctx .timeoutNow ();
199
+ }
200
+ }));
196
201
197
202
sb .service ("/http" , (req , ctx ) -> HttpResponse .of (HttpStatus .OK ));
198
203
}
@@ -201,13 +206,6 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
201
206
@ AfterEach
202
207
void afterEach () {
203
208
assertThat (spanHandler .spans ).isEmpty ();
204
- perTestTracings .forEach (Tracing ::close );
205
- perTestTracings .clear ();
206
- }
207
-
208
- @ AfterAll
209
- static void afterAll () {
210
- perClassTracings .forEach (Tracing ::close );
211
209
}
212
210
213
211
private static HttpService tHttpDecorate (String name , AsyncIface asyncIface ) {
@@ -219,14 +217,14 @@ private static HttpService tHttpDecorate(String name, AsyncIface asyncIface) {
219
217
return (ctx , req ) -> {
220
218
final String braveServiceType = ctx .request ().headers ().get (CLIENT_TYPE_HEADER );
221
219
if ("http" .equals (braveServiceType )) {
222
- return BraveService .newDecorator (newTracing (name , false )).apply (service ).serve (ctx , req );
220
+ return BraveService .newDecorator (newTracing (name )).apply (service ).serve (ctx , req );
223
221
}
224
222
return service .serve (ctx , req );
225
223
};
226
224
}
227
225
228
226
private static HttpService httpDecorate (String name , HttpService service ) {
229
- return BraveService .newDecorator (newTracing (name , false )).apply (service );
227
+ return BraveService .newDecorator (newTracing (name )).apply (service );
230
228
}
231
229
232
230
private static TestService .AsyncIface newClient (String path ) {
@@ -240,27 +238,17 @@ private static TestService.AsyncIface newClient(String path) {
240
238
}
241
239
242
240
private static Tracing newTracing (String name ) {
243
- return newTracing (name , true );
244
- }
245
-
246
- private static Tracing newTracing (String name , boolean perTest ) {
247
241
final CurrentTraceContext currentTraceContext =
248
242
RequestContextCurrentTraceContext .builder ()
249
243
.nonRequestThread ("nonrequest-" )
250
244
.addScopeDecorator (StrictScopeDecorator .create ())
251
245
.build ();
252
- final Tracing tracing = Tracing .newBuilder ()
253
- .currentTraceContext (currentTraceContext )
254
- .localServiceName (name )
255
- .addSpanHandler (spanHandler )
256
- .sampler (Sampler .ALWAYS_SAMPLE )
257
- .build ();
258
- if (perTest ) {
259
- perTestTracings .add (tracing );
260
- } else {
261
- perClassTracings .add (tracing );
262
- }
263
- return tracing ;
246
+ return Tracing .newBuilder ()
247
+ .currentTraceContext (currentTraceContext )
248
+ .localServiceName (name )
249
+ .addSpanHandler (spanHandler )
250
+ .sampler (Sampler .ALWAYS_SAMPLE )
251
+ .build ();
264
252
}
265
253
266
254
@ Test
@@ -457,6 +445,7 @@ void testServiceInitiatedTrace(String type) throws Exception {
457
445
@ Test
458
446
void testSpanInThreadPoolHasSameTraceId () throws Exception {
459
447
server .webClient ().get ("pool" ).aggregate ().get ();
448
+ await ().untilAsserted (() -> assertThat (spanHandler .spans ).hasSize (5 ));
460
449
final MutableSpan [] spans = spanHandler .take (5 );
461
450
assertThat (Arrays .stream (spans ).map (MutableSpan ::traceId ).collect (toImmutableSet ())).hasSize (1 );
462
451
assertThat (Arrays .stream (spans ).map (MutableSpan ::parentId )
@@ -473,6 +462,7 @@ void testServerTimesOut(String type) throws Exception {
473
462
.path ("/timeout" )
474
463
.factory (cf )
475
464
.addHeader (CLIENT_TYPE_HEADER , type )
465
+ .addHeader (TIMEOUT_HEADER , true )
476
466
.decorator (BraveClient .newDecorator (newTracing ("client/timeout" )))
477
467
.build (TestService .Iface .class );
478
468
assertThatThrownBy (() -> timeoutClient .hello ("name" ))
@@ -499,7 +489,7 @@ void testHttp2ClientTimesOut(String type) throws Exception {
499
489
.path ("/timeout" )
500
490
.addHeader (CLIENT_TYPE_HEADER , type )
501
491
.decorator (BraveClient .newDecorator (newTracing ("client/timeout" )))
502
- .responseTimeout (Duration .ofSeconds (3 ))
492
+ .responseTimeout (Duration .ofSeconds (1 ))
503
493
.build (TestService .Iface .class );
504
494
testClientTimesOut (timeoutClientClientTimesOut );
505
495
}
@@ -608,8 +598,8 @@ public void onError(Exception exception) {
608
598
}
609
599
}
610
600
611
- private static class SpanHandlerImpl extends SpanHandler {
612
- private final BlockingQueue <MutableSpan > spans = new LinkedBlockingQueue <>();
601
+ static final class SpanHandlerImpl extends SpanHandler {
602
+ final BlockingQueue <MutableSpan > spans = new LinkedBlockingQueue <>();
613
603
614
604
@ Override
615
605
public boolean end (TraceContext context , MutableSpan span , Cause cause ) {
0 commit comments