Initial Refactor to Better Modularize Processing, Execution, Optimization, and Configuration (#73)

* New features (#58)

* add to_df, from_df for data records

Help users easily use DataRecord (and the chosen plan): they can run a plan directly on their own data, for example data coming from a DataFrame.

* Fix some includes

* fix imports in pytest

* fix imports for test

* centralize hash functions into a helper lib

* Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations.

Next step will be making the df format more natural for the computation engines. DataRecord is just a special format of df, so it should be possible.
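A minimal sketch of the intended round trip (the import path below is an assumption for illustration; the method names follow the from_df/to_df naming adopted later in this PR):

```python
import pandas as pd

# NOTE: assumed import path for this sketch; use the actual DataRecord module in palimpzest.
from palimpzest.core.lib.records import DataRecord

df = pd.DataFrame([
    {"date": "2025-01-24", "sender": "alice@example.com", "subject": "holiday plans"},
])

# Build DataRecords from a user-supplied DataFrame...
records = DataRecord.from_df(df)

# ...and convert records back to a DataFrame after running a plan.
out_df = DataRecord.to_df(records)
print(out_df.columns.tolist())
```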

* New features (#57)

* add to_df, from_df for data records

Help users easily use DataRecord (and the chosen plan): they can run a plan directly on their own data, for example data coming from a DataFrame.

* fix imports in pytest

* fix imports for test

* centralize hash functions into a helper lib

* Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations.

Next step will be making the df format more natural for the computation engines. DataRecord is just a special format of df, so it should be possible.

* I think we may still want to finish early for LimitScans; will check w/Mike

* Dataset accepts list, df as inputs.

# Change Description

### Datasources.py
1. Auto-generate the schema for MemorySource output_schema
2. MemorySource accepts more types

### Record.py
1. Move build_schema_from_df to Schema class
2. Make as_df() take a fields_in_schema parameter

### schema.py
1. Add a new defaultSchema

### datamanager.py
1. Generate an ID for MemorySource instead of requiring a dataset_id input from users
   - Future plan: Consider auto-generation for all data registry
   - Rationale: Users shouldn't need to manage data IDs manually

### datasource.py
1. Add get_datasource() method to centralize source retrieval in DataSource

### logical.py
1. Remove input_schema from BaseScan init() when not supported
   - LogicalOperator now accepts exactly 2 params for cleaner BaseScan

### sets.py
1. Dataset supports in-memory data (list, DataFrame) as input; see the sketch below
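A short sketch of the new Dataset inputs, mirroring the updated askem demo later in this diff (the sample values are illustrative):

```python
import pandas as pd

from palimpzest.sets import Dataset

# A DataFrame can seed a Dataset directly; MemorySource auto-generates the
# output schema and the data manager auto-generates a dataset_id.
df_input = pd.DataFrame([
    {"id": 0, "text": "I have a variable a, the value is 1"},
    {"id": 1, "text": "I have a variable b, the value is 2"},
])
excerpts = Dataset(df_input)

# Plain Python lists are accepted as well (wrapped with a default schema).
numbers = Dataset([1, 2, 3, 4, 5])
```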

* resolve merge conflicts

* Update sets.py

* demo doesn't write to disk

* add pz update --name for pz config

* The changes move away from a single class handling all responsibilities to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations. Future plans include improving the Optimizer and QueryProcessor interfaces. The refactoring keeps the existing clean interface while setting up a more scalable foundation for future development.

# Rationale
## Why this change
### Previously
ExecutionEngine took on too many responsibilities. In order to give users a single interface, we put everything into one class, which is not good practice.
Different concepts are coupled together in ExecutionEngine (for example, execute_plan() vs. execute()), which is confusing and easy to get wrong.
It is also not good practice to instantiate Execute() just to run a query: constructing an instance and running it are separate concerns, e.g. in testing, or when we want to pass instances to different places and run them in different modules.
### After this change
Core concepts are separated into dedicated modules whose names speak for themselves.
Long parameter lists move into Config, which is easier to maintain.
Each module takes care of one thing, so the team can work on the codebase together more easily.
OptimizerStrategy, ExecutionStrategy, ProcessorStrategy, and QueryProcessor are split out of ExecutionEngine, so it is clearer where a new strategy belongs and we no longer need to extend one huge class every time.
The interface stays clean. In the future we'll add an "auto" option for strategies, meaning the system can figure out the best strategies based on the Dataset and the params in Config, which will be easy to do in the Factory.
### Important Notes
This is the first infra update, and I expect we can further refine the infrastructure so that PZ will be easier to scale in the future.
I didn't change any code inside functions, to keep this change easy to review; I mostly just moved things around. If you see something deleted, it is almost certainly because I moved it somewhere else.

## Next steps
After this change looks good to you, I'll refactor all the demos.
I'll look into how to improve the Optimizer interface.

The QueryProcessor class can be improved further. Currently a new class inherits from both ExecutionStrategy and ProcessorStrategy to make the processor run correctly; I feel this can be improved.

Some strategies do not seem to be working; I'll dig into those functionalities and further improve how we define and use strategies.

# Core Classes and Their Relationships
## QueryProcessor (Base Class)
Abstract base class that defines the query processing pipeline.

Dependencies:
- Optimizer - for plan optimization
- Dataset - for data source handling
- QueryProcessorConfig - for configuration, including policy

Implementations include:
- NoSentinelSequentialSingleThreadProcessor
- NoSentinelPipelinedSinglelProcessor
- NoSentinelPipelinedParallelProcessor
- MABSentinelSequentialSingleThreadProcessor
- MABSentinelPipelinedParallelProcessor
- StreamingQueryProcessor
- RandomSamplingSentinelSequentialSingleThreadProcessor
- RandomSamplingSentinelPipelinedProcessor

## QueryProcessorFactory
Creates specific QueryProcessor implementations.

- Creates and configures different types of processors based on:
  - ProcessingStrategyType
  - ExecutionStrategyType
  - OptimizationStrategyType

## Optimizer
Creates an OptimizerStrategy implementing optimize() for each OptimizerStrategyType.

## OptimizationStrategy (Abstract Base Class)

Defines interface for different optimization strategies.

Implementations include:
- GreedyStrategy
- ParetoStrategy
- SentinelStrategy
- ConfidenceIntervalStrategy
- NoOptimizationStrategy
- AutoOptimizationStrategy
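A sketch of how these pieces fit together, mirroring the updated askem-var.py demo later in this diff (the sample data and filter predicate are illustrative; strategy strings and method names follow that demo and may evolve):

```python
import pandas as pd

from palimpzest.policy import MaxQuality
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
from palimpzest.sets import Dataset

excerpts = Dataset(pd.DataFrame([{"id": 0, "text": "I have a variable a, the value is 1"}]))
output = excerpts.filter("The text mentions a variable named 'a'")

policy = MaxQuality()
config = QueryProcessorConfig(
    policy=policy,
    nocache=True,
    verbose=False,
    processing_strategy="streaming",
    execution_strategy="sequential",
    optimizer_strategy="pareto",
)

# The factory picks the concrete QueryProcessor from the three strategy types,
# and the processor drives optimization and execution of the plan.
processor = QueryProcessorFactory.create_processor(excerpts, config)
plan = processor.generate_plan(output, policy)
print(plan)
```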

* Add option to use plan.run() to execute the plan (Dataset).

Support plan.run() to execute the data pipeline, offering a streamlined way to build and process the data.

This design centralizes all operations around a single Plan (or Dataset) object, making it more intuitive to construct the pipeline and maintain focus on the data workflow.
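A minimal sketch of the run() path, mirroring the updated bdf demos later in this diff (the pipeline and data are illustrative; the three-tuple unpacking follows the streaming demos, and other strategies may return results in a different shape):

```python
import pandas as pd

from palimpzest.policy import MinCost
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset

# Illustrative one-step pipeline over in-memory data.
output = Dataset(pd.DataFrame([{"text": "phosphorylation of Exo1"}])).filter(
    "The text mentions phosphorylation of Exo1"
)

config = QueryProcessorConfig(
    policy=MinCost(),
    nocache=True,
    processing_strategy="streaming",
    execution_strategy="sequential",
    optimizer_strategy="pareto",
)

# run() builds the QueryProcessor internally and executes the optimized plan;
# with the streaming strategy the result can be iterated per input record.
for records, plan, stats in output.run(config):
    print(plan, stats)
```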

* improve the readability of get_champion_model()

* leave TODO for gerardo

* add field types when possible for schemas derived from dataframes

* use  for record field access

* map both json str methods --> to_json_str; keep indentation consistent w/indent=2

* map all as_* verb fcns --> to_* for consistency

* lint check for List --> list, Tuple --> tuple; tried addressing TODO in code

* assert first physical operator is DataSourcePhysicalOp & use source_operator.get_datasource()

* remove typo

* add newline at end of file

* mostly lint changes

* rename optimize --> get_optimal_plan to clarify purpose a bit

* adding note about None for optimization strategy

---------

Co-authored-by: Michael Cafarella <[email protected]>
Co-authored-by: Matthew Russo <[email protected]>

* Support list, df when init Dataset() (#68)

* Support list, df when init Dataset()

1. Auto-generate dataset_id for memory source
2. Create _tempRegistry for memory data; it's temporary and won't be saved to disk. This provides a unified interface for DataSource; we will need to revisit how to provide the best caching story in the system.
3. Mike mentioned that Dataset init() should only take str/DataSource and that the logic should move to DataSource. I moved some logic to the datamanager, but I still let init() take list/str/df/DataSource, since otherwise users would need to create a DataSource() themselves when creating a Dataset(), which I'm less in favor of. If you still think that's better, I can update it, or Mike can update the code on top of this change after it lands.
---------

Co-authored-by: Matthew Russo <[email protected]>

* improve readability for source_op get_datasource() (#69)

* improve readability for source_op get_datasource()

* add abstract methods for get_datasource, get_datasource_type

* ruff --fix check

---------

Co-authored-by: Matthew Russo <[email protected]>

* Refactor to a more modular design with separate strategies for optimization, execution, and processing (#70)

* Refactor to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations.

# Rationale
## Why this change
### Previously
ExecutionEngine took on too many responsibilities. In order to give users a single interface, we put everything into one class, which is not good practice.
Different concepts are coupled together in ExecutionEngine (for example, execute_plan() vs. execute()), which is confusing and easy to get wrong.
It is also not good practice to instantiate Execute() just to run a query: constructing an instance and running it are separate concerns, e.g. in testing, or when we want to pass instances to different places and run them in different modules.
### After this change
Core concepts are separated into dedicated modules whose names speak for themselves.
Long parameter lists move into Config, which is easier to maintain.
Each module takes care of one thing, so the team can work on the codebase together more easily.
OptimizerStrategy, ExecutionStrategy, ProcessorStrategy, and QueryProcessor are split out of ExecutionEngine, so it is clearer where a new strategy belongs and we no longer need to extend one huge class every time.
The interface stays clean. In the future we'll add an "auto" option for strategies, meaning the system can figure out the best strategies based on the Dataset and the params in Config, which will be easy to do in the Factory.
### Important Notes
This is the first infra update, and I expect we can further refine the infrastructure so that PZ will be easier to scale in the future.
I didn't change any code inside functions, to keep this change easy to review; I mostly just moved things around. If you see something deleted, it is almost certainly because I moved it somewhere else.

## Next steps
After this change looks good to you, I'll refactor all the demos.

I'll look into how to improve the Optimizer interface.

Explore how to add human-in-the-loop steps for collecting users' feedback.

# Core Classes and Their Relationships
## QueryProcessor (Base Class)
Abstract base class that defines the query processing pipeline.

Dependencies:
- Optimizer - for plan optimization
- Dataset - for data source handling
- QueryProcessorConfig - for configuration, including policy

Implementations include:
- NoSentinelSequentialSingleThreadProcessor
- NoSentinelPipelinedSinglelProcessor
- NoSentinelPipelinedParallelProcessor
- MABSentinelSequentialSingleThreadProcessor
- MABSentinelPipelinedParallelProcessor
- StreamingQueryProcessor
- RandomSamplingSentinelSequentialSingleThreadProcessor
- RandomSamplingSentinelPipelinedProcessor

## QueryProcessorFactory
Creates specific QueryProcessor implementations.

- Creates and configures different types of processors based on:
  - ProcessingStrategyType
  - ExecutionStrategyType
  - OptimizationStrategyType

## Optimizer
Creates an OptimizerStrategy implementing optimize() for each OptimizerStrategyType.

## OptimizationStrategy (Abstract Base Class)

Defines interface for different optimization strategies.

Implementations include:
- GreedyStrategy
- ParetoStrategy
- SentinelStrategy
- ConfidenceIntervalStrategy
- NoOptimizationStrategy
- AutoOptimizationStrategy

* lint code

* rename optimize() to get_optimal_plan()

* rename get_optimal_plan ---> get_optimal_plans

* fix optimizer initialization problem for processors

1. Init the optimizer every time in create_sentinel_plan and execute
2. Update optimizerStrategyType for create_sentinel_plan

* rename _execute_stategy to _execute_best_plan, which corresponds to _execute_confidence_interval

* rename to_jsonstr to to_json_str

* add note for NoOptimizationStrategy

* break a circular dependency

* make the askem demo run with the new format

* update name in comments

* small fix: remove a bad import

* update execute interface for bdf-suite.py

The code failed with the following error, but I don't think it is caused by my change, so I'll still submit this update for now.

  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/demos/bdf-suite.py", line 229, in <module>
    for idx, (reference, plan, stats) in enumerate(iterable):
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/processor/streaming_processor.py", line 84, in execute
    output_records = self.execute_opstream(self.plan, record)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/processor/streaming_processor.py", line 147, in execute_opstream
    record_set = operator(r)
                 ^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/operators/convert.py", line 190, in __call__
    field_answers, generation_stats = self.convert(candidate=candidate, fields=fields_to_generate)
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/operators/convert.py", line 437, in convert
    field_answers, _, generation_stats = self.generator(candidate, fields, **gen_kwargs) # TODO: guarantee negative output from generator is None
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/generators/generators.py", line 351, in __call__
    prompt = self._generate_user_prompt(candidate, fields, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chjun/Documents/GitHub/code/0122/palimpzest/src/palimpzest/query/generators/generators.py", line 194, in _generate_user_prompt
    field_lengths = [(field, len(value)) for field, value in context.items()]
                                                             ^^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'items'

* update bdf-usecase3 demo with the latest interface

* remove _should_stop_execution as we don't use it for now.

* update biofabric demo

* update fever-demo for the latest interface

* update demo-core

* Update image-demo.py

* execute_op_wrapper returns 3 values always

Previously, execute_op_wrapper() returned 2 or 3 values depending on the processor; we unified this function so it always returns 3 values.

* fix typo in caller functions

* add **kwargs in create_processor() to accept different params for different processors

1. add **kwargs in create_processor() to accept different params for different processors

2. Readability: make the factory functions classmethods so that the calling name is shorter
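A toy sketch (not the actual Palimpzest factory code) of why create_processor() and run() forward **kwargs: each processor type can accept parameters the others do not need.

```python
class StreamingProcessor:
    def __init__(self, dataset, config, batch_size=8, **_):
        self.dataset, self.config, self.batch_size = dataset, config, batch_size

class ParallelProcessor:
    def __init__(self, dataset, config, num_workers=4, **_):
        self.dataset, self.config, self.num_workers = dataset, config, num_workers

class ToyFactory:
    _registry = {"streaming": StreamingProcessor, "parallel": ParallelProcessor}

    @classmethod
    def create_processor(cls, dataset, config, **kwargs):
        # kwargs pass through untouched; each processor keeps what it understands.
        return cls._registry[config["processing_strategy"]](dataset, config, **kwargs)

processor = ToyFactory.create_processor([1, 2, 3], {"processing_strategy": "parallel"}, num_workers=16)
print(processor.num_workers)  # 16
```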

* add **kwargs in run() to accept different params for different processors

* QueryProcessor should take dataset instead of datasource

* update optimizer-demo to use the latest interface

* update simple-demo.py to adopt the latest interface

* fix lint issue

* ruff --fix check

* fix a param error for fever demo

* more ruff --fix check

* Fix unit tests; 2 tests still failing

There are still 2 tests failing:

tests/pytest/test_workloads.py::test_workload[enron-eval-tiny-enron-workload]

tests/pytest/test_optimizer.py::TestParetoOptimizer::test_pareto_optimization_strategy

* ruff fix

* fix test_optimizer unittest

* revert changes in conftest.py

* add one param in get_optimal_plans() to pass the use_final_op_quality flag

Consider using a config structure if the number of params keeps growing.

---------

Co-authored-by: Matthew Russo <[email protected]>

* unit tests passing

* linting and fix sneaky data registration error in unit test

* look up dataset type in _tempRegistry as well

---------

Co-authored-by: Jun <[email protected]>
Co-authored-by: Michael Cafarella <[email protected]>
Co-authored-by: Matthew Russo <[email protected]>
4 people authored Jan 24, 2025
1 parent e9f4f05 commit dd2f4a8
Showing 67 changed files with 1,922 additions and 1,250 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -65,7 +65,7 @@ policy = MinCost()
results, execution_stats = Execute(dataset, policy)

# Writing output to disk
output_df = pd.DataFrame([r.as_dict() for r in results])[["date","sender","subject"]]
output_df = pd.DataFrame([r.to_dict() for r in results])[["date","sender","subject"]]
output_df.to_csv("july_holiday_emails.csv")
```

52 changes: 31 additions & 21 deletions demos/askem-var.py
@@ -15,7 +15,8 @@
from palimpzest.core.lib.fields import Field
from palimpzest.core.lib.schemas import Schema, TextFile
from palimpzest.policy import MaxQuality
from palimpzest.query import StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
from palimpzest.sets import Dataset


@@ -34,39 +35,48 @@ class Variable(Schema):
value = Field(desc="The value of the variable, optional, set 'null' if not found")


dict_of_excerpts = [
{"id": 0, "text": "ne of the few states producing detailed daily reports of COVID-19 confirmed cases, COVID-19 related cumulative hospitalizations, intensive care unit (ICU) admissions, and deaths per county. Likewise, Ohio is a state with marked variation of demographic and geographic attributes among counties along with substantial differences in the capacity of healthcare within the state. Our aim is to predict the spatiotemporal dynamics of the COVID-19 pandemic in relation with the distribution of the capacity of healthcare in Ohio. 2. Methods 2.1. Mathematical model We developed a spatial mathematical model to simulate the transmission dynamics of COVID-19 disease infection and spread. The spatially-explicit model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes lio residing in the county. In the second scenario, we used the model to generate projections of the impact of potential easing on the non-pharmaceutical interventions in the critical care capacity of each county in Ohio. We assessed the impact of 50% reduction on the estimated impact of non-pharmaceutical interventions in reducing the hazard rate of infection. Under this scenario we calculated the proportion of ICU \n'"},
{"id": 1, "text": "t model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes linked to differential COVID-19 related hospitalizations and mortality (Correa-Agudelo et a"}
]

list_of_strings = ["I have a variable a, the value is 1", "I have a variable b, the value is 2"]
list_of_numbers = [1, 2, 3, 4, 5]

if __name__ == "__main__":
run_pz = True
dataset = "askem"
file_path = "testdata/askem-tiny/"

if run_pz:
# reference, plan, stats = run_workload()
excerpts = Dataset(dataset, schema=TextFile)
df_input = pd.DataFrame(dict_of_excerpts)
excerpts = Dataset(df_input)
output = excerpts.convert(
Variable, desc="A variable used or introduced in the paper snippet", cardinality=Cardinality.ONE_TO_MANY
)

engine = StreamingSequentialExecution
Variable, desc="A variable used or introduced in the context", cardinality=Cardinality.ONE_TO_MANY
).filter("The value name is 'a'", depends_on="name")
policy = MaxQuality()
engine = StreamingSequentialExecution(
config = QueryProcessorConfig(
policy=policy,
nocache=True,
verbose=True,
allow_code_synth=False,
allow_token_reduction=False,
allow_bonded_query=True,
verbose=False,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
engine.generate_plan(output, policy)
processor = QueryProcessorFactory.create_processor(excerpts, config)
plan = processor.generate_plan(output, policy)
print(processor.plan)

print(engine.plan)
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(engine.plan.operators):
for idx, op in enumerate(processor.plan.operators):
strop = f"{idx + 1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)

input_records = engine.get_input_records()
input_records = processor.get_input_records()
input_df = DataRecord.as_df(input_records)
print(input_df)

@@ -77,13 +87,13 @@ class Variable(Schema):
for idx, record in enumerate(input_records):
print(f"idx: {idx}\n vars: {vars}")
index = idx
vars = engine.execute_opstream(engine.plan, record)
vars = processor.execute_opstream(processor.plan, record)
if idx == len(input_records) - 1:
total_plan_time = time.time() - start_time
engine.plan_stats.finalize(total_plan_time)
processor.plan_stats.finalize(total_plan_time)

record_time = time.time()
statistics.append(engine.plan_stats)
statistics.append(processor.plan_stats)

for var in vars:
# ref.key = ref.first_author.split()[0] + ref.title.split()[0] + str(ref.year)
@@ -120,8 +130,8 @@ class Variable(Schema):
st.write(" **value:** ", var.value, "\n")

# write variables to a json file with readable format
with open(f"askem-variables-{dataset}.json", "w") as f:
json.dump(variables, f, indent=4)
# with open(f"askem-variables-{dataset}.json", "w") as f:
# json.dump(variables, f, indent=4)
vars_df = pd.DataFrame(variables)

# G = nx.DiGraph()
@@ -151,7 +161,7 @@ class Variable(Schema):
#
# nx.write_gexf(G, "demos/bdf-usecase3.gexf")

print("References:", vars_df)
# print("References:", vars_df)
# st.write(table.title, table.author, table.abstract)
# endTime = time.time()
# print("Elapsed time:", endTime - startTime)
44 changes: 25 additions & 19 deletions demos/bdf-suite.py
@@ -16,7 +16,7 @@
from palimpzest.core.lib.schemas import URL, File, PDFFile, Schema, Table, XLSFile
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality
from palimpzest.query import Execute, StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset
from palimpzest.utils import udfs

@@ -94,7 +94,7 @@


@st.cache_resource()
def extract_supplemental(engine, policy):
def extract_supplemental(processing_strategy, execution_strategy, optimizer_strategy, policy):
papers = Dataset("biofabric-pdf", schema=ScientificPaper)
paper_urls = papers.convert(URL, desc="The DOI url of the paper")
html_doi = paper_urls.convert(File, udf=udfs.url_to_file)
@@ -107,15 +107,17 @@ def extract_supplemental(engine, policy):
xls = tables.convert(XLSFile, udf=udfs.file_to_xls)
patient_tables = xls.convert(Table, udf=udfs.xls_to_tables, cardinality=Cardinality.ONE_TO_MANY)

# output = patient_tables
iterable = Execute(
patient_tables,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = patient_tables.run(config)


tables = []
statistics = []
@@ -128,22 +130,24 @@


@st.cache_resource()
def integrate_tables(engine, policy):
def integrate_tables(processing_strategy, execution_strategy, optimizer_strategy, policy):
xls = Dataset("biofabric-tiny", schema=XLSFile)
patient_tables = xls.convert(Table, udf=udfs.xls_to_tables, cardinality=Cardinality.ONE_TO_MANY)
patient_tables = patient_tables.filter("The table contains biometric information about the patient")
case_data = patient_tables.convert(
CaseData, desc="The patient data in the table", cardinality=Cardinality.ONE_TO_MANY
)

iterable = Execute(
case_data,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = case_data.run(config)

tables = []
statistics = []
@@ -156,22 +160,23 @@ def integrate_tables(engine, policy):


@st.cache_resource()
def extract_references(engine, policy):
def extract_references(processing_strategy, execution_strategy, optimizer_strategy, policy):
papers = Dataset("bdf-usecase3-tiny", schema=ScientificPaper)
papers = papers.filter("The paper mentions phosphorylation of Exo1")
references = papers.convert(
Reference, desc="A paper cited in the reference section", cardinality=Cardinality.ONE_TO_MANY
)

output = references
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = references.run(config)

tables = []
statistics = []
@@ -204,17 +209,18 @@ def extract_references(engine, policy):

# output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
# policy = MinCost()
policy = MaxQuality()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

references = []
statistics = []
20 changes: 11 additions & 9 deletions demos/bdf-usecase3.py
@@ -17,7 +17,7 @@
from palimpzest.core.lib.schemas import PDFFile, Schema
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality, MinCost
from palimpzest.query import Execute, StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset

if not os.environ.get("OPENAI_API_KEY"):
@@ -61,16 +61,17 @@ def run_workload():

output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
policy = MinCost()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

tables = []
statistics = []
@@ -103,17 +104,18 @@ def run_workload():

# output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
# policy = MinCost()
policy = MaxQuality()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

references = []
statistics = []