Skip to content

Commit f577e23

Browse files
authored
Use existing schema when querying local data. (#86)
Datafusion needs a valid schema to execute a query. If no associated schema found for registered table then datafusion tries to infer that schema. If there are no available listings from which schema can be derived then it fails and returns an error. This causes entire query to fail and return error response. This is fixed by giving it a proper schema to work with that is already part of metadata. There are however ways that this schema is might not be available ( maybe first event has happened yet), then we simply return doing no changes to RecordBatch. Fixes parseablehq/console#14
1 parent 3c1adf7 commit f577e23

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

server/src/query.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
*/
1818

1919
use chrono::{DateTime, Utc};
20+
use datafusion::arrow::datatypes::Schema;
2021
use datafusion::arrow::record_batch::RecordBatch;
2122
use datafusion::datasource::file_format::parquet::ParquetFormat;
2223
use datafusion::datasource::listing::ListingOptions;
2324
use datafusion::prelude::*;
2425
use serde_json::Value;
2526
use std::sync::Arc;
2627

28+
use crate::metadata::STREAM_INFO;
2729
use crate::option::CONFIG;
2830
use crate::storage;
2931
use crate::storage::ObjectStorage;
@@ -92,6 +94,14 @@ impl Query {
9294
target_partitions: 1,
9395
};
9496

97+
let schema = &STREAM_INFO.schema(&self.stream_name)?;
98+
99+
if schema.is_empty() {
100+
return Ok(());
101+
}
102+
103+
let schema: Arc<Schema> = Arc::new(serde_json::from_str(schema)?);
104+
95105
ctx.register_listing_table(
96106
&self.stream_name,
97107
CONFIG
@@ -100,7 +110,7 @@ impl Query {
100110
.to_str()
101111
.unwrap(),
102112
listing_options,
103-
None,
113+
Some(schema),
104114
)
105115
.await?;
106116

0 commit comments

Comments
 (0)