Skip to content

Commit

Permalink
refactor: divide subscribeToConfigPod method to manage GRPC server co…
Browse files Browse the repository at this point in the history
…nnection separately through the modules

refactor:
- add ConnectToGrpcServer method to ConfClient interface
- change signatures of PublishOnConfigChange and subsribeToConfigPod methods

Signed-off-by: gatici <[email protected]>
  • Loading branch information
gatici committed Oct 2, 2024
1 parent c2e6e90 commit 37117a3
Showing 1 changed file with 64 additions and 66 deletions.
130 changes: 64 additions & 66 deletions proto/client/gClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,22 @@ type ConfigClient struct {
}

type ConfClient interface {
// channel is created on which subscription is done.
// A channel is created on which subscription is done.
// On Receiving Configuration from ConfigServer, this api publishes
// on created channel and returns the channel
PublishOnConfigChange(bool) chan *protos.NetworkSliceResponse
PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse

// returns grpc connection object
// GetConfigClientConn returns grpc connection object
GetConfigClientConn() *grpc.ClientConn

// Client Subscribing channel to ConfigPod to receive configuration
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse)
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient)

// ConnectToGrpcServer connects to a GRPC server with a Client ID and returns a stream
ConnectToGrpcServer() (stream protos.ConfigService_NetworkSliceSubscribeClient)
}

// This API is added to control metadata from NF Clients
// ConnectToConfigServer this API is added to control metadata from NF clients
func ConnectToConfigServer(host string) ConfClient {
confClient := CreateChannel(host, 10000)
if confClient == nil {
Expand All @@ -72,25 +75,24 @@ func ConnectToConfigServer(host string) ConfClient {
return confClient
}

func (confClient *ConfigClient) PublishOnConfigChange(mdataFlag bool) chan *protos.NetworkSliceResponse {
confClient.MetadataRequested = mdataFlag
func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse {
confClient.MetadataRequested = metadataFlag
commChan := make(chan *protos.NetworkSliceResponse)
confClient.Channel = commChan
go confClient.subscribeToConfigPod(commChan)
go confClient.subscribeToConfigPod(commChan, stream)
return commChan
}

// pass structr which has configChangeUpdate interface
func ConfigWatcher(webuiUri string) chan *protos.NetworkSliceResponse {
// var confClient *gClient.ConfigClient
// ConfigWatcher pass structr which has configChangeUpdate interface
func ConfigWatcher(webuiUri string, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse {
// TODO: use port from configmap.
confClient := CreateChannel(webuiUri, 10000)
if confClient == nil {
logger.GrpcLog.Errorf("create grpc channel to config pod failed")
return nil
}
commChan := make(chan *protos.NetworkSliceResponse)
go confClient.subscribeToConfigPod(commChan)
go confClient.subscribeToConfigPod(commChan, stream)
return commChan
}

Expand Down Expand Up @@ -156,64 +158,60 @@ func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn {
return confClient.Conn
}

func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) {
logger.GrpcLog.Infoln("subscribeToConfigPod")
// ConnectToGrpcServer connects to a GRPC server with a Client ID
// and returns a stream if connection is successful else returns nil
func (confClient *ConfigClient) ConnectToGrpcServer() (stream protos.ConfigService_NetworkSliceSubscribeClient) {
logger.GrpcLog.Infoln("connectToGrpcServer")
myid := os.Getenv("HOSTNAME")
var stream protos.ConfigService_NetworkSliceSubscribeClient
for {
if stream == nil {
status := confClient.Conn.GetState()
var err error
if status == connectivity.Ready {
logger.GrpcLog.Infoln("connectivity ready")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
logger.GrpcLog.Errorf("failed to subscribe: %v", err)
time.Sleep(time.Second * 5)
// Retry on failure
continue
}
} else if status == connectivity.Idle {
logger.GrpcLog.Errorln("connectivity status idle, trying to connect again")
time.Sleep(time.Second * 5)
continue
} else {
logger.GrpcLog.Errorln("connectivity status not ready")
time.Sleep(time.Second * 5)
continue
}
}
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("failed to receive message: %v", err)
// Clearing the stream will force the client to resubscribe on next iteration
stream = nil
time.Sleep(time.Second * 5)
// Retry on failure
continue
stream = nil
status := confClient.Conn.GetState()
var err error
if status == connectivity.Ready {
logger.GrpcLog.Infoln("connectivity ready")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
logger.GrpcLog.Errorf("failed to subscribe: %v", err)
return stream
}
return stream
} else if status == connectivity.Idle {
logger.GrpcLog.Errorf("connectivity status idle, trying to connect again")
return stream
} else {
logger.GrpcLog.Errorf("connectivity status not ready")
return stream
}
}

logger.GrpcLog.Infoln("stream msg received")
logger.GrpcLog.Debugf("network slices %d, RC of configpod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete , all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
commChan <- rsp
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorln("config received after config pod restart")
// config received after config pod restart
configPodRestartCounter = rsp.RestartCounter
// subscribeToConfigPod subscribing channel to ConfigPod to receive configuration
// using stream and communication channel as inputs
func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) {
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("failed to receive message: %v", err)
return
}

logger.GrpcLog.Infoln("stream msg received")
logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete , all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
commChan <- rsp
} else {
logger.GrpcLog.Errorln("config pod is restarted and no config received")
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorf("config received after config pod restart")
// config received after config pod restart
configPodRestartCounter = rsp.RestartCounter
commChan <- rsp
} else {
logger.GrpcLog.Errorf("config pod is restarted and no config received")
}
}

0 comments on commit 37117a3

Please sign in to comment.