diff --git a/packages/core/package.json b/packages/core/package.json index 1b6f2625..ac96dc2a 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,5 +1,5 @@ { - "version": "0.5.0", + "version": "0.6.0", "repository": { "type": "git", "url": "git+https://github.com/re-rxjs/react-rxjs.git" diff --git a/packages/core/src/Subscribe.test.tsx b/packages/core/src/Subscribe.test.tsx index 09ac70e9..6e68efb3 100644 --- a/packages/core/src/Subscribe.test.tsx +++ b/packages/core/src/Subscribe.test.tsx @@ -1,23 +1,26 @@ import React from "react" import { render } from "@testing-library/react" -import { defer, Subject } from "rxjs" -import { share, finalize } from "rxjs/operators" -import { Subscribe } from "./" +import { Observable } from "rxjs" +import { Subscribe, bind } from "./" describe("Subscribe", () => { it("subscribes to the provided observable and remains subscribed until it's unmounted", () => { let nSubscriptions = 0 - const source$ = defer(() => { - nSubscriptions++ - return new Subject() - }).pipe( - finalize(() => { - nSubscriptions-- + const [useNumber, number$] = bind( + new Observable(() => { + nSubscriptions++ + return () => { + nSubscriptions-- + } }), - share(), ) - const TestSubscribe: React.FC = () => + const Number: React.FC = () => <>{useNumber()} + const TestSubscribe: React.FC = () => ( + + + + ) expect(nSubscriptions).toBe(0) diff --git a/packages/core/src/Subscribe.tsx b/packages/core/src/Subscribe.tsx index 6c3fa1c9..e44c2a60 100644 --- a/packages/core/src/Subscribe.tsx +++ b/packages/core/src/Subscribe.tsx @@ -1,5 +1,10 @@ -import React, { useState, useEffect } from "react" -import { Observable } from "rxjs" +import React, { useState, Suspense, useLayoutEffect, ReactNode } from "react" +import { Observable, noop } from "rxjs" + +const p = Promise.resolve() +const Throw = () => { + throw p +} /** * A React Component that creates a subscription to the provided observable once @@ -12,15 +17,27 @@ import { Observable } from "rxjs" */ export const Subscribe: React.FC<{ source$: Observable - fallback?: null | JSX.Element + fallback?: NonNullable | null }> = ({ source$, children, fallback }) => { - const [mounted, setMounted] = useState(0) - useEffect(() => { - const subscription = source$.subscribe() + const [mounted, setMounted] = useState(() => { + try { + ;(source$ as any).gV() + return 1 + } catch (e) { + return e.then ? 1 : 0 + } + }) + useLayoutEffect(() => { + const subscription = source$.subscribe(noop, (e) => + setMounted(() => { + throw e + }), + ) setMounted(1) return () => { subscription.unsubscribe() } }, [source$]) - return <>{mounted ? children : fallback} + const fBack = fallback || null + return {mounted ? children : } } diff --git a/packages/core/src/bind/connectFactoryObservable.test.tsx b/packages/core/src/bind/connectFactoryObservable.test.tsx index 9b60d4ea..7ad360dd 100644 --- a/packages/core/src/bind/connectFactoryObservable.test.tsx +++ b/packages/core/src/bind/connectFactoryObservable.test.tsx @@ -3,14 +3,14 @@ import { of, defer, concat, - BehaviorSubject, throwError, Observable, Subject, + merge, } from "rxjs" import { renderHook, act as actHook } from "@testing-library/react-hooks" -import { switchMap, delay, take, catchError, map } from "rxjs/operators" -import { FC, Suspense, useState } from "react" +import { delay, take, catchError, map, switchMapTo } from "rxjs/operators" +import { FC, useState } from "react" import React from "react" import { act as componentAct, @@ -18,7 +18,7 @@ import { screen, render, } from "@testing-library/react" -import { bind } from "../" +import { bind, Subscribe } from "../" import { TestErrorBoundary } from "../test-helpers/TestErrorBoundary" const wait = (ms: number) => new Promise((res) => setTimeout(res, ms)) @@ -42,8 +42,8 @@ describe("connectFactoryObservable", () => { }) describe("hook", () => { it("returns the latest emitted value", async () => { - const valueStream = new BehaviorSubject(1) - const [useNumber] = bind(() => valueStream) + const valueStream = new Subject() + const [useNumber] = bind(() => valueStream, 1) const { result } = renderHook(() => useNumber()) expect(result.current).toBe(1) @@ -56,13 +56,15 @@ describe("connectFactoryObservable", () => { it("suspends the component when the observable hasn't emitted yet.", async () => { const source$ = of(1).pipe(delay(100)) const [useDelayedNumber, getDelayedNumber$] = bind(() => source$) - const subs = getDelayedNumber$().subscribe() const Result: React.FC = () =>
Result {useDelayedNumber()}
const TestSuspense: React.FC = () => { return ( - Waiting}> + Waiting} + > - +
) } @@ -75,7 +77,51 @@ describe("connectFactoryObservable", () => { expect(screen.queryByText("Result 1")).not.toBeNull() expect(screen.queryByText("Waiting")).toBeNull() - subs.unsubscribe() + }) + + it("synchronously mounts the emitted value if the observable emits synchronously", () => { + const source$ = of(1) + const [useDelayedNumber, getDelayedNumber$] = bind(() => source$) + const Result: React.FC = () =>
Result {useDelayedNumber()}
+ const TestSuspense: React.FC = () => { + return ( + Waiting} + > + + + ) + } + + render() + + expect(screen.queryByText("Result 1")).not.toBeNull() + expect(screen.queryByText("Waiting")).toBeNull() + }) + + it("doesn't mount the fallback element if the subscription is already active", () => { + const source$ = new Subject() + const [useDelayedNumber, getDelayedNumber$] = bind(() => source$) + const Result: React.FC = () =>
Result {useDelayedNumber()}
+ const TestSuspense: React.FC = () => { + return ( + Waiting} + > + + + ) + } + + const subscription = getDelayedNumber$().subscribe() + source$.next(1) + render() + + expect(screen.queryByText("Result 1")).not.toBeNull() + expect(screen.queryByText("Waiting")).toBeNull() + subscription.unsubscribe() }) it("shares the multicasted subscription with all of the components that use the same parameters", async () => { @@ -114,7 +160,12 @@ describe("connectFactoryObservable", () => { }) it("returns the value of next new Observable when the arguments change", () => { - const [useNumber] = bind((x: number) => of(x)) + const [useNumber, getNumber$] = bind((x: number) => of(x)) + const subs = merge( + getNumber$(0), + getNumber$(1), + getNumber$(2), + ).subscribe() const { result, rerender } = renderHook(({ input }) => useNumber(input), { initialProps: { input: 0 }, }) @@ -129,6 +180,7 @@ describe("connectFactoryObservable", () => { rerender({ input: 2 }) }) expect(result.current).toBe(2) + subs.unsubscribe() }) it("handles optional args correctly", () => { @@ -149,9 +201,12 @@ describe("connectFactoryObservable", () => { const [input, setInput] = useState(0) return ( <> - Waiting}> + Waiting} + > - + ) @@ -223,8 +278,8 @@ describe("connectFactoryObservable", () => { }) it("allows errors to be caught in error boundaries", () => { - const errStream = new BehaviorSubject(1) - const [useError] = bind(() => errStream) + const errStream = new Subject() + const [useError] = bind(() => errStream, 1) const ErrorComponent = () => { const value = useError() @@ -253,7 +308,7 @@ describe("connectFactoryObservable", () => { const errStream = new Observable((observer) => observer.error("controlled error"), ) - const [useError] = bind((_: string) => errStream) + const [useError, getErrStream$] = bind((_: string) => errStream) const ErrorComponent = () => { const value = useError("foo") @@ -264,9 +319,12 @@ describe("connectFactoryObservable", () => { const errorCallback = jest.fn() const { unmount } = render( - Loading...}> + Loading...} + > - + , ) @@ -279,7 +337,7 @@ describe("connectFactoryObservable", () => { it("allows async errors to be caught in error boundaries with suspense", async () => { const errStream = new Subject() - const [useError] = bind((_: string) => errStream) + const [useError, getErrStream$] = bind((_: string) => errStream) const ErrorComponent = () => { const value = useError("foo") @@ -290,9 +348,12 @@ describe("connectFactoryObservable", () => { const errorCallback = jest.fn() const { unmount } = render( - Loading...}> + Loading...} + > - + , ) @@ -325,19 +386,24 @@ describe("connectFactoryObservable", () => { .pipe(catchError(() => [])) .subscribe() + const Ok: React.FC<{ ok: boolean }> = ({ ok }) => <>{useOkKo(ok)} + const ErrorComponent = () => { const [ok, setOk] = useState(true) - const value = useOkKo(ok) - return setOk(false)}>{value} + return ( + Loading...}> + setOk(false)}> + + + + ) } const errorCallback = jest.fn() const { unmount } = render( - Loading...}> - - + , ) @@ -367,12 +433,11 @@ describe("connectFactoryObservable", () => { ) it("doesn't throw errors on components that will get unmounted on the next cycle", () => { - const valueStream = new BehaviorSubject(1) - const [useValue, value$] = bind(() => valueStream) - const [useError] = bind(() => - value$().pipe( - switchMap((v) => (v === 1 ? of(v) : throwError("error"))), - ), + const valueStream = new Subject() + const [useValue, value$] = bind(() => valueStream, 1) + const [useError] = bind( + () => value$().pipe(switchMapTo(throwError("error"))), + 1, ) const ErrorComponent: FC = () => { @@ -403,30 +468,6 @@ describe("connectFactoryObservable", () => { expect(errorCallback).not.toHaveBeenCalled() }) - it("does not resubscribe to an observable that emits synchronously and that does not have a top-level subscription after a re-render", () => { - let nTopSubscriptions = 0 - - const [useNTopSubscriptions] = bind((id: number) => - defer(() => { - return of(++nTopSubscriptions + id) - }), - ) - - const { result, rerender, unmount } = renderHook(() => - useNTopSubscriptions(0), - ) - - expect(result.current).toBe(2) - - actHook(() => { - rerender() - }) - expect(result.current).toBe(2) - expect(nTopSubscriptions).toBe(2) - - unmount() - }) - it("if the observable hasn't emitted and a defaultValue is provided, it does not start suspense", () => { const number$ = new Subject() const [useNumber] = bind( diff --git a/packages/core/src/bind/connectFactoryObservable.ts b/packages/core/src/bind/connectFactoryObservable.ts index ccf60085..81e7b563 100644 --- a/packages/core/src/bind/connectFactoryObservable.ts +++ b/packages/core/src/bind/connectFactoryObservable.ts @@ -1,9 +1,7 @@ import { Observable } from "rxjs" import shareLatest from "../internal/share-latest" -import reactEnhancer from "../internal/react-enhancer" import { BehaviorObservable } from "../internal/BehaviorObservable" import { useObservable } from "../internal/useObservable" -import { EMPTY_VALUE } from "../internal/empty-value" import { SUSPENSE } from "../SUSPENSE" /** @@ -27,16 +25,14 @@ import { SUSPENSE } from "../SUSPENSE" */ export default function connectFactoryObservable( getObservable: (...args: A) => Observable, - defaultValue: O = EMPTY_VALUE, + defaultValue: O, ): [ (...args: A) => Exclude, (...args: A) => Observable, ] { - const cache = new NestedMap, () => O]>() + const cache = new NestedMap>() - const getSharedObservables$ = ( - input: A, - ): [BehaviorObservable, () => O] => { + const getSharedObservables$ = (input: A): BehaviorObservable => { for (let i = input.length - 1; input[i] === undefined && i > -1; i--) { input.splice(-1) } @@ -50,6 +46,7 @@ export default function connectFactoryObservable( const sharedObservable$ = shareLatest( getObservable(...input), false, + defaultValue, () => { cache.delete(keys) }, @@ -61,31 +58,25 @@ export default function connectFactoryObservable( if (!inCache) { cache.set(keys, result) - } else if (inCache[0] !== publicShared$) { - source$ = inCache[0] - publicShared$.getValue = source$.getValue + } else if (inCache !== publicShared$) { + source$ = inCache + publicShared$.gV = source$.gV } return source$.subscribe(subscriber) }) as BehaviorObservable - publicShared$.getValue = sharedObservable$.getValue - const reactGetValue = reactEnhancer(publicShared$, defaultValue) + publicShared$.gV = sharedObservable$.gV - const result: [BehaviorObservable, () => O] = [ - publicShared$, - reactGetValue, - ] + const result: BehaviorObservable = publicShared$ cache.set(keys, result) return result } return [ - (...input: A) => { - const [source$, getValue] = getSharedObservables$(input) - return useObservable(source$, getValue, input) - }, - (...input: A) => getSharedObservables$(input)[0], + (...input: A) => + useObservable(getSharedObservables$(input), input, defaultValue), + (...input: A) => getSharedObservables$(input), ] } diff --git a/packages/core/src/bind/connectObservable.test.tsx b/packages/core/src/bind/connectObservable.test.tsx index e5d29f11..987a8d9c 100644 --- a/packages/core/src/bind/connectObservable.test.tsx +++ b/packages/core/src/bind/connectObservable.test.tsx @@ -7,7 +7,6 @@ import { import { act, renderHook } from "@testing-library/react-hooks" import React, { Suspense, useEffect, FC, StrictMode } from "react" import { - BehaviorSubject, defer, from, of, @@ -15,17 +14,17 @@ import { throwError, Observable, merge, - NEVER, + EMPTY, } from "rxjs" import { delay, scan, startWith, map, - switchMap, catchError, + switchMapTo, } from "rxjs/operators" -import { bind, SUSPENSE } from "../" +import { bind, SUSPENSE, Subscribe } from "../" import { TestErrorBoundary } from "../test-helpers/TestErrorBoundary" const wait = (ms: number) => new Promise((res) => setTimeout(res, ms)) @@ -37,6 +36,8 @@ describe("connectObservable", () => { if ( /Warning.*not wrapped in act/.test(args[0]) || /Uncaught 'controlled error'/.test(args[0]) || + /Missing subscription/.test(args[0]) || + /Empty observable/.test(args[0]) || /using the error boundary .* TestErrorBoundary/.test(args[0]) ) { return @@ -51,10 +52,12 @@ describe("connectObservable", () => { it("sets the initial state synchronously if it's available", async () => { const observable$ = of(1) - const [useLatestNumber] = bind(observable$) + const [useLatestNumber, latestNumber$] = bind(observable$) + const subs = latestNumber$.subscribe() const { result } = renderHook(() => useLatestNumber()) expect(result.current).toEqual(1) + subs.unsubscribe() }) it("suspends the component when the observable hasn't emitted yet.", async () => { @@ -108,8 +111,8 @@ describe("connectObservable", () => { }) it("updates with the last emitted value", async () => { - const numberStream = new BehaviorSubject(1) - const [useNumber] = bind(numberStream) + const numberStream = new Subject() + const [useNumber] = bind(numberStream, 1) const { result } = renderHook(() => useNumber()) expect(result.current).toBe(1) @@ -121,7 +124,7 @@ describe("connectObservable", () => { it("updates more than one component", async () => { const value = new Subject() - const [useValue] = bind(value.pipe(startWith(0))) + const [useValue] = bind(value, 0) const { result: result1, unmount: unmount1 } = renderHook(() => useValue()) const { result: result2, unmount: unmount2 } = renderHook(() => useValue()) const { result: result3, unmount: unmount3 } = renderHook(() => useValue()) @@ -162,10 +165,10 @@ describe("connectObservable", () => { }) it("allows React to batch synchronous updates", async () => { - const numberStream = new BehaviorSubject(1) - const stringStream = new BehaviorSubject("a") - const [useNumber] = bind(numberStream) - const [useString] = bind(stringStream) + const numberStream = new Subject() + const stringStream = new Subject() + const [useNumber] = bind(numberStream, 1) + const [useString] = bind(stringStream, "a") const BatchComponent: FC<{ onUpdate: () => void @@ -231,15 +234,15 @@ describe("connectObservable", () => { }), startWith(0), ) - const [useDelayedNumber] = bind(source$) + const [useDelayedNumber, delayedNumber$] = bind(source$) const Result: React.FC = () =>
Result {useDelayedNumber()}
const TestSuspense: React.FC = () => { return (
- Waiting}> + Waiting}> - +
) } @@ -271,15 +274,15 @@ describe("connectObservable", () => { }), startWith(0), ) - const [useDelayedNumber] = bind(source$) + const [useDelayedNumber, delayedNumber$] = bind(source$) const Result: React.FC = () =>
Result {useDelayedNumber()}
const TestSuspense: React.FC = () => { return (
- Waiting}> + Waiting}> - +
) } @@ -304,8 +307,8 @@ describe("connectObservable", () => { }) it("allows errors to be caught in error boundaries", () => { - const errStream = new BehaviorSubject(1) - const [useError] = bind(errStream) + const errStream = new Subject() + const [useError] = bind(errStream, 1) const ErrorComponent = () => { const value = useError() @@ -333,7 +336,7 @@ describe("connectObservable", () => { const errStream = new Observable((observer) => observer.error("controlled error"), ) - const [useError] = bind(errStream) + const [useError, errStream$] = bind(errStream) const ErrorComponent = () => { const value = useError() @@ -343,9 +346,9 @@ describe("connectObservable", () => { const errorCallback = jest.fn() const { unmount } = render( - Loading...}> + Loading...}> - + , ) @@ -358,7 +361,8 @@ describe("connectObservable", () => { it("allows async errors to be caught in error boundaries with suspense", async () => { const errStream = new Subject() - const [useError] = bind(errStream) + const [useError, errStream$] = bind(errStream) + const errStream$WithoutErrors = errStream$.pipe(catchError(() => EMPTY)) const ErrorComponent = () => { const value = useError() @@ -368,9 +372,12 @@ describe("connectObservable", () => { const errorCallback = jest.fn() const { unmount } = render( - Loading...}> + Loading...} + > - + , ) @@ -406,12 +413,11 @@ describe("connectObservable", () => { } const errorCallback = jest.fn() - error$.pipe(catchError((_, caught) => caught)).subscribe() const { unmount } = render( - Loading...}> + Loading...}> - + , ) @@ -435,13 +441,12 @@ describe("connectObservable", () => { await componentAct(async () => { await wait(200) }) - error$.subscribe() render( - Loading...}> + Loading...}> - + , ) expect(screen.queryByText("Loading...")).not.toBeNull() @@ -459,11 +464,9 @@ describe("connectObservable", () => { }) it("doesn't throw errors on components that will get unmounted on the next cycle", () => { - const valueStream = new BehaviorSubject(1) - const [useValue, value$] = bind(valueStream) - const [useError] = bind( - value$.pipe(switchMap((v) => (v === 1 ? of(v) : throwError("error")))), - ) + const valueStream = new Subject() + const [useValue, value$] = bind(valueStream, 1) + const [useError] = bind(value$.pipe(switchMapTo(throwError("error"))), 1) const ErrorComponent: FC = () => { const value = useError() @@ -493,20 +496,52 @@ describe("connectObservable", () => { expect(errorCallback).not.toHaveBeenCalled() }) - it("should not trigger suspense when the stream emits synchronously", () => { - const [useValue] = bind(NEVER.pipe(startWith("Hello"))) + it("should throw an error when the stream does not have a subscription", () => { + const [useValue] = bind(of("Hello")) + const errorCallback = jest.fn() const Component: FC = () => <>{useValue()} render( - Loading...}> - - + + Loading...}> + + + + , , ) + expect(errorCallback).toHaveBeenCalled() + }) + + it("should throw an error if the stream completes without emitting while on SUSPENSE", async () => { + const subject = new Subject() + const [useValue, value$] = bind(subject) + const errorCallback = jest.fn() + + const Component: FC = () => <>{useValue()} + render( + + + Loading...}> + + + + , + , + ) + + expect(errorCallback).not.toHaveBeenCalled() + expect(screen.queryByText("Loading...")).not.toBeNull() + + await componentAct(async () => { + subject.complete() + await wait(100) + }) + expect(screen.queryByText("Loading...")).toBeNull() - expect(screen.queryByText("Hello")).not.toBeNull() + expect(errorCallback).toHaveBeenCalled() }) it("if the observable hasn't emitted and a defaultValue is provided, it does not start suspense", () => { diff --git a/packages/core/src/bind/connectObservable.ts b/packages/core/src/bind/connectObservable.ts index 56beab63..b0d9bfc5 100644 --- a/packages/core/src/bind/connectObservable.ts +++ b/packages/core/src/bind/connectObservable.ts @@ -1,8 +1,6 @@ import { Observable } from "rxjs" import shareLatest from "../internal/share-latest" -import reactEnhancer from "../internal/react-enhancer" import { useObservable } from "../internal/useObservable" -import { EMPTY_VALUE } from "../internal/empty-value" /** * Accepts: An Observable. @@ -22,11 +20,10 @@ import { EMPTY_VALUE } from "../internal/empty-value" const emptyArr: Array = [] export default function connectObservable( observable: Observable, - defaultValue: T = EMPTY_VALUE, + defaultValue: T, ) { - const sharedObservable$ = shareLatest(observable, false) - const getValue = reactEnhancer(sharedObservable$, defaultValue) + const sharedObservable$ = shareLatest(observable, false, defaultValue) const useStaticObservable = () => - useObservable(sharedObservable$, getValue, emptyArr) + useObservable(sharedObservable$, emptyArr, defaultValue) return [useStaticObservable, sharedObservable$] as const } diff --git a/packages/core/src/bind/index.ts b/packages/core/src/bind/index.ts index 8fa13a04..c7635bd6 100644 --- a/packages/core/src/bind/index.ts +++ b/packages/core/src/bind/index.ts @@ -2,6 +2,7 @@ import { Observable } from "rxjs" import { SUSPENSE } from "../SUSPENSE" import connectFactoryObservable from "./connectFactoryObservable" import connectObservable from "./connectObservable" +import { EMPTY_VALUE } from "../internal/empty-value" /** * Binds an observable to React @@ -45,8 +46,8 @@ export function bind
( defaultValue?: O, ): [(...args: A) => Exclude, (...args: A) => Observable] -export function bind(...args: any[]) { - return (typeof args[0] === "function" +export function bind(observable: any, defaultValue: any = EMPTY_VALUE) { + return (typeof observable === "function" ? (connectFactoryObservable as any) - : connectObservable)(...args) + : connectObservable)(observable, defaultValue) } diff --git a/packages/core/src/internal/BehaviorObservable.ts b/packages/core/src/internal/BehaviorObservable.ts index 7095c18e..8621f257 100644 --- a/packages/core/src/internal/BehaviorObservable.ts +++ b/packages/core/src/internal/BehaviorObservable.ts @@ -1,6 +1,5 @@ import { Observable } from "rxjs" -import { SUSPENSE } from "../SUSPENSE" export interface BehaviorObservable extends Observable { - getValue: () => T | typeof SUSPENSE + gV: () => T } diff --git a/packages/core/src/internal/react-enhancer.ts b/packages/core/src/internal/react-enhancer.ts deleted file mode 100644 index 9ac5ec46..00000000 --- a/packages/core/src/internal/react-enhancer.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { SUSPENSE } from "../SUSPENSE" -import { BehaviorObservable } from "./BehaviorObservable" -import { EMPTY_VALUE } from "./empty-value" - -const reactEnhancer = ( - source$: BehaviorObservable, - defaultValue: T, -): (() => T) => { - let promise: Promise | null - let error: any = EMPTY_VALUE - - const res = (): T => { - const currentValue = source$.getValue() - if (currentValue !== SUSPENSE && currentValue !== EMPTY_VALUE) { - return currentValue - } - if (defaultValue !== EMPTY_VALUE) return defaultValue - - let timeoutToken - if (error !== EMPTY_VALUE) { - clearTimeout(timeoutToken) - timeoutToken = setTimeout(() => { - error = EMPTY_VALUE - }, 50) - throw error - } - - if (promise) throw promise - - let value: typeof EMPTY_VALUE | T = EMPTY_VALUE - - promise = new Promise((res) => { - const subscription = source$.subscribe( - (v) => { - if (v !== (SUSPENSE as any)) { - value = v - subscription && subscription.unsubscribe() - res(v) - } - }, - (e) => { - error = e - timeoutToken = setTimeout(() => { - error = EMPTY_VALUE - }, 50) - res() - }, - ) - if (value !== EMPTY_VALUE) { - subscription.unsubscribe() - } - }).finally(() => { - promise = null - }) - - if (value !== EMPTY_VALUE) { - promise = null - return value - } - - throw error !== EMPTY_VALUE ? error : promise - } - res.d = defaultValue - return res -} - -export default reactEnhancer diff --git a/packages/core/src/internal/share-latest.ts b/packages/core/src/internal/share-latest.ts index 361227bd..9d5209c5 100644 --- a/packages/core/src/internal/share-latest.ts +++ b/packages/core/src/internal/share-latest.ts @@ -1,18 +1,23 @@ import { Observable, Subscription, Subject, noop } from "rxjs" import { BehaviorObservable } from "./BehaviorObservable" import { EMPTY_VALUE } from "./empty-value" +import { SUSPENSE } from "../SUSPENSE" const shareLatest = ( source$: Observable, shouldComplete = true, + defaultValue: T = EMPTY_VALUE, teardown = noop, ): BehaviorObservable => { let subject: Subject | null let subscription: Subscription | null let refCount = 0 let currentValue: T = EMPTY_VALUE + let promise: Promise | null const result = new Observable((subscriber) => { + if (!shouldComplete) subscriber.complete = noop + refCount++ let innerSub: Subscription if (!subject) { @@ -31,7 +36,7 @@ const shareLatest = ( }, () => { subscription = null - shouldComplete && subject!.complete() + subject!.complete() }, ) if (subscription.closed) subscription = null @@ -53,11 +58,57 @@ const shareLatest = ( teardown() subject = null subscription = null + promise = null } } }) as BehaviorObservable - result.getValue = () => currentValue + let error: any = EMPTY_VALUE + let timeoutToken: any + result.gV = (): T => { + if ((currentValue as any) !== SUSPENSE && currentValue !== EMPTY_VALUE) { + return currentValue + } + if (defaultValue !== EMPTY_VALUE) return defaultValue + + if (error !== EMPTY_VALUE) { + clearTimeout(timeoutToken) + timeoutToken = setTimeout(() => { + error = EMPTY_VALUE + }, 50) + throw error + } + + if (!subscription) { + throw new Error("Missing subscription") + } + if (promise) throw promise + + throw (promise = new Promise((res) => { + const setError = (e: any) => { + error = e + timeoutToken = setTimeout(() => { + error = EMPTY_VALUE + }, 50) + res() + promise = null + } + const pSubs = subject!.subscribe( + (v) => { + if (v !== (SUSPENSE as any)) { + pSubs.unsubscribe() + res(v) + promise = null + } + }, + setError, + () => { + setError(new Error("Empty observable")) + }, + ) + subscription!.add(pSubs) + })) + } return result } diff --git a/packages/core/src/internal/useObservable.ts b/packages/core/src/internal/useObservable.ts index 094c34b0..330cfd41 100644 --- a/packages/core/src/internal/useObservable.ts +++ b/packages/core/src/internal/useObservable.ts @@ -1,17 +1,18 @@ import { useEffect, useState, useRef } from "react" import { SUSPENSE } from "../SUSPENSE" import { EMPTY_VALUE } from "./empty-value" -import { Observable } from "rxjs" +import { BehaviorObservable } from "../internal/BehaviorObservable" export const useObservable = ( - source$: Observable, - getValue: () => O, + source$: BehaviorObservable, keys: Array, + defaultValue: O, ): Exclude => { - const [state, setState] = useState(getValue) + const [state, setState] = useState(source$.gV) const prevStateRef = useRef O)>(state) useEffect(() => { + const { gV } = source$ let err: any = EMPTY_VALUE let syncVal: O | typeof SUSPENSE = EMPTY_VALUE @@ -32,14 +33,13 @@ export const useObservable = ( setState((prevStateRef.current = value)) } - const defaultValue = (getValue as any).d if (syncVal === EMPTY_VALUE) { - set(defaultValue === EMPTY_VALUE ? getValue : defaultValue) + set(defaultValue === EMPTY_VALUE ? gV : defaultValue) } const t = subscription subscription = source$.subscribe((value: O | typeof SUSPENSE) => { - set(value === SUSPENSE ? getValue : value) + set(value === SUSPENSE ? gV : value) }, onError) t.unsubscribe() diff --git a/packages/dom/package.json b/packages/dom/package.json index 06f359b6..2d0043d4 100644 --- a/packages/dom/package.json +++ b/packages/dom/package.json @@ -36,7 +36,7 @@ "Victor Oliva (https://github.com/voliva)" ], "devDependencies": { - "@react-rxjs/core": "0.5.0", + "@react-rxjs/core": "0.6.0", "@testing-library/react": "^11.1.0", "@testing-library/react-hooks": "^3.4.2", "@types/jest": "^26.0.15", diff --git a/packages/utils/package.json b/packages/utils/package.json index 46b8f688..1a21f61c 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -36,7 +36,7 @@ "Victor Oliva (https://github.com/voliva)" ], "devDependencies": { - "@react-rxjs/core": "0.5.0", + "@react-rxjs/core": "0.6.0", "@testing-library/react": "^11.1.0", "@testing-library/react-hooks": "^3.4.2", "@types/jest": "^26.0.15",