New features #58
Conversation
Help users easily use DataRecord (and the chosen plan): they can easily use their own data to execute a plan, and the data could come from a DataFrame.
fix imports in pytest
fix imports for test
centralize hash functions into a helper lib
Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations. The next step will be making the df format more natural for the computation engines. DataRecord is just a special format of df, so it should be possible.
* add to_df, from_df for data records: help users easily use DataRecord (and the chosen plan), so they can use their own data, which could come from a df, to execute a plan
* fix imports in pytest
* fix imports for test
* centralize hash functions into a helper lib
* Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations. Next step will be making the df format more natural for the computation engines. DataRecord is just a special format of df, so it should be possible.
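To make the to_df / from_df idea concrete, here is a conceptual sketch (not the actual PZ source; the real DataRecord also carries schema and lineage information) of the round trip the commits above describe:

import pandas as pd

class DataRecord:
    """Toy stand-in for PZ's DataRecord: a bag of field values."""

    def __init__(self, field_values: dict):
        self.field_values = field_values

    @staticmethod
    def from_df(df: pd.DataFrame) -> list["DataRecord"]:
        # Each row of the DataFrame becomes one record.
        return [DataRecord(row) for row in df.to_dict(orient="records")]

    @staticmethod
    def to_df(records: list["DataRecord"]) -> pd.DataFrame:
        # A list of records is just a special view of a DataFrame.
        return pd.DataFrame([r.field_values for r in records])

records = DataRecord.from_df(pd.DataFrame({"title": ["a", "b"], "year": [2001, 2002]}))
df_back = DataRecord.to_df(records)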
# Change Description

### Datasources.py
1. Auto-generate schema for MemorySource output_schema
2. MemorySource accepts more types

### Record.py
1. Move build_schema_from_df to the Schema class
2. Make as_df() take a fields_in_schema parameter

### schema.py
1. Add a new DefaultSchema

### datamanager.py
1. Generate an ID for MemorySource instead of requiring dataset_id input from users
   - Future plan: consider auto-generation for all data registration
   - Rationale: users shouldn't need to manage data IDs manually

### datasource.py
1. Add get_datasource() method to centralize source retrieval in DataSource

### logical.py
1. Remove input_schema from BaseScan init() when not supported
   - LogicalOperator now accepts exactly 2 params for a cleaner BaseScan

### sets.py
1. Dataset supports in-memory data as input
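As an illustration of the Record.py / Datasources.py items above, a schema can be derived from a DataFrame's columns and dtypes. This is only a sketch of the idea, not the actual build_schema_from_df implementation:

import pandas as pd

def build_schema_from_df(df: pd.DataFrame) -> dict[str, str]:
    # Map each column name to a field type, so MemorySource outputs
    # can get an auto-generated schema instead of a hand-written one.
    return {col: str(dtype) for col, dtype in df.dtypes.items()}

print(build_schema_from_df(pd.DataFrame({"title": ["a"], "year": [2024]})))
# {'title': 'object', 'year': 'int64'}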
…nto new-features
add pz update --name for pz config
The changes move away from a single class handling all responsibilities to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations. Future plans include improving the Optimizer interface and the QueryProcessor interface. The refactoring maintains the existing clean interface while setting up a more scalable foundation for future development.

# Rationale

## Why this change

### Previously
ExecutionEngine takes on too many responsibilities. In order to give users one interface, we put everything into one class, which is not good practice. Different concepts are coupled together in ExecutionEngine, for example execute_plan() and execute(), which is confusing and easy to get messed up.

It's also not good practice to construct a new Execute() just to run, since initiating an instance and running an instance usually have different concerns, e.g. when testing, or when we want to pass instances to different places and run them in different modules.

### After this change
Separate core concepts into their dedicated modules; the names can speak for themselves. Put long parameter lists into a Config, which will be easier to maintain. Each module is supposed to take care of one thing, and the team will find it easier to work together on the codebase.

Split OptimizerStrategy, ExecutionStrategy, ProcessorStrategy, and QueryProcessor out of ExecutionEngine; it will be easier to know where we need a new strategy, and we don't need to extend one huge class every time. The interface is still clean. In the future, we'll add "auto" strategies, meaning the system can figure out the best strategies based on the Dataset and the params in Config, which will be easy to do in the Factory.

### Important Notes
This is the first infra update, and I expect we can further refine the infrastructure so that PZ will be easier to scale in the future. I didn't change any code inside functions, to make this change easier to review; mostly I just moved things around. If you see that I deleted something, 99% of the time it's because I moved it to another place.

## Next steps
After this change looks good to you, I'll refactor all the demos. I'll see how to improve the Optimizer interface. The QueryProcessor class can be improved further: currently a new class inherits both ExecutionStrategy and ProcessorStrategy to make the processor run correctly, and I feel this can be improved. Some strategies don't seem to be working; I'll dig into those functionalities and further improve how we define and use strategies.

# Core Classes and Their Relationships

## QueryProcessor (Base Class)
Abstract base class that defines the query processing pipeline.

Dependencies:
- Optimizer - for plan optimization
- Dataset - for data source handling
- QueryProcessorConfig - for configuration, including policy

Implementations include:
- NoSentinelSequentialSingleThreadProcessor
- NoSentinelPipelinedSinglelProcessor
- NoSentinelPipelinedParallelProcessor
- MABSentinelSequentialSingleThreadProcessor
- MABSentinelPipelinedParallelProcessor
- StreamingQueryProcessor
- RandomSamplingSentinelSequentialSingleThreadProcessor
- RandomSamplingSentinelPipelinedProcessor

## QueryProcessorFactory
Creates specific QueryProcessor implementations, configuring different types of processors based on:
- ProcessingStrategyType
- ExecutionStrategyType
- OptimizationStrategyType

## Optimizer
Creates an OptimizerStrategy to implement optimize() for the different OptimizerStrategyTypes.

## OptimizationStrategy (Abstract Base Class)
Defines the interface for different optimization strategies. Implementations include:
- GreedyStrategy
- ParetoStrategy
- SentinelStrategy
- ConfidenceIntervalStrategy
- NoOptimizationStrategy
- AutoOptimizationStrategy
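To make the class relationships above concrete, here is a small structural sketch (not the actual Palimpzest source; the names follow the description above, but the signatures are assumptions):

from abc import ABC, abstractmethod
from enum import Enum


class ProcessingStrategyType(Enum):
    NO_SENTINEL = "no_sentinel"
    MAB_SENTINEL = "mab_sentinel"


class ExecutionStrategyType(Enum):
    SEQUENTIAL = "sequential"
    PIPELINED_PARALLEL = "pipelined_parallel"


class QueryProcessor(ABC):
    def __init__(self, datasource, optimizer, config):
        self.datasource, self.optimizer, self.config = datasource, optimizer, config

    @abstractmethod
    def execute(self):
        ...


class NoSentinelSequentialSingleThreadProcessor(QueryProcessor):
    def execute(self):
        return []  # run the optimized plan sequentially (details omitted)


class QueryProcessorFactory:
    # The factory picks a processor class from the (processing, execution) strategy pair.
    _PROCESSORS = {
        (ProcessingStrategyType.NO_SENTINEL, ExecutionStrategyType.SEQUENTIAL):
            NoSentinelSequentialSingleThreadProcessor,
    }

    @classmethod
    def create_processor(cls, datasource, optimizer, config, processing, execution):
        return cls._PROCESSORS[(processing, execution)](datasource, optimizer, config)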
…py which will need to be addressed
Support plan.run() to execute the data pipeline, offering a streamlined way to build and process the data. This design centralizes all operations around a single Plan (or Dataset) object, making it more intuitive to construct the pipeline and keep the focus on the data workflow.
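A hypothetical usage sketch of this plan.run() flow (the constructor arguments and config fields here are assumptions, not the confirmed PZ API):

import pandas as pd
import palimpzest as pz

df = pd.DataFrame({"text": ["hello", "world"]})
plan = pz.Dataset(df)                         # build the pipeline around a single object
output = plan.run(pz.QueryProcessorConfig())  # execute it in place; no separate Execute() needed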
improve the readability of get_champion_model()
MemorySource has a static default dataset identifier. Clearly, we can't keep the same method for identifiers that we do for pre-registered datasets.
But maybe it would be better to force a dataset identifier that reflects the input data contents. We could force subclasses to provide an ID that is computed via a hash of the raw input file (or dataframe). This would allow us to support caching behavior.
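A sketch of that suggestion, assuming a helper along these lines (not part of the current code): derive the dataset identifier from a hash of the in-memory contents, so identical inputs map to the same ID and caching can work across runs.

import hashlib
import pandas as pd

def dataset_id_from_df(df: pd.DataFrame) -> str:
    # Hash a stable per-row digest of the DataFrame's contents.
    row_hashes = pd.util.hash_pandas_object(df, index=True).values
    return hashlib.sha256(row_hashes.tobytes()).hexdigest()[:16]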
@@ -133,12 +134,34 @@ def register_dataset(self, vals, dataset_id):
        self._registry[dataset_id] = ("memory", vals)
        with open(self._dir + "/data/cache/registry.pkl", "wb") as f:
            pickle.dump(self._registry, f)

    # TODO(Jun): Consider to make dataset_id optional for all register_* methods
Do you really want to register a memory source? Won't this leave a permanent modification to the registered dataset table?
When someone calls "get" for a dataset, I think we should have two tables: conventionally-registered datasets, and ephemeral datasets that exist in the table but only because they were loaded when the user program started. They will be expected to disappear when the program ends.
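A rough sketch of the two-table idea (names here are hypothetical): persisted registrations live in the on-disk registry, while in-memory sources live in an ephemeral registry that disappears when the program ends, behind one lookup interface.

class DataManager:
    def __init__(self):
        self._registry = {}        # conventionally registered datasets (persisted to disk)
        self._temp_registry = {}   # ephemeral, in-memory datasets (never persisted)

    def get_registered_dataset(self, dataset_id: str):
        # Callers use one lookup; they don't care which table the data lives in.
        if dataset_id in self._temp_registry:
            return self._temp_registry[dataset_id]
        return self._registry[dataset_id]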
I was on the fence about "really want to register a memory source", but here is why I still added a get_or_register_memory_source() function when I was trying to support DataFrames in Dataset():
- I saw an existing method called register_dataset(self, vals, dataset_id) in datamanager, so I assumed this is part of the system design.
- It's possibly useful, as there will be a unified interface for cache usage later throughout the system.
I like your idea of having 2 tables, one for persisted data and one for temporary data, with a unified interface over these 2 tables so the callers don't need to worry about it.
FYI, when adding new features to the code, I kind-of follow 2 principles:
- I only update core functions directly related to the feature I'm working on
- I make small code improvements that will 100% improve readability when I encounter them during development
The advantage of this approach is that we can focus on and complete tasks one at a time.
But I'm always happy to discuss the related topics and update the code if you think this is the right time.
            schema = DefaultSchema
            source = DataDirectory().get_or_register_memory_source(source)
        elif not isinstance(source, (DataSource, Set)):
            raise Exception(f"Invalid source type: {type(source)}, We only support DataSource, Dataset, pd.DataFrame, list, and str")
more reasonable to do this in datasource.
        (ProcessingStrategyType.NO_SENTINEL, ExecutionStrategyType.SEQUENTIAL):
            lambda ds, opt, cfg: NoSentinelSequentialSingleThreadProcessor(datasource=ds, optimizer=opt, config=cfg),
        (ProcessingStrategyType.NO_SENTINEL, ExecutionStrategyType.PIPELINED_SINGLE_THREAD):
            lambda ds, opt, cfg: NoSentinelPipelinedSinglelProcessor(datasource=ds, optimizer=opt, config=cfg),
typo
            processing_strategy: str = "no_sentinel"):
    return hash_for_id(config.to_jsonstr() + optimizer_strategy + execution_strategy + processing_strategy)

def run(self, config: QueryProcessorConfig,
I believe this is a convenience function, which gives users a nicer way to run the executor. However, the more formal and configurable way, I think, is to create an instance of type Executor and pass in the Dataset. That's mostly what's happening in this code in any case. It mainly just calls QueryProcessorFactory, gets a processor, and then sends itself to the processor.
Since the Dataset is not intended to have any substantive code about execution, it would be nice if we could shrink this code even further, so that in the future we don't need to maintain the cache logic or the various strategy parameters here.
Would it be possible to change this function so that all it does is something like this?
def run(self, config: QueryProcessorConfig, *args, **kwargs):
    return QueryProcessorFactory.createAndRunProcessor(self, config, *args, **kwargs)
SGTM. I moved the logic to the factory and dropped the cache logic for the processor; we'll need to revisit the cache story for the whole system.
…perator.get_datasource()
Left a handful of comments, but overall the PR content looks good to me! Tomorrow I can try to help with passing unit tests and linting checks while I'm on the plane.
demos/paper-demo.py
Leaving as a mental note that we need to update the other demos as well to use the new execution syntax.
Thank you so much! I'll update demos to adopt the latest execution format.
@@ -286,7 +273,8 @@ def as_json_str(self, include_bytes: bool = True, project_cols: list[str] | None

    def as_dict(self, include_bytes: bool = True, project_cols: list[str] | None = None):
        """Return a dictionary representation of this DataRecord"""
-       dct = self.field_values.copy()
+       # In case of numpy types, the json.dumps will fail. Convert to native types.
+       dct = pd.Series(self.field_values).to_dict()
In the long term, I'd like to see if we could avoid the overhead of serializing and de-serializing with Pandas, but in the short term this is a great trick to avoid this issue!
Yes, I'll add a todo.
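For context, a small self-contained illustration of the issue this diff works around: json.dumps rejects numpy integer scalars, while (with recent pandas versions) round-tripping through a pandas Series converts values to native Python types first.

import json
import numpy as np
import pandas as pd

field_values = {"count": np.int64(3), "total": np.int64(10)}

try:
    json.dumps(field_values)
except TypeError as err:
    print(err)  # Object of type int64 is not JSON serializable

print(json.dumps(pd.Series(field_values).to_dict()))  # {"count": 3, "total": 10}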
            total_execution_cost=sum(ps.total_cost for ps in plan_stats)
        )

    def _should_stop_execution(
I think this is a duplicate of the abstract method above? I'm not sure which one you want to keep, but we should probably only keep one. (Also, if we keep the one with @abstractmethod, then let's have it raise an exception instead of returning False.)
This is a mistake :) . Only the abstract one should stay.
"""Override to implement early stopping logic""" | ||
return False | ||
|
||
def _create_execution_stats( |
Currently this does not seem to be used anywhere -- I just wanted to make sure that's intended?
If/when you do want to use this to replace the code duplicated across each of the QueryProcessors (which I think is great), just make sure to aggregate the plan_stats before setting them in the ExecutionStats() (and also get the plan_strs as well):
Ex. from the no-sentinel query processor:

# aggregate plan stats
aggregate_plan_stats = self.aggregate_plan_stats(plan_stats)

# add sentinel records and plan stats (if captured) to plan execution data
execution_stats = ExecutionStats(
    execution_id=self.execution_id(),
    plan_stats=aggregate_plan_stats,
    total_execution_time=time.time() - execution_start_time,
    total_execution_cost=sum(
        list(map(lambda plan_stats: plan_stats.total_plan_cost, aggregate_plan_stats.values()))
    ),
    plan_strs={plan_id: plan_stats.plan_str for plan_id, plan_stats in aggregate_plan_stats.items()},
)
(My guess is you will also want to pull the aggregate_plan_stats function into this class as well.)
I planned to use this function to replace the execution-stats setup, then decided not to because I didn't want this change to include too many things, and forgot to remove it.
I've added a TODO for this function and will do it in a separate PR.
@@ -98,7 +106,7 @@ def execute_plan(self, plan: PhysicalPlan, num_samples: int | float = float("inf

                candidate = DataRecord(schema=SourceRecord, source_id=current_scan_idx)
                candidate.idx = current_scan_idx
                candidate.get_item_fn = datasource.get_item
-               futures.append(executor.submit(PipelinedParallelPlanExecutor.execute_op_wrapper, source_operator, candidate))
+               futures.append(executor.submit(self.execute_op_wrapper, source_operator, candidate))
For some reason I recalled hitting an error where the ThreadPoolExecutor did not like passing class methods (i.e. self.<something>) as an input to executor.submit(), because I believe the fact that self is supposed to be the first argument to execute_op_wrapper messes up the passing of args into the execute_op_wrapper function (hence why I turned it into an ugly @staticmethod).
I could be wrong b/c it's been a while since I checked this, but it might be good to confirm that a demo can run in parallel execution.
Just tested a small example locally and using self worked just fine! Thanks for making this much cleaner.
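For reference, a quick self-contained check of the same pattern: submitting a bound method to a ThreadPoolExecutor works, since self is already bound and the remaining arguments pass through normally.

from concurrent.futures import ThreadPoolExecutor

class Op:
    def execute_op_wrapper(self, operator, candidate):
        return operator, candidate * 2

op = Op()
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(op.execute_op_wrapper, "scan", 21)
    print(future.result())  # ('scan', 42)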
In #70, I moved this function to PhysicalOperator and removed the duplicate code.
-       sentinel_plans = optimizer.optimize(dataset, policy)
+       # TODO: Do we need to re-initialize the optimizer here?
+       self.optimizer.update_cost_model(CostModel())
+       sentinel_plans = self.optimizer.optimize(dataset, policy)
One note of caution: previously this class had an OptimizationStrategy of OptimizationStrategy.PARETO (or OptimizationStrategy.GREEDY). In this function, we used OptimizationStrategy.SENTINEL to modify the behavior of the Optimizer to return a SentinelPlan. So you may want to add a function like update_strategy() to the Optimizer which can take in an optimizer_strategy_type, which would be set to OptimizationStrategyType.SENTINEL here.
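A hypothetical sketch of that update_strategy() suggestion (names mirror the discussion but are not the exact PZ API): one Optimizer instance can switch to SENTINEL while building a SentinelPlan and then switch back.

from enum import Enum

class OptimizationStrategyType(Enum):
    GREEDY = "greedy"
    PARETO = "pareto"
    SENTINEL = "sentinel"

class Optimizer:
    def __init__(self, cost_model, strategy_type: OptimizationStrategyType):
        self.cost_model = cost_model
        self.strategy_type = strategy_type

    def update_strategy(self, strategy_type: OptimizationStrategyType) -> "Optimizer":
        # Swap the strategy in place (e.g. to SENTINEL while building a SentinelPlan).
        self.strategy_type = strategy_type
        return self

    def update_cost_model(self, cost_model) -> "Optimizer":
        self.cost_model = cost_model
        return self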
Got it. updated. Thanks!
If so, we have to create a new optimizer; I was not sure when we really need a new optimizer. Thanks!
Currently, we deepcopy self.optimizer and update the new copy, so we don't change self.optimizer.
Later, we probably need to think about whether we need to re-init the optimizer every time in execute().
            optimization_strategy=self.optimization_strategy,
            use_final_op_quality=self.use_final_op_quality,
        )
        optimizer = self.deepcopy_clean_optimizer().update_cost_model(cost_model)
Related to the note of caution above; just make sure that by this point you are back to having an optimizer with the original OptimizationStrategyType which was used in creating this instance.
Given we already have a self.optimizer instance for this class, do we need to init a new optimizer in this function? Do we need a new one because we have to clean out the cost_model data?
Effectively, at this point in the code we want to have an optimizer which:
- has the preferred optimization strategy type (PARETO or GREEDY)
- has a cost model with statistics collected from sentinel execution
This part was in place given your code changes, so that is good 👍
The issue is that in order to get execution statistics, we first need to create a SentinelPlan and then execute that plan. Currently, the best way to create a SentinelPlan is to use the Optimizer class with the optimization strategy type SENTINEL. This Optimizer instance will have an empty cost model (i.e. just CostModel()) because it has no statistics captured.
I don't care that much whether you init two optimizers, or init one optimizer and then update its OptimizationStrategyType and CostModel accordingly -- whichever approach you like better is fine by me!
        )

        # TODO: Do we need to re-initialize the optimizer here?
        optimizer = self.optimizer.deepcopy_clean_optimizer()
Same note of caution here re: OptimizationStrategyType.SENTINEL
I'll update for all processors.
            optimization_strategy=self.optimization_strategy,
            use_final_op_quality=self.use_final_op_quality,
        )
        optimizer = self.optimizer.deepcopy_clean_optimizer().update_cost_model(cost_model)
Same note of caution here re: setting OptimizationStrategyType back to whatever was used to create this instance.
@@ -215,6 +215,8 @@ def cosmos_client(name: str, data: BinaryIO, output_dir: str, delay=10):
    # pieces which are related to setting / reading external configurations (like "pdfprocessor").
    # However, given that I can fix this in two minutes by adding this is a kwarg, I'm going to
    # do that for now and revisit the issue if/when this matters.

    # TODO(Jun): 1. cosmos returns 202 for me. 2. why only accept "pypdf" and "cosmos" as pdfprocessor?
@Tranway1 might know best about these questions?
Merged via the squashed commit below.
…tion, and Configuration (#73)

* New features (#58)
  * add to_df, from_df for data records: help users easily use DataRecord (and the chosen plan), so they can use their own data, e.g. from a df, to execute a plan
  * Fix some includes
  * fix imports in pytest; fix imports for test
  * centralize hash functions into a helper lib
  * Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations; next step will be making the df format more natural for the computation engines
  * New features (#57): same to_df/from_df, import, and hash-helper commits as above
  * I think we may still want to finish early for LimitScans; will check w/Mike
  * Dataset accepts list, df as inputs (see the Change Description above covering Datasources.py, Record.py, schema.py, datamanager.py, datasource.py, logical.py, and sets.py)
  * resolve merge conflicts; Update sets.py; demo doesn't write to disk
  * add pz update --name for pz config
  * Move away from a single class handling all responsibilities to a more modular design with separate strategies for optimization, execution, and processing (see the refactoring description above)
  * Add option to use plan.run() to execute the plan (Dataset)
  * improve the readability of get_champion_model()
  * leave TODO for gerardo
  * add field types when possible for schemas derived from dataframes
  * use for record field access
  * map both json str methods --> to_json_str; keep indentation consistent w/indent=2
  * map all as_* verb fcns --> to_* for consistency
  * lint check for List --> list, Tuple --> tuple; tried addressing TODO in code
  * assert first physical operator is DataSourcePhysicalOp & use source_operator.get_datasource()
  * remove typo; add newline at end of file; mostly lint changes
  * rename optimize --> get_optimal_plan to clarify purpose a bit
  * adding note about None for optimization strategy
  * Co-authored-by: Michael Cafarella and Matthew Russo
* Support list, df when init Dataset() (#68)
  * Auto-generate dataset_id for memory sources
  * Create _tempRegistry for memory data; it is temporary and won't be saved to disk. This provides a unified interface for DataSource; we will need to revisit how to provide the best cache story in the system.
  * Mike mentioned Dataset init() should only take str/DataSource and the logic should move to DataSource. I moved some logic to the data manager, but init() still takes list/str/df/DataSource, since otherwise users would need to create a DataSource() themselves when creating a Dataset(), which I'm less in favor of. If you still think that's better, I can update it, or Mike can update the code on top of this change.
  * Co-authored-by: Matthew Russo
* improve readability for source_op get_datasource() (#69)
  * add abstract methods for get_datasource, get_datasource_type
  * ruff --fix check
  * Co-authored-by: Matthew Russo
* Refactor to a more modular design with separate strategies for optimizer, execution, processing (#70)
  * See the refactoring rationale above; key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations. Next steps: refactor all the demos, improve the Optimizer interface, and explore how to add human-in-the-loop collection of users' feedback.
  * lint code
  * rename optimize() to get_optimal_plan(), then get_optimal_plan --> get_optimal_plans
  * fix optimizer initiation problem for processors: init the optimizer every time in create_sentinel_plan and execute; update the OptimizerStrategyType for create_sentinel_plan
  * rename _execute_stategy to _execute_best_plan, corresponding to _execute_confidence_interval
  * rename to_jsonstr to to_json_str
  * add note for NoOptimizationStrategy; break a circular dependency
  * make the askem demo run with the new format; update names in comments; small fix: remove a bad import
  * update execute interface for bdf-suite.py (the demo still fails with an AttributeError -- 'str' object has no attribute 'items' -- raised in generators.py's _generate_user_prompt, which does not appear to come from this change)
  * update bdf-usecase3, biofabric, fever-demo, demo-core, image-demo, optimizer-demo, and simple-demo to the latest interface
  * remove _should_stop_execution as we don't use it for now
  * execute_op_wrapper now always returns 3 values (previously it returned 2 or 3 depending on the processor); fix typo in caller functions
  * add **kwargs in create_processor() and run() to accept different params for different processors; make functions classmethods so the calling name is shorter
  * QueryProcessor should take a Dataset instead of a DataSource
  * fix lint issues; ruff --fix check; fix a param error for the fever demo
  * Fix unit tests (tests/pytest/test_workloads.py::test_workload[enron-eval-tiny-enron-workload] and tests/pytest/test_optimizer.py::TestParetoOptimizer::test_pareto_optimization_strategy were failing); fix test_optimizer unittest; revert changes in conftest.py
  * add a param in get_optimal_plans() to pass the use_final_op_quality flag; consider a config structure if the params keep growing
  * Co-authored-by: Matthew Russo
* unit tests passing
* linting and fix sneaky data registration error in unit test
* look up dataset type in _tempRegistry as well

Co-authored-by: Jun, Michael Cafarella, and Matthew Russo
Overall, Dataset accepts list and df as inputs.
Most of the change is for Dataset accepting a memory source; the rest resolves conflicts with the main branch.
Change Description: see the per-file breakdown above (Datasources.py, Record.py, schema.py, datamanager.py, datasource.py, logical.py, sets.py).