diff --git a/main.go b/main.go index f15b6c0..4ab3b9f 100644 --- a/main.go +++ b/main.go @@ -11,10 +11,11 @@ func main() { options := make(map[string]string) options["blocking"] = "0" options["transtype"] = "live" - options["latency"] = "200" + options["latency"] = "300" address := "0.0.0.0" port := uint16(8090) + buffersize := uint(384000) // 1s @ 3Mbits/ sck := srtgo.NewSrtSocket(address, port, options) err := sck.Listen(1) @@ -24,16 +25,7 @@ func main() { } log.Printf("Listening on %s:%d\n", address, port) - server := server.NewServer() - - // Handle SIGTERM signal - // ch := make(chan os.Signal, 1) - // signal.Notify(ch, syscall.SIGTERM) - // go func() { - // <-ch - // cancel() - // }() - + server := server.NewServer(buffersize) for { sock, err := sck.Accept() if err != nil { diff --git a/mpegts/h264.go b/mpegts/h264.go index b8cd955..aea5655 100644 --- a/mpegts/h264.go +++ b/mpegts/h264.go @@ -67,58 +67,22 @@ func (h *H264Parser) InitPacket() ([]byte, error) { return nil, err } + // pad with adaptationField + adaptationLen := MaxPayloadSize - len(pesPayload) + adaptationField := make([]byte, adaptationLen) + adaptationField[0] = 0x3 << 6 + for i := 0; i < adaptationLen-1; i++ { + adaptationField[i+1] = 0xff + } + // create MPEG-TS Packet pkt := Packet{ - PID: h.pid, - PUSI: true, - Payload: pesPayload, + PID: h.pid, + PUSI: true, + Payload: pesPayload, + AdaptationField: adaptationField, } - // //sps pps - // int len = ti->sps_len + ti->pps_len; - // if (len > TS_PACK_LEN-4) { - // printf("pid=%d, pes size=%d is abnormal!!!!\n", pid, len); - // return ret; - // } - // pos ++; - // //pid - // ti->es_pid = pid; - // tmp = ti->es_pid >> 8; - // p[pos++] = 0x40 | tmp; - // tmp = ti->es_pid; - // p[pos++] = tmp; - // p[pos] = 0x10; - // int ad_len = TS_PACK_LEN - 4 - len - 1; - // if (ad_len > 0) { - // p[pos++] = 0x30; - // p[pos++] = ad_len;//adaptation length - // p[pos++] = 0x00;// - // memset(p + pos, 0xFF, ad_len-1); - // pos += ad_len - 1; - // }else{ - // pos ++; - // } - - // //pes - // p[pos++] = 0; - // p[pos++] = 0; - // p[pos++] = 1; - // p[pos++] = stream_id; - // p[pos++] = 0;//total size - // p[pos++] = 0;//total size - // p[pos++] = 0x80;//flag - // p[pos++] = 0x80;//flag - // p[pos++] = 5;//header_len - // p[pos++] = 0;//pts - // p[pos++] = 0; - // p[pos++] = 0; - // p[pos++] = 0; - // p[pos++] = 0; - // memcpy(p+pos, ti->sps, ti->sps_len); - // pos += ti->sps_len; - // memcpy(p+pos, ti->pps, ti->pps_len); - // pos += ti->pps_len; - data := make([]byte, PacketLen) err = pkt.ToBytes(data) if err != nil { @@ -127,14 +91,6 @@ func (h *H264Parser) InitPacket() ([]byte, error) { return data, nil } -// func readByte(rd io.Reader) (byte, error) { -// b := make([]byte, 1) -// _, err := rd.Read(b) -// return b[0], err -// } - -// var counter = 0 - // Parse reads H.264 PPS and SPS from PES payloads func (h *H264Parser) Parse(rd *io.PipeReader) { // get semaphor @@ -185,7 +141,6 @@ func (h *H264Parser) Parse(rd *io.PipeReader) { case NALUnitTypeSPS: h.sps = nalBuffer nalBuffer = nil - log.Println("got SPS len", len(h.sps)) if h.sps != nil && h.pps != nil { close(h.done) rd.Close() @@ -195,7 +150,6 @@ func (h *H264Parser) Parse(rd *io.PipeReader) { case NALUnitTypePPS: h.pps = nalBuffer nalBuffer = nil - log.Println("got PPS len", len(h.pps)) if h.sps != nil && h.pps != nil { close(h.done) rd.Close() @@ -207,19 +161,20 @@ func (h *H264Parser) Parse(rd *io.PipeReader) { if nalType == NALUnitTypeSPS || nalType == NALUnitTypePPS { // log.Println("got SPS/PPS") nalBuffer = make([]byte, 0, 200) + nalBuffer = append(nalBuffer, 0x0) - startSlice := nalBuffer[:NALHeaderSize] if skippedZero { nalBuffer = append(nalBuffer, 0x0) - startSlice = nalBuffer[1 : NALHeaderSize+1] } // ignore error, because the bytes should be buffered through peek + startSlice := make([]byte, NALHeaderSize) n, _ := brd.Read(startSlice) if n < NALHeaderSize { log.Fatal("Short read, should be buffered") return } + nalBuffer = append(nalBuffer, startSlice...) } previousNalType = nalType } @@ -259,10 +214,8 @@ func (h *H264Parser) Parse(rd *io.PipeReader) { switch previousNalType { case NALUnitTypeSPS: h.sps = nalBuffer - log.Println("got SPS len on end", len(h.sps)) case NALUnitTypePPS: h.pps = nalBuffer - log.Println("got PPS len on end", len(h.pps)) } if h.sps != nil && h.pps != nil { diff --git a/mpegts/packet.go b/mpegts/packet.go index 8729dc9..6cebe31 100644 --- a/mpegts/packet.go +++ b/mpegts/packet.go @@ -100,8 +100,6 @@ func (pkt *Packet) ToBytes(data []byte) error { copy(data[offset:offset+payloadLength], pkt.Payload) offset += payloadLength - // fill remainder? - return nil } diff --git a/mpegts/parser.go b/mpegts/parser.go index 14a9098..43f3d54 100644 --- a/mpegts/parser.go +++ b/mpegts/parser.go @@ -170,7 +170,6 @@ func (p *Parser) ParsePSI(data []byte) (bool, error) { shouldStore = true p.expectedPATSection = hdr.sectionNumber + 1 if hdr.sectionNumber == hdr.lastSectionNumber { - log.Println("got PAT") p.hasPAT = true } @@ -202,9 +201,7 @@ func (p *Parser) ParsePSI(data []byte) (bool, error) { p.tspMap[elementaryPID] = &ElementaryStream{ parser: NewH264Parser(elementaryPID), } - log.Println("got H.264 stream, pid", elementaryPID) } - //log.Printf("got pid: %d, streamType: 0x%x\n", elementaryPID, streamType) if offset >= end { break @@ -214,7 +211,6 @@ func (p *Parser) ParsePSI(data []byte) (bool, error) { shouldStore = true p.expectedPMTSection = hdr.sectionNumber + 1 if hdr.sectionNumber == hdr.lastSectionNumber { - log.Println("got PMT") p.hasPMT = true } } diff --git a/mpegts/pes.go b/mpegts/pes.go index 6c6f779..3a33e72 100644 --- a/mpegts/pes.go +++ b/mpegts/pes.go @@ -46,6 +46,9 @@ func encodeVideoPES(data []byte) ([]byte, error) { // write pes length (can be 0 only for Video packets) binary.BigEndian.PutUint16(pes[offset:offset+2], 0) + // magic?! flag + pts + // pes = append(pes, 0x80, 0x80, 5, 0, 0, 0, 0, 0) + pes = append(pes, data...) return pes, nil diff --git a/relay/channel.go b/relay/channel.go index 7c98def..4363b23 100644 --- a/relay/channel.go +++ b/relay/channel.go @@ -8,8 +8,9 @@ import ( type UnsubscribeFunc func() type Channel struct { - mutex sync.Mutex - subs Subs + mutex sync.Mutex + subs Subs + buffersize uint } type Subs []chan []byte @@ -37,15 +38,19 @@ func (subs Subs) Remove(sub chan []byte) Subs { return subs[:len(subs)-1] // Truncate slice. } -func NewChannel() *Channel { - return &Channel{subs: make([]chan []byte, 0, 10)} +func NewChannel(buffersize uint) *Channel { + return &Channel{ + subs: make([]chan []byte, 0, 10), + buffersize: buffersize, + } } // Sub subscribes to a channel func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) { ch.mutex.Lock() defer ch.mutex.Unlock() - sub := make(chan []byte, 80) // about 300ms at 3Mbit/s + channelbuffer := ch.buffersize / 1316 + sub := make(chan []byte, channelbuffer) ch.subs = append(ch.subs, sub) var unsub UnsubscribeFunc = func() { diff --git a/relay/channel_test.go b/relay/channel_test.go index 74a0ed6..81920ee 100644 --- a/relay/channel_test.go +++ b/relay/channel_test.go @@ -6,7 +6,7 @@ import ( ) func TestChannel_PubSub(t *testing.T) { - ch := NewChannel() + ch := NewChannel(uint(1316 * 50)) // sub out, unsub := ch.Sub() @@ -33,7 +33,7 @@ func TestChannel_PubSub(t *testing.T) { } func TestChannel_DropOnOverflow(t *testing.T) { - ch := NewChannel() + ch := NewChannel(uint(1316 * 50)) sub, _ := ch.Sub() capacity := cap(sub) + 1 @@ -60,7 +60,7 @@ func TestChannel_DropOnOverflow(t *testing.T) { } func TestChannel_Close(t *testing.T) { - ch := NewChannel() + ch := NewChannel(0) sub1, _ := ch.Sub() _, _ = ch.Sub() diff --git a/relay/relay.go b/relay/relay.go index 36791be..90bf849 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -20,14 +20,16 @@ type Relay interface { // RelayImpl represents a multi-channel stream relay type RelayImpl struct { - mutex sync.Mutex - channels map[string]*Channel + mutex sync.Mutex + channels map[string]*Channel + buffersize uint } // NewRelay creates a relay -func NewRelay() Relay { +func NewRelay(buffersize uint) Relay { return &RelayImpl{ - channels: make(map[string]*Channel), + channels: make(map[string]*Channel), + buffersize: buffersize, } } @@ -40,7 +42,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) { return nil, StreamAlreadyExists } - channel := NewChannel() + channel := NewChannel(s.buffersize) s.channels[name] = channel ch := make(chan []byte, 0) diff --git a/relay/relay_test.go b/relay/relay_test.go index 26b3944..20275c3 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -7,7 +7,7 @@ import ( ) func TestRelayImpl_SubscribeAndUnsubscribe(t *testing.T) { - relay := NewRelay() + relay := NewRelay(uint(1316 * 50)) data := []byte{1, 2, 3, 4} pub, err := relay.Publish("test") @@ -45,7 +45,7 @@ func TestRelayImpl_SubscribeAndUnsubscribe(t *testing.T) { } func TestRelayImpl_PublisherClose(t *testing.T) { - relay := NewRelay() + relay := NewRelay(0) ch, _ := relay.Publish("test") sub, unsub, _ := relay.Subscribe("test") @@ -68,7 +68,7 @@ func TestRelayImpl_PublisherClose(t *testing.T) { } func TestRelayImpl_DoublePublish(t *testing.T) { - relay := NewRelay() + relay := NewRelay(0) relay.Publish("foo") _, err := relay.Publish("foo") @@ -78,7 +78,7 @@ func TestRelayImpl_DoublePublish(t *testing.T) { } func TestRelayImpl_SubscribeNonExisting(t *testing.T) { - relay := NewRelay() + relay := NewRelay(0) _, _, err := relay.Subscribe("foobar") if err != StreamNotExisting { diff --git a/server/server.go b/server/server.go index b42dc0b..7ce5f65 100644 --- a/server/server.go +++ b/server/server.go @@ -41,8 +41,8 @@ type ServerImpl struct { } // NewServer creates a server -func NewServer() Server { - ps := relay.NewRelay() +func NewServer(buffersize uint) Server { + ps := relay.NewRelay(buffersize) return &ServerImpl{ps} } @@ -119,7 +119,7 @@ func (s *ServerImpl) play(name string, sock *srtgo.SrtSocket) error { buf, ok := <-sub buffered := len(sub) - if buffered > 40 { + if buffered > 144 { log.Println("late in buffer", len(sub)) }