Skip to content

Commit

Permalink
Merge #139454
Browse files Browse the repository at this point in the history
139454: vecindex: pace vector index operations r=drewkimball a=andy-kimball

Incorporate the pacer, which tries to match the pace of foreground insert and delete operations with background fixups. Update vecbench to get observability into the progress of index builds. Show time series charts for metrics like ops/sec, latency, and fixup queue size.

Epic: CRDB-42943

Release note: None

Co-authored-by: Andrew Kimball <[email protected]>
  • Loading branch information
craig[bot] and andy-kimball committed Jan 22, 2025
2 parents 6c80490 + 42355dd commit ad49a4a
Show file tree
Hide file tree
Showing 10 changed files with 513 additions and 39 deletions.
10 changes: 9 additions & 1 deletion pkg/cmd/vecbench/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,27 @@ load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "vecbench_lib",
srcs = ["main.go"],
srcs = [
"chart_printer.go",
"main.go",
"percentile_estimator.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/cmd/vecbench",
visibility = ["//visibility:private"],
deps = [
"//pkg/sql/vecindex",
"//pkg/sql/vecindex/quantize",
"//pkg/sql/vecindex/vecstore",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/vector",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_guptarohit_asciigraph//:asciigraph",
"@com_google_cloud_go_storage//:storage",
"@org_golang_x_term//:term",
],
)

Expand Down
208 changes: 208 additions & 0 deletions pkg/cmd/vecbench/chart_printer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package main

import (
"fmt"
"math"
"strings"

"github.com/guptarohit/asciigraph"
"golang.org/x/term"
)

// chartData holds the layout and the buffered samples of one time series
// chart.
type chartData struct {
	// caption is the label printed beneath the chart.
	caption string
	// x and y are the zero-based console column and row of the chart's
	// top-left corner.
	x, y int
	// width and height are the plot dimensions requested from the plotting
	// library, in console columns and rows.
	width, height int

	// series buffers the samples; the ones currently displayed are
	// series[start:end].
	series     []float64
	start, end int
}

// chartPrinter displays a grid of ASCII time series charts in the console,
// similar to this:
//
// 1722 ┤ ╭─╮ ╭─────╮╭ 2.00 ┼──╮ ╭╮ ╭─╮ ╭╮
// 1700 ┤╭╮ │ ╰╮ ╭╮ ╭╯ ╰╯ 1.60 ┤ │ ││ │ │ ││
// 1678 ┤││ ╭╯ ╰╮ ╭╯╰╮╭─╮╭╮ ╭─╯ 1.20 ┤ ╰───╮╭─╯╰──╯ ╰─╮╭─────╯╰─────
// 1656 ┼╯│╭╯ │╭╯ ╰╯ ╰╯╰─╯ 0.80 ┤ ││ ││
// 1634 ┤ ││ ╰╯ 0.40 ┤ ││ ││
// 1612 ┤ ╰╯ 0.00 ┤ ╰╯ ╰╯
//
// ops/sec (1707.98) fixup queue size (1.00)
//
// 0.68 ┼─╮ 55.10 ┤ ╭╮
// 0.64 ┤ ╰─╮ 54.40 ┤ ╭╮ ╭╯╰─╮ ╭──╮
// 0.59 ┤ ╰╮ 53.70 ┤ │╰─╯ ╰─╯ ╰╮
// 0.54 ┤ ╰╮ 53.00 ┤ │ ╰────
// 0.50 ┤ ╰╮ 52.30 ┤ ╭╮ ╭──╯
// 0.45 ┤ ╰──────────╮ 51.60 ┼─╮╭───╯╰─╯
// 0.41 ┤ ╰───────────── 50.90 ┤ ╰╯
//
// p50 ms latency (0.41) p99 ms latency (53.15)
//
// Example usage:
//
// var cp chartPrinter
// series1 := cp.AddChart("series 1")
// series2 := cp.AddChart("series 2")
// for {
// ...
// cp.AddSample(series1, sample1)
// cp.AddSample(series2, sample2)
// cp.Plot()
// ...
// }
type chartPrinter struct {
	// Footer is how many console rows are kept free underneath the charts.
	Footer int
	// Hide suppresses chart output. It is also switched on automatically when
	// the console size cannot be determined, e.g. when running under a
	// debugger.
	Hide bool

	// consoleWidth and consoleHeight are the console dimensions used to lay
	// out the charts. consoleHeight already excludes the Footer rows.
	consoleWidth, consoleHeight int
	// charts holds one entry per AddChart call, indexed by chart ID.
	charts []chartData
	// cleared records whether Plot has already cleared the console once.
	cleared bool
}

