Skip to content

Commit

Permalink
Add support for pipeline node level resource configuration (#1203)
Browse files Browse the repository at this point in the history
Adds three optional fields to the pipeline node properties editor to enable
users to specify  CPU, memory, and GPU usage for each node.

Co-authored-by: Karla Spuldaro <[email protected]>
Co-authored-by: Alan Chin <[email protected]>
Co-authored-by: Patrick Titzler <[email protected]>
  • Loading branch information
4 people authored and lresende committed Jan 24, 2021
1 parent 3866441 commit fb680df
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 13 deletions.
45 changes: 45 additions & 0 deletions .github/ISSUE_TEMPLATE/pipeline-issue-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
name: Pipeline issue report
about: Report a potential issue related to the pipeline editor or pipeline execution
title: ''
labels: component:pipeline-editor, component:pipeline-runtime, status:Needs Triage, kind:user-feedback
assignees: ''

---

**Describe the issue**
A clear and concise description of what the issue is.

**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error

**Screenshots or log output**
If applicable, add screenshots or log output to help explain your problem.
<details><summary>Log Output</summary>
<pre>
Paste the log output here.
</pre>
</details>

**Expected behavior**
A clear and concise description of what you expected to happen.

**Deployment information**
Describe what you've deployed and how:
- Elyra version: [e.g. 1.5.3]
- Operating system: [e.g. macos, linux]
- Installation source: [e.g. PyPI, conda, from source, official container image, custom container image]
- Deployment type: [e.g. local installation, Docker, Kubernetes, Kubeflow [notebook server] , Open Data Hub]

**Pipeline runtime environment**
If the issue is related to pipeline execution, identify the environment where the pipeline is executed
- Local execution
- Kubeflow Pipelines (provide version number, whether multi-user auth enabled)
- Apache Airflow (provide version number)

**Runtime configuration settings**
If the issue is related to pipeline execution, document the runtime configuration settings from the Elyra UI, omitting confidential information.
3 changes: 3 additions & 0 deletions elyra/pipeline/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ def _create_pipeline_operation(node: Dict, super_node: Optional[Dict] = None):
id=node_id,
type=node.get('type'),
classifier=node.get('op'),
cpu=PipelineParser._get_app_data_field(node, 'cpu'),
gpu=PipelineParser._get_app_data_field(node, 'gpu'),
memory=PipelineParser._get_app_data_field(node, 'memory'),
filename=PipelineParser._get_app_data_field(node, 'filename'),
runtime_image=PipelineParser._get_app_data_field(node, 'runtime_image'),
dependencies=PipelineParser._get_app_data_field(node, 'dependencies', []),
Expand Down
51 changes: 39 additions & 12 deletions elyra/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class Operation(object):
Represents a single operation in a pipeline
"""

def __init__(self, id, type, classifier, filename, runtime_image, dependencies=None,
include_subdirectories: bool = False, env_vars=None, inputs=None, outputs=None,
def __init__(self, id, type, classifier, filename, runtime_image, memory=None, cpu=None, gpu=None,
dependencies=None, include_subdirectories: bool = False, env_vars=None, inputs=None, outputs=None,
parent_operations=None):
"""
:param id: Generated UUID, 128 bit number used as a unique identifier
Expand All @@ -42,6 +42,9 @@ def __init__(self, id, type, classifier, filename, runtime_image, dependencies=N
:param inputs: List of files to be consumed by this operation, produced by parent operation(s)
:param outputs: List of files produced by this operation to be included in a child operation(s)
:param parent_operations: List of parent operation 'ids' required to execute prior to this operation
:param cpu: number of cpus requested to run the operation
:param memory: amount of memory requested to run the operation (in Gi)
:param gpu: number of gpus requested to run the operation
"""

# validate that the operation has all required properties
Expand All @@ -67,6 +70,9 @@ def __init__(self, id, type, classifier, filename, runtime_image, dependencies=N
self._inputs = inputs or []
self._outputs = outputs or []
self._parent_operations = parent_operations or []
self._cpu = cpu
self._gpu = gpu
self._memory = memory

@property
def id(self):
Expand Down Expand Up @@ -104,6 +110,18 @@ def include_subdirectories(self):
def env_vars(self):
return self._env_vars

@property
def cpu(self):
return self._cpu

@property
def memory(self):
return self._memory

@property
def gpu(self):
return self._gpu

def env_vars_as_dict(self, logger: Optional[object] = None) -> Dict:
"""Operation stores environment variables in a list of name=value pairs, while
subprocess.run() requires a dictionary - so we must convert. If no envs are
Expand Down Expand Up @@ -155,7 +173,10 @@ def __eq__(self, other: object) -> bool:
self.include_subdirectories == other.include_subdirectories and \
self.outputs == other.outputs and \
self.inputs == other.inputs and \
self.parent_operations == other.parent_operations
self.parent_operations == other.parent_operations and \
self.cpu == other.cpu and \
self.gpu == other.gpu and \
self.memory == other.memory

def __str__(self) -> str:
return "componentID : {id} \n " \
Expand All @@ -166,15 +187,21 @@ def __str__(self) -> str:
"filename : {filename} \n " \
"inputs : {inputs} \n " \
"outputs : {outputs} \n " \
"runtime image : {image} \n ".format(id=self.id,
name=self.name,
parent_op=self.parent_operations,
depends=self.dependencies,
inc_subdirs=self.include_subdirectories,
filename=self.filename,
inputs=self.inputs,
outputs=self.outputs,
image=self.runtime_image)
"image : {image} \n " \
"gpu: {gpu} \n " \
"memory: {memory} \n " \
"cpu : {cpu} \n ".format(id=self.id,
name=self.name,
parent_op=self.parent_operations,
depends=self.dependencies,
inc_subdirs=self.include_subdirectories,
filename=self.filename,
inputs=self.inputs,
outputs=self.outputs,
image=self.runtime_image,
gpu=self.gpu,
cpu=self.cpu,
memory=self.memory)


class Pipeline(object):
Expand Down
3 changes: 3 additions & 0 deletions elyra/pipeline/processor_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ def _cc_pipeline(self, pipeline, pipeline_name):
pipeline_outputs=operation.outputs,
pipeline_envs=pipeline_envs,
emptydir_volume_size=emptydir_volume_size,
cpu_request=operation.cpu,
mem_request=operation.memory,
gpu_limit=operation.gpu,
image=operation.runtime_image,
file_outputs={
'mlpipeline-metrics':
Expand Down
3 changes: 3 additions & 0 deletions elyra/templates/kfp_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ def create_pipeline():
cos_dependencies_archive='{{ operation.cos_dependencies_archive }}',
pipeline_inputs={{ operation.pipeline_inputs }},
pipeline_outputs={{ operation.pipeline_outputs }},
cpu_request='{{ operation.cpu_request }}',
mem_request='{{ operation.mem_request }}G',
gpu_limit='{{ operation.gpu_limit }}',
image='{{ operation.image }}',
file_outputs={
'mlpipeline-metrics': '{{ metrics_file }}',
Expand Down
26 changes: 26 additions & 0 deletions packages/pipeline-editor/src/PipelineEditorWidget.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@ export class PipelineEditor extends React.Component<
app_data.dependencies;
node_props.parameterDef.current_parameters.include_subdirectories =
app_data.include_subdirectories;
node_props.parameterDef.current_parameters.cpu = app_data.cpu;
node_props.parameterDef.current_parameters.memory = app_data.memory;
node_props.parameterDef.current_parameters.gpu = app_data.gpu;
node_props.parameterDef.titleDefinition = {
title: this.canvasController.getNode(source.id).label,
editable: true
Expand Down Expand Up @@ -540,6 +543,9 @@ export class PipelineEditor extends React.Component<
app_data.env_vars = propertySet.env_vars;
app_data.dependencies = propertySet.dependencies;
app_data.include_subdirectories = propertySet.include_subdirectories;
app_data.cpu = propertySet.cpu;
app_data.memory = propertySet.memory;
app_data.gpu = propertySet.gpu;
this.validateAllNodes();
this.updateModel();
}
Expand Down Expand Up @@ -848,6 +854,22 @@ export class PipelineEditor extends React.Component<
}
}

cleanNullProperties(): void {
// Delete optional fields that have null value
for (const node of this.canvasController.getPipelineFlow().pipelines[0]
.nodes) {
if (node.app_data.cpu === null) {
delete node.app_data.cpu;
}
if (node.app_data.memory === null) {
delete node.app_data.memory;
}
if (node.app_data.gpu === null) {
delete node.app_data.gpu;
}
}
}

async handleExportPipeline(): Promise<void> {
// Warn user if the pipeline has invalid nodes
const errorMessage = await this.validatePipeline();
Expand Down Expand Up @@ -906,6 +928,8 @@ export class PipelineEditor extends React.Component<
this.widgetContext.path
);

this.cleanNullProperties();

pipelineFlow.pipelines[0]['app_data']['name'] = pipeline_name;
pipelineFlow.pipelines[0]['app_data']['runtime'] = runtime;
pipelineFlow.pipelines[0]['app_data']['runtime-config'] = runtime_config;
Expand Down Expand Up @@ -1259,6 +1283,8 @@ export class PipelineEditor extends React.Component<
this.widgetContext.path
);

this.cleanNullProperties();

pipelineFlow.pipelines[0]['app_data']['name'] =
dialogResult.value.pipeline_name;
pipelineFlow.pipelines[0]['app_data']['runtime'] = runtime;
Expand Down
Loading

0 comments on commit fb680df

Please sign in to comment.