Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
Rework directsound to be more reasonable
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason Crawford committed Feb 20, 2021
1 parent 9414a5f commit 3bc17a1
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 88 deletions.
236 changes: 162 additions & 74 deletions device_directsound.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ package gosound
import (
"context"
"errors"
"sync/atomic"
"time"
"io"

"github.com/gotracker/gomixing/mixing"

Expand Down Expand Up @@ -72,63 +71,114 @@ func (d *dsoundDevice) Play(in <-chan *PremixData) error {
return d.PlayWithCtx(context.Background(), in)
}

// PlayWithCtx starts the wave output device playing
func (d *dsoundDevice) PlayWithCtx(ctx context.Context, in <-chan *PremixData) error {
type RowWave struct {
PlayOffset uint32
Row *PremixData
}
type playbackData struct {
event win32.HANDLE
row *PremixData
pos int
}

event, err := win32.CreateEvent()
if err != nil {
return err
}
defer win32.CloseHandle(event)
type playbackBuffer struct {
buffer *directsound.Buffer
notify *directsound.Notify
rows []playbackData
maxSamples int
writePos int
}

panmixer := mixing.GetPanMixer(d.mix.Channels)
if panmixer == nil {
return errors.New("invalid pan mixer - check channel count")
func (p *playbackBuffer) Add(mix *mixing.Mixer, row *PremixData, pos int, size int, blockAlign int, panmixer mixing.PanMixer) (int, error) {
remaining := p.maxSamples - p.writePos
samples := row.SamplesLen - pos
if samples >= remaining {
samples = remaining
}

playbackSize := int(d.wfx.NAvgBytesPerSec * 2)
lpdsb, err := d.ds.CreateSoundBufferSecondary(d.wfx, playbackSize)
bufPos := p.writePos * blockAlign
segments, err := p.buffer.Lock(bufPos, samples*blockAlign)
if err != nil {
return err
return 0, err
}
defer lpdsb.Release()

notify, err := lpdsb.GetNotify()
if err != nil {
return err
writeSegs := [][]byte{}
if pos > 0 {
front := make([]byte, pos*blockAlign)
writeSegs = append(writeSegs, front)
}
defer notify.Release()

pn := []directsound.PositionNotify{
{
Offset: uint32(playbackSize - int(d.wfx.NBlockAlign)),
EventNotify: event,
},
writeSegs = append(writeSegs, segments...)
if samples < row.SamplesLen {
rem := row.SamplesLen - samples
rear := make([]byte, rem*blockAlign)
writeSegs = append(writeSegs, rear)
}
mix.FlattenTo(writeSegs, panmixer, row.SamplesLen, row.Data, row.MixerVolume)
if err := p.buffer.Unlock(segments); err != nil {
return 0, err
}

if err := notify.SetNotificationPositions(pn); err != nil {
return err
p.writePos += samples
remaining -= samples
if remaining <= 0 {
err = io.EOF
}
return samples, err
}

// play (looping)
if err := lpdsb.Play(true); err != nil {
return err
// PlayWithCtx starts the wave output device playing
func (d *dsoundDevice) PlayWithCtx(ctx context.Context, in <-chan *PremixData) error {
maxOutstanding := 3
maxOutstandingEvents := 1000

panmixer := mixing.GetPanMixer(d.mix.Channels)
if panmixer == nil {
return errors.New("invalid pan mixer - check channel count")
}

done := make(chan struct{}, 1)
defer close(done)

myCtx, cancel := context.WithCancel(ctx)

out := make(chan RowWave, 3)
events := []win32.HANDLE{}
availableEvents := make(chan win32.HANDLE, maxOutstandingEvents)
defer func() {
for _, event := range events {
win32.CloseHandle(event)
}
}()
for i := 0; i < cap(availableEvents); i++ {
event, err := win32.CreateEvent()
if err != nil {
return err
}
events = append(events, event)
availableEvents <- event
}

playbackBuffers := make([]playbackBuffer, maxOutstanding)
playbackBufferSize := int(float64(d.wfx.NSamplesPerSec) * 0.5)
availableBuffers := make(chan *playbackBuffer, len(playbackBuffers))
defer close(availableBuffers)
for i := range playbackBuffers {
lpdsb, err := d.ds.CreateSoundBufferSecondary(d.wfx, playbackBufferSize*int(d.wfx.NBlockAlign))
if err != nil {
return err
}
playbackBuffers[i] = playbackBuffer{
buffer: lpdsb,
maxSamples: playbackBufferSize,
}
availableBuffers <- &playbackBuffers[i]
}
defer func() {
for _, pb := range playbackBuffers {
pb.buffer.Release()
}
}()

currentBuffer := <-availableBuffers

out := make(chan *playbackBuffer, maxOutstanding)
go func() {
defer close(out)
defer cancel()
writePos := 0
for {
select {
case <-myCtx.Done():
Expand All @@ -137,54 +187,92 @@ func (d *dsoundDevice) PlayWithCtx(ctx context.Context, in <-chan *PremixData) e
if !ok {
return
}
var rowWave RowWave
//_, writePos, err := lpdsb.GetCurrentPosition()
numBytes := row.SamplesLen * int(d.wfx.NBlockAlign)
segments, err := lpdsb.Lock(writePos%playbackSize, numBytes)
if err != nil {
continue

size := row.SamplesLen
pos := 0

blockAlign := int(d.wfx.NBlockAlign)
if size > 0 {
event := <-availableEvents
currentBuffer.rows = append(currentBuffer.rows, playbackData{
event: event,
row: row,
pos: currentBuffer.writePos * blockAlign,
})
}
d.mix.FlattenTo(segments, panmixer, row.SamplesLen, row.Data, row.MixerVolume)
if err := lpdsb.Unlock(segments); err != nil {
continue
for size > 0 {
n, err := currentBuffer.Add(&d.mix, row, pos, row.SamplesLen, blockAlign, panmixer)
size -= n
pos += n
if err != nil {
if !errors.Is(err, io.EOF) {
panic(err)
}
currentBuffer.writePos = 0
out <- currentBuffer
currentBuffer = <-availableBuffers
}
}
rowWave.Row = row
writePos += numBytes
rowWave.PlayOffset = uint32(writePos)
out <- rowWave
}
}
}()
playBase := uint32(0)
go func() {
eventCh, closeFunc := win32.EventToChannel(event)
defer closeFunc()
for {
select {
case <-eventCh:
atomic.AddUint32(&playBase, uint32(playbackSize))
case <-done:
return
}
for buffer := range out {
endEvent := <-availableEvents
if err := d.playWaveBuffer(buffer, endEvent); err != nil {
return err
}
}()
for rowWave := range out {
for {
playPos, _, _ := lpdsb.GetCurrentPosition()
base := atomic.LoadUint32(&playBase)
if playPos+base >= rowWave.PlayOffset {
if d.onRowOutput != nil {
d.onRowOutput(KindSoundCard, rowWave.Row)
}
break
}
time.Sleep(time.Millisecond * 1)
for _, n := range buffer.rows {
availableEvents <- n.event
}
availableEvents <- endEvent
buffer.rows = []playbackData{}
availableBuffers <- buffer
}
done <- struct{}{}
return nil
}

func (d *dsoundDevice) playWaveBuffer(p *playbackBuffer, endEvent win32.HANDLE) error {
notify, err := p.buffer.GetNotify()
if err != nil {
return err
}
defer notify.Release()

pn := []directsound.PositionNotify{}

for _, n := range p.rows {
pn = append(pn, directsound.PositionNotify{
Offset: uint32(n.pos),
EventNotify: n.event,
})
}

pn = append(pn, directsound.PositionNotify{
Offset: win32.DSBPN_OFFSETSTOP,
EventNotify: endEvent,
})

if err := notify.SetNotificationPositions(pn); err != nil {
return err
}

// play (non-looping)
if err := p.buffer.Play(false); err != nil {
return err
}

for _, n := range p.rows {
if d.onRowOutput != nil {
d.onRowOutput(KindSoundCard, n.row)
}
if err := win32.WaitForSingleObjectInfinite(n.event); err != nil {
return err
}
}
return win32.WaitForSingleObjectInfinite(endEvent)
}

// Close closes the wave output device
func (d *dsoundDevice) Close() {
if d.lpdsbPrimary != nil {
Expand Down
24 changes: 10 additions & 14 deletions internal/win32/win32.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (
// DSBPLAY_LOOPING loops a buffer until told not to
DSBPLAY_LOOPING = uint32(0x00000001)

// DSBPN_OFFSETSTOP if a magic value to signify for a PositionNotify that a Stop event notification is requested
DSBPN_OFFSETSTOP = uint32(0xFFFFFFFF)

WAVE_MAPPER = uint32(0xFFFFFFFF)
)

Expand Down Expand Up @@ -120,26 +123,20 @@ func GetDesktopWindow() HWND {

// WaitForSingleObjectInfinite will wait infinitely for a single handle value to become available
func WaitForSingleObjectInfinite(handle HANDLE) error {
h := atomic.LoadUintptr((*uintptr)(&handle))
s, e := syscall.WaitForSingleObject(syscall.Handle(h), syscall.INFINITE)
switch s {
case syscall.WAIT_OBJECT_0:
break
case syscall.WAIT_FAILED:
return os.NewSyscallError("WaitForSingleObject", e)
default:
return errors.New("os: unexpected result from WaitForSingleObject")
}
return nil
return internalWaitForSingleObject(handle, syscall.INFINITE)
}

// WaitForSingleObject will wait for a single handle value to become available up to a total of `duration` milliseconds
func WaitForSingleObject(handle HANDLE, duration time.Duration) error {
return internalWaitForSingleObject(handle, uint32(duration.Milliseconds()))
}

func internalWaitForSingleObject(handle HANDLE, duration uint32) error {
h := atomic.LoadUintptr((*uintptr)(&handle))
s, e := syscall.WaitForSingleObject(syscall.Handle(h), uint32(duration.Milliseconds()))
s, e := syscall.WaitForSingleObject(syscall.Handle(h), duration)
switch s {
case syscall.WAIT_OBJECT_0:
break
// we're good
case syscall.WAIT_FAILED:
return os.NewSyscallError("WaitForSingleObject", e)
default:
Expand All @@ -162,7 +159,6 @@ func EventToChannel(event HANDLE) (<-chan struct{}, func()) {
}
if err := WaitForSingleObjectInfinite(event); err != nil {
panic(err)
return
}
ch <- struct{}{}
}
Expand Down

0 comments on commit 3bc17a1

Please sign in to comment.