New features (#58)
* add to_df, from_df for data records

Help users work with DataRecord (and the chosen plan) more easily: they can execute a plan directly on their own data, which may come from a DataFrame.

* Fix some includes

* fix imports in pytest

* fix imports for test

* centralize hash functions into a helper lib

* Add as_df, from_df methods for DataRecord, so users can easily use their own data in the computations.

The next step will be to make the DataFrame format more natural for the computation engines. A DataRecord is essentially just a special form of a DataFrame, so this should be possible.
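
A minimal sketch of the intended round trip (the method names follow this commit, and the `fields_in_schema` flag is the one used in demos/askem-var.py below; the exact signatures are otherwise assumed):

```python
import pandas as pd

from palimpzest.core.elements.records import DataRecord

# user data starts as a plain DataFrame
df = pd.DataFrame([
    {"name": "a", "value": 1},
    {"name": "b", "value": 2},
])

# build DataRecords from the DataFrame, then convert results back
records = DataRecord.from_df(df)
round_trip = DataRecord.to_df(records, fields_in_schema=True)
print(round_trip)
```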

* New features (#57)

* I think we may still want to finish early for LimitScans; will check w/Mike

* Dataset accepts lists and DataFrames as inputs.

# Change Description

### Datasources.py
1. Auto-generate the schema for MemorySource's output_schema
2. MemorySource accepts more input types

### Record.py
1. Move build_schema_from_df to the Schema class
2. Make as_df() take a fields_in_schema parameter

### schema.py
1. Add a new defaultSchema

### datamanager.py
1. Generate an ID for MemorySource instead of requiring a dataset_id from users
   - Future plan: consider auto-generating IDs for the entire data registry
   - Rationale: users shouldn't need to manage data IDs manually

### datasource.py
1. Add get_datasource() method to centralize source retrieval in DataSource

### logical.py
1. Remove input_schema from BaseScan init() when not supported
   - LogicalOperator now accepts exactly 2 params for cleaner BaseScan

### sets.py
1. Dataset supports memory data as input
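
Taken together, these changes let a pipeline start from in-memory data. A minimal sketch, assuming the constructor form used in the updated demos (`Dataset(df_input)` in demos/askem-var.py) and that MemorySource derives a default schema when none is supplied:

```python
import pandas as pd

from palimpzest.sets import Dataset

# in-memory inputs; MemorySource auto-generates the output_schema
df_input = pd.DataFrame([
    {"id": 0, "text": "I have a variable a, the value is 1"},
    {"id": 1, "text": "I have a variable b, the value is 2"},
])
dataset_from_df = Dataset(df_input)                      # DataFrame input
dataset_from_list = Dataset(["some text", "more text"])  # list input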

* resolve merge conflicts

* Update sets.py

* demo doesn't write to disk

* add pz update --name for pz config

* The changes move away from a single class handling all responsibilities to a more modular design with separate strategies for optimization, execution, and processing. Key improvements include better configuration management through a dedicated Config class, clearer separation of concerns, and more maintainable strategy implementations. Future plans include improving the Optimizer and QueryProcessor interfaces. The refactoring maintains the existing clean interface while setting up a more scalable foundation for future development.

# Rationale
## Why this change
### Previously
ExecutionEngine took on too many responsibilities. In order to give users a single interface, we put everything into one class, which is not good practice.
Different concepts were coupled together in ExecutionEngine, for example execute_plan() and execute(), which is confusing and easy to get wrong.
It is also not good practice to instantiate Execute() just to run a query: creating an instance and running it have different concerns, e.g. in testing, or when we want to pass instances to different places and run them in different modules.
### After this change
Core concepts are separated into dedicated modules whose names speak for themselves.
Long parameter lists move into a Config object, which is easier to maintain.
Each module takes care of one thing, making it easier for the team to work on the codebase together.
OptimizerStrategy, ExecutionStrategy, ProcessorStrategy, and QueryProcessor are split out of ExecutionEngine, so it is easier to see where a new strategy belongs and we no longer need to extend one huge class every time.
The interface is still clean. In the future we'll add an "auto" option for strategies, meaning the system can figure out the best strategies based on the Dataset and the params in Config; this will be easy to do in the factory.
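
As an illustration, the consolidated configuration looks roughly like the QueryProcessorConfig call used in demos/paper-demo.py later in this diff (any other fields are assumptions):

```python
from palimpzest.policy import MaxQuality
from palimpzest.query.processor.config import QueryProcessorConfig

# long-lived options live on the config object rather than being
# threaded through ExecutionEngine call sites
config = QueryProcessorConfig(
    nocache=True,
    policy=MaxQuality(),
    max_workers=10,
)
```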
### Important Notes
This is the first infra update; I expect we can further refine the infrastructure so that PZ will be easier to scale in the future.
To make this change easier to review, I didn't change any code inside functions; I mostly just moved things around. If you see something deleted, 99% of the time it is because I moved it somewhere else.

## Next steps
After this change looks good to you, I'll refactor all the demos.
I'll see how to improve the Optimizer interface.

The QueryProcessor class can be improved further. Currently a new class inherits from both ExecutionStrategy and ProcessorStrategy to make the processor run correctly; I feel this can be improved.

Some strategies don't seem to work yet; I'll dig into the functionality and further improve how we define and use strategies.

# Core Classes and Their Relationships
## QueryProcessor (Base Class)
Abstract base class that defines the query processing pipeline.

Dependencies:
- Optimizer - for plan optimization
- Dataset - for data source handling
- QueryProcessorConfig - for configuration, including policy

Implementations include:
- NoSentinelSequentialSingleThreadProcessor
- NoSentinelPipelinedSinglelProcessor
- NoSentinelPipelinedParallelProcessor
- MABSentinelSequentialSingleThreadProcessor
- MABSentinelPipelinedParallelProcessor
- StreamingQueryProcessor
- RandomSamplingSentinelSequentialSingleThreadProcessor
- RandomSamplingSentinelPipelinedProcessor

## QueryProcessorFactory
Creates specific QueryProcessor implementations.

- Creates and configures different types of processors based on:
  - ProcessingStrategyType
  - ExecutionStrategyType
  - OptimizationStrategyType
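
A sketch of the factory call, mirroring the commented-out example in demos/paper-demo.py in this diff; the strategy strings are the ones used there, and the Dataset construction is illustrative:

```python
import pandas as pd

from palimpzest.policy import MaxQuality
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
from palimpzest.sets import Dataset

dataset = Dataset(pd.DataFrame([{"text": "some input row"}]))
config = QueryProcessorConfig(nocache=True, policy=MaxQuality(), max_workers=10)

# the factory picks the concrete QueryProcessor from the three strategy choices
processor = QueryProcessorFactory.create_processor(
    datasource=dataset,
    processing_strategy="no_sentinel",
    execution_strategy="sequential",
    optimizer_strategy="pareto",
    config=config,
)
records, execution_stats = processor.execute()
```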

## Optimizer
Creates an OptimizerStrategy that implements optimize() for the different OptimizerStrategyTypes.

## OptimizationStrategy (Abstract Base Class)

Defines interface for different optimization strategies.

Implementations include:
- GreedyStrategy
- ParetoStrategy
- SentinelStrategy
- ConfidenceIntervalStrategy
- NoOptimizationStrategy
- AutoOptimizationStrategy

* Add option to use plan.run() to execute the plan (Dataset).

Support plan.run() to execute the data pipeline, offering a streamlined way to build and process the data.

This design centralizes all operations around a single Plan (or Dataset) object, making it more intuitive to construct the pipeline and maintain focus on the data workflow.
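
A short sketch of the new entry point, following the plan.run() call in demos/paper-demo.py below (here `plan` is just a Dataset built from an in-memory DataFrame; in the demo it is a chain of converts and filters):

```python
import pandas as pd

from palimpzest.policy import MinCost
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset

plan = Dataset(pd.DataFrame([{"text": "some input row"}]))
config = QueryProcessorConfig(nocache=True, policy=MinCost(), max_workers=10)

# run the pipeline directly from the Dataset/plan object
records, execution_stats = plan.run(
    config,
    optimizer_strategy="pareto",
    execution_strategy="sequential",
    processing_strategy="no_sentinel",
)
```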

* improve the readability of get_champion_model()

* leave TODO for gerardo

* add field types when possible for schemas derived from dataframes

* use  for record field access

* map both json str methods --> to_json_str; keep indentation consistent w/indent=2

* map all as_* verb fcns --> to_* for consistency

* lint check for List --> list, Tuple --> tuple; tried addressing TODO in code

* assert first physical operator is DataSourcePhysicalOp & use source_operator.get_datasource()

* remove typo

* add newline at end of file

* mostly lint changes

* rename optimize --> get_optimal_plan to clarify purpose a bit

* adding note about None for optimization strategy

---------

Co-authored-by: Michael Cafarella <[email protected]>
Co-authored-by: Matthew Russo <[email protected]>
3 people authored Jan 23, 2025
1 parent e9f4f05 commit 5d1f3fa
Showing 56 changed files with 1,629 additions and 849 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -65,7 +65,7 @@ policy = MinCost()
results, execution_stats = Execute(dataset, policy)

# Writing output to disk
output_df = pd.DataFrame([r.as_dict() for r in results])[["date","sender","subject"]]
output_df = pd.DataFrame([r.to_dict() for r in results])[["date","sender","subject"]]
output_df.to_csv("july_holiday_emails.csv")
```

33 changes: 20 additions & 13 deletions demos/askem-var.py
@@ -34,18 +34,26 @@ class Variable(Schema):
value = Field(desc="The value of the variable, optional, set 'null' if not found")


dict_of_excerpts = [
{"id": 0, "text": "ne of the few states producing detailed daily reports of COVID-19 confirmed cases, COVID-19 related cumulative hospitalizations, intensive care unit (ICU) admissions, and deaths per county. Likewise, Ohio is a state with marked variation of demographic and geographic attributes among counties along with substantial differences in the capacity of healthcare within the state. Our aim is to predict the spatiotemporal dynamics of the COVID-19 pandemic in relation with the distribution of the capacity of healthcare in Ohio. 2. Methods 2.1. Mathematical model We developed a spatial mathematical model to simulate the transmission dynamics of COVID-19 disease infection and spread. The spatially-explicit model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes lio residing in the county. In the second scenario, we used the model to generate projections of the impact of potential easing on the non-pharmaceutical interventions in the critical care capacity of each county in Ohio. We assessed the impact of 50% reduction on the estimated impact of non-pharmaceutical interventions in reducing the hazard rate of infection. Under this scenario we calculated the proportion of ICU \n'"},
{"id": 1, "text": "t model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes linked to differential COVID-19 related hospitalizations and mortality (Correa-Agudelo et a"}
]

list_of_strings = ["I have a variable a, the value is 1", "I have a variable b, the value is 2"]
list_of_numbers = [1, 2, 3, 4, 5]

if __name__ == "__main__":
run_pz = True
dataset = "askem"
file_path = "testdata/askem-tiny/"

if run_pz:
# reference, plan, stats = run_workload()
excerpts = Dataset(dataset, schema=TextFile)
df_input = pd.DataFrame(dict_of_excerpts)
excerpts = Dataset(df_input)
output = excerpts.convert(
Variable, desc="A variable used or introduced in the paper snippet", cardinality=Cardinality.ONE_TO_MANY
)

engine = StreamingSequentialExecution
Variable, desc="A variable used or introduced in the context", cardinality=Cardinality.ONE_TO_MANY
).filter("The value name is 'a'", depends_on="name")
policy = MaxQuality()
engine = StreamingSequentialExecution(
policy=policy,
@@ -56,8 +64,7 @@ class Variable(Schema):
allow_bonded_query=True,
)
engine.generate_plan(output, policy)

print(engine.plan)
print("Generated plan:\n", engine.plan)
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
@@ -67,7 +74,7 @@ class Variable(Schema):
st.write(strop)

input_records = engine.get_input_records()
input_df = DataRecord.as_df(input_records)
input_df = DataRecord.to_df(input_records, fields_in_schema=True)
print(input_df)

variables = []
@@ -82,9 +89,9 @@ class Variable(Schema):
total_plan_time = time.time() - start_time
engine.plan_stats.finalize(total_plan_time)

record_time = time.time()
statistics.append(engine.plan_stats)

intermediate_vars = DataRecord.to_df(vars, fields_in_schema=True)
print(intermediate_vars)
for var in vars:
# ref.key = ref.first_author.split()[0] + ref.title.split()[0] + str(ref.year)
try:
@@ -120,8 +127,8 @@ class Variable(Schema):
st.write(" **value:** ", var.value, "\n")

# write variables to a json file with readable format
with open(f"askem-variables-{dataset}.json", "w") as f:
json.dump(variables, f, indent=4)
# with open(f"askem-variables-{dataset}.json", "w") as f:
# json.dump(variables, f, indent=4)
vars_df = pd.DataFrame(variables)

# G = nx.DiGraph()
@@ -151,7 +158,7 @@ class Variable(Schema):
#
# nx.write_gexf(G, "demos/bdf-usecase3.gexf")

print("References:", vars_df)
# print("References:", vars_df)
# st.write(table.title, table.author, table.abstract)
# endTime = time.time()
# print("Elapsed time:", endTime - startTime)
12 changes: 6 additions & 6 deletions demos/biofabric-demo-matching.ipynb
@@ -609,7 +609,7 @@
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
" output_rows.append(output_table.as_dict()) \n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
@@ -904,7 +904,7 @@
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
" output_rows.append(output_table.as_dict()) \n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
@@ -1125,8 +1125,8 @@
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
" print(output_table.as_dict().keys())\n",
" output_rows.append(output_table.as_dict()) \n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
@@ -1614,8 +1614,8 @@
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
" print(output_table.as_dict().keys())\n",
" output_rows.append(output_table.as_dict()) \n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
2 changes: 1 addition & 1 deletion demos/fever-demo.py
@@ -334,7 +334,7 @@ def get_item(self, idx: int, val: bool=False, include_label: bool=False):

record_jsons = []
for record in records:
record_dict = record.as_dict()
record_dict = record.to_dict()
### field_to_keep = ["claim", "id", "label"]
### record_dict = {k: v for k, v in record_dict.items() if k in fields_to_keep}
record_jsons.append(record_dict)
2 changes: 1 addition & 1 deletion demos/optimizer-demo.py
@@ -1082,7 +1082,7 @@ def get_item(self, idx: int, val: bool = False, include_label: bool = False):
# save record outputs
record_jsons = []
for record in records:
record_dict = record.as_dict()
record_dict = record.to_dict()
if workload == "biodex":
record_dict = {
k: v for k, v in record_dict.items() if k in ["pmid", "serious", "patientsex", "drugs", "reactions"]
49 changes: 21 additions & 28 deletions demos/paper-demo.py
@@ -7,19 +7,14 @@
import numpy as np
from PIL import Image

from palimpzest.constants import Cardinality, OptimizationStrategy
from palimpzest.constants import Cardinality
from palimpzest.core.data.datasources import UserSource
from palimpzest.core.elements.records import DataRecord
from palimpzest.core.lib.fields import BooleanField, Field, ImageFilepathField, ListField, NumericField, StringField
from palimpzest.core.lib.schemas import Schema, Table, TextFile, XLSFile
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality, MinCost, MinTime
from palimpzest.query import (
Execute,
NoSentinelPipelinedParallelExecution,
NoSentinelPipelinedSingleThreadExecution,
NoSentinelSequentialSingleThreadExecution,
)
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset
from palimpzest.utils.udfs import xls_to_tables

@@ -211,18 +206,6 @@ def get_item(self, idx: int):
print("Policy not supported for this demo")
exit(1)

execution_engine = None
executor = args.executor
if executor == "sequential":
execution_engine = NoSentinelSequentialSingleThreadExecution
elif executor == "pipelined":
execution_engine = NoSentinelPipelinedSingleThreadExecution
elif executor == "parallel":
execution_engine = NoSentinelPipelinedParallelExecution
else:
print("Executor not supported for this demo")
exit(1)

if os.getenv("OPENAI_API_KEY") is None and os.getenv("TOGETHER_API_KEY") is None:
print("WARNING: Both OPENAI_API_KEY and TOGETHER_API_KEY are unset")

@@ -262,15 +245,25 @@ def get_item(self, idx: int):
plan = plan.filter("The rows of the table contain the patient age")
plan = plan.convert(CaseData, desc="The patient data in the table", cardinality=Cardinality.ONE_TO_MANY)

# execute pz plan
records, execution_stats = Execute(
plan,
policy,
nocache=True,
optimization_strategy=OptimizationStrategy.PARETO,
execution_engine=execution_engine,
verbose=verbose,
)

config = QueryProcessorConfig(nocache=True, policy=policy, max_workers=10)
# # Option1: Create a basic processor
# # We could pass this process around to different service if needed.
# from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
# processor = QueryProcessorFactory.create_processor(
# datasource=plan,
# processing_strategy="no_sentinel",
# execution_strategy="sequential",
# optimizer_strategy="pareto",
# config=config
# )
# records, execution_stats = processor.execute()

# Option2: Use the new interface
records, execution_stats = plan.run(config,
optimizer_strategy="pareto",
execution_strategy="sequential",
processing_strategy="no_sentinel")

# save statistics
if profile:
4 changes: 2 additions & 2 deletions quickstart.ipynb
@@ -220,7 +220,7 @@
"### Displaying the output\n",
"\n",
"The output of our data pipeline can be found in the `results` variable. \n",
"To print the results as a table, we will initialize a pandas dataframe using the `as_dict` method of the output objects."
"To print the results as a table, we will initialize a pandas dataframe using the `to_dict` method of the output objects."
]
},
{
@@ -284,7 +284,7 @@
"source": [
"import pandas as pd\n",
"\n",
"output_df = pd.DataFrame([r.as_dict() for r in results])[[\"date\",\"sender\",\"subject\"]]\n",
"output_df = pd.DataFrame([r.to_dict() for r in results])[[\"date\",\"sender\",\"subject\"]]\n",
"display(output_df)\n"
]
},
16 changes: 16 additions & 0 deletions src/cli/README.md
@@ -132,6 +132,22 @@ name: together-conf
parallel: true
```

You can update an existing config using the `pz update` command (also aliased as `pz uc`):
```bash
$ pz update --name default --settings parallel=true,pdfprocessor=pdfplumber
Updated config: default

$ pz config
--- default ---
filecachedir: /some/local/filepath
llmservice: anthropic
name: default
parallel: true
pdfprocessor: pdfplumber
```

The `--name` parameter specifies which config to update. `--settings` specifies all the parameter name and value pairs in the format `param_name=param_value`, separated by commas.

Finally, you can delete a config with the `pz rm-config` command (also aliased as `pz rmc`):
```bash
$ pz rmc --name together-conf
48 changes: 48 additions & 0 deletions src/cli/cli_main.py
@@ -325,6 +325,53 @@ def set_config(name: str) -> None:
_print_msg(f"Set config: {name}")


@cli.command(aliases=["uc", "update"])
@click.option("--name", type=str, default=None, required=True, help="Name of the config to update.")
@click.option(
"--settings",
type=str,
required=True,
help="Parameters to update in format 'param1=value1,param2=value2'. Example: 'llmservice=openai,parallel=true,pdfprocessor=pdfplumber'"
)
def update_config(name: str, settings: str) -> None:
"""
Update multiple parameters in an existing Palimpzest config.
Parameters
----------
name: str
Name of the config to update
    settings: str
Comma-separated list of parameter=value pairs to update
"""
from palimpzest.config import Config
from palimpzest.constants import PZ_DIR

# check that config exists
if not os.path.exists(os.path.join(PZ_DIR, f"config_{name}.yaml")):
raise InvalidCommandError(f"Config with name {name} does not exist.")

# load the specified config
config = Config(name)

# Parse the params string into a dictionary
try:
param_pairs = settings.split(',')
updates = {}
for pair in param_pairs:
if pair.strip() == "":
continue
param, value = pair.split('=')
updates[param.strip()] = value.strip()
except Exception as e:
raise InvalidCommandError("Invalid params format. Use: param1=value1,param2=value2") from e

# Update each parameter
for param, value in updates.items():
config.set(param, value)

_print_msg(f"Updated config {name} with: {updates}")

def main():
"""
Entrypoint for Palimpzest CLI tool implemented using Click.
@@ -339,4 +386,5 @@ def main():
cli.add_command(create_config)
cli.add_command(rm_config)
cli.add_command(set_config)
cli.add_command(update_config)
cli()
3 changes: 1 addition & 2 deletions src/palimpzest/__init__.py
@@ -1,4 +1,4 @@
from palimpzest.constants import MAX_ROWS, Cardinality, OptimizationStrategy
from palimpzest.constants import MAX_ROWS, Cardinality

# Dataset functionality
#from palimpzest.sets import Dataset
@@ -20,7 +20,6 @@
# constants
"MAX_ROWS",
"Cardinality",
"OptimizationStrategy",
# datamanager
"DataDirectory",
# policy
13 changes: 1 addition & 12 deletions src/palimpzest/constants.py
@@ -36,18 +36,6 @@ class PromptStrategy(str, Enum):
COT_MOA_AGG = "chain-of-thought-mixture-of-agents-aggregation"


class OptimizationStrategy(str, Enum):
"""
OptimizationStrategy determines which (set of) plan(s) the Optimizer
will return to the Execution layer.
"""
GREEDY = "greedy"
CONFIDENCE_INTERVAL = "confidence-interval"
PARETO = "pareto"
SENTINEL = "sentinel"
NONE = "none"


class AggFunc(str, Enum):
COUNT = "count"
AVERAGE = "average"
@@ -81,6 +69,7 @@ class PickOutputStrategy(str, Enum):

# character limit for various IDs
MAX_ID_CHARS = 10
MAX_DATASET_ID_CHARS = 16

# retry LLM executions 2^x * (multiplier) for up to 10 seconds and at most 4 times
RETRY_MULTIPLIER = 2