Skip to content

Commit

Permalink
Merge branch 'ThreeMammals:develop' into feat_enableHttp2
Browse files Browse the repository at this point in the history
  • Loading branch information
EngRajabi authored Aug 28, 2024
2 parents 5ba1f47 + 19a8e2f commit d2d438c
Show file tree
Hide file tree
Showing 25 changed files with 1,970 additions and 1,378 deletions.
40 changes: 24 additions & 16 deletions src/Ocelot.Provider.Kubernetes/Kube.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
using KubeClient.Models;
using Ocelot.Infrastructure.DesignPatterns;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using Ocelot.Values;

namespace Ocelot.Provider.Kubernetes;

/// <summary>
/// Default Kubernetes service discovery provider.
/// </summary>
/// <summary>Default Kubernetes service discovery provider.</summary>
/// <remarks>
/// <list type="bullet">
/// <item>NuGet: <see href="https://www.nuget.org/packages/KubeClient">KubeClient</see></item>
/// <item>GitHub: <see href="https://github.com/tintoy/dotnet-kube-client">dotnet-kube-client</see></item>
/// </list>
/// </remarks>
public class Kube : IServiceDiscoveryProvider
{
private readonly KubeRegistryConfiguration _configuration;
private readonly IOcelotLogger _logger;
private readonly IKubeApiClient _kubeApi;
private readonly IKubeServiceBuilder _serviceBuilder;
private readonly List<Service> _services;

public Kube(
KubeRegistryConfiguration configuration,
Expand All @@ -26,28 +30,32 @@ public Kube(
_logger = factory.CreateLogger<Kube>();
_kubeApi = kubeApi;
_serviceBuilder = serviceBuilder;
_services = new();
}

public virtual async Task<List<Service>> GetAsync()
{
var endpoint = await _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);
var endpoint = await Retry.OperationAsync(GetEndpoint, CheckErroneousState, logger: _logger);

_services.Clear();
if (endpoint?.Subsets.Count != 0)
if (CheckErroneousState(endpoint))
{
_services.AddRange(BuildServices(_configuration, endpoint));
}
else
{
_logger.LogWarning(() => $"K8s Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; Unable to use: it is invalid. Address must contain host only e.g. localhost and port must be greater than 0!");
_logger.LogWarning(() => GetMessage($"Unable to use bad result returned by {nameof(Kube)} integration endpoint because the final result is invalid/unknown after multiple retries!"));
return new(0);
}

return _services;
return BuildServices(_configuration, endpoint)
.ToList();
}

private Task<EndpointsV1> GetEndpoint() => _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);

private bool CheckErroneousState(EndpointsV1 endpoint)
=> (endpoint?.Subsets?.Count ?? 0) == 0; // null or count is zero

private string GetMessage(string message)
=> $"{nameof(Kube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";

protected virtual IEnumerable<Service> BuildServices(KubeRegistryConfiguration configuration, EndpointsV1 endpoint)
=> _serviceBuilder.BuildServices(configuration, endpoint);
}
8 changes: 4 additions & 4 deletions src/Ocelot.Provider.Kubernetes/KubeServiceCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ namespace Ocelot.Provider.Kubernetes;

