Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions Sources/OneWay/AnyEffect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public struct AnyEffect<Element>: Effect where Element: Sendable {
public enum Method: Sendable {
case register(any EffectID, cancelInFlight: Bool)
case cancel(any EffectID)
case throttle(id: any EffectID, interval: Duration, latest: Bool)
case none
}

Expand Down Expand Up @@ -158,6 +159,49 @@ public struct AnyEffect<Element>: Effect where Element: Sendable {
)
return copy
}

/// Creates an effect that emits elements from this effect, but only if a certain amount of time
/// has passed between emissions.
///
/// First, create a `Hashable` ID that will be used to identify the throttle effect:
///
/// ```swift
/// enum ThrottleID {
/// case button
/// }
/// ```
///
/// Then, apply the `throttle` modifier using the defined ID:
///
/// ```swift
/// func reduce(state: inout State, action: Action) -> AnyEffect<Action> {
/// switch action {
/// // ...
/// case .perform:
/// return .just(.increment)
/// .throttle(id: ThrottleID.button, for: .seconds(1))
/// // ...
/// }
/// }
/// ```
///
/// - Parameters:
/// - id: The effect’s identifier.
/// - interval: The duration that must elapse before another element can be emitted.
/// - latest: A Boolean value indicating whether to emit the most recent element.
/// If `false`, the effect emits the first element and ignores subsequent ones during the
/// interval. If `true`, it emits the first element and then the most recent element once
/// the interval has passed. Defaults to `false`.
/// - Returns: A new effect that emits elements according to the throttle behavior.
public consuming func throttle(
id: some EffectID,
for interval: Duration,
latest: Bool = false
) -> Self {
var copy = self
copy.method = .throttle(id: id, interval: interval, latest: latest)
return copy
}
}

