From 7726af5f4a175840bcac8b0158512c6a6eb57d19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Peri=C4=87?= Date: Mon, 26 Jun 2023 16:52:02 +0200 Subject: [PATCH 01/12] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 695944b1..7ae37600 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ if __name__ == "__main__": initializeProject(main) ``` -(Read the documentation and learn how you can migrate your project to the Coretex platform -> [Migrate your project to Coretex](https://app.gitbook.com/o/6QxmEiF5ygi67vFH3kV1/s/YoN0XCeop3vrJ0hyRKxx/getting-started/demo-experiments/migrate-your-project-to-coretex)) +Read the documentation and learn how you can migrate your project to the Coretex platform -> [Migrate your project to Coretex](https://app.gitbook.com/o/6QxmEiF5ygi67vFH3kV1/s/YoN0XCeop3vrJ0hyRKxx/getting-started/demo-experiments/migrate-your-project-to-coretex) ## Key Features From 26b8bf011b3c4bdef5fe47ace04e744c4a41c007 Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Wed, 28 Jun 2023 16:23:12 +0200 Subject: [PATCH 02/12] CTX-3890: Changed width and height variables from float to int --- coretex/coretex/annotation/image/bbox.py | 40 ++++++++-------- .../annotation/image/coretex_format.py | 46 +++++++++---------- 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/coretex/coretex/annotation/image/bbox.py b/coretex/coretex/annotation/image/bbox.py index 68e0e71f..a5d47ed4 100644 --- a/coretex/coretex/annotation/image/bbox.py +++ b/coretex/coretex/annotation/image/bbox.py @@ -28,17 +28,17 @@ class BBox(Codable): Properties ---------- - minX : float + minX : int top left x coordinate - minY : float + minY : int top left y coordinate - width : float + width : int width of the bounding box - height : float + height : int height of the bounding box """ - def __init__(self, minX: float = 0, minY: float = 0, width: float = 0, height: float = 0) -> None: + def __init__(self, minX: int = 0, minY: int = 0, width: int = 0, height: int = 0) -> None: self.minX: Final = minX self.minY: Final = minY @@ -46,31 +46,31 @@ def __init__(self, minX: float = 0, minY: float = 0, width: float = 0, height: f self.height: Final = height @property - def maxX(self) -> float: + def maxX(self) -> int: """ Returns ------- - float -> bottom right x coordinate + int -> bottom right x coordinate """ return self.minX + self.width @property - def maxY(self) -> float: + def maxY(self) -> int: """ Returns ------- - float -> bottom right y coordinate + int -> bottom right y coordinate """ return self.minY + self.height @property - def polygon(self) -> List[float]: + def polygon(self) -> List[int]: """ Returns ------- - List[float] -> Bounding box represented as a polygon (x, y) values + List[int] -> Bounding box represented as a polygon (x, y) values """ return [ @@ -91,20 +91,20 @@ def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]: return descriptors @classmethod - def create(cls, minX: float, minY: float, maxX: float, maxY: float) -> Self: + def create(cls, minX: int, minY: int, maxX: int, maxY: int) -> Self: """ Utility constructor which has maxX and maxY as parameters instead of width and height Parameters ---------- - minX : float + minX : int top left x coordinate - minY : float + minY : int top left y coordinate - maxX : float + maxX : int bottom right x coordinate - maxY : float + maxY : int bottom right y coordinate Returns @@ -115,7 +115,7 @@ def create(cls, minX: float, minY: float, maxX: float, maxY: float) -> Self: 
return cls(minX, minY, maxX - minX, maxY - minY) @classmethod - def fromPoly(cls, polygon: List[float]) -> Self: + def fromPoly(cls, polygon: List[int]) -> Self: """ Creates bounding box from a polygon, by finding the minimum x and y coordinates and calculating @@ -123,7 +123,7 @@ def fromPoly(cls, polygon: List[float]) -> Self: Parameters ---------- - polygon : List[float] + polygon : List[int] list of x, y points - length must be even Returns @@ -140,8 +140,8 @@ def fromPoly(cls, polygon: List[float]) -> Self: "minX: 0, minY: 0, width: 4, height: 3" """ - x: List[float] = [] - y: List[float] = [] + x: List[int] = [] + y: List[int] = [] for index, value in enumerate(polygon): if index % 2 == 0: diff --git a/coretex/coretex/annotation/image/coretex_format.py b/coretex/coretex/annotation/image/coretex_format.py index 7333c323..d7502c68 100644 --- a/coretex/coretex/annotation/image/coretex_format.py +++ b/coretex/coretex/annotation/image/coretex_format.py @@ -29,11 +29,11 @@ from ....codable import Codable, KeyDescriptor -SegmentationType = List[float] +SegmentationType = List[int] -def toPoly(segmentation: List[float]) -> List[Tuple[float, float]]: - points: List[Tuple[float, float]] = [] +def toPoly(segmentation: List[int]) -> List[Tuple[int, int]]: + points: List[Tuple[int, int]] = [] for index in range(0, len(segmentation) - 1, 2): points.append((segmentation[index], segmentation[index + 1])) @@ -145,42 +145,42 @@ def extractBinaryMask(self, width: int, height: int) -> np.ndarray: return binaryMask - def centroid(self) -> Tuple[float, float]: + def centroid(self) -> Tuple[int, int]: """ Calculates centroid of segmentations Returns ------- - Tuple[float, float] -> x, y coordinates of centroid + Tuple[int, int] -> x, y coordinates of centroid """ flattenedSegmentations = [element for sublist in self.segmentations for element in sublist] listCX = [value for index, value in enumerate(flattenedSegmentations) if index % 2 == 0] - centerX = fsum(listCX) / len(listCX) + centerX = int(sum(listCX) / len(listCX)) listCY = [value for index, value in enumerate(flattenedSegmentations) if index % 2 != 0] - centerY = fsum(listCY) / len(listCY) + centerY = int(sum(listCY) / len(listCY)) return centerX, centerY - def centerSegmentations(self, newCentroid: Tuple[float, float]) -> None: + def centerSegmentations(self, newCentroid: Tuple[int, int]) -> None: """ Centers segmentations to the specified center point Parameters ---------- - newCentroid : Tuple[float, float] + newCentroid : Tuple[int, int] x, y coordinates of centroid """ newCenterX, newCenterY = newCentroid oldCenterX, oldCenterY = self.centroid() - modifiedSegmentations: List[List[float]] = [] + modifiedSegmentations: List[List[int]] = [] for segmentation in self.segmentations: - modifiedSegmentation: List[float] = [] + modifiedSegmentation: List[int] = [] for i in range(0, len(segmentation), 2): x = segmentation[i] + (newCenterX - oldCenterX) @@ -203,7 +203,7 @@ def rotateSegmentations(self, degrees: int) -> None: degree of rotation """ - rotatedSegmentations: List[List[float]] = [] + rotatedSegmentations: List[List[int]] = [] centerX, centerY = self.centroid() # because rotations with image and segmentations doesn't go in same direction @@ -212,14 +212,14 @@ def rotateSegmentations(self, degrees: int) -> None: cosang, sinang = cos(theta), sin(theta) for segmentation in self.segmentations: - rotatedSegmentation: List[float] = [] + rotatedSegmentation: List[int] = [] for i in range(0, len(segmentation), 2): x = segmentation[i] - centerX y = 
segmentation[i + 1] - centerY - newX = (x * cosang - y * sinang) + centerX - newY = (x * sinang + y * cosang) + centerY + newX = int(x * cosang - y * sinang) + centerX + newY = int(x * sinang + y * cosang) + centerY rotatedSegmentation.append(newX) rotatedSegmentation.append(newY) @@ -238,17 +238,17 @@ class CoretexImageAnnotation(Codable): ---------- name : str name of annotation class - width : float + width : int width of annotation - height : float + height : int height of annotation instances : List[CoretexSegmentationInstance] list of SegmentationInstance objects """ name: str - width: float - height: float + width: int + height: int instances: List[CoretexSegmentationInstance] @classmethod @@ -262,8 +262,8 @@ def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]: def create( cls, name: str, - width: float, - height: float, + width: int, + height: int, instances: List[CoretexSegmentationInstance] ) -> Self: """ @@ -273,9 +273,9 @@ def create( ---------- name : str name of annotation class - width : float + width : int width of annotation - height : float + height : int height of annotation instances : List[CoretexSegmentationInstance] list of SegmentationInstance objects From 3ecb7638f37d95e4cab344b39159cd9863a4c63c Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Thu, 29 Jun 2023 10:18:52 +0200 Subject: [PATCH 03/12] CTX-3890: Converted wider code to be compatible with int based bounding boxes --- .../conversion/converters/create_ml_converter.py | 6 +++--- .../converters/human_segmentation_converter.py | 4 ++-- .../conversion/converters/label_me_converter.py | 2 +- .../conversion/converters/pascal/instance_extractor.py | 2 +- .../converters/pascal/pascal_2012_converter.py | 4 ++-- coretex/coretex/conversion/converters/pascal/shared.py | 10 ++++++++++ coretex/coretex/conversion/converters/voc_converter.py | 4 ++-- .../coretex/conversion/converters/yolo_converter.py | 8 ++++---- .../dataset/image_dataset/synthetic_image_generator.py | 8 ++++---- 9 files changed, 29 insertions(+), 19 deletions(-) diff --git a/coretex/coretex/conversion/converters/create_ml_converter.py b/coretex/coretex/conversion/converters/create_ml_converter.py index 9e9054b6..47c37228 100644 --- a/coretex/coretex/conversion/converters/create_ml_converter.py +++ b/coretex/coretex/conversion/converters/create_ml_converter.py @@ -53,10 +53,10 @@ def _extractLabels(self) -> Set[str]: return labels - def __extractBBox(self, bbox: Dict[str, float]) -> BBox: + def __extractBBox(self, bbox: Dict[str, int]) -> BBox: return BBox( - bbox["x"] - bbox["width"] / 2, - bbox["y"] - bbox["height"] / 2, + bbox["x"] - int(bbox["width"] / 2), + bbox["y"] - int(bbox["height"] / 2), bbox["width"], bbox["height"] ) diff --git a/coretex/coretex/conversion/converters/human_segmentation_converter.py b/coretex/coretex/conversion/converters/human_segmentation_converter.py index 0bc8d68a..61bd28bf 100644 --- a/coretex/coretex/conversion/converters/human_segmentation_converter.py +++ b/coretex/coretex/conversion/converters/human_segmentation_converter.py @@ -65,7 +65,7 @@ def _dataSource(self) -> List[str]: def _extractLabels(self) -> Set[str]: return set(["background", "human"]) - def __extractPolygons(self, annotationPath: str, imageWidth: int, imageHeight: int) -> List[List[float]]: + def __extractPolygons(self, annotationPath: str, imageWidth: int, imageHeight: int) -> List[List[int]]: maskImage = Image.open(annotationPath) if maskImage is None: return [] @@ -81,7 +81,7 @@ def __extractPolygons(self, annotationPath: str, imageWidth: int, 
imageHeight: i contours = skimage.measure.find_contours(subMaskArray, 0.5) - segmentations: List[List[float]] = [] + segmentations: List[List[int]] = [] for contour in contours: for i in range(len(contour)): row, col = contour[i] diff --git a/coretex/coretex/conversion/converters/label_me_converter.py b/coretex/coretex/conversion/converters/label_me_converter.py index abddc311..ebcf45db 100644 --- a/coretex/coretex/conversion/converters/label_me_converter.py +++ b/coretex/coretex/conversion/converters/label_me_converter.py @@ -62,7 +62,7 @@ def __extractInstance(self, shape: Dict[str, Any]) -> Optional[CoretexSegmentati logging.getLogger("coretexpylib").info(f">> [Coretex] Class: ({label}) is not a part of dataset") return None - points: List[float] = np.array(shape["points"]).flatten().tolist() + points: List[int] = np.array(shape["points"]).flatten().tolist() bbox = BBox.fromPoly(points) return CoretexSegmentationInstance.create(coretexClass.classIds[0], bbox, [points]) diff --git a/coretex/coretex/conversion/converters/pascal/instance_extractor.py b/coretex/coretex/conversion/converters/pascal/instance_extractor.py index 0aa4b12f..f06a127d 100644 --- a/coretex/coretex/conversion/converters/pascal/instance_extractor.py +++ b/coretex/coretex/conversion/converters/pascal/instance_extractor.py @@ -31,7 +31,7 @@ from ....dataset import ImageDataset -ContourPoints = List[List[float]] +ContourPoints = List[List[int]] SegmentationPolygon = List[ContourPoints] diff --git a/coretex/coretex/conversion/converters/pascal/pascal_2012_converter.py b/coretex/coretex/conversion/converters/pascal/pascal_2012_converter.py index cff792d3..4acf8a3a 100644 --- a/coretex/coretex/conversion/converters/pascal/pascal_2012_converter.py +++ b/coretex/coretex/conversion/converters/pascal/pascal_2012_converter.py @@ -21,7 +21,7 @@ import glob import xml.etree.ElementTree as ET -from .shared import getTag, toFloat +from .shared import getTag, toInt from .instance_extractor import InstanceExtractor from ...base_converter import BaseConverter from .....coretex import CoretexImageAnnotation @@ -84,7 +84,7 @@ def __extractImageAnnotation(self, root: ET.Element) -> None: if size is None: return - width, height = toFloat(size, "width", "height") + width, height = toInt(size, "width", "height") if width is None or height is None: return diff --git a/coretex/coretex/conversion/converters/pascal/shared.py b/coretex/coretex/conversion/converters/pascal/shared.py index efc7e322..3f3acb57 100644 --- a/coretex/coretex/conversion/converters/pascal/shared.py +++ b/coretex/coretex/conversion/converters/pascal/shared.py @@ -38,6 +38,16 @@ def toFloat(rootEl: ET.Element, firstEl: str, secondEl: str) -> Tuple[Optional[f return (float(firstVal), float(secondVal)) +def toInt(rootEl: ET.Element, firstEl: str, secondEl: str) -> Tuple[Optional[int], Optional[int]]: + firstVal = getTag(rootEl, firstEl) + secondVal = getTag(rootEl, secondEl) + + if firstVal is None or secondVal is None: + return (None, None) + + return (int(firstVal), int(secondVal)) + + def getBoxes(bndbox: ET.Element) -> Optional[Dict[str, float]]: xmin, ymin = toFloat(bndbox, "xmin", "ymin") xmax, ymax = toFloat(bndbox, "xmax", "ymax") diff --git a/coretex/coretex/conversion/converters/voc_converter.py b/coretex/coretex/conversion/converters/voc_converter.py index 7cf43718..d4950957 100644 --- a/coretex/coretex/conversion/converters/voc_converter.py +++ b/coretex/coretex/conversion/converters/voc_converter.py @@ -22,7 +22,7 @@ import glob import xml.etree.ElementTree as ET 
-from .pascal.shared import getTag, getBoxes, toFloat +from .pascal.shared import getTag, getBoxes, toInt from ..base_converter import BaseConverter from ...annotation import CoretexImageAnnotation, CoretexSegmentationInstance, BBox @@ -91,7 +91,7 @@ def _extractImageAnnotation(self, root: ET.Element) -> None: if size is None: return - width, height = toFloat(size, "width", "height") + width, height = toInt(size, "width", "height") if width is None or height is None: return diff --git a/coretex/coretex/conversion/converters/yolo_converter.py b/coretex/coretex/conversion/converters/yolo_converter.py index 99eb6365..58938da8 100644 --- a/coretex/coretex/conversion/converters/yolo_converter.py +++ b/coretex/coretex/conversion/converters/yolo_converter.py @@ -67,10 +67,10 @@ def __extractBBox(self, rawInstance: List[str], width: int, height: int) -> BBox wYolo = float(rawInstance[3]) hYolo = float(rawInstance[4]) - boxWidth = wYolo * width - boxHeight = hYolo * height - xMin = float(xYolo * width - (boxWidth / 2)) - yMin = float(yYolo * height - (boxHeight / 2)) + boxWidth = int(wYolo * width) + boxHeight = int(hYolo * height) + xMin = int(xYolo * width - (boxWidth / 2)) + yMin = int(yYolo * height - (boxHeight / 2)) return BBox(xMin, yMin, boxWidth, boxHeight) diff --git a/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py b/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py index 6d20e0bb..d431340f 100644 --- a/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py +++ b/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py @@ -108,9 +108,9 @@ def composeImage( backgroundImage: np.ndarray, angle: int, scale: float -) -> Tuple[PILImage, List[Tuple[float, float]]]: +) -> Tuple[PILImage, List[Tuple[int, int]]]: - centroids: List[Tuple[float, float]] = [] + centroids: List[Tuple[int, int]] = [] locations: List[Tuple[int, int, int, int]] = [] background = Image.fromarray(backgroundImage) @@ -136,8 +136,8 @@ def composeImage( background.paste(resizedImage, (x, y), resizedImage) - centerX = x + resizedImage.width / 2 - centerY = y + resizedImage.height / 2 + centerX = x + int(resizedImage.width / 2) + centerY = y + int(resizedImage.height / 2) centroids.append((centerX, centerY)) From 2c84ef3619086a295337bde5832b3f741adaf703 Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Thu, 29 Jun 2023 10:46:09 +0200 Subject: [PATCH 04/12] CTX-3890: Changed division to // --- coretex/coretex/annotation/image/coretex_format.py | 4 ++-- coretex/coretex/conversion/converters/create_ml_converter.py | 4 ++-- .../dataset/image_dataset/synthetic_image_generator.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/coretex/coretex/annotation/image/coretex_format.py b/coretex/coretex/annotation/image/coretex_format.py index d7502c68..396d9e60 100644 --- a/coretex/coretex/annotation/image/coretex_format.py +++ b/coretex/coretex/annotation/image/coretex_format.py @@ -157,10 +157,10 @@ def centroid(self) -> Tuple[int, int]: flattenedSegmentations = [element for sublist in self.segmentations for element in sublist] listCX = [value for index, value in enumerate(flattenedSegmentations) if index % 2 == 0] - centerX = int(sum(listCX) / len(listCX)) + centerX = sum(listCX) // len(listCX) listCY = [value for index, value in enumerate(flattenedSegmentations) if index % 2 != 0] - centerY = int(sum(listCY) / len(listCY)) + centerY = sum(listCY) // len(listCY) return centerX, centerY diff --git 
a/coretex/coretex/conversion/converters/create_ml_converter.py b/coretex/coretex/conversion/converters/create_ml_converter.py index 47c37228..2ca23d7d 100644 --- a/coretex/coretex/conversion/converters/create_ml_converter.py +++ b/coretex/coretex/conversion/converters/create_ml_converter.py @@ -55,8 +55,8 @@ def _extractLabels(self) -> Set[str]: def __extractBBox(self, bbox: Dict[str, int]) -> BBox: return BBox( - bbox["x"] - int(bbox["width"] / 2), - bbox["y"] - int(bbox["height"] / 2), + bbox["x"] - bbox["width"] // 2, + bbox["y"] - bbox["height"] // 2, bbox["width"], bbox["height"] ) diff --git a/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py b/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py index d431340f..d7b56645 100644 --- a/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py +++ b/coretex/coretex/dataset/image_dataset/synthetic_image_generator.py @@ -136,8 +136,8 @@ def composeImage( background.paste(resizedImage, (x, y), resizedImage) - centerX = x + int(resizedImage.width / 2) - centerY = y + int(resizedImage.height / 2) + centerX = x + resizedImage.width // 2 + centerY = y + resizedImage.height // 2 centroids.append((centerX, centerY)) From 3ff34522147d5579b513975f312ce03cb45e5aa9 Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Thu, 29 Jun 2023 16:27:26 +0200 Subject: [PATCH 05/12] CTX-4166: Implemented Sequence sample import --- coretex/coretex/dataset/__init__.py | 1 + .../dataset/sequence_dataset/__init__.py | 19 ++++ .../coretex/dataset/sequence_dataset/base.py | 19 ++++ .../local_sequence_dataset.py | 33 ++++++ .../sequence_dataset/sequence_dataset.py | 38 +++++++ coretex/coretex/sample/__init__.py | 2 + .../sample/custom_sample/custom_sample.py | 23 +--- .../sample/sequence_sample/__init__.py | 19 ++++ .../sequence_sample/local_sequence_sample.py | 70 ++++++++++++ .../sample/sequence_sample/sequence_sample.py | 102 ++++++++++++++++++ coretex/coretex/sample/utils.py | 79 ++++++++++++++ coretex/utils/file.py | 24 +++++ 12 files changed, 408 insertions(+), 21 deletions(-) create mode 100644 coretex/coretex/dataset/sequence_dataset/__init__.py create mode 100644 coretex/coretex/dataset/sequence_dataset/base.py create mode 100644 coretex/coretex/dataset/sequence_dataset/local_sequence_dataset.py create mode 100644 coretex/coretex/dataset/sequence_dataset/sequence_dataset.py create mode 100644 coretex/coretex/sample/sequence_sample/__init__.py create mode 100644 coretex/coretex/sample/sequence_sample/local_sequence_sample.py create mode 100644 coretex/coretex/sample/sequence_sample/sequence_sample.py create mode 100644 coretex/coretex/sample/utils.py diff --git a/coretex/coretex/dataset/__init__.py b/coretex/coretex/dataset/__init__.py index c1c0b53f..b92d5570 100644 --- a/coretex/coretex/dataset/__init__.py +++ b/coretex/coretex/dataset/__init__.py @@ -23,3 +23,4 @@ from .local_dataset import LocalDataset from .network_dataset import NetworkDataset from .utils import downloadDataset +from .sequence_dataset import SequenceDataset, LocalSequenceDataset diff --git a/coretex/coretex/dataset/sequence_dataset/__init__.py b/coretex/coretex/dataset/sequence_dataset/__init__.py new file mode 100644 index 00000000..34aef8c5 --- /dev/null +++ b/coretex/coretex/dataset/sequence_dataset/__init__.py @@ -0,0 +1,19 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# 
published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from .sequence_dataset import SequenceDataset +from .local_sequence_dataset import LocalSequenceDataset diff --git a/coretex/coretex/dataset/sequence_dataset/base.py b/coretex/coretex/dataset/sequence_dataset/base.py new file mode 100644 index 00000000..d9d3f654 --- /dev/null +++ b/coretex/coretex/dataset/sequence_dataset/base.py @@ -0,0 +1,19 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +class BaseSequenceDataset: + pass diff --git a/coretex/coretex/dataset/sequence_dataset/local_sequence_dataset.py b/coretex/coretex/dataset/sequence_dataset/local_sequence_dataset.py new file mode 100644 index 00000000..f1d4a366 --- /dev/null +++ b/coretex/coretex/dataset/sequence_dataset/local_sequence_dataset.py @@ -0,0 +1,33 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
+ +from pathlib import Path + +from .base import BaseSequenceDataset +from ..local_dataset import LocalDataset +from ...sample import LocalSequenceSample + + +class LocalSequenceDataset(BaseSequenceDataset, LocalDataset[LocalSequenceSample]): + + """ + Local Sequence Dataset class which is used for Datasets whose + samples contain sequence data (.fasta, .fastq) + """ + + def __init__(self, path: Path) -> None: + super().__init__(path, LocalSequenceSample) diff --git a/coretex/coretex/dataset/sequence_dataset/sequence_dataset.py b/coretex/coretex/dataset/sequence_dataset/sequence_dataset.py new file mode 100644 index 00000000..45631f2a --- /dev/null +++ b/coretex/coretex/dataset/sequence_dataset/sequence_dataset.py @@ -0,0 +1,38 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Dict + +from .base import BaseSequenceDataset +from ..network_dataset import NetworkDataset +from ...sample import SequenceSample +from ....codable import KeyDescriptor + + +class SequenceDataset(BaseSequenceDataset, NetworkDataset[SequenceSample]): + + """ + Sequence Dataset class which is used for Datasets whose + samples contain sequence data (.fasta, .fastq) + """ + + @classmethod + def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]: + descriptors = super()._keyDescriptors() + descriptors["samples"] = KeyDescriptor("sessions", SequenceSample, list) + + return descriptors diff --git a/coretex/coretex/sample/__init__.py b/coretex/coretex/sample/__init__.py index c3a0e54a..47afe1e2 100644 --- a/coretex/coretex/sample/__init__.py +++ b/coretex/coretex/sample/__init__.py @@ -23,3 +23,5 @@ from .sample import Sample from .local_sample import LocalSample from .network_sample import NetworkSample +from .sequence_sample import LocalSequenceSample, SequenceSample +from .utils import chunkSampleImport diff --git a/coretex/coretex/sample/custom_sample/custom_sample.py b/coretex/coretex/sample/custom_sample/custom_sample.py index 133fcf14..a6232b5a 100644 --- a/coretex/coretex/sample/custom_sample/custom_sample.py +++ b/coretex/coretex/sample/custom_sample/custom_sample.py @@ -21,9 +21,8 @@ from .custom_sample_data import CustomSampleData from .local_custom_sample import LocalCustomSample +from ..utils import chunkSampleImport from ..network_sample import NetworkSample -from ....networking import networkManager, ChunkUploadSession, MAX_CHUNK_SIZE -from ....utils.file import isArchive class CustomSample(NetworkSample[CustomSampleData], LocalCustomSample): @@ -75,23 +74,5 @@ def createCustomSample( print("Failed to create custom sample") """ - if isinstance(filePath, str): - filePath = Path(filePath) - - if not isArchive(filePath): - raise ValueError(">> [Coretex] File must be an archive [.zip, .tar, .tar.gz]") - - uploadSession = ChunkUploadSession(MAX_CHUNK_SIZE, filePath) - uploadId = uploadSession.run() - - parameters = { - "name": name, - "dataset_id": 
datasetId, - "file_id": uploadId - } - - response = networkManager.genericFormData("session/import", parameters) - if response.hasFailed(): - return None - + response = chunkSampleImport(name, datasetId, filePath) return cls.decode(response.json) diff --git a/coretex/coretex/sample/sequence_sample/__init__.py b/coretex/coretex/sample/sequence_sample/__init__.py new file mode 100644 index 00000000..2c43043c --- /dev/null +++ b/coretex/coretex/sample/sequence_sample/__init__.py @@ -0,0 +1,19 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from .local_sequence_sample import LocalSequenceSample +from .sequence_sample import SequenceSample diff --git a/coretex/coretex/sample/sequence_sample/local_sequence_sample.py b/coretex/coretex/sample/sequence_sample/local_sequence_sample.py new file mode 100644 index 00000000..43849715 --- /dev/null +++ b/coretex/coretex/sample/sequence_sample/local_sequence_sample.py @@ -0,0 +1,70 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import List +from pathlib import Path + +from ..local_sample import LocalSample +from ....utils import file as file_utils + + +def getSequenceFile(directoryPath: Path, extensions: List[str]) -> Path: + for path in directoryPath.iterdir(): + if path.suffix in extensions: + return path + + raise FileNotFoundError(f">> [Coretex] {directoryPath} has no files with extensions \"{extensions}\"") + + +class LocalSequenceSample(LocalSample): + + """ + Represents the local custom Sample class + which is used for working with Other Task locally + """ + + @classmethod + def supportedExtensions(cls) -> List[str]: + return [".fasta", ".fastq", ".fa", ".fq"] + + @property + def sequencePath(self) -> Path: + """ + Returns the path of the .fasta or .fastq sequence file + contained inside the sample. 
If the sample contains gzip compressed + sequences, you will have to call Sample.unzip method first otherwise + calling Sample.sequencePath will raise an exception + + Raises + ------ + FileNotFoundError -> if no .fasta, .fastq, .fq, or .fq files are found inside the sample + """ + return getSequenceFile(Path(self.path), self.supportedExtensions()) + + def unzip(self, ignoreCache: bool = False) -> None: + super().unzip(ignoreCache) + + try: + compressedSequencePath = getSequenceFile(Path(self.path), [".gz"]) + decompressedSequencePath = compressedSequencePath.parent / compressedSequencePath.stem + + if decompressedSequencePath.exists() and not ignoreCache: + return + + file_utils.gzipDecompress(compressedSequencePath, decompressedSequencePath) + except FileNotFoundError: + pass diff --git a/coretex/coretex/sample/sequence_sample/sequence_sample.py b/coretex/coretex/sample/sequence_sample/sequence_sample.py new file mode 100644 index 00000000..50290f17 --- /dev/null +++ b/coretex/coretex/sample/sequence_sample/sequence_sample.py @@ -0,0 +1,102 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Union +from typing_extensions import Self +from pathlib import Path + +import logging + +from .local_sequence_sample import LocalSequenceSample +from ..utils import chunkSampleImport +from ..network_sample import NetworkSample +from ....utils import file as file_utils +from ....folder_management import FolderManager + + +class SequenceSample(NetworkSample, LocalSequenceSample): + + """ + Represents the local custom Sample class + which is used for working with Other Task locally + """ + + def __init__(self) -> None: + NetworkSample.__init__(self) + + @classmethod + def createSequenceSample(cls, path: Union[Path, str], datasetId: int) -> Self: + """ + Creates a new sequence sample and adds it to the specified dataset + + Parameters + ---------- + datasetId : int + id of dataset to which the sample will be added + path : Union[Path, str] + path to the sample + + Returns + ------- + Optional[Self] -> The created sample object or None if creation failed + + Raises + ------ + NetworkRequestError, ValueError -> if some kind of error happened during + the upload of the provided file + + Example + ------- + >>> from coretex import SequenceSample + \b + >>> sample = SequenceSample.createSequenceSample("path/to/file", 1023) + >>> print(sample.id) + """ + + if isinstance(path, str): + path = Path(path) + + archivePath = path + isTemp = False + + if not file_utils.isArchive(path): + # Check if the file is valid sequence file + if not path.suffix in cls.supportedExtensions(): + # If not check if file is valid compressed sequence file + if len(path.suffixes) < 2: + raise ValueError(f">> [Coretex] \"{path}\" is not a valid sequence file, supported extensions are \"{cls.supportedExtensions()}\"") + + if not path.suffixes[-2] in 
cls.supportedExtensions(): + raise ValueError(f">> [Coretex] \"{path}\" is not a valid sequence file, supported extensions are \"{cls.supportedExtensions()}\"") + + if not path.suffix == ".gz": + raise ValueError(f">> [Coretex] \"{path}\" is not a valid sequence file, supported extensions are \"{cls.supportedExtensions()}\"") + + logging.getLogger("coretexpylib").info(f">> [Coretex] Provided path \"{path}\" is not an archive, zipping...") + + archivePath = Path(FolderManager.instance().temp) / f"{path.stem}.zip" + file_utils.archive(path, archivePath) + + isTemp = True + + response = chunkSampleImport(path.stem, datasetId, archivePath) + + # Delete the archive if we created it + if isTemp: + archivePath.unlink() + + return cls.decode(response.json) diff --git a/coretex/coretex/sample/utils.py b/coretex/coretex/sample/utils.py new file mode 100644 index 00000000..b1ab3187 --- /dev/null +++ b/coretex/coretex/sample/utils.py @@ -0,0 +1,79 @@ +# Copyright (C) 2023 Coretex LLC + +# This file is part of Coretex.ai + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Union +from pathlib import Path + +from ...networking import NetworkResponse, NetworkRequestError, networkManager, ChunkUploadSession, MAX_CHUNK_SIZE +from ...utils import file as file_utils + + +def chunkSampleImport( + name: str, + datasetId: int, + filePath: Union[Path, str] +) -> NetworkResponse: + + """ + Creates a new sample with specified properties\n + When performing chunked sample import the provided + path must be an archive + + Parameters + ---------- + name : str + sample name + datasetId : int + id of dataset to which the sample will be added + filePath : Union[Path, str] + path to the sample + + Returns + ------- + NetworkResponse -> response of the session/import request + + Raises + ------ + NetworkRequestError, ValueError -> if some kind of error happened during + the upload of the provided file + + Example + ------- + >>> from coretex import chunkSampleImport + >>> chunkSampleImport("name", 1023, "path/to/archive.ext") + """ + + if isinstance(filePath, str): + filePath = Path(filePath) + + if not file_utils.isArchive(filePath): + raise ValueError(">> [Coretex] File must be a compressed archive [.zip, .tar.gz]") + + uploadSession = ChunkUploadSession(MAX_CHUNK_SIZE, filePath) + uploadId = uploadSession.run() + + parameters = { + "name": name, + "dataset_id": datasetId, + "file_id": uploadId + } + + response = networkManager.genericFormData("session/import", parameters) + if response.hasFailed(): + raise NetworkRequestError(response, "Failed to create sample from \"{path}\"") + + return response diff --git a/coretex/utils/file.py b/coretex/utils/file.py index e50b752e..e6544b8b 100644 --- a/coretex/utils/file.py +++ b/coretex/utils/file.py @@ -134,6 +134,30 @@ def gzipDecompress(source: Path, destination: Path) -> None: shutil.copyfileobj(gzipFile, destinationFile) +def archive(source: Path, destination: Path) -> None: + """ 
+ Archives and compresses the provided file or directory + using ZipFile module + + Parameters + ---------- + source : Path + file to be archived and compressed + destination : Path + location to which the zip file will be stored + """ + + with ZipFile(destination, "w", zipfile.ZIP_DEFLATED) as destinationFile: + if source.is_file(): + destinationFile.write(source, source.name) + else: + for path in source.rglob("*"): + if not path.is_file(): + continue + + destinationFile.write(path, path.relative_to(source)) + + def walk(path: Path) -> Generator[Path, None, None]: """ os.walk implementation for pathlib.Path From f3a7dee764a262757498ce7201265ac8871bf920 Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Fri, 30 Jun 2023 16:52:27 +0200 Subject: [PATCH 06/12] CTX-4102: Fix terminating metrics process on experiment process kill --- coretex/project/base.py | 5 ++++- coretex/project/calculate_metrics.py | 12 +++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/coretex/project/base.py b/coretex/project/base.py index 7b016350..06e68eeb 100644 --- a/coretex/project/base.py +++ b/coretex/project/base.py @@ -38,7 +38,8 @@ def __init__(self, experiment: Experiment, refreshToken: str) -> None: self.process = multiprocessing.Process( target = uploadMetricsWorker, - args = (self.__outputStream, refreshToken, self._experiment.id) + args = (self.__outputStream, refreshToken, self._experiment.id), + daemon = True ) @classmethod @@ -71,7 +72,9 @@ def onNetworkConnectionLost(self) -> None: def onCleanUp(self) -> None: logging.getLogger("coretexpylib").info("Experiment execution finished") + self.process.kill() + self.process.join() try: from py3nvml import py3nvml diff --git a/coretex/project/calculate_metrics.py b/coretex/project/calculate_metrics.py index f7c3cf81..d4309036 100644 --- a/coretex/project/calculate_metrics.py +++ b/coretex/project/calculate_metrics.py @@ -90,7 +90,17 @@ def uploadMetricsWorker(outputStream: Connection, refreshToken: str, experimentI sendSuccess(outputStream, "Metrics worker succcessfully started") - while True: + while outputStream.writable and not outputStream.closed: + try: + # If parent process gets killed with SIGKILL there + # is no guarantee that the child process will get + # closed so we ping the parent process to check if + # the pipe is available or not + outputStream.send(None) + except BrokenPipeError: + outputStream.close() + break + startTime = time.time() metricValues: Dict[str, Tuple[float, float]] = {} From 813c31a8be3dec9e1814531e3998f3d5a36ae20b Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Mon, 3 Jul 2023 10:13:53 +0200 Subject: [PATCH 07/12] Version bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1b969398..eaffb4ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "coretex" -version = "1.0.7" +version = "1.0.8" authors = [ { name="Duško Mirković", email="dmirkovic@coretex.ai" } ] From 551b1329a1eb0007a5056b3a17847be3b890d81f Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Mon, 3 Jul 2023 13:06:32 +0200 Subject: [PATCH 08/12] CTX-4083: Moved heartbeat to the metrics worker process - Renamed metrics worker process to experiment worker --- coretex/coretex/experiment/experiment.py | 75 ++++++++----------- coretex/project/base.py | 13 ++-- ...culate_metrics.py => experiment_worker.py} | 61 +++++++++------ coretex/project/heartbeat.py | 50 ------------- coretex/project/local.py | 6 -- 5 files changed, 75 
insertions(+), 130 deletions(-) rename coretex/project/{calculate_metrics.py => experiment_worker.py} (73%) delete mode 100644 coretex/project/heartbeat.py diff --git a/coretex/coretex/experiment/experiment.py b/coretex/coretex/experiment/experiment.py index 1545de06..5116eb38 100644 --- a/coretex/coretex/experiment/experiment.py +++ b/coretex/coretex/experiment/experiment.py @@ -72,8 +72,6 @@ class Experiment(NetworkObject, Generic[DatasetType]): if True chached env will be used, otherwise new environment will be created """ - __statusUpdateLock: Final = Lock() - name: str description: str meta: Dict[str, Any] @@ -91,8 +89,6 @@ def __init__(self) -> None: super(Experiment, self).__init__() self.metrics = [] - - self.__lastStatusMessage: Optional[str] = None self.__parameters: Dict[str, Any] = {} @property @@ -147,7 +143,6 @@ def _keyDescriptors(cls) -> Dict[str, KeyDescriptor]: descriptors["projectName"] = KeyDescriptor("sub_project_name") # private properties of the object should not be encoded - descriptors["__lastStatusMessage"] = KeyDescriptor(isEncodable = False) descriptors["__parameters"] = KeyDescriptor(isEncodable = False) return descriptors @@ -168,7 +163,13 @@ def onDecode(self) -> None: parameters = [ExperimentParameter.decode(value) for value in self.meta["parameters"]] self.__parameters = parseParameters(parameters, self.spaceTask) - def updateStatus(self, status: ExperimentStatus, message: Optional[str] = None, notifyServer: bool = True) -> bool: + def updateStatus( + self, + status: Optional[ExperimentStatus] = None, + message: Optional[str] = None, + notifyServer: bool = True + ) -> bool: + """ Updates Experiment status, if message parameter is None default message value will be used\n @@ -176,10 +177,11 @@ def updateStatus(self, status: ExperimentStatus, message: Optional[str] = None, Parameters ---------- - status : ExperimentStatus - ExperimentStatus type + status : Optional[ExperimentStatus] + Status to which the experiment will be updated to message : Optional[str] - Descriptive message for experiment status + Descriptive message for experiment status, it is diplayed + when the status is hovered on the Coretex Web App notifyServer : bool if True update request will be sent to Coretex.ai @@ -193,35 +195,33 @@ def updateStatus(self, status: ExperimentStatus, message: Optional[str] = None, True """ - with Experiment.__statusUpdateLock: - if message is None: - message = status.defaultMessage - - assert(len(message) > 10) # Some information needs to be sent to Coretex.ai + if status is not None: self.status = status - self.__lastStatusMessage = message + + if notifyServer: + if status is not None and message is None: + message = status.defaultMessage parameters: Dict[str, Any] = { - "id": self.id, - "status": status.value, - "status_message": message + "id": self.id } - if notifyServer: - # TODO: Should API rename this too? - endpoint = "model-queue/job-status-update" - response = networkManager.genericJSONRequest( - endpoint = endpoint, - requestType = RequestType.post, - parameters = parameters - ) + if status is not None: + parameters["status"] = status + + if message is not None: + parameters["message"] = message - if response.hasFailed(): - logging.getLogger("coretexpylib").error(">> [Coretex] Error while updating experiment status") + # TODO: Should API rename this too? 
+ endpoint = "model-queue/job-status-update" + response = networkManager.genericJSONRequest(endpoint, RequestType.post, parameters) - return not response.hasFailed() + if response.hasFailed(): + logging.getLogger("coretexpylib").error(">> [Coretex] Error while updating experiment status") - return True + return not response.hasFailed() + + return True def createMetrics(self, metrics: List[Metric]) -> None: """ @@ -311,21 +311,6 @@ def submitMetrics(self, metricValues: Dict[str, Tuple[float, float]]) -> bool: return not response.hasFailed() - def sendHeartbeat(self) -> bool: - """ - Sends an update status request with the status and the - status message of the previous status update - - Returns - ------- - bool -> True if success, False otherwise - """ - - if self.__lastStatusMessage: - return False - - return self.updateStatus(self.status, self.__lastStatusMessage) - def downloadProject(self) -> bool: """ Downloads project snapshot linked to the experiment diff --git a/coretex/project/base.py b/coretex/project/base.py index 06e68eeb..a4c0dc98 100644 --- a/coretex/project/base.py +++ b/coretex/project/base.py @@ -26,7 +26,7 @@ from ..folder_management import FolderManager from ..logging import LogHandler -from .calculate_metrics import uploadMetricsWorker +from .experiment_worker import experimentWorker class ProjectCallback: @@ -36,8 +36,9 @@ def __init__(self, experiment: Experiment, refreshToken: str) -> None: self.__outputStream, self.__inputStream = multiprocessing.Pipe() - self.process = multiprocessing.Process( - target = uploadMetricsWorker, + self.workerProcess = multiprocessing.Process( + name = f"Experiment {experiment.id} worker process", + target = experimentWorker, args = (self.__outputStream, refreshToken, self._experiment.id), daemon = True ) @@ -47,7 +48,7 @@ def create(cls, experimentId: int, refreshToken: str) -> Self: return cls(Experiment.fetchById(experimentId), refreshToken) def onStart(self) -> None: - self.process.start() + self.workerProcess.start() result = self.__inputStream.recv() if result["code"] != 0: @@ -73,8 +74,8 @@ def onNetworkConnectionLost(self) -> None: def onCleanUp(self) -> None: logging.getLogger("coretexpylib").info("Experiment execution finished") - self.process.kill() - self.process.join() + self.workerProcess.kill() + self.workerProcess.join() try: from py3nvml import py3nvml diff --git a/coretex/project/calculate_metrics.py b/coretex/project/experiment_worker.py similarity index 73% rename from coretex/project/calculate_metrics.py rename to coretex/project/experiment_worker.py index d4309036..b73f8850 100644 --- a/coretex/project/calculate_metrics.py +++ b/coretex/project/experiment_worker.py @@ -69,7 +69,39 @@ def setupGPUMetrics() -> None: pass -def uploadMetricsWorker(outputStream: Connection, refreshToken: str, experimentId: int) -> None: +def _isAlive(output: Connection) -> bool: + try: + # If parent process gets killed with SIGKILL there + # is no guarantee that the child process will get + # closed so we ping the parent process to check if + # the pipe is available or not + output.send(None) + except BrokenPipeError: + output.close() + return False + + return output.writable and not output.closed + + +def _heartbeat(experiment: Experiment) -> None: + # Update status without parameters is considered experiment hearbeat + experiment.updateStatus() + + +def _uploadMetrics(experiment: Experiment) -> None: + x = time.time() + metricValues: Dict[str, Tuple[float, float]] = {} + + for metric in experiment.metrics: + metricValue = 
metric.extract() + + if metricValue is not None: + metricValues[metric.name] = x, metricValue + + experiment.submitMetrics(metricValues) + + +def experimentWorker(outputStream: Connection, refreshToken: str, experimentId: int) -> None: setupGPUMetrics() response = networkManager.authenticateWithRefreshToken(refreshToken) @@ -80,7 +112,7 @@ def uploadMetricsWorker(outputStream: Connection, refreshToken: str, experimentI try: experiment: Experiment = Experiment.fetchById(experimentId) except NetworkRequestError: - sendFailure(outputStream, f"Failed to fetch experiment with id: {experimentId}") + sendFailure(outputStream, f"Failed to fetch experiment with id \"{experimentId}\"") return try: @@ -88,27 +120,10 @@ def uploadMetricsWorker(outputStream: Connection, refreshToken: str, experimentI except NetworkRequestError: sendFailure(outputStream, "Failed to create metrics") - sendSuccess(outputStream, "Metrics worker succcessfully started") - - while outputStream.writable and not outputStream.closed: - try: - # If parent process gets killed with SIGKILL there - # is no guarantee that the child process will get - # closed so we ping the parent process to check if - # the pipe is available or not - outputStream.send(None) - except BrokenPipeError: - outputStream.close() - break - - startTime = time.time() - metricValues: Dict[str, Tuple[float, float]] = {} - - for metric in experiment.metrics: - metricValue = metric.extract() + sendSuccess(outputStream, "Experiment worker succcessfully started") - if metricValue is not None: - metricValues[metric.name] = startTime, metricValue + while _isAlive(outputStream): + _heartbeat(experiment) + _uploadMetrics(experiment) - experiment.submitMetrics(metricValues) time.sleep(5) # delay between sending generic metrics diff --git a/coretex/project/heartbeat.py b/coretex/project/heartbeat.py deleted file mode 100644 index 2f869dee..00000000 --- a/coretex/project/heartbeat.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright (C) 2023 Coretex LLC - -# This file is part of Coretex.ai - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . 
- -from threading import Thread - -import time -import logging - -from ..coretex import Experiment - - -class Heartbeat(Thread): - - def __init__(self, experiment: Experiment, heartbeatRate: int = 10): - super(Heartbeat, self).__init__() - - # Don't wait for this thread to finish once the - # non daemon threads have exited - self.setDaemon(True) - self.setName("Heartbeat") - - if heartbeatRate < 1: - raise ValueError(">> [Coretex] updateInterval must be expressed as an integer of seconds") - - self.__experiment = experiment - self.__heartbeatRate = heartbeatRate - - def run(self) -> None: - while True: - time.sleep(self.__heartbeatRate) - - logging.getLogger("coretexpylib").debug(">> [Coretex] Heartbeat") - - result = self.__experiment.sendHeartbeat() - if not result: - logging.getLogger("coretexpylib").debug(">> [Coretex] Heartbeat failed") diff --git a/coretex/project/local.py b/coretex/project/local.py index 1c7e8600..28628298 100644 --- a/coretex/project/local.py +++ b/coretex/project/local.py @@ -25,7 +25,6 @@ import psutil -from .heartbeat import Heartbeat from .base import ProjectCallback from ..coretex import Experiment, ExperimentStatus from ..folder_management import FolderManager @@ -40,11 +39,6 @@ def onStart(self) -> None: FolderManager.instance().clearTempFiles() - logging.getLogger("coretexpylib").info("Heartbeat started") - - heartbeat = Heartbeat(self._experiment) - heartbeat.start() - def onSuccess(self) -> None: super().onSuccess() From 90ff7d69f70d6e449000c03de7014e2a99a52302 Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Thu, 6 Jul 2023 14:16:19 +0200 Subject: [PATCH 09/12] CTX-4219: Make root not private to make it available outside the class - Name starts with an underscore to mark it as internally used don't touch --- coretex/folder_management/folder_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coretex/folder_management/folder_manager.py b/coretex/folder_management/folder_manager.py index d5b4f41f..41863e13 100644 --- a/coretex/folder_management/folder_manager.py +++ b/coretex/folder_management/folder_manager.py @@ -64,7 +64,7 @@ def instance(cls) -> FolderManager: return cls.__instance def __init__(self) -> None: - self.__root: Final = Path(os.environ["CTX_STORAGE_PATH"]).expanduser() + self._root: Final = Path(os.environ["CTX_STORAGE_PATH"]).expanduser() # These paths are str paths for backwards compatibility self.samplesFolder: Final = str(self.__createFolder("samples")) @@ -175,7 +175,7 @@ def clearTempFiles(self) -> None: self.__clearDirectory(self.__artifactsFolder) def __createFolder(self, name: str) -> Path: - path = self.__root / name + path = self._root / name if not path.exists(): path.mkdir(parents = True, exist_ok = True) From 5ae9dbcc6ce6d425950c5e6e3724a63c5cb517aa Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Thu, 6 Jul 2023 15:33:38 +0200 Subject: [PATCH 10/12] Version bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index eaffb4ce..fe3a8b46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "coretex" -version = "1.0.8" +version = "1.0.9" authors = [ { name="Duško Mirković", email="dmirkovic@coretex.ai" } ] From e361f670405df784679f14909780a8379bc6b0f1 Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Mon, 10 Jul 2023 14:04:58 +0200 Subject: [PATCH 11/12] CTX-4251: Update "model-queue/custom" to "run" endpoint --- coretex/coretex/experiment/experiment.py | 22 +++++++++------------- 
coretex/project/local.py | 2 +- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/coretex/coretex/experiment/experiment.py b/coretex/coretex/experiment/experiment.py index 5116eb38..f97c888a 100644 --- a/coretex/coretex/experiment/experiment.py +++ b/coretex/coretex/experiment/experiment.py @@ -390,7 +390,7 @@ def createQiimeArtifact(self, rootArtifactFolderName: str, qiimeArtifactPath: Pa # logging.getLogger("coretexpylib").warning(f">> [Coretex] Failed to upload {localFilePath} to {remoteFilePath}") @classmethod - def startCustomExperiment( + def run( cls, projectId: int, nodeId: Union[int, str], @@ -438,7 +438,7 @@ def startCustomExperiment( ] \b >>> try: - experiment = Experiment.startCustomExperiment( + experiment = Experiment.run( projectId = 1023, nodeId = 23, name = "Dummy Custom Experiment @@ -457,17 +457,13 @@ def startCustomExperiment( if parameters is None: parameters = [] - response = networkManager.genericJSONRequest( - f"{cls._endpoint()}/custom", - RequestType.post, - parameters={ - "sub_project_id": projectId, - "service_id": nodeId, - "name": name, - "description": description, - "parameters": parameters - } - ) + response = networkManager.genericJSONRequest("run", RequestType.post, { + "sub_project_id": projectId, + "service_id": nodeId, + "name": name, + "description": description, + "parameters": parameters + }) if response.hasFailed(): raise NetworkRequestError(response, "Failed to create experiment") diff --git a/coretex/project/local.py b/coretex/project/local.py index 28628298..6a80144b 100644 --- a/coretex/project/local.py +++ b/coretex/project/local.py @@ -113,7 +113,7 @@ def processLocal(args: Optional[List[str]] = None) -> Tuple[int, ProjectCallback parameters = ExperimentParameter.readExperimentConfig() - experiment: Experiment = Experiment.startCustomExperiment( + experiment: Experiment = Experiment.run( parser.projectId, # Dummy Local node ID, hardcoded as it is only a temporary solution, # backend will add a new ExperimentType (local) which does not require a specific From 5284968f11f7f3a775d3e9e57df9875bdaef6803 Mon Sep 17 00:00:00 2001 From: Dusko Mirkovic Date: Tue, 11 Jul 2023 14:43:15 +0200 Subject: [PATCH 12/12] CTX-4079: Redesigned terminate condition for experiment worker process - Added log file output for experiment worker process for debugging purposes --- coretex/project/experiment_worker.py | 47 ++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/coretex/project/experiment_worker.py b/coretex/project/experiment_worker.py index b73f8850..be91e17c 100644 --- a/coretex/project/experiment_worker.py +++ b/coretex/project/experiment_worker.py @@ -19,10 +19,15 @@ from multiprocessing.connection import Connection import time +import os +import logging -from ..coretex import MetricType +import psutil + +from ..coretex import MetricType, Experiment from ..networking import networkManager, NetworkRequestError -from ..coretex.experiment import Experiment +from ..logging import initializeLogger, LogSeverity +from ..folder_management import FolderManager from ..coretex.experiment.metrics.metric_factory import createMetric @@ -65,8 +70,10 @@ def setupGPUMetrics() -> None: createMetric("gpu_usage", "time (s)", MetricType.interval, "usage (%)", MetricType.percent, None, [0, 100]), createMetric("gpu_temperature", "time (s)", MetricType.interval, "usage (%)", MetricType.percent) ]) + + logging.getLogger("coretexpylib").debug(">> [Coretex] Initialized GPU metrics") except: - pass + 
logging.getLogger("coretexpylib").debug(">> [Coretex] Failed to initialize GPU metrics") def _isAlive(output: Connection) -> bool: @@ -101,29 +108,51 @@ def _uploadMetrics(experiment: Experiment) -> None: experiment.submitMetrics(metricValues) -def experimentWorker(outputStream: Connection, refreshToken: str, experimentId: int) -> None: +def experimentWorker(output: Connection, refreshToken: str, experimentId: int) -> None: + workerLogPath = FolderManager.instance().logs / f"experiment_worker_{experimentId}.log" + initializeLogger(LogSeverity.debug, workerLogPath) + + currentProcess = psutil.Process(os.getpid()) + setupGPUMetrics() response = networkManager.authenticateWithRefreshToken(refreshToken) if response.hasFailed(): - sendFailure(outputStream, "Failed to authenticate with refresh token") + sendFailure(output, "Failed to authenticate with refresh token") return try: experiment: Experiment = Experiment.fetchById(experimentId) except NetworkRequestError: - sendFailure(outputStream, f"Failed to fetch experiment with id \"{experimentId}\"") + sendFailure(output, f"Failed to fetch experiment with id \"{experimentId}\"") return try: experiment.createMetrics(METRICS) except NetworkRequestError: - sendFailure(outputStream, "Failed to create metrics") + sendFailure(output, "Failed to create metrics") - sendSuccess(outputStream, "Experiment worker succcessfully started") + sendSuccess(output, "Experiment worker succcessfully started") - while _isAlive(outputStream): + while (parent := currentProcess.parent()) is not None: + logging.getLogger("coretexpylib").debug(f">> [Coretex] Worker process id {currentProcess.pid}, parent process id {parent.pid}...") + + # If parent process ID is set to 1 then that means that the parent process has terminated + # the process (this is only true for Unix-based systems), but since we run the Node + # from the docker container which uses Linux as a base then it is safe to use. + # + # In docker container the pid of the Node process is 1, but we are safe to chech since the + # node should never be a parent of this process for metric upload, only the experiment + # process can be the parent. + if parent.pid == 1 or not parent.is_running() or not _isAlive(output): + logging.getLogger("coretexpylib").debug(">> [Coretex] Terminating worker process...") + break + + logging.getLogger("coretexpylib").debug(">> [Coretex] Heartbeat") _heartbeat(experiment) + + logging.getLogger("coretexpylib").debug(">> [Coretex] Uploading metrics") _uploadMetrics(experiment) + logging.getLogger("coretexpylib").debug(">> [Coretex] Sleeping for 5s") time.sleep(5) # delay between sending generic metrics