define reactive model for concurrency #261

Closed · 6 of 11 tasks
plastikfan opened this issue May 10, 2023 · 2 comments

Assignees: plastikfan
Labels: feature (New feature or request)

Comments
plastikfan (Contributor) commented May 10, 2023

references:

sub tasks:

plastikfan added the feature (New feature or request) label May 10, 2023
plastikfan self-assigned this May 10, 2023
plastikfan (Contributor, Author) commented:

abandoning this strategy

plastikfan pinned this issue Aug 18, 2023
plastikfan (Contributor, Author) commented:

Some legacy concurrent client test code (orpheus(alpha)). This has since been replaced by scorpio, which contains an improved version built around a pipeline; a minimal sketch of that pipeline style follows the listing below.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/snivilised/extendio/async"
)

// Initialize the random number generator.
func init() { rand.Seed(time.Now().Unix()) }

// Should we create a new project called lorax, our alternative reactive
// module for Go?

type MyJobInput struct {
	sequenceNo int // allocated by observer
	Recipient  string
}

func (i MyJobInput) SequenceNo() int {
	return i.sequenceNo
}

type MyJobResult = string
type MyResultChan chan async.JobResult[MyJobResult]

const (
	BatchSize    = 13
	JobChSize    = 10
	ResultChSize = 10
)

type exec struct {
}

func (e *exec) Invoke(j async.Job[MyJobInput]) (async.JobResult[MyJobResult], error) {
	r := rand.Intn(1000) + 1 //nolint:gosec,gomnd // trivial
	delay := time.Millisecond * time.Duration(r)
	time.Sleep(delay)

	result := async.JobResult[MyJobResult]{
		Payload: fmt.Sprintf("	---> exec.Invoke [Seq: %v]🍉 Hello: '%v'", j.Input.SequenceNo(), j.Input.Recipient),
	}
	fmt.Println(result.Payload)

	return result, nil
}

type observable struct {
	sequenceNo int
	audience   []string
	batchSize  int
	JobsCh     async.JobStream[MyJobInput]
	quit       *sync.WaitGroup
	Count      int
}

func newObservable(ctx context.Context, wg *sync.WaitGroup, capacity int) *observable {
	producer := observable{
		audience: []string{
			"paul", "phil", "lindsey", "kaz", "kerry",
			"nick", "john", "raj", "jim", "mark", "robyn",
		},
		batchSize: BatchSize,
		JobsCh:    make(async.JobStream[MyJobInput], capacity),
		quit:      wg,
	}
	go producer.start(ctx)

	return &producer
}

func (o *observable) start(ctx context.Context) {
	defer func() {
		close(o.JobsCh)
		fmt.Printf("===> observable finished (Quit). ✨✨✨ \n")
		o.quit.Done()
	}()

	fmt.Printf("===> ✨ observable.start ...\n")

	userCh := make(chan string)
	go func() {
		fmt.Println("---> 🧲 type a key to continue ...")
		fmt.Scanln()
		fmt.Println("---> 🧲 terminating ...")

		// This event must be broadcast; the worker-pool is not aware of this
		// event and continues to send ghost jobs.
		//
		userCh <- "done"
		close(userCh)
	}()
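	// A hedged alternative (assumption, not part of the original listing): instead
	// of signalling through the private userCh above, the user-termination event
	// could be broadcast by invoking the context's cancel function, e.g.
	//
	//	go func() {
	//		fmt.Scanln()
	//		cancel() // every goroutine selecting on <-ctx.Done() sees this,
	//		         // including the worker pool, so no ghost jobs are sent
	//	}()
	//
	// which would require plumbing the CancelFunc from main into the observable.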

	for running := true; running; {
		select {
		case <-ctx.Done():
			running = false

		case <-userCh:
			running = false
			fmt.Printf("---> ✨ observable termination detected (running: %v)\n", running)

		default:
			fmt.Printf("---> ✨ observable.start/default(running: %v) ...\n", running)
			o.batch()
		}
	}
}

func (o *observable) batch() {
	r := rand.Intn(10) + 1   //nolint:gosec,gomnd // trivial
	unit := time.Second / 10 //nolint:gomnd // trivial
	base := time.Second
	delay := base + (unit * time.Duration(r))
	time.Sleep(delay)

	for i := 0; i < o.batchSize; i++ { // emit exactly batchSize jobs per batch
		o.sequenceNo++
		recipient := rand.Intn(len(o.audience)) //nolint:gosec // trivial
		j := async.Job[MyJobInput]{
			Input: MyJobInput{
				Recipient:  o.audience[recipient],
				sequenceNo: o.sequenceNo,
			},
		}
		o.Count++
		o.JobsCh <- j
	}

	fmt.Printf("===> end of batch (%v). ✨\n", delay)
}

type MyJobsStreamIn = async.ResultStream[MyJobResult]

type observer struct {
	ResultsCh MyJobsStreamIn
	quit      *sync.WaitGroup
	Count     int
}

func newObserver(ctx context.Context, wg *sync.WaitGroup, resultsCh MyJobsStreamIn) *observer {
	consumer := &observer{
		ResultsCh: resultsCh,
		quit:      wg,
	}
	go consumer.start(ctx)

	return consumer
}

func (o *observer) start(ctx context.Context) {
	defer func() {
		fmt.Printf("===> observer finished (Quit). 💠💠💠 \n")
		o.quit.Done()
	}()
	fmt.Printf("===> 💠 observable.start ...\n")

	for running := true; running; {
		select {
		case <-ctx.Done():
			running = false

		case result, ok := <-o.ResultsCh:
			if ok {
				o.Count++
				fmt.Printf("---> 💠 new result arrived(#%v): '%+v' \n", o.Count, result)
			} else {
				running = false
				fmt.Printf("---> 💠 no more results available (running: %+v)\n", running)
			}
		}
	}
}

func main() {
	var wg sync.WaitGroup

	fmt.Println("---> 🎯 orpheus(alpha) ...")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resultsCh := make(chan async.JobResult[MyJobResult], ResultChSize)

	wg.Add(1)
	fmt.Printf("		>>> 👾 WAIT-GROUP ADD(observable)\n")

	producer := newObservable(ctx, &wg, JobChSize)
	pool := async.NewWorkerPool[MyJobInput, MyJobResult](&async.NewWorkerPoolParams[MyJobInput, MyJobResult]{
		Exec:   &exec{},
		JobsCh: producer.JobsCh,
		Cancel: make(async.CancelStream),
		Quit:   &wg,
	})

	wg.Add(1)
	fmt.Printf("		>>> 👾 WAIT-GROUP ADD(worker-pool)\n")

	go pool.Run(ctx, resultsCh)

	wg.Add(1)
	fmt.Printf("		>>> 👾 WAIT-GROUP ADD(observer)\n")

	consumer := newObserver(ctx, &wg, resultsCh)

	wg.Wait()
	fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
		consumer.Count,
		producer.Count,
	)
}
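
For comparison, below is a minimal sketch of the pipeline shape mentioned above: each stage owns the channel it writes to, closes it when done, and honours a shared context for cancellation. This is an illustrative assumption only; it does not reflect the actual scorpio API, and the stage names (produce, transform) are hypothetical.

package main

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// produce emits one job per audience member until the context is cancelled
// or the batch is exhausted, then closes its output channel.
func produce(ctx context.Context, names []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, n := range names {
			select {
			case <-ctx.Done():
				return
			case out <- n:
			}
		}
	}()
	return out
}

// transform is the worker stage: it reads jobs, performs the work and emits results.
func transform(ctx context.Context, in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				return
			case out <- fmt.Sprintf("🍉 Hello: '%v'", strings.ToUpper(n)):
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	audience := []string{"paul", "phil", "lindsey", "kaz", "kerry"}

	// The final stage simply ranges over the results until the channel closes,
	// so shutdown propagates naturally from producer to consumer.
	for r := range transform(ctx, produce(ctx, audience)) {
		fmt.Println(r)
	}

	fmt.Println("<--- pipeline finished. 🎯")
}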
