Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,59 @@ endif
ifneq (${ARGOCD_AGENT_IN_CLUSTER},)
./hack/dev-env/restart-all.sh
endif
@echo ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this code into its own shell script, for example ./hack/dev-env/setup-e2e-enable-redis-tls.sh (a made-up name)?

@echo "Waiting for LoadBalancer IPs to be assigned..."
@for context in vcluster-control-plane vcluster-agent-managed vcluster-agent-autonomous; do \
echo " Checking LoadBalancer in $$context..."; \
FOUND=""; \
for i in 1 2 3 4 5 6 7 8 9 10; do \
LB_IP=$$(kubectl get svc argocd-redis --context=$$context -n argocd -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || echo ""); \
LB_HOST=$$(kubectl get svc argocd-redis --context=$$context -n argocd -o jsonpath='{.status.loadBalancer.ingress[0].hostname}' 2>/dev/null || echo ""); \
if [ -n "$$LB_IP" ] || [ -n "$$LB_HOST" ]; then \
FOUND="yes"; \
if [ -n "$$LB_IP" ]; then \
echo " ✓ LoadBalancer IP assigned: $$LB_IP"; \
else \
echo " ✓ LoadBalancer hostname assigned: $$LB_HOST"; \
fi; \
break; \
fi; \
echo " Waiting for LoadBalancer... (attempt $$i/10)"; \
sleep 5; \
done; \
if [ -z "$$FOUND" ]; then \
echo " ✗ ERROR: LoadBalancer not assigned for $$context after 10 attempts (50 seconds)"; \
echo ""; \
echo "This usually means:"; \
echo " 1. MetalLB is not installed or not configured on your cluster"; \
echo " 2. Your cluster doesn't support LoadBalancer services"; \
echo " 3. The cluster is slow to assign LoadBalancer IPs"; \
echo ""; \
echo "For local development, see: hack/dev-env/README.md"; \
exit 1; \
fi; \
done
@echo ""
@echo "Configuring Redis TLS (required for E2E)..."
./hack/dev-env/gen-redis-tls-certs.sh
@echo ""
@echo "Configuring each cluster for Redis TLS (Redis + ArgoCD components together)"
@echo "Note: Redis and ArgoCD components are configured together per-cluster to avoid"
@echo " connection errors during the transition period."
@echo ""
@echo "=== Control Plane ==="
./hack/dev-env/configure-redis-tls.sh vcluster-control-plane
./hack/dev-env/configure-argocd-redis-tls.sh vcluster-control-plane
@echo ""
@echo "=== Agent Managed ==="
./hack/dev-env/configure-redis-tls.sh vcluster-agent-managed
./hack/dev-env/configure-argocd-redis-tls.sh vcluster-agent-managed
@echo ""
@echo "=== Agent Autonomous ==="
./hack/dev-env/configure-redis-tls.sh vcluster-agent-autonomous
./hack/dev-env/configure-argocd-redis-tls.sh vcluster-agent-autonomous
@echo ""
@echo " E2E environment ready with Redis TLS enabled (required)"

.PHONY: teardown-e2e
teardown-e2e:
Expand Down
64 changes: 46 additions & 18 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package agent

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -139,10 +142,11 @@ type AgentOption func(*Agent) error
// options.
func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace string, opts ...AgentOption) (*Agent, error) {
a := &Agent{
version: version.New("argocd-agent"),
deletions: manager.NewDeletionTracker(),
sourceCache: cache.NewSourceCache(),
inflightLogs: make(map[string]struct{}),
version: version.New("argocd-agent"),
deletions: manager.NewDeletionTracker(),
sourceCache: cache.NewSourceCache(),
inflightLogs: make(map[string]struct{}),
cacheRefreshInterval: 30 * time.Second, // Default interval, can be overridden via WithCacheRefreshInterval
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than setting a default, let's just return an error if WithCacheRefreshInterval didn't set the value.

}
a.infStopCh = make(chan struct{})
a.namespace = namespace
Expand Down Expand Up @@ -321,7 +325,29 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
connMap: map[string]connectionEntry{},
}

clusterCache, err := cluster.NewClusterCacheInstance(a.redisProxyMsgHandler.redisAddress, a.redisProxyMsgHandler.redisPassword, cacheutil.RedisCompressionGZip)
// Create TLS config for cluster cache Redis client (same as for Redis proxy)
var clusterCacheTLSConfig *tls.Config = nil
if a.redisProxyMsgHandler.redisTLSEnabled {
clusterCacheTLSConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
}
if a.redisProxyMsgHandler.redisTLSInsecure {
log().Warn("INSECURE: Not verifying Redis TLS certificate for cluster cache")
clusterCacheTLSConfig.InsecureSkipVerify = true
} else if a.redisProxyMsgHandler.redisTLSCAPath != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return an error if a.redisProxyMsgHandler.redisTLSCAPath == "", or is this still a valid configuration?

