diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index a6a8c0456..f74bc4f2a 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1120,6 +1120,17 @@ pub async fn analyze_flow( .with(&flow_inst)? .with(&flow_schema.schema)? .into_fingerprint(); + + // Build a lineage fingerprint that focuses on export-level field lineage. + // It include export op identity, target kind, primary key schema and the + // output value fingerprinter (which is derived from the output value schema) + // so that benign changes (like exclude_pattern on sources or intermediate + // field ordering) that don't affect exported field lineage will not change + // this fingerprint. + let mut lineage_fp = Fingerprinter::default(); + lineage_fp = lineage_fp.with(&flow_inst.export_ops)?; + lineage_fp = lineage_fp.with(&flow_schema.schema)?; + let lineage_fingerprint = lineage_fp.into_fingerprint(); let plan_fut = async move { let (import_ops, op_scope, export_ops) = try_join3( try_join_all(import_ops_futs), @@ -1130,6 +1141,7 @@ pub async fn analyze_flow( Ok(ExecutionPlan { logic_fingerprint, + lineage_fingerprint, import_ops, op_scope, export_ops, diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 88d08c60e..a5b2d520e 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -135,6 +135,9 @@ pub struct AnalyzedOpScope { pub struct ExecutionPlan { pub logic_fingerprint: Fingerprint, + /// Coarser-grained fingerprint that captures field-level lineage relevant to outputs/exports. + /// Changes that don't affect lineage should not modify this fingerprint. + pub lineage_fingerprint: Fingerprint, pub import_ops: Vec, pub op_scope: AnalyzedOpScope, diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs index 9def9fb06..45ae0b919 100644 --- a/src/execution/db_tracking.rs +++ b/src/execution/db_tracking.rs @@ -88,6 +88,7 @@ pub struct SourceTrackingInfoForProcessing { pub processed_source_ordinal: Option, pub processed_source_fp: Option>, pub process_logic_fingerprint: Option>, + pub process_lineage_fingerprint: Option>, pub max_process_ordinal: Option, pub process_ordinal: Option, } @@ -99,7 +100,7 @@ pub async fn read_source_tracking_info_for_processing( pool: &PgPool, ) -> Result> { let query_str = format!( - "SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2", + "SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2", if db_setup.has_fast_fingerprint_column { "processed_source_fp" } else { @@ -124,6 +125,7 @@ pub struct SourceTrackingInfoForPrecommit { pub processed_source_ordinal: Option, pub processed_source_fp: Option>, pub process_logic_fingerprint: Option>, + pub process_lineage_fingerprint: Option>, pub process_ordinal: Option, pub target_keys: Option>, } @@ -135,7 +137,7 @@ pub async fn read_source_tracking_info_for_precommit( db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>, ) -> Result> { let query_str = format!( - "SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2", + "SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2", if db_setup.has_fast_fingerprint_column { "processed_source_fp" } else { @@ -240,6 +242,7 @@ pub async fn commit_source_tracking_info( processed_source_ordinal: Option, processed_source_fp: Option>, logic_fingerprint: &[u8], + lineage_fingerprint: &[u8], process_ordinal: i64, process_time_micros: i64, target_keys: TrackedTargetKeyForSource, @@ -252,8 +255,8 @@ pub async fn commit_source_tracking_info( "INSERT INTO {} ( \ source_id, source_key, \ max_process_ordinal, staging_target_keys, \ - processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys{}) \ - VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8{})", + processed_source_ordinal, process_logic_fingerprint, process_lineage_fingerprint, process_ordinal, process_time_micros, target_keys{}) \ + VALUES ($1, $2, $7 + 1, $3, $4, $5, $6, $7, $8, $9{})", db_setup.table_name, if db_setup.has_fast_fingerprint_column { ", processed_source_fp" @@ -261,16 +264,16 @@ pub async fn commit_source_tracking_info( "" }, if db_setup.has_fast_fingerprint_column { - ", $9" + ", $10" } else { "" }, ), WriteAction::Update => format!( - "UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8{} WHERE source_id = $1 AND source_key = $2", + "UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_lineage_fingerprint = $6, process_ordinal = $7, process_time_micros = $8, target_keys = $9{} WHERE source_id = $1 AND source_key = $2", db_setup.table_name, if db_setup.has_fast_fingerprint_column { - ", processed_source_fp = $9" + ", processed_source_fp = $10" } else { "" }, @@ -282,12 +285,12 @@ pub async fn commit_source_tracking_info( .bind(sqlx::types::Json(staging_target_keys)) // $3 .bind(processed_source_ordinal) // $4 .bind(logic_fingerprint) // $5 - .bind(process_ordinal) // $6 - .bind(process_time_micros) // $7 - .bind(sqlx::types::Json(target_keys)); // $8 - + .bind(lineage_fingerprint) // $6 + .bind(process_ordinal) // $7 + .bind(process_time_micros) // $8 + .bind(sqlx::types::Json(target_keys)); // $9 if db_setup.has_fast_fingerprint_column { - query = query.bind(processed_source_fp); // $9 + query = query.bind(processed_source_fp); // $10 } query.execute(db_executor).await?; @@ -317,7 +320,8 @@ pub struct TrackedSourceKeyMetadata { pub source_key: serde_json::Value, pub processed_source_ordinal: Option, pub processed_source_fp: Option>, - pub process_logic_fingerprint: Option>, + // pub process_logic_fingerprint: Option>, + pub process_lineage_fingerprint: Option>, pub max_process_ordinal: Option, pub process_ordinal: Option, } @@ -341,7 +345,7 @@ impl ListTrackedSourceKeyMetadataState { ) -> impl Stream> + 'a { self.query_str = format!( "SELECT \ - source_key, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal \ + source_key, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, max_process_ordinal, process_ordinal \ FROM {} WHERE source_id = $1", if db_setup.has_fast_fingerprint_column { "processed_source_fp" diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index fec3afca8..368bf97d4 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -45,6 +45,7 @@ async fn upgrade_tracking_table( processed_source_ordinal BIGINT, {opt_fast_fingerprint_column} process_logic_fingerprint BYTEA, + process_lineage_fingerprint BYTEA, process_ordinal BIGINT, process_time_micros BIGINT, target_keys JSONB, diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index 59cf310cf..5ea641472 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -54,14 +54,14 @@ pub struct SourceVersion { impl SourceVersion { pub fn from_stored( stored_ordinal: Option, - stored_fp: &Option>, - curr_fp: Fingerprint, + stored_lineage_fp: &Option>, + curr_lineage_fp: Fingerprint, ) -> Self { Self { ordinal: Ordinal(stored_ordinal), - kind: match &stored_fp { + kind: match &stored_lineage_fp { Some(stored_fp) => { - if stored_fp.as_slice() == curr_fp.0.as_slice() { + if stored_fp.as_slice() == curr_lineage_fp.0.as_slice() { SourceVersionKind::CurrentLogic } else { SourceVersionKind::DifferentLogic @@ -74,23 +74,23 @@ impl SourceVersion { pub fn from_stored_processing_info( info: &db_tracking::SourceTrackingInfoForProcessing, - curr_fp: Fingerprint, + curr_lineage_fp: Fingerprint, ) -> Self { Self::from_stored( info.processed_source_ordinal, - &info.process_logic_fingerprint, - curr_fp, + &info.process_lineage_fingerprint, + curr_lineage_fp, ) } pub fn from_stored_precommit_info( info: &db_tracking::SourceTrackingInfoForPrecommit, - curr_fp: Fingerprint, + curr_lineage_fp: Fingerprint, ) -> Self { Self::from_stored( info.processed_source_ordinal, - &info.process_logic_fingerprint, - curr_fp, + &info.process_lineage_fingerprint, + curr_lineage_fp, ) } @@ -119,9 +119,16 @@ impl SourceVersion { // Never process older ordinals to maintain consistency let should_skip = match (self.ordinal.0, target.ordinal.0) { (Some(existing_ordinal), Some(target_ordinal)) => { - // Skip if target ordinal is older, or same ordinal with same/older logic version - existing_ordinal > target_ordinal - || (existing_ordinal == target_ordinal && self.kind >= target.kind) + // Skip if target ordinal is older + if existing_ordinal > target_ordinal { + true + // If ordinals are equal, only skip if logic is unchanged (CurrentLogic) + } else if existing_ordinal == target_ordinal { + self.kind == SourceVersionKind::CurrentLogic + && target.kind == SourceVersionKind::CurrentLogic + } else { + false + } } _ => false, }; @@ -835,6 +842,7 @@ impl<'a> RowIndexer<'a> { source_version.ordinal.into(), source_fp, &self.src_eval_ctx.plan.logic_fingerprint.0, + &self.src_eval_ctx.plan.lineage_fingerprint.0, precommit_metadata.process_ordinal, self.process_time.timestamp_micros(), precommit_metadata.new_target_keys, @@ -1076,4 +1084,87 @@ mod tests { "After optimization, same ordinal should be skipped" ); } + + #[test] + fn test_lineage_fingerprint_skip_logic_happy_path() { + // Happy path: lineage fingerprint matches, should skip if ordinal is same or older + let stored_version = SourceVersion { + ordinal: Ordinal(Some(100)), + kind: SourceVersionKind::CurrentLogic, + }; + let target_version = SourceVersion { + ordinal: Ordinal(Some(100)), + kind: SourceVersionKind::CurrentLogic, + }; + assert!(stored_version.should_skip(&target_version, None)); + // Newer ordinal should not skip + let newer_version = SourceVersion { + ordinal: Ordinal(Some(101)), + kind: SourceVersionKind::CurrentLogic, + }; + assert!(!stored_version.should_skip(&newer_version, None)); + } + + #[test] + fn test_lineage_fingerprint_skip_logic_benign_change() { + // Benign change: lineage fingerprint unchanged, ordinal increases + let stored_version = SourceVersion { + ordinal: Ordinal(Some(100)), + kind: SourceVersionKind::CurrentLogic, + }; + let benign_version = SourceVersion { + ordinal: Ordinal(Some(101)), + kind: SourceVersionKind::CurrentLogic, + }; + assert!(!stored_version.should_skip(&benign_version, None)); + } + + #[test] + fn test_lineage_fingerprint_skip_logic_breaking_change() { + // Breaking change: lineage fingerprint changes, should not skip even if ordinal is same + let stored_version = SourceVersion { + ordinal: Ordinal(Some(100)), + kind: SourceVersionKind::CurrentLogic, + }; + let breaking_version = SourceVersion { + ordinal: Ordinal(Some(100)), + kind: SourceVersionKind::DifferentLogic, + }; + assert!(!stored_version.should_skip(&breaking_version, None)); + } + + #[test] + fn test_lineage_fingerprint_skip_logic_edge_cases() { + // Edge case: missing ordinals + let stored_version = SourceVersion { + ordinal: Ordinal(None), + kind: SourceVersionKind::CurrentLogic, + }; + let target_version = SourceVersion { + ordinal: Ordinal(None), + kind: SourceVersionKind::CurrentLogic, + }; + // Should not skip if ordinals are missing + assert!(!stored_version.should_skip(&target_version, None)); + // Edge case: stored ordinal is older + let stored_version = SourceVersion { + ordinal: Ordinal(Some(99)), + kind: SourceVersionKind::CurrentLogic, + }; + let target_version = SourceVersion { + ordinal: Ordinal(Some(100)), + kind: SourceVersionKind::CurrentLogic, + }; + assert!(!stored_version.should_skip(&target_version, None)); + // Edge case: stored ordinal is newer + let stored_version = SourceVersion { + ordinal: Ordinal(Some(101)), + kind: SourceVersionKind::CurrentLogic, + }; + let target_version = SourceVersion { + ordinal: Ordinal(Some(100)), + kind: SourceVersionKind::CurrentLogic, + }; + assert!(stored_version.should_skip(&target_version, None)); + } } diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 0dca67779..5eda50cdf 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -283,8 +283,8 @@ impl SourceIndexingContext { version_state: SourceRowVersionState { source_version: SourceVersion::from_stored( key_metadata.processed_source_ordinal, - &key_metadata.process_logic_fingerprint, - plan.logic_fingerprint, + &key_metadata.process_lineage_fingerprint, + plan.lineage_fingerprint, ), content_version_fp: key_metadata.processed_source_fp, },