[WIP] Add E2E Integration Test For Adaptive Sampling Processor #5951

Draft
wants to merge 27 commits into
base: main
Changes from 19 commits
Commits
6cc7c1b
Setup Docker Compose With Jaeger All In One And Tracegen
mahadzaryab1 Sep 7, 2024
e4eb3b6
Use V2 Binary Instead of V1
mahadzaryab1 Sep 7, 2024
ea76c8e
Adjust Parameters For Integration Test
mahadzaryab1 Sep 7, 2024
5483545
Fix Makefile Cleanup
mahadzaryab1 Sep 7, 2024
8ff74a7
Expose Port 4318 In Jaeger
mahadzaryab1 Sep 7, 2024
6a9699a
Revert To Port 5000
mahadzaryab1 Sep 7, 2024
a65c190
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 8, 2024
3018caf
Remove Leader Check In Adaptive Strategy Provider
mahadzaryab1 Sep 13, 2024
d7ba2ce
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 13, 2024
c3b1ca4
Reduce Calculation Interval And Calculation Delay
mahadzaryab1 Sep 14, 2024
7561945
Remove Unused Method
mahadzaryab1 Sep 14, 2024
9aaaf75
Make Forwarding Port Explicit
mahadzaryab1 Sep 15, 2024
3dc923d
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 15, 2024
7dd33d1
Hardcode Adaptive Sampling
mahadzaryab1 Sep 16, 2024
b2be33c
Add Expvar Extension
mahadzaryab1 Sep 16, 2024
b996270
Add Script For E2E Integration Test
mahadzaryab1 Sep 17, 2024
d661a0f
Add Github Action
mahadzaryab1 Sep 17, 2024
7aba57e
Fix Typo
mahadzaryab1 Sep 17, 2024
c9811ab
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 17, 2024
735704c
Add Build Step To Script
mahadzaryab1 Sep 17, 2024
f485cc8
Add Missing Components To Workflow File
mahadzaryab1 Sep 17, 2024
54cd6b8
Add ExpVar Debugging For Post Aggregator Service Cache
mahadzaryab1 Oct 6, 2024
cf8dd9c
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Oct 6, 2024
e18d8d5
Use New Configuration Schema
mahadzaryab1 Oct 6, 2024
24d11d4
Patch To Only Remove One Check
mahadzaryab1 Oct 6, 2024
9ebf133
Fix Linting
mahadzaryab1 Oct 6, 2024
ca9a8c9
Comment Out Failing Tests For Now
mahadzaryab1 Oct 6, 2024
40 changes: 40 additions & 0 deletions .github/workflows/ci-e2e-adaptivesampling-processor.yml
@@ -0,0 +1,40 @@
name: Test Adaptive Sampling Processor

on:
  push:
    branches: [main]

  pull_request:
    branches: [main]

concurrency:
  group: ${{ github.workflow }}-${{ (github.event.pull_request && github.event.pull_request.number) || github.ref || github.run_id }}
  cancel-in-progress: true

# See https://github.com/ossf/scorecard/blob/main/docs/checks.md#token-permissions
permissions: # added using https://github.com/step-security/secure-workflows
  contents: read

jobs:
  adaptivesampling-processor:
    runs-on: ubuntu-latest
    steps:
    - name: Harden Runner
      uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
      with:
        egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs

    - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7

    - uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
      with:
        go-version: 1.23.x

    - name: Run Adaptive Sampling Processor Integration Test
      run: bash scripts/adaptive-sampling-integration-test.sh

    - name: Upload coverage to codecov
      uses: ./.github/actions/upload-codecov
      with:
        files: cover.out
        flags: adaptivesampling-processor
31 changes: 31 additions & 0 deletions docker-compose/adaptive-sampling/Makefile
@@ -0,0 +1,31 @@
# Copyright (c) 2024 The Jaeger Authors.
# SPDX-License-Identifier: Apache-2.0

BINARY ?= jaeger

.PHONY: build
build: clean-jaeger
	cd ../../ && make build-$(BINARY) GOOS=linux
	cd ../../ && make create-baseimg PLATFORMS=linux/$(shell go env GOARCH)
	cd ../../ && docker buildx build --target release \
		--tag jaegertracing/$(BINARY):dev \
		--build-arg base_image=localhost:5000/baseimg_alpine:latest \
		--build-arg debug_image=not-used \
		--build-arg TARGETARCH=$(shell go env GOARCH) \
		--load \
		cmd/$(BINARY)

.PHONY: dev
dev: export JAEGER_IMAGE_TAG = dev
dev: build
	docker compose -f docker-compose.yml up $(DOCKER_COMPOSE_ARGS)

.PHONY: clean-jaeger
clean-jaeger:
	# Also cleans up intermediate cached containers.
	docker system prune -f

.PHONY: clean-all
clean-all: clean-jaeger
	docker rmi -f jaegertracing/jaeger:dev ; \
	docker rmi -f jaegertracing/jaeger:latest ;
19 changes: 19 additions & 0 deletions docker-compose/adaptive-sampling/docker-compose.yml
@@ -0,0 +1,19 @@
services:
  jaeger:
    image: jaegertracing/jaeger:${JAEGER_IMAGE_TAG:-latest}
    volumes:
      - "./jaeger-v2-config.yml:/etc/jaeger/config.yml"
    command: ["--config", "/etc/jaeger/config.yml"]
    ports:
      - "16686:16686"
      - "5778:5778"
      - "4318:4318"
      - "27777:27777"

  tracegen:
    image: jaegertracing/jaeger-tracegen:latest
    environment:
      - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4318/v1/traces
    command: ["-adaptive-sampling", "http://jaeger:5778/api/sampling", "-pause", "100ms", "-duration", "60m"]
    depends_on:
      - jaeger
45 changes: 45 additions & 0 deletions docker-compose/adaptive-sampling/jaeger-v2-config.yml
@@ -0,0 +1,45 @@
service:
  extensions: [jaeger_storage, jaeger_query, remote_sampling, healthcheckv2, expvar]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, adaptive_sampling]
      exporters: [jaeger_storage_exporter]

extensions:
  healthcheckv2:
    use_v2: true
    http:
  jaeger_query:
    trace_storage: some_store
  jaeger_storage:
    backends:
      some_store:
        memory:
          max_traces: 100000
  remote_sampling:
    adaptive:
      sampling_store: some_store
      initial_sampling_probability: 1.0
      target_samples_per_second: 1
      calculation_interval: 10s
      calculation_delay: 20s
    http:
    grpc:
  expvar:
    port: 27777

receivers:
  otlp:
    protocols:
      grpc:
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
  adaptive_sampling:

exporters:
  jaeger_storage_exporter:
    trace_storage: some_store
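
To see why the configuration above should eventually push the probability below the test's 0.5 threshold, here is a rough sketch of a proportional adjustment. It is a simplified model for illustration only and is not the calculator the processor uses. The function name `nextProbability`, the clamp bounds, and the assumed load of roughly 10 root spans per second (one trace per 100ms pause) are all assumptions.

```go
package main

import "fmt"

// nextProbability scales the previous sampling probability by the ratio of
// target to observed throughput and clamps the result to [minProb, 1].
// Simplified model for illustration; not the processor's real calculator.
func nextProbability(prev, targetPerSec, observedPerSec, minProb float64) float64 {
	if observedPerSec <= 0 {
		return prev // nothing observed, keep the previous value
	}
	p := prev * targetPerSec / observedPerSec
	if p > 1 {
		p = 1
	}
	if p < minProb {
		p = minProb
	}
	return p
}

func main() {
	// initial_sampling_probability: 1.0 and target_samples_per_second: 1 come
	// from the config; the observed rate of 10 per second is an assumption.
	p := nextProbability(1.0, 1.0, 10.0, 1e-5)
	fmt.Printf("next probability: %.3f, below 0.5 threshold: %t\n", p, p < 0.5)
}
```

Under these assumptions a single calculation round would already land near 0.1, which is why a threshold of 0.5 leaves a comfortable margin.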
2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/adaptive/aggregator.go
@@ -152,7 +152,7 @@ func (a *aggregator) HandleRootSpan(span *span_model.Span, logger *zap.Logger) {
 	}
 	samplerType, samplerParam := span.GetSamplerParams(logger)
 	if samplerType == span_model.SamplerTypeUnrecognized {
-		return
+		samplerType = span_model.SamplerTypeProbabilistic

Collaborator, Author: @yurishkuro what kind of config do we want to add to perform this override?

Member: Something like "do not check sampler tags".

mahadzaryab1 (Collaborator, Author), Sep 30, 2024: @yurishkuro should this config be exposed as part of the YAML configuration, or do we just want it to be internal?

Member: It should be user-settable.

 	}
 	a.RecordThroughput(service, span.OperationName, samplerType, samplerParam)
 }
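
The thread above asks for a user-settable "do not check sampler tags" switch in place of the hardcoded override. One possible shape is sketched below. The `options` struct, the `IgnoreSamplerTags` field, and the local `samplerType` constants are hypothetical stand-ins for illustration; they are not part of the Jaeger configuration schema or model package.

```go
package main

import "fmt"

// samplerType is a local stand-in for the sampler type enum used in the diff.
type samplerType int

const (
	samplerUnrecognized samplerType = iota
	samplerProbabilistic
	samplerRateLimiting
)

// options holds a hypothetical user-settable switch. The field name is an
// assumption and does not come from the Jaeger configuration schema.
type options struct {
	IgnoreSamplerTags bool
}

// effectiveSamplerType decides how the aggregator treats a root span.
// ok=false means the span is skipped, matching the original early return.
func effectiveSamplerType(opts options, fromTags samplerType) (st samplerType, ok bool) {
	if fromTags != samplerUnrecognized {
		return fromTags, true
	}
	if opts.IgnoreSamplerTags {
		// Spans without sampler tags are counted as probabilistic.
		return samplerProbabilistic, true
	}
	return samplerUnrecognized, false
}

func main() {
	for _, o := range []options{{IgnoreSamplerTags: false}, {IgnoreSamplerTags: true}} {
		st, ok := effectiveSamplerType(o, samplerUnrecognized)
		fmt.Printf("ignore=%t -> type=%d counted=%t\n", o.IgnoreSamplerTags, st, ok)
	}
}
```

With the switch off, the behavior matches the code before this PR; with it on, it matches the hardcoded override in the diff.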
41 changes: 21 additions & 20 deletions plugin/sampling/strategyprovider/adaptive/post_aggregator.go
@@ -38,7 +38,7 @@
 // nested map: service -> operation -> throughput.
 type serviceOperationThroughput map[string]map[string]*model.Throughput

 func (t serviceOperationThroughput) get(service, operation string) (*model.Throughput, bool) {
Check failure on line 41 in plugin/sampling/strategyprovider/adaptive/post_aggregator.go (GitHub Actions / lint): func `serviceOperationThroughput.get` is unused (unused)
 	svcThroughput, ok := t[service]
 	if ok {
 		v, ok := svcThroughput[operation]
@@ -376,30 +376,31 @@
 	return p1
 }

 func (p *PostAggregator) isUsingAdaptiveSampling(
Check failure on line 379 in plugin/sampling/strategyprovider/adaptive/post_aggregator.go (GitHub Actions / lint): unused-receiver: method receiver 'p' is not referenced in method's body, consider removing or renaming it as _ (revive)
 	probability float64,
Check failure on line 380 in plugin/sampling/strategyprovider/adaptive/post_aggregator.go (GitHub Actions / lint): unused-parameter: parameter 'probability' seems to be unused, consider removing or renaming it as _ (revive)
 	service string,
Check failure on line 381 in plugin/sampling/strategyprovider/adaptive/post_aggregator.go (GitHub Actions / lint): unused-parameter: parameter 'service' seems to be unused, consider removing or renaming it as _ (revive)
 	operation string,
Check failure on line 382 in plugin/sampling/strategyprovider/adaptive/post_aggregator.go (GitHub Actions / lint): unused-parameter: parameter 'operation' seems to be unused, consider removing or renaming it as _ (revive)
 	throughput serviceOperationThroughput,
Check failure on line 383 in plugin/sampling/strategyprovider/adaptive/post_aggregator.go (GitHub Actions / lint): unused-parameter: parameter 'throughput' seems to be unused, consider removing or renaming it as _ (revive)
 ) bool {
-	if FloatEquals(probability, p.InitialSamplingProbability) {
-		// If the service is seen for the first time, assume it's using adaptive sampling (ie prob == initialProb).
-		// Even if this isn't the case, the next time around this loop, the newly calculated probability will not equal
-		// the initialProb so the logic will fall through.
-		return true
-	}
-	if opThroughput, ok := throughput.get(service, operation); ok {
-		f := TruncateFloat(probability)
-		_, ok := opThroughput.Probabilities[f]
-		return ok
-	}
-	// By this point, we know that there's no recorded throughput for this operation for this round
-	// of calculation. Check the previous bucket to see if this operation was using adaptive sampling
-	// before.
-	if len(p.serviceCache) > 1 {
-		if e := p.serviceCache[1].Get(service, operation); e != nil {
-			return e.UsingAdaptive && !FloatEquals(e.Probability, p.InitialSamplingProbability)
-		}
-	}
-	return false
+	// if FloatEquals(probability, p.InitialSamplingProbability) {
+	// // If the service is seen for the first time, assume it's using adaptive sampling (ie prob == initialProb).
+	// // Even if this isn't the case, the next time around this loop, the newly calculated probability will not equal
+	// // the initialProb so the logic will fall through.
+	// return true
+	// }
+	// if opThroughput, ok := throughput.get(service, operation); ok {
+	// f := TruncateFloat(probability)
+	// _, ok := opThroughput.Probabilities[f]
+	// return ok
+	// }
+	// // By this point, we know that there's no recorded throughput for this operation for this round
+	// // of calculation. Check the previous bucket to see if this operation was using adaptive sampling
+	// // before.
+	// if len(p.serviceCache) > 1 {
+	// if e := p.serviceCache[1].Get(service, operation); e != nil {
+	// return e.UsingAdaptive && !FloatEquals(e.Probability, p.InitialSamplingProbability)
+	// }
+	// }
+	// return false
+	return true

Collaborator, Author: @yurishkuro this causes the unit tests to fail, and I believe it's messing with the calculations as well. Any ideas on how we can get around this? If we don't hardcode this here, however, the probability only gets calculated once.

Member: It's very difficult to troubleshoot like this. I would suggest altering tracegen to manually add the sampler.type=probabilistic / sampler.param=0.5 (any value for now) attributes to the span, to see how the system reacts. To my knowledge, aside from this check, the probability used by the sampler should not affect the calculations, but I may be wrong.

Member: Another thing that would help is to expose internal state via expvar, so that we can actually monitor how that state changes.

 }
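
The expvar suggestion in the thread above could look roughly like the following, using only Go's standard `expvar` package. The variable name `adaptive_sampling_probabilities`, the `service/operation` key format, and the `recordProbability` helper are assumptions made for this sketch; the PR's actual debugging code may be structured differently.

```go
package main

import (
	"expvar"
	"fmt"
)

// probabilities is published under a hypothetical name; the key format
// "service/operation" is also an assumption made for this sketch.
var probabilities = expvar.NewMap("adaptive_sampling_probabilities")

// recordProbability stores the latest calculated probability for an operation.
func recordProbability(service, operation string, p float64) {
	f := new(expvar.Float)
	f.Set(p)
	probabilities.Set(service+"/"+operation, f)
}

func main() {
	recordProbability("tracegen", "lets-go", 1.0)
	// A later calculation round overwrites the earlier value.
	recordProbability("tracegen", "lets-go", 0.1)
	fmt.Println(probabilities.String())
}
```

Variables registered this way appear in the JSON served by the standard expvar handler, so polling that endpoint between calculation intervals would show whether the probability changes more than once.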
11 changes: 2 additions & 9 deletions plugin/sampling/strategyprovider/adaptive/provider.go
@@ -102,21 +102,14 @@ func (p *Provider) runUpdateProbabilitiesLoop() {
 	for {
 		select {
 		case <-ticker.C:
-			// Only load probabilities if this strategy_store doesn't hold the leader lock
-			if !p.isLeader() {
-				p.loadProbabilities()
-				p.generateStrategyResponses()
-			}
+			p.loadProbabilities()
+			p.generateStrategyResponses()
 		case <-p.shutdown:
 			return
 		}
 	}
 }

-func (p *Provider) isLeader() bool {
-	return p.electionParticipant.IsLeader()
-}
-
 // generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities.
 func (p *Provider) generateStrategyResponses() {
 	p.RLock()
99 changes: 89 additions & 10 deletions scripts/adaptive-sampling-integration-test.sh
@@ -5,16 +5,95 @@

 set -euf -o pipefail

-# This script is currently a placeholder.
+compose_file=docker-compose/adaptive-sampling/docker-compose.yml

-# Commands to run integration test:
-# SAMPLING_STORAGE_TYPE=memory SAMPLING_CONFIG_TYPE=adaptive go run -tags=ui ./cmd/all-in-one --log-level=debug
-# go run ./cmd/tracegen -adaptive-sampling=http://localhost:14268/api/sampling -pause=10ms -duration=60m
+set -x

-# Check how strategy is changing
-# curl 'http://localhost:14268/api/sampling?service=tracegen' | jq .
+timeout=600
+end_time=$((SECONDS + timeout))
+success="false"

-# Issues
-# - SDK does not report sampling probability in the tags the way Jaeger SDKs did
-# - Server probably does not recognize spans as having adaptive sampling without sampler info
-# - There is no way to modify target traces-per-second dynamically, must restart collector.
+threshold=0.5
+
+dump_logs() {
+  echo "::group:: docker logs"
+  docker compose -f "$compose_file" logs
+  echo "::endgroup::"
+}
+
+teardown_services() {
+  if [[ "$success" == "false" ]]; then
+    dump_logs
+  fi
+  docker compose -f "$compose_file" down
+}
+
+check_service_health() {
+  local service_name=$1
+  local url=$2
+  echo "Checking health of service: $service_name at $url"
+
+  local wait_seconds=3
+  local curl_params=(
+    --silent
+    --output
+    /dev/null
+    --write-out
+    "%{http_code}"
+  )
+  while [ $SECONDS -lt $end_time ]; do
+    if [[ "$(curl "${curl_params[@]}" "${url}")" == "200" ]]; then
+      echo "✅ $service_name is healthy"
+      return 0
+    fi
+    echo "Waiting for $service_name to be healthy..."
+    sleep $wait_seconds
+  done
+
+  echo "❌ ERROR: $service_name did not become healthy in time"
+  return 1
+}
+
+wait_for_services() {
+  echo "Waiting for services to be up and running..."
+  check_service_health "Jaeger" "http://localhost:16686"
+}
+
+check_tracegen_probability() {
+  local url="http://localhost:5778/api/sampling?service=tracegen"
+  response=$(curl -s "$url")
+  probability=$(echo "$response" | jq .operationSampling | jq -r '.perOperationStrategies[] | select(.operation=="lets-go")' | jq .probabilisticSampling.samplingRate)
+  if [ -n "$probability" ]; then
+    if (( $(echo "$probability < $threshold" | bc -l) )); then
+      return 0
+    fi
+  fi
+  # Exit statuses are limited to 0-255, so signal failure with 1 rather than -1.
+  return 1
+}
+
+check_adaptive_sampling() {
+  local wait_seconds=10
+  while [ $SECONDS -lt $end_time ]; do
+    if check_tracegen_probability; then
+      success="true"
+      break
+    fi
+    sleep $wait_seconds
+  done
+  if [[ "$success" == "false" ]]; then
+    echo "❌ ERROR: Adaptive sampling probability did not drop below $threshold."
+    exit 1
+  else
+    echo "✅ Adaptive sampling probability integration test passed"
+  fi
+}
+
+main() {
+  (cd docker-compose/adaptive-sampling && make dev DOCKER_COMPOSE_ARGS="-d")
+  wait_for_services
+  check_adaptive_sampling
+}
+
+trap teardown_services EXIT INT
+
+main
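
For readers less familiar with jq, the probability check in the script can be expressed as a small Go sketch. The JSON field names are taken from the jq filter in the script; the `strategyResponse` type, the `probabilityFor` helper, and the sample payload are hand-written assumptions, not output captured from a running Jaeger instance.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// strategyResponse models only the fields that the script's jq filter reads.
type strategyResponse struct {
	OperationSampling struct {
		PerOperationStrategies []struct {
			Operation             string `json:"operation"`
			ProbabilisticSampling struct {
				SamplingRate float64 `json:"samplingRate"`
			} `json:"probabilisticSampling"`
		} `json:"perOperationStrategies"`
	} `json:"operationSampling"`
}

// probabilityFor returns the sampling rate for one operation. The boolean is
// false when the operation is absent from the response.
func probabilityFor(body []byte, operation string) (float64, bool, error) {
	var resp strategyResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return 0, false, err
	}
	for _, s := range resp.OperationSampling.PerOperationStrategies {
		if s.Operation == operation {
			return s.ProbabilisticSampling.SamplingRate, true, nil
		}
	}
	return 0, false, nil
}

func main() {
	// Hand-written sample payload for illustration.
	body := []byte(`{"operationSampling":{"perOperationStrategies":[
		{"operation":"lets-go","probabilisticSampling":{"samplingRate":0.1}}]}}`)
	p, found, err := probabilityFor(body, "lets-go")
	if err != nil {
		panic(err)
	}
	const threshold = 0.5
	fmt.Println(found && p < threshold)
}
```

Returning an explicit "found" flag keeps a missing operation distinct from a probability of zero, which the shell version handles by checking for an empty string.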