caCertPEM, err := os.ReadFile(a.redisProxyMsgHandler.redisTLSCAPath)
if err != nil {
return nil, fmt.Errorf("failed to read CA certificate for cluster cache: %w", err)
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCertPEM) {
return nil, fmt.Errorf("failed to parse CA certificate for cluster cache from %s", a.redisProxyMsgHandler.redisTLSCAPath)
}
clusterCacheTLSConfig.RootCAs = certPool
}
}

clusterCache, err := cluster.NewClusterCacheInstance(a.redisProxyMsgHandler.redisAddress, a.redisProxyMsgHandler.redisPassword, cacheutil.RedisCompressionGZip, clusterCacheTLSConfig)
if err != nil {
return nil, fmt.Errorf("failed to create cluster cache instance: %v", err)
}
Expand Down Expand Up @@ -421,20 +447,22 @@ func (a *Agent) Start(ctx context.Context) error {

// Start the background process of periodic sync of cluster cache info.
// This will send periodic updates of Application, Resource and API counts to principal.
if a.mode == types.AgentModeManaged {
go func() {
ticker := time.NewTicker(a.cacheRefreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
a.addClusterCacheInfoUpdateToQueue()
case <-a.context.Done():
return
}
// Both managed and autonomous agents need to send cluster cache info updates
go func() {
// Send initial update immediately on startup (don't wait for first ticker)
a.addClusterCacheInfoUpdateToQueue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change required for/related to this PR?


ticker := time.NewTicker(a.cacheRefreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
a.addClusterCacheInfoUpdateToQueue()
case <-a.context.Done():
return
}
}()
}
}
}()