public class KubeServiceCreator : IKubeServiceCreator
{
private readonly IOcelotLogger _logger;

public KubeServiceCreator(IOcelotLoggerFactory factory)
{
ArgumentNullException.ThrowIfNull(factory);
_logger = factory.CreateLogger<KubeServiceCreator>();
Logger = factory.CreateLogger<KubeServiceCreator>();
}

public virtual IEnumerable<Service> Create(KubeRegistryConfiguration configuration, EndpointsV1 endpoint, EndpointSubsetV1 subset)
Expand All @@ -34,6 +32,8 @@ public virtual IEnumerable<Service> CreateInstance(KubeRegistryConfiguration con
return new Service[] { instance };
}

protected IOcelotLogger Logger { get; }

protected virtual string GetServiceName(KubeRegistryConfiguration configuration, EndpointsV1 endpoint, EndpointSubsetV1 subset, EndpointAddressV1 address)
=> endpoint.Metadata?.Name;

Expand All @@ -46,7 +46,7 @@ protected virtual ServiceHostAndPort GetServiceHostAndPort(KubeRegistryConfigura
: ports.FirstOrDefault(portNameToScheme);
portV1 ??= new();
portV1.Name ??= configuration.Scheme ?? string.Empty;
_logger.LogDebug(() => $"K8s service with key '{configuration.KeyOfServiceInK8s}' and address {address.Ip}; Detected port is {portV1.Name}:{portV1.Port}. Total {ports.Count} ports of [{string.Join(',', ports.Select(p => p.Name))}].");
Logger.LogDebug(() => $"K8s service with key '{configuration.KeyOfServiceInK8s}' and address {address.Ip}; Detected port is {portV1.Name}:{portV1.Port}. Total {ports.Count} ports of [{string.Join(',', ports.Select(p => p.Name))}].");
return new ServiceHostAndPort(address.Ip, portV1.Port, portV1.Name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
<Compile Remove="KubeApiClientFactory.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="KubeClient" Version="2.4.10" />
<PackageReference Include="KubeClient.Extensions.DependencyInjection" Version="2.4.10" />
<PackageReference Include="KubeClient" Version="2.5.8" />
<PackageReference Include="KubeClient.Extensions.DependencyInjection" Version="2.5.8" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.507">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
115 changes: 115 additions & 0 deletions src/Ocelot/Infrastructure/DesignPatterns/Retry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using Ocelot.Logging;

namespace Ocelot.Infrastructure.DesignPatterns;

/// <summary>
/// Basic <seealso href="https://www.bing.com/search?q=Retry+pattern">Retry pattern</seealso> for stabilizing integrated services.
/// </summary>
/// <remarks>Docs:
/// <list type="bullet">
/// <item><see href="https://learn.microsoft.com/en-us/azure/architecture/patterns/retry">Microsoft Learn | Retry pattern</see></item>
/// </list>
/// </remarks>
public static class Retry
{
public const int DefaultRetryTimes = 3;
public const int DefaultWaitTimeMilliseconds = 25;

private static string GetMessage<T>(T operation, int retryNo, string message)
where T : Delegate
=> $"Ocelot {nameof(Retry)} strategy for the operation of '{operation.GetType()}' type -> {nameof(Retry)} No {retryNo}: {message}";

/// <summary>
/// Retry a synchronous operation when an exception occurs or predicate is true, then delay and retry again.
/// </summary>
/// <typeparam name="TResult">Type of the result of the sync operation.</typeparam>
/// <param name="operation">Required Func-delegate of the operation.</param>
/// <param name="predicate">Predicate to check, optionally.</param>
/// <param name="retryTimes">Number of retries.</param>
/// <param name="waitTime">Waiting time in milliseconds.</param>
/// <param name="logger">Concrete logger from upper context.</param>
/// <returns>A <typeparamref name="TResult"/> value as the result of the sync operation.</returns>
public static TResult Operation<TResult>(
Func<TResult> operation,
Predicate<TResult> predicate = null,
int retryTimes = DefaultRetryTimes, int waitTime = DefaultWaitTimeMilliseconds,
IOcelotLogger logger = null)
{
for (int n = 1; n < retryTimes; n++)
{
TResult result;
try
{
result = operation.Invoke();
}
catch (Exception e)
{
logger?.LogError(() => GetMessage(operation, n, $"Caught exception of the {e.GetType()} type -> Message: {e.Message}."), e);
Thread.Sleep(waitTime);
continue; // the result is unknown, so continue to retry
}

// Apply predicate for known result
if (predicate?.Invoke(result) == true)
{
logger?.LogWarning(() => GetMessage(operation, n, $"The predicate has identified erroneous state in the returned result. For further details, implement logging of the result's value or properties within the predicate method."));
Thread.Sleep(waitTime);
continue; // on erroneous state
}

// Happy path
return result;
}

// Last retry should generate native exception or other erroneous state(s)
logger?.LogDebug(() => GetMessage(operation, retryTimes, $"Retrying lastly..."));
return operation.Invoke(); // also final result must be analyzed in the upper context
}

/// <summary>
/// Retry an asynchronous operation when an exception occurs or predicate is true, then delay and retry again.
/// </summary>
/// <typeparam name="TResult">Type of the result of the async operation.</typeparam>
/// <param name="operation">Required Func-delegate of the operation.</param>
/// <param name="predicate">Predicate to check, optionally.</param>
/// <param name="retryTimes">Number of retries.</param>
/// <param name="waitTime">Waiting time in milliseconds.</param>
/// <param name="logger">Concrete logger from upper context.</param>
/// <returns>A <typeparamref name="TResult"/> value as the result of the async operation.</returns>
public static async Task<TResult> OperationAsync<TResult>(
Func<Task<TResult>> operation, // required operation delegate
Predicate<TResult> predicate = null, // optional retry predicate for the result
int retryTimes = DefaultRetryTimes, int waitTime = DefaultWaitTimeMilliseconds, // retrying options
IOcelotLogger logger = null) // static injections
{
for (int n = 1; n < retryTimes; n++)
{
TResult result;
try
{
result = await operation?.Invoke();
}
catch (Exception e)
{
logger?.LogError(() => GetMessage(operation, n, $"Caught exception of the {e.GetType()} type -> Message: {e.Message}."), e);
await Task.Delay(waitTime);
continue; // the result is unknown, so continue to retry
}

// Apply predicate for known result
if (predicate?.Invoke(result) == true)
{
logger?.LogWarning(() => GetMessage(operation, n, $"The predicate has identified erroneous state in the returned result. For further details, implement logging of the result's value or properties within the predicate method."));
await Task.Delay(waitTime);
continue; // on erroneous state
}

// Happy path
return result;
}

// Last retry should generate native exception or other erroneous state(s)
logger?.LogDebug(() => GetMessage(operation, retryTimes, $"Retrying lastly..."));
return await operation?.Invoke(); // also final result must be analyzed in the upper context
}
}
116 changes: 57 additions & 59 deletions src/Ocelot/LoadBalancer/LoadBalancers/CookieStickySessions.cs
Original file line number Diff line number Diff line change
@@ -1,85 +1,83 @@
using Microsoft.AspNetCore.Http;
using Ocelot.Infrastructure;
using Ocelot.Middleware;
using Ocelot.Responses;
using Ocelot.Values;

namespace Ocelot.LoadBalancer.LoadBalancers
namespace Ocelot.LoadBalancer.LoadBalancers;

public class CookieStickySessions : ILoadBalancer
{
public class CookieStickySessions : ILoadBalancer
private readonly int _keyExpiryInMs;
private readonly string _cookieName;
private readonly ILoadBalancer _loadBalancer;
private readonly IBus<StickySession> _bus;

private static readonly object Locker = new();
private static readonly Dictionary<string, StickySession> Stored = new(); // TODO Inject instead of static sharing

public CookieStickySessions(ILoadBalancer loadBalancer, string cookieName, int keyExpiryInMs, IBus<StickySession> bus)
{
private readonly int _keyExpiryInMs;
private readonly string _key;
private readonly ILoadBalancer _loadBalancer;
private readonly ConcurrentDictionary<string, StickySession> _stored;
private readonly IBus<StickySession> _bus;
private readonly object _lock = new();
_bus = bus;
_cookieName = cookieName;
_keyExpiryInMs = keyExpiryInMs;
_loadBalancer = loadBalancer;
_bus.Subscribe(CheckExpiry);
}

public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs, IBus<StickySession> bus)
private void CheckExpiry(StickySession sticky)
{
// TODO Get test coverage for this
lock (Locker)
{
_bus = bus;
_key = key;
_keyExpiryInMs = keyExpiryInMs;
_loadBalancer = loadBalancer;
_stored = new ConcurrentDictionary<string, StickySession>();
_bus.Subscribe(ss =>
if (!Stored.TryGetValue(sticky.Key, out var session) || session.Expiry >= DateTime.UtcNow)
{
//todo - get test coverage for this.
if (_stored.TryGetValue(ss.Key, out var stickySession))
{
lock (_lock)
{
if (stickySession.Expiry < DateTime.UtcNow)
{
_stored.TryRemove(stickySession.Key, out _);
_loadBalancer.Release(stickySession.HostAndPort);
}
}
}
});
return;
}

Stored.Remove(session.Key);
_loadBalancer.Release(session.HostAndPort);
}
}

public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
public Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
{
var route = httpContext.Items.DownstreamRoute();
var serviceName = route.LoadBalancerKey;
var cookie = httpContext.Request.Cookies[_cookieName];
var key = $"{serviceName}:{cookie}"; // strong key name because of static store
lock (Locker)
{
var key = httpContext.Request.Cookies[_key];

lock (_lock)
if (!string.IsNullOrEmpty(key) && Stored.TryGetValue(key, out StickySession cached))
{
if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key))
{
var cached = _stored[key];

var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);

_stored[key] = updated;

_bus.Publish(updated, _keyExpiryInMs);

return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
}
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
Update(key, updated);
return Task.FromResult<Response<ServiceHostAndPort>>(new OkResponse<ServiceHostAndPort>(updated.HostAndPort));
}

var next = await _loadBalancer.Lease(httpContext);

// There is no value in the store, so lease it now!
var next = _loadBalancer.Lease(httpContext).GetAwaiter().GetResult(); // unfortunately the operation must be synchronous
if (next.IsError)
{
return new ErrorResponse<ServiceHostAndPort>(next.Errors);
}

lock (_lock)
{
if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key))
{
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
_stored[key] = ss;
_bus.Publish(ss, _keyExpiryInMs);
}
return Task.FromResult<Response<ServiceHostAndPort>>(new ErrorResponse<ServiceHostAndPort>(next.Errors));
}

