Skip to content

Commit

Permalink
Configuring persistence plugins at runtime for EventSourcedBehavior (#…
Browse files Browse the repository at this point in the history
…1518)

* Add runtime journal plugin configuration to EventSourcedBehavior

* Add runtime journal plugin configuration test for event journals

* Add runtime journal plugin configuration test for snapshot store

* Mark SnapshotStorageEmulatorExtension class as internal API

* Add runtime journal plugin configuration test for event journals explicitly

* Address review comments

* Address review comments

* Add binary compat exclusions

* Fix test name

* Add Java API

* Use pekko option converters instead of scala jdk
  • Loading branch information
ptrdom authored Oct 16, 2024
1 parent 0e10d29 commit 4afe7cf
Show file tree
Hide file tree
Showing 13 changed files with 288 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,18 @@ object PersistenceTestKitPlugin {
* Persistence testkit plugin for snapshots.
*/
@InternalApi
class PersistenceTestKitSnapshotPlugin extends SnapshotStore {
class PersistenceTestKitSnapshotPlugin(
// providing this parameter in first position as unused
// because Persistence extension that instantiates the plugins
// does not support constructors without it
@unused cfg: Config,
cfgPath: String
) extends SnapshotStore {

private final val storage = SnapshotStorageEmulatorExtension(context.system)
private final val storage = {
log.debug("Using snapshot storage emulator extension [{}] for test kit snapshot storage", cfgPath)
SnapshotStorageEmulatorExtension(context.system).storageFor(cfgPath)
}

override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
Future.fromTry(Try(storage.tryRead(persistenceId, criteria)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.apache.pekko.persistence.testkit.internal

import java.util.concurrent.ConcurrentHashMap

import org.apache.pekko
import pekko.actor.Extension
import pekko.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
Expand All @@ -24,17 +26,34 @@ import pekko.persistence.testkit.scaladsl.SnapshotTestKit
* INTERNAL API
*/
@InternalApi
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider {
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorageEmulatorExtension]
with ExtensionIdProvider {

override def get(system: ActorSystem): SnapshotStorage = super.get(system)
override def get(system: ActorSystem): SnapshotStorageEmulatorExtension = super.get(system)

override def createExtension(system: ExtendedActorSystem): SnapshotStorage =
if (SnapshotTestKit.Settings(system).serialize) {
new SerializedSnapshotStorageImpl(system)
} else {
new SimpleSnapshotStorageImpl
}
override def createExtension(system: ExtendedActorSystem): SnapshotStorageEmulatorExtension =
new SnapshotStorageEmulatorExtension(system)

override def lookup: ExtensionId[_ <: Extension] =
SnapshotStorageEmulatorExtension
}

/**
* INTERNAL API
*/
@InternalApi
final class SnapshotStorageEmulatorExtension(system: ExtendedActorSystem) extends Extension {
private val stores = new ConcurrentHashMap[String, SnapshotStorage]()
private lazy val shouldCreateSerializedSnapshotStorage = SnapshotTestKit.Settings(system).serialize

def storageFor(key: String): SnapshotStorage =
stores.computeIfAbsent(key,
_ => {
// we don't really care about the key here, we just want separate instances
if (shouldCreateSerializedSnapshotStorage) {
new SerializedSnapshotStorageImpl(system)
} else {
new SimpleSnapshotStorageImpl
}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ class SnapshotTestKit(system: ActorSystem)

import SnapshotTestKit._

override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system)
override protected val storage: SnapshotStorage =
SnapshotStorageEmulatorExtension(system).storageFor(PersistenceTestKitSnapshotPlugin.PluginId)

override def getItem(persistenceId: String, nextInd: Int): Option[Any] = {
storage.firstInExpectNextQueue(persistenceId).map(reprToAny)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.testkit.scaladsl

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.adapter._
import pekko.persistence.JournalProtocol.RecoverySuccess
import pekko.persistence.JournalProtocol.ReplayMessages
import pekko.persistence.JournalProtocol.ReplayedMessage
import pekko.persistence.Persistence
import pekko.persistence.SelectedSnapshot
import pekko.persistence.SnapshotProtocol.LoadSnapshot
import pekko.persistence.SnapshotProtocol.LoadSnapshotResult
import pekko.persistence.SnapshotSelectionCriteria
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.PersistenceTestKitSnapshotPlugin
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
import pekko.persistence.typed.scaladsl.EventSourcedBehavior
import pekko.persistence.typed.scaladsl.RetentionCriteria
import org.scalatest.Inside
import org.scalatest.wordspec.AnyWordSpecLike

object RuntimeJournalsSpec {

private object Actor {
sealed trait Command
case class Save(text: String, replyTo: ActorRef[Done]) extends Command
case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
case object Stop extends Command

def apply(persistenceId: String, journal: String): Behavior[Command] =
EventSourcedBehavior[Command, String, String](
PersistenceId.ofUniqueId(persistenceId),
"",
(state, cmd) =>
cmd match {
case Save(text, replyTo) =>
Effect.persist(text).thenRun(_ => replyTo ! Done)
case ShowMeWhatYouGot(replyTo) =>
replyTo ! state
Effect.none
case Stop =>
Effect.stop()
},
(state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|"))
.withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue))
.withJournalPluginId(s"$journal.journal")
.withJournalPluginConfig(Some(config(journal)))
.withSnapshotPluginId(s"$journal.snapshot")
.withSnapshotPluginConfig(Some(config(journal)))

}

private def config(journal: String) = {
ConfigFactory.parseString(s"""
$journal {
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
snapshot.class = "${classOf[PersistenceTestKitSnapshotPlugin].getName}"
}
""")
}
}

class RuntimeJournalsSpec
extends ScalaTestWithActorTestKit
with AnyWordSpecLike
with LogCapturing
with Inside {

import RuntimeJournalsSpec._

"The testkit journal and snapshot store plugins" must {

"be possible to configure at runtime and use in multiple isolated instances" in {
val probe = createTestProbe[Any]()

{
// one actor in each journal with same id
val j1 = spawn(Actor("id1", "journal1"))
val j2 = spawn(Actor("id1", "journal2"))
j1 ! Actor.Save("j1m1", probe.ref)
probe.receiveMessage()
j2 ! Actor.Save("j2m1", probe.ref)
probe.receiveMessage()
}

{
def assertJournal(journal: String, expectedEvent: String) = {
val ref = Persistence(system).journalFor(s"$journal.journal", config(journal))
ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", probe.ref.toClassic), probe.ref.toClassic)
inside(probe.receiveMessage()) {
case ReplayedMessage(persistentRepr) =>
persistentRepr.persistenceId shouldBe "id1"
persistentRepr.payload shouldBe expectedEvent
}
probe.expectMessage(RecoverySuccess(1))
}

assertJournal("journal1", "j1m1")
assertJournal("journal2", "j2m1")
}

{
def assertSnapshot(journal: String, expectedShapshot: String) = {
val ref = Persistence(system).snapshotStoreFor(s"$journal.snapshot", config(journal))
ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, Long.MaxValue),
probe.ref.toClassic)
inside(probe.receiveMessage()) {
case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) =>
snapshot shouldBe expectedShapshot
}
}

assertSnapshot("journal1", "j1m1")
assertSnapshot("journal2", "j2m1")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add EventSourcedBehavior.withJournalPluginConfig
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withJournalPluginConfig")
# Add EventSourcedBehavior.withSnapshotPluginConfig
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withSnapshotPluginConfig")
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package org.apache.pekko.persistence.typed.internal

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.util.Helpers.ConfigOps
import pekko.actor.Cancellable
import pekko.actor.typed.Signal
import pekko.actor.typed.scaladsl.ActorContext
Expand Down Expand Up @@ -74,8 +76,10 @@ private[pekko] final class BehaviorSetup[C, E, S](

val persistence: Persistence = Persistence(context.system.toClassic)

val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
val journal: ClassicActorRef = persistence
.journalFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
val snapshotStore: ClassicActorRef = persistence
.snapshotStoreFor(settings.snapshotPluginId, settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty))

val isSnapshotOptional: Boolean =
Persistence(context.system.classicSystem).configFor(snapshotStore).getBoolean("snapshot-is-optional")
Expand Down Expand Up @@ -107,14 +111,18 @@ private[pekko] final class BehaviorSetup[C, E, S](

private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None

val recoveryEventTimeout: FiniteDuration = persistence
.journalConfigFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
.getMillisDuration("recovery-event-timeout")

def startRecoveryTimer(snapshot: Boolean): Unit = {
cancelRecoveryTimer()
implicit val ec: ExecutionContext = context.executionContext
val timer =
if (snapshot)
context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
context.scheduleOnce(recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
else
context.system.scheduler.scheduleWithFixedDelay(settings.recoveryEventTimeout, settings.recoveryEventTimeout) {
context.system.scheduler.scheduleWithFixedDelay(recoveryEventTimeout, recoveryEventTimeout) {
() =>
context.self ! RecoveryTickEvent(snapshot = false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.util.Optional
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

import com.typesafe.config.Config
import org.apache.pekko
import pekko.actor.typed
import pekko.actor.typed.ActorRef
Expand Down Expand Up @@ -105,6 +106,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State](
loggerClass: Class[_],
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
journalPluginConfig: Option[Config] = None,
snapshotPluginConfig: Option[Config] = None,
tagger: Event => Set[String] = (_: Event) => Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
Expand Down Expand Up @@ -132,7 +135,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State](
case _ => false
}
if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""))
val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""),
journalPluginConfig, snapshotPluginConfig)

// stashState outside supervise because StashState should survive restarts due to persist failures
val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
Expand Down Expand Up @@ -261,6 +265,14 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(snapshotPluginId = if (id != "") Some(id) else None)
}

override def withJournalPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
copy(journalPluginConfig = config)
}

override def withSnapshotPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
copy(snapshotPluginConfig = config)
}

override def withSnapshotSelectionCriteria(
selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection.toClassic))
Expand Down
Loading

0 comments on commit 4afe7cf

Please sign in to comment.