From 0ce23305002cf594773c6efda8334c6f2c4bf76e Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Wed, 23 Mar 2022 20:47:57 +0100 Subject: [PATCH] Kubernetes API Changes --- benchmarks/ClusterBenchmark/Configuration.cs | 3 +- .../KubernetesClusterMonitor.cs | 18 ++---------- .../KubernetesProvider.cs | 29 ++++++++++--------- .../KubernetesProviderConfig.cs | 11 +++++-- 4 files changed, 29 insertions(+), 32 deletions(-) diff --git a/benchmarks/ClusterBenchmark/Configuration.cs b/benchmarks/ClusterBenchmark/Configuration.cs index d01a83e29a..0710f7f18b 100644 --- a/benchmarks/ClusterBenchmark/Configuration.cs +++ b/benchmarks/ClusterBenchmark/Configuration.cs @@ -102,9 +102,8 @@ private static IClusterProvider ClusterProvider() { try { - var kubernetes = new Kubernetes(KubernetesClientConfiguration.InClusterConfig()); Console.WriteLine("Running with Kubernetes Provider"); - return new KubernetesProvider(kubernetes); + return new KubernetesProvider(); } catch { diff --git a/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs b/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs index 0db624a5d2..3a9de84a46 100644 --- a/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs +++ b/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs @@ -34,21 +34,11 @@ class KubernetesClusterMonitor : IActor private bool _watching; private readonly KubernetesProviderConfig _config; private DateTime _lastRestart; - private readonly Func _factory; - public KubernetesClusterMonitor(Cluster cluster, Func kubernetesFactory,KubernetesProviderConfig config) + public KubernetesClusterMonitor(Cluster cluster, KubernetesProviderConfig config) { _cluster = cluster; - _factory = kubernetesFactory; - _kubernetes = _factory(); - _config = config; - } - - [Obsolete("Use overload with kubernetesFactory argument instead",false)] - public KubernetesClusterMonitor(Cluster cluster, IKubernetes kubernetes,KubernetesProviderConfig config) - { - _cluster = cluster; - _kubernetes = kubernetes; + _kubernetes = config.ClientFactory(); _config = config; } @@ -142,14 +132,12 @@ void Restart() private void RecreateKubernetesClient() { - if (_factory == null) return; - DisposeWatcher(); DisposeWatcherTask(); DisposeKubernetesClient(); Logger.LogWarning("[Cluster][KubernetesProvider] Recreating Kubernetes client due to connectivity error"); - _kubernetes = _factory(); + _kubernetes = _config.ClientFactory(); } private void DisposeKubernetesClient() diff --git a/src/Proto.Cluster.Kubernetes/KubernetesProvider.cs b/src/Proto.Cluster.Kubernetes/KubernetesProvider.cs index 4d6680b000..4462fd6ef5 100644 --- a/src/Proto.Cluster.Kubernetes/KubernetesProvider.cs +++ b/src/Proto.Cluster.Kubernetes/KubernetesProvider.cs @@ -10,7 +10,6 @@ using k8s; using k8s.Models; using Microsoft.Extensions.Logging; -using Proto.Mailbox; using Proto.Utils; using static Proto.Cluster.Kubernetes.Messages; using static Proto.Cluster.Kubernetes.ProtoLabels; @@ -21,8 +20,6 @@ namespace Proto.Cluster.Kubernetes; public class KubernetesProvider : IClusterProvider { private static readonly ILogger Logger = Log.CreateLogger(); - - private readonly IKubernetes _kubernetes; private string _address; private Cluster _cluster; @@ -35,17 +32,21 @@ public class KubernetesProvider : IClusterProvider private int _port; private readonly KubernetesProviderConfig _config; - public KubernetesProvider(IKubernetes kubernetes) : this(kubernetes, new KubernetesProviderConfig()) + public KubernetesProvider() : this(new KubernetesProviderConfig()) { } - public KubernetesProvider(IKubernetes kubernetes, KubernetesProviderConfig config) + public KubernetesProvider(KubernetesProviderConfig config) { if (KubernetesExtensions.GetKubeNamespace() is null) throw new InvalidOperationException("The application doesn't seem to be running in Kubernetes"); _config = config; - _kubernetes = kubernetes; + } + + [Obsolete("Do not pass a Kubernetes client directly, pass Client factory as part of Config, or use Config defaults", true)] + public KubernetesProvider(IKubernetes kubernetes, KubernetesProviderConfig config) + { } public async Task StartMemberAsync(Cluster cluster) @@ -99,14 +100,15 @@ public async Task RegisterMemberAsync() public async Task RegisterMemberInner() { + var kubernetes = _config.ClientFactory(); Logger.LogInformation("[Cluster][KubernetesProvider] Registering service {PodName} on {PodIp}", _podName, _address); - var pod = await _kubernetes.ReadNamespacedPodAsync(_podName, KubernetesExtensions.GetKubeNamespace()); + var pod = await kubernetes.ReadNamespacedPodAsync(_podName, KubernetesExtensions.GetKubeNamespace()); if (pod is null) throw new ApplicationException($"Unable to get own pod information for {_podName}"); - Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes namespace: " + pod.Namespace()); + Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes namespace: {Namespace}", pod.Namespace()); - Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes port: " + _port); + Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes port: {Port}" , _port); var existingLabels = pod.Metadata.Labels; @@ -131,7 +133,7 @@ public async Task RegisterMemberInner() try { - await _kubernetes.ReplacePodLabels(_podName, KubernetesExtensions.GetKubeNamespace(), pod, labels); + await kubernetes.ReplacePodLabels(_podName, KubernetesExtensions.GetKubeNamespace(), pod, labels); } catch (Exception e) { @@ -143,7 +145,7 @@ public async Task RegisterMemberInner() private void StartClusterMonitor() { var props = Props - .FromProducer(() => new KubernetesClusterMonitor(_cluster, _kubernetes, _config)) + .FromProducer(() => new KubernetesClusterMonitor(_cluster, _config)) .WithGuardianSupervisorStrategy(Supervision.AlwaysRestartStrategy); _clusterMonitor = _cluster.System.Root.SpawnNamed(props, "kubernetes-cluster-monitor"); @@ -173,11 +175,12 @@ public async Task DeregisterMemberAsync(Cluster cluster) private async Task DeregisterMemberInner(Cluster cluster) { + var kubernetes = _config.ClientFactory(); Logger.LogInformation("[Cluster][KubernetesProvider] Unregistering service {PodName} on {PodIp}", _podName, _address); var kubeNamespace = KubernetesExtensions.GetKubeNamespace(); - var pod = await _kubernetes.ReadNamespacedPodAsync(_podName, kubeNamespace); + var pod = await kubernetes.ReadNamespacedPodAsync(_podName, kubeNamespace); var labels = new Dictionary(pod.Metadata.Labels); foreach (var kind in _kinds) @@ -195,7 +198,7 @@ private async Task DeregisterMemberInner(Cluster cluster) labels.Remove(LabelCluster); - await _kubernetes.ReplacePodLabels(_podName, kubeNamespace,pod, labels); + await kubernetes.ReplacePodLabels(_podName, kubeNamespace,pod, labels); cluster.System.Root.Send(_clusterMonitor, new DeregisterMember()); } diff --git a/src/Proto.Cluster.Kubernetes/KubernetesProviderConfig.cs b/src/Proto.Cluster.Kubernetes/KubernetesProviderConfig.cs index 02e753c369..9fd34be013 100644 --- a/src/Proto.Cluster.Kubernetes/KubernetesProviderConfig.cs +++ b/src/Proto.Cluster.Kubernetes/KubernetesProviderConfig.cs @@ -3,6 +3,8 @@ // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- +using System; +using k8s; using Microsoft.Extensions.Logging; namespace Proto.Cluster.Kubernetes; @@ -11,12 +13,17 @@ public record KubernetesProviderConfig { public int WatchTimeoutSeconds { get; } private bool DeveloperLogging { get; } + + public Func ClientFactory { get; } - public KubernetesProviderConfig(int watchTimeoutSeconds = 30, bool developerLogging = false) + public KubernetesProviderConfig(int watchTimeoutSeconds = 30, bool developerLogging = false, Func clientFactory = null) { WatchTimeoutSeconds = watchTimeoutSeconds; DeveloperLogging = developerLogging; + ClientFactory = clientFactory ?? DefaultFactory; } - + + private static IKubernetes DefaultFactory() => new k8s.Kubernetes(KubernetesClientConfiguration.InClusterConfig()); + internal LogLevel DebugLogLevel => DeveloperLogging ? LogLevel.Information : LogLevel.Debug; } \ No newline at end of file