return new OkResponse<ServiceHostAndPort>(next.Data);
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
Update(key, ss);
return Task.FromResult<Response<ServiceHostAndPort>>(new OkResponse<ServiceHostAndPort>(next.Data));
}
}

public void Release(ServiceHostAndPort hostAndPort)
protected void Update(string key, StickySession value)
{
lock (Locker)
{
Stored[key] = value;
_bus.Publish(value, _keyExpiryInMs);
}
}

public void Release(ServiceHostAndPort hostAndPort)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ public class CookieStickySessionsCreator : ILoadBalancerCreator
{
public Response<ILoadBalancer> Create(DownstreamRoute route, IServiceDiscoveryProvider serviceProvider)
{
var loadBalancer = new RoundRobin(async () => await serviceProvider.GetAsync());
var options = route.LoadBalancerOptions;
var loadBalancer = new RoundRobin(serviceProvider.GetAsync, route.LoadBalancerKey);
var bus = new InMemoryBus<StickySession>();
return new OkResponse<ILoadBalancer>(new CookieStickySessions(loadBalancer, route.LoadBalancerOptions.Key,
route.LoadBalancerOptions.ExpiryInMs, bus));
return new OkResponse<ILoadBalancer>(
new CookieStickySessions(loadBalancer, options.Key, options.ExpiryInMs, bus));
}

public string Type => nameof(CookieStickySessions);
Expand Down
Loading

0 comments on commit d2d438c

Please sign in to comment.