Skip to content

Commit

Permalink
feat: add foreground deletion and refactor deployer for cluster in ba…
Browse files Browse the repository at this point in the history
…re-metal mode (#106)

* refactor bare-metal deployer and add foreground delete for it

Signed-off-by: sh2 <[email protected]>

* add timeout for bare-metal mode deployment

Signed-off-by: sh2 <[email protected]>

* optimize msg of foreground deletion and add ctx for download process

Signed-off-by: sh2 <[email protected]>

* refactor: update some naming nits

Signed-off-by: sh2 <[email protected]>

* refactor: aggregate some commonly used dirs

Signed-off-by: sh2 <[email protected]>

---------

Signed-off-by: sh2 <[email protected]>
  • Loading branch information
shawnh2 authored Aug 3, 2023
1 parent 2182876 commit 4151288
Show file tree
Hide file tree
Showing 16 changed files with 966 additions and 459 deletions.
63 changes: 45 additions & 18 deletions pkg/cmd/gtctl/cluster/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/spf13/cobra"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/common"
"github.com/GreptimeTeam/gtctl/pkg/deployer"
"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal"
bmconfig "github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/deployer/k8s"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/status"
Expand Down Expand Up @@ -75,11 +78,18 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {

var (
clusterName = args[0]

// TODO(zyy17): should use timeout context.
ctx = context.TODO()
ctx = context.Background()
cancel context.CancelFunc
)

if options.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(options.Timeout)*time.Second)
defer cancel()
}

ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

clusterDeployer, err := newDeployer(l, clusterName, &options)
if err != nil {
return err
Expand Down Expand Up @@ -113,6 +123,12 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {
}

if err = deployGreptimeDBCluster(ctx, l, &options, spinner, clusterDeployer, clusterName); err != nil {
// Wait the cluster closing if deploy fails in bare-metal mode.
if options.BareMetal {
if err := waitChildProcess(ctx, clusterDeployer, true); err != nil {
return err
}
}
return err
}

Expand All @@ -121,20 +137,8 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {
}

if options.BareMetal {
d, ok := clusterDeployer.(*baremetal.Deployer)
if ok {
version := d.Config().Cluster.Artifact.Version
if version == "" {
version = "unknown"
}
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("The cluster(pid=%d, version=%s) is running in bare-metal mode now...\n",
os.Getpid(), version))
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("To view dashboard by accessing: %s\n", logger.Bold("http://localhost:4000/dashboard/")))

// Wait for all the child processes to exit.
if err := d.Wait(ctx); err != nil {
return err
}
if err := waitChildProcess(ctx, clusterDeployer, false); err != nil {
return err
}
}

Expand Down Expand Up @@ -183,7 +187,7 @@ func newDeployer(l logger.Logger, clusterName string, options *createClusterCliO
}

if options.Config != "" {
var config baremetal.Config
var config bmconfig.Config
data, err := os.ReadFile(options.Config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -319,3 +323,26 @@ func printTips(l logger.Logger, clusterName string, options *createClusterCliOpt
l.V(0).Infof("\nThank you for using %s! Check for more information on %s. 😊", logger.Bold("GreptimeDB"), logger.Bold("https://greptime.com"))
l.V(0).Infof("\n%s 🔑", logger.Bold("Invest in Data, Harvest over Time."))
}

func waitChildProcess(ctx context.Context, deployer deployer.Interface, close bool) error {
d, ok := deployer.(*baremetal.Deployer)
if ok {
v := d.Config().Cluster.Artifact.Version
if len(v) == 0 {
v = "unknown"
}

if !close {
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("The cluster(pid=%d, version=%s) is running in bare-metal mode now...\n", os.Getpid(), v))
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("To view dashboard by accessing: %s\n", logger.Bold("http://localhost:4000/dashboard/")))
} else {
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("The cluster(pid=%d, version=%s) run in bare-metal has been deleted now...\n", os.Getpid(), v))
}

// Wait for all the child processes to exit.
if err := d.Wait(ctx); err != nil {
return err
}
}
return nil
}
28 changes: 18 additions & 10 deletions pkg/deployer/baremetal/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ import (
"archive/zip"
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strings"

"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/utils"
)
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewArtifactManager(workingDir string, l logger.Logger, alwaysDownload bool)
}

// BinaryPath returns the path of the binary of the given type and version.
func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *Artifact) (string, error) {
func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *config.Artifact) (string, error) {
if artifact.Local != "" {
return artifact.Local, nil
}
Expand All @@ -95,7 +96,7 @@ func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *Artifact) (str
}

