diff --git a/.gitmodules b/.gitmodules index 119f1b8..9dde401 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,6 @@ [submodule "Carthage.checkout/LlamaKit"] path = Carthage/Checkouts/LlamaKit - url = https://github.com/Carthage/LlamaKit.git + url = https://github.com/LlamaKit/LlamaKit.git [submodule "Carthage.checkout/Nimble"] path = Carthage/Checkouts/Nimble url = https://github.com/Quick/Nimble.git diff --git a/Cartfile b/Cartfile index e3e9ec7..812afc3 100644 --- a/Cartfile +++ b/Cartfile @@ -1,2 +1,2 @@ github "ReactiveCocoa/ReactiveCocoa" "swift-development" -github "Carthage/LlamaKit" == 0.1.1 +github "LlamaKit/LlamaKit" == 0.5 diff --git a/Cartfile.private b/Cartfile.private index 03c8b50..c6bd3e5 100644 --- a/Cartfile.private +++ b/Cartfile.private @@ -1,3 +1,3 @@ -github "Quick/Quick" == 0.2.0 -github "Quick/Nimble" +github "Quick/Quick" ~> 0.2 +github "Quick/Nimble" ~> 0.3 github "jspahrsummers/xcconfigs" >= 0.6 diff --git a/Cartfile.resolved b/Cartfile.resolved index 4c7ce0f..2d17890 100644 --- a/Cartfile.resolved +++ b/Cartfile.resolved @@ -1,5 +1,5 @@ -github "Carthage/LlamaKit" "carthage-0.1.1" -github "Quick/Nimble" "v0.2.0" -github "Quick/Quick" "v0.2.0" -github "jspahrsummers/xcconfigs" "0.7" -github "ReactiveCocoa/ReactiveCocoa" "e94a432e6c47e03f47cb263225ceccffe9b7bea6" +github "LlamaKit/LlamaKit" "v0.5.0" +github "Quick/Nimble" "v0.3.0" +github "Quick/Quick" "v0.2.2" +github "jspahrsummers/xcconfigs" "0.7.2" +github "ReactiveCocoa/ReactiveCocoa" "240140c14d023cdb49f4ddf9622c3131619b2197" diff --git a/Carthage/Checkouts/LlamaKit b/Carthage/Checkouts/LlamaKit index ac2c2d3..e37b966 160000 --- a/Carthage/Checkouts/LlamaKit +++ b/Carthage/Checkouts/LlamaKit @@ -1 +1 @@ -Subproject commit ac2c2d36bf14bdddd0c1c489df67ad9a7ad04eb4 +Subproject commit e37b966998df6ca05445c0b5d9c6c9560f1e7b61 diff --git a/Carthage/Checkouts/Nimble b/Carthage/Checkouts/Nimble index 6f787ee..aeb5da7 160000 --- a/Carthage/Checkouts/Nimble +++ b/Carthage/Checkouts/Nimble @@ -1 +1 @@ -Subproject commit 6f787eeb75b2fa71464981f28b3dc8d7bd532e69 +Subproject commit aeb5da7bed72483d22ccfef5310ccdd3dd39a06f diff --git a/Carthage/Checkouts/Quick b/Carthage/Checkouts/Quick index 315ae2a..b0e9828 160000 --- a/Carthage/Checkouts/Quick +++ b/Carthage/Checkouts/Quick @@ -1 +1 @@ -Subproject commit 315ae2a4a630156d5ac094ae1f107af60edce21d +Subproject commit b0e98286db7e9d1f1e73cd4e69eac2aae2a1e3f8 diff --git a/Carthage/Checkouts/ReactiveCocoa b/Carthage/Checkouts/ReactiveCocoa index 7121cb3..240140c 160000 --- a/Carthage/Checkouts/ReactiveCocoa +++ b/Carthage/Checkouts/ReactiveCocoa @@ -1 +1 @@ -Subproject commit 7121cb395f610f2d017ee087df874209d8804242 +Subproject commit 240140c14d023cdb49f4ddf9622c3131619b2197 diff --git a/Carthage/Checkouts/xcconfigs b/Carthage/Checkouts/xcconfigs index 6c64238..2e77204 160000 --- a/Carthage/Checkouts/xcconfigs +++ b/Carthage/Checkouts/xcconfigs @@ -1 +1 @@ -Subproject commit 6c64238566ef1126f1b7994aa9e6719229c50451 +Subproject commit 2e77204b59c3d97c24e5dd34966fb32c231194f0 diff --git a/ReactiveTask/Errors.swift b/ReactiveTask/Errors.swift index 89f3943..581cb3a 100644 --- a/ReactiveTask/Errors.swift +++ b/ReactiveTask/Errors.swift @@ -7,20 +7,44 @@ // import Foundation +import ReactiveCocoa -/// Possible error codes within `ReactiveTaskErrorDomain`. -public enum ReactiveTaskError: Int { - /// The domain for all errors originating within ReactiveTask. - public static let domain: NSString = "org.carthage.ReactiveTask" +/// An error originating from ReactiveTask. +public enum ReactiveTaskError { + /// A shell task exited unsuccessfully. + case ShellTaskFailed(exitCode: Int32, standardError: String?) - /// In a user info dictionary, associated with the exit code from a child - /// process. - public static let exitCodeKey: NSString = "ReactiveTaskErrorExitCode" + /// An error was returned from a POSIX API. + case POSIXError(Int32) +} - /// In a user info dictionary, associated with any accumulated stderr - /// string. - public static let standardErrorKey: NSString = "ReactiveTaskErrorStandardError" +extension ReactiveTaskError: ErrorType { + public var nsError: NSError { + switch self { + case let .POSIXError(code): + return NSError(domain: NSPOSIXErrorDomain, code: Int(code), userInfo: nil) - /// A shell task exited unsuccessfully. - case ShellTaskFailed + default: + return NSError(domain: "org.carthage.ReactiveTask", code: 0, userInfo: [ + NSLocalizedDescriptionKey: self.description + ]) + } + } +} + +extension ReactiveTaskError: Printable { + public var description: String { + switch self { + case let .ShellTaskFailed(exitCode, standardError): + var description = "A shell task failed with exit code \(exitCode)" + if let standardError = standardError { + description += ":\n\(standardError)" + } + + return description + + case let .POSIXError: + return nsError.description + } + } } diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index 0c8b3b4..e8a0d88 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -32,12 +32,10 @@ public struct TaskDescription { /// Data to stream to standard input of the launched process. /// - /// An error sent along this signal will interrupt the task. - /// /// If nil, stdin will be inherited from the parent process. - public var standardInput: ColdSignal? + public var standardInput: SignalProducer? - public init(launchPath: String, arguments: [String] = [], workingDirectoryPath: String? = nil, environment: [String: String]? = nil, standardInput: ColdSignal? = nil) { + public init(launchPath: String, arguments: [String] = [], workingDirectoryPath: String? = nil, environment: [String: String]? = nil, standardInput: SignalProducer? = nil) { self.launchPath = launchPath self.arguments = arguments self.workingDirectoryPath = workingDirectoryPath @@ -88,13 +86,12 @@ private final class Pipe { } /// Instantiates a new descriptor pair. - class func create() -> Result { + class func create() -> Result { var fildes: [Int32] = [ 0, 0 ] if pipe(&fildes) == 0 { return success(self(readFD: fildes[0], writeFD: fildes[1])) } else { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(errno), userInfo: nil) - return failure(nsError) + return failure(.POSIXError(errno)) } } @@ -107,18 +104,17 @@ private final class Pipe { /// Creates a signal that will take ownership of the `readFD` using /// dispatch_io, then read it to completion. /// - /// After subscribing to the returned signal, `readFD` should not be used + /// After starting the returned producer, `readFD` should not be used /// anywhere else, as it may close unexpectedly. - func transferReadsToSignal() -> ColdSignal { + func transferReadsToProducer() -> SignalProducer { let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) - return ColdSignal { sink, disposable in + return SignalProducer { observer, disposable in let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, queue) { error in if error == 0 { - sink.put(.Completed) + sendCompleted(observer) } else { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } close(self.readFD) @@ -127,12 +123,11 @@ private final class Pipe { dispatch_io_set_low_water(channel, 1) dispatch_io_read(channel, 0, UInt.max, queue) { (done, data, error) in if let data = data { - sink.put(.Next(Box(data))) + sendNext(observer, data) } if error != 0 { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } if done { @@ -150,39 +145,35 @@ private final class Pipe { /// `signal` into `writeFD`, then closes `writeFD` when the input signal /// terminates. /// - /// After subscribing to the returned signal, `writeFD` should not be used + /// After starting the returned producer, `writeFD` should not be used /// anywhere else, as it may close unexpectedly. /// - /// Returns a signal that will complete or error. - func writeDataFromSignal(signal: ColdSignal) -> ColdSignal<()> { + /// Returns a producer that will complete or error. + func writeDataFromProducer(producer: SignalProducer) -> SignalProducer<(), ReactiveTaskError> { let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) - return ColdSignal { sink, disposable in + return SignalProducer { observer, disposable in let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, queue) { error in if error == 0 { - sink.put(.Completed) + sendCompleted(observer) } else { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } close(self.writeFD) } - signal.startWithSink { inputDisposable in - disposable.addDisposable(inputDisposable) + producer.startWithSignal { signal, producerDisposable in + disposable.addDisposable(producerDisposable) - return Event.sink(next: { data in + signal.observe(next: { data in let dispatchData = dispatch_data_create(data.bytes, UInt(data.length), queue, nil) dispatch_io_write(channel, 0, dispatchData, queue) { (done, data, error) in if error != 0 { - let nsError = NSError(domain: NSPOSIXErrorDomain, code: Int(error), userInfo: nil) - sink.put(.Error(nsError)) + sendError(observer, .POSIXError(error)) } } - }, error: { error in - sink.put(.Error(error)) }, completed: { dispatch_io_close(channel, 0) }) @@ -200,35 +191,45 @@ private final class Pipe { /// /// If `forwardingSink` is non-nil, each incremental piece of data will be sent /// to it as data is received. -private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf?) -> ColdSignal { - return pipe.transferReadsToSignal() - .on(next: { (data: dispatch_data_t) in - forwardingSink?.put(data as NSData) - return () - }) - .reduce(initial: nil) { (buffer: dispatch_data_t?, data: dispatch_data_t) in - if let buffer = buffer { - return dispatch_data_create_concat(buffer, data) - } else { - return data - } - } - .map { (data: dispatch_data_t?) -> NSData in - if let data = data { - return data as NSData - } else { - return NSData() - } +private func aggregateDataReadFromPipe(pipe: Pipe, forwardingSink: SinkOf?) -> SignalProducer { + let readProducer = pipe.transferReadsToProducer() + + return SignalProducer { observer, disposable in + var buffer: dispatch_data_t? = nil + + readProducer.startWithSignal { signal, signalDisposable in + disposable.addDisposable(signalDisposable) + + signal.observe(next: { data in + forwardingSink?.put(data as NSData) + + if let existingBuffer = buffer { + buffer = dispatch_data_create_concat(existingBuffer, data) + } else { + buffer = data + } + }, error: { error in + sendError(observer, error) + }, completed: { + if let buffer = buffer { + sendNext(observer, buffer as NSData) + } else { + sendNext(observer, NSData()) + } + + sendCompleted(observer) + }) } + } } /// Launches a new shell task, using the parameters from `taskDescription`. /// -/// Returns a cold signal that will launch the task when started, then send one +/// Returns a producer that will launch the task when started, then send one /// `NSData` value (representing aggregated data from `stdout`) and complete /// upon success. -public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf? = nil, standardError: SinkOf? = nil) -> ColdSignal { - return ColdSignal { sink, disposable in +public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf? = nil, standardError: SinkOf? = nil) -> SignalProducer { + return SignalProducer { observer, disposable in let task = NSTask() task.launchPath = taskDescription.launchPath task.arguments = taskDescription.arguments @@ -241,35 +242,39 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< task.environment = env } - var stdinSignal: ColdSignal<()> = .empty() + var stdinProducer: SignalProducer<(), ReactiveTaskError> = .empty if let input = taskDescription.standardInput { switch Pipe.create() { case let .Success(pipe): task.standardInput = pipe.unbox.readHandle - stdinSignal = ColdSignal.lazy { + // FIXME: This is basically a reimplementation of on(started:) + // to avoid a compiler crash. + stdinProducer = SignalProducer { observer, disposable in close(pipe.unbox.readFD) - return pipe.unbox.writeDataFromSignal(input) + + pipe.unbox.writeDataFromProducer(input).startWithSignal { signal, signalDisposable in + disposable.addDisposable(signalDisposable) + signal.observe(observer) + } } case let .Failure(error): - sink.put(.Error(error)) - return + sendError(observer, error.unbox) } } - ColdSignal.fromResult(Pipe.create()) - // TODO: This should be a zip. - .combineLatestWith(ColdSignal.fromResult(Pipe.create())) - .map { (stdoutPipe, stderrPipe) -> ColdSignal in - let stdoutSignal = aggregateDataReadFromPipe(stdoutPipe, standardOutput) - let stderrSignal = aggregateDataReadFromPipe(stderrPipe, standardError) + SignalProducer(result: Pipe.create()) + |> zipWith(SignalProducer(result: Pipe.create())) + |> joinMap(.Merge) { stdoutPipe, stderrPipe -> SignalProducer in + let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe, standardOutput) + let stderrProducer = aggregateDataReadFromPipe(stderrPipe, standardError) - let terminationStatusSignal = ColdSignal { sink, disposable in + let terminationStatusProducer = SignalProducer { observer, disposable in task.terminationHandler = { task in - sink.put(.Next(Box(task.terminationStatus))) - sink.put(.Completed) + sendNext(observer, task.terminationStatus) + sendCompleted(observer) } task.standardOutput = stdoutPipe.writeHandle @@ -278,7 +283,7 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< if disposable.disposed { stdoutPipe.closePipe() stderrPipe.closePipe() - stdinSignal.start().dispose() + stdinProducer.start().dispose() return } @@ -286,50 +291,32 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf< close(stdoutPipe.writeFD) close(stderrPipe.writeFD) - stdinSignal.startWithSink { stdinDisposable in - disposable.addDisposable(stdinDisposable) - - return Event.sink(error: { error in - sink.put(.Error(error)) - }) - } + let stdinDisposable = stdinProducer.start() + disposable.addDisposable(stdinDisposable) disposable.addDisposable { task.terminate() } } - return stdoutSignal - .combineLatestWith(stderrSignal) - .combineLatestWith(terminationStatusSignal) - .map { (datas, terminationStatus) -> (NSData, NSData, Int32) in - return (datas.0, datas.1, terminationStatus) - } - .tryMap { (stdoutData, stderrData, terminationStatus) -> Result in + return + combineLatest( + stdoutProducer, + stderrProducer, + terminationStatusProducer |> promoteErrors(ReactiveTaskError.self) + ) + |> tryMap { stdoutData, stderrData, terminationStatus -> Result in if terminationStatus == EXIT_SUCCESS { return success(stdoutData) } else { - let errorString = (stderrData.length > 0 ? NSString(data: stderrData, encoding: NSUTF8StringEncoding) as String? : nil) ?? "" - - var description = "A shell task failed with exit code \(terminationStatus)" - if !errorString.isEmpty { - description += ":\n\(errorString)" - } - - let error = NSError(domain: ReactiveTaskError.domain, code: ReactiveTaskError.ShellTaskFailed.rawValue, userInfo: [ - ReactiveTaskError.exitCodeKey: Int(terminationStatus), - ReactiveTaskError.standardErrorKey: errorString, - NSLocalizedDescriptionKey: errorString - ]) - - return failure(error) + let errorString = (stderrData.length > 0 ? NSString(data: stderrData, encoding: NSUTF8StringEncoding) : nil) + return failure(.ShellTaskFailed(exitCode: terminationStatus, standardError: errorString)) } } } - .merge(identity) - .startWithSink { taskDisposable in + |> startWithSignal { signal, taskDisposable in disposable.addDisposable(taskDisposable) - return sink + signal.observe(observer) } } } diff --git a/ReactiveTaskTests/TaskSpec.swift b/ReactiveTaskTests/TaskSpec.swift index 5e0a8ee..e484a6d 100644 --- a/ReactiveTaskTests/TaskSpec.swift +++ b/ReactiveTaskTests/TaskSpec.swift @@ -14,29 +14,28 @@ import ReactiveTask class TaskSpec: QuickSpec { override func spec() { - let standardOutput = ObservableProperty(NSData()) - let standardError = ObservableProperty(NSData()) + let standardOutput = MutableProperty(NSData()) + let standardError = MutableProperty(NSData()) beforeEach { standardOutput.value = NSData() standardError.value = NSData() } - func accumulatingSinkForProperty(property: ObservableProperty) -> SinkOf { - let (signal, sink) = HotSignal.pipe() + func accumulatingSinkForProperty(property: MutableProperty) -> SinkOf { + let (signal, sink) = Signal.pipe() - signal.scan(initial: NSData()) { (accum, data) in - let buffer = accum.mutableCopy() as NSMutableData - buffer.appendData(data) + property <~ signal + |> scan(NSData()) { (accum, data) in + let buffer = accum.mutableCopy() as NSMutableData + buffer.appendData(data) - return buffer - // FIXME: This doesn't actually need to be cold, it just works - // around memory management issues. - }.replay(0).start(next: { value in - property.value = value - }) + return buffer + } - return sink + return SinkOf { data in + sendNext(sink, data) + } } it("should launch a task that writes to stdout") { @@ -44,8 +43,8 @@ class TaskSpec: QuickSpec { let task = launchTask(desc, standardOutput: accumulatingSinkForProperty(standardOutput)) expect(standardOutput.value).to(equal(NSData())) - let result = task.wait() - expect(result.isSuccess()).to(beTruthy()) + let result = task |> wait + expect(result.isSuccess).to(beTruthy()) expect(NSString(data: standardOutput.value, encoding: NSUTF8StringEncoding)).to(equal("foobar\n")) } @@ -54,8 +53,8 @@ class TaskSpec: QuickSpec { let task = launchTask(desc, standardError: accumulatingSinkForProperty(standardError)) expect(standardError.value).to(equal(NSData())) - let result = task.wait() - expect(result.isSuccess()).to(beFalsy()) + let result = task |> wait + expect(result.isSuccess).to(beFalsy()) expect(NSString(data: standardError.value, encoding: NSUTF8StringEncoding)).to(equal("stat: not-a-real-file: stat: No such file or directory\n")) } @@ -63,11 +62,11 @@ class TaskSpec: QuickSpec { let strings = [ "foo\n", "bar\n", "buzz\n", "fuzz\n" ] let data = strings.map { $0.dataUsingEncoding(NSUTF8StringEncoding)! } - let desc = TaskDescription(launchPath: "/usr/bin/sort", standardInput: ColdSignal.fromValues(data)) + let desc = TaskDescription(launchPath: "/usr/bin/sort", standardInput: SignalProducer(values: data)) let task = launchTask(desc, standardOutput: accumulatingSinkForProperty(standardOutput)) - let result = task.wait() - expect(result.isSuccess()).to(beTruthy()) + let result = task |> wait + expect(result.isSuccess).to(beTruthy()) expect(NSString(data: standardOutput.value, encoding: NSUTF8StringEncoding)).to(equal("bar\nbuzz\nfoo\nfuzz\n")) } }