-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
define reactive model for concurrency #261
Labels
feature
New feature or request
Comments
abandoning this strategy |
Some legacy concurrent client test code (orpheus(alpha)). This has been replaced by scorpio, which contains an improved version using a pipeline.
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"github.com/snivilised/extendio/async"
)
// init seeds the package-level math/rand source from the current Unix time
// (in seconds), so the random delays and recipients vary between runs.
func init() {
	seed := time.Now().Unix()
	rand.Seed(seed)
}
// Open question: should we create a new project called lorax, as our
// alternative reactive module for Go?
// MyJobInput is the input carried by each job that the observable places on
// its job stream.
type MyJobInput struct {
	// sequenceNo is the running job number; observable.batch assigns it when
	// the job is created.
	sequenceNo int
	// Recipient is the name the job greets.
	Recipient string
}

// SequenceNo reports the sequence number stored in this input.
func (in MyJobInput) SequenceNo() int {
	return in.sequenceNo
}
// MyJobResult is the payload type of a job result; it is an alias for string.
type MyJobResult = string

// MyResultChan is a channel of job results whose payload is MyJobResult.
type MyResultChan chan async.JobResult[MyJobResult]
const (
	// BatchSize is the batch size given to the observable (its batchSize field).
	BatchSize = 13
	// JobChSize is the buffer capacity main passes for the observable's job stream.
	JobChSize = 10
	// ResultChSize is the buffer capacity of the results channel created in main.
	ResultChSize = 10
)
// exec is the stateless job executor that main hands to the worker pool.
type exec struct{}

