Skip to content

Commit

Permalink
Sharding add actor registry overloads (#70)
Browse files Browse the repository at this point in the history
* Added `ActorRegistry` overloads for Akka.Cluster.Sharding

* separated `TestHelper` from  Akka.Cluster.Hosting.Tests

* successfully tested new ActorRegistry-enabled methods
  • Loading branch information
Aaronontheweb authored Jun 23, 2022
1 parent 67776ce commit 639b905
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 74 deletions.
113 changes: 113 additions & 0 deletions src/Akka.Cluster.Hosting.Tests/ClusterShardingSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Hosting;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Hosting.Tests;

public class ClusterShardingSpecs
{
public sealed class MyTopLevelActor : ReceiveActor
{
}

public sealed class MyEntityActor : ReceiveActor
{
public MyEntityActor(string entityId, IActorRef sourceRef)
{
EntityId = entityId;
SourceRef = sourceRef;

Receive<GetId>(g => { Sender.Tell(EntityId); });
Receive<GetSourceRef>(g => Sender.Tell(SourceRef));
}

public string EntityId { get; }

public IActorRef SourceRef { get; }

public sealed class GetId : IWithId
{
public GetId(string id)
{
Id = id;
}

public string Id { get; }
}

public sealed class GetSourceRef : IWithId
{
public GetSourceRef(string id)
{
Id = id;
}

public string Id { get; }
}
}

public interface IWithId
{
string Id { get; }
}

public sealed class Extractor : HashCodeMessageExtractor
{
public Extractor() : base(30)
{
}

public override string EntityId(object message)
{
if (message is IWithId withId)
return withId.Id;
return string.Empty;
}
}

public ClusterShardingSpecs(ITestOutputHelper output)
{
Output = output;
}

public ITestOutputHelper Output { get; }

[Fact]
public async Task Should_use_ActorRegistry_with_ShardRegion()
{
// arrange
using var host = await TestHelper.CreateHost(builder =>
{
builder.WithActors((system, registry) =>
{
var tLevel = system.ActorOf(Props.Create(() => new MyTopLevelActor()), "toplevel");
registry.Register<MyTopLevelActor>(tLevel);
})
.WithShardRegion<MyEntityActor>("entities", (system, registry) =>
{
var tLevel = registry.Get<MyTopLevelActor>();
return s => Props.Create(() => new MyEntityActor(s, tLevel));
}, new Extractor(), new ShardOptions() { Role = "my-host", StateStoreMode = StateStoreMode.DData });
}, new ClusterOptions() { Roles = new[] { "my-host" } }, Output);

var actorSystem = host.Services.GetRequiredService<ActorSystem>();
var actorRegistry = ActorRegistry.For(actorSystem);
var shardRegion = actorRegistry.Get<MyEntityActor>();

// act
var id = await shardRegion.Ask<string>(new MyEntityActor.GetId("foo"), TimeSpan.FromSeconds(3));
var sourceRef =
await shardRegion.Ask<IActorRef>(new MyEntityActor.GetSourceRef("foo"), TimeSpan.FromSeconds(3));

// assert
id.Should().Be("foo");
sourceRef.Should().Be(actorRegistry.Get<MyTopLevelActor>());
}
}
71 changes: 12 additions & 59 deletions src/Akka.Cluster.Hosting.Tests/ClusterSingletonSpecs.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Hosting;
using Akka.Remote.Hosting;
using Akka.TestKit.Xunit2.Internals;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -25,66 +20,24 @@ public ClusterSingletonSpecs(ITestOutputHelper output)

private class MySingletonActor : ReceiveActor
{
public static Props MyProps => Props.Create(() => new MySingletonActor());
public static Props MyProps => Props.Create(() => new ClusterSingletonSpecs.MySingletonActor());

public MySingletonActor()
{
ReceiveAny(_ => Sender.Tell(_));
}
}

private async Task<IHost> CreateHost(Action<AkkaConfigurationBuilder> specBuilder, ClusterOptions options)
{
var tcs = new TaskCompletionSource();
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var host = new HostBuilder()
.ConfigureServices(collection =>
{
collection.AddAkka("TestSys", (configurationBuilder, provider) =>
{
configurationBuilder
.WithRemoting("localhost", 0)
.WithClustering(options)
.WithActors((system, registry) =>
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
logger.Tell(new InitializeLogger(system.EventStream));
})
.WithActors(async (system, registry) =>
{
var cluster = Cluster.Get(system);
cluster.RegisterOnMemberUp(() =>
{
tcs.SetResult();
});
if (options.SeedNodes == null || options.SeedNodes.Length == 0)
{
var myAddress = cluster.SelfAddress;
await cluster.JoinAsync(myAddress); // force system to wait until we're up
}
});
specBuilder(configurationBuilder);
});
}).Build();

await host.StartAsync(cancellationTokenSource.Token);
await (tcs.Task.WaitAsync(cancellationTokenSource.Token));

return host;
}

