diff --git a/modules/api/src/main/scala/com/github/scoquelin/arugula/RedisCommandsClient.scala b/modules/api/src/main/scala/com/github/scoquelin/arugula/RedisCommandsClient.scala
index 66d0223..7c79373 100644
--- a/modules/api/src/main/scala/com/github/scoquelin/arugula/RedisCommandsClient.scala
+++ b/modules/api/src/main/scala/com/github/scoquelin/arugula/RedisCommandsClient.scala
@@ -20,4 +20,5 @@ trait RedisCommandsClient[K, V]
 with RedisGeoAsyncCommands[K, V]
 with RedisScriptingAsyncCommands[K, V]
 with RedisServerAsyncCommands[K, V]
+ with RedisStreamAsyncCommands[K, V]
 with RedisPipelineAsyncCommands[K, V]
\ No newline at end of file
diff --git a/modules/api/src/main/scala/com/github/scoquelin/arugula/commands/RedisStreamAsyncCommands.scala b/modules/api/src/main/scala/com/github/scoquelin/arugula/commands/RedisStreamAsyncCommands.scala
new file mode 100644
index 0000000..34695dc
--- /dev/null
+++ b/modules/api/src/main/scala/com/github/scoquelin/arugula/commands/RedisStreamAsyncCommands.scala
@@ -0,0 +1,517 @@
+package com.github.scoquelin.arugula.commands
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+import java.time.Instant
+
+/**
+ * Asynchronous commands for Redis streams
+ * @tparam K The key type
+ * @tparam V The value type
+ */
+trait RedisStreamAsyncCommands[K, V] {
+ /**
+ * Acknowledge one or more messages as processed
+ * @param key The stream key
+ * @param group The consumer group
+ * @param ids The message IDs to acknowledge
+ * @return The number of acknowledged messages
+ */
+ def xAck(key: K, group: K, ids: String*): Future[Long]
+
+ /**
+ * Add a new entry to a stream
+ * @param key The stream key
+ * @param values The message field-value pairs
+ * @return The message ID
+ */
+ def xAdd(key: K, values: Map[K, V]): Future[String]
+
+ /**
+ * Add a new entry to a stream
+ * @param key The stream key
+ * @param id Specify the message ID instead of generating one automatically
+ * @param values The message field-value pairs
+ * @return The message ID
+ */
+ def xAdd(key: K, id: K, values: Map[K, V]): Future[String]
+
+ /**
+ * Automatically claim pending messages from a stream consumer group
+ * @param key The stream key
+ * @param group The consumer group
+ * @param consumer The consumer name
+ * @param minIdleTime The minimum idle time
+ * @param startId The start ID
+ * @param justId Whether to return just the IDs
+ * @param count The maximum number of messages to claim
+ * @return The claimed messages
+ */
+ def xAutoClaim(
+ key: K, group: K, consumer: K,
+ minIdleTime: FiniteDuration,
+ startId: String,
+ justId: Boolean = false,
+ count: Option[Long] = None
+ ): Future[RedisStreamAsyncCommands.ClaimedMessages[K, V]]
+
+ /**
+ * Get ownership of one or multiple messages in the Pending Entries List of a given stream consumer group.
+ * @param key The stream key
+ * @param group The consumer group
+ * @param consumer The consumer name
+ * @param minIdleTime The minimum idle time
+ * @param messageIds The message IDs to claim
+ * @param justId Whether to return just the IDs.
+ * Set the JUSTID flag to return just the message ID without incrementing the retry counter.
+ * When JUSTID is set, the message body is not returned.
+ * @param force Create the pending message entry in the PEL even if certain specified IDs are not already in the PEL assigned to a different client.
+ * However, the message must exist in the stream, otherwise the IDs of non-existing messages are ignored.
+ * @param retryCount Set the retry counter to the specified value.
+ * This counter is incremented every time a message is delivered again.
+ * Normally XCLAIM does not alter this counter, which is simply reported to clients when the XPENDING command is called:
+ * this way clients can detect anomalies, such as messages that are never processed despite a large number of delivery attempts.
+ * @param idle Set the idle time (last time it was delivered) of the message.
+ * If IDLE is not specified, an IDLE of 0 is assumed, that is, the time count is reset
+ * because the message now has a new owner trying to process it.
+ * @param time The same as IDLE, but instead of a relative amount of milliseconds,
+ * it sets the idle time to a specific Unix time (in milliseconds).
+ * This is useful in order to rewrite the AOF file by generating XCLAIM commands.
+ * @return The claimed messages
+ */
+ def xClaim(
+ key: K,
+ group: K,
+ consumer: K,
+ minIdleTime: FiniteDuration,
+ messageIds: List[String],
+ justId: Boolean = false,
+ force: Boolean = false,
+ retryCount: Option[Long] = None,
+ idle: Option[FiniteDuration] = None,
+ time: Option[Instant] = None,
+ ): Future[List[RedisStreamAsyncCommands.StreamMessage[K, V]]]
+
+
+ /**
+ * Delete one or more messages from a stream
+ * @param key The stream key
+ * @param ids The message IDs to delete
+ * @return The number of deleted messages
+ */
+ def xDel(key: K, ids: String*): Future[Long]
+
+ /**
+ * Create a new consumer group
+ * @param streamOffset The stream offset consisting of the stream key and the offset
+ * @param group The group name
+ * @param mkStream Whether to create the stream if it does not exist
+ * @param entriesRead Optionally set the group's entries-read counter
+ * @return Unit
+ */
+ def xGroupCreate(
+ streamOffset: RedisStreamAsyncCommands.StreamOffset[K],
+ group: K,
+ mkStream: Boolean = false,
+ entriesRead: Option[Long] = None
+ ): Future[Unit]
+
+
+ /**
+ * Create a new consumer in a consumer group
+ * @param key The stream key
+ * @param group The group name
+ * @param consumer The consumer name
+ * @return Whether the consumer was created
+ */
+ def xGroupCreateConsumer(key: K, group: K, consumer: K): Future[Boolean]
+
+ /**
+ * Delete a consumer from a consumer group
+ * @param key The stream key
+ * @param group The group name
+ * @param consumer The consumer name
+ * @return The number of pending messages the consumer had before it was deleted.
+ */
+ def xGroupDelConsumer(key: K, group: K, consumer: K): Future[Long]
+
+ /**
+ * Destroy a consumer group
+ * @param key The stream key
+ * @param group The group name
+ * @return Whether the group was destroyed
+ */
+ def xGroupDestroy(key: K, group: K): Future[Boolean]
+
+ /**
+ * Set the last delivered ID of a consumer group
+ * @param streamOffset The stream offset consisting of the stream key and the offset
+ * @param group The group name
+ * @return Unit
+ */
+ def xGroupSetId(streamOffset: RedisStreamAsyncCommands.StreamOffset[K], group: K): Future[Unit]
+
+ /**
+ * Get the length of a stream
+ * @param key The stream key
+ * @return The length of the stream
+ */
+ def xLen(key: K): Future[Long]
+
+ /**
+ * Get the pending message summary for a consumer group
+ * @param key The stream key
+ * @param group The group name
+ * @return The pending messages
+ */
+ def xPending(key: K, group: K): Future[RedisStreamAsyncCommands.PendingMessages]
+
+ /**
+ * Get the pending messages for a consumer within a consumer group
+ * @param key The stream key
+ * @param group The group name
+ * @param consumer The consumer name
+ * @param range Restrict the result to a range of message IDs
+ * @param limit Limit the number of returned entries
+ * @param idle Only return messages that have been idle for at least the given duration
+ * @return The pending messages
+ */
+ def xPending(
+ key: K,
+ group: K,
+ consumer: K,
+ range: Option[RedisStreamAsyncCommands.XRange] = None,
+ limit: Option[RedisStreamAsyncCommands.XRangeLimit] = None,
+ idle: Option[FiniteDuration] = None
+ ): Future[List[RedisStreamAsyncCommands.PendingMessage]]
+
+ /**
+ * Get a range of messages from a stream
+ * @param key The stream key
+ * @param range The range to get
+ * @return The messages in the range
+ */
+ def xRange(key: K, range: RedisStreamAsyncCommands.XRange): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]]
+
+ /**
+ * Get a range of messages from a stream with a limit
+ * @param key The stream key
+ * @param range The range to get
+ * @param limit The offset/count limit to apply
+ * @return The messages in the range
+ */
+ def xRange(key: K, range: RedisStreamAsyncCommands.XRange, limit: RedisStreamAsyncCommands.XRangeLimit): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]]
+
+ /**
+ * Read messages from one or more streams
+ * @param streams The streams to read from
+ * @return The messages read
+ */
+ def xRead(streams: RedisStreamAsyncCommands.StreamOffset[K]*): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]]
+
+ /**
+ * Read messages from one or more streams
+ * @param streams The streams to read from
+ * @param count The maximum number of messages to read
+ * @param block The block duration
+ * @param noAck Whether to set the NOACK flag (only meaningful when reading through a consumer group)
+ * @return The messages read
+ */
+ def xRead(streams: List[RedisStreamAsyncCommands.StreamOffset[K]], count: Option[Long] = None, block: Option[FiniteDuration] = None, noAck: Boolean = false): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]]
+
+ /**
+ * Read messages from a consumer group
+ * @param group The group name
+ * @param consumer The consumer name
+ * @param streams The streams to read from
+ * @return The messages read
+ */
+ def xReadGroup(
+ group: K,
+ consumer: K,
+ streams: RedisStreamAsyncCommands.StreamOffset[K]*,
+ ): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]]
+
+ /**
+ * Read messages from a consumer group
+ * @param group The group name
+ * @param consumer The consumer name
+ * @param streams The streams to read from
+ * @param count The maximum number of messages to read
+ * @param block The block duration
+ * @param noAck Whether to skip adding the messages to the pending entries list (NOACK), so that no acknowledgement is required
+ * @return The messages read
+ */
+ def xReadGroup(
+ group: K,
+ consumer: K,
+ streams:
List[RedisStreamAsyncCommands.StreamOffset[K]], + count: Option[Long] = None, + block: Option[FiniteDuration] = None, + noAck: Boolean = false + ): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] + + /** + * Get a reverse range of messages from a stream + * @param key The stream key + * @param range The range to get + * @return The messages in the range + */ + def xRevRange(key: K, range: RedisStreamAsyncCommands.XRange): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] + + /** + * Trim the stream to a certain size + * @param key The stream key + * @param count The number of messages to keep + * @return The number of messages removed + */ + def xTrim(key: K, count: Long): Future[Long] + + /** + * Trim the stream to a certain size + * @param key The stream key + * @param approximateTrimming Whether to use approximate trimming + * @param count The number of messages to keep + * @return The number of messages removed + */ + def xTrim(key: K, approximateTrimming: Boolean, count: Long): Future[Long] + + /** + * Trim the stream to a certain size + * @param key The stream key + * @param args The trimming arguments + * @return The number of messages removed + */ + def xTrim(key: K, args: RedisStreamAsyncCommands.XTrimArgs): Future[Long] + + +} + +object RedisStreamAsyncCommands { + /** + * A message in a stream + * @param stream The stream key + * @param id The message ID + * @param entries The message field-value pairs + * @tparam K The key type + * @tparam V The value type + */ + case class StreamMessage[K, V](stream: K, id: String, entries: Map[K, V] = Map.empty) + + /** + * A group of claimed messages + * @param id The ID of the group + * @param messages The claimed messages + * @tparam K The key type + * @tparam V The value type + */ + case class ClaimedMessages[K, V](id: String, messages: List[StreamMessage[K, V]]) + + + /** + * A range of values + * + * @param lower The lower bound + * @param upper The upper bound + */ + final case class XRange(lower: XRange.Boundary, upper: XRange.Boundary) + + object XRange { + + /** + * A boundary value, used for range queries (upper or lower bounds) + * @param value An optional value to use as the boundary. If None, it is unbounded. + * @param inclusive Whether the boundary is inclusive. default is false. 
+ */ + case class Boundary(value: Option[String] = None, inclusive: Boolean = false) + + object Boundary { + /** + * Create a new inclusive boundary + * @param value The value + * @return The boundary + */ + def including(value: String): Boundary = Boundary(Some(value), inclusive = true) + + /** + * Create a new inclusive boundary + * @param value The value + * @return The boundary + */ + def including(value: java.time.Instant): Boundary = Boundary(Some(s"${value.toEpochMilli.toString}-0"), inclusive = true) + + /** + * Create a new exclusive boundary + * @param value The value + * @return The boundary + */ + def excluding(value: String): Boundary = Boundary(Some(value)) + + /** + * Create a new exclusive boundary + * @param value The value + * @return The boundary + */ + def excluding(value: java.time.Instant): Boundary = Boundary(Some(s"${value.toEpochMilli.toString}-0")) + + /** + * Create an unbounded boundary + * @return The boundary + */ + def unbounded: Boundary = Boundary(None) + } + + /** + * Create a new range + * @param lower The lower bound + * @param upper The upper bound + * @return The range + */ + def apply(lower: String, upper: String): XRange = XRange(Boundary.including(lower), Boundary.including(upper)) + + + /** + * Create a new range + * @param lower The lower bound + * @param upper The upper bound + * @return The range + */ + def apply(lower: java.time.Instant, upper: java.time.Instant): XRange = XRange(Boundary.including(lower), Boundary.including(upper)) + + /** + * Create a new range + * @param lower The lower bound + * @param upper The upper bound + * @return The range + */ + def including(lower: String, upper: String): XRange = XRange(Boundary.including(lower), Boundary.including(upper)) + + /** + * Create a new range + * @param lower The lower bound + * @param upper The upper bound + * @return The range + */ + def including(lower: java.time.Instant, upper: java.time.Instant): XRange = XRange(Boundary.including(lower), Boundary.including(upper)) + + /** + * Create a new range + * @param lower The lower bound + * @param upper The upper bound + * @return The range + */ + def from(lower: String, upper: String, inclusive: Boolean = false): XRange = { + if(inclusive) XRange(Boundary.including(lower), Boundary.including(upper)) + else XRange(Boundary.excluding(lower), Boundary.excluding(upper)) + } + + def from(lower: java.time.Instant, upper: java.time.Instant): XRange = { + XRange(Boundary.excluding(lower), Boundary.excluding(upper)) + } + + def from(lower: java.time.Instant, upper: java.time.Instant, inclusive: Boolean): XRange = { + if(inclusive) XRange(Boundary.including(lower), Boundary.including(upper)) + else XRange(Boundary.excluding(lower), Boundary.excluding(upper)) + } + + + /** + * Create a new range from lower to unbounded + * @param lower The start boundary + * @return The range + */ + def fromLower(lower: String, inclusive: Boolean = false): XRange = { + if(inclusive) XRange(Boundary.including(lower), Boundary.unbounded) + else XRange(Boundary.excluding(lower), Boundary.unbounded) + } + + def fromLower(lower: java.time.Instant): XRange = { + XRange(Boundary.excluding(lower), Boundary.unbounded) + } + + def fromLower(lower: java.time.Instant, inclusive: Boolean): XRange = { + if(inclusive) XRange(Boundary.including(lower), Boundary.unbounded) + else XRange(Boundary.excluding(lower), Boundary.unbounded) + } + + /** + * Create a new range to upper bound + * @param upper The upper boundary + * @return The range + */ + def toUpper(upper: String, inclusive: 
Boolean = false): XRange = { + if(inclusive) XRange(Boundary.unbounded, Boundary.including(upper)) + else XRange(Boundary.unbounded, Boundary.excluding(upper)) + } + + def toUpper(upper: java.time.Instant): XRange = { + XRange(Boundary.unbounded, Boundary.excluding(upper)) + } + + def toUpper(upper: java.time.Instant, inclusive: Boolean): XRange = { + if(inclusive) XRange(Boundary.unbounded, Boundary.including(upper)) + else XRange(Boundary.unbounded, Boundary.excluding(upper)) + } + + /** + * Create a new unbounded range + * @return The range + */ + def unbounded: XRange = new XRange(Boundary.unbounded, Boundary.unbounded) + } + + /** + * A range limit + * + * @param offset The offset + * @param count The count + */ + final case class XRangeLimit(offset: Long, count: Long) + + + /** + * Arguments for trimming a stream + * @param maxLen The maximum length of the stream + * @param approximateTrimming Whether to use approximate trimming + * @param exactTrimming Whether to use exact trimming + * @param minId The minimum ID to trim to + * @param limit The maximum number of elements to trim + */ + case class XTrimArgs( + maxLen: Option[Long] = None, + approximateTrimming: Boolean = false, + exactTrimming: Boolean = false, + minId: Option[String] = None, + limit: Option[Long] = None + ) + + case class StreamOffset[K](name: K, offset: String) + + object StreamOffset { + def latest[K](key: K): StreamOffset[K] = StreamOffset(key, "$") + + def earliest[K](key: K): StreamOffset[K] = StreamOffset(key, "0") + + def lastConsumed[K](key: K): StreamOffset[K] = StreamOffset(key, ">") + } + + /** + * A pending message + * @param count The number of pending messages + * @param messageIds a range of message IDs + * @param consumerMessageCount The number of messages per consumer + */ + case class PendingMessages(count: Long, messageIds: XRange, consumerMessageCount: Map[String, Long] = Map.empty) + + + /** + * A pending message + * @param id The message ID + * @param consumer The consumer name + * @param sinceLastDelivery The time since the last delivery + * @param reDeliveryCount The number of times the message has been redelivered + */ + case class PendingMessage(id: String, consumer: String, sinceLastDelivery: FiniteDuration, reDeliveryCount: Long) + +} + +// need to implement: +//xinfoStream +//xinfoGroups +//xinfoConsumers \ No newline at end of file diff --git a/modules/core/src/main/scala/com/github/scoquelin/arugula/LettuceRedisCommandsClient.scala b/modules/core/src/main/scala/com/github/scoquelin/arugula/LettuceRedisCommandsClient.scala index 8632af8..8ab85e3 100644 --- a/modules/core/src/main/scala/com/github/scoquelin/arugula/LettuceRedisCommandsClient.scala +++ b/modules/core/src/main/scala/com/github/scoquelin/arugula/LettuceRedisCommandsClient.scala @@ -38,6 +38,7 @@ private[arugula] class LettuceRedisCommandsClient[K, V]( with LettuceRedisStringAsyncCommands[K, V] with LettuceRedisSetAsyncCommands[K, V] with LettuceRedisSortedSetAsyncCommands[K, V] + with LettuceRedisStreamAsyncCommands[K, V] with LettuceRedisGeoAsyncCommands[K, V] with LettuceRedisPipelineAsyncCommands[K, V] { diff --git a/modules/core/src/main/scala/com/github/scoquelin/arugula/commands/LettuceRedisStreamAsyncCommands.scala b/modules/core/src/main/scala/com/github/scoquelin/arugula/commands/LettuceRedisStreamAsyncCommands.scala new file mode 100644 index 0000000..6e26406 --- /dev/null +++ b/modules/core/src/main/scala/com/github/scoquelin/arugula/commands/LettuceRedisStreamAsyncCommands.scala @@ -0,0 +1,268 @@ +package 
com.github.scoquelin.arugula.commands + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsJava, MapHasAsScala} +import scala.jdk.DurationConverters.{JavaDurationOps, ScalaDurationOps} + +import com.github.scoquelin.arugula.commands.RedisStreamAsyncCommands.XRange +import com.github.scoquelin.arugula.internal.LettuceRedisCommandDelegation + +import java.time.Instant + +trait LettuceRedisStreamAsyncCommands[K, V] extends RedisStreamAsyncCommands[K, V] with LettuceRedisCommandDelegation[K, V] { + override def xAck(key: K, group: K, ids: String*): Future[Long] = + delegateRedisClusterCommandAndLift(_.xack(key, group, ids: _*)).map(Long2long) + + override def xAdd(key: K, values: Map[K, V]): Future[String] = { + delegateRedisClusterCommandAndLift(_.xadd(key, values.asJava)) + } + + override def xAdd(key: K, id: K, values: Map[K, V]): Future[String] ={ + delegateRedisClusterCommandAndLift(_.xadd(key, id, values.asJava)) + } + + override def xAutoClaim(key: K, group: K, consumer: K, minIdleTime: FiniteDuration, startId: String, justId: Boolean=false, count: Option[Long] = None): Future[RedisStreamAsyncCommands.ClaimedMessages[K, V]] = { + val args: io.lettuce.core.XAutoClaimArgs[K] = io.lettuce.core.XAutoClaimArgs.Builder.xautoclaim( + io.lettuce.core.Consumer.from(group, consumer), + minIdleTime.toJava, + startId + ) + if(justId) args.justid() + count.foreach(args.count) + + delegateRedisClusterCommandAndLift(_.xautoclaim(key, args)).map{ claimedMessages => + RedisStreamAsyncCommands.ClaimedMessages(claimedMessages.getId, claimedMessages.getMessages.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(key, kv.getId, kv.getBody.asScala.toMap) + }.toList) + } + } + + override def xClaim(key: K, + group: K, + consumer: K, + minIdleTime: FiniteDuration, + messageIds: List[String], + justId: Boolean = false, + force: Boolean = false, + retryCount: Option[Long] = None, + idle: Option[FiniteDuration] = None, + time: Option[Instant] = None, + ): Future[List[RedisStreamAsyncCommands.StreamMessage[K, V]]] = { + val args: io.lettuce.core.XClaimArgs = io.lettuce.core.XClaimArgs.Builder.minIdleTime(minIdleTime.toJava) + if (justId) args.justid() + if(force) args.force() + retryCount.foreach(args.retryCount) + idle.foreach(d => args.idle(d.toJava)) + time.foreach(d => args.time(d.toEpochMilli)) + delegateRedisClusterCommandAndLift(_.xclaim(key, io.lettuce.core.Consumer.from( + group, consumer + ), args, messageIds: _*)).map(_.asScala.toList.map { kv => + RedisStreamAsyncCommands.StreamMessage(key, kv.getId, kv.getBody.asScala.toMap) + }) + } + + override def xDel(key: K, ids: String*): Future[Long] = + delegateRedisClusterCommandAndLift(_.xdel(key, ids: _*)).map(Long2long) + + override def xGroupCreate( + streamOffset: RedisStreamAsyncCommands.StreamOffset[K], + group: K, + mkStream: Boolean = false, + entriesRead: Option[Long] = None + ): Future[Unit] = { + val args = io.lettuce.core.XGroupCreateArgs.Builder.mkstream(mkStream) + entriesRead.foreach(args.entriesRead) + delegateRedisClusterCommandAndLift(_.xgroupCreate(LettuceRedisStreamAsyncCommands.streamOffsetToJava(streamOffset), group, args)).map(_ => ()) + } + + override def xGroupCreateConsumer(key: K, group: K, consumer: K): Future[Boolean] = { + delegateRedisClusterCommandAndLift(_.xgroupCreateconsumer(key, io.lettuce.core.Consumer.from(group, consumer))).map(Boolean2boolean) + } + + override def xGroupDelConsumer(key: K, group: K, consumer: 
K): Future[Long] = { + delegateRedisClusterCommandAndLift(_.xgroupDelconsumer(key, io.lettuce.core.Consumer.from(group, consumer))).map(Long2long) + } + + override def xGroupDestroy(key: K, group: K): Future[Boolean] = { + delegateRedisClusterCommandAndLift(_.xgroupDestroy(key, group)).map(Boolean2boolean) + } + + override def xGroupSetId( + streamOffset: RedisStreamAsyncCommands.StreamOffset[K], + group: K): Future[Unit] = { + delegateRedisClusterCommandAndLift(_.xgroupSetid(LettuceRedisStreamAsyncCommands.streamOffsetToJava(streamOffset), group)).map(_ => ()) + } + + override def xLen(key: K): Future[Long] = + delegateRedisClusterCommandAndLift(_.xlen(key)).map(Long2long) + + override def xPending( + key: K, + group: K, + ): Future[RedisStreamAsyncCommands.PendingMessages] = { + delegateRedisClusterCommandAndLift(_.xpending(key, group)).map(LettuceRedisStreamAsyncCommands.pendingMessagesFromJava) + } + + + override def xPending( + key: K, + group: K, + consumer: K, + range: Option[RedisStreamAsyncCommands.XRange] = None, + limit: Option[RedisStreamAsyncCommands.XRangeLimit] = None, + idle: Option[FiniteDuration] = None + ): Future[List[RedisStreamAsyncCommands.PendingMessage]] = { + val args = new io.lettuce.core.XPendingArgs[K]() + args.consumer(io.lettuce.core.Consumer.from(group, consumer)) + range.foreach(r => args.range(LettuceRedisStreamAsyncCommands.toJavaRange(r))) + limit.foreach(l => args.limit(io.lettuce.core.Limit.create(l.offset, l.count))) + idle.foreach(d => args.idle(d.toJava)) + delegateRedisClusterCommandAndLift(_.xpending(key, args)).map(_.asScala.toList.map{ + pendingMessages => LettuceRedisStreamAsyncCommands.pendingMessageFromJava(pendingMessages) + }) + } + + + override def xRange(key: K, range: RedisStreamAsyncCommands.XRange): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] = + delegateRedisClusterCommandAndLift(_.xrange(key, LettuceRedisStreamAsyncCommands.toJavaRange(range))).map(_.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(key, kv.getId, kv.getBody.asScala.toMap) + }.toSeq) + + override def xRange(key: K, + range: XRange, + limit: RedisStreamAsyncCommands.XRangeLimit): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] = { + val args = LettuceRedisStreamAsyncCommands.toJavaRange(range) + val limitArgs = io.lettuce.core.Limit.create(limit.offset, limit.count) + delegateRedisClusterCommandAndLift(_.xrange(key, args, limitArgs)).map(_.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(kv.getStream, kv.getId, kv.getBody.asScala.toMap) + }.toSeq) + } + + override def xRead(streams: RedisStreamAsyncCommands.StreamOffset[K]*): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] = { + delegateRedisClusterCommandAndLift(_.xread(streams.map(LettuceRedisStreamAsyncCommands.streamOffsetToJava).toArray:_*)).map(_.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(kv.getStream, kv.getId, kv.getBody.asScala.toMap) + }.toSeq) + } + + override def xRead( + streams: List[RedisStreamAsyncCommands.StreamOffset[K]], + count: Option[Long], + block: Option[FiniteDuration], + noAck: Boolean = false + ): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] = { + (count, block, noAck) match { + case (None, None, false) => xRead(streams: _*) + case _ => val args = new io.lettuce.core.XReadArgs() + count.foreach(args.count) + block.foreach(d => args.block(d.toJava)) + if(noAck) args.noack(java.lang.Boolean.TRUE) + delegateRedisClusterCommandAndLift(_.xread(args, 
streams.map(LettuceRedisStreamAsyncCommands.streamOffsetToJava):_*)).map(_.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(kv.getStream, kv.getId, kv.getBody.asScala.toMap) + }.toSeq) + } + } + + override def xReadGroup(group: K, consumer: K, streams: RedisStreamAsyncCommands.StreamOffset[K]*): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] = { + delegateRedisClusterCommandAndLift(_.xreadgroup(io.lettuce.core.Consumer.from(group, consumer), streams.map(LettuceRedisStreamAsyncCommands.streamOffsetToJava):_*)).map(_.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(kv.getStream, kv.getId, kv.getBody.asScala.toMap) + }.toSeq) + } + + override def xReadGroup(group: K, + consumer: K, + streams: List[RedisStreamAsyncCommands.StreamOffset[K]], + count: Option[Long], + block: Option[FiniteDuration], + noAck: Boolean): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] = { + (count, block, noAck) match { + case (None, None, false) => xReadGroup(group, consumer, streams: _*) + case _ => + val args = new io.lettuce.core.XReadArgs() + count.foreach(args.count) + block.foreach(d => args.block(d.toJava)) + if(noAck) args.noack(java.lang.Boolean.TRUE) + delegateRedisClusterCommandAndLift(_.xreadgroup(io.lettuce.core.Consumer.from(group, consumer), args, streams.map(LettuceRedisStreamAsyncCommands.streamOffsetToJava):_*)).map(_.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(kv.getStream, kv.getId, kv.getBody.asScala.toMap) + }.toSeq) + } + } + + override def xRevRange(key: K, range: XRange): Future[Seq[RedisStreamAsyncCommands.StreamMessage[K, V]]] = { + delegateRedisClusterCommandAndLift(_.xrevrange(key, LettuceRedisStreamAsyncCommands.toJavaRange(range))).map(_.asScala.map { kv => + RedisStreamAsyncCommands.StreamMessage(key, kv.getId, kv.getBody.asScala.toMap) + }.toSeq) + } + + override def xTrim(key: K, count: Long): Future[Long] = { + delegateRedisClusterCommandAndLift(_.xtrim(key, count)).map(Long2long) + } + + override def xTrim(key: K, approximateTrimming: Boolean, count: Long): Future[Long] = { + delegateRedisClusterCommandAndLift(_.xtrim(key, approximateTrimming, count)).map(Long2long) + } + + override def xTrim(key: K, args: RedisStreamAsyncCommands.XTrimArgs): Future[Long] = { + val trimArgs = new io.lettuce.core.XTrimArgs() + if(args.approximateTrimming) trimArgs.approximateTrimming() + if(args.exactTrimming) trimArgs.exactTrimming() + args.maxLen.foreach(trimArgs.maxlen) + args.limit.foreach(trimArgs.limit) + args.minId.foreach(trimArgs.minId) + delegateRedisClusterCommandAndLift(_.xtrim(key, trimArgs)).map(Long2long) + } + +} + +object LettuceRedisStreamAsyncCommands { + + private [arugula] def toJavaRange(range: XRange): io.lettuce.core.Range[String] = { + io.lettuce.core.Range.from(toJavaBoundary(range.lower), toJavaBoundary(range.upper)) + } + + private [commands] def toJavaBoundary(boundary: XRange.Boundary): io.lettuce.core.Range.Boundary[String] = { + boundary.value match { + case Some(value) if boundary.inclusive => io.lettuce.core.Range.Boundary.including(value) + case Some(value) => io.lettuce.core.Range.Boundary.excluding(value) + case None => io.lettuce.core.Range.Boundary.unbounded() + } + } + + private[commands] def streamOffsetToJava[K](streamOffset: RedisStreamAsyncCommands.StreamOffset[K]): io.lettuce.core.XReadArgs.StreamOffset[K] = { + io.lettuce.core.XReadArgs.StreamOffset.from[K](streamOffset.name, streamOffset.offset) + } + + def xRangeFromJava(range: io.lettuce.core.Range[String]): RedisStreamAsyncCommands.XRange 
= { + RedisStreamAsyncCommands.XRange( + boundaryFromJava(range.getLower), + boundaryFromJava(range.getUpper) + ) + } + + def boundaryFromJava(boundary: io.lettuce.core.Range.Boundary[String]): RedisStreamAsyncCommands.XRange.Boundary = { + boundary match { + case b if b.isUnbounded => RedisStreamAsyncCommands.XRange.Boundary.unbounded + case b if b.isIncluding => RedisStreamAsyncCommands.XRange.Boundary.including(b.getValue) + case b => RedisStreamAsyncCommands.XRange.Boundary.excluding(b.getValue) + } + } + + def pendingMessagesFromJava(pendingMessages: io.lettuce.core.models.stream.PendingMessages): RedisStreamAsyncCommands.PendingMessages = { + RedisStreamAsyncCommands.PendingMessages( + pendingMessages.getCount, + xRangeFromJava(pendingMessages.getMessageIds), + pendingMessages.getConsumerMessageCount.asScala.map{ + case (consumer, count) => consumer -> Long2long(count) + }.toMap + ) + } + + def pendingMessageFromJava(pendingMessage: io.lettuce.core.models.stream.PendingMessage): RedisStreamAsyncCommands.PendingMessage = { + RedisStreamAsyncCommands.PendingMessage( + pendingMessage.getId, + pendingMessage.getConsumer, + pendingMessage.getSinceLastDelivery.toScala, + pendingMessage.getRedeliveryCount + ) + } +} diff --git a/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala b/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala index 93212a4..d9064eb 100644 --- a/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala +++ b/modules/tests/it/src/test/scala/com/github/scoquelin/arugula/RedisCommandsIntegrationSpec.scala @@ -11,7 +11,7 @@ import scala.jdk.CollectionConverters.ListHasAsScala import com.github.scoquelin.arugula.commands.RedisBaseAsyncCommands.InitialCursor import com.github.scoquelin.arugula.commands.RedisGeoAsyncCommands.{GeoCoordinates, GeoWithin} -import com.github.scoquelin.arugula.commands.{RedisBaseAsyncCommands, RedisGeoAsyncCommands, RedisKeyAsyncCommands, RedisListAsyncCommands, RedisScriptingAsyncCommands, RedisServerAsyncCommands} +import com.github.scoquelin.arugula.commands.{RedisBaseAsyncCommands, RedisGeoAsyncCommands, RedisKeyAsyncCommands, RedisListAsyncCommands, RedisScriptingAsyncCommands, RedisServerAsyncCommands, RedisStreamAsyncCommands} import com.github.scoquelin.arugula.commands.RedisStringAsyncCommands.{BitFieldCommand, BitFieldDataType} import io.lettuce.core.{RedisCommandExecutionException, RedisCommandInterruptedException} @@ -1429,6 +1429,61 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with } } } + + "leveraging RedisStreamingAsyncCommands" should { + "allow various streaming commands" in { + withRedisSingleNode(RedisCodec.Utf8WithValueAsStringCodec) { client => + val key = randomKey("stream-key") + val group = randomKey("group") + val consumer = randomKey("consumer") + val entries = Map("field1" -> "value1", "field2" -> "value2", "field3" -> "value3") + for { + messageId <- client.xAdd(key, entries) + results <- client.xRange(key, RedisStreamAsyncCommands.XRange.unbounded) + _ <- results.size shouldBe 1 + _ <- results.head.id shouldBe messageId + _ <- results.head.entries shouldBe entries + dateRangeResults <- client.xRange(key, RedisStreamAsyncCommands.XRange.fromLower(java.time.Instant.now().minusSeconds(10))) + _ <- dateRangeResults.size shouldBe 1 + _ <- dateRangeResults.head.id.nonEmpty shouldBe true + _ <- dateRangeResults.head.entries shouldBe entries + revRangeResults <- 
client.xRevRange(key, RedisStreamAsyncCommands.XRange.unbounded) + _ <- revRangeResults.size shouldBe 1 + _ <- revRangeResults.head.id shouldBe messageId + _ <- revRangeResults.head.entries shouldBe entries + readResults <- client.xRead(RedisStreamAsyncCommands.StreamOffset.earliest(key)) + _ <- readResults.size shouldBe 1 + _ <- readResults.head.id shouldBe messageId + _ <- readResults.head.entries shouldBe entries + readResultsWithArgs <- client.xRead(List(RedisStreamAsyncCommands.StreamOffset.earliest(key), RedisStreamAsyncCommands.StreamOffset.earliest(key)), count = Some(2L), block = Some(1000.milliseconds)) + _ <- readResultsWithArgs.size shouldBe 2 + _ <- readResultsWithArgs.head.id shouldBe messageId + _ <- readResultsWithArgs.head.entries shouldBe entries + _ <- client.xGroupCreate(RedisStreamAsyncCommands.StreamOffset.earliest(key), group) + _ <- client.xGroupSetId(RedisStreamAsyncCommands.StreamOffset.earliest(key), group) + _ <- client.xGroupCreateConsumer(key, group, consumer) + groupReadResults <- client.xReadGroup(group, consumer, List(RedisStreamAsyncCommands.StreamOffset.earliest(key)), count = Some(2L), block = Some(1000.milliseconds)) + _ <- groupReadResults.size shouldBe 0 + pendingResults <- client.xPending(key, group) + _ <- pendingResults shouldBe RedisStreamAsyncCommands.PendingMessages(0, RedisStreamAsyncCommands.XRange.unbounded) + autoClaimResults <- client.xAutoClaim(key, group, consumer, 1.hour, messageId) + _ <- autoClaimResults.id shouldBe "0-0" + _ <- autoClaimResults.messages.isEmpty shouldBe true + claimResults <- client.xClaim(key, group, consumer, 1.hour, List(messageId)) + _ <- claimResults.isEmpty shouldBe true + _ <- client.xAck(key, group, messageId) + _ <- client.xGroupDelConsumer(key, group, consumer) + _ <- client.xGroupDestroy(key, group) + _ <- client.xGroupCreate(RedisStreamAsyncCommands.StreamOffset.earliest(key), group) + _ <- client.xGroupCreateConsumer(key, group, consumer) + _ <- client.xGroupDelConsumer(key, group, consumer) + _ <- client.xGroupDestroy(key, group) + _ <- client.xDel(key, messageId) + _ <- client.xTrim(key, 0) + } yield succeed + } + } + } } } diff --git a/modules/tests/test/src/test/scala/com/github/scoquelin/arugula/LettuceRedisStreamAsyncCommandsSpec.scala b/modules/tests/test/src/test/scala/com/github/scoquelin/arugula/LettuceRedisStreamAsyncCommandsSpec.scala new file mode 100644 index 0000000..32a6e02 --- /dev/null +++ b/modules/tests/test/src/test/scala/com/github/scoquelin/arugula/LettuceRedisStreamAsyncCommandsSpec.scala @@ -0,0 +1,340 @@ +package com.github.scoquelin.arugula + + +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters.{MapHasAsJava, SeqHasAsJava} + +import com.github.scoquelin.arugula.commands.RedisStreamAsyncCommands +import io.lettuce.core.{KeyValue, RedisFuture, ScoredValue, StreamMessage} +import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.Mockito.{verify, when} +import org.scalatest.matchers.must.Matchers +import org.scalatest.{FutureOutcome, wordspec} + +import java.util.concurrent.TimeUnit + +class LettuceRedisStreamAsyncCommandsSpec extends wordspec.FixtureAsyncWordSpec with Matchers { + + override type FixtureParam = LettuceRedisCommandsClientFixture.TestContext + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = + withFixture(test.toNoArgAsyncTest(new LettuceRedisCommandsClientFixture.TestContext)) + + "LettuceRedisStreamAsyncCommands" should { + "delegate XACK command to Lettuce and lift result into a Future" in { 
testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val ids = Seq("0-0", "0-1") + val expected = 2L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xack(any[String], any[String], any[String], any[String])) + .thenReturn(mockRedisFuture) + testClass.xAck(key, group, ids:_*).map{ result => + result mustBe expected + verify(lettuceAsyncCommands).xack(key, group, ids:_*) + succeed + } + } + + + "delegate XADD command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val fields = Map("field1" -> "value1", "field2" -> "value2") + val expected = "0-1" + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xadd(any[String], any[java.util.Map[String, String]])).thenReturn(mockRedisFuture) + testClass.xAdd(key, fields).map{ result => + result mustBe expected + verify(lettuceAsyncCommands).xadd(key, fields.asJava) + succeed + } + } + + "delegate XADD command with id to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val id = "0-0" + val fields = Map("field1" -> "value1", "field2" -> "value2") + val expected = "0-1" + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xadd(any[String], any[String], any[java.util.Map[String, String]])).thenReturn(mockRedisFuture) + testClass.xAdd(key, id, fields).map{ result => + result mustBe expected + verify(lettuceAsyncCommands).xadd(key, id, fields.asJava) + succeed + } + } + + "delegate XAUTOCLAIM command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val consumer = "consumer" + val minIdleTime = 1000L + val id = "0-0" + val fields = Map("field1" -> "value1", "field2" -> "value2") + val message = new StreamMessage[String, String](key, id, fields.asJava) + val expected = new io.lettuce.core.models.stream.ClaimedMessages[String, String]("0-0", java.util.List.of(message)) + val mockRedisFuture: RedisFuture[io.lettuce.core.models.stream.ClaimedMessages[String, String]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xautoclaim(any[String], any[io.lettuce.core.XAutoClaimArgs[String]])).thenReturn(mockRedisFuture) + testClass.xAutoClaim(key, group, consumer, FiniteDuration(minIdleTime, TimeUnit.MILLISECONDS), id).map { result => + result mustBe RedisStreamAsyncCommands.ClaimedMessages(id, List(RedisStreamAsyncCommands.StreamMessage(key, id, fields))) + verify(lettuceAsyncCommands).xautoclaim(meq(key), any[io.lettuce.core.XAutoClaimArgs[String]]) + succeed + } + } + + "delegate XCLAIM command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val consumer = "consumer" + val minIdleTime = 1000L + val ids = List("0-0", "0-1") + val expected = java.util.List.of(new StreamMessage[String, String](key, "0-0", Map("field1" -> "value1", "field2" -> "value2").asJava)) + val mockRedisFuture: RedisFuture[java.util.List[io.lettuce.core.StreamMessage[String, String]]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xclaim(any[String], any[io.lettuce.core.Consumer[String]], any[io.lettuce.core.XClaimArgs], any[String], any[String])) + .thenReturn(mockRedisFuture) + testClass.xClaim(key, group, consumer, FiniteDuration(minIdleTime, TimeUnit.MILLISECONDS), 
ids).map{ result => + result mustBe List(RedisStreamAsyncCommands.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2"))) + verify(lettuceAsyncCommands).xclaim(meq(key), meq(io.lettuce.core.Consumer.from(group, consumer)), any[io.lettuce.core.XClaimArgs], meq(ids.head), meq(ids.last)) + succeed + } + } + + "delegate XDEL command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val ids = Seq("0-0", "0-1") + val expected = 2L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xdel(any[String], any[String], any[String])) + .thenReturn(mockRedisFuture) + testClass.xDel(key, ids: _*).map { result => + result mustBe expected + verify(lettuceAsyncCommands).xdel(key, ids: _*) + succeed + } + } + + "delegate XGROUP CREATE command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val expected = "OK" + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xgroupCreate(any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[String], any[io.lettuce.core.XGroupCreateArgs])) + .thenReturn(mockRedisFuture) + testClass.xGroupCreate(RedisStreamAsyncCommands.StreamOffset.latest(key), group).map { result => + result mustBe () + verify(lettuceAsyncCommands).xgroupCreate(any[io.lettuce.core.XReadArgs.StreamOffset[String]], meq(group), any[io.lettuce.core.XGroupCreateArgs]) + succeed + } + } + + "delegate XGROUP DELCONSUMER command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val id = "0-0" + val expected = 1L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xgroupDelconsumer(any[String], any[io.lettuce.core.Consumer[String]])) + .thenReturn(mockRedisFuture) + testClass.xGroupDelConsumer(key, group, id).map { result => + result mustBe expected + verify(lettuceAsyncCommands).xgroupDelconsumer(meq(key), meq(io.lettuce.core.Consumer.from(group, id))) + succeed + } + } + + "delegate XGROUP DESTROY command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val expected = true + val mockRedisFuture: RedisFuture[java.lang.Boolean] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xgroupDestroy(any[String], any[String])) + .thenReturn(mockRedisFuture) + testClass.xGroupDestroy(key, group).map { result => + result mustBe expected + verify(lettuceAsyncCommands).xgroupDestroy(meq(key), meq(group)) + succeed + } + } + + "delegate XGROUP SETID command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val id = "0-0" + val expected = "OK" + val mockRedisFuture: RedisFuture[String] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xgroupSetid(any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[String])) + .thenReturn(mockRedisFuture) + testClass.xGroupSetId(RedisStreamAsyncCommands.StreamOffset.latest(key), group).map { result => + result mustBe () + verify(lettuceAsyncCommands).xgroupSetid(any[io.lettuce.core.XReadArgs.StreamOffset[String]], meq(group)) + succeed + } + } + + "delegate XLEN command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = 
"stream-key" + val expected = 2L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xlen(any[String])) + .thenReturn(mockRedisFuture) + testClass.xLen(key).map { result => + result mustBe expected + verify(lettuceAsyncCommands).xlen(meq(key)) + succeed + } + } + + "delegate XPENDING command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val group = "group" + val pendingMessages = Map[String, java.lang.Long]("consumer1" -> 0L, "consumer2" -> 1L) + val expected = new io.lettuce.core.models.stream.PendingMessages(2L, io.lettuce.core.Range.create("0-0", "0-1"), pendingMessages.asJava) + val mockRedisFuture: RedisFuture[io.lettuce.core.models.stream.PendingMessages] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xpending(any[String], any[String])) + .thenReturn(mockRedisFuture) + testClass.xPending(key, group).map { result => + result mustBe RedisStreamAsyncCommands.PendingMessages(2L, RedisStreamAsyncCommands.XRange("0-0", "0-1"), Map("consumer1" -> 0L, "consumer2" -> 1L)) + verify(lettuceAsyncCommands).xpending(meq(key), meq(group)) + succeed + } + } + + "delegate XRANGE command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val start = "0-0" + val end = "0-1" + val expected = java.util.List.of(new io.lettuce.core.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2").asJava)) + val mockRedisFuture: RedisFuture[java.util.List[io.lettuce.core.StreamMessage[String, String]]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xrange(any[String], any[io.lettuce.core.Range[String]])) + .thenReturn(mockRedisFuture) + testClass.xRange(key, RedisStreamAsyncCommands.XRange(start, end)).map { result => + result mustBe List(RedisStreamAsyncCommands.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2"))) + verify(lettuceAsyncCommands).xrange(meq(key), meq(io.lettuce.core.Range.create(start, end))) + succeed + } + } + + "delegate XRANGE command with args to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val start = "0-0" + val end = "0-1" + val count = 10L + val expected = java.util.List.of(new io.lettuce.core.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2").asJava)) + val mockRedisFuture: RedisFuture[java.util.List[io.lettuce.core.StreamMessage[String, String]]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xrange(any[String], any[io.lettuce.core.Range[String]], any[io.lettuce.core.Limit])).thenReturn(mockRedisFuture) + testClass.xRange(key, RedisStreamAsyncCommands.XRange(start, end), RedisStreamAsyncCommands.XRangeLimit(0L, 10L)).map { result => + result mustBe List(RedisStreamAsyncCommands.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2"))) + verify(lettuceAsyncCommands).xrange(meq(key), meq(io.lettuce.core.Range.create(start, end)), any[io.lettuce.core.Limit]) + succeed + } + } + + "delegate XREAD command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val streams = List(RedisStreamAsyncCommands.StreamOffset("stream1", "0-0"), RedisStreamAsyncCommands.StreamOffset("stream2", "0-1")) + val expected: java.util.List[io.lettuce.core.StreamMessage[String, String]] = java.util.List.of(new io.lettuce.core.StreamMessage[String, String](key, "0-0", Map("field1" 
-> "value1", "field2" -> "value2").asJava)) + val mockRedisFuture: RedisFuture[java.util.List[io.lettuce.core.StreamMessage[String, String]]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xread(any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[io.lettuce.core.XReadArgs.StreamOffset[String]])) + .thenReturn(mockRedisFuture) + testClass.xRead(streams).map { result => + result mustBe List(RedisStreamAsyncCommands.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2"))) + verify(lettuceAsyncCommands).xread(any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[io.lettuce.core.XReadArgs.StreamOffset[String]]) + succeed + } + } + + "delegate XREAD command with args to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val streams = List(RedisStreamAsyncCommands.StreamOffset("stream1", "0-0"), RedisStreamAsyncCommands.StreamOffset("stream2", "0-1")) + val count = 10L + val block = 1000L + val expected: java.util.List[io.lettuce.core.StreamMessage[String, String]] = java.util.List.of(new io.lettuce.core.StreamMessage[String, String](key, "0-0", Map("field1" -> "value1", "field2" -> "value2").asJava)) + val mockRedisFuture: RedisFuture[java.util.List[io.lettuce.core.StreamMessage[String, String]]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xread(any[io.lettuce.core.XReadArgs], any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[io.lettuce.core.XReadArgs.StreamOffset[String]])) + .thenReturn(mockRedisFuture) + testClass.xRead(streams, count = Some(count), block = Some(FiniteDuration(block, TimeUnit.MILLISECONDS)), noAck = true).map { result => + result mustBe List(RedisStreamAsyncCommands.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2"))) + verify(lettuceAsyncCommands).xread(any[io.lettuce.core.XReadArgs], any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[io.lettuce.core.XReadArgs.StreamOffset[String]]) + succeed + } + } + + "delegate XREAD GROUP command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val group = "group" + val consumer = "consumer" + val streams = List(RedisStreamAsyncCommands.StreamOffset("stream1", "0-0"), RedisStreamAsyncCommands.StreamOffset("stream2", "0-1")) + val expected: java.util.List[io.lettuce.core.StreamMessage[String, String]] = java.util.List.of(new io.lettuce.core.StreamMessage[String, String]("stream-key", "0-0", Map("field1" -> "value1", "field2" -> "value2").asJava)) + val mockRedisFuture: RedisFuture[java.util.List[io.lettuce.core.StreamMessage[String, String]]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xreadgroup(any[io.lettuce.core.Consumer[String]], any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[io.lettuce.core.XReadArgs.StreamOffset[String]])) + .thenReturn(mockRedisFuture) + testClass.xReadGroup(group, consumer, streams).map { result => + result mustBe List(RedisStreamAsyncCommands.StreamMessage("stream-key", "0-0", Map("field1" -> "value1", "field2" -> "value2"))) + verify(lettuceAsyncCommands).xreadgroup(meq(io.lettuce.core.Consumer.from(group, consumer)), any[io.lettuce.core.XReadArgs.StreamOffset[String]], any[io.lettuce.core.XReadArgs.StreamOffset[String]]) + succeed + } + } + + "delegate XREVRANGE command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val start = "0-0" + val end = "0-1" + val expected = java.util.List.of(new io.lettuce.core.StreamMessage(key, 
"0-0", Map("field1" -> "value1", "field2" -> "value2").asJava)) + val mockRedisFuture: RedisFuture[java.util.List[io.lettuce.core.StreamMessage[String, String]]] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xrevrange(any[String], any[io.lettuce.core.Range[String]])) + .thenReturn(mockRedisFuture) + testClass.xRevRange(key, RedisStreamAsyncCommands.XRange(start, end)).map { result => + result mustBe List(RedisStreamAsyncCommands.StreamMessage(key, "0-0", Map("field1" -> "value1", "field2" -> "value2"))) + verify(lettuceAsyncCommands).xrevrange(meq(key), meq(io.lettuce.core.Range.create(start, end))) + succeed + } + } + + "delegate XTRIM command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val count = 10L + val expected = 2L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xtrim(any[String], any[java.lang.Long])) + .thenReturn(mockRedisFuture) + testClass.xTrim(key, count).map { result => + result mustBe expected + verify(lettuceAsyncCommands).xtrim(key, count) + succeed + } + } + + "delegate XTRIM with args command to Lettuce and lift result into a Future" in { testContext => + import testContext._ + val key = "stream-key" + val expected = 2L + val mockRedisFuture: RedisFuture[java.lang.Long] = mockRedisFutureToReturn(expected) + when(lettuceAsyncCommands.xtrim(any[String], any[io.lettuce.core.XTrimArgs])) + .thenReturn(mockRedisFuture) + testClass.xTrim(key, RedisStreamAsyncCommands.XTrimArgs(approximateTrimming = true)).map { result => + result mustBe expected + verify(lettuceAsyncCommands).xtrim(meq(key), any[io.lettuce.core.XTrimArgs]) + succeed + } + } + } + +}