
New features #58

Merged Jan 23, 2025 · 34 commits (changes shown from 27 commits)

Commits
baeb799
add to_df, from_df for data records
chjuncn Jan 8, 2025
eabbb11
Fix some includes
mikecafarella Jan 8, 2025
d173f3c
fix imports in pytest
chjuncn Jan 9, 2025
a0db287
fix imports for test
chjuncn Jan 9, 2025
964924f
centralize hash functions into a helper lib
chjuncn Jan 9, 2025
36e4764
Add as_df, from_df methods for DataRecord, so users can easily use th…
chjuncn Jan 9, 2025
15f82b4
Merge branch 'main' of github.com:mitdbg/palimpzest
mikecafarella Jan 9, 2025
53628b4
Merge branch 'main' into new-features
chjuncn Jan 10, 2025
41cfde0
New features (#57)
chjuncn Jan 10, 2025
c212b8d
I think we may still want to finish early for LimitScans; will check …
mdr223 Jan 10, 2025
6edcd79
Dataset accept list, df as inputs.
chjuncn Jan 10, 2025
670cdcb
Merge branch 'new-features' of https://github.com/mitdbg/palimpzest i…
chjuncn Jan 10, 2025
e3b7da2
Merge branch 'main' into new-features
chjuncn Jan 10, 2025
2d80618
resolve merge conflicts
chjuncn Jan 10, 2025
83e5922
Update sets.py
chjuncn Jan 10, 2025
dd415f4
demo doesn't write to disk
chjuncn Jan 10, 2025
50dea7b
add pz update --name for pz config
chjuncn Jan 13, 2025
6ba6e36
The changes move away from a single class handling all responsibiliti…
chjuncn Jan 15, 2025
e2ae35b
resolved merge conflicts with main; still an error in test_optimizer.…
mdr223 Jan 16, 2025
c6dddef
Add option to use plan.run() to execute the plan (Dataset).
chjuncn Jan 17, 2025
1fe61a2
improve the readability of get_champion_model()
chjuncn Jan 17, 2025
d1939bb
resolving merge conflicts
mdr223 Jan 22, 2025
1ec6746
leave TODO for gerardo
mdr223 Jan 22, 2025
5a45ca7
add field types when possible for schemas derived from dataframes
mdr223 Jan 22, 2025
a275a07
use for record field access
mdr223 Jan 22, 2025
0caa080
map both json str methods --> to_json_str; keep indentation consisten…
mdr223 Jan 22, 2025
5548c22
map all as_* verb fcns --> to_* for consistency
mdr223 Jan 22, 2025
bb0d3cd
lint check for List --> list, Tuple --> tuple; tried addressing TODO …
mdr223 Jan 23, 2025
8fb5af9
assert first physical operator is DataSourcePhysicalOp & use source_o…
mdr223 Jan 23, 2025
b0cee01
remove typo
mdr223 Jan 23, 2025
95b8ce9
add newline at end of file
mdr223 Jan 23, 2025
67751e3
mostly lint changes
mdr223 Jan 23, 2025
f194bb2
rename optimize --> get_optimal_plan to clarify purpose a bit
mdr223 Jan 23, 2025
2612154
adding note about None for optimization strategy
mdr223 Jan 23, 2025
2 changes: 1 addition & 1 deletion README.md
@@ -65,7 +65,7 @@ policy = MinCost()
results, execution_stats = Execute(dataset, policy)

# Writing output to disk
output_df = pd.DataFrame([r.as_dict() for r in results])[["date","sender","subject"]]
output_df = pd.DataFrame([r.to_dict() for r in results])[["date","sender","subject"]]
output_df.to_csv("july_holiday_emails.csv")
```

33 changes: 20 additions & 13 deletions demos/askem-var.py
@@ -34,18 +34,26 @@ class Variable(Schema):
value = Field(desc="The value of the variable, optional, set 'null' if not found")


dict_of_excerpts = [
{"id": 0, "text": "ne of the few states producing detailed daily reports of COVID-19 confirmed cases, COVID-19 related cumulative hospitalizations, intensive care unit (ICU) admissions, and deaths per county. Likewise, Ohio is a state with marked variation of demographic and geographic attributes among counties along with substantial differences in the capacity of healthcare within the state. Our aim is to predict the spatiotemporal dynamics of the COVID-19 pandemic in relation with the distribution of the capacity of healthcare in Ohio. 2. Methods 2.1. Mathematical model We developed a spatial mathematical model to simulate the transmission dynamics of COVID-19 disease infection and spread. The spatially-explicit model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes lio residing in the county. In the second scenario, we used the model to generate projections of the impact of potential easing on the non-pharmaceutical interventions in the critical care capacity of each county in Ohio. We assessed the impact of 50% reduction on the estimated impact of non-pharmaceutical interventions in reducing the hazard rate of infection. Under this scenario we calculated the proportion of ICU \n'"},
{"id": 1, "text": "t model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes linked to differential COVID-19 related hospitalizations and mortality (Correa-Agudelo et a"}
]

list_of_strings = ["I have a variable a, the value is 1", "I have a variable b, the value is 2"]
list_of_numbers = [1, 2, 3, 4, 5]

if __name__ == "__main__":
run_pz = True
dataset = "askem"
file_path = "testdata/askem-tiny/"

if run_pz:
# reference, plan, stats = run_workload()
excerpts = Dataset(dataset, schema=TextFile)
df_input = pd.DataFrame(dict_of_excerpts)
excerpts = Dataset(df_input)
output = excerpts.convert(
Variable, desc="A variable used or introduced in the paper snippet", cardinality=Cardinality.ONE_TO_MANY
)

engine = StreamingSequentialExecution
Variable, desc="A variable used or introduced in the context", cardinality=Cardinality.ONE_TO_MANY
).filter("The value name is 'a'", depends_on="name")
policy = MaxQuality()
engine = StreamingSequentialExecution(
policy=policy,
@@ -56,8 +64,7 @@ class Variable(Schema):
allow_bonded_query=True,
)
engine.generate_plan(output, policy)

print(engine.plan)
print("Generated plan:\n", engine.plan)
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
@@ -67,7 +74,7 @@ class Variable(Schema):
st.write(strop)

input_records = engine.get_input_records()
input_df = DataRecord.as_df(input_records)
input_df = DataRecord.to_df(input_records, fields_in_schema=True)
print(input_df)

variables = []
@@ -82,9 +89,9 @@ class Variable(Schema):
total_plan_time = time.time() - start_time
engine.plan_stats.finalize(total_plan_time)

record_time = time.time()
statistics.append(engine.plan_stats)

intermediate_vars = DataRecord.to_df(vars, fields_in_schema=True)
print(intermediate_vars)
for var in vars:
# ref.key = ref.first_author.split()[0] + ref.title.split()[0] + str(ref.year)
try:
@@ -120,8 +127,8 @@ class Variable(Schema):
st.write(" **value:** ", var.value, "\n")

# write variables to a json file with readable format
with open(f"askem-variables-{dataset}.json", "w") as f:
json.dump(variables, f, indent=4)
# with open(f"askem-variables-{dataset}.json", "w") as f:
# json.dump(variables, f, indent=4)
vars_df = pd.DataFrame(variables)

# G = nx.DiGraph()
@@ -151,7 +158,7 @@ class Variable(Schema):
#
# nx.write_gexf(G, "demos/bdf-usecase3.gexf")

print("References:", vars_df)
# print("References:", vars_df)
# st.write(table.title, table.author, table.abstract)
# endTime = time.time()
# print("Elapsed time:", endTime - startTime)
12 changes: 6 additions & 6 deletions demos/biofabric-demo-matching.ipynb
@@ -609,7 +609,7 @@
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
" output_rows.append(output_table.as_dict()) \n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
@@ -904,7 +904,7 @@
"\n",
"output_rows = []\n",
"for output_table in matched_tables:\n",
" output_rows.append(output_table.as_dict()) \n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
@@ -1125,8 +1125,8 @@
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
" print(output_table.as_dict().keys())\n",
" output_rows.append(output_table.as_dict()) \n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
@@ -1614,8 +1614,8 @@
"output_rows = []\n",
"for matched_tables, plan, stats in iterable: # noqa: B007\n",
" for output_table in matched_tables:\n",
" print(output_table.as_dict().keys())\n",
" output_rows.append(output_table.as_dict()) \n",
" print(output_table.to_dict().keys())\n",
" output_rows.append(output_table.to_dict()) \n",
"\n",
"output_df = pd.DataFrame(output_rows)\n",
"display(output_df)"
2 changes: 1 addition & 1 deletion demos/fever-demo.py
@@ -334,7 +334,7 @@ def get_item(self, idx: int, val: bool=False, include_label: bool=False):

record_jsons = []
for record in records:
record_dict = record.as_dict()
record_dict = record.to_dict()
### field_to_keep = ["claim", "id", "label"]
### record_dict = {k: v for k, v in record_dict.items() if k in fields_to_keep}
record_jsons.append(record_dict)
2 changes: 1 addition & 1 deletion demos/optimizer-demo.py
@@ -1082,7 +1082,7 @@ def get_item(self, idx: int, val: bool = False, include_label: bool = False):
# save record outputs
record_jsons = []
for record in records:
record_dict = record.as_dict()
record_dict = record.to_dict()
if workload == "biodex":
record_dict = {
k: v for k, v in record_dict.items() if k in ["pmid", "serious", "patientsex", "drugs", "reactions"]
49 changes: 21 additions & 28 deletions demos/paper-demo.py
Collaborator:
Leaving as a mental note that we need to update the other demos as well to use the new execution syntax.

Collaborator (Author):
Thank you so much! I'll update demos to adopt the latest execution format.

@@ -7,19 +7,14 @@
import numpy as np
from PIL import Image

from palimpzest.constants import Cardinality, OptimizationStrategy
from palimpzest.constants import Cardinality
from palimpzest.core.data.datasources import UserSource
from palimpzest.core.elements.records import DataRecord
from palimpzest.core.lib.fields import BooleanField, Field, ImageFilepathField, ListField, NumericField, StringField
from palimpzest.core.lib.schemas import Schema, Table, TextFile, XLSFile
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality, MinCost, MinTime
from palimpzest.query import (
Execute,
NoSentinelPipelinedParallelExecution,
NoSentinelPipelinedSingleThreadExecution,
NoSentinelSequentialSingleThreadExecution,
)
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset
from palimpzest.utils.udfs import xls_to_tables

@@ -211,18 +206,6 @@ def get_item(self, idx: int):
print("Policy not supported for this demo")
exit(1)

execution_engine = None
executor = args.executor
if executor == "sequential":
execution_engine = NoSentinelSequentialSingleThreadExecution
elif executor == "pipelined":
execution_engine = NoSentinelPipelinedSingleThreadExecution
elif executor == "parallel":
execution_engine = NoSentinelPipelinedParallelExecution
else:
print("Executor not supported for this demo")
exit(1)

if os.getenv("OPENAI_API_KEY") is None and os.getenv("TOGETHER_API_KEY") is None:
print("WARNING: Both OPENAI_API_KEY and TOGETHER_API_KEY are unset")

@@ -262,15 +245,25 @@ def get_item(self, idx: int):
plan = plan.filter("The rows of the table contain the patient age")
plan = plan.convert(CaseData, desc="The patient data in the table", cardinality=Cardinality.ONE_TO_MANY)

# execute pz plan
records, execution_stats = Execute(
plan,
policy,
nocache=True,
optimization_strategy=OptimizationStrategy.PARETO,
execution_engine=execution_engine,
verbose=verbose,
)

config = QueryProcessorConfig(nocache=True, policy=policy, max_workers=10)
# # Option1: Create a basic processor
# # We could pass this process around to different service if needed.
# from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
# processor = QueryProcessorFactory.create_processor(
# datasource=plan,
# processing_strategy="no_sentinel",
# execution_strategy="sequential",
# optimizer_strategy="pareto",
# config=config
# )
# records, execution_stats = processor.execute()

# Option2: Use the new interface
records, execution_stats = plan.run(config,
optimizer_strategy="pareto",
execution_strategy="sequential",
processing_strategy="no_sentinel")

# save statistics
if profile:
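Distilled from the diff above, the new execution path replaces `Execute(...)` and the per-engine classes with a config object plus `Dataset.run()`. A minimal sketch, assuming `plan` is the `Dataset` built earlier in the demo:

```python
from palimpzest.policy import MaxQuality
from palimpzest.query.processor.config import QueryProcessorConfig

config = QueryProcessorConfig(nocache=True, policy=MaxQuality(), max_workers=10)
records, execution_stats = plan.run(
    config,
    optimizer_strategy="pareto",       # was OptimizationStrategy.PARETO
    execution_strategy="sequential",   # was NoSentinelSequentialSingleThreadExecution
    processing_strategy="no_sentinel",
)
```

Passing the strategies as plain strings is what allows `OptimizationStrategy` to be dropped from `palimpzest.constants` later in this diff.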
4 changes: 2 additions & 2 deletions quickstart.ipynb
@@ -220,7 +220,7 @@
"### Displaying the output\n",
"\n",
"The output of our data pipeline can be found in the `results` variable. \n",
"To print the results as a table, we will initialize a pandas dataframe using the `as_dict` method of the output objects."
"To print the results as a table, we will initialize a pandas dataframe using the `to_dict` method of the output objects."
]
},
{
@@ -284,7 +284,7 @@
"source": [
"import pandas as pd\n",
"\n",
"output_df = pd.DataFrame([r.as_dict() for r in results])[[\"date\",\"sender\",\"subject\"]]\n",
"output_df = pd.DataFrame([r.to_dict() for r in results])[[\"date\",\"sender\",\"subject\"]]\n",
"display(output_df)\n"
]
},
16 changes: 16 additions & 0 deletions src/cli/README.md
@@ -132,6 +132,22 @@ name: together-conf
parallel: true
```

You can update an existing config using the `pz update` command (also aliased as `pz uc`):
```bash
$ pz update --name default --settings parallel=true,pdfprocessor=pdfplumber
Updated config: default

$ pz config
--- default ---
filecachedir: /some/local/filepath
llmservice: anthropic
name: default
parallel: true
pdfprocessor: pdfplumber
```

The `--name` parameter specifies which config to update. `--settings` takes the parameters to change as comma-separated `param_name=param_value` pairs.

Finally, you can delete a config with the `pz rm-config` command (also aliased as `pz rmc`):
```bash
$ pz rmc --name together-conf
48 changes: 48 additions & 0 deletions src/cli/cli_main.py
@@ -325,6 +325,53 @@ def set_config(name: str) -> None:
_print_msg(f"Set config: {name}")


@cli.command(aliases=["uc", "update"])
@click.option("--name", type=str, default=None, required=True, help="Name of the config to update.")
@click.option(
"--settings",
type=str,
required=True,
help="Parameters to update in format 'param1=value1,param2=value2'. Example: 'llmservice=openai,parallel=true,pdfprocessor=pdfplumber'"
)
def update_config(name: str, settings: str) -> None:
"""
Update multiple parameters in an existing Palimpzest config.
Parameters
----------
name: str
Name of the config to update
settings: str
Comma-separated list of parameter=value pairs to update
"""
from palimpzest.config import Config
from palimpzest.constants import PZ_DIR

# check that config exists
if not os.path.exists(os.path.join(PZ_DIR, f"config_{name}.yaml")):
raise InvalidCommandError(f"Config with name {name} does not exist.")

# load the specified config
config = Config(name)

# Parse the settings string into a dictionary
try:
param_pairs = settings.split(',')
updates = {}
for pair in param_pairs:
if pair.strip() == "":
continue
param, value = pair.split('=', 1)  # split on the first '=' only, so values may contain '='
updates[param.strip()] = value.strip()
except Exception as e:
raise InvalidCommandError("Invalid params format. Use: param1=value1,param2=value2") from e

# Update each parameter
for param, value in updates.items():
config.set(param, value)

_print_msg(f"Updated config {name} with: {updates}")

def main():
"""
Entrypoint for Palimpzest CLI tool implemented using Click.
@@ -339,4 +386,5 @@ def main():
cli.add_command(create_config)
cli.add_command(rm_config)
cli.add_command(set_config)
cli.add_command(update_config)
cli()
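A quick way to exercise the new `update-config` command without installing the CLI is Click's test runner. A sketch; the `cli.cli_main` import path is an assumption based on this file's location, and the expected output follows the `_print_msg` call above:

```python
from click.testing import CliRunner

from cli.cli_main import cli  # assumed import path for src/cli/cli_main.py

runner = CliRunner()
# "update" resolves via the command's aliases, mirroring `pz update` in the README
result = runner.invoke(
    cli, ["update", "--name", "default", "--settings", "llmservice=openai,parallel=true"]
)
print(result.output)
# expected: Updated config default with: {'llmservice': 'openai', 'parallel': 'true'}
```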
3 changes: 1 addition & 2 deletions src/palimpzest/__init__.py
@@ -1,4 +1,4 @@
from palimpzest.constants import MAX_ROWS, Cardinality, OptimizationStrategy
from palimpzest.constants import MAX_ROWS, Cardinality

# Dataset functionality
#from palimpzest.sets import Dataset
@@ -20,7 +20,6 @@
# constants
"MAX_ROWS",
"Cardinality",
"OptimizationStrategy",
# datamanager
"DataDirectory",
# policy
13 changes: 1 addition & 12 deletions src/palimpzest/constants.py
@@ -36,18 +36,6 @@ class PromptStrategy(str, Enum):
COT_MOA_AGG = "chain-of-thought-mixture-of-agents-aggregation"


class OptimizationStrategy(str, Enum):
"""
OptimizationStrategy determines which (set of) plan(s) the Optimizer
will return to the Execution layer.
"""
GREEDY = "greedy"
CONFIDENCE_INTERVAL = "confidence-interval"
PARETO = "pareto"
SENTINEL = "sentinel"
NONE = "none"


class AggFunc(str, Enum):
COUNT = "count"
AVERAGE = "average"
@@ -81,6 +69,7 @@ class PickOutputStrategy(str, Enum):

# character limit for various IDs
MAX_ID_CHARS = 10
MAX_DATASET_ID_CHARS = 16

# retry LLM executions 2^x * (multiplier) for up to 10 seconds and at most 4 times
RETRY_MULTIPLIER = 2
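The retry comment above describes a standard exponential backoff. A small sketch of the schedule it implies; only `RETRY_MULTIPLIER` is visible in this diff, so the cap and attempt-count names are assumptions inferred from the comment:

```python
RETRY_MULTIPLIER = 2    # from the diff above
RETRY_MAX_SECS = 10     # assumed: "for up to 10 seconds"
RETRY_MAX_ATTEMPTS = 4  # assumed: "at most 4 times"


def backoff_schedule() -> list[int]:
    """Delays of 2^x * multiplier seconds, capped at RETRY_MAX_SECS."""
    return [
        min(RETRY_MULTIPLIER * (2**attempt), RETRY_MAX_SECS)
        for attempt in range(RETRY_MAX_ATTEMPTS)
    ]


print(backoff_schedule())  # [2, 4, 8, 10]
```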