diff --git a/http/core/core.go b/http/core/core.go index 870da3bc8..600840627 100644 --- a/http/core/core.go +++ b/http/core/core.go @@ -75,3 +75,12 @@ func ExitSuccessWithData(r *pigeon.Request, data interface{}) bool { }) return r.Exit(200) } + +func ExitFailWithData(r *pigeon.Request, data interface{}, message string) bool { + r.SendJSON(pigeon.JSON{ + "errorCode": "503", + "errorMsg": message, + "data": data, + }) + return r.Exit(503) +} diff --git a/http/manager/bind.go b/http/manager/bind.go index 854d154c3..23c60e946 100644 --- a/http/manager/bind.go +++ b/http/manager/bind.go @@ -22,7 +22,10 @@ package manager -import "github.com/opencurve/pigeon" +import ( + "github.com/opencurve/pigeon" + "mime/multipart" +) var METHOD_REQUEST map[string]Request @@ -87,6 +90,19 @@ type DeployClusterRequest struct{} type GetClusterServicesAddrRequest struct{} +type DeployClusterCmdRequest struct { + Command string `json:"command" binding:"required"` +} + +type DeployClusterUploadRequest struct { + FilePath string `json:"filepath" form:"filepath" binding:"required"` + File *multipart.FileHeader `form:"file" binding:"required"` +} + +type DeployClusterDownloadRequest struct { + FilePath string `json:"filepath" form:"filepath" binding:"required"` +} + var requests = []Request{ { "GET", @@ -166,4 +182,22 @@ var requests = []Request{ GetClusterServicesAddrRequest{}, GetClusterServicesAddr, }, + { + "POST", + "cluster.deploy.cmd", + DeployClusterCmdRequest{}, + DeployClusterCmd, + }, + { + "POST", + "cluster.deploy.upload", + DeployClusterUploadRequest{}, + DeployClusterUpload, + }, + { + "GET", + "cluster.deploy.download", + DeployClusterDownloadRequest{}, + DeployClusterDownload, + }, } diff --git a/http/manager/manager.go b/http/manager/manager.go index e919830f2..f6b4ee90e 100644 --- a/http/manager/manager.go +++ b/http/manager/manager.go @@ -24,6 +24,9 @@ package manager import ( "fmt" + "github.com/opencurve/curveadm/internal/utils" + "io" + "os/exec" "strings" "github.com/opencurve/curveadm/cli/cli" @@ -313,3 +316,48 @@ func GetClusterServicesAddr(r *pigeon.Request, ctx *Context) bool { servicesAddr.Addrs = getServicesAddrFromConf(dcs, mcs) return core.ExitSuccessWithData(r, servicesAddr) } + +func DeployClusterCmd(r *pigeon.Request, ctx *Context) bool { + data := ctx.Data.(*DeployClusterCmdRequest) + r.Logger().Info("DeployClusterCmd", pigeon.Field("command", data.Command)) + cmd := exec.Command("/bin/bash", "-c", data.Command) + out, err := cmd.CombinedOutput() + if err != nil { + r.Logger().Warn("DeployClusterCmd failed when execute command", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, string(out), string(out)) + } + r.Logger().Info("DeployClusterCmd", pigeon.Field("result", out)) + return core.ExitSuccessWithData(r, string(out)) +} + +func DeployClusterUpload(r *pigeon.Request, ctx *Context) bool { + data := ctx.Data.(*DeployClusterUploadRequest) + r.Logger().Info("DeployClusterUpload", pigeon.Field("file", data.FilePath)) + mf, err := data.File.Open() + if err != nil { + r.Logger().Warn("DeployClusterUpload failed when open file", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, err.Error(), err.Error()) + } + defer mf.Close() + content, err := io.ReadAll(mf) + if err != nil { + r.Logger().Warn("DeployClusterUpload failed when read file", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, err.Error(), err.Error()) + } + err = utils.WriteFile(data.FilePath, string(content), 0644) + if err != nil { + r.Logger().Warn("DeployClusterUpload failed when write file", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, err.Error(), err.Error()) + } + return core.Exit(r, err) +} + +func DeployClusterDownload(r *pigeon.Request, ctx *Context) bool { + data := ctx.Data.(*DeployClusterDownloadRequest) + r.Logger().Info("DeployClusterDownload", pigeon.Field("file", data.FilePath)) + return r.SendFile(data.FilePath) +} diff --git a/internal/configure/hosts/hc_get.go b/internal/configure/hosts/hc_get.go index 876538410..0778d5710 100644 --- a/internal/configure/hosts/hc_get.go +++ b/internal/configure/hosts/hc_get.go @@ -77,7 +77,13 @@ func (hc *HostConfig) GetForwardAgent() bool { return hc.getBool(CONFIG_FORW func (hc *HostConfig) GetBecomeUser() string { return hc.getString(CONFIG_BECOME_USER) } func (hc *HostConfig) GetLabels() []string { return hc.labels } func (hc *HostConfig) GetEnvs() []string { return hc.envs } +func (hc *HostConfig) GetProtocol() string { return hc.getString(CONFIG_PROTOCOL) } func (hc *HostConfig) GetSSHConfig() *module.SSHConfig { + + if hc.GetProtocol() != SSH_PROTOCOL { + return nil + } + hostname := hc.GetSSHHostname() if len(hostname) == 0 { hostname = hc.GetHostname() @@ -95,3 +101,17 @@ func (hc *HostConfig) GetSSHConfig() *module.SSHConfig { ConnectRetries: curveadm.GlobalCurveAdmConfig.GetSSHRetries(), } } + +func (hc *HostConfig) GetHttpConfig() *module.HttpConfig { + + if hc.GetProtocol() != HTTP_PROTOCOL { + return nil + } + + return &module.HttpConfig{ + Host: hc.GetHostname(), + Port: (uint)(hc.GetHTTPPort()), + } +} + +func (hc *HostConfig) GetHTTPPort() int { return hc.getInt(CONFIG_HTTP_PORT) } diff --git a/internal/configure/hosts/hc_item.go b/internal/configure/hosts/hc_item.go index 6900cbd63..a7fb50cc8 100644 --- a/internal/configure/hosts/hc_item.go +++ b/internal/configure/hosts/hc_item.go @@ -32,7 +32,10 @@ import ( ) const ( - DEFAULT_SSH_PORT = 22 + DEFAULT_SSH_PORT = 22 + DEFAULT_HTTP_PORT = 8000 + SSH_PROTOCOL = "ssh" + HTTP_PROTOCOL = "http" ) var ( @@ -97,4 +100,18 @@ var ( false, nil, ) + + CONFIG_PROTOCOL = itemset.Insert( + "protocol", + comm.REQUIRE_STRING, + false, + SSH_PROTOCOL, + ) + + CONFIG_HTTP_PORT = itemset.Insert( + "http_port", + comm.REQUIRE_POSITIVE_INTEGER, + false, + DEFAULT_HTTP_PORT, + ) ) diff --git a/internal/errno/errno.go b/internal/errno/errno.go index fd40bbc00..307eeccd9 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -476,7 +476,8 @@ var ( ERR_METASERVER_REQUIRES_3_HOSTS = EC(503009, "metaserver requires at least 3 hosts to distrubute zones") // 510: checker (ssh) - ERR_SSH_CONNECT_FAILED = EC(510000, "SSH connect failed") + ERR_SSH_CONNECT_FAILED = EC(510000, "SSH connect failed") + ERR_HTTP_CONNECT_FAILED = EC(510001, "HTTP connect failed") // 520: checker (permission) ERR_USER_NOT_FOUND = EC(520000, "user not found") diff --git a/internal/task/context/context.go b/internal/task/context/context.go index bbc758373..693e170f6 100644 --- a/internal/task/context/context.go +++ b/internal/task/context/context.go @@ -29,27 +29,27 @@ import ( ) type Context struct { - sshClient *module.SSHClient - module *module.Module - register *Register + remoteClient module.RemoteClient + module *module.Module + register *Register } -func NewContext(sshClient *module.SSHClient) (*Context, error) { +func NewContext(remoteClient module.RemoteClient) (*Context, error) { return &Context{ - sshClient: sshClient, - module: module.NewModule(sshClient), - register: NewRegister(), + remoteClient: remoteClient, + module: module.NewModule(remoteClient), + register: NewRegister(), }, nil } func (ctx *Context) Close() { - if ctx.sshClient != nil { - ctx.sshClient.Client().Close() + if ctx.remoteClient != nil { + ctx.remoteClient.Close() } } -func (ctx *Context) SSHClient() *module.SSHClient { - return ctx.sshClient +func (ctx *Context) RemoteClient() module.RemoteClient { + return ctx.remoteClient } func (ctx *Context) Module() *module.Module { diff --git a/internal/task/step/shell.go b/internal/task/step/shell.go index e002e1cac..a34ff27a7 100644 --- a/internal/task/step/shell.go +++ b/internal/task/step/shell.go @@ -567,18 +567,20 @@ func (s *Scp) Execute(ctx *context.Context) error { return errno.ERR_WRITE_FILE_FAILED.E(err) } - config := ctx.SSHClient().Config() - cmd := ctx.Module().Shell().Scp(localPath, config.User, config.Host, s.RemotePath) - cmd.AddOption("-P %d", config.Port) - if !config.ForwardAgent { - cmd.AddOption("-i %s", config.PrivateKeyPath) - } + //config := ctx.SSHClient().Config() + //cmd := ctx.Module().Shell().Scp(localPath, config.User, config.Host, s.RemotePath) + //cmd.AddOption("-P %d", config.Port) + //if !config.ForwardAgent { + // cmd.AddOption("-i %s", config.PrivateKeyPath) + //} + + err = ctx.Module().File().Upload(localPath, s.RemotePath) options := s.ExecOptions options.ExecWithSudo = false options.ExecInLocal = true - out, err := cmd.Execute(options) - return PostHandle(nil, nil, out, err, errno.ERR_SECURE_COPY_FILE_TO_REMOTE_FAILED) + //out, err := cmd.Execute(options) + return PostHandle(nil, nil, "", err, errno.ERR_SECURE_COPY_FILE_TO_REMOTE_FAILED) } func (s *Command) Execute(ctx *context.Context) error { diff --git a/internal/task/task/bs/add_target.go b/internal/task/task/bs/add_target.go index 703267921..9507fd5e8 100644 --- a/internal/task/task/bs/add_target.go +++ b/internal/task/task/bs/add_target.go @@ -54,7 +54,7 @@ func NewAddTargetTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task } subname := fmt.Sprintf("host=%s volume=%s", options.Host, volume) - t := task.NewTask("Add Target", subname, hc.GetSSHConfig()) + t := task.NewTask("Add Target", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/bs/balance_leader.go b/internal/task/task/bs/balance_leader.go index 2c29f75a7..565688233 100644 --- a/internal/task/task/bs/balance_leader.go +++ b/internal/task/task/bs/balance_leader.go @@ -48,7 +48,7 @@ func NewBalanceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Ta subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Balance Leader", subname, hc.GetSSHConfig()) + t := task.NewTask("Balance Leader", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step t.AddStep(&step.ContainerExec{ diff --git a/internal/task/task/bs/create_volume.go b/internal/task/task/bs/create_volume.go index 829abb85e..d610e8859 100644 --- a/internal/task/task/bs/create_volume.go +++ b/internal/task/task/bs/create_volume.go @@ -74,7 +74,7 @@ func NewCreateVolumeTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*t } subname := fmt.Sprintf("hostname=%s image=%s", hc.GetHostname(), cc.GetContainerImage()) - t := task.NewTask("Create Volume", subname, hc.GetSSHConfig()) + t := task.NewTask("Create Volume", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var out string diff --git a/internal/task/task/bs/delete_target.go b/internal/task/task/bs/delete_target.go index 7cb454fb0..435d6c125 100644 --- a/internal/task/task/bs/delete_target.go +++ b/internal/task/task/bs/delete_target.go @@ -58,7 +58,7 @@ func NewDeleteTargetTask(curveadm *cli.CurveAdm, cc *client.ClientConfig) (*task } subname := fmt.Sprintf("hostname=%s tid=%s", hc.GetHostname(), options.Tid) - t := task.NewTask("Delete Target", subname, hc.GetSSHConfig()) + t := task.NewTask("Delete Target", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/bs/detect_release.go b/internal/task/task/bs/detect_release.go index b1cb0a23a..e61ce8da3 100644 --- a/internal/task/task/bs/detect_release.go +++ b/internal/task/task/bs/detect_release.go @@ -75,7 +75,7 @@ func NewDetectOSReleaseTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, var success bool var out string subname := fmt.Sprintf("host=%s", host) - t := task.NewTask("Detect OS Release", subname, hc.GetSSHConfig()) + t := task.NewTask("Detect OS Release", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.Cat{ diff --git a/internal/task/task/bs/format.go b/internal/task/task/bs/format.go index 0049477e8..546f5abd7 100644 --- a/internal/task/task/bs/format.go +++ b/internal/task/task/bs/format.go @@ -237,7 +237,7 @@ func NewFormatChunkfilePoolTask(curveadm *cli.CurveAdm, fc *configure.FormatConf usagePercent := fc.GetFormatPercent() subname := fmt.Sprintf("host=%s device=%s mountPoint=%s usage=%d%%", fc.GetHost(), device, mountPoint, usagePercent) - t := task.NewTask("Start Format Chunkfile Pool", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Format Chunkfile Pool", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var oldContainerId, containerId, oldUUID string diff --git a/internal/task/task/bs/format_clean.go b/internal/task/task/bs/format_clean.go index 2bff89e86..7d8d5fd42 100644 --- a/internal/task/task/bs/format_clean.go +++ b/internal/task/task/bs/format_clean.go @@ -78,7 +78,7 @@ func NewCleanFormatTask(curveadm *cli.CurveAdm, fc *configure.FormatConfig) (*ta containerName := device2ContainerName(device) subname := fmt.Sprintf("host=%s device=%s mountPoint=%s containerName=%s", fc.GetHost(), device, mountPoint, containerName) - t := task.NewTask("Clean Format Container", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Format Container", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/bs/format_status.go b/internal/task/task/bs/format_status.go index 68b3a3265..c7947c9bf 100644 --- a/internal/task/task/bs/format_status.go +++ b/internal/task/task/bs/format_status.go @@ -122,7 +122,7 @@ func NewGetFormatStatusTask(curveadm *cli.CurveAdm, fc *configure.FormatConfig) // new task device := fc.GetDevice() subname := fmt.Sprintf("host=%s device=%s", fc.GetHost(), fc.GetDevice()) - t := task.NewTask("Get Format Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Format Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var deviceUsage, containerStatus string diff --git a/internal/task/task/bs/format_stop.go b/internal/task/task/bs/format_stop.go index aff2902a4..9dce5747f 100644 --- a/internal/task/task/bs/format_stop.go +++ b/internal/task/task/bs/format_stop.go @@ -79,7 +79,7 @@ func NewStopFormatTask(curveadm *cli.CurveAdm, fc *configure.FormatConfig) (*tas containerName := device2ContainerName(device) subname := fmt.Sprintf("host=%s device=%s mountPoint=%s containerName=%s", fc.GetHost(), device, mountPoint, containerName) - t := task.NewTask("Stop Format Chunkfile Pool", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Format Chunkfile Pool", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var oldContainerId string var oldUUID string diff --git a/internal/task/task/bs/install_polarfs.go b/internal/task/task/bs/install_polarfs.go index 9b7976d36..a43cbe0b9 100644 --- a/internal/task/task/bs/install_polarfs.go +++ b/internal/task/task/bs/install_polarfs.go @@ -104,7 +104,7 @@ func NewInstallPolarFSTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) ( // new task release := getRelease(curveadm) subname := fmt.Sprintf("host=%s release=%s", host, release) - t := task.NewTask("Install PolarFS", subname, hc.GetSSHConfig()) + t := task.NewTask("Install PolarFS", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var input, output string diff --git a/internal/task/task/bs/list_targets.go b/internal/task/task/bs/list_targets.go index f7543ca8c..518e14a21 100644 --- a/internal/task/task/bs/list_targets.go +++ b/internal/task/task/bs/list_targets.go @@ -111,7 +111,7 @@ func NewListTargetsTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, erro } subname := fmt.Sprintf("host=%s", hc.GetHostname()) - t := task.NewTask("List Targets", subname, hc.GetSSHConfig()) + t := task.NewTask("List Targets", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/bs/map.go b/internal/task/task/bs/map.go index adcb0c00c..d6b8d14d8 100644 --- a/internal/task/task/bs/map.go +++ b/internal/task/task/bs/map.go @@ -79,7 +79,7 @@ func NewMapTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task.Task, } subname := fmt.Sprintf("hostname=%s volume=%s:%s", hc.GetHostname(), options.User, options.Volume) - t := task.NewTask("Map Volume", subname, hc.GetSSHConfig()) + t := task.NewTask("Map Volume", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var out string diff --git a/internal/task/task/bs/start_nebd.go b/internal/task/task/bs/start_nebd.go index d7fdcaccd..9d9dfd335 100644 --- a/internal/task/task/bs/start_nebd.go +++ b/internal/task/task/bs/start_nebd.go @@ -146,7 +146,7 @@ func NewStartNEBDServiceTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) } subname := fmt.Sprintf("hostname=%s image=%s", hc.GetHostname(), cc.GetContainerImage()) - t := task.NewTask("Start NEBD Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Start NEBD Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var containerId, out string diff --git a/internal/task/task/bs/start_tgtd.go b/internal/task/task/bs/start_tgtd.go index ef4358e35..01c0a980a 100644 --- a/internal/task/task/bs/start_tgtd.go +++ b/internal/task/task/bs/start_tgtd.go @@ -66,7 +66,7 @@ func NewStartTargetDaemonTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig // new task subname := fmt.Sprintf("host=%s image=%s", options.Host, cc.GetContainerImage()) - t := task.NewTask("Start Target Daemon", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Target Daemon", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status, containerId, out string diff --git a/internal/task/task/bs/stop_tgtd.go b/internal/task/task/bs/stop_tgtd.go index 0725c0355..c65130cf9 100644 --- a/internal/task/task/bs/stop_tgtd.go +++ b/internal/task/task/bs/stop_tgtd.go @@ -50,7 +50,7 @@ func NewStopTargetDaemonTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, // new task subname := fmt.Sprintf("host=%s", options.Host) - t := task.NewTask("Stop Target Daemon", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Target Daemon", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var containerId string diff --git a/internal/task/task/bs/uninstall_polarfs.go b/internal/task/task/bs/uninstall_polarfs.go index c228403ca..c89d98547 100644 --- a/internal/task/task/bs/uninstall_polarfs.go +++ b/internal/task/task/bs/uninstall_polarfs.go @@ -77,7 +77,7 @@ func NewUninstallPolarFSTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, // new task release := getRelease(curveadm) subname := fmt.Sprintf("host=%s release=%s", host, release) - t := task.NewTask("Uninstall PolarFS", subname, hc.GetSSHConfig()) + t := task.NewTask("Uninstall PolarFS", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.RemoveFile{ diff --git a/internal/task/task/bs/unmap.go b/internal/task/task/bs/unmap.go index 33a596373..81cdac472 100644 --- a/internal/task/task/bs/unmap.go +++ b/internal/task/task/bs/unmap.go @@ -138,7 +138,7 @@ func NewUnmapTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, error) { subname := fmt.Sprintf("hostname=%s volume=%s:%s containerId=%s", hc.GetHostname(), options.User, options.Volume, tui.TrimContainerId(containerId)) - t := task.NewTask("Unmap Volume", subname, hc.GetSSHConfig()) + t := task.NewTask("Unmap Volume", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/checker/date.go b/internal/task/task/checker/date.go index 41a4900aa..95777db2a 100644 --- a/internal/task/task/checker/date.go +++ b/internal/task/task/checker/date.go @@ -87,7 +87,7 @@ func NewGetHostDate(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Ta } subname := fmt.Sprintf("host=%s start=%d", dc.GetHost(), time.Now().Unix()) - t := task.NewTask("Get Host Date ", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Host Date ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var start int64 var out string @@ -132,7 +132,7 @@ func checkDate(curveadm *cli.CurveAdm) step.LambdaType { } func NewCheckDate(curveadm *cli.CurveAdm, c interface{}) (*task.Task, error) { - t := task.NewTask("Check Host Date ", "", nil) + t := task.NewTask("Check Host Date ", "", nil, nil) t.AddStep(&step.Lambda{ Lambda: checkDate(curveadm), }) diff --git a/internal/task/task/checker/kernel.go b/internal/task/task/checker/kernel.go index 7a3c4b36e..9dc006242 100644 --- a/internal/task/task/checker/kernel.go +++ b/internal/task/task/checker/kernel.go @@ -106,7 +106,7 @@ func NewCheckKernelVersionTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig // new task subname := fmt.Sprintf("host=%s role=%s require=(>=%s)", dc.GetHost(), dc.GetRole(), CHUNKSERVER_LEAST_KERNEL_VERSION) - t := task.NewTask("Check Kernel Version ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Kernel Version ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string @@ -132,7 +132,7 @@ func NewCheckKernelModuleTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig // new task name := curveadm.MemStorage().Get(comm.KEY_CHECK_KERNEL_MODULE_NAME).(string) subname := fmt.Sprintf("host=%s module=%s", host, name) - t := task.NewTask("Check Kernel Module", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Kernel Module", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/checker/network.go b/internal/task/task/checker/network.go index ca0542fec..e4d9abc2e 100644 --- a/internal/task/task/checker/network.go +++ b/internal/task/task/checker/network.go @@ -128,7 +128,7 @@ func NewCheckPortInUseTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* addresses := getServiceListenAddresses(dc) subname := fmt.Sprintf("host=%s role=%s ports={%s}", dc.GetHost(), dc.GetRole(), joinPorts(dc, addresses)) - t := task.NewTask("Check Port In Use ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Port In Use ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var containerId, out string var success bool @@ -197,7 +197,7 @@ func NewCheckDestinationReachableTask(curveadm *cli.CurveAdm, dc *topology.Deplo addresses := unique(getServiceConnectAddress(dc, dcs)) subname := fmt.Sprintf("host=%s role=%s ping={%s}", dc.GetHost(), dc.GetRole(), tui.TrimAddress(strings.Join(addresses, ","))) - t := task.NewTask("Check Destination Reachable ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Destination Reachable ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var out string var success bool @@ -252,7 +252,7 @@ func NewStartHTTPServerTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( addresses := getServiceListenAddresses(dc) subname := fmt.Sprintf("host=%s role=%s ports={%s}", dc.GetHost(), dc.GetRole(), joinPorts(dc, addresses)) - t := task.NewTask("Start Mock HTTP Server ", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Mock HTTP Server ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var containerId, out string @@ -323,7 +323,7 @@ func NewCheckNetworkFirewallTask(curveadm *cli.CurveAdm, dc *topology.DeployConf // add task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check Network Firewall ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Network Firewall ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string @@ -389,7 +389,7 @@ func NewCleanEnvironmentTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) // new task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Clean Precheck Environment", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Precheck Environment", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/checker/permission.go b/internal/task/task/checker/permission.go index cf906c63f..a4a116875 100644 --- a/internal/task/task/checker/permission.go +++ b/internal/task/task/checker/permission.go @@ -119,7 +119,7 @@ func NewCheckPermissionTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( // new task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check Permission ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Permission ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out, hostname string diff --git a/internal/task/task/checker/service.go b/internal/task/task/checker/service.go index 4a9a3af35..85a19077a 100644 --- a/internal/task/task/checker/service.go +++ b/internal/task/task/checker/service.go @@ -162,7 +162,7 @@ func NewCheckChunkfilePoolTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig } subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check Chunkfile Pool ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Chunkfile Pool ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) t.AddStep(&step2CheckChunkfilePool{ dc: dc, @@ -174,7 +174,7 @@ func NewCheckChunkfilePoolTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig func NewCheckDiskSizeTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check Disk Size ", subname, nil) + t := task.NewTask("Check Disk Size ", subname, nil, nil) t.AddStep(&step2CheckDiskSize{ dc: dc, @@ -187,7 +187,7 @@ func NewCheckDiskSizeTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*t func NewCheckS3Task(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check S3", subname, nil) + t := task.NewTask("Check S3", subname, nil, nil) t.AddStep(&step2CheckS3{ s3AccessKey: dc.GetS3AccessKey(), @@ -208,13 +208,13 @@ func NewCheckMdsAddressTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) address := cc.GetClusterMDSAddr() subname := fmt.Sprintf("host=%s address=%s", host, address) - t := task.NewTask("Check MDS Address", subname, hc.GetSSHConfig()) + t := task.NewTask("Check MDS Address", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) return t, nil } func NewClientS3ConfigureTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task.Task, error) { - t := task.NewTask("Check S3 Configure ", "", nil) + t := task.NewTask("Check S3 Configure ", "", nil, nil) t.AddStep(&step2CheckClientS3Configure{ config: cc, diff --git a/internal/task/task/checker/ssh.go b/internal/task/task/checker/ssh.go index f5e3a4528..1657ac567 100644 --- a/internal/task/task/checker/ssh.go +++ b/internal/task/task/checker/ssh.go @@ -73,7 +73,7 @@ func NewCheckSSHConnectTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( // new task method := utils.Choose(hc.GetForwardAgent(), "forwardAgent", "privateKey") subname := fmt.Sprintf("host=%s method=%s", dc.GetHost(), method) - t := task.NewTask("Check SSH Connect ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check SSH Connect ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.Lambda{ diff --git a/internal/task/task/checker/topology.go b/internal/task/task/checker/topology.go index 4e4a23875..c58255809 100644 --- a/internal/task/task/checker/topology.go +++ b/internal/task/task/checker/topology.go @@ -262,7 +262,7 @@ func NewCheckTopologyTask(curveadm *cli.CurveAdm, null interface{}) (*task.Task, // new task dcs := curveadm.MemStorage().Get(comm.KEY_ALL_DEPLOY_CONFIGS).([]*topology.DeployConfig) subname := fmt.Sprintf("cluster=%s kind=%s", curveadm.ClusterName(), dcs[0].GetKind()) - t := task.NewTask("Check Topology ", subname, nil) + t := task.NewTask("Check Topology ", subname, nil, nil) // add step to task for _, dc := range dcs { diff --git a/internal/task/task/common/backup_etcd.go b/internal/task/task/common/backup_etcd.go index e9aaeb3b7..ba218df44 100644 --- a/internal/task/task/common/backup_etcd.go +++ b/internal/task/task/common/backup_etcd.go @@ -57,7 +57,7 @@ func NewBackupEtcdDataTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Backup Etcd Data", subname, hc.GetSSHConfig()) + t := task.NewTask("Backup Etcd Data", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) t.AddStep(&step.ContainerExec{ ContainerId: &containerId, diff --git a/internal/task/task/common/clean_service.go b/internal/task/task/common/clean_service.go index d20513929..365c8baea 100644 --- a/internal/task/task/common/clean_service.go +++ b/internal/task/task/common/clean_service.go @@ -159,7 +159,7 @@ func NewCleanServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*ta recycle := curveadm.MemStorage().Get(comm.KEY_CLEAN_BY_RECYCLE).(bool) subname := fmt.Sprintf("host=%s role=%s containerId=%s clean=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId), strings.Join(only, ",")) - t := task.NewTask("Clean Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task clean := utils.Slice2Map(only) diff --git a/internal/task/task/common/client_status.go b/internal/task/task/common/client_status.go index 4d4d9a581..16353a495 100644 --- a/internal/task/task/common/client_status.go +++ b/internal/task/task/common/client_status.go @@ -96,7 +96,7 @@ func NewGetClientStatusTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, containerId := client.ContainerId subname := fmt.Sprintf("host=%s kind=%s containerId=%s", hc.GetHost(), client.Kind, tui.TrimContainerId(containerId)) - t := task.NewTask("Get Client Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Client Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var status string diff --git a/internal/task/task/common/collect_client.go b/internal/task/task/common/collect_client.go index f9085cf20..1d8363d77 100644 --- a/internal/task/task/common/collect_client.go +++ b/internal/task/task/common/collect_client.go @@ -47,7 +47,7 @@ func NewCollectClientTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, er containerId := client.ContainerId subname := fmt.Sprintf("host=%s kind=%s containerId=%s", hc.GetHost(), client.Kind, tui.TrimContainerId(containerId)) - t := task.NewTask("Collect Client", subname, hc.GetSSHConfig()) + t := task.NewTask("Collect Client", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/collect_curveadm.go b/internal/task/task/common/collect_curveadm.go index 1cd3cb337..b9de2d611 100644 --- a/internal/task/task/common/collect_curveadm.go +++ b/internal/task/task/common/collect_curveadm.go @@ -40,7 +40,7 @@ func NewCollectCurveAdmTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( kind := dc.GetKind() subname := fmt.Sprintf("cluster=%s kind=%s", curveadm.ClusterName(), kind) - t := task.NewTask("Collect CurveAdm", subname, nil) + t := task.NewTask("Collect CurveAdm", subname, nil, nil) // add step to task secret := curveadm.MemStorage().Get(comm.KEY_SECRET).(string) diff --git a/internal/task/task/common/collect_service.go b/internal/task/task/common/collect_service.go index 19698aa13..fb264c76b 100644 --- a/internal/task/task/common/collect_service.go +++ b/internal/task/task/common/collect_service.go @@ -92,7 +92,7 @@ func NewCollectServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Collect Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Collect Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/create_container.go b/internal/task/task/common/create_container.go index 4cbc5879c..0776d91c2 100644 --- a/internal/task/task/common/create_container.go +++ b/internal/task/task/common/create_container.go @@ -220,7 +220,7 @@ func NewCreateContainerTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( // new task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Create Container", subname, hc.GetSSHConfig()) + t := task.NewTask("Create Container", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var oldContainerId, containerId string diff --git a/internal/task/task/common/create_pool.go b/internal/task/task/common/create_pool.go index dd6ffaa29..e14552932 100644 --- a/internal/task/task/common/create_pool.go +++ b/internal/task/task/common/create_pool.go @@ -195,7 +195,7 @@ func NewCreateTopologyTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* "Create Physical Pool") subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask(name, subname, hc.GetSSHConfig()) + t := task.NewTask(name, subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var success bool diff --git a/internal/task/task/common/etcd_auth_enable.go b/internal/task/task/common/etcd_auth_enable.go index 9d3b71109..79280feac 100644 --- a/internal/task/task/common/etcd_auth_enable.go +++ b/internal/task/task/common/etcd_auth_enable.go @@ -63,7 +63,7 @@ func NewEnableEtcdAuthTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Enable Etcd Auth", subname, hc.GetSSHConfig()) + t := task.NewTask("Enable Etcd Auth", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) script := scripts.ENABLE_ETCD_AUTH layout := dc.GetProjectLayout() diff --git a/internal/task/task/common/init_support.go b/internal/task/task/common/init_support.go index c322cc4a2..baa4d5f44 100644 --- a/internal/task/task/common/init_support.go +++ b/internal/task/task/common/init_support.go @@ -37,7 +37,7 @@ func NewInitSupportTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*tas kind := dc.GetKind() subname := fmt.Sprintf("cluster=%s kind=%s", curveadm.ClusterName(), kind) - t := task.NewTask("Init Support", subname, nil) + t := task.NewTask("Init Support", subname, nil, nil) /* * 0d7a7103521da69c6331a96355142c3b diff --git a/internal/task/task/common/install_client.go b/internal/task/task/common/install_client.go index dcb691b12..baadda58d 100644 --- a/internal/task/task/common/install_client.go +++ b/internal/task/task/common/install_client.go @@ -142,7 +142,7 @@ func NewInstallClientTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (* release := getRelease(curveadm) subname := fmt.Sprintf("host=%s release=%s", host, release) name := utils.Choose(kind == KIND_CURVEBS, "CurveBS", "CurveFS") - t := task.NewTask(fmt.Sprintf("Install %s Client", name), subname, hc.GetSSHConfig()) + t := task.NewTask(fmt.Sprintf("Install %s Client", name), subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var input, output string diff --git a/internal/task/task/common/pull_image.go b/internal/task/task/common/pull_image.go index ee40f930d..855b815c9 100644 --- a/internal/task/task/common/pull_image.go +++ b/internal/task/task/common/pull_image.go @@ -41,7 +41,7 @@ func NewPullImageTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task. // new task subname := fmt.Sprintf("host=%s image=%s", dc.GetHost(), dc.GetContainerImage()) - t := task.NewTask("Pull Image", subname, hc.GetSSHConfig()) + t := task.NewTask("Pull Image", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.PullImage{ diff --git a/internal/task/task/common/restart_service.go b/internal/task/task/common/restart_service.go index 9f81741c3..64fe0fabf 100644 --- a/internal/task/task/common/restart_service.go +++ b/internal/task/task/common/restart_service.go @@ -59,7 +59,7 @@ func NewRestartServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Restart Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Restart Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/service_status.go b/internal/task/task/common/service_status.go index 660fd68c7..3be7dc94f 100644 --- a/internal/task/task/common/service_status.go +++ b/internal/task/task/common/service_status.go @@ -229,7 +229,7 @@ func NewInitServiceStatusTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Init Service Status", subname, nil) + t := task.NewTask("Init Service Status", subname, nil, nil) t.AddStep(&step2InitStatus{ dc: dc, @@ -265,7 +265,7 @@ func NewGetServiceStatusTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Get Service Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Service Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status string diff --git a/internal/task/task/common/start_service.go b/internal/task/task/common/start_service.go index b3f7d921f..9f66678cc 100644 --- a/internal/task/task/common/start_service.go +++ b/internal/task/task/common/start_service.go @@ -90,7 +90,7 @@ func NewStartServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*ta // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Start Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/stop_service.go b/internal/task/task/common/stop_service.go index 9600d9f99..8aee65529 100644 --- a/internal/task/task/common/stop_service.go +++ b/internal/task/task/common/stop_service.go @@ -73,7 +73,7 @@ func NewStopServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*tas // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Stop Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/sync_config.go b/internal/task/task/common/sync_config.go index ba1243b39..eb57c7241 100644 --- a/internal/task/task/common/sync_config.go +++ b/internal/task/task/common/sync_config.go @@ -131,7 +131,7 @@ func NewSyncConfigTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Sync Config", subname, hc.GetSSHConfig()) + t := task.NewTask("Sync Config", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/uninstall_client.go b/internal/task/task/common/uninstall_client.go index ce939832d..f02f7ed0d 100644 --- a/internal/task/task/common/uninstall_client.go +++ b/internal/task/task/common/uninstall_client.go @@ -81,7 +81,7 @@ func NewUninstallClientTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, kind := curveadm.MemStorage().Get(comm.KEY_CLIENT_KIND).(string) subname := fmt.Sprintf("host=%s release=%s kind=%s", host, release, kind) name := utils.Choose(kind == KIND_CURVEBS, "CurveBS", "CurveFS") - t := task.NewTask(fmt.Sprintf("Uninstall %s Client", name), subname, hc.GetSSHConfig()) + t := task.NewTask(fmt.Sprintf("Uninstall %s Client", name), subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddPostStep(&step2UninstallPackage{ diff --git a/internal/task/task/common/update_topology.go b/internal/task/task/common/update_topology.go index de6d8529e..f693a047f 100644 --- a/internal/task/task/common/update_topology.go +++ b/internal/task/task/common/update_topology.go @@ -45,7 +45,7 @@ func updateTopology(curveadm *cli.CurveAdm) step.LambdaType { } func NewUpdateTopologyTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, error) { - t := task.NewTask("Update Topology", "", nil) + t := task.NewTask("Update Topology", "", nil, nil) // add step to task t.AddStep(&step.Lambda{ diff --git a/internal/task/task/fs/mount.go b/internal/task/task/fs/mount.go index 26091cb9a..28d3e04e1 100644 --- a/internal/task/task/fs/mount.go +++ b/internal/task/task/fs/mount.go @@ -319,7 +319,7 @@ func NewMountFSTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task.T mountFSName := options.MountFSName mountFSType := options.MountFSType subname := fmt.Sprintf("mountFSName=%s mountFSType=%s mountPoint=%s", mountFSName, mountFSType, mountPoint) - t := task.NewTask("Mount FileSystem", subname, hc.GetSSHConfig()) + t := task.NewTask("Mount FileSystem", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var containerId, out string diff --git a/internal/task/task/fs/umount.go b/internal/task/task/fs/umount.go index ea4e55f9a..23de64f5a 100644 --- a/internal/task/task/fs/umount.go +++ b/internal/task/task/fs/umount.go @@ -129,7 +129,7 @@ func NewUmountFSTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, error) // new task mountPoint := options.MountPoint subname := fmt.Sprintf("host=%s mountPoint=%s", options.Host, mountPoint) - t := task.NewTask("Umount FileSystem", subname, hc.GetSSHConfig()) + t := task.NewTask("Umount FileSystem", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status string diff --git a/internal/task/task/monitor/clean_container.go b/internal/task/task/monitor/clean_container.go index 1756be472..feee92a10 100644 --- a/internal/task/task/monitor/clean_container.go +++ b/internal/task/task/monitor/clean_container.go @@ -44,7 +44,7 @@ func NewCleanConfigContainerTask(curveadm *cli.CurveAdm, cfg *configure.MonitorC if err != nil { return nil, err } - t := task.NewTask("Clean Config Container", "", hc.GetSSHConfig()) + t := task.NewTask("Clean Config Container", "", hc.GetSSHConfig(), hc.GetHttpConfig()) t.AddStep(&common.Step2CleanContainer{ ServiceId: serviceId, ContainerId: containerId, diff --git a/internal/task/task/monitor/clean_service.go b/internal/task/task/monitor/clean_service.go index 2e7d02fa8..e5c6d4d24 100644 --- a/internal/task/task/monitor/clean_service.go +++ b/internal/task/task/monitor/clean_service.go @@ -73,7 +73,7 @@ func NewCleanMonitorTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) ( only := curveadm.MemStorage().Get(comm.KEY_CLEAN_ITEMS).([]string) subname := fmt.Sprintf("host=%s role=%s containerId=%s clean=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId), strings.Join(only, ",")) - t := task.NewTask("Clean Monitor", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Monitor", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task clean := utils.Slice2Map(only) diff --git a/internal/task/task/monitor/create_container.go b/internal/task/task/monitor/create_container.go index ac8ae9821..c95a7d766 100644 --- a/internal/task/task/monitor/create_container.go +++ b/internal/task/task/monitor/create_container.go @@ -120,7 +120,7 @@ func NewCreateContainerTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig // new task subname := fmt.Sprintf("host=%s role=%s", host, cfg.GetRole()) - t := task.NewTask("Create Container", subname, hc.GetSSHConfig()) + t := task.NewTask("Create Container", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var oldContainerId, containerId string diff --git a/internal/task/task/monitor/pull_image.go b/internal/task/task/monitor/pull_image.go index 80883e43d..ebe87ae86 100644 --- a/internal/task/task/monitor/pull_image.go +++ b/internal/task/task/monitor/pull_image.go @@ -41,7 +41,7 @@ func NewPullImageTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) (*ta // new task subname := fmt.Sprintf("host=%s image=%s", host, image) - t := task.NewTask("Pull Image", subname, hc.GetSSHConfig()) + t := task.NewTask("Pull Image", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.PullImage{ Image: image, diff --git a/internal/task/task/monitor/restart_service.go b/internal/task/task/monitor/restart_service.go index 25be8d924..726056ba2 100644 --- a/internal/task/task/monitor/restart_service.go +++ b/internal/task/task/monitor/restart_service.go @@ -49,7 +49,7 @@ func NewRestartServiceTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Restart Monitor Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Restart Monitor Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/monitor/start_service.go b/internal/task/task/monitor/start_service.go index 7391e1aaf..71c55faa8 100644 --- a/internal/task/task/monitor/start_service.go +++ b/internal/task/task/monitor/start_service.go @@ -59,7 +59,7 @@ func NewStartServiceTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) ( // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Start Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/monitor/status_service.go b/internal/task/task/monitor/status_service.go index 9c9c36323..6908a8710 100644 --- a/internal/task/task/monitor/status_service.go +++ b/internal/task/task/monitor/status_service.go @@ -126,7 +126,7 @@ func NewInitMonitorStatusTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConf subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Init Monitor Status", subname, nil) + t := task.NewTask("Init Monitor Status", subname, nil, nil) t.AddStep(&step2InitMonitorStatus{ mc: cfg, @@ -154,7 +154,7 @@ func NewGetMonitorStatusTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfi // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Get Monitor Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Monitor Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status string diff --git a/internal/task/task/monitor/stop_service.go b/internal/task/task/monitor/stop_service.go index 126a5801d..221cb3e52 100644 --- a/internal/task/task/monitor/stop_service.go +++ b/internal/task/task/monitor/stop_service.go @@ -49,7 +49,7 @@ func NewStopServiceTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Stop Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/monitor/sync_config.go b/internal/task/task/monitor/sync_config.go index a82d85892..41ad22b44 100644 --- a/internal/task/task/monitor/sync_config.go +++ b/internal/task/task/monitor/sync_config.go @@ -72,7 +72,7 @@ func NewSyncConfigTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) (*t // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Sync Config", subname, hc.GetSSHConfig()) + t := task.NewTask("Sync Config", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string t.AddStep(&step.ListContainers{ // gurantee container exist diff --git a/internal/task/task/playground/create.go b/internal/task/task/playground/create.go index 68b0823b6..05e534d57 100644 --- a/internal/task/task/playground/create.go +++ b/internal/task/task/playground/create.go @@ -100,7 +100,7 @@ func NewCreatePlaygroundTask(curveadm *cli.CurveAdm, cfg *configure.PlaygroundCo // new task subname := fmt.Sprintf("kind=%s name=%s image=%s", kind, name, containerImage) - t := task.NewTask("Create Playground", subname, nil) + t := task.NewTask("Create Playground", subname, nil, nil) var containerId string // add step to task diff --git a/internal/task/task/playground/init.go b/internal/task/task/playground/init.go index 7443432df..a7441354d 100644 --- a/internal/task/task/playground/init.go +++ b/internal/task/task/playground/init.go @@ -106,7 +106,7 @@ func NewInitPlaygroundTask(curveadm *cli.CurveAdm, cfg *configure.PlaygroundConf kind := cfg.GetKind() name := cfg.GetName() subname := fmt.Sprintf("kind=%s name=%s", kind, name) - t := task.NewTask("Init Playground", subname, nil) + t := task.NewTask("Init Playground", subname, nil, nil) poolset := curveadm.MemStorage().Get(comm.POOLSET).(string) poolsetDisktype := curveadm.MemStorage().Get(comm.POOLSET_DISK_TYPE).(string) diff --git a/internal/task/task/playground/list.go b/internal/task/task/playground/list.go index 99dd534c1..1ce567985 100644 --- a/internal/task/task/playground/list.go +++ b/internal/task/task/playground/list.go @@ -83,7 +83,7 @@ func NewGetPlaygroundStatusTask(curveadm *cli.CurveAdm, v interface{}) (*task.Ta // new task playground := v.(storage.Playground) subname := fmt.Sprintf("id=%d name=%s", playground.Id, playground.Name) - t := task.NewTask("Get Playground Status", subname, nil) + t := task.NewTask("Get Playground Status", subname, nil, nil) // add step to task var status string diff --git a/internal/task/task/playground/remove.go b/internal/task/task/playground/remove.go index df6bdf647..4d92a8424 100644 --- a/internal/task/task/playground/remove.go +++ b/internal/task/task/playground/remove.go @@ -95,7 +95,7 @@ func NewRemovePlaygroundTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, // new task playground := v.(storage.Playground) subname := fmt.Sprintf("name=%s", playground.Name) - t := task.NewTask("Remove Playground", subname, nil) + t := task.NewTask("Remove Playground", subname, nil, nil) // add step to task var containerId string diff --git a/internal/task/task/playground/start.go b/internal/task/task/playground/start.go index 535fc4915..bbbc2ba83 100644 --- a/internal/task/task/playground/start.go +++ b/internal/task/task/playground/start.go @@ -43,7 +43,7 @@ func wait(seconds int) step.LambdaType { func NewStartPlaygroundTask(curveadm *cli.CurveAdm, cfg *configure.PlaygroundConfig) (*task.Task, error) { // new task subname := fmt.Sprintf("kind=%s name=%s", cfg.GetKind(), cfg.GetName()) - t := task.NewTask("Start Playground", subname, nil) + t := task.NewTask("Start Playground", subname, nil, nil) // add step to task containerId := cfg.GetName() diff --git a/internal/task/task/task.go b/internal/task/task/task.go index 3eb0d755b..bcbcd7125 100644 --- a/internal/task/task/task.go +++ b/internal/task/task/task.go @@ -44,24 +44,26 @@ type ( } Task struct { - tid string // task id - ptid string // parent task id - name string - subname string - steps []Step - postSteps []Step - sshConfig *module.SSHConfig + tid string // task id + ptid string // parent task id + name string + subname string + steps []Step + postSteps []Step + sshConfig *module.SSHConfig + httpConfig *module.HttpConfig } ) -func NewTask(name, subname string, sshConfig *module.SSHConfig) *Task { +func NewTask(name, subname string, sshConfig *module.SSHConfig, httpConfig *module.HttpConfig) *Task { tid := uuid.NewString()[:12] return &Task{ - tid: tid, - ptid: tid, - name: name, - subname: subname, - sshConfig: sshConfig, + tid: tid, + ptid: tid, + name: name, + subname: subname, + sshConfig: sshConfig, + httpConfig: httpConfig, } } @@ -111,16 +113,22 @@ func (t *Task) executePost(ctx *context.Context) { } func (t *Task) Execute() error { - var sshClient *module.SSHClient + var remoteClient module.RemoteClient if t.sshConfig != nil { client, err := module.NewSSHClient(*t.sshConfig) if err != nil { return errno.ERR_SSH_CONNECT_FAILED.E(err) } - sshClient = client + remoteClient = client + } else if t.httpConfig != nil { + client, err := module.NewHttpClient(*t.httpConfig) + if err != nil { + return errno.ERR_HTTP_CONNECT_FAILED.E(err) + } + remoteClient = client } - ctx, err := context.NewContext(sshClient) + ctx, err := context.NewContext(remoteClient) if err != nil { return err } diff --git a/internal/task/task/website/clean_service.go b/internal/task/task/website/clean_service.go index 682225488..c69c5e9a8 100644 --- a/internal/task/task/website/clean_service.go +++ b/internal/task/task/website/clean_service.go @@ -62,7 +62,7 @@ func NewCleanWebsiteTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfig) ( only := curveadm.MemStorage().Get(comm.KEY_CLEAN_ITEMS).([]string) subname := fmt.Sprintf("host=%s role=%s containerId=%s clean=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId), strings.Join(only, ",")) - t := task.NewTask("Clean Website", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Website", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task clean := utils.Slice2Map(only) diff --git a/internal/task/task/website/create_container.go b/internal/task/task/website/create_container.go index 4b79ec846..f1c50d111 100644 --- a/internal/task/task/website/create_container.go +++ b/internal/task/task/website/create_container.go @@ -54,7 +54,7 @@ func NewCreateContainerTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfig // new task subname := fmt.Sprintf("host=%s role=%s", host, cfg.GetRole()) - t := task.NewTask("Create Container", subname, hc.GetSSHConfig()) + t := task.NewTask("Create Container", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var oldContainerId, containerId string diff --git a/internal/task/task/website/pull_image.go b/internal/task/task/website/pull_image.go index d9a4ac35d..334082854 100644 --- a/internal/task/task/website/pull_image.go +++ b/internal/task/task/website/pull_image.go @@ -41,7 +41,7 @@ func NewPullImageTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfig) (*ta // new task subname := fmt.Sprintf("host=%s image=%s", host, image) - t := task.NewTask("Pull Image", subname, hc.GetSSHConfig()) + t := task.NewTask("Pull Image", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.PullImage{ Image: image, diff --git a/internal/task/task/website/restart_service.go b/internal/task/task/website/restart_service.go index c714bd5e2..3fe8c5914 100644 --- a/internal/task/task/website/restart_service.go +++ b/internal/task/task/website/restart_service.go @@ -45,7 +45,7 @@ func NewRestartServiceTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfig) // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Restart Website Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Restart Website Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/website/start_service.go b/internal/task/task/website/start_service.go index 2c25ba316..b780e21b6 100644 --- a/internal/task/task/website/start_service.go +++ b/internal/task/task/website/start_service.go @@ -45,7 +45,7 @@ func NewStartServiceTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfig) ( // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Start Website", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Website", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/website/status_service.go b/internal/task/task/website/status_service.go index ff7d38326..7e57f669a 100644 --- a/internal/task/task/website/status_service.go +++ b/internal/task/task/website/status_service.go @@ -127,7 +127,7 @@ func NewInitWebsiteStatusTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConf subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Init Website Status", subname, nil) + t := task.NewTask("Init Website Status", subname, nil, nil) t.AddStep(&step2InitWebsiteStatus{ wc: cfg, @@ -153,7 +153,7 @@ func NewGetWebsiteStatusTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfi // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Get Website Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Website Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status string diff --git a/internal/task/task/website/stop_service.go b/internal/task/task/website/stop_service.go index a6300b2dc..bce881d58 100644 --- a/internal/task/task/website/stop_service.go +++ b/internal/task/task/website/stop_service.go @@ -45,7 +45,7 @@ func NewStopServiceTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Stop Website", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Website", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/website/sync_config.go b/internal/task/task/website/sync_config.go index ea837f40b..02cdee549 100644 --- a/internal/task/task/website/sync_config.go +++ b/internal/task/task/website/sync_config.go @@ -70,7 +70,7 @@ func NewSyncConfigTask(curveadm *cli.CurveAdm, cfg *configure.WebsiteConfig) (*t // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Sync Config", subname, hc.GetSSHConfig()) + t := task.NewTask("Sync Config", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string t.AddStep(&step.ListContainers{ // gurantee container exist diff --git a/internal/tui/hosts.go b/internal/tui/hosts.go index 9a19ba983..67366c89c 100644 --- a/internal/tui/hosts.go +++ b/internal/tui/hosts.go @@ -45,6 +45,8 @@ func FormatHosts(hcs []*configure.HostConfig, verbose bool) string { "Hostname", "User", "Port", + "Protocol", + "HTTP Port", "Private Key File", "Forward Agent", "Become User", @@ -60,8 +62,10 @@ func FormatHosts(hcs []*configure.HostConfig, verbose bool) string { host := hc.GetHost() hostname := hc.GetHostname() + protocol := hc.GetProtocol() user := hc.GetUser() port := strconv.Itoa(hc.GetSSHPort()) + httpPort := strconv.Itoa(hc.GetHTTPPort()) forwardAgent := utils.Choose(hc.GetForwardAgent(), "Y", "N") becomeUser := utils.Choose(len(hc.GetBecomeUser()) > 0, hc.GetBecomeUser(), "-") labels := utils.Choose(len(hc.GetLabels()) > 0, strings.Join(hc.GetLabels(), ","), "-") @@ -78,6 +82,8 @@ func FormatHosts(hcs []*configure.HostConfig, verbose bool) string { hostname, user, port, + protocol, + httpPort, privateKeyFile, forwardAgent, becomeUser, diff --git a/pkg/module/docker_cli.go b/pkg/module/docker_cli.go index 1668a2311..a910444d8 100644 --- a/pkg/module/docker_cli.go +++ b/pkg/module/docker_cli.go @@ -49,18 +49,18 @@ const ( ) type DockerCli struct { - sshClient *SSHClient - options []string - tmpl *template.Template - data map[string]interface{} + options []string + tmpl *template.Template + data map[string]interface{} + remoteClient RemoteClient } -func NewDockerCli(sshClient *SSHClient) *DockerCli { +func NewDockerCli(remoteClient RemoteClient) *DockerCli { return &DockerCli{ - sshClient: sshClient, - options: []string{}, - tmpl: nil, - data: map[string]interface{}{}, + remoteClient: remoteClient, + options: []string{}, + tmpl: nil, + data: map[string]interface{}{}, } } @@ -72,7 +72,7 @@ func (s *DockerCli) AddOption(format string, args ...interface{}) *DockerCli { func (cli *DockerCli) Execute(options ExecOptions) (string, error) { cli.data["options"] = strings.Join(cli.options, " ") cli.data["engine"] = options.ExecWithEngine - return execCommand(cli.sshClient, cli.tmpl, cli.data, options) + return execCommand(cli.remoteClient, cli.tmpl, cli.data, options) } func (cli *DockerCli) DockerInfo() *DockerCli { diff --git a/pkg/module/file.go b/pkg/module/file.go index b7d686420..4a8f49062 100644 --- a/pkg/module/file.go +++ b/pkg/module/file.go @@ -41,35 +41,36 @@ var ( ) type FileManager struct { - sshClient *SSHClient + remoteClient RemoteClient } -func NewFileManager(sshClient *SSHClient) *FileManager { - return &FileManager{sshClient: sshClient} +func NewFileManager(remoteClient RemoteClient) *FileManager { + return &FileManager{remoteClient: remoteClient} } func (f *FileManager) Upload(localPath, remotePath string) error { - if f.sshClient == nil { + if f.remoteClient == nil { return ERR_UNREACHED } - err := f.sshClient.Client().Upload(localPath, remotePath) + err := f.remoteClient.Upload(localPath, remotePath) log.SwitchLevel(err)("UploadFile", - log.Field("remoteAddress", remoteAddr(f.sshClient)), + log.Field("remoteAddress", remoteAddr(f.remoteClient)), log.Field("localPath", localPath), log.Field("remotePath", remotePath), - log.Field("error", err)) + log.Field("error", err), + log.Field("protocol", f.remoteClient.Protocol())) return err } func (f *FileManager) Download(remotePath, localPath string) error { - if f.sshClient == nil { + if f.remoteClient == nil { return ERR_UNREACHED } - err := f.sshClient.Client().Download(remotePath, localPath) + err := f.remoteClient.Download(remotePath, localPath) log.SwitchLevel(err)("DownloadFile", - log.Field("remoteAddress", remoteAddr(f.sshClient)), + log.Field("remoteAddress", remoteAddr(f.remoteClient)), log.Field("remotePath", remotePath), log.Field("localPath", localPath), log.Field("error", err)) diff --git a/pkg/module/http.go b/pkg/module/http.go new file mode 100644 index 000000000..51213b7b8 --- /dev/null +++ b/pkg/module/http.go @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2021 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package module + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + log "github.com/opencurve/curveadm/pkg/log/glg" + "io" + "mime/multipart" + "net/http" + "net/url" + "os" +) + +const ( + HTTP_PROTOCOL = "http" +) + +type ( + HttpConfig struct { + Host string + Port uint + } + + HttpClient struct { + config HttpConfig + client *http.Client + } + + HttpResult struct { + Data string `json:"data"` + ErrorCode string `json:"errorCode"` + ErrorMsg string `json:"errorMsg"` + } +) + +func (client *HttpClient) Protocol() string { + return HTTP_PROTOCOL +} + +func (client *HttpClient) WrapperCommand(command string, execInLocal bool) (wrapperCmd string) { + return command +} + +func (client *HttpClient) RunCommand(ctx context.Context, command string) (out []byte, err error) { + data := make(map[string]interface{}) + data["command"] = command + bytesData, _ := json.Marshal(data) + + baseURL, _ := url.Parse(fmt.Sprintf("http://%s:%d", client.config.Host, client.config.Port)) + params := url.Values{} + params.Add("method", "cluster.deploy.cmd") + baseURL.RawQuery = params.Encode() + resp, err := client.client.Post(baseURL.String(), "application/json", bytes.NewReader(bytesData)) + if err != nil { + return + } + respData, err := io.ReadAll(resp.Body) + if err != nil { + return + } + result := &HttpResult{} + err = json.Unmarshal(respData, result) + if err != nil { + return + } + + log.Info("http resp", log.Field("result", result)) + + if result.ErrorCode != "0" { + return []byte(result.Data), fmt.Errorf(result.ErrorMsg) + } + + return []byte(result.Data), nil +} + +func (client *HttpClient) RemoteAddr() (addr string) { + config := client.Config() + return fmt.Sprintf("%s:%d", config.Host, config.Port) +} + +func (client *HttpClient) Upload(localPath string, remotePath string) (err error) { + bodyBuf := &bytes.Buffer{} + bodyWriter := multipart.NewWriter(bodyBuf) + fh, err := os.Open(localPath) + if err != nil { + return err + } + defer fh.Close() + fileWriter, err := bodyWriter.CreateFormFile("file", localPath) + if err != nil { + return err + } + _, err = io.Copy(fileWriter, fh) + if err != nil { + return err + } + + baseURL, _ := url.Parse(fmt.Sprintf("http://%s:%d", client.config.Host, client.config.Port)) + params := url.Values{} + params.Add("method", "cluster.deploy.upload") + baseURL.RawQuery = params.Encode() + boundary := "--boundary" + bodyWriter.SetBoundary(boundary) + bodyWriter.WriteField("filepath", remotePath) + bodyWriter.Close() + resp, err := client.client.Post(baseURL.String(), bodyWriter.FormDataContentType(), bodyBuf) + if err != nil { + return err + } + defer resp.Body.Close() + return err +} + +func (client *HttpClient) Download(remotePath string, localPath string) (err error) { + baseURL, _ := url.Parse(fmt.Sprintf("http://%s:%d", client.config.Host, client.config.Port)) + params := url.Values{} + params.Add("method", "cluster.deploy.download") + params.Add("filepath", remotePath) + baseURL.RawQuery = params.Encode() + resp, err := client.client.Get(baseURL.String()) + if err != nil { + return + } + defer resp.Body.Close() + localFile, err := os.Create(localPath) + if err != nil { + return + } + defer localFile.Close() + _, err = io.Copy(localFile, resp.Body) + return +} + +func (client *HttpClient) Close() { + +} + +func (client *HttpClient) Config() HttpConfig { + return client.config +} + +func NewHttpClient(config HttpConfig) (*HttpClient, error) { + return &HttpClient{ + config: config, + client: &http.Client{}, + }, nil +} diff --git a/pkg/module/module.go b/pkg/module/module.go index 72618e822..5ff2bbf8b 100644 --- a/pkg/module/module.go +++ b/pkg/module/module.go @@ -33,13 +33,12 @@ import ( "text/template" "time" - "github.com/melbahja/goph" log "github.com/opencurve/curveadm/pkg/log/glg" ) type ( Module struct { - sshClient *SSHClient + remoteClient RemoteClient } ExecOptions struct { @@ -60,33 +59,31 @@ func (e *TimeoutError) Error() string { e.timeout) } -func NewModule(sshClient *SSHClient) *Module { - return &Module{sshClient: sshClient} +func NewModule(remoteClient RemoteClient) *Module { + return &Module{remoteClient: remoteClient} } func (m *Module) Shell() *Shell { - return NewShell(m.sshClient) + return NewShell(m.remoteClient) } func (m *Module) File() *FileManager { - return NewFileManager(m.sshClient) + return NewFileManager(m.remoteClient) } func (m *Module) DockerCli() *DockerCli { - return NewDockerCli(m.sshClient) + return NewDockerCli(m.remoteClient) } // common utils -func remoteAddr(client *SSHClient) string { +func remoteAddr(client RemoteClient) string { if client == nil { return "-" } - - config := client.Config() - return fmt.Sprintf("%s@%s:%d", config.User, config.Host, config.Port) + return client.RemoteAddr() } -func execCommand(sshClient *SSHClient, +func execCommand(remoteClient RemoteClient, tmpl *template.Template, data map[string]interface{}, options ExecOptions) (string, error) { @@ -108,14 +105,8 @@ func execCommand(sshClient *SSHClient, command = strings.TrimLeft(command, " ") // (3) handle 'become_user' - if sshClient != nil { - becomeMethod := sshClient.Config().BecomeMethod - becomeFlags := sshClient.Config().BecomeFlags - becomeUser := sshClient.Config().BecomeUser - if len(becomeUser) > 0 && !options.ExecInLocal { - become := strings.Join([]string{becomeMethod, becomeFlags, becomeUser}, " ") - command = strings.Join([]string{become, command}, " ") - } + if remoteClient != nil { + command = remoteClient.WrapperCommand(command, options.ExecInLocal) } // (4) create context for timeout @@ -134,12 +125,7 @@ func execCommand(sshClient *SSHClient, cmd.Env = []string{"LANG=en_US.UTF-8"} out, err = cmd.CombinedOutput() } else { - var cmd *goph.Cmd - cmd, err = sshClient.Client().CommandContext(ctx, command) - if err == nil { - cmd.Env = []string{"LANG=en_US.UTF-8"} - out, err = cmd.CombinedOutput() - } + out, err = remoteClient.RunCommand(ctx, command) } if ctx.Err() == context.DeadlineExceeded { @@ -147,7 +133,7 @@ func execCommand(sshClient *SSHClient, } log.SwitchLevel(err)("Execute command", - log.Field("remoteAddr", remoteAddr(sshClient)), + log.Field("remoteAddr", remoteAddr(remoteClient)), log.Field("command", command), log.Field("output", strings.TrimSuffix(string(out), "\n")), log.Field("error", err)) diff --git a/pkg/module/remote_client.go b/pkg/module/remote_client.go new file mode 100644 index 000000000..93dcdcac3 --- /dev/null +++ b/pkg/module/remote_client.go @@ -0,0 +1,15 @@ +package module + +import ( + "context" +) + +type RemoteClient interface { + Protocol() string + WrapperCommand(command string, execInLocal bool) (wrapperCmd string) + RunCommand(ctx context.Context, command string) (out []byte, err error) + RemoteAddr() (addr string) + Upload(localPath string, remotePath string) (err error) + Download(remotePath string, localPath string) (err error) + Close() +} diff --git a/pkg/module/shell.go b/pkg/module/shell.go index b9f246a6f..f397b4e89 100644 --- a/pkg/module/shell.go +++ b/pkg/module/shell.go @@ -77,18 +77,18 @@ const ( // TODO(P1): support command pipe type Shell struct { - sshClient *SSHClient - options []string - tmpl *template.Template - data map[string]interface{} + remoteClient RemoteClient + options []string + tmpl *template.Template + data map[string]interface{} } -func NewShell(sshClient *SSHClient) *Shell { +func NewShell(remoteClient RemoteClient) *Shell { return &Shell{ - sshClient: sshClient, - options: []string{}, - tmpl: nil, - data: map[string]interface{}{}, + remoteClient: remoteClient, + options: []string{}, + tmpl: nil, + data: map[string]interface{}{}, } } @@ -109,7 +109,7 @@ func (s *Shell) String() (string, error) { func (s *Shell) Execute(options ExecOptions) (string, error) { s.data["options"] = strings.Join(s.options, " ") - return execCommand(s.sshClient, s.tmpl, s.data, options) + return execCommand(s.remoteClient, s.tmpl, s.data, options) } // text diff --git a/pkg/module/ssh.go b/pkg/module/ssh.go index 8f2d47f79..8616736e1 100644 --- a/pkg/module/ssh.go +++ b/pkg/module/ssh.go @@ -25,8 +25,11 @@ package module import ( + "context" "errors" + "fmt" "net" + "strings" "time" "github.com/melbahja/goph" @@ -34,6 +37,10 @@ import ( "golang.org/x/crypto/ssh" ) +const ( + SSH_PROTOCOL = "ssh" +) + type ( SSHConfig struct { User string @@ -151,3 +158,45 @@ connect: config: config, }, err } + +func (client *SSHClient) WrapperCommand(command string, execInLocal bool) string { + becomeMethod := client.Config().BecomeMethod + becomeFlags := client.Config().BecomeFlags + becomeUser := client.Config().BecomeUser + if len(becomeUser) > 0 && !execInLocal { + become := strings.Join([]string{becomeMethod, becomeFlags, becomeUser}, " ") + command = strings.Join([]string{become, command}, " ") + } + return command +} + +func (client *SSHClient) RunCommand(ctx context.Context, command string) (out []byte, err error) { + var cmd *goph.Cmd + cmd, err = client.Client().CommandContext(ctx, command) + if err == nil { + cmd.Env = []string{"LANG=en_US.UTF-8"} + out, err = cmd.CombinedOutput() + } + return +} + +func (client *SSHClient) RemoteAddr() (addr string) { + config := client.Config() + return fmt.Sprintf("%s@%s:%d", config.User, config.Host, config.Port) +} + +func (client *SSHClient) Upload(localPath string, remotePath string) (err error) { + return client.client.Upload(localPath, remotePath) +} + +func (client *SSHClient) Download(remotePath string, localPath string) (err error) { + return client.client.Download(remotePath, localPath) +} + +func (client *SSHClient) Close() { + client.client.Close() +} + +func (client *SSHClient) Protocol() string { + return SSH_PROTOCOL +}