Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for system as actor #4677

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fcd1bca
feat(core): add support for subscribing to system registration events
cevr Jan 13, 2024
ce1cdc7
refactor: put systemId in event, use single set for all subscriptions…
cevr Jan 14, 2024
ace2404
make event type consistent with others
cevr Jan 14, 2024
5c94a82
make implementation consistent with createActor
cevr Jan 15, 2024
d9bd505
update changeset
cevr Jan 15, 2024
69fe023
reuse subscribable type
cevr Jan 15, 2024
afb5807
make system subscribable, and use snapshot
cevr Jan 25, 2024
4315d52
initialize as empty actors first
cevr Jan 25, 2024
5894a57
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr Jan 25, 2024
2a68b5b
update changeset
cevr Jan 25, 2024
8d289c3
copy snapshot
cevr Jan 25, 2024
0cbb0c0
ensure new references
cevr Jan 25, 2024
3326fb3
allow useSelector to subscribe to actor system
cevr Jan 25, 2024
dd87510
remove log
cevr Jan 25, 2024
81cf130
update system snapshot on _set, ensure subscribers are only called on…
cevr Jan 25, 2024
61f0b77
add changeset for @xstate/react
cevr Jan 25, 2024
6840952
remove unused imports
cevr Jan 26, 2024
ae2bfee
ensure snapshot is updated for scheduled events as well
cevr Jan 26, 2024
2dff805
collapse test
cevr Jan 26, 2024
6f1dd44
add nested child state to useSelector test
cevr Jan 26, 2024
2f416a1
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr Feb 20, 2024
2f0fbd9
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr Mar 5, 2024
3fbe432
revert unintended changes
cevr Mar 5, 2024
2c12777
revert unintended changes
cevr Mar 5, 2024
98ac5ee
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr Apr 7, 2024
e7d105b
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr Apr 23, 2024
299d1da
revert unintended changes
cevr Apr 23, 2024
d98a89f
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr May 24, 2024
60d3db7
revert change
cevr May 24, 2024
8eec984
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr Jul 29, 2024
fe762fc
Merge remote-tracking branch 'upstream/main' into cevr/system-subscribe
cevr Aug 16, 2024
841d26c
Merge branch 'main' into cevr/system-subscribe
cevr Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .changeset/tasty-baboons-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
'xstate': minor
---

added support for `actor.system.subscribe` to subscribe to registration and unregistration events within an actor's system.
`actor.system.subscribe` returns a `Subscription` object you can `.unsubscribe` to at any time.

ex:

