Skip to content

Commit 5079b73

Browse files
authored
Cluster transactions (#249)
* Cluster client transaction support Signed-off-by: Adam Fowler <[email protected]> * Add transaction(_:) to client protocol Signed-off-by: Adam Fowler <[email protected]> * Add parameter pack version of ValkeyCluster.transaction Signed-off-by: Adam Fowler <[email protected]> * Fix documentation error Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent a789f2c commit 5079b73

File tree

5 files changed

+352
-49
lines changed

5 files changed

+352
-49
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 111 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -208,18 +208,9 @@ public final class ValkeyClusterClient: Sendable {
208208
public func execute<each Command: ValkeyCommand>(
209209
_ commands: repeat each Command
210210
) async -> sending (repeat Result<(each Command).Response, any Error>) {
211-
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, any Error>, to: Response.Type) -> Result<Response, any Error> {
212-
result.flatMap {
213-
do {
214-
return try .success(Response(fromRESP: $0))
215-
} catch {
216-
return .failure(error)
217-
}
218-
}
219-
}
220211
let results = await self.execute([any ValkeyCommand](commands: repeat each commands))
221212
var index = AutoIncrementingInteger()
222-
return (repeat convert(results[index.next()], to: (each Command).Response.self))
213+
return (repeat results[index.next()].convertFromRESP(to: (each Command).Response.self))
223214
}
224215

225216
/// Results from pipeline and index for each result
@@ -303,6 +294,87 @@ public final class ValkeyClusterClient: Sendable {
303294
}
304295
}
305296

297+
/// Pipeline a series of commands as a transaction to Valkey connection
298+
///
299+
/// Another client will never be served in the middle of the execution of these
300+
/// commands. See https://valkey.io/topics/transactions/ for more information.
301+
///
302+
/// EXEC and MULTI commands are added to the pipelined commands and the output
303+
/// of the EXEC command is transformed into an array of RESPToken Results, one for
304+
/// each command.
305+
///
306+
/// Transactions come only affect keys coming from the same HashSlot.
307+
///
308+
/// - Parameter commands: Parameter pack of ValkeyCommands
309+
/// - Returns: Parameter pack holding the responses of all the commands
310+
/// - Throws: ValkeyTransactionError when EXEC aborts
311+
@inlinable
312+
public func transaction<each Command: ValkeyCommand>(
313+
_ commands: repeat each Command
314+
) async throws -> sending (repeat Result<(each Command).Response, any Error>) {
315+
let results = try await self.transaction([any ValkeyCommand](commands: repeat each commands))
316+
var index = AutoIncrementingInteger()
317+
return (repeat results[index.next()].convertFromRESP(to: (each Command).Response.self))
318+
}
319+
320+
/// Pipeline a series of commands as a transaction to Valkey connection
321+
///
322+
/// Another client will never be served in the middle of the execution of these
323+
/// commands. See https://valkey.io/topics/transactions/ for more information.
324+
///
325+
/// EXEC and MULTI commands are added to the pipelined commands and the output
326+
/// of the EXEC command is transformed into an array of RESPToken Results, one for
327+
/// each command.
328+
///
329+
/// Transactions come only affect keys coming from the same HashSlot.
330+
///
331+
/// This is an alternative version of the transaction function ``ValkeyClusterClient/transaction(_:)->(_,_)``
332+
/// that allows for a collection of ValkeyCommands. It provides more flexibility but the command
333+
/// responses are returned as ``RESPToken`` instead of the response type for the command.
334+
///
335+
/// - Parameter commands: Collection of ValkeyCommands
336+
/// - Returns: Array holding the RESPToken responses of all the commands
337+
/// - Throws: ValkeyTransactionError when EXEC aborts
338+
@inlinable
339+
public func transaction<Commands: Collection & Sendable>(
340+
_ commands: Commands
341+
) async throws -> [Result<RESPToken, Error>] where Commands.Element == any ValkeyCommand {
342+
let hashSlot = try self.hashSlot(for: commands.flatMap { $0.keysAffected })
343+
344+
var clientSelector: () async throws -> ValkeyNodeClient = {
345+
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
346+
}
347+
348+
var asking = false
349+
var attempt = 0
350+
while !Task.isCancelled {
351+
do {
352+
let client = try await clientSelector()
353+
if asking {
354+
return try await client.transactionWithAsk(commands)
355+
} else {
356+
return try await client.transaction(commands)
357+
}
358+
} catch let error as ValkeyClusterError where error == .noNodeToTalkTo {
359+
// TODO: Rerun node discovery!
360+
} catch {
361+
let retryAction = self.getTransactionRetryAction(from: error)
362+
switch retryAction {
363+
case .redirect(let redirectError):
364+
clientSelector = { try await self.nodeClient(for: redirectError) }
365+
asking = (redirectError.redirection == .ask)
366+
case .tryAgain:
367+
let wait = self.clientConfiguration.retryParameters.calculateWaitTime(retry: attempt)
368+
try await Task.sleep(for: wait)
369+
attempt += 1
370+
case .dontRetry:
371+
throw error
372+
}
373+
}
374+
}
375+
throw ValkeyClusterError.clientRequestCancelled
376+
}
377+
306378
struct Redirection {
307379
let node: ValkeyNodeClient
308380
let ask: Bool
@@ -565,6 +637,35 @@ public final class ValkeyClusterClient: Sendable {
565637
}
566638
}
567639

