generated from SAP/repository-template
-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Streaming] Fix issues when not using try-with-resource
#49
Open
newtork
wants to merge
92
commits into
main
Choose a base branch
from
streaming-2
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+221
−24
Open
Changes from 83 commits
Commits
Show all changes
92 commits
Select commit
Hold shift + click to select a range
d319aa9
OpenAI streaming
CharlesDuboisSAP 69ae7eb
Added homepage and error handling todo
CharlesDuboisSAP 7870e6d
Renamed vars
CharlesDuboisSAP 652ec1e
Added todos
CharlesDuboisSAP 727b3d4
Made stream generic, try-with resources, TEXT_EVENT_STREAM, exception…
CharlesDuboisSAP b3190a5
Formatting
bot-sdk-js f0fa3e6
close stream correctly
CharlesDuboisSAP 09ca6ea
Formatting
bot-sdk-js d86243a
Created OpenAiStreamOutput
CharlesDuboisSAP 2a4ce7b
Merge remote-tracking branch 'origin/streaming' into streaming
CharlesDuboisSAP cf6ec46
Formatting
bot-sdk-js a73f037
Renamed stream to streamChatCompletion, Added comments
CharlesDuboisSAP eb3f24a
Added total output
CharlesDuboisSAP fb2cdaf
Total output is printed
CharlesDuboisSAP fe078c7
Formatting
bot-sdk-js 09e1be0
addDelta is propagated everywhere
CharlesDuboisSAP 42ae946
addDelta is propagated everywhere
CharlesDuboisSAP e6e009a
forgotten addDeltas
CharlesDuboisSAP bee8fdc
Added jackson dependencies
CharlesDuboisSAP 5f03c6f
Added Javadoc
CharlesDuboisSAP e79ca8e
Removed 1 TODO
CharlesDuboisSAP ba2c5e0
PMD
CharlesDuboisSAP c10eecb
PMD again
CharlesDuboisSAP cdae1c6
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP faa3b70
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP 0e1a167
Added OpenAiClientTest.streamChatCompletion()
CharlesDuboisSAP 31dbd52
Change return type of stream, added e2e test
CharlesDuboisSAP de7e7f0
Added documentation
CharlesDuboisSAP 349936f
Added documentation framework-agnostic + throw if finish reason is in…
CharlesDuboisSAP 58b0bc9
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP 3366c2e
Added error handling test
CharlesDuboisSAP c709d31
Updates from pair review / discussion
MatKuhr 73031d1
Cleanup + streamChatCompletion doesn't throw
CharlesDuboisSAP 6b1bfd0
PMD
CharlesDuboisSAP 23474ba
Added errorHandling test
CharlesDuboisSAP 769cd7d
Apply suggestions from code review
CharlesDuboisSAP 118dc69
Dependency analyze
CharlesDuboisSAP acd21c0
Review comments
CharlesDuboisSAP 28268b2
Make client static
CharlesDuboisSAP 9a9a44b
Formatting
bot-sdk-js 788db03
PMD
CharlesDuboisSAP 0616f55
Fix tests
CharlesDuboisSAP 3446bf0
Removed exception constructors no args
CharlesDuboisSAP 45a20c6
Refactor exception message
CharlesDuboisSAP f843061
Readme sentences
CharlesDuboisSAP 5edcf71
Remove superfluous call super
CharlesDuboisSAP 7474fb1
reset httpclient-cache and -factory after each test case
newtork ac6f36c
Very minor code-style improvements in test
newtork ffa369a
Minor code-style in OpenAIController
newtork 6cfeee9
Reduce README sample code
newtork 6d4fd2f
Update OpenAiStreamingHandler.java (#43)
newtork a6c566a
Fix import
newtork 543e003
Initial
newtork e810b52
Format
newtork 89b7315
Improve type
newtork f6a4fe6
Added stream_options to model
CharlesDuboisSAP ead57b3
Change Executor#submit() to #execute()
newtork 05dedf9
Change Executor#submit() to #execute()
newtork 2604969
Merge branch 'streaming' of https://github.com/SAP/ai-sdk-java into s…
newtork 9a3bf2f
Merge remote-tracking branch 'origin/main' into streaming
newtork a0ae779
Added usage testing
CharlesDuboisSAP 2c934f7
Added beautiful Javadoc to enableStreaming
CharlesDuboisSAP 77eb464
typo
CharlesDuboisSAP 488f060
Fix mistake
CharlesDuboisSAP b5f48cd
Merge remote-tracking branch 'origin/streaming' into streaming-2
newtork 06e3143
Merge remote-tracking branch 'origin/main' into streaming-2
newtork 676c8ad
Syntax improvement to improve API stability.
newtork 93e16d9
Syntax improvement to improve API stability.
newtork 6e15131
Make exception types similar to BufferedReader original logic
newtork f6f122d
Format
newtork 7aad621
Add nonnull characteristic to mirror BufferedReader original logic
newtork 58e84b2
Merge branch 'main' into streaming-2
newtork db05057
Merge remote-tracking branch 'origin/main' into streaming-2
newtork 9188893
Make buffer size accessible
a-d 67a0489
Add test
a-d 20cc897
Add assertion on stream count
a-d f4947a8
Simplify e2e code
a-d 98e61de
Simplify README
a-d d034d2a
Partially revert
a-d aa7ae8e
Add assertion
a-d 66ad4d7
Partially revert
a-d 9dcb57a
Merge remote-tracking branch 'origin/main' into streaming-2
a-d 7b275df
Minor code adjustments
a-d 2e2c0df
Replace unnecessary nested types
a-d fa284ad
Merge nested type to renamed parent type
a-d cde54d6
Change code to ensure our lazy `hasNext()` has no unexpected side effect
a-d 5088b99
Revert removing `emitter#complete()`
a-d fa8f91d
Add JavaDoc; Replace VAVR type
a-d e90883e
Address PMD warnings: change exception type
a-d 4183ca3
Add unhappy-path test cases
a-d 1463eb2
Revert code change in test app
a-d 9fb63e3
Merge branch 'main' into streaming-2
newtork File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
108 changes: 108 additions & 0 deletions
108
...n-models/openai/src/main/java/com/sap/ai/sdk/foundationmodels/openai/StreamConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package com.sap.ai.sdk.foundationmodels.openai; | ||
|
||
import static java.nio.charset.StandardCharsets.UTF_8; | ||
import static java.util.Spliterator.NONNULL; | ||
import static java.util.Spliterator.ORDERED; | ||
|
||
import io.vavr.control.Try; | ||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.io.UncheckedIOException; | ||
import java.util.Iterator; | ||
import java.util.NoSuchElementException; | ||
import java.util.Spliterators; | ||
import java.util.stream.Stream; | ||
import java.util.stream.StreamSupport; | ||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.hc.core5.http.HttpEntity; | ||
|
||
@Slf4j | ||
class StreamConverter { | ||
|
||
/** see {@link BufferedReader#DEFAULT_CHAR_BUFFER_SIZE} */ | ||
static final int BUFFER_SIZE = 8192; | ||
|
||
@SuppressWarnings("PMD.CloseResource") | ||
@Nonnull | ||
static Stream<String> streamLines(@Nullable final HttpEntity entity) | ||
throws OpenAiClientException { | ||
if (entity == null) { | ||
throw new OpenAiClientException("OpenAI response was empty."); | ||
} | ||
|
||
final InputStream inputStream; | ||
try { | ||
inputStream = entity.getContent(); | ||
} catch (IOException e) { | ||
throw new OpenAiClientException("Failed to read response content.", e); | ||
} | ||
|
||
final var reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), BUFFER_SIZE); | ||
final CloseHandler closeHandler = | ||
() -> | ||
Try.run(reader::close) | ||
.onFailure(e -> log.error("Could not close HTTP input stream", e)); | ||
|
||
final var iterator = new SequentialIterator<>(reader::readLine, closeHandler); | ||
final var spliterator = Spliterators.spliteratorUnknownSize(iterator, ORDERED | NONNULL); | ||
return StreamSupport.stream(spliterator, /* NOT PARALLEL */ false).onClose(closeHandler::close); | ||
} | ||
|
||
@FunctionalInterface | ||
private interface ReadHandler<T> { | ||
newtork marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/** | ||
* Read next entry for Stream. | ||
* | ||
* @return The next entry, or {@code null} when no further entry can be read. | ||
* @throws IOException if no entry can be read anymore, unexpected. | ||
*/ | ||
@Nullable | ||
T readEntry() throws IOException; | ||
} | ||
|
||
@FunctionalInterface | ||
private interface CloseHandler { | ||
/** Close handler to be called when Stream terminated. */ | ||
void close(); | ||
} | ||
|
||
@RequiredArgsConstructor | ||
private static class SequentialIterator<T> implements Iterator<T> { | ||
private final ReadHandler<T> readHandler; | ||
private final CloseHandler stopHandler; | ||
private boolean done = false; | ||
private T next = null; | ||
|
||
@Override | ||
public boolean hasNext() { | ||
newtork marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (done) { | ||
return false; | ||
} | ||
try { | ||
next = readHandler.readEntry(); | ||
newtork marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (next == null) { | ||
done = true; | ||
stopHandler.close(); | ||
} | ||
} catch (final IOException e) { | ||
done = true; | ||
stopHandler.close(); | ||
throw new UncheckedIOException("Iterator stopped unexpectedly.", e); | ||
} | ||
return !done; | ||
} | ||
|
||
@Override | ||
public T next() { | ||
if (next == null && !hasNext()) { | ||
throw new NoSuchElementException(); | ||
} | ||
return next; | ||
} | ||
} | ||
} |
47 changes: 47 additions & 0 deletions
47
...dels/openai/src/test/java/com/sap/ai/sdk/foundationmodels/openai/StreamConverterTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package com.sap.ai.sdk.foundationmodels.openai; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.anyInt; | ||
import static org.mockito.ArgumentMatchers.eq; | ||
import static org.mockito.Mockito.never; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import lombok.SneakyThrows; | ||
import org.apache.hc.core5.http.ContentType; | ||
import org.apache.hc.core5.http.io.entity.InputStreamEntity; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class StreamConverterTest { | ||
@SneakyThrows | ||
@Test | ||
void testStreamLines() { | ||
final var TEMPLATE = "THIS\nIS\nA\nTEST\n"; | ||
final var input = TEMPLATE.repeat(StreamConverter.BUFFER_SIZE); | ||
final var inputStream = spy(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))); | ||
final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN); | ||
|
||
final var sut = StreamConverter.streamLines(entity); | ||
verify(inputStream, never()).read(); | ||
verify(inputStream, never()).read(any()); | ||
verify(inputStream, never()).read(any(), anyInt(), anyInt()); | ||
|
||
final var streamCounter = new AtomicInteger(0); | ||
sut.peek(s -> streamCounter.incrementAndGet()) | ||
.forEach( | ||
s -> | ||
assertThat(s) | ||
.containsAnyOf("THIS", "IS", "A", "TEST") | ||
.doesNotContainAnyWhitespaces()); | ||
|
||
assertThat(streamCounter).hasValue(StreamConverter.BUFFER_SIZE * 4); | ||
verify(inputStream, times(TEMPLATE.length() + 1)) | ||
.read(any(), anyInt(), eq(StreamConverter.BUFFER_SIZE)); | ||
verify(inputStream, times(1)).close(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is false now. Could you update this comment and duplicate it in
IterableStreamConverter.lines()
Alternatively just merge
parseResponse
inside ofhandleResponse
sinceparseResponse
only contains areturn
and nothing else