Skip to content

Commit

Permalink
Add support for batch reads and kernel buffer sizing
Browse files Browse the repository at this point in the history
Enables a single receiver to receive data from multiple pucks on the
same port. Sizing the kernel buffer should hopefully mitigate packet
loss.
  • Loading branch information
odsod committed Sep 10, 2019
1 parent 356b599 commit 58ab453
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 165 deletions.
20 changes: 3 additions & 17 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,7 @@ version: 2
jobs:
build:
docker:
- image: circleci/golang:1.12-stretch-node
working_directory: /go/src/github.com/einride/vlp-16-go
- image: circleci/golang:1.12-stretch-node
steps:
- checkout
- run: make build
- restore_cache:
keys:
- build-cache-{{ checksum ".git/modules/build/HEAD" }}
- restore_cache:
keys:
- mod-cache-{{ checksum "go.sum" }}
- run: make
- save_cache:
key: mod-cache-{{ checksum "go.sum" }}
paths: ["/go/pkg/mod"]
- save_cache:
key: build-cache-{{ checksum ".git/modules/build/HEAD" }}
paths: ["build"]
- checkout
- run: make
13 changes: 1 addition & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
all: \
circleci-config-validate \
go-generate \
go-mocks \
go-lint \
go-review \
go-test \
go-lint \
go-mod-tidy \
git-verify-submodules \
git-verify-nodiff

export GO111MODULE := on

# clean: remove generated build files
.PHONY: clean
clean:
Expand Down Expand Up @@ -63,11 +60,3 @@ returnmode_string.go: returnmode.go $(GOBIN)
productid_string.go: productid.go $(GOBIN)
$(GOBIN) -m -run golang.org/x/tools/cmd/stringer \
-type ProductID -trimprefix ProductID -output $@ $<

# go-mocks: generate Go mocks
.PHONY: go-mocks
go-mocks: test/mocks/vlp16/mocks.go

test/mocks/vlp16/mocks.go: client.go $(GOBIN)
$(GOBIN) -m -run github.com/golang/mock/mockgen -destination $@ -package mockvlp16 \
github.com/einride/vlp-16-go UDPConn
77 changes: 0 additions & 77 deletions client.go

This file was deleted.

45 changes: 0 additions & 45 deletions client_test.go

This file was deleted.

14 changes: 8 additions & 6 deletions cmd/vlp-16-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/hex"
"fmt"
"net"
"os"
"strconv"

Expand All @@ -21,17 +20,20 @@ func main() {
fmt.Println(err.Error())
os.Exit(1)
}
udpConn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: port})
client, err := vlp16.ListenUDP(
context.Background(),
fmt.Sprintf("0.0.0.0:%d", port),
vlp16.WithBatchSize(10),
vlp16.WithBufferSize(2097152),
)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
panic(err)
}
client := vlp16.NewClient(udpConn)
for {
if err := client.Receive(context.Background()); err != nil {
panic(err)
}
fmt.Println(client.SenderAddr())
fmt.Println(client.SourceIP())
fmt.Println(hex.EncodeToString(client.RawPacket()))
fmt.Println()
}
Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func examplePacket() *Packet {
}
}

