diff --git a/packages/utils/src/selfDependent.test.ts b/packages/utils/src/selfDependent.test.ts index 410a05df..bfc69e5e 100644 --- a/packages/utils/src/selfDependent.test.ts +++ b/packages/utils/src/selfDependent.test.ts @@ -6,6 +6,7 @@ import { takeWhile, switchMapTo, delay, + startWith, } from "rxjs/operators" import { TestScheduler } from "rxjs/testing" import { selfDependent } from "." @@ -53,4 +54,64 @@ describe("selfDependent", () => { expectSubscriptions((source as any).subscriptions).toBe(sourceSub) }) }) + + it("works after unsubscription and re-subscription", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("abcde") + const sourceSub1 = " ^--!" + const expected1 = " abc" + const sourceSub2 = " -----^---!" + const expected2 = " -----abcd" + + const [lastValue$, connect] = selfDependent() + const result$ = source.pipe( + withLatestFrom(lastValue$.pipe(startWith(""))), + map(([v]) => v), + connect(), + ) + + expectObservable(result$, sourceSub1).toBe(expected1) + expectObservable(result$, sourceSub2).toBe(expected2) + }) + }) + + it("works after complete and re-subscription", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("abc|") + const sourceSub1 = " ^---!" + const expected1 = " abc|" + const sourceSub2 = " -----^---!" + const expected2 = " -----abc|" + + const [lastValue$, connect] = selfDependent() + const result$ = source.pipe( + withLatestFrom(lastValue$.pipe(startWith(""))), + map(([v]) => v), + connect(), + ) + + expectObservable(result$, sourceSub1).toBe(expected1) + expectObservable(result$, sourceSub2).toBe(expected2) + }) + }) + + it("works after error and re-subscription", () => { + scheduler().run(({ expectObservable, cold }) => { + const source = cold("abc#") + const sourceSub1 = " ^---!" + const expected1 = " abc#" + const sourceSub2 = " -----^---!" + const expected2 = " -----abc#" + + const [lastValue$, connect] = selfDependent() + const result$ = source.pipe( + withLatestFrom(lastValue$.pipe(startWith(""))), + map(([v]) => v), + connect(), + ) + + expectObservable(result$, sourceSub1).toBe(expected1) + expectObservable(result$, sourceSub2).toBe(expected2) + }) + }) }) diff --git a/packages/utils/src/selfDependent.ts b/packages/utils/src/selfDependent.ts index 67287675..bb35aea2 100644 --- a/packages/utils/src/selfDependent.ts +++ b/packages/utils/src/selfDependent.ts @@ -1,5 +1,10 @@ -import { Observable, Subject, MonoTypeOperatorFunction } from "rxjs" -import { tap } from "rxjs/operators" +import { + Observable, + Subject, + MonoTypeOperatorFunction, + BehaviorSubject, +} from "rxjs" +import { switchAll, tap } from "rxjs/operators" /** * A creation operator that helps at creating observables that have circular @@ -13,10 +18,23 @@ export const selfDependent = (): [ Observable, () => MonoTypeOperatorFunction, ] => { - const mirrored$ = new Subject() + const activeSubject: BehaviorSubject> = new BehaviorSubject( + new Subject(), + ) return [ - mirrored$.asObservable(), - () => tap(mirrored$) as MonoTypeOperatorFunction, + activeSubject.pipe(switchAll()), + () => + tap({ + next: (v) => activeSubject.value.next(v), + error: (e) => { + activeSubject.value.error(e) + activeSubject.next(new Subject()) + }, + complete: () => { + activeSubject.value.complete() + activeSubject.next(new Subject()) + }, + }) as MonoTypeOperatorFunction, ] }