Skip to content

Commit

Permalink
merge from main and resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: sh2 <[email protected]>
  • Loading branch information
shawnh2 committed Aug 2, 2023
2 parents 72744cd + 2182876 commit ac84693
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 8 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/onsi/ginkgo/v2 v2.4.0
github.com/onsi/gomega v1.23.0
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.2
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.11.1
k8s.io/api v0.26.0
Expand Down Expand Up @@ -102,6 +103,7 @@ require (
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/gtctl/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/spf13/cobra"

"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/connect"
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/create"
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/delete"
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/get"
Expand Down Expand Up @@ -47,6 +48,7 @@ func NewClusterCommand(l logger.Logger) *cobra.Command {
cmd.AddCommand(scale.NewScaleClusterCommand(l))
cmd.AddCommand(get.NewGetClusterCommand(l))
cmd.AddCommand(list.NewListClustersCommand(l))
cmd.AddCommand(connect.NewConnectCommand(l))

return cmd
}
98 changes: 98 additions & 0 deletions pkg/cmd/gtctl/cluster/connect/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connect

import (
"context"
"fmt"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/connect/mysql"
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/connect/pg"
"github.com/GreptimeTeam/gtctl/pkg/deployer/k8s"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

const (
connectionProtocolMySQL = "mysql"
connectionProtocolPostgres = "pg"
)

type getClusterCliOptions struct {
Namespace string
Protocol string
}

func NewConnectCommand(l logger.Logger) *cobra.Command {
var options getClusterCliOptions
cmd := &cobra.Command{
Use: "connect",
Short: "Connect to a GreptimeDB cluster",
Long: `Connect to a GreptimeDB cluster`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("cluster name should be set")
}

k8sDeployer, err := k8s.NewDeployer(l)
if err != nil {
return err
}

var (
ctx = context.TODO()
clusterName = args[0]
namespace = options.Namespace
)

name := types.NamespacedName{
Namespace: options.Namespace,
Name: clusterName,
}.String()
cluster, err := k8sDeployer.GetGreptimeDBCluster(ctx, name, nil)
if err != nil && errors.IsNotFound(err) {
l.Errorf("cluster %s in %s not found\n", clusterName, namespace)
return nil
}
rawCluster, ok := cluster.Raw.(*greptimedbclusterv1alpha1.GreptimeDBCluster)
if !ok {
return fmt.Errorf("invalid cluster type")
}
switch options.Protocol {
case connectionProtocolMySQL:
err := mysql.ConnectCommand(rawCluster, l)
if err != nil {
return fmt.Errorf("error connecting to mysql: %v", err)
}
case connectionProtocolPostgres:
err := pg.ConnectCommand(rawCluster, l)
if err != nil {
return fmt.Errorf("error connecting to postgres: %v", err)
}
default:
return fmt.Errorf("database type not supported")
}
return nil
},
}
cmd.Flags().String("p", "mysql", "Specify a database")
cmd.Flags().StringVarP(&options.Namespace, "namespace", "n", "default", "Namespace of GreptimeDB cluster.")
cmd.Flags().StringVarP(&options.Protocol, "protocol", "p", "mysql", "Specify a database")
return cmd
}
130 changes: 130 additions & 0 deletions pkg/cmd/gtctl/cluster/connect/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"
"os"
"os/exec"
"strconv"
"sync"
"syscall"

"github.com/go-sql-driver/mysql"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

const (
ArgHost = "-h"
ArgPort = "-P"
MySQL = "mysql"
DefaultAddr = "127.0.0.1"
DefaultNet = "tcp"
PrePort = ":"
Kubectl = "kubectl"
PortForward = "port-forward"
)

// ConnectCommand connects to a GreptimeDB cluster
func ConnectCommand(rawCluster *greptimedbclusterv1alpha1.GreptimeDBCluster, l logger.Logger) error {
return connect("127.0.0.1", strconv.Itoa(int(rawCluster.Spec.MySQLServicePort)), rawCluster.Name, l)
}

