Skip to content

Commit 6823740

Browse files
committed
refactor(dynamic mount): introduce dynamic server manager
To ensure secure isolation for each dynamic mount and avoid unstable mount propagation, an independent csi.sock is currently created under each dynamic mount directory instead of using a shared csi.sock, these individual csi.sock servers are managed by the DynamicServerManager. Signed-off-by: imeoer <[email protected]>
1 parent 7d62a09 commit 6823740

File tree

10 files changed

+328
-177
lines changed

10 files changed

+328
-177
lines changed

pkg/config/config.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,23 @@ type RawConfig struct {
3535
// static: /var/lib/dragonfly/model-csi/volumes/$volumeName/model
3636
// dynamic: /var/lib/dragonfly/model-csi/volumes/$volumeName/models
3737
// /var/lib/dragonfly/model-csi/volumes/$volumeName/csi.sock
38-
ServiceName string `yaml:"service_name"`
39-
RootDir string `yaml:"root_dir"`
40-
ExternalCSIEndpoint string `yaml:"external_csi_endpoint"`
41-
ExternalCSIAuthorization string `yaml:"external_csi_authorization"`
42-
DynamicCSIEndpoint string `yaml:"dynamic_csi_endpoint"`
43-
CSIEndpoint string `yaml:"csi_endpoint"`
44-
MetricsAddr string `yaml:"metrics_addr"`
45-
TraceEndpoint string `yaml:"trace_endpoint"`
46-
PprofAddr string `yaml:"pprof_addr"`
47-
PullConfig PullConfig `yaml:"pull_config"`
48-
Features Features `yaml:"features"`
49-
NodeID string // From env CSI_NODE_ID
50-
Mode string // From env X_CSI_MODE: "controller" or "node"
38+
ServiceName string `yaml:"service_name"`
39+
RootDir string `yaml:"root_dir"`
40+
ExternalCSIEndpoint string `yaml:"external_csi_endpoint"`
41+
ExternalCSIAuthorization string `yaml:"external_csi_authorization"`
42+
// Deprecated: To ensure secure isolation for each dynamic mount and avoid
43+
// unstable mount propagation, an independent csi.sock is currently created
44+
// under each dynamic mount directory instead of using a shared csi.sock,
45+
// these individual csi.sock servers are managed by the DynamicServerManager.
46+
DynamicCSIEndpoint string `yaml:"dynamic_csi_endpoint"`
47+
CSIEndpoint string `yaml:"csi_endpoint"`
48+
MetricsAddr string `yaml:"metrics_addr"`
49+
TraceEndpoint string `yaml:"trace_endpoint"`
50+
PprofAddr string `yaml:"pprof_addr"`
51+
PullConfig PullConfig `yaml:"pull_config"`
52+
Features Features `yaml:"features"`
53+
NodeID string // From env CSI_NODE_ID
54+
Mode string // From env X_CSI_MODE: "controller" or "node"
5155
}
5256