```js
// observer object
const subscription = actor.system.subscribe({
next: (event) => console.log(event),
error: (err) => console.error(err),
complete: () => console.log('done')
});

// observer parameters
const subscription = actor.system.subscribe(
(event) => console.log(event),
(err) => console.error(err),
() => console.log('done')
);

// callback function
const subscription = actor.system.subscribe((event) => console.log(event));

// unsubscribe
subscription.unsubscribe();
```
5 changes: 4 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ export type {
InspectedActorEvent,
InspectedEventEvent,
InspectedSnapshotEvent,
InspectionEvent
InspectionEvent,
RegisteredActorEvent,
UnregisteredActorEvent,
RegistrationEvent
} from './system.ts';
export { toPromise } from './toPromise.ts';
export {
Expand Down
131 changes: 128 additions & 3 deletions packages/core/src/system.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { ProcessingStatus } from './createActor.ts';
import { reportUnhandledError } from './reportUnhandledError.ts';
import {
AnyEventObject,
ActorSystemInfo,
AnyActorRef,
Observer,
Snapshot,
Subscription,
HomomorphicOmit,
EventObject
} from './types.ts';
import { toObserver } from './utils.ts';

export interface ScheduledEvent {
id: string;
Expand Down Expand Up @@ -61,6 +65,43 @@ export interface ActorSystem<T extends ActorSystemInfo> {
*/
_set: <K extends keyof T['actors']>(key: K, actorRef: T['actors'][K]) => void;
get: <K extends keyof T['actors']>(key: K) => T['actors'][K] | undefined;
subscribe: {
cevr marked this conversation as resolved.
Show resolved Hide resolved
(
observer: Observer<
RegistrationEvent<
T['actors'][keyof T['actors']],
keyof T['actors'] & string
>
>
): Subscription;
(
nextListener?: (
event: RegistrationEvent<
T['actors'][keyof T['actors']],
keyof T['actors'] & string
>
) => void,
errorListener?: (error: any) => void,
completeListener?: () => void
): Subscription;
(
nextListenerOrObserver?:
| ((
event: RegistrationEvent<
T['actors'][keyof T['actors']],
keyof T['actors'] & string
>
) => void)
| Observer<
RegistrationEvent<
T['actors'][keyof T['actors']],
keyof T['actors'] & string
>
>,
errorListener?: (error: any) => void,
completeListener?: () => void
): Subscription;
};
inspect: (observer: Observer<InspectionEvent>) => void;
/**
* @internal
Expand Down Expand Up @@ -102,7 +143,10 @@ export function createSystem<T extends ActorSystemInfo>(
const children = new Map<string, AnyActorRef>();
const keyedActors = new Map<keyof T['actors'], AnyActorRef | undefined>();
const reverseKeyedActors = new WeakMap<AnyActorRef, keyof T['actors']>();
const observers = new Set<Observer<InspectionEvent>>();
const inspectionObservers = new Set<Observer<InspectionEvent>>();
const registrationObservers = new Set<
Observer<RegistrationEvent<T['actors'][keyof T['actors']]>>
>();
const timerMap: { [id: ScheduledEventId]: number } = {};
const clock = options.clock;

Expand Down Expand Up @@ -164,6 +208,17 @@ export function createSystem<T extends ActorSystemInfo>(
_bookId: () => `x:${idCounter++}`,
_register: (sessionId, actorRef) => {
children.set(sessionId, actorRef);
const systemId = reverseKeyedActors.get(actorRef);
if (systemId !== undefined) {
const event = {
type: `@xstate.actor.register`,
systemId: systemId as string,
actorRef: actorRef as T['actors'][keyof T['actors']]
} as const;
registrationObservers.forEach((listener) => {
listener.next?.(event);
});
}
return sessionId;
},
_unregister: (actorRef) => {
Expand All @@ -173,11 +228,54 @@ export function createSystem<T extends ActorSystemInfo>(
if (systemId !== undefined) {
keyedActors.delete(systemId);
reverseKeyedActors.delete(actorRef);
const event = {
type: `@xstate.actor.unregister`,
systemId: systemId as string,
actorRef: actorRef as T['actors'][keyof T['actors']]
} as const;
registrationObservers.forEach((listener) => {
listener.next?.(event);
});
}
},
get: (systemId) => {
return keyedActors.get(systemId) as T['actors'][any];
},
subscribe: (
nextListenerOrObserver?:
| ((event: RegistrationEvent<T['actors'][keyof T['actors']]>) => void)
| Observer<RegistrationEvent<T['actors'][keyof T['actors']]>>,
errorListener?: (error: any) => void,
completeListener?: () => void
) => {
const observer = toObserver(
nextListenerOrObserver,
errorListener,
completeListener
);

if (rootActor._processingStatus !== ProcessingStatus.Stopped) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied the implementation in createActor. I think it would make sense to keep this since system whose root actor is stopped won't be receiving updates, right?

registrationObservers.add(observer);
} else {
const snapshot = rootActor.getSnapshot();
switch (snapshot.status) {
case 'done':
try {
observer.complete?.();
} catch (err) {
reportUnhandledError(err);
}
break;
// can this error?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would an error look like here?

}
}

return {
unsubscribe: () => {
registrationObservers.delete(observer);
}
};
},
_set: (systemId, actorRef) => {
const existing = keyedActors.get(systemId);
if (existing && existing !== actorRef) {
Expand All @@ -190,14 +288,16 @@ export function createSystem<T extends ActorSystemInfo>(
reverseKeyedActors.set(actorRef, systemId);
},
inspect: (observer) => {
observers.add(observer);
inspectionObservers.add(observer);
},
_sendInspectionEvent: (event) => {
const resolvedInspectionEvent: InspectionEvent = {
...event,
rootId: rootActor.sessionId
};
observers.forEach((observer) => observer.next?.(resolvedInspectionEvent));
inspectionObservers.forEach(
(observer) => observer.next?.(resolvedInspectionEvent)
);
},
_relay: (source, target, event) => {
system._sendInspectionEvent({
Expand Down Expand Up @@ -262,3 +362,28 @@ export type InspectionEvent =
| InspectedSnapshotEvent
| InspectedEventEvent
| InspectedActorEvent;

export interface RegisteredActorEvent<
TActorRef extends AnyActorRef,
TSystemId extends string = string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This made sense to me if there were any plans to allow for extracting systemId types from a machine.

Let me if it doesn't!

> {
type: `@xstate.actor.register`;
systemId: TSystemId;
actorRef: TActorRef;
}

export interface UnregisteredActorEvent<
TActorRef extends AnyActorRef,
TSystemId extends string = string
> {
type: `@xstate.actor.unregister`;
systemId: TSystemId;
actorRef: TActorRef;
}

export type RegistrationEvent<
TActorRef extends AnyActorRef = AnyActorRef,
TSystemId extends string = string
> =
| RegisteredActorEvent<TActorRef, TSystemId>
| UnregisteredActorEvent<TActorRef, TSystemId>;
114 changes: 114 additions & 0 deletions packages/core/test/system.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,118 @@ describe('system', () => {

expect(spy).toHaveBeenCalledTimes(1);
});

it('should allow subscribing to system registration events', () => {
const aSystemId = 'a_child';
const bSystemId = 'b_child';
const machine = createMachine({
initial: 'a',
states: {
a: {
invoke: {
src: createMachine({}),
systemId: aSystemId
},
on: {
to_b: 'b'
}
},
b: {
invoke: {
src: createMachine({}),
systemId: bSystemId
},
on: {
to_a: 'a'
}
}
}
});

const actorRef = createActor(machine).start();

const events: string[] = [];

actorRef.system.subscribe((event) => {
events.push(`${event.type}.${event.systemId}`);
});

actorRef.send({ type: 'to_b' });
expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);
actorRef.send({ type: 'to_a' });
expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`,
`@xstate.actor.unregister.${bSystemId}`,
`@xstate.actor.register.${aSystemId}`
]);
});

it('should allow unsubscribing from a system registration subscription', () => {
const aSystemId = 'a_child';
const bSystemId = 'b_child';
const machine = createMachine({
initial: 'a',
states: {
a: {
invoke: {
src: createMachine({}),
systemId: aSystemId
},
on: {
to_b: 'b'
}
},
b: {
invoke: {
src: createMachine({}),
systemId: bSystemId
},
on: {
to_a: 'a'
}
}
}
});

const actorRef = createActor(machine).start();

const events: string[] = [];
const unsubscribedEvents: string[] = [];

const subscription = actorRef.system.subscribe((event) => {
events.push(`${event.type}.${event.systemId}`);
});

actorRef.system.subscribe((event) => {
unsubscribedEvents.push(`${event.type}.${event.systemId}`);
});

actorRef.send({ type: 'to_b' });
expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);
expect(unsubscribedEvents).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);

subscription.unsubscribe();
actorRef.send({ type: 'to_a' });

expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);
expect(unsubscribedEvents).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`,
`@xstate.actor.unregister.${bSystemId}`,
`@xstate.actor.register.${aSystemId}`
]);
});
});