Skip to content

Draft for thoughts around weighting #2178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,27 @@ private static DestinationConfig CreateDestination(IConfigurationSection section
Address = section[nameof(DestinationConfig.Address)]!,
Health = section[nameof(DestinationConfig.Health)],
Metadata = section.GetSection(nameof(DestinationConfig.Metadata)).ReadStringDictionary(),
Weights = CreateWeightsConfig( section.GetSection(nameof(DestinationConfig.Weights)))
};
}

private static Dictionary<string,double>? CreateWeightsConfig(IConfigurationSection section)
{
if (!section.Exists())
{
return null;
}

var weights = new Dictionary<string,double>();
foreach (var child in section.GetChildren())
{
if (!double.TryParse(child.Value, out var weight)) { throw new ArgumentException($"Could not convert {child.Value} into a double for the weight setting {child.Key} at {section.Path}"); }
weights.Add(child.Key, weight);
}

return weights;
}

private static class Log
{
private static readonly Action<ILogger, Exception> _errorSignalingChange = LoggerMessage.Define(
Expand Down
6 changes: 6 additions & 0 deletions src/ReverseProxy/Configuration/DestinationConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public sealed record DestinationConfig
/// </summary>
public IReadOnlyDictionary<string, string>? Metadata { get; init; }


/// <summary>
/// Collection of weight values for the destination.
/// </summary>
public Dictionary<string, double>? Weights { get; init; }

public bool Equals(DestinationConfig? other)
{
if (other is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
using Microsoft.AspNetCore.Http;
using Yarp.ReverseProxy.Model;
using Yarp.ReverseProxy.Utilities;
using Yarp.ReverseProxy.Weighting;

namespace Yarp.ReverseProxy.LoadBalancing;

internal sealed class PowerOfTwoChoicesLoadBalancingPolicy : ILoadBalancingPolicy
{
private readonly IRandomFactory _randomFactory;
private readonly IProxyWeightingProvider _weightingProvider;

public PowerOfTwoChoicesLoadBalancingPolicy(IRandomFactory randomFactory)
public PowerOfTwoChoicesLoadBalancingPolicy(IRandomFactory randomFactory, IProxyWeightingProvider weightingProvider)
{
_randomFactory = randomFactory;
_weightingProvider = weightingProvider;
}

public string Name => LoadBalancingPolicies.PowerOfTwoChoices;
Expand All @@ -26,23 +29,31 @@ public PowerOfTwoChoicesLoadBalancingPolicy(IRandomFactory randomFactory)
{
return null;
}

if (destinationCount == 1)
{
return availableDestinations[0];
}

// Pick two, and then return the least busy. This avoids the effort of searching the whole list, but
// still avoids overloading a single destination.
var random = _randomFactory.CreateRandomInstance();
var firstIndex = random.Next(destinationCount);
int secondIndex;
do

DestinationState first, second;
if (_weightingProvider is null)
{
secondIndex = random.Next(destinationCount);
} while (firstIndex == secondIndex);
var first = availableDestinations[firstIndex];
var second = availableDestinations[secondIndex];
var random = _randomFactory.CreateRandomInstance();
var firstIndex = random.Next(destinationCount);
var secondIndex = random.Next(destinationCount - 1);
// account for the firstIndex by skipping it and moving beyond its index in the list
if (secondIndex >= firstIndex) { secondIndex++; }
first = availableDestinations[firstIndex];
second = availableDestinations[secondIndex];
}
else
{
first = WeightUtils.getRandomWeightedDestination(availableDestinations, _randomFactory);
second = WeightUtils.getRandomWeightedDestinationWithSkip(availableDestinations, first, _randomFactory);
}
return (first.ConcurrentRequestCount <= second.ConcurrentRequestCount) ? first : second;
}
}
16 changes: 13 additions & 3 deletions src/ReverseProxy/LoadBalancing/RandomLoadBalancingPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
using Microsoft.AspNetCore.Http;
using Yarp.ReverseProxy.Model;
using Yarp.ReverseProxy.Utilities;
using Yarp.ReverseProxy.Weighting;

namespace Yarp.ReverseProxy.LoadBalancing;

internal sealed class RandomLoadBalancingPolicy : ILoadBalancingPolicy
{
private readonly IRandomFactory _randomFactory;
private readonly IProxyWeightingProvider _proxyWeightingProvider;

public RandomLoadBalancingPolicy(IRandomFactory randomFactory)
public RandomLoadBalancingPolicy(IRandomFactory randomFactory, IProxyWeightingProvider proxyWeightingProvider)
{
_randomFactory = randomFactory;
_proxyWeightingProvider = proxyWeightingProvider;
}

public string Name => LoadBalancingPolicies.Random;
Expand All @@ -26,7 +29,14 @@ public RandomLoadBalancingPolicy(IRandomFactory randomFactory)
return null;
}

var random = _randomFactory.CreateRandomInstance();
return availableDestinations[random.Next(availableDestinations.Count)];
if (_proxyWeightingProvider is null)
{
var random = _randomFactory.CreateRandomInstance();
return availableDestinations[random.Next(availableDestinations.Count)];
}
else
{
return WeightUtils.getRandomWeightedDestination(availableDestinations, _randomFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Yarp.ReverseProxy.SessionAffinity;
using Yarp.ReverseProxy.Transforms;
using Yarp.ReverseProxy.Utilities;
using Yarp.ReverseProxy.Weighting;

namespace Yarp.ReverseProxy.Management;

Expand Down Expand Up @@ -125,4 +126,11 @@ public static IReverseProxyBuilder AddHttpSysDelegation(this IReverseProxyBuilde

return builder;
}

public static IReverseProxyBuilder AddWeighting(this IReverseProxyBuilder builder)
{
builder.Services.TryAddSingleton<IProxyWeightingProvider, CompoundedWeightingProvider>();
return builder;
}

}
5 changes: 5 additions & 0 deletions src/ReverseProxy/Management/ProxyConfigManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Yarp.ReverseProxy.Model;
using Yarp.ReverseProxy.Routing;
using Yarp.ReverseProxy.Transforms.Builder;
using Yarp.ReverseProxy.Weighting;

namespace Yarp.ReverseProxy.Management;

Expand All @@ -46,6 +47,7 @@ internal sealed class ProxyConfigManager : EndpointDataSource, IProxyStateLookup
private readonly IForwarderHttpClientFactory _httpClientFactory;
private readonly ProxyEndpointFactory _proxyEndpointFactory;
private readonly ITransformBuilder _transformBuilder;
private readonly IProxyWeightingProvider _weightingProvider;
private readonly List<Action<EndpointBuilder>> _conventions;
private readonly IActiveHealthCheckMonitor _activeHealthCheckMonitor;
private readonly IClusterDestinationsUpdater _clusterDestinationsUpdater;
Expand All @@ -64,6 +66,7 @@ public ProxyConfigManager(
IConfigValidator configValidator,
ProxyEndpointFactory proxyEndpointFactory,
ITransformBuilder transformBuilder,
IProxyWeightingProvider weightingProvider,
IForwarderHttpClientFactory httpClientFactory,
IActiveHealthCheckMonitor activeHealthCheckMonitor,
IClusterDestinationsUpdater clusterDestinationsUpdater,
Expand All @@ -77,6 +80,7 @@ public ProxyConfigManager(
_configValidator = configValidator ?? throw new ArgumentNullException(nameof(configValidator));
_proxyEndpointFactory = proxyEndpointFactory ?? throw new ArgumentNullException(nameof(proxyEndpointFactory));
_transformBuilder = transformBuilder ?? throw new ArgumentNullException(nameof(transformBuilder));
_weightingProvider = weightingProvider ?? throw new ArgumentNullException(nameof(weightingProvider));
_httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
_activeHealthCheckMonitor = activeHealthCheckMonitor ?? throw new ArgumentNullException(nameof(activeHealthCheckMonitor));
_clusterDestinationsUpdater = clusterDestinationsUpdater ?? throw new ArgumentNullException(nameof(clusterDestinationsUpdater));
Expand Down Expand Up @@ -586,6 +590,7 @@ private bool UpdateRuntimeDestinations(IReadOnlyDictionary<string, DestinationCo
{
Log.DestinationChanged(_logger, incomingDestination.Key);
currentDestination.Model = new DestinationModel(incomingDestination.Value);
_weightingProvider.UpdateDestinationState(currentDestination);
changed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static IReverseProxyBuilder AddReverseProxy(this IServiceCollection servi
.AddPassiveHealthCheck()
.AddLoadBalancingPolicies()
.AddHttpSysDelegation()
.AddWeighting()
.AddProxy();

services.TryAddSingleton<ProxyEndpointFactory>();
Expand Down
4 changes: 4 additions & 0 deletions src/ReverseProxy/Model/DestinationState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections;
using System.Collections.Generic;
using Yarp.ReverseProxy.Utilities;
using Yarp.ReverseProxy.Weighting;

namespace Yarp.ReverseProxy.Model;

Expand Down Expand Up @@ -59,6 +60,9 @@ public int ConcurrentRequestCount

internal AtomicCounter ConcurrencyCounter { get; } = new AtomicCounter();

public IDestinationWeight? Weight { get; }


DestinationState IReadOnlyList<DestinationState>.this[int index]
=> index == 0 ? this : throw new IndexOutOfRangeException();

Expand Down
85 changes: 85 additions & 0 deletions src/ReverseProxy/Weighting/CompoundedDestinationWeight.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Yarp.ReverseProxy.Weighting;
public class CompoundedDestinationWeight : IDestinationWeight
{
private double[]? _weights;
private int[]? _keys;

//bug: do we need to lock to return this?
public double RelativeWeight { get; private set; } = 1.0;

internal void SetWeightInternal(int hashCode, double weight)
{
lock (this)
{
if (_keys is null)
{
_keys = new int[1];
_weights = new double[1];

_keys[0] = hashCode;
_weights[0] = weight;
}
else if (!_keys.Contains(hashCode))
{
Array.Resize(ref _keys, _keys.Length + 1);
Array.Resize(ref _weights, _weights.Length + 1);
_keys[_keys.Length - 1] = hashCode;
_weights[_weights.Length - 1] = weight;
}
else
{
var index = Array.IndexOf(_keys, hashCode);
_weights[index] = weight;
}

var totalWeight = 1.0;
foreach (var w in _weights) { totalWeight *= w; }

RelativeWeight = totalWeight;
}
}

internal double? GetWeightInternal(int hashCode)
{
if (_keys is not null && _keys.Contains(hashCode))
{
var index = Array.IndexOf(_keys, hashCode);
return _weights[index];
}
return null;
}

public void SetWeight(object key, double weight)
{
var k = key as string;
if (k is not null)
{
var hash = k.GetHashCode(StringComparison.OrdinalIgnoreCase);
SetWeightInternal(hash, weight);
}
else
{
SetWeightInternal(key.GetHashCode(), weight);
}
}

public double? GetWeight(object key)
{
var k = key as string;
if (k is not null)
{
var hash = k.GetHashCode(StringComparison.OrdinalIgnoreCase);
return GetWeightInternal(hash);
}
else
{
return GetWeightInternal(key.GetHashCode());
}
}
}
57 changes: 57 additions & 0 deletions src/ReverseProxy/Weighting/CompoundedWeightingProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Yarp.ReverseProxy.Configuration;
using Yarp.ReverseProxy.Model;

namespace Yarp.ReverseProxy.Weighting;
public class CompoundedWeightingProvider : IProxyWeightingProvider
{
private List<int> _keys = new();

public void SetDestinationWeight(DestinationState destination, object identifier, double weight)
{
var key = identifier.GetHashCode();
int index = 0;
if (!_keys.Contains(key))
{
_keys.Add(key);
index = _keys.Count;

// UpdateDestinationsWeights(key);
}
else
{
index=_keys.IndexOf(key);
}

var dw = destination.Weight as CompoundedDestinationWeight;
dw?.SetWeightInternal(index, weight);

}

private void UpdateDestinationsWeights(int key)
{
throw new NotImplementedException();
}

public void SetDestinationWeights(DestinationState destination, IConfigurationSection configuration)
{
throw new NotImplementedException();
}

public void UpdateDestinationState(DestinationState destination)
{
var newWeights = destination.Model.Config.Weights;
if (newWeights != null)
{
foreach (var kvp in newWeights)
{
SetDestinationWeight(destination, kvp.Key.ToLower(), kvp.Value);
}
}
}
}
13 changes: 13 additions & 0 deletions src/ReverseProxy/Weighting/IDestinationWeight.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Yarp.ReverseProxy.Weighting;
public interface IDestinationWeight
{
double RelativeWeight { get; }
void SetWeight(object key, double weight);
double? GetWeight(object key);
}
14 changes: 14 additions & 0 deletions src/ReverseProxy/Weighting/IProxyWeightingProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Yarp.ReverseProxy.Configuration;
using Yarp.ReverseProxy.Model;

namespace Yarp.ReverseProxy.Weighting;
public interface IProxyWeightingProvider
{
void UpdateDestinationState(DestinationState currentDestination);
}
Loading