Skip to content

Commit

Permalink
Added Instant query to downloaded blocks.
Browse files Browse the repository at this point in the history
Signed-off-by: Kushal Shukla <[email protected]>
  • Loading branch information
kushalShukla-web committed Nov 19, 2024
1 parent 6337ce7 commit b224f59
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 16 deletions.
12 changes: 7 additions & 5 deletions prombench/manifests/prombench/benchmark/6_loadgen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
- name: simple_range
interval: 10s
type: range
start: 4h
start: 2h
end: 1h
step: 15s
queries:
Expand All @@ -31,7 +31,7 @@ data:
- name: aggr_range
interval: 30s
type: range
start: 4h
start: 1h
end: 0h
step: 15s
queries:
Expand Down Expand Up @@ -129,17 +129,19 @@ spec:
- name: download-key
image: kushalshukla/builder
imagePullPolicy: Always
command: [ "/download-key/key.sh" ]
command: [ "/go/src/github.com/key.sh" ]
env:
- name: PR_NUMBER
value: "{{ .PR_NUMBER }}"
- name: GITHUB_ORG
value: "{{ .GITHUB_ORG }}"
- name: GITHUB_REPO
value: "{{ .GITHUB_REPO }}"
value: "{{ .GITHUB_REPO }}"
- name: STORAGE
value: "/storage-paths"
volumeMounts:
- name: key
mountPath: /config
mountPath: /storage-paths
containers:
- name: prom-load-generator
image: kushalshukla/load-generator
Expand Down
62 changes: 51 additions & 11 deletions tools/load-generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"io"
"log"
"math/rand"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -98,6 +99,12 @@ type KeyConfig struct {
MaxTime int64 `yaml:"maxTime"`
}

type configState struct {
keyConfig *KeyConfig
Err error
StaticValues []int64 // For storing the static absolute time of the downloaded blocks.
}

func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier {
qtype := qg.Type
if qtype == "" {
Expand Down Expand Up @@ -126,6 +133,18 @@ func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier {
}
}

// This function returns an array of length 5 containing instant values between minTime and maxTime.
// To calculate the values, the formula (maxTime - minTime) / int64(count+1) is used to determine the step size.
// The step size is then added incrementally to minTime to generate the values.
func generateStaticValues(minTime, maxTime int64, count int) []int64 {
step := (maxTime - minTime) / int64(count+1)
values := make([]int64, count)
for i := 0; i < count; i++ {
values[i] = minTime + step*int64(i+1)
}
return values
}

// Function to load `minTime` and `maxTime` from key.yml
func loadKeyConfig() (*KeyConfig, error) {
filePath := "/config/key.yml"
Expand All @@ -148,20 +167,33 @@ func loadKeyConfig() (*KeyConfig, error) {
return &keyConfig, nil
}

func (q *Querier) run(wg *sync.WaitGroup) {
func configstate(v *KeyConfig, err error) (*configState, error) {
var staticValues []int64
// if there is an error in extracting key.yml file then it means there is no way to query
// data on downloaded blocks. so keep generateStateValues slices to be empty.
if err == nil {
staticValues = generateStaticValues(v.MinTime, v.MaxTime, 5)
}
return &configState{
keyConfig: v,
Err: err,
StaticValues: staticValues,
}, nil
}

func (q *Querier) run(wg *sync.WaitGroup, timeBound *configState) {
defer wg.Done()
fmt.Printf("Running querier %s %s for %s\n", q.target, q.name, q.url)
time.Sleep(20 * time.Second)

keyConfig, err := loadKeyConfig()

for {
start := time.Now()

for _, query := range q.queries {
q.query(query.Expr, "current", nil)
if err == nil {
q.query(query.Expr, "absolute", keyConfig)
// if there is an error we can avoid to go on absolute block.
if timeBound.Err == nil {
q.query(query.Expr, "absolute", timeBound)
}
}

Expand All @@ -172,7 +204,7 @@ func (q *Querier) run(wg *sync.WaitGroup) {
}
}

func (q *Querier) query(expr string, timeMode string, keyConfig *KeyConfig) {
func (q *Querier) query(expr string, timeMode string, timeBound *configState) {
queryCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc()
start := time.Now()

Expand All @@ -187,17 +219,20 @@ func (q *Querier) query(expr string, timeMode string, keyConfig *KeyConfig) {
qParams.Set("query", expr)
if q.qtype == "range" {
if timeMode == "current" {
fmt.Println("range , current blocks")
qParams.Set("start", fmt.Sprintf("%d", int64(time.Now().Add(-q.start).Unix())))
qParams.Set("end", fmt.Sprintf("%d", int64(time.Now().Add(-q.end).Unix())))
qParams.Set("step", q.step)
} else {
fmt.Println("range , absolute blocks")
endTime := time.Unix(0, keyConfig.MaxTime*int64(time.Millisecond))
endTime := time.Unix(0, timeBound.keyConfig.MaxTime*int64(time.Millisecond))
qParams.Set("start", fmt.Sprintf("%d", int64(endTime.Add(-q.start).Unix())))
qParams.Set("end", fmt.Sprintf("%d", int64(endTime.Add(-q.end).Unix())))
qParams.Set("step", q.step)
}
} else if timeMode == "absolute" {
x := timeBound.StaticValues
randomIndex := rand.Intn(len(x)) // calculating random index between StaticValues slice.
instantTime := time.Unix(0, x[randomIndex]*int64(time.Millisecond))
qParams.Set("time", fmt.Sprintf("%d", int64(instantTime.Unix())))
}
req.URL.RawQuery = qParams.Encode()

Expand Down Expand Up @@ -263,11 +298,16 @@ func main() {

var wg sync.WaitGroup

keyConfig, err := loadKeyConfig()
timeBound, err := configstate(keyConfig, err)
if err != nil {
log.Fatalf("Error creating app state: %v", err)
}
for i, group := range config.Querier.Groups {
wg.Add(1)
go NewQuerier(i, "pr", prNumber, group).run(&wg)
go NewQuerier(i, "pr", prNumber, group).run(&wg, timeBound)
wg.Add(1)
go NewQuerier(i, "release", prNumber, group).run(&wg)
go NewQuerier(i, "release", prNumber, group).run(&wg, timeBound)
}

prometheus.MustRegister(queryDuration, queryCount, queryFailCount)
Expand Down

0 comments on commit b224f59

Please sign in to comment.