Skip to content

Commit

Permalink
Merge pull request #7 from sourcenetwork/sisley/6-concat-fanciness
Browse files Browse the repository at this point in the history
feat: Expand enumerable types (queue, socket)
  • Loading branch information
AndrewSisley authored Jun 30, 2023
2 parents 4277211 + 463e41f commit 65c38cd
Show file tree
Hide file tree
Showing 7 changed files with 767 additions and 3 deletions.
42 changes: 39 additions & 3 deletions enumerable/concat.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package enumerable

// Concatenation is an extention of the enumerable interface allowing new sources
// to be added after initial construction.
type Concatenation[T any] interface {
Enumerable[T]
// Append appends a new source to this concatenation.
//
// This may be done after enumeration has begun.
Append(Enumerable[T])
}

type enumerableConcat[T any] struct {
sources []Enumerable[T]
currentSourceIndex int
Expand All @@ -8,29 +18,55 @@ type enumerableConcat[T any] struct {
// Concat takes zero to many source `Ènumerable`s and stacks them on top
// of each other, resulting in one enumerable that will iterate through all
// the values in all of the given sources.
func Concat[T any](sources ...Enumerable[T]) Enumerable[T] {
//
// New sources may be added after iteration has begun.
func Concat[T any](sources ...Enumerable[T]) Concatenation[T] {
return &enumerableConcat[T]{
sources: sources,
currentSourceIndex: 0,
}
}

// Append appends a new source to this concatenation.
//
// This may be done after enumeration has begun.
func (s *enumerableConcat[T]) Append(newSource Enumerable[T]) {
s.sources = append(s.sources, newSource)
}

func (s *enumerableConcat[T]) Next() (bool, error) {
startSourceIndex := s.currentSourceIndex
hasLooped := false

for {
// If we have reached the end of the sources slice we need to loop
// back to the beginning. It may be that earlier sources have gained
// items whilst we iterated though later sources.
if s.currentSourceIndex >= len(s.sources) {
return false, nil
if len(s.sources) < 1 || hasLooped {
return false, nil
}
s.currentSourceIndex = 0
hasLooped = true
}

currentSource := s.sources[s.currentSourceIndex]
hasValue, err := currentSource.Next()
if err != nil {
return false, nil
return false, err
}
if hasValue {
return true, nil
}

s.currentSourceIndex += 1

if s.currentSourceIndex == startSourceIndex {
// If we are here it means that we have re-cycled
// all the way through the source slice and have found
// no new items.
return false, nil
}
}
}

Expand Down
130 changes: 130 additions & 0 deletions enumerable/concat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package enumerable

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestConcatYieldsNothingGivenEmpty(t *testing.T) {
concat := Concat[int]()

hasNext, err := concat.Next()
require.NoError(t, err)
require.False(t, hasNext)
}

func TestConcatYieldsItemsFromSource(t *testing.T) {
v1 := 1
v2 := 2
v3 := 3
source1 := New([]int{v1, v2, v3})

concat := Concat(source1)

hasNext, err := concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r1, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v1, r1)

hasNext, err = concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r2, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v2, r2)

hasNext, err = concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r3, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v3, r3)

hasNext, err = concat.Next()
require.NoError(t, err)
require.False(t, hasNext)
}

func TestConcatYieldsItemsFromSourceInOrder(t *testing.T) {
v1 := 1
v2 := 2
v3 := 3
v4 := 4
v5 := 5
v6 := 6
source1 := NewQueue[int]()
var s1 Enumerable[int] = source1
source2 := New([]int{v1, v2, v3})
source3 := New([]int{v4, v5})

concat := Concat(s1, source2, source3)

// Start yielding *before* source1 has any items
hasNext, err := concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r1, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v1, r1)

// Put an item into source1
err = source1.Put(v6)
require.NoError(t, err)

// Assert that the yielding of items from source2 is
// not interupted by source1 recieving items
hasNext, err = concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r2, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v2, r2)

hasNext, err = concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r3, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v3, r3)

hasNext, err = concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

// Assert that source3's items are yielded after
// source2's
r4, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v4, r4)

hasNext, err = concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r5, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v5, r5)

// Then assert that source1's items are yielded
// as the concat circles back round
hasNext, err = concat.Next()
require.NoError(t, err)
require.True(t, hasNext)

