Skip to content

Commit ecf9ab1

Browse files
authored
Remove reactive http body support (#444)
1 parent 20f3f2b commit ecf9ab1

File tree

66 files changed

+693
-2035
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+693
-2035
lines changed

experimental/camunda-rest-undertow/src/main/java/ru/tinkoff/kora/camunda/rest/telemetry/CamundaRestTelemetry.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@
22

33
import jakarta.annotation.Nullable;
44
import ru.tinkoff.kora.http.common.HttpResultCode;
5-
import ru.tinkoff.kora.http.common.body.HttpBodyInput;
65
import ru.tinkoff.kora.http.common.header.HttpHeaders;
76

87
import java.util.Collection;
98
import java.util.Map;
109

1110
public interface CamundaRestTelemetry {
1211

13-
CamundaRestTelemetryContext EMPTY_CTX = (s, r, h, ex) -> {};
14-
CamundaRestTelemetry EMPTY = (s, host, m, p, pt, h, q, b) -> EMPTY_CTX;
12+
CamundaRestTelemetryContext EMPTY_CTX = (_, _, _, _) -> {};
13+
CamundaRestTelemetry EMPTY = (_, _, _, _, _, _, _) -> EMPTY_CTX;
1514

1615
interface CamundaRestTelemetryContext {
1716

@@ -24,6 +23,5 @@ CamundaRestTelemetryContext get(String scheme,
2423
String path,
2524
@Nullable String pathTemplate,
2625
HttpHeaders headers,
27-
Map<String, ? extends Collection<String>> queryParams,
28-
HttpBodyInput body);
26+
Map<String, ? extends Collection<String>> queryParams);
2927
}

experimental/camunda-rest-undertow/src/main/java/ru/tinkoff/kora/camunda/rest/telemetry/CamundaRestTracer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,5 @@ CamundaRestSpan createSpan(String scheme,
2828
String path,
2929
String pathTemplate,
3030
HttpHeaders headers,
31-
Map<String, ? extends Collection<String>> queryParams,
32-
HttpBodyInput body);
31+
Map<String, ? extends Collection<String>> queryParams);
3332
}

experimental/camunda-rest-undertow/src/main/java/ru/tinkoff/kora/camunda/rest/telemetry/DefaultCamundaRestTelemetry.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package ru.tinkoff.kora.camunda.rest.telemetry;
22

33
import jakarta.annotation.Nullable;
4-
import ru.tinkoff.kora.http.common.body.HttpBodyInput;
54
import ru.tinkoff.kora.http.common.header.HttpHeaders;
65