[Fact]
public async Task Should_launch_ClusterSingletonAndProxy()
{
// arrange
using var host = await CreateHost(
builder => { builder.WithSingleton<MySingletonActor>("my-singleton", MySingletonActor.MyProps); },
new ClusterOptions(){ Roles = new[] { "my-host"}});
using var host = await TestHelper.CreateHost(
builder => { builder.WithSingleton<ClusterSingletonSpecs.MySingletonActor>("my-singleton", MySingletonActor.MyProps); },
new ClusterOptions(){ Roles = new[] { "my-host"}}, Output);

var registry = host.Services.GetRequiredService<ActorRegistry>();
var singletonProxy = registry.Get<MySingletonActor>();
var singletonProxy = registry.Get<ClusterSingletonSpecs.MySingletonActor>();

// act

Expand All @@ -103,19 +56,19 @@ public async Task Should_launch_ClusterSingleton_and_Proxy_separately()
// arrange

var singletonOptions = new ClusterSingletonOptions() { Role = "my-host" };
using var singletonHost = await CreateHost(
builder => { builder.WithSingleton<MySingletonActor>("my-singleton", MySingletonActor.MyProps, singletonOptions, createProxyToo:false); },
new ClusterOptions(){ Roles = new[] { "my-host"}});
using var singletonHost = await TestHelper.CreateHost(
builder => { builder.WithSingleton<ClusterSingletonSpecs.MySingletonActor>("my-singleton", MySingletonActor.MyProps, singletonOptions, createProxyToo:false); },
new ClusterOptions(){ Roles = new[] { "my-host"}}, Output);

var singletonSystem = singletonHost.Services.GetRequiredService<ActorSystem>();
var address = Cluster.Get(singletonSystem).SelfAddress;

using var singletonProxyHost = await CreateHost(
builder => { builder.WithSingletonProxy<MySingletonActor>("my-singleton", singletonOptions); },
new ClusterOptions(){ Roles = new[] { "proxy" }, SeedNodes = new Address[]{ address } });
using var singletonProxyHost = await TestHelper.CreateHost(
builder => { builder.WithSingletonProxy<ClusterSingletonSpecs.MySingletonActor>("my-singleton", singletonOptions); },
new ClusterOptions(){ Roles = new[] { "proxy" }, SeedNodes = new Address[]{ address } }, Output);

var registry = singletonProxyHost.Services.GetRequiredService<ActorRegistry>();
var singletonProxy = registry.Get<MySingletonActor>();
var singletonProxy = registry.Get<ClusterSingletonSpecs.MySingletonActor>();

// act

Expand Down
58 changes: 58 additions & 0 deletions src/Akka.Cluster.Hosting.Tests/TestHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Hosting;
using Akka.Remote.Hosting;
using Akka.TestKit.Xunit2.Internals;
using Microsoft.Extensions.Hosting;
using Xunit.Abstractions;

namespace Akka.Cluster.Hosting.Tests;

public static class TestHelper
{

public static async Task<IHost> CreateHost(Action<AkkaConfigurationBuilder> specBuilder, ClusterOptions options, ITestOutputHelper output)
{
var tcs = new TaskCompletionSource();
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var host = new HostBuilder()
.ConfigureServices(collection =>
{
collection.AddAkka("TestSys", (configurationBuilder, provider) =>
{
configurationBuilder
.WithRemoting("localhost", 0)
.WithClustering(options)
.WithActors((system, registry) =>
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(output)), "log-test");
logger.Tell(new InitializeLogger(system.EventStream));
})
.WithActors(async (system, registry) =>
{
var cluster = Cluster.Get(system);
cluster.RegisterOnMemberUp(() =>
{
tcs.SetResult();
});
if (options.SeedNodes == null || options.SeedNodes.Length == 0)
{
var myAddress = cluster.SelfAddress;
await cluster.JoinAsync(myAddress); // force system to wait until we're up
}
});
specBuilder(configurationBuilder);
});
}).Build();

