@@ -30,25 +30,34 @@ class ShardManager(
30
30
ZStream .fromHub(eventsHub)
31
31
32
32
def register (pod : Pod ): UIO [Unit ] =
33
- ZIO .logInfo(s " Registering $pod" ) *>
34
- (stateRef
35
- .updateAndGetZIO(state =>
36
- ZIO
37
- .succeed(OffsetDateTime .now())
38
- .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata (pod, cdt))))
39
- )
40
- .flatMap(state => ZIO .when(state.unassignedShards.nonEmpty)(rebalance(false ))) *>
41
- persistPods.forkDaemon).unit
33
+ for {
34
+ _ <- ZIO .logInfo(s " Registering $pod" )
35
+ state <- stateRef.updateAndGetZIO(state =>
36
+ ZIO
37
+ .succeed(OffsetDateTime .now())
38
+ .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata (pod, cdt))))
39
+ )
40
+ _ <- eventsHub.publish(ShardingEvent .PodRegistered (pod.address))
41
+ _ <- ZIO .when(state.unassignedShards.nonEmpty)(rebalance(false ))
42
+ _ <- persistPods.forkDaemon
43
+ } yield ()
42
44
43
45
def notifyUnhealthyPod (podAddress : PodAddress ): UIO [Unit ] =
44
46
ZIO
45
47
.whenZIO(stateRef.get.map(_.pods.contains(podAddress))) {
46
- ZIO .unlessZIO(healthApi.isAlive(podAddress))(
47
- ZIO .logWarning(s " $podAddress is not alive, unregistering " ) *> unregister(podAddress)
48
- )
48
+ eventsHub.publish(ShardingEvent .PodHealthChecked (podAddress)) *>
49
+ ZIO .unlessZIO(healthApi.isAlive(podAddress))(
50
+ ZIO .logWarning(s " $podAddress is not alive, unregistering " ) *> unregister(podAddress)
51
+ )
49
52
}
50
53
.unit
51
54
55
+ def checkAllPodsHealth : UIO [Unit ] =
56
+ for {
57
+ pods <- stateRef.get.map(_.pods.keySet)
58
+ _ <- ZIO .foreachParDiscard(pods)(notifyUnhealthyPod).withParallelism(4 )
59
+ } yield ()
60
+
52
61
def unregister (podAddress : PodAddress ): UIO [Unit ] =
53
62
ZIO
54
63
.whenZIO(stateRef.get.map(_.pods.contains(podAddress))) {
@@ -64,6 +73,7 @@ class ShardManager(
64
73
)
65
74
)
66
75
}
76
+ _ <- eventsHub.publish(ShardingEvent .PodUnregistered (podAddress))
67
77
_ <- eventsHub
68
78
.publish(ShardingEvent .ShardsUnassigned (podAddress, unassignments))
69
79
.when(unassignments.nonEmpty)
@@ -242,6 +252,9 @@ object ShardManager {
242
252
object ShardingEvent {
243
253
case class ShardsAssigned (pod : PodAddress , shards : Set [ShardId ]) extends ShardingEvent
244
254
case class ShardsUnassigned (pod : PodAddress , shards : Set [ShardId ]) extends ShardingEvent
255
+ case class PodRegistered (pod : PodAddress ) extends ShardingEvent
256
+ case class PodUnregistered (pod : PodAddress ) extends ShardingEvent
257
+ case class PodHealthChecked (pod : PodAddress ) extends ShardingEvent
245
258
}
246
259
247
260
def decideAssignmentsForUnassignedShards (
0 commit comments