// PrepareArtifact will download the artifact from the given URL and uncompressed it.
func (am *ArtifactManager) PrepareArtifact(typ ArtifactType, artifact *Artifact) error {
func (am *ArtifactManager) PrepareArtifact(ctx context.Context, typ ArtifactType, artifact *config.Artifact) error {
// If you use the local artifact, we don't need to download it.
if artifact.Local != "" {
return nil
Expand All @@ -106,7 +107,7 @@ func (am *ArtifactManager) PrepareArtifact(typ ArtifactType, artifact *Artifact)
binDir = path.Join(am.dir, typ.String(), artifact.Version, "bin")
)

artifactFile, err := am.download(typ, artifact.Version, pkgDir)
artifactFile, err := am.download(ctx, typ, artifact.Version, pkgDir)
if err != nil {
return err
}
Expand Down Expand Up @@ -186,7 +187,7 @@ func (am *ArtifactManager) installGreptime(artifactFile, binDir string) error {
return nil
}

func (am *ArtifactManager) download(typ ArtifactType, version, pkgDir string) (string, error) {
func (am *ArtifactManager) download(ctx context.Context, typ ArtifactType, version, pkgDir string) (string, error) {
var extension string
switch runtime.GOOS {
case GOOSDarwin:
Expand Down Expand Up @@ -223,7 +224,7 @@ func (am *ArtifactManager) download(typ ArtifactType, version, pkgDir string) (s

am.logger.V(3).Infof("Downloading artifact from '%s' to '%s'", downloadURL, artifactFile)

req, err := http.NewRequest(http.MethodGet, downloadURL, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadURL, nil)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -325,15 +326,20 @@ func (am *ArtifactManager) unzip(file, dst string) error {
return err
}

dstFile.Close()
fileInArchive.Close()
if err := dstFile.Close(); err != nil {
return err
}

if err := fileInArchive.Close(); err != nil {
return err
}
}

return nil
}

func (am *ArtifactManager) untar(file, dst string) error {
data, err := ioutil.ReadFile(file)
data, err := os.ReadFile(file)
if err != nil {
return err
}
Expand Down Expand Up @@ -365,7 +371,9 @@ func (am *ArtifactManager) untar(file, dst string) error {
if _, err := io.Copy(outFile, tarReader); err != nil {
return err
}
outFile.Close()
if err := outFile.Close(); err != nil {
return err
}
case tar.TypeDir:
if err := os.Mkdir(dst+"/"+header.Name, 0755); err != nil {
return err
Expand Down
120 changes: 120 additions & 0 deletions pkg/deployer/baremetal/component/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"bufio"
"context"
"os"
"os/exec"
"path"
"strconv"
"sync"
"syscall"

"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

// WorkDirs include all the dirs used in bare-metal mode.
type WorkDirs struct {
DataDir string
LogsDir string
PidsDir string
}

// BareMetalCluster describes all the components need to be deployed under bare-metal mode.
type BareMetalCluster struct {
MetaSrv BareMetalClusterComponent
Datanode BareMetalClusterComponent
Frontend BareMetalClusterComponent
Etcd BareMetalClusterComponent
}

// BareMetalClusterComponent is the basic unit of running GreptimeDB Cluster in bare-metal mode.
type BareMetalClusterComponent interface {
// Start starts cluster component by executing binary.
Start(ctx context.Context, binary string) error

// BuildArgs build up args for cluster component.
BuildArgs(ctx context.Context, params ...interface{}) []string

// IsRunning returns the status of current cluster component.
IsRunning(ctx context.Context) bool

// Delete deletes resources that allocated in the system for current component.
Delete(ctx context.Context) error
}

func NewGreptimeDBCluster(config *config.Cluster, workDirs WorkDirs, wg *sync.WaitGroup, logger logger.Logger) *BareMetalCluster {
return &BareMetalCluster{
MetaSrv: newMetaSrv(config.MetaSrv, workDirs, wg, logger),
Datanode: newDataNodes(config.Datanode, config.MetaSrv.ServerAddr, workDirs, wg, logger),
Frontend: newFrontend(config.Frontend, config.MetaSrv.ServerAddr, workDirs, wg, logger),
Etcd: newEtcd(workDirs, wg, logger),
}
}

func runBinary(ctx context.Context, binary string, args []string, logDir string, pidDir string,
wg *sync.WaitGroup, logger logger.Logger) error {
cmd := exec.CommandContext(ctx, binary, args...)

// output to binary.
logFile := path.Join(logDir, "log")
outputFile, err := os.Create(logFile)
if err != nil {
return err
}

outputFileWriter := bufio.NewWriter(outputFile)
cmd.Stdout = outputFileWriter
cmd.Stderr = outputFileWriter

logger.V(3).Infof("run binary '%s' with args: '%v', log: '%s', pid: '%s'", binary, args, logDir, pidDir)

if err := cmd.Start(); err != nil {
return err
}

pidFile := path.Join(pidDir, "pid")
f, err := os.Create(pidFile)
if err != nil {
return err
}

_, err = f.Write([]byte(strconv.Itoa(cmd.Process.Pid)))
if err != nil {
return err
}

go func() {
defer wg.Done()
wg.Add(1)
if err := cmd.Wait(); err != nil {
// Caught signal kill and interrupt error then ignore.
if exit, ok := err.(*exec.ExitError); ok {
if status, ok := exit.Sys().(syscall.WaitStatus); ok {
if status.Signaled() &&
(status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT) {
return
}
}
}
logger.Errorf("binary '%s' exited with error: %v", binary, err)
}
}()

return nil
}
Loading

0 comments on commit 4151288

Please sign in to comment.