Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 41 additions & 35 deletions Akka/src/main/scala/API/AkkaExp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,30 @@ package simulation.akka.API

import akka.cluster.typed.Cluster
import meta.runtime.Actor
import meta.API.{SimulationDataBuilder, TimeseriesBuilder, SnapshotBuilder}
import com.typesafe.config.ConfigFactory
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import scala.collection.JavaConversions._
import akka.actor.typed.{Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.NoSerializationVerificationNeeded

// import akka.actor.typed.DispatcherSelector

object AkkaExp {
sealed trait Command extends NoSerializationVerificationNeeded
final case class SpawnDriver(totalWorkers: Int, totalTurn: Long) extends Command
final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int) extends Command
final case class SpawnDriver(totalWorkers: Int, totalTurn: Long, logControllerOn: Boolean) extends Command
final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int, logControllerOn: Boolean) extends Command
final case class SpawnLogController(totalWorkers: Int) extends Command
final case class DriverStopped() extends Command
final case class WorkerStopped(workerId: Int, sims: Seq[Actor]) extends Command
final case class LogControllerStopped() extends Command

var cluster: Cluster = null
var totalWorkers: Int = 0
val stoppedWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]()
var activeWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]()
var finalAgents: ConcurrentLinkedQueue[Actor] = new ConcurrentLinkedQueue[Actor]()
val defaultHaltCond: Iterable[Iterable[Serializable]] => Boolean = (x: Iterable[Iterable[Serializable]]) => false
var haltCond: Iterable[Iterable[Serializable]] => Boolean = null

def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor]()): Behavior[Command] =
def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor]): Behavior[Command] =
Behaviors.setup { ctx =>
cluster = Cluster(ctx.system)
this.totalWorkers = totalWorkers
Expand All @@ -49,30 +46,28 @@ object AkkaExp {
} else {
actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, false)
}
waitTillFinish(Vector.empty)
// simulateUntil supports only Standalone mode for now
waitTillFinish(Vector.empty, builder, None)
}

def apply(totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor](),
cond: Iterable[Iterable[Serializable]] => Boolean = defaultHaltCond): Behavior[Command] =
def apply(totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor], haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] =
Behaviors.setup { ctx =>
cluster = Cluster(ctx.system)
this.totalWorkers = totalWorkers
val roles: Set[String] = cluster.selfMember.getRoles.toSet
val totalActors = actors.size
var actorsPerWorker = totalActors/totalWorkers

if (cond != defaultHaltCond) {
haltCond = cond
}

stoppedWorkers.clear()
activeWorkers.clear()
finalAgents.clear()

ctx.log.debug(f"${actorsPerWorker} actors per worker")

val logControllerOn = haltCond.isDefined || builder.isInstanceOf[TimeseriesBuilder]

// Worker id is 0-indexed
if (roles.exists(p => p.startsWith("Worker"))) {
ctx.log.debug(f"Creating a worker!")
Expand All @@ -82,7 +77,7 @@ object AkkaExp {
} else {
actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn)
}

// Machine id is 0-indexed
Expand All @@ -97,56 +92,67 @@ object AkkaExp {
} else {
actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn)
}
}

if (cluster.selfMember.hasRole("Driver")) {
ctx.log.debug(f"Creating a driver!")
ctx.self ! SpawnDriver(totalWorkers, totalTurn)
ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn)
// Co-locate the log controller with driver
if (simulation.akka.API.OptimizationConfig.logControllerEnabled) {
if (logControllerOn) {
ctx.self ! SpawnLogController(totalWorkers)
}
}

