Skip to content

Commit aaf264a

Browse files
gurkanguranGürkan Güran
andauthored
Implemented Proto cluster provider for Azure Container Apps (#1889)
* Implemented Proto Cluster for Azure Container Apps * Updated props and references * Fixed issues found during testing - Manage tags on a container app resource level as replicas cannot have tags - Find correct ip address of the replica Co-authored-by: Gürkan Güran <[email protected]_not_set.invalid>
1 parent abc58cf commit aaf264a

File tree

6 files changed

+409
-0
lines changed

6 files changed

+409
-0
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Azure;
5+
using Azure.ResourceManager;
6+
using Azure.ResourceManager.AppContainers;
7+
using Azure.ResourceManager.Resources;
8+
using Azure.ResourceManager.Resources.Models;
9+
using Microsoft.Extensions.Logging;
10+
11+
namespace Proto.Cluster.AzureContainerApps;
12+
13+
public static class ArmClientUtils
14+
{
15+
private static readonly ILogger Logger = Log.CreateLogger(nameof(ArmClientUtils));
16+
17+
public static async Task<Member[]> GetClusterMembers(this ArmClient client, string resourceGroupName, string containerAppName)
18+
{
19+
var members = new List<Member>();
20+
21+
var containerApp = await (await client.GetResourceGroupByName(resourceGroupName)).Value.GetContainerAppAsync(containerAppName);
22+
23+
if (containerApp is null || !containerApp.HasValue)
24+
{
25+
Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} is not found", containerApp, resourceGroupName);
26+
return members.ToArray();
27+
}
28+
29+
var containerAppRevisions = GetActiveRevisionsWithTraffic(containerApp).ToList();
30+
if (!containerAppRevisions.Any())
31+
{
32+
Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} does not contain any active revisions with traffic", containerAppName, resourceGroupName);
33+
return members.ToArray();
34+
}
35+
36+
var replicasWithTraffic = containerAppRevisions.SelectMany(r => r.GetContainerAppReplicas());
37+
38+
var allTags = (await containerApp.Value.GetTagResource().GetAsync()).Value.Data.TagValues;
39+
40+
foreach (var replica in replicasWithTraffic)
41+
{
42+
var replicaNameTag = allTags.FirstOrDefault(kvp => kvp.Value == replica.Data.Name);
43+
if (replicaNameTag.Key == null)
44+
{
45+
Logger.LogWarning("Skipping Replica with name: {Name}, no Proto Tags found", replica.Data.Name);
46+
continue;
47+
}
48+
49+
var replicaNameTagPrefix = replicaNameTag.Key.Replace(ResourceTagLabels.LabelReplicaNameWithoutPrefix, string.Empty);
50+
var currentReplicaTags = allTags.Where(kvp => kvp.Key.StartsWith(replicaNameTagPrefix)).ToDictionary(x => x.Key, x => x.Value);
51+
52+
var memberId = currentReplicaTags.FirstOrDefault(kvp => kvp.Key.ToString().Contains(ResourceTagLabels.LabelMemberIdWithoutPrefix)).Value;
53+
54+
var kinds = currentReplicaTags
55+
.Where(kvp => kvp.Key.StartsWith(ResourceTagLabels.LabelKind(memberId)))
56+
.Select(kvp => kvp.Key[(ResourceTagLabels.LabelKind(memberId).Length + 1)..])
57+
.ToArray();
58+
59+
var member = new Member
60+
{
61+
Id = currentReplicaTags[ResourceTagLabels.LabelMemberId(memberId)],
62+
Port = int.Parse(currentReplicaTags[ResourceTagLabels.LabelPort(memberId)]),
63+
Host = currentReplicaTags[ResourceTagLabels.LabelHost(memberId)],
64+
Kinds = { kinds }
65+
};
66+
67+
members.Add(member);
68+
}
69+
70+
return members.ToArray();
71+
}
72+
73+
public static async Task AddMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, Dictionary<string, string> newTags)
74+
{
75+
var resourceTag = new Tag();
76+
foreach (var tag in newTags)
77+
{
78+
resourceTag.TagValues.Add(tag);
79+
}
80+
81+
var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
82+
var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
83+
var tagResource = containerApp.Value.GetTagResource();
84+
85+
var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;
86+
foreach (var tag in existingTags)
87+
{
88+
resourceTag.TagValues.Add(tag);
89+
}
90+
await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
91+
}
92+
93+
public static async Task ClearMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, string memberId)
94+
{
95+
var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
96+
var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
97+
var tagResource = containerApp.Value.GetTagResource();
98+
99+
var resourceTag = new Tag();
100+
var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;
101+
102+
foreach (var tag in existingTags)
103+
{
104+
if (!tag.Key.StartsWith(ResourceTagLabels.LabelPrefix(memberId)))
105+
{
106+
resourceTag.TagValues.Add(tag);
107+
}
108+
}
109+
110+
await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
111+
}
112+
113+
public static async Task<Response<ResourceGroupResource>> GetResourceGroupByName(this ArmClient client, string resourceGroupName) =>
114+
await (await client.GetDefaultSubscriptionAsync()).GetResourceGroups().GetAsync(resourceGroupName);
115+
116+
private static IEnumerable<ContainerAppRevisionResource> GetActiveRevisionsWithTraffic(ContainerAppResource containerApp) =>
117+
containerApp.GetContainerAppRevisions().Where(r => r.HasData && r.Data.Active.GetValueOrDefault(false) && r.Data.TrafficWeight > 0);
118+
}
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Azure.ResourceManager;
6+
using Azure.ResourceManager.AppContainers;
7+
using Microsoft.Extensions.Configuration;
8+
using Microsoft.Extensions.Logging;
9+
using Proto.Utils;
10+
11+
namespace Proto.Cluster.AzureContainerApps;
12+
13+
public class AzureContainerAppsProvider : IClusterProvider
14+
{
15+
public readonly string AdvertisedHost;
16+
17+
private readonly ArmClient _client;
18+
private readonly string _resourceGroup;
19+
private readonly string _containerAppName;
20+
private readonly string _revisionName;
21+
private readonly string _replicaName;
22+
23+
private string _memberId = null!;
24+
private string _address = null!;
25+
private Cluster _cluster = null!;
26+
private string _clusterName = null!;
27+
private string[] _kinds = null!;
28+
private string _host = null!;
29+
private int _port;
30+
31+
private readonly IConfiguration _configuration;
32+
private static readonly ILogger Logger = Log.CreateLogger<AzureContainerAppsProvider>();
33+
private static readonly TimeSpan PollIntervalInSeconds = TimeSpan.FromSeconds(5);
34+
35+
public AzureContainerAppsProvider(
36+
IConfiguration configuration,
37+
ArmClient client,
38+
string resourceGroup,
39+
string containerAppName,
40+
string revisionName,
41+
string replicaName,
42+
string advertisedHost = default)
43+
{
44+
_configuration = configuration;
45+
_client = client;
46+
_resourceGroup = resourceGroup;
47+
_containerAppName = containerAppName;
48+
_revisionName = revisionName;
49+
_replicaName = replicaName;
50+
AdvertisedHost = advertisedHost;
51+
52+
if (string.IsNullOrEmpty(AdvertisedHost))
53+
{
54+
AdvertisedHost = ConfigUtils.FindIpAddress().ToString();
55+
}
56+
}
57+
58+
public async Task StartMemberAsync(Cluster cluster)
59+
{
60+
var clusterName = cluster.Config.ClusterName;
61+
var (host, port) = cluster.System.GetAddress();
62+
var kinds = cluster.GetClusterKinds();
63+
_cluster = cluster;
64+
_clusterName = clusterName;
65+
_memberId = cluster.System.Id;
66+
_port = port;
67+
_host = host;
68+
_kinds = kinds;
69+
_address = $"{host}:{port}";
70+
71+
await RegisterMemberAsync();
72+
StartClusterMonitor();
73+
}
74+
75+
public Task StartClientAsync(Cluster cluster)
76+
{
77+
var clusterName = cluster.Config.ClusterName;
78+
var (host, port) = cluster.System.GetAddress();
79+
_cluster = cluster;
80+
_clusterName = clusterName;
81+
_memberId = cluster.System.Id;
82+
_port = port;
83+
_host = host;
84+
_kinds = Array.Empty<string>();
85+
86+
StartClusterMonitor();
87+
return Task.CompletedTask;
88+
}
89+
90+
public async Task ShutdownAsync(bool graceful) => await DeregisterMemberAsync();
91+
92+
private async Task RegisterMemberAsync()
93+
{
94+
await Retry.Try(RegisterMemberInner, onError: OnError, onFailed: OnFailed, retryCount: Retry.Forever);
95+
96+
static void OnError(int attempt, Exception exception) =>
97+
Logger.LogWarning(exception, "Failed to register service");
98+
99+
static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to register service");
100+
}
101+
102+
private async Task RegisterMemberInner()
103+
{
104+
var resourceGroup = await _client.GetResourceGroupByName(_resourceGroup);
105+
var containerApp = await resourceGroup.Value.GetContainerAppAsync(_containerAppName);
106+
var revision = await containerApp.Value.GetContainerAppRevisionAsync(_revisionName);
107+
108+
if (revision.Value.Data.TrafficWeight.GetValueOrDefault(0) == 0)
109+
{
110+
return;
111+
}
112+
113+
Logger.LogInformation(
114+
"[Cluster][AzureContainerAppsProvider] Registering service {ReplicaName} on {IpAddress}",
115+
_replicaName,
116+
_address);
117+
118+
var tags = new Dictionary<string, string>
119+
{
120+
[ResourceTagLabels.LabelCluster(_memberId)] = _clusterName,
121+
[ResourceTagLabels.LabelHost(_memberId)] = AdvertisedHost,
122+
[ResourceTagLabels.LabelPort(_memberId)] = _port.ToString(),
123+
[ResourceTagLabels.LabelMemberId(_memberId)] = _memberId,
124+
[ResourceTagLabels.LabelReplicaName(_memberId)] = _replicaName
125+
};
126+
127+
foreach (var kind in _kinds)
128+
{
129+
var labelKey = $"{ResourceTagLabels.LabelKind(_memberId)}-{kind}";
130+
tags.TryAdd(labelKey, "true");
131+
}
132+
133+
try
134+
{
135+
await _client.AddMemberTags(_resourceGroup, _containerAppName, tags);
136+
}
137+
catch (Exception x)
138+
{
139+
Logger.LogError(x, "Failed to update metadata");
140+
}
141+
}
142+
143+
private void StartClusterMonitor() =>
144+
_ = SafeTask.Run(async () =>
145+
{
146+
while (!_cluster.System.Shutdown.IsCancellationRequested)
147+
{
148+
Logger.LogInformation("Calling ECS API");
149+
150+
try
151+
{
152+
var members = await _client.GetClusterMembers(_resourceGroup, _containerAppName);
153+
154+
if (members.Any())
155+
{
156+
Logger.LogInformation("Got members {Members}", members.Length);
157+
_cluster.MemberList.UpdateClusterTopology(members);
158+
}
159+
else
160+
{
161+
Logger.LogWarning("Failed to get members from Azure Container Apps");
162+
}
163+
}
164+
catch (Exception x)
165+
{
166+
Logger.LogError(x, "Failed to get members from Azure Container Apps");
167+
}
168+
169+
await Task.Delay(PollIntervalInSeconds);
170+
}
171+
}
172+
);
173+
174+
private async Task DeregisterMemberAsync()
175+
{
176+
await Retry.Try(DeregisterMemberInner, onError: OnError, onFailed: OnFailed);
177+
178+
static void OnError(int attempt, Exception exception) =>
179+
Logger.LogWarning(exception, "Failed to deregister service");
180+
181+
static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to deregister service");
182+
}
183+
184+
private async Task DeregisterMemberInner()
185+
{
186+
Logger.LogInformation(
187+
"[Cluster][AzureContainerAppsProvider] Unregistering member {ReplicaName} on {IpAddress}",
188+
_replicaName,
189+
_address);
190+
191+
await _client.ClearMemberTags(_resourceGroup, _containerAppName, _memberId);
192+
}
193+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Net;
4+
using System.Net.NetworkInformation;
5+
using System.Net.Sockets;
6+
7+
namespace Proto.Cluster.AzureContainerApps;
8+
9+
public static class ConfigUtils
10+
{
11+
internal static IPAddress FindIpAddress(AddressFamily family = AddressFamily.InterNetwork)
12+
{
13+
var addressCandidates = NetworkInterface.GetAllNetworkInterfaces()
14+
.Where(nif => nif.OperationalStatus == OperationalStatus.Up)
15+
.SelectMany(nif => nif.GetIPProperties().UnicastAddresses.Select(a => a.Address))
16+
.Where(addr => addr.AddressFamily == family && !IPAddress.IsLoopback(addr))
17+
.ToList();
18+
19+
return PickSmallestIpAddress(addressCandidates);
20+
}
21+
22+
private static IPAddress PickSmallestIpAddress(IEnumerable<IPAddress> candidates)
23+
{
24+
IPAddress result = null!;
25+
foreach (var addr in candidates)
26+
{
27+
if (CompareIpAddresses(addr, result))
28+
result = addr;
29+
}
30+
return result;
31+
32+
static bool CompareIpAddresses(IPAddress lhs, IPAddress rhs)
33+
{
34+
if (rhs == null)
35+
return true;
36+
37+
var lbytes = lhs.GetAddressBytes();
38+
var rbytes = rhs.GetAddressBytes();
39+
40+
if (lbytes.Length != rbytes.Length) return lbytes.Length < rbytes.Length;
41+
42+
for (var i = 0; i < lbytes.Length; i++)
43+
{
44+
if (lbytes[i] != rbytes[i])
45+
{
46+
return lbytes[i] < rbytes[i];
47+
}
48+
}
49+
return false;
50+
}
51+
}
52+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<LangVersion>10</LangVersion>
5+
<TargetFrameworks>netcoreapp3.1;net6.0;net7.0</TargetFrameworks>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Azure.ResourceManager.AppContainers" Version="1.0.0-beta.1" />
10+
<PackageReference Include="Azure.ResourceManager.Resources" Version="1.3.1" />
11+
</ItemGroup>
12+
13+
<ItemGroup>
14+
<ProjectReference Include="..\src\Proto.Cluster\Proto.Cluster.csproj" />
15+
</ItemGroup>
16+
17+
</Project>

0 commit comments

Comments
 (0)