// AddChart adds a new chart with the given caption to the chart printer. It
// returns the ID of the chart, which is the value to pass to AddSample.
//
// The first call queries the console size and reserves Footer rows at the
// bottom. If the size cannot be determined, the charts are hidden and an 80x40
// console is assumed. Every call then lays out all charts again in a square
// grid.
//
// NOTE: Re-laying out the grid discards the samples of all existing charts;
// only their captions are preserved.
func (cp *chartPrinter) AddChart(caption string) int {
	if len(cp.charts) == 0 {
		// Get size of console window. File descriptor 0 is stdin.
		var err error
		cp.consoleWidth, cp.consoleHeight, err = term.GetSize(0)
		if err != nil {
			// Terminal does not support size, e.g. because we're running under a
			// debugger. In this case, hide the charts.
			cp.Hide = true
			cp.consoleWidth = 80
			cp.consoleHeight = 40
		}
		cp.consoleHeight -= cp.Footer
	}

	// Only the caption of a chart survives a re-layout, so appending is enough:
	// every other field of every chart is reassigned below.
	cp.charts = append(cp.charts, chartData{caption: caption})
	numCharts := len(cp.charts)

	// Divide the console into a square grid that is large enough to hold all
	// of the charts.
	grid := math.Ceil(math.Sqrt(float64(numCharts)))
	xInc := float64(cp.consoleWidth) / grid
	yInc := float64(cp.consoleHeight) / grid

	x, y := 0.0, 0.0
	for i := range cp.charts {
		chart := &cp.charts[i]
		nextX := x + xInc
		nextY := y + yInc

		// Each cell gives up 8 columns and 2 rows to whatever surrounds the
		// plot area (presumably the y-axis labels and the caption line).
		// On a very narrow console that can leave no columns at all, so keep at
		// least one: a negative width would make the allocation below panic.
		chartWidth := int(nextX) - int(x) - 8
		if chartWidth < 1 {
			chartWidth = 1
		}

		chart.x = int(x)
		chart.y = int(y)
		chart.width = chartWidth
		chart.height = int(nextY) - int(y) - 2
		// AddSample needs twice the visible width to slide its window without
		// shifting elements.
		chart.series = make([]float64, chartWidth*2)
		chart.start, chart.end = 0, 0

		// Advance to the next cell, wrapping to the next grid row once the
		// right edge of the console has been reached.
		x = nextX
		if int(x+1) >= cp.consoleWidth {
			x = 0
			y = nextY
		}
	}

	return numCharts - 1
}

// AddSample records a new sample for the chart identified by chartID, which
// must be an ID returned by AddChart. Once the chart holds width samples, each
// new sample displaces the oldest one.
func (cp *chartPrinter) AddSample(chartID int, sample float64) {
	c := &cp.charts[chartID]

	// The visible window series[start:end] is not full yet, so just grow it.
	if c.end-c.start < c.width {
		c.series[c.end] = sample
		c.end++
		return
	}

	// The window is full and has to slide by one. Every sample written while
	// sliding is stored twice, width slots apart (series is assumed to have at
	// least 2*width slots, as allocated by AddChart). When the window reaches
	// the upper half of the buffer, the lower half therefore already holds the
	// same samples and the window can jump back to it without copying.
	if c.start >= c.width {
		c.start = 0
		c.end = c.width
	}
	c.series[c.start] = sample
	c.series[c.end] = sample
	c.start++
	c.end++
}

