
Add CatalogStream Interface #1246

Merged
dougbrn merged 24 commits into main from data_iterator on Feb 11, 2026

Conversation

dougbrn (Contributor) commented Feb 5, 2026

Closes #1042. I'm generally pretty open on a lot of the design decisions, naming, etc. here. I didn't attempt the PyTorch integration mentioned in the issue; happy to discuss that further!

codecov bot commented Feb 5, 2026

Codecov Report

❌ Patch coverage is 97.22222% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 96.67%. Comparing base (b1f2780) to head (5954018).
⚠️ Report is 25 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/lsdb/streams/catalog_streams.py | 97.18% | 2 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1246      +/-   ##
==========================================
+ Coverage   96.66%   96.67%   +0.01%     
==========================================
  Files          46       48       +2     
  Lines        2877     2949      +72     
==========================================
+ Hits         2781     2851      +70     
- Misses         96       98       +2     


github-actions bot commented Feb 5, 2026

| Before [b1f2780] | After [c09cf7a] | Ratio | Benchmark (Parameter) |
|---|---|---|---|
| 178±1ms | 188±5ms | 1.05 | benchmarks.time_open_many_columns_list |
| 51.2±1ms | 53.6±0.6ms | 1.05 | benchmarks.time_polygon_search |
| 107±6ms | 110±2ms | 1.03 | benchmarks.time_kdtree_crossmatch |
| 7.00±0.05s | 7.16±0.09s | 1.02 | benchmarks.time_create_large_catalog |
| 396±8ms | 403±7ms | 1.02 | benchmarks.time_open_many_columns_default |
| 30.9±0.7ms | 31.1±0.5ms | 1.01 | benchmarks.time_box_filter_on_partition |
| 1.06±0.02s | 1.06±0.01s | 1.00 | benchmarks.time_create_midsize_catalog |
| 8.47±0.01s | 8.44±0.03s | 1.00 | benchmarks.time_lazy_crossmatch_many_columns_all_suffixes |
| 8.52±0.01s | 8.55±0.05s | 1.00 | benchmarks.time_lazy_crossmatch_many_columns_overlapping_suffixes |
| 3.90±0.03s | 3.90±0.03s | 1.00 | benchmarks.time_open_many_columns_all |


dougbrn marked this pull request as ready for review February 5, 2026 22:13
dougbrn requested a review from hombit February 5, 2026 22:45
hombit (Contributor) left a comment

I have a few overall design questions

  1. Maybe we should implement an iterator and an iterable separately, so the iterable object can be reused (see the sketch after this list).
  2. I'm not sure there is a good use case for non-None `iter_limit > 1`: we could get row duplicates with it even in a single batch, which is not the case for 1 and None. I believe that for most ML approaches `iter_limit = 1` and `iter_limit = None` are enough, and if people would like to do multiple epochs, they would reuse an iterable object n times.
  3. I understand that it may be out of scope and looks more like a Dask feature request, but I think we should also support anything a user can get from a Catalog object, including a Dask Series they can get with `['column']` or `.map_partitions`.
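A plain-Python illustration of point 1 (not LSDB's actual classes): a bare iterator is exhausted after one pass, while an iterable that builds a fresh iterator in `__iter__` can be looped once per epoch.

```python
class OneShot:
    """Wraps a single iterator; a second loop over it yields nothing."""

    def __init__(self, data):
        self._it = iter(data)

    def __iter__(self):
        return self._it


class Reusable:
    """Builds a fresh iterator on every __iter__, so it can be re-looped."""

    def __init__(self, data):
        self._data = data

    def __iter__(self):
        return iter(self._data)


one_shot = OneShot([1, 2, 3])
print(list(one_shot), list(one_shot))  # [1, 2, 3] [] -- exhausted after one pass
reusable = Reusable([1, 2, 3])
print(list(reusable), list(reusable))  # [1, 2, 3] [1, 2, 3]
```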

dougbrn (Contributor, Author) commented Feb 6, 2026

Thanks for taking a look!

  1. I agree with this. I'm not 100% sure where the dividing line between the two will be or how the reuse will look API-wise, but I can make an attempt at it.
  2. Fair point. I had originally introduced this as a way to allow longer iteration without stepping on the API rake where this single-iteration pattern:

```python
cat_iter = lsdb.CatalogIterator(cat, loop=False)

for chunk in cat_iter:
    ...  # do something
```

turns into an endless loop after simply switching the kwarg value:

```python
cat_iter = lsdb.CatalogIterator(cat, loop=True)

for chunk in cat_iter:
    ...  # do something
```

but that change in behavior probably makes it not worth it; users will just need to be aware of what they're doing when switching that bool.
  3. I'm not sold on this. I understand that there will be situations where users end up with a series as a result of catalog work, but it feels like blurring the lines between what a user can do in LSDB with a series vs. with a catalog. For example, even if something like `catalog["nested"].crossmatch` might feel right, we don't allow crossmatch to flex for a `dd.Series` (I know it's not a perfect comparison). From my perspective, it's also trivial in column selection and in `map_partitions` to return a dataframe (catalog) in place of a series; see the sketch below.
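A minimal sketch of that last point, wrapping a Series-producing transform back into a one-column dataframe so the result of `map_partitions` stays a catalog; the loader call, path, and `mag` column are illustrative placeholders, not part of this PR:

```python
import lsdb

catalog = lsdb.open_catalog("path/to/catalog")  # placeholder path/loader

# Returning a DataFrame (here via .to_frame) instead of a bare Series
# keeps the result "catalogy" rather than producing a Dask Series:
result = catalog.map_partitions(
    lambda df: (df["mag"] ** 2).to_frame(name="mag_squared")
)
```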

hombit (Contributor) commented Feb 6, 2026

@dougbrn

(2). I like loop!
(3). Ok, let's keep it catalogy for now!

(1). This is how it could look:

```python
# interface
stream = catalog.partition_stream()
for epoch in range(1000):
    print(f'Training epoch {epoch}')
    for df in stream:
        train_batch(df)

# implementation

class Catalog:
    def partition_stream(self, ...): return CatalogPartitionStream(...)

class CatalogPartitionStream:
    # Defines random seed, etc., so each epoch's training has different shuffling
    # Doesn't define partitions_left, etc.
    def __init__(self, ...): ...
    def __iter__(self): return CatalogPartitionIterator(...)
    # Doesn't have __next__

class CatalogPartitionIterator:
    # Defines iteration state, e.g. partitions_left, etc.
    def __init__(self, ...): ...
    def __iter__(self): return self
    def __next__(self): ...
```

hombit (Contributor) commented Feb 6, 2026

The kwarg could also be `infinite` instead of `loop`. Or we can just drop that functionality and focus on one-time catalog scanning.

dougbrn changed the title from Add CatalogIterator Interface to Add CatalogStream Interface Feb 6, 2026
dougbrn requested a review from hombit February 6, 2026 23:42
dougbrn (Contributor, Author) commented Feb 6, 2026

@hombit I made some big changes based on our conversations!

hombit (Contributor) left a comment

I do like this implementation!

I think the Stream object should not be changed by an Iterator object. I propose "splitting" the rng when creating an iterator: pass a new one inside it, and then pass it back to `get_next_partitions`. So basically, the stream's rng is used to initialize new iterators, and the iterator's rng is used for all the shuffling.

dougbrn requested a review from hombit February 10, 2026 22:05
dougbrn (Contributor, Author) commented Feb 10, 2026

@hombit Now the rng should be split

hombit (Contributor) commented Feb 10, 2026

@dougbrn `rng.spawn(1)[0]`
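A minimal sketch of how `rng.spawn(1)[0]` slots into the stream/iterator split above, using numpy's `Generator.spawn` (numpy >= 1.25); class names echo the earlier sketch, and yielding partition indices is a stand-in for loading real partitions:

```python
import numpy as np


class CatalogPartitionIterator:
    """Holds per-iteration state; all shuffling uses its own rng."""

    def __init__(self, rng, n_partitions):
        self.rng = rng
        # Shuffle the partition order once, drawing only from the iterator's rng.
        self.order = list(rng.permutation(n_partitions))

    def __iter__(self):
        return self

    def __next__(self):
        if not self.order:
            raise StopIteration
        return self.order.pop(0)  # stand-in for loading a real partition


class CatalogPartitionStream:
    """Reusable; each iter() spawns an independent child rng for a fresh shuffle."""

    def __init__(self, n_partitions, seed=None):
        self.n_partitions = n_partitions
        self.rng = np.random.default_rng(seed)

    def __iter__(self):
        # spawn(1)[0] yields an independent child generator, so shuffling
        # inside the iterator never consumes draws from the stream's rng.
        return CatalogPartitionIterator(self.rng.spawn(1)[0], self.n_partitions)


stream = CatalogPartitionStream(n_partitions=4, seed=0)
print(list(stream))  # one epoch's shuffled partition order
print(list(stream))  # a second epoch, independently shuffled
```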

hombit (Contributor) left a comment

Looks good, thank you!

dougbrn merged commit 2372c11 into main Feb 11, 2026
12 checks passed
dougbrn deleted the data_iterator branch February 11, 2026 17:03