-
Notifications
You must be signed in to change notification settings - Fork 5
/
tasks.go
101 lines (85 loc) · 2.11 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"context"
"errors"
"fmt"
)
// Task is a unit of work that can be run under a context.
type Task interface {
	// Execute runs the task with ctx and reports failure through the
	// returned error; a nil error means the task succeeded.
	Execute(ctx context.Context) error
}
// TaskFunc adapts a plain function with the signature
// func(context.Context) error so that it can be used wherever a Task is
// expected.
type TaskFunc func(ctx context.Context) error

// Execute calls f with ctx and returns whatever error f returns.
func (f TaskFunc) Execute(ctx context.Context) error {
	return f(ctx)
}
// ExecuteTasks runs the given tasks concurrently, keeping at most workers of
// them in flight at once, and returns only after every task it started has
// returned.
//
// Every task receives a context derived from ctx. As soon as one task returns
// a non-nil error that context is canceled and no further tasks are started;
// tasks that are already running are expected to observe the cancellation
// themselves.
//
// Return value:
//   - nil when tasks is empty, when workers is not positive (nothing is run),
//     or when every task succeeded;
//   - an error wrapping the first recorded task error that is not
//     context.Canceled;
//   - the context's error when execution was canceled and no other task error
//     was recorded.
func ExecuteTasks[T Task](ctx context.Context, tasks []T, workers int) error {
	// A non-positive worker count leaves no capacity to run anything. Testing
	// <= 0 rather than == 0 also keeps a negative count away from make, which
	// panics on a negative channel size.
	if len(tasks) == 0 || workers <= 0 {
		return nil
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// More workers than tasks would only hold idle slots.
	workers = min(workers, len(tasks))

	// slots is a counting semaphore pre-filled with one token per worker. A
	// task takes a token before it starts and puts it back when it returns.
	slots := make(chan struct{}, workers)
	for i := 0; i < workers; i++ {
		slots <- struct{}{}
	}

	// Only the first error is reported, so one buffered slot is enough.
	firstErr := make(chan error, 1)

	// run executes a single task in its own goroutine.
	run := func(t Task) {
		// Hand the token back no matter how the task ends.
		defer func() { slots <- struct{}{} }()

		err := t.Execute(ctx)
		if err == nil {
			return
		}
		// Any failure stops the remaining work.
		cancel()
		if errors.Is(err, context.Canceled) {
			// Cancellation is reported through ctx.Err() below.
			return
		}
		// Non-blocking send: a blocking send could park this goroutine
		// forever once the buffer is full, so its token would never be
		// returned and the drain loop below would hang.
		select {
		case firstErr <- err:
		default:
		}
	}

dispatch:
	for _, task := range tasks {
		// select chooses at random among ready cases, so without this check
		// a free token could win over an already canceled context and start
		// a task that should have been skipped.
		if ctx.Err() != nil {
			break
		}
		select {
		case <-ctx.Done():
			break dispatch
		case <-slots:
			go run(task)
		}
	}

	// Collecting every token back means every started task has returned.
	for i := 0; i < workers; i++ {
		<-slots
	}

	select {
	case err := <-firstErr:
		return fmt.Errorf(
			"failed to execute concurrently %d tasks, first error: %w",
			len(tasks),
			err,
		)
	default:
	}

	// Nil unless the caller's context ended or a task reported cancellation.
	return ctx.Err()
}