Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,24 @@ impl StreamedBatch {
}
}

/// Tri-state outcome of join-filter evaluation for one buffered row,
/// used by full outer joins.
///
/// When a full outer join carries a filter, a buffered row may match the
/// streamed side on the join keys yet fail the filter for every pairing;
/// such rows must still be emitted with the streamed columns NULLed.
/// A plain boolean cannot tell that case apart from a row that never
/// matched at all (tracked via [`BufferedBatch::null_joined`]), which is
/// why three variants are required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(super) enum FilterState {
    /// No matched pair has involved this row yet.
    Unvisited = 0,
    /// The row paired with streamed rows, yet every filter check failed.
    AllFailed = 1,
    /// At least one pairing of this row satisfied the filter.
    SomePassed = 2,
}

/// A buffered batch that contains contiguous rows with same join key
///
/// `BufferedBatch` can exist as either an in-memory `RecordBatch` or a `RefCountedTempFile` on disk.
Expand All @@ -219,11 +237,9 @@ pub(super) struct BufferedBatch {
pub null_joined: Vec<usize>,
/// Size estimation used for reserving / releasing memory
pub size_estimation: usize,
/// The indices of the buffered batch that the join filter doesn't satisfy.
/// This is a map from a right-row index to a boolean indicating whether every
/// joined row for that right row fails the filter.
/// When dequeuing the buffered batch, we need to produce null-joined rows for these indices.
pub join_filter_not_matched_map: HashMap<u64, bool>,
/// Tracks filter outcomes for buffered rows in full outer joins.
/// Indexed by absolute row position within the batch. See [`FilterState`].
pub join_filter_status: Vec<FilterState>,
/// Current buffered batch number of rows. Equal to batch.num_rows()
/// but if batch is spilled to disk this property is preferable
/// and less expensive
Expand Down Expand Up @@ -260,7 +276,7 @@ impl BufferedBatch {
join_arrays,
null_joined: vec![],
size_estimation,
join_filter_not_matched_map: HashMap::new(),
join_filter_status: vec![FilterState::Unvisited; num_rows],
num_rows,
}
}
Expand Down Expand Up @@ -1199,12 +1215,16 @@ impl MaterializingSortMergeJoinStream {
return Ok(());
}

// For buffered row which is joined with streamed side rows but all joined rows
// don't satisfy the join filter
// Collect buffered rows that matched on join keys but had every
// filter evaluation fail — these must be emitted with NULLs on
// the streamed side to satisfy full outer join semantics.
let not_matched_buffered_indices = buffered_batch
.join_filter_not_matched_map
.join_filter_status
.iter()
.filter_map(|(idx, failed)| if *failed { Some(*idx) } else { None })
.enumerate()
.filter_map(|(i, state)| {
matches!(state, FilterState::AllFailed).then_some(i as u64)
})
.collect::<Vec<_>>();

let buffered_indices =
Expand All @@ -1219,7 +1239,9 @@ impl MaterializingSortMergeJoinStream {
self.joined_record_batches
.push_batch_with_null_metadata(record_batch, self.join_type);
}
buffered_batch.join_filter_not_matched_map.clear();
buffered_batch
.join_filter_status
.fill(FilterState::Unvisited);

Ok(())
}
Expand Down Expand Up @@ -1392,15 +1414,18 @@ impl MaterializingSortMergeJoinStream {
if right.is_null(i) {
continue;
}
let buffered_index = right.value(i);
buffered_batch.join_filter_not_matched_map.insert(
buffered_index,
*buffered_batch
.join_filter_not_matched_map
.get(&buffered_index)
.unwrap_or(&true)
&& !pre_mask.value(offset + i),
);
let idx = right.value(i) as usize;
match buffered_batch.join_filter_status[idx] {
FilterState::SomePassed => {}
_ if pre_mask.value(offset + i) => {
buffered_batch.join_filter_status[idx] =
FilterState::SomePassed;
}
_ => {
buffered_batch.join_filter_status[idx] =
FilterState::AllFailed;
}
}
}
offset += chunk_len;
}
Expand Down
Loading