Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.3
1.1.4
8 changes: 3 additions & 5 deletions backend/kaos_backend/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import os

from flask import Flask
from kaos_backend.clients.pachyderm import PachydermClient
from kaos_backend.controllers.data import DataController
Expand All @@ -17,15 +16,14 @@
from kaos_backend.routes.train import build_train_blueprint
from kaos_backend.routes.workspace import build_workspace_blueprint
from kaos_backend.services.job_service import JobService
from python_pachyderm import PfsClient, PpsClient
from python_pachyderm import Client

PACHY_HOST = os.getenv("PACHD_SERVICE_HOST", "localhost")
PACHY_PORT = os.getenv("PACHD_SERVICE_PORT_API_GRPC_PORT", 30650)

pfs_client = PfsClient(PACHY_HOST, PACHY_PORT)
pps_client = PpsClient(PACHY_HOST, PACHY_PORT)
pclient = Client(PACHY_HOST, PACHY_PORT)

pachyderm_client = PachydermClient(pps_client, pfs_client)
pachyderm_client = PachydermClient(pclient)
job_service = JobService(pachyderm_client)

train_blueprint = build_train_blueprint(TrainController(job_service))
Expand Down
94 changes: 46 additions & 48 deletions backend/kaos_backend/clients/pachyderm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
from concurrent.futures import ThreadPoolExecutor

import bitstring
import python_pachyderm.client.pfs.pfs_pb2 as pfs_proto
import python_pachyderm.proto.pfs.pfs_pb2 as pfs_proto
import urllib3
from cgroupspy import trees
from flask import current_app as app
from kaos_backend.exceptions.exceptions import JobNotFoundError, PipelineNotFoundError, PipelineInStandby
from kaos_backend.util.error_handling import handle_pachyderm_error
from kaos_backend.util.protobuf import proto_to_dict
from psutil import virtual_memory
from python_pachyderm import PpsClient, PfsClient
from python_pachyderm.client.pps import pps_pb2 as proto
from python_pachyderm import Client
from python_pachyderm.proto.pps import pps_pb2 as proto
from urllib3 import PoolManager

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Expand All @@ -26,16 +26,14 @@ class PachydermClient:

GPU_TYPE = "nvidia.com/gpu"