if (cluster.selfMember.hasRole("Standalone")) {
ctx.log.debug(f"Standalone mode")
ctx.self ! SpawnDriver(totalWorkers, totalTurn)
if (simulation.akka.API.OptimizationConfig.logControllerEnabled) {
ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn)

if (logControllerOn) {
ctx.self ! SpawnLogController(totalWorkers)
}

for (i <- Range(0, totalWorkers)){
val containedAgents = if (i == totalWorkers-1){
actors.slice(i*actorsPerWorker, totalActors)
} else {
actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(i, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(i, containedAgents, totalWorkers, logControllerOn)
}
}
waitTillFinish(Vector.empty)
waitTillFinish(Vector.empty, builder, haltCond)
}

def waitTillFinish(finalAgents: IndexedSeq[Actor]): Behavior[Command] = {
def waitTillFinish(finalAgents: IndexedSeq[Actor], builder: SimulationDataBuilder, haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] = {
Behaviors.receive { (ctx, message) =>
message match {
case SpawnDriver(totalWorkers, totalTurn) =>
val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn), "driver")
case SpawnDriver(totalWorkers, totalTurn, logControllerOn) =>
val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn, logControllerOn), "driver")
ctx.watchWith(driver, DriverStopped())
Behaviors.same

case SpawnLogController(totalWorkers) =>
val logController = if (haltCond != null) {
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond), "logController")
val logController = if (haltCond.isDefined) {
// ctx.log.info("Conditional termination is defined!")
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond.get, builder.asInstanceOf[TimeseriesBuilder]), "logController")
} else {
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers), "logController")
// ctx.log.info("Conditional termination is nto defined!")
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, builder.asInstanceOf[TimeseriesBuilder]), "logController")
}
ctx.watchWith(logController, LogControllerStopped())
Behaviors.same

