Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel container startup with deferred values #315

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 52 additions & 23 deletions constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.uber.org/dig/internal/digerror"
"go.uber.org/dig/internal/digreflect"
"go.uber.org/dig/internal/dot"
"go.uber.org/dig/internal/promise"
)

// constructorNode is a node in the dependency graph that represents
Expand All @@ -45,12 +46,18 @@ type constructorNode struct {
// id uniquely identifies the constructor that produces a node.
id dot.CtorID

// Whether this node is already building its paramList and calling the constructor
calling bool

// Whether the constructor owned by this node was already called.
called bool

// Type information about constructor parameters.
paramList paramList

// The result of calling the constructor
deferred promise.Deferred

// Type information about constructor results.
resultList resultList

Expand Down Expand Up @@ -122,42 +129,64 @@ func (n *constructorNode) String() string {
return fmt.Sprintf("deps: %v, ctor: %v", n.paramList, n.ctype)
}

// Call calls this constructor if it hasn't already been called and
// injects any values produced by it into the provided container.
func (n *constructorNode) Call(c containerStore) error {
if n.called {
return nil
// Call calls this constructor if it hasn't already been called and injects any values produced by it into the container
// passed to newConstructorNode.
//
// If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has
// already been successfully called, it will return an already-resolved deferred. Together these mean it will try the
// call again if it failed last time.
//
// On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its
// zero value; don't store the returned pointer. (It will still call each observer only once.)
func (n *constructorNode) Call(c containerStore) *promise.Deferred {
if n.calling || n.called {
return &n.deferred
}

n.calling = true
n.deferred = promise.Deferred{}

if err := shallowCheckDependencies(c, n.paramList); err != nil {
return errMissingDependencies{
n.deferred.Resolve(errMissingDependencies{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to return here and in other error cases?
Or rather, not set calling and deferred until after the check,
with the check itself returning promise.Fail.

Func: n.location,
Reason: err,
}
})
}

args, err := n.paramList.BuildList(c, false /* decorating */)
if err != nil {
var args []reflect.Value
var results []reflect.Value

n.paramList.BuildList(c, false /* decorating */, &args).Catch(func(err error) error {
return errArgumentsFailed{
Func: n.location,
Reason: err,
}
}

receiver := newStagingContainerWriter()
results := c.invoker()(reflect.ValueOf(n.ctor), args)
if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil {
return errConstructorFailed{Func: n.location, Reason: err}
}
}).Then(func() *promise.Deferred {
return c.scheduler().Schedule(func() {
results = c.invoker()(reflect.ValueOf(n.ctor), args)
})
}).Then(func() *promise.Deferred {
receiver := newStagingContainerWriter()
if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil {
return promise.Fail(errConstructorFailed{Func: n.location, Reason: err})
}

// Commit the result to the original container that this constructor
// was supplied to. The provided constructor is only used for a view of
// the rest of the graph to instantiate the dependencies of this
// container.
receiver.Commit(n.s)
n.called = true
// Commit the result to the original container that this constructor
// was supplied to. The provided container is only used for a view of
// the rest of the graph to instantiate the dependencies of this
// container.
receiver.Commit(n.s)
n.calling = false
n.called = true
n.deferred.Resolve(nil)
return promise.Done
}).Catch(func(err error) error {
n.calling = false
n.deferred.Resolve(err)
return nil
})

return nil
return &n.deferred
}

// stagingContainerWriter is a containerWriter that records the changes that
Expand Down
15 changes: 13 additions & 2 deletions constructor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,18 @@ func TestNodeAlreadyCalled(t *testing.T) {
require.False(t, n.called, "node must not have been called")

c := New()
require.NoError(t, n.Call(c.scope), "invoke failed")
d := n.Call(c.scope)
c.scope.sched.Flush()

ok, err := d.Resolved()
require.True(t, ok, "deferred must be resolved")
require.NoError(t, err, "invoke failed")

require.True(t, n.called, "node must be called")
require.NoError(t, n.Call(c.scope), "calling again should be okay")
d = n.Call(c.scope)
c.scope.sched.Flush()

ok, err = d.Resolved()
require.True(t, ok, "deferred must be resolved")
require.NoError(t, err, "calling again should be okay")
}
33 changes: 33 additions & 0 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"

"go.uber.org/dig/internal/dot"
"go.uber.org/dig/internal/scheduler"
)

const (
Expand Down Expand Up @@ -142,6 +143,9 @@ type containerStore interface {

// Returns invokerFn function to use when calling arguments.
invoker() invokerFn

// Returns the scheduler to use for this scope.
scheduler() scheduler.Scheduler
}

// New constructs a Container.
Expand Down Expand Up @@ -231,6 +235,35 @@ func dryInvoker(fn reflect.Value, _ []reflect.Value) []reflect.Value {
return results
}

type maxConcurrencyOption int

// MaxConcurrency run constructors in this container with the given level of
// concurrency:
//
// - max = 0 or 1: run one constructor at a time (this is the default)
//
// - max > 1: run at most 'max' constructors at a time
//
// - max < 0: run an unlimited number of constructors at a time
//
// Concurrency is limited by how many constructors' dependencies are satisfied at
// once and Go's own allocation of OS threads to Goroutines. This is useful for
// applications that have many slow, independent constructors.
func MaxConcurrency(max int) Option {
return maxConcurrencyOption(max)
}

func (m maxConcurrencyOption) applyOption(container *Container) {
switch {
case m == 0, m == 1:
container.scope.sched = scheduler.Synchronous
case m > 1:
container.scope.sched = scheduler.NewParallel(int(m))
case m < 0:
container.scope.sched = new(scheduler.Unbounded)
}
}

// String representation of the entire Container
func (c *Container) String() string {
return c.scope.String()
Expand Down
71 changes: 53 additions & 18 deletions decorate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (

"go.uber.org/dig/internal/digreflect"
"go.uber.org/dig/internal/dot"
"go.uber.org/dig/internal/promise"
)

type decorator interface {
Call(c containerStore) error
Call(c containerStore) *promise.Deferred
ID() dot.CtorID
}

Expand All @@ -42,12 +43,18 @@ type decoratorNode struct {
// Location where this function was defined.
location *digreflect.Func

// Whether this node is already building its paramList and calling the constructor
calling bool

// Whether the decorator owned by this node was already called.
called bool

// Parameters of the decorator.
params paramList

// The result of calling the constructor
deferred promise.Deferred

// Results of the decorator.
results resultList

Expand Down Expand Up @@ -86,32 +93,60 @@ func newDecoratorNode(dcor interface{}, s *Scope) (*decoratorNode, error) {
return n, nil
}

func (n *decoratorNode) Call(s containerStore) error {
if n.called {
return nil
// Call calls this decorator if it hasn't already been called and injects any values produced by it into the container
// passed to newConstructorNode.
//
// If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has
// already been successfully called, it will return an already-resolved deferred. Together these mean it will try the
// call again if it failed last time.
//
// On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its
// zero value; don't store the returned pointer. (It will still call each observer only once.)
func (n *decoratorNode) Call(s containerStore) *promise.Deferred {
if n.calling || n.called {
return &n.deferred
}

n.calling = true
n.deferred = promise.Deferred{}

if err := shallowCheckDependencies(s, n.params); err != nil {
return errMissingDependencies{
n.deferred.Resolve(errMissingDependencies{
Func: n.location,
Reason: err,
}
})
}

args, err := n.params.BuildList(n.s, true /* decorating */)
if err != nil {
return errArgumentsFailed{
Func: n.location,
Reason: err,
var args []reflect.Value
d := n.params.BuildList(s, true /* decorating */, &args)

d.Observe(func(err error) {
if err != nil {
n.calling = false
n.deferred.Resolve(errArgumentsFailed{
Func: n.location,
Reason: err,
})
return
}
}

results := reflect.ValueOf(n.dcor).Call(args)
if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil {
return err
}
n.called = true
return nil
var results []reflect.Value

s.scheduler().Schedule(func() {
results = s.invoker()(reflect.ValueOf(n.dcor), args)
}).Observe(func(_ error) {
n.calling = false
if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil {
n.deferred.Resolve(err)
return
}

n.called = true
n.deferred.Resolve(nil)
})
})

return &n.deferred
}

func (n *decoratorNode) ID() dot.CtorID { return n.id }
Expand Down
Loading