Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: groundwork - add pkg/concurrency and the associated test file #2745

Merged
merged 23 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8665ae8
groundwork: add pkg/concurrency and the associated test case - this w…
dave-gray101 Jul 8, 2024
a6bceda
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 8, 2024
fa2b7d7
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 9, 2024
288077e
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 9, 2024
55c1fab
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 9, 2024
42381a9
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 10, 2024
b355ac8
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 10, 2024
05fd7cb
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 11, 2024
08d56bf
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 11, 2024
b28624e
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 11, 2024
86a63ff
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 11, 2024
4573699
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 11, 2024
7f92ffd
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 12, 2024
de77a9f
Merge branch 'master' into gw-pkg-concurrency
mudler Jul 17, 2024
b2bb8bc
Merge branch 'master' into gw-pkg-concurrency
dave-gray101 Jul 17, 2024
67777b2
add ctx, use once, part 1
dave-gray101 Jul 17, 2024
4edf494
timeout tests and style fix
dave-gray101 Jul 17, 2024
d3b220e
add missing suite test files preventing tests from running
dave-gray101 Jul 17, 2024
8c0f388
fix names
dave-gray101 Jul 17, 2024
71fa178
fix test
dave-gray101 Jul 17, 2024
4e038f8
fix test comparison
dave-gray101 Jul 17, 2024
0a25375
put the correct body in place
dave-gray101 Jul 17, 2024
66012c1
Merge branch 'master' into gw-pkg-concurrency
mudler Jul 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/concurrency/concurrency_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package concurrency

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestConcurrency(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Concurrency test suite")
}
69 changes: 69 additions & 0 deletions pkg/concurrency/jobresult.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package concurrency

import (
"context"
"sync"
)

// This is a Read-ONLY structure that contains the result of an arbitrary asynchronous action
type JobResult[RequestType any, ResultType any] struct {
request *RequestType
result *ResultType
err error
once sync.Once
done *chan struct{}
}

// This structure is returned in a pair with a JobResult and serves as the structure that has access to be updated.
type WritableJobResult[RequestType any, ResultType any] struct {
*JobResult[RequestType, ResultType]
}

