Skip to content

Commit

Permalink
shared? maybe
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-zethraeus committed May 2, 2023
1 parent a6180c5 commit 84614b2
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 70 deletions.
24 changes: 24 additions & 0 deletions Sources/Emitter/Interface/Subscriber.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
// MARK: - Subscriber

public protocol Subscriber<Input, Failure> {
associatedtype Input
associatedtype Failure: Error
nonisolated func receive(emission: Emission<Input, Failure>)
}

extension Subscriber {
public func eraseSubscriber() -> AnySubscriber<Input, Failure> {
(self as? AnySubscriber<Input, Failure>) ?? .init(self)
}
}

// MARK: - AnySubscriber

public struct AnySubscriber<Input, Failure: Error>: Subscriber, Sendable {
public func receive(emission: Emission<Input, Failure>) {
receiveFunc(emission)
}

init(_ subscriber: some Subscriber<Input, Failure>) {
self.receiveFunc = {
subscriber.receive(emission: $0)
}
}

private let receiveFunc: @Sendable (Emission<Input, Failure>) -> Void
}
11 changes: 0 additions & 11 deletions Sources/Emitter/Operators/OnMainActor.swift

This file was deleted.

2 changes: 1 addition & 1 deletion Sources/Emitter/Operators/Redirect.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Disposable

