Skip to content

Commit

Permalink
Merge pull request #30 from Stackmasters/display-deployment-logs
Browse files Browse the repository at this point in the history
Stream job logs when using `--wait`
  • Loading branch information
parath authored Jun 13, 2024
2 parents b7eceb1 + dc8398e commit 9a1347e
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 26 deletions.
51 changes: 50 additions & 1 deletion cycleops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import requests
import sec
import typer
import websockets
from requests.models import Response
from websockets.legacy.client import WebSocketClientProtocol

from .auth import CycleopsAuthentication
from .exceptions import APIError, AuthenticationError
Expand Down Expand Up @@ -151,7 +153,7 @@ def deploy(self, setup_id: int) -> Optional[Dict[str, Any]]:
description: str = f"Deploying setup: {setup_id}"
type: str = "Deployment"

jobs_client = JobClient(cycleops_client)
jobs_client: JobClient = JobClient(cycleops_client)
return jobs_client.create(description=description, type=type, setup=setup_id)


Expand Down Expand Up @@ -266,3 +268,50 @@ def delete(self, hostgroup_id: int) -> Optional[Dict[str, Any]]:


cycleops_client: Client = Client()


class WebSocketClient:
"""
A client for interacting with Cycleops websockets to request and listen for job logs.
"""

def __init__(self, job_id: str):
self.url: str = "wss://cloud.cycleops.io/ansible-worker-ws/ws/ansible-output"
self.job_id: str = job_id
self._jwt: Optional[str] = None
self._job: Optional[Dict[str, Any]] = None

@property
def jwt(self):
if not self._jwt:
self._jwt = self.authenticate()
return self._jwt

@property
def job(self):
if not self._job:
self._job = self.get_job()
return self._job

def authenticate(self) -> Optional[str]:
token: str = cycleops_client._request("POST", f"identity/token")
return token["access_token"]

def get_job(self) -> Optional[Dict[str, Any]]:
job_client: JobClient = JobClient(cycleops_client)
job: Optional[Dict[str, Any]] = job_client.retrieve(self.job_id)

return job

async def get_job_logs(self, websocket: WebSocketClientProtocol) -> None:
message: str = f"id={self.job_id}|jwt={self.jwt}|account={self.job['account']}"
await websocket.send(message)

async def listen(self, websocket: WebSocketClientProtocol) -> None:
while message := await websocket.recv():
print(f"{message}\n")

async def run(self) -> None:
async with websockets.connect(self.url) as websocket:
await self.get_job_logs(websocket)
await self.listen(websocket)
62 changes: 39 additions & 23 deletions cycleops/setups.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import time
from typing import Any, Dict, List, Optional

import typer
import websockets
from rich import print

from .client import JobClient, SetupClient, cycleops_client
from .client import JobClient, SetupClient, WebSocketClient, cycleops_client
from .exceptions import NotFound
from .utils import display_error_message, display_success_message

Expand Down Expand Up @@ -180,33 +182,37 @@ def deploy(

try:
setup = get_setup(setup_identifier)

job = setup_client.deploy(setup["id"])
report_queued = print if wait else display_success_message
report_queued(f"Setup {setup['id']} has been queued for deployment")

while wait:
match status := job["status"]:
case "Initialized":
print(f"Setup {setup['id']} has been initialized")
case "Deploying":
print(f"Setup {setup['id']} is being deployed")
case "Deployed":
display_success_message(
f"Setup {setup['id']} has been deployed successfully"
)
break
case "Failed":
display_error_message(job)
raise Exception(f"Setup {setup['id']} could not be deployed")
case _:
print(f"Setup {setup['id']} is in status {status}")
time.sleep(3)
job = job_client.retrieve(job["id"])
except Exception as error:
display_error_message(error)
raise typer.Abort()

deployment_scheduled_message = (
f"Setup {setup_identifier} has been queued for deployment"
)

if not wait:
display_success_message(deployment_scheduled_message)
return

print(f"{deployment_scheduled_message}\n")

try:
display_job_logs(job["id"])
except websockets.exceptions.ConnectionClosed:
job = job_client.retrieve(job["id"])

match job["status"]:
case "Deployed":
display_success_message(
f"Setup {setup_identifier} has been deployed successfully"
)
case "Failed":
display_error_message(f"Setup {setup_identifier} could not be deployed")
case _:
print(f"Setup {setup_identifier} is in status {job['status']}")
return


def get_setup(setup_identifier: str) -> Optional[Dict[str, Any]]:
"""
Expand All @@ -221,3 +227,13 @@ def get_setup(setup_identifier: str) -> Optional[Dict[str, Any]]:
setup = setup_client.retrieve(setup_identifier)

return setup


def display_job_logs(job_id: str) -> None:
"""
Displays the deployements logs of the specified job
"""

websocket_client = WebSocketClient(job_id)

asyncio.get_event_loop().run_until_complete(websocket_client.run())
Loading

0 comments on commit 9a1347e

Please sign in to comment.