Add batching implementation for redis_hash output#3772
Add batching implementation for redis_hash output#3772Lusori0 wants to merge 4 commits intoredpanda-data:mainfrom
Conversation
| Description("A map of key/value pairs to set as hash fields."). | ||
| Default(map[string]any{}), | ||
| service.NewOutputMaxInFlightField(), | ||
| service.NewBatchPolicyField(loFieldBatching), |
There was a problem hiding this comment.
hoFieldBatching is defined at line 34 but loFieldBatching (from the redis list output) is used here and at line 92. Both resolve to "batching" so it works at runtime, but this should use the hash output's own constant hoFieldBatching.
Rule: field names must use the component-prefix convention <componentAbbrev>Field<Name> — godev agent.
| return service.ErrNotConnected | ||
| } | ||
|
|
||
| func (r *redisHashWriter) buildMessage(msg *service.Message) (string, map[string]any, error) { |
There was a problem hiding this comment.
Bug: buildMessage takes a single *service.Message and uses r.key.TryString(msg) (line 177) and v.TryString(msg) (line 196) for interpolation. These create a faux batch of size 1 internally, so batch-context interpolation functions (batch_size(), batch_index(), windowed aggregations) will return incorrect results when used in key or fields expressions.
The batch path should use batch.TryInterpolatedString(i, r.key) and the batch-aware equivalent for fields, as output_list.go does in its WriteBatch.
Fix: either make buildMessage accept the full batch and index, or inline the batch-aware interpolation calls in the batch path and keep buildMessage only for the single-message fast path.
| if len(batch) == 1 { | ||
| key, fields, err := r.buildMessage(batch[0]) | ||
| if err != nil { | ||
| err = fmt.Errorf("failed to create message: %v", err) |
There was a problem hiding this comment.
%v breaks the error chain. Use %w so callers can use errors.Is/errors.As on the inner error. Same issue at line 231.
Rule: "Use %w for wrapping. Use %v only when you intentionally want to break the error chain." — godev agent.
|
Commits
Review
|
Closes #3771
This adds batching to the redis_hash output. The implementation heavily draws from the implementation of the redis_list output.