extension Emitter {
func redirect(
public func redirect(
_ redirection: @escaping @Sendable (
_ event: Emission<Output, Failure>,
_ downstream: @escaping @Sendable (Emission<Output, Failure>) -> Void
Expand Down
204 changes: 204 additions & 0 deletions Sources/Emitter/Operators/Shared.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import Disposable
import Foundation

extension Emitter {

@available(macOS 13.0, iOS 16.0, *)
public func shared(replay: Int = 1) -> some Emitter<Output, Failure> {
Emitters.Shared(upstream: self, replayCount: replay)
}
}

// MARK: - Emitters.Shared

extension Emitters {

// MARK: - Prefix

@available(macOS 13.0, iOS 16.0, *)
public struct Shared<Upstream: Emitter>: Emitter {

// MARK: Lifecycle

public init(
upstream: Upstream,
replayCount: Int
) {
self.upstream = upstream
self.replayCount = replayCount
}

// MARK: Public

public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure

public func subscribe(_ subscriber: some Subscriber<Output, Failure>)
-> AutoDisposable
{
if let sub = sharedSub.value {
return sub.addSubscriber(subscriber)
} else {
let sub = SharedSub<Output, Failure>(
replayCount: replayCount
) {
sharedSub.value = nil
}
sharedSub.value = sub
let disposable = upstream.subscribe(sub)
sub.storeUpstream(disposable: disposable)
return sub.addSubscriber(subscriber)
}
}

// MARK: Private

private struct SharedSub<Input, Failure: Error>: Subscriber {

// MARK: Lifecycle

public init(
replayCount: Int,
onAllUnsubscribe: @escaping () -> Void
) {
self.onAllUnsubscribe = onAllUnsubscribe
self.maxCount = replayCount
self.cache = .init((
buffer: ContiguousArray<Emission<Upstream.Output, Failure>>(
unsafeUninitializedCapacity: replayCount,
initializingWith: { _, initializedCount in
initializedCount = 0
}
),
next: 0,
count: 0,
isActive: true
))
}

// MARK: Public

public func receive(emission: Emission<Upstream.Output, Failure>) {
switch emission {
case .failed,
.finished:
cache.value.isActive = false
return
case .value:
cache.withLock { mutValue in
var (buffer, next, count, isActive) = mutValue
assert(isActive)
if next >= count {
buffer.append(emission)
} else {
buffer[next] = emission
}
count = min(count + 1, maxCount)
next = (next + 1) % maxCount
mutValue = (buffer, next, count, isActive)
}
for downstream in downstreams.value.values {
downstream.receive(emission: emission)
}
}
}

// MARK: Internal

func storeUpstream(disposable: AutoDisposable) {
upstreamDisposable.withLock { mutValue in
assert(mutValue == nil)
mutValue = disposable
}
}

func getBuffer() -> [Emission<Upstream.Output, Failure>]? {
cache.withLock { mutValue in
let (buffer, next, count, isActive) = mutValue
if !isActive {
return nil
}
let mod = (next - count) % maxCount
let firstStart = mod < 0 ? maxCount + mod : mod
let firstEnd = min(firstStart + count, maxCount)
let firstRange = firstStart ..< firstEnd
let secondRange: Range<Int>
if firstRange.count < count {
let remainder = count - firstRange.count
secondRange = 0 ..< remainder
} else {
secondRange = 0 ..< 0
}
return Array(buffer[firstRange] + buffer[secondRange])
}
}

func addSubscriber(_ subscriber: some Subscriber<Output, Failure>) -> AutoDisposable {
let id = UUID()
let subscriber = subscriber.eraseSubscriber()
// if we get a nil buffer, we've completed.
guard let buffer = getBuffer()
else {
return AutoDisposable { }
}
defer {
for output in buffer {
subscriber.receive(emission: output)
}
}
downstreams.withLock { mutValue in
mutValue[id] = subscriber
}
return AutoDisposable {
removeSubscriber(id: id)
}
}

// MARK: Private

private let onAllUnsubscribe: () -> Void

private let maxCount: Int
/// FIXME: these separated locked state bits probably race.
private let cache: Locked<(
buffer: ContiguousArray<Emission<Upstream.Output, Failure>>,
next: Int,
count: Int,
isActive: Bool
)>
private let downstreams = Locked<[UUID: AnySubscriber<Output, Failure>]>([:])
private let upstreamDisposable = Locked<AutoDisposable?>(nil)

private func removeSubscriber(id: UUID) {
let remainingSubscribers = downstreams.withLock { mutValue in
mutValue[id] = nil
return mutValue.count
}
if remainingSubscribers > 0 {
return
}
if
let disposable = upstreamDisposable.withLock(action: { mutValue in
let value = mutValue
mutValue = nil
return value
})
{
disposable.dispose()
}
onAllUnsubscribe()
}

}

private let sharedSub = Locked<SharedSub<Output, Failure>?>(nil)

private let upstream: Upstream
private let replayCount: Int
}
}

// MARK: - Emitters.Shared + Sendable

@available(macOS 13.0, iOS 16.0, *)
extension Emitters.Shared: Sendable where Upstream: Sendable { }
8 changes: 2 additions & 6 deletions Sources/Emitter/Sources/PublishSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,8 @@ extension PublishSubject {

if didSubscribe {
return AutoDisposable {
if
let subscription = self.state.withLock(action: { state in
state.subs.remove(subscription)
})
{
subscription.receive(emission: .finished)
_ = self.state.withLock { state in
state.subs.remove(subscription)
}
}
} else {
Expand Down
52 changes: 0 additions & 52 deletions Tests/EmitterTests/OperatorTests/OnMainActorTests.swift

This file was deleted.

64 changes: 64 additions & 0 deletions Tests/EmitterTests/OperatorTests/SharedTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Disposable
import Emitter
import XCTest

// MARK: - SharedTests

final class SharedTests: XCTestCase {

let stage = DisposableStage()

override func setUp() { }

override func tearDown() {
stage.reset()
}

@available(macOS 13.0, *)
func test_shared() throws {
var record1: [String] = []
var record2: [String] = []
var record3: [String] = []
let source = PublishSubject<String, Never>()

let shared = source
.shared(replay: 5)

shared.subscribe { value in
record1.append(value)
}.stage(on: stage)

XCTAssertEqual(record1.count, 0)

let entries: [String] = ["a", "d", "e", "f", "g", "h", "i"]

for entry in entries {
source.emit(value: entry)
}

XCTAssertEqual(record1.count, 7)

shared.subscribe { value in
record2.append(value)
}.stage(on: stage)

XCTAssertEqual(record2, ["e", "f", "g", "h", "i"])

stage.reset()

shared.subscribe { value in
record3.append(value)
}.stage(on: stage)

XCTAssertEqual(record3, [])

let entries2: [String] = ["a", "b", "x"]

for entry in entries2 {
source.emit(value: entry)
}

XCTAssertEqual(record3, entries2)
}

}

0 comments on commit 84614b2

Please sign in to comment.