Commit

Refactor to a more modular design with separate strategies for optimizer, execution, processing. (#70)

* Refactor to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations.

# Rationale
## Why this change
### Previously
ExecutionEngine took on too many responsibilities. In order to give users a single interface, we packed everything into one class, which is not good practice.
Different concepts were coupled together inside ExecutionEngine (for example, execute_plan() vs. execute()), which is confusing and easy to get wrong.
It is also not good practice to instantiate Execute() just to run a query: constructing an instance and running it are separate concerns, e.g. during testing, or when we want to pass instances to different places and run them in different modules.
### After this change
The core concepts are separated into dedicated modules; the names speak for themselves.
Long parameter lists move into Config, which is easier to maintain.
Each module is responsible for one thing, so the team can work on the codebase together more easily.
OptimizerStrategy, ExecutionStrategy, ProcessorStrategy, and QueryProcessor are split out of ExecutionEngine; it is now clear where a new strategy belongs, and we no longer need to extend one huge class every time.
The interface stays clean. In the future we will add an "auto" option for strategies, i.e. the system will figure out the best strategies based on the Dataset and the params in Config, which will be easy to do in the Factory. A minimal usage sketch follows.
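
As a rough illustration only (the config field names are taken from the demos updated in this PR; the dataset name is a placeholder):

```python
from palimpzest.policy import MaxQuality
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset

# "enron-tiny" is a placeholder dataset name used purely for illustration.
dataset = Dataset("enron-tiny")

# Long parameter lists live in one config object; the three strategy fields
# select the processing, execution, and optimization strategies.
config = QueryProcessorConfig(
    policy=MaxQuality(),
    nocache=True,
    verbose=False,
    processing_strategy="streaming",
    execution_strategy="sequential",
    optimizer_strategy="pareto",
)

# Run the dataset against the config instead of constructing an ExecutionEngine.
records = dataset.run(config)
```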
### Important Notes
This is the first infra update; I expect we can further refine the infrastructure so that PZ will be easier to scale in the future.
I didn't change any code inside functions, to keep this change easier to review; I mostly just moved things around. If you see that I deleted something, 99% of the time it's because I moved it to another place.

## Next steps
After this change looks good to you, I'll refactor all the demos.

I'll see how to improve the Optimizer interface.

Explore how to add Human-in-the-loop for collecting users' feedback.

# Core Classes and Their Relationships
## QueryProcessor (Base Class)
Abstract base class that defines the query processing pipeline.

Dependencies:
- Optimizer - for plan optimization
- Dataset - for data source handling
- QueryProcessorConfig - for configuration, including policy

Implementations include:
- NoSentinelSequentialSingleThreadProcessor
- NoSentinelPipelinedSinglelProcessor
- NoSentinelPipelinedParallelProcessor
- MABSentinelSequentialSingleThreadProcessor
- MABSentinelPipelinedParallelProcessor
- StreamingQueryProcessor
- RandomSamplingSentinelSequentialSingleThreadProcessor
- RandomSamplingSentinelPipelinedProcessor
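
A minimal sketch of the base class (the constructor dependencies mirror the list above; the method names and signatures are assumptions, not the actual code):

```python
from abc import ABC, abstractmethod


class QueryProcessor(ABC):
    """Sketch of the abstract query-processing pipeline; signatures are assumptions."""

    def __init__(self, dataset, optimizer, config):
        # Dependencies named above: Dataset, Optimizer, QueryProcessorConfig (incl. policy).
        self.dataset = dataset
        self.optimizer = optimizer
        self.config = config
        self.policy = config.policy

    @abstractmethod
    def execute(self):
        """Run the pipeline end to end; each concrete processor decides how."""
        ...
```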

## QueryProcessorFactory
Creates specific QueryProcessor implementations.

- Creates and configures different types of processors based on:
  - ProcessingStrategyType
  - ExecutionStrategyType
  - OptimizationStrategyType
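
A usage sketch based on the updated askem demo below (create_processor() is a classmethod; generate_plan() is the entry point exposed by the streaming processor, so treat that part as demo-specific):

```python
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory

# `dataset` and `config` are assumed to be built as in the earlier sketch.
# The factory picks the concrete QueryProcessor from the strategy types in the config.
processor = QueryProcessorFactory.create_processor(dataset, config)

# The streaming processor used in the demos exposes generate_plan().
plan = processor.generate_plan(dataset, config.policy)
print(plan)
```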

## Optimizer
Creates an OptimizerStrategy that implements optimize() (later renamed to get_optimal_plans(); see the commit notes below) for the different OptimizerStrategyTypes.

## OptimizationStrategy (Abstract Base Class)

Defines interface for different optimization strategies.

Implementations include:
- GreedyStrategy
- ParetoStrategy
- SentinelStrategy
- ConfidenceIntervalStrategy
- NoOptimizationStrategy
- AutoOptimizationStrategy
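
A minimal sketch of the strategy interface (get_optimal_plans() and the use_final_op_quality flag come from the commit notes below; the exact signature is an assumption):

```python
from abc import ABC, abstractmethod


class OptimizationStrategy(ABC):
    """Sketch only; the real class lives in the optimizer module."""

    @abstractmethod
    def get_optimal_plans(self, plans, policy, use_final_op_quality=False):
        """Return the plan(s) this strategy prefers under the given policy."""
        ...


class NoOptimizationStrategy(OptimizationStrategy):
    """Illustrative pass-through: keep the candidate plans unchanged."""

    def get_optimal_plans(self, plans, policy, use_final_op_quality=False):
        return plans
```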

* lint code

* rename optimize() to get_optimal_plan()

* rename get_optimal_plan ---> get_optimal_plans

* fix optimizer initialization problem for processors

1. init the optimizer each time in create_sentinel_plan and execute
2. update optimizerStrategyType for create_sentinel_plan

* rename _execute_stategy to _execute_best_plan, which corresponds to _execute_confidence_interval

* rename to_jsonstr to to_json_str

* add note for NoOptimizationStrategy

* break a circular dependency

* make the askem demo run with the new format

* update name in comments

* small fix: remove a bad import

* update execute interface for bdf-suite.py

The code failed with the following error, but I don't think it is caused by my change, so I'll still submit this update for now.

  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/demos/bdf-suite.py", line 229, in <module>
    for idx, (reference, plan, stats) in enumerate(iterable):
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/processor/streaming_processor.py", line 84, in execute
    output_records = self.execute_opstream(self.plan, record)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/processor/streaming_processor.py", line 147, in execute_opstream
    record_set = operator(r)
                 ^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/operators/convert.py", line 190, in __call__
    field_answers, generation_stats = self.convert(candidate=candidate, fields=fields_to_generate)
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/operators/convert.py", line 437, in convert
    field_answers, _, generation_stats = self.generator(candidate, fields, **gen_kwargs) # TODO: guarantee negative output from generator is None
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/generators/generators.py", line 351, in __call__
    prompt = self._generate_user_prompt(candidate, fields, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/generators/generators.py", line 194, in _generate_user_prompt
    field_lengths = [(field, len(value)) for field, value in context.items()]
                                                             ^^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'items'

* update bdf-usecase3 demo with the latest interface

* remove _should_stop_execution as we don't use it for now.

* update biofabric demo

* update fever-demo for the latest interface

* update demo-core

* Update image-demo.py

* execute_op_wrapper returns 3 values always

Previously, execute_op_wrapper() returned 2 or 3 values depending on the processor; we unified the function so it always returns 3 values.
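
A purely hypothetical sketch of the unified contract (names are illustrative, not the actual helper):

```python
# Every processor's execute_op_wrapper() now returns exactly three values,
# so all call sites can unpack the same shape.
def execute_op_wrapper(operator, op_input):
    output = operator(op_input)     # run the operator on its input
    stats = None                    # per-op stats, if the operator produced any
    return output, stats, op_input  # always a 3-tuple

records, op_stats, source = execute_op_wrapper(len, [1, 2, 3])
```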

* fix typo in caller functions

* add **kwargs in create_processor() to accept different params for different processors

1. add **kwargs in create_processor() to accept different params for different processors

2. readability: make the factory functions classmethods so that the calling name is shorter

* add **kwargs in run() to accept different params for different processors
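
Roughly, the intent of both **kwargs changes (the class body and helper below are only a sketch of the pass-through, not the actual factory code):

```python
class QueryProcessorFactory:  # sketch only
    @classmethod
    def create_processor(cls, dataset, config, **kwargs):
        # Processor-specific options ride along in **kwargs instead of
        # widening the shared signature for every processor type.
        processor_cls = cls._resolve_processor_cls(config)  # hypothetical helper
        return processor_cls(dataset, config=config, **kwargs)

    @classmethod
    def _resolve_processor_cls(cls, config):
        ...
```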

* QueryProcessor should take dataset instead of datasource

* update optimizer-demo to use the latest interface

* update simple-demo.py to adopt the latest interface

* fix lint issue

* ruff --fix check

* fix a param error for fever demo

* more ruff --fix check

* Fix unit tests; 2 tests are still failing

There are still 2 tests failing:

tests/pytest/test_workloads.py::test_workload[enron-eval-tiny-enron-workload]

tests/pytest/test_optimizer.py::TestParetoOptimizer::test_pareto_optimization_strategy

* ruff fix

* fix test_optimizer unittest

* revert changes in conftest.py

* add one param in get_optimal_plans() to pass use_final_op_quality flag

Consider using a config structure if the parameter list grows larger.

---------

Co-authored-by: Matthew Russo <[email protected]>
chjuncn and mdr223 authored Jan 24, 2025
1 parent 34f4a64 commit 6cddd43
Showing 33 changed files with 539 additions and 588 deletions.
35 changes: 19 additions & 16 deletions demos/askem-var.py
@@ -15,7 +15,8 @@
from palimpzest.core.lib.fields import Field
from palimpzest.core.lib.schemas import Schema, TextFile
from palimpzest.policy import MaxQuality
from palimpzest.query import StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
from palimpzest.sets import Dataset


@@ -55,26 +56,28 @@ class Variable(Schema):
Variable, desc="A variable used or introduced in the context", cardinality=Cardinality.ONE_TO_MANY
).filter("The value name is 'a'", depends_on="name")
policy = MaxQuality()
engine = StreamingSequentialExecution(
config = QueryProcessorConfig(
policy=policy,
nocache=True,
verbose=True,
allow_code_synth=False,
allow_token_reduction=False,
allow_bonded_query=True,
verbose=False,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
engine.generate_plan(output, policy)
print("Generated plan:\n", engine.plan)
processor = QueryProcessorFactory.create_processor(excerpts, config)
plan = processor.generate_plan(output, policy)
print(processor.plan)

with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(engine.plan.operators):
for idx, op in enumerate(processor.plan.operators):
strop = f"{idx + 1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)

input_records = engine.get_input_records()
input_df = DataRecord.to_df(input_records, fields_in_schema=True)
input_records = processor.get_input_records()
input_df = DataRecord.as_df(input_records)
print(input_df)

variables = []
@@ -84,14 +87,14 @@
for idx, record in enumerate(input_records):
print(f"idx: {idx}\n vars: {vars}")
index = idx
vars = engine.execute_opstream(engine.plan, record)
vars = processor.execute_opstream(processor.plan, record)
if idx == len(input_records) - 1:
total_plan_time = time.time() - start_time
engine.plan_stats.finalize(total_plan_time)
processor.plan_stats.finalize(total_plan_time)

record_time = time.time()
statistics.append(processor.plan_stats)

statistics.append(engine.plan_stats)
intermediate_vars = DataRecord.to_df(vars, fields_in_schema=True)
print(intermediate_vars)
for var in vars:
# ref.key = ref.first_author.split()[0] + ref.title.split()[0] + str(ref.year)
try:
44 changes: 25 additions & 19 deletions demos/bdf-suite.py
@@ -16,7 +16,7 @@
from palimpzest.core.lib.schemas import URL, File, PDFFile, Schema, Table, XLSFile
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality
from palimpzest.query import Execute, StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset
from palimpzest.utils import udfs

@@ -94,7 +94,7 @@ class CaseData(Schema):


@st.cache_resource()
def extract_supplemental(engine, policy):
def extract_supplemental(processing_strategy, execution_strategy, optimizer_strategy, policy):
papers = Dataset("biofabric-pdf", schema=ScientificPaper)
paper_urls = papers.convert(URL, desc="The DOI url of the paper")
html_doi = paper_urls.convert(File, udf=udfs.url_to_file)
@@ -107,15 +107,17 @@ def extract_supplemental(engine, policy):
xls = tables.convert(XLSFile, udf=udfs.file_to_xls)
patient_tables = xls.convert(Table, udf=udfs.xls_to_tables, cardinality=Cardinality.ONE_TO_MANY)

# output = patient_tables
iterable = Execute(
patient_tables,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = patient_tables.run(config)


tables = []
statistics = []
@@ -128,22 +130,24 @@


@st.cache_resource()
def integrate_tables(engine, policy):
def integrate_tables(processing_strategy, execution_strategy, optimizer_strategy, policy):
xls = Dataset("biofabric-tiny", schema=XLSFile)
patient_tables = xls.convert(Table, udf=udfs.xls_to_tables, cardinality=Cardinality.ONE_TO_MANY)
patient_tables = patient_tables.filter("The table contains biometric information about the patient")
case_data = patient_tables.convert(
CaseData, desc="The patient data in the table", cardinality=Cardinality.ONE_TO_MANY
)

iterable = Execute(
case_data,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = case_data.run(config)

tables = []
statistics = []
@@ -156,22 +160,23 @@ def integrate_tables(engine, policy):


@st.cache_resource()
def extract_references(engine, policy):
def extract_references(processing_strategy, execution_strategy, optimizer_strategy, policy):
papers = Dataset("bdf-usecase3-tiny", schema=ScientificPaper)
papers = papers.filter("The paper mentions phosphorylation of Exo1")
references = papers.convert(
Reference, desc="A paper cited in the reference section", cardinality=Cardinality.ONE_TO_MANY
)

output = references
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = references.run(config)

tables = []
statistics = []
@@ -204,17 +209,18 @@ def extract_references(engine, policy):

# output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
# policy = MinCost()
policy = MaxQuality()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

references = []
statistics = []
20 changes: 11 additions & 9 deletions demos/bdf-usecase3.py
@@ -17,7 +17,7 @@
from palimpzest.core.lib.schemas import PDFFile, Schema
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality, MinCost
from palimpzest.query import Execute, StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset

if not os.environ.get("OPENAI_API_KEY"):
@@ -61,16 +61,17 @@ def run_workload():

output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
policy = MinCost()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

tables = []
statistics = []
@@ -103,17 +104,18 @@

# output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
# policy = MinCost()
policy = MaxQuality()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

references = []
statistics = []