Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Aug 2, 2024
1 parent 442b71f commit 0c49622
Showing 1 changed file with 7 additions and 31 deletions.
38 changes: 7 additions & 31 deletions streamflow/deployment/connector/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,10 @@ def __init__(
async def __aenter__(self) -> asyncssh.SSHClientProcess:
async with self._condition:
while True:
if all(c.ssh_attempts > c.retries for c in self._contexts):
raise WorkflowExecutionException(
"No more contexts available. Attempts to reconnect terminated."
)
if (
len(free_contexts := [c for c in self._contexts if not c.full()])
== 0
):
await self._condition.wait()
else:
for context in free_contexts:
if context.ssh_attempts > context.retries:
# context terminated the retries
continue
for context in self._contexts:
if not context.full():
ssh_connection = await context.get_connection()
try:
ssh_connection = await context.get_connection()
self._selected_context = context
self._proc = await ssh_connection.create_process(
self.command,
Expand All @@ -197,26 +185,14 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess:
stderr=self.stderr,
encoding=self.encoding,
)
logger.info("self._proc acquiring")
await self._proc.__aenter__()
logger.info("self._proc return")
return self._proc
except (ConnectionError, ConnectionLost, ChannelOpenError) as e:
msg = ""
if isinstance(e, ChannelOpenError):
msg = f"code: {e.code} - reason: {e.reason} - lang: {e.lang}"
except ChannelOpenError as coe:
logger.warning(
f"Error {type(e)}: {e}. {msg}. Opening SSH session to {context.get_hostname()} "
f"to execute command `{self.command}`"
f"Error opening SSH session to {context.get_hostname()} "
f"to execute command `{self.command}`: [{coe.code}] {coe.reason}"
)
# if self._proc:
# await self._proc.__aexit__(None, None, None)
# self._proc = None
context.ssh_attempts += 1
context.close()
await asyncio.sleep(context._retry_delay)
finally:
logger.info(f"self._proc is None: {self._proc is None}")
await self._condition.wait()

async def __aexit__(self, exc_type, exc_val, exc_tb):
async with self._condition:
Expand Down

0 comments on commit 0c49622

Please sign in to comment.