Skip to content

Commit 86efc93

Browse files
authored
[feat] Support append arrow record batch (#34)
1 parent f32411e commit 86efc93

File tree

13 files changed

+443
-101
lines changed

13 files changed

+443
-101
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ jobs:
9191
# Only run the integration tests on Linux, since Docker is not available on macOS by default
9292
run: |
9393
if [ "$RUNNER_OS" == "Linux" ]; then
94-
cargo test --features integration_tests --all-targets --workspace
94+
RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace -- --nocapture
9595
fi
9696
env:
9797
RUST_LOG: DEBUG

crates/fluss/src/client/table/append.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
// under the License.
1717

1818
use crate::client::{WriteRecord, WriterClient};
19+
use crate::error::Result;
1920
use crate::metadata::{TableInfo, TablePath};
2021
use crate::row::GenericRow;
22+
use arrow::array::RecordBatch;
2123
use std::sync::Arc;
2224

23-
use crate::error::Result;
24-
2525
#[allow(dead_code)]
2626
pub struct TableAppend {
2727
table_path: TablePath,
@@ -63,6 +63,13 @@ impl AppendWriter {
6363
result_handle.result(result)
6464
}
6565

66+
pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
67+
let record = WriteRecord::new_record_batch(self.table_path.clone(), batch);
68+
let result_handle = self.writer_client.send(&record).await?;
69+
let result = result_handle.wait().await?;
70+
result_handle.result(result)
71+
}
72+
6673
pub async fn flush(&self) -> Result<()> {
6774
self.writer_client.flush().await
6875
}

crates/fluss/src/client/write/accumulator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::client::write::batch::WriteBatch::ArrowLog;
1919
use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
20-
use crate::client::{ResultHandle, WriteRecord};
20+
use crate::client::{Record, ResultHandle, WriteRecord};
2121
use crate::cluster::{BucketLocation, Cluster, ServerNode};
2222
use crate::config::Config;
2323
use crate::error::Result;
@@ -105,6 +105,7 @@ impl RecordAccumulator {
105105
row_type,
106106
bucket_id,
107107
current_time_ms(),
108+
matches!(record.row, Record::RecordBatch(_)),
108109
));
109110

110111
let batch_id = batch.batch_id();
@@ -159,7 +160,6 @@ impl RecordAccumulator {
159160
true, false, true,
160161
));
161162
}
162-
163163
self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
164164
}
165165

crates/fluss/src/client/write/batch.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
use crate::BucketId;
1919
use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
2020
use crate::client::{ResultHandle, WriteRecord};
21-
use crate::metadata::{DataType, TablePath};
22-
use std::cmp::max;
23-
2421
use crate::error::Result;
22+
use crate::metadata::{DataType, TablePath};
2523
use crate::record::MemoryLogRecordsArrowBuilder;
24+
use std::cmp::max;
2625

2726
#[allow(dead_code)]
2827
pub struct InnerWriteBatch {
@@ -140,12 +139,16 @@ impl ArrowLogWriteBatch {
140139
row_type: &DataType,
141140
bucket_id: BucketId,
142141
create_ms: i64,
142+
to_append_record_batch: bool,
143143
) -> Self {
144144
let base = InnerWriteBatch::new(batch_id, table_path, create_ms, bucket_id);
145-
146145
Self {
147146
write_batch: base,
148-
arrow_builder: MemoryLogRecordsArrowBuilder::new(schema_id, row_type),
147+
arrow_builder: MemoryLogRecordsArrowBuilder::new(
148+
schema_id,
149+
row_type,
150+
to_append_record_batch,
151+
),
149152
}
150153
}
151154

@@ -157,8 +160,13 @@ impl ArrowLogWriteBatch {
157160
if self.arrow_builder.is_closed() || self.arrow_builder.is_full() {
158161
Ok(None)
159162
} else {
160-
self.arrow_builder.append(&write_record.row)?;
161-
Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
163+
// append returns true when the record was added; only then hand out a result handle
164+
if self.arrow_builder.append(write_record)? {
165+
Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
166+
} else {
167+
// append failed: no result handle is returned
168+
Ok(None)
169+
}
162170
}
163171
}
164172

crates/fluss/src/client/write/mod.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::error::Error;
2323
use crate::metadata::TablePath;
2424
use crate::row::GenericRow;
2525
pub use accumulator::*;
26+
use arrow::array::RecordBatch;
2627
use std::sync::Arc;
2728

2829
pub(crate) mod broadcast;
@@ -34,13 +35,28 @@ mod writer_client;
3435
pub use writer_client::WriterClient;
3536

3637
pub struct WriteRecord<'a> {
37-
pub row: GenericRow<'a>,
38+
pub row: Record<'a>,
3839
pub table_path: Arc<TablePath>,
3940
}
4041

42+
pub enum Record<'a> {
43+
Row(GenericRow<'a>),
44+
RecordBatch(Arc<RecordBatch>),
45+
}
46+
4147
impl<'a> WriteRecord<'a> {
4248
pub fn new(table_path: Arc<TablePath>, row: GenericRow<'a>) -> Self {
43-
Self { row, table_path }
49+
Self {
50+
row: Record::Row(row),
51+
table_path,
52+
}
53+
}
54+
55+
pub fn new_record_batch(table_path: Arc<TablePath>, row: RecordBatch) -> Self {
56+
Self {
57+
row: Record::RecordBatch(Arc::new(row)),
58+
table_path,
59+
}
4460
}
4561
}
4662

crates/fluss/src/client/write/sender.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ impl Sender {
122122
collated: &HashMap<i32, Vec<Arc<ReadyWriteBatch>>>,
123123
) -> Result<()> {
124124
for (leader_id, batches) in collated {
125-
println!("send request batch");
126125
self.send_write_request(*leader_id, self.ack, batches)
127126
.await?;
128127
}

crates/fluss/src/client/write/writer_client.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,12 @@ impl WriterClient {
9090
let table_path = &record.table_path;
9191
let cluster = self.metadata.get_cluster();
9292

93-
let bucket_assigner = {
94-
if let Some(assigner) = self.bucket_assigners.get(table_path) {
95-
assigner.clone()
96-
} else {
97-
let assigner = Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
98-
self.bucket_assigners
99-
.insert(table_path.as_ref().clone(), assigner.clone());
100-
assigner
101-
}
102-
};
93+
let (bucket_assigner, bucket_id) = self.assign_bucket(table_path);
10394

104-
let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
105-
106-
let mut result = self.accumulate.append(record, 1, &cluster, true).await?;
95+
let mut result = self
96+
.accumulate
97+
.append(record, bucket_id, &cluster, true)
98+
.await?;
10799

108100
if result.abort_record_for_new_batch {
109101
let prev_bucket_id = bucket_id;
@@ -121,6 +113,21 @@ impl WriterClient {
121113

122114
Ok(result.result_handle.expect("result_handle should exist"))
123115
}
116+
fn assign_bucket(&self, table_path: &Arc<TablePath>) -> (Arc<Box<dyn BucketAssigner>>, i32) {
117+
let cluster = self.metadata.get_cluster();
118+
let bucket_assigner = {
119+
if let Some(assigner) = self.bucket_assigners.get(table_path) {
120+
assigner.clone()
121+
} else {
122+
let assigner = Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
123+
self.bucket_assigners
124+
.insert(table_path.as_ref().clone(), assigner.clone());
125+
assigner
126+
}
127+
};
128+
let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
129+
(bucket_assigner, bucket_id)
130+
}
124131

125132
pub async fn close(self) -> Result<()> {
126133
self.shutdown_tx

0 commit comments

Comments
 (0)