Skip to content

Commit

Permalink
Merge pull request #211 from jbernal0019/master
Browse files Browse the repository at this point in the history
Implement custom env variable injection into plugin containers
  • Loading branch information
jbernal0019 committed Sep 27, 2022
2 parents fba58e8 + f91e7b7 commit 22e5339
Show file tree
Hide file tree
Showing 53 changed files with 30 additions and 13 deletions.
Empty file modified .github/workflows/ci.yml
100644 → 100755
Empty file.
Empty file modified Dockerfile
100644 → 100755
Empty file.
Empty file modified LICENSE
100644 → 100755
Empty file.
Empty file modified README.md
100644 → 100755
Empty file.
Empty file modified docker-compose.yml
100644 → 100755
Empty file.
Empty file modified kubernetes/pman_dev.yaml
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/extra/README
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/extra/job_creator/README
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/extra/job_creator/binding.yml
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/extra/job_creator/role.yml
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/extra/job_creator/sa.yml
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/kustomization.yaml
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/resources/pman.yaml
100644 → 100755
Empty file.
Empty file modified kubernetes/prod/secrets/.pman.env
100644 → 100755
Empty file.
Empty file modified openshift/README.rst
100644 → 100755
Empty file.
Empty file modified openshift/example-config.cfg
100644 → 100755
Empty file.
Empty file modified openshift/example-secret.yml
100644 → 100755
Empty file.
Empty file modified openshift/pman-openshift-template-without-swift.json
100644 → 100755
Empty file.
Empty file modified openshift/pman-openshift-template.json
100644 → 100755
Empty file.
Empty file modified pman/__init__.py
100644 → 100755
Empty file.
Empty file modified pman/__main__.py
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion pman/abstractmgr.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __init__(self, config_dict: dict = None):

@abstractmethod
def schedule_job(self, image: Image, command: List[str], name: JobName,
resources_dict: Resources, mountdir: Optional[str] = None) -> J:
resources_dict: Resources, env: List[str], mountdir: Optional[str] = None) -> J:
"""
Schedule a new job and return the job object.
"""
Expand Down
Empty file modified pman/app.py
100644 → 100755
Empty file.
Empty file modified pman/config.py
100644 → 100755
Empty file.
Empty file modified pman/cromwell/__init__.py
100644 → 100755
Empty file.
Empty file modified pman/cromwell/client.py
100644 → 100755
Empty file.
Empty file modified pman/cromwell/models.py
100644 → 100755
Empty file.
Empty file modified pman/cromwell/slurm/__init__.py
100644 → 100755
Empty file.
Empty file modified pman/cromwell/slurm/wdl.py
100644 → 100755
Empty file.
Empty file modified pman/cromwellmgr.py
100644 → 100755
Empty file.
24 changes: 16 additions & 8 deletions pman/kubernetesmgr.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ def __init__(self, config_dict=None):
self.kube_client = k_client.CoreV1Api()
self.kube_v1_batch_client = k_client.BatchV1Api()

def schedule_job(self, image, command, name, resources_dict, mountdir=None) -> V1Job:
def schedule_job(self, image, command, name, resources_dict, env, mountdir=None) -> \
V1Job:
"""
Schedule a new job and return the job object.
"""
job_instance = self.create_job(image, command, name, resources_dict, mountdir)
job_instance = self.create_job(image, command, name, resources_dict, env,
mountdir)
job = self.submit_job(job_instance)
return job

Expand Down Expand Up @@ -58,7 +60,7 @@ def get_job_logs(self, job: V1Job, tail: int) -> AnyStr:
# and return immediately.
term_reason = self.__get_termination_reason(pod_item)
if term_reason is not None:
if term_reason is not 'Completed':
if term_reason != 'Completed':
logs += f'\n{term_reason}'
return logs
return logs
Expand Down Expand Up @@ -117,7 +119,8 @@ def remove_job(self, job):
self.kube_v1_batch_client.delete_namespaced_job(job.metadata.name, body=body,
namespace=job_namespace)

def create_job(self, image, command, name, resources_dict, mountdir=None) -> V1Job:
def create_job(self, image, command, name, resources_dict, env_l, mountdir=None) -> \
V1Job:
"""
Create and return a new job instance.
"""
Expand All @@ -133,14 +136,19 @@ def create_job(self, image, command, name, resources_dict, mountdir=None) -> V1J
# > request, Kubernetes automatically assigns a memory request that matches the limit.

limits = {'memory': memory_limit, 'cpu': cpu_limit}

env = []
for s in env_l:
key, val = s.split('=', 1)
env.append(k_client.V1EnvVar(name=key, value=val))

