Skip to content

Add batching implementation for redis_hash output#3772

Open
Lusori0 wants to merge 4 commits intoredpanda-data:mainfrom
Lusori0:feat/redis_hash_output_batching
Open

Add batching implementation for redis_hash output#3772
Lusori0 wants to merge 4 commits intoredpanda-data:mainfrom
Lusori0:feat/redis_hash_output_batching

Conversation

@Lusori0
Copy link

@Lusori0 Lusori0 commented Nov 13, 2025

Closes #3771

This adds batching to the redis_hash output. The implementation heavily draws from the implementation of the redis_list output.

@Lusori0 Lusori0 requested a review from mmatczuk November 14, 2025 15:48
@Lusori0 Lusori0 requested a review from mmatczuk November 17, 2025 12:10
Copy link
Contributor

@mmatczuk mmatczuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code review

Description("A map of key/value pairs to set as hash fields.").
Default(map[string]any{}),
service.NewOutputMaxInFlightField(),
service.NewBatchPolicyField(loFieldBatching),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hoFieldBatching is defined at line 34 but loFieldBatching (from the redis list output) is used here and at line 92. Both resolve to "batching" so it works at runtime, but this should use the hash output's own constant hoFieldBatching.

Rule: field names must use the component-prefix convention <componentAbbrev>Field<Name>godev agent.

return service.ErrNotConnected
}

func (r *redisHashWriter) buildMessage(msg *service.Message) (string, map[string]any, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: buildMessage takes a single *service.Message and uses r.key.TryString(msg) (line 177) and v.TryString(msg) (line 196) for interpolation. These create a faux batch of size 1 internally, so batch-context interpolation functions (batch_size(), batch_index(), windowed aggregations) will return incorrect results when used in key or fields expressions.

The batch path should use batch.TryInterpolatedString(i, r.key) and the batch-aware equivalent for fields, as output_list.go does in its WriteBatch.

Fix: either make buildMessage accept the full batch and index, or inline the batch-aware interpolation calls in the batch path and keep buildMessage only for the single-message fast path.

if len(batch) == 1 {
key, fields, err := r.buildMessage(batch[0])
if err != nil {
err = fmt.Errorf("failed to create message: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%v breaks the error chain. Use %w so callers can use errors.Is/errors.As on the inner error. Same issue at line 231.

Rule: "Use %w for wrapping. Use %v only when you intentionally want to break the error chain." — godev agent.

@mmatczuk
Copy link
Contributor

Commits

  1. All 4 commits lack the required system: message format. These should be prefixed with redis_hash: (e.g., redis_hash: add batching implementation).
  2. Less confusing naming and Fix logging are vague — the message should describe what was renamed or what logging was fixed.

Review
Adds batching to redis_hash output by switching to MustRegisterBatchOutput and using Redis pipelines. The implementation follows the output_list.go pattern but introduces a buildMessage helper that loses batch context.

  1. Bug: batch-unaware interpolationbuildMessage uses single-message TryString instead of batch-aware TryInterpolatedString, breaking batch-context functions like batch_size() and batch_index(): output_hash.go#L176-L200
  2. Wrong field constantloFieldBatching (redis list) used instead of the defined hoFieldBatching (redis hash): output_hash.go#L84
  3. Error wrapping %v — New error wrapping in WriteBatch uses %v instead of %w, breaking the error chain: output_hash.go#L215
  4. No tests — No unit or integration tests for the new batching behavior. No output_hash_test.go exists. The buildMessage helper and WriteBatch pipeline/error paths are untested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add batching to redis_hash output to improve performance

2 participants