
Sketch out a Morselize API #20820

Draft

alamb wants to merge 14 commits into apache:main from alamb:alamb/morsel_api

Conversation


@alamb alamb commented Mar 9, 2026

Which issue does this PR close?

Rationale for this change

I am working on reviewing / helping with #20481 and I am trying to work out what an API for morsels could look like. See thoughts here: #20529 (comment)

I am therefore sketching out what a morsel API could look like, along with:

  1. Adding IO fetch depth
  2. Where a work-stealing algorithm could fit
  3. How to cleanly separate IO and CPU (if possible)

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the datasource Changes to the datasource crate label Mar 9, 2026
#[derive(Debug)]
pub struct Morsel {
    /// The original [`PartitionedFile`] that this morsel belongs to
    file: Arc<PartitionedFile>,
Contributor

Do we still want PartitionedFile here? In my PR I figured out it is better to just initialize the config at the file level and morselize from there, but perhaps it could be different in other cases?

Contributor Author

That is a good question -- I will see what it would look like without a partition file

I was thinking that some nontrivial amount of the information from the partitioned file was needed (like the schema and partition columns). However, now that I type that, maybe not


alamb commented Mar 9, 2026

What I am planning to do next in this PR (after I finish up some other things for work) is:

  1. Integrate "morsels" into the Parquet opener flow (as a proof of concept)
  2. Sketch out how IO and CPU might be split

I am also trying to figure out what types of queues would belong in the FileStream (queues of MorselFutures perhaps 🤔)

Or maybe I need to make a more explicit state machine:

  • IOMorsels
  • CPUMorsels

🤔
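
A minimal sketch of what such an explicit state machine could look like (the names and state payloads here are assumptions for illustration, not part of this PR):

use bytes::Bytes;
use datafusion_common::Result;
use futures::future::BoxFuture;

/// Hypothetical per-morsel state: IO-bound until the bytes arrive,
/// CPU-bound once decoding can proceed without touching storage.
enum MorselState {
    /// Waiting on object store IO to fetch this morsel's byte range
    Io(BoxFuture<'static, Result<Bytes>>),
    /// Bytes are in memory; decoding them is pure CPU work
    Cpu(Bytes),
}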

pub struct Morsel {
    /// The original [`PartitionedFile`] that this morsel belongs to
    file: Arc<PartitionedFile>,
    /// File format specific information that describes the morsel, such as byte range, row group, etc.
@Dandandan Dandandan commented Mar 9, 2026

In my PR it also shares the ParquetMetadata at this level - even without caching, that saves doing the metadata IO/decode multiple times over when splitting a single file.

Currently the caching mechanism seems not really optimal when splitting a single file into PartitionedFiles, as there is no sharing - each partition will just independently start reading it.
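
A sketch of the kind of sharing described above (the struct is hypothetical; ParquetMetaData is the parquet crate's decoded footer type): every morsel split from one file holds the same Arc, so the footer is fetched and decoded once.

use parquet::file::metadata::ParquetMetaData;
use std::sync::Arc;

/// Hypothetical per-file context shared by all morsels split from one
/// file: the metadata is fetched/decoded once and the Arc is cloned per
/// morsel instead of each partition re-reading the footer.
struct FileMorselContext {
    metadata: Arc<ParquetMetaData>,
}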

/// 1. The API is `async` for file formats that require I/O operations to
///    determine the morsels, such as reading metadata from the file.
/// 2.
fn morselize(&self, file: Arc<PartitionedFile>) -> MorselOpenFuture;
Contributor

In some cases we might also want the decoding/further processing to happen across a number of files. E.g. with ClickBench query 6, even when having 1 morsel = 1 file, I still see a small slowdown from having 100 files as 100 morsels:

│ QQuery 6  │           5.77 / 6.12 ±0.35 / 6.72 ms │                        6.63 / 7.95 ±1.29 / 9.75 ms │  1.30x slower │

Contributor

Query 6 is

SELECT MIN("EventDate"), MAX("EventDate") FROM hits

So it is just scanning a super small and very well compressible (sorted) column.

@Dandandan Dandandan commented Mar 9, 2026

Hmm... I suppose it was from doing some stuff over and over for morsels (in this case twice instead of once).
When also caching all of the file-level stuff, we seem to be doing better here:

│ QQuery 6  │     6.51 ms │                                            7.33 ms │  1.13x slower │

@Dandandan Dandandan commented Mar 9, 2026

Though for a really small query it still seems that slightly more eagerly loading/prefetching the ParquetMetadata might help in the uncached case (below is min/avg±std/max from here)

│ QQuery 6  │           5.50 / 5.90 ±0.31 / 6.26 ms │                       6.65 / 8.86 ±2.00 / 12.13 ms │  1.50x slower │

/// This is typically an `async` function that opens the file, and returns a
/// stream of Morsels that can be processed independently. The stream may yield
/// an error if the file cannot be opened or read.
pub type MorselOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Morsel>>>;
@Dandandan Dandandan commented Mar 9, 2026

One part that seems clear to me now, based on some benchmarking, is that we will want to split morsels into smaller ones if we're running out of them (i.e. no more files to morselize and the number of morsels is < threads).
One approach that seems useful in the benchmarks is splitting into n_threads * 2 or 1MB pieces (whichever leads to larger morsels), to increase parallelism when the number of morsels is low while not increasing the overhead too much.

I think this should still allow for that, but at least we want to know:

  • how many files are yet to read
  • how many morsels are left to read

This might also change a bit once we start splitting IO and CPU - I guess at that point a smaller morsel size might even start paying off as well (we avoid the syscalls, while decoding in even smaller chunks is perhaps better for cache).
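
A sketch of the splitting heuristic described above (the function and constant names are made up; the n_threads * 2 target and 1MB floor are the numbers from the comment):

/// Split the remaining bytes of work into roughly `n_threads * 2` pieces,
/// but never smaller than 1MB -- whichever leads to larger morsels.
fn split_size(remaining_bytes: u64, n_threads: u64) -> u64 {
    const MIN_MORSEL_BYTES: u64 = 1024 * 1024; // 1MB floor
    let target = remaining_bytes / (n_threads * 2).max(1);
    target.max(MIN_MORSEL_BYTES)
}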

Contributor Author

smaller ones if we're running out of them (i.e. no more files to morselize and number of morsels is < threads).

Is this the same thing as just using smaller morsels to begin with (rather than reading entire row groups, for example, pare down the plan to just read 2MB chunk "morsels")?

I realize we would have to work out how to avoid blowing out the IOs in this case

Contributor

With the current approach, using smaller morsels to begin with makes some queries slow down because of the extra overhead of small morsels. I think mainly because the queries that scan a lot of data are already well parallelizable (a large number of pretty balanced row groups on large columns).

Though it could be due to the current approach, which just does the same IO over and over, and it shows for these queries.

Contributor Author

Yeah, specifically maybe loading the page index


alamb commented Mar 10, 2026

I was playing around with some other ideas this afternoon and I am somewhat worried about overthinking the API. I will try and push some updates tomorrow morning


alamb commented Mar 11, 2026

The update here is that I think I have a reasonable API sketched out and I am 50% of the way through integrating it into FileStream. Once it is more fully integrated, I'll let you know that it is ready for another look


alamb commented Mar 12, 2026

show benchmark queue

@alamb-ghbot

This comment was marked as outdated.

@alamb alamb force-pushed the alamb/morsel_api branch 2 times, most recently from add50da to 3ed647c on March 12, 2026 21:44
@alamb alamb force-pushed the alamb/morsel_api branch from 3ed647c to 9450a07 on March 12, 2026 22:12

alamb commented Mar 12, 2026

Update here is that I am pretty happy with how this API now looks in theory and how it integrates into the existing FileStream

The next thing I will do is to try and implement the parquet opener in terms of the native morsel API and see how that goes and if I can replicate the improvements seen by Daniel in his PR

let load_future = async move {
    let stream = file_opener
        // open the file to get a stream
        .open(file)?
@Dandandan Dandandan commented Mar 13, 2026

This (currently) does a bunch of IO/CPU work before creating the stream

Contributor

I removed this file opener "prefetching" in #20916 and get identical results (on local storage).

Perhaps we should also have a benchmark that either simulates high GET latencies or uses S3 calls to see if it has any impact.

Contributor Author

Yeah, I think the prefetching is especially important for object stores (I think @thinkharderdev actually implemented it)

In general I have been thinking about how to best test this (especially the timing of CPU/IO requests). I like the idea of having an object store that simulates high latencies (aka looks like S3) on local disks...

Contributor

Let's create an issue - it seems like it should be pretty easy to add something for this :)

Contributor Author

Contributor

Ah nice, better even :)

Contributor

Should probably add some randomization to it so it will simulate ranges like 10-250ms or something.
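
For illustration, sampling a randomized per-request delay in that range could be as simple as this (a sketch using the rand crate; the 10-250ms range is the one suggested above):

use rand::Rng;
use std::time::Duration;

/// Sample a simulated object store latency to apply before serving each
/// request, making a local disk behave more like S3.
fn simulated_latency() -> Duration {
    let ms = rand::thread_rng().gen_range(10..=250);
    Duration::from_millis(ms)
}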

/// to get the next morsels. Best practice is to run this in a task on a
/// separate IO runtime to ensure that CPU work is not blocked by IO work,
/// but this is not strictly required by the API.
io_future: Option<BoxFuture<'static, Result<()>>>,
Contributor

I was thinking that for IO/CPU separation we should be globally aware of outstanding requests, so it also allows bounding the IO (at a local level)

Contributor Author

Something like a shared understanding across all the FileStreams of outstanding requests... It sounds somewhat like MemoryManager or something similar -- maybe attached to the RuntimeEnv 🤔

It is a good idea -- I haven't quite worked out how the different FileStreams will collaborate (e.g. for work stealing)
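
A minimal sketch of that kind of global bound, assuming a hypothetical IoLimiter shared across FileStreams (e.g. via RuntimeEnv): every request acquires a permit from one shared semaphore before issuing IO.

use std::sync::Arc;
use tokio::sync::Semaphore;

/// Hypothetical global limiter: all FileStreams clone the same limiter, so
/// at most `max_outstanding_ios` requests are in flight across the plan.
#[derive(Clone)]
struct IoLimiter {
    permits: Arc<Semaphore>,
}

impl IoLimiter {
    fn new(max_outstanding_ios: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(max_outstanding_ios)),
        }
    }

    /// Run an IO future while holding a permit; the permit is released
    /// when `_permit` is dropped at the end of the request.
    async fn run_io<F: std::future::Future>(&self, fut: F) -> F::Output {
        let _permit = self.permits.acquire().await.expect("semaphore open");
        fut.await
    }
}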

}

impl Morselizer for FileOpenerMorselizer {
    fn morselize(&self, file: PartitionedFile) -> Result<Vec<Arc<dyn MorselPlanner>>> {
@Dandandan Dandandan commented Mar 13, 2026

We might also want to be able to have morsels span multiple files (or otherwise somehow make reading small row groups across files efficient) - for locality / instruction cache / allocations... it is more efficient to read row groups across a number of files until we have batch_size rows, instead of decoding many small record batches that are combined later in the plan.
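
A sketch of that kind of coalescing (all names here are illustrative): greedily pack small row groups, possibly from different files, into one morsel until it covers at least batch_size rows.

/// Illustrative reference to one row group within one file.
struct RowGroupRef {
    file_idx: usize,
    row_group: usize,
    num_rows: usize,
}

/// Greedily pack row groups (possibly spanning files) into morsels that
/// cover at least `batch_size` rows each.
fn coalesce(groups: Vec<RowGroupRef>, batch_size: usize) -> Vec<Vec<RowGroupRef>> {
    let mut morsels = Vec::new();
    let mut current = Vec::new();
    let mut rows = 0;
    for g in groups {
        rows += g.num_rows;
        current.push(g);
        if rows >= batch_size {
            morsels.push(std::mem::take(&mut current));
            rows = 0;
        }
    }
    if !current.is_empty() {
        morsels.push(current); // leftover partial morsel
    }
    morsels
}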

Contributor

OTOH this also means more eager loading, so I guess it's something to carefully tune.

Contributor Author

In my mind the FileOpenerMorselizer is basically a backwards compatibility shim -- to get the best performance we'll have to implement specialized Morselizers for each file format.

/// without requiring the scan to eagerly buffer an unbounded amount of work.
///
/// TODO make this a config option
fn max_buffered_morsels() -> usize {
Contributor

In my PR it doesn't do this:

  • Any worker/partition can "morselize" a file (and then put the morsels on the morsel queue)
  • If morsels are running out, any worker will be able to split morsels and put them back on the queue

The work stealing isn't "stealing" from other partitions, but just picking the first item from a global queue (I think we don't need both global and local queues - just a single global queue for files/morsels).

Contributor Author

I was thinking the less cross-core communication the better (so better to have per-thread work).

I do think the number of outstanding IOs makes more sense as some sort of global / per-session setting... though I am still thinking work stealing is a better strategy. I'll see what I can come up with

Contributor

Hmmm, it makes it more complex though (we'd also need to implement some work-stealing strategy...) with little benefit - as the morsels themselves don't hold much data, there won't be a lot of contention/cross-communication anyway...

Also I think it might be beneficial in certain cases to execute the morsels in a predefined global order (e.g. for topk pruning) instead of per-partition.

Contributor Author

Good points. I am hoping that we'll soon be in a position to test these strategies (I think we can express it all in FileStream, and the basic morselizing API is the same)

Contributor

I tend to agree with @Dandandan, a global morsel queue seems like it would be pretty lightweight in terms of cross-core communication as long as the morsels are reasonably sized.

Contributor

I guess if morsels start getting small (e.g. ~batch size?) then using https://docs.rs/crossbeam-deque/latest/crossbeam_deque/ or Rayon for the CPU-bound work starts making sense.
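
For reference, consuming from a single shared crossbeam_deque::Injector looks roughly like this (a sketch; the Morsel placeholder stands in for the PR's type):

use crossbeam_deque::{Injector, Steal};

struct Morsel; // placeholder for the PR's Morsel type

/// Pop the next morsel from the shared global queue, retrying on contention.
fn next_morsel(global: &Injector<Morsel>) -> Option<Morsel> {
    loop {
        match global.steal() {
            Steal::Success(m) => return Some(m),
            Steal::Empty => return None,
            Steal::Retry => continue, // another worker raced us; try again
        }
    }
}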
