Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Persistent / replicated metadata, util init fixes, and documentation #730

Merged
merged 11 commits into from
Oct 26, 2017
53 changes: 19 additions & 34 deletions cmd/infrakit/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"strings"

"github.com/docker/infrakit/cmd/infrakit/base"
"github.com/docker/infrakit/cmd/infrakit/manager/schema"

"github.com/docker/infrakit/pkg/cli"
"github.com/docker/infrakit/pkg/discovery"
logutil "github.com/docker/infrakit/pkg/log"
"github.com/docker/infrakit/pkg/manager"
"github.com/docker/infrakit/pkg/plugin"
group_types "github.com/docker/infrakit/pkg/plugin/group/types"
"github.com/docker/infrakit/pkg/rpc/client"
group_rpc "github.com/docker/infrakit/pkg/rpc/group"
manager_rpc "github.com/docker/infrakit/pkg/rpc/manager"
Expand Down Expand Up @@ -113,54 +116,36 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
return err
}

// In any case, the view should be in JSON format

// Treat this as an Any and then convert
any := types.AnyString(view)

groups := []plugin.Spec{}
err = any.Decode(&groups)
if err != nil {
log.Warn("Error parsing the template for plugin specs.")
return err
}

// Check the list of plugins
for _, gp := range groups {

endpoint, err := plugins().Find(gp.Plugin)
commitEachGroup := func(name plugin.Name, gid group.ID, gspec group_types.Spec) error {
endpoint, err := plugins().Find(name)
if err != nil {
return err
}

// unmarshal the group spec
spec := group.Spec{}
if gp.Properties != nil {
err = gp.Properties.Decode(&spec)
if err != nil {
return err
}
}

// TODO(chungers) -- we need to enforce and confirm the type of this.
// Right now we assume the RPC endpoint is indeed a group.
target, err := group_rpc.NewClient(endpoint.Address)
log.Debug("commit", "plugin", name, "address", endpoint.Address, "err", err, "gspec", gspec)

log.Debug("commit", "plugin", gp.Plugin, "address", endpoint.Address, "err", err, "spec", spec)
if err != nil {
return err
}

any, err := types.AnyValue(gspec)
if err != nil {
return err
}

plan, err := target.CommitGroup(spec, *pretend)
plan, err := target.CommitGroup(group.Spec{
ID: gid,
Properties: any,
}, *pretend)

if err != nil {
return err
}

fmt.Println("Group", spec.ID, "with plugin", gp.Plugin, "plan:", plan)
fmt.Println("Group", gid, "with plugin", name, "plan:", plan)
return nil
}

return nil
return schema.ParseInputSpecs([]byte(view), commitEachGroup)
},
}
commit.Flags().AddFlagSet(templateFlags)
Expand Down Expand Up @@ -357,7 +342,7 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
}
change.AddCommand(changeList, changeGet)

cmd.AddCommand(commit, inspect, change, leader)
cmd.AddCommand(commit, inspect, leader)

return cmd
}
Expand Down
34 changes: 34 additions & 0 deletions cmd/infrakit/manager/schema/v0.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package schema

import (
"github.com/docker/infrakit/pkg/plugin"
group_types "github.com/docker/infrakit/pkg/plugin/group/types"
"github.com/docker/infrakit/pkg/spi/group"
"github.com/docker/infrakit/pkg/types"
)

// ParseInputSpecs parses the input bytes which is the groups.json, and calls
// each time a group spec is found.
func ParseInputSpecs(input []byte, foundGroupSpec func(plugin.Name, group.ID, group_types.Spec) error) error {
// TODO - update the schema soon. This is the Plugin/Properties schema
type spec struct {
Plugin plugin.Name
Properties struct {
ID group.ID
Properties group_types.Spec
}
}

specs := []spec{}
err := types.AnyBytes(input).Decode(&specs)
if err != nil {
return err
}
for _, s := range specs {
err = foundGroupSpec(s.Plugin, s.Properties.ID, s.Properties.Properties)
if err != nil {
return err
}
}
return nil
}
33 changes: 7 additions & 26 deletions cmd/infrakit/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/docker/infrakit/cmd/infrakit/base"

"github.com/docker/infrakit/pkg/cli"
"github.com/docker/infrakit/pkg/discovery"
"github.com/docker/infrakit/pkg/launch"
logutil "github.com/docker/infrakit/pkg/log"
Expand All @@ -16,7 +18,6 @@ import (
"github.com/docker/infrakit/pkg/run/manager"
group_kind "github.com/docker/infrakit/pkg/run/v0/group"
manager_kind "github.com/docker/infrakit/pkg/run/v0/manager"
"github.com/docker/infrakit/pkg/types"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -139,42 +140,22 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
}

configURL := start.Flags().String("config-url", "", "URL for the startup configs")
mustAll := start.Flags().Bool("all", false, "Panic if any plugin fails to start")
templateFlags, toJSON, _, processTemplate := base.TemplateProcessor(plugins)
start.Flags().AddFlagSet(templateFlags)

services := cli.NewServices(plugins)
start.Flags().AddFlagSet(services.ProcessTemplateFlags)

start.RunE = func(c *cobra.Command, args []string) error {

if plugins == nil {
panic("no plugins()")
}

parsedRules := []launch.Rule{}

log.Info("config", "url", *configURL)

if *configURL != "" {
buff, err := processTemplate(*configURL)
if err != nil {
return err
}

view, err := toJSON([]byte(buff))
if err != nil {
return err
}

configs := types.AnyBytes(view)
err = configs.Decode(&parsedRules)
if err != nil {
return err
}
}

pluginManager, err := manager.ManagePlugins(parsedRules, plugins, *mustAll, 5*time.Second)
pluginManager, err := cli.PluginManager(plugins, services, *configURL)
if err != nil {
return err
}

defer func() {
if r := recover(); r != nil {
log.Error("Error occurred. Recovered but exiting.", "err", r)
Expand Down
Loading