Skip to content

Commit

Permalink
Add support for listening to Azure Maintenance Events (#1876)
Browse files Browse the repository at this point in the history
Adding an automatic subscription to the AzureRedisEvents pubsub channel for Azure caches. This channel notifies clients of upcoming maintenance and failover events.

By exposing these events, users will be able to know about maintenance ahead of time, and can implement their own logic (e.g. diverting traffic from the cache to another database for the duration of the maintenance event) in response, with the goal of minimizing downtime and disrupted connections. We also automatically refresh our view of the topology of the cluster in response to certain events.

Here are some of the possible notifications:

```
// Indicates that a maintenance event is scheduled. May be several minutes from now
NodeMaintenanceScheduled,

// This event gets fired ~20s before maintenance begins
NodeMaintenanceStarting,

// This event gets fired when maintenance is imminent (<5s)
NodeMaintenanceStart,

// Indicates that the node maintenance operation is over
NodeMaintenanceEnded,

// Indicates that a replica has been promoted to primary
NodeMaintenanceFailover
```

Co-authored-by: Michelle Soedal <[email protected]>
Co-authored-by: Nick Craver <[email protected]>
  • Loading branch information
3 people committed Oct 7, 2021
1 parent 082c69b commit ef1178e
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- add: `Condition` API (transactions) now supports `StreamLengthEqual` and variants (#1807 via AlphaGremlin)
- Add support for count argument to `ListLeftPop`, `ListLeftPopAsync`, `ListRightPop`, and `ListRightPopAsync` (#1850 via jjfmarket)
- fix potential task/thread exhaustion from the backlog processor (#1854 via mgravell)
- add support for listening to Azure Maintenance Events (#1876 via amsoedal)
- add `StringGetDelete`/`StringGetDeleteAsync` API for Redis `GETDEL` command(#1840 via WeihanLi)

## 2.2.62
Expand Down
25 changes: 16 additions & 9 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -790,22 +790,29 @@ private void DoParse(string configuration, bool ignoreUnknown)
// Microsoft Azure team wants abortConnect=false by default
private bool GetDefaultAbortOnConnectFailSetting() => !IsAzureEndpoint();

private bool IsAzureEndpoint()
/// <summary>
/// List of domains known to be Azure Redis, so we can light up some helpful functionality
/// for minimizing downtime during maintenance events and such.
/// </summary>
private static readonly List<string> azureRedisDomains = new List<string> {
".redis.cache.windows.net",
".redis.cache.chinacloudapi.cn",
".redis.cache.usgovcloudapi.net",
".redis.cache.cloudapi.de",
".redisenterprise.cache.azure.net",
};

internal bool IsAzureEndpoint()
{
foreach (var ep in EndPoints)
{
if (ep is DnsEndPoint dnsEp)
{
int firstDot = dnsEp.Host.IndexOf('.');
if (firstDot >= 0)
foreach (var host in azureRedisDomains)
{
switch (dnsEp.Host.Substring(firstDot).ToLowerInvariant())
if (dnsEp.Host.EndsWith(host, StringComparison.InvariantCultureIgnoreCase))
{
case ".redis.cache.windows.net":
case ".redis.cache.chinacloudapi.cn":
case ".redis.cache.usgovcloudapi.net":
case ".redis.cache.cloudapi.de":
return true;
return true;
}
}
}
Expand Down
30 changes: 23 additions & 7 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Reflection;
using System.IO.Compression;
using System.Runtime.CompilerServices;
using StackExchange.Redis.Profiling;
using Pipelines.Sockets.Unofficial;
using System.ComponentModel;
using System.Runtime.InteropServices;
using StackExchange.Redis.Maintenance;
using StackExchange.Redis.Profiling;

namespace StackExchange.Redis
{
Expand Down Expand Up @@ -142,6 +143,7 @@ public ServerCounters GetCounters()
public string ClientName => RawConfig.ClientName ?? GetDefaultClientName();

private static string defaultClientName;

private static string GetDefaultClientName()
{
return defaultClientName ??= TryGetAzureRoleInstanceIdNoThrow()
Expand Down Expand Up @@ -571,6 +573,11 @@ private static void WriteNormalizingLineEndings(string source, StreamWriter writ
/// </summary>
public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast;

/// <summary>
/// Raised when server indicates a maintenance event is going to happen.
/// </summary>
public event EventHandler<ServerMaintenanceEvent> ServerMaintenanceEvent;

/// <summary>
/// Gets the synchronous timeout associated with the connections
/// </summary>
Expand All @@ -591,6 +598,9 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false)
return _serverSnapshot.GetEndPoints();
}

internal void InvokeServerMaintenanceEvent(ServerMaintenanceEvent e)
=> ServerMaintenanceEvent?.Invoke(this, e);

internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved)
{
return ServerSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved);
Expand Down Expand Up @@ -892,6 +902,9 @@ private static async Task<ConnectionMultiplexer> ConnectImplAsync(ConfigurationO
// Initialize the Sentinel handlers
muxer.InitializeSentinel(logProxy);
}

await Maintenance.ServerMaintenanceEvent.AddListenersAsync(muxer, logProxy).ForAwait();

return muxer;
}
finally
Expand Down Expand Up @@ -1187,6 +1200,9 @@ private static ConnectionMultiplexer ConnectImpl(ConfigurationOptions configurat
// Initialize the Sentinel handlers
muxer.InitializeSentinel(logProxy);
}

Maintenance.ServerMaintenanceEvent.AddListenersAsync(muxer, logProxy).Wait(muxer.SyncConnectTimeout(true));

return muxer;
}
finally
Expand Down Expand Up @@ -2863,7 +2879,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
}
}
// snapshot these so that we can recycle the box
var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it...
var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it...
if (ex != null) throw ex;
Trace(message + " received " + val);
return val;
Expand Down
6 changes: 5 additions & 1 deletion src/StackExchange.Redis/Format.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
using System.Globalization;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;

namespace StackExchange.Redis
{
internal static class Format
{
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER
public static int ParseInt32(ReadOnlySpan<char> s) => int.Parse(s, NumberStyles.Integer, NumberFormatInfo.InvariantInfo);
public static bool TryParseInt32(ReadOnlySpan<char> s, out int value) => int.TryParse(s, NumberStyles.Integer, NumberFormatInfo.InvariantInfo, out value);
#endif

public static int ParseInt32(string s) => int.Parse(s, NumberStyles.Integer, NumberFormatInfo.InvariantInfo);

public static long ParseInt64(string s) => long.Parse(s, NumberStyles.Integer, NumberFormatInfo.InvariantInfo);
Expand Down
194 changes: 194 additions & 0 deletions src/StackExchange.Redis/Maintenance/AzureMaintenanceEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
using System;
using System.Globalization;
using System.Net;
using System.Threading.Tasks;
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER
using System.Buffers.Text;
#endif

namespace StackExchange.Redis.Maintenance
{
/// <summary>
/// Azure node maintenance event. For more information, please see: https://aka.ms/redis/maintenanceevents
/// </summary>
public sealed class AzureMaintenanceEvent : ServerMaintenanceEvent
{
private const string PubSubChannelName = "AzureRedisEvents";

internal AzureMaintenanceEvent(string azureEvent)
{
if (azureEvent == null)
{
return;
}

// The message consists of key-value pairs delimited by pipes. For example, a message might look like:
// NotificationType|NodeMaintenanceStarting|StartTimeUtc|2021-09-23T12:34:19|IsReplica|False|IpAddress|13.67.42.199|SSLPort|15001|NonSSLPort|13001
var message = azureEvent.AsSpan();
try
{
while (message.Length > 0)
{
if (message[0] == '|')
{
message = message.Slice(1);
continue;
}

// Grab the next pair
var nextDelimiter = message.IndexOf('|');
if (nextDelimiter < 0)
{
// The rest of the message is not a key-value pair and is therefore malformed. Stop processing it.
break;
}

if (nextDelimiter == message.Length - 1)
{
// The message is missing the value for this key-value pair. It is malformed so we stop processing it.
break;
}

var key = message.Slice(0, nextDelimiter);
message = message.Slice(key.Length + 1);

var valueEnd = message.IndexOf('|');
var value = valueEnd > -1 ? message.Slice(0, valueEnd) : message;
message = message.Slice(value.Length);

if (key.Length > 0 && value.Length > 0)
{
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER
switch (key)
{
case var _ when key.SequenceEqual(nameof(NotificationType).AsSpan()):
NotificationTypeString = value.ToString();
NotificationType = ParseNotificationType(NotificationTypeString);
break;
case var _ when key.SequenceEqual("StartTimeInUTC".AsSpan()) && DateTime.TryParseExact(value, "s", CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal, out DateTime startTime):
StartTimeUtc = DateTime.SpecifyKind(startTime, DateTimeKind.Utc);
break;
case var _ when key.SequenceEqual(nameof(IsReplica).AsSpan()) && bool.TryParse(value, out var isReplica):
IsReplica = isReplica;
break;
case var _ when key.SequenceEqual(nameof(IPAddress).AsSpan()) && IPAddress.TryParse(value, out var ipAddress):
IPAddress = ipAddress;
break;
case var _ when key.SequenceEqual("SSLPort".AsSpan()) && Format.TryParseInt32(value, out var port):
SslPort = port;
break;
case var _ when key.SequenceEqual("NonSSLPort".AsSpan()) && Format.TryParseInt32(value, out var nonsslport):
NonSslPort = nonsslport;
break;
default:
break;
}
#else
switch (key)
{
case var _ when key.SequenceEqual(nameof(NotificationType).AsSpan()):
NotificationTypeString = value.ToString();
NotificationType = ParseNotificationType(NotificationTypeString);
break;
case var _ when key.SequenceEqual("StartTimeInUTC".AsSpan()) && DateTime.TryParseExact(value.ToString(), "s", CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal, out DateTime startTime):
StartTimeUtc = DateTime.SpecifyKind(startTime, DateTimeKind.Utc);
break;
case var _ when key.SequenceEqual(nameof(IsReplica).AsSpan()) && bool.TryParse(value.ToString(), out var isReplica):
IsReplica = isReplica;
break;
case var _ when key.SequenceEqual(nameof(IPAddress).AsSpan()) && IPAddress.TryParse(value.ToString(), out var ipAddress):
IPAddress = ipAddress;
break;
case var _ when key.SequenceEqual("SSLPort".AsSpan()) && Format.TryParseInt32(value.ToString(), out var port):
SslPort = port;
break;
case var _ when key.SequenceEqual("NonSSLPort".AsSpan()) && Format.TryParseInt32(value.ToString(), out var nonsslport):
NonSslPort = nonsslport;
break;
default:
break;
}
#endif
}
}
}
catch
{
// TODO: Append to rolling debug log when it's present
}
}

internal async static Task AddListenerAsync(ConnectionMultiplexer multiplexer, ConnectionMultiplexer.LogProxy logProxy)
{
try
{
var sub = multiplexer.GetSubscriber();
if (sub == null)
{
logProxy?.WriteLine("Failed to GetSubscriber for AzureRedisEvents");
return;
}

await sub.SubscribeAsync(PubSubChannelName, async (channel, message) =>
{
var newMessage = new AzureMaintenanceEvent(message);
multiplexer.InvokeServerMaintenanceEvent(newMessage);
switch (newMessage.NotificationType)
{
case AzureNotificationType.NodeMaintenanceEnded:
case AzureNotificationType.NodeMaintenanceFailoverComplete:
await multiplexer.ReconfigureAsync(first: false, reconfigureAll: true, log: logProxy, blame: null, cause: $"Azure Event: {newMessage.NotificationType}").ForAwait();
break;
}
}).ForAwait();
}
catch (Exception e)
{
logProxy?.WriteLine($"Encountered exception: {e}");
}
}

/// <summary>
/// Indicates the type of event (raw string form).
/// </summary>
public string NotificationTypeString { get; }

/// <summary>
/// The parsed version of <see cref="NotificationTypeString"/> for easier consumption.
/// </summary>
public AzureNotificationType NotificationType { get; }

/// <summary>
/// Indicates if the event is for a replica node.
/// </summary>
public bool IsReplica { get; }

/// <summary>
/// IPAddress of the node event is intended for.
/// </summary>
public IPAddress IPAddress { get; }

/// <summary>
/// SSL Port.
/// </summary>
public int SslPort { get; }

/// <summary>
/// Non-SSL port.
/// </summary>
public int NonSslPort { get; }

private AzureNotificationType ParseNotificationType(string typeString) => typeString switch
{
"NodeMaintenanceScheduled" => AzureNotificationType.NodeMaintenanceScheduled,
"NodeMaintenanceStarting" => AzureNotificationType.NodeMaintenanceStarting,
"NodeMaintenanceStart" => AzureNotificationType.NodeMaintenanceStart,
"NodeMaintenanceEnded" => AzureNotificationType.NodeMaintenanceEnded,
// This is temporary until server changes go into effect - to be removed in later versions
"NodeMaintenanceFailover" => AzureNotificationType.NodeMaintenanceFailoverComplete,
"NodeMaintenanceFailoverComplete" => AzureNotificationType.NodeMaintenanceFailoverComplete,
_ => AzureNotificationType.Unknown,
};
}
}
38 changes: 38 additions & 0 deletions src/StackExchange.Redis/Maintenance/AzureNotificationType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace StackExchange.Redis.Maintenance
{
/// <summary>
/// The types of notifications that Azure is sending for events happening.
/// </summary>
public enum AzureNotificationType
{
/// <summary>
/// Unrecognized event type, likely needs a library update to recognize new events.
/// </summary>
Unknown,

/// <summary>
/// Indicates that a maintenance event is scheduled. May be several minutes from now.
/// </summary>
NodeMaintenanceScheduled,

/// <summary>
/// This event gets fired ~20s before maintenance begins.
/// </summary>
NodeMaintenanceStarting,

/// <summary>
/// This event gets fired when maintenance is imminent (&lt;5s).
/// </summary>
NodeMaintenanceStart,

/// <summary>
/// Indicates that the node maintenance operation is over.
/// </summary>
NodeMaintenanceEnded,

/// <summary>
/// Indicates that a replica has been promoted to primary.
/// </summary>
NodeMaintenanceFailoverComplete,
}
}
Loading

0 comments on commit ef1178e

Please sign in to comment.