// Wait blocks until the result is ready and then returns the result, or the context expires.
// Returns *ResultType instead of ResultType since its possible we have only an error and nil for ResultType.
// Is this correct and idiomatic?
func (jr *JobResult[RequestType, ResultType]) Wait(ctx context.Context) (*ResultType, error) {
if jr.done == nil { // If the channel is blanked out, result is ready.
return jr.result, jr.err
}
select {
case <-*jr.done: // Wait for the result to be ready
jr.done = nil
if jr.err != nil {
return nil, jr.err
}
return jr.result, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

// Accessor function to allow holders of JobResults to access the associated request, without allowing the pointer to be updated.
func (jr *JobResult[RequestType, ResultType]) Request() *RequestType {
return jr.request
}

// This is the function that actually updates the Result and Error on the JobResult... but it's normally not accessible
func (jr *JobResult[RequestType, ResultType]) setResult(result ResultType, err error) {
jr.once.Do(func() {
jr.result = &result
jr.err = err
close(*jr.done) // Signal that the result is ready - since this is only ran once, jr.done cannot be set to nil yet.
})
}

// Only the WritableJobResult can actually call setResult - prevents accidental corruption
func (wjr *WritableJobResult[RequestType, ResultType]) SetResult(result ResultType, err error) {
wjr.JobResult.setResult(result, err)
}

// NewJobResult binds a request to a matched pair of JobResult and WritableJobResult
func NewJobResult[RequestType any, ResultType any](request RequestType) (*JobResult[RequestType, ResultType], *WritableJobResult[RequestType, ResultType]) {
done := make(chan struct{})
jr := &JobResult[RequestType, ResultType]{
once: sync.Once{},
request: &request,
done: &done,
}
return jr, &WritableJobResult[RequestType, ResultType]{JobResult: jr}
}
80 changes: 80 additions & 0 deletions pkg/concurrency/jobresult_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package concurrency_test

import (
"context"
"fmt"
"time"

. "github.com/mudler/LocalAI/pkg/concurrency"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("pkg/concurrency unit tests", func() {
It("can be used to recieve a result across goroutines", func() {
jr, wjr := NewJobResult[string, string]("foo")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a style nitpick, so feel free to ignore.

I have to say the pattern is a bit confusing to me, I would expect something like (pseudo-code ahead):

asyncJob := func (wjr *WritableJobResult[string, string]) {
  // ... do something to write the job result
} 
futureResult := NewAsyncOperation(asyncJob)
result := futureResult.Wait()

Copy link
Owner

@mudler mudler Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might also actually want even a ctx to pass down there to be sure we can short-circuits Waits call as well, but that is not a biggie as can be handled in the async func

type AsyncJobResult[T] *WritableJobResult[T, T]
asyncJob := func (ajr AsyncJobResult) {
  // ... do something to write the job result
} 
futureResult := NewAsyncOperation(asyncJob, opts ...)
result := futureResult.Wait(ctx)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context on wait is a really good idea, I'll add that in

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cleaned up the style in these tests a bit - there's no need to use an explicitly distinct "reader" goroutine.
I also added in a context parameter, to make things a lot more flexible. Now, with context.Background() things work as before, blocking for all time until a request arrives - but by adding a timeout here, the reader can indicate when to break that wait lock.

Notably, I didn't interpret canceling the read-only side Wait() call to indicate that the actual underlying task should be canceled - I'd assume that would be on the writable side. The test I've added that exercises contexts should validate that multiple readers play nice and allow early dropouts.

Expect(jr).ToNot(BeNil())
Expect(wjr).ToNot(BeNil())

go func(wjr *WritableJobResult[string, string]) {
time.Sleep(time.Second * 5)
wjr.SetResult("bar", nil)
}(wjr)

resPtr, err := jr.Wait(context.Background())
Expect(err).To(BeNil())
Expect(jr.Request).ToNot(BeNil())
Expect(*jr.Request()).To(Equal("foo"))
Expect(resPtr).ToNot(BeNil())
Expect(*resPtr).To(Equal("bar"))

})

It("can be used to recieve an error across goroutines", func() {
jr, wjr := NewJobResult[string, string]("foo")
Expect(jr).ToNot(BeNil())
Expect(wjr).ToNot(BeNil())

go func(wjr *WritableJobResult[string, string]) {
time.Sleep(time.Second * 5)
wjr.SetResult("", fmt.Errorf("test"))
}(wjr)

_, err := jr.Wait(context.Background())
Expect(jr.Request).ToNot(BeNil())
Expect(*jr.Request()).To(Equal("foo"))
Expect(err).ToNot(BeNil())
Expect(err).To(MatchError("test"))
})

It("can properly handle timeouts", func() {
jr, wjr := NewJobResult[string, string]("foo")
Expect(jr).ToNot(BeNil())
Expect(wjr).ToNot(BeNil())

go func(wjr *WritableJobResult[string, string]) {
time.Sleep(time.Second * 5)
wjr.SetResult("bar", nil)
}(wjr)

timeout1s, c1 := context.WithTimeoutCause(context.Background(), time.Second, fmt.Errorf("timeout"))
timeout10s, c2 := context.WithTimeoutCause(context.Background(), time.Second*10, fmt.Errorf("timeout"))

_, err := jr.Wait(timeout1s)
Expect(jr.Request).ToNot(BeNil())
Expect(*jr.Request()).To(Equal("foo"))
Expect(err).ToNot(BeNil())
Expect(err).To(MatchError(context.DeadlineExceeded))

resPtr, err := jr.Wait(timeout10s)
Expect(jr.Request).ToNot(BeNil())
Expect(*jr.Request()).To(Equal("foo"))
Expect(err).To(BeNil())
Expect(resPtr).ToNot(BeNil())
Expect(*resPtr).To(Equal("bar"))

// Is this needed? Cleanup Either Way.
c1()
c2()
})
})
13 changes: 13 additions & 0 deletions pkg/downloader/downloader_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package downloader

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestDownloader(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Downloader test suite")
}
Loading