Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(signals): use Injector of rxMethod instance caller if available #4529

Merged
Merged
224 changes: 224 additions & 0 deletions modules/signals/rxjs-interop/spec/rx-method.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import {
Component,
createEnvironmentInjector,
EnvironmentInjector,
inject,
Injectable,
Injector,
OnInit,
signal,
} from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { provideLocationMocks } from '@angular/common/testing';
import { provideRouter } from '@angular/router';
import { RouterTestingHarness } from '@angular/router/testing';
import { BehaviorSubject, pipe, Subject, tap } from 'rxjs';
import { rxMethod } from '../src';
import { createLocalService } from '../../spec/helpers';
Expand Down Expand Up @@ -231,4 +238,221 @@ describe('rxMethod', () => {
TestBed.flushEffects();
expect(counter()).toBe(4);
});

/**
* This test suite verifies that a signal or observable passed to a reactive
* method that is initialized at the ancestor injector level is tracked within
* the correct injection context and untracked at the specified time.
*
* Components use `globalSignal` or `globalObservable` from `GlobalService`
* and pass it to the reactive method. If the component is destroyed but
* signal or observable change still increases the corresponding counter,
* the internal effect or subscription is still active.
*/
describe('with instance injector', () => {
@Injectable({ providedIn: 'root' })
class GlobalService {
readonly globalSignal = signal(1);
readonly globalObservable = new BehaviorSubject(1);

globalSignalChangeCounter = 0;
globalObservableChangeCounter = 0;

readonly signalMethod = rxMethod<number>(
tap(() => this.globalSignalChangeCounter++)
);
readonly observableMethod = rxMethod<number>(
tap(() => this.globalObservableChangeCounter++)
);

incrementSignal(): void {
this.globalSignal.update((value) => value + 1);
}

incrementObservable(): void {
this.globalObservable.next(this.globalObservable.value + 1);
}
}

@Component({
selector: 'app-without-store',
template: '',
standalone: true,
})
class WithoutStoreComponent {}

function setup(WithStoreComponent: new () => unknown): GlobalService {
TestBed.configureTestingModule({
providers: [
provideRouter([
{ path: 'with-store', component: WithStoreComponent },
{
path: 'without-store',
component: WithoutStoreComponent,
},
]),
provideLocationMocks(),
],
});

return TestBed.inject(GlobalService);
}

it('tracks a signal until the component is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent {
store = inject(GlobalService);

constructor() {
this.store.signalMethod(this.store.globalSignal);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

expect(globalService.globalSignalChangeCounter).toBe(1);

globalService.incrementSignal();
TestBed.flushEffects();
expect(globalService.globalSignalChangeCounter).toBe(2);

globalService.incrementSignal();
TestBed.flushEffects();
expect(globalService.globalSignalChangeCounter).toBe(3);

await harness.navigateByUrl('/without-store');
globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(3);
});

it('tracks an observable until the component is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent {
store = inject(GlobalService);

constructor() {
this.store.observableMethod(this.store.globalObservable);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

expect(globalService.globalObservableChangeCounter).toBe(1);

globalService.incrementObservable();
expect(globalService.globalObservableChangeCounter).toBe(2);

globalService.incrementObservable();
expect(globalService.globalObservableChangeCounter).toBe(3);

await harness.navigateByUrl('/without-store');
globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(3);
});

it('tracks a signal until the provided injector is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);
injector = inject(Injector);

ngOnInit() {
this.store.signalMethod(this.store.globalSignal, {
injector: this.injector,
});
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(2);

await harness.navigateByUrl('/without-store');
globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(2);
});

it('tracks an observable until the provided injector is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);
injector = inject(Injector);

ngOnInit() {
this.store.observableMethod(this.store.globalObservable, {
injector: this.injector,
});
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(2);

await harness.navigateByUrl('/without-store');
globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(2);
});

it('falls back to source injector when reactive method is called outside of the injection context', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);

ngOnInit() {
this.store.signalMethod(this.store.globalSignal);
this.store.observableMethod(this.store.globalObservable);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

expect(globalService.globalSignalChangeCounter).toBe(1);
expect(globalService.globalObservableChangeCounter).toBe(1);

await harness.navigateByUrl('/without-store');
globalService.incrementSignal();
TestBed.flushEffects();
globalService.incrementObservable();

expect(globalService.globalSignalChangeCounter).toBe(2);
expect(globalService.globalObservableChangeCounter).toBe(2);
});
});
});
49 changes: 36 additions & 13 deletions modules/signals/rxjs-interop/src/rx-method.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import {
import { isObservable, noop, Observable, Subject, Unsubscribable } from 'rxjs';

type RxMethod<Input> = ((
input: Input | Signal<Input> | Observable<Input>
input: Input | Signal<Input> | Observable<Input>,
config?: { injector?: Injector }
) => Unsubscribable) &
Unsubscribable;

Expand All @@ -23,39 +24,61 @@ export function rxMethod<Input>(
assertInInjectionContext(rxMethod);
}

const injector = config?.injector ?? inject(Injector);
const destroyRef = injector.get(DestroyRef);
const sourceInjector = config?.injector ?? inject(Injector);
const source$ = new Subject<Input>();

const sourceSub = generator(source$).subscribe();
destroyRef.onDestroy(() => sourceSub.unsubscribe());
sourceInjector.get(DestroyRef).onDestroy(() => sourceSub.unsubscribe());

const rxMethodFn = (
input: Input | Signal<Input> | Observable<Input>,
config?: { injector?: Injector }
) => {
if (isStatic(input)) {
source$.next(input);
return { unsubscribe: noop };
}

const instanceInjector =
config?.injector ?? getCallerInjector() ?? sourceInjector;

const rxMethodFn = (input: Input | Signal<Input> | Observable<Input>) => {
if (isSignal(input)) {
const watcher = effect(
() => {
const value = input();
untracked(() => source$.next(value));
},
{ injector }
{ injector: instanceInjector }
);
const instanceSub = { unsubscribe: () => watcher.destroy() };
sourceSub.add(instanceSub);

return instanceSub;
}

if (isObservable(input)) {
const instanceSub = input.subscribe((value) => source$.next(value));
sourceSub.add(instanceSub);
const instanceSub = input.subscribe((value) => source$.next(value));
sourceSub.add(instanceSub);

return instanceSub;
if (instanceInjector !== sourceInjector) {
instanceInjector
.get(DestroyRef)
.onDestroy(() => instanceSub.unsubscribe());
}

source$.next(input);
return { unsubscribe: noop };
return instanceSub;
};
rxMethodFn.unsubscribe = sourceSub.unsubscribe.bind(sourceSub);

return rxMethodFn;
}

function isStatic<T>(value: T | Signal<T> | Observable<T>): value is T {
return !isSignal(value) && !isObservable(value);
}

function getCallerInjector(): Injector | null {
try {
return inject(Injector);
} catch {
return null;
}
}