-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathProducerBot.scala
75 lines (56 loc) · 2.85 KB
/
ProducerBot.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package data_distribution
import java.util
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable}
import akka.cluster.Cluster
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata.{DistributedData, ORSet, ORSetKey}
import scala.collection.mutable
import scala.concurrent.duration._
/**
* Created by pabloperezgarcia on 19/02/2017.
*
* With Akka cluster distributed data we can use the Replicator Actor, which we can subscribe to a ORSetKey-actor.
* This ORSetKey it´s a combination of object and key, the object it will be the data to modify/increase/reduce in the cluster
* For instance it could be a Set of events to be distributed through of cluster.
*
* In this example the producer it will subscribe to a specific ORSetKey and it will modify this one using scheduler.
*
* All nodes subscribed with the same ORSetKey will receive the notification of change in "element @ Changed(DataKey)"
*
* Official doc
* http://doc.akka.io/docs/akka/2.4.16/scala/distributed-data.html
*/
class ProducerBot extends Actor with ActorLogging {
implicit val node = Cluster(context.system)
import context.dispatcher
val tickTask: Cancellable = context.system.scheduler.schedule(5.seconds, 5.seconds, self, "Tick")
val queue = new util.ArrayDeque(util.Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
val DataKey: ORSetKey[String] = ORSetKey[String]("uniqueKey")
val replicator: ActorRef = DistributedData(context.system).replicator
replicator ! Subscribe(DataKey, self)
/**
* In Akka cluster distributed data we can write the changes in different ways:
*
* WriteLocal the value will immediately only be written to the local replica, and later disseminated with gossip
* WriteTo(n) the value will immediately be written to at least n replicas, including the local replica
* WriteMajority the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
* WriteAll the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)
*/
def receive = {
case "Tick" =>
// val randomElement = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
val data = queue.poll()
log.info("Producer sending: {}", data)
replicator ! Update(DataKey, ORSet.empty[String], WriteAll(5 seconds))(previous => {
//We should persist previous in akka persistence
previous.clear(node) //Clean the old history of send
ORSet.empty + data.toString
})
case _: UpdateResponse[_] => //Ignore
//This case will get all changes in the ORSetKey
case replicatorMessage@Changed(DataKey) =>
val data = replicatorMessage.get(DataKey)
// data.clear(node)
}
override def postStop(): Unit = tickTask.cancel()
}