1
1
use crate :: collect:: k8s_resources:: common:: KUBERNETES_HOST_LABEL_KEY ;
2
+ use k8s_operators:: diskpool:: crd:: DiskPool ;
3
+
2
4
use k8s_openapi:: api:: {
3
5
apps:: v1:: { DaemonSet , Deployment , StatefulSet } ,
4
6
core:: v1:: { Event , Node , Pod } ,
5
7
} ;
6
- use k8s_operators:: diskpool:: crd:: DiskPool ;
7
- use kube:: { api:: ListParams , Api , Client , Resource } ;
8
-
8
+ use kube:: {
9
+ api:: { DynamicObject , ListParams } ,
10
+ discovery:: { verbs, Scope } ,
11
+ Api , Client , Discovery , Resource ,
12
+ } ;
9
13
use std:: { collections:: HashMap , convert:: TryFrom } ;
10
14
15
+ const SNAPSHOT_GROUP : & str = "snapshot.storage.k8s.io" ;
16
+ const SNAPSHOT_VERSION : & str = "v1" ;
17
+ const VOLUME_SNAPSHOT_CLASS : & str = "VolumeSnapshotClass" ;
18
+ const VOLUME_SNAPSHOT_CONTENT : & str = "VolumeSnapshotContent" ;
19
+ const DRIVER : & str = "driver" ;
20
+ const SPEC : & str = "spec" ;
21
+
11
22
/// K8sResourceError holds errors that can obtain while fetching
12
23
/// information of Kubernetes Objects
13
24
#[ allow( clippy:: enum_variant_names) ]
@@ -87,6 +98,43 @@ impl ClientSet {
87
98
self . client . clone ( )
88
99
}
89
100
101
+ /// Get a new api for a `dynamic_object` for the provided GVK.
102
+ pub ( crate ) async fn dynamic_object_api (
103
+ & self ,
104
+ namespace : Option < & str > ,
105
+ group_name : & str ,
106
+ version : & str ,
107
+ kind : & str ,
108
+ ) -> Result < Api < DynamicObject > , K8sResourceError > {
109
+ let discovery = Discovery :: new ( self . kube_client ( ) ) . run ( ) . await ?;
110
+ for group in discovery. groups ( ) {
111
+ if group. name ( ) == group_name {
112
+ for ( ar, caps) in group. recommended_resources ( ) {
113
+ if !caps. supports_operation ( verbs:: LIST ) {
114
+ continue ;
115
+ }
116
+ if ar. version == version && ar. kind == kind {
117
+ let result = match namespace {
118
+ None if caps. scope == Scope :: Cluster => {
119
+ Ok ( Api :: all_with ( self . kube_client ( ) , & ar) )
120
+ }
121
+ Some ( ns) if caps. scope == Scope :: Namespaced => {
122
+ Ok ( Api :: namespaced_with ( self . kube_client ( ) , ns, & ar) )
123
+ }
124
+ _ => Err ( K8sResourceError :: CustomError ( format ! (
125
+ "DynamicObject Api not available for {kind} of {group_name}/{version}"
126
+ ) ) ) ,
127
+ } ;
128
+ return result;
129
+ }
130
+ }
131
+ }
132
+ }
133
+ Err ( K8sResourceError :: CustomError ( format ! (
134
+ "DynamicObject Api not available for {kind} of {group_name}/{version}"
135
+ ) ) )
136
+ }
137
+
90
138
/// Fetch node objects from API-server then form and return map of node name to node object
91
139
pub ( crate ) async fn get_nodes_map ( & self ) -> Result < HashMap < String , Node > , K8sResourceError > {
92
140
let node_api: Api < Node > = Api :: all ( self . client . clone ( ) ) ;
@@ -164,6 +212,115 @@ impl ClientSet {
164
212
Ok ( pools. items )
165
213
}
166
214
215
+ /// Fetch list of volume snapshot classes based on the driver if provided.
216
+ pub ( crate ) async fn list_volumesnapshot_classes (
217
+ & self ,
218
+ driver_selector : Option < & str > ,
219
+ label_selector : Option < & str > ,
220
+ field_selector : Option < & str > ,
221
+ ) -> Result < Vec < DynamicObject > , K8sResourceError > {
222
+ let list_params = ListParams :: default ( )
223
+ . labels ( label_selector. unwrap_or_default ( ) )
224
+ . fields ( field_selector. unwrap_or_default ( ) ) ;
225
+ let vsc_api: Api < DynamicObject > = self
226
+ . dynamic_object_api (
227
+ None ,
228
+ SNAPSHOT_GROUP ,
229
+ SNAPSHOT_VERSION ,
230
+ VOLUME_SNAPSHOT_CLASS ,
231
+ )
232
+ . await ?;
233
+ let vscs = match vsc_api. list ( & list_params) . await {
234
+ Ok ( val) => val,
235
+ Err ( kube_error) => match kube_error {
236
+ kube:: Error :: Api ( e) => {
237
+ if e. code == 404 {
238
+ return Ok ( vec ! [ ] ) ;
239
+ }
240
+ return Err ( K8sResourceError :: ClientError ( kube:: Error :: Api ( e) ) ) ;
241
+ }
242
+ _ => return Err ( K8sResourceError :: ClientError ( kube_error) ) ,
243
+ } ,
244
+ } ;
245
+ Ok ( vscs
246
+ . items
247
+ . into_iter ( )
248
+ . filter ( |item| match driver_selector {
249
+ None => true ,
250
+ Some ( driver_selector) => match item. data . get ( DRIVER ) {
251
+ None => false ,
252
+ Some ( value) => match value. as_str ( ) {
253
+ None => false ,
254
+ Some ( driver) => driver == driver_selector,
255
+ } ,
256
+ } ,
257
+ } )
258
+ . collect ( ) )
259
+ }
260
+
261
+ /// Fetch list of volume snapshot contents based on the driver if provided.
262
+ pub ( crate ) async fn list_volumesnapshotcontents (
263
+ & self ,
264
+ driver_selector : Option < & str > ,
265
+ label_selector : Option < & str > ,
266
+ field_selector : Option < & str > ,
267
+ ) -> Result < Vec < DynamicObject > , K8sResourceError > {
268
+ let mut list_params = ListParams :: default ( )
269
+ . labels ( label_selector. unwrap_or_default ( ) )
270
+ . fields ( field_selector. unwrap_or_default ( ) )
271
+ . limit ( 2 ) ;
272
+ let vsc_api: Api < DynamicObject > = self
273
+ . dynamic_object_api (
274
+ None ,
275
+ SNAPSHOT_GROUP ,
276
+ SNAPSHOT_VERSION ,
277
+ VOLUME_SNAPSHOT_CONTENT ,
278
+ )
279
+ . await ?;
280
+
281
+ let mut vscs_filtered: Vec < DynamicObject > = vec ! [ ] ;
282
+ loop {
283
+ let vscs = match vsc_api. list ( & list_params) . await {
284
+ Ok ( val) => val,
285
+ Err ( kube_error) => match kube_error {
286
+ kube:: Error :: Api ( e) => {
287
+ if e. code == 404 {
288
+ return Ok ( vec ! [ ] ) ;
289
+ }
290
+ return Err ( K8sResourceError :: ClientError ( kube:: Error :: Api ( e) ) ) ;
291
+ }
292
+ _ => return Err ( K8sResourceError :: ClientError ( kube_error) ) ,
293
+ } ,
294
+ } ;
295
+ vscs_filtered. append (
296
+ & mut vscs
297
+ . items
298
+ . into_iter ( )
299
+ . filter ( |item| match driver_selector {
300
+ None => true ,
301
+ Some ( driver_selector) => match item. data . get ( SPEC ) {
302
+ None => false ,
303
+ Some ( value) => match value. get ( DRIVER ) {
304
+ None => false ,
305
+ Some ( value) => match value. as_str ( ) {
306
+ None => false ,
307
+ Some ( driver) => driver == driver_selector,
308
+ } ,
309
+ } ,
310
+ } ,
311
+ } )
312
+ . collect ( ) ,
313
+ ) ;
314
+ match vscs. metadata . continue_ {
315
+ Some ( token) if !token. is_empty ( ) => {
316
+ list_params = list_params. continue_token ( token. as_str ( ) )
317
+ }
318
+ _ => break ,
319
+ } ;
320
+ }
321
+ Ok ( vscs_filtered)
322
+ }
323
+
167
324
/// Fetch list of k8s events associated to given label_selector & field_selector
168
325
pub ( crate ) async fn get_events (
169
326
& self ,
@@ -183,8 +340,10 @@ impl ClientSet {
183
340
let mut result = events_api. list ( & list_params) . await ?;
184
341
events. append ( & mut result. items ) ;
185
342
match result. metadata . continue_ {
186
- None => break ,
187
- Some ( token) => list_params = list_params. continue_token ( token. as_str ( ) ) ,
343
+ Some ( token) if !token. is_empty ( ) => {
344
+ list_params = list_params. continue_token ( token. as_str ( ) )
345
+ }
346
+ _ => break ,
188
347
} ;
189
348
}
190
349
0 commit comments