Skip to content

Commit

Permalink
feat: Make bytewax job write as mini-batches (#5)
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Nguyen <[email protected]>
Co-authored-by: Hai Nguyen <[email protected]>
  • Loading branch information
KarolisKont and sudohainguyen authored Oct 17, 2023
1 parent 2949d12 commit b55e06e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from typing import List

import pyarrow as pa
Expand All @@ -13,6 +14,7 @@
from feast import FeatureStore, FeatureView, RepoConfig
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping

DEFAULT_BATCH_SIZE = 1000
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -48,6 +50,11 @@ def input_builder(self, worker_index, worker_count, _state):
return [(None, self.paths[self.worker_index])]

def output_builder(self, worker_index, worker_count):
def yield_batch(iterable, batch_size):
"""Yield mini-batches from an iterable."""
for i in range(0, len(iterable), batch_size):
yield iterable[i : i + batch_size]

def output_fn(batch):
table = pa.Table.from_batches([batch])

Expand All @@ -66,12 +73,17 @@ def output_fn(batch):
)
provider = self.feature_store._get_provider()
with tqdm(total=len(rows_to_write)) as progress:
provider.online_write_batch(
config=self.config,
table=self.feature_view,
data=rows_to_write,
progress=progress.update,
# break rows_to_write to mini-batches
batch_size = int(
os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
)
for mini_batch in yield_batch(rows_to_write, batch_size):
provider.online_write_batch(
config=self.config,
table=self.feature_view,
data=mini_batch,
progress=progress.update,
)

return output_fn

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
print_pod_logs_on_failure: bool = True
"""(optional) Print pod logs on job failure. Only applies to synchronous materialization"""

mini_batch_size: int = 1000
""" (optional) Number of rows to process per write operation (default 1000)"""


class BytewaxMaterializationEngine(BatchMaterializationEngine):
def __init__(
Expand Down Expand Up @@ -362,6 +365,10 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
"name": "BYTEWAX_STATEFULSET_NAME",
"value": f"dataflow-{job_id}",
},
{
"name": "BYTEWAX_MINI_BATCH_SIZE",
"value": str(self.batch_engine_config.mini_batch_size),
},
]
# Add any Feast configured environment variables
job_env.extend(env)
Expand Down

0 comments on commit b55e06e

Please sign in to comment.