// Plot prints the most recent samples for the charts on the console and then
// moves the cursor to the row where the footer starts. It does nothing if the
// charts are hidden or if no chart has been added yet. A chart that has not
// received any sample yet is left blank.
func (cp *chartPrinter) Plot() {
	if cp.Hide || len(cp.charts) == 0 {
		return
	}

	// Clear the console once. This "scrolls up" history so that it's not
	// overwritten. After that, clear the console by writing spaces to it, since
	// we don't want to keep chart printing history.
	if !cp.cleared {
		asciigraph.Clear()
		cp.cleared = true
	} else {
		blanks := strings.Repeat(" ", cp.consoleWidth)
		for y := 0; y < cp.consoleHeight; y++ {
			cp.printAt(0, y, "%s", blanks)
		}
	}

	// Print charts.
	for i := range cp.charts {
		chart := &cp.charts[i]

		samples := chart.series[chart.start:chart.end]
		if len(samples) == 0 {
			// There is nothing to plot and no latest value to show in the caption.
			continue
		}

		plotWidth := len(samples)
		if plotWidth > chart.width {
			plotWidth = chart.width
		}
		plotStr := asciigraph.Plot(samples,
			asciigraph.Width(plotWidth),
			asciigraph.Height(chart.height))

		// Split the plot into lines, padding at the top so that a plot that
		// came out shorter than requested is still bottom-aligned in its cell.
		lines := strings.Split(plotStr, "\n")
		if pad := chart.height - len(lines); pad > 0 {
			lines = append(make([]string, pad), lines...)
		}

		// Print each line at its specific location. The line is passed as an
		// argument rather than as the format so that it is printed verbatim.
		for row, line := range lines {
			cp.printAt(chart.x, chart.y+row, "%s", line)
		}

		// Print the caption, centered below the plot, followed by the latest
		// sample value.
		captionX := chart.x + (chart.width-len(chart.caption))/2
		captionY := chart.y + len(lines)
		cp.printAt(captionX, captionY, "%s (%0.2f)",
			White+chart.caption+Reset, samples[len(samples)-1])
	}

	// Move cursor to footer.
	lastChart := &cp.charts[len(cp.charts)-1]
	cp.printAt(0, lastChart.y+lastChart.height+3, "")
}

// printAt moves the console cursor to the zero-based column x and row y and
// then prints the formatted text there.
func (cp *chartPrinter) printAt(x, y int, format string, args ...any) {
	// The cursor position escape sequence is one-based and takes the row
	// before the column.
	row, col := y+1, x+1
	fmt.Printf("\033[%d;%dH", row, col)
	fmt.Printf(format, args...)
}
79 changes: 61 additions & 18 deletions pkg/cmd/vecbench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/vector"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
)
Expand Down Expand Up @@ -61,6 +61,8 @@ const (
ShowCursor = "\033[?25h"
)

var flagHideCharts = flag.Bool("hide-charts", false, "Hide time series charts during index build.")