if gpu_limit > 0:
# ref: https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/
limits['nvidia.com/gpu'] = gpu_limit
env = [k_client.V1EnvVar(name='NVIDIA_VISIBLE_DEVICES', value='all'),
k_client.V1EnvVar(name='NVIDIA_DRIVER_CAPABILITIES',
value='compute,utility'),
k_client.V1EnvVar(name='NVIDIA_REQUIRE_CUDA', value='cuda>=9.0')],
env.append(k_client.V1EnvVar(name='NVIDIA_VISIBLE_DEVICES', value='all'))
env.append(k_client.V1EnvVar(name='NVIDIA_DRIVER_CAPABILITIES',
value='compute,utility'))
env.append(k_client.V1EnvVar(name='NVIDIA_REQUIRE_CUDA', value='cuda>=9.0'))

security_context = {
'allow_privilege_escalation': False,
Expand Down
Empty file modified pman/openshiftmgr.py
100644 → 100755
Empty file.
13 changes: 10 additions & 3 deletions pman/resources.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
parser = reqparse.RequestParser(bundle_errors=True)
parser.add_argument('jid', dest='jid', required=True)
parser.add_argument('args', dest='args', type=list, location='json', required=True)
parser.add_argument('args_path_flags', dest='args_path_flags', type=frozenset, location='json', required=False, default=frozenset())
parser.add_argument('args_path_flags', dest='args_path_flags', type=frozenset,
location='json', required=False, default=frozenset())
parser.add_argument('auid', dest='auid', required=True)
parser.add_argument('number_of_workers', dest='number_of_workers', type=int,
required=True)
parser.add_argument('cpu_limit', dest='cpu_limit', type=int, required=True)
parser.add_argument('memory_limit', dest='memory_limit', type=int, required=True)
parser.add_argument('gpu_limit', dest='gpu_limit', type=int, required=True)
parser.add_argument('image', dest='image', required=True)
parser.add_argument('entrypoint', dest='entrypoint', type=list, location='json', required=True)
parser.add_argument('entrypoint', dest='entrypoint', type=list, location='json',
required=True)
parser.add_argument('type', dest='type', choices=('ds', 'fs', 'ts'), required=True)
parser.add_argument('env', dest='env', type=list, location='json', default=[])


def get_compute_mgr(container_env):
Expand Down Expand Up @@ -67,6 +70,10 @@ def post(self):
if len(args.entrypoint) == 0:
abort(400, message='"entrypoint" cannot be empty')

for s in args.env:
if len(s.split('=', 1)) != 2:
abort(400, message='"env" must be a list of "key=value" strings')

job_id = args.jid.lstrip('/')

cmd = self.build_app_cmd(args.args, args.args_path_flags, args.entrypoint, args.type)
Expand All @@ -87,7 +94,7 @@ def post(self):
compute_mgr = get_compute_mgr(self.container_env)
try:
job = compute_mgr.schedule_job(args.image, cmd, job_id, resources_dict,
share_dir)
args.env, share_dir)
except ManagerException as e:
logger.error(f'Error from {self.container_env} while scheduling job '
f'{job_id}, detail: {str(e)}')
Expand Down
4 changes: 3 additions & 1 deletion pman/swarmmgr.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def __init__(self, config_dict=None):
else:
self.docker_client = docker.from_env(environment=self.config)

def schedule_job(self, image, command, name, resources_dict, mountdir=None) -> Service:
def schedule_job(self, image, command, name, resources_dict, env, mountdir=None) -> \
Service:
"""
Schedule a new job and return the job (swarm service) object.
"""
Expand All @@ -30,6 +31,7 @@ def schedule_job(self, image, command, name, resources_dict, mountdir=None) -> S
try:
job = self.docker_client.services.create(image, command,
name=name,
env=env,
mounts=mounts,
restart_policy=restart_policy,
tty=True)
Expand Down
Empty file modified pman/wsgi.py
100644 → 100755
Empty file.
Empty file modified requirements/base.txt
100644 → 100755
Empty file.
Empty file modified requirements/local.txt
100644 → 100755
Empty file.
Empty file modified requirements/production.txt
100644 → 100755
Empty file.
Empty file modified setup.cfg
100644 → 100755
Empty file.
Empty file modified setup.py
100644 → 100755
Empty file.
Empty file modified tests/__init__.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/__init__.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/examples/__init__.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/examples/metadata.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/examples/query.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/examples/wdl.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/helpers.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/test_client.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/test_cromwellmgr.py
100644 → 100755
Empty file.
Empty file modified tests/cromwell/test_wdl.py
100644 → 100755
Empty file.
Empty file modified tests/test_cmd.py
100644 → 100755
Empty file.
Empty file modified tests/test_openshiftmgr.py
100644 → 100755
Empty file.
Empty file modified tests/test_resources.py
100644 → 100755
Empty file.

0 comments on commit 22e5339

Please sign in to comment.