Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuring persistence plugins at runtime for EventSourcedBehavior #1518

Merged
merged 11 commits into from
Oct 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,18 @@ object PersistenceTestKitPlugin {
* Persistence testkit plugin for snapshots.
*/
@InternalApi
class PersistenceTestKitSnapshotPlugin extends SnapshotStore {
class PersistenceTestKitSnapshotPlugin(
// providing this parameter in first position as unused
// because Persistence extension that instantiates the plugins
// does not support constructors without it
@unused cfg: Config,
cfgPath: String
) extends SnapshotStore {

private final val storage = SnapshotStorageEmulatorExtension(context.system)
private final val storage = {
log.debug("Using snapshot storage emulator extension [{}] for test kit snapshot storage", cfgPath)
SnapshotStorageEmulatorExtension(context.system).storageFor(cfgPath)
}

override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
Future.fromTry(Try(storage.tryRead(persistenceId, criteria)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.apache.pekko.persistence.testkit.internal

import java.util.concurrent.ConcurrentHashMap

import org.apache.pekko
import pekko.actor.Extension
import pekko.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
Expand All @@ -24,17 +26,34 @@ import pekko.persistence.testkit.scaladsl.SnapshotTestKit
* INTERNAL API
*/
@InternalApi
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider {
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorageEmulatorExtension]
with ExtensionIdProvider {

override def get(system: ActorSystem): SnapshotStorage = super.get(system)
override def get(system: ActorSystem): SnapshotStorageEmulatorExtension = super.get(system)

override def createExtension(system: ExtendedActorSystem): SnapshotStorage =
if (SnapshotTestKit.Settings(system).serialize) {
new SerializedSnapshotStorageImpl(system)
} else {
new SimpleSnapshotStorageImpl
}
override def createExtension(system: ExtendedActorSystem): SnapshotStorageEmulatorExtension =
new SnapshotStorageEmulatorExtension(system)

override def lookup: ExtensionId[_ <: Extension] =
SnapshotStorageEmulatorExtension
}

/**
* INTERNAL API
*/
@InternalApi
final class SnapshotStorageEmulatorExtension(system: ExtendedActorSystem) extends Extension {
private val stores = new ConcurrentHashMap[String, SnapshotStorage]()
private lazy val shouldCreateSerializedSnapshotStorage = SnapshotTestKit.Settings(system).serialize

def storageFor(key: String): SnapshotStorage =
stores.computeIfAbsent(key,
_ => {
// we don't really care about the key here, we just want separate instances
if (shouldCreateSerializedSnapshotStorage) {
new SerializedSnapshotStorageImpl(system)
} else {
new SimpleSnapshotStorageImpl
}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ class SnapshotTestKit(system: ActorSystem)

import SnapshotTestKit._

override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system)
override protected val storage: SnapshotStorage =
SnapshotStorageEmulatorExtension(system).storageFor(PersistenceTestKitSnapshotPlugin.PluginId)

override def getItem(persistenceId: String, nextInd: Int): Option[Any] = {
storage.firstInExpectNextQueue(persistenceId).map(reprToAny)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.testkit.scaladsl
pjfanning marked this conversation as resolved.
Show resolved Hide resolved

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.adapter._
import pekko.persistence.JournalProtocol.RecoverySuccess
import pekko.persistence.JournalProtocol.ReplayMessages
import pekko.persistence.JournalProtocol.ReplayedMessage
import pekko.persistence.Persistence
import pekko.persistence.SelectedSnapshot
import pekko.persistence.SnapshotProtocol.LoadSnapshot
import pekko.persistence.SnapshotProtocol.LoadSnapshotResult
import pekko.persistence.SnapshotSelectionCriteria
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.PersistenceTestKitSnapshotPlugin
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
import pekko.persistence.typed.scaladsl.EventSourcedBehavior
import pekko.persistence.typed.scaladsl.RetentionCriteria
import org.scalatest.Inside
import org.scalatest.wordspec.AnyWordSpecLike

object RuntimeJournalsSpec {

private object Actor {
sealed trait Command
case class Save(text: String, replyTo: ActorRef[Done]) extends Command
case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
case object Stop extends Command

def apply(persistenceId: String, journal: String): Behavior[Command] =
EventSourcedBehavior[Command, String, String](
PersistenceId.ofUniqueId(persistenceId),
"",
(state, cmd) =>
cmd match {
case Save(text, replyTo) =>
Effect.persist(text).thenRun(_ => replyTo ! Done)
case ShowMeWhatYouGot(replyTo) =>
replyTo ! state
Effect.none
case Stop =>
Effect.stop()
},
(state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|"))
.withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue))
.withJournalPluginId(s"$journal.journal")
.withJournalPluginConfig(Some(config(journal)))
.withSnapshotPluginId(s"$journal.snapshot")
.withSnapshotPluginConfig(Some(config(journal)))

}

private def config(journal: String) = {
ConfigFactory.parseString(s"""
$journal {
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
snapshot.class = "${classOf[PersistenceTestKitSnapshotPlugin].getName}"
}
""")
}
}

class RuntimeJournalsSpec
extends ScalaTestWithActorTestKit
with AnyWordSpecLike
with LogCapturing
with Inside {

import RuntimeJournalsSpec._

"The testkit journal and snapshot store plugins" must {

"be possible to configure at runtime and use in multiple isolated instances" in {
val probe = createTestProbe[Any]()

{
// one actor in each journal with same id
val j1 = spawn(Actor("id1", "journal1"))
val j2 = spawn(Actor("id1", "journal2"))
j1 ! Actor.Save("j1m1", probe.ref)
probe.receiveMessage()
j2 ! Actor.Save("j2m1", probe.ref)
probe.receiveMessage()
}

{
def assertJournal(journal: String, expectedEvent: String) = {
val ref = Persistence(system).journalFor(s"$journal.journal", config(journal))
ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", probe.ref.toClassic), probe.ref.toClassic)
inside(probe.receiveMessage()) {
case ReplayedMessage(persistentRepr) =>
persistentRepr.persistenceId shouldBe "id1"
persistentRepr.payload shouldBe expectedEvent
}
probe.expectMessage(RecoverySuccess(1))
}

assertJournal("journal1", "j1m1")
assertJournal("journal2", "j2m1")
}

{
def assertSnapshot(journal: String, expectedShapshot: String) = {
val ref = Persistence(system).snapshotStoreFor(s"$journal.snapshot", config(journal))
ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, Long.MaxValue),
probe.ref.toClassic)
inside(probe.receiveMessage()) {
case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) =>
snapshot shouldBe expectedShapshot
}
}

assertSnapshot("journal1", "j1m1")
assertSnapshot("journal2", "j2m1")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add EventSourcedBehavior.withJournalPluginConfig
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withJournalPluginConfig")
# Add EventSourcedBehavior.withSnapshotPluginConfig
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withSnapshotPluginConfig")
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package org.apache.pekko.persistence.typed.internal

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.util.Helpers.ConfigOps
import pekko.actor.Cancellable
import pekko.actor.typed.Signal
import pekko.actor.typed.scaladsl.ActorContext
Expand Down Expand Up @@ -74,8 +76,10 @@ private[pekko] final class BehaviorSetup[C, E, S](

val persistence: Persistence = Persistence(context.system.toClassic)

val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
val journal: ClassicActorRef = persistence
.journalFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
val snapshotStore: ClassicActorRef = persistence
.snapshotStoreFor(settings.snapshotPluginId, settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty))

val isSnapshotOptional: Boolean =
Persistence(context.system.classicSystem).configFor(snapshotStore).getBoolean("snapshot-is-optional")
Expand Down Expand Up @@ -107,14 +111,18 @@ private[pekko] final class BehaviorSetup[C, E, S](

private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None

val recoveryEventTimeout: FiniteDuration = persistence
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private please

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is accessed in multiple places of EventSourcedBehavior state machine - ReplayingEvents and ReplayingSnapshot are using this to make error messages, so cannot be made private. Please advise if some different solution is required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a @since 1.1.3 annotation to any new public classes, variables or functions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double-checking - is this required for public members of classes marked with @InternalApi and private[pekko]?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure - this whole PR is looking like something we might need to stick in a future major release as opposed to an upcoming patch release. Leave the since annotations out for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. Is there any issue with creating a PR with this code in Akka repository? Wondering about any potential license conflicts, would not want interaction with BSL repositories voiding Pekko of this code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to submit this to Akka. You own this code and submitting it Akka does not change that. The problem will only arise if you collaborate on some of the code with a Lightbend employee and then take that collaboration and try to submit that to us.

.journalConfigFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
.getMillisDuration("recovery-event-timeout")

def startRecoveryTimer(snapshot: Boolean): Unit = {
cancelRecoveryTimer()
implicit val ec: ExecutionContext = context.executionContext
val timer =
if (snapshot)
context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
context.scheduleOnce(recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
else
context.system.scheduler.scheduleWithFixedDelay(settings.recoveryEventTimeout, settings.recoveryEventTimeout) {
context.system.scheduler.scheduleWithFixedDelay(recoveryEventTimeout, recoveryEventTimeout) {
() =>
context.self ! RecoveryTickEvent(snapshot = false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.util.Optional
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

import com.typesafe.config.Config
import org.apache.pekko
import pekko.actor.typed
import pekko.actor.typed.ActorRef
Expand Down Expand Up @@ -105,6 +106,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State](
loggerClass: Class[_],
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
journalPluginConfig: Option[Config] = None,
snapshotPluginConfig: Option[Config] = None,
tagger: Event => Set[String] = (_: Event) => Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
Expand Down Expand Up @@ -132,7 +135,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State](
case _ => false
}
if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass)
val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""))
val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""),
journalPluginConfig, snapshotPluginConfig)

// stashState outside supervise because StashState should survive restarts due to persist failures
val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
Expand Down Expand Up @@ -261,6 +265,14 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(snapshotPluginId = if (id != "") Some(id) else None)
}

override def withJournalPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
copy(journalPluginConfig = config)
}

override def withSnapshotPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
copy(snapshotPluginConfig = config)
}

override def withSnapshotSelectionCriteria(
selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection.toClassic))
Expand Down
Loading