Skip to content

Commit 97a76e9

Browse files
authored
Merge pull request #13 from scality/improvement/LOGC-7
LOGC-7: Implement log processor
2 parents 47f7d2e + 266c848 commit 97a76e9

File tree

19 files changed

+2218
-52
lines changed

19 files changed

+2218
-52
lines changed

cmd/log-courier/main.go

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,56 @@
11
package main
22

33
import (
4+
"context"
5+
"errors"
46
"fmt"
7+
"log/slog"
58
"os"
9+
"os/signal"
10+
"time"
611

712
"github.com/spf13/pflag"
13+
"golang.org/x/sys/unix"
14+
815
"github.com/scality/log-courier/pkg/logcourier"
16+
"github.com/scality/log-courier/pkg/util"
917
)
1018

1119
func main() {
20+
os.Exit(run())
21+
}
22+
23+
// buildProcessorConfig creates processor config from ConfigSpec
24+
func buildProcessorConfig(logger *slog.Logger) logcourier.Config {
25+
return logcourier.Config{
26+
ClickHouseHosts: logcourier.ConfigSpec.GetStringSlice("clickhouse.url"),
27+
ClickHouseUsername: logcourier.ConfigSpec.GetString("clickhouse.username"),
28+
ClickHousePassword: logcourier.ConfigSpec.GetString("clickhouse.password"),
29+
ClickHouseTimeout: time.Duration(logcourier.ConfigSpec.GetInt("clickhouse.timeout-seconds")) * time.Second,
30+
CountThreshold: logcourier.ConfigSpec.GetInt("consumer.count-threshold"),
31+
TimeThresholdSec: logcourier.ConfigSpec.GetInt("consumer.time-threshold-seconds"),
32+
DiscoveryInterval: time.Duration(logcourier.ConfigSpec.GetInt("consumer.discovery-interval-seconds")) * time.Second,
33+
DiscoveryIntervalJitterFactor: logcourier.ConfigSpec.GetFloat64("consumer.discovery-interval-jitter-factor"),
34+
NumWorkers: logcourier.ConfigSpec.GetInt("consumer.num-workers"),
35+
MaxRetries: logcourier.ConfigSpec.GetInt("retry.max-retries"),
36+
InitialBackoff: time.Duration(logcourier.ConfigSpec.GetInt("retry.initial-backoff-seconds")) * time.Second,
37+
MaxBackoff: time.Duration(logcourier.ConfigSpec.GetInt("retry.max-backoff-seconds")) * time.Second,
38+
BackoffJitterFactor: logcourier.ConfigSpec.GetFloat64("retry.backoff-jitter-factor"),
39+
S3Endpoint: logcourier.ConfigSpec.GetString("s3.endpoint"),
40+
S3AccessKeyID: logcourier.ConfigSpec.GetString("s3.access-key-id"),
41+
S3SecretAccessKey: logcourier.ConfigSpec.GetString("s3.secret-access-key"),
42+
Logger: logger,
43+
}
44+
}
45+
46+
func run() int {
47+
// Add command-line flags
1248
logcourier.ConfigSpec.AddFlag(pflag.CommandLine, "log-level", "log-level")
1349

1450
configFileFlag := pflag.String("config-file", "", "Path to configuration file")
1551
pflag.Parse()
1652

53+
// Load configuration
1754
configFile := *configFileFlag
1855
if configFile == "" {
1956
configFile = os.Getenv("LOG_COURIER_CONFIG_FILE")
@@ -23,16 +60,81 @@ func main() {
2360
if err != nil {
2461
fmt.Fprintf(os.Stderr, "Configuration error: %v\n", err)
2562
pflag.Usage()
26-
os.Exit(2)
63+
return 2
2764
}
2865

66+
// Validate configuration
2967
err = logcourier.ValidateConfig()
3068
if err != nil {
3169
fmt.Fprintf(os.Stderr, "Configuration validation error: %v\n", err)
32-
pflag.Usage()
33-
os.Exit(2)
70+
return 2
71+
}
72+
73+
// Set up logger
74+
logLevel := util.ParseLogLevel(logcourier.ConfigSpec.GetString("log-level"))
75+
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
76+
Level: logLevel,
77+
}))
78+
79+
// Get shutdown timeout from config
80+
shutdownTimeout := time.Duration(logcourier.ConfigSpec.GetInt("shutdown-timeout-seconds")) * time.Second
81+
82+
// Create processor
83+
ctx := context.Background()
84+
processorCfg := buildProcessorConfig(logger)
85+
86+
processor, err := logcourier.NewProcessor(ctx, processorCfg)
87+
if err != nil {
88+
logger.Error("failed to create processor", "error", err)
89+
return 1
90+
}
91+
defer func() {
92+
if err := processor.Close(); err != nil {
93+
logger.Error("failed to close processor", "error", err)
94+
}
95+
}()
96+
97+
// Set up signal handling for graceful shutdown
98+
ctx, cancel := context.WithCancel(ctx)
99+
defer cancel()
100+
101+
signalsChan := make(chan os.Signal, 1)
102+
signal.Notify(signalsChan, unix.SIGINT, unix.SIGTERM)
103+
104+
// Start processor in goroutine
105+
errChan := make(chan error)
106+
go func() {
107+
errChan <- processor.Run(ctx)
108+
}()
109+
110+
// Wait for signal or error
111+
select {
112+
case sig := <-signalsChan:
113+
logger.Info("signal received", "signal", sig)
114+
cancel()
115+
116+
// Wait for processor to stop gracefully (with timeout)
117+
shutdownTimer := time.NewTimer(shutdownTimeout)
118+
defer shutdownTimer.Stop()
119+
120+
select {
121+
case <-shutdownTimer.C:
122+
logger.Warn("shutdown timeout exceeded, forcing exit")
123+
return 1
124+
case err := <-errChan:
125+
if err != nil && !errors.Is(err, context.Canceled) {
126+
logger.Error("processor stopped with error", "error", err)
127+
return 1
128+
}
129+
}
130+
131+
case err := <-errChan:
132+
if err != nil && !errors.Is(err, context.Canceled) {
133+
logger.Error("processor error", "error", err)
134+
return 1
135+
}
34136
}
35137

