Skip to content

Commit

Permalink
add support for event-based handling of TX audio
Browse files Browse the repository at this point in the history
  • Loading branch information
ftl committed Feb 6, 2021
1 parent 3be01f2 commit 68210ce
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This is a client library for Expert Electronic's [TCI protocol](https://github.com/maksimus1210/TCI) written in Go. It comes with a simple CLI client application that allows to send single commands to the TCI server and to monitor incoming TCI messages.

Currently, there is no support of IQ or audio data.
Currently, there is no support for IQ data.

## Some Details About TCI

Expand Down
98 changes: 66 additions & 32 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,22 @@ func (f ConnectionListenerFunc) Connected(connected bool) {
// Client represents a TCI client.
type Client struct {
DeviceInfo
notifier
*notifier
*streamer
host *net.TCPAddr
closed chan struct{}
ready chan struct{}
disconnectChan chan struct{}
writeChan chan command
commands chan command
txAudio chan []byte
timeout time.Duration
}

const (
commandQueueSize = 1
txAudioQueueSize = 25
)

type command struct {
Message
reply chan reply
Expand All @@ -74,9 +80,9 @@ func newClient(host *net.TCPAddr, listeners []interface{}) *Client {
ready: make(chan struct{}),
timeout: DefaultTimeout,
}
result.notifier.listeners = listeners
result.notifier = newNotifier(listeners, result.closed)
result.Notify(result)
result.streamer = newStreamer(&result.notifier, result)
result.streamer = newStreamer(result.notifier, result)
result.WhenDisconnected(result.streamer.Close)
return result
}
Expand Down Expand Up @@ -148,7 +154,8 @@ func (c *Client) connect() error {
}
c.ready = make(chan struct{})
c.disconnectChan = make(chan struct{})
c.writeChan = make(chan command, 1)
c.commands = make(chan command, commandQueueSize)
c.txAudio = make(chan []byte, txAudioQueueSize)
remoteAddr := conn.RemoteAddr()

incoming := make(chan Message, 1)
Expand Down Expand Up @@ -195,15 +202,15 @@ func (c *Client) readLoop(conn clientConn, incoming chan<- Message) {
log.Printf("cannot parse incoming message: %v", err)
continue
}
c.handleIncomingMessage(message)
c.notifier.textMessage(message)
incoming <- message
case websocket.BinaryMessage:
message, err := ParseBinaryMessage(msg)
if err != nil {
log.Printf("cannot parse incoming message: %v", err)
continue
}
c.handleIncomingBinaryMessage(message)
c.notifier.binaryMessage(message)
default:
log.Printf("unknown message type: %d %v", msgType, msg)
}
Expand All @@ -216,40 +223,53 @@ func (c *Client) writeLoop(conn clientConn, incoming <-chan Message) {

var currentCommand *command
var currentDeadline time.Time
timer := time.NewTimer(c.timeout)
defer timer.Stop()

for {
now := time.Now()
select {
case <-c.disconnectChan:
return
case msg := <-incoming:
if currentCommand == nil {
if currentCommand == nil {
select {
case msg := <-c.txAudio:
err := conn.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Printf("error writing tx audio: %v", err)
continue
}
case cmd := <-c.commands:
if cmd.reply != nil {
currentCommand = &cmd
currentDeadline = now.Add(c.timeout)
}
err := conn.WriteMessage(websocket.TextMessage, []byte(cmd.String()))
if err != nil {
log.Printf("error writing command %q: %v", cmd, err)
continue
}
case <-incoming:
continue
}
if msg.IsReplyTo(currentCommand.Message) {
currentCommand.reply <- reply{Message: msg}
currentCommand = nil
}
default:
if currentCommand == nil {
select {
case cmd := <-c.writeChan:
if cmd.reply != nil {
currentCommand = &cmd
currentDeadline = now.Add(c.timeout)
}
err := conn.WriteMessage(websocket.TextMessage, []byte(cmd.String()))
if err != nil {
log.Printf("error writing command %q: %v", cmd, err)
continue
}
case <-incoming:
} else {
timer.Reset(currentDeadline.Sub(now))
select {
case <-c.disconnectChan:
return
case msg := <-c.txAudio:
err := conn.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Printf("error writing tx audio: %v", err)
continue
}
} else if now.After(currentDeadline) {
case msg := <-incoming:
if msg.IsReplyTo(currentCommand.Message) {
currentCommand.reply <- reply{Message: msg}
currentCommand = nil
}
case <-timer.C:
currentCommand.reply <- reply{err: ErrTimeout}
currentCommand = nil
}
timer.Stop()
}
}
}
Expand Down Expand Up @@ -309,7 +329,7 @@ func (c *Client) command(cmd string, args ...interface{}) (Message, error) {
return Message{}, ErrNotConnected
}
replyChan := make(chan reply, 1)
c.writeChan <- command{
c.commands <- command{
Message: NewMessage(cmd, args...),
reply: replyChan,
}
Expand All @@ -321,6 +341,20 @@ func (c *Client) command(cmd string, args ...interface{}) (Message, error) {
return reply.Message, reply.err
}

// SendTXAudio sends the given samples as reply to a TXChrono message.
func (c *Client) SendTXAudio(trx int, sampleRate AudioSampleRate, samples []float32) error {
msg, err := NewTXAudioMessage(trx, sampleRate, samples)
if err != nil {
return err
}
select {
case c.txAudio <- msg:
return nil
default:
return fmt.Errorf("tx audio queue blocked, samples dropped")
}
}

// SetTimeout sets the duration to wait for the reply to a command.
func (c *Client) SetTimeout(timeout time.Duration) {
c.timeout = timeout
Expand Down
26 changes: 26 additions & 0 deletions client/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,32 @@ func (m Message) ToFloat(i int) (float64, error) {
return strconv.ParseFloat(arg, 64)
}

// NewTXAudioMessage returns a binary message of type TXAudioStream that contains the given samples.
func NewTXAudioMessage(trx int, sampleRate AudioSampleRate, samples []float32) ([]byte, error) {
msg := &encodedBinaryMessage{
TRX: uint32(trx),
SampleRate: uint32(sampleRate),
Format: 4,
Codec: 0,
CRC: 0,
DataLength: uint32(len(samples)),
Type: uint32(TXAudioStreamMessage),
}

buf := bytes.NewBuffer(make([]byte, 0, 64+len(samples)*4))
err := binary.Write(buf, binary.LittleEndian, msg)
if err != nil {
return nil, fmt.Errorf("cannot write tx audio message header: %w", err)
}
err = binary.Write(buf, binary.LittleEndian, &samples)
if err != nil {
return nil, fmt.Errorf("cannot write tx audio message data: %w", err)
}

return buf.Bytes(), nil
}

// ParseBinaryMessage parses the given byte slice as incoming binary message.
func ParseBinaryMessage(b []byte) (BinaryMessage, error) {
buf := bytes.NewReader(b)
var msg encodedBinaryMessage
Expand Down
37 changes: 36 additions & 1 deletion client/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,45 @@ package client

import "log"

func newNotifier(listeners []interface{}, closed <-chan struct{}) *notifier {
result := &notifier{
listeners: listeners,
closed: closed,
textMessages: make(chan Message, 1),
binaryMessages: make(chan BinaryMessage, 1),
}
go result.notifyLoop()
return result
}

type notifier struct {
listeners []interface{}
listeners []interface{}
closed <-chan struct{}
textMessages chan Message
binaryMessages chan BinaryMessage
}

func (n *notifier) notifyLoop() {
for {
select {
case <-n.closed:
return
case msg := <-n.textMessages:
n.handleIncomingMessage(msg)
case msg := <-n.binaryMessages:
n.handleIncomingBinaryMessage(msg)
}
}
}

func (n *notifier) Notify(listener interface{}) {
n.listeners = append(n.listeners, listener)
}

func (n *notifier) textMessage(msg Message) {
n.textMessages <- msg
}

func (n *notifier) handleIncomingMessage(msg Message) {
n.emitMessage(msg)
var err error
Expand Down Expand Up @@ -1219,6 +1250,10 @@ func (n *notifier) emitRXBalance(msg Message) error {
return nil
}

func (n *notifier) binaryMessage(msg BinaryMessage) {
n.binaryMessages <- msg
}

func (n *notifier) handleIncomingBinaryMessage(msg BinaryMessage) {
n.emitBinaryMessage(msg)
switch msg.Type {
Expand Down
18 changes: 9 additions & 9 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
)

var rootFlags = struct {
hostAddress *string
trx *int
reconnect *bool
hostAddress string
trx int
reconnect bool
}{}

var rootCmd = &cobra.Command{
Expand All @@ -37,14 +37,14 @@ func Execute() {
}

func init() {
rootFlags.hostAddress = rootCmd.PersistentFlags().StringP("host", "", "localhost:40001", "connect to this TCI host")
rootFlags.trx = rootCmd.PersistentFlags().IntP("trx", "t", 0, "use this TRX")
rootFlags.reconnect = rootCmd.PersistentFlags().BoolP("reconnect", "r", false, "try to reconnect if the TCI connection failed")
rootCmd.PersistentFlags().StringVar(&rootFlags.hostAddress, "host", "localhost:40001", "connect to this TCI host")
rootCmd.PersistentFlags().IntVar(&rootFlags.trx, "trx", 0, "use this TRX")
rootCmd.PersistentFlags().BoolVar(&rootFlags.reconnect, "reconnect", false, "try to reconnect if the TCI connection failed")
}

func runWithClient(f func(context.Context, *client.Client, *cobra.Command, []string)) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
host, err := parseHostArg(*rootFlags.hostAddress)
host, err := parseHostArg(rootFlags.hostAddress)
if err != nil {
log.Fatalf("invalid host address: %v", err)
}
Expand All @@ -59,7 +59,7 @@ func runWithClient(f func(context.Context, *client.Client, *cobra.Command, []str
go handleCancelation(signals, cancel)

var c *client.Client
if *rootFlags.reconnect {
if rootFlags.reconnect {
c = client.KeepOpen(host, 30*time.Second)
} else {
c, err = client.Open(host)
Expand All @@ -68,7 +68,7 @@ func runWithClient(f func(context.Context, *client.Client, *cobra.Command, []str
log.Fatalf("cannot conntect to %s: %v", host.String(), err)
}
defer c.Disconnect()
if !*rootFlags.reconnect {
if !rootFlags.reconnect {
c.WhenDisconnected(cancel)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ func send(ctx context.Context, c *client.Client, _ *cobra.Command, args []string
if len(args) < 1 {
log.Fatal("no text to send, use tci send <text>")
}
c.SendCWMacro(*rootFlags.trx, strings.Join(args, " "))
c.SendCWMacro(rootFlags.trx, strings.Join(args, " "))
}
Loading

0 comments on commit 68210ce

Please sign in to comment.