Skip to content
This repository has been archived by the owner on Feb 22, 2019. It is now read-only.

Commit

Permalink
better input buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Jan 26, 2015
1 parent ff69b00 commit 226201d
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,7 @@ func parseMessage(data []byte) []*Packet {
return output
}

func udpListener(destinations []Destination) {
addr, _ := net.ResolveUDPAddr("udp", *address)
log.Printf("listening on %s", addr)
listener, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
defer listener.Close()

func processData(dataCh chan []byte, destinations []Destination) {
var destConns []net.Conn
for _, destination := range destinations {
conn, err := net.DialTimeout("udp", destination.Address, time.Second)
Expand All @@ -90,16 +82,8 @@ func udpListener(destinations []Destination) {
destConns = append(destConns, conn)
}

message := make([]byte, 512)
for {
n, remaddr, err := listener.ReadFromUDP(message)
if err != nil {
log.Printf("ERROR: reading UDP packet from %+v - %s", remaddr, err)
continue
}

log.Printf("message: %s (%d)", message[:n], n)
for _, p := range parseMessage(message[:n]) {
for data := range dataCh {
for _, p := range parseMessage(data) {
for i, destination := range destinations {
key := destination.Regex.ReplaceAll(p.Key, destination.Replace)
packet := fmt.Sprintf("%s:%s", key, p.Body)
Expand All @@ -121,6 +105,33 @@ func udpListener(destinations []Destination) {
}
}

func udpListener(dataCh chan []byte) {
addr, _ := net.ResolveUDPAddr("udp", *address)
log.Printf("listening on %s", addr)
listener, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
defer listener.Close()

err = listener.SetReadBuffer(1024 * 1024)
if err != nil {
log.Printf("ERROR: SetReadBuffer - %s", err)
}

for {
message := make([]byte, 512)
n, remaddr, err := listener.ReadFromUDP(message)
if err != nil {
log.Printf("ERROR: reading UDP packet from %+v - %s", remaddr, err)
continue
}

log.Printf("msg: %s (%d)", message[:n], n)
dataCh <- message[:n]
}
}

func main() {
flag.Parse()

Expand All @@ -146,5 +157,7 @@ func main() {
signalchan := make(chan os.Signal, 1)
signal.Notify(signalchan, syscall.SIGTERM)

udpListener(destinations)
dataCh := make(chan []byte, 1000)
go udpListener(dataCh)
processData(dataCh, destinations)
}

0 comments on commit 226201d

Please sign in to comment.