From 75646bfa923a67c411f04ae8e82d27d642b6a241 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Fri, 13 Feb 2015 20:56:11 -0800 Subject: [PATCH 1/8] Loosen Quick and Nimble constraints, bump LlamaKit --- Cartfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cartfile b/Cartfile index 6384ce8..c8890da 100644 --- a/Cartfile +++ b/Cartfile @@ -1,5 +1,5 @@ github "ReactiveCocoa/ReactiveCocoa" "swift-development" -github "Quick/Quick" == 0.2.0 -github "Quick/Nimble" -github "Carthage/LlamaKit" == 0.1.1 +github "Quick/Quick" ~> 0.2 +github "Quick/Nimble" ~> 0.3 +github "LlamaKit/LlamaKit" == 0.5 github "jspahrsummers/xcconfigs" >= 0.6 From 4930739252b9cf0c5944ed0f9b85aa854b652ff8 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Fri, 13 Feb 2015 21:08:42 -0800 Subject: [PATCH 2/8] carthage update --- .gitmodules | 2 +- Cartfile.resolved | 10 +++++----- Carthage/Checkouts/LlamaKit | 2 +- Carthage/Checkouts/Nimble | 2 +- Carthage/Checkouts/Quick | 2 +- Carthage/Checkouts/ReactiveCocoa | 2 +- Carthage/Checkouts/xcconfigs | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.gitmodules b/.gitmodules index 119f1b8..9dde401 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,6 @@ [submodule "Carthage.checkout/LlamaKit"] path = Carthage/Checkouts/LlamaKit - url = https://github.com/Carthage/LlamaKit.git + url = https://github.com/LlamaKit/LlamaKit.git [submodule "Carthage.checkout/Nimble"] path = Carthage/Checkouts/Nimble url = https://github.com/Quick/Nimble.git diff --git a/Cartfile.resolved b/Cartfile.resolved index 4c7ce0f..4ab0771 100644 --- a/Cartfile.resolved +++ b/Cartfile.resolved @@ -1,5 +1,5 @@ -github "Carthage/LlamaKit" "carthage-0.1.1" -github "Quick/Nimble" "v0.2.0" -github "Quick/Quick" "v0.2.0" -github "jspahrsummers/xcconfigs" "0.7" -github "ReactiveCocoa/ReactiveCocoa" "e94a432e6c47e03f47cb263225ceccffe9b7bea6" +github "LlamaKit/LlamaKit" "v0.5.0" +github "Quick/Nimble" "v0.3.0" +github "Quick/Quick" "v0.2.2" +github "jspahrsummers/xcconfigs" "0.7.1" +github "ReactiveCocoa/ReactiveCocoa" "15c2c9a473f5bf09b79061f603cb122a257162e1" diff --git a/Carthage/Checkouts/LlamaKit b/Carthage/Checkouts/LlamaKit index ac2c2d3..e37b966 160000 --- a/Carthage/Checkouts/LlamaKit +++ b/Carthage/Checkouts/LlamaKit @@ -1 +1 @@ -Subproject commit ac2c2d36bf14bdddd0c1c489df67ad9a7ad04eb4 +Subproject commit e37b966998df6ca05445c0b5d9c6c9560f1e7b61 diff --git a/Carthage/Checkouts/Nimble b/Carthage/Checkouts/Nimble index 6f787ee..aeb5da7 160000 --- a/Carthage/Checkouts/Nimble +++ b/Carthage/Checkouts/Nimble @@ -1 +1 @@ -Subproject commit 6f787eeb75b2fa71464981f28b3dc8d7bd532e69 +Subproject commit aeb5da7bed72483d22ccfef5310ccdd3dd39a06f diff --git a/Carthage/Checkouts/Quick b/Carthage/Checkouts/Quick index 315ae2a..b0e9828 160000 --- a/Carthage/Checkouts/Quick +++ b/Carthage/Checkouts/Quick @@ -1 +1 @@ -Subproject commit 315ae2a4a630156d5ac094ae1f107af60edce21d +Subproject commit b0e98286db7e9d1f1e73cd4e69eac2aae2a1e3f8 diff --git a/Carthage/Checkouts/ReactiveCocoa b/Carthage/Checkouts/ReactiveCocoa index 7121cb3..15c2c9a 160000 --- a/Carthage/Checkouts/ReactiveCocoa +++ b/Carthage/Checkouts/ReactiveCocoa @@ -1 +1 @@ -Subproject commit 7121cb395f610f2d017ee087df874209d8804242 +Subproject commit 15c2c9a473f5bf09b79061f603cb122a257162e1 diff --git a/Carthage/Checkouts/xcconfigs b/Carthage/Checkouts/xcconfigs index 6c64238..b09b4b6 160000 --- a/Carthage/Checkouts/xcconfigs +++ b/Carthage/Checkouts/xcconfigs @@ -1 +1 @@ -Subproject commit 6c64238566ef1126f1b7994aa9e6719229c50451 +Subproject commit b09b4b63235b760a3a48a8dd19c0415aaaa8f269 From 9fc386534a5ab094eb593945898bda266c07abb3 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Fri, 13 Feb 2015 21:09:28 -0800 Subject: [PATCH 3/8] Refactor ReactiveTaskError into a real enum --- ReactiveTask/Errors.swift | 31 +++++++++++++++++-------------- ReactiveTask/Task.swift | 18 +++--------------- 2 files changed, 20 insertions(+), 29 deletions(-) diff --git a/ReactiveTask/Errors.swift b/ReactiveTask/Errors.swift index 89f3943..2bc8117 100644 --- a/ReactiveTask/Errors.swift +++ b/ReactiveTask/Errors.swift @@ -8,19 +8,22 @@ import Foundation -/// Possible error codes within `ReactiveTaskErrorDomain`. -public enum ReactiveTaskError: Int { - /// The domain for all errors originating within ReactiveTask. - public static let domain: NSString = "org.carthage.ReactiveTask" - - /// In a user info dictionary, associated with the exit code from a child - /// process. - public static let exitCodeKey: NSString = "ReactiveTaskErrorExitCode" - - /// In a user info dictionary, associated with any accumulated stderr - /// string. - public static let standardErrorKey: NSString = "ReactiveTaskErrorStandardError" - +/// An error originating from ReactiveTask. +public enum ReactiveTaskError { /// A shell task exited unsuccessfully. - case ShellTaskFailed + case ShellTaskFailed(exitCode: Int, standardError: String?) +} + +extension ReactiveTaskError: Printable { + public var description: String { + switch self { + case let .ShellTaskFailed(exitCode, standardError): + var description = "A shell task failed with exit code \(exitCode)" + if let standardError = standardError { + description += ":\n\(standardError)" + } + + return description + } + } } diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index 0c8b3b4..997c1fb 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -305,24 +305,12 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< .map { (datas, terminationStatus) -> (NSData, NSData, Int32) in return (datas.0, datas.1, terminationStatus) } - .tryMap { (stdoutData, stderrData, terminationStatus) -> Result in + .tryMap { (stdoutData, stderrData, terminationStatus) -> Result in if terminationStatus == EXIT_SUCCESS { return success(stdoutData) } else { - let errorString = (stderrData.length > 0 ? NSString(data: stderrData, encoding: NSUTF8StringEncoding) as String? : nil) ?? "" - - var description = "A shell task failed with exit code \(terminationStatus)" - if !errorString.isEmpty { - description += ":\n\(errorString)" - } - - let error = NSError(domain: ReactiveTaskError.domain, code: ReactiveTaskError.ShellTaskFailed.rawValue, userInfo: [ - ReactiveTaskError.exitCodeKey: Int(terminationStatus), - ReactiveTaskError.standardErrorKey: errorString, - NSLocalizedDescriptionKey: errorString - ]) - - return failure(error) + let errorString = (stderrData.length > 0 ? String(data: stderrData, encoding: NSUTF8StringEncoding) : nil) + return failure(ReactiveTaskError(exitCode: terminationStatus, standardError: errorString)) } } } From ca074dd6e7b8a787f35f37e01b762aedfda9dfff Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Fri, 13 Feb 2015 21:28:58 -0800 Subject: [PATCH 4/8] WIP refactoring for new RAC and LlamaKit --- ReactiveTask/Errors.swift | 25 +++++++- ReactiveTask/Task.swift | 117 +++++++++++++++++--------------------- 2 files changed, 75 insertions(+), 67 deletions(-) diff --git a/ReactiveTask/Errors.swift b/ReactiveTask/Errors.swift index 2bc8117..581cb3a 100644 --- a/ReactiveTask/Errors.swift +++ b/ReactiveTask/Errors.swift @@ -7,11 +7,29 @@ // import Foundation +import ReactiveCocoa /// An error originating from ReactiveTask. public enum ReactiveTaskError { /// A shell task exited unsuccessfully. - case ShellTaskFailed(exitCode: Int, standardError: String?) + case ShellTaskFailed(exitCode: Int32, standardError: String?) + + /// An error was returned from a POSIX API. + case POSIXError(Int32) +} + +extension ReactiveTaskError: ErrorType { + public var nsError: NSError { + switch self { + case let .POSIXError(code): + return NSError(domain: NSPOSIXErrorDomain, code: Int(code), userInfo: nil) + + default: + return NSError(domain: "org.carthage.ReactiveTask", code: 0, userInfo: [ + NSLocalizedDescriptionKey: self.description + ]) + } + } } extension ReactiveTaskError: Printable { @@ -22,8 +40,11 @@ extension ReactiveTaskError: Printable { if let standardError = standardError { description += ":\n\(standardError)" } - + return description + + case let .POSIXError: + return nsError.description } } } diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index 997c1fb..d01fcd3 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -32,12 +32,10 @@ public struct TaskDescription { /// Data to stream to standard input of the launched process. /// - /// An error sent along this signal will interrupt the task. - /// /// If nil, stdin will be inherited from the parent process. - public var standardInput: ColdSignal? + public var standardInput: SignalProducer? - public init(launchPath: String, arguments: [String] = [], workingDirectoryPath: String? = nil, environment: [String: String]? = nil, standardInput: ColdSignal? = nil) { + public init(launchPath: String, arguments: [String] = [], workingDirectoryPath: String? = nil, environment: [String: String]? = nil, standardInput: SignalProducer? = nil) { self.launchPath = launchPath self.arguments = arguments self.workingDirectoryPath = workingDirectoryPath @@ -88,13 +86,12 @@ private final class Pipe { } /// Instantiates a new descriptor pair. - class func create() -> Result { + class func create() -> Result { var fildes: [Int32] = [ 0, 0 ] if pipe(&fildes) == 0 { return success(self(readFD: fildes[0], writeFD: fildes[1])) } else { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(errno), userInfo: nil) - return failure(nsError) + return failure(.POSIXError(errno)) } } @@ -107,18 +104,17 @@ private final class Pipe { /// Creates a signal that will take ownership of the `readFD` using /// dispatch_io, then read it to completion. /// - /// After subscribing to the returned signal, `readFD` should not be used + /// After starting the returned producer, `readFD` should not be used /// anywhere else, as it may close unexpectedly. - func transferReadsToSignal() -> ColdSignal { + func transferReadsToProducer() -> SignalProducer { let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) - return ColdSignal { sink, disposable in + return SignalProducer { observer, disposable in let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, queue) { error in if error == 0 { - sink.put(.Completed) + sendCompleted(observer) } else { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } close(self.readFD) @@ -127,12 +123,11 @@ private final class Pipe { dispatch_io_set_low_water(channel, 1) dispatch_io_read(channel, 0, UInt.max, queue) { (done, data, error) in if let data = data { - sink.put(.Next(Box(data))) + sendNext(observer, data) } if error != 0 { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } if done { @@ -150,39 +145,35 @@ private final class Pipe { /// `signal` into `writeFD`, then closes `writeFD` when the input signal /// terminates. /// - /// After subscribing to the returned signal, `writeFD` should not be used + /// After starting the returned producer, `writeFD` should not be used /// anywhere else, as it may close unexpectedly. /// - /// Returns a signal that will complete or error. - func writeDataFromSignal(signal: ColdSignal) -> ColdSignal<()> { + /// Returns a producer that will complete or error. + func writeDataFromProducer(producer: SignalProducer) -> SignalProducer<(), ReactiveTaskError> { let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) - return ColdSignal { sink, disposable in + return SignalProducer { observer, disposable in let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, queue) { error in if error == 0 { - sink.put(.Completed) + sendCompleted(observer) } else { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } close(self.writeFD) } - signal.startWithSink { inputDisposable in - disposable.addDisposable(inputDisposable) + producer.startWithSignal { signal, producerDisposable in + disposable.addDisposable(producerDisposable) - return Event.sink(next: { data in + signal.observe(next: { data in let dispatchData = dispatch_data_create(data.bytes, UInt(data.length), queue, nil) dispatch_io_write(channel, 0, dispatchData, queue) { (done, data, error) in if error != 0 { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } } - }, error: { error in - sink.put(.Error(error)) }, completed: { dispatch_io_close(channel, 0) }) @@ -200,20 +191,20 @@ private final class Pipe { /// /// If `forwardingSink` is non-nil, each incremental piece of data will be sent /// to it as data is received. -private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf?) -> ColdSignal { - return pipe.transferReadsToSignal() - .on(next: { (data: dispatch_data_t) in +private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf?) -> SignalProducer { + return pipe.transferReadsToProducer() + |> on(next: { (data: dispatch_data_t) in forwardingSink?.put(data as NSData) return () }) - .reduce(initial: nil) { (buffer: dispatch_data_t?, data: dispatch_data_t) in + |> reduce(nil) { (buffer: dispatch_data_t?, data: dispatch_data_t) in if let buffer = buffer { return dispatch_data_create_concat(buffer, data) } else { return data } } - .map { (data: dispatch_data_t?) -> NSData in + |> map { (data: dispatch_data_t?) -> NSData in if let data = data { return data as NSData } else { @@ -224,11 +215,11 @@ private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf? = nil, standardError: SinkOf? = nil) -> ColdSignal { - return ColdSignal { sink, disposable in +public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf? = nil, standardError: SinkOf? = nil) -> SignalProducer { + return SignalProducer { observer, disposable in let task = NSTask() task.launchPath = taskDescription.launchPath task.arguments = taskDescription.arguments @@ -241,35 +232,32 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< task.environment = env } - var stdinSignal: ColdSignal<()> = .empty() + var stdinSignal: SignalProducer<(), ReactiveTaskError> = .empty if let input = taskDescription.standardInput { switch Pipe.create() { case let .Success(pipe): task.standardInput = pipe.unbox.readHandle - stdinSignal = ColdSignal.lazy { + stdinSignal = pipe.unbox.writeDataFromProducer(input) |> on(started: { close(pipe.unbox.readFD) - return pipe.unbox.writeDataFromSignal(input) - } + }) case let .Failure(error): - sink.put(.Error(error)) - return + sendError(observer, error.unbox) } } - ColdSignal.fromResult(Pipe.create()) - // TODO: This should be a zip. - .combineLatestWith(ColdSignal.fromResult(Pipe.create())) - .map { (stdoutPipe, stderrPipe) -> ColdSignal in - let stdoutSignal = aggregateDataReadFromPipe(stdoutPipe, standardOutput) - let stderrSignal = aggregateDataReadFromPipe(stderrPipe, standardError) + SignalProducer(result: Pipe.create()) + |> zipWith(SignalProducer(result: Pipe.create())) + |> mergeMap { stdoutPipe, stderrPipe -> SignalProducer in + let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe, standardOutput) + let stderrProducer = aggregateDataReadFromPipe(stderrPipe, standardError) - let terminationStatusSignal = ColdSignal { sink, disposable in + let terminationStatusProducer = SignalProducer { observer, disposable in task.terminationHandler = { task in - sink.put(.Next(Box(task.terminationStatus))) - sink.put(.Completed) + sendNext(observer, task.terminationStatus) + sendCompleted(observer) } task.standardOutput = stdoutPipe.writeHandle @@ -286,11 +274,11 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< close(stdoutPipe.writeFD) close(stderrPipe.writeFD) - stdinSignal.startWithSink { stdinDisposable in + stdinSignal.startWithSignal { signal, stdinDisposable in disposable.addDisposable(stdinDisposable) - return Event.sink(error: { error in - sink.put(.Error(error)) + signal.observe(error: { error in + sendError(observer, error) }) } @@ -299,25 +287,24 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< } } - return stdoutSignal - .combineLatestWith(stderrSignal) - .combineLatestWith(terminationStatusSignal) - .map { (datas, terminationStatus) -> (NSData, NSData, Int32) in + return stdoutProducer + |> combineLatestWith(stderrSignal) + |> combineLatestWith(terminationStatusSignal) + |> map { datas, terminationStatus -> (NSData, NSData, Int32) in return (datas.0, datas.1, terminationStatus) } - .tryMap { (stdoutData, stderrData, terminationStatus) -> Result in + |> tryMap { stdoutData, stderrData, terminationStatus -> Result in if terminationStatus == EXIT_SUCCESS { return success(stdoutData) } else { let errorString = (stderrData.length > 0 ? String(data: stderrData, encoding: NSUTF8StringEncoding) : nil) - return failure(ReactiveTaskError(exitCode: terminationStatus, standardError: errorString)) + return failure(.ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) } } } - .merge(identity) - .startWithSink { taskDisposable in + |> startWithSignal { signal, taskDisposable in disposable.addDisposable(taskDisposable) - return sink + signal.observe(observer) } } } From cd0513a2147d1d3e61f1c001d52277f974075f88 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sat, 28 Feb 2015 22:13:18 -0800 Subject: [PATCH 5/8] carthage update --- Cartfile.resolved | 4 ++-- Carthage/Checkouts/ReactiveCocoa | 2 +- Carthage/Checkouts/xcconfigs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cartfile.resolved b/Cartfile.resolved index 4ab0771..2d17890 100644 --- a/Cartfile.resolved +++ b/Cartfile.resolved @@ -1,5 +1,5 @@ github "LlamaKit/LlamaKit" "v0.5.0" github "Quick/Nimble" "v0.3.0" github "Quick/Quick" "v0.2.2" -github "jspahrsummers/xcconfigs" "0.7.1" -github "ReactiveCocoa/ReactiveCocoa" "15c2c9a473f5bf09b79061f603cb122a257162e1" +github "jspahrsummers/xcconfigs" "0.7.2" +github "ReactiveCocoa/ReactiveCocoa" "240140c14d023cdb49f4ddf9622c3131619b2197" diff --git a/Carthage/Checkouts/ReactiveCocoa b/Carthage/Checkouts/ReactiveCocoa index 15c2c9a..240140c 160000 --- a/Carthage/Checkouts/ReactiveCocoa +++ b/Carthage/Checkouts/ReactiveCocoa @@ -1 +1 @@ -Subproject commit 15c2c9a473f5bf09b79061f603cb122a257162e1 +Subproject commit 240140c14d023cdb49f4ddf9622c3131619b2197 diff --git a/Carthage/Checkouts/xcconfigs b/Carthage/Checkouts/xcconfigs index b09b4b6..2e77204 160000 --- a/Carthage/Checkouts/xcconfigs +++ b/Carthage/Checkouts/xcconfigs @@ -1 +1 @@ -Subproject commit b09b4b63235b760a3a48a8dd19c0415aaaa8f269 +Subproject commit 2e77204b59c3d97c24e5dd34966fb32c231194f0 From e286f9ffbd3b3080f36aeb2c7e8440c146b770dc Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sun, 1 Mar 2015 10:27:06 -0800 Subject: [PATCH 6/8] Finish updating Task.swift to latest RAC --- ReactiveTask/Task.swift | 48 +++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index d01fcd3..798aed5 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -193,11 +193,11 @@ private final class Pipe { /// to it as data is received. private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf?) -> SignalProducer { return pipe.transferReadsToProducer() - |> on(next: { (data: dispatch_data_t) in - forwardingSink?.put(data as NSData) - return () - }) |> reduce(nil) { (buffer: dispatch_data_t?, data: dispatch_data_t) in + // FIXME: This should go into on(next:), but the compiler currently + // crashes when that's attempted. + forwardingSink?.put(data as NSData) + if let buffer = buffer { return dispatch_data_create_concat(buffer, data) } else { @@ -232,16 +232,23 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< task.environment = env } - var stdinSignal: SignalProducer<(), ReactiveTaskError> = .empty + var stdinProducer: SignalProducer<(), ReactiveTaskError> = .empty if let input = taskDescription.standardInput { switch Pipe.create() { case let .Success(pipe): task.standardInput = pipe.unbox.readHandle - stdinSignal = pipe.unbox.writeDataFromProducer(input) |> on(started: { + // FIXME: This is basically a reimplementation of on(started:) + // to avoid a compiler crash. + stdinProducer = SignalProducer { observer, disposable in close(pipe.unbox.readFD) - }) + + pipe.unbox.writeDataFromProducer(input).startWithSignal { signal, signalDisposable in + disposable.addDisposable(signalDisposable) + signal.observe(observer) + } + } case let .Failure(error): sendError(observer, error.unbox) @@ -250,7 +257,7 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< SignalProducer(result: Pipe.create()) |> zipWith(SignalProducer(result: Pipe.create())) - |> mergeMap { stdoutPipe, stderrPipe -> SignalProducer in + |> joinMap(.Merge) { stdoutPipe, stderrPipe -> SignalProducer in let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe, standardOutput) let stderrProducer = aggregateDataReadFromPipe(stderrPipe, standardError) @@ -266,7 +273,7 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< if disposable.disposed { stdoutPipe.closePipe() stderrPipe.closePipe() - stdinSignal.start().dispose() + stdinProducer.start().dispose() return } @@ -274,30 +281,25 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< close(stdoutPipe.writeFD) close(stderrPipe.writeFD) - stdinSignal.startWithSignal { signal, stdinDisposable in - disposable.addDisposable(stdinDisposable) - - signal.observe(error: { error in - sendError(observer, error) - }) - } + let stdinDisposable = stdinProducer.start() + disposable.addDisposable(stdinDisposable) disposable.addDisposable { task.terminate() } } - return stdoutProducer - |> combineLatestWith(stderrSignal) - |> combineLatestWith(terminationStatusSignal) - |> map { datas, terminationStatus -> (NSData, NSData, Int32) in - return (datas.0, datas.1, terminationStatus) - } + return + combineLatest( + stdoutProducer, + stderrProducer, + terminationStatusProducer |> promoteErrors(ReactiveTaskError.self) + ) |> tryMap { stdoutData, stderrData, terminationStatus -> Result in if terminationStatus == EXIT_SUCCESS { return success(stdoutData) } else { - let errorString = (stderrData.length > 0 ? String(data: stderrData, encoding: NSUTF8StringEncoding) : nil) + let errorString = (stderrData.length > 0 ? NSString(data: stderrData, encoding: NSUTF8StringEncoding) : nil) return failure(.ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) } } From 959067e8151fb633663c39dd044cd67f5d99bca5 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sun, 1 Mar 2015 10:30:52 -0800 Subject: [PATCH 7/8] Update TaskSpec to latest RAC --- ReactiveTaskTests/TaskSpec.swift | 41 ++++++++++++++++---------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/ReactiveTaskTests/TaskSpec.swift b/ReactiveTaskTests/TaskSpec.swift index 5e0a8ee..e484a6d 100644 --- a/ReactiveTaskTests/TaskSpec.swift +++ b/ReactiveTaskTests/TaskSpec.swift @@ -14,29 +14,28 @@ import ReactiveTask class TaskSpec: QuickSpec { override func spec() { - let standardOutput = ObservableProperty(NSData()) - let standardError = ObservableProperty(NSData()) + let standardOutput = MutableProperty(NSData()) + let standardError = MutableProperty(NSData()) beforeEach { standardOutput.value = NSData() standardError.value = NSData() } - func accumulatingSinkForProperty(property: ObservableProperty) -> SinkOf { - let (signal, sink) = HotSignal.pipe() + func accumulatingSinkForProperty(property: MutableProperty) -> SinkOf { + let (signal, sink) = Signal.pipe() - signal.scan(initial: NSData()) { (accum, data) in - let buffer = accum.mutableCopy() as NSMutableData - buffer.appendData(data) + property <~ signal + |> scan(NSData()) { (accum, data) in + let buffer = accum.mutableCopy() as NSMutableData + buffer.appendData(data) - return buffer - // FIXME: This doesn't actually need to be cold, it just works - // around memory management issues. - }.replay(0).start(next: { value in - property.value = value - }) + return buffer + } - return sink + return SinkOf { data in + sendNext(sink, data) + } } it("should launch a task that writes to stdout") { @@ -44,8 +43,8 @@ class TaskSpec: QuickSpec { let task = launchTask(desc, standardOutput: accumulatingSinkForProperty(standardOutput)) expect(standardOutput.value).to(equal(NSData())) - let result = task.wait() - expect(result.isSuccess()).to(beTruthy()) + let result = task |> wait + expect(result.isSuccess).to(beTruthy()) expect(NSString(data: standardOutput.value, encoding: NSUTF8StringEncoding)).to(equal("foobar\n")) } @@ -54,8 +53,8 @@ class TaskSpec: QuickSpec { let task = launchTask(desc, standardError: accumulatingSinkForProperty(standardError)) expect(standardError.value).to(equal(NSData())) - let result = task.wait() - expect(result.isSuccess()).to(beFalsy()) + let result = task |> wait + expect(result.isSuccess).to(beFalsy()) expect(NSString(data: standardError.value, encoding: NSUTF8StringEncoding)).to(equal("stat: not-a-real-file: stat: No such file or directory\n")) } @@ -63,11 +62,11 @@ class TaskSpec: QuickSpec { let strings = [ "foo\n", "bar\n", "buzz\n", "fuzz\n" ] let data = strings.map { $0.dataUsingEncoding(NSUTF8StringEncoding)! } - let desc = TaskDescription(launchPath: "/usr/bin/sort", standardInput: ColdSignal.fromValues(data)) + let desc = TaskDescription(launchPath: "/usr/bin/sort", standardInput: SignalProducer(values: data)) let task = launchTask(desc, standardOutput: accumulatingSinkForProperty(standardOutput)) - let result = task.wait() - expect(result.isSuccess()).to(beTruthy()) + let result = task |> wait + expect(result.isSuccess).to(beTruthy()) expect(NSString(data: standardOutput.value, encoding: NSUTF8StringEncoding)).to(equal("bar\nbuzz\nfoo\nfuzz\n")) } } From bef5506e21c084353d5f0e5e52f0b56fc7671bc9 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sun, 1 Mar 2015 11:08:47 -0800 Subject: [PATCH 8/8] Work around super bizarre Swift behavior with reduce() https://twitter.com/jspahrsummers/status/572109525546745856 --- ReactiveTask/Task.swift | 46 +++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index 798aed5..e8a0d88 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -192,25 +192,35 @@ private final class Pipe { /// If `forwardingSink` is non-nil, each incremental piece of data will be sent /// to it as data is received. private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf?) -> SignalProducer { - return pipe.transferReadsToProducer() - |> reduce(nil) { (buffer: dispatch_data_t?, data: dispatch_data_t) in - // FIXME: This should go into on(next:), but the compiler currently - // crashes when that's attempted. - forwardingSink?.put(data as NSData) - - if let buffer = buffer { - return dispatch_data_create_concat(buffer, data) - } else { - return data - } - } - |> map { (data: dispatch_data_t?) -> NSData in - if let data = data { - return data as NSData - } else { - return NSData() - } + let readProducer = pipe.transferReadsToProducer() + + return SignalProducer { observer, disposable in + var buffer: dispatch_data_t? = nil + + readProducer.startWithSignal { signal, signalDisposable in + disposable.addDisposable(signalDisposable) + + signal.observe(next: { data in + forwardingSink?.put(data as NSData) + + if let existingBuffer = buffer { + buffer = dispatch_data_create_concat(existingBuffer, data) + } else { + buffer = data + } + }, error: { error in + sendError(observer, error) + }, completed: { + if let buffer = buffer { + sendNext(observer, buffer as NSData) + } else { + sendNext(observer, NSData()) + } + + sendCompleted(observer) + }) } + } } /// Launches a new shell task, using the parameters from `taskDescription`.