Conversation

@dvasilas
Collaborator

Implement the core log processing logic.

Best reviewed commit by commit.

@codecov

codecov bot commented Nov 10, 2025

Codecov Report

❌ Patch coverage is 69.30320% with 163 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.78%. Comparing base (47f7d2e) to head (266c848).
⚠️ Report is 11 commits behind head on main.

Files with missing lines        Patch %   Lines
cmd/log-courier/main.go           0.00%   65 Missing ⚠️
pkg/logcourier/processor.go      82.35%   29 Missing and 13 partials ⚠️
pkg/testutil/s3.go               70.96%   28 Missing and 8 partials ⚠️
pkg/util/logging.go               0.00%   12 Missing ⚠️
pkg/logcourier/config.go         33.33%   2 Missing and 2 partials ⚠️
pkg/logcourier/offset.go         84.61%   2 Missing ⚠️
pkg/testutil/clickhouse.go       97.14%   2 Missing ⚠️

❌ Your project status has failed because the head coverage (77.78%) is below the adjusted base coverage (84.90%). You can increase the head coverage or adjust the Removed Code Behavior.

Additional details and impacted files

Files with missing lines        Coverage Δ
pkg/logcourier/configspec.go    100.00% <ø> (ø)
pkg/logcourier/logfetch.go       89.47% <100.00%> (+0.18%) ⬆️
pkg/s3/uploader.go              100.00% <ø> (ø)
pkg/util/config.go               76.92% <100.00%> (+1.24%) ⬆️
pkg/logcourier/offset.go         87.87% <84.61%> (+0.78%) ⬆️
pkg/testutil/clickhouse.go       91.51% <97.14%> (+2.20%) ⬆️
pkg/logcourier/config.go         69.23% <33.33%> (-10.77%) ⬇️
pkg/util/logging.go               0.00% <0.00%> (ø)
pkg/testutil/s3.go               70.96% <70.96%> (ø)
pkg/logcourier/processor.go      82.35% <82.35%> (ø)
... and 1 more

... and 1 file with indirect coverage changes

@@            Coverage Diff             @@
##             main      #13      +/-   ##
==========================================
- Coverage   85.31%   77.78%   -7.53%     
==========================================
  Files          14       17       +3     
  Lines         538     1049     +511     
==========================================
+ Hits          459      816     +357     
- Misses         54      186     +132     
- Partials       25       47      +22     
Flag Coverage Δ
unit 77.78% <69.30%> (-7.53%) ⬇️

Flags with carried forward coverage won't be shown.


@fredmnl fredmnl left a comment

Couple of small comments.

I was thinking about the general "cron"-ness of log-courier. We might benefit from not having it trigger at exact periodic intervals but rather on a jittered interval, so that we stop having huge spikes of work periodically (looking at you, backbeat).

Also I'm not totally sure I get how batch.MaxTimestamp gets set.
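For illustration, here is a minimal sketch of a jittered scheduling loop in Go; the function name and signature are hypothetical, not the PR's code:

```go
package main

import (
	"context"
	"math/rand"
	"time"
)

// runWithJitter is a sketch of a jittered cycle loop. Instead of firing at exact
// multiples of interval, each wait is interval ± up to 20% random jitter, so that
// several instances drift apart instead of producing periodic spikes of work.
func runWithJitter(ctx context.Context, interval time.Duration, cycle func(context.Context) error) {
	for {
		jitter := time.Duration((rand.Float64()*0.4 - 0.2) * float64(interval))
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval + jitter):
			_ = cycle(ctx) // in this sketch, the cycle is assumed to log its own errors
		}
	}
}
```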

}

// runCycle executes a single discovery and processing cycle
func (p *Processor) runCycle(ctx context.Context) error {

The flag might not even be needed since (in my current partial understanding) there is a single-threaded main loop that calls runCycle synchronously.

If we keep the flag, I'd rather have a mutex (although https://pkg.go.dev/sync#Mutex.TryLock doesn't sound very encouraging of this idea), or at least make the boolean thread-safe by using a mutex around the read/write.
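For illustration, a minimal sketch of a race-free "already running" guard using sync/atomic; the Processor type here is a stand-in, not the PR's code:

```go
package main

import "sync/atomic"

// Processor is a stand-in type for this sketch only.
type Processor struct {
	running atomic.Bool
}

// tryRunCycle runs the given function unless a cycle is already in flight.
// CompareAndSwap makes the check-and-set atomic, so no separate mutex is needed.
func (p *Processor) tryRunCycle(run func()) bool {
	if !p.running.CompareAndSwap(false, true) {
		return false // another cycle is running; skip this tick
	}
	defer p.running.Store(false)
	run()
	return true
}
```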

Collaborator Author

Indeed, I was in Node.js thinking mode.
Removed it in 51bcddf.


// uploadLogBatchWithRetry handles fetching, building, and uploading with retries
// Returns the records and offset info needed for committing
func (p *Processor) uploadLogBatchWithRetry(ctx context.Context, batch LogBatch) ([]LogRecord, time.Time, uint16, error) {

Perhaps you can create a retryer decorator; I'm not sure whether Go generics can help in making a very generic one.


Also, [nit] jitter is a nice touch to add to exponential backoff.

Collaborator Author

Extracted the retry logic to a decorator (309c974) and added jitter (b2cdee8). Thanks for the suggestions.
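For illustration, a minimal sketch of a generic retry decorator with exponential backoff and jitter; the names and signature are hypothetical and not necessarily the shape used in 309c974 / b2cdee8:

```go
package main

import (
	"context"
	"math/rand"
	"time"
)

// withRetry retries op with exponential backoff plus up to 50% jitter, returning the
// first successful result or the last error once maxRetries is exhausted.
func withRetry[T any](ctx context.Context, maxRetries int, baseDelay time.Duration,
	op func(context.Context) (T, error)) (T, error) {

	var zero T
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			backoff := baseDelay * time.Duration(1<<uint(attempt-1))
			backoff += time.Duration(rand.Int63n(int64(backoff)/2 + 1)) // jitter
			select {
			case <-ctx.Done():
				return zero, ctx.Err()
			case <-time.After(backoff):
			}
		}
		v, err := op(ctx)
		if err == nil {
			return v, nil
		}
		lastErr = err
	}
	return zero, lastErr
}
```

A call site could then wrap the fetch/build/upload step, e.g. withRetry(ctx, p.maxRetries, time.Second, func(ctx context.Context) ([]LogRecord, error) { ... }).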

@leif-scality leif-scality left a comment

It seems that if we have multiple log-couriers they are going to build the same log S3 object multiple times; S3 is going to reject the upload, but we are still going to waste resources building it for nothing. We should have a task queue to prevent duplicate work.

"consecutiveFailures", p.consecutiveFailures,
"maxConsecutiveCycleFailures", maxConsecutiveCycleFailures)

if p.consecutiveFailures >= maxConsecutiveCycleFailures {


If we return here we are going to crash log-courier, do we want this?

Collaborator Author

Yes, my intention here is to stop log-courier.
Here is the scenario I have in mind: ClickHouse storage grows, and the circuit-breaker makes the database read-only.
So log-courier can read logs and upload S3 objects, but cannot commit.
In that case, log-courier will keep uploading the same objects indefinitely.
So the idea here is to exit if we are not able to make any commit after 3 attempts (including retries).
However, what will happen in practice is that a higher-level mechanism (supervisord / ballot) will restart log-courier.

It may be better to simplify and continue retrying instead of exiting in that case.

// 3. No successes, any transient errors -> cycle fails
// Indicates system-wide issue (ClickHouse down, S3 throttled, etc.)
// After 3 consecutive cycle failures, processor exits.
if successCount == 0 && transientErrorCount > 0 {


Weird condition: we should retry in a loop if we have an error, so why do we want to discard the error if one batch is OK? If we have 100 batches with 1 success and 99 errors we don't return an error; do we want this?

Collaborator Author

If we return an error we will trigger the consecutiveFailures mechanism (and possibly crash log-courier), so we want to return an error only when we are not able to make any progress.

The idea is that if we were able to write logs and commit for even 1 bucket, then we should log the errors, but continue to the next processing cycle.

The other case is that we fail for all buckets, but the failure is "transient", for example the account we use for writing does not exist or we have passed an incorrect access/secret key. I don't think log-courier should exit for this type of error.
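For illustration, a minimal sketch of the outcome rule described above (hypothetical names; the real processor.go may structure this differently):

```go
package main

import "errors"

// errNoProgress is a sketch-level sentinel for "no bucket made progress this cycle".
var errNoProgress = errors.New("no batch succeeded and transient errors occurred")

// cycleOutcome returns a non-nil error only when no batch succeeded and at least one
// failure was transient, i.e. the cycle made no progress for what looks like a
// system-wide reason. Any success, or permanent ("usage") errors alone, lets the
// processor move on to the next cycle.
func cycleOutcome(successCount, transientErrorCount int) error {
	if successCount == 0 && transientErrorCount > 0 {
		return errNoProgress
	}
	return nil
}
```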

@jonathan-gramain jonathan-gramain left a comment

This PR is quite big 🤯 🙂

}

// Delete objects
for _, obj := range listOutput.Contents {


Instead of a loop, what about sending a single DeleteObjects request for efficiency? I know this is just for tests, but it can accelerate their execution slightly.

Collaborator Author

Good point, done.
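For illustration, a minimal sketch of deleting the listed objects with a single DeleteObjects call using aws-sdk-go-v2; the helper name is illustrative, not necessarily the test helper's API:

```go
package main

import (
	"context"

	awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// deleteAll removes every object returned by a prior ListObjectsV2 call in one request.
// DeleteObjects accepts up to 1000 keys per call; pagination is omitted in this sketch.
func deleteAll(ctx context.Context, client *awss3.Client, bucket *string, contents []types.Object) error {
	if len(contents) == 0 {
		return nil
	}
	ids := make([]types.ObjectIdentifier, 0, len(contents))
	for _, obj := range contents {
		ids = append(ids, types.ObjectIdentifier{Key: obj.Key})
	}
	_, err := client.DeleteObjects(ctx, &awss3.DeleteObjectsInput{
		Bucket: bucket,
		Delete: &types.Delete{Objects: ids},
	})
	return err
}
```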

Comment on lines 170 to 177
p.consecutiveFailures++
p.logger.Error("cycle failed",
"error", err,
"consecutiveFailures", p.consecutiveFailures,
"maxConsecutiveCycleFailures", maxConsecutiveCycleFailures)
} else {
p.consecutiveFailures = 0
}


What about embedding this logic inside runCycle, so it avoids duplicating it in the Run function? runCycle would then only send back an error when it has exhausted its failure limit.

Collaborator Author

I opted for removing this logic entirely and instead let runCycle retry indefinitely.


for attempt := 0; attempt <= p.maxRetries; attempt++ {
if attempt > 0 {
p.logger.Info("retrying upload after backoff",


I'd put this log line after the select, to log it just before we actually do retry.

Collaborator Author

I kept this because in the updated implementation it prints the backoff duration, and I added a Debug log just before we retry the operation.

lastErr = err

if IsPermanentError(err) {
p.logger.Error("permanent error, not retrying",


Are "permanent" errors considered normal in some cases? I understood this from another comment, in which case I suggest using Info level for logging those benign errors (if we can distinguish them easily) to avoid alarming the admin.

Collaborator Author

Yes, in the sense that they are "usage" errors (a target bucket does not have the policy granting write access to the log delivery user).
Changed to Info.


BeforeEach(func() {
// Configure viper for all config keys
viper.Reset()


You could do logcourier.ConfigSpec.Reset() instead for better encapsulation.

Maybe you can also use a helper to reset the rest to their default values (although I would think viper does this automatically when Reset is called, but maybe not 🤷).

Collaborator Author

Replaced with logcourier.ConfigSpec.Reset()

Expect(objects).NotTo(BeEmpty(), "Expected at least one log object in S3")

// Verify object content
if len(objects) > 0 {


It may not be necessary to check if the objects array has elements, thanks to the above Expect check.

Collaborator Author

Thanks! done.

Collaborator Author
@dvasilas dvasilas left a comment

Thank you for your patience in reviewing this huge PR !
I should have split it into smaller ones 🙈

Comment on lines 377 to 378
logFields["attempt"] = attempt
logFields["backoffSeconds"] = backoff.Seconds()


As a best practice I would avoid mutating the parameter logFields and create a local copy instead (say, if it were passed from a global static value one day, mutating it would be problematic).

You could also consider making the logFields parameter optional (e.g. support nil) and creating an empty map in that case.

Collaborator Author

Good point, thanks!
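For illustration, a minimal sketch of copying the fields map instead of mutating the caller's, with nil treated as empty; the helper name is hypothetical:

```go
package main

import "time"

// withRetryFields returns a copy of logFields extended with per-attempt retry metadata,
// leaving the caller's map untouched. A nil logFields is treated as an empty map.
func withRetryFields(logFields map[string]any, attempt int, backoff time.Duration) map[string]any {
	fields := make(map[string]any, len(logFields)+2)
	for k, v := range logFields {
		fields[k] = v
	}
	fields["attempt"] = attempt
	fields["backoffSeconds"] = backoff.Seconds()
	return fields
}
```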

for _, obj := range listOutput.Contents {
_, delErr := h.client.DeleteObject(ctx, &awss3.DeleteObjectInput{
// Delete objects in batch
if len(listOutput.Contents) > 0 {


It should be fine for now, but you could consider doing ListObjectVersions and pass the version ID to delete to be able to support versioned buckets as well (it would still work the same on nonversioned buckets).

Collaborator Author

I added testing with versioned buckets to https://scality.atlassian.net/browse/LOGC-16 to make sure we don't forget.
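For illustration, a minimal sketch of version-aware cleanup with aws-sdk-go-v2, as suggested above; the helper is illustrative and pagination is omitted:

```go
package main

import (
	"context"

	awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// deleteAllVersions removes every object version and delete marker in the bucket.
// It behaves the same as a plain delete on non-versioned buckets.
func deleteAllVersions(ctx context.Context, client *awss3.Client, bucket *string) error {
	out, err := client.ListObjectVersions(ctx, &awss3.ListObjectVersionsInput{Bucket: bucket})
	if err != nil {
		return err
	}
	var ids []types.ObjectIdentifier
	for _, v := range out.Versions {
		ids = append(ids, types.ObjectIdentifier{Key: v.Key, VersionId: v.VersionId})
	}
	for _, m := range out.DeleteMarkers {
		ids = append(ids, types.ObjectIdentifier{Key: m.Key, VersionId: m.VersionId})
	}
	if len(ids) == 0 {
		return nil
	}
	_, err = client.DeleteObjects(ctx, &awss3.DeleteObjectsInput{
		Bucket: bucket,
		Delete: &types.Delete{Objects: ids},
	})
	return err
}
```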

When no offset exists, max() returns NULL, which the ClickHouse driver
converts to the Unix epoch (1970-01-01). Use maxOrNull() and sql.NullTime
to properly detect NULL values, and return the Go zero time for missing
offsets, enabling reliable IsZero() checks. This makes the case in which a
bucket does not yet have a committed offset more explicit (a sketch of this
approach follows these commit notes).

Add OffsetManagerInterface and UploaderInterface to enable tests to use
custom implementations (for injecting errors). Existing implementations
already satisfy these interfaces.

- S3TestHelper for bucket/object operations
- CountingUploader for tracking upload attempts
- InsertTestLogWithTargetBucket for inserting test data
- FailingOffsetManager for simulating offset commit failures

- Discovery cycle with configurable interval
- Parallel batch processing with worker pool
- Retry logic
- Error classification (permanent vs transient)

Transform the stub main into a complete application:
- Load and validate configuration
- Set up logging
- Create and initialize processor
- Signal handling
- Shutdown timeout with cleanup
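For illustration, a minimal sketch of the NULL-aware offset read described in the first note; the table and column names are hypothetical, and the real offset.go may differ:

```go
package main

import (
	"context"
	"database/sql"
	"time"
)

// latestOffset returns the most recent committed offset for a bucket, or the zero
// time.Time when no offset has been committed yet, so callers can rely on IsZero().
func latestOffset(ctx context.Context, db *sql.DB, bucket string) (time.Time, error) {
	var ts sql.NullTime
	// maxOrNull() yields NULL (instead of the epoch) when no rows match.
	err := db.QueryRowContext(ctx,
		"SELECT maxOrNull(committed_at) FROM log_offsets WHERE bucket = ?", bucket,
	).Scan(&ts)
	if err != nil {
		return time.Time{}, err
	}
	if !ts.Valid {
		return time.Time{}, nil // no committed offset yet for this bucket
	}
	return ts.Time, nil
}
```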
@dvasilas dvasilas merged commit 97a76e9 into main Nov 20, 2025
2 of 3 checks passed
@dvasilas dvasilas deleted the improvement/LOGC-7 branch November 20, 2025 10:28