Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add task executor implementation #36

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Async is a synchronization and asynchronous computation package for Go.
* **ShardedMap** - Implements the generic `async.Map` interface in a thread-safe manner, delegating load/store operations to one of the underlying `async.SynchronizedMap`s (shards), using a key hash to calculate the shard number.
* **Future** - A placeholder object for a value that may not yet exist.
* **Promise** - While futures are defined as a type of read-only placeholder object created for a result which doesn’t yet exist, a promise can be thought of as a writable, single-assignment container, which completes a future.
* **Executor** - A worker pool for executing asynchronous tasks, where each submission returns a Future instance representing the result of the task.
* **Task** - A data type for controlling possibly lazy and asynchronous computations.
* **Once** - An object similar to sync.Once having the Do method taking `f func() (T, error)` and returning `(T, error)`.
* **Value** - An object similar to atomic.Value, but without the consistent type constraint.
Expand Down
38 changes: 34 additions & 4 deletions examples/future/main.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,56 @@
package main

import (
"context"
"log"
"time"

"github.com/reugn/async"
)

const ok = "OK"

func main() {
future := asyncAction()
result, err := future.Join()
// using a promise
future1 := asyncAction()
result1, err := future1.Join()
if err != nil {
log.Fatal(err)
}
log.Print(result1)

// using a task
task := async.NewTask(func() (string, error) { return ok, nil })
result2, err := task.Call().Join()
if err != nil {
log.Fatal(err)
}
log.Print(result2)

// using the executor
ctx := context.Background()
executor := async.NewExecutor[*string](ctx, async.NewExecutorConfig(2, 2))

future3, err := executor.Submit(func(_ context.Context) (*string, error) {
value := ok
return &value, nil
})
if err != nil {
log.Fatal(err)
}

result3, err := future3.Get(ctx)
if err != nil {
log.Fatal(err)
}
log.Print(result)
log.Print(*result3)
}

func asyncAction() async.Future[string] {
promise := async.NewPromise[string]()
go func() {
time.Sleep(time.Second)
promise.Success("OK")
promise.Success(ok)
}()

return promise.Future()
Expand Down
154 changes: 154 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package async

import (
"context"
"errors"
"sync"
"sync/atomic"
)

// ExecutorStatus represents the status of an [ExecutorService].
type ExecutorStatus uint32

const (
ExecutorStatusRunning ExecutorStatus = iota
ExecutorStatusTerminating
ExecutorStatusShutdown
)

var (
ErrExecutorQueueFull = errors.New("async: executor queue is full")
ErrExecutorShutdown = errors.New("async: executor is shut down")
)

// ExecutorService is an interface that defines a task executor.
type ExecutorService[T any] interface {
// Submit submits a function to the executor service.
// The function will be executed asynchronously and the result will be
// available via the returned future.
Submit(func(context.Context) (T, error)) (Future[T], error)

// Shutdown shuts down the executor service.
// Once the executor service is shut down, no new tasks can be submitted
// and any pending tasks will be cancelled.
Shutdown() error

// Status returns the current status of the executor service.
Status() ExecutorStatus
}

// ExecutorConfig represents the Executor configuration.
type ExecutorConfig struct {
WorkerPoolSize int
QueueSize int
}

// NewExecutorConfig returns a new [ExecutorConfig].
func NewExecutorConfig(workerPoolSize, queueSize int) *ExecutorConfig {
return &ExecutorConfig{
WorkerPoolSize: workerPoolSize,
QueueSize: queueSize,
}
}

// Executor implements the [ExecutorService] interface.
type Executor[T any] struct {
cancel context.CancelFunc
queue chan job[T]
status atomic.Uint32
}

var _ ExecutorService[any] = (*Executor[any])(nil)

type job[T any] struct {
promise Promise[T]
task func(context.Context) (T, error)
}

// NewExecutor returns a new [Executor].
func NewExecutor[T any](ctx context.Context, config *ExecutorConfig) *Executor[T] {
ctx, cancel := context.WithCancel(ctx)
executor := &Executor[T]{
cancel: cancel,
queue: make(chan job[T], config.QueueSize),
}
// init the workers pool
go executor.startWorkers(ctx, config.WorkerPoolSize)

// set status to terminating when ctx is done
go executor.monitorCtx(ctx)

// set the executor status to running
executor.status.Store(uint32(ExecutorStatusRunning))

return executor
}

func (e *Executor[T]) monitorCtx(ctx context.Context) {
<-ctx.Done()
e.status.Store(uint32(ExecutorStatusTerminating))
}

func (e *Executor[T]) startWorkers(ctx context.Context, poolSize int) {
var wg sync.WaitGroup
for i := 0; i < poolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
loop:
for ExecutorStatus(e.status.Load()) == ExecutorStatusRunning {
select {
case job := <-e.queue:
result, err := job.task(ctx)
if err != nil {
job.promise.Failure(err)
} else {
job.promise.Success(result)
}
case <-ctx.Done():
break loop
}
}
}()
}

