Add concurrent payload visitor#2140
Conversation
| - name: Check generated payload visitor is up to date | ||
| run: | | ||
| pnpm run gen:payload-visitor | ||
| if ! git diff --exit-code -- packages/proto/src/payload-visitor.generated.ts; then | ||
| echo "::error::payload-visitor.generated.ts is out of date with the protos. Run 'pnpm run gen:payload-visitor' and commit the result." | ||
| exit 1 | ||
| fi |
There was a problem hiding this comment.
safeguard to make sure generated protos stay up to data
| "rebuild": "pnpm run clean && pnpm run build", | ||
| "build": "pnpm --recursive --stream run build", | ||
| "build:watch": "pnpm run build:protos && tsc --build --watch packages/*/tsconfig.json", | ||
| "build:watch": "pnpm run build:protos && pnpm run gen:payload-visitor && tsc --build --watch packages/*/tsconfig.json", |
There was a problem hiding this comment.
Building the protos package now builds the payload visitor as well. I can't think of a reason why you wouldn't want to keep them in sync but you can still run build:protos to just build the protos.
There was a problem hiding this comment.
Same as before, let's rename that one to gen:protos, to be consistent with the name you just introduced. Also, introduce a new gen script, that calls gen:protos and gen:payload-visitor, and use that one from build:watch instead of explicitly calling both gen:xxx.
| * A counting semaphore. `acquire` resolves once a permit is available; `release` returns one, | ||
| * handing it directly to the longest-waiting acquirer if any. | ||
| */ | ||
| class Semaphore { |
There was a problem hiding this comment.
Hand rolling concurrency primitives feels bad but I didn't see any reusable alternatives. I could move this to a different place if folks have opinions.
There was a problem hiding this comment.
I'd be inclined to move this to common/src/concurrency-utils.ts (new file), but no strong opinion.
jmaeagle99
left a comment
There was a problem hiding this comment.
Haven't read too closely, but I think a few things need to be changed:
- The visitor shouldn't enforce cardinality. If a callback wants to mutate the arity of an array, it should be able to e.g. codecs.
- There needs to be some kind of context that is passed through while walking and a separate visitor that allows updating/replacing the context on a per-message basis. This would allow things like SerializationContext or StorageDriverStoreContext to change as we iterate commands in a WFT completion.
- There should be settings for skipping search attributes and headers.
| skipHeaders?: boolean; | ||
| skipSearchAttributes?: boolean; |
| export type PayloadTransform<Ctx> = ( | ||
| payloads: Payload[], | ||
| context: Ctx, | ||
| abortSignal?: AbortSignal | ||
| ) => Promise<Payload[]>; |
There was a problem hiding this comment.
This was the most simple implementation: a PayloadTransform gets Payload[] in and give Payload[] back out. However, there is a hidden contract here: if a non-repeatable field represents a payload is being transformed here, we must return exactly one payload in the transform. Practically this means that if payloads.length === 1 then the return must also be of length 1. As @jmaeagle99 pointed out, we can't just not return a payload when the proto message needs one.
Few options:
- Leave it as it is and document it in the comments.
- Add separate "transform" mechanism that enforce this rule statically
export interface PayloadVisitor<Ctx> {
transformSingle(payload: Payload, context: Ctx): Promise<Payload>;
transformRepeated(payloads: Payload[], context: Ctx): Promise<Payload[]>;
}- Keep one
PayloadTransformbut have the type of the inputpayloadsbe generic and encode the output type information. I think this is elegant and I might reach for this option in other languages but I think it would be a little unwieldy in TS.
There was a problem hiding this comment.
Additionally, is returning a different number of payloads from transforming a repeated field something we want to allow for or encourage? Does it break any assumptions that eventual users of this payload visitor might reasonably have?
For example, if a codec using this payload visitor returned a different number of payloads and we didn't enforce any rules this could happen:
let input = ...
let output = decode(encode(input))
input.length === output.length // could be false!Whether that is "our problem" or not is debatable but we can at least prevent that behavior if we wish to.
There was a problem hiding this comment.
Lastly, I want to point out that for map fields containing payloads they are visited per entry. The visitor receives each map value as a single-payload rather than receiving the whole map batch. This means map keys and cardinality are preserved (since we enforce that one payload in means one payload out). Technically, its OK to "remove" an item from a map but I'm not sure that is something we want to allow in this visitor (maps are represented as repeated fields on the wire).
Just wanted to bring this up in case anyone can think of a reason to allow for changing the structure of a map with the payload visitor.
There was a problem hiding this comment.
Technically, its OK to "remove" an item from a map but I'm not sure that is something we want to allow in this visitor (maps are represented as repeated fields on the wire).
We don't have any need to do this as part of the payload visitor.
| export function boundPayloadTransform<Ctx>( | ||
| transform: PayloadTransform<Ctx>, | ||
| concurrency: number, | ||
| failure: AbortController | ||
| ): (payloads: Payload[], context: Ctx) => Promise<Payload[]> { |
There was a problem hiding this comment.
I chose to wrap the user supplied "transform" in the concurrency limiting logic to keep the actual visiting/walking logic synchronous and agnostic to whatever constraints we want to place on concurrent visiting.
5ce0cb3 to
1978dce
Compare
…'t enforce strict cardinality checks on payload transforms.
1978dce to
7cf4f4d
Compare
| "build": "pnpm run build:protos && pnpm run gen:payload-visitor && pnpm run build:ts", | ||
| "build:ts": "tsc --build", | ||
| "build:protos": "tsx ./scripts/compile-proto.ts" | ||
| "build:protos": "tsx ./scripts/compile-proto.ts", |
There was a problem hiding this comment.
Let's rename that one to gen:protos, to be consistent with the name you just introduced. Also, introduce a new gen script, that calls gen:protos and gen:payload-visitor, and use that one from build instead of explicitly calling both gen:xxx.
| } else { | ||
| const walk = fnName(message); | ||
| if (field.map) { | ||
| lines = [`const m = ${access};`, `if (m) for (const v of Object.values(m)) ${walk}(v, env, ctx, pending);`]; |
There was a problem hiding this comment.
nit: Use a single, multiline `...` block. That's what we do in many other places (e.g. reusable-vm.ts, bundler.ts, create-project/src/index.ts, etc). If you want to avoid extra leading spaces in the generated output file (not a big deal tbh), use dedent, with the "dedent..." syntax (see the examples I named before).
| } | ||
|
|
||
| const env: WalkEnv<Ctx> = { | ||
| transform: boundPayloadTransform(payloadTransform, concurrency, failure), |
There was a problem hiding this comment.
I'd suggest moving concurrency control out of this function. The caller can itself call boundPayloadTransform() with appropriate settings, and pass the bounded transformer to the visitor.
This would give the caller more control on the scope of what is concurrency controlled, e.g. applying a global concurrency limit per payload store. As it is now, concurrency limit applies per "visit job", but there could be multiple visit jobs being run concurrently (e.g. processing multiple workflow activations), which means the total number of access operations to the payload store really performed concurrently might be multiple times the per-visit-job limit.
| * @internal | ||
| * @experimental | ||
| */ | ||
| export function visitWorkflowActivation<Ctx = void>( |
There was a problem hiding this comment.
Please add the async keyword on this function. No need to await the runVisit's promise, just add the keyword.
Without the async keyword, the visitWorkflowActivation function will be completely missing in stack traces produced by async code downstream of this function, making it harder to diagnose eventual bugs.
| * @internal | ||
| * @experimental | ||
| */ | ||
| export function visitWorkflowActivationCompletion<Ctx = void>( |
What was changed
WorkflowActivation/WorkflowActivationCompletionproto trees.proto/scripts/gen-payload-visitor.tsscript that generates the visitors atproto/src/payload-visitor.generated.ts. Runs automatically when new protos are generated. About ~1k lines of checked in code.Why?
Prerequisite for External Storage store/retrieve (and reusable by codecs / payload validation): we need to find and transform every payload in an arbitrary proto message with bounded concurrency.