Skip to content

Commit

Permalink
Use mutex when switching media encoders
Browse files Browse the repository at this point in the history
  • Loading branch information
sergystepanov committed Aug 11, 2024
1 parent 7873631 commit 0232384
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
48 changes: 40 additions & 8 deletions pkg/worker/media/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type WebrtcMediaPipe struct {
audioBuf buffer
log *logger.Logger

mua sync.RWMutex
muv sync.RWMutex

aConf config.Audio
vConf config.Video

Expand All @@ -132,8 +135,9 @@ func (wmp *WebrtcMediaPipe) SetAudioCb(cb func([]byte, int32)) {
wmp.onAudio = func(bytes []byte) { cb(bytes, fr) }
}
func (wmp *WebrtcMediaPipe) Destroy() {
if wmp.v != nil {
wmp.v.Stop()
v := wmp.Video()
if v != nil {
v.Stop()
}
}
func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { wmp.audioBuf.write(audio, wmp.encodeAudio) }
Expand All @@ -145,11 +149,15 @@ func (wmp *WebrtcMediaPipe) Init() error {
if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil {
return err
}
if wmp.v == nil || wmp.a == nil {
return fmt.Errorf("could intit the encoders, v=%v a=%v", wmp.v != nil, wmp.a != nil)

a := wmp.Audio()
v := wmp.Video()

if v == nil || a == nil {
return fmt.Errorf("could intit the encoders, v=%v a=%v", v != nil, a != nil)
}

wmp.log.Debug().Msgf("%v", wmp.v.Info())
wmp.log.Debug().Msgf("%v", v.Info())
wmp.initialized = true
return nil
}
Expand All @@ -172,7 +180,7 @@ func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSize int) error {
}

func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples) {
data, err := wmp.a.Encode(pcm)
data, err := wmp.Audio().Encode(pcm)
audioPool.Put((*[]int16)(&pcm))
if err != nil {
wmp.log.Error().Err(err).Msgf("opus encode fail")
Expand All @@ -198,15 +206,15 @@ func (wmp *WebrtcMediaPipe) initVideo(w, h int, scale float64, conf config.Video
func round(x int, scale float64) int { return (int(float64(x)*scale) + 1) & ^1 }

func (wmp *WebrtcMediaPipe) ProcessVideo(v app.Video) []byte {
return wmp.v.Encode(encoder.InFrame(v.Frame))
return wmp.Video().Encode(encoder.InFrame(v.Frame))
}

func (wmp *WebrtcMediaPipe) Reinit() error {
if !wmp.initialized {
return nil
}

wmp.v.Stop()
wmp.Video().Stop()
if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil {
return err
}
Expand All @@ -221,3 +229,27 @@ func (wmp *WebrtcMediaPipe) IsInitialized() bool { return wmp.initialized }
func (wmp *WebrtcMediaPipe) SetPixFmt(f uint32) { wmp.oldPf = f; wmp.v.SetPixFormat(f) }
func (wmp *WebrtcMediaPipe) SetVideoFlip(b bool) { wmp.oldFlip = b; wmp.v.SetFlip(b) }
func (wmp *WebrtcMediaPipe) SetRot(r uint) { wmp.oldRot = r; wmp.v.SetRot(r) }

func (wmp *WebrtcMediaPipe) Video() *encoder.Video {
wmp.muv.RLock()
defer wmp.muv.RUnlock()
return wmp.v
}

func (wmp *WebrtcMediaPipe) SetVideo(e *encoder.Video) {
wmp.muv.Lock()
wmp.v = e
wmp.muv.Unlock()
}

func (wmp *WebrtcMediaPipe) Audio() *opus.Encoder {
wmp.mua.RLock()
defer wmp.mua.RUnlock()
return wmp.a
}

func (wmp *WebrtcMediaPipe) SetAudio(e *opus.Encoder) {
wmp.mua.Lock()
wmp.a = e
wmp.mua.Unlock()
}
2 changes: 1 addition & 1 deletion pkg/worker/media/media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestEncoders(t *testing.T) {
}
}

func BenchmarkH264(b *testing.B) { run(1920, 1080, encoder.H264, b.N, nil, nil, b) }
func BenchmarkH264(b *testing.B) { run(640, 480, encoder.H264, b.N, nil, nil, b) }
func BenchmarkVP8(b *testing.B) { run(1920, 1080, encoder.VP8, b.N, nil, nil, b) }

func run(w, h int, cod encoder.VideoCodec, count int, a *image.RGBA, b *image.RGBA, backend testing.TB) {
Expand Down

0 comments on commit 0232384

Please sign in to comment.