Skip to content

Commit

Permalink
Merge pull request #16 from moe-mizrak/dev
Browse files Browse the repository at this point in the history
buffering for filterStreamingResponse for incomplete data is added
  • Loading branch information
moe-mizrak authored Jun 26, 2024
2 parents 9ff0285 + 62325ad commit a0ac154
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ $stream = $promise->wait(); // $stream is type of GuzzleHttp\Psr7\Stream
* 1) You can retrieve whole raw response as: - Choose 1) or 2) depending on your case.
*/
$rawResponseAll = $stream->getContents(); // Instead of chunking streamed response as below - while (! $stream->eof()), it waits and gets raw response all together.
$response = LaravelOpenRouter::filterStreamingResponse($rawResponse); // Optionally you can use filterStreamingResponse to filter raw streamed response, and map it into array of responseData DTO same as chatRequest response format.
$response = LaravelOpenRouter::filterStreamingResponse($rawResponseAll); // Optionally you can use filterStreamingResponse to filter raw streamed response, and map it into array of responseData DTO same as chatRequest response format.

// 2) Or Retrieve streamed raw response as it becomes available:
while (! $stream->eof()) {
Expand Down
65 changes: 52 additions & 13 deletions src/OpenRouterRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use GuzzleHttp\Exception\GuzzleException;
use GuzzleHttp\Promise\PromiseInterface;
use JsonException;
use MoeMizrak\LaravelOpenrouter\DTO\ChatData;
use MoeMizrak\LaravelOpenrouter\DTO\CostResponseData;
use MoeMizrak\LaravelOpenrouter\DTO\ErrorData;
Expand All @@ -25,6 +26,9 @@ class OpenRouterRequest extends OpenRouterAPI
{
use DataHandlingTrait;

// Buffer variable for incomplete streaming data.
private static string $buffer = '';

/**
* Sends a model request for the given chat conversation.
*
Expand Down Expand Up @@ -124,24 +128,59 @@ function (ResponseInterface $response) {
*/
public function filterStreamingResponse(string $streamingResponse): array
{
// Prepend any leftover data from the previous iteration
$streamingResponse = self::$buffer . $streamingResponse;
// Clear buffer
self::$buffer = '';

// Split the string by lines
$lines = explode("\n", $streamingResponse);

// Filter out unnecessary lines
$filteredLines = array_filter($lines, function($line) {
// Check if line starts with "data: {"
return str_starts_with($line, 'data: {');
});

// Decode the JSON data in each line
// Filter out unnecessary lines and decode the JSON data
$responseDataArray = [];

foreach ($filteredLines as $line) {
// Remove "data: " and decode JSON
$data = json_decode(substr($line, strlen('data: ')), true);
// Check whether decoding was successful.
if (json_last_error() === JSON_ERROR_NONE) {
$responseDataArray[] = $this->formChatResponse($data);
// Flag to indicate if the first line is a complete JSON
$firstLineComplete = false;

foreach ($lines as $line) {
if (str_starts_with($line, 'data: ')) {
// Remove "data: " prefix
$jsonData = substr($line, strlen('data: '));

try {
// Attempt to decode the JSON data
$data = json_decode($jsonData, true, 512, JSON_THROW_ON_ERROR);
$responseDataArray[] = $this->formChatResponse($data);
$firstLineComplete = true;
} catch (JsonException $e) {
// If JSON decoding fails, buffer the line and continue
self::$buffer = $line;
continue;
}
} else if (trim($line) === '' && ! empty(self::$buffer)) {
// If the line is empty and there's something in the buffer, try to process the buffer
try {
// Attempt to decode the JSON data
$data = json_decode(self::$buffer, true, 512, JSON_THROW_ON_ERROR);
$responseDataArray[] = $this->formChatResponse($data);
self::$buffer = ''; // Clear buffer after successful processing
} catch (JsonException $e) {
// If JSON decoding fails, retain the buffer for next iteration
continue;
}
} else if (! str_starts_with($line, 'data: ') && ! empty(trim($line))) {
// If the line doesn't start with 'data: ', it might be part of a multiline JSON or a partial line
if (! $firstLineComplete) {
// If it's the first line and not complete, assume it's part of the first JSON object
self::$buffer = $line;
$firstLineComplete = true; // Set flag to true after buffering incomplete first line
} else {
// If it's not the first line or the first line is complete, buffer it for next iteration
self::$buffer .= $line;
}
} else {
// Line does not contain 'data: ' and is not part of a multiline JSON, likely incomplete
self::$buffer .= $line;
}
}

Expand Down

0 comments on commit a0ac154

Please sign in to comment.