Skip to content

Commit

Permalink
Merge pull request #40 from scoquelin/jl-base
Browse files Browse the repository at this point in the history
implement most of the remaining base commands
  • Loading branch information
72squared authored Aug 30, 2024
2 parents 7e50f83 + fb6bf2a commit 87d8bb0
Show file tree
Hide file tree
Showing 4 changed files with 435 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,176 @@
package com.github.scoquelin.arugula.commands

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

trait RedisBaseAsyncCommands[K, V] {

/**
* Echo the message
* @param message The message to echo
* @return The message
*/
def echo(message: V): Future[V]

/**
* Ping the server
* @return PONG if the server is alive
*/
def ping: Future[String]

/**
* Publish a message to a channel
* @param channel The channel to publish to
* @param message The message to publish
* @return The number of clients that received the message
*/
def publish(channel: K, message: V): Future[Long]

/**
* Get the list of channels
* @return The list of channels
*/
def pubsubChannels(): Future[List[K]]

/**
* Get the list of channels matching the pattern
* @param pattern The pattern to match
* @return The list of channels matching the pattern
*/
def pubsubChannels(pattern: K): Future[List[K]]

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels.
* @param channels The channels to get the number of subscribers for
* @return The number of subscribers for each channel
*/
def pubsubNumsub(channels: K*): Future[Map[K, Long]]

/**
* Returns the number of pattern subscribers
* @return The number of pattern subscribers
*/
def pubsubNumpat(): Future[Long]

/**
* Instructs Redis to disconnect the connection.
* Note that if auto-reconnect is enabled then it will auto-reconnect if the connection was disconnected.
* @return Unit
*/
def quit(): Future[Unit]

/**
* Switch the connection to read-only mode
* @return Unit
*/
def readOnly(): Future[Unit]

/**
* Switch the connection to read-write mode
* @return Unit
*/
def readWrite(): Future[Unit]

/**
* Get the role of the server.
* The role can be one of:
* - Master
* - Slave
* - Sentinel
* Each role has different information associated with it.
* Match on the role to get the specific information.
* @see https://redis.io/commands/role
* @return The role of the server
*/
def role(): Future[RedisBaseAsyncCommands.Role]

/**
* Wait for replication
* @param replicas The number of replicas to wait for
* @param timeout The timeout to wait for replication
* @return The number of replicas that acknowledged the write
*/
def waitForReplication(
replicas: Int,
timeout: FiniteDuration
): Future[Long]

}

object RedisBaseAsyncCommands {
val InitialCursor: String = "0"

final case class ScanResults[T](cursor: String, finished: Boolean, values: T)

}
sealed trait LinkStatus
object LinkStatus {
case object Connect extends LinkStatus
case object Connecting extends LinkStatus
case object Sync extends LinkStatus
case object Connected extends LinkStatus

def apply(status: String): LinkStatus = status match {
case "connect" => Connect
case "connecting" => Connecting
case "sync" => Sync
case "connected" => Connected
case _ => throw new IllegalArgumentException(s"Unknown link status: $status")
}
}

/**
* The role of the server
*/
sealed trait Role


object Role {
/**
* The master role
* @param replicationOffset The replication offset
* @param replicas The list of replicas
*/
case class Master(
replicationOffset: Long,
replicas: List[Replica]
) extends Role

/**
* The slave role
* @param masterIp The IP of the master
* @param masterPort The port of the master
* @param masterReplicationOffset The replication offset of the master
* @param linkStatus The link status
* @param replicationOffset The replication offset
*/
case class Slave(
masterIp: String,
masterPort: Int,
masterReplicationOffset: Long,
linkStatus: LinkStatus,
replicationOffset: Long,
) extends Role

/**
* The sentinel role
* @param masterNames The list of master names
*/
case class Sentinel(
masterNames: List[String]
) extends Role
}

/**
* A replica
* @param host The host of the replica
* @param port The port of the replica
* @param replicationOffset The replication offset of the replica
*/
case class Replica(
host: String,
port: Int,
replicationOffset: Long
)

}

Original file line number Diff line number Diff line change
@@ -1,9 +1,88 @@
package com.github.scoquelin.arugula.commands

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala}

import com.github.scoquelin.arugula.internal.LettuceRedisCommandDelegation

