Skip to content

Commit 8fb5af9

Browse files
committed
assert first physical operator is DataSourcePhysicalOp & use source_operator.get_datasource()
1 parent bb0d3cd commit 8fb5af9

File tree

3 files changed

+15
-32
lines changed

3 files changed

+15
-32
lines changed

src/palimpzest/query/execution/parallel_execution_strategy.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@
88
from palimpzest.core.lib.schemas import SourceRecord
99
from palimpzest.query.execution.execution_strategy import ExecutionStrategy
1010
from palimpzest.query.operators.aggregate import AggregateOp
11-
from palimpzest.query.operators.datasource import MarshalAndScanDataOp
11+
from palimpzest.query.operators.datasource import DataSourcePhysicalOp
1212
from palimpzest.query.operators.limit import LimitScanOp
1313
from palimpzest.query.operators.physical import PhysicalOperator
1414
from palimpzest.query.optimizer.plan import PhysicalPlan
1515

16+
g
1617

1718
class PipelinedParallelExecutionStrategy(ExecutionStrategy):
1819
"""
@@ -84,12 +85,9 @@ def execute_plan(self, plan: PhysicalPlan, num_samples: int | float = float("inf
8485

8586
# get handle to DataSource and pre-compute its op_id and size
8687
source_operator = plan.operators[0]
88+
assert isinstance(source_operator, DataSourcePhysicalOp), "First operator in physical plan must be a DataSourcePhysicalOp"
8789
source_op_id = source_operator.get_op_id()
88-
datasource = (
89-
source_operator.get_datasource()
90-
if isinstance(source_operator, MarshalAndScanDataOp)
91-
else self.datadir.get_cached_result(source_operator.dataset_id)
92-
)
90+
datasource = source_operator.get_datasource()
9391
datasource_len = len(datasource)
9492

9593
# get limit of final limit operator (if one exists)

src/palimpzest/query/execution/single_threaded_execution_strategy.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from palimpzest.core.lib.schemas import SourceRecord
66
from palimpzest.query.execution.execution_strategy import ExecutionStrategy
77
from palimpzest.query.operators.aggregate import AggregateOp
8-
from palimpzest.query.operators.datasource import DataSourcePhysicalOp, MarshalAndScanDataOp
8+
from palimpzest.query.operators.datasource import DataSourcePhysicalOp
99
from palimpzest.query.operators.filter import FilterOp
1010
from palimpzest.query.operators.limit import LimitScanOp
1111
from palimpzest.query.optimizer.plan import PhysicalPlan
@@ -48,11 +48,8 @@ def execute_plan(self, plan: PhysicalPlan, num_samples: int | float = float("inf
4848

4949
# get handle to DataSource and pre-compute its size
5050
source_operator = plan.operators[0]
51-
datasource = (
52-
source_operator.get_datasource()
53-
if isinstance(source_operator, MarshalAndScanDataOp)
54-
else self.datadir.get_cached_result(source_operator.dataset_id)
55-
)
51+
assert isinstance(source_operator, DataSourcePhysicalOp), "First operator in physical plan must be a DataSourcePhysicalOp"
52+
datasource = source_operator.get_datasource()
5653
datasource_len = len(datasource)
5754

5855
# initialize processing queues for each operation
@@ -186,11 +183,8 @@ def execute_plan(self, plan: PhysicalPlan, num_samples: int | float = float("inf
186183

187184
# get handle to DataSource and pre-compute its size
188185
source_operator = plan.operators[0]
189-
datasource = (
190-
source_operator.get_datasource()
191-
if isinstance(source_operator, MarshalAndScanDataOp)
192-
else self.datadir.get_cached_result(source_operator.dataset_id)
193-
)
186+
assert isinstance(source_operator, DataSourcePhysicalOp), "First operator in physical plan must be a DataSourcePhysicalOp"
187+
datasource = source_operator.get_datasource()
194188
datasource_len = len(datasource)
195189

196190
# initialize processing queues for each operation

src/palimpzest/query/processor/nosentinel_processor.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,8 @@ def execute_plan(self, plan: PhysicalPlan, num_samples: int | float = float("inf
9292

9393
# get handle to DataSource and pre-compute its size
9494
source_operator = plan.operators[0]
95-
datasource = (
96-
source_operator.get_datasource()
97-
if isinstance(source_operator, MarshalAndScanDataOp)
98-
else self.datadir.get_cached_result(source_operator.dataset_id)
99-
)
95+
assert isinstance(source_operator, DataSourcePhysicalOp), "First operator in physical plan must be a DataSourcePhysicalOp"
96+
datasource = source_operator.get_datasource()
10097
datasource_len = len(datasource)
10198

10299
# Calculate total work units - each record needs to go through each operator
@@ -272,11 +269,8 @@ def execute_plan(self, plan: PhysicalPlan, num_samples: int | float = float("inf
272269

273270
# get handle to DataSource and pre-compute its size
274271
source_operator = plan.operators[0]
275-
datasource = (
276-
source_operator.get_datasource()
277-
if isinstance(source_operator, MarshalAndScanDataOp)
278-
else self.datadir.get_cached_result(source_operator.dataset_id)
279-
)
272+
assert isinstance(source_operator, DataSourcePhysicalOp), "First operator in physical plan must be a DataSourcePhysicalOp"
273+
datasource = source_operator.get_datasource()
280274
datasource_len = len(datasource)
281275

282276
# Calculate total work units - each record needs to go through each operator
@@ -468,11 +462,8 @@ def __init__(self, *args, **kwargs):
468462

469463
# # get handle to DataSource and pre-compute its size
470464
# source_operator = plan.operators[0]
471-
# datasource = (
472-
# source_operator.get_datasource()
473-
# if isinstance(source_operator, MarshalAndScanDataOp)
474-
# else self.datadir.get_cached_result(source_operator.dataset_id)
475-
# )
465+
# assert isinstance(source_operator, DataSourcePhysicalOp), "First operator in physical plan must be a DataSourcePhysicalOp"
466+
# datasource = source_operator.get_datasource()
476467
# datasource_len = len(datasource)
477468

478469
# # Calculate total work units - each record needs to go through each operator

0 commit comments

Comments
 (0)