Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Synchronizers #5071

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
18 changes: 18 additions & 0 deletions .changeset/silly-needles-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
'xstate': minor
---

Added support for synchronizers in XState, allowing state persistence and synchronization across different storage mechanisms.

- Introduced `Synchronizer` interface for implementing custom synchronization logic
- Added `sync` option to `createActor` for attaching synchronizers to actors

```ts
import { createActor } from 'xstate';
import { someMachine } from './someMachine';
import { createLocalStorageSync } from './localStorageSynchronizer';

const actor = createActor(someMachine, {
sync: createLocalStorageSync('someKey')
});
```
14 changes: 8 additions & 6 deletions packages/core/src/StateMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,15 @@ export class StateMachine<
TConfig
>
): void {
Object.values(snapshot.children as Record<string, AnyActorRef>).forEach(
(child: any) => {
if (child.getSnapshot().status === 'active') {
child.start();
if (snapshot.children) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could likely be reverted - no tests fails without this if 😉

Object.values(snapshot.children as Record<string, AnyActorRef>).forEach(
(child: any) => {
if (child.getSnapshot().status === 'active') {
child.start();
}
}
}
);
);
}
}

public getStateNodeById(stateId: string): StateNode<TContext, TEvent> {
Expand Down
26 changes: 24 additions & 2 deletions packages/core/src/createActor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import type {
InputFrom,
IsNotNever,
Snapshot,
SnapshotFrom
SnapshotFrom,
Synchronizer
} from './types.ts';
import {
ActorOptions,
Expand Down Expand Up @@ -119,6 +120,9 @@ export class Actor<TLogic extends AnyActorLogic>

public src: string | AnyActorLogic;

private _synchronizer?: Synchronizer<any>;
private _synchronizerSubscription?: Subscription;

/**
* Creates a new actor instance for the given logic with the provided options,
* if any.
Expand Down Expand Up @@ -207,7 +211,23 @@ export class Actor<TLogic extends AnyActorLogic>
this.system._set(systemId, this);
}

this._initState(options?.snapshot ?? options?.state);
this._synchronizer = options?.sync;

const initialSnapshot =
this._synchronizer?.getSnapshot() ?? options?.snapshot ?? options?.state;

this._initState(initialSnapshot);

if (this._synchronizer) {
this._synchronizerSubscription = this._synchronizer.subscribe(
(rawSnapshot) => {
const restoredSnapshot =
this.logic.restoreSnapshot?.(rawSnapshot, this._actorScope) ??
rawSnapshot;
this.update(restoredSnapshot, { type: '@xstate.sync' });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use @ for other builtin events like init, done.invoke and more

Suggested change
this.update(restoredSnapshot, { type: '@xstate.sync' });
this.update(restoredSnapshot, { type: 'xstate.sync' });

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should get enqueued and not handled immediately because with this model we allow for this.update to be potentially reentered (and that's a bad thing™). And if we are in the middle of processing 5 events mailbox and we receive a rawSnapshot here then likely we should append to the mailbox instead of processing it right away

}
);
}

if (systemId && (this._snapshot as any).status !== 'active') {
this.system._unregister(this);
Expand Down Expand Up @@ -263,6 +283,7 @@ export class Actor<TLogic extends AnyActorLogic>

switch ((this._snapshot as any).status) {
case 'active':
this._synchronizer?.setSnapshot(snapshot);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why only for the active status? done and error should likely be passed to it too (and likely stopped too)

for (const observer of this.observers) {
try {
observer.next?.(snapshot);
Expand Down Expand Up @@ -567,6 +588,7 @@ export class Actor<TLogic extends AnyActorLogic>
return this;
}
this.mailbox.clear();
this._synchronizerSubscription?.unsubscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what snapshot should the synchronizer be aware of when somebody does smth like this:

const actorRef = createActor(m, { sync: someSynchronizer })

// ... do stuff

actorRef.stop()
actorRef.getSnapshot() // .status === 'stopped'

If I'm not mistaken the known snapshot after this operation has a stopped status but the synchronizer won't be aware of it because we have unsubscribed earlier.

if (this._processingStatus === ProcessingStatus.NotStarted) {
this._processingStatus = ProcessingStatus.Stopped;
return this;
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,19 @@ export interface ActorOptions<TLogic extends AnyActorLogic> {
inspect?:
| Observer<InspectionEvent>
| ((inspectionEvent: InspectionEvent) => void);

sync?: Synchronizer<SnapshotFrom<TLogic>>;
}

export interface Synchronizer<T> extends Subscribable<T> {
/**
* Gets the snapshot or undefined
*
* An undefined snapshot means the synchronizer does not intend to override
* the initial or provided snapshot of the actor
*/
getSnapshot(): Snapshot<T> | undefined;
setSnapshot(snapshot: T): void;
}

export type AnyActor = Actor<any>;
Expand Down
176 changes: 176 additions & 0 deletions packages/core/test/sync.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import {
createActor,
createMachine,
Observer,
Synchronizer,
toObserver,
waitFor
} from '../src';

describe('synchronizers', () => {
it('work with a synchronous synchronizer', () => {
const snapshotRef = {
current: JSON.stringify({ value: 'b', children: {}, status: 'active' })
};
const pseudoStorage = {
getItem: (key: string) => {
return JSON.parse(snapshotRef.current);
},
setItem: (key: string, value: string) => {
snapshotRef.current = value;
}
};
const createStorageSync = (key: string): Synchronizer<any> => {
const observers = new Set();
return {
getSnapshot: () => pseudoStorage.getItem(key),
setSnapshot: (snapshot) => {
pseudoStorage.setItem(key, JSON.stringify(snapshot));
},
subscribe: (o) => {
const observer = toObserver(o);

const state = pseudoStorage.getItem(key);

observer.next?.(state);

observers.add(observer);

return {
unsubscribe: () => {
observers.delete(observer);
}
};
}
};
};

const machine = createMachine({
initial: 'a',
states: {
a: {},
b: {
on: {
next: 'c'
}
},
c: {}
}
});

const actor = createActor(machine, {
sync: createStorageSync('test')
}).start();

expect(actor.getSnapshot().value).toBe('b');

actor.send({ type: 'next' });

expect(actor.getSnapshot().value).toBe('c');

expect(pseudoStorage.getItem('test').value).toBe('c');
});

it('work with an asynchronous synchronizer', async () => {
let snapshotRef = {
current: undefined as any
};
let onChangeRef = {
current: (() => {}) as (value: any) => void
};
const pseudoStorage = {
getItem: async (key: string) => {
if (!snapshotRef.current) {
return undefined;
}
return JSON.parse(snapshotRef.current);
},
setItem: (key: string, value: string, source?: 'sync') => {
snapshotRef.current = value;

if (source !== 'sync') {
onChangeRef.current(JSON.parse(value));
}
},
subscribe: (fn: (value: any) => void) => {
onChangeRef.current = fn;
}
};

const createStorageSync = (key: string): Synchronizer<any> => {
const observers = new Set<Observer<any>>();

pseudoStorage.subscribe((value) => {
observers.forEach((observer) => {
observer.next?.(value);
});
});

const getSnapshot = () => {
if (!snapshotRef.current) {
return undefined;
}
return JSON.parse(snapshotRef.current);
};

const storageSync = {
getSnapshot,
setSnapshot: (snapshot) => {
const s = JSON.stringify(snapshot);
pseudoStorage.setItem(key, s, 'sync');
},
subscribe: (o) => {
const observer = toObserver(o);

const state = getSnapshot();

if (state) {
observer.next?.(state);
}

observers.add(observer);

return {
unsubscribe: () => {
observers.delete(observer);
}
};
}
} satisfies Synchronizer<any>;

setTimeout(() => {
pseudoStorage.setItem(
'key',
JSON.stringify({ value: 'b', children: {}, status: 'active' })
);
}, 100);

return storageSync;
};

const machine = createMachine({
initial: 'a',
states: {
a: {},
b: {
on: {
next: 'c'
}
},
c: {}
}
});

const actor = createActor(machine, {
sync: createStorageSync('test')
}).start();

expect(actor.getSnapshot().value).toBe('a');

await waitFor(actor, () => actor.getSnapshot().value === 'b');

actor.send({ type: 'next' });

expect(actor.getSnapshot().value).toBe('c');
});
});