Skip to content

Commit 57ac912

Browse files
authored
[chore] Bump arrow version to 57 (#58)
1 parent 86efc93 commit 57ac912

File tree

8 files changed

+46
-38
lines changed

8 files changed

+46
-38
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,5 +34,5 @@ members = ["crates/fluss", "crates/examples", "bindings/python"]
3434
fluss = { version = "0.1.0", path = "./crates/fluss" }
3535
tokio = { version = "1.44.2", features = ["full"] }
3636
clap = { version = "4.5.37", features = ["derive"] }
37-
arrow = "55.1.0"
37+
arrow = "57.0.0"
3838
chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }

bindings/python/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,13 @@ name = "fluss"
2727
crate-type = ["cdylib"]
2828

2929
[dependencies]
30-
pyo3 = { version = "0.24", features = ["extension-module"] }
30+
pyo3 = { version = "0.26.0", features = ["extension-module"] }
3131
fluss = { path = "../../crates/fluss" }
3232
tokio = { workspace = true }
3333
arrow = { workspace = true }
34-
arrow-pyarrow = "55.1.0"
35-
pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
34+
arrow-pyarrow = "57.0.0"
35+
arrow-schema = "57.0.0"
36+
arrow-array = "57.0.0"
37+
pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
3638
chrono = { workspace = true }
3739
once_cell = "1.21.3"

bindings/python/src/admin.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ impl FlussAdmin {
4848
.await
4949
.map_err(|e| FlussError::new_err(e.to_string()))?;
5050

51-
Python::with_gil(|py| Ok(py.None()))
51+
Python::attach(|py| Ok(py.None()))
5252
})
5353
}
5454

@@ -67,7 +67,7 @@ impl FlussAdmin {
6767
.await
6868
.map_err(|e| FlussError::new_err(format!("Failed to get table: {e}")))?;
6969

70-
Python::with_gil(|py| {
70+
Python::attach(|py| {
7171
let table_info = TableInfo::from_core(core_table_info);
7272
Py::new(py, table_info)
7373
})
@@ -89,7 +89,7 @@ impl FlussAdmin {
8989
.await
9090
.map_err(|e| FlussError::new_err(format!("Failed to get lake snapshot: {e}")))?;
9191

92-
Python::with_gil(|py| {
92+
Python::attach(|py| {
9393
let lake_snapshot = LakeSnapshot::from_core(core_lake_snapshot);
9494
Py::new(py, lake_snapshot)
9595
})

bindings/python/src/connection.rs

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@ impl FlussConnection {
4141
inner: Arc::new(connection),
4242
};
4343

44-
Python::with_gil(|py| Py::new(py, py_connection))
44+
Python::attach(|py| Py::new(py, py_connection))
4545
})
4646
}
4747

@@ -57,7 +57,7 @@ impl FlussConnection {
5757

5858
let py_admin = FlussAdmin::from_core(admin);
5959

60-
Python::with_gil(|py| Py::new(py, py_admin))
60+
Python::attach(|py| Py::new(py, py_admin))
6161
})
6262
}
6363

@@ -84,7 +84,7 @@ impl FlussConnection {
8484
core_table.has_primary_key(),
8585
);
8686

87-
Python::with_gil(|py| Py::new(py, py_table))
87+
Python::attach(|py| Py::new(py, py_table))
8888
})
8989
}
9090

