
Feature: Add ability to stop/interrupt stream #1221

Open
TaylorN15 opened this issue Feb 2, 2024 · 19 comments

Comments

@TaylorN15

TaylorN15 commented Feb 2, 2024

This issue is for a: (mark with an x)

- [ ] bug report -> please search issues before submitting
- [X] feature request
- [ ] documentation issue or request
- [ ] regression (a behavior that used to work and stopped in a new release)

I would like to have a button that will stop/interrupt the stream being rendered (ChatGPT has this), so if the response is going downhill, the user can stop it before it goes any further. I've tried implementing something myself, but it doesn't quite work with the AsyncGenerator.

@pamelafox
Collaborator

Interesting. I wonder if you could try ReadableStream.cancel() method, as ReadableStream() is what we use in the frontend:
https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/cancel

The spec says that sending a cancel event should attempt to cancel the underlying mechanism, so I think it would close the TCP connection? I'm not sure if you'd still pay for the full tokens of the call though. That's worth experimenting with, if you haven't yet.

Otherwise I fear you'd need to move to something with better bidirectional support like web sockets.

@TaylorN15
Author

Thanks for the prompt response! I was trying to cancel the asyncio thread in Quart, but cancelling the stream makes much more sense :)

I've been able to implement this just now, and it seems to work quite well. As far as I can tell, it also stops the response coming back from the model. I'm not sure if there's a way to check whether I was still billed for the full token count, but that doesn't bother me much; the user experience is more important :)

const [responseStream, setResponseStream] = useState<ReadableStream | null>(null);

const handleAsyncRequest = async (
    question: string,
    answers: [string, ChatAppResponse][],
    setAnswers: Function,
    responseBody: ReadableStream<any>
) => {
    setResponseStream(responseBody); // keep a handle on the stream so it can be cancelled later
    // ... other code ... //

Then I've just added a button that calls onStopClick():

const onStopClick = async () => {
    try {
        if (responseStream) {
            responseStream.cancel("User cancelled the stream");
        }
    } catch (e) {
        console.log(e);
    }
};

@pamelafox
Collaborator

Very cool! You might be able to monitor token usage in your Azure OpenAI metrics dashboard.

@oracle-code

Looks nice, very cool indeed. We will try it on our end and give you feedback!

@TrentinMarc

TrentinMarc commented Mar 27, 2024

Hi all,
@TaylorN15 don't you face an error such as:

ReadableStream.cancel: Cannot cancel a stream locked by a reader?

I could not interrupt my chat the way you shared, but I found a workaround like this:

export class ClassForReadNDJSONStream {
    reader: ReadableStreamDefaultReader;
    responseBody: ReadableStream<any>;

    constructor(responseBody: ReadableStream<any>) {
        this.responseBody = responseBody;
        // Expose the reader so callers can releaseLock() before cancelling the stream.
        this.reader = responseBody.getReader();
    }

    async *readStream() {
        let runningText = "";
        const decoder = new TextDecoder("utf-8");
        while (true) {
            const { done, value } = await this.reader.read();
            if (done) break;
            const text = decoder.decode(value, { stream: true });
            const objects = text.split("\n");
            for (const obj of objects) {
                try {
                    runningText += obj;
                    const result = JSON.parse(runningText);
                    yield result;
                    runningText = "";
                } catch (e) {
                    // Not a complete JSON object yet; keep accumulating
                }
            }
        }
    }
}

and

const c4readNDSJonstream = new ClassForReadNDJSONStream(responseBody);
setResponseStream(c4readNDSJonstream); // Using your method with ClassForReadNDJSONStream signature

for await (const event of c4readNDSJonstream.readStream()) {
// other code ...

This is based on @pamelafox's readNDJSONStream lib, but it exposes the reader to allow reader.releaseLock() (and then responseBody.cancel()) in onStopClick() to interrupt the chat answer; a sketch of that handler follows.
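
A hedged sketch of that onStopClick handler (editor's illustration, not code from this comment; it assumes the responseStream state now holds the ClassForReadNDJSONStream instance set above):

const onStopClick = async () => {
    try {
        if (responseStream) {
            // Release the reader's lock first; a locked stream cannot be cancelled.
            responseStream.reader.releaseLock();
            await responseStream.responseBody.cancel("User cancelled the stream");
        }
    } catch (e) {
        console.log(e);
    }
};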

What do you think about it?

@TaylorN15
Author

Yes, actually, I did get that error :)

I ended up implementing an abort controller instead.

@oracle-code

oracle-code commented Mar 27, 2024

Would you be able to share the code, @TaylorN15?

@TaylorN15
Author

Add the AbortController:
const [abortController, setAbortController] = useState<AbortController | null>(null);

Add the below into makeApiRequest:

const makeApiRequest = async (question: string) => {
	const controller = new AbortController();
	setAbortController(controller);

Add the AbortSignal to the props of handleAsyncRequest:

const handleAsyncRequest = async (
	question: string,
	answers: [string, ChatAppResponse][],
	responseBody: ReadableStream<any>,
	signal: AbortSignal
) => {

And the logic for handling the abort, also in handleAsyncRequest:

setIsStreaming(true);
for await (const event of readNDJSONStream(responseBody)) {
	if (signal.aborted) {
		// Abort the stream if requested
		break;
	}

Create a function for when the button is clicked, and call it from wherever you want. I have mine in the question container, so that the stop button appears instead of the send button while the answer is streaming:

const onStopClick = async () => {
	try {
		if (abortController) {
			abortController.abort();
		}
	} catch (e) {
		console.log("An error occurred trying to stop the stream: ", error);
	}
};
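
To tie these fragments together, a hedged sketch of the call site inside makeApiRequest (editor's illustration; it assumes the repo's chatApi(request, token) signature and the names from the fragments above):

const response = await chatApi(request, token);
if (!response.body) {
    throw Error("No response body");
}
// Pass the controller's signal so handleAsyncRequest can break out of the stream loop.
const parsedResponse: ChatAppResponse = await handleAsyncRequest(question, answers, response.body, controller.signal);
setAnswers([...answers, [question, parsedResponse]]);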

@TrentinMarc

Thanks a lot @TaylorN15

AbortController works properly for this usage. Just in case, I passed the signal directly to the chatApi function; it allows you to cancel the chat at any time (even before you enter the "for await" loop).

export async function chatApi(request: ChatAppRequest, idToken: string | undefined, signal: AbortSignal): Promise<Response> {
    return await fetch(`${BACKEND_URI}/chat`, {
        method: "POST",
        headers: getHeaders(idToken),
        body: JSON.stringify(request),
        signal: signal
    });
}
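
A hedged usage sketch for the caller (editor's illustration, reusing the controller created in makeApiRequest in the earlier fragments):

const controller = new AbortController();
setAbortController(controller);
// Aborting this controller now cancels the request itself, not just the rendering loop.
const response = await chatApi(request, token, controller.signal);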

@TaylorN15
Author

Nice, does yours actually stop the stream on the backend? It seems that most of the time, mine doesn’t. It doesn’t bother me much, but it would be nice to reduce the token usage :)

@TrentinMarc

Hello, sorry for the delay.

Yes, it stops the stream on the backend. Depending on when the chat is stopped, I may save a few tokens.

[screenshot attachment omitted]

@TaylorN15
Author

Did you add any code to the backend to handle the abort?

@TrentinMarc

No, I didn't add anything to the backend, as the Fetch API natively handles AbortController signals.
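
For context, a minimal sketch (editor's illustration, not from this thread) of why that works: aborting the controller tears down the underlying request, so any pending body read rejects and the server observes a client disconnect.

const controller = new AbortController();
const resp = await fetch("/chat", { method: "POST", signal: controller.signal });
const reader = resp.body!.getReader();

// abort() rejects any pending read() with a DOMException named "AbortError"
// and closes the connection, which the backend sees as a disconnect.
controller.abort();
await reader.read().catch(e => console.log(e.name)); // "AbortError"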

@pamelafox pamelafox reopened this Apr 12, 2024
@ssjdan27

@TrentinMarc I am curious about how you implemented this. I tried following the code laid out here, but when I cancel the stream in the chat I get the following error message on the screen:
AbortError: BodyStreamBuffer was aborted

I was wondering if there is a more graceful approach to handling this that you may be aware of.
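
One hedged option (editor's sketch, not code from this thread): catch the AbortError by name so a user-initiated stop is treated as a normal completion instead of surfacing as an error.

try {
    for await (const event of readNDJSONStream(responseBody)) {
        // ... process streamed events ...
    }
} catch (e) {
    if (e instanceof DOMException && e.name === "AbortError") {
        // The user stopped the stream; swallow the error and keep the partial answer.
    } else {
        throw e;
    }
}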

@TrentinMarc

@ssjdan27 I did not check the recent updates on this project as I stopped working with it, but when we were working on it, the simplest way was to add the signal parameter to the fetch() call performed by chatApi().

Perhaps the asynchronous process has been edited in the meantime?

If you run out of solutions, share more information about your problem :)

@ssjdan27

@TrentinMarc Thank you for your response, and I believe I figured it out! I will share my code below and can maybe get some input on it. Excuse my formatting. I currently do not have this code in GitHub, but if more people are interested, I could potentially make a PR for it so that the changes are easier to see.

This is what chatApi looks like in api.ts:

export async function chatApi(request: ChatAppRequest, idToken: string | undefined, signal: AbortSignal): Promise<Response> {
    return await fetch(`${BACKEND_URI}/chat`, {
        method: "POST",
        headers: { ...getHeaders(idToken), "Content-Type": "application/json" },
        body: JSON.stringify(request),
        signal: signal
    });
}

I made some changes to QuestionInput.tsx in order to conditionally render the stop-streaming button. Here are the changes:
import { Send28Filled, Stop24Filled } from "@fluentui/react-icons";

interface Props {
// all other code stays the same just add these
    onStop: () => void;
    isStreaming: boolean;
}
export const QuestionInput = ({ onSend, onStop, disabled, isStreaming, placeholder, clearOnSend, initQuestion }: Props) => {
// more code here

return (
        /* other code here but this is the changed code */
         <div className={styles.questionInputButtonsContainer}>
                {isStreaming ? (  
                    <Tooltip content="Stop streaming" relationship="label">
                        <Button size="large" icon={<Stop24Filled primaryFill="rgba(255, 0, 0, 1)" />} onClick={onStop} />
                    </Tooltip>
                ) : (
                    <Tooltip content="Ask question" relationship="label">
                        <Button size="large" icon={<Send28Filled primaryFill="rgba(115, 118, 225, 1)" />} disabled={sendQuestionDisabled} onClick={sendQuestion} />
                    </Tooltip>
                )}
            </div>
    );
}

The rest of the changes are made in Chat.tsx:
Added a partialResponse state variable so that we can record the partially streamed response on the frontend:

const [partialResponse, setPartialResponse] = useState<string>("");

And the abortController:

const [abortController, setAbortController] = useState<AbortController | null>(null);

Here is the handleAsyncRequest function:

 const handleAsyncRequest = async (question: string, answers: [string, ChatAppResponse][], responseBody: ReadableStream<any>, signal: AbortSignal) => {
        let answer: string = "";
        let askResponse: ChatAppResponse = {} as ChatAppResponse;
    
        const updateState = (newContent: string) => {
            return new Promise(resolve => {
                setTimeout(() => {
                    answer += newContent;
                    setPartialResponse(answer); // Update partial response state
                    const latestResponse: ChatAppResponse = {
                        ...askResponse,
                        choices: [{ ...askResponse.choices[0], message: { content: answer, role: askResponse.choices[0].message.role } }]
                    };
                    setStreamedAnswers([...answers, [question, latestResponse]]);
                    resolve(null);
                }, 20); // a 20 ms delay allows for fast streaming to the frontend without catching up to the stream object
            });
        };
    
        try {
            setIsStreaming(true);
            for await (const event of readNDJSONStream(responseBody)) {
                if (signal.aborted) {
                    // Abort the stream if requested
                    break;
                }
                if (event["choices"] && event["choices"][0]["context"] && event["choices"][0]["context"]["data_points"]) {
                    event["choices"][0]["message"] = event["choices"][0]["delta"];
                    askResponse = event as ChatAppResponse;
                } else if (event["choices"] && event["choices"][0]["delta"]["content"]) {
                    setIsLoading(false);
                    await updateState(event["choices"][0]["delta"]["content"]);
                } else if (event["choices"] && event["choices"][0]["context"]) {
                    // Update context with new keys from latest event
                    askResponse.choices[0].context = { ...askResponse.choices[0].context, ...event["choices"][0]["context"] };
                } else if (event["error"]) {
                    throw Error(event["error"]);
                }
            }
        } catch (e) {
            console.error("error in handleAsyncRequest: ", e);
        } finally {
            setIsStreaming(false);
            setPartialResponse(""); // make sure to reset the partialResponse to a clean slate once the streaming has finished; this is because the response is no longer partial and rather complete!
        }
        const fullResponse: ChatAppResponse = {
            ...askResponse,
            choices: [{ ...askResponse.choices[0], message: { content: answer, role: askResponse.choices[0].message.role } }]
        };
        return fullResponse;
    };

Here are the changes made to the makeApiRequest function as well:

const makeApiRequest = async (question: string) => {
        const controller = new AbortController();
        setAbortController(controller);
    
        lastQuestionRef.current = question;
    
        error && setError(undefined);
        setIsLoading(true);
        setActiveCitation(undefined);
        setActiveAnalysisPanelTab(undefined);
    
        const token = client ? await getToken(client) : undefined;
    
        try {
            const messages: ResponseMessage[] = answers.flatMap(a => [
                { content: a[0], role: "user" },
                { content: a[1].choices[0].message.content, role: "assistant" }
            ]);
    
            const request: ChatAppRequest = {
                messages: [...messages, { content: question, role: "user" }],
                stream: shouldStream,
                context: {
                    overrides: {
                        prompt_template: promptTemplate.length === 0 ? undefined : promptTemplate,
                        exclude_category: excludeCategory.length === 0 ? undefined : excludeCategory,
                        top: retrieveCount,
                        temperature: temperature,
                        minimum_reranker_score: minimumRerankerScore,
                        minimum_search_score: minimumSearchScore,
                        retrieval_mode: retrievalMode,
                        semantic_ranker: useSemanticRanker,
                        semantic_captions: useSemanticCaptions,
                        suggest_followup_questions: useSuggestFollowupQuestions,
                        use_oid_security_filter: useOidSecurityFilter,
                        use_groups_security_filter: useGroupsSecurityFilter,
                        vector_fields: vectorFieldList,
                        use_gpt4v: useGPT4V,
                        gpt4v_input: gpt4vInput
                    }
                },
                // ChatAppProtocol: Client must pass on any session state received from the server
                session_state: answers.length ? answers[answers.length - 1][1].choices[0].session_state : null
            };
    
            const response = await chatApi(request, token, controller.signal);
            if (!response.body) {
                throw Error("No response body");
            }
            if (shouldStream) {
                const parsedResponse: ChatAppResponse = await handleAsyncRequest(question, answers, response.body, controller.signal);
                setAnswers([...answers, [question, parsedResponse]]);
            } else {
                const parsedResponse: ChatAppResponseOrError = await response.json();
                if (response.status > 299 || !response.ok) {
                    throw Error(parsedResponse.error || "Unknown error");
                }
                setAnswers([...answers, [question, parsedResponse as ChatAppResponse]]);
            }
        } catch (e) {
            setError(e);
        } finally {
            setIsLoading(false);
        }
    };

I also added another conditional step to the rendering output:

{partialResponse && !isStreaming && (
                                <div>
                                    <UserChatMessage message={lastQuestionRef.current} />
                                    <div className={styles.chatMessageGpt}>
                                        <Answer
                                            isStreaming={false}
                                            answer={{
                                                choices: [
                                                    {
                                                        message: { content: partialResponse, role: "assistant" }, /* it might be best to figure out a way to side step having to fulfill these parameters; or we could also tell the LLM that the partialResponse is only a partialResponse so that it does not get confused */
                                                        context: {
                                                            data_points: [],
                                                            followup_questions: null,
                                                            thoughts: []
                                                        },
                                                        index: 0,
                                                        session_state: undefined
                                                    }
                                                ]
                                            }}
                                            isSelected={false}
                                            onCitationClicked={() => {}}
                                            onThoughtProcessClicked={() => {}}
                                            onSupportingContentClicked={() => {}}
                                            onFollowupQuestionClicked={() => {}}
                                            showFollowupQuestions={false}
                                        />
                                    </div>
                                </div>
                            )}

<div className="{styles.chatInput}">
  <QuestionInput clearOnSend placeholder="Type a new question"
  disabled={isLoading} onSend={question => makeApiRequest(question)}
  isStreaming={isStreaming} onStop={onStopClick} />
</div>

@cforce
Contributor

cforce commented Jul 31, 2024

Please create a pull request.

@ssjdan27

@cforce I will work on it! I will do my best to get that open within the next few days.

@ssjdan27

ssjdan27 commented Aug 4, 2024

@cforce I just opened #1884. Feel free to check it out!
