diff --git a/device_directsound.go b/device_directsound.go index dbbea7e..6a51e89 100644 --- a/device_directsound.go +++ b/device_directsound.go @@ -5,8 +5,7 @@ package gosound import ( "context" "errors" - "sync/atomic" - "time" + "io" "github.com/gotracker/gomixing/mixing" @@ -72,51 +71,64 @@ func (d *dsoundDevice) Play(in <-chan *PremixData) error { return d.PlayWithCtx(context.Background(), in) } -// PlayWithCtx starts the wave output device playing -func (d *dsoundDevice) PlayWithCtx(ctx context.Context, in <-chan *PremixData) error { - type RowWave struct { - PlayOffset uint32 - Row *PremixData - } +type playbackData struct { + event win32.HANDLE + row *PremixData + pos int +} - event, err := win32.CreateEvent() - if err != nil { - return err - } - defer win32.CloseHandle(event) +type playbackBuffer struct { + buffer *directsound.Buffer + notify *directsound.Notify + rows []playbackData + maxSamples int + writePos int +} - panmixer := mixing.GetPanMixer(d.mix.Channels) - if panmixer == nil { - return errors.New("invalid pan mixer - check channel count") +func (p *playbackBuffer) Add(mix *mixing.Mixer, row *PremixData, pos int, size int, blockAlign int, panmixer mixing.PanMixer) (int, error) { + remaining := p.maxSamples - p.writePos + samples := row.SamplesLen - pos + if samples >= remaining { + samples = remaining } - playbackSize := int(d.wfx.NAvgBytesPerSec * 2) - lpdsb, err := d.ds.CreateSoundBufferSecondary(d.wfx, playbackSize) + bufPos := p.writePos * blockAlign + segments, err := p.buffer.Lock(bufPos, samples*blockAlign) if err != nil { - return err + return 0, err } - defer lpdsb.Release() - - notify, err := lpdsb.GetNotify() - if err != nil { - return err + writeSegs := [][]byte{} + if pos > 0 { + front := make([]byte, pos*blockAlign) + writeSegs = append(writeSegs, front) } - defer notify.Release() - - pn := []directsound.PositionNotify{ - { - Offset: uint32(playbackSize - int(d.wfx.NBlockAlign)), - EventNotify: event, - }, + writeSegs = append(writeSegs, segments...) + if samples < row.SamplesLen { + rem := row.SamplesLen - samples + rear := make([]byte, rem*blockAlign) + writeSegs = append(writeSegs, rear) + } + mix.FlattenTo(writeSegs, panmixer, row.SamplesLen, row.Data, row.MixerVolume) + if err := p.buffer.Unlock(segments); err != nil { + return 0, err } - if err := notify.SetNotificationPositions(pn); err != nil { - return err + p.writePos += samples + remaining -= samples + if remaining <= 0 { + err = io.EOF } + return samples, err +} - // play (looping) - if err := lpdsb.Play(true); err != nil { - return err +// PlayWithCtx starts the wave output device playing +func (d *dsoundDevice) PlayWithCtx(ctx context.Context, in <-chan *PremixData) error { + maxOutstanding := 3 + maxOutstandingEvents := 1000 + + panmixer := mixing.GetPanMixer(d.mix.Channels) + if panmixer == nil { + return errors.New("invalid pan mixer - check channel count") } done := make(chan struct{}, 1) @@ -124,11 +136,49 @@ func (d *dsoundDevice) PlayWithCtx(ctx context.Context, in <-chan *PremixData) e myCtx, cancel := context.WithCancel(ctx) - out := make(chan RowWave, 3) + events := []win32.HANDLE{} + availableEvents := make(chan win32.HANDLE, maxOutstandingEvents) + defer func() { + for _, event := range events { + win32.CloseHandle(event) + } + }() + for i := 0; i < cap(availableEvents); i++ { + event, err := win32.CreateEvent() + if err != nil { + return err + } + events = append(events, event) + availableEvents <- event + } + + playbackBuffers := make([]playbackBuffer, maxOutstanding) + playbackBufferSize := int(float64(d.wfx.NSamplesPerSec) * 0.5) + availableBuffers := make(chan *playbackBuffer, len(playbackBuffers)) + defer close(availableBuffers) + for i := range playbackBuffers { + lpdsb, err := d.ds.CreateSoundBufferSecondary(d.wfx, playbackBufferSize*int(d.wfx.NBlockAlign)) + if err != nil { + return err + } + playbackBuffers[i] = playbackBuffer{ + buffer: lpdsb, + maxSamples: playbackBufferSize, + } + availableBuffers <- &playbackBuffers[i] + } + defer func() { + for _, pb := range playbackBuffers { + pb.buffer.Release() + } + }() + + currentBuffer := <-availableBuffers + + out := make(chan *playbackBuffer, maxOutstanding) go func() { defer close(out) defer cancel() - writePos := 0 for { select { case <-myCtx.Done(): @@ -137,54 +187,92 @@ func (d *dsoundDevice) PlayWithCtx(ctx context.Context, in <-chan *PremixData) e if !ok { return } - var rowWave RowWave - //_, writePos, err := lpdsb.GetCurrentPosition() - numBytes := row.SamplesLen * int(d.wfx.NBlockAlign) - segments, err := lpdsb.Lock(writePos%playbackSize, numBytes) - if err != nil { - continue + + size := row.SamplesLen + pos := 0 + + blockAlign := int(d.wfx.NBlockAlign) + if size > 0 { + event := <-availableEvents + currentBuffer.rows = append(currentBuffer.rows, playbackData{ + event: event, + row: row, + pos: currentBuffer.writePos * blockAlign, + }) } - d.mix.FlattenTo(segments, panmixer, row.SamplesLen, row.Data, row.MixerVolume) - if err := lpdsb.Unlock(segments); err != nil { - continue + for size > 0 { + n, err := currentBuffer.Add(&d.mix, row, pos, row.SamplesLen, blockAlign, panmixer) + size -= n + pos += n + if err != nil { + if !errors.Is(err, io.EOF) { + panic(err) + } + currentBuffer.writePos = 0 + out <- currentBuffer + currentBuffer = <-availableBuffers + } } - rowWave.Row = row - writePos += numBytes - rowWave.PlayOffset = uint32(writePos) - out <- rowWave } } }() - playBase := uint32(0) - go func() { - eventCh, closeFunc := win32.EventToChannel(event) - defer closeFunc() - for { - select { - case <-eventCh: - atomic.AddUint32(&playBase, uint32(playbackSize)) - case <-done: - return - } + for buffer := range out { + endEvent := <-availableEvents + if err := d.playWaveBuffer(buffer, endEvent); err != nil { + return err } - }() - for rowWave := range out { - for { - playPos, _, _ := lpdsb.GetCurrentPosition() - base := atomic.LoadUint32(&playBase) - if playPos+base >= rowWave.PlayOffset { - if d.onRowOutput != nil { - d.onRowOutput(KindSoundCard, rowWave.Row) - } - break - } - time.Sleep(time.Millisecond * 1) + for _, n := range buffer.rows { + availableEvents <- n.event } + availableEvents <- endEvent + buffer.rows = []playbackData{} + availableBuffers <- buffer } done <- struct{}{} return nil } +func (d *dsoundDevice) playWaveBuffer(p *playbackBuffer, endEvent win32.HANDLE) error { + notify, err := p.buffer.GetNotify() + if err != nil { + return err + } + defer notify.Release() + + pn := []directsound.PositionNotify{} + + for _, n := range p.rows { + pn = append(pn, directsound.PositionNotify{ + Offset: uint32(n.pos), + EventNotify: n.event, + }) + } + + pn = append(pn, directsound.PositionNotify{ + Offset: win32.DSBPN_OFFSETSTOP, + EventNotify: endEvent, + }) + + if err := notify.SetNotificationPositions(pn); err != nil { + return err + } + + // play (non-looping) + if err := p.buffer.Play(false); err != nil { + return err + } + + for _, n := range p.rows { + if d.onRowOutput != nil { + d.onRowOutput(KindSoundCard, n.row) + } + if err := win32.WaitForSingleObjectInfinite(n.event); err != nil { + return err + } + } + return win32.WaitForSingleObjectInfinite(endEvent) +} + // Close closes the wave output device func (d *dsoundDevice) Close() { if d.lpdsbPrimary != nil { diff --git a/internal/win32/win32.go b/internal/win32/win32.go index b786080..bbc8e27 100644 --- a/internal/win32/win32.go +++ b/internal/win32/win32.go @@ -32,6 +32,9 @@ const ( // DSBPLAY_LOOPING loops a buffer until told not to DSBPLAY_LOOPING = uint32(0x00000001) + // DSBPN_OFFSETSTOP if a magic value to signify for a PositionNotify that a Stop event notification is requested + DSBPN_OFFSETSTOP = uint32(0xFFFFFFFF) + WAVE_MAPPER = uint32(0xFFFFFFFF) ) @@ -120,26 +123,20 @@ func GetDesktopWindow() HWND { // WaitForSingleObjectInfinite will wait infinitely for a single handle value to become available func WaitForSingleObjectInfinite(handle HANDLE) error { - h := atomic.LoadUintptr((*uintptr)(&handle)) - s, e := syscall.WaitForSingleObject(syscall.Handle(h), syscall.INFINITE) - switch s { - case syscall.WAIT_OBJECT_0: - break - case syscall.WAIT_FAILED: - return os.NewSyscallError("WaitForSingleObject", e) - default: - return errors.New("os: unexpected result from WaitForSingleObject") - } - return nil + return internalWaitForSingleObject(handle, syscall.INFINITE) } // WaitForSingleObject will wait for a single handle value to become available up to a total of `duration` milliseconds func WaitForSingleObject(handle HANDLE, duration time.Duration) error { + return internalWaitForSingleObject(handle, uint32(duration.Milliseconds())) +} + +func internalWaitForSingleObject(handle HANDLE, duration uint32) error { h := atomic.LoadUintptr((*uintptr)(&handle)) - s, e := syscall.WaitForSingleObject(syscall.Handle(h), uint32(duration.Milliseconds())) + s, e := syscall.WaitForSingleObject(syscall.Handle(h), duration) switch s { case syscall.WAIT_OBJECT_0: - break + // we're good case syscall.WAIT_FAILED: return os.NewSyscallError("WaitForSingleObject", e) default: @@ -162,7 +159,6 @@ func EventToChannel(event HANDLE) (<-chan struct{}, func()) { } if err := WaitForSingleObjectInfinite(event); err != nil { panic(err) - return } ch <- struct{}{} }