Set unique flow id & timestamp to each flow #343
@jem-davies @gregfurman
@sananguliyev Hey! Thanks for the contribution and apologies for the delay. Will take a look this eve 😄
gregfurman
left a comment
Some questions relating to the metadata approach. Think better internal tracing is a brilliant idea so I'm keen to hear your thoughts on my comments 😄
}

// Try to use OpenTelemetry trace ID if available
if traceID := tracing.GetTraceID(part); traceID != "" && traceID != "00000000000000000000000000000000" {
Can we not rely solely on the TraceID as the FlowID? Or are you trying to account for the case where a span wasn't initialised (I'm no expert in OTEL so pardon my ignorance)?
The purpose here is to reuse something if it exists. If the user enabled tracing then the message has its own ID, and here I wanted to use it instead of generating a new one. It's also better for linking traces and internal events if someone wants to use the flow ID for other purposes.
@gregfurman I'm not sure whether you agree to keep this as is.
Yeah I suppose this makes sense. Do you imagine a situation where the flowID and traceID should ever differ?
Also, perhaps we set 00000000000000000000000000000000 to a const so it's not a magic string?
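A rough sketch of what the constant could look like. The names `zeroTraceID` and `usableTraceID` are hypothetical and not identifiers from this PR; the only assumption carried over is that an all-zero trace ID means no valid trace is attached.

```go
package main

import "fmt"

// zeroTraceID is the hex form of an all-zero 16-byte trace ID, which
// OpenTelemetry treats as invalid. The name is hypothetical.
const zeroTraceID = "00000000000000000000000000000000"

// usableTraceID reports whether traceID can stand in as a flow ID.
func usableTraceID(traceID string) bool {
	return traceID != "" && traceID != zeroTraceID
}

func main() {
	fmt.Println(usableTraceID(zeroTraceID))                        // false
	fmt.Println(usableTraceID(""))                                 // false
	fmt.Println(usableTraceID("4bf92f3577b34da6a3ce929d0e0e4736")) // true
}
```

Keeping the check in one helper also means the trace ID fallback reads the same wherever it is needed.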
I do not think they should ever differ if tracing is enabled. The whole point here is to always have some ID, rather than an optional one like the trace ID. And even if there is a trace ID, I do not see a problem with the two IDs being different.
internal/bundle/tracing/events.go
Outdated
if flowID, exists := part.MetaGetMut("_bento_flow_id"); exists {
	if flowIDStr, ok := flowID.(string); ok && flowIDStr != "" {
		return flowIDStr
	}
}
I'm not sure storing this in the message metadata is the best approach. For example, there are instances where metadata is dumped to a JSON body, sent as headers, and sometimes even fully reset/cleared. Also, the fact that metadata is mutable means a user could hypothetically overwrite this data and lose it.
If we are going to be storing this info, what're your thoughts on perhaps rather using the message context as a store (similar to how OTEL does it)?
i.e.
type flowTraceKey struct{} // store with an internal struct as a key to avoid collisions/overwrites

// WithFlowID returns a copy of ctx that carries flowID.
func WithFlowID(ctx context.Context, flowID string) context.Context {
	return context.WithValue(ctx, flowTraceKey{}, flowID)
}

// SetFlowID generates a new flow ID and returns the part's context with it attached.
func SetFlowID(p *message.Part) context.Context {
	ctx := message.GetContext(p)
	// V7 UUIDs are sortable since they're time ordered
	// hence no real need for timestamps as well
	// (see https://pkg.go.dev/github.com/google/uuid#NewV7)
	flowID, _ := uuid.NewV7()
	return WithFlowID(ctx, flowID.String())
}

// GetFlowID returns the flow ID stored in ctx, or "" if none is set.
func GetFlowID(ctx context.Context) string {
	if id, ok := ctx.Value(flowTraceKey{}).(string); ok {
		return id
	}
	return ""
}
TBH, initially I did not plan to add it as bloblang method and wanted to make it only accessible from tracing summary api, but then just decided to add it in order to make it accessible from bloblang, too. I did not investigate too much before adding bloblang method. Your suggestion sounds good, but I first need to check it.
Not exposing this as a bloblang method is also an option, and I can remove this method implementation.
I'd prefer the context based approach. Else, the more we can rely on open telemetry tracing context the better!
Hey @sananguliyev 👋 Do you have capacity to finish up this PR? I'm happy to take it over otherwise 😄 Lmk!
Hey @gregfurman, I might have some time next week. I will try to apply what you suggested by next weekend. If you want to release in the next couple of days then feel free to take it over :)
No pressure 🙂 Let me move it back into draft then
I found some time today and fixed the issue you pointed out here. You're absolutely right that metadata is mutable and users can tamper with it, which should not be possible since a flow ID is immutable by nature. So please take another look. And feel free to make changes here if you see something small, so we can move faster.
@gregfurman do you have capacity to review this? I am planning to make another contribution to enable CDC source, but first want to finish this and then contribute the component.
Hey! Sorry I must've missed the notification since I placed the PR into draft 🤦
I'll give the PR another look this evening 🙂
gregfurman
left a comment
Thanks for the follow ups on this!
I'm thinking whether we want to ever set the flowID values outside of the traceInput (i.e when a message part is first created), although I suppose there's the case where we want to create child-spans.
bento/internal/bundle/tracing/input.go
Lines 52 to 57 in 852a365
if t.e.IsEnabled() {
	_ = tran.Payload.Iter(func(i int, part *message.Part) error {
		_ = atomic.AddUint64(t.ctr, 1)
		t.e.Add(EventProduceOf(part))
		return nil
	})
Let me know!
// TODO: Find a better way of locating deletes (using batch index tracking).
t.e.Add(EventDeleteOf())
Nice! Should we deprecate EventDeleteOf() in favour of this EventDeleteOfPart function?
assert.Equal(t, `{"content":"waddup","id":"HELLO WORLD","meta":{"workflow":{"succeeded":["fooproc"]}}}`, outValue)

// Normalize events for testing by removing FlowID, Timestamp, and _bento_flow_id metadata
Are we still using this _bento_flow_id approach?
Yes, we need some cleanup.
I thought we were using the struct{} approach instead of the string for specifying the key?
		`meta flow_id = flow_id()`,
	),
).Experimental(),
func(fCtx FunctionContext) (any, error) {
Does it make sense to be setting the flowID from within this bloblang function?
Should it not rather just retrieve the flowID/traceID?
Since we have this, I thought it made sense to expose the value in case some users want to add it to logs or use it for other purposes, because it's the same as the trace ID, which we can already access.
I think this function should change to only retrieve the flowID, and not set it if not found. Wdyt?
The main idea here is that it should always be available. Regardless of where it's first called, it should either return the existing ID or create a new one and set it on the ctx. E.g. we have steps a and b: it's always created at a, and when b reads it, it gets the already created one. If for whatever reason we add one more step that runs before a, then that step creates the flow ID and a just gets what is there instead of creating one.
This should always be created anyway if tracing is enabled since the input component is wrapped in a tracedInput component.
Imagine this case:
input:
  resource: A
pipeline:
  processors:
    - resource: B
    - mapping: "meta flow_id = flow_id()" # <-- set and get here
output:
  resource: C
If not set at the input layer, setting a flow_id at the mapping step does not help us at all with knowing the flow of a message through a pipeline -- only that it was sent on to a resource C.
Since all messages (in a normal pipeline) are initially created at the input layer, EventProduce (which is at the input layer) means there should be an attached flowID -- making the create call redundant.
bento/internal/bundle/tracing/events.go
Lines 42 to 48 in b90d55a
return NodeEvent{
	Type: EventProduce,
	Content: string(part.AsBytes()),
	Meta: meta,
	FlowID: getOrCreateFlowID(part),
	Timestamp: time.Now(),
}
Does this make sense? Or perhaps I'm conceptually not understanding the inclusion of flowID and when we expect it to be called.
	return flowID, nil
}

// Try to use OpenTelemetry trace ID if available
nit: Can we remove some of these comments? Also this looks largely identical to the getOrCreateFlowID
We definitely need some general cleanup.
}
meta := make(map[string]any)
for mk, mv := range ev.Meta {
	if mk != "_bento_flow_id" {
Already mentioned. Do we still want this metadata?
In the current build of the traced stream, tracing events are currently grouped only by section. This setup makes it difficult to link these events together and understand the complete message journey in the stream flow. The new implementation addresses this issue by:
EventsByFlowID, which organizes events by flow and arranges them in chronological order based on timestamp.
Additionally: if no tags exist, git describe --tags will fail, but the error message will be suppressed by 2>/dev/null, and the version will then fall back to "v0.0.0". Currently, it prints
fatal: No names found, cannot describe anything.
when the Makefile tries to generate the version.
Fixes #344