Skip to content

Commit

Permalink
Fix HWM detection
Browse files Browse the repository at this point in the history
  • Loading branch information
elffjs committed Nov 6, 2024
1 parent c2abd1c commit 42a465b
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions cmd/devices-api/find_old_style_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (fost *findOldStyleTasks) Execute(_ context.Context, _ *flag.FlagSet, _ ...

MsgLoop:
for m := range pc.Messages() {
if m.Offset >= hwm {
if m.Offset >= hwm-1 {
fost.logger.Info().Msgf("Finished processing parition %d.", p)
continue MsgLoop
}
Expand All @@ -92,10 +92,10 @@ func (fost *findOldStyleTasks) Execute(_ context.Context, _ *flag.FlagSet, _ ...

if out.Data.SyntheticDevice == nil {
missing[key] = struct{}{}
fost.logger.Warn().Str("userDeviceId", out.Data.UserDeviceID).Str("integrationId", out.Data.IntegrationID).Str("taskId", out.Data.TaskID).Str("key", key).Int64("offset", m.Offset).Msg("Found a bad one.")
fost.logger.Warn().Str("userDeviceId", out.Data.UserDeviceID).Time("msgTime", m.Timestamp).Str("integrationId", out.Data.IntegrationID).Str("taskId", out.Data.TaskID).Str("key", key).Int64("offset", m.Offset).Msg("Found a bad one.")
} else {
if _, ok := missing[key]; ok {
fost.logger.Info().Str("userDeviceId", out.Data.UserDeviceID).Str("integrationId", out.Data.IntegrationID).Str("taskId", out.Data.TaskID).Str("key", key).Int64("offset", m.Offset).Msg("Bad one was later replaced with a good one.")
fost.logger.Info().Str("userDeviceId", out.Data.UserDeviceID).Time("msgTime", m.Timestamp).Str("integrationId", out.Data.IntegrationID).Str("taskId", out.Data.TaskID).Str("key", key).Int64("offset", m.Offset).Msg("Bad one was later replaced with a good one.")
delete(missing, key)
}
}
Expand Down

0 comments on commit 42a465b

Please sign in to comment.