func exampleSphericalPointCloud() *PointCloud {
func examplePointCloud() *PointCloud {
return &PointCloud{
Azimuths: [ColumnsPerPacket]unit.Angle{
103.42 * unit.Degree,
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ go 1.12

require (
github.com/einride/unit v1.6.0
github.com/golang/mock v1.3.1
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/tools v0.0.0-20190729092621-ff9f1409240a
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522
)
9 changes: 5 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/einride/unit v1.6.0 h1:V5vGFpNfGkFDPPhOduQhI4JSzePdDeHNI6NXyE7EFNo=
github.com/einride/unit v1.6.0/go.mod h1:VkeEoz/WVqQktnRS0c+Fn82+cgzbXaCxwrVAuku9RmI=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190729092621-ff9f1409240a h1:mEQZbbaBjWyLNy0tmZmgEuQAR8XOQ3hL8GYi3J/NG64=
golang.org/x/tools v0.0.0-20190729092621-ff9f1409240a/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 h1:bhOzK9QyoD0ogCnFro1m2mz41+Ib0oOhfJnBp5MR4K4=
Expand Down
2 changes: 1 addition & 1 deletion pointcloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
func TestSphericalPointCloud_UnmarshalExamplePacket(t *testing.T) {
actual := &PointCloud{}
actual.UnmarshalPacket(examplePacket())
requirePointCloudEqual(t, exampleSphericalPointCloud(), actual)
requirePointCloudEqual(t, examplePointCloud(), actual)
}

func requirePointCloudEqual(t *testing.T, p *PointCloud, pc *PointCloud) {
Expand Down
96 changes: 96 additions & 0 deletions receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package vlp16

import (
"context"
"net"

"golang.org/x/net/ipv4"
"golang.org/x/xerrors"
)

// ListenUDP listens for VLP-16 UDP packets on the specified address.
func ListenUDP(ctx context.Context, addr string, receiverOpts ...ReceiverOption) (*Receiver, error) {
opts := defaultReceiverOptions()
for _, receiverOpt := range receiverOpts {
receiverOpt(opts)
}
var listenConfig net.ListenConfig
packetConn, err := listenConfig.ListenPacket(ctx, "udp4", addr)
if err != nil {
return nil, xerrors.Errorf("VLP-16: listen UDP: %w", err)
}
udpConn := packetConn.(*net.UDPConn)
if err := udpConn.SetReadBuffer(opts.bufferSizeBytes); err != nil {
return nil, xerrors.Errorf("VLP-16: listen UDP: %w", err)
}
conn := ipv4.NewPacketConn(udpConn)
c := &Receiver{conn: conn}
// allocate memory for batch reads
c.messages = make([]ipv4.Message, 0, opts.batchSize)
for i := 0; i < opts.batchSize; i++ {
c.packetBuf = append(c.packetBuf, &[lengthOfPacket]byte{})
c.messages = append(c.messages, ipv4.Message{
Buffers: [][]byte{c.packetBuf[i][:]},
})
}
return c, nil
}

// Receiver receives VLP-16 packets.
type Receiver struct {
conn *ipv4.PacketConn
messages []ipv4.Message
packetBuf []*[lengthOfPacket]byte
messageBufSize int
currMessageIndex int
currPacket Packet
currPointCloud PointCloud
}

// Receive a VLP-16 packet.
func (c *Receiver) Receive(ctx context.Context) error {
c.currMessageIndex++
if c.currMessageIndex >= c.messageBufSize {
c.currMessageIndex = 0
deadline, _ := ctx.Deadline()
if err := c.conn.SetReadDeadline(deadline); err != nil {
return xerrors.Errorf("VLP-16 receiver: %w", err)
}
n, err := c.conn.ReadBatch(c.messages, 0)
if err != nil {
return xerrors.Errorf("VLP-16 receiver: %w", err)
}
c.messageBufSize = n
}
c.currPacket.unmarshal(c.packetBuf[c.currMessageIndex])
c.currPointCloud.UnmarshalPacket(&c.currPacket)
return nil
}

// SourceIP returns the source IP of the last received VLP-16 packet.
func (c *Receiver) SourceIP() net.IP {
return c.messages[c.currMessageIndex].Addr.(*net.UDPAddr).IP
}

// RawPacket returns the raw bytes of the last received VLP-16 packet.
func (c *Receiver) RawPacket() []byte {
return c.packetBuf[c.currMessageIndex][:c.messages[c.currMessageIndex].N]
}

// Packet returns the last received VLP-16 packet.
func (c *Receiver) Packet() *Packet {
return &c.currPacket
}

// PointCloud returns the point cloud representation of the last received packet.
func (c *Receiver) PointCloud() *PointCloud {
return &c.currPointCloud
}

// Close the client's underlying UDP connection.
func (c *Receiver) Close() error {
if err := c.conn.Close(); err != nil {
return xerrors.Errorf("VLP-16 receiver: close: %w", err)
}
return nil
}
Loading

0 comments on commit 58ab453

Please sign in to comment.