diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 9296c99b71d..8cc28e090da 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -10,9 +10,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/kafka/auth" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/kafka" @@ -64,15 +64,15 @@ func TestTLSFlags(t *testing.T) { }, { flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: configtls.ClientConfig{}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{}, PlainText: plain}, }, } diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 00d9f28e58d..39bb8c971a3 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -68,9 +68,6 @@ func main() { consumer.Start() svc.RunAndThen(func() { - if err := options.TLS.Close(); err != nil { - logger.Error("Failed to close TLS certificates watcher", zap.Error(err)) - } if err = consumer.Close(); err != nil { logger.Error("Failed to close consumer", zap.Error(err)) } diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index 955a347777c..74287767991 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -9,6 +9,7 @@ import ( "github.com/Shopify/sarama" "github.com/spf13/viper" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" @@ -30,10 +31,10 @@ var authTypes = []string{ // AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster type AuthenticationConfig struct { - Authentication string `mapstructure:"type"` - Kerberos KerberosConfig `mapstructure:"kerberos"` - TLS tlscfg.Options `mapstructure:"tls"` - PlainText PlainTextConfig `mapstructure:"plaintext"` + Authentication string `mapstructure:"type"` + Kerberos KerberosConfig `mapstructure:"kerberos"` + TLS configtls.ClientConfig `mapstructure:"tls"` + PlainText PlainTextConfig `mapstructure:"plaintext"` } // SetConfiguration set configure authentication into sarama config structure @@ -42,7 +43,7 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config if strings.Trim(authentication, " ") == "" { authentication = none } - if config.Authentication == tls || config.TLS.Enabled { + if config.Authentication == tls { err := setTLSConfiguration(&config.TLS, saramaConfig, logger) if err != nil { return err @@ -79,15 +80,15 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. Prefix: configPrefix, } - var err error - config.TLS, err = tlsClientConfig.InitFromViper(v) + opts, err := tlsClientConfig.InitFromViper(v) if err != nil { return fmt.Errorf("failed to process Kafka TLS options: %w", err) } if config.Authentication == tls { - config.TLS.Enabled = true + opts.Enabled = true + config.TLS = opts.ToOtelClientConfig() } - + fmt.Printf("%v \n", opts.Enabled) config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername) config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword) config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism) diff --git a/pkg/kafka/auth/config_test.go b/pkg/kafka/auth/config_test.go index a32b0077c02..f5354c67d2f 100644 --- a/pkg/kafka/auth/config_test.go +++ b/pkg/kafka/auth/config_test.go @@ -11,11 +11,11 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) func addFlags(flags *flag.FlagSet) { @@ -64,9 +64,7 @@ func Test_InitFromViper(t *testing.T) { KeyTabPath: "/path/to/keytab", DisablePAFXFast: true, }, - TLS: tlscfg.Options{ - Enabled: true, - }, + TLS: configtls.ClientConfig{}, PlainText: PlainTextConfig{ Username: "user", Password: "password", @@ -139,7 +137,7 @@ func TestSetConfiguration(t *testing.T) { { name: "TLS authentication with invalid cipher suite", authType: "tls", - expectedError: "error loading tls config: failed to get cipher suite ids from cipher suite names: cipher suite fail not supported or doesn't exist", + expectedError: "error loading tls config: failed to load TLS config: invalid TLS cipher suite: \"fail\"", }, } @@ -149,7 +147,6 @@ func TestSetConfiguration(t *testing.T) { "--kafka.auth.authentication=" + tt.authType, }) authConfig := &AuthenticationConfig{} - defer authConfig.TLS.Close() err := authConfig.InitFromViper(configPrefix, v) require.NoError(t, err) diff --git a/pkg/kafka/auth/tls.go b/pkg/kafka/auth/tls.go index db905b7fc21..2e6bb45515f 100644 --- a/pkg/kafka/auth/tls.go +++ b/pkg/kafka/auth/tls.go @@ -4,17 +4,17 @@ package auth import ( + "context" "fmt" "github.com/Shopify/sarama" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) -func setTLSConfiguration(config *tlscfg.Options, saramaConfig *sarama.Config, logger *zap.Logger) error { - if config.Enabled { - tlsConfig, err := config.Config(logger) +func setTLSConfiguration(config *configtls.ClientConfig, saramaConfig *sarama.Config, _ *zap.Logger) error { + if !config.Insecure { + tlsConfig, err := config.LoadTLSConfig(context.Background()) if err != nil { return fmt.Errorf("error loading tls config: %w", err) } diff --git a/pkg/kafka/auth/tls_test.go b/pkg/kafka/auth/tls_test.go index d8dadf6cd64..b817b0b82dd 100644 --- a/pkg/kafka/auth/tls_test.go +++ b/pkg/kafka/auth/tls_test.go @@ -9,20 +9,16 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap/zaptest" - - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) func TestSetTLSConfiguration(t *testing.T) { logger := zaptest.NewLogger(t) saramaConfig := sarama.NewConfig() - tlsConfig := &tlscfg.Options{ - Enabled: true, - } + tlsConfig := &configtls.ClientConfig{} err := setTLSConfiguration(tlsConfig, saramaConfig, logger) require.NoError(t, err) assert.True(t, saramaConfig.Net.TLS.Enable) assert.NotNil(t, saramaConfig.Net.TLS.Config) - defer tlsConfig.Close() } diff --git a/plugin/storage/kafka/factory.go b/plugin/storage/kafka/factory.go index d1b59194073..1e089eeab35 100644 --- a/plugin/storage/kafka/factory.go +++ b/plugin/storage/kafka/factory.go @@ -105,6 +105,5 @@ func (f *Factory) Close() error { if f.producer != nil { errs = append(errs, f.producer.Close()) } - errs = append(errs, f.options.Config.TLS.Close()) return errors.Join(errs...) } diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index a2b10405e3b..cb41caac0a3 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -11,9 +11,9 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) @@ -181,15 +181,15 @@ func TestTLSFlags(t *testing.T) { }, { flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: configtls.ClientConfig{}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: configtls.ClientConfig{}, PlainText: plain}, }, }