From 0ee9ba02671cdb44e83d270eb542ed22bcf6ff34 Mon Sep 17 00:00:00 2001 From: Michael Whittaker Date: Mon, 23 Oct 2023 12:49:24 -0700 Subject: [PATCH] Don't use kube.proto in kube.go. (#81) Previously, `ReplicaSetConfig` in `kube.proto` was used in two ways. It was serialized and passed to the babysitter, and it was used internally by `kube.go`. This PR refactors `kube.go` to use a `deployment` struct instead of a `ReplicaSetConfig`. This will enable me to simplify `ReplicaSetConfig` in a future PR. --- internal/impl/kube.go | 558 +++++++++++++++++++++--------------------- 1 file changed, 273 insertions(+), 285 deletions(-) diff --git a/internal/impl/kube.go b/internal/impl/kube.go index 1e5a00b..a13401f 100644 --- a/internal/impl/kube.go +++ b/internal/impl/kube.go @@ -22,12 +22,11 @@ import ( "io" "os" "path/filepath" + "sort" "strings" "github.com/ServiceWeaver/weaver-kube/internal/proto" "github.com/ServiceWeaver/weaver/runtime/bin" - "github.com/ServiceWeaver/weaver/runtime/graph" - "github.com/ServiceWeaver/weaver/runtime/logging" "github.com/ServiceWeaver/weaver/runtime/protos" "golang.org/x/exp/maps" appsv1 "k8s.io/api/apps/v1" @@ -58,15 +57,33 @@ const ( // Start value for ports used by the public and private listeners. var externalPort int32 = 20000 -// replicaSet contains information about a replica set. -type replicaSet struct { - name string // name of the replica set - components []*ReplicaSetConfig_Component // components hosted by replica set - image string // name of the image to be deployed - namespace string // namespace of the replica set - depId string // app deployment identifier - app *protos.AppConfig // app configuration - traceServiceURL string // trace exporter URL +// deployment contains information about a deployment of a Service Weaver +// application. +// +// Note that this is different from a Kubernetes Deployment. A deployed Service +// Weaver application consists of many Kubernetes Deployments. +type deployment struct { + deploymentId string // globally unique deployment id + image string // Docker image URI + traceServiceURL string // where traces are exported to, if not empty + config *kubeConfig // [kube] config from weaver.toml + app *protos.AppConfig // parsed weaver.toml + groups []group // groups +} + +// group contains information about a possibly replicated group of components. +type group struct { + name string // group name + components []string // hosted components + listeners []listener // hosted listeners +} + +// listener contains information about a listener. +type listener struct { + name string // listener name + serviceName string // Kubernetes service name + port int32 // port on which listener listens + public bool // is the listener publicly accessible } // shortenComponent shortens the given component name to be of the format @@ -90,42 +107,36 @@ func deploymentName(app, component, deploymentId string) string { return fmt.Sprintf("%s-%s-%s", shortened, deploymentId[:8], hash) } -// deploymentName returns a name that is version specific. -func (r *replicaSet) deploymentName() string { - return deploymentName(r.app.Name, r.name, r.depId) -} - -// buildDeployment generates a kubernetes deployment for a replica set. +// buildDeployment generates a Kubernetes Deployment for a group. // // TODO(rgrandl): test to see if it works with an app where a component foo is // collocated with main, and a component bar that is not collocated with main // calls foo. 
-func (r *replicaSet) buildDeployment(cfg *kubeConfig) (*appsv1.Deployment, error) { - name := r.deploymentName() - matchLabels := map[string]string{"serviceweaver/name": name} +func buildDeployment(d deployment, g group) (*appsv1.Deployment, error) { + // Create labels. + name := deploymentName(d.app.Name, g.name, d.deploymentId) podLabels := map[string]string{ "serviceweaver/name": name, - "serviceweaver/app": r.app.Name, - "serviceweaver/version": r.depId[:8], + "serviceweaver/app": d.app.Name, + "serviceweaver/version": d.deploymentId[:8], } - if cfg.Observability[metricsConfigKey] != disabled { - podLabels["metrics"] = r.app.Name // Needed by Prometheus to scrape the metrics. + if d.config.Observability[metricsConfigKey] != disabled { + podLabels["metrics"] = d.app.Name // Needed by Prometheus to scrape the metrics. } + // Pick DNS policy. dnsPolicy := corev1.DNSClusterFirst - if cfg.UseHostNetwork { + if d.config.UseHostNetwork { dnsPolicy = corev1.DNSClusterFirstWithHostNet } - var components []string - for _, component := range r.components { - components = append(components, logging.ShortenComponent(component.Name)) - } - - container, err := r.buildContainer(cfg) + // Create container. + container, err := buildContainer(d, g) if err != nil { return nil, err } + + // Create Deployment. return &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", @@ -133,32 +144,34 @@ func (r *replicaSet) buildDeployment(cfg *kubeConfig) (*appsv1.Deployment, error }, ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: r.namespace, + Namespace: d.config.Namespace, Labels: map[string]string{ - "serviceweaver/app": r.app.Name, - "serviceweaver/version": r.depId[:8], + "serviceweaver/app": d.app.Name, + "serviceweaver/version": d.deploymentId[:8], }, Annotations: map[string]string{ - "description": fmt.Sprintf("This Deployment hosts components %v.", strings.Join(components, ", ")), + "description": fmt.Sprintf("This Deployment hosts components %v.", strings.Join(g.components, ", ")), }, }, Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ - MatchLabels: matchLabels, + MatchLabels: map[string]string{ + "serviceweaver/name": name, + }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: podLabels, - Namespace: r.namespace, + Namespace: d.config.Namespace, Annotations: map[string]string{ - "description": fmt.Sprintf("This Pod hosts components %v.", strings.Join(components, ", ")), + "description": fmt.Sprintf("This Pod hosts components %v.", strings.Join(g.components, ", ")), }, }, Spec: corev1.PodSpec{ - ServiceAccountName: cfg.ServiceAccount, + ServiceAccountName: d.config.ServiceAccount, Containers: []corev1.Container{container}, DNSPolicy: dnsPolicy, - HostNetwork: cfg.UseHostNetwork, + HostNetwork: d.config.UseHostNetwork, }, }, }, @@ -170,19 +183,10 @@ func (r *replicaSet) buildDeployment(cfg *kubeConfig) (*appsv1.Deployment, error // Note that for public listeners, we generate a Load Balancer service because // it has to be reachable from the outside; for internal listeners, we generate // a ClusterIP service, reachable only from internal Service Weaver services. -func (r *replicaSet) buildListenerService(lis *ReplicaSetConfig_Listener) (*corev1.Service, error) { - // TODO(rgrandl): Specify whether the listener is public in the name. - // If the service name for the listener is not specified by the user, generate - // a deployment based service name. 
- lisServiceName := lis.ServiceName - if lisServiceName == "" { - lisServiceName = fmt.Sprintf("%s-%s", lis.Name, r.depId[:8]) - } - var serviceType string - if lis.IsPublic { +func buildListenerService(d deployment, g group, lis listener) (*corev1.Service, error) { + serviceType := "ClusterIP" + if lis.public { serviceType = "LoadBalancer" - } else { - serviceType = "ClusterIP" } return &corev1.Service{ @@ -191,59 +195,58 @@ func (r *replicaSet) buildListenerService(lis *ReplicaSetConfig_Listener) (*core Kind: "Service", }, ObjectMeta: metav1.ObjectMeta{ - Name: lisServiceName, - Namespace: r.namespace, + Name: lis.serviceName, + Namespace: d.config.Namespace, Labels: map[string]string{ - "serviceweaver/app": r.app.Name, - "serviceweaver/listener": lis.Name, - "serviceweaver/version": r.depId[:8], + "serviceweaver/app": d.app.Name, + "serviceweaver/listener": lis.name, + "serviceweaver/version": d.deploymentId[:8], }, Annotations: map[string]string{ - "description": fmt.Sprintf("This Service forwards traffic to the %q listener.", lis.Name), + "description": fmt.Sprintf("This Service forwards traffic to the %q listener.", lis.name), }, }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceType(serviceType), Selector: map[string]string{ - "serviceweaver/name": r.deploymentName(), + "serviceweaver/name": deploymentName(d.app.Name, g.name, d.deploymentId), }, Ports: []corev1.ServicePort{ { Port: servicePort, Protocol: "TCP", - TargetPort: intstr.IntOrString{IntVal: lis.ExternalPort}, + TargetPort: intstr.IntOrString{IntVal: lis.port}, }, }, }, }, nil } -// buildAutoscaler generates a kubernetes horizontal pod autoscaler for a replica set. -func (r *replicaSet) buildAutoscaler() (*autoscalingv2.HorizontalPodAutoscaler, error) { +// buildAutoscaler generates a Kubernetes HorizontalPodAutoscaler for a group. +func buildAutoscaler(d deployment, g group) (*autoscalingv2.HorizontalPodAutoscaler, error) { // Per deployment name that is app version specific. - aname := r.deploymentName() - depName := r.deploymentName() + name := deploymentName(d.app.Name, g.name, d.deploymentId) return &autoscalingv2.HorizontalPodAutoscaler{ TypeMeta: metav1.TypeMeta{ APIVersion: "autoscaling/v2", Kind: "HorizontalPodAutoscaler", }, ObjectMeta: metav1.ObjectMeta{ - Name: aname, - Namespace: r.namespace, + Name: name, + Namespace: d.config.Namespace, Labels: map[string]string{ - "serviceweaver/app": r.app.Name, - "serviceweaver/version": r.depId[:8], + "serviceweaver/app": d.app.Name, + "serviceweaver/version": d.deploymentId[:8], }, Annotations: map[string]string{ - "description": fmt.Sprintf("This HorizontalPodAutoscaler scales the %q Deployment.", depName), + "description": fmt.Sprintf("This HorizontalPodAutoscaler scales the %q Deployment.", name), }, }, Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ APIVersion: "apps/v1", Kind: "Deployment", - Name: depName, + Name: name, }, MinReplicas: ptrOf(int32(1)), MaxReplicas: 10, @@ -265,65 +268,81 @@ func (r *replicaSet) buildAutoscaler() (*autoscalingv2.HorizontalPodAutoscaler, }, nil } -// buildContainer builds a container specification for a replica set. -func (r *replicaSet) buildContainer(cfg *kubeConfig) (corev1.Container, error) { - // Set the binary path in the deployment w.r.t. to the binary path in the - // docker image. 
- r.app.Binary = fmt.Sprintf("/weaver/%s", filepath.Base(r.app.Binary)) - kubeCfgStr, err := proto.ToEnv(&ReplicaSetConfig{ - Namespace: r.namespace, - Name: r.name, - DepId: r.depId, - App: r.app, - TraceServiceUrl: r.traceServiceURL, - Components: r.components, +// buildContainer builds a container specification for a group. +func buildContainer(d deployment, g group) (corev1.Container, error) { + // Rewrite the app config to point to the binary in the container. + d.app.Binary = fmt.Sprintf("/weaver/%s", filepath.Base(d.app.Binary)) + + // Create the ReplicaSetConfig passed to the babysitter. + // + // TODO(mwhittaker): We associate every listener with the first component. + // This is technically incorrect, but doesn't affect how the babysitter + // runs. This will get simplified when I simplify ReplicaSetConfig. + components := make([]*ReplicaSetConfig_Component, len(g.components)) + for i, component := range g.components { + components[i] = &ReplicaSetConfig_Component{Name: component} + } + components[0].Listeners = make([]*ReplicaSetConfig_Listener, len(g.listeners)) + for i, listener := range g.listeners { + components[0].Listeners[i] = &ReplicaSetConfig_Listener{ + Name: listener.name, + ServiceName: listener.serviceName, + ExternalPort: listener.port, + IsPublic: listener.public, + } + } + configString, err := proto.ToEnv(&ReplicaSetConfig{ + Namespace: d.config.Namespace, + Name: g.name, + DepId: d.deploymentId, + App: d.app, + TraceServiceUrl: d.traceServiceURL, + Components: components, }) if err != nil { return corev1.Container{}, err } - // Always expose the metrics port from the container, so it can be - // discoverable for scraping by Prometheus. + // Gather the set of ports. var ports []corev1.ContainerPort - // Expose all of the listener ports. - for _, ls := range r.components { - for _, l := range ls.Listeners { - ports = append(ports, corev1.ContainerPort{ - Name: l.Name, - ContainerPort: l.ExternalPort, - }) - } + for _, l := range g.listeners { + ports = append(ports, corev1.ContainerPort{ + Name: l.name, + ContainerPort: l.port, + }) } - - if cfg.Observability[metricsConfigKey] != disabled { - // Expose the metrics port from the container, so it can be discoverable for - // scraping by Prometheus. - // TODO(rgrandl): We may want to have a default metrics port that can be scraped - // by any metrics collection system. For now, disable the port if Prometheus - // will not collect the metrics. + if d.config.Observability[metricsConfigKey] != disabled { + // Expose the metrics port from the container, so it can be + // discoverable for scraping by Prometheus. + // + // TODO(rgrandl): We may want to have a default metrics port that can + // be scraped by any metrics collection system. For now, disable the + // port if Prometheus will not collect the metrics. ports = append(ports, corev1.ContainerPort{ - Name: "prometheus", ContainerPort: defaultMetricsPort, + Name: "prometheus", + ContainerPort: defaultMetricsPort, }) } - resources, err := computeResourceRequirements(cfg.Resources) + // Gather the set of resources. 
+	resources, err := computeResourceRequirements(d.config.Resources)
 	if err != nil {
 		return corev1.Container{}, err
 	}
 	c := corev1.Container{
 		Name:            appContainerName,
-		Image:           r.image,
+		Image:           d.image,
 		ImagePullPolicy: corev1.PullIfNotPresent,
 		Args:            []string{"babysitter"},
 		Env: []corev1.EnvVar{
-			{Name: kubeConfigEnvKey, Value: kubeCfgStr},
+			{Name: kubeConfigEnvKey, Value: configString},
 		},
 		Resources: resources,
 		Ports:     ports,
-		// Enabling TTY and Stdin allows the user to run a shell inside the container,
-		// for debugging.
+		// Enabling TTY and Stdin allows the user to run a shell inside the
+		// container, for debugging.
 		TTY:   true,
 		Stdin: true,
 	}
@@ -362,11 +381,11 @@ func (r *replicaSet) buildContainer(cfg *kubeConfig) (corev1.Container, error) {
 	}
 
 	// Add probes if any.
-	if cfg.LivenessProbeOpts != nil {
-		c.LivenessProbe = createProbeFn(cfg.LivenessProbeOpts)
+	if d.config.LivenessProbeOpts != nil {
+		c.LivenessProbe = createProbeFn(d.config.LivenessProbeOpts)
 	}
-	if cfg.ReadinessProbeOpts != nil {
-		c.LivenessProbe = createProbeFn(cfg.ReadinessProbeOpts)
+	if d.config.ReadinessProbeOpts != nil {
+		c.ReadinessProbe = createProbeFn(d.config.ReadinessProbeOpts)
 	}
 	return c, nil
 }
@@ -401,10 +420,16 @@ func (r *replicaSet) buildContainer(cfg *kubeConfig) (corev1.Container, error) {
 func generateYAMLs(image string, app *protos.AppConfig, depId string, cfg *kubeConfig) error {
 	fmt.Fprintf(os.Stderr, greenText(), "\nGenerating kube deployment info ...")
 
+	// Form deployment.
+	d, err := newDeployment(app, cfg, depId, image)
+	if err != nil {
+		return err
+	}
+
 	// Generate header.
 	var b bytes.Buffer
 	yamlFile := filepath.Join(os.TempDir(), fmt.Sprintf("kube_%s.yaml", depId[:8]))
-	header, err := header(app, cfg, depId, yamlFile)
+	header, err := header(d, yamlFile)
 	if err != nil {
 		return err
 	}
@@ -416,7 +441,7 @@ func generateYAMLs(image string, app *protos.AppConfig, depId string, cfg *kubeC
 	}
 
 	// Generate core YAMLs (deployments, services, autoscalers).
-	if err := generateCoreYAMLs(&b, app, depId, cfg, image); err != nil {
+	if err := generateCoreYAMLs(&b, d); err != nil {
 		return fmt.Errorf("unable to create kube app deployment: %w", err)
 	}
 
@@ -441,7 +466,15 @@ func generateYAMLs(image string, app *protos.AppConfig, depId string, cfg *kubeC
 }
 
 // header returns the informational header at the top of a generated YAML file.
-func header(app *protos.AppConfig, cfg *kubeConfig, depId, filename string) (string, error) {
+func header(d deployment, filename string) (string, error) {
+	type content struct {
+		ToolVersion string
+		App         string
+		Version     string
+		Groups      [][]string
+		Listeners   []string
+		Filename    string
+	}
 	header := template.Must(template.New("header").Parse(`# This file was generated by "weaver kube" version {{.ToolVersion}} for the following
 # application:
 #
@@ -455,7 +488,7 @@ func header(app *protos.AppConfig, cfg *kubeConfig, depId, filename string) (str
 {{- end}}
 #   listeners:
 {{- range .Listeners}}
-#   - {{.Name}} ({{.Component}})
+#   - {{.}}
 {{- end}}
 #
 # This file contains the following resources:
@@ -483,59 +516,37 @@ func header(app *protos.AppConfig, cfg *kubeConfig, depId, filename string) (str
 
 `))
 
-	type listener struct {
-		Name      string
-		Component string
-	}
-
-	type content struct {
-		ToolVersion string
-		App         string
-		Version     string
-		Groups      [][]string
-		Listeners   []listener
-		Filename    string
-	}
-
 	// Extract the tool version.
 	toolVersion, _, err := ToolVersion()
 	if err != nil {
 		return "", err
 	}
 
-	// Extract components and listeners.
- replicaSets, _, err := buildReplicaSets(app, depId, "", cfg) - if err != nil { - return "", err + // Compute groups. + groups := make([][]string, len(d.groups)) + for i, g := range d.groups { + groups[i] = g.components } - var groups [][]string - var listeners []listener - for _, rs := range replicaSets { - if rs == nil { - // TODO(mwhittaker): Debug why buildReplicaSets is returning nil - // replica sets. - continue - } - var group []string - for _, component := range rs.components { - group = append(group, component.Name) - for _, l := range component.Listeners { - listeners = append(listeners, listener{l.Name, component.Name}) - } + + // Compute listeners. + var listeners []string + for _, g := range d.groups { + for _, lis := range g.listeners { + listeners = append(listeners, lis.name) } - groups = append(groups, group) } + // Execute template. var b strings.Builder - header.Execute(&b, content{ + err = header.Execute(&b, content{ ToolVersion: toolVersion, - App: app.Name, - Version: depId[:8], + App: d.app.Name, + Version: d.deploymentId[:8], Groups: groups, Listeners: listeners, Filename: filename, }) - return b.String(), nil + return b.String(), err } // generateRolesAndBindings generates Kubernetes roles and role bindings in @@ -600,65 +611,91 @@ func generateRolesAndBindings(w io.Writer, namespace, serviceAccount string) err } // generateCoreYAMLs generates the core YAMLs for the given deployment. -func generateCoreYAMLs(w io.Writer, app *protos.AppConfig, depId string, cfg *kubeConfig, image string) error { - // Generate the kubernetes replica sets for the deployment, along with - // their communication graph. - replicaSets, rg, err := buildReplicaSets(app, depId, image, cfg) - if err != nil { - return fmt.Errorf("unable to create replica sets: %w", err) - } - - // For each replica set, build a deployment and an autoscaler. If a replica - // set has any listeners, build a service for each listener. We traverse - // the graph in a deterministic order, to achieve a stable YAML file. - for _, n := range graph.ReversePostOrder(rg) { - rs := replicaSets[n] - - // Build a service for each listener. - for _, listeners := range rs.components { - for _, lis := range listeners.Listeners { - ls, err := rs.buildListenerService(lis) - if err != nil { - return fmt.Errorf("unable to create kube listener service for %s: %w", lis.Name, err) - } - if err := marshalResource(w, ls, fmt.Sprintf("Listener Service for replica set %s", rs.name)); err != nil { - return err - } - fmt.Fprintf(os.Stderr, "Generated kube listener service for listener %v\n", lis.Name) +func generateCoreYAMLs(w io.Writer, d deployment) error { + // For each group, build a deployment and an autoscaler. If a group has any + // listeners, build a service for each listener. + for _, g := range d.groups { + // Build a Service for each listener. + for _, lis := range g.listeners { + service, err := buildListenerService(d, g, lis) + if err != nil { + return fmt.Errorf("unable to create kube listener service for %s: %w", lis.name, err) + } + if err := marshalResource(w, service, fmt.Sprintf("Listener Service for group %s", g.name)); err != nil { + return err } + fmt.Fprintf(os.Stderr, "Generated kube listener service for listener %v\n", lis.name) } - // Build a deployment. - d, err := rs.buildDeployment(cfg) + // Build a Deployment for the group. 
+		deployment, err := buildDeployment(d, g)
 		if err != nil {
-			return fmt.Errorf("unable to create kube deployment for replica set %s: %w", rs.name, err)
+			return fmt.Errorf("unable to create kube deployment for group %s: %w", g.name, err)
 		}
-		if err := marshalResource(w, d, fmt.Sprintf("Deployment for replica set %s", rs.name)); err != nil {
+		if err := marshalResource(w, deployment, fmt.Sprintf("Deployment for group %s", g.name)); err != nil {
 			return err
 		}
-		fmt.Fprintf(os.Stderr, "Generated kube deployment for replica set %v\n", rs.name)
+		fmt.Fprintf(os.Stderr, "Generated kube deployment for group %v\n", g.name)
 
-		// Build a horizontal pod autoscaler for the deployment.
-		a, err := rs.buildAutoscaler()
+		// Build a HorizontalPodAutoscaler for the Deployment.
+		autoscaler, err := buildAutoscaler(d, g)
 		if err != nil {
-			return fmt.Errorf("unable to create kube autoscaler for replica set %s: %w", rs.name, err)
+			return fmt.Errorf("unable to create kube autoscaler for group %s: %w", g.name, err)
 		}
-		if err := marshalResource(w, a, fmt.Sprintf("Autoscaler for replica set %s", rs.name)); err != nil {
+		if err := marshalResource(w, autoscaler, fmt.Sprintf("Autoscaler for group %s", g.name)); err != nil {
 			return err
 		}
-		fmt.Fprintf(os.Stderr, "Generated kube autoscaler for replica set %v\n", rs.name)
+		fmt.Fprintf(os.Stderr, "Generated kube autoscaler for group %v\n", g.name)
 	}
 	return nil
 }
 
-// buildReplicaSets returns the replica sets that will be used for the
-// given deployment, along with the communication graph used between those
-// replica sets.
-func buildReplicaSets(app *protos.AppConfig, depId string, image string, cfg *kubeConfig) ([]*replicaSet, graph.Graph, error) {
+// newDeployment returns a new deployment for a Service Weaver application.
+func newDeployment(app *protos.AppConfig, cfg *kubeConfig, depId, image string) (deployment, error) {
+	// Read the components and listeners from the binary.
+	components, err := readComponentsAndListeners(app.Binary)
+	if err != nil {
+		return deployment{}, err
+	}
+
+	// Map every colocated component to its colocation group.
+	groups := map[string]*protos.ComponentGroup{}
+	for _, group := range app.Colocate {
+		for _, component := range group.Components {
+			groups[component] = group
+		}
+	}
+
+	// Form groups.
+	groupsByName := map[string]group{}
+	for component, listeners := range components {
+		// We use the first component in a group as the name of the group.
+		name := component
+		if group, ok := groups[component]; ok {
+			name = group.Components[0]
+		}
+
+		// Append the component and listeners to the group.
+		g, ok := groupsByName[name]
+		if !ok {
+			g = group{name: name}
+		}
+		g.components = append(g.components, component)
+		for _, name := range listeners {
+			g.listeners = append(g.listeners, newListener(depId, cfg, name))
+		}
+		groupsByName[name] = g
+	}
+
+	// Sort groups by name to ensure stable YAML.
+	sorted := maps.Values(groupsByName)
+	sort.Slice(sorted, func(i, j int) bool {
+		return sorted[i].name < sorted[j].name
+	})
+
 	// Compute the URL of the export traces service.
 	var traceServiceURL string
-	jservice := cfg.Observability[tracesConfigKey]
-	switch {
+	switch jservice := cfg.Observability[tracesConfigKey]; {
 	case jservice == auto:
 		// Point to the service launched by the kube deployer.
traceServiceURL = fmt.Sprintf("http://%s:%d/api/traces", name{app.Name, jaegerAppName}.DNSLabel(), defaultJaegerCollectorPort) @@ -669,62 +706,41 @@ func buildReplicaSets(app *protos.AppConfig, depId string, image string, cfg *ku // No trace to export. } - // Retrieve the components information from the binary. - components, cg, err := readBinary(app, cfg) - if err != nil { - return nil, nil, err - } + return deployment{ + deploymentId: depId, + image: image, + traceServiceURL: traceServiceURL, + config: cfg, + app: app, + groups: sorted, + }, nil - // For all co-located components, choose a component to serve as the - // primary. This will be the first component in each colocation group, as - // specified in the config file. - cmap := make(map[string]graph.Node, len(components)) // component name -> node - cg.PerNode(func(n graph.Node) { - cmap[components[n].Name] = n - }) - primary := make([]graph.Node, len(components)) - cg.PerNode(func(n graph.Node) { // default: each component its own primary - primary[n] = n - }) - for _, group := range app.Colocate { - if len(group.Components) == 0 { - continue - } - prim := cmap[group.Components[0]] - for _, c := range group.Components { - primary[cmap[c]] = prim - } +} + +// newListener returns a new listener. +func newListener(depId string, config *kubeConfig, name string) listener { + lis := listener{ + name: name, + serviceName: fmt.Sprintf("%s-%s", name, depId[:8]), + public: false, + port: externalPort, } + externalPort++ - // Build the replica set information, along with an associated graph - // of replica sets. - replicaSets := make([]*replicaSet, len(components)) - nodes := map[graph.Node]struct{}{} - cg.PerNode(func(n graph.Node) { - pn := primary[n] - nodes[pn] = struct{}{} - if replicaSets[pn] == nil { - replicaSets[pn] = &replicaSet{ - name: components[pn].Name, - image: image, - namespace: cfg.Namespace, - depId: depId, - app: app, - traceServiceURL: traceServiceURL, - } - } - replicaSets[pn].components = append(replicaSets[pn].components, components[n]) - }) - edges := map[graph.Edge]struct{}{} - graph.PerEdge(cg, func(e graph.Edge) { - src := primary[e.Src] - dst := primary[e.Dst] - edges[graph.Edge{Src: src, Dst: dst}] = struct{}{} - }) - return replicaSets, graph.NewAdjacencyGraph(maps.Keys(nodes), maps.Keys(edges)), nil + opts, ok := config.Listeners[name] + if ok && opts.ServiceName != "" { + lis.serviceName = opts.ServiceName + } + if ok && opts.Public { + lis.public = opts.Public + } + if ok && opts.Port != 0 { + lis.port = opts.Port + } + return lis } -// computeResourceRequirements compute the resource requirements to run the application's pods. +// computeResourceRequirements computes resource requirements. func computeResourceRequirements(req resourceRequirements) (corev1.ResourceRequirements, error) { requests := corev1.ResourceList{} limits := corev1.ResourceList{} @@ -767,55 +783,27 @@ func computeResourceRequirements(req resourceRequirements) (corev1.ResourceRequi }, nil } -// readBinary returns the component and listener information embedded in the -// binary. -func readBinary(app *protos.AppConfig, cfg *kubeConfig) ([]*ReplicaSetConfig_Component, graph.Graph, error) { - // Read the component graph from the binary. - cs, g, err := bin.ReadComponentGraph(app.Binary) +// readComponentsAndListeners returns a map from every component to its +// (potentially empty) set of listeners. +func readComponentsAndListeners(binary string) (map[string][]string, error) { + // Read the components from the binary. 
+ names, _, err := bin.ReadComponentGraph(binary) if err != nil { - return nil, nil, fmt.Errorf("unable to retrieve the call graph for binary %s: %w", app.Binary, err) + return nil, err + } + components := map[string][]string{} + for _, name := range names { + components[name] = []string{} } - components := make([]*ReplicaSetConfig_Component, len(cs)) - g.PerNode(func(n graph.Node) { - components[n] = &ReplicaSetConfig_Component{Name: cs[n]} - }) - // Read the listeners information from the binary. - ls, err := bin.ReadListeners(app.Binary) + // Read the listeners from the binary. + ls, err := bin.ReadListeners(binary) if err != nil { - return nil, nil, fmt.Errorf("unable to retrieve the listeners for binary %s: %w", app.Binary, err) + return nil, err } - listeners := make(map[string][]string, len(ls)) for _, l := range ls { - listeners[l.Component] = l.Listeners + components[l.Component] = l.Listeners } - // Collate the two. - for _, c := range components { - for _, lis := range listeners[c.Name] { - public := false - if opts := cfg.Listeners[lis]; opts != nil && opts.Public { - public = true - } - var port int32 - if opts := cfg.Listeners[lis]; opts != nil && opts.Port != 0 { - port = opts.Port - } else { - // Pick an unused port. - port = externalPort - externalPort++ - } - var serviceName string - if opts := cfg.Listeners[lis]; opts != nil && opts.ServiceName != "" { - serviceName = opts.ServiceName - } - c.Listeners = append(c.Listeners, &ReplicaSetConfig_Listener{ - Name: lis, - ServiceName: serviceName, - ExternalPort: port, - IsPublic: public, - }) - } - } - return components, g, nil + return components, nil }
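
The grouping logic introduced in newDeployment is the heart of this refactor, so here is a small, self-contained sketch of it for readers who want to see it in isolation. It is illustrative only: it uses a stripped-down group type and a plain [][]string in place of the real protos.AppConfig.Colocate and kubeConfig, and the component names in main are made up. The idea it demonstrates is the one newDeployment implements: colocated components share a group named after the first component in their colocation spec, every other component gets a singleton group, and groups are sorted by name so the generated YAML is stable.

package main

import (
	"fmt"
	"sort"
)

// group mirrors the group struct added in this PR, minus listeners.
type group struct {
	name       string
	components []string
}

// formGroups partitions components into groups the way newDeployment does:
// colocated components share a group named after the first component in the
// colocation spec, everything else gets a singleton group, and the result is
// sorted by name so the output is deterministic.
func formGroups(components []string, colocate [][]string) []group {
	// Map every colocated component to its group name.
	groupName := map[string]string{}
	for _, spec := range colocate {
		for _, c := range spec {
			groupName[c] = spec[0]
		}
	}

	// Bucket components into groups.
	byName := map[string]group{}
	for _, c := range components {
		name := c
		if n, ok := groupName[c]; ok {
			name = n
		}
		g := byName[name]
		g.name = name
		g.components = append(g.components, c)
		byName[name] = g
	}

	// Sort groups by name.
	sorted := make([]group, 0, len(byName))
	for _, g := range byName {
		sorted = append(sorted, g)
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].name < sorted[j].name })
	return sorted
}

func main() {
	// Hypothetical component names; a colocation spec from weaver.toml would
	// be passed in the same shape.
	components := []string{"example.com/app/Main", "example.com/app/Cache", "example.com/app/Store"}
	colocate := [][]string{{"example.com/app/Main", "example.com/app/Cache"}}
	for _, g := range formGroups(components, colocate) {
		fmt.Printf("%s: %v\n", g.name, g.components)
	}
}

The real newDeployment additionally attaches listeners to each group and reads the component list from the application binary via readComponentsAndListeners.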