def __init__(self, pps_client: PpsClient, pfs_client: PfsClient):
def __init__(self, pclient: Client):
"""
PachydermClient constructor.

Args:
pps_client (PpsClient): Pachyderm Pipeline System client
pfs_client (PfsClient): Pachyderm File System client
pclient (Client): Pachyderm System client
"""
self.pps_client = pps_client
self.pfs_client = pfs_client
self.pclient = pclient
self.pool = PoolManager()
# TODO: expose
self.max_workers = 20
Expand Down Expand Up @@ -74,7 +72,7 @@ def create_pipeline(self,

resource_limits, resource_requests = self.define_resources(cpu, gpu, memory)

return self.pps_client.create_pipeline(
return self.pclient.create_pipeline(
name,
description=description,
transform=transform,
Expand Down Expand Up @@ -125,11 +123,11 @@ def create_repo(self, repo: str, desc=None):
app.logger.debug("@%s: creating repo %s", PachydermClient.__name__, repo)

# make new repo (if needed)
repos = [r.repo.name for r in self.pfs_client.list_repo()]
repos = [r.repo.name for r in self.pclient.list_repo()]
if repo not in repos:
app.logger.debug("@%s: repo does not exists %s", PachydermClient.__name__, repo)
self.pfs_client.create_repo(repo, description=desc)
self.pfs_client.create_branch(repo, "master")
self.pclient.create_repo(repo, description=desc)
self.pclient.create_branch(repo, "master")
else:
app.logger.debug("@%s: repo exists %s", PachydermClient.__name__, repo)

Expand Down Expand Up @@ -161,21 +159,21 @@ def kill_notebook_service_pipeline(self):
def put_blobs(self, repo: str, blobs_list: list, desc=None):
app.logger.debug("@%s: put blobs in repository %s", PachydermClient.__name__, repo)
# keep single commit for input data
with self.pfs_client.commit(repo, 'master', description=desc) as c:
with self.pclient.commit(repo, 'master', description=desc) as c:
for blob in blobs_list:
self.pfs_client.put_file_bytes(c, blob['path'], blob['blob'], overwrite_index=0)
self.pclient.put_file_bytes(c, blob['path'], blob['blob'], overwrite_index=0)
commit_id = c.id

return commit_id

@handle_pachyderm_error
def put_blob(self, repo: str, path, blob, split_by_lines=None, desc=None):
app.logger.debug("@%s: put blob in repository %s", PachydermClient.__name__, repo)
with self.pfs_client.commit(repo, 'master', description=desc) as c:
with self.pclient.commit(repo, 'master', description=desc) as c:
delimiter = "LINE" if split_by_lines else None
app.logger.debug("putting blob split: %s; commit: %s", split_by_lines, c)
self.pfs_client.put_file_bytes(c, path, blob,
overwrite_index=0, delimiter=delimiter)
self.pclient.put_file_bytes(c, path, blob,
overwrite_index=0, delimiter=delimiter)

@handle_pachyderm_error
def put_dir_base(self, repo: str, path: str, upload_f: callable, desc=None):
Expand All @@ -185,7 +183,7 @@ def put_dir_base(self, repo: str, path: str, upload_f: callable, desc=None):
os.walk(path, followlinks=True))
for pathname in paths]
# keep single commit for input data
with self.pfs_client.commit(repo, 'master', description=desc) as c:
with self.pclient.commit(repo, 'master', description=desc) as c:
for file in files:
if os.stat(file).st_size != 0:
upload_f(path, c, file)
Expand All @@ -199,9 +197,9 @@ def upload_files(path: str, commit, file: str):
repo_name = commit.repo.name
app.logger.debug("@%s: upload files at path %s with commit id %s on repo %s",
PachydermClient.__name__, path, commit_id, repo_name)
self.pfs_client.put_file_bytes(commit, os.path.join(prefix, os.path.relpath(file, path)),
open(file, "rb").read(),
overwrite_index=0)
self.pclient.put_file_bytes(commit, os.path.join(prefix, os.path.relpath(file, path)),
open(file, "rb").read(),
overwrite_index=0)

return self.put_dir_base(repo, source_path, upload_files, desc)

Expand All @@ -211,8 +209,8 @@ def get_dir(self, repo: str, commit: str, path: str, out_dir=os.getcwd(), remove
objs = self.list_file(f"{repo}/{commit}", path=path, recursive=True)
for obj in objs:
obj_path = obj.file.path
x = self.pfs_client.get_file(f"{repo}/{commit}",
path=obj_path)
x = self.pclient.get_file(f"{repo}/{commit}",
path=obj_path)
if remove_prefix:
obj_path = os.path.relpath(obj_path, path)
# TODO -> attach <output_branch> when saving model (for consistency)
Expand All @@ -227,8 +225,8 @@ def get_blob(self, repo: str, commit: str, path: str):
app.logger.debug("@%s: get blob from repo %s at path %s with commit id %s", PachydermClient.__name__, repo,
path, commit)

file = self.pfs_client.get_file(f"{repo}/{commit}",
path=path)
file = self.pclient.get_file(f"{repo}/{commit}",
path=path)
stream = bitstring.BitStream()

for chunk in file:
Expand All @@ -239,22 +237,22 @@ def get_blob(self, repo: str, commit: str, path: str):
@handle_pachyderm_error
def list_pipelines(self):
app.logger.debug("@%s: list pipelines", PachydermClient.__name__)
return [r.pipeline.name for r in self.pps_client.list_pipeline().pipeline_info]
return [r.pipeline.name for r in self.pclient.list_pipeline().pipeline_info]

@handle_pachyderm_error
def list_repos(self):
app.logger.debug("@%s: list repo", PachydermClient.__name__)
return [r.repo.name for r in self.pfs_client.list_repo()]
return [r.repo.name for r in self.pclient.list_repo()]

@handle_pachyderm_error
def check_repo_empty(self, repo: str):
app.logger.debug("@%s: check repo %s is empty", PachydermClient.__name__, repo)
return self.pfs_client.inspect_repo(repo).size_bytes == 0
return self.pclient.inspect_repo(repo).size_bytes == 0

@handle_pachyderm_error
def inspect_pipeline(self, pipeline: str):
app.logger.debug("@%s: inspect pipeline %s", PachydermClient.__name__, pipeline)
return self.pps_client.inspect_pipeline(pipeline)
return self.pclient.inspect_pipeline(pipeline)

@handle_pachyderm_error
def check_pipeline_exists(self, pipeline: str):
Expand All @@ -264,32 +262,32 @@ def check_pipeline_exists(self, pipeline: str):
@handle_pachyderm_error
def check_repo_exists(self, repo: str):
app.logger.debug("@%s: check repo %s exists", PachydermClient.__name__, repo)
repos = [r.repo.name for r in self.pfs_client.list_repo()]
repos = [r.repo.name for r in self.pclient.list_repo()]
return repo in repos

@handle_pachyderm_error
def check_branch_exists(self, repo: str, branch: str):
app.logger.debug("@%s: check branch %s exists in %s", PachydermClient.__name__, branch, repo)
branches = [r.name for r in self.pfs_client.list_branch(repo)]
branches = [r.name for r in self.pclient.list_branch(repo)]
return branch in branches

@handle_pachyderm_error
def check_job_running(self, pipeline_name: str, job_id: str):
app.logger.debug("@%s: check job %s exists from %s", PachydermClient.__name__, job_id, pipeline_name)
jobs = [(r.job.id, r.state) for r in self.pps_client.list_job(pipeline_name, history=-1)]
jobs = [(r.job.id, r.state) for r in self.pclient.list_job(pipeline_name, history=-1)]
app.logger.debug(jobs)
return (job_id, 0) in jobs or (job_id, 1) in jobs

@handle_pachyderm_error
def check_job_exists(self, pipeline_name: str, job_id: str):
app.logger.debug("@%s: check job %s exists from %s", PachydermClient.__name__, job_id, pipeline_name)
jobs = [r.job.id for r in self.pps_client.list_job(pipeline_name, history=-1)]
jobs = [r.job.id for r in self.pclient.list_job(pipeline_name, history=-1)]
return job_id in jobs

@handle_pachyderm_error
def list_file(self, commit: str, path: str, recursive=False, history=-1):
app.logger.debug("@%s: list file from commit %s in path %s", PachydermClient.__name__, commit, path)
file_infos = list(self.pfs_client.list_file(commit, path, history=history))
file_infos = list(self.pclient.list_file(commit, path, history=history))

if recursive:
dirs = [f for f in file_infos if f.file_type == pfs_proto.DIR]
Expand All @@ -301,32 +299,32 @@ def list_file(self, commit: str, path: str, recursive=False, history=-1):
@handle_pachyderm_error
def inspect_file(self, commit: str, path: str):
app.logger.debug("@%s: list file from commit %s in path %s", PachydermClient.__name__, commit, path)
return self.pfs_client.inspect_file(commit=commit, path=path)
return self.pclient.inspect_file(commit=commit, path=path)

@handle_pachyderm_error
def list_commit(self, repo: str, to_commit=None):
app.logger.debug("@%s: list commit %s in repo %s", PachydermClient.__name__, to_commit, repo)
return self.pfs_client.list_commit(repo_name=repo, to_commit=to_commit)
return self.pclient.list_commit(repo_name=repo, to_commit=to_commit)

@handle_pachyderm_error
def inspect_commit(self, commit: str):
app.logger.debug("@%s: inspect commit %s", PachydermClient.__name__, commit)
return self.pfs_client.inspect_commit(commit)
return self.pclient.inspect_commit(commit)

@handle_pachyderm_error
def inspect_job(self, job_id: str):
app.logger.debug("@%s: inspect job %s", PachydermClient.__name__, job_id)
return self.pps_client.inspect_job(job_id=job_id)
return self.pclient.inspect_job(job_id=job_id)

@handle_pachyderm_error
def delete_repo(self, repo_name: str):
app.logger.debug("@%s: delete repo %s", PachydermClient.__name__, repo_name)
return self.pfs_client.delete_repo(repo_name, force=True)
return self.pclient.delete_repo(repo_name, force=True)

@handle_pachyderm_error
def delete_pipeline(self, pipeline_name):
app.logger.debug("@%s: delete pipeline %s", PachydermClient.__name__, pipeline_name)
return self.pps_client.delete_pipeline(pipeline_name)
return self.pclient.delete_pipeline(pipeline_name)

@handle_pachyderm_error
def delete_job(self, pipeline_name, job_id):
Expand All @@ -339,7 +337,7 @@ def delete_job(self, pipeline_name, job_id):
if not self.check_job_exists(pipeline_name, job_id):
raise JobNotFoundError(job_id)

return self.pps_client.delete_job(job_id)
return self.pclient.delete_job(job_id)

@handle_pachyderm_error
def get_job_logs(self, pipeline_name, job_id):
Expand All @@ -358,7 +356,7 @@ def get_job_logs(self, pipeline_name, job_id):
app.logger.debug("@%s: job %s not complete - using pipeline logs!", PachydermClient.__name__, job_id)
return self.get_pipeline_logs(pipeline_name)
else:
return self.pps_client.get_job_logs(job_id=job_id)
return self.pclient.get_job_logs(job_id=job_id)

@handle_pachyderm_error
def get_pipeline_logs(self, pipeline_name):
Expand All @@ -372,26 +370,26 @@ def get_pipeline_logs(self, pipeline_name):
if self.inspect_pipeline(pipeline_name).state == 5:
raise PipelineInStandby(pipeline_name)

return self.pps_client.get_pipeline_logs(pipeline_name=pipeline_name)
return self.pclient.get_pipeline_logs(pipeline_name=pipeline_name)

@handle_pachyderm_error
def get_jobs(self, pipeline_name: str, history=-1):
app.logger.debug("@%s: list jobs from pipeline %s", PachydermClient.__name__, pipeline_name)
job_iterator = self.pps_client.list_job(pipeline_name=pipeline_name, history=history)
job_iterator = self.pclient.list_job(pipeline_name=pipeline_name, history=history)
return [job for job in job_iterator]

@handle_pachyderm_error
def get_job_info(self, job_id: str):
app.logger.debug("@%s: inspect jobs from job %s", PachydermClient.__name__, job_id)
return self.pps_client.inspect_job(job_id)
return self.pclient.inspect_job(job_id)

@handle_pachyderm_error
def list_datum(self, job_id):
app.logger.debug("@%s: list datum by job %s", PachydermClient.__name__, job_id)
return [datum for datum in self.pps_client.list_datum(job_id)]
return [datum for datum in self.pclient.list_datum(job_id)]

@handle_pachyderm_error
def delete_all(self):
app.logger.debug("@%s: delete all", PachydermClient.__name__)
self.pps_client.delete_all()
self.pfs_client.delete_all()
self.pclient.delete_all()
self.pclient.delete_all()
4 changes: 2 additions & 2 deletions backend/kaos_backend/controllers/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from kaos_backend.services.job_service import JobService
from kaos_backend.util.tests import create_zip_file
from kaos_backend.util.validators import BundleValidator
from python_pachyderm import PpsClient, PfsClient
from python_pachyderm import Client


def t_any(cls):
Expand All @@ -27,7 +27,7 @@ def create_job_service(mocker, workspaces=None):
service: `kaos_backend.services.JobService`

"""
client = PachydermClient(PpsClient(), PfsClient())
client = PachydermClient(Client())
service = JobService(client)

def check_pipeline_exists_mock(pipeline_name):
Expand Down
6 changes: 3 additions & 3 deletions backend/kaos_backend/services/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from io import StringIO

import docker
import python_pachyderm.client.pps.pps_pb2 as proto
import python_pachyderm.proto.pps.pps_pb2 as proto
from flask import current_app as app
from kaos_backend.clients.pachyderm import PachydermClient
from kaos_backend.constants import BUILD_IMAGE, BUILD_NOTEBOOK_PIPELINE_PREFIX, BUILD_SERVE_PIPELINE_PREFIX, \
Expand Down Expand Up @@ -875,7 +875,7 @@ def define_train_pipeline(self,

# build dynamic output_branch
output_branch = self.build_output_branch(image_name, data_name, hyper_name)
self.client.pfs_client.create_branch(repo_name=pipeline_name, branch_name=output_branch)
self.client.pclient.create_branch(repo_name=pipeline_name, branch_name=output_branch)

data_input = proto.Input(pfs=proto.PFSInput(glob=f"/{data_name}",
repo=data_repo,
Expand Down Expand Up @@ -931,7 +931,7 @@ def update_training_pipeline(self, pipeline_name: str, payload: dict):
app.logger.debug(pipeline_def["output_branch"])

if not self.client.check_branch_exists(repo=pipeline_name, branch=pipeline_def["output_branch"]):
self.client.pfs_client.create_branch(repo_name=pipeline_name, branch_name=pipeline_def["output_branch"])
self.client.pclient.create_branch(repo_name=pipeline_name, branch_name=pipeline_def["output_branch"])

# format according to create_pipeline
data_input = proto.Input(pfs=proto.PFSInput(glob=pipeline_def["data_glob"],
Expand Down
2 changes: 1 addition & 1 deletion backend/kaos_backend/util/error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def wrapper(*args, **kwargs):
return a
except _Rendezvous as e:
print(e.debug_error_string())
err_desc = json.loads(e.debug_error_string())
err_desc = json.loads(e.debug_error_string().replace(" \"", " ").replace("\" ", " "))
current_app.logger.error("@handle_pachyderm_error: %s", str(err_desc))

grpc_message = err_desc.get(GRPC_MESSAGE, None)
Expand Down
4 changes: 4 additions & 0 deletions backend/kaos_backend/util/flask.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
from flask import jsonify as flask_jsonify
from google.protobuf.empty_pb2 import Empty

from kaos_model.api import Response, PagedResponse, Error

Expand All @@ -11,5 +12,8 @@ def wrapped(*args, **kwargs):
obj = f(*args, **kwargs)
if type(obj) in (Response, PagedResponse, Error):
obj = obj.to_dict()
if type(obj) is dict:
obj = {k: None if isinstance(v, Empty) else v for k, v in obj.items()}
return flask_jsonify(obj)

return wrapped
2 changes: 1 addition & 1 deletion backend/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Flask==1.0.2
gunicorn==19.9.0
Werkzeug==0.15.3
python-pachyderm==1.9.0.post5
python-pachyderm==2.4.0
protobuf==3.8.0
docker==3.7.2
graphviz==0.10.1
Expand Down
Loading