Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of the value cache #2151

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 38 additions & 14 deletions internal/runtime/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {
l.cm.controllerEvaluation.Set(1)
defer l.cm.controllerEvaluation.Set(0)

l.cache.SetScope(options.ArgScope)
if options.ArgScope != nil {
l.cache.UpdateScopeVariables(options.ArgScope.Variables)
}

for key, value := range options.Args {
l.cache.CacheModuleArgument(key, value)
Expand All @@ -166,7 +168,7 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {

var (
components = make([]ComponentNode, 0)
componentIDs = make([]ComponentID, 0)
componentIDs = make(map[string]ComponentID)
wildum marked this conversation as resolved.
Show resolved Hide resolved
services = make([]*ServiceNode, 0, len(l.services))
)

Expand Down Expand Up @@ -200,7 +202,7 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {
switch n := n.(type) {
case ComponentNode:
components = append(components, n)
componentIDs = append(componentIDs, n.ID())
componentIDs[n.ID().String()] = n.ID()

if err = l.evaluate(logger, n); err != nil {
var evalDiags diag.Diagnostics
Expand Down Expand Up @@ -260,7 +262,15 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {
l.componentNodes = components
l.serviceNodes = services
l.graph = &newGraph
l.cache.SyncIDs(componentIDs)
err := l.cache.SyncIDs(componentIDs)
if err != nil {
wildum marked this conversation as resolved.
Show resolved Hide resolved
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("Failed to update the internal cache with the new list of components: %s", err),
})
// return it now because there is no point to process further
return diags
}
l.blocks = options.ComponentBlocks
if l.globals.OnExportsChange != nil && l.cache.ExportChangeIndex() != l.moduleExportIndex {
l.moduleExportIndex = l.cache.ExportChangeIndex()
Expand Down Expand Up @@ -615,7 +625,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {

// Finally, wire component references.
l.cache.mut.RLock()
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope, l.globals.MinStability)
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.GetContext(), l.globals.MinStability)
l.cache.mut.RUnlock()
for _, ref := range refs {
g.AddEdge(dag.Edge{From: n, To: ref.Target})
Expand Down Expand Up @@ -645,7 +655,7 @@ func (l *Loader) wireCustomComponentNode(g *dag.Graph, cc *CustomComponentNode)
// Variables returns the Variables the Loader exposes for other components to
// reference.
func (l *Loader) Variables() map[string]interface{} {
return l.cache.BuildContext().Variables
return l.cache.GetContext().Variables
}

// Components returns the current set of loaded components.
Expand Down Expand Up @@ -703,7 +713,10 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN
switch parentNode := parent.Node.(type) {
case ComponentNode:
// Make sure we're in-sync with the current exports of parent.
l.cache.CacheExports(parentNode.ID(), parentNode.Exports())
err := l.cache.CacheExports(parentNode.ID(), parentNode.Exports())
if err != nil {
level.Error(l.log).Log("msg", "failed to cache exports during evaluation", "err", err)
}
case *ImportConfigNode:
// Update the scope with the imported content.
l.componentNodeManager.customComponentReg.updateImportContent(parentNode)
Expand Down Expand Up @@ -780,7 +793,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
var err error
switch n := n.(type) {
case BlockNode:
ectx := l.cache.BuildContext()
ectx := l.cache.GetContext()

// RLock before evaluate to prevent Evaluating while the config is being reloaded
l.mut.RLock()
Expand Down Expand Up @@ -819,22 +832,33 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
// evaluate constructs the final context for the BlockNode and
// evaluates it. mut must be held when calling evaluate.
func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error {
ectx := l.cache.BuildContext()
ectx := l.cache.GetContext()
err := bn.Evaluate(ectx)
return l.postEvaluate(logger, bn, err)
}

// postEvaluate is called after a node has been evaluated. It updates the caches and logs any errors.
// mut must be held when calling postEvaluate.
// The evaluation err is passed as an argument to allow shadowing it with an error that could be more relevant to the user
// but cannot be determined before the evaluation (for example, we must evaluate the argument node to see if it's optional before
// raising an error when a value is missing). When err is not nil, this function must return an error.
func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error {
switch c := bn.(type) {
case ComponentNode:
// Always update the cache both the arguments and exports, since both might
// change when a component gets re-evaluated. We also want to cache the arguments and exports in case of an error
l.cache.CacheArguments(c.ID(), c.Arguments())
l.cache.CacheExports(c.ID(), c.Exports())
// Always update the cached exports, since that it might change when a component gets re-evaluated.
// We also want to cache it in case of an error
err2 := l.cache.CacheExports(c.ID(), c.Exports())
if err2 != nil {
if err != nil {
level.Error(logger).Log("msg", "evaluation and exports caching failed", "eval err", err, "caching err", err2)
return errors.Join(err, err2)
} else {
level.Error(logger).Log("msg", "failed to cache exports after evaluation", "err", err2)
return err2
}
}
case *ArgumentConfigNode:
if _, found := l.cache.moduleArguments[c.Label()]; !found {
if _, found := l.cache.GetModuleArgument(c.Label()); !found {
if c.Optional() {
l.cache.CacheModuleArgument(c.Label(), c.Default())
} else {
Expand Down
Loading
Loading