From ebf1afa19b6a557eddde0ab55cca78d56ff39387 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Wed, 25 May 2022 10:43:33 +0200 Subject: [PATCH] System props (#1601) * allow configure of system props * fix log messages --- src/Proto.Actor/ActorSystem.cs | 6 +- .../Configuration/ActorSystemConfig.cs | 15 + .../Context/ActorLoggingContext.cs | 13 +- src/Proto.Actor/Context/RootContext.cs | 4 +- src/Proto.Actor/Context/SystemContext.cs | 33 ++ src/Proto.Actor/{ => Props}/Props.cs | 308 +++++++++--------- .../Cache/ClusterCacheInvalidation.cs | 2 +- src/Proto.Cluster/Cluster.cs | 2 +- src/Proto.Cluster/Gossip/Gossiper.cs | 4 +- .../Identity/IdentityStorageLookup.cs | 2 +- .../Partition/PartitionManager.cs | 9 +- .../PartitionActivatorManager.cs | 4 +- src/Proto.Cluster/PubSub/PubSubManager.cs | 2 +- .../Seed/SeedNodeClusterProvider.cs | 4 +- src/Proto.Remote/Endpoints/EndpointManager.cs | 5 +- 15 files changed, 235 insertions(+), 178 deletions(-) create mode 100644 src/Proto.Actor/Context/SystemContext.cs rename src/Proto.Actor/{ => Props}/Props.cs (97%) diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs index a3a5f634b9..b9b545e0ad 100644 --- a/src/Proto.Actor/ActorSystem.cs +++ b/src/Proto.Actor/ActorSystem.cs @@ -19,7 +19,7 @@ namespace Proto; [PublicAPI] public sealed class ActorSystem : IAsyncDisposable { - private static readonly ILogger Logger = Log.CreateLogger(); + private readonly ILogger _logger = Log.CreateLogger(); public const string NoHost = "nonhost"; public const string Client = "$client"; private string _host = NoHost; @@ -101,7 +101,7 @@ public Task ShutdownAsync(string reason="") { try { - Logger.LogInformation("Shutting down actor system {Id}", Id); + _logger.LogInformation("Shutting down actor system {Id}", Id); Stopper.Stop(reason); } catch @@ -132,6 +132,8 @@ public RootContext NewRoot(MessageHeader? headers = null, params Func (_host, _port); public Props ConfigureProps(Props props) => Config.ConfigureProps(props); + + public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props); public async ValueTask DisposeAsync() { diff --git a/src/Proto.Actor/Configuration/ActorSystemConfig.cs b/src/Proto.Actor/Configuration/ActorSystemConfig.cs index a53b432cc8..f82f9a1e5e 100644 --- a/src/Proto.Actor/Configuration/ActorSystemConfig.cs +++ b/src/Proto.Actor/Configuration/ActorSystemConfig.cs @@ -7,6 +7,7 @@ using System.Diagnostics; using JetBrains.Annotations; using Microsoft.Extensions.Logging; +using Proto.Context; // ReSharper disable once CheckNamespace namespace Proto; @@ -47,6 +48,18 @@ public record ActorSystemConfig /// All props are translated via this function /// public Func ConfigureProps { get; init; } = props => props; + + /// + /// Allows ActorSystem-wide augmentation of system Props + /// All system props are translated via this function + /// + public Func ConfigureSystemProps { get; init; } = (_,props) => { + var logger = Log.CreateLogger("SystemActors"); + return props + .WithDeadlineDecorator(TimeSpan.FromSeconds(1), logger) + .WithLoggingContextDecorator(logger, LogLevel.None, LogLevel.Information, LogLevel.Error ) + .WithGuardianSupervisorStrategy(Supervision.AlwaysRestartStrategy); + }; /// /// Enables SharedFutures @@ -106,6 +119,8 @@ public ActorSystemConfig WithDeadLetterThrottleCount(int deadLetterThrottleCount public ActorSystemConfig WithDiagnosticsSerializer(Func serializer) => this with {DiagnosticsSerializer = serializer}; public ActorSystemConfig WithConfigureProps(Func configureProps) => this with {ConfigureProps = configureProps}; + + public ActorSystemConfig WithConfigureSystemProps(Func configureSystemProps) => this with {ConfigureSystemProps = configureSystemProps}; public ActorSystemConfig WithThreadPoolStatsTimeout(TimeSpan threadPoolStatsTimeout) => this with {ThreadPoolStatsTimeout = threadPoolStatsTimeout}; diff --git a/src/Proto.Actor/Context/ActorLoggingContext.cs b/src/Proto.Actor/Context/ActorLoggingContext.cs index 066364a478..3e55187312 100644 --- a/src/Proto.Actor/Context/ActorLoggingContext.cs +++ b/src/Proto.Actor/Context/ActorLoggingContext.cs @@ -88,17 +88,24 @@ public override async Task Receive(MessageEnvelope envelope) public override void ReenterAfter(Task target, Func, Task> action) { + if (_logger.IsEnabled(_logLevel)) + { + _logger.Log(_logLevel, "Actor {Self} {ActorType} ReenterAfter {Action}", Self, ActorType, action.Method.Name); + } base.ReenterAfter(target, action); } public override void ReenterAfter(Task target, Action action) { + if (_logger.IsEnabled(_logLevel)) + { + _logger.Log(_logLevel, "Actor {Self} {ActorType} ReenterAfter {Action}", Self, ActorType, action.Method.Name); + } base.ReenterAfter(target, action); } public override async Task RequestAsync(PID target, object message, CancellationToken cancellationToken) { - T response; if (_logger.IsEnabled(_logLevel)) { _logger.Log(_logLevel, "Actor {Self} {ActorType} Sending ReqeustAsync {MessageType}:{Message} to {Target}", Self, ActorType, @@ -108,7 +115,7 @@ public override async Task RequestAsync(PID target, object message, Cancel try { - response = await base.RequestAsync(target, message, cancellationToken); + var response = await base.RequestAsync(target, message, cancellationToken); if (_logger.IsEnabled(_logLevel)) { @@ -182,8 +189,6 @@ public override void Respond(object message) base.Respond(message); } - - private string ActorType => Actor?.GetType().Name ?? "None"; } \ No newline at end of file diff --git a/src/Proto.Actor/Context/RootContext.cs b/src/Proto.Actor/Context/RootContext.cs index ba3fbdf2b1..267e9f4c6f 100644 --- a/src/Proto.Actor/Context/RootContext.cs +++ b/src/Proto.Actor/Context/RootContext.cs @@ -66,10 +66,12 @@ public PID SpawnNamed(Props props, string name) } catch (Exception x) { - Logger.LogError(x, "RootContext Failed to spawn child actor {Name}", name); + Logger.LogError(x, "RootContext Failed to spawn root level actor {Name}", name); throw; } } + + public object? Message => null; diff --git a/src/Proto.Actor/Context/SystemContext.cs b/src/Proto.Actor/Context/SystemContext.cs new file mode 100644 index 0000000000..467de771df --- /dev/null +++ b/src/Proto.Actor/Context/SystemContext.cs @@ -0,0 +1,33 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using Microsoft.Extensions.Logging; + +namespace Proto; + +public static class SystemContext +{ + private static readonly ILogger Logger = Log.CreateLogger(nameof(SystemContext)); + + public static PID SpawnNamedSystem(this RootContext self, Props props, string name) + { + try + { + var parent = props.GuardianStrategy is not null + ? self.System.Guardians.GetGuardianPid(props.GuardianStrategy) + : null; + + //augment props with system actor specific settings + props = self.System.ConfigureSystemProps(name, props); + return props.Spawn(self.System, name, parent); + } + catch (Exception x) + { + Logger.LogError(x, "SystemContext Failed to spawn system actor {Name}", name); + throw; + } + } +} \ No newline at end of file diff --git a/src/Proto.Actor/Props.cs b/src/Proto.Actor/Props/Props.cs similarity index 97% rename from src/Proto.Actor/Props.cs rename to src/Proto.Actor/Props/Props.cs index 85bc6ee07d..8ec054ec7b 100644 --- a/src/Proto.Actor/Props.cs +++ b/src/Proto.Actor/Props/Props.cs @@ -1,155 +1,155 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2015-2022 Asynkron AB All rights reserved -// -// ----------------------------------------------------------------------- -using System; -using System.Collections.Immutable; -using System.Linq; -using JetBrains.Annotations; -using Proto.Context; -using Proto.Mailbox; - -namespace Proto; - -[PublicAPI] -public sealed record Props -{ - private static IActor NullProducer(ActorSystem _, IContext __) => null!; - public static readonly Props Empty = new(); - - public ProducerWithSystemAndContext Producer { get; init; } = NullProducer; - public MailboxProducer MailboxProducer { get; init; } = () => UnboundedMailbox.Create(); - public ISupervisorStrategy? GuardianStrategy { get; init; } - public ISupervisorStrategy SupervisorStrategy { get; init; } = Supervision.DefaultStrategy; - public IDispatcher Dispatcher { get; init; } = Dispatchers.DefaultDispatcher; - - public ImmutableList> ReceiverMiddleware { get; init; } = - ImmutableList>.Empty; - - public ImmutableList> SenderMiddleware { get; init; } = - ImmutableList>.Empty; - - public Receiver? ReceiverMiddlewareChain { get; init; } - public Sender? SenderMiddlewareChain { get; init; } - - public ImmutableList> ContextDecorator { get; init; } = - ImmutableList>.Empty; - - public ImmutableList> OnInit { get; init; } = ImmutableList>.Empty; - - public Func? ContextDecoratorChain { get; init; } - - public Spawner Spawner { get; init; } = DefaultSpawner; - - private static IContext DefaultContextDecorator(IContext context) => context; - - public static PID DefaultSpawner(ActorSystem system, string name, Props props, PID? parent) - { - //Ordering is important here - //first we create a mailbox and attach it to a process - props = system.ConfigureProps(props); - var mailbox = props.MailboxProducer(); - var dispatcher = props.Dispatcher; - var process = new ActorProcess(system, mailbox); - - //then we register it to the process registry - var (self, absent) = system.ProcessRegistry.TryAdd(name, process); - //if this fails we exit and the process and mailbox is Garbage Collected - if (!absent) throw new ProcessNameExistException(name, self); - - //if successful, we create the actor and attach it to the mailbox - var ctx = ActorContext.Setup(system, props, parent, self, mailbox); - Initialize(props, ctx); - mailbox.RegisterHandlers(ctx, dispatcher); - mailbox.PostSystemMessage(Started.Instance); - - //finally, start the mailbox and make the actor consume messages - mailbox.Start(); - - return self; - } - - private static void Initialize(Props props, ActorContext ctx) - { - foreach (var init in props.OnInit) - { - init(ctx); - } - } - - public Props WithProducer(Producer producer) => - this with {Producer = (_,_) => producer()}; - - public Props WithProducer(ProducerWithSystem producer) => - this with {Producer = (system, _) => producer(system)}; - - public Props WithProducer(ProducerWithSystemAndContext producer) => - this with {Producer = producer}; - - public Props WithDispatcher(IDispatcher dispatcher) => - this with {Dispatcher = dispatcher}; - - public Props WithMailbox(MailboxProducer mailboxProducer) => - this with {MailboxProducer = mailboxProducer}; - - public Props WithContextDecorator(params Func[] contextDecorator) - { - var x = ContextDecorator.AddRange(contextDecorator); - return this with - { - ContextDecorator = x, - ContextDecoratorChain = x - .AsEnumerable() - .Reverse() - .Aggregate( - (Func) DefaultContextDecorator, - (inner, outer) => ctx => outer(inner(ctx)) - ) - }; - } - - public Props WithOnInit(params Action[] callback) => this with - { - OnInit = OnInit.AddRange(callback), - }; - - public Props WithGuardianSupervisorStrategy(ISupervisorStrategy guardianStrategy) => - this with {GuardianStrategy = guardianStrategy}; - - public Props WithChildSupervisorStrategy(ISupervisorStrategy supervisorStrategy) => - this with {SupervisorStrategy = supervisorStrategy}; - - public Props WithReceiverMiddleware(params Func[] middleware) - { - var x = ReceiverMiddleware.AddRange(middleware); - return this with - { - ReceiverMiddleware = x, - ReceiverMiddlewareChain = x.AsEnumerable().Reverse() - .Aggregate((Receiver) Middleware.Receive, (inner, outer) => outer(inner)) - }; - } - - public Props WithSenderMiddleware(params Func[] middleware) - { - var x = SenderMiddleware.AddRange(middleware); - return this with - { - SenderMiddleware = x, - SenderMiddlewareChain = x.AsEnumerable().Reverse() - .Aggregate((Sender) Middleware.Sender, (inner, outer) => outer(inner)) - }; - } - - public Props WithSpawner(Spawner spawner) => - this with {Spawner = spawner}; - - internal PID Spawn(ActorSystem system, string name, PID? parent) => Spawner(system, name, this, parent); - - public static Props FromProducer(Producer producer) => Empty.WithProducer(_ => producer()); - - public static Props FromProducer(ProducerWithSystem producer) => Empty.WithProducer(producer); - - public static Props FromFunc(Receive receive) => FromProducer(() => new FunctionActor(receive)); +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Collections.Immutable; +using System.Linq; +using JetBrains.Annotations; +using Proto.Context; +using Proto.Mailbox; + +namespace Proto; + +[PublicAPI] +public sealed record Props +{ + private static IActor NullProducer(ActorSystem _, IContext __) => null!; + public static readonly Props Empty = new(); + + public ProducerWithSystemAndContext Producer { get; init; } = NullProducer; + public MailboxProducer MailboxProducer { get; init; } = () => UnboundedMailbox.Create(); + public ISupervisorStrategy? GuardianStrategy { get; init; } + public ISupervisorStrategy SupervisorStrategy { get; init; } = Supervision.DefaultStrategy; + public IDispatcher Dispatcher { get; init; } = Dispatchers.DefaultDispatcher; + + public ImmutableList> ReceiverMiddleware { get; init; } = + ImmutableList>.Empty; + + public ImmutableList> SenderMiddleware { get; init; } = + ImmutableList>.Empty; + + public Receiver? ReceiverMiddlewareChain { get; init; } + public Sender? SenderMiddlewareChain { get; init; } + + public ImmutableList> ContextDecorator { get; init; } = + ImmutableList>.Empty; + + public ImmutableList> OnInit { get; init; } = ImmutableList>.Empty; + + public Func? ContextDecoratorChain { get; init; } + + public Spawner Spawner { get; init; } = DefaultSpawner; + + private static IContext DefaultContextDecorator(IContext context) => context; + + public static PID DefaultSpawner(ActorSystem system, string name, Props props, PID? parent) + { + //Ordering is important here + //first we create a mailbox and attach it to a process + props = system.ConfigureProps(props); + var mailbox = props.MailboxProducer(); + var dispatcher = props.Dispatcher; + var process = new ActorProcess(system, mailbox); + + //then we register it to the process registry + var (self, absent) = system.ProcessRegistry.TryAdd(name, process); + //if this fails we exit and the process and mailbox is Garbage Collected + if (!absent) throw new ProcessNameExistException(name, self); + + //if successful, we create the actor and attach it to the mailbox + var ctx = ActorContext.Setup(system, props, parent, self, mailbox); + Initialize(props, ctx); + mailbox.RegisterHandlers(ctx, dispatcher); + mailbox.PostSystemMessage(Started.Instance); + + //finally, start the mailbox and make the actor consume messages + mailbox.Start(); + + return self; + } + + private static void Initialize(Props props, ActorContext ctx) + { + foreach (var init in props.OnInit) + { + init(ctx); + } + } + + public Props WithProducer(Producer producer) => + this with {Producer = (_,_) => producer()}; + + public Props WithProducer(ProducerWithSystem producer) => + this with {Producer = (system, _) => producer(system)}; + + public Props WithProducer(ProducerWithSystemAndContext producer) => + this with {Producer = producer}; + + public Props WithDispatcher(IDispatcher dispatcher) => + this with {Dispatcher = dispatcher}; + + public Props WithMailbox(MailboxProducer mailboxProducer) => + this with {MailboxProducer = mailboxProducer}; + + public Props WithContextDecorator(params Func[] contextDecorator) + { + var x = ContextDecorator.AddRange(contextDecorator); + return this with + { + ContextDecorator = x, + ContextDecoratorChain = x + .AsEnumerable() + .Reverse() + .Aggregate( + (Func) DefaultContextDecorator, + (inner, outer) => ctx => outer(inner(ctx)) + ) + }; + } + + public Props WithOnInit(params Action[] callback) => this with + { + OnInit = OnInit.AddRange(callback), + }; + + public Props WithGuardianSupervisorStrategy(ISupervisorStrategy guardianStrategy) => + this with {GuardianStrategy = guardianStrategy}; + + public Props WithChildSupervisorStrategy(ISupervisorStrategy supervisorStrategy) => + this with {SupervisorStrategy = supervisorStrategy}; + + public Props WithReceiverMiddleware(params Func[] middleware) + { + var x = ReceiverMiddleware.AddRange(middleware); + return this with + { + ReceiverMiddleware = x, + ReceiverMiddlewareChain = x.AsEnumerable().Reverse() + .Aggregate((Receiver) Middleware.Receive, (inner, outer) => outer(inner)) + }; + } + + public Props WithSenderMiddleware(params Func[] middleware) + { + var x = SenderMiddleware.AddRange(middleware); + return this with + { + SenderMiddleware = x, + SenderMiddlewareChain = x.AsEnumerable().Reverse() + .Aggregate((Sender) Middleware.Sender, (inner, outer) => outer(inner)) + }; + } + + public Props WithSpawner(Spawner spawner) => + this with {Spawner = spawner}; + + internal PID Spawn(ActorSystem system, string name, PID? parent) => Spawner(system, name, this, parent); + + public static Props FromProducer(Producer producer) => Empty.WithProducer(_ => producer()); + + public static Props FromProducer(ProducerWithSystem producer) => Empty.WithProducer(producer); + + public static Props FromFunc(Receive receive) => FromProducer(() => new FunctionActor(receive)); } \ No newline at end of file diff --git a/src/Proto.Cluster/Cache/ClusterCacheInvalidation.cs b/src/Proto.Cluster/Cache/ClusterCacheInvalidation.cs index 3e9443b046..a289e4a9b7 100644 --- a/src/Proto.Cluster/Cache/ClusterCacheInvalidation.cs +++ b/src/Proto.Cluster/Cache/ClusterCacheInvalidation.cs @@ -21,7 +21,7 @@ public ClusterCacheInvalidation(Cluster cluster) { Cluster = cluster; Cluster.System.Extensions.Register(this); - Cluster.System.Root.SpawnNamed( + Cluster.System.Root.SpawnNamedSystem( Props.FromFunc(context => { if (context.Message is ActivationTerminated terminated) cluster.PidCache.RemoveByVal(terminated.ClusterIdentity, terminated.Pid); diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 513e9f8d8d..861ba7f8aa 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -168,7 +168,7 @@ private void InitClusterKinds() } private void InitIdentityProxy() - => System.Root.SpawnNamed(Props.FromProducer(() => new IdentityActivatorProxy(this)), IdentityActivatorProxy.ActorName); + => System.Root.SpawnNamedSystem(Props.FromProducer(() => new IdentityActivatorProxy(this)), IdentityActivatorProxy.ActorName); public async Task ShutdownAsync(bool graceful = true, string reason = "") { diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs index 897d68bfc4..fea9408506 100644 --- a/src/Proto.Cluster/Gossip/Gossiper.cs +++ b/src/Proto.Cluster/Gossip/Gossiper.cs @@ -11,6 +11,7 @@ using System.Threading.Tasks; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Proto.Logging; using Proto.Remote; @@ -41,6 +42,7 @@ public record AddConsensusCheck(ConsensusCheck Check, CancellationToken Token); public record GetGossipStateSnapshot; +[PublicAPI] public class Gossiper { public const string GossipActorName = "gossip"; @@ -136,7 +138,7 @@ internal Task StartAsync() { var props = Props.FromProducer(() => new GossipActor(_cluster.System, _cluster.Config.GossipRequestTimeout, _context.System.Id, _cluster.System.Logger(), _cluster.Config.GossipFanout, _cluster.Config.GossipMaxSend)); - _pid = _context.SpawnNamed(props, GossipActorName); + _pid = _context.SpawnNamedSystem(props, GossipActorName); _cluster.System.EventStream.Subscribe(topology => _context.Send(_pid, topology)); Logger.LogInformation("Started Cluster Gossip"); _ = SafeTask.Run(GossipLoop); diff --git a/src/Proto.Cluster/Identity/IdentityStorageLookup.cs b/src/Proto.Cluster/Identity/IdentityStorageLookup.cs index 57bb2398ce..bb0a657a13 100644 --- a/src/Proto.Cluster/Identity/IdentityStorageLookup.cs +++ b/src/Proto.Cluster/Identity/IdentityStorageLookup.cs @@ -52,7 +52,7 @@ public async Task SetupAsync(Cluster cluster, string[] kinds, bool isClient) if (isClient) return; var props = Props.FromProducer(() => new IdentityStoragePlacementActor(Cluster, this)); - _placementActor = _system.Root.SpawnNamed(props, PlacementActorName); + _placementActor = _system.Root.SpawnNamedSystem(props, PlacementActorName); } public async Task ShutdownAsync() diff --git a/src/Proto.Cluster/Partition/PartitionManager.cs b/src/Proto.Cluster/Partition/PartitionManager.cs index bca5320076..45aa0470f3 100644 --- a/src/Proto.Cluster/Partition/PartitionManager.cs +++ b/src/Proto.Cluster/Partition/PartitionManager.cs @@ -14,7 +14,7 @@ class PartitionManager private const string PartitionIdentityActorName = "partition-identity"; private const string PartitionPlacementActorName = "partition-activator"; private readonly Cluster _cluster; - private readonly IRootContext _context; + private readonly RootContext _context; private readonly bool _isClient; private readonly ActorSystem _system; private PID _partitionPlacementActor = null!; @@ -49,12 +49,11 @@ public void Setup() else { var partitionActorProps = Props - .FromProducer(() => new PartitionIdentityActor(_cluster, _config)) - .WithGuardianSupervisorStrategy(Supervision.AlwaysRestartStrategy); - _partitionIdentityActor = _context.SpawnNamed(partitionActorProps, PartitionIdentityActorName); + .FromProducer(() => new PartitionIdentityActor(_cluster, _config)); + _partitionIdentityActor = _context.SpawnNamedSystem(partitionActorProps, PartitionIdentityActorName); var partitionActivatorProps = Props.FromProducer(() => new PartitionPlacementActor(_cluster, _config)); - _partitionPlacementActor = _context.SpawnNamed(partitionActivatorProps, PartitionPlacementActorName); + _partitionPlacementActor = _context.SpawnNamedSystem(partitionActivatorProps, PartitionPlacementActorName); //synchronous subscribe to keep accurate var topologyHash = 0ul; diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorManager.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorManager.cs index a090214321..71c383d5a2 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorManager.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorManager.cs @@ -15,7 +15,7 @@ public class PartitionActivatorManager private readonly Cluster _cluster; - private readonly IRootContext _context; + private readonly RootContext _context; private readonly bool _isClient; private readonly ActorSystem _system; private PID _partitionActivatorActor = null!; @@ -49,7 +49,7 @@ public void Setup() { var partitionActivatorProps = Props.FromProducer(() => new PartitionActivatorActor(_cluster, this)); - _partitionActivatorActor = _context.SpawnNamed(partitionActivatorProps, PartitionActivatorActorName); + _partitionActivatorActor = _context.SpawnNamedSystem(partitionActivatorProps, PartitionActivatorActorName); //synchronous subscribe to keep accurate diff --git a/src/Proto.Cluster/PubSub/PubSubManager.cs b/src/Proto.Cluster/PubSub/PubSubManager.cs index 4ed5306cca..4e808f82ba 100644 --- a/src/Proto.Cluster/PubSub/PubSubManager.cs +++ b/src/Proto.Cluster/PubSub/PubSubManager.cs @@ -17,7 +17,7 @@ public class PubSubManager public Task StartAsync() { var props = Props.FromProducer(() => new PubSubMemberDeliveryActor()); - _cluster.System.Root.SpawnNamed(props, PubSubDeliveryName); + _cluster.System.Root.SpawnNamedSystem(props, PubSubDeliveryName); return Task.CompletedTask; } diff --git a/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs b/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs index 363967e517..409255bb03 100644 --- a/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs +++ b/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs @@ -26,7 +26,7 @@ public class SeedNodeClusterProvider : IClusterProvider public async Task StartMemberAsync(Cluster cluster) { _cluster = cluster; - _pid = cluster.System.Root.SpawnNamed(SeedNodeActor.Props(_options), SeedNodeActor.Name); + _pid = cluster.System.Root.SpawnNamedSystem(SeedNodeActor.Props(_options), SeedNodeActor.Name); cluster.System.EventStream.Subscribe(x => x.Key == GossipKeys.Topology, x => cluster.System.Root.Send(_pid, x)); cluster.System.EventStream.Subscribe(cluster.System.Root, _pid); var result = await cluster.System.Root.RequestAsync(_pid, new Connect(), _cts.Token); @@ -43,7 +43,7 @@ public async Task StartMemberAsync(Cluster cluster) public async Task StartClientAsync(Cluster cluster) { _cluster = cluster; - _pid = cluster.System.Root.SpawnNamed(SeedClientNodeActor.Props(_options), SeedClientNodeActor.Name); + _pid = cluster.System.Root.SpawnNamedSystem(SeedClientNodeActor.Props(_options), SeedClientNodeActor.Name); var result = await cluster.System.Root.RequestAsync(_pid, new Connect(), _cts.Token); switch (result) { diff --git a/src/Proto.Remote/Endpoints/EndpointManager.cs b/src/Proto.Remote/Endpoints/EndpointManager.cs index bb8d391794..24d4424ee5 100644 --- a/src/Proto.Remote/Endpoints/EndpointManager.cs +++ b/src/Proto.Remote/Endpoints/EndpointManager.cs @@ -195,9 +195,8 @@ internal IEndpoint GetClientEndpoint(string systemId) } private void SpawnActivator() { - var props = Props.FromProducer(() => new Activator(_remoteConfig, _system)) - .WithGuardianSupervisorStrategy(Supervision.AlwaysRestartStrategy); - ActivatorPid = _system.Root.SpawnNamed(props, "activator"); + var props = Props.FromProducer(() => new Activator(_remoteConfig, _system)); + ActivatorPid = _system.Root.SpawnNamedSystem(props, "activator"); } private void StopActivator() => _system.Root.Stop(ActivatorPid); } \ No newline at end of file