Skip to content

Commit

Permalink
add load body from file support
Browse files Browse the repository at this point in the history
  • Loading branch information
kubemq committed Jan 27, 2021
1 parent 80e1f2f commit 186e790
Show file tree
Hide file tree
Showing 24 changed files with 468 additions and 391 deletions.
35 changes: 17 additions & 18 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 54 additions & 12 deletions cmd/commands/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@ package commands

import (
b64 "encoding/base64"
"encoding/json"
"fmt"
jsoniter "github.com/json-iterator/go"
kubemq "github.com/kubemq-io/kubemq-go"
"strconv"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

type object struct {
Id string `json:"id"`
Channel string `json:"channel,omitempty"`
ClientId string `json:"client_id,omitempty"`
Metadata string `json:"metadata,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Body string `json:"body,omitempty"`
Executed string `json:"executed,omitempty"`
Timeout string `json:"timeout,omitempty"`
ExecutedAt string `json:"executed_at,omitempty"`
Error string `json:"error,omitempty"`
payload []byte
BodyJson json.RawMessage `json:"body_json,omitempty"`
BodyString string `json:"body_string,omitempty"`
}

func newObjectWithCommandReceive(cmd *kubemq.CommandReceive) *object {
Expand All @@ -30,38 +29,78 @@ func newObjectWithCommandReceive(cmd *kubemq.CommandReceive) *object {
ClientId: cmd.ClientId,
Metadata: cmd.Metadata,
Tags: cmd.Tags,
Body: "",
BodyJson: json.RawMessage{},
BodyString: "",
Executed: "",
ExecutedAt: "",
Error: "",
payload: cmd.Body,
Timeout: "",
}

sDec, err := b64.StdEncoding.DecodeString(string(cmd.Body))
if err != nil {
obj.Body = string(cmd.Body)
var js json.RawMessage
if err := json.Unmarshal(cmd.Body, &js); err == nil {
obj.BodyJson = js
} else {
obj.Body = string(sDec)
sDec, err := b64.StdEncoding.DecodeString(string(cmd.Body))
if err != nil {
obj.BodyString = string(cmd.Body)
} else {
obj.BodyString = string(sDec)
}
}
return obj
}
func newObjectWithCommandResponse(response *kubemq.CommandResponse) *object {
obj := &object{
Id: response.CommandId,
Channel: "",
ClientId: response.ResponseClientId,
Metadata: "",
Tags: response.Tags,
Executed: strconv.FormatBool(response.Executed),
Timeout: "",
ExecutedAt: response.ExecutedAt.Format("2006-01-02 15:04:05.999"),
Error: response.Error,
BodyJson: nil,
BodyString: "",
}
if !response.Executed {
obj.ExecutedAt = ""
}
return obj
}
func newObjectWithCommand(cmd *kubemq.Command) *object {

obj := &object{
Id: cmd.Id,
Channel: cmd.Channel,
ClientId: cmd.ClientId,
Metadata: cmd.Metadata,
Tags: cmd.Tags,
BodyJson: json.RawMessage{},
BodyString: "",
Executed: "",
ExecutedAt: "",
Error: "",
Timeout: cmd.Timeout.String(),
}
var js json.RawMessage
if err := json.Unmarshal(cmd.Body, &js); err == nil {
obj.BodyJson = js
} else {
sDec, err := b64.StdEncoding.DecodeString(string(cmd.Body))
if err != nil {
obj.BodyString = string(cmd.Body)
} else {
obj.BodyString = string(sDec)
}
}

return obj
}

func (o *object) String() string {
data, _ := json.MarshalIndent(o, "", " ")
data, _ := json.MarshalIndent(o, "", " ")
return string(data)
}

Expand All @@ -72,3 +111,6 @@ func printCommandReceive(command *kubemq.CommandReceive) {
func printCommandResponse(response *kubemq.CommandResponse) {
fmt.Println(newObjectWithCommandResponse(response))
}
func printCommand(cmd *kubemq.Command) {
fmt.Println(newObjectWithCommand(cmd))
}
10 changes: 5 additions & 5 deletions cmd/commands/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ type CommandsReceiveOptions struct {
}

var commandsReceiveExamples = `
# Receive commands from a 'commands' channel (blocks until next message)
# Receive commands from a 'commands' channel (blocks until next body)
kubemqctl commands receive some-channel
# Receive commands from a 'commands' channel with group (blocks until next message)
# Receive commands from a 'commands' channel with group (blocks until next body)
kubemqctl commands receive some-channel -g G1
`
var commandsReceiveLong = `Receive (Subscribe) command allows to consume a message from 'commands' channel and response with appropriate reply`
var commandsReceiveShort = `Receive a message from 'commands' channel command`
var commandsReceiveLong = `Receive (Subscribe) command allows to consume a body from 'commands' channel and response with appropriate reply`
var commandsReceiveShort = `Receive a body from 'commands' channel command`

func NewCmdCommandsReceive(ctx context.Context, cfg *config.Config) *cobra.Command {
o := &CommandsReceiveOptions{
Expand Down Expand Up @@ -86,7 +86,7 @@ func (o *CommandsReceiveOptions) Run(ctx context.Context) error {
utils.Println(fmt.Errorf("receive commands messages, %s", err.Error()).Error())
}
for {
utils.Println("waiting for the next command message...")
utils.Println("waiting for the next command body...")
select {
case err := <-errChan:
return fmt.Errorf("server disconnected with error: %s", err.Error())
Expand Down
41 changes: 30 additions & 11 deletions cmd/commands/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,29 @@ import (
"github.com/kubemq-io/kubemqctl/pkg/kubemq"
"github.com/kubemq-io/kubemqctl/pkg/utils"
"github.com/spf13/cobra"
"io/ioutil"
"time"
)

type CommandsSendOptions struct {
cfg *config.Config
transport string
channel string
message string
body string
metadata string
timeout int
fileName string
}

var commandsSendExamples = `
# Send command to a 'commands' channel
kubemqctl commands send some-channel some-command
# Send command to a 'commands' channel with metadata
kubemqctl commands send some-channel some-message -m some-metadata
kubemqctl commands send some-channel some-body -m some-metadata
# Send command to a 'commands' channel with 120 seconds timeout
kubemqctl commands send some-channel some-message -o 120
kubemqctl commands send some-channel some-body -o 120
`
var commandsSendLong = `Send command allow to send messages to 'commands' channel with an option to set command time-out`
var commandsSendShort = `Send messages to 'commands' channel command`
Expand All @@ -54,20 +56,35 @@ func NewCmdCommandsSend(ctx context.Context, cfg *config.Config) *cobra.Command
utils.CheckErr(o.Run(ctx))
},
}
cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "m", "", "Set metadata message")
cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "m", "", "Set metadata body")
cmd.PersistentFlags().IntVarP(&o.timeout, "timeout", "o", 30, "Set command timeout")

cmd.PersistentFlags().StringVarP(&o.fileName, "file", "f", "", "set load body from file")
return cmd
}

func (o *CommandsSendOptions) Complete(args []string, transport string) error {
o.transport = transport
if len(args) >= 2 {
if len(args) >= 1 {
o.channel = args[0]
o.message = args[1]
return nil

} else {
return fmt.Errorf("missing channel argument")
}
return fmt.Errorf("missing arguments, must be 2 arguments, channel and a message")

if o.fileName != "" {
data, err := ioutil.ReadFile(o.fileName)
if err != nil {
return err
}
o.body = string(data)
} else {
if len(args) >= 2 {
o.body = args[1]
} else {
return fmt.Errorf("missing body argument")
}
}
return nil
}

func (o *CommandsSendOptions) Validate() error {
Expand All @@ -87,12 +104,14 @@ func (o *CommandsSendOptions) Run(ctx context.Context) error {
msg := client.C().
SetChannel(o.channel).
SetId(uuid.New().String()).
SetBody([]byte(o.message)).
SetBody([]byte(o.body)).
SetMetadata(o.metadata).
SetTimeout(time.Duration(o.timeout) * time.Second)
fmt.Println("Sending Command:")
printCommand(msg)
res, err := msg.Send(ctx)
if err != nil {
return fmt.Errorf("sending commands message, %s", err.Error())
return fmt.Errorf("sending commands body, %s", err.Error())
}
printCommandResponse(res)
return nil
Expand Down
7 changes: 5 additions & 2 deletions cmd/create/cluster/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func defaultDeployOptions(cmd *cobra.Command) *deployOptions {
cmd.PersistentFlags().StringVarP(&o.configFilename, "config-file", "c", "", "set kubemq config file")
cmd.PersistentFlags().StringVarP(&o.name, "name", "", "kubemq-cluster", "set kubemq cluster name")
cmd.PersistentFlags().StringVarP(&o.namespace, "namespace", "n", "kubemq", "set kubemq cluster namespace")
cmd.PersistentFlags().StringVarP(&o.key, "key", "", "", "set kubemq license key")
cmd.PersistentFlags().StringVarP(&o.key, "key", "k", "", "set kubemq license key")
cmd.PersistentFlags().StringVarP(&o.statefulSetConfigData, "statefulset-config-data", "", "", "set kubemq cluster statefulset configuration data")
cmd.PersistentFlags().BoolVarP(&o.standalone, "standalone", "", false, "set kubemq cluster standalone mode")
cmd.PersistentFlags().Int32VarP(&o.replicas, "replicas", "r", 3, "set replicas")
Expand All @@ -82,7 +82,6 @@ func (o *deployOptions) validate() error {
if o.namespace == "" {
return fmt.Errorf("error setting deploy configuration, missing kubemq cluster namespace")
}

if err := o.api.validate(); err != nil {
return err
}
Expand Down Expand Up @@ -138,6 +137,10 @@ func (o *deployOptions) validate() error {
if err := o.volume.validate(); err != nil {
return err
}

if o.license.licenseData == "" && o.key == "" {
return fmt.Errorf("license key is required, get a key at https://account.kubemq.io/login/register")
}
return nil
}

Expand Down
Loading

0 comments on commit 186e790

Please sign in to comment.