Skip to content

Commit

Permalink
Bump version to 0.0.30
Browse files Browse the repository at this point in the history
  • Loading branch information
GlassOfWhiskey committed May 27, 2021
1 parent 200785b commit 71b669a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
2 changes: 1 addition & 1 deletion helm/chart/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: {{ include "streamflow.fullname" . }}
labels:
{{- include "streamflow.labels" . | nindent 4 }}
annottions:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-delete-policy": hook-succeeded
spec:
Expand Down
5 changes: 5 additions & 0 deletions streamflow/config/schemas/v1.0/helm2.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@
"type": "string",
"description": "Absolute path of the kubeconfig file to be used"
},
"maxConcurrentConnections": {
"type": "integer",
"description": "Maximum number of concurrent connections to open for a single Kubernetes client",
"default": 10
},
"namespace": {
"type": "string",
"description": "Namespace to install the release into",
Expand Down
57 changes: 54 additions & 3 deletions streamflow/deployment/connector/helm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import tarfile
import tempfile
import uuid
import zlib
from abc import ABC
from typing import MutableMapping, MutableSequence, Optional, Any, Tuple, Union
from urllib.parse import urlencode

import yaml
from cachetools import Cache, TTLCache
Expand Down Expand Up @@ -44,6 +44,53 @@ def _set_config(self, configuration: Optional[Configuration] = None):
configuration.api_key['authorization'] = "bearer " + self.token


class PatchedWsApiClient(WsApiClient):

async def request(self, method, url, query_params=None, headers=None,
post_params=None, body=None, _preload_content=True,
_request_timeout=None):
if query_params:
new_query_params = []
for key, value in query_params:
if key == 'command' and isinstance(value, list):
for command in value:
new_query_params.append((key, command))
else:
new_query_params.append((key, value))
query_params = new_query_params
if headers is None:
headers = {}
if 'sec-websocket-protocol' not in headers:
headers['sec-websocket-protocol'] = 'v4.channel.k8s.io'

if query_params:
url += '?' + urlencode(query_params)

url = ws_client.get_websocket_url(url)

if _preload_content:

resp_all = ''
async with self.rest_client.pool_manager.ws_connect(
url,
headers=headers,
heartbeat=30) as ws:
async for msg in ws:
msg = msg.data.decode('utf-8')
if len(msg) > 1:
channel = ord(msg[0])
data = msg[1:]
if data:
if channel in [ws_client.STDOUT_CHANNEL, ws_client.STDERR_CHANNEL]:
resp_all += data

return ws_client.WsResponse(resp_all.encode('utf-8'))

else:

return await self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=30)


class BaseHelmConnector(BaseConnector, ABC):

def __init__(self,
Expand All @@ -53,7 +100,8 @@ def __init__(self,
namespace: Optional[Text] = None,
releaseName: Optional[Text] = "release-%s" % str(uuid.uuid1()),
resourcesCacheTTL: int = 10,
transferBufferSize: int = (2 ** 25) - 1):
transferBufferSize: int = (2 ** 25) - 1,
maxConcurrentConnections: int = 4096):
super().__init__(
streamflow_config_dir=streamflow_config_dir,
transferBufferSize=transferBufferSize)
Expand All @@ -65,6 +113,7 @@ def __init__(self,
self.configuration: Optional[Configuration] = None
self.client: Optional[client.CoreV1Api] = None
self.client_ws: Optional[client.CoreV1Api] = None
self.maxConcurrentConnections: int = maxConcurrentConnections

def _configure_incluster_namespace(self):
if self.namespace is None:
Expand Down Expand Up @@ -209,10 +258,12 @@ async def _get_effective_resources(self,
async def deploy(self, external: bool):
# Init standard client
configuration = await self._get_configuration()
configuration.connection_pool_maxsize = self.maxConcurrentConnections
self.client = client.CoreV1Api(api_client=ApiClient(configuration=configuration))
# Init WebSocket client
configuration = await self._get_configuration()
ws_api_client = WsApiClient(configuration=configuration)
configuration.connection_pool_maxsize = self.maxConcurrentConnections
ws_api_client = PatchedWsApiClient(configuration=configuration)
ws_api_client.set_default_header('Connection', 'upgrade,keep-alive')
self.client_ws = client.CoreV1Api(api_client=ws_api_client)

Expand Down
2 changes: 1 addition & 1 deletion streamflow/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = "0.0.29"
VERSION = "0.0.30"

0 comments on commit 71b669a

Please sign in to comment.