-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add dependency processor using Apache Beam #6560
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: yunmaoQu <[email protected]>
af5f794
to
60fb334
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- where is it hooked up to anything?
- what would be the e2e testing for this component?
@yurishkuro I have fixed it |
Signed-off-by: yunmaoQu <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mahadzaryab1 interesting direction here
Signed-off-by: yunmaoQu <[email protected]>
@yurishkuro Except this ,I update all based on your review. |
config *Config | ||
aggregator *dependencyAggregator // Define the aggregator below. | ||
telset component.TelemetrySettings | ||
dependencyWriter *memory.Store |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as I mentioned, you cannot have concrete store dependency here. The processor needs to work with any storage supported by Jaeger, as long as they implement WriteDependencies.
Example:
f, err := jaegerstorage.GetStorageFactory(storageName, host) |
func (tp *dependencyProcessor) Shutdown(ctx context.Context) error { | ||
close(tp.closeChan) | ||
if tp.aggregator != nil { | ||
if err := tp.aggregator.Close(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if aggregator has a Close() function why does it need to be passed closeChan
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
// is considered complete and ready for dependency aggregation. | ||
// Default trace completion timeout: 2 seconds of inactivity | ||
InactivityTimeout time.Duration `yaml:"inactivity_timeout"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add Validate method and use valid:
notations in the field tags.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
cmd/jaeger/internal/processors/dependencyprocessor/processor.go
Outdated
Show resolved
Hide resolved
cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
Outdated
Show resolved
Hide resolved
Signed-off-by: yunmaoQu <[email protected]>
@yurishkuro I have fixed it |
cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
Outdated
Show resolved
Hide resolved
Signed-off-by: yunmaoQu <[email protected]>
func (agg *dependencyAggregator) Start(closeChan chan struct{}) { | ||
agg.closeChan = closeChan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (agg *dependencyAggregator) Start(closeChan chan struct{}) { | |
agg.closeChan = closeChan | |
func (agg *dependencyAggregator) Start() { |
eventTime: time.Now(), | ||
} | ||
select { | ||
case agg.inputChan <- event: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the motivation for having this done in the background instead of in the caller goroutine? Are the operations on Beam pipeline threadsafe or is this the reason for separation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The motivation for processing spans in the background (via a separate goroutine) rather than in the caller goroutine is primarily related to performance optimization, decoupling of concerns, and ensuring thread safety when interacting with the Apache Beam pipeline
config: &cfg, | ||
telset: telset, | ||
dependencyWriter: dependencyWriter, | ||
inputChan: make(chan spanEvent, 1000), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the motivation for making this a bound queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The motivation for making the inputChan a bounded queue (a buffered channel with a fixed size, e.g., 1000) is primarily to manage backpressure, control resource usage, and ensure system stability in high-throughput scenarios
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The motivation for making the inputChan a bounded queue (a buffered channel with a fixed size, e.g., 1000) is primarily to manage backpressure, control resource usage, and ensure system stability in high-throughput scenarios
When a channel is unbounded, it cannot be written to unless there there is a reader waiting to consume it, so it provides a natural back pressure as the caller goroutine will be blocked and hold the remote caller. And it does not allow the queue to grow and accumulate unprocessed data while making it look like the processing was immediately successful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yurishkuro Ok,i will fix it . Could the part of code is ready to be merged ?
Hi @mahadzaryab1 , would you mind taking a look at my PR when you have a moment? I'd appreciate your feedback. Thanks! |
@yurishkuro I will fix the part you mentioned . Could the part of code is ready to be merged ? |
Hey @mahadzaryab1 , I know you're familiar with this part of the code. Could you give my PR a look and share your thoughts? Thank you! |
Which problem is this PR solving?
Resolves #5911
Description of the changes
How was this change tested?
Checklist
jaeger
:make lint test
jaeger-ui
:npm run lint
andnpm run test