redpanda/migrator: set IMPORT mode per subject instead of per version for serverless SR #4090
c0488c0 to 099989a
099989a to efcbef2
3210527 to 8d27e6b
// its previous mode once all its versions have been migrated.
//
// When the destination global mode is already IMPORT, or when the migrator is
// not in serverless mode, all operations are no-ops.
Bug: resolveSubject(srcSubject, 0) passes a hardcoded version 0, but resolveSubject exposes schema_registry_version as interpolation metadata for the NameResolver. If the name resolver references that metadata, version 0 produces an incorrect destination subject, and IMPORT mode gets set on the wrong subject.
The old code avoided this because ensureSubjectImportMode was called from syncSubjectSchema with the already-resolved dstSubject (which used ss.Version). Now that TrySetImportMode resolves the subject itself, it needs the actual version or, preferably, should accept the already-resolved dstSubject, which stays consistent with the previous approach and avoids duplicating resolution logic.
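To illustrate the concern, here is a minimal sketch of how a version-aware name template resolves differently for a hardcoded 0 and for the real version. The `${subject}`/`${version}` template syntax and this `resolveSubject` helper are hypothetical stand-ins, not the migrator's actual resolver or interpolation syntax.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveSubject is a hypothetical stand-in for the migrator's resolver.
// It expands a name template in which ${version} refers to the schema
// version, mirroring the schema_registry_version metadata described above.
func resolveSubject(template, subject string, version int) string {
	r := strings.NewReplacer(
		"${subject}", subject,
		"${version}", fmt.Sprint(version),
	)
	return r.Replace(template)
}

func main() {
	const tmpl = "${subject}-v${version}"

	// Hardcoded version 0: resolves to a subject that is never written.
	fmt.Println(resolveSubject(tmpl, "orders", 0))

	// Actual version: resolves to the subject the schema is written to.
	fmt.Println(resolveSubject(tmpl, "orders", 3))
}
```

With such a template, IMPORT mode would be set on `orders-v0` while the schema is written to `orders-v3`, which is the mismatch the comment describes.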
8d27e6b to 9c0f53e
9c0f53e to 9e3ec37
  // TrySetImportMode sets the subject to IMPORT mode if not already done.
  // No-op when inactive or when the subject was already switched.
  func (c *importModeManager) TrySetImportMode(ctx context.Context, src schemaSubjectVersion) error {
      if !c.active {
          return nil
      }

      dstSubject, err := c.resolveSubject(src.Subject, src.Version)
      if err != nil {
          return err
      }

      // Fast path: subject already tracked.
      c.mu.RLock()
      _, ok := c.prevMode[src]
      c.mu.RUnlock()
      if ok {
          return nil
      }

      // Slow path: hold exclusive lock across the entire check-fetch-set
      // sequence to prevent concurrent goroutines from racing on the same
      // subject and clobbering the original mode.
      c.mu.Lock()
      defer c.mu.Unlock()

      if _, ok := c.prevMode[src]; ok {
          return nil
      }

      mode, err := srSubjectMode(ctx, c.dst, dstSubject)
      if err != nil {
          if strings.Contains(err.Error(), "does not have subject-level mode configured") {
              mode = noMode
          } else {
-             return noop, err
+             return err
          }
      } else if mode == sr.ModeImport {
-         return noop, nil
+         c.prevMode[src] = subjectMode{Subject: dstSubject, Mode: mode}
+         return nil
Bug: prevMode is keyed by source schemaSubjectVersion (subject+version), but the IMPORT mode operations target the destination subject. When VersionsAll is configured, the DFS processes multiple versions of the same subject sequentially via TrySetImportMode. The first version (e.g. C-v1) correctly records the original destination mode (READWRITE), sets IMPORT, and stores prevMode[C-v1] = {dst, READWRITE}. The second version (C-v2) then fetches the destination mode (now IMPORT), hits the else if mode == sr.ModeImport branch, and stores prevMode[C-v2] = {dst, IMPORT}.
During restore (in Close()), if C-v1 is restored first (→ READWRITE) and then C-v2 is restored (→ IMPORT), the subject is left in IMPORT mode. Since Go map iteration order is non-deterministic, this produces the wrong final state ~50% of the time.
The fix: key prevMode by destination subject instead of source subject+version, so only the first call per destination subject records the original mode and subsequent versions are no-ops.
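A minimal sketch of the suggested keying, assuming an in-memory stand-in for the destination registry. `fakeRegistry`, the `mode` type, and the simplified method signatures are hypothetical and only illustrate that the first call per destination subject records the original mode.

```go
package main

import (
	"fmt"
	"sync"
)

type mode string

const (
	modeReadWrite mode = "READWRITE"
	modeImport    mode = "IMPORT"
)

// fakeRegistry is a hypothetical in-memory stand-in for the destination
// schema registry's per-subject mode API.
type fakeRegistry struct{ modes map[string]mode }

type importModeManager struct {
	mu       sync.Mutex
	dst      *fakeRegistry
	prevMode map[string]mode // keyed by destination subject, not source subject+version
}

// TrySetImportMode records the original mode only on the first call per
// destination subject; later versions of the same subject are no-ops.
func (m *importModeManager) TrySetImportMode(dstSubject string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.prevMode[dstSubject]; ok {
		return
	}
	m.prevMode[dstSubject] = m.dst.modes[dstSubject]
	m.dst.modes[dstSubject] = modeImport
}

// Close restores every tracked subject to its recorded mode. Because each
// subject has exactly one entry, map iteration order no longer matters.
func (m *importModeManager) Close() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for subject, prev := range m.prevMode {
		m.dst.modes[subject] = prev
		delete(m.prevMode, subject)
	}
}

func main() {
	reg := &fakeRegistry{modes: map[string]mode{"C": modeReadWrite}}
	m := &importModeManager{dst: reg, prevMode: map[string]mode{}}

	m.TrySetImportMode("C") // C-v1: records READWRITE, sets IMPORT
	m.TrySetImportMode("C") // C-v2: no-op, original mode is preserved
	m.Close()

	fmt.Println(reg.modes["C"])
}
```

The sketch uses a single mutex for brevity; the fast-path read lock from the diff could be kept on top of the same keying.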
9e3ec37 to 7853684
Commits Review: LGTM
7853684 to 32a0666
      err = fmt.Errorf("get schema for subject %q version %d: %w", s, v, err)
  }
- if !yield(schema, err) {
+ if !yield(subjectSchema{SubjectSchema: schema, Last: i == len(subs)-1}, err) {
Bug: race condition. Last is i == len(subs)-1, meaning it's true for all versions of the last subject (by index), not the last version of each subject. This creates two problems:
- Race with concurrent workers: In Sync, items are distributed to MaxParallelHTTPRequests workers via a channel. Multiple versions of the last subject can be processed concurrently by different workers. The first worker to finish any version of the last subject calls Restore (line 709), reverting IMPORT mode while other workers may still be syncing other versions of that same subject, which causes those in-flight syncs to fail.
- Non-last subjects never get Restore called by workers: For all subjects except the last one, Last is always false, so their IMPORT mode is only restored during Close() after the entire sync completes. This holds IMPORT mode longer than necessary.
The Last flag needs to track whether a given item is the last version for its subject, and Restore should only be called after all versions of that subject have been fully processed, which requires coordination across workers (e.g., a per-subject counter or processing all versions of a subject within a single worker).
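The per-subject counter mentioned above could look roughly like the following sketch. `pendingTracker` and its methods are hypothetical names, and the worker body only marks where the sync and Restore calls would go.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingTracker counts the versions of each subject that are still being
// processed. It is a hypothetical helper, not part of the migrator.
type pendingTracker struct {
	mu      sync.Mutex
	pending map[string]int
}

func newPendingTracker(versions map[string]int) *pendingTracker {
	p := &pendingTracker{pending: make(map[string]int, len(versions))}
	for subject, n := range versions {
		p.pending[subject] = n
	}
	return p
}

// Done marks one version of subject as fully processed. It returns true
// exactly once per subject: when the last outstanding version finishes.
func (p *pendingTracker) Done(subject string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pending[subject]--
	if p.pending[subject] == 0 {
		delete(p.pending, subject)
		return true
	}
	return false
}

func main() {
	tracker := newPendingTracker(map[string]int{"A": 2, "B": 3})
	items := []string{"A", "B", "B", "A", "B"} // one entry per version

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		restored = map[string]int{}
	)
	for _, s := range items {
		wg.Add(1)
		go func(subject string) {
			defer wg.Done()
			// ... sync this version to the destination here ...
			if tracker.Done(subject) {
				// Only the worker finishing the final version restores.
				mu.Lock()
				restored[subject]++
				mu.Unlock()
			}
		}(s)
	}
	wg.Wait()
	fmt.Println(restored["A"], restored["B"])
}
```

Whichever worker finishes last for a subject triggers the restore, so the result does not depend on scheduling order. The counts must be known before workers start, which assumes the version list is enumerated up front.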
…n for serverless SR
Introduce importModeManager to batch per-subject IMPORT mode transitions in serverless schema registries. Previously ensureSubjectImportMode was called inside syncSubjectSchema for every version, toggling IMPORT mode on and off repeatedly. The new manager sets IMPORT mode once before the first version of a subject is written and restores the original mode after all versions are migrated. Close acts as a safety net, retrying restore for any subjects left over on error paths.
32a0666 to 1ab4d89
      err = fmt.Errorf("get schema for subject %q version %d: %w", s, v, err)
  }
- if !yield(schema, err) {
+ if !yield(subjectSchema{SubjectSchema: schema, Last: i == len(subs)-1}, err) {
Bug: Last: i == len(subs)-1 marks all versions of the last subject as Last: true, and no versions of any earlier subject. The intended semantic (based on how Sync uses Last to trigger Restore) is "last version of each subject", which should be checking the inner loop index against len(vers)-1, not the outer loop index against len(subs)-1.
Currently this is masked because Sync always calls listSubjectSchemas with VersionsLatest (where Last: true is unconditional), so the VersionsAll path's Last value is never consumed by Sync. However, ListSubjectSchemas does call this with VersionsAll and silently carries the wrong Last value (only ignored because it discards it via ss.SubjectSchema). If Sync is ever changed to use VersionsAll, this would cause:
- Intermediate subjects never eagerly restored (relying entirely on the Close() safety net)
- A race condition for the last subject: multiple workers processing different versions could call Restore while another worker is still syncing, removing IMPORT mode mid-write
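A sketch of computing Last from the inner loop index, as suggested above. The `listAll` function and the flattened `subjectSchema` struct are simplified, hypothetical versions of the iterator in the diff, with the registry lookups replaced by an in-memory map.

```go
package main

import "fmt"

// subjectSchema is a simplified stand-in for the struct in the diff.
type subjectSchema struct {
	Subject string
	Version int
	Last    bool // true only for the final version of its subject
}

// listAll emits every version of every subject. Last is derived from the
// inner (version) index j, not the outer (subject) index.
func listAll(subs []string, versions map[string][]int) []subjectSchema {
	var out []subjectSchema
	for _, s := range subs {
		vers := versions[s]
		for j, v := range vers {
			out = append(out, subjectSchema{
				Subject: s,
				Version: v,
				Last:    j == len(vers)-1,
			})
		}
	}
	return out
}

func main() {
	items := listAll(
		[]string{"A", "B"},
		map[string][]int{"A": {1, 2}, "B": {1}},
	)
	for _, it := range items {
		fmt.Printf("%s v%d last=%t\n", it.Subject, it.Version, it.Last)
	}
}
```

The flag alone marks the right item but does not order concurrent workers; restoring safely with parallel processing would still need coordination such as a per-subject counter.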