// Invoke simulates the work for one job: it sleeps for a random 1-1000ms,
// formats a greeting from the job's sequence number and recipient, prints it
// and returns it as the result payload. The returned error is always nil.
func (e *exec) Invoke(j async.Job[MyJobInput]) (async.JobResult[MyJobResult], error) {
	millis := rand.Intn(1000) + 1 //nolint:gosec,gomnd // trivial
	time.Sleep(time.Duration(millis) * time.Millisecond)

	greeting := fmt.Sprintf(" ---> exec.Invoke [Seq: %v]🍉 Hello: '%v'", j.Input.SequenceNo(), j.Input.Recipient)
	fmt.Println(greeting)

	return async.JobResult[MyJobResult]{Payload: greeting}, nil
}
// observable is the job producer: it creates jobs in batches and sends them
// on JobsCh.
type observable struct {
	// sequenceNo is incremented for every job created and stamped on that job.
	sequenceNo int
	// audience is the set of names a job's recipient is picked from at random.
	audience []string
	// batchSize bounds the number of jobs emitted per batch.
	batchSize int
	// JobsCh is the outgoing job stream; start closes it when it exits.
	JobsCh async.JobStream[MyJobInput]
	// quit is the wait group on which start calls Done when it exits.
	quit *sync.WaitGroup
	// Count is the number of jobs sent so far.
	Count int
}
// newObservable builds the job producer with its fixed audience, a job stream
// buffered to capacity and the given wait group, launches its start loop on a
// new goroutine and returns it.
//
// start calls wg.Done when it exits, so the caller is expected to have called
// wg.Add(1) for the producer beforehand (as main does).
func newObservable(ctx context.Context, wg *sync.WaitGroup, capacity int) *observable {
	names := []string{
		"paul", "phil", "lindsey", "kaz", "kerry",
		"nick", "john", "raj", "jim", "mark", "robyn",
	}

	o := &observable{
		audience:  names,
		batchSize: BatchSize,
		JobsCh:    make(async.JobStream[MyJobInput], capacity),
		quit:      wg,
	}

	go o.start(ctx)

	return o
}
// start is the producer loop. It keeps emitting batches of jobs until either
// ctx is cancelled or a line is read from stdin. On exit it closes JobsCh and
// calls Done on the wait group.
//
// batch blocks (it sleeps and performs channel sends), so a stop request is
// only observed between batches.
func (o *observable) start(ctx context.Context) {
	defer func() {
		close(o.JobsCh)
		fmt.Printf("===> observable finished (Quit). ✨✨✨ \n")
		o.quit.Done()
	}()
	fmt.Printf("===> ✨ observable.start ...\n")

	// userCh is only ever closed, never sent on. Closing cannot block, so the
	// stdin goroutine below can always run to completion, even when this loop
	// has already returned because ctx was cancelled. (A send on an unbuffered
	// channel would block forever in that case and leak the goroutine.)
	userCh := make(chan struct{})
	go func() {
		fmt.Println("---> 🧲 type a key to continue ...")
		// Best effort: any input line, EOF or read error counts as the stop request.
		_, _ = fmt.Scanln()
		fmt.Println("---> 🧲 terminating ...")
		// This event must be broadcasted; the worker-pool is not aware of this
		// event and continues to send ghost jobs?!
		//
		close(userCh)
	}()

	for running := true; running; {
		select {
		case <-ctx.Done():
			running = false
		case <-userCh:
			running = false
			fmt.Printf("---> ✨ observable termination detected (running: %v)\n", running)
		default:
			// Neither stop signal is pending: produce the next batch.
			fmt.Printf("---> ✨ observable.start/default(running: %v) ...\n", running)
			o.batch()
		}
	}
}
// batch sleeps for a random delay between 1.1s and 2.0s (in 0.1s steps) and
// then sends exactly batchSize jobs on JobsCh. Each job gets the next sequence
// number and a recipient picked at random from the audience; Count is
// incremented once per job. Sends block while JobsCh is full.
func (o *observable) batch() {
	steps := rand.Intn(10) + 1 //nolint:gosec,gomnd // trivial
	unit := time.Second / 10   //nolint:gomnd // trivial
	base := time.Second
	delay := base + (unit * time.Duration(steps))
	time.Sleep(delay)

	// Strictly less than batchSize: the previous `<=` bound emitted one job
	// more than the configured batch size.
	for i := 0; i < o.batchSize; i++ {
		o.sequenceNo++
		recipient := rand.Intn(len(o.audience)) //nolint:gosec // trivial
		j := async.Job[MyJobInput]{
			Input: MyJobInput{
				Recipient:  o.audience[recipient],
				sequenceNo: o.sequenceNo,
			},
		}
		o.Count++
		o.JobsCh <- j
	}
	fmt.Printf("===> end of batch (%v). ✨\n", delay)
}
// MyJobsStreamIn is the stream type from which the observer reads job results.
type MyJobsStreamIn = async.ResultStream[MyJobResult]
// observer is the result consumer: it reads job results from ResultsCh and
// counts them.
type observer struct {
	// ResultsCh is the incoming stream of job results.
	ResultsCh MyJobsStreamIn
	// quit is the wait group on which start calls Done when it exits.
	quit *sync.WaitGroup
	// Count is the number of results received so far.
	Count int
}
// newObserver builds the result consumer around resultsCh and the given wait
// group, launches its start loop on a new goroutine and returns it.
//
// start calls wg.Done when it exits, so the caller is expected to have called
// wg.Add(1) for the consumer beforehand (as main does).
func newObserver(ctx context.Context, wg *sync.WaitGroup, resultsCh MyJobsStreamIn) *observer {
	o := new(observer)
	o.ResultsCh = resultsCh
	o.quit = wg

	go o.start(ctx)

	return o
}
// start is the consumer loop. It receives results from ResultsCh, counting and
// printing each one, until either ctx is cancelled or ResultsCh is closed.
// On exit it calls Done on the wait group.
func (o *observer) start(ctx context.Context) {
	defer func() {
		fmt.Printf("===> observer finished (Quit). 💠💠💠 \n")
		o.quit.Done()
	}()
	// The message previously said "observable.start", copied from the producer.
	fmt.Printf("===> 💠 observer.start ...\n")

	for running := true; running; {
		select {
		case <-ctx.Done():
			running = false
		case result, ok := <-o.ResultsCh:
			if !ok {
				// The channel was closed: nothing more will arrive.
				running = false
				fmt.Printf("---> 💠 no more results available (running: %+v)\n", running)

				continue
			}
			o.Count++
			fmt.Printf("---> 💠 new result arrived(#%v): '%+v' \n", o.Count, result)
		}
	}
}
func main() {
var wg sync.WaitGroup
fmt.Println("---> 🎯 orpheus(alpha) ...")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resultsCh := make(chan async.JobResult[MyJobResult], ResultChSize)
wg.Add(1)
fmt.Printf(" >>> 👾 WAIT-GROUP ADD(observable)\n")
producer := newObservable(ctx, &wg, JobChSize)
pool := async.NewWorkerPool[MyJobInput, MyJobResult](&async.NewWorkerPoolParams[MyJobInput, MyJobResult]{
Exec: &exec{},
JobsCh: producer.JobsCh,
Cancel: make(async.CancelStream),
Quit: &wg,
})
wg.Add(1)
fmt.Printf(" >>> 👾 WAIT-GROUP ADD(worker-pool)\n")
go pool.Run(ctx, resultsCh)
wg.Add(1)
fmt.Printf(" >>> 👾 WAIT-GROUP ADD(observer)\n")
consumer := newObserver(ctx, &wg, resultsCh)
wg.Wait()
fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
consumer.Count,
producer.Count,
)
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
references:
sub tasks:
The text was updated successfully, but these errors were encountered: