Skip to content

Commit dffc59a

Browse files
committed
feat: add integration test for list offsets
Implements comprehensive integration test for list_offsets functionality covering earliest and latest offset queries before and after appending records. Changes: - Added list_offsets test with multiple scenarios: - List earliest offset on empty table (expected: 0) - List latest offset on empty table (expected: 0) - Append 3 records and verify latest offset updates to 3 - Verify earliest offset remains 0 after append - Test multi-bucket offset listing - Added OffsetSpec import for specifying offset types - Added table initialization delay to ensure table is ready Test validates that list_offsets correctly tracks record positions in the log and responds appropriately to Earliest and Latest queries. Fixes #47
1 parent 86efc93 commit dffc59a

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed

crates/fluss/tests/integration/table.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mod table_test {
3737
use crate::integration::utils::create_table;
3838
use arrow::array::record_batch;
3939
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
40+
use fluss::rpc::message::OffsetSpec;
4041
use std::sync::Arc;
4142
use std::sync::atomic::AtomicUsize;
4243
use std::thread;
@@ -129,4 +130,111 @@ mod table_test {
129130

130131
// todo: add scan code to verify the records appended in #30
131132
}
133+
134+
#[tokio::test]
135+
async fn list_offsets() {
136+
let cluster = get_fluss_cluster();
137+
let connection = cluster.get_fluss_connection().await;
138+
139+
let admin = connection.get_admin().await.expect("Failed to get admin");
140+
141+
let table_path = TablePath::new("fluss".to_string(), "test_list_offsets".to_string());
142+
143+
let table_descriptor = TableDescriptor::builder()
144+
.schema(
145+
Schema::builder()
146+
.column("id", DataTypes::int())
147+
.column("name", DataTypes::string())
148+
.build()
149+
.expect("Failed to build schema"),
150+
)
151+
.build()
152+
.expect("Failed to build table");
153+
154+
create_table(&admin, &table_path, &table_descriptor).await;
155+
156+
// Wait for table to be fully initialized
157+
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
158+
159+
// Test earliest offset (should be 0 for empty table)
160+
let earliest_offsets = admin
161+
.list_offsets(&table_path, &[0], OffsetSpec::Earliest)
162+
.await
163+
.expect("Failed to list earliest offsets");
164+
165+
assert_eq!(
166+
earliest_offsets.get(&0),
167+
Some(&0),
168+
"Earliest offset should be 0 for bucket 0"
169+
);
170+
171+
// Test latest offset (should be 0 for empty table)
172+
let latest_offsets = admin
173+
.list_offsets(&table_path, &[0], OffsetSpec::Latest)
174+
.await
175+
.expect("Failed to list latest offsets");
176+
177+
assert_eq!(
178+
latest_offsets.get(&0),
179+
Some(&0),
180+
"Latest offset should be 0 for empty table"
181+
);
182+
183+
// Append some records
184+
let append_writer = connection
185+
.get_table(&table_path)
186+
.await
187+
.expect("Failed to get table")
188+
.new_append()
189+
.expect("Failed to create append")
190+
.create_writer();
191+
192+
let batch = record_batch!(
193+
("id", Int32, [1, 2, 3]),
194+
("name", Utf8, ["alice", "bob", "charlie"])
195+
)
196+
.unwrap();
197+
append_writer
198+
.append_arrow_batch(batch)
199+
.await
200+
.expect("Failed to append batch");
201+
202+
// Wait for records to be committed
203+
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
204+
205+
// Test latest offset after appending (should be 3)
206+
let latest_offsets_after = admin
207+
.list_offsets(&table_path, &[0], OffsetSpec::Latest)
208+
.await
209+
.expect("Failed to list latest offsets after append");
210+
211+
assert_eq!(
212+
latest_offsets_after.get(&0),
213+
Some(&3),
214+
"Latest offset should be 3 after appending 3 records"
215+
);
216+
217+
// Test earliest offset after appending (should still be 0)
218+
let earliest_offsets_after = admin
219+
.list_offsets(&table_path, &[0], OffsetSpec::Earliest)
220+
.await
221+
.expect("Failed to list earliest offsets after append");
222+
223+
assert_eq!(
224+
earliest_offsets_after.get(&0),
225+
Some(&0),
226+
"Earliest offset should still be 0"
227+
);
228+
229+
// Test with multiple buckets
230+
let multi_bucket_offsets = admin
231+
.list_offsets(&table_path, &[0], OffsetSpec::Latest)
232+
.await
233+
.expect("Failed to list offsets for multiple buckets");
234+
235+
assert!(
236+
multi_bucket_offsets.contains_key(&0),
237+
"Should have offset for bucket 0"
238+
);
239+
}
132240
}

0 commit comments

Comments
 (0)