
fix(controller): Adjust controller -> scheduler state recreation upon scheduler disconnect. (#5944)

* move initial sync to the scheduler into separate logic (servers)

* add coverage for model statuses

* add control plane rpc (scheduler<->controller)

* add control plane stream code and test

* add stop control plane stream

* add control plane client handling

* add handle state reconstruction to the control plane

* adjust name of the logger func

* block streams connecting until scheduler is ready

* adjust simple schedule code

* adjust simple sync (2)

* add exec with timeout for the control plane ops (for state)

* do not block Load/Unload models on scheduling

* add test for subscribe control plane stream

* extend control plane tests

* fix lint

* PR comments
sakoush authored Oct 4, 2024
1 parent d62e997 commit 6c3410d
Showing 27 changed files with 1,554 additions and 304 deletions.
427 changes: 277 additions & 150 deletions apis/go/mlops/scheduler/scheduler.pb.go

Large diffs are not rendered by default.

67 changes: 67 additions & 0 deletions apis/go/mlops/scheduler/scheduler_grpc.pb.go

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions apis/mlops/scheduler/scheduler.proto
@@ -385,6 +385,14 @@ message SchedulerStatusResponse {
string applicationVersion = 1;
}

message ControlPlaneSubscriptionRequest {
string subscriberName = 1; //Name of the subscription caller
}

message ControlPlaneResponse {

}

// [END Messages]


@@ -412,6 +420,9 @@ service Scheduler {
rpc SubscribeModelStatus(ModelSubscriptionRequest) returns (stream ModelStatusResponse) {};
rpc SubscribeExperimentStatus(ExperimentSubscriptionRequest) returns (stream ExperimentStatusResponse) {};
rpc SubscribePipelineStatus(PipelineSubscriptionRequest) returns (stream PipelineStatusResponse) {};

// control plane stream with controller
rpc SubscribeControlPlane(ControlPlaneSubscriptionRequest) returns (stream ControlPlaneResponse) {};
}

// [END Services]
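
The scheduler-side implementation of the new RPC is not among the files shown in this excerpt. As a rough sketch of the intended protocol (an assumption inferred from the controller-side handling further down): the scheduler accepts the subscription and, once it is ready, sends an empty ControlPlaneResponse to prompt the controller to resend its state. All names below other than the generated protobuf types are illustrative.

// Hypothetical scheduler-side handler sketch (not part of this diff); the
// stream type follows the usual protoc-gen-go-grpc naming for the Scheduler service.
func (s *SchedulerServer) SubscribeControlPlane(
	req *pb.ControlPlaneSubscriptionRequest,
	stream pb.Scheduler_SubscribeControlPlaneServer,
) error {
	s.logger.Info("Control plane subscription", "subscriber", req.GetSubscriberName())

	// Once ready, ask the controller to (re)send servers, experiments, pipelines and models.
	if err := stream.Send(&pb.ControlPlaneResponse{}); err != nil {
		return err
	}

	// Keep the stream open; the controller treats a closed stream as a disconnect
	// and will retry its subscription.
	<-stream.Context().Done()
	return nil
}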
55 changes: 50 additions & 5 deletions operator/scheduler/client.go
@@ -73,12 +73,17 @@ func getSchedulerHost(namespace string) string {
// we also add a retry mechanism to reconnect if the connection is lost, this can happen if the scheduler is restarted
// or if the network connection is lost. We use an exponential backoff to retry the connection.
// note that when the scheduler is completely dead we will not be able to reconnect and these goroutines will retry forever
// on reconnect we send the state of the different resources to the scheduler, so that the scheduler has the correct state
// TODO: add a max retry count and report back to the caller.
// TODO add done for graceful shutdown otherwise these go routines will run forever
// TODO tidy up ctx from the different handlers, currently they are all context.Background()
func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientConn) {
s.logger.Info("Starting event handling", "namespace", namespace)

// Subscribe to the event streams from the scheduler
go func() {
for {
err := retryFn(s.SubscribeModelEvents, conn, namespace, s.logger)
err := retryFn(s.SubscribeModelEvents, conn, namespace, s.logger.WithName("SubscribeModelEvents"))
if err != nil {
s.logger.Error(err, "Subscribe ended for model events", "namespace", namespace)
} else {
@@ -88,7 +93,7 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC
}()
go func() {
for {
err := retryFn(s.SubscribeServerEvents, conn, namespace, s.logger)
err := retryFn(s.SubscribeServerEvents, conn, namespace, s.logger.WithName("SubscribeServerEvents"))
if err != nil {
s.logger.Error(err, "Subscribe ended for server events", "namespace", namespace)
} else {
@@ -98,7 +103,7 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC
}()
go func() {
for {
err := retryFn(s.SubscribePipelineEvents, conn, namespace, s.logger)
err := retryFn(s.SubscribePipelineEvents, conn, namespace, s.logger.WithName("SubscribePipelineEvents"))
if err != nil {
s.logger.Error(err, "Subscribe ended for pipeline events", "namespace", namespace)
} else {
@@ -108,14 +113,54 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC
}()
go func() {
for {
err := retryFn(s.SubscribeExperimentEvents, conn, namespace, s.logger)
err := retryFn(s.SubscribeExperimentEvents, conn, namespace, s.logger.WithName("SubscribeExperimentEvents"))
if err != nil {
s.logger.Error(err, "Subscribe ended for experiment events", "namespace", namespace)
} else {
s.logger.Info("Subscribe ended for experiment events", "namespace", namespace)
}
}
}()
go func() {
for {
err := retryFn(s.SubscribeControlPlaneEvents, conn, namespace, s.logger.WithName("SubscribeControlPlaneEvents"))
if err != nil {
s.logger.Error(err, "Subscribe ended for control plane events", "namespace", namespace)
} else {
s.logger.Info("Subscribe ended for control plane events", "namespace", namespace)
}
}
}()
}

func (s *SchedulerClient) handleStateOnReconnect(context context.Context, grpcClient scheduler.SchedulerClient, namespace string) error {
// on a new reconnect we send the list of servers to the scheduler
err := s.handleRegisteredServers(context, grpcClient, namespace)
if err != nil {
s.logger.Error(err, "Failed to send registered server to scheduler")
}

if err == nil {
err = s.handleExperiments(context, grpcClient, namespace)
if err != nil {
s.logger.Error(err, "Failed to send experiments to scheduler")
}
}

if err == nil {
err = s.handlePipelines(context, grpcClient, namespace)
if err != nil {
s.logger.Error(err, "Failed to send pipelines to scheduler")
}
}

if err == nil {
err = s.handleModels(context, grpcClient, namespace)
if err != nil {
s.logger.Error(err, "Failed to send models to scheduler")
}
}
return err
}

func (s *SchedulerClient) RemoveConnection(namespace string) {
@@ -253,7 +298,7 @@ func retryFn(
fn func(context context.Context, grpcClient scheduler.SchedulerClient, namespace string) error,
conn *grpc.ClientConn, namespace string, logger logr.Logger,
) error {
logger.Info("RetryFn", "namespace", namespace)
logger.Info("Retrying to connect", "namespace", namespace)
logFailure := func(err error, delay time.Duration) {
logger.Error(err, "Scheduler not ready")
}
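
The body of retryFn is not shown in this excerpt. A plausible shape, assuming the exponential backoff mentioned in the comments above is implemented with github.com/cenkalti/backoff/v4 (the logFailure callback matches that library's Notify signature); the actual helper in the repository may differ:

// Hypothetical sketch of retryFn (illustrative, not the committed implementation).
func retryFn(
	fn func(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error,
	conn *grpc.ClientConn, namespace string, logger logr.Logger,
) error {
	logger.Info("Retrying to connect", "namespace", namespace)
	logFailure := func(err error, delay time.Duration) {
		logger.Error(err, "Scheduler not ready")
	}
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.MaxElapsedTime = 0 // retry indefinitely, as the comments above describe
	op := func() error {
		grpcClient := scheduler.NewSchedulerClient(conn)
		return fn(context.Background(), grpcClient, namespace)
	}
	return backoff.RetryNotify(op, expBackoff, logFailure)
}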
83 changes: 83 additions & 0 deletions operator/scheduler/control_plane.go
@@ -0,0 +1,83 @@
/*
Copyright (c) 2024 Seldon Technologies Ltd.
Use of this software is governed by
(1) the license included in the LICENSE file or
(2) if the license included in the LICENSE file is the Business Source License 1.1,
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package scheduler

import (
"context"
"io"
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
)

const (
execTimeOut = 5 * time.Minute
)

func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error {
logger := s.logger.WithName("SubscribeControlPlaneEvents")

stream, err := grpcClient.SubscribeControlPlane(
ctx,
&scheduler.ControlPlaneSubscriptionRequest{SubscriberName: "seldon manager"},
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
)
if err != nil {
return err
}

for {
event, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
logger.Error(err, "event recv failed")
return err
}
logger.Info("Received event to handle state", "event", event)

fn := func() error {
return s.handleStateOnReconnect(ctx, grpcClient, namespace)
}
_, err = execWithTimeout(fn, execTimeOut)
if err != nil {
logger.Error(err, "Failed to handle state on reconnect")
return err
}

logger.Info("Handled state on reconnect")

}
return nil
}

func execWithTimeout(f func() error, d time.Duration) (bool, error) {
errChan := make(chan error, 1)
go func() {
errChan <- f()
close(errChan)
}()
t := time.NewTimer(d)
select {
case <-t.C:
return true, status.Errorf(codes.DeadlineExceeded, "Failed to send event within timeout")
case err := <-errChan:
if !t.Stop() {
<-t.C
}
return false, err
}
}
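
A quick usage sketch of execWithTimeout (hypothetical caller in the same package, illustrative only): because the error channel is buffered with capacity 1, the worker goroutine can always deliver its result and exit, so the helper does not leak goroutines even when the timeout fires first.

// Hypothetical usage of execWithTimeout (not part of this diff).
func exampleStateResync(logger logr.Logger) {
	slowOp := func() error {
		time.Sleep(10 * time.Minute) // stands in for resending a large amount of state
		return nil
	}
	timedOut, err := execWithTimeout(slowOp, execTimeOut)
	if timedOut {
		// err carries codes.DeadlineExceeded; slowOp may still be running in its
		// goroutine, but its eventual result is discarded.
		logger.Error(err, "State resync timed out")
	}
}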