76
import java.util.Collection;
@@ -35,8 +34,7 @@ public CamundaRestTelemetryContext get(String scheme,
3534
String path,
3635
@Nullable String routeTemplate,
3736
HttpHeaders headers,
38-
Map<String, ? extends Collection<String>> queryParams,
39-
HttpBodyInput body) {
37+
Map<String, ? extends Collection<String>> queryParams) {
4038
var metrics = this.metrics;
4139
var logger = this.logger;
4240
var tracer = this.tracer;
@@ -56,7 +54,7 @@ public CamundaRestTelemetryContext get(String scheme,
5654
logger.logStart(method, path, routeTemplate, queryParams, headers);
5755
}
5856
if (tracer != null) {
59-
span = tracer.createSpan(scheme, host, method, path, routeTemplate, headers, queryParams, body);
57+
span = tracer.createSpan(scheme, host, method, path, routeTemplate, headers, queryParams);
6058
} else {
6159
span = null;
6260
}

experimental/camunda-rest-undertow/src/main/java/ru/tinkoff/kora/camunda/rest/telemetry/DefaultCamundaRestTelemetryFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
public final class DefaultCamundaRestTelemetryFactory implements CamundaRestTelemetryFactory {
77

8-
private static final CamundaRestTelemetry.CamundaRestTelemetryContext EMPTY_CTX = (s, r, h, ex) -> {};
9-
private static final CamundaRestTelemetry EMPTY = (s, host, m, p, pt, h, q, b) -> EMPTY_CTX;
8+
private static final CamundaRestTelemetry.CamundaRestTelemetryContext EMPTY_CTX = (_, _, _, _) -> {};
9+
private static final CamundaRestTelemetry EMPTY = (_, _, _, _, _, _, _) -> EMPTY_CTX;
1010

1111
@Nullable
1212
private final CamundaRestLoggerFactory logger;

experimental/camunda-rest-undertow/src/main/java/ru/tinkoff/kora/camunda/rest/undertow/UndertowCamundaRestHttpHandler.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import ru.tinkoff.kora.http.server.common.handler.HttpServerRequestHandler;
4040
import ru.tinkoff.kora.http.server.undertow.UndertowHttpHeaders;
4141
import ru.tinkoff.kora.http.server.undertow.UndertowHttpServer;
42-
import ru.tinkoff.kora.http.server.undertow.request.UndertowPublicApiRequest;
4342
import ru.tinkoff.kora.openapi.management.OpenApiHttpServerHandler;
4443
import ru.tinkoff.kora.openapi.management.RapidocHttpServerHandler;
4544
import ru.tinkoff.kora.openapi.management.SwaggerUIHttpServerHandler;
@@ -123,14 +122,16 @@ public void init() throws Exception {
123122
var restHandler = deploymentManager.start();
124123
root.addPrefixPath(camundaRestConfig.path(), exchange -> {
125124
var match = restMatcher.getMatch(exchange.getRequestMethod().toString(), exchange.getRequestPath());
126-
final CamundaRestTelemetryContext telemetryContext;
127-
var context = Context.clear();
128-
var req = new UndertowPublicApiRequest(exchange, context);
129-
if (match.isPresent()) {
130-
telemetryContext = telemetry.get(req.scheme(), req.hostName(), req.method(), req.path(), match.get().pathTemplate(), req.headers(), req.queryParams(), req.body());
131-
} else {
132-
telemetryContext = telemetry.get(req.scheme(), req.hostName(), req.method(), req.path(), null, req.headers(), req.queryParams(), req.body());
133-
}
125+
var pathTemplate = match == null ? null : match.pathTemplate();
126+
var telemetryContext = telemetry.get(
127+
exchange.getRequestScheme(),
128+
exchange.getHostName(),
129+
exchange.getRequestMethod().toString(),
130+
exchange.getRelativePath(),
131+
pathTemplate,
132+
new UndertowHttpHeaders(exchange.getRequestHeaders()),
133+
Map.of()// todo do we need query here?
134+
);
134135

135136
restHandler.handleRequest(exchange.addExchangeCompleteListener((ex, nextListener) -> {
136137
var httpResultCode = HttpResultCode.fromStatusCode(ex.getStatusCode());
@@ -611,7 +612,7 @@ private OpenApiHttpHandler(CamundaRestConfig restConfig,
611612
public void handleRequest(HttpServerExchange exchange) {
612613
var requestPath = exchange.getRequestPath();
613614
var match = pathMatcher.getMatch(exchange.getRequestMethod().toString(), requestPath);
614-
if (match.isEmpty()) {
615+
if (match == null) {
615616
exchange.setStatusCode(404);
616617
exchange.endExchange();
617618
return;
@@ -620,10 +621,17 @@ public void handleRequest(HttpServerExchange exchange) {
620621
exchange.dispatch(executor, exchange1 -> {
621622
var context = Context.clear();
622623

623-
var req = new UndertowPublicApiRequest(exchange1, context);
624-
var telemetryContext = telemetry.get(req.scheme(), req.hostName(), req.method(), req.path(), match.get().pathTemplate(), req.headers(), req.queryParams(), req.body());
624+
var telemetryContext = telemetry.get(
625+
exchange.getRequestScheme(),
626+
exchange.getHostName(),
627+
exchange.getRequestMethod().toString(),
628+
exchange.getRelativePath(),
629+
match.pathTemplate(),
630+
new UndertowHttpHeaders(exchange.getRequestHeaders()),
631+
Map.of()// todo do we need query here?
632+
);
625633

626-
var fakeRequest = getFakeRequest(match.get());
634+
var fakeRequest = getFakeRequest(match);
627635
var openapi = restConfig.openapi();
628636
if (openapi.enabled() && requestPath.startsWith(openapi.endpoint())) {
629637
executeHandler(context, telemetryContext, exchange1, openApiHandler, fakeRequest);

experimental/camunda-rest-undertow/src/main/java/ru/tinkoff/kora/camunda/rest/undertow/UndertowPathMatcher.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,19 @@ record HttpMethodPath(String method, String routeTemplate) {}
3131

3232
record Match(String method, String pathTemplate, Map<String, String> pathParameters) {}
3333

34-
Optional<Match> getMatch(String method, String path) {
34+
@Nullable
35+
Match getMatch(String method, String path) {
3536
final Map<String, String> templateParameters;
3637
final @Nullable String routeTemplate;
3738

3839
var methodMatchers = pathTemplateMatcher.get(method);
3940
var pathTemplateMatch = methodMatchers == null ? null : methodMatchers.match(path);
4041
if (pathTemplateMatch == null) {
41-
return Optional.empty();
42+
return null;
4243
} else {
4344
templateParameters = pathTemplateMatch.parameters();
4445
routeTemplate = pathTemplateMatch.matchedTemplate();
45-
return Optional.of(new Match(method, routeTemplate, templateParameters));
46+
return new Match(method, routeTemplate, templateParameters);
4647
}
4748
}
4849
}

experimental/s3-client-aws/src/main/java/ru/tinkoff/kora/s3/client/aws/KoraAwsSdkHttpClient.java

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,32 @@
11
package ru.tinkoff.kora.s3.client.aws;
22

3+
import jakarta.annotation.Nullable;
34
import org.jetbrains.annotations.ApiStatus;
5+
import org.reactivestreams.Subscriber;
6+
import org.reactivestreams.Subscription;
47
import reactor.adapter.JdkFlowAdapter;
58
import ru.tinkoff.kora.http.client.common.HttpClient;
69
import ru.tinkoff.kora.http.client.common.request.HttpClientRequest;
710
import ru.tinkoff.kora.http.client.common.request.HttpClientRequestBuilder;
811
import ru.tinkoff.kora.http.client.common.response.HttpClientResponse;
912
import ru.tinkoff.kora.http.common.body.HttpBodyInput;
1013
import ru.tinkoff.kora.http.common.body.HttpBodyOutput;
14+
import software.amazon.awssdk.core.async.AsyncRequestBody;
1115
import software.amazon.awssdk.http.*;
1216
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
1317
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
1418

15-
import java.io.ByteArrayInputStream;
1619
import java.io.IOException;
1720
import java.io.InputStream;
21+
import java.io.OutputStream;
1822
import java.net.URI;
1923
import java.net.URISyntaxException;
2024
import java.nio.ByteBuffer;
2125
import java.util.HashMap;
2226
import java.util.List;
2327
import java.util.Map;
2428
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.ExecutionException;
2530
import java.util.concurrent.Flow;
2631

2732
@ApiStatus.Experimental
@@ -46,9 +51,14 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest httpExecuteReques
4651

4752
@Override
4853
public HttpExecuteResponse call() {
49-
final HttpClientRequest request = asKoraRequest(httpExecuteRequest);
50-
final HttpClientResponse response = httpClient.execute(request);
51-
return asAwsResponse(response);
54+
var contentProvider = httpExecuteRequest.contentStreamProvider().orElse(null);
55+
try (var content = contentProvider == null ? null : contentProvider.newStream()) {
56+
final HttpClientRequest request = asKoraRequest(httpExecuteRequest, content);
57+
final HttpClientResponse response = httpClient.execute(request);
58+
return asAwsResponse(response);
59+
} catch (IOException e) {
60+
throw new RuntimeException(e);
61+
}
5262
}
5363

5464
@Override
@@ -70,25 +80,27 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest)
7080
}
7181
final SdkHttpResponse sdkHttpResponse = asSdkResponse(response);
7282
asyncExecuteRequest.responseHandler().onHeaders(sdkHttpResponse);
73-
asyncExecuteRequest.responseHandler().onStream(JdkFlowAdapter.flowPublisherToFlux(response.body()));
83+
try (var body = response.body(); var is = body.asInputStream()) {
84+
asyncExecuteRequest.responseHandler().onStream(AsyncRequestBody.fromBytes(is.readAllBytes()));
85+
} catch (IOException e) {
86+
asyncExecuteRequest.responseHandler().onError(e);
87+
}
7488

7589
return CompletableFuture.completedFuture(null);
7690
}
7791

78-
private HttpClientRequest asKoraRequest(HttpExecuteRequest httpExecuteRequest) {
92+
private HttpClientRequest asKoraRequest(HttpExecuteRequest httpExecuteRequest, @Nullable InputStream content) {
7993
final SdkHttpRequest sdkHttpRequest = httpExecuteRequest.httpRequest();
8094
final HttpClientRequestBuilder builder = getBaseBuilder(sdkHttpRequest.getUri(), sdkHttpRequest.method().name(), sdkHttpRequest.rawQueryParameters(), sdkHttpRequest.headers());
81-
82-
httpExecuteRequest.contentStreamProvider().ifPresent(provider -> {
83-
String contentType = sdkHttpRequest.firstMatchingHeader("Content-Type").orElse("application/octet-stream");
84-
String contentLength = sdkHttpRequest.firstMatchingHeader("Content-Length").orElse(null);
95+
if (content != null) {
96+
var contentType = sdkHttpRequest.firstMatchingHeader("Content-Type").orElse("application/octet-stream");
97+
var contentLength = sdkHttpRequest.firstMatchingHeader("Content-Length").orElse(null);
8598
if (contentLength == null) {
86-
builder.body(HttpBodyOutput.of(contentType, provider.newStream()));
99+
builder.body(HttpBodyOutput.of(contentType, content));
87100
} else {
88-
builder.body(HttpBodyOutput.of(contentType, Long.parseLong(contentLength), provider.newStream()));
101+
builder.body(HttpBodyOutput.of(contentType, Long.parseLong(contentLength), content));
89102
}
90-
});
91-
103+
}
92104
return builder
93105
.requestTimeout(clientConfig.requestTimeout())
94106
.build();
@@ -99,14 +111,60 @@ private HttpClientRequest asKoraRequest(AsyncExecuteRequest asyncExecuteRequest)
99111
final HttpClientRequestBuilder builder = getBaseBuilder(sdkHttpRequest.getUri(), sdkHttpRequest.method().name(), sdkHttpRequest.rawQueryParameters(), sdkHttpRequest.headers());
100112

101113
Flow.Publisher<ByteBuffer> bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(asyncExecuteRequest.requestContentPublisher());
102-
String contentType = sdkHttpRequest.firstMatchingHeader("Content-Type").orElse("application/octet-stream");
103-
String contentLength = sdkHttpRequest.firstMatchingHeader("Content-Length").orElse(null);
104-
if (contentLength == null) {
105-
builder.body(HttpBodyOutput.of(contentType, bodyFlow));
106-
} else {
107-
builder.body(HttpBodyOutput.of(contentType, Long.parseLong(contentLength), bodyFlow));
108-
}
114+
var contentType = sdkHttpRequest.firstMatchingHeader("Content-Type").orElse("application/octet-stream");
115+
var contentLengthStr = sdkHttpRequest.firstMatchingHeader("Content-Length").orElse(null);
116+
var contentLength = contentLengthStr == null ? -1 : Long.parseLong(contentLengthStr);
117+
builder.body(new HttpBodyOutput() {
118+
@Override
119+
public long contentLength() {
120+
return contentLength;
121+
}
109122

123+
@Override
124+
public String contentType() {
125+
return contentType;
126+
}
127+
128+
@Override
129+
public void write(OutputStream os) throws IOException {
130+
var future = new CompletableFuture<Void>();
131+
asyncExecuteRequest.requestContentPublisher().subscribe(new Subscriber<ByteBuffer>() {
132+
@Override
133+
public void onSubscribe(Subscription s) {
134+
s.request(Long.MAX_VALUE);
135+
}
136+
137+
@Override
138+
public void onNext(ByteBuffer byteBuffer) {
139+
try {
140+
os.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining());
141+
} catch (IOException e) {
142+
throw new RuntimeException(e);
143+
}
144+
}
145+
146+
@Override
147+
public void onError(Throwable t) {
148+
future.completeExceptionally(t);
149+
}
150+
151+
@Override
152+
public void onComplete() {
153+
future.complete(null);
154+
}
155+
});
156+
try {
157+
future.get();
158+
} catch (InterruptedException | ExecutionException e) {
159+
throw new RuntimeException(e);
160+
}
161+
}
162+
163+
@Override
164+
public void close() throws IOException {
165+
166+
}
167+
});
110168
return builder
111169
.requestTimeout(clientConfig.requestTimeout())
112170
.build();
@@ -176,13 +234,10 @@ private static SdkHttpFullResponse asSdkResponse(HttpClientResponse koraHttpResp
176234
private static AbortableInputStream asSdkResponseStream(HttpClientResponse koraHttpResponse) {
177235
final HttpBodyInput body = koraHttpResponse.body();
178236
final InputStream bodyIS = body.asInputStream();
179-
final InputStream bodyAsInputStream = bodyIS != null
180-
? bodyIS
181-
: new ByteArrayInputStream(body.asArrayStage().toCompletableFuture().join());
182237

183-
return AbortableInputStream.create(bodyAsInputStream, () -> {
238+
return AbortableInputStream.create(bodyIS, () -> {
184239
try {
185-
bodyAsInputStream.close();
240+
bodyIS.close();
186241
} catch (IOException e) {
187242
// ignore
188243
}

0 commit comments

Comments
 (0)