Skip to content

Commit

Permalink
Merge pull request #3 from n0rdy/feature/init_stage_configs
Browse files Browse the repository at this point in the history
Provided a possibility to configure initial stage of the pipeline
  • Loading branch information
n0rdy authored Nov 20, 2023
2 parents 354c944 + a15252a commit 6b562b0
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 23 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ There still might be some bugs, so please, feel free to report them.
We've had one, yes. But what about second ~~breakfast~~ library?

## Table of contents
* [Installation](#installation)
* [Usage](#usage)
* [Simple example](#simple-example)
* [More detailed example](#more-detailed-example)
* [Documentation](#documentation)
* [Concepts](#concepts)
* [Pipeline](#pipeline)
Expand Down Expand Up @@ -244,6 +247,7 @@ It is possible to change the limit for each stage individually - see `configs.St
- `Logger` is a logger that will be used by the pipeline.
If it is passed as nil, then the `logging.NoOpsLogger` logger will be used that does nothing.
Check `logging` package for more details and predefined loggers.
- `InitStageConfig` is a configuration for the initial stage. See `configs.StageConfig` for more details.

If you pipeline performs any network calls within its transformation/aggregation logic, I'd suggest configuring the maximum number of goroutines to prevent the possible DDoS attack on the target server or reaching the maximum number of open files on the client machine.

Expand Down
6 changes: 5 additions & 1 deletion configs/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"
)

// PipelineConfig is a struct that contains the configuration for a pipeline
// PipelineConfig is a struct that contains the configuration for a pipeline.
//
// [PipelineConfig.ManualStart] is a boolean that indicates whether the pipeline should be started manually.
// If it is passed as true, the pipeline will not start automatically on creation, and it's up to the user to start it by calling the [pipeline.Pipeline.Start] method.
Expand All @@ -25,10 +25,14 @@ import (
//
// [PipelineConfig.Logger] is a logger that will be used by the pipeline.
// If it is passed as nil, then the [logging.NoOpsLogger] logger will be used that does nothing.
//
// [PipelineConfig.InitStageConfig] is a config for the init stage.
// See [StageConfig] for more details.
type PipelineConfig struct {
ManualStart bool
MaxGoroutinesTotal int
MaxGoroutinesPerStage int
Timeout time.Duration
Logger logging.Logger
InitStageConfig *StageConfig
}
118 changes: 107 additions & 11 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/n0rdy/pippin/types"
"github.com/n0rdy/pippin/types/statuses"
"github.com/n0rdy/pippin/utils"
"strconv"
"time"
)

Expand Down Expand Up @@ -44,6 +45,12 @@ type parsedConfigs struct {
stageRateLimiter *ratelimiter.RateLimiter
timeout time.Duration
logger logging.Logger
initStageConfig *parsedInitStageConfigs
}

type parsedInitStageConfigs struct {
timeout time.Duration
logger logging.Logger
}

// Start starts the pipeline if it was created with the delayed manual start.
Expand Down Expand Up @@ -139,6 +146,11 @@ func FromChannel[T any](fromCh <-chan T, confs ...configs.PipelineConfig) *Pipel
// If the limit is reached, then the pipeline will wait until the number of goroutines is decreased.
//
// The [configs.PipelineConfig.Timeout] config can be used to set the timeout for the pipeline.
//
// Use [configs.PipelineConfig.Logger] to set the logger for the pipeline.
//
// The [configs.PipelineConfig.InitStageConfig] config can be used to configure the initial stage.
// See [configs.StageConfig] for more details.
func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfig) *Pipeline[T] {
initChan := make(chan T)
pipelineStatusChan := make(chan statuses.Status)
Expand All @@ -151,9 +163,9 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi
if starter != nil {
stageStarter = make(chan struct{})
}
logger := pc.logger
pipelineLogger := pc.logger

logger.Debug("Pipeline: initiating...")
pipelineLogger.Debug("Pipeline: initiating...")

var status statuses.Status
if starter == nil {
Expand All @@ -165,55 +177,87 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi
p := &Pipeline[T]{
InitStage: stages.NewInitStage(
initChan, pc.pipelineRateLimiter, pc.stageRateLimiter, stageStarter,
ctx, ctxCancelFunc, pipelineStatusChan, logger,
ctx, ctxCancelFunc, pipelineStatusChan, pipelineLogger,
),
Status: status,
rateLimiter: pc.pipelineRateLimiter,
starter: starter,
ctx: ctx,
ctxCancelFunc: ctxCancelFunc,
statusChan: pipelineStatusChan,
logger: logger,
logger: pipelineLogger,
}

go p.listenToStatusUpdates()

localLogger, pipelineSpecific := localLogger(pipelineLogger, confs...)
if pipelineSpecific {
defer localLogger.Close()
}
localTimeout := localTimeout(confs...)

var stageIdAsString string
customStageId := customStageId(confs...)
if customStageId != 0 {
stageIdAsString = "stage " + strconv.FormatInt(customStageId, 10) + ": "
} else {
stageIdAsString = "stage " + strconv.FormatInt(stages.InitStageId, 10) + ": "
}

go func() {
defer close(initChan)

if starter != nil {
logger.Debug("Pipeline: waiting for the start signal...")
pipelineLogger.Debug("Pipeline: waiting for the start signal...")
localLogger.Debug(stageIdAsString + "waiting for the start signal...")

select {
case _, ok := <-starter:
if ok {
logger.Debug("Pipeline: start signal received")
pipelineLogger.Debug("Pipeline: start signal received")
localLogger.Debug(stageIdAsString + "start signal received")
stageStarter <- struct{}{}
close(starter)
}
case <-ctx.Done():
logger.Debug("Pipeline: interrupted before the start signal")
pipelineLogger.Debug("Pipeline: interrupted before the start signal")
localLogger.Debug(stageIdAsString + "context done signal received before the start signal")
// if the pipeline is interrupted before it is started, then return
close(starter)
return
}
}

logger.Info("Pipeline: started")
logger.Info("Stage 1: started")
pipelineLogger.Info("Pipeline: started")
localLogger.Info(stageIdAsString + "started")

// start pipeline timeout if configured
go func() {
if pc.timeout > 0 {
p.timeoutTimer = time.AfterFunc(pc.timeout, func() {
logger.Info("Pipeline: timeout reached for pipeline - interrupting the pipeline")
pipelineLogger.Info("Pipeline: timeout reached for pipeline - interrupting the pipeline")
ctxCancelFunc()
pipelineStatusChan <- statuses.TimedOut
})
}
}()

// start stage timeout if configured
var stageTimeoutTimer *time.Timer
go func() {
if localTimeout > 0 {
stageTimeoutTimer = time.AfterFunc(localTimeout, func() {
localLogger.Info(stageIdAsString + "timeout reached for stage - interrupting the pipeline" + stageIdAsString + " - interrupting the pipeline")
ctxCancelFunc()
pipelineStatusChan <- statuses.TimedOut
})
}
}()

pipelineInitFunc(initChan)
logger.Info("Stage 1: finished")

utils.StopSafely(stageTimeoutTimer)
localLogger.Info(stageIdAsString + "finished")
}()

return p
Expand All @@ -225,6 +269,7 @@ func parseConfigs(confs ...configs.PipelineConfig) *parsedConfigs {
var stageRateLimiter *ratelimiter.RateLimiter
var timeout time.Duration
var logger logging.Logger
var parsedInitStageConf *parsedInitStageConfigs

if len(confs) > 0 {
conf := confs[0]
Expand All @@ -243,6 +288,18 @@ func parseConfigs(confs ...configs.PipelineConfig) *parsedConfigs {
if conf.Logger != nil {
logger = conf.Logger
}

initStageConf := conf.InitStageConfig
if initStageConf != nil {
parsedInitStageConf = &parsedInitStageConfigs{}

if initStageConf.Timeout > 0 {
parsedInitStageConf.timeout = initStageConf.Timeout
}
if initStageConf.Logger != nil {
parsedInitStageConf.logger = initStageConf.Logger
}
}
}

if logger == nil {
Expand All @@ -255,5 +312,44 @@ func parseConfigs(confs ...configs.PipelineConfig) *parsedConfigs {
stageRateLimiter: stageRateLimiter,
timeout: timeout,
logger: logger,
initStageConfig: parsedInitStageConf,
}
}

func localLogger(pipelineLogger logging.Logger, confs ...configs.PipelineConfig) (logging.Logger, bool) {
if len(confs) == 0 {
return pipelineLogger, false
}

conf := confs[0].InitStageConfig
if conf != nil && conf.Logger != nil {
// stage configs overrides pipeline configs for logger
return conf.Logger, true
}
return pipelineLogger, false
}

func localTimeout(confs ...configs.PipelineConfig) time.Duration {
if len(confs) == 0 {
return 0
}

conf := confs[0].InitStageConfig
if conf != nil {
// stage configs overrides pipeline configs for timeout
return conf.Timeout
}
return 0
}

func customStageId(confs ...configs.PipelineConfig) int64 {
if len(confs) == 0 {
return 0
}

conf := confs[0].InitStageConfig
if conf != nil {
return conf.CustomId
}
return 0
}
113 changes: 113 additions & 0 deletions pippin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndStageRateLimiting_S
configs.PipelineConfig{
MaxGoroutinesTotal: 100,
MaxGoroutinesPerStage: 1,
Logger: logging.NewConsoleLogger(loglevels.TRACE),
},
)

Expand Down Expand Up @@ -1118,6 +1119,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageRateLimiting_PerStage_Low
[]string{"1", "a", "2", "-3", "4", "5", "b"},
configs.PipelineConfig{
MaxGoroutinesPerStage: 1,
Logger: logging.NewConsoleLogger(loglevels.TRACE),
},
)

Expand Down Expand Up @@ -1647,6 +1649,117 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndStageConsoleLogger_
}
}

func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndInitStageConsoleLogger_Success(t *testing.T) {
p := pipeline.FromSlice(
[]string{"1", "a", "2", "-3", "4", "5", "b"},
configs.PipelineConfig{
Logger: logging.NewConsoleLogger(loglevels.TRACE),
InitStageConfig: &configs.StageConfig{
Logger: logging.NewConsoleLogger(loglevels.DEBUG),
},
},
)

if p.Status != statuses.Running {
t.Errorf("expected status to be Running, got %s", p.Status.String())
}

atoiStage := transform.MapWithError(
p.InitStage,
func(input string) (int, error) {
return strconv.Atoi(input)
},
func(err error) {
fmt.Println(err)
},
)
// 1, 2, -3, 4, 5

oddNumsStage := transform.Filter(atoiStage, func(input int) bool {
return input%2 != 0
})
// 1, -3, 5

multipliedByTwoStage := transform.Map(oddNumsStage, func(input int) int {
return input * 2
})
// 2, -6, 10

toMatrixStage := transform.MapWithErrorMapper(
multipliedByTwoStage,
func(input int) ([]int, error) {
if input < 0 {
return nil, fmt.Errorf("negative number %d", input)
}

res := make([]int, input)
for i := 0; i < input; i++ {
res[i] = input * i
}
return res, nil
},
func(err error) []int {
return []int{42}
},
configs.StageConfig{
Logger: logging.NewConsoleLogger(loglevels.INFO),
},
)
// [0, 2], [42], [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

plusOneStage := transform.FlatMapWithError(
toMatrixStage,
func(input int) ([]int, error) {
if input == 0 {
return nil, fmt.Errorf("zero")
}

return []int{input + 1}, nil
},
func(err error) {
fmt.Println(err)
},
)
// [3], [43], [11], [21], [31], [41], [51], [61], [71], [81], [91]

greaterThan42Stage := transform.FlatMapWithErrorMapper(
plusOneStage,
func(input int) ([]int, error) {
if input <= 42 {
return nil, fmt.Errorf("42")
}
return []int{input}, nil
},
func(err error) []int {
return []int{0}
},
)
// [0], [43], [0], [0], [0], [0], [51], [61], [71], [81], [91]

flattenedStage := transform.FlatMap(greaterThan42Stage, func(input int) int {
return input
})
// [0, 43, 0, 0, 0, 0, 51, 61, 71, 81, 91]

sum, err := aggregate.Sum(flattenedStage)
// 398

if err != nil {
t.Errorf("expected no error, got %v", err)
} else {
if *sum != 398 {
t.Errorf("expected sum to be 398, got %d", *sum)
}
}

// to sync with the pipeline
time.Sleep(1 * time.Second)

if p.Status != statuses.Done {
t.Errorf("expected status to be Done, got %s", p.Status.String())
}
}

func TestFromSlice_Map_Sync_SumComplexType_Success(t *testing.T) {
p := pipeline.FromSlice(
[]string{"1", "2", "-3", "4", "5"},
Expand Down
Loading

0 comments on commit 6b562b0

Please sign in to comment.