@@ -35,6 +35,7 @@ class WorkerInput(TaskInput):
3535 node_id : str
3636 uri : str | None = None
3737 topology_id : str = "uniconfig"
38+ jsonb_filter : str | None = None
3839 transaction_id : str | None = None
3940 uniconfig_server_id : str | None = None
4041 uniconfig_url_base : str = UNICONFIG_URL_BASE
@@ -67,6 +68,65 @@ def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
6768
6869 return handle_response (response , self .WorkerOutput )
6970
71+ class ReadStructuredDataWithJsonBFilter (WorkerImpl ):
72+ from frinx_api .uniconfig .rest_api import ReadStructuredData as UniconfigApi
73+
74+ class ExecutionProperties (TaskExecutionProperties ):
75+ exclude_empty_inputs : bool = True
76+
77+ class WorkerDefinition (TaskDefinition ):
78+ name : str = "UNICONFIG_read_structured_device_data_with_jsonb_filter"
79+ description : str = (
80+ "Read device configuration or operational data in structured format e.g. openconfig "
81+ "and filter the output using jsonb (JsonPath) filter"
82+ )
83+ labels : list [str ] = ["BASICS" , "UNICONFIG" , "OPENCONFIG" ]
84+
85+ class WorkerInput (TaskInput ):
86+ # url
87+ node_id : str
88+ uri : str | None = None
89+ topology_id : str = "uniconfig"
90+ # jsonb request parameters
91+ jsonb_filter : str
92+ add_parent_structure : bool = True
93+ # uniconfig
94+ transaction_id : str | None = None
95+ uniconfig_server_id : str | None = None
96+ uniconfig_url_base : str = UNICONFIG_URL_BASE
97+
98+ class WorkerOutput (TaskOutput ):
99+ output : DictAny
100+
101+ def execute (self , worker_input : WorkerInput ) -> TaskResult [WorkerOutput ]:
102+ uri = ""
103+ if worker_input .uri :
104+ if not worker_input .uri .startswith ("/" ):
105+ uri = f"/{ worker_input .uri } "
106+ else :
107+ uri = worker_input .uri
108+
109+ escaped_node_id = escape_uniconfig_uri_key (worker_input .node_id )
110+ url = worker_input .uniconfig_url_base + self .UniconfigApi .uri .format (
111+ topology_id = worker_input .topology_id , node_id = escaped_node_id , uri = uri
112+ )
113+
114+ response = requests .request (
115+ url = url ,
116+ method = "POST" ,
117+ cookies = uniconfig_zone_to_cookie (
118+ uniconfig_server_id = worker_input .uniconfig_server_id , transaction_id = worker_input .transaction_id
119+ ),
120+ headers = {"Content-Type" : "application/filter-data+json" },
121+ params = UNICONFIG_REQUEST_PARAMS ,
122+ json = {
123+ "query" : worker_input .jsonb_filter ,
124+ "addParentStructure" : worker_input .add_parent_structure
125+ }
126+ )
127+
128+ return handle_response (response , self .WorkerOutput )
129+
70130 class WriteStructuredData (WorkerImpl ):
71131 from frinx_api .uniconfig .rest_api import ReadStructuredData as UniconfigApi
72132
0 commit comments