
New features #58

Merged: 34 commits merged into dev from new-features on Jan 23, 2025

Conversation

@chjuncn chjuncn commented Jan 10, 2025

Overall, Dataset now accepts a list or df as input.

Most of the change is for Dataset accepting a MemorySource; the rest resolves conflicts with the main branch.

Change Description

Datasources.py

  1. Auto-generate schema for MemorySource output_schema
  2. MemorySource accepts more types

Record.py

  1. Move build_schema_from_df to the Schema class
  2. Make as_df() take a fields_in_schema parameter

schema.py

  1. Add a new DefaultSchema

datamanager.py

  1. Generate an ID for MemorySource instead of requiring a dataset_id input from users
    • Future plan: Consider auto-generation for all data registry entries
    • Rationale: Users shouldn't need to manage data IDs manually

datasource.py

  1. Add a get_datasource() method to centralize source retrieval in DataSource

logical.py

  1. Remove input_schema from BaseScan init() when not supported
    • LogicalOperator now accepts exactly 2 params for a cleaner BaseScan

sets.py

  1. Dataset supports in-memory data as input
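
A minimal usage sketch of the new input types (the import path, column names, and exact constructor behavior here are illustrative assumptions, not the exact API):

import pandas as pd
from palimpzest.sets import Dataset  # import path is an assumption

df = pd.DataFrame({"title": ["a", "b"], "text": ["first doc", "second doc"]})

# Dataset now also accepts in-memory inputs; a DataFrame or a list is wrapped
# in a MemorySource with an auto-generated schema and an auto-generated dataset id
ds_from_df = Dataset(df)
ds_from_list = Dataset(["first doc", "second doc"])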

chjuncn and others added 16 commits January 8, 2025 17:35
add to_df, from_df for data records: help users easily use DataRecord (and the chosen plan), so they can run a plan over their own data, e.g. data coming from a df

fix imports in pytest

fix imports for test

centralize hash functions into a helper lib

Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations. Next step will be making the df format more natural for the computation engines; DataRecord is just a special format of df, so it should be possible.

Dataset accept list, df as inputs (the Change Description in the PR summary above)
chjuncn and others added 5 commits January 13, 2025 10:48
add pz update --name for pz config
The changes move away from a single class handling all responsibilities to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations. Future plans include improving the Optimizer interface and the QueryProcessor interface. The refactoring maintains the existing clean interface while setting up a more scalable foundation for future development.

# Rationale
## Why this change
### Previously
ExecutionEngine takes on too many responsibilities. In order to give users one interface, we put everything into one class, which is not good practice.
Different concepts are coupled together in ExecutionEngine, for example execute_plan() and execute(), which is confusing and easy to mix up.
It's also not good practice to instantiate Execute() just to run, because initializing an instance and running it are different concerns, e.g. when testing, or when we want to pass instances to different places and run them in different modules.
### After this change
Separate core concepts into their own dedicated modules. The names can speak for themselves.
Move long parameter lists into Config, which is easier to maintain.
Each module is supposed to take care of one thing, making it easier for the team to work together on the codebase.
Split OptimizerStrategy, ExecutionStrategy, ProcessorStrategy, and QueryProcessor out of ExecutionEngine, so it is easier to know where a new strategy belongs and we don't need to extend the huge class every time.
The interface is still clean. In the future, we'll add an "auto" option for strategies, meaning the system can figure out the best strategies based on the Dataset and the params in Config, which will be easy to do in the Factory.
### Important Notes
This is the first infra update, and I expect we can further refine the infrastructure so that PZ will be easier to scale in the future.
I didn't change any code inside functions, to make this change easier to review; mostly I just moved things around. If you see that I deleted something, 99% of the time it's because I moved it to another place.

## Next steps
After this change looks good to you, I'll refactor all the demos.
I'll see how to improve the Optimizer interface.

The QueryProcessor class can be improved further. Currently a new class inherits both ExecutionStrategy and ProcessorStrategy to make the processor run correctly; I feel this can be improved.

Some strategies don't seem to be working; I'll dig into some functionalities and further improve how we define and use strategies.

# Core Classes and Their Relationships
## QueryProcessor (Base Class)
Abstract base class that defines the query processing pipeline.

Dependencies:
- Optimizer - for plan optimization
- Dataset - for data source handling
- QueryProcessorConfig - for configuration, including policy

Implementations include:
- NoSentinelSequentialSingleThreadProcessor
- NoSentinelPipelinedSinglelProcessor
- NoSentinelPipelinedParallelProcessor
- MABSentinelSequentialSingleThreadProcessor
- MABSentinelPipelinedParallelProcessor
- StreamingQueryProcessor
- RandomSamplingSentinelSequentialSingleThreadProcessor
- RandomSamplingSentinelPipelinedProcessor

## QueryProcessorFactory
Creates specific QueryProcessor implementations.

- Creates and configures different types of processors based on:
  - ProcessingStrategyType
  - ExecutionStrategyType
  - OptimizationStrategyType

## Optimizer
Creates an OptimizationStrategy to implement optimize() for the different OptimizationStrategyTypes

## OptimizationStrategy (Abstract Base Class)

Defines interface for different optimization strategies.

Implementations include:
- GreedyStrategy
- ParetoStrategy
- SentinelStrategy
- ConfidenceIntervalStrategy
- NoOptimizationStrategy
- AutoOptimizationStrategy
Add option to use plan.run() to execute the plan (Dataset).

Support plan.run() to execute the data pipeline, offering a streamlined way to build and process the data.

This design centralizes all operations around a single Plan (or Dataset) object, making it more intuitive to construct the pipeline and maintain focus on the data workflow.
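
A rough sketch of what that streamlined path looks like (the policy variable and config fields are assumptions):

# sketch: `policy` is whichever Policy the user picks, and QueryProcessorConfig
# is the config object described above
config = QueryProcessorConfig(policy=policy)

# plan.run() asks QueryProcessorFactory for a processor that matches the configured
# strategies and executes the pipeline, so the Dataset stays the single entry point
results = dataset.run(config)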
improve the readability of get_champion_model()
@mikecafarella mikecafarella self-assigned this Jan 18, 2025
@mikecafarella mikecafarella left a comment

MemorySource has a static default dataset identifier. Clearly, we can't keep the same method for identifiers that we do for pre-registered datasets.

But maybe it would be better to force a dataset identifier that reflects the input data contents. We could force subclasses to provide an ID that is computed via a hash of the raw input file (or dataframe). This would allow us to support caching behavior.
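
For example, a content-derived id could be sketched roughly like this (the helper name and truncation length are hypothetical, not what the PR implements):

import hashlib

import pandas as pd

def dataset_id_for(data) -> str:
    # derive a stable id from the raw contents so the same input maps to the same id
    if isinstance(data, pd.DataFrame):
        raw = pd.util.hash_pandas_object(data, index=True).values.tobytes()
    elif isinstance(data, (bytes, bytearray)):
        raw = bytes(data)
    else:
        raw = str(data).encode("utf-8")
    return hashlib.sha256(raw).hexdigest()[:16]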

@@ -133,12 +134,34 @@ def register_dataset(self, vals, dataset_id):
self._registry[dataset_id] = ("memory", vals)
with open(self._dir + "/data/cache/registry.pkl", "wb") as f:
pickle.dump(self._registry, f)

# TODO(Jun): Consider to make dataset_id optional for all register_* methods
Collaborator:

Do you really want to register a memory source? Won't this leave a permanent modification to the registered dataset table?

When someone calls "get" for a dataset, I think we should have two tables: conventionally-registered datasets, and ephemeral datasets that exist in the table but only because they were loaded when the user program started. They will be expected to disappear when the program ends.

Collaborator Author @chjuncn, Jan 21, 2025:

I was on the fence about "really want to register a memory source", but here is why I still added a get_or_register_memory_source() function when I was trying to support dataframes from Dataset():

  1. I saw an existing method called register_dataset(self, vals, dataset_id) in datamanager, so I guess this is part of the system design.
  2. It's possibly useful, as there will be a unified interface for cache usage later throughout the system.

I like your idea of having 2 tables, one for persisted data and one for temporary data, with a unified interface over these 2 tables so the callers don't need to worry about it.
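
A minimal sketch of that two-table idea (class and method names are hypothetical; dataset_id_for() is the content-hash helper sketched above):

class DataRegistry:
    def __init__(self):
        self._registry = {}        # conventionally-registered datasets (persisted to disk)
        self._temp_registry = {}   # ephemeral in-memory datasets; gone when the program ends

    def register_dataset(self, vals, dataset_id):
        self._registry[dataset_id] = ("memory", vals)

    def get_or_register_memory_source(self, vals):
        dataset_id = dataset_id_for(vals)  # content-derived id
        self._temp_registry.setdefault(dataset_id, ("memory", vals))
        return dataset_id

    def get(self, dataset_id):
        # unified lookup: callers don't care which table a dataset lives in
        if dataset_id in self._registry:
            return self._registry[dataset_id]
        return self._temp_registry[dataset_id]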

Collaborator Author:

FYI, when adding new features to the code, I kind-of follow 2 principles:

  1. I only update core functions directly related to the feature I'm working on
  2. I make small code improvements that will 100% improve readability when I encounter them during development

The advantage of this approach is that we can focus on and complete tasks one at a time.

But I'm always happy to discuss the related topics and update the code if you think this is the right time.

schema = DefaultSchema
source = DataDirectory().get_or_register_memory_source(source)
elif not isinstance(source, (DataSource, Set)):
raise Exception(f"Invalid source type: {type(source)}, We only support DataSource, Dataset, pd.DataFrame, list, and str")
Collaborator Author:

more reasonable to do this in datasource.

(ProcessingStrategyType.NO_SENTINEL, ExecutionStrategyType.SEQUENTIAL):
lambda ds, opt, cfg: NoSentinelSequentialSingleThreadProcessor(datasource=ds, optimizer=opt, config=cfg),
(ProcessingStrategyType.NO_SENTINEL, ExecutionStrategyType.PIPELINED_SINGLE_THREAD):
lambda ds, opt, cfg: NoSentinelPipelinedSinglelProcessor(datasource=ds, optimizer=opt, config=cfg),
Collaborator Author:

typo

processing_strategy: str = "no_sentinel"):
return hash_for_id(config.to_jsonstr() + optimizer_strategy + execution_strategy + processing_strategy)

def run(self, config: QueryProcessorConfig,
Collaborator:

I believe this is a convenience function, which gives users a nicer way to run the executor. However, the more formal and configurable way, I think, is to create an instance of type Executor and pass in the Dataset. That's mostly what's happening in this code in any case. It mainly just calls QueryProcessorFactory, gets a processor, and then sends itself to the processor.

Since the Dataset is not intended to have any substantive code about execution, it would be nice if we could shrink this code even more than we already have, so that in the future we don't need to maintain the cache logic or the various strategy parameters here.

Would it be possible to change this function so that all it does is something like this?

def run(self, config: QueryProcessorConfig, *args, **kwargs):
    return QueryProcessorFactory.createAndRunProcessor(self, config, *args, **kwargs)

Collaborator Author:

SGTM. I moved the logic to the factory and dropped the cache logic for the processor; we'll need to revisit the cache story for the whole system.

@mdr223 mdr223 left a comment

Left a handful of comments, but overall the PR content looks good to me! Tomorrow I can try to help with passing unit tests and linting checks while I'm on the plane.

Collaborator:

Leaving as a mental note that we need to update the other demos as well to use the new execution syntax.

Collaborator Author:

Thank you so much! I'll update demos to adopt the latest execution format.

@@ -286,7 +273,8 @@ def as_json_str(self, include_bytes: bool = True, project_cols: list[str] | None

def as_dict(self, include_bytes: bool = True, project_cols: list[str] | None = None):
"""Return a dictionary representation of this DataRecord"""
dct = self.field_values.copy()
# In case of numpy types, the json.dumps will fail. Convert to native types.
dct = pd.Series(self.field_values).to_dict()
Collaborator:

In the long term, I'd like to see if we could avoid the overhead of serializing and de-serializing with Pandas, but in the short term this is a great trick to avoid this issue!
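
One pandas-free option could look roughly like this (a sketch, assuming field values are numpy scalars, numpy arrays, or plain Python objects):

import numpy as np

def to_native(value):
    # numpy scalars (np.int64, np.float32, ...) expose .item() to get the built-in type
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    return value

# inside as_dict(): dct = {field: to_native(value) for field, value in self.field_values.items()}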

Collaborator Author:

Yes, I'll add a TODO.

total_execution_cost=sum(ps.total_cost for ps in plan_stats)
)

def _should_stop_execution(
Collaborator:

I think this is a duplicate of the abstract method above? I'm not sure which one you want to keep, but we should probably only keep one. (Also, if we keep the one with @AbstractMethod, then let's have it raise an exception (instead of returning False))
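
Something along these lines (the class name and signature are assumed from context):

from abc import ABC, abstractmethod

class QueryProcessor(ABC):
    @abstractmethod
    def _should_stop_execution(self, plan_stats) -> bool:
        """Override to implement early stopping logic."""
        raise NotImplementedError("subclasses decide when to stop execution")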

Collaborator Author:

This is a mistake :) I'll keep only the abstract one.

"""Override to implement early stopping logic"""
return False

def _create_execution_stats(
Collaborator:

Currently this does not seem to be used anywhere -- I just wanted to make sure that's intended?

If/when you do want to use this to replace the code duplicated across each of the QueryProcessors (which I think is great), just make sure to aggregate the plan_stats before setting them in the ExecutionStats() (and also get the plan_strs as well):

Ex. from no-sentinel query processor:

# aggregate plan stats
aggregate_plan_stats = self.aggregate_plan_stats(plan_stats)

# add sentinel records and plan stats (if captured) to plan execution data
execution_stats = ExecutionStats(
    execution_id=self.execution_id(),
    plan_stats=aggregate_plan_stats,
    total_execution_time=time.time() - execution_start_time,
    total_execution_cost=sum(
        list(map(lambda plan_stats: plan_stats.total_plan_cost, aggregate_plan_stats.values()))
    ),
    plan_strs={plan_id: plan_stats.plan_str for plan_id, plan_stats in aggregate_plan_stats.items()},
)

(My guess is you will also want to pull the aggregate_plan_stats function into this class as well)

Collaborator Author:

I planned to use this function to replace the execution-stats setup, but then didn't, because I didn't want this change to include too many things, and I forgot to remove it.

I'll add a TODO for this function and do that in a separate PR.

@@ -98,7 +106,7 @@ def execute_plan(self, plan: PhysicalPlan, num_samples: int | float = float("inf
candidate = DataRecord(schema=SourceRecord, source_id=current_scan_idx)
candidate.idx = current_scan_idx
candidate.get_item_fn = datasource.get_item
futures.append(executor.submit(PipelinedParallelPlanExecutor.execute_op_wrapper, source_operator, candidate))
futures.append(executor.submit(self.execute_op_wrapper, source_operator, candidate))
Collaborator:

For some reason I recalled hitting an error where the ThreadPoolExecutor did not like passing class functions (i.e. self.<something>) as an input to executor.submit() because I believe the fact that self is supposed to be the first argument to execute_op_wrapper messes up the passing of args into the execute_op_wrapper function? (hence why I turned it into an ugly @staticmethod)

I could be wrong b/c it's been a while since I checked this, but it might be good to confirm that a demo can run in parallel execution.
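
A toy sketch (hypothetical Worker class, unrelated to the PR code) of why bound methods are fine here: the bound method already carries self, so executor.submit() only needs the remaining arguments:

from concurrent.futures import ThreadPoolExecutor

class Worker:
    def __init__(self, offset: int):
        self.offset = offset

    def execute_op_wrapper(self, operator, candidate):
        return operator(candidate) + self.offset

w = Worker(offset=10)
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(w.execute_op_wrapper, lambda x: x * 2, 3)
    print(future.result())  # prints 16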

Collaborator:

Just tested a small example locally and using self worked just fine! Thanks for making this much cleaner

Collaborator Author:

In #70, I moved this function to PhysicalOperator and removed the duplicate code.

sentinel_plans = optimizer.optimize(dataset, policy)
# TODO: Do we need to re-initialize the optimizer here?
self.optimizer.update_cost_model(CostModel())
sentinel_plans = self.optimizer.optimize(dataset, policy)
Collaborator:

One note of caution: previously this class had an OptimizationStrategy of OptimizationStrategy.PARETO (or OptimizationStrategy.GREEDY).

In this function, we used OptimizationStrategy.SENTINEL to modify the behavior of the Optimizer to return a SentinelPlan. So you may want to add a function like update_strategy() to the Optimizer which can take in an optimizer_strategy_type which would be set to OptimizationStrategyType.SENTINEL here.
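
A rough sketch of the suggested helper (method and attribute names here are assumptions, not the actual Optimizer API):

from enum import Enum, auto

class OptimizationStrategyType(Enum):  # stand-in for the real enum
    GREEDY = auto()
    PARETO = auto()
    SENTINEL = auto()

class Optimizer:
    def __init__(self, strategy_type: OptimizationStrategyType = OptimizationStrategyType.PARETO):
        self.strategy_type = strategy_type

    def update_strategy(self, strategy_type: OptimizationStrategyType) -> "Optimizer":
        # swap the strategy in place (e.g. to SENTINEL before building sentinel plans)
        # and return self so calls can be chained
        self.strategy_type = strategy_type
        return self

# e.g.: sentinel_plans = optimizer.update_strategy(OptimizationStrategyType.SENTINEL).optimize(dataset, policy)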

Collaborator Author:

Got it. Updated. Thanks!

Collaborator Author:

If so, we have to create a new optimizer; I was not sure when we really need a new one. Thanks!

Collaborator Author:

Currently, we deepcopy self.optimizer and update the new copy, so we don't change self.optimizer.

Later, we probably need to think about whether we need to re-init the optimizer every time in execute().

optimization_strategy=self.optimization_strategy,
use_final_op_quality=self.use_final_op_quality,
)
optimizer = self.deepcopy_clean_optimizer().update_cost_model(cost_model)
Collaborator:

Related to the note of caution above; just make sure that by this point you are back to having an optimizer with the original OptimizationStrategyType which was used in creating this instance.

Collaborator Author @chjuncn, Jan 23, 2025:

Given we already have a self.optimizer instance for this class, do we need to init a new optimizer in this function? Do we need a new one because we need to clean the cost_model data?

Collaborator:

Effectively, at this point in the code we want to have an optimizer which:

  1. has the preferred optimization strategy type (PARETO or GREEDY)
  2. has a cost model with statistics collected from sentinel execution

This part was in place given your code changes, so that is good 👍

The issue is that in order to get execution statistics, we first need to create a SentinelPlan and then execute that plan. Currently, the best way to create a SentinelPlan is to use the Optimizer class with the optimization strategy type SENTINEL. This Optimizer instance will have an empty cost model (i.e. just CostModel()) because it has no statistics captured.

I don't care that much whether you init two optimizers, or init one optimizer and then update its OptimizationStrategyType and CostModel accordingly -- whichever approach you like better is fine by me!

)

# TODO: Do we need to re-initialize the optimizer here?
optimizer = self.optimizer.deepcopy_clean_optimizer()
Collaborator:

Same note of caution here re: OptimizationStrategyType.SENTINEL

Collaborator Author:

I'll update for all processors.

optimization_strategy=self.optimization_strategy,
use_final_op_quality=self.use_final_op_quality,
)
optimizer = self.optimizer.deepcopy_clean_optimizer().update_cost_model(cost_model)
Collaborator:

Same note of caution here re: setting OptimizationStrategyType back to whatever was used to create this instance.

@@ -215,6 +215,8 @@ def cosmos_client(name: str, data: BinaryIO, output_dir: str, delay=10):
# pieces which are related to setting / reading external configurations (like "pdfprocessor").
# However, given that I can fix this in two minutes by adding this is a kwarg, I'm going to
# do that for now and revisit the issue if/when this matters.

# TODO(Jun): 1. cosmos returns 202 for me. 2. why only accept "pypdf" and "cosmos" as pdfprocessor?
Collaborator:

@Tranway1 might know best about these questions?

@mdr223 mdr223 changed the base branch from main to dev January 23, 2025 11:14
@mdr223 mdr223 commented Jan 23, 2025

Merging this into dev so we can see the diff between subsequent PRs and this code

@mdr223 mdr223 merged commit 5d1f3fa into dev Jan 23, 2025
0 of 2 checks passed
mdr223 added a commit that referenced this pull request Jan 23, 2025
@mdr223 mdr223 deleted the new-features branch January 24, 2025 00:14
mdr223 added a commit that referenced this pull request Jan 24, 2025
…tion, and Configuration (#73)

* New features (#58)

* add to_df, from_df for data records

* Fix some includes

* fix imports in pytest

* fix imports for test

* centralize hash functions into a helper lib

* Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations.

* New features (#57)

* I think we may still want to finish early for LimitScans; will check w/Mike

* Dataset accept list, df as inputs.


* resolve merge conflicts

* Update sets.py

* demo doesn't write to disk

* add pz update --name for pz config

* The changes move away from a single class handling all responsibilities to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations. Future plans include improving Optimizer interface and the QueryProcessor interface. The refactoring maintains the existing clean interface while setting up a more scalable foundation for future development.


* Add option to use plan.run() to execute the plan (Dataset).

* improve the readability of get_champion_model()

* leave TODO for gerardo

* add field types when possible for schemas derived from dataframes

* use  for record field access

* map both json str methods --> to_json_str; keep indentation consistent w/indent=2

* map all as_* verb fcns --> to_* for consistency

* lint check for List --> list, Tuple --> tuple; tried addressing TODO in code

* assert first physical operator is DataSourcePhysicalOp & use source_operator.get_datasource()

* remove typo

* add newline at end of file

* mostly lint changes

* rename optimize --> get_optimal_plan to clarify purpose a bit

* adding note about None for optimization strategy

---------

Co-authored-by: Michael Cafarella <[email protected]>
Co-authored-by: Matthew Russo <[email protected]>

* Support list, df when init Dataset() (#68)

* Support list, df when init Dataset()

1. Auto-generate dataset_id for memory source.
2. Create _tempRegistry for memory data; it's temporary and won't be saved to disk. This provides a unified interface for DataSource; we will need to revisit how to provide the best cache story in the system.
3. Mike mentioned Dataset init() should only take str/DataSource and the logic should move to DataSource. I moved some logic to datamanager, but I still let init() take list/str/df/datasource, since otherwise users would need to create a DataSource() themselves when creating a Dataset(), which I'm less in favor of. But if you still think that's better, I can update it, or after I submit this change Mike can update the code based on the current base.
---------

Co-authored-by: Matthew Russo <[email protected]>

* improve readability for source_op get_datasource() (#69)

* improve readability for source_op get_datasource()

* add abstract methods for get_datasource, get_datasource_type

* ruff --fix check

---------

Co-authored-by: Matthew Russo <[email protected]>

* Refactor to a more modular design with separate strategies for optimizer, execution, processing. (#70)

* Refactor to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations.

## Next steps
After this change looks good to you, I'll refactor all the demos.
I'll see how to improve the Optimizer interface.
Explore how to add human-in-the-loop for collecting users' feedback.

* lint code

* rename optimize() to get_optimal_plan()

* rename get_optimal_plan ---> get_optimal_plans

* fix optimizer initiation problem for processors

1. init optimizer every time in create_sentinel_plan and execute
2. update optimizerStrategyType for create_sentinel_plan

* rename _execute_stategy to _execute_best_plan, which corresponds to _execute_confidence_interval

* rename to_jsonstr to to_json_str

* add note for NoOptimizationStrategy

* break circular dependency

* make askem demo run with the new format

* update name in comments

* small fix: remove a bad import

* update execute interface for bdf-suite.py

The code failed with the following error, but I think this is not from my change, so I'll still submit this update for now.

  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/demos/bdf-suite.py", line 229, in <module>
    for idx, (reference, plan, stats) in enumerate(iterable):
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/processor/streaming_processor.py", line 84, in execute
    output_records = self.execute_opstream(self.plan, record)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/processor/streaming_processor.py", line 147, in execute_opstream
    record_set = operator(r)
                 ^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/operators/convert.py", line 190, in __call__
    field_answers, generation_stats = self.convert(candidate=candidate, fields=fields_to_generate)
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/operators/convert.py", line 437, in convert
    field_answers, _, generation_stats = self.generator(candidate, fields, **gen_kwargs) # TODO: guarantee negative output from generator is None
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/generators/generators.py", line 351, in __call__
    prompt = self._generate_user_prompt(candidate, fields, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/generators/generators.py", line 194, in _generate_user_prompt
    field_lengths = [(field, len(value)) for field, value in context.items()]
                                                             ^^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'items'

* update bdf-usecase3 demo with the latest interface

* remove _should_stop_execution as we don't use it for now.

* update biofabric demo

* update fever-demo for the latest interface

* update demo-core

* Update image-demo.py

* execute_op_wrapper returns 3 values always

Previously, execute_op_wrapper() in different processors returned 2 or 3 values; we unified this function so it always returns 3 values.

* fix typo in caller functions

* add **kwargs in create_processor() to accept different params for different processors

1. add **kwargs in create_processor() to accept different params for different processors

2. readability: make functions classmethods so that the calling name is shorter

* add **kwargs in run() to accept different params for different processors

* QueryProcessor should take dataset instead of datasource

* update optimizer-demo to use the latest interface

* update simple-demo.py to adopt the latest interface

* fix lint issue

* ruff --fix check

* fix a param error for fever demo

* more ruff --fix check

* Fix unit tests, still 2 failing tests

There are still 2 tests failing:

tests/pytest/test_workloads.py::test_workload[enron-eval-tiny-enron-workload]

tests/pytest/test_optimizer.py::TestParetoOptimizer::test_pareto_optimization_strategy

* ruff fix

* fix test_optimizer unittest

* revert changes in conftest.py

* add one param in get_optimal_plans() to pass use_final_op_quality flag

Consider using a config structure if the params are getting larger.

---------

Co-authored-by: Matthew Russo <[email protected]>

* unit tests passing

* linting and fix sneaky data registration error in unit test

* look up dataset type in _tempRegistry as well

---------

Co-authored-by: Jun <[email protected]>
Co-authored-by: Michael Cafarella <[email protected]>
Co-authored-by: Matthew Russo <[email protected]>