Skip to content

Commit 9c6f6e6

Browse files
committed
Update endpoints for NEGs from non-default subnet.
1 parent 16c8d35 commit 9c6f6e6

File tree

2 files changed

+255
-18
lines changed

2 files changed

+255
-18
lines changed

pkg/neg/syncers/transaction.go

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -559,19 +559,15 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
559559
}
560560

561561
if operation == attachOp {
562-
// TODO(sawsa307): Pass in subnet to help distinguish which NEGs needs
563-
// update(in default/non-default subnets).
564-
go s.attachNetworkEndpoints(zone, batch)
562+
go s.attachNetworkEndpoints(endpointGroupInfo, batch)
565563
}
566564
if operation == detachOp {
567565
if zone == migrationZone.Zone && subnet == migrationZone.Subnet {
568566
// Prevent any further migration-detachments from starting while one
569567
// is already in progress.
570568
s.dsMigrator.Pause()
571569
}
572-
// TODO(sawsa307): Pass in subnet to help distinguish which NEGs needs
573-
// update(in default/non-default subnets).
574-
go s.detachNetworkEndpoints(zone, batch, zone == migrationZone.Zone && subnet == migrationZone.Subnet)
570+
go s.detachNetworkEndpoints(endpointGroupInfo, batch, zone == migrationZone.Zone && subnet == migrationZone.Subnet)
575571
}
576572
}
577573
return nil
@@ -588,18 +584,18 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
588584
}
589585

590586
// attachNetworkEndpoints runs operation for attaching network endpoints.
591-
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
592-
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
593-
err := s.operationInternal(attachOp, zone, networkEndpointMap, s.logger)
587+
func (s *transactionSyncer) attachNetworkEndpoints(epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
588+
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", epGroupInfo.Zone, "subnet", epGroupInfo.Subnet)
589+
err := s.operationInternal(attachOp, epGroupInfo, networkEndpointMap, s.logger)
594590

595591
// WARNING: commitTransaction must be called at last for analyzing the operation result
596592
s.commitTransaction(err, networkEndpointMap)
597593
}
598594

599595
// detachNetworkEndpoints runs operation for detaching network endpoints.
600-
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) {
601-
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
602-
err := s.operationInternal(detachOp, zone, networkEndpointMap, s.logger)
596+
func (s *transactionSyncer) detachNetworkEndpoints(epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) {
597+
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", epGroupInfo.Zone, "subnet", epGroupInfo.Subnet)
598+
err := s.operationInternal(detachOp, epGroupInfo, networkEndpointMap, s.logger)
603599

604600
if hasMigrationDetachments {
605601
// Unpause the migration since the ongoing migration-detachments have
@@ -614,26 +610,38 @@ func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointM
614610
// operationInternal executes NEG API call and commits the transactions
615611
// It will record events when operations are completed
616612
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
617-
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, logger klog.Logger) error {
613+
func (s *transactionSyncer) operationInternal(operation transactionOp, epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, logger klog.Logger) error {
618614
var err error
619615
start := time.Now()
620616
networkEndpoints := []*composite.NetworkEndpoint{}
621617
for _, ne := range networkEndpointMap {
622618
networkEndpoints = append(networkEndpoints, ne)
623619
}
620+
zone := epGroupInfo.Zone
621+
negName := s.NegSyncerKey.NegName
622+
if flags.F.EnableMultiSubnetClusterPhase1 {
623+
defaultSubnet, err := utils.KeyName(s.networkInfo.SubnetworkURL)
624+
if err != nil {
625+
s.logger.Error(err, "Errored getting default subnet from NetworkInfo when commiting pods")
626+
return err
627+
}
624628

629+
if epGroupInfo.Subnet != defaultSubnet {
630+
negName = s.namer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, epGroupInfo.Subnet, s.NegSyncerKey.PortTuple.Port)
631+
}
632+
}
625633
if operation == attachOp {
626-
err = s.cloud.AttachNetworkEndpoints(s.NegSyncerKey.NegName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
634+
err = s.cloud.AttachNetworkEndpoints(negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
627635
}
628636
if operation == detachOp {
629-
err = s.cloud.DetachNetworkEndpoints(s.NegSyncerKey.NegName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
637+
err = s.cloud.DetachNetworkEndpoints(negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
630638
}
631639

632640
if err == nil {
633-
s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone))
641+
s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), negName, zone))
634642
s.syncMetricsCollector.UpdateSyncerStatusInMetrics(s.NegSyncerKey, nil, s.inErrorState())
635643
} else {
636-
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err))
644+
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), negName, zone, err))
637645
err := checkEndpointBatchErr(err, operation)
638646
syncErr := negtypes.ClassifyError(err)
639647
// If the API call fails for invalid endpoint update request in any goroutine,

0 commit comments

Comments
 (0)