Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for a Sirius initialization callback. (Issue #17) #21

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2014 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.comcast.xfinity.sirius.api.impl

import com.comcast.xfinity.sirius.api.Sirius

/**
* These are additional methods supported by an expanded Sirius interface.
*/
trait Sirius1Dot2Extensions {
/**
* Register a callback to be invoked once the Sirius subsystem has been
* initialized (i.e. log replay has completed). This may be called
* multiple times to install multiple init hook callbacks; each will be
* called once upon initialization. If Sirius has already been
* initialized, the callback will be invoked right away.
*
* @param initHook callback to run
*/
def onInitialized(initHook: Runnable): Unit
}

trait Sirius1Dot2 extends Sirius with Sirius1Dot2Extensions
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import akka.actor._
import java.util.concurrent.Future
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
import com.comcast.xfinity.sirius.api.SiriusConfiguration
import scala.concurrent.{Await, Future => AkkaFuture}
import scala.concurrent.{Future => AkkaFuture, ExecutionContext, Await}
import akka.util.Timeout
import scala.concurrent.duration._
import status.NodeStats.FullNodeStatus
import status.StatusWorker._
import scala.concurrent.ExecutionContext.Implicits.global

object SiriusImpl {

Expand Down Expand Up @@ -61,7 +62,7 @@ object SiriusImpl {
* @param actorSystem the actorSystem to use to create the Actors for Sirius
*/
class SiriusImpl(config: SiriusConfiguration, supProps: Props)(implicit val actorSystem: ActorSystem)
extends Sirius {
extends Sirius1Dot2 {

val supName = config.getProp(SiriusConfiguration.SIRIUS_SUPERVISOR_NAME, "sirius")
implicit val timeout: Timeout =
Expand Down Expand Up @@ -131,6 +132,12 @@ class SiriusImpl(config: SiriusConfiguration, supProps: Props)(implicit val acto
onShutdownHook = Some(() => shutdownHook)
}

def onInitialized(initHook: Runnable) {
(supervisor ? SiriusSupervisor.RegisterInitHook) onSuccess {
case _ => initHook.run()
}
}

/**
* Terminate this instance. Shuts down all associated Actors.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ object SiriusSupervisor {
case object CheckPaxosMembership extends SupervisorMessage

case class IsInitializedResponse(initialized: Boolean)
case object RegisterInitHook
case object Initialized

/**
* Factory for creating the children actors of SiriusSupervisor.
Expand Down Expand Up @@ -133,6 +135,8 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu
val membershipCheckSchedule = context.system.scheduler.
schedule(0 seconds, checkIntervalSecs seconds, self, CheckPaxosMembership)

var initHookClients : Set[ActorRef] = Set.empty

override def postStop() {
membershipCheckSchedule.cancel()
}
Expand All @@ -149,9 +153,13 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu
context.become(initialized)

sender ! SiriusSupervisor.IsInitializedResponse(initialized = true)

initHookClients foreach { _ ! SiriusSupervisor.Initialized }
initHookClients = Set.empty
} else {
sender ! SiriusSupervisor.IsInitializedResponse(initialized = false)
}
case SiriusSupervisor.RegisterInitHook => initHookClients += sender

// Ignore other messages until Initialized.
case _ =>
Expand All @@ -162,6 +170,7 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu
case logQuery: LogQuery => stateSup forward logQuery
case membershipMessage: MembershipMessage => membershipActor forward membershipMessage
case SiriusSupervisor.IsInitializedRequest => sender ! new SiriusSupervisor.IsInitializedResponse(true)
case SiriusSupervisor.RegisterInitHook => sender ! SiriusSupervisor.Initialized
case statusQuery: StatusQuery => statusSubsystem forward statusQuery
case compactionMessage: CompactionMessage => compactionManager match {
case Some(actor) => actor forward compactionMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import com.comcast.xfinity.sirius.{TimedTest, NiceTest}
import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult}
import status.NodeStats.FullNodeStatus
import status.StatusWorker._
import scala.concurrent.ExecutionContext.Implicits.global
import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{Initialized, RegisterInitHook}
import scala.concurrent.{Await, Promise}

object SiriusImplTestCompanion {

Expand Down Expand Up @@ -91,6 +94,9 @@ class SiriusImplTest extends NiceTest with TimedTest {
case GetStatus =>
sender ! mockNodeStatus
this
case RegisterInitHook =>
sender ! Initialized
this
}
})

Expand All @@ -105,6 +111,17 @@ class SiriusImplTest extends NiceTest with TimedTest {
}

describe("a SiriusImpl") {
it("should send a RegisterInitHook message to the supervisor actor when onInitialized is called") {
underTest.onInitialized(new Runnable() { def run() { } })
supervisorActorProbe.expectMsg(SiriusSupervisor.RegisterInitHook)
}

it("should call the initHook once the supervisor actor responds with an Initialized message") {
val p : Promise[Boolean] = Promise()
underTest.onInitialized(new Runnable() { def run() { p.success(true) }})
assert(Await.result(p.future, 50 millis))
}

it("should send a Get message to the supervisor actor when enqueueGet is called") {
val key = "hello"
val getFuture = underTest.enqueueGet(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,32 @@ class SiriusSupervisorTest extends NiceTest with BeforeAndAfterAll with TimedTes
waitForTrue(stateAgent().supervisorInitialized, 5000, 250)
}

it("should reply to a registered initHook once initialized") {
val probe = TestProbe()
probe.send(supervisor, SiriusSupervisor.RegisterInitHook)
initializeSupervisor(supervisor)
probe.expectMsg(SiriusSupervisor.Initialized)
}

it("should reply to all registered initHooks once initialized") {
val probe1 = TestProbe()
val probe2 = TestProbe()
probe1.send(supervisor, SiriusSupervisor.RegisterInitHook)
probe2.send(supervisor, SiriusSupervisor.RegisterInitHook)
initializeSupervisor(supervisor)
probe1.expectMsg(SiriusSupervisor.Initialized)
probe2.expectMsg(SiriusSupervisor.Initialized)
}

it("should reply immediately to an initHook registration once already initialized") {
val probe = TestProbe()
val stateAgent = supervisor.underlyingActor.siriusStateAgent
initializeSupervisor(supervisor)
waitForTrue(stateAgent().supervisorInitialized, 5000, 250)
probe.send(supervisor, SiriusSupervisor.RegisterInitHook)
probe.expectMsg(SiriusSupervisor.Initialized)
}

it("should forward MembershipMessages to the membershipActor") {
initializeSupervisor(supervisor)
initializeOrdering(supervisor, Some(paxosProbe.ref))
Expand Down