Skip to content

Commit

Permalink
Merge pull request #2 from n0rdy/feature/logging
Browse files Browse the repository at this point in the history
Implemented logging
  • Loading branch information
n0rdy authored Nov 19, 2023
2 parents 5d87754 + eff01d9 commit 354c944
Show file tree
Hide file tree
Showing 16 changed files with 644 additions and 33 deletions.
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ and even if the pipeline rate limiter is full, the program will spawn a new goro
- `MaxGoroutinesPerStage` - is an integer that indicates the maximum number of goroutines that can be spawned by each stage.
If it is passed as `0` or less, then there is no limit.
It is possible to change the limit for each stage individually - see `configs.StageConfig.MaxGoroutines`.
- `TimeoutInMillis` - is an integer that indicates the timeout in milliseconds for the entire pipeline.
If it is passed as `0` or less, then there is no timeout.
- `Timeout` - indicates the timeout for the entire pipeline. If it is passed as `0` or less, then there is no timeout.
- `Logger` is a logger that will be used by the pipeline.
If it is passed as nil, then the `logging.NoOpsLogger` logger will be used that does nothing.
Check `logging` package for more details and predefined loggers.

If you pipeline performs any network calls within its transformation/aggregation logic, I'd suggest configuring the maximum number of goroutines to prevent the possible DDoS attack on the target server or reaching the maximum number of open files on the client machine.