// connect connects to a GreptimeDB cluster
func connect(host, port, clusterName string, l logger.Logger) error {
waitGroup := sync.WaitGroup{}
cmd := exec.CommandContext(context.Background(), Kubectl, PortForward, "-n", "default", "svc/"+clusterName+"-frontend", port+PrePort+port)
err := cmd.Start()
if err != nil {
l.Errorf("Error starting port-forwarding: %v", err)
return err
}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
if err = cmd.Wait(); err != nil {
//exit status 1
exitError, ok := err.(*exec.ExitError)
if !ok {
l.Errorf("Error waiting for port-forwarding to finish: %v", err)
return
}
if exitError.Sys().(syscall.WaitStatus).ExitStatus() == 1 {
return
}
}
}()
for {
cfg := mysql.Config{
Net: DefaultNet,
Addr: DefaultAddr + PrePort + port,
User: "",
Passwd: "",
DBName: "",
AllowNativePasswords: true,
}

db, err := sql.Open(MySQL, cfg.FormatDSN())
if err != nil {
continue
}

_, err = db.Conn(context.Background())
if err != nil {
continue
}

err = db.Close()
if err != nil {
if err == os.ErrProcessDone {
return nil
}
return err
}
break
}

cmd = mysqlCommand(port)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
err = cmd.Start()
if err != nil {
l.Errorf("Error starting mysql client: %v", err)
return err
}
if err = cmd.Wait(); err != nil {
l.Errorf("Error waiting for mysql client to finish: %v", err)
return err
}
// gracefully stop port-forwarding
err = cmd.Process.Kill()
if err != nil {
if err == os.ErrProcessDone {
l.V(1).Info("Shutting down port-forwarding successfully")
return nil
}
return err
}
waitGroup.Wait()
return nil
}

func mysqlCommand(port string) *exec.Cmd {
return exec.Command(MySQL, ArgHost, DefaultAddr, ArgPort, port)
}
24 changes: 24 additions & 0 deletions pkg/cmd/gtctl/cluster/connect/pg/pg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pg

import (
greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

func ConnectCommand(rawCluster *greptimedbclusterv1alpha1.GreptimeDBCluster, l logger.Logger) error {
return nil
}
90 changes: 90 additions & 0 deletions pkg/cmd/gtctl/cluster/create/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package create

import (
"fmt"
"strings"
)

const (
// Various of support config type
configOperator = "operator"
configCluster = "cluster"
configEtcd = "etcd"
)

type configValues struct {
rawConfig []string

operatorConfig string
clusterConfig string
etcdConfig string
}

// parseConfig parse raw config values and classify it to different
// categories of config type by its prefix.
func (c *configValues) parseConfig() error {
var (
operatorConfig []string
clusterConfig []string
etcdConfig []string
)

for _, raw := range c.rawConfig {
if len(raw) == 0 {
return fmt.Errorf("cannot parse empty config values")
}

var configPrefix, configValue string
values := strings.Split(raw, ",")

for _, value := range values {
value = strings.Trim(value, " ")
config := strings.SplitN(value, ".", 2)
configPrefix = config[0]
if len(config) == 2 {
configValue = config[1]
} else {
configValue = configPrefix
}

switch configPrefix {
case configOperator:
operatorConfig = append(operatorConfig, configValue)
case configCluster:
clusterConfig = append(clusterConfig, configValue)
case configEtcd:
etcdConfig = append(etcdConfig, configValue)
default:
clusterConfig = append(clusterConfig, value)
}
}
}

if len(operatorConfig) > 0 {
c.operatorConfig = strings.Join(operatorConfig, ",")
}

if len(clusterConfig) > 0 {
c.clusterConfig = strings.Join(clusterConfig, ",")
}

if len(etcdConfig) > 0 {
c.etcdConfig = strings.Join(etcdConfig, ",")
}

return nil
}
Loading

0 comments on commit ac84693

Please sign in to comment.