// Search command options.
var flagMaxResults = flag.Int("k", 10, "Number of search results, used in recall calculation.")
var flagBeamSize = flag.Int(
Expand Down Expand Up @@ -388,9 +390,6 @@ func buildIndex(
data.Train.SplitAt(n)
}

startTime := timeutil.Now()
fmt.Printf(White+"Building index for dataset: %s\n"+Reset, datasetName)

// Create index.
store := vecstore.NewInMemoryStore(data.Train.Dims, seed)
index := createIndex(ctx, stopper, store)
Expand All @@ -401,6 +400,9 @@ func buildIndex(
binary.BigEndian.PutUint32(primaryKeys[i*4:], uint32(i))
}

// Compute percentile latencies.
estimator := NewPercentileEstimator(1000)

// Insert block of vectors within the scope of a transaction.
var insertCount atomic.Uint64
insertBlock := func(start, end int) {
Expand All @@ -411,41 +413,82 @@ func buildIndex(
key := primaryKeys[i*4 : i*4+4]
vec := data.Train.At(i)
store.InsertVector(key, vec)
startMono := crtime.NowMono()
if err := index.Insert(ctx, txn, vec, key); err != nil {
panic(err)
}
estimator.Add(startMono.Elapsed().Seconds())
}

inserted := insertCount.Add(uint64(end - start))
elapsed := timeutil.Since(startTime)
fmt.Printf(White+"\rInserted %d / %d vectors (%.2f%%) in %v "+Reset,
inserted, data.Train.Count,
(float64(inserted)/float64(data.Train.Count))*100, elapsed.Truncate(time.Second))
insertCount.Add(uint64(end - start))
}

// Set up time series charts.
cp := chartPrinter{Footer: 2, Hide: *flagHideCharts}
throughput := cp.AddChart("ops/sec")
fixups := cp.AddChart("fixup queue size")
throttled := cp.AddChart("pacer ops/sec")
p50 := cp.AddChart("p50 ms latency")
p90 := cp.AddChart("p90 ms latency")
p99 := cp.AddChart("p99 ms latency")

fmt.Printf(White+"Building index for dataset: %s\n"+Reset, datasetName)
startAt := crtime.NowMono()

// Insert vectors into the store on multiple goroutines.
var wait sync.WaitGroup
procs := runtime.GOMAXPROCS(-1)
countPerProc := (data.Train.Count + procs) / procs
blockSize := index.Options().MinPartitionSize
for i := 0; i < data.Train.Count; i += countPerProc {
end := min(i+countPerProc, data.Train.Count)
wait.Add(1)
go func(start, end int) {
// Break vector group into individual transactions that each insert a
// block of vectors. Run any pending fixups after each block.
for j := start; j < end; j += blockSize {
insertBlock(j, min(j+blockSize, end))
index.ProcessFixups()
}

wait.Done()
}(i, end)
}
wait.Wait()

elapsed := timeutil.Since(startTime)
fmt.Printf(White+"\nBuilt index in %v\n"+Reset, roundDuration(elapsed))
// Compute ops per second.
var lastInserted int

// Update progress every second.
lastProgressAt := startAt
for {
time.Sleep(time.Second)

// Calculate exactly how long it's been since last progress update.
now := crtime.NowMono()
sinceProgress := now.Sub(lastProgressAt)
lastProgressAt = now

// Calculate ops per second over the last second.
totalInserted := int(insertCount.Load())
opsPerSec := float64(totalInserted-lastInserted) / sinceProgress.Seconds()
lastInserted = totalInserted

cp.AddSample(throughput, opsPerSec)
cp.AddSample(fixups, float64(index.Fixups().QueueSize()))
cp.AddSample(throttled, index.Fixups().AllowedOpsPerSec())
cp.AddSample(p50, estimator.Estimate(0.50)*1000)
cp.AddSample(p90, estimator.Estimate(0.90)*1000)
cp.AddSample(p99, estimator.Estimate(0.99)*1000)
cp.Plot()

sinceStart := now.Sub(startAt)
fmt.Printf(White+"\rInserted %d / %d vectors (%.2f%%) in %v"+Reset,
totalInserted, data.Train.Count, (float64(totalInserted)/float64(data.Train.Count))*100,
sinceStart.Truncate(time.Second))

if totalInserted >= data.Train.Count {
break
}
}

// Wait for any remaining background fixups to be processed.
index.ProcessFixups()

fmt.Printf(White+"\nBuilt index in %v\n"+Reset, roundDuration(startAt.Elapsed()))
return store, index
}

Expand Down
Loading

0 comments on commit ad49a4a

Please sign in to comment.