Skip to content

Commit

Permalink
refactor bare-metal deployer and add foreground delete for it
Browse files Browse the repository at this point in the history
Signed-off-by: sh2 <[email protected]>
  • Loading branch information
shawnh2 committed Jul 28, 2023
1 parent 0148890 commit 384e33d
Show file tree
Hide file tree
Showing 17 changed files with 908 additions and 424 deletions.
10 changes: 8 additions & 2 deletions pkg/cmd/gtctl/cluster/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"io/ioutil"
"os"
"os/signal"
"syscall"
"time"

"github.com/spf13/cobra"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/common"
"github.com/GreptimeTeam/gtctl/pkg/deployer"
"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal"
bmconfig "github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/deployer/k8s"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/status"
Expand Down Expand Up @@ -80,6 +83,9 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {
ctx = context.TODO()
)

ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

clusterDeployer, err := newDeployer(l, clusterName, &options)
if err != nil {
return err
Expand Down Expand Up @@ -127,7 +133,7 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("To view dashboard by accessing: %s\n", logger.Bold("http://localhost:4000/dashboard/")))

// Wait for all the child processes to exit.
if err := d.Wait(ctx); err != nil {
if err := d.Wait(ctx, stop); err != nil {
return err
}
}
Expand Down Expand Up @@ -177,7 +183,7 @@ func newDeployer(l logger.Logger, clusterName string, options *createClusterCliO
}

if options.Config != "" {
var config baremetal.Config
var config bmconfig.Config
data, err := ioutil.ReadFile(options.Config)
if err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions pkg/deployer/baremetal/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"runtime"
"strings"

"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/utils"
)
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewArtifactManager(workingDir string, l logger.Logger, alwaysDownload bool)
}

// BinaryPath returns the path of the binary of the given type and version.
func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *Artifact) (string, error) {
func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *config.Artifact) (string, error) {
if artifact.Local != "" {
return artifact.Local, nil
}
Expand All @@ -95,7 +96,7 @@ func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *Artifact) (str
}

