LOGC-7: Implement log processor #13
Conversation
Force-pushed from 819196a to ca263f9
Codecov Report

❌ Your project status has failed because the head coverage (77.78%) is below the adjusted base coverage (84.90%). You can increase the head coverage or adjust the Removed Code Behavior.

@@            Coverage Diff             @@
##             main      #13      +/-   ##
==========================================
- Coverage   85.31%   77.78%   -7.53%
==========================================
  Files          14       17       +3
  Lines         538     1049     +511
==========================================
+ Hits          459      816     +357
- Misses         54      186     +132
- Partials       25       47      +22

... and 1 file with indirect coverage changes
fredmnl left a comment
Couple of small comments.
I was thinking about the general "cron"-ness of log-courier. We might benefit from not having it trigger at exact periodic intervals but rather on a jittered interval, so that we stop having huge spikes of work periodically (looking at you, backbeat).
Also I'm not totally sure I get how batch.MaxTimestamp gets set.
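A minimal sketch of the jittered-interval idea above (hypothetical; the function and parameter names are not from this PR):

```go
package logcourier

import (
	"context"
	"math/rand"
	"time"
)

// runWithJitter waits interval ± jitterFrac*interval between cycles instead of
// ticking at exact multiples of interval, so work is spread out over time.
func runWithJitter(ctx context.Context, interval time.Duration, jitterFrac float64, cycle func(context.Context) error) {
	for {
		// Random offset in [-jitterFrac, +jitterFrac] of the base interval.
		offset := time.Duration((rand.Float64()*2 - 1) * jitterFrac * float64(interval))
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval + offset):
			_ = cycle(ctx) // error handling left to the caller's policy
		}
	}
}
```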
}

// runCycle executes a single discovery and processing cycle
func (p *Processor) runCycle(ctx context.Context) error {
The flag might not even be needed since (in my current partial understanding) there is a single-threaded main loop that calls runCycle synchronously.
If we keep the flag, I'd rather have a mutex (although https://pkg.go.dev/sync#Mutex.TryLock doesn't sound very encouraging of this idea), or at least make the boolean thread-safe by using a mutex around the read/write.
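A minimal sketch of the mutex-around-the-flag alternative (hypothetical; the type and method names are illustrative, not from the PR):

```go
package logcourier

import "sync"

// cycleGuard makes the "cycle in progress" boolean thread-safe by wrapping
// every read/write in a mutex.
type cycleGuard struct {
	mu      sync.Mutex
	running bool
}

// tryStart reports whether a new cycle may begin, behaving like a TryLock.
func (g *cycleGuard) tryStart() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.running {
		return false
	}
	g.running = true
	return true
}

func (g *cycleGuard) end() {
	g.mu.Lock()
	g.running = false
	g.mu.Unlock()
}
```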
Indeed, I was in Node.js thinking mode.
Removed it in 51bcddf.
pkg/logcourier/processor.go
Outdated
// uploadLogBatchWithRetry handles fetching, building, and uploading with retries
// Returns the records and offset info needed for committing
func (p *Processor) uploadLogBatchWithRetry(ctx context.Context, batch LogBatch) ([]LogRecord, time.Time, uint16, error) {
Perhaps you can create a retryer decorator; I'm not sure if Go generics can help in making a very generic one.
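A minimal sketch of what a generics-based retryer decorator could look like (hypothetical; the signature and backoff hook are assumptions, and a multi-value result like uploadLogBatchWithRetry's would have to be wrapped in a small struct):

```go
package logcourier

import (
	"context"
	"time"
)

// withRetry runs op up to maxRetries+1 times, sleeping according to the
// supplied backoff function between attempts and honoring context cancellation.
func withRetry[T any](ctx context.Context, maxRetries int, backoff func(attempt int) time.Duration, op func(context.Context) (T, error)) (T, error) {
	var zero T
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return zero, ctx.Err()
			case <-time.After(backoff(attempt)):
			}
		}
		result, err := op(ctx)
		if err == nil {
			return result, nil
		}
		lastErr = err
	}
	return zero, lastErr
}
```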
Also, [nit] jitter is a nice touch to add to exponential backoff.
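A minimal sketch of exponential backoff with full jitter (hypothetical; base and cap values are placeholders):

```go
package logcourier

import (
	"math/rand"
	"time"
)

// backoffWithJitter grows the delay exponentially with the attempt number,
// caps it at maxDelay, and then picks a uniformly random wait up to that cap.
func backoffWithJitter(attempt int, base, maxDelay time.Duration) time.Duration {
	delay := base << attempt // base * 2^attempt
	if delay <= 0 || delay > maxDelay {
		delay = maxDelay // also guards against shift overflow
	}
	return time.Duration(rand.Int63n(int64(delay))) + 1
}
```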
leif-scality left a comment
It seems that if we have multiple log-couriers, they are going to build the same log S3 object multiple times. S3 is going to reject the upload, but we are still going to waste resources building it for nothing. We should have a task queue to prevent duplicate work.
pkg/logcourier/processor.go
Outdated
| "consecutiveFailures", p.consecutiveFailures, | ||
| "maxConsecutiveCycleFailures", maxConsecutiveCycleFailures) | ||
|
|
||
| if p.consecutiveFailures >= maxConsecutiveCycleFailures { |
If we return here, we are going to crash log-courier. Do we want this?
Yes, my intention here is to stop log-courier.
Here is the scenario I have in mind: ClickHouse storage grows, and the circuit-breaker makes the database read-only.
So log-courier can read logs and upload S3 objects, but cannot commit.
In that case, log-courier will keep uploading the same objects indefinitely.
So the idea here is to exit if we are not able to make any commit after 3 attempts (including retries).
However, what will happen in practice is that a higher-level mechanism (supervisord / ballot) will restart log-courier.
It's maybe better to simplify and continue retrying instead of exiting in that case.
pkg/logcourier/processor.go
Outdated
// 3. No successes, any transient errors -> cycle fails
//    Indicates system-wide issue (ClickHouse down, S3 throttled, etc.)
//    After 3 consecutive cycle failures, processor exits.
if successCount == 0 && transientErrorCount > 0 {
Weird condition: we should retry in a loop if we have an error, so why do we want to discard the error if one batch is OK? If we have 100 batches with 1 success and 99 errors, we don't return an error. Do we want this?
If we return an error we will trigger the consecutiveFailures mechanism (and possibly crash log-courier), so we want to return an error only when we are not able to make any progress.
The idea is that if we were able to write logs and commit for even 1 bucket, then we should log the errors, but continue to the next processing cycle.
The other case is that we fail for all buckets, but the failure is "transient", for example the account we use for writing does not exist or we have passed an incorrect access/secret key. I don't think log-courier should exit for this type of error.
jonathan-gramain left a comment
This PR is quite big 🤯 🙂
pkg/testutil/s3.go
Outdated
}

// Delete objects
for _, obj := range listOutput.Contents {
Instead of a loop, what about sending a single DeleteObjects request for efficiency? I know this is just for tests, but it can accelerate their execution slightly.
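A sketch of the batched form, assuming the aws-sdk-go-v2 client and the awss3 import alias visible in the diff; the helper name is hypothetical:

```go
package testutil

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// deleteAllObjects removes every listed key with a single DeleteObjects call
// per ListObjectsV2 page (DeleteObjects accepts up to 1000 keys per request).
func deleteAllObjects(ctx context.Context, client *awss3.Client, bucket string) error {
	listOutput, err := client.ListObjectsV2(ctx, &awss3.ListObjectsV2Input{Bucket: aws.String(bucket)})
	if err != nil {
		return err
	}
	if len(listOutput.Contents) == 0 {
		return nil
	}
	ids := make([]types.ObjectIdentifier, 0, len(listOutput.Contents))
	for _, obj := range listOutput.Contents {
		ids = append(ids, types.ObjectIdentifier{Key: obj.Key})
	}
	_, err = client.DeleteObjects(ctx, &awss3.DeleteObjectsInput{
		Bucket: aws.String(bucket),
		Delete: &types.Delete{Objects: ids},
	})
	return err
}
```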
Good point, done.
pkg/logcourier/processor.go
Outdated
    p.consecutiveFailures++
    p.logger.Error("cycle failed",
        "error", err,
        "consecutiveFailures", p.consecutiveFailures,
        "maxConsecutiveCycleFailures", maxConsecutiveCycleFailures)
} else {
    p.consecutiveFailures = 0
}
What about embedding this logic inside runCycle, so it avoids duplicating it in the Run function? runCycle would then only send back an error when it has exhausted its failure limit.
I opted for removing this logic entirely and instead let runCycle retry indefinitely.
pkg/logcourier/processor.go
Outdated
for attempt := 0; attempt <= p.maxRetries; attempt++ {
    if attempt > 0 {
        p.logger.Info("retrying upload after backoff",
I'd put this log line after the select, to log it just before we actually do retry.
I kept this because in the updated implementation it prints the backoff duration, and I added a Debug log just before we retry the operation.
pkg/logcourier/processor.go
Outdated
lastErr = err

if IsPermanentError(err) {
    p.logger.Error("permanent error, not retrying",
Are "permanent" errors considered normal in some cases? I understood this from another comment, in which case I suggest using Info level for logging those benign errors (if we can distinguish them easily) to avoid alarming the admin.
Yes, in the sense that they are "usage" errors (a target bucket does not have the policy granting write access to the log delivery user).
Changed to Info.
pkg/logcourier/processor_test.go
Outdated
BeforeEach(func() {
    // Configure viper for all config keys
    viper.Reset()
You could do logcourier.ConfigSpec.Reset() instead for better encapsulation.
Maybe you can also use a helper to reset the rest to their default values (although I would think viper does this automatically when Reset is called, but maybe not 🤷).
Replaced with logcourier.ConfigSpec.Reset()
pkg/logcourier/processor_test.go
Outdated
Expect(objects).NotTo(BeEmpty(), "Expected at least one log object in S3")

// Verify object content
if len(objects) > 0 {
It may not be necessary to check if the objects array has elements, thanks to the above Expect check.
Thanks! done.
dvasilas left a comment
Thank you for your patience in reviewing this huge PR !
I should have split it into smaller ones 🙈
pkg/logcourier/processor.go
Outdated
| logFields["attempt"] = attempt | ||
| logFields["backoffSeconds"] = backoff.Seconds() |
As a best practice, I would avoid mutating the logFields parameter and would create a local copy instead (if it were passed from a global static value one day, that would be problematic).
You could also consider making the logFields parameter optional (e.g. support nil) and create an empty map in that case.
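A minimal sketch of both suggestions, copying the caller's map and tolerating nil (hypothetical helper name):

```go
package logcourier

import "time"

// withRetryFields returns a fresh map so the caller's logFields is never
// mutated; a nil argument simply means "no extra fields".
func withRetryFields(logFields map[string]any, attempt int, backoff time.Duration) map[string]any {
	fields := make(map[string]any, len(logFields)+2)
	for k, v := range logFields { // ranging over a nil map is a no-op
		fields[k] = v
	}
	fields["attempt"] = attempt
	fields["backoffSeconds"] = backoff.Seconds()
	return fields
}
```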
Good point, thanks !
- for _, obj := range listOutput.Contents {
-     _, delErr := h.client.DeleteObject(ctx, &awss3.DeleteObjectInput{
+ // Delete objects in batch
+ if len(listOutput.Contents) > 0 {
It should be fine for now, but you could consider doing ListObjectVersions and passing the version IDs to the delete, to be able to support versioned buckets as well (it would still work the same on non-versioned buckets).
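A sketch of the versioned-bucket variant with aws-sdk-go-v2 (hypothetical helper, not the code from this PR):

```go
package testutil

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// deleteAllVersions lists object versions and delete markers and removes them
// by version ID, so it also clears versioned buckets; on non-versioned buckets
// it behaves like a plain delete.
func deleteAllVersions(ctx context.Context, client *awss3.Client, bucket string) error {
	out, err := client.ListObjectVersions(ctx, &awss3.ListObjectVersionsInput{Bucket: aws.String(bucket)})
	if err != nil {
		return err
	}
	ids := make([]types.ObjectIdentifier, 0, len(out.Versions)+len(out.DeleteMarkers))
	for _, v := range out.Versions {
		ids = append(ids, types.ObjectIdentifier{Key: v.Key, VersionId: v.VersionId})
	}
	for _, m := range out.DeleteMarkers {
		ids = append(ids, types.ObjectIdentifier{Key: m.Key, VersionId: m.VersionId})
	}
	if len(ids) == 0 {
		return nil
	}
	_, err = client.DeleteObjects(ctx, &awss3.DeleteObjectsInput{
		Bucket: aws.String(bucket),
		Delete: &types.Delete{Objects: ids},
	})
	return err
}
```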
I added testing with versioned buckets to https://scality.atlassian.net/browse/LOGC-16 to make sure we don't forget.
When no offset exists, max() returns NULL, which the ClickHouse driver converts to the Unix epoch (1970-01-01). Use maxOrNull() and sql.NullTime to properly detect NULL values. Return the Go zero time for missing offsets, enabling reliable IsZero() checks. This makes the case in which a bucket does not have a committed offset yet more explicit.
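A minimal sketch of the described behavior, assuming a database/sql connection to ClickHouse; the table and column names are placeholders, not the PR's schema:

```go
package logcourier

import (
	"context"
	"database/sql"
	"time"
)

// lastCommittedOffset returns the Go zero time when no offset has been
// committed yet: maxOrNull() yields NULL for an empty group, which
// sql.NullTime detects instead of it silently becoming the Unix epoch.
func lastCommittedOffset(ctx context.Context, db *sql.DB, bucket string) (time.Time, error) {
	var ts sql.NullTime
	err := db.QueryRowContext(ctx,
		"SELECT maxOrNull(committed_timestamp) FROM log_offsets WHERE bucket = ?", bucket,
	).Scan(&ts)
	if err != nil {
		return time.Time{}, err
	}
	if !ts.Valid {
		return time.Time{}, nil // no committed offset yet; callers can use IsZero()
	}
	return ts.Time, nil
}
```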
Add OffsetManagerInterface and UploaderInterface to enable tests to use custom implementations (for injecting errors). Existing implementations already satisfy these interfaces.
- S3TestHelper for bucket/object operations
- CountingUploader for tracking upload attempts
- InsertTestLogWithTargetBucket for inserting test data
- FailingOffsetManager for simulating offset commit failures
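A purely illustrative sketch of how such an interface and a failing test double fit together; the real method sets of OffsetManagerInterface and FailingOffsetManager are not shown in this excerpt:

```go
package logcourier

import (
	"context"
	"errors"
	"time"
)

// OffsetManagerInterface is an assumed single-method shape, used here only to
// show the dependency-injection idea.
type OffsetManagerInterface interface {
	CommitOffset(ctx context.Context, bucket string, ts time.Time) error
}

// FailingOffsetManager always fails, letting tests exercise the retry and
// error-handling paths of the processor.
type FailingOffsetManager struct{}

func (FailingOffsetManager) CommitOffset(context.Context, string, time.Time) error {
	return errors.New("simulated offset commit failure")
}
```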
Force-pushed from 06ae75d to e3ab787
- Discovery cycle with configurable interval
- Parallel batch processing with worker pool
- Retry logic
- Error classification (permanent vs transient)
Transform stub main into complete application:
- Load and validate configuration
- Set up logging
- Create and initialize processor
- Signal handling
- Shutdown timeout with cleanup
Force-pushed from e3ab787 to 266c848
Implement the core log processing logic.
Best reviewed commit by commit.