Skip to content

Commit

Permalink
allow passing max buffersize, improve h.264 parsing
Browse files Browse the repository at this point in the history
(initpacket doesn't work yet though)
  • Loading branch information
iSchluff committed Oct 31, 2020
1 parent c760157 commit 57e10c4
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 99 deletions.
14 changes: 3 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ func main() {
options := make(map[string]string)
options["blocking"] = "0"
options["transtype"] = "live"
options["latency"] = "200"
options["latency"] = "300"

address := "0.0.0.0"
port := uint16(8090)
buffersize := uint(384000) // 1s @ 3Mbits/

sck := srtgo.NewSrtSocket(address, port, options)
err := sck.Listen(1)
Expand All @@ -24,16 +25,7 @@ func main() {
}
log.Printf("Listening on %s:%d\n", address, port)

server := server.NewServer()

// Handle SIGTERM signal
// ch := make(chan os.Signal, 1)
// signal.Notify(ch, syscall.SIGTERM)
// go func() {
// <-ch
// cancel()
// }()

server := server.NewServer(buffersize)
for {
sock, err := sck.Accept()
if err != nil {
Expand Down
77 changes: 15 additions & 62 deletions mpegts/h264.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,58 +67,22 @@ func (h *H264Parser) InitPacket() ([]byte, error) {
return nil, err
}

// pad with adaptationField
adaptationLen := MaxPayloadSize - len(pesPayload)
adaptationField := make([]byte, adaptationLen)
adaptationField[0] = 0x3 << 6
for i := 0; i < adaptationLen-1; i++ {
adaptationField[i+1] = 0xff
}

// create MPEG-TS Packet
pkt := Packet{
PID: h.pid,
PUSI: true,
Payload: pesPayload,
PID: h.pid,
PUSI: true,
Payload: pesPayload,
AdaptationField: adaptationField,
}

// //sps pps
// int len = ti->sps_len + ti->pps_len;
// if (len > TS_PACK_LEN-4) {
// printf("pid=%d, pes size=%d is abnormal!!!!\n", pid, len);
// return ret;
// }
// pos ++;
// //pid
// ti->es_pid = pid;
// tmp = ti->es_pid >> 8;
// p[pos++] = 0x40 | tmp;
// tmp = ti->es_pid;
// p[pos++] = tmp;
// p[pos] = 0x10;
// int ad_len = TS_PACK_LEN - 4 - len - 1;
// if (ad_len > 0) {
// p[pos++] = 0x30;
// p[pos++] = ad_len;//adaptation length
// p[pos++] = 0x00;//
// memset(p + pos, 0xFF, ad_len-1);
// pos += ad_len - 1;
// }else{
// pos ++;
// }

// //pes
// p[pos++] = 0;
// p[pos++] = 0;
// p[pos++] = 1;
// p[pos++] = stream_id;
// p[pos++] = 0;//total size
// p[pos++] = 0;//total size
// p[pos++] = 0x80;//flag
// p[pos++] = 0x80;//flag
// p[pos++] = 5;//header_len
// p[pos++] = 0;//pts
// p[pos++] = 0;
// p[pos++] = 0;
// p[pos++] = 0;
// p[pos++] = 0;
// memcpy(p+pos, ti->sps, ti->sps_len);
// pos += ti->sps_len;
// memcpy(p+pos, ti->pps, ti->pps_len);
// pos += ti->pps_len;

data := make([]byte, PacketLen)
err = pkt.ToBytes(data)
if err != nil {
Expand All @@ -127,14 +91,6 @@ func (h *H264Parser) InitPacket() ([]byte, error) {
return data, nil
}

// func readByte(rd io.Reader) (byte, error) {
// b := make([]byte, 1)
// _, err := rd.Read(b)
// return b[0], err
// }

// var counter = 0

// Parse reads H.264 PPS and SPS from PES payloads
func (h *H264Parser) Parse(rd *io.PipeReader) {
// get semaphor
Expand Down Expand Up @@ -185,7 +141,6 @@ func (h *H264Parser) Parse(rd *io.PipeReader) {
case NALUnitTypeSPS:
h.sps = nalBuffer
nalBuffer = nil
log.Println("got SPS len", len(h.sps))
if h.sps != nil && h.pps != nil {
close(h.done)
rd.Close()
Expand All @@ -195,7 +150,6 @@ func (h *H264Parser) Parse(rd *io.PipeReader) {
case NALUnitTypePPS:
h.pps = nalBuffer
nalBuffer = nil
log.Println("got PPS len", len(h.pps))
if h.sps != nil && h.pps != nil {
close(h.done)
rd.Close()
Expand All @@ -207,19 +161,20 @@ func (h *H264Parser) Parse(rd *io.PipeReader) {
if nalType == NALUnitTypeSPS || nalType == NALUnitTypePPS {
// log.Println("got SPS/PPS")
nalBuffer = make([]byte, 0, 200)
nalBuffer = append(nalBuffer, 0x0)

startSlice := nalBuffer[:NALHeaderSize]
if skippedZero {
nalBuffer = append(nalBuffer, 0x0)
startSlice = nalBuffer[1 : NALHeaderSize+1]
}

// ignore error, because the bytes should be buffered through peek
startSlice := make([]byte, NALHeaderSize)
n, _ := brd.Read(startSlice)
if n < NALHeaderSize {
log.Fatal("Short read, should be buffered")
return
}
nalBuffer = append(nalBuffer, startSlice...)
}
previousNalType = nalType
}
Expand Down Expand Up @@ -259,10 +214,8 @@ func (h *H264Parser) Parse(rd *io.PipeReader) {
switch previousNalType {
case NALUnitTypeSPS:
h.sps = nalBuffer
log.Println("got SPS len on end", len(h.sps))
case NALUnitTypePPS:
h.pps = nalBuffer
log.Println("got PPS len on end", len(h.pps))
}

if h.sps != nil && h.pps != nil {
Expand Down
2 changes: 0 additions & 2 deletions mpegts/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ func (pkt *Packet) ToBytes(data []byte) error {
copy(data[offset:offset+payloadLength], pkt.Payload)
offset += payloadLength

// fill remainder?

return nil
}

Expand Down
4 changes: 0 additions & 4 deletions mpegts/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func (p *Parser) ParsePSI(data []byte) (bool, error) {
shouldStore = true
p.expectedPATSection = hdr.sectionNumber + 1
if hdr.sectionNumber == hdr.lastSectionNumber {
log.Println("got PAT")
p.hasPAT = true
}

Expand Down Expand Up @@ -202,9 +201,7 @@ func (p *Parser) ParsePSI(data []byte) (bool, error) {
p.tspMap[elementaryPID] = &ElementaryStream{
parser: NewH264Parser(elementaryPID),
}
log.Println("got H.264 stream, pid", elementaryPID)
}
//log.Printf("got pid: %d, streamType: 0x%x\n", elementaryPID, streamType)

if offset >= end {
break
Expand All @@ -214,7 +211,6 @@ func (p *Parser) ParsePSI(data []byte) (bool, error) {
shouldStore = true
p.expectedPMTSection = hdr.sectionNumber + 1
if hdr.sectionNumber == hdr.lastSectionNumber {
log.Println("got PMT")
p.hasPMT = true
}
}
Expand Down
3 changes: 3 additions & 0 deletions mpegts/pes.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func encodeVideoPES(data []byte) ([]byte, error) {
// write pes length (can be 0 only for Video packets)
binary.BigEndian.PutUint16(pes[offset:offset+2], 0)

// magic?! flag + pts
// pes = append(pes, 0x80, 0x80, 5, 0, 0, 0, 0, 0)

pes = append(pes, data...)

return pes, nil
Expand Down
15 changes: 10 additions & 5 deletions relay/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
type UnsubscribeFunc func()

type Channel struct {
mutex sync.Mutex
subs Subs
mutex sync.Mutex
subs Subs
buffersize uint
}
type Subs []chan []byte

Expand Down Expand Up @@ -37,15 +38,19 @@ func (subs Subs) Remove(sub chan []byte) Subs {
return subs[:len(subs)-1] // Truncate slice.
}

func NewChannel() *Channel {
return &Channel{subs: make([]chan []byte, 0, 10)}
func NewChannel(buffersize uint) *Channel {
return &Channel{
subs: make([]chan []byte, 0, 10),
buffersize: buffersize,
}
}

// Sub subscribes to a channel
func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) {
ch.mutex.Lock()
defer ch.mutex.Unlock()
sub := make(chan []byte, 80) // about 300ms at 3Mbit/s
channelbuffer := ch.buffersize / 1316
sub := make(chan []byte, channelbuffer)
ch.subs = append(ch.subs, sub)

var unsub UnsubscribeFunc = func() {
Expand Down
6 changes: 3 additions & 3 deletions relay/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestChannel_PubSub(t *testing.T) {
ch := NewChannel()
ch := NewChannel(uint(1316 * 50))

// sub
out, unsub := ch.Sub()
Expand All @@ -33,7 +33,7 @@ func TestChannel_PubSub(t *testing.T) {
}

func TestChannel_DropOnOverflow(t *testing.T) {
ch := NewChannel()
ch := NewChannel(uint(1316 * 50))

sub, _ := ch.Sub()
capacity := cap(sub) + 1
Expand All @@ -60,7 +60,7 @@ func TestChannel_DropOnOverflow(t *testing.T) {
}

func TestChannel_Close(t *testing.T) {
ch := NewChannel()
ch := NewChannel(0)
sub1, _ := ch.Sub()
_, _ = ch.Sub()

Expand Down
12 changes: 7 additions & 5 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ type Relay interface {

// RelayImpl represents a multi-channel stream relay
type RelayImpl struct {
mutex sync.Mutex
channels map[string]*Channel
mutex sync.Mutex
channels map[string]*Channel
buffersize uint
}

// NewRelay creates a relay
func NewRelay() Relay {
func NewRelay(buffersize uint) Relay {
return &RelayImpl{
channels: make(map[string]*Channel),
channels: make(map[string]*Channel),
buffersize: buffersize,
}
}

Expand All @@ -40,7 +42,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) {
return nil, StreamAlreadyExists
}

channel := NewChannel()
channel := NewChannel(s.buffersize)
s.channels[name] = channel

ch := make(chan []byte, 0)
Expand Down
8 changes: 4 additions & 4 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestRelayImpl_SubscribeAndUnsubscribe(t *testing.T) {
relay := NewRelay()
relay := NewRelay(uint(1316 * 50))
data := []byte{1, 2, 3, 4}

pub, err := relay.Publish("test")
Expand Down Expand Up @@ -45,7 +45,7 @@ func TestRelayImpl_SubscribeAndUnsubscribe(t *testing.T) {
}

func TestRelayImpl_PublisherClose(t *testing.T) {
relay := NewRelay()
relay := NewRelay(0)

ch, _ := relay.Publish("test")
sub, unsub, _ := relay.Subscribe("test")
Expand All @@ -68,7 +68,7 @@ func TestRelayImpl_PublisherClose(t *testing.T) {
}

func TestRelayImpl_DoublePublish(t *testing.T) {
relay := NewRelay()
relay := NewRelay(0)
relay.Publish("foo")
_, err := relay.Publish("foo")

Expand All @@ -78,7 +78,7 @@ func TestRelayImpl_DoublePublish(t *testing.T) {
}

func TestRelayImpl_SubscribeNonExisting(t *testing.T) {
relay := NewRelay()
relay := NewRelay(0)

_, _, err := relay.Subscribe("foobar")
if err != StreamNotExisting {
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type ServerImpl struct {
}

// NewServer creates a server
func NewServer() Server {
ps := relay.NewRelay()
func NewServer(buffersize uint) Server {
ps := relay.NewRelay(buffersize)
return &ServerImpl{ps}
}

Expand Down Expand Up @@ -119,7 +119,7 @@ func (s *ServerImpl) play(name string, sock *srtgo.SrtSocket) error {
buf, ok := <-sub

buffered := len(sub)
if buffered > 40 {
if buffered > 144 {
log.Println("late in buffer", len(sub))
}

Expand Down

0 comments on commit 57e10c4

Please sign in to comment.