Expand All @@ -251,7 +253,8 @@ p := pipeline.FromSlice[int]([]int{1, 2, 3, 4, 5}, configs.PipelineConfig{
ManualStart: true,
MaxGoroutinesTotal: 100,
MaxGoroutinesPerStage: 10,
TimeoutInMillis: 1000,
Timeout: duration.Duration(1000) * time.Millisecond,
Logger: logging.NewConsoleLogger(loglevels.DEBUG),
})
```

Expand Down Expand Up @@ -429,10 +432,14 @@ It is represented by the `configs.StageConfig` struct, which contains the follow
If it is passed as `0` or less, then there is no limit.
This config option can be used to change the limit for each stage that comes from the `configs.PipelineConfig.MaxGoroutinesPerStage` option (if provided).
Please, note that the real number of goroutines might be higher than the number specified here, as the library spawns additional goroutines for internal purposes.
- `TimeoutInMillis` - is an integer that indicates the timeout in milliseconds for the stage. If it is passed as `0` or less, then there is no timeout.
- `Timeout` - indicates the timeout for the stage. If it is passed as `0` or less, then there is no timeout.
- `StageConfig.CustomId` - is a custom ID for the stage. If it is passed as 0, then the stage will be assigned an ID automatically.
Auto-generated IDs are calculated as follows: 1 + the ID of the previous stage.
The initial stage (the one that is created first) has an ID of 1. It is recommended to either rely on the auto-generated IDs or to provide a custom ID for each stage, otherwise the IDs might be messed up due to the (1 + the ID of the previous stage) logic mentioned above.
- `Logger` is a logger that will be used by the pipeline.
If it is passed as nil, then the `logging.NoOpsLogger` logger will be used that does nothing.
Check `logging` package for more details and predefined loggers.
This config option can be used to change the logger for each stage that comes from the `configs.PipelineConfig.Logger` option (if provided).

To create a transformation with a custom configuration:
```go
Expand All @@ -443,8 +450,9 @@ mappingStage := transform.Map[int, int](filteringStage, func(i int) int {
return i * 2
}, configs.StageConfig{
MaxGoroutines: 10,
TimeoutInMillis: 1000,
Timeout: time.Duration(1000) * time.Millisecond,
CustomId: 1,
Logger: logging.NewConsoleLogger(loglevels.INFO),
})
```

Expand Down
9 changes: 8 additions & 1 deletion configs/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package configs

import "time"
import (
"github.com/n0rdy/pippin/logging"
"time"
)

// PipelineConfig is a struct that contains the configuration for a pipeline
//
Expand All @@ -19,9 +22,13 @@ import "time"
//
// [PipelineConfig.Timeout] indicates the timeout for the pipeline.
// If it is passed as 0 or less, then there is no timeout.
//
// [PipelineConfig.Logger] is a logger that will be used by the pipeline.
// If it is passed as nil, then the [logging.NoOpsLogger] logger will be used that does nothing.
type PipelineConfig struct {
ManualStart bool
MaxGoroutinesTotal int
MaxGoroutinesPerStage int
Timeout time.Duration
Logger logging.Logger
}
10 changes: 9 additions & 1 deletion configs/stage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package configs

import "time"
import (
"github.com/n0rdy/pippin/logging"
"time"
)

// StageConfig is a struct that holds the configuration for a stage.
//
Expand All @@ -16,8 +19,13 @@ import "time"
// Auto-generated IDs are calculated as follows: 1 + the ID of the previous stage.
// The initial stage (the one that is created first) has an ID of 1.
// It is recommended to either rely on the auto-generated IDs or to provide a custom ID for each stage, otherwise the IDs might be messed up due to the (1 + the ID of the previous stage) logic mentioned above.
//
// [StageConfig.Logger] is a logger that will be used by the stage.
// If it is passed as nil, then the [logging.NoOpsLogger] logger will be used that does nothing.
// This config option can be used to change the logger for each stage that comes from the [PipelineConfig.Logger] option (if provided).
type StageConfig struct {
MaxGoroutines int
Timeout time.Duration
CustomId int64
Logger logging.Logger
}
68 changes: 68 additions & 0 deletions logging/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package logging

import (
"fmt"
"github.com/n0rdy/pippin/types/loglevels"
"github.com/n0rdy/pippin/utils"
)

// ChannelLogger is a logger that writes to a channel.
// Please, make sure that you read from the channel, otherwise it will block the pipeline execution.
//
// ChannelLogger is handy when you want to write logs to a file or do some heavy actions on them, but don't want to affect performance of the pipeline.
//
// ChannelLogger accepts a log level as a parameter.
// It will print only those logs that have a level equal or higher than the specified one.
// Check [loglevels.LogLevel] for more details.
//
// Make sure to call the ChannelLogger.Close() method when you are done with the logger - this will close the channel.
type ChannelLogger struct {
ch chan<- string
level loglevels.LogLevel
}

func NewChannelLogger(ch chan<- string, level loglevels.LogLevel) Logger {
return &ChannelLogger{
ch: ch,
level: level,
}
}

func (cl *ChannelLogger) Trace(message string) {
if cl.level <= loglevels.TRACE {
cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.TracePrefix + message
}
}

func (cl *ChannelLogger) Debug(message string) {
if cl.level <= loglevels.DEBUG {
cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.DebugPrefix + message
}
}

func (cl *ChannelLogger) Info(message string) {
if cl.level <= loglevels.INFO {
cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.InfoPrefix + message
}
}

func (cl *ChannelLogger) Warn(message string, errs ...error) {
if cl.level <= loglevels.WARN {
cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.WarnPrefix + cl.messageWithErrors(message, errs...)
}
}

func (cl *ChannelLogger) Error(message string, errs ...error) {
if cl.level <= loglevels.ERROR {
cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.ErrorPrefix + cl.messageWithErrors(message, errs...)
}
}

func (cl *ChannelLogger) Close() error {
close(cl.ch)
return nil
}

func (cl *ChannelLogger) messageWithErrors(message string, errs ...error) string {
return fmt.Sprintf(message, errs)
}
61 changes: 61 additions & 0 deletions logging/console.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package logging

import (
"fmt"
"github.com/n0rdy/pippin/types/loglevels"
"github.com/n0rdy/pippin/utils"
)

// ConsoleLogger is a logger that prints logs to console.
// Basically, it uses fmt.Println() to print logs under the hood.
//
// ConsoleLogger accepts a log level as a parameter.
// It will print only those logs that have a level equal or higher than the specified one.
// Check [loglevels.LogLevel] for more details.
//
// The format of logs is: "time [log level] message"
// Example:
// 2006-01-02 15:04:05:000 [INFO] some cool info message
type ConsoleLogger struct {
level loglevels.LogLevel
}

func NewConsoleLogger(level loglevels.LogLevel) Logger {
return &ConsoleLogger{
level: level,
}
}

func (cl *ConsoleLogger) Trace(message string) {
if cl.level <= loglevels.TRACE {
fmt.Println(utils.TimeNowAsRFC3339NanoString() + loglevels.TracePrefix + message)
}
}

func (cl *ConsoleLogger) Debug(message string) {
if cl.level <= loglevels.DEBUG {
fmt.Println(utils.TimeNowAsRFC3339NanoString() + loglevels.DebugPrefix + message)
}
}

func (cl *ConsoleLogger) Info(message string) {
if cl.level <= loglevels.INFO {
fmt.Println(utils.TimeNowAsRFC3339NanoString() + loglevels.InfoPrefix + message)
}
}

func (cl *ConsoleLogger) Warn(message string, errs ...error) {
if cl.level <= loglevels.WARN {
fmt.Println(utils.TimeNowAsRFC3339NanoString()+loglevels.WarnPrefix+message, errs)
}
}

func (cl *ConsoleLogger) Error(message string, errs ...error) {
if cl.level <= loglevels.ERROR {
fmt.Println(utils.TimeNowAsRFC3339NanoString()+loglevels.ErrorPrefix+message, errs)
}
}

func (cl *ConsoleLogger) Close() error {
return nil
}
10 changes: 10 additions & 0 deletions logging/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package logging

type Logger interface {
Trace(message string)
Debug(message string)
Info(message string)
Warn(message string, errs ...error)
Error(message string, errs ...error)
Close() error
}
23 changes: 23 additions & 0 deletions logging/noops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package logging

// NoOpsLogger is a logger that does nothing.
// This is a default logger for the service if no logger is configured.
type NoOpsLogger struct{}

func NewNoOpsLogger() Logger {
return &NoOpsLogger{}
}

func (nol *NoOpsLogger) Trace(message string) {}

func (nol *NoOpsLogger) Debug(message string) {}

func (nol *NoOpsLogger) Info(message string) {}

func (nol *NoOpsLogger) Warn(message string, errs ...error) {}

func (nol *NoOpsLogger) Error(message string, errs ...error) {}

func (nol *NoOpsLogger) Close() error {
return nil
}
Loading

0 comments on commit 354c944

Please sign in to comment.