Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inference/core/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "1.2.12"
__version__ = "1.2.14"


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import string
from collections import Counter
from datetime import datetime, timezone
from typing import Generator, List, Optional, Set, Union
from typing import Generator, List, Optional, Set, Union, Dict

import backoff
import requests
Expand Down Expand Up @@ -402,6 +402,8 @@ def trigger_job_with_workflows_images_processing(
inference_backend: Optional[InferenceBackend] = None,
job_name: Optional[str] = None,
max_image_failure_rate: Optional[float] = None,
images_metadata_part_name: Optional[str] = None,
image_metadata_mapping: Optional[Dict[str, str]] = None,
) -> str:
workspace = get_workspace(api_key=api_key)
compute_configuration = ComputeConfigurationV2(
Expand All @@ -411,6 +413,7 @@ def trigger_job_with_workflows_images_processing(
input_configuration = StagingBatchInputV1(
batch_id=batch_id,
part_name=part_name,
images_metadata_part=images_metadata_part_name,
)
workflow_parameters = None
if workflow_parameters_path:
Expand All @@ -423,6 +426,7 @@ def trigger_job_with_workflows_images_processing(
persist_images_outputs=save_image_outputs,
images_outputs_to_be_persisted=image_outputs_to_save,
aggregation_format=aggregation_format,
images_metadata_inputs_mapping=image_metadata_mapping,
)
if not job_id:
job_id = f"job-{_generate_random_string(length=12)}"
Expand Down
33 changes: 30 additions & 3 deletions inference_cli/lib/roboflow_cloud/batch_processing/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Dict

import typer
from typing_extensions import Annotated
Expand Down Expand Up @@ -112,6 +112,16 @@ def show_job_details(
raise typer.Exit(code=1)


def parse_key_value(values: List[str]) -> Dict[str, str]:
result: Dict[str, str] = {}
for item in values:
if "=" not in item:
raise typer.BadParameter(f"'{item}' must be in key=value format")
key, value = item.split("=", 1) # split once, so values may contain '='
result[key] = value
return result


@batch_processing_app.command(help="Trigger batch job to process images with Workflow")
def process_images_with_workflow(
batch_id: Annotated[
Expand Down Expand Up @@ -156,7 +166,15 @@ def process_images_with_workflow(
typer.Option(
"--part-name",
"-p",
help="Name of the batch part " "(relevant for multipart batches",
help="Name of the batch part with images (relevant for multipart batches)",
),
] = None,
images_metadata_part_name: Annotated[
Optional[str],
typer.Option(
"--images-metadata-part-name",
"-imp",
help="Name of batch part bringing images metadata (relevant for multipart batches)",
),
] = None,
machine_type: Annotated[
Expand Down Expand Up @@ -243,7 +261,14 @@ def process_images_with_workflow(
job_name: Annotated[
Optional[str], typer.Option("--job-name", "-jn", help="Name of your job")
] = None,
image_metadata_mapping: Annotated[Optional[List[str]], typer.Option(
"--metadata-mapping",
"-mm",
help="Key-value mapping of workflow input to metadata key as workflow_input=metadata_key. Repeatable.",
)] = None,
) -> None:
if image_metadata_mapping is not None:
image_metadata_mapping = parse_key_value(image_metadata_mapping)
if api_key is None:
api_key = ROBOFLOW_API_KEY
if workers_per_machine is None and machine_size is not None:
Expand Down Expand Up @@ -275,6 +300,8 @@ def process_images_with_workflow(
inference_backend=inference_backend,
job_name=job_name,
max_image_failure_rate=max_image_failure_rate,
images_metadata_part_name=images_metadata_part_name,
image_metadata_mapping=image_metadata_mapping,
)
print(f"Triggered job with ID: {job_id}")
except KeyboardInterrupt:
Expand Down Expand Up @@ -331,7 +358,7 @@ def process_videos_with_workflow(
typer.Option(
"--part-name",
"-p",
help="Name of the batch part " "(relevant for multipart batches",
help="Name of the batch part relevant for multipart batches",
),
] = None,
machine_type: Annotated[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ class StagingBatchInputV1(BaseModel):
type: Literal["staging-batch-input-v1"] = Field(default="staging-batch-input-v1")
batch_id: str = Field(serialization_alias="batchId")
part_name: Optional[str] = Field(serialization_alias="partName", default=None)

images_metadata_part: Optional[str] = Field(
serialization_alias="imagesMetadataPart", default=None
)

class AggregationFormat(str, Enum):
CSV = "csv"
Expand Down Expand Up @@ -130,6 +132,10 @@ class WorkflowsProcessingSpecificationV1(BaseModel):
max_video_fps: Optional[Union[int, float]] = Field(
serialization_alias="maxVideoFPS", default=None
)
images_metadata_inputs_mapping: Optional[Dict[str, str]] = Field(
serialization_alias="imagesMetadataInputsMapping",
default=None,
)


class WorkflowProcessingJobV1(BaseModel):
Expand Down
7 changes: 7 additions & 0 deletions inference_cli/lib/workflows/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,10 @@ def export_images_for_field(
registered_images = index_entry.image_outputs[field_name]
results.append((image_path, registered_images))
return results


def replace_file_extension(path: str, extension: str) -> str:
root, _ = os.path.splitext(path)
if extension and not extension.startswith("."):
extension = f".{extension}"
return root + extension
53 changes: 49 additions & 4 deletions inference_cli/lib/workflows/local_image_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@
from inference.core.registries.roboflow import RoboflowModelRegistry
from inference.core.roboflow_api import get_workflow_specification
from inference.core.workflows.execution_engine.core import ExecutionEngine
from inference.core.workflows.execution_engine.profiling.core import (
NullWorkflowsProfiler,
)
from inference.models.utils import ROBOFLOW_MODEL_TYPES
from inference_cli.lib.logger import CLI_LOGGER
from inference_cli.lib.utils import get_all_images_in_directory
from inference_cli.lib.utils import get_all_images_in_directory, read_json
from inference_cli.lib.workflows.common import (
WorkflowsImagesProcessingIndex,
aggregate_batch_processing_results,
denote_image_processed,
dump_image_processing_results,
open_progress_log,
replace_file_extension,
report_failed_files,
)
from inference_cli.lib.workflows.entities import (
Expand Down Expand Up @@ -110,6 +108,8 @@ def process_image_directory_with_workflow_using_inference_package(
debug_mode: bool = False,
max_failures: Optional[int] = None,
max_concurrent_workflows_steps: int = 4,
images_metadata_input_mapping: Optional[Dict[str, str]] = None,
workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None,
) -> ImagesDirectoryProcessingDetails:
if api_key is None:
api_key = API_KEY
Expand Down Expand Up @@ -139,6 +139,8 @@ def process_image_directory_with_workflow_using_inference_package(
max_concurrent_workflows_steps=max_concurrent_workflows_steps,
debug_mode=debug_mode,
max_failures=max_failures,
images_metadata_input_mapping=images_metadata_input_mapping,
workflows_execution_engine_init_params=workflows_execution_engine_init_params,
)
finally:
log_file.close()
Expand Down Expand Up @@ -180,6 +182,8 @@ def _process_images_within_directory(
max_concurrent_workflows_steps: int,
debug_mode: bool = False,
max_failures: Optional[int] = None,
images_metadata_input_mapping: Optional[Dict[str, str]] = None,
workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None,
) -> List[Tuple[str, str]]:
workflow_specification = _get_workflow_specification(
workflow_specification=workflow_specification,
Expand Down Expand Up @@ -228,6 +232,8 @@ def _process_images_within_directory(
thread_pool_executor=thread_pool_executor,
max_concurrent_workflows_steps=max_concurrent_workflows_steps,
debug_mode=debug_mode,
images_metadata_input_mapping=images_metadata_input_mapping,
workflows_execution_engine_init_params=workflows_execution_engine_init_params,
)
failures = 0
succeeded_files = set()
Expand Down Expand Up @@ -293,8 +299,43 @@ def _process_single_image_from_directory(
max_concurrent_workflows_steps: int,
log_file_lock: Optional[Lock] = None,
debug_mode: bool = False,
images_metadata_input_mapping: Optional[Dict[str, str]] = None,
workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None,
) -> bool:
metadata_driven_workflow_parameters = {}
if images_metadata_input_mapping:
assumed_metadata_file_path = replace_file_extension(
image_path, extension=".json"
)
if not os.path.isfile(assumed_metadata_file_path):
error_summary = (
f"Cold not find required metadata file for image: {image_path}. "
f"Since `images_metadata_input_mapping` was specified, presence of metadata file is enforced."
)
if debug_mode:
CLI_LOGGER.exception(error_summary)
on_failure(image_path, error_summary)
return False
metadata = read_json(path=assumed_metadata_file_path)
missing_keys = set(images_metadata_input_mapping.values()).difference(
metadata.keys()
)
if len(missing_keys) > 0:
error_summary = (
f"Cold not find required metadata keys specified in `images_metadata_input_mapping` for image: "
f"{image_path}. Missing keys: {missing_keys}."
)
if debug_mode:
CLI_LOGGER.exception(error_summary)
on_failure(image_path, error_summary)
return False
metadata_driven_workflow_parameters = {
key: metadata[value] for key, value in images_metadata_input_mapping.items()
}
try:
workflow_parameters = workflow_parameters or {}
metadata_driven_workflow_parameters.update(workflow_parameters)
workflow_parameters = metadata_driven_workflow_parameters
result = _run_workflow_for_single_image_with_inference(
model_manager=model_manager,
image_path=image_path,
Expand All @@ -305,6 +346,7 @@ def _process_single_image_from_directory(
api_key=api_key,
thread_pool_executor=thread_pool_executor,
max_concurrent_workflows_steps=max_concurrent_workflows_steps,
workflows_execution_engine_init_params=workflows_execution_engine_init_params,
)
index_entry = dump_image_processing_results(
result=result,
Expand Down Expand Up @@ -372,12 +414,15 @@ def _run_workflow_for_single_image_with_inference(
api_key: Optional[str],
thread_pool_executor: ThreadPoolExecutor,
max_concurrent_workflows_steps: int,
workflows_execution_engine_init_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
workflow_init_parameters = {
"workflows_core.model_manager": model_manager,
"workflows_core.api_key": api_key,
"workflows_core.thread_pool_executor": thread_pool_executor,
}
if workflows_execution_engine_init_params:
workflow_init_parameters.update(workflows_execution_engine_init_params)
execution_engine = ExecutionEngine.init(
workflow_definition=workflow_specification,
init_parameters=workflow_init_parameters,
Expand Down
Loading