Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import pekko.persistence.jdbc.config.{ ConfigKeys, SlickConfiguration }
import pekko.persistence.jdbc.util.ConfigOps._
import com.typesafe.config.{ Config, ConfigObject }

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{ Failure, Success }

object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package org.apache.pekko.persistence.jdbc.journal.dao

import org.apache.pekko
import pekko.NotUsed
import pekko.dispatch.ExecutionContexts
import pekko.persistence.jdbc.PekkoSerialization
import pekko.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import pekko.persistence.jdbc.journal.dao.JournalTables.JournalPekkoSerializationRow
Expand Down Expand Up @@ -52,7 +51,7 @@ class DefaultJournalDao(
override def baseDaoConfig: BaseDaoConfig = journalConfig.daoConfig

override def writeJournalRows(xs: immutable.Seq[(JournalPekkoSerializationRow, Set[String])]): Future[Unit] = {
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContexts.parasitic)
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContext.parasitic)
}

val queries =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import pekko.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJ
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.persistence.query.javadsl._
import pekko.stream.javadsl.Source
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._

import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters._

object JdbcReadJournal {
final val Identifier = ScalaJdbcReadJournal.Identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import pekko.persistence.jdbc.config.SnapshotConfig
import pekko.serialization.Serialization
import pekko.stream.Materializer
import SnapshotTables._
import pekko.dispatch.ExecutionContexts
import pekko.persistence.jdbc.PekkoSerialization

import scala.concurrent.{ ExecutionContext, Future }
Expand Down Expand Up @@ -107,23 +106,23 @@ class DefaultSnapshotDao(

override def save(snapshotMetadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
val eventualSnapshotRow = Future.fromTry(serializeSnapshot(snapshotMetadata, snapshot))
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContexts.parasitic)
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContext.parasitic)
}

override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete)
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)

override def deleteAllSnapshots(persistenceId: String): Future[Unit] =
db.run(queries.selectAll(persistenceId).delete).map(_ => ())(ExecutionContexts.parasitic)
db.run(queries.selectAll(persistenceId).delete).map(_ => ())(ExecutionContext.parasitic)

override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete)
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)

override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete)
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)

override def deleteUpToMaxSequenceNrAndMaxTimestamp(
persistenceId: String,
Expand All @@ -133,5 +132,5 @@ class DefaultSnapshotDao(
queries
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.delete)
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package org.apache.pekko.persistence.jdbc.state.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.ExecutionContext

import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.{ Done, NotUsed }
Expand All @@ -27,9 +27,11 @@ import pekko.persistence.query.{ DurableStateChange, Offset }
import pekko.persistence.query.javadsl.DurableStateStoreQuery
import pekko.persistence.state.javadsl.{ DurableStateUpdateStore, GetObjectResult }
import pekko.stream.javadsl.Source
import pekko.util.FutureConverters._
import slick.jdbc.JdbcProfile

import scala.concurrent.ExecutionContext
import scala.jdk.FutureConverters._

object JdbcDurableStateStore {
val Identifier = ScalaJdbcDurableStateStore.Identifier
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ExtendedActorSystem
import pekko.annotation.ApiMayChange
import pekko.dispatch.ExecutionContexts
import pekko.pattern.ask
import pekko.persistence.jdbc.PekkoSerialization
import pekko.persistence.jdbc.state.DurableStateQueries
Expand Down Expand Up @@ -131,7 +130,7 @@ class JdbcDurableStateStore[A](
.foreach(throw _)
}
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)

override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = {
Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import pekko.Done
import pekko.actor.ClassicActorSystemProvider
import pekko.annotation.ApiMayChange
import pekko.persistence.jdbc.testkit.internal.SchemaUtilsImpl
import pekko.util.FutureConverters._
import org.slf4j.LoggerFactory

import scala.jdk.FutureConverters._

object SchemaUtils {

private val logger = LoggerFactory.getLogger("org.apache.pekko.persistence.jdbc.testkit.javadsl.SchemaUtils")
Expand Down