Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.

Commit e12b43c

Browse files
author
Jaroslav Tóth
authored
Implement topology-discovery sync worker (#60)
* Implement topology-discovery sync worker * Wrap topology-discovery workers under ServiceWorkersImpl class * Fix typo * Add missing dependency types-requests (dev group)
1 parent f6f3d06 commit e12b43c

File tree

5 files changed

+616
-343
lines changed

5 files changed

+616
-343
lines changed
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
# 1.0.0
2-
- Updated frinx-python-sdk to version ^1.1
2+
- Updated frinx-python-sdk to version ^1.1
3+
4+
# 1.1.0
5+
- Implemented worker and workflow for synchronization of the devices in the specified topology.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
from enum import Enum
2+
3+
import requests
4+
from frinx.common.conductor_enums import TaskResultStatus
5+
from frinx.common.frinx_rest import INVENTORY_HEADERS
6+
from frinx.common.frinx_rest import T0POLOGY_URL_BASE
7+
from frinx.common.type_aliases import DictAny
8+
from frinx.common.worker.task_def import TaskOutput
9+
from frinx.common.worker.task_result import TaskResult
10+
from graphql_pydantic_converter.graphql_types import QueryForm
11+
from pydantic import BaseModel
12+
from pydantic import Field
13+
14+
15+
class TopologyWorkerOutput(TaskOutput):
16+
"""Topology-Discovery worker output."""
17+
18+
query: str = Field(
19+
description="Constructed GraphQL query.",
20+
)
21+
variable: DictAny | None = Field(
22+
description="Constructed input GraphQL variables.",
23+
default=None
24+
)
25+
response_body: DictAny | None = Field(
26+
description="Response body.",
27+
)
28+
response_code: int = Field(
29+
description="Response code.",
30+
)
31+
32+
33+
class ResponseStatus(str, Enum):
34+
"""Response status."""
35+
36+
DATA = "data"
37+
"""Response contains valid data."""
38+
ERRORS = "errors"
39+
"""Response contains some errors."""
40+
FAILED = "failed"
41+
"""HTTP request failed without providing list of errors in response."""
42+
43+
44+
class TopologyOutput(BaseModel):
45+
"""Parsed response from Topology-Discovery service."""
46+
47+
status: ResponseStatus = Field(
48+
description="Response status."
49+
)
50+
code: int = Field(
51+
description="Parsed response code."
52+
)
53+
data: DictAny | None = Field(
54+
default=None,
55+
description="Structured response data."
56+
)
57+
58+
59+
def execute_graphql_operation(
60+
query: str,
61+
variables: DictAny | None = None,
62+
topology_url_base: str = T0POLOGY_URL_BASE
63+
) -> TopologyOutput:
64+
"""
65+
Execute GraphQL query.
66+
67+
:param query: GraphQL query
68+
:param variables: GraphQL variables in dictionary format
69+
:param topology_url_base: Topology-Discovery service URL base
70+
:return: TopologyOutput object
71+
""" ""
72+
response = requests.post(
73+
topology_url_base,
74+
json={
75+
"query": query,
76+
"variables": variables
77+
},
78+
headers=INVENTORY_HEADERS
79+
)
80+
data = response.json()
81+
status_code = response.status_code
82+
83+
if data.get("errors") is not None:
84+
return TopologyOutput(data=data["errors"], status=ResponseStatus.ERRORS, code=status_code)
85+
86+
if data.get("data") is not None:
87+
return TopologyOutput(data=data["data"], status=ResponseStatus.DATA, code=status_code)
88+
89+
return TopologyOutput(status=ResponseStatus.FAILED, code=status_code)
90+
91+
92+
def response_handler(query: QueryForm, response: TopologyOutput) -> TaskResult:
93+
"""
94+
Handle response from Topology-Discovery service.
95+
96+
:param query: GraphQL query information
97+
:param response: parsed topology-discovery response
98+
:return: built TaskResult object
99+
"""
100+
output = TopologyWorkerOutput(
101+
response_code=response.code,
102+
response_body=response.data,
103+
query=query.query,
104+
variable=query.variable
105+
)
106+
match response.status:
107+
case ResponseStatus.DATA:
108+
task_result = TaskResult(status=TaskResultStatus.COMPLETED)
109+
task_result.status = TaskResultStatus.COMPLETED
110+
task_result.output = output
111+
return task_result
112+
case _:
113+
task_result = TaskResult(status=TaskResultStatus.FAILED)
114+
task_result.status = TaskResultStatus.FAILED
115+
task_result.logs = str(response)
116+
task_result.output = output
117+
return task_result
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from frinx.common.type_aliases import ListStr
2+
from frinx.common.worker.service import ServiceWorkersImpl
3+
from frinx.common.worker.task_def import TaskDefinition
4+
from frinx.common.worker.task_def import TaskExecutionProperties
5+
from frinx.common.worker.task_def import TaskInput
6+
from frinx.common.worker.task_result import TaskResult
7+
from frinx.common.worker.worker import WorkerImpl
8+
from frinx_api.topology_discovery import SyncMutation
9+
from frinx_api.topology_discovery import SyncResponse
10+
from frinx_api.topology_discovery import TopologyType
11+
from pydantic import Field
12+
13+
from frinx_worker.topology_discovery.utils import TopologyOutput
14+
from frinx_worker.topology_discovery.utils import TopologyWorkerOutput
15+
from frinx_worker.topology_discovery.utils import execute_graphql_operation
16+
from frinx_worker.topology_discovery.utils import response_handler
17+
18+
19+
class TopologyDiscoveryWorkers(ServiceWorkersImpl):
20+
class TopologySync(WorkerImpl):
21+
_sync_topology: SyncMutation = SyncMutation(
22+
topologyType=TopologyType.PHYSICAL_TOPOLOGY,
23+
devices=["dev1"],
24+
labels=["label1"],
25+
payload=SyncResponse(
26+
labels=True,
27+
loadedDevices=True,
28+
devicesMissingInInventory=True,
29+
devicesMissingInUniconfig=True,
30+
)
31+
)
32+
33+
class ExecutionProperties(TaskExecutionProperties):
34+
exclude_empty_inputs: bool = True
35+
36+
class WorkerDefinition(TaskDefinition):
37+
name: str = "TOPOLOGY_sync"
38+
description: str = "Synchronize specified devices in the topology"
39+
labels: ListStr = ["BASIC", "TOPOLOGY"]
40+
timeout_seconds: int = 3600
41+
response_timeout_seconds: int = 3600
42+
43+
class WorkerInput(TaskInput):
44+
devices: ListStr = Field(
45+
description="List of device identifiers that should be synchronized",
46+
examples=[["device_id_1", "device_id_2"]],
47+
)
48+
topology: TopologyType = Field(
49+
description="To be synchronized topology type",
50+
examples=[TopologyType.ETH_TOPOLOGY],
51+
)
52+
labels: ListStr | None = Field(
53+
description="List of labels that are assigned to the synchronized devices",
54+
examples=[["label_1", "label_2"]],
55+
)
56+
57+
class WorkerOutput(TopologyWorkerOutput):
58+
...
59+
60+
def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
61+
self._sync_topology.topology_type = worker_input.topology
62+
self._sync_topology.devices = worker_input.devices
63+
self._sync_topology.labels = worker_input.labels
64+
65+
query = self._sync_topology.render()
66+
response: TopologyOutput = execute_graphql_operation(query=query.query, variables=query.variable)
67+
return response_handler(query, response)

0 commit comments

Comments
 (0)