diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 429e7e3..b9695af 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,30 +2,29 @@ - - - - - + - + - + + + + - + @@ -568,7 +567,6 @@ true diff --git a/cmd/commands/helper.go b/cmd/commands/helper.go index af4a4dc..5f0d03e 100644 --- a/cmd/commands/helper.go +++ b/cmd/commands/helper.go @@ -2,25 +2,24 @@ package commands import ( b64 "encoding/base64" + "encoding/json" "fmt" - jsoniter "github.com/json-iterator/go" kubemq "github.com/kubemq-io/kubemq-go" "strconv" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary - type object struct { Id string `json:"id"` Channel string `json:"channel,omitempty"` ClientId string `json:"client_id,omitempty"` Metadata string `json:"metadata,omitempty"` Tags map[string]string `json:"tags,omitempty"` - Body string `json:"body,omitempty"` Executed string `json:"executed,omitempty"` + Timeout string `json:"timeout,omitempty"` ExecutedAt string `json:"executed_at,omitempty"` Error string `json:"error,omitempty"` - payload []byte + BodyJson json.RawMessage `json:"body_json,omitempty"` + BodyString string `json:"body_string,omitempty"` } func newObjectWithCommandReceive(cmd *kubemq.CommandReceive) *object { @@ -30,38 +29,78 @@ func newObjectWithCommandReceive(cmd *kubemq.CommandReceive) *object { ClientId: cmd.ClientId, Metadata: cmd.Metadata, Tags: cmd.Tags, - Body: "", + BodyJson: json.RawMessage{}, + BodyString: "", Executed: "", ExecutedAt: "", Error: "", - payload: cmd.Body, + Timeout: "", } - sDec, err := b64.StdEncoding.DecodeString(string(cmd.Body)) - if err != nil { - obj.Body = string(cmd.Body) + var js json.RawMessage + if err := json.Unmarshal(cmd.Body, &js); err == nil { + obj.BodyJson = js } else { - obj.Body = string(sDec) + sDec, err := b64.StdEncoding.DecodeString(string(cmd.Body)) + if err != nil { + obj.BodyString = string(cmd.Body) + } else { + obj.BodyString = string(sDec) + } } return obj } func newObjectWithCommandResponse(response *kubemq.CommandResponse) *object { obj := &object{ Id: response.CommandId, + Channel: "", ClientId: response.ResponseClientId, + Metadata: "", Tags: response.Tags, Executed: strconv.FormatBool(response.Executed), + Timeout: "", ExecutedAt: response.ExecutedAt.Format("2006-01-02 15:04:05.999"), Error: response.Error, + BodyJson: nil, + BodyString: "", } if !response.Executed { obj.ExecutedAt = "" } return obj } +func newObjectWithCommand(cmd *kubemq.Command) *object { + + obj := &object{ + Id: cmd.Id, + Channel: cmd.Channel, + ClientId: cmd.ClientId, + Metadata: cmd.Metadata, + Tags: cmd.Tags, + BodyJson: json.RawMessage{}, + BodyString: "", + Executed: "", + ExecutedAt: "", + Error: "", + Timeout: cmd.Timeout.String(), + } + var js json.RawMessage + if err := json.Unmarshal(cmd.Body, &js); err == nil { + obj.BodyJson = js + } else { + sDec, err := b64.StdEncoding.DecodeString(string(cmd.Body)) + if err != nil { + obj.BodyString = string(cmd.Body) + } else { + obj.BodyString = string(sDec) + } + } + + return obj +} func (o *object) String() string { - data, _ := json.MarshalIndent(o, "", " ") + data, _ := json.MarshalIndent(o, "", " ") return string(data) } @@ -72,3 +111,6 @@ func printCommandReceive(command *kubemq.CommandReceive) { func printCommandResponse(response *kubemq.CommandResponse) { fmt.Println(newObjectWithCommandResponse(response)) } +func printCommand(cmd *kubemq.Command) { + fmt.Println(newObjectWithCommand(cmd)) +} diff --git a/cmd/commands/receive.go b/cmd/commands/receive.go index 59c1b5f..5aa883a 100644 --- a/cmd/commands/receive.go +++ b/cmd/commands/receive.go @@ -21,14 +21,14 @@ type CommandsReceiveOptions struct { } var commandsReceiveExamples = ` - # Receive commands from a 'commands' channel (blocks until next message) + # Receive commands from a 'commands' channel (blocks until next body) kubemqctl commands receive some-channel - # Receive commands from a 'commands' channel with group (blocks until next message) + # Receive commands from a 'commands' channel with group (blocks until next body) kubemqctl commands receive some-channel -g G1 ` -var commandsReceiveLong = `Receive (Subscribe) command allows to consume a message from 'commands' channel and response with appropriate reply` -var commandsReceiveShort = `Receive a message from 'commands' channel command` +var commandsReceiveLong = `Receive (Subscribe) command allows to consume a body from 'commands' channel and response with appropriate reply` +var commandsReceiveShort = `Receive a body from 'commands' channel command` func NewCmdCommandsReceive(ctx context.Context, cfg *config.Config) *cobra.Command { o := &CommandsReceiveOptions{ @@ -86,7 +86,7 @@ func (o *CommandsReceiveOptions) Run(ctx context.Context) error { utils.Println(fmt.Errorf("receive commands messages, %s", err.Error()).Error()) } for { - utils.Println("waiting for the next command message...") + utils.Println("waiting for the next command body...") select { case err := <-errChan: return fmt.Errorf("server disconnected with error: %s", err.Error()) diff --git a/cmd/commands/send.go b/cmd/commands/send.go index 97cfd68..ba94941 100644 --- a/cmd/commands/send.go +++ b/cmd/commands/send.go @@ -9,6 +9,7 @@ import ( "github.com/kubemq-io/kubemqctl/pkg/kubemq" "github.com/kubemq-io/kubemqctl/pkg/utils" "github.com/spf13/cobra" + "io/ioutil" "time" ) @@ -16,9 +17,10 @@ type CommandsSendOptions struct { cfg *config.Config transport string channel string - message string + body string metadata string timeout int + fileName string } var commandsSendExamples = ` @@ -26,10 +28,10 @@ var commandsSendExamples = ` kubemqctl commands send some-channel some-command # Send command to a 'commands' channel with metadata - kubemqctl commands send some-channel some-message -m some-metadata + kubemqctl commands send some-channel some-body -m some-metadata # Send command to a 'commands' channel with 120 seconds timeout - kubemqctl commands send some-channel some-message -o 120 + kubemqctl commands send some-channel some-body -o 120 ` var commandsSendLong = `Send command allow to send messages to 'commands' channel with an option to set command time-out` var commandsSendShort = `Send messages to 'commands' channel command` @@ -54,20 +56,35 @@ func NewCmdCommandsSend(ctx context.Context, cfg *config.Config) *cobra.Command utils.CheckErr(o.Run(ctx)) }, } - cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "m", "", "Set metadata message") + cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "m", "", "Set metadata body") cmd.PersistentFlags().IntVarP(&o.timeout, "timeout", "o", 30, "Set command timeout") - + cmd.PersistentFlags().StringVarP(&o.fileName, "file", "f", "", "set load body from file") return cmd } func (o *CommandsSendOptions) Complete(args []string, transport string) error { o.transport = transport - if len(args) >= 2 { + if len(args) >= 1 { o.channel = args[0] - o.message = args[1] - return nil + + } else { + return fmt.Errorf("missing channel argument") } - return fmt.Errorf("missing arguments, must be 2 arguments, channel and a message") + + if o.fileName != "" { + data, err := ioutil.ReadFile(o.fileName) + if err != nil { + return err + } + o.body = string(data) + } else { + if len(args) >= 2 { + o.body = args[1] + } else { + return fmt.Errorf("missing body argument") + } + } + return nil } func (o *CommandsSendOptions) Validate() error { @@ -87,12 +104,14 @@ func (o *CommandsSendOptions) Run(ctx context.Context) error { msg := client.C(). SetChannel(o.channel). SetId(uuid.New().String()). - SetBody([]byte(o.message)). + SetBody([]byte(o.body)). SetMetadata(o.metadata). SetTimeout(time.Duration(o.timeout) * time.Second) + fmt.Println("Sending Command:") + printCommand(msg) res, err := msg.Send(ctx) if err != nil { - return fmt.Errorf("sending commands message, %s", err.Error()) + return fmt.Errorf("sending commands body, %s", err.Error()) } printCommandResponse(res) return nil diff --git a/cmd/create/cluster/deploy.go b/cmd/create/cluster/deploy.go index 6a618ea..9a5c87e 100644 --- a/cmd/create/cluster/deploy.go +++ b/cmd/create/cluster/deploy.go @@ -67,7 +67,7 @@ func defaultDeployOptions(cmd *cobra.Command) *deployOptions { cmd.PersistentFlags().StringVarP(&o.configFilename, "config-file", "c", "", "set kubemq config file") cmd.PersistentFlags().StringVarP(&o.name, "name", "", "kubemq-cluster", "set kubemq cluster name") cmd.PersistentFlags().StringVarP(&o.namespace, "namespace", "n", "kubemq", "set kubemq cluster namespace") - cmd.PersistentFlags().StringVarP(&o.key, "key", "", "", "set kubemq license key") + cmd.PersistentFlags().StringVarP(&o.key, "key", "k", "", "set kubemq license key") cmd.PersistentFlags().StringVarP(&o.statefulSetConfigData, "statefulset-config-data", "", "", "set kubemq cluster statefulset configuration data") cmd.PersistentFlags().BoolVarP(&o.standalone, "standalone", "", false, "set kubemq cluster standalone mode") cmd.PersistentFlags().Int32VarP(&o.replicas, "replicas", "r", 3, "set replicas") @@ -82,7 +82,6 @@ func (o *deployOptions) validate() error { if o.namespace == "" { return fmt.Errorf("error setting deploy configuration, missing kubemq cluster namespace") } - if err := o.api.validate(); err != nil { return err } @@ -138,6 +137,10 @@ func (o *deployOptions) validate() error { if err := o.volume.validate(); err != nil { return err } + + if o.license.licenseData == "" && o.key == "" { + return fmt.Errorf("license key is required, get a key at https://account.kubemq.io/login/register") + } return nil } diff --git a/cmd/create/cluster/license.go b/cmd/create/cluster/license.go index e70aaf5..ce0fbbb 100644 --- a/cmd/create/cluster/license.go +++ b/cmd/create/cluster/license.go @@ -2,9 +2,7 @@ package cluster import ( "fmt" - "github.com/kubemq-io/kubemqctl/pkg/api" "github.com/kubemq-io/kubemqctl/pkg/k8s/types/kubemqcluster" - "github.com/kubemq-io/kubemqctl/pkg/utils" "github.com/spf13/cobra" "io/ioutil" ) @@ -12,24 +10,20 @@ import ( var defaultLicenseConfig = &deployLicenseOptions{ licenseData: "", licenseFilename: "", - licenseToken: "", } type deployLicenseOptions struct { licenseData string licenseFilename string - licenseToken string } func setLicenseConfig(cmd *cobra.Command) *deployLicenseOptions { o := &deployLicenseOptions{ licenseData: "", licenseFilename: "", - licenseToken: "", } cmd.PersistentFlags().StringVarP(&o.licenseData, "license-data", "", "", "set license data") cmd.PersistentFlags().StringVarP(&o.licenseFilename, "license-file", "", "", "set license file") - cmd.PersistentFlags().StringVarP(&o.licenseToken, "license-token", "t", "", "set license token") return o } @@ -44,20 +38,6 @@ func (o *deployLicenseOptions) complete() error { return fmt.Errorf("error loading license file data: %s", err.Error()) } o.licenseData = string(data) - } else { - if o.licenseToken != "" { - utils.Printf("fetching license for token %s ", o.licenseToken) - data, err := api.GetLicenseDataByToken(o.licenseToken) - if err != nil { - utils.PrintlnfNoTitle(", error: %s ", err.Error()) - } else { - utils.PrintlnfNoTitle(" completed") - o.licenseData = data - } - if o.licenseData == "" { - utils.Printlnf("no valid license data received") - } - } } return nil } diff --git a/cmd/events/helper.go b/cmd/events/helper.go index ff0cf51..9363459 100644 --- a/cmd/events/helper.go +++ b/cmd/events/helper.go @@ -2,45 +2,47 @@ package events import ( b64 "encoding/base64" + "encoding/json" "fmt" - jsoniter "github.com/json-iterator/go" kubemq "github.com/kubemq-io/kubemq-go" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary - type object struct { - Id string `json:"id"` - Channel string `json:"channel,omitempty"` - ClientId string `json:"client_id,omitempty"` - Metadata string `json:"metadata,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Body string `json:"body,omitempty"` - payload []byte + Id string `json:"id"` + Channel string `json:"channel,omitempty"` + ClientId string `json:"client_id,omitempty"` + Metadata string `json:"metadata,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + BodyJson json.RawMessage `json:"body_json,omitempty"` + BodyString string `json:"body_string,omitempty"` } func newObjectWithEvent(event *kubemq.Event) *object { obj := &object{ - Id: event.Id, - Channel: event.Channel, - ClientId: event.ClientId, - Metadata: event.Metadata, - Tags: event.Tags, - Body: "", - payload: event.Body, + Id: event.Id, + Channel: event.Channel, + ClientId: event.ClientId, + Metadata: event.Metadata, + Tags: event.Tags, + BodyJson: json.RawMessage{}, + BodyString: "", } - - sDec, err := b64.StdEncoding.DecodeString(string(event.Body)) - if err != nil { - obj.Body = string(event.Body) + var js json.RawMessage + if err := json.Unmarshal(event.Body, &js); err == nil { + obj.BodyJson = js } else { - obj.Body = string(sDec) + sDec, err := b64.StdEncoding.DecodeString(string(event.Body)) + if err != nil { + obj.BodyString = string(event.Body) + } else { + obj.BodyString = string(sDec) + } } return obj } func (o *object) String() string { - data, _ := json.MarshalIndent(o, "", " ") + data, _ := json.MarshalIndent(o, "", " ") return string(data) } diff --git a/cmd/events/receive.go b/cmd/events/receive.go index 81366ce..a337807 100644 --- a/cmd/events/receive.go +++ b/cmd/events/receive.go @@ -18,15 +18,15 @@ type EventsReceiveOptions struct { } var eventsReceiveExamples = ` - # Receive messages from an 'events' channel (blocks until next message) + # Receive messages from an 'events' channel (blocks until next body) kubemqctl events receive some-channel - # Receive messages from an 'events' channel with group (blocks until next message) + # Receive messages from an 'events' channel with group (blocks until next body) kubemqctl events receive some-channel -g G1 ` var eventsReceiveLong = `Receive (Subscribe) command allows to consume one or many messages from 'events' channel` -var eventsReceiveShort = `Receive a message from 'events' channel command` +var eventsReceiveShort = `Receive a body from 'events' channel command` func NewCmdEventsReceive(ctx context.Context, cfg *config.Config) *cobra.Command { o := &EventsReceiveOptions{ diff --git a/cmd/events/send.go b/cmd/events/send.go index 69ad610..2cff647 100644 --- a/cmd/events/send.go +++ b/cmd/events/send.go @@ -10,6 +10,7 @@ import ( "github.com/kubemq-io/kubemqctl/pkg/kubemq" "github.com/kubemq-io/kubemqctl/pkg/utils" "github.com/spf13/cobra" + "io/ioutil" "time" ) @@ -17,24 +18,25 @@ type EventsSendOptions struct { cfg *config.Config transport string channel string - message string + body string metadata string messages int isStream bool + fileName string } var eventsSendExamples = ` - # Send (Publish) message to a 'events' channel - kubemqctl events send some-channel some-message + # Send (Publish) body to a 'events' channel + kubemqctl events send some-channel some-body - # Send (Publish) message to a 'events' channel with metadata - kubemqctl events send some-channel some-message --metadata some-metadata + # Send (Publish) body to a 'events' channel with metadata + kubemqctl events send some-channel some-body --metadata some-metadata # Send (Publish) batch of 10 messages to a 'events' channel - kubemqctl events send some-channel some-message -m 10 + kubemqctl events send some-channel some-body -m 10 # Send (Publish) batch of 100 messages to a 'events' channel in stream mode - kubemqctl events send some-channel some-message -m 100 -s + kubemqctl events send some-channel some-body -m 100 -s ` var eventsSendLong = `Send command allows to send (publish) one or many messages to an 'events' channel` var eventsSendShort = `Send messages to an 'events' channel command` @@ -59,21 +61,36 @@ func NewCmdEventsSend(ctx context.Context, cfg *config.Config) *cobra.Command { utils.CheckErr(o.Run(ctx)) }, } - cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "", "", "set message metadata field") + cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "", "", "set body metadata field") cmd.PersistentFlags().IntVarP(&o.messages, "messages", "m", 1, "set how many 'events' messages to send") cmd.PersistentFlags().BoolVarP(&o.isStream, "stream", "s", false, "set stream of all messages at once") - + cmd.PersistentFlags().StringVarP(&o.fileName, "file", "f", "", "set body body from file") return cmd } func (o *EventsSendOptions) Complete(args []string, transport string) error { o.transport = transport - if len(args) >= 2 { + if len(args) >= 1 { o.channel = args[0] - o.message = args[1] - return nil + + } else { + return fmt.Errorf("missing channel argument") + } + + if o.fileName != "" { + data, err := ioutil.ReadFile(o.fileName) + if err != nil { + return err + } + o.body = string(data) + } else { + if len(args) >= 2 { + o.body = args[1] + } else { + return fmt.Errorf("missing body argument") + } } - return fmt.Errorf("missing arguments, must be 2 arguments, channel and a message") + return nil } func (o *EventsSendOptions) Validate() error { @@ -94,28 +111,31 @@ func (o *EventsSendOptions) Run(ctx context.Context) error { utils.Printlnf("Streaming %d events messages ...", o.messages) eventsCh := make(chan *kubemq2.Event, 100) errCh := make(chan error, 10) - + fmt.Println("Sending Stream Events:") go client.StreamEvents(ctx, eventsCh, errCh) startTime := time.Now() for i := 1; i <= o.messages; i++ { - eventsCh <- client.E(). + msg := client.E(). SetChannel(o.channel). SetId(uuid.New().String()). - SetBody([]byte(o.message)). + SetBody([]byte(o.body)). SetMetadata(o.metadata) + printEvent(msg) + eventsCh <- msg } utils.Printlnf("%d events messages streamed in %s.", o.messages, time.Since(startTime)) time.Sleep(time.Second) } else { + fmt.Println("Sending Events:") for i := 1; i <= o.messages; i++ { msg := client.E(). SetChannel(o.channel). SetId(uuid.New().String()). - SetBody([]byte(o.message)). + SetBody([]byte(o.body)). SetMetadata(o.metadata) err = msg.Send(ctx) if err != nil { - return fmt.Errorf("sending 'events' message, %s", err.Error()) + return fmt.Errorf("sending 'events' body, %s", err.Error()) } printEvent(msg) } diff --git a/cmd/events_store/helper.go b/cmd/events_store/helper.go index d55ad25..d655528 100644 --- a/cmd/events_store/helper.go +++ b/cmd/events_store/helper.go @@ -2,67 +2,75 @@ package events_store import ( b64 "encoding/base64" + "encoding/json" "fmt" - jsoniter "github.com/json-iterator/go" kubemq "github.com/kubemq-io/kubemq-go" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary - type object struct { - Id string `json:"id"` - Channel string `json:"channel,omitempty"` - ClientId string `json:"client_id,omitempty"` - Metadata string `json:"metadata,omitempty"` - Timestamp string `json:"timestamp,omitempty"` - Sequence uint64 `json:"sequence,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Body string `json:"body,omitempty"` - payload []byte + Id string `json:"id"` + Channel string `json:"channel,omitempty"` + ClientId string `json:"client_id,omitempty"` + Metadata string `json:"metadata,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + BodyJson json.RawMessage `json:"body_json,omitempty"` + BodyString string `json:"body_string,omitempty"` } func newObjectWithEventReceive(event *kubemq.EventStoreReceive) *object { obj := &object{ - Id: event.Id, - Channel: event.Channel, - ClientId: event.ClientId, - Metadata: event.Metadata, - Timestamp: event.Timestamp.Format("2006-01-02 15:04:05.999"), - Sequence: event.Sequence, - Tags: event.Tags, - Body: "", - payload: event.Body, + Id: event.Id, + Channel: event.Channel, + ClientId: event.ClientId, + Metadata: event.Metadata, + Timestamp: event.Timestamp.Format("2006-01-02 15:04:05.999"), + Sequence: event.Sequence, + Tags: event.Tags, + BodyJson: json.RawMessage{}, + BodyString: "", } - sDec, err := b64.StdEncoding.DecodeString(string(event.Body)) - if err != nil { - obj.Body = string(event.Body) + var js json.RawMessage + if err := json.Unmarshal(event.Body, &js); err == nil { + obj.BodyJson = js } else { - obj.Body = string(sDec) + sDec, err := b64.StdEncoding.DecodeString(string(event.Body)) + if err != nil { + obj.BodyString = string(event.Body) + } else { + obj.BodyString = string(sDec) + } } return obj } func newObjectWithEventStore(event *kubemq.EventStore) *object { obj := &object{ - Id: event.Id, - Channel: event.Channel, - ClientId: event.ClientId, - Metadata: event.Metadata, - Tags: event.Tags, - Body: "", - payload: event.Body, + Id: event.Id, + Channel: event.Channel, + ClientId: event.ClientId, + Metadata: event.Metadata, + Tags: event.Tags, + BodyJson: json.RawMessage{}, + BodyString: "", } - sDec, err := b64.StdEncoding.DecodeString(string(event.Body)) - if err != nil { - obj.Body = string(event.Body) + var js json.RawMessage + if err := json.Unmarshal(event.Body, &js); err == nil { + obj.BodyJson = js } else { - obj.Body = string(sDec) + sDec, err := b64.StdEncoding.DecodeString(string(event.Body)) + if err != nil { + obj.BodyString = string(event.Body) + } else { + obj.BodyString = string(sDec) + } } return obj } func (o *object) String() string { - data, _ := json.MarshalIndent(o, "", " ") + data, _ := json.MarshalIndent(o, "", " ") return string(data) } diff --git a/cmd/events_store/list.go b/cmd/events_store/list.go index 12ebd3a..b0aeb62 100644 --- a/cmd/events_store/list.go +++ b/cmd/events_store/list.go @@ -2,7 +2,7 @@ package events_store import ( "context" - jsonEnc "encoding/json" + "encoding/json" "fmt" "github.com/go-resty/resty" "github.com/kubemq-io/kubemqctl/pkg/config" @@ -88,10 +88,10 @@ func (o *EventsStoreListOptions) Run(ctx context.Context) error { } type Response struct { - Node string `json:"node"` - Error bool `json:"error"` - ErrorString string `json:"error_string"` - Data jsonEnc.RawMessage `json:"data"` + Node string `json:"node"` + Error bool `json:"error"` + ErrorString string `json:"error_string"` + Data json.RawMessage `json:"data"` } type Queues struct { diff --git a/cmd/events_store/receive.go b/cmd/events_store/receive.go index 89e7e50..d446311 100644 --- a/cmd/events_store/receive.go +++ b/cmd/events_store/receive.go @@ -30,10 +30,10 @@ type EventsStoreReceiveOptions struct { } var eventsReceiveExamples = ` - # Receive messages from an 'events store' channel (blocks until next message) + # Receive messages from an 'events store' channel (blocks until next body) kubemqctl events_store receive some-channel - # Receive messages from an 'events channel' with group(blocks until next message) + # Receive messages from an 'events channel' with group(blocks until next body) kubemqctl events_store receive some-channel -g G1 ` var eventsReceiveLong = `Receive (Subscribe) command allows to consume messages from an 'events store' with options to set offset parameters` @@ -61,10 +61,10 @@ func NewCmdEventsStoreReceive(ctx context.Context, cfg *config.Config) *cobra.Co } cmd.PersistentFlags().StringVarP(&o.group, "group", "g", "", "set 'events_store' channel consumer group (load balancing)") - cmd.PersistentFlags().BoolVar(&o.startNew, "start-new", false, "start from new message only") - cmd.PersistentFlags().BoolVar(&o.startFirst, "start-first", false, "start from first message in the channel") - cmd.PersistentFlags().BoolVar(&o.startLast, "start-last", false, "start from last message in the channel") - cmd.PersistentFlags().IntVar(&o.startSequence, "start-sequence", 0, "start from message sequence") + cmd.PersistentFlags().BoolVar(&o.startNew, "start-new", false, "start from new body only") + cmd.PersistentFlags().BoolVar(&o.startFirst, "start-first", false, "start from first body in the channel") + cmd.PersistentFlags().BoolVar(&o.startLast, "start-last", false, "start from last body in the channel") + cmd.PersistentFlags().IntVar(&o.startSequence, "start-sequence", 0, "start from body sequence") cmd.PersistentFlags().StringVar(&o.startTime, "start-time", "", "start from timestamp format 2006-01-02 15:04:05") cmd.PersistentFlags().StringVar(&o.startDuration, "start-duration", "", "start from time duration i.e. 1h") return cmd @@ -160,7 +160,7 @@ func (o *EventsStoreReceiveOptions) promptOptions() error { action := "" prompt := &survey.Select{ Message: "Start receive events store messages options:", - Options: []string{"start from new messages only", "start from first message", "start from last message", "start from sequence", "start from time", "start from duration"}, + Options: []string{"start from new messages only", "start from first body", "start from last body", "start from sequence", "start from time", "start from duration"}, } err := survey.AskOne(prompt, &action) if err != nil { @@ -170,10 +170,10 @@ func (o *EventsStoreReceiveOptions) promptOptions() error { case "start from new messages only": o.subOptions = kubemq2.StartFromNewEvents() return nil - case "start from first message": + case "start from first body": o.subOptions = kubemq2.StartFromFirstEvent() return nil - case "start from last message": + case "start from last body": o.subOptions = kubemq2.StartFromLastEvent() return nil case "start from sequence": @@ -182,7 +182,7 @@ func (o *EventsStoreReceiveOptions) promptOptions() error { Renderer: survey.Renderer{}, Message: "Set sequence:", Default: "1", - Help: "1 is the first message", + Help: "1 is the first body", } err := survey.AskOne(prompt, &seqStr) diff --git a/cmd/events_store/send.go b/cmd/events_store/send.go index 3069564..9dbd8a6 100644 --- a/cmd/events_store/send.go +++ b/cmd/events_store/send.go @@ -10,6 +10,7 @@ import ( "github.com/kubemq-io/kubemqctl/pkg/kubemq" "github.com/kubemq-io/kubemqctl/pkg/utils" "github.com/spf13/cobra" + "io/ioutil" "time" ) @@ -17,24 +18,25 @@ type EventsStoreSendOptions struct { cfg *config.Config transport string channel string - message string + body string metadata string messages int isStream bool + fileName string } var eventsSendExamples = ` - # Send (Publish) message to an 'events store' channel - kubemqctl events_store send some-channel some-message + # Send (Publish) body to an 'events store' channel + kubemqctl events_store send some-channel some-body - # Send (Publish) message to an 'events store' channel with metadata - kubemqctl events_store send some-channel some-message --metadata some-metadata + # Send (Publish) body to an 'events store' channel with metadata + kubemqctl events_store send some-channel some-body --metadata some-metadata # Send 10 messages to an 'events store' channel - kubemqctl events_store send some-channel some-message -m 10 + kubemqctl events_store send some-channel some-body -m 10 # Send 100 messages to an 'events store' channel in stream mode - kubemqctl events_store send some-channel some-message -m 100 -s + kubemqctl events_store send some-channel some-body -m 100 -s ` var eventsSendLong = `Send command allows to send (publish) one or many messages to an 'events store' channel` var eventsSendShort = `Send messages to an 'events store' channel command` @@ -59,20 +61,36 @@ func NewCmdEventsStoreSend(ctx context.Context, cfg *config.Config) *cobra.Comma utils.CheckErr(o.Run(ctx)) }, } - cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "", "", "set message metadata field") + cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "", "", "set body metadata field") cmd.PersistentFlags().IntVarP(&o.messages, "messages", "m", 1, "set how many 'events store' messages to send") cmd.PersistentFlags().BoolVarP(&o.isStream, "stream", "s", false, "set stream of all messages at once") + cmd.PersistentFlags().StringVarP(&o.fileName, "file", "f", "", "set load body from file") return cmd } func (o *EventsStoreSendOptions) Complete(args []string, transport string) error { o.transport = transport - if len(args) >= 2 { + if len(args) >= 1 { o.channel = args[0] - o.message = args[1] - return nil + + } else { + return fmt.Errorf("missing channel argument") } - return fmt.Errorf("missing arguments, must be 2 arguments, channel and a message") + + if o.fileName != "" { + data, err := ioutil.ReadFile(o.fileName) + if err != nil { + return err + } + o.body = string(data) + } else { + if len(args) >= 2 { + o.body = args[1] + } else { + return fmt.Errorf("missing body argument") + } + } + return nil } func (o *EventsStoreSendOptions) Validate() error { @@ -94,29 +112,32 @@ func (o *EventsStoreSendOptions) Run(ctx context.Context) error { eventsCh := make(chan *kubemq2.EventStore, 1000) eventsResultsCh := make(chan *kubemq2.EventStoreResult, 1000) errCh := make(chan error, 10) - + fmt.Println("Sending Stream Events Store:") go client.StreamEventsStore(ctx, eventsCh, eventsResultsCh, errCh) startTime := time.Now() for i := 1; i <= o.messages; i++ { - eventsCh <- client.ES(). + msg := client.ES(). SetChannel(o.channel). SetId(uuid.New().String()). - SetBody([]byte(o.message)). + SetBody([]byte(o.body)). SetMetadata(o.metadata) + printEventStore(msg) + eventsCh <- msg <-eventsResultsCh } utils.Printlnf("%d events store messages streamed in %s.", o.messages, time.Since(startTime)) time.Sleep(2 * time.Second) } else { + fmt.Println("Sending Events Store:") for i := 1; i <= o.messages; i++ { msg := client.ES(). SetChannel(o.channel). SetId(uuid.New().String()). - SetBody([]byte(o.message)). + SetBody([]byte(o.body)). SetMetadata(o.metadata) _, err := msg.Send(ctx) if err != nil { - return fmt.Errorf("sending 'events store' message, %s", err.Error()) + return fmt.Errorf("sending 'events store' body, %s", err.Error()) } printEventStore(msg) } diff --git a/cmd/get/cluster/cluster.go b/cmd/get/cluster/cluster.go index a2b5a9d..9eef57f 100644 --- a/cmd/get/cluster/cluster.go +++ b/cmd/get/cluster/cluster.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/kubemq-io/kubemqctl/cmd/get/cluster/describe" "github.com/kubemq-io/kubemqctl/cmd/get/cluster/events" - "github.com/kubemq-io/kubemqctl/cmd/get/cluster/license" "github.com/kubemq-io/kubemqctl/cmd/get/cluster/logs" "github.com/kubemq-io/kubemqctl/pkg/config" "github.com/kubemq-io/kubemqctl/pkg/k8s/client" @@ -50,7 +49,6 @@ func NewCmdGet(ctx context.Context, cfg *config.Config) *cobra.Command { cmd.AddCommand(logs.NewCmdLogs(ctx, cfg)) cmd.AddCommand(events.NewCmdEvents(ctx, cfg)) cmd.AddCommand(describe.NewCmdDescribe(ctx, cfg)) - cmd.AddCommand(license.NewCmdLicense(ctx, cfg)) return cmd } diff --git a/cmd/get/cluster/license/license.go b/cmd/get/cluster/license/license.go deleted file mode 100644 index 4eb4934..0000000 --- a/cmd/get/cluster/license/license.go +++ /dev/null @@ -1,87 +0,0 @@ -package license - -import ( - "context" - "fmt" - "github.com/kubemq-io/kubemqctl/pkg/api" - "github.com/kubemq-io/kubemqctl/pkg/config" - "github.com/kubemq-io/kubemqctl/pkg/utils" - "github.com/spf13/cobra" -) - -const ( - prefix = "-----BEGIN KUBEMQ KEY-----" - suffix = "-----END KUBEMQ KEY-----" -) - -type getLicenseOptions struct { - licenseToken string -} - -var licenseExamples = ` - # Get license with token - kubemqctl get cluster license -t some-token -` -var licenseLong = `License command allows to fetach KubeMQ license file from token` -var licenseShort = `License command allows to fetach KubeMQ license file from token` - -func NewCmdLicense(ctx context.Context, cfg *config.Config) *cobra.Command { - o := &getLicenseOptions{ - licenseToken: "", - } - cmd := &cobra.Command{ - - Use: "license", - Aliases: []string{"lic"}, - Short: licenseShort, - Long: licenseLong, - Example: licenseExamples, - Run: func(cmd *cobra.Command, args []string) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - utils.CheckErr(o.Complete(args), cmd) - utils.CheckErr(o.Validate()) - utils.CheckErr(o.Run(ctx)) - }, - } - cmd.PersistentFlags().StringVarP(&o.licenseToken, "token", "t", "", "set license token") - - return cmd -} - -func (o *getLicenseOptions) Validate() error { - return nil -} -func (o *getLicenseOptions) Complete(args []string) error { - return nil -} -func split(v string, n int) string { - newStr := "" - buf := []byte(v) - for i := 0; i < len(v); i++ { - if i > 0 && i%n == 0 { - newStr = newStr + "\n" + string(buf[i]) - } else { - newStr = newStr + string(buf[i]) - } - } - return newStr -} - -func (o *getLicenseOptions) Run(ctx context.Context) error { - if o.licenseToken != "" { - utils.Printlnf("fetching license for token %s :", o.licenseToken) - data, err := api.GetLicenseDataByToken(o.licenseToken) - if err != nil { - utils.PrintlnfNoTitle(", error: %s ", err.Error()) - } else { - key := fmt.Sprintf("%s\n%s\n%s", prefix, split(data, 64), suffix) - utils.PrintlnfNoTitle(key) - - } - - } else { - return fmt.Errorf("no token was provided") - } - return nil -} diff --git a/cmd/queries/helper.go b/cmd/queries/helper.go index bd35baa..411120a 100644 --- a/cmd/queries/helper.go +++ b/cmd/queries/helper.go @@ -2,26 +2,26 @@ package queries import ( b64 "encoding/base64" + "encoding/json" "fmt" - jsoniter "github.com/json-iterator/go" + kubemq "github.com/kubemq-io/kubemq-go" "strconv" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary - type object struct { Id string `json:"id"` Channel string `json:"channel,omitempty"` ClientId string `json:"client_id,omitempty"` Metadata string `json:"metadata,omitempty"` Tags map[string]string `json:"tags,omitempty"` - Body string `json:"body"` + Timeout string `json:"timeout,omitempty"` + BodyJson json.RawMessage `json:"body_json,omitempty"` + BodyString string `json:"body_string,omitempty"` Executed string `json:"executed,omitempty"` ExecutedAt string `json:"executed_at,omitempty"` Error string `json:"error,omitempty"` CacheHit string `json:"cache_hit,omitempty"` - payload []byte } func newObjectWithQueryReceive(query *kubemq.QueryReceive) *object { @@ -31,20 +31,26 @@ func newObjectWithQueryReceive(query *kubemq.QueryReceive) *object { ClientId: query.ClientId, Metadata: query.Metadata, Tags: query.Tags, - Body: "", + BodyJson: json.RawMessage{}, + BodyString: "", Executed: "", ExecutedAt: "", Error: "", CacheHit: "", - payload: query.Body, + Timeout: "", } - - sDec, err := b64.StdEncoding.DecodeString(string(query.Body)) - if err != nil { - obj.Body = string(query.Body) + var js json.RawMessage + if err := json.Unmarshal(query.Body, &js); err == nil { + obj.BodyJson = js } else { - obj.Body = string(sDec) + sDec, err := b64.StdEncoding.DecodeString(string(query.Body)) + if err != nil { + obj.BodyString = string(query.Body) + } else { + obj.BodyString = string(sDec) + } } + return obj } func newObjectWithQueryResponse(response *kubemq.QueryResponse) *object { @@ -54,28 +60,58 @@ func newObjectWithQueryResponse(response *kubemq.QueryResponse) *object { ClientId: response.ResponseClientId, Metadata: response.Metadata, Tags: response.Tags, - Body: "", + BodyJson: json.RawMessage{}, + BodyString: "", Executed: strconv.FormatBool(response.Executed), ExecutedAt: response.ExecutedAt.Format("2006-01-02 15:04:05.999"), Error: response.Error, CacheHit: strconv.FormatBool(response.CacheHit), - payload: response.Body, } - sDec, err := b64.StdEncoding.DecodeString(string(response.Body)) - if err != nil { - obj.Body = string(response.Body) + var js json.RawMessage + if err := json.Unmarshal(response.Body, &js); err == nil { + obj.BodyJson = js } else { - obj.Body = string(sDec) - } - if !response.Executed { - obj.ExecutedAt = "" + sDec, err := b64.StdEncoding.DecodeString(string(response.Body)) + if err != nil { + obj.BodyString = string(response.Body) + } else { + obj.BodyString = string(sDec) + } } return obj } +func newObjectWithCommand(query *kubemq.Query) *object { + obj := &object{ + Id: query.Id, + Channel: query.Channel, + ClientId: query.ClientId, + Metadata: query.Metadata, + Tags: query.Tags, + Timeout: query.Timeout.String(), + BodyJson: json.RawMessage{}, + BodyString: "", + Executed: "", + ExecutedAt: "", + Error: "", + CacheHit: "", + } + var js json.RawMessage + if err := json.Unmarshal(query.Body, &js); err == nil { + obj.BodyJson = js + } else { + sDec, err := b64.StdEncoding.DecodeString(string(query.Body)) + if err != nil { + obj.BodyString = string(query.Body) + } else { + obj.BodyString = string(sDec) + } + } + return obj +} func (o *object) String() string { - data, _ := json.MarshalIndent(o, "", " ") + data, _ := json.MarshalIndent(o, "", " ") return string(data) } @@ -86,3 +122,6 @@ func printQueryReceive(query *kubemq.QueryReceive) { func printQueryResponse(response *kubemq.QueryResponse) { fmt.Println(newObjectWithQueryResponse(response)) } +func printQuery(query *kubemq.Query) { + fmt.Println(newObjectWithCommand(query)) +} diff --git a/cmd/queries/receive.go b/cmd/queries/receive.go index bab1806..13e143a 100644 --- a/cmd/queries/receive.go +++ b/cmd/queries/receive.go @@ -21,14 +21,14 @@ type QueriesReceiveOptions struct { } var queriesReceiveExamples = ` - # Receive 'queries' from a 'queries' channel (blocks until next message) + # Receive 'queries' from a 'queries' channel (blocks until next body) kubemqctl queries receive some-channel - # Receive 'queries' from a 'queries' channel with group(blocks until next message) + # Receive 'queries' from a 'queries' channel with group(blocks until next body) kubemqctl queries receive some-channel -g G1 ` -var queriesReceiveLong = `Receive (Subscribe) command allows to receive a message from a 'queries' channel and response with appropriate reply` -var queriesReceiveShort = `Receive a message from a 'queries' channel` +var queriesReceiveLong = `Receive (Subscribe) command allows to receive a body from a 'queries' channel and response with appropriate reply` +var queriesReceiveShort = `Receive a body from a 'queries' channel` func NewCmdQueriesReceive(ctx context.Context, cfg *config.Config) *cobra.Command { o := &QueriesReceiveOptions{ @@ -85,7 +85,7 @@ func (o *QueriesReceiveOptions) Run(ctx context.Context) error { utils.Println(fmt.Errorf("receive 'queries' messages, %s", err.Error()).Error()) } for { - utils.Println("waiting for the next query message...") + utils.Println("waiting for the next query body...") select { case err := <-errChan: return fmt.Errorf("server disconnected with error: %s", err.Error()) @@ -122,7 +122,7 @@ func (o *QueriesReceiveOptions) Run(ctx context.Context) error { respBody := "" prompt := &survey.Input{ Renderer: survey.Renderer{}, - Message: "Set response message", + Message: "Set response body", Default: "response-to", Help: "", } diff --git a/cmd/queries/send.go b/cmd/queries/send.go index 48f3f34..e68dd33 100644 --- a/cmd/queries/send.go +++ b/cmd/queries/send.go @@ -9,6 +9,7 @@ import ( "github.com/kubemq-io/kubemqctl/pkg/kubemq" "github.com/kubemq-io/kubemqctl/pkg/utils" "github.com/spf13/cobra" + "io/ioutil" "time" ) @@ -16,11 +17,12 @@ type QueriesSendOptions struct { cfg *config.Config transport string channel string - message string + body string metadata string timeout int cacheKey string cacheTTL time.Duration + fileName string } var queriesSendExamples = ` @@ -28,13 +30,13 @@ var queriesSendExamples = ` kubemqctl queries send some-channel some-query # Send query to a 'queries' channel with metadata - kubemqctl queries send some-channel some-message -m some-metadata + kubemqctl queries send some-channel some-body -m some-metadata # Send query to a 'queries' channel with 120 seconds timeout - kubemqctl queries send some-channel some-message -o 120 + kubemqctl queries send some-channel some-body -o 120 # Send query to a 'queries' channel with cache-key and cache duration of 1m - kubemqctl queries send some-channel some-message -c cache-key -d 1m + kubemqctl queries send some-channel some-body -c cache-key -d 1m ` var queriesSendLong = `Send command allow to send messages to 'queries' channel with an option to set query time-out and caching parameters` var queriesSendShort = `Send messages to a 'queries' channel command` @@ -59,24 +61,38 @@ func NewCmdQueriesSend(ctx context.Context, cfg *config.Config) *cobra.Command { utils.CheckErr(o.Run(ctx)) }, } - cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "m", "", "set query message metadata field") + cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "m", "", "set query body metadata field") cmd.PersistentFlags().StringVarP(&o.cacheKey, "cache-key", "c", "", "set query cache key") cmd.PersistentFlags().IntVarP(&o.timeout, "timeout", "o", 30, "set query timeout") cmd.PersistentFlags().DurationVarP(&o.cacheTTL, "cache-duration", "d", 10*time.Minute, "set cache duration timeout") - + cmd.PersistentFlags().StringVarP(&o.fileName, "file", "f", "", "set load body from file") return cmd } func (o *QueriesSendOptions) Complete(args []string, transport string) error { o.transport = transport - if len(args) >= 2 { + if len(args) >= 1 { o.channel = args[0] - o.message = args[1] - return nil + + } else { + return fmt.Errorf("missing channel argument") } - return fmt.Errorf("missing arguments, must be 2 arguments, channel and a message") -} + if o.fileName != "" { + data, err := ioutil.ReadFile(o.fileName) + if err != nil { + return err + } + o.body = string(data) + } else { + if len(args) >= 2 { + o.body = args[1] + } else { + return fmt.Errorf("missing body argument") + } + } + return nil +} func (o *QueriesSendOptions) Validate() error { return nil } @@ -90,19 +106,19 @@ func (o *QueriesSendOptions) Run(ctx context.Context) error { defer func() { client.Close() }() - + fmt.Println("Sending Query:") msg := client.Q(). SetChannel(o.channel). SetId(uuid.New().String()). - SetBody([]byte(o.message)). + SetBody([]byte(o.body)). SetMetadata(o.metadata). SetTimeout(time.Duration(o.timeout) * time.Second). SetCacheKey(o.cacheKey). SetCacheTTL(o.cacheTTL) - + printQuery(msg) res, err := msg.Send(ctx) if err != nil { - return fmt.Errorf("sending query message, %s", err.Error()) + return fmt.Errorf("sending query body, %s", err.Error()) } printQueryResponse(res) return nil diff --git a/cmd/queue/attach.go b/cmd/queue/attach.go index 0f1e108..9b1c865 100644 --- a/cmd/queue/attach.go +++ b/cmd/queue/attach.go @@ -2,6 +2,7 @@ package queue import ( "context" + "encoding/json" "fmt" "github.com/go-resty/resty" "github.com/kubemq-io/kubemqctl/pkg/attach" diff --git a/cmd/queue/helper.go b/cmd/queue/helper.go index 3b07a16..bf5bedd 100644 --- a/cmd/queue/helper.go +++ b/cmd/queue/helper.go @@ -2,53 +2,80 @@ package queue import ( b64 "encoding/base64" + "encoding/json" "fmt" - jsoniter "github.com/json-iterator/go" + kubemq "github.com/kubemq-io/kubemq-go" "time" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary - type queueMessageObject struct { - Id string `json:"id"` - Channel string `json:"channel"` - ClientId string `json:"client_id"` - Timestamp string `json:"timestamp"` - Sequence uint64 `json:"sequence"` - Metadata string `json:"metadata"` - Tags map[string]string `json:"tags,omitempty"` - Body string `json:"body"` - payload []byte + Id string `json:"id"` + Channel string `json:"channel"` + ClientId string `json:"client_id"` + Timestamp string `json:"timestamp,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + Metadata string `json:"metadata,omitempty"` + DelayTo string `json:"delay_to,omitempty"` + ExpireAt string `json:"expire_at,omitempty"` + MaxDeadLetterQueueRetires int32 `json:"max_dead_letter_queue_retires,omitempty"` + DeadLetterQueue string `json:"dead_letter_queue,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + BodyJson json.RawMessage `json:"body_json,omitempty"` + BodyString string `json:"body_string,omitempty"` } func newQueueMessageObject(msg *kubemq.QueueMessage) *queueMessageObject { obj := &queueMessageObject{ - Id: msg.MessageID, - Channel: msg.Channel, - ClientId: msg.ClientID, - Timestamp: "", - Sequence: 0, - Metadata: msg.Metadata, - Tags: msg.Tags, - Body: "", - payload: msg.Body, + Id: msg.MessageID, + Channel: msg.Channel, + ClientId: msg.ClientID, + Timestamp: "", + Sequence: 0, + Metadata: msg.Metadata, + DelayTo: "", + ExpireAt: "", + Tags: msg.Tags, + BodyJson: json.RawMessage{}, + BodyString: "", + } + if msg.Policy != nil { + if msg.Policy.DelaySeconds > 0 { + obj.DelayTo = (time.Duration(msg.Policy.DelaySeconds) * time.Second).String() + } + if msg.Policy.ExpirationSeconds > 0 { + obj.ExpireAt = (time.Duration(msg.Policy.ExpirationSeconds) * time.Second).String() + } + obj.MaxDeadLetterQueueRetires = msg.Policy.MaxReceiveCount + obj.DeadLetterQueue = msg.Policy.MaxReceiveQueue } if msg.Attributes != nil { obj.Timestamp = time.Unix(0, msg.Attributes.Timestamp).Format("2006-01-02 15:04:05.999") obj.Sequence = msg.Attributes.Sequence + if msg.Attributes.DelayedTo > 0 { + obj.DelayTo = time.Unix(0, msg.Attributes.DelayedTo).Format("2006-01-02 15:04:05.999") + } + if msg.Attributes.ExpirationAt > 0 { + obj.ExpireAt = time.Unix(0, msg.Attributes.ExpirationAt).Format("2006-01-02 15:04:05.999") + } } - sDec, err := b64.StdEncoding.DecodeString(string(msg.Body)) - if err != nil { - obj.Body = string(msg.Body) + var js json.RawMessage + if err := json.Unmarshal(msg.Body, &js); err == nil { + obj.BodyJson = js } else { - obj.Body = string(sDec) + sDec, err := b64.StdEncoding.DecodeString(string(msg.Body)) + if err != nil { + obj.BodyString = string(msg.Body) + } else { + obj.BodyString = string(sDec) + } } + return obj } func (o *queueMessageObject) String() string { - data, _ := json.MarshalIndent(o, "", " ") + data, _ := json.MarshalIndent(o, "", " ") return string(data) } @@ -57,3 +84,6 @@ func printItems(items []*kubemq.QueueMessage) { fmt.Println(newQueueMessageObject(item)) } } +func printQueueMessage(msg *kubemq.QueueMessage) { + fmt.Println(newQueueMessageObject(msg)) +} diff --git a/cmd/queue/list.go b/cmd/queue/list.go index f5fccf8..c885789 100644 --- a/cmd/queue/list.go +++ b/cmd/queue/list.go @@ -2,7 +2,7 @@ package queue import ( "context" - jsonEnc "encoding/json" + "encoding/json" "fmt" "github.com/go-resty/resty" "github.com/kubemq-io/kubemqctl/pkg/config" @@ -88,10 +88,10 @@ func (o *QueueListOptions) Run(ctx context.Context) error { } type Response struct { - Node string `json:"node"` - Error bool `json:"error"` - ErrorString string `json:"error_string"` - Data jsonEnc.RawMessage `json:"data"` + Node string `json:"node"` + Error bool `json:"error"` + ErrorString string `json:"error_string"` + Data json.RawMessage `json:"data"` } type Queues struct { diff --git a/cmd/queue/send.go b/cmd/queue/send.go index 6854dcf..cf9024f 100644 --- a/cmd/queue/send.go +++ b/cmd/queue/send.go @@ -8,7 +8,7 @@ import ( "github.com/kubemq-io/kubemqctl/pkg/kubemq" "github.com/kubemq-io/kubemqctl/pkg/utils" "github.com/spf13/cobra" - "time" + "io/ioutil" ) type QueueSendOptions struct { @@ -22,6 +22,7 @@ type QueueSendOptions struct { metadata string deadLetter string messages int + fileName string } var queueSendExamples = ` @@ -72,18 +73,35 @@ func NewCmdQueueSend(ctx context.Context, cfg *config.Config) *cobra.Command { cmd.PersistentFlags().IntVarP(&o.messages, "messages", "m", 1, "set dead-letter max receive count") cmd.PersistentFlags().StringVarP(&o.deadLetter, "dead-letter-queue", "q", "", "set dead-letter queue name") cmd.PersistentFlags().StringVarP(&o.metadata, "metadata", "", "", "set queue message metadata field") + cmd.PersistentFlags().StringVarP(&o.fileName, "file", "f", "", "set load message body from file") return cmd } func (o *QueueSendOptions) Complete(args []string, transport string) error { o.transport = transport - if len(args) >= 2 { + if len(args) >= 1 { o.channel = args[0] - o.body = args[1] - return nil + + } else { + return fmt.Errorf("missing channel argument") + } + + if o.fileName != "" { + data, err := ioutil.ReadFile(o.fileName) + if err != nil { + return err + } + o.body = string(data) + } else { + if len(args) >= 2 { + o.body = args[1] + } else { + return fmt.Errorf("missing body argument") + } } - return fmt.Errorf("missing arguments, must be 2 arguments, channel and a message") + return nil + } func (o *QueueSendOptions) Validate() error { @@ -117,17 +135,9 @@ func (o *QueueSendOptions) Run(ctx context.Context) error { if res != nil { if res.IsError { return fmt.Errorf("sending queue message response, %s", res.Error) - - } - var delay string - var exp string - if res.DelayedTo > 0 { - delay = fmt.Sprintf(", delayed to: %s", time.Unix(0, res.DelayedTo)) - } - if res.ExpirationAt > 0 { - exp = fmt.Sprintf(", expire at: %s", time.Unix(0, res.ExpirationAt)) } - utils.Printlnf("[channel: %s] [client id: %s] -> {id: %s, metadata: %s, body: %s, sent at: %s%s%s}", msg.Channel, msg.ClientID, res.MessageID, msg.Metadata, msg.Body, time.Unix(0, res.SentAt).Format("2006-01-02 15:04:05.999"), exp, delay) + fmt.Println("Sent:") + printQueueMessage(msg) } } diff --git a/pkg/api/bolt.go b/pkg/api/bolt.go deleted file mode 100644 index 1466b9a..0000000 --- a/pkg/api/bolt.go +++ /dev/null @@ -1,32 +0,0 @@ -package api - -import ( - "errors" - "github.com/go-resty/resty/v2" -) - -const boltURL = "https://bolt.kubemq.io" - -type response struct { - Error bool `json:"error"` - ErrorString string `json:"error_string"` - Data string `json:"data"` -} - -func GetLicenseDataByToken(key string) (string, error) { - req := resty.New().R() - lic := &response{} - r, err := req.SetResult(lic).SetError(lic).SetQueryParam("key", key).Get(boltURL + "/getlicensebykey") - if err != nil { - return "", err - } - - if !r.IsSuccess() || r.IsError() { - return "", err - } - if lic.Error { - return "", errors.New(lic.ErrorString) - } - - return lic.Data, nil -} diff --git a/request.json b/request.json new file mode 100644 index 0000000..a1f7c88 --- /dev/null +++ b/request.json @@ -0,0 +1,8 @@ +{ + "metadata": { + "method": "create_bucket", + "bucket_name": "my_bucket_name", + "item_name": "my_item_name" + }, + "data": "bXkgaXRlbSBoZXJl" +}