Skip to content

Commit

Permalink
refactor system subscribe to make use of inspect. add registration ev…
Browse files Browse the repository at this point in the history
…ents to inspect, and have return a subscription.
  • Loading branch information
cevr committed Jan 16, 2024
1 parent ff8ce31 commit e201278
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 213 deletions.
12 changes: 6 additions & 6 deletions .changeset/tasty-baboons-dance.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,28 @@
'xstate': minor
---

added support for `actor.system.subscribe` to subscribe to registration and unregistration events within an actor's system.
`actor.system.subscribe` returns a `Subscription` object you can `.unsubscribe` to at any time.
added registration and unregistration events to `actor.system.inspect`.
`actor.system.inspect` now returns a `Subscription` object you can `.unsubscribe` to at any time.

ex:

```js
// observer object
const subscription = actor.system.subscribe({
const subscription = actor.system.inspect({
next: (event) => console.log(event),
error: (err) => console.error(err),
complete: () => console.log('done')
});

// observer parameters
const subscription = actor.system.subscribe(
const subscription = actor.system.inspect(
(event) => console.log(event),
(err) => console.error(err),
() => console.log('done')
);

// callback function
const subscription = actor.system.subscribe((event) => console.log(event));
// single listener
const subscription = actor.system.inspect((event) => console.log(event));

// unsubscribe
subscription.unsubscribe();
Expand Down
143 changes: 50 additions & 93 deletions packages/core/src/system.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import { ProcessingStatus } from './createActor.ts';
import { reportUnhandledError } from './reportUnhandledError.ts';
import {
AnyEventObject,
ActorSystemInfo,
AnyActorRef,
Observer,
Snapshot,
Subscribable,
HomomorphicOmit,
EventObject
EventObject,
Subscription
} from './types.ts';
import { toObserver } from './utils.ts';

Expand Down Expand Up @@ -47,13 +45,7 @@ function createScheduledEventId(
return `${actorRef.sessionId}.${id}` as ScheduledEventId;
}

export interface ActorSystem<T extends ActorSystemInfo>
extends Subscribable<
RegistrationEvent<
T['actors'][keyof T['actors']],
keyof T['actors'] & string
>
> {
export interface ActorSystem<T extends ActorSystemInfo> {
/**
* @internal
*/
Expand All @@ -66,18 +58,19 @@ export interface ActorSystem<T extends ActorSystemInfo>
* @internal
*/
_unregister: (actorRef: AnyActorRef) => void;
/**
* @internal
*/
_sendRegistrationEvent: (
event: RegistrationEvent<T['actors'][keyof T['actors']]>
) => void;
/**
* @internal
*/
_set: <K extends keyof T['actors']>(key: K, actorRef: T['actors'][K]) => void;
get: <K extends keyof T['actors']>(key: K) => T['actors'][K] | undefined;
inspect: (observer: Observer<InspectionEvent>) => void;
inspect: {
(observer: Observer<InspectionEvent>): Subscription;
(
next: (value: InspectionEvent) => void,
error?: (error: any) => void,
complete?: () => void
): Subscription;
};
/**
* @internal
*/
Expand Down Expand Up @@ -118,10 +111,7 @@ export function createSystem<T extends ActorSystemInfo>(
const children = new Map<string, AnyActorRef>();
const keyedActors = new Map<keyof T['actors'], AnyActorRef | undefined>();
const reverseKeyedActors = new WeakMap<AnyActorRef, keyof T['actors']>();
const inspectionObservers = new Set<Observer<InspectionEvent>>();
const registrationObservers = new Set<
Observer<RegistrationEvent<T['actors'][keyof T['actors']]>>
>();
const observers = new Set<Observer<InspectionEvent>>();
const timerMap: { [id: ScheduledEventId]: number } = {};
const clock = options.clock;

Expand Down Expand Up @@ -185,10 +175,10 @@ export function createSystem<T extends ActorSystemInfo>(
children.set(sessionId, actorRef);
const systemId = reverseKeyedActors.get(actorRef);
if (systemId !== undefined) {
system._sendRegistrationEvent({
type: `@xstate.actor.register`,
system._sendInspectionEvent({
type: `@xstate.register`,
systemId: systemId as string,
actorRef: actorRef as T['actors'][keyof T['actors']]
actorRef
});
}
return sessionId;
Expand All @@ -200,20 +190,31 @@ export function createSystem<T extends ActorSystemInfo>(
if (systemId !== undefined) {
keyedActors.delete(systemId);
reverseKeyedActors.delete(actorRef);
system._sendRegistrationEvent({
type: `@xstate.actor.unregister`,
system._sendInspectionEvent({
type: `@xstate.unregister`,
systemId: systemId as string,
actorRef: actorRef as T['actors'][keyof T['actors']]
} as const);
actorRef
});
}
},
get: (systemId) => {
return keyedActors.get(systemId) as T['actors'][any];
},
subscribe: (
_set: (systemId, actorRef) => {
const existing = keyedActors.get(systemId);
if (existing && existing !== actorRef) {
throw new Error(
`Actor with system ID '${systemId as string}' already exists.`
);
}

keyedActors.set(systemId, actorRef);
reverseKeyedActors.set(actorRef, systemId);
},
inspect: (
nextListenerOrObserver:
| ((event: RegistrationEvent<T['actors'][keyof T['actors']]>) => void)
| Observer<RegistrationEvent<T['actors'][keyof T['actors']]>>,
| ((event: InspectionEvent) => void)
| Observer<InspectionEvent>,
errorListener?: (error: any) => void,
completeListener?: () => void
) => {
Expand All @@ -223,53 +224,20 @@ export function createSystem<T extends ActorSystemInfo>(
completeListener
);

if (rootActor._processingStatus !== ProcessingStatus.Stopped) {
registrationObservers.add(observer);
} else {
const snapshot = rootActor.getSnapshot();
switch (snapshot.status) {
case 'done':
try {
observer.complete?.();
} catch (err) {
reportUnhandledError(err);
}
break;
// can this error?
}
}
observers.add(observer);

return {
unsubscribe: () => {
registrationObservers.delete(observer);
observers.delete(observer);
}
};
},
_set: (systemId, actorRef) => {
const existing = keyedActors.get(systemId);
if (existing && existing !== actorRef) {
throw new Error(
`Actor with system ID '${systemId as string}' already exists.`
);
}

keyedActors.set(systemId, actorRef);
reverseKeyedActors.set(actorRef, systemId);
},
inspect: (observer) => {
inspectionObservers.add(observer);
},
_sendInspectionEvent: (event) => {
const resolvedInspectionEvent: InspectionEvent = {
...event,
rootId: rootActor.sessionId
};
inspectionObservers.forEach(
(observer) => observer.next?.(resolvedInspectionEvent)
);
},
_sendRegistrationEvent: (event) => {
registrationObservers.forEach((observer) => observer.next?.(event));
observers.forEach((observer) => observer.next?.(resolvedInspectionEvent));
},
_relay: (source, target, event) => {
system._sendInspectionEvent({
Expand Down Expand Up @@ -307,6 +275,7 @@ export interface BaseInspectionEventProperties {
* - For snapshot events, this is the `actorRef` of the snapshot.
* - For event events, this is the target `actorRef` (recipient of event).
* - For actor events, this is the `actorRef` of the registered actor.
* - For registration events, this is the `actorRef` of the registered actor.
*/
actorRef: AnyActorRef;
}
Expand All @@ -330,32 +299,20 @@ export interface InspectedActorEvent extends BaseInspectionEventProperties {
type: '@xstate.actor';
}

export type InspectionEvent =
| InspectedSnapshotEvent
| InspectedEventEvent
| InspectedActorEvent;

export interface RegisteredActorEvent<
TActorRef extends AnyActorRef,
TSystemId extends string = string
> {
type: `@xstate.actor.register`;
systemId: TSystemId;
actorRef: TActorRef;
export interface InspectedRegisterEvent extends BaseInspectionEventProperties {
type: `@xstate.register`;
systemId: string;
}

export interface UnregisteredActorEvent<
TActorRef extends AnyActorRef,
TSystemId extends string = string
> {
type: `@xstate.actor.unregister`;
systemId: TSystemId;
actorRef: TActorRef;
export interface InspectedUnregisterEvent
extends BaseInspectionEventProperties {
type: `@xstate.unregister`;
systemId: string;
}

export type RegistrationEvent<
TActorRef extends AnyActorRef = AnyActorRef,
TSystemId extends string = string
> =
| RegisteredActorEvent<TActorRef, TSystemId>
| UnregisteredActorEvent<TActorRef, TSystemId>;
export type InspectionEvent =
| InspectedSnapshotEvent
| InspectedEventEvent
| InspectedActorEvent
| InspectedRegisterEvent
| InspectedUnregisterEvent;
Loading

0 comments on commit e201278

Please sign in to comment.