// wait for all workers to exit
wg.Wait()
// close the queue and cancel all pending tasks
close(e.queue)
for job := range e.queue {
job.promise.Failure(ErrExecutorShutdown)
}
// mark the executor as shut down
e.status.Store(uint32(ExecutorStatusShutdown))
}

// Submit submits a function to the executor.
// The function will be executed asynchronously and the result will be
// available via the returned future.
func (e *Executor[T]) Submit(f func(context.Context) (T, error)) (Future[T], error) {
promise := NewPromise[T]()
if ExecutorStatus(e.status.Load()) == ExecutorStatusRunning {
select {
case e.queue <- job[T]{promise, f}:
default:
return nil, ErrExecutorQueueFull
}
} else {
return nil, ErrExecutorShutdown
}
return promise.Future(), nil
}

// Shutdown shuts down the executor.
// Once the executor service is shut down, no new tasks can be submitted
// and any pending tasks will be cancelled.
func (e *Executor[T]) Shutdown() error {
e.cancel()
return nil
}

// Status returns the current status of the executor.
func (e *Executor[T]) Status() ExecutorStatus {
return ExecutorStatus(e.status.Load())
}
89 changes: 89 additions & 0 deletions executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package async

import (
"context"
"runtime"
"testing"
"time"

"github.com/reugn/async/internal/assert"
)

func TestExecutor(t *testing.T) {
ctx := context.Background()
executor := NewExecutor[int](ctx, NewExecutorConfig(2, 2))

job := func(_ context.Context) (int, error) {
time.Sleep(time.Millisecond)
return 1, nil
}
jobLong := func(_ context.Context) (int, error) {
time.Sleep(10 * time.Millisecond)
return 1, nil
}

future1 := submitJob[int](t, executor, job)
future2 := submitJob[int](t, executor, job)

// wait for the first two jobs to complete
time.Sleep(3 * time.Millisecond)

// submit four more jobs
future3 := submitJob[int](t, executor, jobLong)
future4 := submitJob[int](t, executor, jobLong)
future5 := submitJob[int](t, executor, jobLong)
future6 := submitJob[int](t, executor, jobLong)

// the queue has reached its maximum capacity
future7, err := executor.Submit(job)
assert.ErrorIs(t, err, ErrExecutorQueueFull)
assert.IsNil(t, future7)

assert.Equal(t, executor.Status(), ExecutorStatusRunning)

routines := runtime.NumGoroutine()

// shut down the executor
executor.Shutdown()
time.Sleep(time.Millisecond)

// verify that submit fails after the executor was shut down
_, err = executor.Submit(job)
assert.ErrorIs(t, err, ErrExecutorShutdown)

// validate the executor status
assert.Equal(t, executor.Status(), ExecutorStatusTerminating)
time.Sleep(10 * time.Millisecond)
assert.Equal(t, executor.Status(), ExecutorStatusShutdown)

assert.Equal(t, routines, runtime.NumGoroutine()+4)

assertFutureResult(t, 1, future1, future2, future3, future4)
assertFutureError(t, ErrExecutorShutdown, future5, future6)
}

func submitJob[T any](t *testing.T, executor ExecutorService[T],
f func(context.Context) (T, error)) Future[T] {
future, err := executor.Submit(f)
assert.IsNil(t, err)

time.Sleep(time.Millisecond) // switch context
return future
}

func assertFutureResult[T any](t *testing.T, expected T, futures ...Future[T]) {
for _, future := range futures {
result, err := future.Join()
assert.IsNil(t, err)
assert.Equal(t, expected, result)
}
}

func assertFutureError[T any](t *testing.T, expected error, futures ...Future[T]) {
for _, future := range futures {
result, err := future.Join()
var zero T
assert.Equal(t, zero, result)
assert.ErrorIs(t, err, expected)
}
}
Loading