diff --git a/modules/api/src/main/scala/com/github/scoquelin/arugula/commands/RedisBaseAsyncCommands.scala b/modules/api/src/main/scala/com/github/scoquelin/arugula/commands/RedisBaseAsyncCommands.scala index 1e58208..2504999 100644 --- a/modules/api/src/main/scala/com/github/scoquelin/arugula/commands/RedisBaseAsyncCommands.scala +++ b/modules/api/src/main/scala/com/github/scoquelin/arugula/commands/RedisBaseAsyncCommands.scala @@ -1,9 +1,100 @@ package com.github.scoquelin.arugula.commands import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration trait RedisBaseAsyncCommands[K, V] { + + /** + * Echo the message + * @param message The message to echo + * @return The message + */ + def echo(message: V): Future[V] + + /** + * Ping the server + * @return PONG if the server is alive + */ def ping: Future[String] + + /** + * Publish a message to a channel + * @param channel The channel to publish to + * @param message The message to publish + * @return The number of clients that received the message + */ + def publish(channel: K, message: V): Future[Long] + + /** + * Get the list of channels + * @return The list of channels + */ + def pubsubChannels(): Future[List[K]] + + /** + * Get the list of channels matching the pattern + * @param pattern The pattern to match + * @return The list of channels matching the pattern + */ + def pubsubChannels(pattern: K): Future[List[K]] + + /** + * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels. + * @param channels The channels to get the number of subscribers for + * @return The number of subscribers for each channel + */ + def pubsubNumsub(channels: K*): Future[Map[K, Long]] + + /** + * Returns the number of pattern subscribers + * @return The number of pattern subscribers + */ + def pubsubNumpat(): Future[Long] + + /** + * Instructs Redis to disconnect the connection. + * Note that if auto-reconnect is enabled then it will auto-reconnect if the connection was disconnected. + * @return Unit + */ + def quit(): Future[Unit] + + /** + * Switch the connection to read-only mode + * @return Unit + */ + def readOnly(): Future[Unit] + + /** + * Switch the connection to read-write mode + * @return Unit + */ + def readWrite(): Future[Unit] + + /** + * Get the role of the server. + * The role can be one of: + * - Master + * - Slave + * - Sentinel + * Each role has different information associated with it. + * Match on the role to get the specific information. + * @see https://redis.io/commands/role + * @return The role of the server + */ + def role(): Future[RedisBaseAsyncCommands.Role] + + /** + * Wait for replication + * @param replicas The number of replicas to wait for + * @param timeout The timeout to wait for replication + * @return The number of replicas that acknowledged the write + */ + def waitForReplication( + replicas: Int, + timeout: FiniteDuration + ): Future[Long] + } object RedisBaseAsyncCommands { @@ -11,4 +102,75 @@ object RedisBaseAsyncCommands { final case class ScanResults[T](cursor: String, finished: Boolean, values: T) -} \ No newline at end of file + sealed trait LinkStatus + object LinkStatus { + case object Connect extends LinkStatus + case object Connecting extends LinkStatus + case object Sync extends LinkStatus + case object Connected extends LinkStatus + + def apply(status: String): LinkStatus = status match { + case "connect" => Connect + case "connecting" => Connecting + case "sync" => Sync + case "connected" => Connected + case _ => throw new IllegalArgumentException(s"Unknown link status: $status") + } + } + + /** + * The role of the server + */ + sealed trait Role + + + object Role { + /** + * The master role + * @param replicationOffset The replication offset + * @param replicas The list of replicas + */ + case class Master( + replicationOffset: Long, + replicas: List[Replica] + ) extends Role + + /** + * The slave role + * @param masterIp The IP of the master + * @param masterPort The port of the master + * @param masterReplicationOffset The replication offset of the master + * @param linkStatus The link status + * @param replicationOffset The replication offset + */ + case class Slave( + masterIp: String, + masterPort: Int, + masterReplicationOffset: Long, + linkStatus: LinkStatus, + replicationOffset: Long, + ) extends Role + + /** + * The sentinel role + * @param masterNames The list of master names + */ + case class Sentinel( + masterNames: List[String] + ) extends Role + } + + /** + * A replica + * @param host The host of the replica + * @param port The port of the replica + * @param replicationOffset The replication offset of the replica + */ + case class Replica( + host: String, + port: Int, + replicationOffset: Long + ) + +} + diff --git a/modules/core/src/main/scala/com/github/scoquelin/arugula/commands/LettuceRedisBaseAsyncCommands.scala b/modules/core/src/main/scala/com/github/scoquelin/arugula/commands/LettuceRedisBaseAsyncCommands.scala index b93d600..4457be9 100644 --- a/modules/core/src/main/scala/com/github/scoquelin/arugula/commands/LettuceRedisBaseAsyncCommands.scala +++ b/modules/core/src/main/scala/com/github/scoquelin/arugula/commands/LettuceRedisBaseAsyncCommands.scala @@ -1,9 +1,88 @@ package com.github.scoquelin.arugula.commands +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala} import com.github.scoquelin.arugula.internal.LettuceRedisCommandDelegation private[arugula] trait LettuceRedisBaseAsyncCommands[K, V] extends RedisBaseAsyncCommands[K, V] with LettuceRedisCommandDelegation[K, V] { - override def ping: Future[String] = delegateRedisClusterCommandAndLift(_.ping()) + + override def echo(message: V): Future[V] = + delegateRedisClusterCommandAndLift(_.echo(message)) + + override def ping: Future[String] = + delegateRedisClusterCommandAndLift(_.ping()) + + override def publish(channel: K, message: V): Future[Long] = + delegateRedisClusterCommandAndLift(_.publish(channel, message)).map(Long2long) + + override def pubsubChannels(): Future[List[K]] = + delegateRedisClusterCommandAndLift(_.pubsubChannels()).map(_.toList) + + override def pubsubChannels(pattern: K): Future[List[K]] = + delegateRedisClusterCommandAndLift(_.pubsubChannels(pattern)).map(_.toList) + + override def pubsubNumsub(channels: K*): Future[Map[K, Long]] = + delegateRedisClusterCommandAndLift(_.pubsubNumsub(channels: _*)).map(_.asScala.map{ + case (k, v) => (k, Long2long(v)) + }.toMap) + + override def pubsubNumpat(): Future[Long] = + delegateRedisClusterCommandAndLift(_.pubsubNumpat()).map(Long2long) + + override def quit(): Future[Unit] = + delegateRedisClusterCommandAndLift(_.quit()).map(_ => ()) + + override def readOnly(): Future[Unit] = + delegateRedisClusterCommandAndLift(_.readOnly()).map(_ => ()) + + override def readWrite(): Future[Unit] = + delegateRedisClusterCommandAndLift(_.readWrite()).map(_ => ()) + + override def role(): Future[RedisBaseAsyncCommands.Role] = { + delegateRedisClusterCommandAndLift(_.role()).flatMap { info => + val role = info.get(0).asInstanceOf[String] + role match { + case "master" => + Future.successful(RedisBaseAsyncCommands.Role.Master( + replicationOffset = info.get(1).asInstanceOf[java.lang.Long], + replicas = info.get(2).asInstanceOf[java.util.List[AnyRef]].asScala.map { replicaInfo => + val parts = replicaInfo.asInstanceOf[java.util.List[AnyRef]] + RedisBaseAsyncCommands.Replica( + host = parts.get(0).asInstanceOf[String], + port = parts.get(1).asInstanceOf[String].toIntOption.getOrElse(0), + replicationOffset = parts.get(2).asInstanceOf[String].toLongOption.getOrElse(0), + ) + }.toList + )) + + case "slave" => + Future.successful(RedisBaseAsyncCommands.Role.Slave( + masterIp = info.get(1).asInstanceOf[String], + masterPort = info.get(2).asInstanceOf[java.lang.Integer], + masterReplicationOffset = info.get(3).asInstanceOf[String].toLongOption.getOrElse(0), + linkStatus = RedisBaseAsyncCommands.LinkStatus(info.get(4).asInstanceOf[String]), + replicationOffset = info.get(5).asInstanceOf[java.lang.Integer].longValue(), + )) + + case "sentinel" => + Future.successful(RedisBaseAsyncCommands.Role.Sentinel( + masterNames = info.get(1).asInstanceOf[java.util.List[String]].asScala.toList + )) + + case _ => + Future.failed(new IllegalStateException("Role command did not return expected values")) + } + } + } + + + override def waitForReplication( + replicas: Int, + timeout: FiniteDuration + ): Future[Long] = + delegateRedisClusterCommandAndLift(_.waitForReplication(replicas, timeout.toMillis)).map(Long2long) + } diff --git a/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala b/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala index 8af02f3..9eb4df4 100644 --- a/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala +++ b/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala @@ -9,7 +9,7 @@ import org.scalatest.matchers.should.Matchers import scala.concurrent.duration._ import com.github.scoquelin.arugula.commands.RedisBaseAsyncCommands.InitialCursor -import com.github.scoquelin.arugula.commands.{RedisKeyAsyncCommands, RedisListAsyncCommands} +import com.github.scoquelin.arugula.commands.{RedisBaseAsyncCommands, RedisKeyAsyncCommands, RedisListAsyncCommands} import com.github.scoquelin.arugula.commands.RedisStringAsyncCommands.{BitFieldCommand, BitFieldDataType} import java.time.Instant @@ -30,6 +30,15 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with } yield succeed } } + + "support ROLE command" in { + withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client => + for { + role <- client.role() + _ <- role shouldBe a[RedisBaseAsyncCommands.Role.Master] + } yield succeed + } + } } "leveraging RedisKeyAsyncCommands" should { diff --git a/modules/tests/test/src/test/scala/com/github/scoquelin/arugula/LettuceRedisBaseAsyncCommandsSpec.scala b/modules/tests/test/src/test/scala/com/github/scoquelin/arugula/LettuceRedisBaseAsyncCommandsSpec.scala index d6ffe12..3af357c 100644 --- a/modules/tests/test/src/test/scala/com/github/scoquelin/arugula/LettuceRedisBaseAsyncCommandsSpec.scala +++ b/modules/tests/test/src/test/scala/com/github/scoquelin/arugula/LettuceRedisBaseAsyncCommandsSpec.scala @@ -1,5 +1,9 @@ package com.github.scoquelin.arugula +import scala.concurrent.duration.DurationInt +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala} + +import com.github.scoquelin.arugula.commands.RedisBaseAsyncCommands import io.lettuce.core.RedisFuture import org.mockito.Mockito.{verify, when} import org.scalatest.matchers.must.Matchers @@ -15,6 +19,20 @@ class LettuceRedisBaseAsyncCommandsSpec extends wordspec.FixtureAsyncWordSpec wi "LettuceRedisBaseAsyncCommands" should { + "delegate ECHO command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val expectedValue = "Hello, world!" + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.echo(expectedValue)).thenReturn(mockRedisFuture) + + testClass.echo(expectedValue).map { result => + result mustBe expectedValue + verify(lettuceAsyncCommands).echo(expectedValue) + succeed + } + } + "delegate PING command to Lettuce and lift result into a Future" in { testContext => import testContext._ @@ -28,5 +46,169 @@ class LettuceRedisBaseAsyncCommandsSpec extends wordspec.FixtureAsyncWordSpec wi succeed } } + + "delegate PUBLISH command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val expectedValue = 1L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.publish("channel", "message")).thenReturn(mockRedisFuture) + + testClass.publish("channel", "message").map { result => + result mustBe expectedValue + verify(lettuceAsyncCommands).publish("channel", "message") + succeed + } + } + + "delegate PUBSUB CHANNELS command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val expectedValue = java.util.List.of("channel1", "channel2") + val mockRedisFuture: RedisFuture[java.util.List[String]] = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.pubsubChannels()).thenReturn(mockRedisFuture) + + testClass.pubsubChannels().map { result => + result mustBe expectedValue.asScala.toList + verify(lettuceAsyncCommands).pubsubChannels() + succeed + } + } + + "delegate PUBSUB CHANNELS with pattern command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val expectedValue = java.util.List.of("channel1", "channel2") + val mockRedisFuture: RedisFuture[java.util.List[String]] = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.pubsubChannels("chan*")).thenReturn(mockRedisFuture) + + testClass.pubsubChannels("chan*").map { result => + result mustBe expectedValue.asScala.toList + verify(lettuceAsyncCommands).pubsubChannels("chan*") + succeed + } + } + + "delegate PUBSUB NUMSUB command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val expectedValue: java.util.Map[String, java.lang.Long] = new java.util.HashMap[String, java.lang.Long]() + expectedValue.put("channel1", 1L) + expectedValue.put("channel2", 2L) + val mockRedisFuture: RedisFuture[java.util.Map[String, java.lang.Long]] = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.pubsubNumsub("channel1", "channel2")).thenReturn(mockRedisFuture) + + testClass.pubsubNumsub("channel1", "channel2").map { result => + result mustBe expectedValue.asScala.map { case (k, v) => (k, Long2long(v)) } + verify(lettuceAsyncCommands).pubsubNumsub("channel1", "channel2") + succeed + } + } + + "delegate PUBSUB NUMPAT command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val expectedValue = 1L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.pubsubNumpat()).thenReturn(mockRedisFuture) + + testClass.pubsubNumpat().map { result => + result mustBe expectedValue + verify(lettuceAsyncCommands).pubsubNumpat() + succeed + } + } + + "delegate QUIT command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn("OK") + when(lettuceAsyncCommands.quit()).thenReturn(mockRedisFuture) + + testClass.quit().map { result => + verify(lettuceAsyncCommands).quit() + succeed + } + } + + "delegate READONLY command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn("OK") + when(lettuceAsyncCommands.readOnly()).thenReturn(mockRedisFuture) + + testClass.readOnly().map { result => + verify(lettuceAsyncCommands).readOnly() + succeed + } + } + + "delegate READWRITE command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn("OK") + when(lettuceAsyncCommands.readWrite()).thenReturn(mockRedisFuture) + + testClass.readWrite().map { result => + verify(lettuceAsyncCommands).readWrite() + succeed + } + } + + "delegate ROLE command to Lettuce and lift master result into a Future" in { testContext => + import testContext._ + + val expectedValue = java.util.List.of("master", 3456789L, java.util.List.of(java.util.List.of("host", "1234", "0"))) + val mockRedisFuture = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.role()).thenReturn(mockRedisFuture) + + testClass.role().map { result => + result mustBe RedisBaseAsyncCommands.Role.Master(3456789, List(RedisBaseAsyncCommands.Replica("host", 1234, 0))) + verify(lettuceAsyncCommands).role() + succeed + } + } + + "delegate ROLE command to Lettuce and lift slave result into a Future" in { testContext => + import testContext._ + + val expectedValue = java.util.List.of("slave", "masterIp", 1234, "1234567", "connected", 1234567) + val mockRedisFuture = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.role()).thenReturn(mockRedisFuture) + + testClass.role().map { result => + result mustBe RedisBaseAsyncCommands.Role.Slave("masterIp", 1234, 1234567, RedisBaseAsyncCommands.LinkStatus.Connected, 1234567) + verify(lettuceAsyncCommands).role() + succeed + } + } + + "delegate ROLE command to Lettuce and lift sentinel result into a Future" in { testContext => + import testContext._ + + val expectedValue = java.util.List.of("sentinel", java.util.List.of("master1", "master2")) + val mockRedisFuture = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.role()).thenReturn(mockRedisFuture) + + testClass.role().map { result => + result mustBe RedisBaseAsyncCommands.Role.Sentinel(List("master1", "master2")) + verify(lettuceAsyncCommands).role() + succeed + } + } + + "delegate WAIT command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + + val expectedValue = 1L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expectedValue) + when(lettuceAsyncCommands.waitForReplication(1, java.time.Duration.ofSeconds(1).toMillis)).thenReturn(mockRedisFuture) + + testClass.waitForReplication(1, 1.second).map { result => + result mustBe expectedValue + verify(lettuceAsyncCommands).waitForReplication(1, 1000) + succeed + } + } } }