r6, err := concat.Value()
require.NoError(t, err)
require.Equal(t, v6, r6)

hasNext, err = concat.Next()
require.NoError(t, err)
require.False(t, hasNext)
}
154 changes: 154 additions & 0 deletions enumerable/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package enumerable

// Queue is an extention of the enumerable interface allowing individual
// items to be added into the enumerable.
//
// Added items will be yielded in a FIFO order. Items may be added after
// enumeration has begun.
type Queue[T any] interface {
Enumerable[T]
// Put adds an item to the queue.
Put(T) error
// Size returns the current length of the backing array.
//
// This may include empty space where yield items previously resided.
// Useful for testing and debugging.
Size() int
}

// For now, increasing the size one at a time is likely optimal
// for the only useage of the queue type. We may wish to change
// this at somepoint however.
const growthSize int = 1

type queue[T any] struct {
// The values slice of this queue.
//
// Note: queue is implementated as a dynamically sized ring buffer, the zero index
// is not nessecarily the next/current value. Also note that values are not explicitly
// removed from this slice, which values are still 'in' the queue is tracked by index.
values []T

// The index of the current value.
currentIndex int

// The index of the last value added to the queue.
lastSetIndex int

// Will be true if values[0] has been set.
zeroIndexSet bool

// Will be true a value has been attempted to be read from an empty queue.
waitingForWrite bool
}

var _ Queue[any] = (*queue[any])(nil)

// NewQueue creates an empty FIFO queue.
//
// It is implemented using a dynamically sized ring-buffer.
func NewQueue[T any]() Queue[T] {
return &queue[T]{
values: []T{},
currentIndex: -1,
lastSetIndex: -1,
}
}

func (q *queue[T]) Put(value T) error {
index := q.lastSetIndex + 1

if index >= len(q.values) {
if len(q.values) == 0 {
q.values = make([]T, growthSize)
q.currentIndex = -1
} else if q.zeroIndexSet {
// If the zero index is occupied, we cannot loop back to it here
// and instead need to grow the values slice.
newValues := make([]T, len(q.values)+growthSize)
copy(newValues, q.values[:index])
q.values = newValues
} else {
index = 0
if q.currentIndex >= len(q.values) {
q.currentIndex = -1
}
}
} else if index == q.currentIndex {
// If the write index has caught up to the read index
// the new value needs to be written between the two
// e.g: [3,4,here,1,2]
// Note: The last value read should not be overwritten, as `Value`
// may be called multiple times on it after a single `Next` call.
newValues := make([]T, len(q.values)+growthSize)
copy(newValues, q.values[:index])
copy(newValues[index+growthSize:], q.values[index:])
q.values = newValues
// Shift the current read index to reflect its new location.
q.currentIndex += growthSize
}

if index == 0 {
q.zeroIndexSet = true
}

q.values[index] = value
q.lastSetIndex = index

return nil
}

func (q *queue[T]) Next() (bool, error) {
// If the previous index was the zero-index the value is consumed (implicitly), so we update
// the flag here.
if q.currentIndex == 0 {
q.zeroIndexSet = false
}

nextIndex := q.currentIndex + 1
var hasValue bool
if nextIndex >= len(q.values) {
if q.zeroIndexSet {
// Circle back to the beginning
nextIndex = 0
hasValue = true
} else {
hasValue = false
if q.currentIndex == len(q.values) {
// If we have reached the end of the values slice, and the previous
// index was already out of bounds, we should avoid growing it further.
nextIndex = q.currentIndex
}
}
} else {
// If the previous read index was the last index written to then the value has been
// consumed and we have reached the edge of the ring: [v2, v3,^we are here, , v1]
hasValue = q.currentIndex != q.lastSetIndex
}

q.currentIndex = nextIndex
q.waitingForWrite = !hasValue
return hasValue, nil
}

func (q *queue[T]) Value() (T, error) {
// The read index might be out of bounds at this point (either outside the slice, or the ring)
// and we should not return a value here if that is the case.
if q.waitingForWrite {
var zero T
return zero, nil
}
return q.values[q.currentIndex], nil
}

func (q *queue[T]) Reset() {
q.values = []T{}
q.currentIndex = -1
q.lastSetIndex = -1
q.zeroIndexSet = false
q.waitingForWrite = false
}

func (q *queue[T]) Size() int {
return len(q.values)
}
Loading

0 comments on commit 65c38cd

Please sign in to comment.