@@ -102,9 +102,9 @@ impl FlussConnection {
102102
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
103103
fn __exit__(
104104
&mut self,
105-
_exc_type: Option<PyObject>,
106-
_exc_value: Option<PyObject>,
107-
_traceback: Option<PyObject>,
105+
_exc_type: Option<Bound<'_, PyAny>>,
106+
_exc_value: Option<Bound<'_, PyAny>>,
107+
_traceback: Option<Bound<'_, PyAny>>,
108108
) -> PyResult<bool> {
109109
self.close()?;
110110
Ok(false)

bindings/python/src/metadata.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ impl Schema {
106106
#[new]
107107
#[pyo3(signature = (schema, primary_keys=None))]
108108
pub fn new(
109-
schema: PyObject, // PyArrow schema
109+
schema: Py<PyAny>, // PyArrow schema
110110
primary_keys: Option<Vec<String>>,
111111
) -> PyResult<Self> {
112112
let arrow_schema = crate::utils::Utils::pyarrow_to_arrow_schema(&schema)?;
@@ -553,7 +553,7 @@ impl LakeSnapshot {
553553

554554
/// Get table bucket offsets as a Python dictionary with TableBucket keys
555555
#[getter]
556-
pub fn table_buckets_offset(&self, py: Python) -> PyResult<PyObject> {
556+
pub fn table_buckets_offset(&self, py: Python) -> PyResult<Py<PyAny>> {
557557
let dict = PyDict::new(py);
558558
for (bucket, offset) in &self.table_buckets_offset {
559559
let py_bucket = TableBucket::from_core(bucket.clone());
@@ -569,7 +569,7 @@ impl LakeSnapshot {
569569
}
570570

571571
/// Get all table buckets
572-
pub fn get_table_buckets(&self, py: Python) -> PyResult<Vec<PyObject>> {
572+
pub fn get_table_buckets(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
573573
let mut buckets = Vec::new();
574574
for bucket in self.table_buckets_offset.keys() {
575575
let py_bucket = TableBucket::from_core(bucket.clone());

bindings/python/src/table.rs

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ impl FlussTable {
5151

5252
let py_writer = AppendWriter::from_core(rust_writer);
5353

54-
Python::with_gil(|py| Py::new(py, py_writer))
54+
Python::attach(|py| Py::new(py, py_writer))
5555
})
5656
}
5757

@@ -75,7 +75,7 @@ impl FlussTable {
7575
.map_err(|e| FlussError::new_err(e.to_string()))?;
7676

7777
let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
78-
Python::with_gil(|py| Py::new(py, py_scanner))
78+
Python::attach(|py| Py::new(py, py_scanner))
7979
})
8080
}
8181

@@ -131,10 +131,10 @@ pub struct AppendWriter {
131131
#[pymethods]
132132
impl AppendWriter {
133133
/// Write Arrow table data
134-
pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()> {
134+
pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
135135
// Convert Arrow Table to batches and write each batch
136136
let batches = table.call_method0(py, "to_batches")?;
137-
let batch_list: Vec<PyObject> = batches.extract(py)?;
137+
let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
138138

139139
for batch in batch_list {
140140
self.write_arrow_batch(py, batch)?;
@@ -143,7 +143,7 @@ impl AppendWriter {
143143
}
144144

145145
/// Write Arrow batch data
146-
pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) -> PyResult<()> {
146+
pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
147147
// Extract number of rows and columns from the Arrow batch
148148
let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
149149
let num_columns: usize = batch.getattr(py, "num_columns")?.extract(py)?;
@@ -175,7 +175,7 @@ impl AppendWriter {
175175
}
176176

177177
/// Write Pandas DataFrame data
178-
pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
178+
pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
179179
// Import pyarrow module
180180
let pyarrow = py.import("pyarrow")?;
181181

@@ -213,7 +213,7 @@ impl AppendWriter {
213213
fn convert_python_value_to_datum(
214214
&self,
215215
py: Python,
216-
value: PyObject,
216+
value: Py<PyAny>,
217217
) -> PyResult<fcore::row::Datum<'static>> {
218218
use fcore::row::{Blob, Datum, F32, F64};
219219

@@ -321,7 +321,7 @@ impl LogScanner {
321321
}
322322

323323
/// Convert all data to Arrow Table
324-
fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
324+
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
325325
use std::collections::HashMap;
326326
use std::time::Duration;
327327

@@ -387,7 +387,7 @@ impl LogScanner {
387387
}
388388

389389
/// Convert all data to Pandas DataFrame
390-
fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
390+
fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
391391
let arrow_table = self.to_arrow(py)?;
392392

393393
// Convert Arrow Table to Pandas DataFrame using pyarrow

bindings/python/src/utils.rs

Lines changed: 18 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -16,20 +16,19 @@
1616
// under the License.
1717

1818
use crate::*;
19-
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
20-
use arrow_pyarrow::ToPyArrow;
19+
use arrow_pyarrow::{FromPyArrow, ToPyArrow};
20+
use arrow_schema::SchemaRef;
2121
use std::sync::Arc;
2222

2323
/// Utilities for schema conversion between PyArrow, Arrow, and Fluss
2424
pub struct Utils;
2525

2626
impl Utils {
2727
/// Convert PyArrow schema to Rust Arrow schema
28-
pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> PyResult<SchemaRef> {
29-
Python::with_gil(|py| {
28+
pub fn pyarrow_to_arrow_schema(py_schema: &Py<PyAny>) -> PyResult<SchemaRef> {
29+
Python::attach(|py| {
3030
let schema_bound = py_schema.bind(py);
31-
32-
let schema: ArrowSchema = arrow_pyarrow::FromPyArrow::from_pyarrow_bound(schema_bound)
31+
let schema: arrow_schema::Schema = FromPyArrow::from_pyarrow_bound(schema_bound)
3332
.map_err(|e| {
3433
FlussError::new_err(format!("Failed to convert PyArrow schema: {e}"))
3534
})?;
@@ -172,14 +171,21 @@ impl Utils {
172171
pub fn combine_batches_to_table(
173172
py: Python,
174173
batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
175-
) -> PyResult<PyObject> {
176-
// Convert Rust Arrow RecordBatch to PyObject
177-
let py_batches: Result<Vec<PyObject>, _> = batches
174+
) -> PyResult<Py<PyAny>> {
175+
use arrow_array::RecordBatch as ArrowArrayRecordBatch;
176+
177+
let py_batches: Result<Vec<Py<PyAny>>, _> = batches
178178
.iter()
179179
.map(|batch| {
180-
batch.as_ref().to_pyarrow(py).map_err(|e| {
181-
FlussError::new_err(format!("Failed to convert RecordBatch to PyObject: {e}"))
182-
})
180+
ArrowArrayRecordBatch::try_new(batch.schema().clone(), batch.columns().to_vec())
181+
.map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))
182+
.and_then(|b| {
183+
ToPyArrow::to_pyarrow(&b, py)
184+
.map(|x| x.into())
185+
.map_err(|e| {
186+
FlussError::new_err(format!("Failed to convert to PyObject: {e}"))
187+
})
188+
})
183189
})
184190
.collect();
185191

crates/fluss/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ build = "src/build.rs"
2424

2525
[dependencies]
2626
arrow = { workspace = true }
27-
arrow-schema = "55.1.0"
27+
arrow-schema = "57.0.0"
2828
byteorder = "1.5"
2929
futures = "0.3"
3030
clap = { workspace = true }

0 commit comments

Comments
 (0)