Initial Refactor to Better Modularize Processing, Execution, Optimization, and Configuration #73

Merged: 7 commits, Jan 24, 2025
2 changes: 1 addition & 1 deletion README.md
@@ -65,7 +65,7 @@ policy = MinCost()
results, execution_stats = Execute(dataset, policy)

# Writing output to disk
output_df = pd.DataFrame([r.as_dict() for r in results])[["date","sender","subject"]]
output_df = pd.DataFrame([r.to_dict() for r in results])[["date","sender","subject"]]
output_df.to_csv("july_holiday_emails.csv")
```

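For reference, the only change to the README is the rename of the record accessor from `as_dict()` to `to_dict()`. A minimal sketch of the updated output-writing step, assuming `results` is the record list returned by `Execute(dataset, policy)` exactly as in the README snippet above:

```python
import pandas as pd

# `results` is assumed to come from the README example:
#   results, execution_stats = Execute(dataset, policy)
# Each result record now exposes to_dict() (formerly as_dict()).
output_df = pd.DataFrame([r.to_dict() for r in results])[["date", "sender", "subject"]]
output_df.to_csv("july_holiday_emails.csv")
```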
52 changes: 31 additions & 21 deletions demos/askem-var.py
@@ -15,7 +15,8 @@
from palimpzest.core.lib.fields import Field
from palimpzest.core.lib.schemas import Schema, TextFile
from palimpzest.policy import MaxQuality
from palimpzest.query import StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
from palimpzest.sets import Dataset


@@ -34,39 +35,48 @@ class Variable(Schema):
value = Field(desc="The value of the variable, optional, set 'null' if not found")


dict_of_excerpts = [
{"id": 0, "text": "ne of the few states producing detailed daily reports of COVID-19 confirmed cases, COVID-19 related cumulative hospitalizations, intensive care unit (ICU) admissions, and deaths per county. Likewise, Ohio is a state with marked variation of demographic and geographic attributes among counties along with substantial differences in the capacity of healthcare within the state. Our aim is to predict the spatiotemporal dynamics of the COVID-19 pandemic in relation with the distribution of the capacity of healthcare in Ohio. 2. Methods 2.1. Mathematical model We developed a spatial mathematical model to simulate the transmission dynamics of COVID-19 disease infection and spread. The spatially-explicit model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes lio residing in the county. In the second scenario, we used the model to generate projections of the impact of potential easing on the non-pharmaceutical interventions in the critical care capacity of each county in Ohio. We assessed the impact of 50% reduction on the estimated impact of non-pharmaceutical interventions in reducing the hazard rate of infection. Under this scenario we calculated the proportion of ICU \n'"},
{"id": 1, "text": "t model incorporates geographic connectivity information at county level. The Susceptible-Infected-Hospitalized-Recovered- Dead (SIHRD) COVID-19 model classified the population into susceptibles (S), confirmed infections (I), hospitalized and ICU admitted (H), recovered (R) and dead (D). Based on a previous study that identified local air hubs and main roads as important geospatial attributes linked to differential COVID-19 related hospitalizations and mortality (Correa-Agudelo et a"}
]

list_of_strings = ["I have a variable a, the value is 1", "I have a variable b, the value is 2"]
list_of_numbers = [1, 2, 3, 4, 5]

if __name__ == "__main__":
run_pz = True
dataset = "askem"
file_path = "testdata/askem-tiny/"

if run_pz:
# reference, plan, stats = run_workload()
excerpts = Dataset(dataset, schema=TextFile)
df_input = pd.DataFrame(dict_of_excerpts)
excerpts = Dataset(df_input)
output = excerpts.convert(
Variable, desc="A variable used or introduced in the paper snippet", cardinality=Cardinality.ONE_TO_MANY
)

engine = StreamingSequentialExecution
Variable, desc="A variable used or introduced in the context", cardinality=Cardinality.ONE_TO_MANY
).filter("The value name is 'a'", depends_on="name")
policy = MaxQuality()
engine = StreamingSequentialExecution(
config = QueryProcessorConfig(
policy=policy,
nocache=True,
verbose=True,
allow_code_synth=False,
allow_token_reduction=False,
allow_bonded_query=True,
verbose=False,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
engine.generate_plan(output, policy)
processor = QueryProcessorFactory.create_processor(excerpts, config)
plan = processor.generate_plan(output, policy)
print(processor.plan)

print(engine.plan)
with st.container():
st.write("### Executed plan: \n")
# st.write(" " + str(plan).replace("\n", " \n "))
for idx, op in enumerate(engine.plan.operators):
for idx, op in enumerate(processor.plan.operators):
strop = f"{idx + 1}. {str(op)}"
strop = strop.replace("\n", " \n")
st.write(strop)

input_records = engine.get_input_records()
input_records = processor.get_input_records()
input_df = DataRecord.as_df(input_records)
print(input_df)

@@ -77,13 +87,13 @@ class Variable(Schema):
for idx, record in enumerate(input_records):
print(f"idx: {idx}\n vars: {vars}")
index = idx
vars = engine.execute_opstream(engine.plan, record)
vars = processor.execute_opstream(processor.plan, record)
if idx == len(input_records) - 1:
total_plan_time = time.time() - start_time
engine.plan_stats.finalize(total_plan_time)
processor.plan_stats.finalize(total_plan_time)

record_time = time.time()
statistics.append(engine.plan_stats)
statistics.append(processor.plan_stats)

for var in vars:
# ref.key = ref.first_author.split()[0] + ref.title.split()[0] + str(ref.year)
@@ -120,8 +130,8 @@ class Variable(Schema):
st.write(" **value:** ", var.value, "\n")

# write variables to a json file with readable format
with open(f"askem-variables-{dataset}.json", "w") as f:
json.dump(variables, f, indent=4)
# with open(f"askem-variables-{dataset}.json", "w") as f:
# json.dump(variables, f, indent=4)
vars_df = pd.DataFrame(variables)

# G = nx.DiGraph()
@@ -151,7 +161,7 @@ class Variable(Schema):
#
# nx.write_gexf(G, "demos/bdf-usecase3.gexf")

print("References:", vars_df)
# print("References:", vars_df)
# st.write(table.title, table.author, table.abstract)
# endTime = time.time()
# print("Elapsed time:", endTime - startTime)
44 changes: 25 additions & 19 deletions demos/bdf-suite.py
@@ -16,7 +16,7 @@
from palimpzest.core.lib.schemas import URL, File, PDFFile, Schema, Table, XLSFile
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality
from palimpzest.query import Execute, StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset
from palimpzest.utils import udfs

@@ -94,7 +94,7 @@ class CaseData(Schema):


@st.cache_resource()
def extract_supplemental(engine, policy):
def extract_supplemental(processing_strategy, execution_strategy, optimizer_strategy, policy):
papers = Dataset("biofabric-pdf", schema=ScientificPaper)
paper_urls = papers.convert(URL, desc="The DOI url of the paper")
html_doi = paper_urls.convert(File, udf=udfs.url_to_file)
@@ -107,15 +107,17 @@ def extract_supplemental(engine, policy):
xls = tables.convert(XLSFile, udf=udfs.file_to_xls)
patient_tables = xls.convert(Table, udf=udfs.xls_to_tables, cardinality=Cardinality.ONE_TO_MANY)

# output = patient_tables
iterable = Execute(
patient_tables,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = patient_tables.run(config)


tables = []
statistics = []
@@ -128,22 +130,24 @@


@st.cache_resource()
def integrate_tables(engine, policy):
def integrate_tables(processing_strategy, execution_strategy, optimizer_strategy, policy):
xls = Dataset("biofabric-tiny", schema=XLSFile)
patient_tables = xls.convert(Table, udf=udfs.xls_to_tables, cardinality=Cardinality.ONE_TO_MANY)
patient_tables = patient_tables.filter("The table contains biometric information about the patient")
case_data = patient_tables.convert(
CaseData, desc="The patient data in the table", cardinality=Cardinality.ONE_TO_MANY
)

iterable = Execute(
case_data,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = case_data.run(config)

tables = []
statistics = []
@@ -156,22 +160,23 @@ def integrate_tables(engine, policy):


@st.cache_resource()
def extract_references(engine, policy):
def extract_references(processing_strategy, execution_strategy, optimizer_strategy, policy):
papers = Dataset("bdf-usecase3-tiny", schema=ScientificPaper)
papers = papers.filter("The paper mentions phosphorylation of Exo1")
references = papers.convert(
Reference, desc="A paper cited in the reference section", cardinality=Cardinality.ONE_TO_MANY
)

output = references
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy=processing_strategy,
execution_strategy=execution_strategy,
optimizer_strategy=optimizer_strategy,
)
iterable = references.run(config)

tables = []
statistics = []
@@ -204,17 +209,18 @@ def extract_references(engine, policy):

# output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
# policy = MinCost()
policy = MaxQuality()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

references = []
statistics = []
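Across bdf-suite.py the substitution is uniform: each cached helper now accepts the three strategy strings instead of an execution engine, builds a `QueryProcessorConfig`, and calls `.run(config)` on the output `Dataset` where it previously called `Execute(...)`. A minimal sketch of that shape, using a hypothetical dataset name and a placeholder filter:

```python
from palimpzest.core.lib.schemas import TextFile
from palimpzest.policy import MaxQuality
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset


def run_with_strategies(processing_strategy, execution_strategy, optimizer_strategy, policy):
    # "my-dataset" is a placeholder for a registered dataset name.
    docs = Dataset("my-dataset", schema=TextFile)
    output = docs.filter("The document is relevant to the workload")

    config = QueryProcessorConfig(
        policy=policy,
        nocache=True,
        processing_strategy=processing_strategy,
        execution_strategy=execution_strategy,
        optimizer_strategy=optimizer_strategy,
    )
    # .run(config) replaces Execute(output, policy, ..., execution_engine=engine).
    return output.run(config)


iterable = run_with_strategies("streaming", "sequential", "pareto", MaxQuality())
```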
20 changes: 11 additions & 9 deletions demos/bdf-usecase3.py
@@ -17,7 +17,7 @@
from palimpzest.core.lib.schemas import PDFFile, Schema
from palimpzest.datamanager.datamanager import DataDirectory
from palimpzest.policy import MaxQuality, MinCost
from palimpzest.query import Execute, StreamingSequentialExecution
from palimpzest.query.processor.config import QueryProcessorConfig
from palimpzest.sets import Dataset

if not os.environ.get("OPENAI_API_KEY"):
@@ -61,16 +61,17 @@ def run_workload():

output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
policy = MinCost()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

tables = []
statistics = []
@@ -103,17 +104,18 @@ def run_workload():

# output = references
# engine = NoSentinelExecution
engine = StreamingSequentialExecution
# policy = MinCost()
policy = MaxQuality()
iterable = Execute(
output,
config = QueryProcessorConfig(
policy=policy,
nocache=True,
allow_code_synth=False,
allow_token_reduction=False,
execution_engine=engine,
processing_strategy="streaming",
execution_strategy="sequential",
optimizer_strategy="pareto",
)
iterable = output.run(config)

references = []
statistics = []
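bdf-usecase3.py repeats the same pattern twice, once inside run_workload() with MinCost and once in the Streamlit main block with MaxQuality; only the policy differs. A small sketch of factoring out that shared configuration, with helper names of my own choosing rather than the demo's:

```python
from palimpzest.policy import MaxQuality, MinCost
from palimpzest.query.processor.config import QueryProcessorConfig


def make_config(policy):
    # Shared settings used by both runs in the demo; only the policy varies.
    return QueryProcessorConfig(
        policy=policy,
        nocache=True,
        processing_strategy="streaming",
        execution_strategy="sequential",
        optimizer_strategy="pareto",
    )


cheap_config = make_config(MinCost())       # run_workload() path
quality_config = make_config(MaxQuality())  # Streamlit main-block path
```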