[Streaming] Fix issues when not using try-with-resource #49

Open
wants to merge 92 commits into
base: main
Changes from 83 commits
Commits
92 commits
d319aa9
OpenAI streaming
CharlesDuboisSAP Aug 16, 2024
69ae7eb
Added homepage and error handling todo
CharlesDuboisSAP Aug 16, 2024
7870e6d
Renamed vars
CharlesDuboisSAP Aug 19, 2024
652ec1e
Added todos
CharlesDuboisSAP Aug 19, 2024
727b3d4
Made stream generic, try-with resources, TEXT_EVENT_STREAM, exception…
CharlesDuboisSAP Aug 21, 2024
b3190a5
Formatting
bot-sdk-js Aug 21, 2024
f0fa3e6
close stream correctly
CharlesDuboisSAP Aug 21, 2024
09ca6ea
Formatting
bot-sdk-js Aug 21, 2024
d86243a
Created OpenAiStreamOutput
CharlesDuboisSAP Aug 21, 2024
2a4ce7b
Merge remote-tracking branch 'origin/streaming' into streaming
CharlesDuboisSAP Aug 21, 2024
cf6ec46
Formatting
bot-sdk-js Aug 21, 2024
a73f037
Renamed stream to streamChatCompletion, Added comments
CharlesDuboisSAP Aug 22, 2024
eb3f24a
Added total output
CharlesDuboisSAP Aug 23, 2024
fb2cdaf
Total output is printed
CharlesDuboisSAP Aug 23, 2024
fe078c7
Formatting
bot-sdk-js Aug 23, 2024
09e1be0
addDelta is propagated everywhere
CharlesDuboisSAP Aug 23, 2024
42ae946
addDelta is propagated everywhere
CharlesDuboisSAP Aug 23, 2024
e6e009a
forgotten addDeltas
CharlesDuboisSAP Aug 23, 2024
bee8fdc
Added jackson dependencies
CharlesDuboisSAP Aug 23, 2024
5f03c6f
Added Javadoc
CharlesDuboisSAP Aug 23, 2024
e79ca8e
Removed 1 TODO
CharlesDuboisSAP Aug 23, 2024
ba2c5e0
PMD
CharlesDuboisSAP Aug 27, 2024
c10eecb
PMD again
CharlesDuboisSAP Aug 27, 2024
cdae1c6
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP Aug 27, 2024
faa3b70
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP Aug 27, 2024
0e1a167
Added OpenAiClientTest.streamChatCompletion()
CharlesDuboisSAP Aug 28, 2024
31dbd52
Change return type of stream, added e2e test
CharlesDuboisSAP Aug 29, 2024
de7e7f0
Added documentation
CharlesDuboisSAP Aug 29, 2024
349936f
Added documentation framework-agnostic + throw if finish reason is in…
CharlesDuboisSAP Aug 29, 2024
58b0bc9
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP Aug 30, 2024
3366c2e
Added error handling test
CharlesDuboisSAP Aug 30, 2024
c709d31
Updates from pair review / discussion
MatKuhr Aug 30, 2024
73031d1
Cleanup + streamChatCompletion doesn't throw
CharlesDuboisSAP Sep 2, 2024
6b1bfd0
PMD
CharlesDuboisSAP Sep 2, 2024
23474ba
Added errorHandling test
CharlesDuboisSAP Sep 2, 2024
769cd7d
Apply suggestions from code review
CharlesDuboisSAP Sep 3, 2024
118dc69
Dependency analyze
CharlesDuboisSAP Sep 3, 2024
acd21c0
Review comments
CharlesDuboisSAP Sep 3, 2024
28268b2
Make client static
CharlesDuboisSAP Sep 3, 2024
9a9a44b
Formatting
bot-sdk-js Sep 3, 2024
788db03
PMD
CharlesDuboisSAP Sep 3, 2024
0616f55
Fix tests
CharlesDuboisSAP Sep 3, 2024
3446bf0
Removed exception constructors no args
CharlesDuboisSAP Sep 3, 2024
45a20c6
Refactor exception message
CharlesDuboisSAP Sep 3, 2024
f843061
Readme sentences
CharlesDuboisSAP Sep 3, 2024
5edcf71
Remove superfluous call super
CharlesDuboisSAP Sep 3, 2024
7474fb1
reset httpclient-cache and -factory after each test case
newtork Sep 3, 2024
ac6f36c
Very minor code-style improvements in test
newtork Sep 3, 2024
ffa369a
Minor code-style in OpenAIController
newtork Sep 3, 2024
6cfeee9
Reduce README sample code
newtork Sep 3, 2024
6d4fd2f
Update OpenAiStreamingHandler.java (#43)
newtork Sep 3, 2024
a6c566a
Fix import
newtork Sep 3, 2024
543e003
Initial
newtork Sep 3, 2024
e810b52
Format
newtork Sep 3, 2024
89b7315
Improve type
newtork Sep 3, 2024
f6a4fe6
Added stream_options to model
CharlesDuboisSAP Sep 4, 2024
ead57b3
Change Executor#submit() to #execute()
newtork Sep 4, 2024
05dedf9
Change Executor#submit() to #execute()
newtork Sep 4, 2024
2604969
Merge branch 'streaming' of https://github.com/SAP/ai-sdk-java into s…
newtork Sep 4, 2024
9a3bf2f
Merge remote-tracking branch 'origin/main' into streaming
newtork Sep 4, 2024
a0ae779
Added usage testing
CharlesDuboisSAP Sep 4, 2024
2c934f7
Added beautiful Javadoc to enableStreaming
CharlesDuboisSAP Sep 4, 2024
77eb464
typo
CharlesDuboisSAP Sep 4, 2024
488f060
Fix mistake
CharlesDuboisSAP Sep 4, 2024
b5f48cd
Merge remote-tracking branch 'origin/streaming' into streaming-2
newtork Sep 4, 2024
06e3143
Merge remote-tracking branch 'origin/main' into streaming-2
newtork Sep 4, 2024
676c8ad
Syntax improvement to improve API stability.
newtork Sep 5, 2024
93e16d9
Syntax improvement to improve API stability.
newtork Sep 5, 2024
6e15131
Make exception types similar to BufferedReader original logic
newtork Sep 5, 2024
f6f122d
Format
newtork Sep 5, 2024
7aad621
Add nonnull characteristic to mirror BufferedReader original logic
newtork Sep 5, 2024
58e84b2
Merge branch 'main' into streaming-2
newtork Sep 6, 2024
db05057
Merge remote-tracking branch 'origin/main' into streaming-2
newtork Sep 10, 2024
9188893
Make buffer size accessible
a-d Sep 10, 2024
67a0489
Add test
a-d Sep 10, 2024
20cc897
Add assertion on stream count
a-d Sep 10, 2024
f4947a8
Simplify e2e code
a-d Sep 10, 2024
98e61de
Simplify README
a-d Sep 10, 2024
d034d2a
Partially revert
a-d Sep 10, 2024
aa7ae8e
Add assertion
a-d Sep 10, 2024
66ad4d7
Partially revert
a-d Sep 10, 2024
9dcb57a
Merge remote-tracking branch 'origin/main' into streaming-2
a-d Sep 24, 2024
7b275df
Minor code adjustments
a-d Sep 24, 2024
2e2c0df
Replace unnecessary nested types
a-d Sep 24, 2024
fa284ad
Merge nested type to renamed parent type
a-d Sep 24, 2024
cde54d6
Change code to ensure our lazy `hasNext()` has no unexpected side effect
a-d Sep 24, 2024
5088b99
Revert removing `emitter#complete()`
a-d Sep 24, 2024
fa8f91d
Add JavaDoc; Replace VAVR type
a-d Sep 24, 2024
e90883e
Address PMD warnings: change exception type
a-d Sep 24, 2024
4183ca3
Add unhappy-path test cases
a-d Sep 24, 2024
1463eb2
Revert code change in test app
a-d Sep 24, 2024
9fb63e3
Merge branch 'main' into streaming-2
newtork Sep 24, 2024
@@ -5,18 +5,12 @@
import static com.sap.ai.sdk.foundationmodels.openai.OpenAiResponseHandler.parseErrorAndThrow;

import com.sap.ai.sdk.foundationmodels.openai.model.StreamedDelta;
import io.vavr.control.Try;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpEntity;

@Slf4j
@RequiredArgsConstructor
@@ -42,20 +36,8 @@ Stream<D> handleResponse(@Nonnull final ClassicHttpResponse response)
@SuppressWarnings("PMD.CloseResource")
private Stream<D> parseResponse(@Nonnull final ClassicHttpResponse response)
Review comment (Contributor):

"The stream is closed by the user of the Stream"

This is false now. Could you update this comment and duplicate it in IterableStreamConverter.lines()?
Alternatively, just merge parseResponse into handleResponse, since parseResponse contains only a return statement.

throws OpenAiClientException {
- final HttpEntity responseEntity = response.getEntity();
- if (responseEntity == null) {
- throw new OpenAiClientException("Response from OpenAI model was empty.");
- }
- final InputStream inputStream;
- try {
- inputStream = responseEntity.getContent();
- } catch (IOException e) {
- throw new OpenAiClientException("Failed to read response content.", e);
- }
- final var br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
-
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
- return br.lines()
+ return StreamConverter.streamLines(response.getEntity())
// half of the lines are empty newlines, the last line is "data: [DONE]"
.filter(line -> !line.isEmpty() && !"data: [DONE]".equals(line.trim()))
.peek(
@@ -74,10 +56,6 @@ private Stream<D> parseResponse(@Nonnull final ClassicHttpResponse response)
log.error("Failed to parse the following response from OpenAI model: {}", line);
throw new OpenAiClientException("Failed to parse delta message: " + line, e);
}
- })
- .onClose(
- () ->
- Try.run(inputStream::close)
- .onFailure(e -> log.error("Could not close HTTP input stream", e)));
+ });
}
}
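
The review comment in this hunk turns on who closes the underlying reader. As a minimal, self-contained sketch (the class name `OnCloseDemo` and the sample input are made up for illustration and are not part of the PR), the following shows the standard-library behaviour that motivates the change: a handler registered with `Stream.onClose` runs only when `close()` is called, for example by try-with-resources, and not when a terminal operation merely finishes.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the SDK.
class OnCloseDemo {

  /** Consumes a line stream fully and reports whether its onClose handler ran. */
  static boolean closedAfterConsumption(final boolean useTryWithResources) {
    final var closed = new AtomicBoolean(false);
    final var reader = new BufferedReader(new StringReader("data: a\n\ndata: b\n"));
    // The handler stands in for closing the HTTP input stream.
    final Stream<String> lines = reader.lines().onClose(() -> closed.set(true));

    if (useTryWithResources) {
      // close() is invoked when the try block exits, which triggers the handler.
      try (lines) {
        lines.forEach(line -> {});
      }
    } else {
      // A terminal operation alone does not call close().
      lines.forEach(line -> {});
    }
    return closed.get();
  }

  public static void main(final String[] args) {
    System.out.println("plain forEach closed: " + closedAfterConsumption(false));
    System.out.println("try-with-resources closed: " + closedAfterConsumption(true));
  }
}
```

Running `main` prints `false` for the plain `forEach` case and `true` for the try-with-resources case, which is why a stream built on `BufferedReader.lines()` leaves the connection open when callers skip try-with-resources.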
@@ -0,0 +1,108 @@
package com.sap.ai.sdk.foundationmodels.openai;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Spliterator.NONNULL;
import static java.util.Spliterator.ORDERED;

import io.vavr.control.Try;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.core5.http.HttpEntity;

@Slf4j
class StreamConverter {

/** see {@link BufferedReader#DEFAULT_CHAR_BUFFER_SIZE} */
static final int BUFFER_SIZE = 8192;

@SuppressWarnings("PMD.CloseResource")
@Nonnull
static Stream<String> streamLines(@Nullable final HttpEntity entity)
throws OpenAiClientException {
if (entity == null) {
throw new OpenAiClientException("OpenAI response was empty.");
}

final InputStream inputStream;
try {
inputStream = entity.getContent();
} catch (IOException e) {
throw new OpenAiClientException("Failed to read response content.", e);
}

final var reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), BUFFER_SIZE);
final CloseHandler closeHandler =
() ->
Try.run(reader::close)
.onFailure(e -> log.error("Could not close HTTP input stream", e));

final var iterator = new SequentialIterator<>(reader::readLine, closeHandler);
final var spliterator = Spliterators.spliteratorUnknownSize(iterator, ORDERED | NONNULL);
return StreamSupport.stream(spliterator, /* NOT PARALLEL */ false).onClose(closeHandler::close);
}

@FunctionalInterface
private interface ReadHandler<T> {
/**
* Read next entry for Stream.
*
* @return The next entry, or {@code null} when no further entry can be read.
* @throws IOException if no entry can be read anymore, unexpected.
*/
@Nullable
T readEntry() throws IOException;
}

@FunctionalInterface
private interface CloseHandler {
/** Close handler to be called when Stream terminated. */
void close();
}

@RequiredArgsConstructor
private static class SequentialIterator<T> implements Iterator<T> {
private final ReadHandler<T> readHandler;
private final CloseHandler stopHandler;
private boolean done = false;
private T next = null;

@Override
public boolean hasNext() {
if (done) {
return false;
}
try {
next = readHandler.readEntry();
if (next == null) {
done = true;
stopHandler.close();
}
} catch (final IOException e) {
done = true;
stopHandler.close();
throw new UncheckedIOException("Iterator stopped unexpectedly.", e);
}
return !done;
}

@Override
public T next() {
if (next == null && !hasNext()) {
throw new NoSuchElementException();
}
return next;
}
}
}
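
In the `SequentialIterator` shown here, every call to `hasNext()` reads another entry, so calling it twice in a row would skip a line, and `next()` never clears the buffered value. The Stream pipeline happens to call `hasNext()` exactly once per element, so the class works in that setting, but the general `Iterator` contract is not met. One possible way to make `hasNext()` idempotent is sketched below. This is an assumption-laden illustration, not the code that was eventually merged; `LookaheadIterator` and `drain` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch of an iterator whose hasNext() can be called repeatedly.
class LookaheadIterator implements Iterator<String> {
  private final BufferedReader reader;
  private String buffered; // element read ahead, or null if nothing is buffered
  private boolean done;

  LookaheadIterator(final BufferedReader reader) {
    this.reader = reader;
  }

  @Override
  public boolean hasNext() {
    if (done) {
      return false;
    }
    if (buffered != null) {
      return true; // already read ahead: no second read, no side effect
    }
    try {
      buffered = reader.readLine();
      if (buffered == null) {
        done = true;
        reader.close(); // source exhausted: release it eagerly
      }
    } catch (final IOException e) {
      done = true;
      closeQuietly();
      throw new UncheckedIOException("Iterator stopped unexpectedly.", e);
    }
    return !done;
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final String result = buffered;
    buffered = null; // hand out each element exactly once
    return result;
  }

  private void closeQuietly() {
    try {
      reader.close();
    } catch (final IOException ignored) {
      // nothing sensible left to do in this sketch
    }
  }

  /** Collects all lines while deliberately calling hasNext() twice per element. */
  static List<String> drain(final String text) {
    final var iterator = new LookaheadIterator(new BufferedReader(new StringReader(text)));
    final var result = new ArrayList<String>();
    while (iterator.hasNext() && iterator.hasNext()) {
      result.add(iterator.next());
    }
    return result;
  }
}
```

With this shape, `LookaheadIterator.drain("a\nb\nc\n")` returns all three lines even though `hasNext()` is called twice per iteration.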
@@ -0,0 +1,47 @@
package com.sap.ai.sdk.foundationmodels.openai;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.junit.jupiter.api.Test;

public class StreamConverterTest {
@SneakyThrows
@Test
void testStreamLines() {
final var TEMPLATE = "THIS\nIS\nA\nTEST\n";
final var input = TEMPLATE.repeat(StreamConverter.BUFFER_SIZE);
final var inputStream = spy(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);

final var sut = StreamConverter.streamLines(entity);
verify(inputStream, never()).read();
verify(inputStream, never()).read(any());
verify(inputStream, never()).read(any(), anyInt(), anyInt());

final var streamCounter = new AtomicInteger(0);
sut.peek(s -> streamCounter.incrementAndGet())
.forEach(
s ->
assertThat(s)
.containsAnyOf("THIS", "IS", "A", "TEST")
.doesNotContainAnyWhitespaces());

assertThat(streamCounter).hasValue(StreamConverter.BUFFER_SIZE * 4);
verify(inputStream, times(TEMPLATE.length() + 1))
.read(any(), anyInt(), eq(StreamConverter.BUFFER_SIZE));
verify(inputStream, times(1)).close();
}
}
@@ -64,15 +64,10 @@ public static ResponseEntity<ResponseBodyEmitter> streamChatCompletionDeltas() {
final Runnable consumeStream =
() -> {
final var totalOutput = new OpenAiChatCompletionOutput();
- // try-with-resources ensures the stream is closed
- try (stream) {
- stream
- .peek(totalOutput::addDelta)
- .forEach(delta -> send(emitter, delta.getDeltaContent()));
- } finally {
- send(emitter, "\n\n-----Total Output-----\n\n" + objectToJson(totalOutput));
- emitter.complete();
- }
+ stream
+ .peek(totalOutput::addDelta)
+ .forEach(delta -> send(emitter, delta.getDeltaContent()));
+ send(emitter, "\n\n-----Total Output-----\n\n" + objectToJson(totalOutput));
};

ThreadContextExecutors.getExecutor().execute(consumeStream);
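
Dropping try-with-resources in the sample app relies on the stream source closing itself once the last line has been read. The sketch below illustrates, under stated assumptions, when such an auto-close fires: it uses an `AtomicInteger` as a stand-in for the real `reader.close()` call, and `SelfClosingLinesDemo` and its methods are hypothetical names rather than SDK API. It suggests that full consumption triggers the close, whereas a short-circuiting operation such as `findFirst()` does not exhaust the iterator and so would still need an explicit `close()`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class, not part of the SDK.
class SelfClosingLinesDemo {

  /** Builds a line stream whose source records a close once the last line was read. */
  static Stream<String> lines(final String text, final AtomicInteger closeCount) {
    final var reader = new BufferedReader(new StringReader(text));
    final Iterator<String> iterator =
        new Iterator<String>() {
          private String buffered;
          private boolean done;

          @Override
          public boolean hasNext() {
            if (done) {
              return false;
            }
            if (buffered != null) {
              return true;
            }
            try {
              buffered = reader.readLine();
            } catch (final IOException e) {
              throw new UncheckedIOException(e);
            }
            if (buffered == null) {
              done = true;
              closeCount.incrementAndGet(); // stands in for reader.close()
            }
            return !done;
          }

          @Override
          public String next() {
            if (!hasNext()) {
              throw new NoSuchElementException();
            }
            final String result = buffered;
            buffered = null;
            return result;
          }
        };
    final var spliterator =
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.NONNULL);
    return StreamSupport.stream(spliterator, false); // sequential stream
  }

  /** Full consumption reaches the end of the source, so the close is recorded. */
  static int closesAfterForEach() {
    final var closeCount = new AtomicInteger();
    lines("a\nb\nc\n", closeCount).forEach(line -> {});
    return closeCount.get();
  }

  /** findFirst() stops after one element, so the source is never exhausted. */
  static int closesAfterFindFirst() {
    final var closeCount = new AtomicInteger();
    lines("a\nb\nc\n", closeCount).findFirst();
    return closeCount.get();
  }
}
```

In this sketch `closesAfterForEach()` yields 1 and `closesAfterFindFirst()` yields 0, so callers that stop early may still want try-with-resources or an explicit `close()` on the returned stream.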