This repository has been archived by the owner on Feb 11, 2021. It is now read-only.

Commit

Completely remove DispatchContext
iamed2 committed Aug 8, 2017
1 parent a23b6b2 commit c18c119
Showing 12 changed files with 415 additions and 758 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -10,11 +10,11 @@ Documentation: [![](https://img.shields.io/badge/docs-stable-blue.svg)](https://

## Overview

Using Dispatcher, a `DispatchContext` maintains a computation graph of `DispatchNode`s.
Using Dispatcher, `run!` builds and runs a computation graph of `DispatchNode`s.
`DispatchNode`s represent units of computation that can be run.
The most common `DispatchNode` is `Op`, which represents a function call on some arguments.
Some of those arguments may exist when building the graph, and others may represent the results of other `DispatchNode`s.
An `Executor` executes a whole `DispatchContext`.
An `Executor` executes a whole `DispatchGraph`.
Two `Executor`s are provided.
`AsyncExecutor` executes computations asynchronously using Julia `Task`s.
`ParallelExecutor` executes computations in parallel using all available Julia processes (by calling `@spawn`).
@@ -31,7 +31,7 @@ Dispatcher is built to deal with discrete, heterogeneous data using any Julia fu
Arbiter requires manually adding tasks and their dependencies and handling data passing.
Dispatcher automatically identifies dependencies from user code and passes data efficiently between dependencies.

> Isn't this just Dask
> Isn't this just Dask?
Pretty much.
The plan is to implement another `Executor` and [integrate](https://github.com/dask/distributed/issues/586) with the [`dask.distributed`](https://distributed.readthedocs.io/) scheduler service to piggyback off of their great work.
6 changes: 3 additions & 3 deletions docs/src/index.md
@@ -6,11 +6,11 @@ CurrentModule = Dispatcher

## Overview

Using Dispatcher, a `DispatchContext` maintains a computation graph of `DispatchNode`s.
Using Dispatcher, `run!` builds and runs a computation graph of `DispatchNode`s.
`DispatchNode`s represent units of computation that can be run.
The most common `DispatchNode` is `Op`, which represents a function call on some arguments.
Some of those arguments may exist when building the graph, and others may represent the results of other `DispatchNode`s.
An `Executor` executes a whole `DispatchContext`.
An `Executor` executes a whole `DispatchGraph`.
Two `Executor`s are provided.
`AsyncExecutor` executes computations asynchronously using Julia `Task`s.
`ParallelExecutor` executes computations in parallel using all available Julia processes (by calling `@spawn`).
@@ -27,7 +27,7 @@ Dispatcher is built to deal with discrete, heterogeneous data using any Julia fu
Arbiter requires manually adding tasks and their dependencies and handling data passing.
Dispatcher automatically identifies dependencies from user code and passes data efficiently between dependencies.

> Isn't this just Dask
> Isn't this just Dask?
Pretty much.
The plan is to implement another `Executor` and [integrate](https://github.com/dask/distributed/issues/586) with the [`dask.distributed`](https://distributed.readthedocs.io/) scheduler service to piggyback off of their great work.
30 changes: 6 additions & 24 deletions docs/src/pages/api.md
@@ -22,6 +22,7 @@ fetch{T<:DispatchNode}(::T)
```@docs
Op
Op(::Function)
@op
get_label(::Op)
set_label!(::Op, ::AbstractString)
has_label(::Op)
@@ -86,36 +87,17 @@ add_edge!(::DispatchGraph, ::DispatchNode, ::DispatchNode)
==(::DispatchGraph, ::DispatchGraph)
```

## Context

### DispatchContext

```@docs
DispatchContext
nodes(::DispatchContext)
add!
```

### Macros

```@docs
@dispatch_context
@node
@op
@component
@include
```

## Executors

### Executor

```@docs
Executor
run!{T<:DispatchNode, S<:DispatchNode}(exec::Executor, ctx::DispatchContext, nodes::AbstractArray{T}, input_nodes::AbstractArray{S})
run!(::Executor, ::DispatchContext)
prepare!(::Executor, ::DispatchContext)
dispatch!(::Executor, ::DispatchContext)
run!{T<:DispatchNode, S<:DispatchNode}(exec::Executor, nodes::AbstractArray{T}, input_nodes::AbstractArray{S})
run!(::Executor, ::DispatchGraph)
build_graph
prepare!(::Executor, ::DispatchGraph)
dispatch!(::Executor, ::DispatchGraph)
Dispatcher.retries(::Executor)
Dispatcher.retry_on(::Executor)
```
83 changes: 32 additions & 51 deletions docs/src/pages/manual.md
@@ -11,45 +11,43 @@ This avoids having one large process where all data currently being used is stor

### Overview

Using Dispatcher, a `DispatchContext` maintains a computation graph of `DispatchNode`s.
Using Dispatcher, `run!` builds and runs a computation graph of `DispatchNode`s.
`DispatchNode`s represent units of computation that can be run.
The most common `DispatchNode` is `Op`, which represents a function call on some arguments.
Some of those arguments may exist when building the graph, and others may represent the results of other `DispatchNode`s.
An `Executor` executes a whole `DispatchContext`.
An `Executor` builds and executes a whole `DispatchGraph`.
Two `Executor`s are provided.
`AsyncExecutor` executes computations asynchronously using Julia `Task`s.
`ParallelExecutor` executes computations in parallel using all available Julia processes (by calling `@spawn`).

Here is an example defining and executing a graph:

```julia
ctx = @dispatch_context begin
filenames = ["mydata-$d.dat" for d in 1:100]
data = [(@op load(filename)) for filename in filenames]

reference = @op load_from_sql("sql://mytable")
processed = [(@op process(d, reference)) for d in data]

rolled = map(1:(length(processed) - 2)) do i
a = processed[i]
b = processed[i + 1]
c = processed[i + 2]
roll_result = @op roll(a, b, c)
return roll_result
end

compared = map(1:200) do i
a = rand(rolled)
b = rand(rolled)
compare_result = @op compare(a, b)
return compare_result
end

best = @op reduction(@node CollectNode(compared))
filenames = ["mydata-$d.dat" for d in 1:100]
data = [(@op load(filename)) for filename in filenames]

reference = @op load_from_sql("sql://mytable")
processed = [(@op process(d, reference)) for d in data]

rolled = map(1:(length(processed) - 2)) do i
a = processed[i]
b = processed[i + 1]
c = processed[i + 2]
roll_result = @op roll(a, b, c)
return roll_result
end

compared = map(1:200) do i
a = rand(rolled)
b = rand(rolled)
compare_result = @op compare(a, b)
return compare_result
end

best = @op reduction(CollectNode(compared))

executor = ParallelExecutor()
(run_best,) = run!(executor, ctx, [best])
(run_best,) = run!(executor, [best])
```

The components of this example will be discussed below.
@@ -59,26 +57,15 @@ This example is based on [a Dask example](http://matthewrocklin.com/blog/work/20

A `DispatchNode` generally represents a unit of computation that can be run.
`DispatchNode`s are constructed when defining the graph and are run as part of graph execution.
The `@node` macro takes a `DispatchNode` instance and adds it to the graph in the current context.
The following code, where `CollectNode <: DispatchNode`:
`CollectNode` from the above example is a subtype of `DispatchNode`.

```julia
collection = @node CollectNode(compared)
```

is equivalent to:

```julia
collection = add!(ctx, CollectNode(compared))
```

where `ctx` is the current dispatch context.
Any arguments to `DispatchNode` constructors (including in `@op`) which are `DispatchNode`s are recorded as dependencies in the graph.

### Op

An `Op` is a `DispatchNode` which represents some function call to be run as part of graph execution.
This is the most common type of `DispatchNode`.
The `@op` applies an extra transformation on top of the `@node` macro and deconstructs a function call to add to the graph.
The `@op` macro deconstructs a function call to construct an `Op`.
The following code:

```julia
@@ -88,33 +75,27 @@ roll_result = @op roll(a, b, c)
is equivalent to:

```julia
roll_result = add!(ctx, Op(roll, a, b, c))
roll_result = Op(roll, a, b, c)
```

where `ctx` is the current dispatch context.
Note that code in the argument list gets evaluated immediately; only the function call is delayed.

### Dispatch Context and Dispatch Graph

The above macros add nodes to a `DispatchContext`. The `DispatchContext` contains a `DispatchGraph`, which stores nodes and dependencies in a graph.
Any arguments to `DispatchNode` constructors (including in `@node` and `@op`) which are `DispatchNode`s are recorded as dependencies in the graph.

### Executors

An `Executor` runs a `DispatchContext`.
An `Executor` runs a `DispatchGraph`.
This package currently provides two `Executor`s: `AsyncExecutor` and `ParallelExecutor`.
They work the same way, except `AsyncExecutor` runs nodes using `@async` and `ParallelExecutor` uses `@spawn`.

This call:

```julia
(run_best,) = run!(executor, ctx, [best])
(run_best,) = run!(executor, [best])
```

takes an `Executor`, a `DispatchContext`, and a `Vector{DispatchNode}`, runs those nodes and all of their ancestors, and returns a collection of `DispatchResult`s (in this case containing only the `DispatchResult` for `best`).
takes an `Executor` and a `Vector{DispatchNode}`, creates a `DispatchGraph` of those nodes and all of their ancestors, runs it, and returns a collection of `DispatchResult`s (in this case containing only the `DispatchResult` for `best`).
A `DispatchResult` is a [`ResultType`](https://github.com/iamed2/ResultTypes.jl) containing either a `DispatchNode` or a `DependencyError` (an error that occurred when attempting to satisfy the requirements for running that node).

It is also possible to feed in inputs in place of nodes in the graph; see [`run!`](api.html#Dispatcher.run!-Tuple{Dispatcher.Executor,Dispatcher.DispatchContext,AbstractArray{T<:Dispatcher.DispatchNode,N},AbstractArray{S<:Dispatcher.DispatchNode,N}}) for more.
It is also possible to feed in inputs in place of nodes in the graph; see [`run!`](api.html#Dispatcher.run!-Tuple{Dispatcher.Executor,AbstractArray{T<:Dispatcher.DispatchNode,N},AbstractArray{S<:Dispatcher.DispatchNode,N}}) for more.

## Further Reading

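Distilled from the new manual text above, here is a minimal sketch of the context-free API after this commit. Only `@op`, `AsyncExecutor`, and `run!` come from Dispatcher; `inc` is a hypothetical stand-in for real work, and the exact fetched value is not shown since it depends on the `DispatchResult` wrapper:

```julia
using Dispatcher

inc(x) = x + 1                    # hypothetical user function

a = @op inc(1)                    # an Op: a delayed function call
b = @op inc(a)                    # `a` is a DispatchNode, so it is recorded as a graph dependency

executor = AsyncExecutor()
(result,) = run!(executor, [b])   # builds a DispatchGraph of `b` and its ancestors, then runs it
```

As the diff shows, no `@dispatch_context` block or `add!` call is needed; dependencies are picked up directly from `DispatchNode` arguments.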
46 changes: 22 additions & 24 deletions examples/dask-cluster.jl
@@ -52,33 +52,31 @@ end


function main()
ctx = @dispatch_context begin
filenames = ["mydata-$d.dat" for d in 1:100]
data = [(@op load(filename)) for filename in filenames]

reference = @op load_from_sql("sql://mytable")
processed = [(@op process(d, reference)) for d in data]

rolled = map(1:(length(processed) - 2)) do i
a = processed[i]
b = processed[i + 1]
c = processed[i + 2]
roll_result = @op roll(a, b, c)
return roll_result
end

compared = map(1:200) do i
a = rand(rolled)
b = rand(rolled)
compare_result = @op compare(a, b)
return compare_result
end

best = @op reduction(@node CollectNode(compared))
filenames = ["mydata-$d.dat" for d in 1:100]
data = [(@op load(filename)) for filename in filenames]

reference = @op load_from_sql("sql://mytable")
processed = [(@op process(d, reference)) for d in data]

rolled = map(1:(length(processed) - 2)) do i
a = processed[i]
b = processed[i + 1]
c = processed[i + 2]
roll_result = @op roll(a, b, c)
return roll_result
end

compared = map(1:200) do i
a = rand(rolled)
b = rand(rolled)
compare_result = @op compare(a, b)
return compare_result
end

best = @op reduction(CollectNode(compared))

executor = ParallelExecutor()
(run_best,) = run!(executor, ctx, [best])
(run_best,) = run!(executor, [best])

return run_best
end
24 changes: 11 additions & 13 deletions examples/dask-do.jl
@@ -19,24 +19,22 @@ end
function main()
data = [1, 2, 3]

ctx = @dispatch_context begin
A = map(data) do i
@op slowinc(i)
end

B = map(A) do a
@op slowadd(a, 10)
end
A = map(data) do i
@op slowinc(i)
end

C = map(A) do a
@op slowadd(a, 100)
end
B = map(A) do a
@op slowadd(a, 10)
end

result = @op ((@op slowsum(A...)) + (@op slowsum(B...)) + (@op slowsum(C...)))
C = map(A) do a
@op slowadd(a, 100)
end

result = @op ((@op slowsum(A...)) + (@op slowsum(B...)) + (@op slowsum(C...)))

executor = AsyncExecutor()
(run_result,) = run!(executor, ctx, [result])
(run_result,) = run!(executor, [result])

return run_result
end
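The migration this commit implies for user code can be sketched as a before/after pair. The "before" form is reconstructed from the removed lines above and is shown only in comments; `f` is a hypothetical user function:

```julia
using Dispatcher

f(x) = 2x                          # hypothetical user function

# Before this commit (reconstructed from the removed lines):
#     ctx = @dispatch_context begin
#         node = @op f(1)
#     end
#     (res,) = run!(AsyncExecutor(), ctx, [node])

# After this commit: construct nodes directly and pass them to run!
node = @op f(1)
(res,) = run!(AsyncExecutor(), [node])
```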
14 changes: 4 additions & 10 deletions src/Dispatcher.jl
@@ -9,8 +9,7 @@ else
asyncmap = Base.asyncmap
end

export DispatchContext,
DispatchGraph,
export DispatchGraph,
DispatchNode,
DispatchResult,
DataNode,
@@ -21,7 +20,6 @@ export DispatchContext,
add_edge!,
nodes,
dependencies,
add!,
has_label,
get_label,
set_label!
@@ -31,13 +29,10 @@ export Executor,
ParallelExecutor,
dispatch!,
prepare!,
run!
run!,
build_graph

export @dispatch_context,
@op,
@node,
@component,
@include
export @op

using AutoHashEquals
using Compat
@@ -54,7 +49,6 @@ const logger = get_logger(current_module())

include("nodes.jl")
include("graph.jl")
include("context.jl")
include("executors.jl")
include("macros.jl")