// PrepareArtifact will download the artifact from the given URL and uncompressed it.
func (am *ArtifactManager) PrepareArtifact(typ ArtifactType, artifact *Artifact) error {
func (am *ArtifactManager) PrepareArtifact(typ ArtifactType, artifact *config.Artifact) error {
// If you use the local artifact, we don't need to download it.
if artifact.Local != "" {
return nil
Expand Down
105 changes: 105 additions & 0 deletions pkg/deployer/baremetal/component/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"bufio"
"context"
"os"
"os/exec"
"path"
"strconv"
"sync"

"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

// BareMetalCluster describes all the components need to be deployed under bare-metal mode.
type BareMetalCluster struct {
MetaSrv BareMetalClusterComponent
DataNodes BareMetalClusterComponent
Frontend BareMetalClusterComponent
Etcd BareMetalClusterComponent
}

// BareMetalClusterComponent is the basic unit of running GreptimeDB Cluster in bare-metal mode.
type BareMetalClusterComponent interface {
// Start starts cluster component by executing binary.
Start(ctx context.Context, binary string) error

// BuildArgs build up args for cluster component.
BuildArgs(ctx context.Context, params ...interface{}) []string

// IsRunning returns the status of current cluster component.
IsRunning(ctx context.Context) bool

// Delete deletes resources that allocated in the system for current component.
Delete(ctx context.Context) error
}

func NewGreptimeDBCluster(config *config.Cluster, dataDir, logsDir, pidsDir string,
wait *sync.WaitGroup, logger logger.Logger) *BareMetalCluster {
return &BareMetalCluster{
MetaSrv: newMetaSrv(config.Meta, logsDir, pidsDir, wait, logger),
DataNodes: newDataNodes(config.Datanode, config.Meta.ServerAddr, dataDir, logsDir, pidsDir, wait, logger),
Frontend: newFrontend(config.Frontend, config.Meta.ServerAddr, logsDir, pidsDir, wait, logger),
Etcd: newEtcd(dataDir, logsDir, pidsDir, wait, logger),
}
}

func runBinary(ctx context.Context, binary string, args []string, logDir string, pidDir string,
wait *sync.WaitGroup, logger logger.Logger) error {
cmd := exec.Command(binary, args...)

// output to binary.
logFile := path.Join(logDir, "log")
outputFile, err := os.Create(logFile)
if err != nil {
return err
}

outputFileWriter := bufio.NewWriter(outputFile)
cmd.Stdout = outputFileWriter
cmd.Stderr = outputFileWriter

logger.V(3).Infof("run binary '%s' with args: '%v', log: '%s', pid: '%s'", binary, args, logDir, pidDir)

if err := cmd.Start(); err != nil {
return err
}

pidFile := path.Join(pidDir, "pid")
f, err := os.Create(pidFile)
if err != nil {
return err
}

_, err = f.Write([]byte(strconv.Itoa(cmd.Process.Pid)))
if err != nil {
return err
}

go func() {
defer wait.Done()
wait.Add(1)
// TODO(sh2) caught up the `signal: interrupt` error and ignore
if err := cmd.Wait(); err != nil {
logger.Errorf("binary '%s' exited with error: %v", binary, err)
}
}()

return nil
}
190 changes: 190 additions & 0 deletions pkg/deployer/baremetal/component/datanode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"context"
"fmt"
"net"
"net/http"
"path"
"strconv"
"sync"
"time"

"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/utils"
)

type dataNodes struct {
config *config.Datanode
metaSrvAddr string

dataDir string
logsDir string
pidsDir string
wait *sync.WaitGroup
logger logger.Logger

dataHome string
dataNodeLogDirs []string
dataNodePidDirs []string
dataNodeDataDirs []string
}

func newDataNodes(config *config.Datanode, metaSrvAddr, dataDir, logsDir, pidsDir string,
wait *sync.WaitGroup, logger logger.Logger) BareMetalClusterComponent {
return &dataNodes{
config: config,
metaSrvAddr: metaSrvAddr,
dataDir: dataDir,
logsDir: logsDir,
pidsDir: pidsDir,
wait: wait,
logger: logger,
}
}

func (d *dataNodes) Start(ctx context.Context, binary string) error {
dataHome := path.Join(d.dataDir, "home")
if err := utils.CreateDirIfNotExists(dataHome); err != nil {
return err
}
d.dataHome = dataHome

for i := 0; i < d.config.Replicas; i++ {
dirName := fmt.Sprintf("datanode.%d", i)

datanodeLogDir := path.Join(d.logsDir, dirName)
if err := utils.CreateDirIfNotExists(datanodeLogDir); err != nil {
return err
}
d.dataNodeLogDirs = append(d.dataNodeLogDirs, datanodeLogDir)

datanodePidDir := path.Join(d.pidsDir, dirName)
if err := utils.CreateDirIfNotExists(datanodePidDir); err != nil {
return err
}
d.dataNodePidDirs = append(d.dataNodePidDirs, datanodePidDir)

walDir := path.Join(d.dataDir, dirName, "wal")
if err := utils.CreateDirIfNotExists(walDir); err != nil {
return err
}
d.dataNodeDataDirs = append(d.dataNodeDataDirs, path.Join(d.dataDir, dirName))

if err := runBinary(ctx, binary, d.BuildArgs(ctx, i, walDir), datanodeLogDir, datanodePidDir, d.wait, d.logger); err != nil {
return err
}
}

// FIXME(zyy17): Should add a timeout here.
ticker := time.Tick(500 * time.Millisecond)

Check failure on line 95 in pkg/deployer/baremetal/component/datanode.go

View workflow job for this annotation

GitHub Actions / build

SA1015: using time.Tick leaks the underlying ticker, consider using it only in endless functions, tests and the main package, and use time.NewTicker here (staticcheck)

TICKER:
for {

Check failure on line 98 in pkg/deployer/baremetal/component/datanode.go

View workflow job for this annotation

GitHub Actions / build

S1000: should use for range instead of for { select {} } (gosimple)
select {
case <-ticker:
if d.IsRunning(ctx) {
break TICKER
}
}
}

return nil
}

func (d *dataNodes) BuildArgs(ctx context.Context, params ...interface{}) []string {
logLevel := d.config.LogLevel
if logLevel == "" {
logLevel = "info"
}

nodeID_, walDir := params[0], params[1]
nodeID := nodeID_.(int)

args := []string{
fmt.Sprintf("--log-level=%s", logLevel),
"datanode", "start",
fmt.Sprintf("--node-id=%d", nodeID),
fmt.Sprintf("--metasrv-addr=%s", d.metaSrvAddr),
fmt.Sprintf("--rpc-addr=%s", generateDatanodeAddr(d.config.RPCAddr, nodeID)),
fmt.Sprintf("--http-addr=%s", generateDatanodeAddr(d.config.HTTPAddr, nodeID)),
fmt.Sprintf("--data-home=%s", d.dataHome),
fmt.Sprintf("--wal-dir=%s", walDir),
}
return args
}

func (d *dataNodes) IsRunning(ctx context.Context) bool {
for i := 0; i < d.config.Replicas; i++ {
addr := generateDatanodeAddr(d.config.HTTPAddr, i)
_, httpPort, err := net.SplitHostPort(addr)
if err != nil {
d.logger.V(5).Infof("failed to split host port: %s", err)
return false
}

rsp, err := http.Get(fmt.Sprintf("http://localhost:%s/health", httpPort))
if err != nil {
d.logger.V(5).Infof("failed to get datanode health: %s", err)
return false
}

if rsp.StatusCode != http.StatusOK {
return false
}

if err = rsp.Body.Close(); err != nil {
return false
}
}

return true
}

func (d *dataNodes) Delete(ctx context.Context) error {
if err := utils.DeleteDirIfExists(d.dataHome); err != nil {
return err
}

for _, dir := range d.dataNodeLogDirs {
if err := utils.DeleteDirIfExists(dir); err != nil {
return err
}
}

for _, dir := range d.dataNodePidDirs {
if err := utils.DeleteDirIfExists(dir); err != nil {
return err
}
}

for _, dir := range d.dataNodeDataDirs {
if err := utils.DeleteDirIfExists(dir); err != nil {
return err
}
}

return nil
}

func generateDatanodeAddr(addr string, nodeID int) string {
// Already checked in validation.
host, port, _ := net.SplitHostPort(addr)
portInt, _ := strconv.Atoi(port)
return net.JoinHostPort(host, strconv.Itoa(portInt+nodeID))
}
Loading

0 comments on commit 384e33d

Please sign in to comment.