diff --git a/helm/chart/templates/job.yaml b/helm/chart/templates/job.yaml index 6a69c5b4..9ab6851e 100644 --- a/helm/chart/templates/job.yaml +++ b/helm/chart/templates/job.yaml @@ -4,7 +4,7 @@ metadata: name: {{ include "streamflow.fullname" . }} labels: {{- include "streamflow.labels" . | nindent 4 }} - annottions: + annotations: "helm.sh/hook": post-install "helm.sh/hook-delete-policy": hook-succeeded spec: diff --git a/streamflow/config/schemas/v1.0/helm2.json b/streamflow/config/schemas/v1.0/helm2.json index 569892e4..61b64b6e 100644 --- a/streamflow/config/schemas/v1.0/helm2.json +++ b/streamflow/config/schemas/v1.0/helm2.json @@ -80,6 +80,11 @@ "type": "string", "description": "Absolute path of the kubeconfig file to be used" }, + "maxConcurrentConnections": { + "type": "integer", + "description": "Maximum number of concurrent connections to open for a single Kubernetes client", + "default": 10 + }, "namespace": { "type": "string", "description": "Namespace to install the release into", diff --git a/streamflow/deployment/connector/helm.py b/streamflow/deployment/connector/helm.py index bb0e49b9..c590ad73 100644 --- a/streamflow/deployment/connector/helm.py +++ b/streamflow/deployment/connector/helm.py @@ -9,9 +9,9 @@ import tarfile import tempfile import uuid -import zlib from abc import ABC from typing import MutableMapping, MutableSequence, Optional, Any, Tuple, Union +from urllib.parse import urlencode import yaml from cachetools import Cache, TTLCache @@ -44,6 +44,53 @@ def _set_config(self, configuration: Optional[Configuration] = None): configuration.api_key['authorization'] = "bearer " + self.token +class PatchedWsApiClient(WsApiClient): + + async def request(self, method, url, query_params=None, headers=None, + post_params=None, body=None, _preload_content=True, + _request_timeout=None): + if query_params: + new_query_params = [] + for key, value in query_params: + if key == 'command' and isinstance(value, list): + for command in value: + new_query_params.append((key, command)) + else: + new_query_params.append((key, value)) + query_params = new_query_params + if headers is None: + headers = {} + if 'sec-websocket-protocol' not in headers: + headers['sec-websocket-protocol'] = 'v4.channel.k8s.io' + + if query_params: + url += '?' + urlencode(query_params) + + url = ws_client.get_websocket_url(url) + + if _preload_content: + + resp_all = '' + async with self.rest_client.pool_manager.ws_connect( + url, + headers=headers, + heartbeat=30) as ws: + async for msg in ws: + msg = msg.data.decode('utf-8') + if len(msg) > 1: + channel = ord(msg[0]) + data = msg[1:] + if data: + if channel in [ws_client.STDOUT_CHANNEL, ws_client.STDERR_CHANNEL]: + resp_all += data + + return ws_client.WsResponse(resp_all.encode('utf-8')) + + else: + + return await self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=30) + + class BaseHelmConnector(BaseConnector, ABC): def __init__(self, @@ -53,7 +100,8 @@ def __init__(self, namespace: Optional[Text] = None, releaseName: Optional[Text] = "release-%s" % str(uuid.uuid1()), resourcesCacheTTL: int = 10, - transferBufferSize: int = (2 ** 25) - 1): + transferBufferSize: int = (2 ** 25) - 1, + maxConcurrentConnections: int = 4096): super().__init__( streamflow_config_dir=streamflow_config_dir, transferBufferSize=transferBufferSize) @@ -65,6 +113,7 @@ def __init__(self, self.configuration: Optional[Configuration] = None self.client: Optional[client.CoreV1Api] = None self.client_ws: Optional[client.CoreV1Api] = None + self.maxConcurrentConnections: int = maxConcurrentConnections def _configure_incluster_namespace(self): if self.namespace is None: @@ -209,10 +258,12 @@ async def _get_effective_resources(self, async def deploy(self, external: bool): # Init standard client configuration = await self._get_configuration() + configuration.connection_pool_maxsize = self.maxConcurrentConnections self.client = client.CoreV1Api(api_client=ApiClient(configuration=configuration)) # Init WebSocket client configuration = await self._get_configuration() - ws_api_client = WsApiClient(configuration=configuration) + configuration.connection_pool_maxsize = self.maxConcurrentConnections + ws_api_client = PatchedWsApiClient(configuration=configuration) ws_api_client.set_default_header('Connection', 'upgrade,keep-alive') self.client_ws = client.CoreV1Api(api_client=ws_api_client) diff --git a/streamflow/version.py b/streamflow/version.py index 896872d4..2e461129 100644 --- a/streamflow/version.py +++ b/streamflow/version.py @@ -1 +1 @@ -VERSION = "0.0.29" +VERSION = "0.0.30"