Skip to content

Commit 6c6687b

Browse files
authored
chore: add integration test for scan records after append (#51)
1 parent 57ac912 commit 6c6687b

File tree

1 file changed

+48
-2
lines changed

1 file changed

+48
-2
lines changed

crates/fluss/tests/integration/table.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ mod table_test {
3636
use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
3737
use crate::integration::utils::create_table;
3838
use arrow::array::record_batch;
39-
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
39+
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
40+
use fluss::row::InternalRow;
4041
use std::sync::Arc;
4142
use std::sync::atomic::AtomicUsize;
4243
use std::thread;
@@ -127,6 +128,51 @@ mod table_test {
127128
.await
128129
.expect("Failed to append batch");
129130

130-
// todo: add scan code to verify the records appended in #30
131+
// Create scanner to verify appended records
132+
let table = connection
133+
.get_table(&table_path)
134+
.await
135+
.expect("Failed to get table");
136+
137+
let table_scan = table.new_scan();
138+
let log_scanner = table_scan.create_log_scanner();
139+
140+
// Subscribe to bucket 0 starting from offset 0
141+
log_scanner
142+
.subscribe(0, 0)
143+
.await
144+
.expect("Failed to subscribe to bucket");
145+
146+
// Poll for records
147+
let scan_records = log_scanner
148+
.poll(tokio::time::Duration::from_secs(5))
149+
.await
150+
.expect("Failed to poll records");
151+
152+
// Verify the scanned records
153+
let table_bucket = TableBucket::new(table.table_info().table_id, 0);
154+
let records = scan_records.records(&table_bucket);
155+
156+
assert_eq!(records.len(), 6, "Expected 6 records");
157+
158+
// Verify record contents match what was appended
159+
let expected_c1_values = vec![1, 2, 3, 4, 5, 6];
160+
let expected_c2_values = vec!["a1", "a2", "a3", "a4", "a5", "a6"];
161+
162+
for (i, record) in records.iter().enumerate() {
163+
let row = record.row();
164+
assert_eq!(
165+
row.get_int(0),
166+
expected_c1_values[i],
167+
"c1 value mismatch at row {}",
168+
i
169+
);
170+
assert_eq!(
171+
row.get_string(1),
172+
expected_c2_values[i],
173+
"c2 value mismatch at row {}",
174+
i
175+
);
176+
}
131177
}
132178
}

0 commit comments

Comments
 (0)