36-
// TODO: Initialize and run consumer
37-
fmt.Println("log-courier started (stub)")
138+
logger.Info("log-courier stopped")
139+
return 0
38140
}

go.mod

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,16 @@ require (
1414
github.com/ClickHouse/ch-go v0.68.0 // indirect
1515
github.com/Masterminds/semver/v3 v3.4.0 // indirect
1616
github.com/andybalholm/brotli v1.2.0 // indirect
17-
github.com/aws/aws-sdk-go-v2 v1.39.4 // indirect
17+
github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect
1818
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect
1919
github.com/aws/aws-sdk-go-v2/config v1.31.15 // indirect
2020
github.com/aws/aws-sdk-go-v2/credentials v1.18.19 // indirect
2121
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.11 // indirect
22-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.11 // indirect
23-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.11 // indirect
22+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect
23+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect
2424
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
2525
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.11 // indirect
26+
github.com/aws/aws-sdk-go-v2/service/iam v1.49.2 // indirect
2627
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect
2728
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.2 // indirect
2829
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.11 // indirect
@@ -31,7 +32,7 @@ require (
3132
github.com/aws/aws-sdk-go-v2/service/sso v1.29.8 // indirect
3233
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.3 // indirect
3334
github.com/aws/aws-sdk-go-v2/service/sts v1.38.9 // indirect
34-
github.com/aws/smithy-go v1.23.1 // indirect
35+
github.com/aws/smithy-go v1.23.2 // indirect
3536
github.com/fsnotify/fsnotify v1.9.0 // indirect
3637
github.com/go-faster/city v1.0.1 // indirect
3738
github.com/go-faster/errors v0.7.1 // indirect

go.sum

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/logcourier/batchfinder_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ var _ = Describe("BatchFinder", func() {
7272
oldTime := time.Now().Add(-2 * time.Hour)
7373
query := fmt.Sprintf(`
7474
INSERT INTO %s.access_logs
75-
(insertedAt, bucketName, timestamp, req_id, action, loggingEnabled, raftSessionId, httpURL)
75+
(insertedAt, bucketName, timestamp, req_id, action, loggingEnabled, raftSessionID, httpURL)
7676
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
7777
`, helper.DatabaseName)
7878
err := helper.Client.Exec(ctx, query,
@@ -82,7 +82,7 @@ var _ = Describe("BatchFinder", func() {
8282
"req-old", // req_id
8383
"GetObject", // action
8484
true, // loggingEnabled
85-
uint16(0), // raftSessionId
85+
uint16(0), // raftSessionID
8686
"/test-bucket/key", // httpURL
8787
)
8888
Expect(err).NotTo(HaveOccurred())
@@ -127,7 +127,7 @@ var _ = Describe("BatchFinder", func() {
127127

128128
// Commit offset at current time
129129
offsetTime := time.Now()
130-
offsetQuery := fmt.Sprintf("INSERT INTO %s.offsets (bucketName, raftSessionId, last_processed_ts) VALUES (?, ?, ?)", helper.DatabaseName)
130+
offsetQuery := fmt.Sprintf("INSERT INTO %s.offsets (bucketName, raftSessionID, last_processed_ts) VALUES (?, ?, ?)", helper.DatabaseName)
131131
err := helper.Client.Exec(ctx, offsetQuery,
132132
"test-bucket", uint16(0), offsetTime)
133133
Expect(err).NotTo(HaveOccurred())

pkg/logcourier/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,15 @@ func ValidateConfig() error {
3939
return fmt.Errorf("s3.max-backoff-delay-seconds must be positive, got %d", maxBackoffDelay)
4040
}
4141

42+
backoffJitterFactor := ConfigSpec.GetFloat64("retry.backoff-jitter-factor")
43+
if backoffJitterFactor < 0.0 || backoffJitterFactor > 1.0 {
44+
return fmt.Errorf("retry.backoff-jitter-factor must be between 0.0 and 1.0, got %f", backoffJitterFactor)
45+
}
46+
47+
discoveryIntervalJitterFactor := ConfigSpec.GetFloat64("consumer.discovery-interval-jitter-factor")
48+
if discoveryIntervalJitterFactor < 0.0 || discoveryIntervalJitterFactor > 1.0 {
49+
return fmt.Errorf("consumer.discovery-interval-jitter-factor must be between 0.0 and 1.0, got %f", discoveryIntervalJitterFactor)
50+
}
51+
4252
return nil
4353
}

pkg/logcourier/configspec.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,43 @@ var ConfigSpec = util.ConfigSpec{
4545
DefaultValue: 900,
4646
EnvVar: "LOG_COURIER_CONSUMER_TIME_THRESHOLD_SECONDS",
4747
},
48+
"consumer.discovery-interval-seconds": util.ConfigVarSpec{
49+
Help: "Interval in seconds between work discovery runs",
50+
DefaultValue: 60,
51+
EnvVar: "LOG_COURIER_CONSUMER_DISCOVERY_INTERVAL_SECONDS",
52+
},
53+
"consumer.discovery-interval-jitter-factor": util.ConfigVarSpec{
54+
Help: "Jitter factor for discovery interval (0.0 to 1.0, where 0 is no jitter and 1.0 is up to 100% jitter)",
55+
DefaultValue: 0.1,
56+
EnvVar: "LOG_COURIER_CONSUMER_DISCOVERY_INTERVAL_JITTER_FACTOR",
57+
},
58+
"consumer.num-workers": util.ConfigVarSpec{
59+
Help: "Number of parallel workers for batch processing",
60+
DefaultValue: 10,
61+
EnvVar: "LOG_COURIER_CONSUMER_NUM_WORKERS",
62+
},
63+
64+
// Retry configuration
65+
"retry.max-retries": util.ConfigVarSpec{
66+
Help: "Maximum number of retry attempts for failed batch processing",
67+
DefaultValue: 5,
68+
EnvVar: "LOG_COURIER_RETRY_MAX_RETRIES",
69+
},
70+
"retry.initial-backoff-seconds": util.ConfigVarSpec{
71+
Help: "Initial backoff duration in seconds for retry attempts",
72+
DefaultValue: 1,
73+
EnvVar: "LOG_COURIER_RETRY_INITIAL_BACKOFF_SECONDS",
74+
},
75+
"retry.max-backoff-seconds": util.ConfigVarSpec{
76+
Help: "Maximum backoff duration in seconds for retry attempts",
77+
DefaultValue: 60,
78+
EnvVar: "LOG_COURIER_RETRY_MAX_BACKOFF_SECONDS",
79+
},
80+
"retry.backoff-jitter-factor": util.ConfigVarSpec{
81+
Help: "Jitter factor for backoff (0.0 to 1.0, where 0 is no jitter and 1.0 is up to 100% jitter)",
82+
DefaultValue: 0.3,
83+
EnvVar: "LOG_COURIER_RETRY_BACKOFF_JITTER_FACTOR",
84+
},
4885

4986
// S3 configuration
5087
"s3.endpoint": util.ConfigVarSpec{
@@ -79,4 +116,9 @@ var ConfigSpec = util.ConfigSpec{
79116
DefaultValue: "info",
80117
EnvVar: "LOG_COURIER_LOG_LEVEL",
81118
},
119+
"shutdown-timeout-seconds": util.ConfigVarSpec{
120+
Help: "Maximum time to wait for graceful shutdown in seconds",
121+
DefaultValue: 30,
122+
EnvVar: "LOG_COURIER_SHUTDOWN_TIMEOUT_SECONDS",
123+
},
82124
}

pkg/logcourier/logfetch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecor
5353
insertedAt,
5454
loggingTargetBucket,
5555
loggingTargetPrefix,
56+
raftSessionID,
5657
timestamp
5758
FROM %s.%s
5859
WHERE bucketName = ?

pkg/logcourier/logfetch_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,9 @@ var _ = Describe("LogFetcher", func() {
194194
ReqID: "test-req-id",
195195
Action: "PutObject",
196196
ObjectKey: "test-key",
197-
BytesSent: 12345,
198-
HttpCode: 200,
197+
BytesSent: 12345,
198+
HttpCode: 200,
199+
RaftSessionID: 42,
199200
})
200201
Expect(err).NotTo(HaveOccurred())
201202

@@ -216,6 +217,7 @@ var _ = Describe("LogFetcher", func() {
216217
Expect(rec.ObjectKey).To(Equal("test-key"))
217218
Expect(rec.BytesSent).To(Equal(uint64(12345)))
218219
Expect(rec.HttpCode).To(Equal(uint16(200)))
220+
Expect(rec.RaftSessionID).To(Equal(uint16(42)))
219221
})
220222
})
221223
})

pkg/logcourier/logrecord.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ type LogRecord struct {
3838
ElapsedMs float32 `ch:"elapsed_ms"` // AWS field 14: Total Time
3939
TurnAroundTime float32 `ch:"turnAroundTime"` // AWS field 15: Turn-Around Time
4040

41-
HttpCode uint16 `ch:"httpCode"` // AWS field 10: HTTP Status
41+
HttpCode uint16 `ch:"httpCode"` // AWS field 10: HTTP Status
42+
RaftSessionID uint16 `ch:"raftSessionID"` // Bucket raft session ID
4243
}

0 commit comments

Comments
 (0)