LanceDatasink for integration with ray doesn't support None #3308

Closed
andrijazz opened this issue Dec 27, 2024 · 2 comments · Fixed by #3322

@andrijazz
andrijazz commented Dec 27, 2024

Passing None to nullable fields crashes ray jobs.

import lancedb
import pyarrow as pa
import ray.data
from lance.ray.sink import LanceDatasink

uri = 'testdb'
table = 'test_table'

schema = pa.schema([
    pa.field("a", pa.list_(pa.string()), nullable=False),
    pa.field("b", pa.list_(pa.string()), nullable=True),
    pa.field("c", pa.list_(pa.string()), nullable=True),
])
data = [{"a": ["foo"], "b": None, "c": None}] * 100

db = lancedb.connect(uri)
db.create_table(table, schema=schema, exist_ok=True)

# init datasink
datasink = LanceDatasink(
    uri=f"{uri}/{table}.lance",
    schema=schema
)

ds = ray.data.from_items(data)
ds.write_datasink(datasink)

# Read data back from the table (separate name so the `table` string is not shadowed)
tbl = db.open_table('test_table')
print(tbl.count_rows())

The same thing happens for other data types. For example, if the data type is pa.string(), passing None still crashes the Ray job. Using pa.null() instead works for primitive data types but doesn't work for pa.list_(pa.string()).

@westonpace
Contributor

Here's the error. It appears the pyarrow -> arrow-rs conversion is failing for null arrays:

(Write pid=278398) thread 'lance_background_thread' panicked at /home/pace/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-53.3.0/src/ffi.rs:311:9:                           
(Write pid=278398) assertion failed: !self.children.is_null()                                                                                                                                 
(Write pid=278398) stack backtrace:                                                                                                                                                           
(Write pid=278398)    0: rust_begin_unwind                                                                                                                                                    
(Write pid=278398)    1: core::panicking::panic_fmt                                                                                                                                           
(Write pid=278398)    2: core::panicking::panic                                                                                                                                               
(Write pid=278398)    3: arrow_array::ffi::ImportedArrowArray::consume_children                                                                                                               
(Write pid=278398)    4: arrow_array::ffi::ImportedArrowArray::consume                                                                                                                        
(Write pid=278398)    5: arrow_array::ffi::ImportedArrowArray::consume_children                                                                                                               
(Write pid=278398)    6: arrow_array::ffi::ImportedArrowArray::consume                                                                                                                        
(Write pid=278398)    7: arrow_array::ffi::from_ffi_and_data_type                                                                                                                             
(Write pid=278398)    8: <arrow_array::ffi_stream::ArrowArrayStreamReader as core::iter::traits::iterator::Iterator>::next                                                                    
(Write pid=278398)    9: tokio::runtime::task::raw::poll                                                                                                                                      
(Write pid=278398) note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.                                                                                    
(Write pid=278398) thread 'lance_background_thread' panicked at /home/pace/dev/lance/rust/lance-datafusion/src/utils.rs:24:35:                                                                

A workaround for the above script (not sure if it works on your real data) is to supply the schema when you create the Arrow table, so you never get null arrays.

Change ds = ray.data.from_items(data) to ds = ray.data.from_arrow([pa.Table.from_pylist(data, schema=schema)])

@andrijazz
Author

Thanks for the fast response! I am reading data from Parquet, performing transformations, and assigning None where needed. The job ends with a .map where I pack the data into a dict, something like this:

def f(row):
    return {
        'a': row['a'],
        'b': None,
        'c': None
    }

ds = ray.data.read_parquet(input_file)
ds = ds.map(f)
ds.write_datasink(datasink)

write_datasink fails in this case.
