@@ -24,6 +24,7 @@ import (
24
24
"github.com/container-storage-interface/spec/lib/go/csi"
25
25
apisv1 "github.com/openebs/api/v2/pkg/apis/cstor/v1"
26
26
"github.com/openebs/cstor-csi/pkg/env"
27
+ k8snode "github.com/openebs/cstor-csi/pkg/kubernetes/node"
27
28
csipayload "github.com/openebs/cstor-csi/pkg/payload"
28
29
analytics "github.com/openebs/cstor-csi/pkg/usage"
29
30
utils "github.com/openebs/cstor-csi/pkg/utils"
@@ -33,6 +34,7 @@ import (
33
34
"google.golang.org/grpc/codes"
34
35
"google.golang.org/grpc/status"
35
36
k8serror "k8s.io/apimachinery/pkg/api/errors"
37
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
38
)
37
39
38
40
// controller is the server implementation
@@ -106,7 +108,10 @@ func (cs *controller) CreateVolume(
106
108
pvcName := req .GetParameters ()[pvcNameKey ]
107
109
pvcNamespace := req .GetParameters ()[pvcNamespaceKey ]
108
110
109
- nodeID = getAccessibilityRequirements (req .GetAccessibilityRequirements ())
111
+ nodeID , err = getAccessibilityRequirements (req .GetAccessibilityRequirements ())
112
+ if err != nil {
113
+ return nil , err
114
+ }
110
115
111
116
contentSource := req .GetVolumeContentSource ()
112
117
if contentSource != nil && contentSource .GetSnapshot () != nil {
@@ -337,20 +342,20 @@ func (cs *controller) ListVolumes(
337
342
return nil , status .Error (codes .Unimplemented , "" )
338
343
}
339
344
340
- func getAccessibilityRequirements (requirement * csi.TopologyRequirement ) string {
345
+ func getAccessibilityRequirements (requirement * csi.TopologyRequirement ) ( string , error ) {
341
346
if requirement == nil {
342
- return ""
347
+ return "" , status . Error ( codes . Internal , "accessibility_requirements not found" )
343
348
}
344
349
345
- preferredNode , exists := requirement . GetPreferred ()[ 0 ]. GetSegments ()[ TopologyNodeKey ]
346
- if exists {
347
- return preferredNode
350
+ node , err := getNode ( requirement )
351
+ if err != nil {
352
+ return "" , status . Errorf ( codes . Internal , "failed to get the accessibility_requirements node %v" , err )
348
353
}
349
- preferredNode , exists = requirement . GetRequisite ()[ 0 ]. GetSegments ()[ TopologyNodeKey ]
350
- if exists {
351
- return preferredNode
354
+
355
+ if len ( node ) == 0 {
356
+ return "" , status . Error ( codes . Internal , "can not find any node" )
352
357
}
353
- return ""
358
+ return node , nil
354
359
}
355
360
356
361
// sendEventOrIgnore sends anonymous cstor provision/delete events
@@ -366,3 +371,28 @@ func sendEventOrIgnore(pvcName, pvName, capacity, replicaCount, stgType, method
366
371
SetVolumeCapacity (capacity ).Send ()
367
372
}
368
373
}
374
+
375
+ // getNode gets the node which satisfies the topology info
376
+ func getNode (topo * csi.TopologyRequirement ) (string , error ) {
377
+
378
+ list , err := k8snode .NewKubeClient ().List (metav1.ListOptions {})
379
+ if err != nil {
380
+ return "" , err
381
+ }
382
+
383
+ for _ , prf := range topo .Preferred {
384
+ for _ , node := range list .Items {
385
+ nodeFiltered := false
386
+ for key , value := range prf .Segments {
387
+ if node .Labels [key ] != value {
388
+ nodeFiltered = true
389
+ break
390
+ }
391
+ }
392
+ if nodeFiltered == false {
393
+ return node .Name , nil
394
+ }
395
+ }
396
+ }
397
+ return "" , nil
398
+ }
0 commit comments