Skip to content

Commit

Permalink
Support MinIO TLS connection
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <[email protected]>
Co-authored-by: Chen Rao <[email protected]>
  • Loading branch information
yhmo and chenraoCR committed Mar 19, 2024
1 parent ef530a2 commit 3a16dd2
Show file tree
Hide file tree
Showing 29 changed files with 103 additions and 8 deletions.
4 changes: 3 additions & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ minio:
port: 9000 # Port of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
secretAccessKey: minioadmin # MinIO/S3 encryption string
useSSL: false # Access to MinIO/S3 with SSL
ssl:
enabled: false # Access to MinIO/S3 with SSL
tlsCACert: /path/to/public.crt # path to your CACert file, ignore when it is empty
bucketName: a-bucket # Bucket name in MinIO/S3
rootPath: files # The root path where the message is stored in MinIO/S3
# Whether to useIAM role to access S3/GCS instead of access/secret keys
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ typedef struct CStorageConfig {
const char* log_level;
const char* region;
bool useSSL;
const char* sslCACert;
bool useIAM;
bool useVirtualHost;
int64_t requestTimeoutMs;
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/indexbuilder/index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ NewBuildIndexInfo(CBuildIndexInfo* c_build_index_info,
storage_config.iam_endpoint =
std::string(c_storage_config.iam_endpoint);
storage_config.useSSL = c_storage_config.useSSL;
storage_config.sslCACert = c_storage_config.sslCACert;
storage_config.useIAM = c_storage_config.useIAM;
storage_config.region = c_storage_config.region;
storage_config.useVirtualHost = c_storage_config.useVirtualHost;
Expand Down
7 changes: 5 additions & 2 deletions internal/core/src/storage/ChunkManagers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ generateConfig(const StorageConfig& storage_config) {

if (storage_config.useSSL) {
config.scheme = Aws::Http::Scheme::HTTPS;
config.verifySSL = true;
} else {
config.scheme = Aws::Http::Scheme::HTTP;
config.verifySSL = false;
}

if (!storage_config.sslCACert.empty()) {
config.caPath = ConvertToAwsString(storage_config.sslCACert);
}
config.verifySSL = false;

if (!storage_config.region.empty()) {
config.region = ConvertToAwsString(storage_config.region);
}
Expand Down
7 changes: 5 additions & 2 deletions internal/core/src/storage/MinioChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,15 @@ MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)

if (storage_config.useSSL) {
config.scheme = Aws::Http::Scheme::HTTPS;
config.verifySSL = true;
} else {
config.scheme = Aws::Http::Scheme::HTTP;
config.verifySSL = false;
}

if (!storage_config.sslCACert.empty()) {
config.caPath = ConvertToAwsString(storage_config.sslCACert);
}
config.verifySSL = false;

config.requestTimeoutMs = storage_config.requestTimeoutMs == 0
? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS
: storage_config.requestTimeoutMs;
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/storage/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ struct StorageConfig {
std::string log_level = "warn";
std::string region = "";
bool useSSL = false;
std::string sslCACert = "";
bool useIAM = false;
bool useVirtualHost = false;
int64_t requestTimeoutMs = 3000;
Expand All @@ -105,6 +106,7 @@ struct StorageConfig {
<< ", cloud_provider=" << cloud_provider
<< ", iam_endpoint=" << iam_endpoint << ", log_level=" << log_level
<< ", region=" << region << ", useSSL=" << std::boolalpha << useSSL
<< ", sslCACert=" << sslCACert.size() // only print cert length
<< ", useIAM=" << std::boolalpha << useIAM
<< ", useVirtualHost=" << std::boolalpha << useVirtualHost
<< ", requestTimeoutMs=" << requestTimeoutMs << "]";
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/storage/storage_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) {
std::string(c_storage_config.iam_endpoint);
storage_config.log_level = std::string(c_storage_config.log_level);
storage_config.useSSL = c_storage_config.useSSL;
storage_config.sslCACert = std::string(c_storage_config.sslCACert);
storage_config.useIAM = c_storage_config.useIAM;
storage_config.useVirtualHost = c_storage_config.useVirtualHost;
storage_config.region = c_storage_config.region;
Expand Down
2 changes: 2 additions & 0 deletions internal/core/unittest/test_azure_chunk_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ get_default_storage_config(bool useIam) {
"K1SZFPTOtr/KBHBeksoGMGw==";
auto rootPath = "files";
auto useSSL = false;
auto sslCACert = "";
auto iamEndPoint = "";
auto bucketName = "a-bucket";

Expand All @@ -44,6 +45,7 @@ get_default_storage_config(bool useIam) {
"error",
"",
useSSL,
sslCACert,
useIam};
}

Expand Down
4 changes: 3 additions & 1 deletion internal/core/unittest/test_minio_chunk_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class MinioChunkManagerTest : public testing::Test {
// auto accessKey = "";
// auto accessValue = "";
// auto rootPath = "files";
// auto useSSL = true;
// auto useSSL = false;
// auto sslCACert = "";
// auto useIam = true;
// auto iamEndPoint = "";
// auto bucketName = "vdc-infra-poc";
Expand All @@ -63,6 +64,7 @@ class MinioChunkManagerTest : public testing::Test {
// logLevel,
// region,
// useSSL,
// sslCACert,
// useIam};
//}

Expand Down
1 change: 1 addition & 0 deletions internal/core/unittest/test_remote_chunk_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ get_default_remote_storage_config() {
storage_config.storage_type = "remote";
storage_config.cloud_provider = "";
storage_config.useSSL = false;
storage_config.sslCACert = "";
storage_config.useIAM = false;
return storage_config;
}
Expand Down
1 change: 1 addition & 0 deletions internal/core/unittest/test_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ get_azure_storage_config() {
"error",
"",
false,
"",
false,
false,
30000};
Expand Down
9 changes: 9 additions & 0 deletions internal/datacoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"bytes"
"context"
"os"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -246,6 +247,14 @@ func Test_garbageCollector_scan(t *testing.T) {
// initialize unit test sso env
func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, inserts []string, stats []string, delta []string, other []string, err error) {
paramtable.Init()

if Params.MinioCfg.UseSSL.GetAsBool() && len(Params.MinioCfg.SslCACert.GetValue()) > 0 {
err := os.Setenv("SSL_CERT_FILE", Params.MinioCfg.SslCACert.GetValue())
if err != nil {
return nil, nil, nil, nil, nil, err
}
}

cli, err := minio.New(Params.MinioCfg.Address.GetValue(), &minio.Options{
Creds: credentials.NewStaticV4(Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), ""),
Secure: Params.MinioCfg.UseSSL.GetAsBool(),
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(),
SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(),
UseSSL: Params.MinioCfg.UseSSL.GetAsBool(),
SslCACert: Params.MinioCfg.SslCACert.GetValue(),
BucketName: Params.MinioCfg.BucketName.GetValue(),
RootPath: Params.MinioCfg.RootPath.GetValue(),
UseIAM: Params.MinioCfg.UseIAM.GetAsBool(),
Expand Down
1 change: 1 addition & 0 deletions internal/indexnode/chunk_mgr_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (m *chunkMgrFactory) NewChunkManager(ctx context.Context, config *indexpb.S
storage.AccessKeyID(config.GetAccessKeyID()),
storage.SecretAccessKeyID(config.GetSecretAccessKey()),
storage.UseSSL(config.GetUseSSL()),
storage.SslCACert(config.GetSslCACert()),
storage.BucketName(config.GetBucketName()),
storage.UseIAM(config.GetUseIAM()),
storage.CloudProvider(config.GetCloudProvider()),
Expand Down
1 change: 1 addition & 0 deletions internal/proto/index_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ message StorageConfig {
string region = 11;
string cloud_provider = 12;
int64 request_timeout_ms = 13;
string sslCACert = 14;
}

message CreateJobRequest {
Expand Down
11 changes: 11 additions & 0 deletions internal/proxy/accesslog/minio_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package accesslog
import (
"context"
"fmt"
"os"
"path"
"strings"
"sync"
Expand All @@ -39,6 +40,7 @@ type config struct {
accessKeyID string
secretAccessKeyID string
useSSL bool
sslCACert string
createBucket bool
useIAM bool
iamEndpoint string
Expand Down Expand Up @@ -78,6 +80,7 @@ func NewMinioHandler(ctx context.Context, cfg *paramtable.MinioConfig, rootPath
accessKeyID: cfg.AccessKeyID.GetValue(),
secretAccessKeyID: cfg.SecretAccessKey.GetValue(),
useSSL: cfg.UseSSL.GetAsBool(),
sslCACert: cfg.SslCACert.GetValue(),
createBucket: true,
useIAM: cfg.UseIAM.GetAsBool(),
iamEndpoint: cfg.IAMEndpoint.GetValue(),
Expand All @@ -104,6 +107,14 @@ func newMinioClient(ctx context.Context, cfg config) (*minio.Client, error) {
} else {
creds = credentials.NewStaticV4(cfg.accessKeyID, cfg.secretAccessKeyID, "")
}

if cfg.useSSL && len(cfg.sslCACert) > 0 {
err := os.Setenv("SSL_CERT_FILE", cfg.sslCACert)
if err != nil {
return nil, err
}
}

minioClient, err := minio.New(cfg.address, &minio.Options{
Creds: creds,
Secure: cfg.useSSL,
Expand Down
2 changes: 2 additions & 0 deletions internal/proxy/accesslog/minio_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func TestMinioHandler_ConnectError(t *testing.T) {
params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
params.Save(params.MinioCfg.UseIAM.Key, "true")
params.Save(params.MinioCfg.Address.Key, "")
params.Save(params.MinioCfg.UseSSL.Key, "true")
params.Save(params.MinioCfg.SslCACert.Key, "/tmp/dummy.crt")

_, err := NewMinioHandler(
context.Background(),
Expand Down
2 changes: 2 additions & 0 deletions internal/querynodev2/segments/mock_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ func NewTestChunkManagerFactory(params *paramtable.ComponentParam, rootPath stri
storage.AccessKeyID(params.MinioCfg.AccessKeyID.GetValue()),
storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey.GetValue()),
storage.UseSSL(params.MinioCfg.UseSSL.GetAsBool()),
storage.SslCACert(params.MinioCfg.SslCACert.GetValue()),
storage.BucketName(params.MinioCfg.BucketName.GetValue()),
storage.UseIAM(params.MinioCfg.UseIAM.GetAsBool()),
storage.CloudProvider(params.MinioCfg.CloudProvider.GetValue()),
Expand Down Expand Up @@ -994,6 +995,7 @@ func genStorageConfig() *indexpb.StorageConfig {
RootPath: paramtable.Get().MinioCfg.RootPath.GetValue(),
IAMEndpoint: paramtable.Get().MinioCfg.IAMEndpoint.GetValue(),
UseSSL: paramtable.Get().MinioCfg.UseSSL.GetAsBool(),
SslCACert: paramtable.Get().MinioCfg.SslCACert.GetValue(),
UseIAM: paramtable.Get().MinioCfg.UseIAM.GetAsBool(),
StorageType: paramtable.Get().CommonCfg.StorageType.GetValue(),
}
Expand Down
1 change: 1 addition & 0 deletions internal/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func NewChunkManagerFactoryWithParam(params *paramtable.ComponentParam) *ChunkMa
AccessKeyID(params.MinioCfg.AccessKeyID.GetValue()),
SecretAccessKeyID(params.MinioCfg.SecretAccessKey.GetValue()),
UseSSL(params.MinioCfg.UseSSL.GetAsBool()),
SslCACert(params.MinioCfg.SslCACert.GetValue()),
BucketName(params.MinioCfg.BucketName.GetValue()),
UseIAM(params.MinioCfg.UseIAM.GetAsBool()),
CloudProvider(params.MinioCfg.CloudProvider.GetValue()),
Expand Down
4 changes: 4 additions & 0 deletions internal/storage/minio_chunk_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ func newMinIOChunkManager(ctx context.Context, bucketName string, rootPath strin
accessKeyID := Params.MinioCfg.AccessKeyID.GetValue()
secretAccessKey := Params.MinioCfg.SecretAccessKey.GetValue()
useSSL := Params.MinioCfg.UseSSL.GetAsBool()
sslCACert := Params.MinioCfg.SslCACert.GetValue()
client, err := NewMinioChunkManager(ctx,
RootPath(rootPath),
Address(endPoint),
AccessKeyID(accessKeyID),
SecretAccessKeyID(secretAccessKey),
UseSSL(useSSL),
SslCACert(sslCACert),
BucketName(bucketName),
UseIAM(false),
CloudProvider("aws"),
Expand All @@ -66,11 +68,13 @@ func TestMinIOCMFail(t *testing.T) {
accessKeyID := Params.MinioCfg.AccessKeyID.GetValue()
secretAccessKey := Params.MinioCfg.SecretAccessKey.GetValue()
useSSL := Params.MinioCfg.UseSSL.GetAsBool()
sslCACert := Params.MinioCfg.SslCACert.GetValue()
client, err := NewMinioChunkManager(ctx,
Address("9.9.9.9:invalid"),
AccessKeyID(accessKeyID),
SecretAccessKeyID(secretAccessKey),
UseSSL(useSSL),
SslCACert(sslCACert),
BucketName("test"),
CreateBucket(true),
)
Expand Down
9 changes: 9 additions & 0 deletions internal/storage/minio_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"os"
"strings"
"time"

Expand Down Expand Up @@ -105,6 +106,14 @@ func newMinioClient(ctx context.Context, c *config) (*minio.Client, error) {
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
}
}

if c.useSSL && len(c.sslCACert) > 0 {
err := os.Setenv("SSL_CERT_FILE", c.sslCACert)
if err != nil {
return nil, err
}
}

minioOpts := &minio.Options{
BucketLookup: bucketLookupType,
Creds: creds,
Expand Down
9 changes: 9 additions & 0 deletions internal/storage/minio_object_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ func TestMinioObjectStorage(t *testing.T) {
config.useIAM = false
})

t.Run("test ssl", func(t *testing.T) {
var err error
config.useSSL = true
config.sslCACert = "/tmp/dummy.crt"
_, err = newMinioObjectStorageWithConfig(ctx, &config)
assert.Error(t, err)
config.useSSL = false
})

t.Run("test cloud provider", func(t *testing.T) {
var err error
cloudProvider := config.cloudProvider
Expand Down
7 changes: 7 additions & 0 deletions internal/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type config struct {
accessKeyID string
secretAccessKeyID string
useSSL bool
sslCACert string
createBucket bool
rootPath string
useIAM bool
Expand Down Expand Up @@ -54,6 +55,12 @@ func UseSSL(useSSL bool) Option {
}
}

func SslCACert(sslCACert string) Option {
return func(c *config) {
c.sslCACert = sslCACert
}
}

func CreateBucket(createBucket bool) Option {
return func(c *config) {
c.createBucket = createBucket
Expand Down
1 change: 1 addition & 0 deletions internal/storage/remote_chunk_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newRemoteChunkManager(ctx context.Context, cloudProvider string, bucketName
AccessKeyID(Params.MinioCfg.AccessKeyID.GetValue()),
SecretAccessKeyID(Params.MinioCfg.SecretAccessKey.GetValue()),
UseSSL(Params.MinioCfg.UseSSL.GetAsBool()),
SslCACert(Params.MinioCfg.SslCACert.GetValue()),
BucketName(bucketName),
UseIAM(Params.MinioCfg.UseIAM.GetAsBool()),
CloudProvider(cloudProvider),
Expand Down
3 changes: 3 additions & 0 deletions internal/util/indexcgowrapper/build_index_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewBuildIndexInfo(config *indexpb.StorageConfig) (*BuildIndexInfo, error) {
cCloudProvider := C.CString(config.CloudProvider)
cIamEndPoint := C.CString(config.IAMEndpoint)
cRegion := C.CString(config.Region)
cSslCACert := C.CString(config.SslCACert)
defer C.free(unsafe.Pointer(cAddress))
defer C.free(unsafe.Pointer(cBucketName))
defer C.free(unsafe.Pointer(cAccessKey))
Expand All @@ -60,6 +61,7 @@ func NewBuildIndexInfo(config *indexpb.StorageConfig) (*BuildIndexInfo, error) {
defer C.free(unsafe.Pointer(cCloudProvider))
defer C.free(unsafe.Pointer(cIamEndPoint))
defer C.free(unsafe.Pointer(cRegion))
defer C.free(unsafe.Pointer(cSslCACert))
storageConfig := C.CStorageConfig{
address: cAddress,
bucket_name: cBucketName,
Expand All @@ -70,6 +72,7 @@ func NewBuildIndexInfo(config *indexpb.StorageConfig) (*BuildIndexInfo, error) {
cloud_provider: cCloudProvider,
iam_endpoint: cIamEndPoint,
useSSL: C.bool(config.UseSSL),
sslCACert: cSslCACert,
useIAM: C.bool(config.UseIAM),
region: cRegion,
useVirtualHost: C.bool(config.UseVirtualHost),
Expand Down
1 change: 1 addition & 0 deletions internal/util/indexcgowrapper/codec_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func genStorageConfig() *indexpb.StorageConfig {
RootPath: params.MinioCfg.RootPath.GetValue(),
IAMEndpoint: params.MinioCfg.IAMEndpoint.GetValue(),
UseSSL: params.MinioCfg.UseSSL.GetAsBool(),
SslCACert: params.MinioCfg.SslCACert.GetValue(),
UseIAM: params.MinioCfg.UseIAM.GetAsBool(),
}
}
Expand Down
Loading

0 comments on commit 3a16dd2

Please sign in to comment.