if a.remote != nil {
a.remote.SetClientMode(a.mode)
Expand Down
36 changes: 36 additions & 0 deletions agent/inbound_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package agent
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
Expand All @@ -45,6 +47,11 @@ type redisProxyMsgHandler struct {

// connections maintains statistics about redis connections from principal
connections *connectionEntries

// Redis TLS configuration
redisTLSEnabled bool
redisTLSCAPath string
redisTLSInsecure bool
}

// connectionEntries maintains statistics about redis connections from principal
Expand Down Expand Up @@ -335,6 +342,35 @@ func stripNamespaceFromRedisKey(key string, logCtx *logrus.Entry) (string, error
func (a *Agent) getRedisClientAndCache() (*redis.Client, *rediscache.Cache, error) {
var tlsConfig *tls.Config = nil

if a.redisProxyMsgHandler.redisTLSEnabled {
tlsConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
}

if a.redisProxyMsgHandler.redisTLSInsecure {
log().Warn("INSECURE: Not verifying Redis TLS certificate")
tlsConfig.InsecureSkipVerify = true
} else if a.redisProxyMsgHandler.redisTLSCAPath != "" {
// Load CA certificate from file
caCertPEM, err := os.ReadFile(a.redisProxyMsgHandler.redisTLSCAPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to read CA certificate: %w", err)
}

// Create a new cert pool and add the CA cert
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCertPEM) {
return nil, nil, fmt.Errorf("failed to parse CA certificate from %s", a.redisProxyMsgHandler.redisTLSCAPath)
}

tlsConfig.RootCAs = certPool
log().Debugf("Using CA certificate from %s for Redis TLS", a.redisProxyMsgHandler.redisTLSCAPath)
} else {
// No CA specified, will use system CAs
log().Warn("Redis TLS enabled but no CA certificate specified, using system CAs. This may fail with self-signed certificates.")
}
}

opts := &redis.Options{
Addr: a.redisProxyMsgHandler.redisAddress,
Password: a.redisProxyMsgHandler.redisPassword,
Expand Down
24 changes: 24 additions & 0 deletions agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,27 @@ func WithCacheRefreshInterval(interval time.Duration) AgentOption {
return nil
}
}

// WithRedisTLSEnabled returns an AgentOption that records, on the agent's
// Redis proxy message handler, whether Redis connections should use TLS.
// The returned option always succeeds.
func WithRedisTLSEnabled(enabled bool) AgentOption {
	applyTLSEnabled := func(a *Agent) error {
		handler := a.redisProxyMsgHandler
		handler.redisTLSEnabled = enabled
		return nil
	}
	return applyTLSEnabled
}

// WithRedisTLSCAPath returns an AgentOption that stores caPath as the
// location of the CA certificate used for Redis TLS. The path is only
// recorded here; it is not read or validated by this option.
func WithRedisTLSCAPath(caPath string) AgentOption {
	applyCAPath := func(a *Agent) error {
		handler := a.redisProxyMsgHandler
		handler.redisTLSCAPath = caPath
		return nil
	}
	return applyCAPath
}

// WithRedisTLSInsecure returns an AgentOption that stores the "insecure"
// flag for Redis TLS on the agent's Redis proxy message handler. Intended
// for testing only.
func WithRedisTLSInsecure(insecure bool) AgentOption {
	applyInsecure := func(a *Agent) error {
		handler := a.redisProxyMsgHandler
		handler.redisTLSInsecure = insecure
		return nil
	}
	return applyInsecure
}
2 changes: 1 addition & 1 deletion agent/outbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func Test_addClusterCacheInfoUpdateToQueue(t *testing.T) {
a.emitter = event.NewEventSource("principal")

// First populate the cache with dummy data
clusterMgr, err := cluster.NewManager(a.context, a.namespace, miniRedis.Addr(), "", cacheutil.RedisCompressionGZip, a.kubeClient.Clientset)
clusterMgr, err := cluster.NewManager(a.context, a.namespace, miniRedis.Addr(), "", cacheutil.RedisCompressionGZip, a.kubeClient.Clientset, nil)
require.NoError(t, err)
err = clusterMgr.MapCluster("test-agent", &v1alpha1.Cluster{
Name: "test-cluster",
Expand Down
33 changes: 33 additions & 0 deletions cmd/argocd-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func NewAgentRunCommand() *cobra.Command {

// Time interval for agent to refresh cluster cache info in principal
cacheRefreshInterval time.Duration

// Redis TLS configuration
redisTLSEnabled bool
redisTLSCAPath string
redisTLSInsecure bool
)
command := &cobra.Command{
Use: "agent",
Expand Down Expand Up @@ -176,6 +181,23 @@ func NewAgentRunCommand() *cobra.Command {
agentOpts = append(agentOpts, agent.WithRedisUsername(redisUsername))
agentOpts = append(agentOpts, agent.WithRedisPassword(redisPassword))

// Configure Redis TLS
agentOpts = append(agentOpts, agent.WithRedisTLSEnabled(redisTLSEnabled))
if redisTLSEnabled {
// Validate Redis TLS configuration - only one mode allowed
if redisTLSInsecure && redisTLSCAPath != "" {
cmdutil.Fatal("Only one Redis TLS mode can be specified: --redis-tls-insecure or --redis-tls-ca-path")
}

if redisTLSInsecure {
logrus.Warn("INSECURE: Not verifying Redis TLS certificate")
agentOpts = append(agentOpts, agent.WithRedisTLSInsecure(true))
} else if redisTLSCAPath != "" {
logrus.Infof("Loading Redis CA certificate from file %s", redisTLSCAPath)
agentOpts = append(agentOpts, agent.WithRedisTLSCAPath(redisTLSCAPath))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If redisTLSCAPath == "", is this still a valid configuration, or should we just return an error?

}

agentOpts = append(agentOpts, agent.WithEnableResourceProxy(enableResourceProxy))
agentOpts = append(agentOpts, agent.WithCacheRefreshInterval(cacheRefreshInterval))

Expand Down Expand Up @@ -216,6 +238,17 @@ func NewAgentRunCommand() *cobra.Command {
env.StringWithDefault("REDIS_PASSWORD", nil, ""),
"The password to connect to redis with")

// Redis TLS flags
command.Flags().BoolVar(&redisTLSEnabled, "redis-tls-enabled",
env.BoolWithDefault("ARGOCD_AGENT_REDIS_TLS_ENABLED", true),
"Enable TLS for Redis connections (enabled by default for security)")
command.Flags().StringVar(&redisTLSCAPath, "redis-tls-ca-path",
env.StringWithDefault("ARGOCD_AGENT_REDIS_TLS_CA_PATH", nil, ""),
"Path to CA certificate for Redis TLS")
command.Flags().BoolVar(&redisTLSInsecure, "redis-tls-insecure",
env.BoolWithDefault("ARGOCD_AGENT_REDIS_TLS_INSECURE", false),
"INSECURE: Do not verify Redis TLS certificate")

command.Flags().StringVar(&logFormat, "log-format",
env.StringWithDefault("ARGOCD_PRINCIPAL_LOG_FORMAT", nil, "text"),
"The log format to use (one of: text, json)")
Expand Down
Loading
Loading