Skip to content

Commit 2da5fe2

Browse files
committed
Runs now have a "default" stream
1 parent 667bf8d commit 2da5fe2

File tree

6 files changed

+385
-35
lines changed

6 files changed

+385
-35
lines changed

packages/react-hooks/src/hooks/useRealtime.ts

Lines changed: 129 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -620,30 +620,138 @@ export type UseRealtimeStreamOptions<TPart> = UseApiClientOptions & {
620620
};
621621

622622
/**
623-
* Hook to subscribe to realtime updates of a stream.
623+
* Hook to subscribe to realtime updates of a stream with a specific stream key.
624624
*
625-
* @template TPart - The type of the part
626-
* @param {string} runId - The unique identifier of the run to subscribe to
627-
* @param {string} streamKey - The unique identifier of the stream to subscribe to
628-
* @param {UseRealtimeStreamOptions<TPart>} [options] - Configuration options for the subscription
629-
* @returns {UseRealtimeStreamInstance<TPart>} An object containing the current state of the stream, and error handling
625+
* This hook automatically subscribes to a stream and updates the `parts` array as new data arrives.
626+
* The stream subscription is automatically managed: it starts when the component mounts (or when
627+
* `enabled` becomes `true`) and stops when the component unmounts or when `stop()` is called.
628+
*
629+
* @template TPart - The type of each chunk/part in the stream
630+
* @param runId - The unique identifier of the run to subscribe to
631+
* @param streamKey - The unique identifier of the stream to subscribe to. Use this overload
632+
* when you want to read from a specific stream key.
633+
* @param options - Optional configuration for the stream subscription
634+
* @returns An object containing:
635+
* - `parts`: An array of all stream chunks received so far (accumulates over time)
636+
* - `error`: Any error that occurred during subscription
637+
* - `stop`: A function to manually stop the subscription
630638
*
631639
* @example
632-
* ```ts
633-
* const { parts, error } = useRealtimeStream<string>('run-id-123', 'stream-key-123');
640+
* ```tsx
641+
* "use client";
642+
* import { useRealtimeStream } from "@trigger.dev/react-hooks";
643+
*
644+
* function StreamViewer({ runId }: { runId: string }) {
645+
* const { parts, error } = useRealtimeStream<string>(
646+
* runId,
647+
* "my-stream",
648+
* {
649+
* accessToken: process.env.NEXT_PUBLIC_TRIGGER_PUBLIC_KEY,
650+
* }
651+
* );
652+
*
653+
* if (error) return <div>Error: {error.message}</div>;
654+
*
655+
* // Parts array accumulates all chunks
656+
* const fullText = parts.join("");
634657
*
635-
* for (const part of parts) {
636-
* console.log(part);
658+
* return <div>{fullText}</div>;
637659
* }
638660
* ```
661+
*
662+
* @example
663+
* ```tsx
664+
* // With custom options
665+
* const { parts, error, stop } = useRealtimeStream<ChatChunk>(
666+
* runId,
667+
* "chat-stream",
668+
* {
669+
* accessToken: publicKey,
670+
* timeoutInSeconds: 120,
671+
* startIndex: 10, // Start from the 10th chunk
672+
* throttleInMs: 50, // Throttle updates to every 50ms
673+
* onData: (chunk) => {
674+
* console.log("New chunk received:", chunk);
675+
* },
676+
* }
677+
* );
678+
*
679+
* // Manually stop the subscription
680+
* <button onClick={stop}>Stop Stream</button>
681+
* ```
639682
*/
640683
export function useRealtimeStream<TPart>(
641684
runId: string,
642685
streamKey: string,
643686
options?: UseRealtimeStreamOptions<TPart>
687+
): UseRealtimeStreamInstance<TPart>;
688+
/**
689+
* Hook to subscribe to realtime updates of a stream using the default stream key (`"default"`).
690+
*
691+
* This is a convenience overload that allows you to subscribe to the default stream without
692+
* specifying a stream key. The stream will be accessed with the key `"default"`.
693+
*
694+
* @template TPart - The type of each chunk/part in the stream
695+
* @param runId - The unique identifier of the run to subscribe to
696+
* @param options - Optional configuration for the stream subscription
697+
* @returns An object containing:
698+
* - `parts`: An array of all stream chunks received so far (accumulates over time)
699+
* - `error`: Any error that occurred during subscription
700+
* - `stop`: A function to manually stop the subscription
701+
*
702+
* @example
703+
* ```tsx
704+
* "use client";
705+
* import { useRealtimeStream } from "@trigger.dev/react-hooks";
706+
*
707+
* function DefaultStreamViewer({ runId }: { runId: string }) {
708+
* // Subscribe to the default stream
709+
* const { parts, error } = useRealtimeStream<string>(runId, {
710+
* accessToken: process.env.NEXT_PUBLIC_TRIGGER_PUBLIC_KEY,
711+
* });
712+
*
713+
* if (error) return <div>Error: {error.message}</div>;
714+
*
715+
* const fullText = parts.join("");
716+
* return <div>{fullText}</div>;
717+
* }
718+
* ```
719+
*
720+
* @example
721+
* ```tsx
722+
* // Conditionally enable the stream
723+
* const { parts } = useRealtimeStream<string>(runId, {
724+
* accessToken: publicKey,
725+
* enabled: !!runId && isStreaming, // Only subscribe when runId exists and isStreaming is true
726+
* });
727+
* ```
728+
*/
729+
export function useRealtimeStream<TPart>(
730+
runId: string,
731+
options?: UseRealtimeStreamOptions<TPart>
732+
): UseRealtimeStreamInstance<TPart>;
733+
export function useRealtimeStream<TPart>(
734+
runId: string,
735+
streamKeyOrOptions?: string | UseRealtimeStreamOptions<TPart>,
736+
options?: UseRealtimeStreamOptions<TPart>
644737
): UseRealtimeStreamInstance<TPart> {
738+
// Handle overload: useRealtimeStream(runId, options?) or useRealtimeStream(runId, streamKey, options?)
739+
const DEFAULT_STREAM_KEY = "default";
740+
741+
let streamKey: string;
742+
let opts: UseRealtimeStreamOptions<TPart> | undefined;
743+
744+
if (typeof streamKeyOrOptions === "string") {
745+
// useRealtimeStream(runId, streamKey, options?)
746+
streamKey = streamKeyOrOptions;
747+
opts = options;
748+
} else {
749+
// useRealtimeStream(runId, options?)
750+
streamKey = DEFAULT_STREAM_KEY;
751+
opts = streamKeyOrOptions;
752+
}
645753
const hookId = useId();
646-
const idKey = options?.id ?? hookId;
754+
const idKey = opts?.id ?? hookId;
647755

648756
const [initialPartsFallback] = useState([] as Array<TPart>);
649757

@@ -685,14 +793,14 @@ export function useRealtimeStream<TPart>(
685793

686794
const onData = useCallback(
687795
(data: TPart) => {
688-
if (options?.onData) {
689-
options.onData(data);
796+
if (opts?.onData) {
797+
opts.onData(data);
690798
}
691799
},
692-
[options?.onData]
800+
[opts?.onData]
693801
);
694802

695-
const apiClient = useApiClient(options);
803+
const apiClient = useApiClient(opts);
696804

697805
const triggerRequest = useCallback(async () => {
698806
try {
@@ -712,9 +820,9 @@ export function useRealtimeStream<TPart>(
712820
setError,
713821
onData,
714822
abortControllerRef,
715-
options?.timeoutInSeconds,
716-
options?.startIndex,
717-
options?.throttleInMs ?? 16
823+
opts?.timeoutInSeconds,
824+
opts?.startIndex,
825+
opts?.throttleInMs ?? 16
718826
);
719827
} catch (err) {
720828
// Ignore abort errors as they are expected.
@@ -732,10 +840,10 @@ export function useRealtimeStream<TPart>(
732840
// Mark the subscription as complete
733841
setIsComplete(true);
734842
}
735-
}, [runId, streamKey, mutateParts, partsRef, abortControllerRef, apiClient, setError]);
843+
}, [runId, streamKey, mutateParts, partsRef, abortControllerRef, apiClient, setError, onData, opts]);
736844

737845
useEffect(() => {
738-
if (typeof options?.enabled === "boolean" && !options.enabled) {
846+
if (typeof opts?.enabled === "boolean" && !opts.enabled) {
739847
return;
740848
}
741849

@@ -748,7 +856,7 @@ export function useRealtimeStream<TPart>(
748856
return () => {
749857
stop();
750858
};
751-
}, [runId, stop, options?.enabled]);
859+
}, [runId, stop, opts?.enabled, triggerRequest]);
752860

753861
return { parts: parts ?? initialPartsFallback, error, stop };
754862
}

0 commit comments

Comments
 (0)