640+
@usableFromInline
641+
/* private */ func getTransactionRetryAction(from error: some Error) -> RetryAction {
642+
switch error {
643+
case let transactionError as ValkeyTransactionError:
644+
switch transactionError {
645+
case .transactionErrors(let results, let execError):
646+
// check whether queued results include any errors that warrent a retry
647+
for result in results {
648+
if case .failure(let queuedError) = result {
649+
let queuedAction = self.getRetryAction(from: queuedError)
650+
guard case .dontRetry = queuedAction else {
651+
return queuedAction
652+
}
653+
}
654+
}
655+
// check whether EXEC error warrents a retry
656+
let execAction = self.getRetryAction(from: execError)
657+
guard case .dontRetry = execAction else {
658+
return execAction
659+
}
660+
return .dontRetry
661+
case .transactionAborted:
662+
return .dontRetry
663+
}
664+
default:
665+
return .dontRetry
666+
}
667+
}
668+
568669
private func queueAction(_ action: RunAction) {
569670
self.actionStreamContinuation.yield(action)
570671
}

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 83 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -386,41 +386,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
386386
return try await _execute(
387387
buffer: encoder.buffer,
388388
promises: promises,
389-
valkeyPromises: promises.map { .nio($0) }
390-
) { promises -> Result<[Result<RESPToken, Error>], any Error> in
391-
let responses: EXEC.Response
392-
do {
393-
let execFutureResult = promises.last!.futureResult
394-
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
395-
} catch let error as ValkeyClientError where error.errorCode == .commandError {
396-
// we received an error while running the EXEC command. Extract queuing
397-
// results and throw error
398-
var results: [Result<RESPToken, Error>] = .init()
399-
results.reserveCapacity(promises.count - 2)
400-
for promise in promises[1..<(promises.count - 1)] {
401-
results.append(await promise.futureResult._result())
402-
}
403-
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
404-
} catch {
405-
return .failure(error)
406-
}
407-
// If EXEC returned nil then transaction was aborted because a
408-
// WATCHed variable changed
409-
guard let responses else {
410-
return .failure(ValkeyTransactionError.transactionAborted)
411-
}
412-
// We convert all the RESP errors in the response from EXEC to Result.failure
413-
return .success(
414-
responses.map {
415-
switch $0.identifier {
416-
case .simpleError, .bulkError:
417-
.failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) }))
418-
default:
419-
.success($0)
420-
}
421-
}
422-
)
423-
}.get()
389+
valkeyPromises: promises.map { .nio($0) },
390+
processResults: self._processTransactionPromises
391+
).get()
424392
}
425393

426394
/// Pipeline a series of commands to Valkey connection and precede each command with an ASKING
@@ -468,6 +436,48 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
468436
}
469437
}
470438

439+
/// Pipeline a series of commands as a transaction preceded with an ASKING command
440+
///
441+
/// Once all the responses for the commands have been received the function returns
442+
/// an array of RESPToken Results, one for each command.
443+
///
444+
/// This is an internal function used by the cluster client
445+
///
446+
/// - Parameter commands: Collection of ValkeyCommands
447+
/// - Returns: Array holding the RESPToken responses of all the commands
448+
@usableFromInline
449+
func transactionWithAsk(
450+
_ commands: some Collection<any ValkeyCommand>
451+
) async throws -> [Result<RESPToken, any Error>] {
452+
self.logger.trace("transaction asking", metadata: ["commands": .string(Self.concatenateCommandNames(commands))])
453+
var promises: [EventLoopPromise<RESPToken>] = []
454+
promises.reserveCapacity(commands.count)
455+
var valkeyPromises: [ValkeyPromise<RESPToken>] = []
456+
valkeyPromises.reserveCapacity(commands.count + 3)
457+
var encoder = ValkeyCommandEncoder()
458+
ASKING().encode(into: &encoder)
459+
MULTI().encode(into: &encoder)
460+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
461+
valkeyPromises.append(.forget)
462+
valkeyPromises.append(.nio(promises.last!))
463+
464+
for command in commands {
465+
command.encode(into: &encoder)
466+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
467+
valkeyPromises.append(.nio(promises.last!))
468+
}
469+
EXEC().encode(into: &encoder)
470+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
471+
valkeyPromises.append(.nio(promises.last!))
472+
473+
return try await _execute(
474+
buffer: encoder.buffer,
475+
promises: promises,
476+
valkeyPromises: valkeyPromises,
477+
processResults: self._processTransactionPromises
478+
).get()
479+
}
480+
471481
/// Execute stream of commands written into buffer
472482
///
473483
/// The function is provided with an array of EventLoopPromises for the responses of commands
@@ -498,6 +508,44 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
498508
}
499509
}
500510