case SpawnWorker(workerId, agents, totalWorkers) =>
val sim = ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers), f"worker${workerId}")
case SpawnWorker(workerId, agents, totalWorkers, logControllerOn) =>
val sim = builder match {
case x: TimeseriesBuilder => {
ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers, Some(x.asInstanceOf[TimeseriesBuilder])), f"worker${workerId}")
}
case _: SnapshotBuilder => {
ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers, None), f"worker${workerId}")
}
}
activeWorkers.add(workerId)
ctx.watchWith(sim, WorkerStopped(workerId, agents))
Behaviors.same
Expand Down Expand Up @@ -175,20 +181,20 @@ object AkkaExp {
if (!stoppedWorkers.contains(workerId)){
stoppedWorkers.add(workerId)
if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){
Simulate.addStoppedAgents(finalAgents ++ agents)
builder.addAgents(finalAgents ++ agents)
Behaviors.stopped {() =>
ctx.system.terminate()
}
} else {
waitTillFinish(finalAgents ++ agents)
waitTillFinish(finalAgents ++ agents, builder, haltCond)
}
} else {
if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){
Behaviors.stopped {() =>
ctx.system.terminate()
}
} else {
waitTillFinish(finalAgents)
waitTillFinish(finalAgents, builder, haltCond)
}
}
} else {
Expand Down
4 changes: 0 additions & 4 deletions Akka/src/main/scala/API/Optimization.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ case object MergedWorker extends Optimization
object OptimizationConfig {
var conf: Optimization = MergedWorker

var logControllerEnabled: Boolean = false

var timeseriesSchema: SimulationTimeseries = FullTimeseries

// todo: tmp, fix with proper availability input
var availability: Int = 1

Expand Down
139 changes: 63 additions & 76 deletions Akka/src/main/scala/API/Simulate.scala
Original file line number Diff line number Diff line change
@@ -1,101 +1,88 @@
package simulation.akka.API

import com.typesafe.config.ConfigFactory
import meta.API.SimulationSnapshot
import meta.API.{DeforestationStrategy, SimulationData, SimulationDataBuilder, SnapshotBuilder, TimeseriesBuilder}
import meta.runtime.{Actor, Message}
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.typed.ActorSystem

object Simulate {
private var stoppedAgents = IndexedSeq[Actor]()
def apply(actors: IndexedSeq[Actor], totalTurn: Long, conf: Map[String, Any], cond: Option[Iterable[Iterable[Serializable]] => Boolean] = None)(implicit strategy: DeforestationStrategy): SimulationData = {

var lastWords: IndexedSeq[Message] = IndexedSeq[Message]()
require(conf.isDefinedAt("role")) // Standalone, Driver, Machine-$id
require(conf.isDefinedAt("port")) // network port
require(conf.isDefinedAt("name")) // name of the actor system, to allow concurrent simulations
require(conf.isDefinedAt("data")) // timeseries or snapshot

def addStoppedAgents(agents: IndexedSeq[Actor]): Unit = {
stoppedAgents = agents
}

var timeseries: Iterable[Iterable[Serializable]] = null

def initialize(): Unit = {
stoppedAgents=IndexedSeq[Actor]()
lastWords=IndexedSeq[Message]()
}

def driver(totalTurn: Long, port: Int = 25251): SimulationSnapshot = {
initialize()
val role: String = conf("role").asInstanceOf[String]
val port: Int = conf("port").asInstanceOf[Int]
val name: String = conf("name").asInstanceOf[String]
val dataConf: String = conf("data").asInstanceOf[String]

val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [Driver]
""").withFallback(ConfigFactory.load("application"))
// If there are more workers than agents, then set the worker number to the same as agents
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt
var totalWorkers = workersPerMachine * totalMachines
println(f"${totalMachines} total machines, ${totalWorkers} total workers")
val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers), "SimsCluster", config)
Await.ready(actorSystem.whenTerminated, 10.days)
println("Simulation ends!")
SimulationSnapshot(stoppedAgents, lastWords)
}


// Materialized (actors are all containedActors)
def machine(mid: Int, actors: IndexedSeq[Actor], totalTurn: Long, port: Int = 0): SimulationSnapshot = {
initialize()
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [Machine-$mid]
""").withFallback(ConfigFactory.load("application"))
// If there are more workers than agents, then set the worker number to the same as agents
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt
var totalWorkers = workersPerMachine * totalMachines
println(f"${totalMachines} total machines, ${totalWorkers} total workers")

val actorSystem = ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, actors), "SimsCluster", config)
Await.ready(actorSystem.whenTerminated, 10.days)
println("Simulation ends!")
SimulationSnapshot(stoppedAgents, lastWords)
}

def apply(actors: IndexedSeq[Actor], totalTurn: Long,
role: String= "Standalone", port: Int = 25251): SimulationSnapshot = {
initialize()
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
""").withFallback(ConfigFactory.load("application"))
// If there are more workers than agents, then set the worker number to the same as agents
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt
var totalWorkers = workersPerMachine * totalMachines

println(f"${totalMachines} total machines, ${totalWorkers} total workers, and ${actors.size} actors")
// well-formedness check
val machinePrefix = "Machine-"
val workerPrefix = "Worker-"
try {
role match {
case "Standalone" => totalWorkers = workersPerMachine // ignore totalMachine setting
case "Driver" =>
case s if s.startsWith(machinePrefix) && s.stripPrefix(machinePrefix).toInt < totalMachines =>
case s if s.startsWith(workerPrefix) && s.stripPrefix(workerPrefix).toInt < totalWorkers =>
case _ => throw new Exception("Invalid role!")
}
} catch {
case e: Exception => throw new Exception(f"Invalid role ${role}. Available roles are Standalone, Driver, Machine-id, or Worker-id. Replacing id with 0-based int (less than total machines or workers)")
}

if (totalWorkers > actors.size){
println(f"Found more workers than agents! Set total workers from ${totalWorkers} to ${actors.size}")
totalWorkers = actors.size
}

val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers, actors), "SimsCluster", config)
val machinePrefix = "Machine-"
val builder: SimulationDataBuilder = if (dataConf == "timeseries") {
new TimeseriesBuilder(strategy)
} else {
new SnapshotBuilder()
}

val ip: String = conf.getOrElse("ip", "localhost").asInstanceOf[String]

val actorSystem = role match {
case "Standalone" => {
// local mode
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.remote.artery.canonical.hostname=localhost
akka.cluster.roles = [$role]
akka.cluster.seed-nodes = ["akka://$name@localhost:$port"]
""").withFallback(ConfigFactory.load("application"))
ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, actors, cond), name, config)
}
case "Driver" => {
require(conf.isDefinedAt("ip"))
// By default, driver is also the seed node
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.hostname=$ip
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
akka.cluster.seed-nodes = ["akka://$name@$ip:$port"]
""").withFallback(ConfigFactory.load("application"))
ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, Vector[Actor](), None), name, config)
}
case s if s.startsWith(machinePrefix) => {
require(conf.isDefinedAt("ip"))
require(conf.isDefinedAt("seed")) // ip:port
val seed: String = conf("seed").asInstanceOf[String]
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.hostname=$ip
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
akka.cluster.seed-nodes = ["akka://$name@$seed"]
""").withFallback(ConfigFactory.load("application"))

// 0-based
val mid = s.stripPrefix(machinePrefix).toInt
assert(mid < totalMachines)
ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, builder, actors), name, config)
}
case _ => throw new Exception("Invalid role! Supported roles are Standalone, Driver, and Machine-$id (o-based)")
}
Await.ready(actorSystem.whenTerminated, 10.days)

println("Simulation ends!")
SimulationSnapshot(stoppedAgents, lastWords)
builder.build()
}
}
}
Loading