From 84f09446575ca9915648dc7e09bf24e2a6eaa280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Tue, 2 Jun 2026 18:29:01 +0200 Subject: [PATCH 1/4] Add first draft of changes to make workflows processing command accept additional images metadata for each image, based on metadata file associated to each image --- inference/core/version.py | 2 +- inference_cli/lib/workflows/common.py | 7 +++ .../lib/workflows/local_image_adapter.py | 44 +++++++++++++++++-- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/inference/core/version.py b/inference/core/version.py index 8fcc5d0dae..8099760fb0 100644 --- a/inference/core/version.py +++ b/inference/core/version.py @@ -1,4 +1,4 @@ -__version__ = "1.2.12" +__version__ = "1.2.13" if __name__ == "__main__": diff --git a/inference_cli/lib/workflows/common.py b/inference_cli/lib/workflows/common.py index d3c66c0f07..f6a9f7ce34 100644 --- a/inference_cli/lib/workflows/common.py +++ b/inference_cli/lib/workflows/common.py @@ -298,3 +298,10 @@ def export_images_for_field( registered_images = index_entry.image_outputs[field_name] results.append((image_path, registered_images)) return results + + +def replace_file_extension(path: str, extension: str) -> str: + root, _ = os.path.splitext(path) + if extension and not extension.startswith("."): + extension = f".{extension}" + return root + extension diff --git a/inference_cli/lib/workflows/local_image_adapter.py b/inference_cli/lib/workflows/local_image_adapter.py index d83ad092b8..bf3ca24e27 100644 --- a/inference_cli/lib/workflows/local_image_adapter.py +++ b/inference_cli/lib/workflows/local_image_adapter.py @@ -15,18 +15,16 @@ from inference.core.registries.roboflow import RoboflowModelRegistry from inference.core.roboflow_api import get_workflow_specification from inference.core.workflows.execution_engine.core import ExecutionEngine -from inference.core.workflows.execution_engine.profiling.core import ( - NullWorkflowsProfiler, -) from inference.models.utils import ROBOFLOW_MODEL_TYPES from inference_cli.lib.logger import CLI_LOGGER -from inference_cli.lib.utils import get_all_images_in_directory +from inference_cli.lib.utils import get_all_images_in_directory, read_json from inference_cli.lib.workflows.common import ( WorkflowsImagesProcessingIndex, aggregate_batch_processing_results, denote_image_processed, dump_image_processing_results, open_progress_log, + replace_file_extension, report_failed_files, ) from inference_cli.lib.workflows.entities import ( @@ -110,6 +108,7 @@ def process_image_directory_with_workflow_using_inference_package( debug_mode: bool = False, max_failures: Optional[int] = None, max_concurrent_workflows_steps: int = 4, + images_metadata_input_mapping: Optional[Dict[str, str]] = None, ) -> ImagesDirectoryProcessingDetails: if api_key is None: api_key = API_KEY @@ -139,6 +138,7 @@ def process_image_directory_with_workflow_using_inference_package( max_concurrent_workflows_steps=max_concurrent_workflows_steps, debug_mode=debug_mode, max_failures=max_failures, + images_metadata_input_mapping=images_metadata_input_mapping, ) finally: log_file.close() @@ -180,6 +180,7 @@ def _process_images_within_directory( max_concurrent_workflows_steps: int, debug_mode: bool = False, max_failures: Optional[int] = None, + images_metadata_input_mapping: Optional[Dict[str, str]] = None, ) -> List[Tuple[str, str]]: workflow_specification = _get_workflow_specification( workflow_specification=workflow_specification, @@ -228,6 +229,7 @@ def _process_images_within_directory( thread_pool_executor=thread_pool_executor, max_concurrent_workflows_steps=max_concurrent_workflows_steps, debug_mode=debug_mode, + images_metadata_input_mapping=images_metadata_input_mapping, ) failures = 0 succeeded_files = set() @@ -293,8 +295,42 @@ def _process_single_image_from_directory( max_concurrent_workflows_steps: int, log_file_lock: Optional[Lock] = None, debug_mode: bool = False, + images_metadata_input_mapping: Optional[Dict[str, str]] = None, ) -> bool: + metadata_driven_workflow_parameters = {} + if images_metadata_input_mapping: + assumed_metadata_file_path = replace_file_extension( + image_path, extension=".json" + ) + if not os.path.isfile(assumed_metadata_file_path): + error_summary = ( + f"Cold not find required metadata file for image: {image_path}. " + f"Since `images_metadata_input_mapping` was specified, presence of metadata file is enforced." + ) + if debug_mode: + CLI_LOGGER.exception(error_summary) + on_failure(image_path, error_summary) + return False + metadata = read_json(path=assumed_metadata_file_path) + missing_keys = set(metadata.keys()).difference( + images_metadata_input_mapping.values() + ) + if len(missing_keys) > 0: + error_summary = ( + f"Cold not find required metadata keys specified in `images_metadata_input_mapping` for image: " + f"{image_path}. Missing keys: {missing_keys}." + ) + if debug_mode: + CLI_LOGGER.exception(error_summary) + on_failure(image_path, error_summary) + return False + metadata_driven_workflow_parameters = { + key: metadata[value] for key, value in images_metadata_input_mapping.items() + } try: + workflow_parameters = workflow_parameters or {} + metadata_driven_workflow_parameters.update(workflow_parameters) + workflow_parameters = metadata_driven_workflow_parameters result = _run_workflow_for_single_image_with_inference( model_manager=model_manager, image_path=image_path, From 4246b993d2fd7290429a18f907cb92277a53687f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Tue, 2 Jun 2026 18:41:32 +0200 Subject: [PATCH 2/4] Bump version --- inference/core/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/core/version.py b/inference/core/version.py index 8099760fb0..797939a3c3 100644 --- a/inference/core/version.py +++ b/inference/core/version.py @@ -1,4 +1,4 @@ -__version__ = "1.2.13" +__version__ = "1.2.14" if __name__ == "__main__": From 14db97d7776df6eda629af60908c48bed8db8e7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Tue, 2 Jun 2026 20:18:20 +0200 Subject: [PATCH 3/4] Add CLI extension + fix problem with metadata keys --- .../batch_processing/api_operations.py | 6 +++- .../roboflow_cloud/batch_processing/core.py | 33 +++++++++++++++++-- .../batch_processing/entities.py | 8 ++++- .../lib/workflows/local_image_adapter.py | 4 +-- 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/inference_cli/lib/roboflow_cloud/batch_processing/api_operations.py b/inference_cli/lib/roboflow_cloud/batch_processing/api_operations.py index 7309aaed14..78e925d14f 100644 --- a/inference_cli/lib/roboflow_cloud/batch_processing/api_operations.py +++ b/inference_cli/lib/roboflow_cloud/batch_processing/api_operations.py @@ -4,7 +4,7 @@ import string from collections import Counter from datetime import datetime, timezone -from typing import Generator, List, Optional, Set, Union +from typing import Generator, List, Optional, Set, Union, Dict import backoff import requests @@ -402,6 +402,8 @@ def trigger_job_with_workflows_images_processing( inference_backend: Optional[InferenceBackend] = None, job_name: Optional[str] = None, max_image_failure_rate: Optional[float] = None, + images_metadata_part_name: Optional[str] = None, + image_metadata_mapping: Optional[Dict[str, str]] = None, ) -> str: workspace = get_workspace(api_key=api_key) compute_configuration = ComputeConfigurationV2( @@ -411,6 +413,7 @@ def trigger_job_with_workflows_images_processing( input_configuration = StagingBatchInputV1( batch_id=batch_id, part_name=part_name, + images_metadata_part=images_metadata_part_name, ) workflow_parameters = None if workflow_parameters_path: @@ -423,6 +426,7 @@ def trigger_job_with_workflows_images_processing( persist_images_outputs=save_image_outputs, images_outputs_to_be_persisted=image_outputs_to_save, aggregation_format=aggregation_format, + images_metadata_inputs_mapping=image_metadata_mapping, ) if not job_id: job_id = f"job-{_generate_random_string(length=12)}" diff --git a/inference_cli/lib/roboflow_cloud/batch_processing/core.py b/inference_cli/lib/roboflow_cloud/batch_processing/core.py index c59da797cc..c095729e57 100644 --- a/inference_cli/lib/roboflow_cloud/batch_processing/core.py +++ b/inference_cli/lib/roboflow_cloud/batch_processing/core.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List, Optional +from typing import List, Optional, Dict import typer from typing_extensions import Annotated @@ -112,6 +112,16 @@ def show_job_details( raise typer.Exit(code=1) +def parse_key_value(values: List[str]) -> Dict[str, str]: + result: Dict[str, str] = {} + for item in values: + if "=" not in item: + raise typer.BadParameter(f"'{item}' must be in key=value format") + key, value = item.split("=", 1) # split once, so values may contain '=' + result[key] = value + return result + + @batch_processing_app.command(help="Trigger batch job to process images with Workflow") def process_images_with_workflow( batch_id: Annotated[ @@ -156,7 +166,15 @@ def process_images_with_workflow( typer.Option( "--part-name", "-p", - help="Name of the batch part " "(relevant for multipart batches", + help="Name of the batch part with images (relevant for multipart batches)", + ), + ] = None, + images_metadata_part_name: Annotated[ + Optional[str], + typer.Option( + "--images-metadata-part-name", + "-imp", + help="Name of batch part bringing images metadata (relevant for multipart batches)", ), ] = None, machine_type: Annotated[ @@ -243,7 +261,14 @@ def process_images_with_workflow( job_name: Annotated[ Optional[str], typer.Option("--job-name", "-jn", help="Name of your job") ] = None, + image_metadata_mapping: Annotated[Optional[List[str]], typer.Option( + "--metadata-mapping", + "-mm", + help="Key-value mapping of workflow input to metadata key as workflow_input=metadata_key. Repeatable.", + )] = None, ) -> None: + if image_metadata_mapping is not None: + image_metadata_mapping = parse_key_value(image_metadata_mapping) if api_key is None: api_key = ROBOFLOW_API_KEY if workers_per_machine is None and machine_size is not None: @@ -275,6 +300,8 @@ def process_images_with_workflow( inference_backend=inference_backend, job_name=job_name, max_image_failure_rate=max_image_failure_rate, + images_metadata_part_name=images_metadata_part_name, + image_metadata_mapping=image_metadata_mapping, ) print(f"Triggered job with ID: {job_id}") except KeyboardInterrupt: @@ -331,7 +358,7 @@ def process_videos_with_workflow( typer.Option( "--part-name", "-p", - help="Name of the batch part " "(relevant for multipart batches", + help="Name of the batch part relevant for multipart batches", ), ] = None, machine_type: Annotated[ diff --git a/inference_cli/lib/roboflow_cloud/batch_processing/entities.py b/inference_cli/lib/roboflow_cloud/batch_processing/entities.py index 29905b3da6..e3c6d8b5d0 100644 --- a/inference_cli/lib/roboflow_cloud/batch_processing/entities.py +++ b/inference_cli/lib/roboflow_cloud/batch_processing/entities.py @@ -99,7 +99,9 @@ class StagingBatchInputV1(BaseModel): type: Literal["staging-batch-input-v1"] = Field(default="staging-batch-input-v1") batch_id: str = Field(serialization_alias="batchId") part_name: Optional[str] = Field(serialization_alias="partName", default=None) - + images_metadata_part: Optional[str] = Field( + serialization_alias="imagesMetadataPart", default=None + ) class AggregationFormat(str, Enum): CSV = "csv" @@ -130,6 +132,10 @@ class WorkflowsProcessingSpecificationV1(BaseModel): max_video_fps: Optional[Union[int, float]] = Field( serialization_alias="maxVideoFPS", default=None ) + images_metadata_inputs_mapping: Optional[Dict[str, str]] = Field( + serialization_alias="imagesMetadataInputsMapping", + default=None, + ) class WorkflowProcessingJobV1(BaseModel): diff --git a/inference_cli/lib/workflows/local_image_adapter.py b/inference_cli/lib/workflows/local_image_adapter.py index bf3ca24e27..7f96223c1e 100644 --- a/inference_cli/lib/workflows/local_image_adapter.py +++ b/inference_cli/lib/workflows/local_image_adapter.py @@ -312,8 +312,8 @@ def _process_single_image_from_directory( on_failure(image_path, error_summary) return False metadata = read_json(path=assumed_metadata_file_path) - missing_keys = set(metadata.keys()).difference( - images_metadata_input_mapping.values() + missing_keys = set(images_metadata_input_mapping.values()).difference( + metadata.keys() ) if len(missing_keys) > 0: error_summary = ( From 142a3b3a676c5c85c2f20ae3627e55cc5f540ddb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 3 Jun 2026 20:04:18 +0200 Subject: [PATCH 4/4] Add ability to inject workflows ee init params --- inference_cli/lib/workflows/local_image_adapter.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/inference_cli/lib/workflows/local_image_adapter.py b/inference_cli/lib/workflows/local_image_adapter.py index 7f96223c1e..216499418b 100644 --- a/inference_cli/lib/workflows/local_image_adapter.py +++ b/inference_cli/lib/workflows/local_image_adapter.py @@ -109,6 +109,7 @@ def process_image_directory_with_workflow_using_inference_package( max_failures: Optional[int] = None, max_concurrent_workflows_steps: int = 4, images_metadata_input_mapping: Optional[Dict[str, str]] = None, + workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None, ) -> ImagesDirectoryProcessingDetails: if api_key is None: api_key = API_KEY @@ -139,6 +140,7 @@ def process_image_directory_with_workflow_using_inference_package( debug_mode=debug_mode, max_failures=max_failures, images_metadata_input_mapping=images_metadata_input_mapping, + workflows_execution_engine_init_params=workflows_execution_engine_init_params, ) finally: log_file.close() @@ -181,6 +183,7 @@ def _process_images_within_directory( debug_mode: bool = False, max_failures: Optional[int] = None, images_metadata_input_mapping: Optional[Dict[str, str]] = None, + workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None, ) -> List[Tuple[str, str]]: workflow_specification = _get_workflow_specification( workflow_specification=workflow_specification, @@ -230,6 +233,7 @@ def _process_images_within_directory( max_concurrent_workflows_steps=max_concurrent_workflows_steps, debug_mode=debug_mode, images_metadata_input_mapping=images_metadata_input_mapping, + workflows_execution_engine_init_params=workflows_execution_engine_init_params, ) failures = 0 succeeded_files = set() @@ -296,6 +300,7 @@ def _process_single_image_from_directory( log_file_lock: Optional[Lock] = None, debug_mode: bool = False, images_metadata_input_mapping: Optional[Dict[str, str]] = None, + workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None, ) -> bool: metadata_driven_workflow_parameters = {} if images_metadata_input_mapping: @@ -341,6 +346,7 @@ def _process_single_image_from_directory( api_key=api_key, thread_pool_executor=thread_pool_executor, max_concurrent_workflows_steps=max_concurrent_workflows_steps, + workflows_execution_engine_init_params=workflows_execution_engine_init_params, ) index_entry = dump_image_processing_results( result=result, @@ -408,12 +414,15 @@ def _run_workflow_for_single_image_with_inference( api_key: Optional[str], thread_pool_executor: ThreadPoolExecutor, max_concurrent_workflows_steps: int, + workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: workflow_init_parameters = { "workflows_core.model_manager": model_manager, "workflows_core.api_key": api_key, "workflows_core.thread_pool_executor": thread_pool_executor, } + if workflows_execution_engine_init_params: + workflow_init_parameters.update(workflows_execution_engine_init_params) execution_engine = ExecutionEngine.init( workflow_definition=workflow_specification, init_parameters=workflow_init_parameters,