await host.StartAsync(cancellationTokenSource.Token);
await (tcs.Task.WaitAsync(cancellationTokenSource.Token));

return host;
}
}
59 changes: 44 additions & 15 deletions src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,8 @@ public static AkkaConfigurationBuilder WithShardRegion<TKey>(this AkkaConfigurat
.WithRole(shardOptions.Role)
.WithRememberEntities(shardOptions.RememberEntities)
.WithStateStoreMode(shardOptions.StateStoreMode), messageExtractor);

// TODO: should throw here if duplicate key used

registry.TryRegister<TKey>(shardRegion);

registry.Register<TKey>(shardRegion);
});
}

Expand Down Expand Up @@ -179,9 +177,44 @@ public static AkkaConfigurationBuilder WithShardRegion<TKey>(this AkkaConfigurat
.WithRememberEntities(shardOptions.RememberEntities)
.WithStateStoreMode(shardOptions.StateStoreMode), extractEntityId, extractShardId);

// TODO: should throw here if duplicate key used
registry.Register<TKey>(shardRegion);
});
}

public static AkkaConfigurationBuilder WithShardRegion<TKey>(this AkkaConfigurationBuilder builder,
string typeName,
Func<ActorSystem, IActorRegistry, Func<string, Props>> compositePropsFactory, IMessageExtractor messageExtractor, ShardOptions shardOptions)
{
return builder.WithActors(async (system, registry) =>
{
var entityPropsFactory = compositePropsFactory(system, registry);

var shardRegion = await ClusterSharding.Get(system).StartAsync(typeName, entityPropsFactory,
ClusterShardingSettings.Create(system)
.WithRole(shardOptions.Role)
.WithRememberEntities(shardOptions.RememberEntities)
.WithStateStoreMode(shardOptions.StateStoreMode), messageExtractor);

registry.Register<TKey>(shardRegion);
});
}

public static AkkaConfigurationBuilder WithShardRegion<TKey>(this AkkaConfigurationBuilder builder,
string typeName,
Func<ActorSystem, IActorRegistry, Func<string, Props>> compositePropsFactory, ExtractEntityId extractEntityId,
ExtractShardId extractShardId, ShardOptions shardOptions)
{
return builder.WithActors(async (system, registry) =>
{
var entityPropsFactory = compositePropsFactory(system, registry);

var shardRegion = await ClusterSharding.Get(system).StartAsync(typeName, entityPropsFactory,
ClusterShardingSettings.Create(system)
.WithRole(shardOptions.Role)
.WithRememberEntities(shardOptions.RememberEntities)
.WithStateStoreMode(shardOptions.StateStoreMode), extractEntityId, extractShardId);

registry.TryRegister<TKey>(shardRegion);
registry.Register<TKey>(shardRegion);
});
}

Expand Down Expand Up @@ -210,10 +243,8 @@ public static AkkaConfigurationBuilder WithShardRegionProxy<TKey>(this AkkaConfi
{
var shardRegionProxy = await ClusterSharding.Get(system)
.StartProxyAsync(typeName, roleName, extractEntityId, extractShardId);

// TODO: should throw here if duplicate key used

registry.TryRegister<TKey>(shardRegionProxy);

registry.Register<TKey>(shardRegionProxy);
});
}

Expand All @@ -237,10 +268,8 @@ public static AkkaConfigurationBuilder WithShardRegionProxy<TKey>(this AkkaConfi
{
var shardRegionProxy = await ClusterSharding.Get(system)
.StartProxyAsync(typeName, roleName, messageExtractor);

// TODO: should throw here if duplicate key used

registry.TryRegister<TKey>(shardRegionProxy);

registry.Register<TKey>(shardRegionProxy);
});
}

Expand All @@ -267,7 +296,7 @@ public static AkkaConfigurationBuilder WithDistributedPubSub(this AkkaConfigurat
{
// force the initialization
var mediator = DistributedPubSub.Get(system).Mediator;
registry.TryRegister<DistributedPubSub>(mediator);
registry.Register<DistributedPubSub>(mediator);
});
}

Expand Down

0 comments on commit 639b905

Please sign in to comment.