private[arugula] trait LettuceRedisBaseAsyncCommands[K, V] extends RedisBaseAsyncCommands[K, V] with LettuceRedisCommandDelegation[K, V] {
override def ping: Future[String] = delegateRedisClusterCommandAndLift(_.ping())

override def echo(message: V): Future[V] =
delegateRedisClusterCommandAndLift(_.echo(message))

override def ping: Future[String] =
delegateRedisClusterCommandAndLift(_.ping())

override def publish(channel: K, message: V): Future[Long] =
delegateRedisClusterCommandAndLift(_.publish(channel, message)).map(Long2long)

override def pubsubChannels(): Future[List[K]] =
delegateRedisClusterCommandAndLift(_.pubsubChannels()).map(_.toList)

override def pubsubChannels(pattern: K): Future[List[K]] =
delegateRedisClusterCommandAndLift(_.pubsubChannels(pattern)).map(_.toList)

override def pubsubNumsub(channels: K*): Future[Map[K, Long]] =
delegateRedisClusterCommandAndLift(_.pubsubNumsub(channels: _*)).map(_.asScala.map{
case (k, v) => (k, Long2long(v))
}.toMap)

override def pubsubNumpat(): Future[Long] =
delegateRedisClusterCommandAndLift(_.pubsubNumpat()).map(Long2long)

override def quit(): Future[Unit] =
delegateRedisClusterCommandAndLift(_.quit()).map(_ => ())

override def readOnly(): Future[Unit] =
delegateRedisClusterCommandAndLift(_.readOnly()).map(_ => ())

override def readWrite(): Future[Unit] =
delegateRedisClusterCommandAndLift(_.readWrite()).map(_ => ())

override def role(): Future[RedisBaseAsyncCommands.Role] = {
delegateRedisClusterCommandAndLift(_.role()).flatMap { info =>
val role = info.get(0).asInstanceOf[String]
role match {
case "master" =>
Future.successful(RedisBaseAsyncCommands.Role.Master(
replicationOffset = info.get(1).asInstanceOf[java.lang.Long],
replicas = info.get(2).asInstanceOf[java.util.List[AnyRef]].asScala.map { replicaInfo =>
val parts = replicaInfo.asInstanceOf[java.util.List[AnyRef]]
RedisBaseAsyncCommands.Replica(
host = parts.get(0).asInstanceOf[String],
port = parts.get(1).asInstanceOf[String].toIntOption.getOrElse(0),
replicationOffset = parts.get(2).asInstanceOf[String].toLongOption.getOrElse(0),
)
}.toList
))

case "slave" =>
Future.successful(RedisBaseAsyncCommands.Role.Slave(
masterIp = info.get(1).asInstanceOf[String],
masterPort = info.get(2).asInstanceOf[java.lang.Integer],
masterReplicationOffset = info.get(3).asInstanceOf[String].toLongOption.getOrElse(0),
linkStatus = RedisBaseAsyncCommands.LinkStatus(info.get(4).asInstanceOf[String]),
replicationOffset = info.get(5).asInstanceOf[java.lang.Integer].longValue(),
))

case "sentinel" =>
Future.successful(RedisBaseAsyncCommands.Role.Sentinel(
masterNames = info.get(1).asInstanceOf[java.util.List[String]].asScala.toList
))

case _ =>
Future.failed(new IllegalStateException("Role command did not return expected values"))
}
}
}


override def waitForReplication(
replicas: Int,
timeout: FiniteDuration
): Future[Long] =
delegateRedisClusterCommandAndLift(_.waitForReplication(replicas, timeout.toMillis)).map(Long2long)

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration._

import com.github.scoquelin.arugula.commands.RedisBaseAsyncCommands.InitialCursor
import com.github.scoquelin.arugula.commands.{RedisKeyAsyncCommands, RedisListAsyncCommands}
import com.github.scoquelin.arugula.commands.{RedisBaseAsyncCommands, RedisKeyAsyncCommands, RedisListAsyncCommands}
import com.github.scoquelin.arugula.commands.RedisStringAsyncCommands.{BitFieldCommand, BitFieldDataType}

import java.time.Instant
Expand All @@ -30,6 +30,15 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
} yield succeed
}
}

"support ROLE command" in {
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
for {
role <- client.role()
_ <- role shouldBe a[RedisBaseAsyncCommands.Role.Master]
} yield succeed
}
}
}

"leveraging RedisKeyAsyncCommands" should {
Expand Down
Loading

0 comments on commit 87d8bb0

Please sign in to comment.