
redpanda/migrator: set IMPORT mode per subject instead of per version for serverless SR #4090

Closed
mmatczuk wants to merge 1 commit into main from mmt/migrator_schema_sync_fix

Conversation

@mmatczuk
Contributor

Introduce importModeManager to batch per-subject IMPORT mode transitions in serverless schema registries. Previously ensureSubjectImportMode was called inside syncSubjectSchema for every version, toggling IMPORT mode on and off repeatedly. The new manager sets IMPORT mode once before the first version of a subject is written and restores the original mode after all versions are migrated. Close acts as a safety net, retrying restore for any subjects left over on error paths.
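
The lifecycle described above (set IMPORT once per subject, restore after the last version, Close as a safety net) can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `fakeRegistry`, `migrate`, and the string mode names are hypothetical stand-ins, and the sketch is single-threaded.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeRegistry is a hypothetical in-memory stand-in for the destination
// schema registry. It maps a subject name to its current mode.
type fakeRegistry struct {
	modes    map[string]string
	setCalls int // counts mode changes so the batching is observable
}

func (r *fakeRegistry) setMode(subject, mode string) {
	r.modes[subject] = mode
	r.setCalls++
}

// importModeManager is a simplified illustration, not the type from the PR.
type importModeManager struct {
	reg      *fakeRegistry
	prevMode map[string]string // destination subject -> mode before IMPORT
}

func newImportModeManager(reg *fakeRegistry) *importModeManager {
	return &importModeManager{reg: reg, prevMode: map[string]string{}}
}

// TrySetImportMode switches the subject to IMPORT once and remembers the old mode.
func (m *importModeManager) TrySetImportMode(subject string) {
	if _, ok := m.prevMode[subject]; ok {
		return // already switched by an earlier version
	}
	m.prevMode[subject] = m.reg.modes[subject]
	m.reg.setMode(subject, "IMPORT")
}

// Restore puts the subject back into its recorded mode and stops tracking it.
func (m *importModeManager) Restore(subject string) {
	prev, ok := m.prevMode[subject]
	if !ok {
		return
	}
	m.reg.setMode(subject, prev)
	delete(m.prevMode, subject)
}

// Close restores every subject that is still tracked (the safety net).
func (m *importModeManager) Close() {
	for subject := range m.prevMode {
		m.Restore(subject)
	}
}

// migrate writes all versions of a subject. A non-zero failAt simulates an
// error on that version, which skips the eager Restore.
func migrate(m *importModeManager, subject string, versions, failAt int) error {
	for v := 1; v <= versions; v++ {
		m.TrySetImportMode(subject)
		if v == failAt {
			return errors.New("simulated write failure")
		}
	}
	m.Restore(subject)
	return nil
}

func main() {
	reg := &fakeRegistry{modes: map[string]string{"orders": "READWRITE", "users": "READONLY"}}
	m := newImportModeManager(reg)
	_ = migrate(m, "orders", 3, 0) // succeeds: one set, one restore
	_ = migrate(m, "users", 3, 2)  // fails mid-way: subject stays in IMPORT
	m.Close()                      // safety net restores "users"
	fmt.Println(reg.modes["orders"], reg.modes["users"], reg.setCalls)
}
```

In this sketch three versions of a subject cost two mode changes instead of six, which is the batching the description refers to.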

@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch from c0488c0 to 099989a Compare March 11, 2026 13:23
@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch from 099989a to efcbef2 Compare March 11, 2026 13:37
@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch 2 times, most recently from 3210527 to 8d27e6b Compare March 11, 2026 13:44
@redpanda-data redpanda-data deleted a comment from claude bot Mar 11, 2026
@redpanda-data redpanda-data deleted a comment from claude bot Mar 11, 2026
@redpanda-data redpanda-data deleted a comment from claude bot Mar 11, 2026
// its previous mode once all its versions have been migrated.
//
// When the destination global mode is already IMPORT, or when the migrator is
// not in serverless mode, all operations are no-ops.

Bug: resolveSubject(srcSubject, 0) passes a hardcoded version 0, but resolveSubject exposes schema_registry_version as interpolation metadata for the NameResolver. If the name resolver references that metadata, version 0 produces an incorrect destination subject, and IMPORT mode gets set on the wrong subject.

The old code avoided this because ensureSubjectImportMode was called from syncSubjectSchema with the already-resolved dstSubject (which used ss.Version). Now that TrySetImportMode resolves the subject itself, it needs the actual version. Preferably, it should accept the already-resolved dstSubject, which stays consistent with the previous approach and avoids duplicating the resolution logic.
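
To make the failure mode concrete, here is a minimal sketch assuming a name resolver whose output depends on the version. `versionedResolver` and `markImport` are hypothetical names invented for illustration; the real NameResolver interpolation is not shown in this thread.

```go
package main

import "fmt"

// versionedResolver is a hypothetical stand-in for a NameResolver whose
// interpolation references the schema_registry_version metadata.
func versionedResolver(subject string, version int) string {
	return fmt.Sprintf("%s-v%d", subject, version)
}

// markImport takes the already-resolved destination subject, so the caller
// and the mode change cannot disagree about the name.
func markImport(modes map[string]string, dstSubject string) {
	modes[dstSubject] = "IMPORT"
}

func main() {
	written := versionedResolver("orders", 3) // subject the schema is written to
	guessed := versionedResolver("orders", 0) // subject resolved with hardcoded 0
	fmt.Println(written, guessed, written == guessed)

	modes := map[string]string{}
	markImport(modes, written) // resolve once, reuse the result
	fmt.Println(modes)
}
```

Passing the resolved name through, as `markImport` does, is the second option the comment suggests: resolution happens once and the mode change targets the same subject the write does.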

@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch from 8d27e6b to 9c0f53e Compare March 11, 2026 13:52
@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch from 9c0f53e to 9e3ec37 Compare March 11, 2026 15:14
@redpanda-data redpanda-data deleted a comment from claude bot Mar 11, 2026
@redpanda-data redpanda-data deleted a comment from claude bot Mar 11, 2026
Comment on lines +979 to +1018
// TrySetImportMode sets the subject to IMPORT mode if not already done.
// No-op when inactive or when the subject was already switched.
func (c *importModeManager) TrySetImportMode(ctx context.Context, src schemaSubjectVersion) error {
	if !c.active {
		return nil
	}

	dstSubject, err := c.resolveSubject(src.Subject, src.Version)
	if err != nil {
		return err
	}

	// Fast path: subject already tracked.
	c.mu.RLock()
	_, ok := c.prevMode[src]
	c.mu.RUnlock()
	if ok {
		return nil
	}

	// Slow path: hold exclusive lock across the entire check-fetch-set
	// sequence to prevent concurrent goroutines from racing on the same
	// subject and clobbering the original mode.
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, ok := c.prevMode[src]; ok {
		return nil
	}

	mode, err := srSubjectMode(ctx, c.dst, dstSubject)
	if err != nil {
		if strings.Contains(err.Error(), "does not have subject-level mode configured") {
			mode = noMode
		} else {
			return err
		}
	} else if mode == sr.ModeImport {
		c.prevMode[src] = subjectMode{Subject: dstSubject, Mode: mode}
		return nil

Bug: prevMode is keyed by source schemaSubjectVersion (subject+version), but the IMPORT mode operations target the destination subject. When VersionsAll is configured, the DFS processes multiple versions of the same subject sequentially via TrySetImportMode. The first version (e.g. C-v1) correctly records the original destination mode (READWRITE), sets IMPORT, and stores prevMode[C-v1] = {dst, READWRITE}. The second version (C-v2) then fetches the destination mode (now IMPORT), hits the else if mode == sr.ModeImport branch, and stores prevMode[C-v2] = {dst, IMPORT}.

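During restore (in Close()), if C-v1 is restored first (→ READWRITE) and then C-v2 is restored (→ IMPORT), the subject is left in IMPORT mode. Go map iteration order is unspecified, so the final state depends on which entry happens to be restored last: with two versions it is wrong roughly half the time, and with more versions it is wrong whenever the entry holding the original mode is not restored last.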

The fix: key prevMode by destination subject instead of source subject+version, so only the first call per destination subject records the original mode and subsequent versions are no-ops.
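
The difference between the two keyings can be sketched as below. This is a reduced illustration under assumptions: `modeTracker`, `record`, and `srcVersion` are hypothetical names, and modes are plain strings rather than the registry client's mode type.

```go
package main

import (
	"fmt"
	"sync"
)

// srcVersion mirrors the idea of a source subject+version key.
type srcVersion struct {
	Subject string
	Version int
}

// modeTracker is a hypothetical reduction of prevMode keyed by destination
// subject. It records the mode seen before the switch to IMPORT.
type modeTracker struct {
	mu       sync.Mutex
	prevMode map[string]string
}

// record stores the mode only on the first call per destination subject and
// reports whether this call was the first.
func (t *modeTracker) record(dstSubject, currentMode string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.prevMode[dstSubject]; ok {
		return false
	}
	t.prevMode[dstSubject] = currentMode
	return true
}

func main() {
	dstMode := "READWRITE" // current mode of destination subject "C"
	byDst := &modeTracker{prevMode: map[string]string{}}
	bySrc := map[srcVersion]string{} // the keying the review flags

	for _, sv := range []srcVersion{{"C", 1}, {"C", 2}} {
		bySrc[sv] = dstMode        // v2 records IMPORT here
		byDst.record("C", dstMode) // v2 is a no-op here
		dstMode = "IMPORT"         // the subject is now in IMPORT mode
	}

	fmt.Println(len(bySrc), bySrc[srcVersion{"C", 2}])
	fmt.Println(len(byDst.prevMode), byDst.prevMode["C"])
}
```

With the destination keying there is a single entry holding the original mode, so the restore order no longer matters.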

@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch from 9e3ec37 to 7853684 Compare March 11, 2026 16:27
@claude

claude bot commented Mar 11, 2026

Commits
LGTM

Review
This PR introduces importModeManager to batch per-subject IMPORT mode transitions in serverless schema registries, replacing the previous per-version toggling. The new manager sets IMPORT mode once before the first version of a subject is written and restores the original mode after all versions are migrated, with Close() as a safety net for error paths. The concurrency model is sound — Sync always uses VersionsLatest so each subject gets exactly one channel item with Last: true, avoiding races between Restore and TrySetImportMode. Tests cover multi-version subjects with shared dependencies, single subjects, and pre-existing IMPORT mode.

LGTM

@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch from 7853684 to 32a0666 Compare March 11, 2026 16:32
@redpanda-data redpanda-data deleted a comment from claude bot Mar 11, 2026
		err = fmt.Errorf("get schema for subject %q version %d: %w", s, v, err)
	}
	if !yield(subjectSchema{SubjectSchema: schema, Last: i == len(subs)-1}, err) {

Bug: Race condition — Last is i == len(subs)-1, meaning it's true for all versions of the last subject (by index), not the last version of each subject. This creates two problems:

  1. Race with concurrent workers: In Sync, items are distributed to MaxParallelHTTPRequests workers via a channel. Multiple versions of the last subject can be processed concurrently by different workers. The first worker to finish any version of the last subject calls Restore (line 709), reverting IMPORT mode while other workers may still be syncing other versions of that same subject — causing those in-flight syncs to fail.

  2. Non-last subjects never get Restore called by workers: For all subjects except the last one, Last is always false, so their IMPORT mode is only restored during Close() after the entire sync completes. This holds IMPORT mode longer than necessary.

The Last flag needs to track whether a given item is the last version for its subject, and Restore should only be called after all versions of that subject have been fully processed — which requires coordination across workers (e.g., a per-subject counter or processing all versions of a subject within a single worker).
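
One possible shape for the per-subject counter mentioned above, as a sketch under assumptions: `pendingVersions`, `item`, and `syncAll` are hypothetical names, the actual schema write is elided, and a restore is only counted rather than performed.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingVersions counts unfinished versions per destination subject.
type pendingVersions struct {
	mu      sync.Mutex
	pending map[string]int
}

func newPendingVersions(counts map[string]int) *pendingVersions {
	p := &pendingVersions{pending: make(map[string]int, len(counts))}
	for s, n := range counts {
		p.pending[s] = n
	}
	return p
}

// done marks one version as finished and reports whether it was the last
// outstanding version of that subject.
func (p *pendingVersions) done(subject string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pending[subject]--
	if p.pending[subject] == 0 {
		delete(p.pending, subject)
		return true
	}
	return false
}

type item struct {
	Subject string
	Version int
}

// syncAll fans items out to workers and returns how many times a restore
// would have been triggered per subject.
func syncAll(items []item, workers int) map[string]int {
	counts := map[string]int{}
	for _, it := range items {
		counts[it.Subject]++
	}
	tracker := newPendingVersions(counts)

	var (
		mu       sync.Mutex
		restores = map[string]int{}
		wg       sync.WaitGroup
	)
	ch := make(chan item)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range ch {
				// The schema write for this version would happen here.
				if tracker.done(it.Subject) {
					mu.Lock()
					restores[it.Subject]++ // safe point to restore the mode
					mu.Unlock()
				}
			}
		}()
	}
	for _, it := range items {
		ch <- it
	}
	close(ch)
	wg.Wait()
	return restores
}

func main() {
	items := []item{{"A", 1}, {"A", 2}, {"A", 3}, {"B", 1}, {"B", 2}}
	fmt.Println(syncAll(items, 4))
}
```

Because `done` is called only after a version has been processed, the counter reaches zero only once every version of that subject is complete, regardless of which worker finishes last.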

…n for serverless SR

Introduce importModeManager to batch per-subject IMPORT mode transitions
in serverless schema registries. Previously ensureSubjectImportMode was
called inside syncSubjectSchema for every version, toggling IMPORT mode
on and off repeatedly. The new manager sets IMPORT mode once before the
first version of a subject is written and restores the original mode
after all versions are migrated. Close acts as a safety net, retrying
restore for any subjects left over on error paths.
@claude

claude bot commented Mar 11, 2026

Commits
LGTM

Review
The PR refactors per-version IMPORT mode toggling into a centralized importModeManager with double-checked locking, deferred cleanup, and retry logic. The new integration tests cover multi-version, shared-dependency, and pre-set IMPORT scenarios well.

  1. Bug: Race condition with Last flag in VersionsAll mode — In listSubjectSchemas, Last is set based on whether the subject is the last in the list (i == len(subs)-1), not whether the version is the last for its subject. Since Sync fans items out to concurrent workers, a worker finishing early can call Restore (reverting IMPORT mode) while other workers are still syncing other versions of the same subject. For non-last subjects, Restore is never called by workers at all — they rely entirely on Close(). See inline comment.

@mmatczuk mmatczuk force-pushed the mmt/migrator_schema_sync_fix branch from 32a0666 to 1ab4d89 Compare March 11, 2026 16:36
		err = fmt.Errorf("get schema for subject %q version %d: %w", s, v, err)
	}
	if !yield(subjectSchema{SubjectSchema: schema, Last: i == len(subs)-1}, err) {

Bug: Last: i == len(subs)-1 marks all versions of the last subject as Last: true, and no versions of any earlier subject. The intended semantic (based on how Sync uses Last to trigger Restore) is "last version of each subject", which should be checking the inner loop index against len(vers)-1, not the outer loop index against len(subs)-1.

Currently this is masked because Sync always calls listSubjectSchemas with VersionsLatest (where Last: true is unconditional), so the VersionsAll path's Last value is never consumed by Sync. However, ListSubjectSchemas does call this with VersionsAll and silently carries the wrong Last value (only ignored because it discards it via ss.SubjectSchema). If Sync is ever changed to use VersionsAll, this would cause:

  1. Intermediate subjects never eagerly restored (relying entirely on Close() safety net)
  2. A race condition for the last subject — multiple workers processing different versions could call Restore while another worker is still syncing, removing IMPORT mode mid-write
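
A minimal sketch of the suggested check against the inner loop index follows. It assumes a simplified `subjectSchema` with flat fields and a hypothetical `listAllVersions` helper that returns a slice instead of yielding through an iterator as the PR's listSubjectSchemas does.

```go
package main

import "fmt"

// subjectSchema is a simplified stand-in for the PR's type of the same name.
type subjectSchema struct {
	Subject string
	Version int
	Last    bool
}

// listAllVersions emits every version of every subject and marks the last
// version of each subject by comparing the inner index j with len(vers)-1.
func listAllVersions(subs []string, versions map[string][]int) []subjectSchema {
	var out []subjectSchema
	for _, s := range subs {
		vers := versions[s]
		for j, v := range vers {
			out = append(out, subjectSchema{
				Subject: s,
				Version: v,
				Last:    j == len(vers)-1, // inner index, not the subject index
			})
		}
	}
	return out
}

func main() {
	subs := []string{"A", "B"}
	versions := map[string][]int{"A": {1, 2}, "B": {1, 2, 3}}
	for _, ss := range listAllVersions(subs, versions) {
		fmt.Println(ss.Subject, ss.Version, ss.Last)
	}
}
```

With this check each subject yields exactly one item with Last set, which is the property Restore relies on. It does not by itself address the ordering of concurrent workers.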

@claude

claude bot commented Mar 11, 2026

Commits
LGTM

Review
This PR introduces importModeManager to batch per-subject IMPORT mode transitions for serverless schema registries, replacing the previous per-version toggling. The overall design is sound — the manager correctly deduplicates mode transitions, handles concurrent access with a read/write mutex, and provides a Close() safety net for error paths.

  1. Last field logic incorrect for VersionsAll: In listSubjectSchemas, the Last flag in the VersionsAll case uses the outer subject index (i == len(subs)-1) instead of the inner version index, marking all versions of only the last subject as Last. Currently masked because Sync always uses VersionsLatest, but would cause race conditions and missed eager restores if Sync ever switches to VersionsAll.

@mmatczuk mmatczuk closed this Mar 12, 2026
