Skip to content

Commit

Permalink
Kubernetes API Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing authored Mar 23, 2022
1 parent 2905239 commit 0ce2330
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 32 deletions.
3 changes: 1 addition & 2 deletions benchmarks/ClusterBenchmark/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ private static IClusterProvider ClusterProvider()
{
try
{
var kubernetes = new Kubernetes(KubernetesClientConfiguration.InClusterConfig());
Console.WriteLine("Running with Kubernetes Provider");
return new KubernetesProvider(kubernetes);
return new KubernetesProvider();
}
catch
{
Expand Down
18 changes: 3 additions & 15 deletions src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,11 @@ class KubernetesClusterMonitor : IActor
private bool _watching;
private readonly KubernetesProviderConfig _config;
private DateTime _lastRestart;
private readonly Func<IKubernetes> _factory;

public KubernetesClusterMonitor(Cluster cluster, Func<IKubernetes> kubernetesFactory,KubernetesProviderConfig config)
public KubernetesClusterMonitor(Cluster cluster, KubernetesProviderConfig config)
{
_cluster = cluster;
_factory = kubernetesFactory;
_kubernetes = _factory();
_config = config;
}

[Obsolete("Use overload with kubernetesFactory argument instead",false)]
public KubernetesClusterMonitor(Cluster cluster, IKubernetes kubernetes,KubernetesProviderConfig config)
{
_cluster = cluster;
_kubernetes = kubernetes;
_kubernetes = config.ClientFactory();
_config = config;
}

Expand Down Expand Up @@ -142,14 +132,12 @@ void Restart()

private void RecreateKubernetesClient()
{
if (_factory == null) return;

DisposeWatcher();
DisposeWatcherTask();
DisposeKubernetesClient();

Logger.LogWarning("[Cluster][KubernetesProvider] Recreating Kubernetes client due to connectivity error");
_kubernetes = _factory();
_kubernetes = _config.ClientFactory();
}

private void DisposeKubernetesClient()
Expand Down
29 changes: 16 additions & 13 deletions src/Proto.Cluster.Kubernetes/KubernetesProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using k8s;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Proto.Mailbox;
using Proto.Utils;
using static Proto.Cluster.Kubernetes.Messages;
using static Proto.Cluster.Kubernetes.ProtoLabels;
Expand All @@ -21,8 +20,6 @@ namespace Proto.Cluster.Kubernetes;
public class KubernetesProvider : IClusterProvider
{
private static readonly ILogger Logger = Log.CreateLogger<KubernetesProvider>();

private readonly IKubernetes _kubernetes;
private string _address;
private Cluster _cluster;

Expand All @@ -35,17 +32,21 @@ public class KubernetesProvider : IClusterProvider
private int _port;
private readonly KubernetesProviderConfig _config;

public KubernetesProvider(IKubernetes kubernetes) : this(kubernetes, new KubernetesProviderConfig())
public KubernetesProvider() : this(new KubernetesProviderConfig())
{
}

public KubernetesProvider(IKubernetes kubernetes, KubernetesProviderConfig config)
public KubernetesProvider(KubernetesProviderConfig config)
{
if (KubernetesExtensions.GetKubeNamespace() is null)
throw new InvalidOperationException("The application doesn't seem to be running in Kubernetes");

_config = config;
_kubernetes = kubernetes;
}

[Obsolete("Do not pass a Kubernetes client directly, pass Client factory as part of Config, or use Config defaults", true)]
public KubernetesProvider(IKubernetes kubernetes, KubernetesProviderConfig config)
{
}

public async Task StartMemberAsync(Cluster cluster)
Expand Down Expand Up @@ -99,14 +100,15 @@ public async Task RegisterMemberAsync()

public async Task RegisterMemberInner()
{
var kubernetes = _config.ClientFactory();
Logger.LogInformation("[Cluster][KubernetesProvider] Registering service {PodName} on {PodIp}", _podName, _address);

var pod = await _kubernetes.ReadNamespacedPodAsync(_podName, KubernetesExtensions.GetKubeNamespace());
var pod = await kubernetes.ReadNamespacedPodAsync(_podName, KubernetesExtensions.GetKubeNamespace());
if (pod is null) throw new ApplicationException($"Unable to get own pod information for {_podName}");

Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes namespace: " + pod.Namespace());
Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes namespace: {Namespace}", pod.Namespace());

Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes port: " + _port);
Logger.LogInformation("[Cluster][KubernetesProvider] Using Kubernetes port: {Port}" , _port);

var existingLabels = pod.Metadata.Labels;

Expand All @@ -131,7 +133,7 @@ public async Task RegisterMemberInner()

try
{
await _kubernetes.ReplacePodLabels(_podName, KubernetesExtensions.GetKubeNamespace(), pod, labels);
await kubernetes.ReplacePodLabels(_podName, KubernetesExtensions.GetKubeNamespace(), pod, labels);
}
catch (Exception e)
{
Expand All @@ -143,7 +145,7 @@ public async Task RegisterMemberInner()
private void StartClusterMonitor()
{
var props = Props
.FromProducer(() => new KubernetesClusterMonitor(_cluster, _kubernetes, _config))
.FromProducer(() => new KubernetesClusterMonitor(_cluster, _config))
.WithGuardianSupervisorStrategy(Supervision.AlwaysRestartStrategy);

_clusterMonitor = _cluster.System.Root.SpawnNamed(props, "kubernetes-cluster-monitor");
Expand Down Expand Up @@ -173,11 +175,12 @@ public async Task DeregisterMemberAsync(Cluster cluster)

private async Task DeregisterMemberInner(Cluster cluster)
{
var kubernetes = _config.ClientFactory();
Logger.LogInformation("[Cluster][KubernetesProvider] Unregistering service {PodName} on {PodIp}", _podName, _address);

var kubeNamespace = KubernetesExtensions.GetKubeNamespace();

var pod = await _kubernetes.ReadNamespacedPodAsync(_podName, kubeNamespace);
var pod = await kubernetes.ReadNamespacedPodAsync(_podName, kubeNamespace);

var labels = new Dictionary<string, string>(pod.Metadata.Labels);
foreach (var kind in _kinds)
Expand All @@ -195,7 +198,7 @@ private async Task DeregisterMemberInner(Cluster cluster)

labels.Remove(LabelCluster);

await _kubernetes.ReplacePodLabels(_podName, kubeNamespace,pod, labels);
await kubernetes.ReplacePodLabels(_podName, kubeNamespace,pod, labels);

cluster.System.Root.Send(_clusterMonitor, new DeregisterMember());
}
Expand Down
11 changes: 9 additions & 2 deletions src/Proto.Cluster.Kubernetes/KubernetesProviderConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using k8s;
using Microsoft.Extensions.Logging;

namespace Proto.Cluster.Kubernetes;
Expand All @@ -11,12 +13,17 @@ public record KubernetesProviderConfig
{
public int WatchTimeoutSeconds { get; }
private bool DeveloperLogging { get; }

public Func<IKubernetes> ClientFactory { get; }

public KubernetesProviderConfig(int watchTimeoutSeconds = 30, bool developerLogging = false)
public KubernetesProviderConfig(int watchTimeoutSeconds = 30, bool developerLogging = false, Func<IKubernetes> clientFactory = null)
{
WatchTimeoutSeconds = watchTimeoutSeconds;
DeveloperLogging = developerLogging;
ClientFactory = clientFactory ?? DefaultFactory;
}


private static IKubernetes DefaultFactory() => new k8s.Kubernetes(KubernetesClientConfiguration.InClusterConfig());

internal LogLevel DebugLogLevel => DeveloperLogging ? LogLevel.Information : LogLevel.Debug;
}

0 comments on commit 0ce2330

Please sign in to comment.