5357
type Features struct {
@@ -135,6 +139,11 @@ func (cfg *RawConfig) GetCSISockDirForDynamic(volumeName string) string {
135139
return filepath.Join(cfg.GetVolumeDirForDynamic(volumeName), "csi")
136140
}
137141

142+
// /var/lib/dragonfly/model-csi/volumes/$volumeName/csi/csi.sock
143+
func (cfg *RawConfig) GetCSISockPathForDynamic(volumeName string) string {
144+
return filepath.Join(cfg.GetCSISockDirForDynamic(volumeName), "csi.sock")
145+
}
146+
138147
func (cfg *RawConfig) IsControllerMode() bool {
139148
return cfg.Mode == "controller"
140149
}

pkg/server/http.go

Lines changed: 0 additions & 74 deletions
This file was deleted.

pkg/server/server.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"net/http"
77
"net/url"
88
"os"
9-
"path/filepath"
109
"strings"
1110
"time"
1211

@@ -27,6 +26,7 @@ import (
2726
"github.com/modelpack/model-csi-driver/pkg/metrics"
2827
"github.com/modelpack/model-csi-driver/pkg/provider"
2928
"github.com/modelpack/model-csi-driver/pkg/service"
29+
"github.com/modelpack/model-csi-driver/pkg/utils"
3030
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
3131
)
3232

@@ -42,20 +42,6 @@ var kasp = keepalive.ServerParameters{
4242
Timeout: 30 * time.Second, // Wait 30 second for the ping ack before assuming the connection is dead
4343
}
4444

45-
func ensureSockNotExists(ctx context.Context, sockPath string) error {
46-
_, err := os.Stat(sockPath)
47-
if err == nil {
48-
if err = os.Remove(sockPath); err != nil {
49-
return errors.Wrapf(err, "remove existed sock path: %s", sockPath)
50-
}
51-
logger.WithContext(ctx).Infof("removed existed sock path: %s", sockPath)
52-
}
53-
if err = os.MkdirAll(filepath.Dir(sockPath), 0755); err != nil {
54-
return errors.Wrapf(err, "create sock path dir: %s", filepath.Dir(sockPath))
55-
}
56-
return nil
57-
}
58-
5945
func isSockListening(sockPath string) bool {
6046
if _, err := os.Stat(sockPath); err != nil {
6147
return false
@@ -157,7 +143,7 @@ func (server *Server) Run(ctx context.Context) error {
157143
return errors.Wrap(err, "parse external csi endpoint")
158144
}
159145
if endpoint.Path != "" {
160-
if err := ensureSockNotExists(ctx, endpoint.Path); err != nil {
146+
if err := utils.EnsureSockNotExists(ctx, endpoint.Path); err != nil {
161147
return errors.Wrapf(err, "ensure socket not exists: %s", endpoint.Path)
162148
}
163149
}
@@ -243,26 +229,28 @@ func (server *Server) Run(ctx context.Context) error {
243229
}))
244230
}
245231

232+
// nolint:staticcheck
246233
if server.cfg.Get().DynamicCSIEndpoint != "" {
247234
eg.Go(withFatalError(func() error {
235+
if err := server.svc.DynamicServerManager.RecoverServers(context.Background()); err != nil {
236+
return errors.Wrap(err, "recover dynamic http servers")
237+
}
238+
239+
// Deprecated: use DynamicServerManager to manage dynamic csi.sock servers,
240+
// keep this for backward compatibility.
241+
// nolint:staticcheck
248242
endpoint, err := url.Parse(server.cfg.Get().DynamicCSIEndpoint)
249243
if err != nil {
250244
return errors.Wrap(err, "parse dynamic csi endpoint")
251245
}
252246
if endpoint.Path != "" {
253-
if err := ensureSockNotExists(ctx, endpoint.Path); err != nil {
254-
return errors.Wrapf(err, "ensure socket not exists: %s", endpoint.Path)
247+
_, err = server.svc.DynamicServerManager.CreateServer(context.Background(), endpoint.Path)
248+
if err != nil {
249+
return errors.Wrap(err, "create dynamic http server")
255250
}
256251
}
257252

258-
logger.WithContext(ctx).Infof("serving dynamic http server on %s", server.cfg.Get().DynamicCSIEndpoint)
259-
260-
httpServer, err := NewHTTPServer(server.cfg, server.svc)
261-
if err != nil {
262-
return errors.Wrap(err, "create dynamic http server")
263-
}
264-
265-
return httpServer.Serve()
253+
return nil
266254
}))
267255
}
268256
}

pkg/server/server_test.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,22 @@ package server
33
import (
44
"context"
55
"fmt"
6-
"net/http"
7-
"net/http/httptest"
8-
"net/url"
96
"os"
107
"os/exec"
118
"path/filepath"
129
"strings"
13-
"syscall"
1410
"testing"
1511
"time"
1612

17-
"github.com/labstack/echo/v4"
1813
"github.com/modelpack/model-csi-driver/pkg/client"
1914
"github.com/modelpack/model-csi-driver/pkg/config"
2015
"github.com/modelpack/model-csi-driver/pkg/service"
2116
"github.com/modelpack/model-csi-driver/pkg/status"
2217
modelspec "github.com/modelpack/model-spec/specs-go/v1"
2318
"github.com/opencontainers/go-digest"
2419
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
25-
"github.com/pkg/errors"
2620
"github.com/stretchr/testify/require"
2721
"golang.org/x/sync/errgroup"
28-
"google.golang.org/grpc/codes"
29-
grpcStatus "google.golang.org/grpc/status"
3022
)
3123

3224
const (
@@ -593,16 +585,3 @@ func TestServer(t *testing.T) {
593585

594586
run(t, "curl http://127.0.0.1:5244/metrics | grep -v '# '")
595587
}
596-
597-
func TestHandleError(t *testing.T) {
598-
echo := echo.New()
599-
recorder := httptest.NewRecorder()
600-
echoCtx := echo.NewContext(&http.Request{URL: &url.URL{RawQuery: ""}}, recorder)
601-
err := handleError(echoCtx, grpcStatus.Error(codes.ResourceExhausted, errors.Wrap(errors.Wrapf(errors.Wrapf(syscall.ENOSPC, "model image is , but only of disk quota is available"), "pull model failed"), "pull model for dynamic volume").Error()))
602-
require.NoError(t, err)
603-
require.Equal(t, http.StatusNotAcceptable, echoCtx.Response().Status)
604-
605-
err = handleError(echoCtx, grpcStatus.Error(codes.FailedPrecondition, "error test"))
606-
require.NoError(t, err)
607-
require.Equal(t, http.StatusInternalServerError, echoCtx.Response().Status)
608-
}

0 commit comments

Comments
 (0)