511+
@usableFromInline
512+
func _processTransactionPromises(
513+
_ promises: [EventLoopPromise<RESPToken>]
514+
) async -> sending Result<[Result<RESPToken, Error>], any Error> {
515+
let responses: EXEC.Response
516+
do {
517+
let execFutureResult = promises.last!.futureResult
518+
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
519+
} catch let error as ValkeyClientError where error.errorCode == .commandError {
520+
// we received an error while running the EXEC command. Extract queuing
521+
// results and throw error
522+
var results: [Result<RESPToken, Error>] = .init()
523+
results.reserveCapacity(promises.count - 2)
524+
for promise in promises[1..<(promises.count - 1)] {
525+
results.append(await promise.futureResult._result())
526+
}
527+
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
528+
} catch {
529+
return .failure(error)
530+
}
531+
// If EXEC returned nil then transaction was aborted because a
532+
// WATCHed variable changed
533+
guard let responses else {
534+
return .failure(ValkeyTransactionError.transactionAborted)
535+
}
536+
// We convert all the RESP errors in the response from EXEC to Result.failure
537+
return .success(
538+
responses.map {
539+
switch $0.identifier {
540+
case .simpleError, .bulkError:
541+
.failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) }))
542+
default:
543+
.success($0)
544+
}
545+
}
546+
)
547+
}
548+
501549
#if DistributedTracingSupport
502550
@usableFromInline
503551
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {

Sources/Valkey/Node/ValkeyNodeClient.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,17 @@ extension ValkeyNodeClient {
272272
return .init(repeating: .failure(error), count: commands.count)
273273
}
274274
}
275+
276+
/// Internal command used by cluster client, that precedes each command with a ASKING
277+
/// command
278+
@usableFromInline
279+
func transactionWithAsk<Commands: Collection & Sendable>(
280+
_ commands: Commands
281+
) async throws -> [Result<RESPToken, any Error>] where Commands.Element == any ValkeyCommand {
282+
try await self.withConnection { connection in
283+
try await connection.transactionWithAsk(commands)
284+
}
285+
}
275286
}
276287

277288
/// Extension that makes ``ValkeyNode`` conform to ``ValkeyNodeConnectionPool``.

Sources/Valkey/ValkeyClientProtocol.swift

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public protocol ValkeyClientProtocol: Sendable {
1919
/// Once all the responses for the commands have been received the function returns
2020
/// an array of RESPToken Results, one for each command.
2121
///
22-
/// This is an alternative version of the pipelining function ``ValkeyClient/execute(_:)->(_,_)``
22+
/// This is an alternative version of the pipelining function ``ValkeyConnection/execute(_:)->(_,_)``
2323
/// that allows for a collection of ValkeyCommands. It provides more flexibility but
2424
/// is more expensive to run and the command responses are returned as ``RESPToken``
2525
/// instead of the response type for the command.
@@ -28,6 +28,24 @@ public protocol ValkeyClientProtocol: Sendable {
2828
/// - Returns: Array holding the RESPToken responses of all the commands
2929
func execute(_ commands: [any ValkeyCommand]) async -> [Result<RESPToken, any Error>]
3030

31+
/// Pipeline a series of commands as a transaction to Valkey connection
32+
///
33+
/// Another client will never be served in the middle of the execution of these
34+
/// commands. See https://valkey.io/topics/transactions/ for more information.
35+
///
36+
/// EXEC and MULTI commands are added to the pipelined commands and the output
37+
/// of the EXEC command is transformed into an array of RESPToken Results, one for
38+
/// each command.
39+
///
40+
/// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)``
41+
/// that allows for a collection of ValkeyCommands. It provides more flexibility but the command
42+
/// responses are returned as ``RESPToken`` instead of the response type for the command.
43+
///
44+
/// - Parameter commands: Collection of ValkeyCommands
45+
/// - Returns: Array holding the RESPToken responses of all the commands
46+
/// - Throws: ValkeyTransactionError when EXEC aborts
47+
func transaction(_ commands: [any ValkeyCommand]) async throws -> [Result<RESPToken, any Error>]
48+
3149
/// Execute subscribe command and run closure using related ``ValkeySubscription``
3250
/// AsyncSequence
3351
///

0 commit comments

Comments
 (0)