Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Aug 2, 2024
1 parent a14179a commit 442b71f
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions streamflow/deployment/connector/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async def get_connection(self) -> asyncssh.SSHClientConnection:
logger.warning(
f"Connection to {self._config.hostname} failed: {e}."
)
self._connect_event.set()
raise
except asyncssh.Error:
self._connect_event.set()
Expand Down Expand Up @@ -180,15 +181,11 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess:
== 0
):
await self._condition.wait()
self._sleeping_contexts = [
t for t in self._sleeping_contexts if not t.done()
]
else:
for context in free_contexts:
if context.ssh_attempts > context.retries:
# context terminated the retries
continue
ssh_connection = None
try:
ssh_connection = await context.get_connection()
self._selected_context = context
Expand All @@ -200,22 +197,26 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess:
stderr=self.stderr,
encoding=self.encoding,
)
logger.info("self._proc acquiring")
await self._proc.__aenter__()
logger.info("self._proc return")
return self._proc
except (ConnectionError, ConnectionLost, ChannelOpenError) as e:
msg = ""
if isinstance(e, ChannelOpenError):
msg = f"code: {e.code} - reason: {e.reason} - lang: {e.lang}"
logger.warning(
f"Error {e}. Opening SSH session to {context.get_hostname()} "
f"Error {type(e)}: {e}. {msg}. Opening SSH session to {context.get_hostname()} "
f"to execute command `{self.command}`"
)
if self._proc:
await self._proc.__aexit__(None, None, None)
# if self._proc:
# await self._proc.__aexit__(None, None, None)
# self._proc = None
context.ssh_attempts += 1
context.close()
self._sleeping_contexts.append(
asyncio.create_task(context.sleep(self._condition))
)
if ssh_connection:
ssh_connection.close()
await asyncio.sleep(context._retry_delay)
finally:
logger.info(f"self._proc is None: {self._proc is None}")

async def __aexit__(self, exc_type, exc_val, exc_tb):
async with self._condition:
Expand Down Expand Up @@ -768,9 +769,19 @@ async def run(
environment=environment,
) as proc:
result = await proc.wait(timeout=timeout)
logger.info(f"CMD: {command} returncode: {result.returncode}")
if result.returncode is None:
result.returncode = 9999
if result.returncode is None or result.returncode != 0:
logger.info(
f"CMD: {command}"
f"\n\tcommand: {result.command}"
f"\n\tenv: {result.env}"
f"\n\texit_signal: {result.exit_signal}"
f"\n\texit_status: {result.exit_status}"
f"\n\tstderr: {result.stderr}"
f"\n\tstdout: {result.stdout}"
f"\n\treturncode: {result.returncode}"
)
if result.returncode is None:
result.returncode = 9999
return (result.stdout.strip(), result.returncode) if capture_output else None

async def undeploy(self, external: bool) -> None:
Expand Down

0 comments on commit 442b71f

Please sign in to comment.