diff --git a/Directory.Packages.props b/Directory.Packages.props
index 3fa9e0e3d..d704be4ce 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -29,6 +29,7 @@
+
diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs
index c0021f024..1c32dab48 100644
--- a/src/StackExchange.Redis/ConfigurationOptions.cs
+++ b/src/StackExchange.Redis/ConfigurationOptions.cs
@@ -40,6 +40,12 @@ public static int ParseInt32(string key, string value, int minValue = int.MinVal
return tmp;
}
+ public static float ParseSingle(string key, string value)
+ {
+ if (!Format.TryParseDouble(value, out double tmp)) throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' requires a numeric value; the value '{value}' is not recognised.");
+ return (float)tmp;
+ }
+
internal static bool ParseBoolean(string key, string value)
{
if (!Format.TryParseBoolean(value, out bool tmp)) throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' requires a boolean value; the value '{value}' is not recognised.");
@@ -940,9 +946,9 @@ public string ToString(bool includePassword)
};
}
- private static void Append(StringBuilder sb, object value)
+ private static void Append(StringBuilder sb, object? value)
{
- if (value == null) return;
+ if (value is null) return;
string s = Format.ToString(value);
if (!string.IsNullOrWhiteSpace(s))
{
@@ -953,7 +959,8 @@ private static void Append(StringBuilder sb, object value)
private static void Append(StringBuilder sb, string prefix, object? value)
{
- string? s = value?.ToString();
+ if (value is null) return;
+ string? s = value.ToString();
if (!string.IsNullOrWhiteSpace(s))
{
if (sb.Length != 0) sb.Append(',');
diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs
index e19de6d52..cd08a2cb3 100644
--- a/src/StackExchange.Redis/ConnectionMultiplexer.cs
+++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs
@@ -1036,7 +1036,7 @@ public void UnRoot(int token)
}
}
- private void OnHeartbeat()
+ internal void OnHeartbeat()
{
try
{
@@ -1129,7 +1129,7 @@ public IDatabase GetDatabase(int db = -1, object? asyncState = null)
}
// DB zero is stored separately, since 0-only is a massively common use-case
- private const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
+ internal const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
// Side note: "databases 16" is the default in redis.conf; happy to store one extra to get nice alignment etc
private IDatabase? dbCacheZero;
private IDatabase[]? dbCacheLow;
@@ -1282,6 +1282,8 @@ public long OperationCount
}
}
+ internal uint LatencyTicks { get; private set; } = uint.MaxValue;
+
// note that the RedisChannel->byte[] converter is always direct, so this is not an alloc
// (we deal with channels far less frequently, so pay the encoding cost up-front)
internal byte[] ChannelPrefix => ((byte[]?)RawConfig.ChannelPrefix) ?? [];
@@ -2359,5 +2361,29 @@ private Task[] QuitAllServers()
long? IInternalConnectionMultiplexer.GetConnectionId(EndPoint endpoint, ConnectionType type)
=> GetServerEndPoint(endpoint)?.GetBridge(type)?.ConnectionId;
+
+ internal uint UpdateLatency()
+ {
+ var snapshot = GetServerSnapshot();
+ uint max = uint.MaxValue;
+ foreach (var server in snapshot)
+ {
+ if (server.IsConnected)
+ {
+ var latency = server.LatencyTicks;
+ if (max is uint.MaxValue || latency > max)
+ {
+ max = latency;
+ }
+ }
+ }
+
+ if (max != uint.MaxValue)
+ {
+ LatencyTicks = max;
+ }
+
+ return LatencyTicks;
+ }
}
}
diff --git a/src/StackExchange.Redis/Interfaces/IConnectionGroup.cs b/src/StackExchange.Redis/Interfaces/IConnectionGroup.cs
new file mode 100644
index 000000000..a066e5a32
--- /dev/null
+++ b/src/StackExchange.Redis/Interfaces/IConnectionGroup.cs
@@ -0,0 +1,106 @@
+using System;
+using System.IO;
+using System.Text;
+using System.Threading.Tasks;
+
+// ReSharper disable once CheckNamespace
+namespace StackExchange.Redis;
+
+///
+/// A group of connections to redis servers, that manages connections to multiple
+/// servers, routing traffic based on the availability of the servers and their
+/// relative .
+///
+public interface IConnectionGroup : IConnectionMultiplexer
+{
+ ///
+ /// A change occured to one of the connection groups.
+ ///
+ event EventHandler? ConnectionChanged;
+
+ ///
+ /// Adds a new member to the group.
+ ///
+ Task AddAsync(ConnectionGroupMember group, TextWriter? log = null);
+
+ ///
+ /// Removes a member from the group.
+ ///
+ bool Remove(ConnectionGroupMember group);
+
+ ///
+ /// Get the members of the group.
+ ///
+ ReadOnlySpan GetMembers();
+}
+
+///
+/// Represents a change to a connection group.
+///
+public class GroupConnectionChangedEventArgs(GroupConnectionChangedEventArgs.ChangeType type, ConnectionGroupMember group, ConnectionGroupMember? previousGroup = null) : EventArgs, ICompletable
+{
+ ///
+ /// The group relating to the change. For , this is the new group.
+ ///
+ public ConnectionGroupMember Group => group;
+
+ ///
+ /// The previous group relating to the change, if applicable.
+ ///
+ public ConnectionGroupMember? PreviousGroup => previousGroup;
+
+ ///
+ /// The type of change that occurred.
+ ///
+ public ChangeType Type => type;
+
+ private EventHandler? _handler;
+ private object? _sender;
+
+ ///
+ /// The type of change that occurred.
+ ///
+ public enum ChangeType
+ {
+ ///
+ /// Unused.
+ ///
+ Unknown = 0,
+
+ ///
+ /// A new connection group was added.
+ ///
+ Added = 1,
+
+ ///
+ /// A connection group was removed.
+ ///
+ Removed = 2,
+
+ ///
+ /// A connection group became disconnected.
+ ///
+ Disconnected = 3,
+
+ ///
+ /// A connection group became reconnected.
+ ///
+ Reconnected = 4,
+
+ ///
+ /// The active connection group changed, changing how traffic is routed.
+ ///
+ ActiveChanged = 5,
+ }
+
+ internal void CompleteAsWorker(EventHandler handler, object sender)
+ {
+ _handler = handler;
+ _sender = sender;
+ ConnectionMultiplexer.CompleteAsWorker(this);
+ }
+
+ void ICompletable.AppendStormLog(StringBuilder sb) { }
+
+ bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(_handler, _sender!, this, isAsync);
+}
diff --git a/src/StackExchange.Redis/MultiGroupDatabase.Async.cs b/src/StackExchange.Redis/MultiGroupDatabase.Async.cs
new file mode 100644
index 000000000..03706f355
--- /dev/null
+++ b/src/StackExchange.Redis/MultiGroupDatabase.Async.cs
@@ -0,0 +1,66 @@
+using System;
+using System.Net;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+internal sealed partial class MultiGroupDatabase
+{
+ // Async methods - Core operations
+ public Task DebugObjectAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().DebugObjectAsync(key, flags);
+
+ public Task IdentifyEndpointAsync(RedisKey key = default, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().IdentifyEndpointAsync(key, flags);
+
+ public Task KeyMigrateAsync(RedisKey key, EndPoint toServer, int toDatabase = 0, int timeoutMilliseconds = 0, MigrateOptions migrateOptions = MigrateOptions.None, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().KeyMigrateAsync(key, toServer, toDatabase, timeoutMilliseconds, migrateOptions, flags);
+
+ public Task PingAsync(CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().PingAsync(flags);
+
+ public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
+ => GetActiveDatabase().PublishAsync(channel, message, flags);
+
+ public Task ExecuteAsync(string command, params object[] args)
+ => GetActiveDatabase().ExecuteAsync(command, args);
+
+ public Task ExecuteAsync(string command, System.Collections.Generic.ICollection