extension AnyEffect {
Expand Down
127 changes: 92 additions & 35 deletions Sources/OneWay/Store.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import Foundation
/// It is fully thread-safe as it is implemented using an actor. It stores the `State` and can
/// change the `State` by receiving `Actions`. You can define `Action` and `State` in ``Reducer``.
/// If you create a data flow through `Store`, you can make it flow in one direction.
public actor Store<R: Reducer>
public actor Store<R: Reducer, C: Clock<Duration>>
where R.Action: Sendable, R.State: Sendable & Equatable {
/// A convenience type alias for referring to a action of a given reducer's action.
public typealias Action = R.Action
Expand Down Expand Up @@ -46,27 +46,34 @@ where R.Action: Sendable, R.State: Sendable & Equatable {
!isProcessing && tasks.isEmpty
}

private let reducer: any Reducer<Action, State>
private let reducer: R
private let clock: C
private let continuation: AsyncStream<State>.Continuation
private var isProcessing: Bool = false
private var actionQueue: [Action] = []
private var bindingTask: Task<Void, Never>?
private var tasks: [TaskID: Task<Void, Never>] = [:]
private var cancellables: [EffectIDWrapper: Set<TaskID>] = [:]
private var throttleTimestamps: [EffectIDWrapper: C.Instant] = [:]
private var trailingThrottledEffects: [EffectIDWrapper: AnyEffect<Action>] = [:]

/// Initializes a store from a reducer and an initial state.
/// Initializes a store from a reducer, an initial state, and a clock.
///
/// - Parameters:
/// - reducer: The reducer is responsible for transitioning the current state to the next
/// state.
/// - state: The state to initialize a store.
/// - reducer: The reducer responsible for transitioning the current state to the next
/// state in response to actions.
/// - state: The initial state used to create the store.
/// - clock: The clock that determines how time-based effects (such as debounce or throttle)
/// are scheduled. Defaults to `ContinuousClock`.
public init(
reducer: @Sendable @autoclosure () -> R,
state: State
state: State,
clock: C = ContinuousClock()
) {
self.initialState = state
self.state = state
self.reducer = reducer()
self.clock = clock
(states, continuation) = AsyncStream<State>.makeStream()
Task { await bindExternalEffect() }
defer { continuation.yield(state) }
Expand All @@ -86,35 +93,10 @@ where R.Action: Sendable, R.State: Sendable & Equatable {
isProcessing = true
await Task.yield()
for action in actionQueue {
let taskID = TaskID()
let effect = reducer.reduce(state: &state, action: action)
let task = Task { [weak self, taskID] in
guard !Task.isCancelled else { return }
for await value in effect.values {
guard let self else { break }
guard !Task.isCancelled else { break }
await send(value)
}
await self?.removeTask(taskID)
}
tasks[taskID] = task

switch effect.method {
case let .register(id, cancelInFlight):
let effectID = EffectIDWrapper(id)
if cancelInFlight {
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
}
cancellables[effectID, default: []].insert(taskID)
case let .cancel(id):
let effectID = EffectIDWrapper(id)
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
case .none:
break
let isThrottled = await throttleIfNeeded(for: effect)
if !isThrottled {
await execute(effect: effect)
}
}
actionQueue = []
Expand All @@ -130,6 +112,81 @@ where R.Action: Sendable, R.State: Sendable & Equatable {
tasks.forEach { $0.value.cancel() }
tasks.removeAll()
actionQueue.removeAll()
cancellables.removeAll()
trailingThrottledEffects.removeAll()
throttleTimestamps.removeAll()
}

private func throttleIfNeeded(for effect: AnyEffect<Action>) async -> Bool {
guard case let .throttle(id, interval, latest) = effect.method else {
return false
}
let effectID = EffectIDWrapper(id)
let now = clock.now
if let last = throttleTimestamps[effectID],
last.duration(to: now) < interval {
if latest {
trailingThrottledEffects[effectID] = effect
}
return true
} else {
throttleTimestamps[effectID] = now
if latest {
Task { [weak self] in
do {
try await self?.clock.sleep(for: interval)
await self?.executeTrailingThrottledEffects(effectID)
}
catch {
await self?.removeTrailingThrottledEffects(effectID)
}
}
}
return false
}
}

private func execute(effect: AnyEffect<Action>) async {
let taskID = TaskID()
let task = Task { [weak self, taskID] in
guard !Task.isCancelled else { return }
for await value in effect.values {
guard let self else { break }
guard !Task.isCancelled else { break }
await send(value)
}
await self?.removeTask(taskID)
}
tasks[taskID] = task

switch effect.method {
case let .register(id, cancelInFlight):
let effectID = EffectIDWrapper(id)
if cancelInFlight {
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
}
cancellables[effectID, default: []].insert(taskID)
case let .cancel(id):
let effectID = EffectIDWrapper(id)
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
case .throttle,
.none:
break
}
}

private func executeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async {
if let effect = trailingThrottledEffects.removeValue(forKey: effectID) {
await execute(effect: effect)
}
}

private func removeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async {
trailingThrottledEffects.removeValue(forKey: effectID)
}

private func bindExternalEffect() {
Expand Down
20 changes: 12 additions & 8 deletions Sources/OneWay/ViewStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Combine
/// It can observe state changes and send actions. It can primarily be used in SwiftUI's `View`,
/// `UIView` or `UIViewController` operating on main thread.
@MainActor
public final class ViewStore<R: Reducer>
public final class ViewStore<R: Reducer, C: Clock<Duration>>
where R.Action: Sendable, R.State: Sendable & Equatable {
/// A convenience type alias for referring to a action of a given reducer's action.
public typealias Action = R.Action
Expand All @@ -41,25 +41,29 @@ where R.Action: Sendable, R.State: Sendable & Equatable {
/// state changes
public let states: AsyncViewStateSequence<State>

private let store: Store<R>
private let store: Store<R, C>
private let continuation: AsyncStream<State>.Continuation
private var task: Task<Void, Never>?

/// Initializes a view store from a reducer and an initial state.
/// Initializes a view store from a reducer, an initial state, and a clock.
///
/// - Parameters:
/// - reducer: The reducer is responsible for transitioning the current state to the next
/// state.
/// - state: The state to initialize a store.
/// - reducer: The reducer responsible for transitioning the current state to the next
/// state in response to actions.
/// - state: The initial state used to create the store.
/// - clock: The clock that determines how time-based effects (such as debounce or throttle)
/// are scheduled. Defaults to `ContinuousClock`.
public init(
reducer: @Sendable @autoclosure () -> R,
state: State
state: State,
clock: C = ContinuousClock()
) {
self.initialState = state
self.state = state
self.store = Store(
reducer: reducer(),
state: state
state: state,
clock: clock
)
let (stream, continuation) = AsyncStream<State>.makeStream()
self.states = AsyncViewStateSequence(stream)
Expand Down
8 changes: 4 additions & 4 deletions Sources/OneWayTesting/Store+Testing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ extension Store {
#if canImport(XCTest)
if isTimeout && result != input {
XCTFail(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
file: filePath,
line: line
)
Expand All @@ -87,7 +87,7 @@ extension Store {
case .testing:
if isTimeout && result != input {
Issue.record(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
sourceLocation: Testing.SourceLocation(
fileID: String(describing: fileID),
filePath: String(describing: filePath),
Expand Down Expand Up @@ -156,7 +156,7 @@ extension Store {
#if canImport(XCTest)
if isTimeout && result != input {
XCTFail(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
file: filePath,
line: line
)
Expand All @@ -174,7 +174,7 @@ extension Store {
case .testing:
if isTimeout && result != input {
Issue.record(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
sourceLocation: Testing.SourceLocation(
fileID: String(describing: fileID),
filePath: String(describing: filePath),
Expand Down
Loading