Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ go_test(
"statement_bundle_test.go",
"statement_diag_test.go",
"tsdump_test.go",
"tsdump_upload_test.go",
"userfiletable_test.go",
"workload_test.go",
"zip_helpers_test.go",
Expand Down Expand Up @@ -447,6 +448,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/envutil",
Expand Down
8 changes: 8 additions & 0 deletions pkg/cli/testdata/tsdump_upload_e2e
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

upload-datadog
cr.node.admission.admitted.elastic-cpu,2025-05-26T08:32:00Z,1,1
cr.node.sql.query.count,2021-01-01T00:00:00Z,1,100.5
cr.node.sql.query.count,2021-01-01T00:00:10Z,1,102.3
cr.store.rocksdb.block.cache.usage,2021-01-01T00:00:00Z,2,75.2
----
{"series":[{"metric":"crdb.tsdump.admission.admitted.elastic-cpu","type":0,"points":[{"timestamp":1748248320,"value":1}],"interval":10,"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]},{"metric":"crdb.tsdump.sql.query.count","type":0,"points":[{"timestamp":1609459200,"value":100.5}],"interval":10,"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]},{"metric":"crdb.tsdump.sql.query.count","type":0,"points":[{"timestamp":1609459210,"value":102.3}],"interval":10,"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]},{"metric":"crdb.tsdump.rocksdb.block.cache.usage","type":0,"points":[{"timestamp":1609459200,"value":75.2}],"interval":10,"resources":null,"tags":["store:2","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]}]}
14 changes: 8 additions & 6 deletions pkg/cli/tsdump_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type DatadogSeries struct {
Metric string `json:"metric"`
Type int `json:"type"`
Points []DatadogPoint `json:"points"`
Interval int `json:"interval,omitempty"`
Resources []struct {
Name string `json:"name"`
Type string `json:"type"`
Expand Down Expand Up @@ -171,7 +172,7 @@ var getCurrentTime = func() time.Time {
return timeutil.Now()
}

func doDDRequest(req *http.Request) error {
var doDDRequest = func(req *http.Request) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
Expand Down Expand Up @@ -201,7 +202,7 @@ func appendTag(series *DatadogSeries, tagKey, tagValue string) {
}

func (d *datadogWriter) dump(kv *roachpb.KeyValue) (*DatadogSeries, error) {
name, source, _, _, err := ts.DecodeDataKey(kv.Key)
name, source, res, _, err := ts.DecodeDataKey(kv.Key)
if err != nil {
return nil, err
}
Expand All @@ -211,10 +212,11 @@ func (d *datadogWriter) dump(kv *roachpb.KeyValue) (*DatadogSeries, error) {
}

series := &DatadogSeries{
Metric: name,
Tags: []string{},
Type: d.resolveMetricType(name),
Points: make([]DatadogPoint, idata.SampleCount()),
Metric: name,
Tags: []string{},
Type: d.resolveMetricType(name),
Points: make([]DatadogPoint, idata.SampleCount()),
Interval: int(res.Duration().Seconds()), // convert from time.Duration to number of seconds.
}

sl := reCrStoreNode.FindStringSubmatch(name)
Expand Down
198 changes: 198 additions & 0 deletions pkg/cli/tsdump_upload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package cli

import (
"compress/gzip"
"encoding/csv"
"encoding/gob"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

// TestTSDumpUploadE2E tests the end-to-end functionality of uploading a time
// series dump to Datadog from a user perspective. This runs the tsdump command
// externally. The datadog API is mocked to capture the request and verify the
// uploaded data.
func TestTSDumpUploadE2E(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer testutils.TestingHook(&getCurrentTime, func() time.Time {
return time.Date(2024, 11, 14, 0, 0, 0, 0, time.UTC)
})()

datadriven.RunTest(t, "testdata/tsdump_upload_e2e", func(t *testing.T, d *datadriven.TestData) string {
var buf strings.Builder
defer mockDoDDRequest(t, &buf)()

c := NewCLITest(TestCLIParams{})
defer c.Cleanup()

switch d.Cmd {
case "upload-datadog":
debugTimeSeriesDumpOpts.clusterLabel = "test-cluster"
debugTimeSeriesDumpOpts.clusterID = "test-cluster-id"
debugTimeSeriesDumpOpts.zendeskTicket = "zd-test"
debugTimeSeriesDumpOpts.organizationName = "test-org"
debugTimeSeriesDumpOpts.userName = "test-user"
dumpFilePath := generateMockTSDumpFromCSV(t, d.Input)

var clusterLabel, apiKey string
if d.HasArg("cluster-label") {
d.ScanArgs(t, "cluster-label", &clusterLabel)
} else {
clusterLabel = "test-cluster"
}
if d.HasArg("api-key") {
d.ScanArgs(t, "api-key", &apiKey)
} else {
apiKey = "dd-api-key"
}

// Run the command
_, err := c.RunWithCapture(fmt.Sprintf(
`debug tsdump --format=datadog --dd-api-key="%s" --cluster-label="%s" %s`,
apiKey, clusterLabel, dumpFilePath,
))
require.NoError(t, err)
return strings.TrimSpace(buf.String())

default:
t.Fatalf("unknown command: %s", d.Cmd)
return ""
}
})
}

func mockDoDDRequest(t *testing.T, w io.Writer) func() {
t.Helper()

return testutils.TestingHook(&doDDRequest, func(req *http.Request) error {
defer req.Body.Close()

reader, err := gzip.NewReader(req.Body)
require.NoError(t, err)

raw, err := io.ReadAll(reader)
require.NoError(t, err)

fmt.Fprintln(w, string(raw))
return nil
})
}

// generateMockTSDumpFromCSV creates a mock tsdump file from CSV input string.
// CSV format: metric_name,timestamp,source,value
// Example: cr.node.admission.admitted.elastic-cpu,2025-05-26T08:32:00Z,1,1
// NOTE: this is the same format generated by the `cockroach tsdump` command
// when --format=csv is used.
func generateMockTSDumpFromCSV(t *testing.T, csvInput string) string {
t.Helper()

// Parse CSV data from input string
reader := csv.NewReader(strings.NewReader(csvInput))
csvData, err := reader.ReadAll()
require.NoError(t, err)
require.Greater(t, len(csvData), 0, "CSV input must have at least one data row")

// Create temporary file
tmpFile, err := os.CreateTemp("", "mock_tsdump_*.gob")
require.NoError(t, err)
defer tmpFile.Close()

// Create gob encoder
encoder := gob.NewEncoder(tmpFile)

// Process each row (no header expected)
for i, row := range csvData {
require.Len(t, row, 4, "CSV row %d must have 4 columns: metric_name,timestamp,source,value", i+1)

metricName := row[0]
timestampStr := row[1]
source := row[2]
valueStr := row[3]

// Parse timestamp (RFC3339 format)
timestamp, err := time.Parse(time.RFC3339, timestampStr)
require.NoError(t, err, "invalid timestamp format in row %d: %s (expected RFC3339)", i+1, timestampStr)
timestampNanos := timestamp.UnixNano()

// Parse value
value, err := strconv.ParseFloat(valueStr, 64)
require.NoError(t, err, "invalid value in row %d: %s", i+1, valueStr)

// Create KeyValue entry for this data point
kv, err := createMockTimeSeriesKV(metricName, source, timestampNanos, value)
require.NoError(t, err)

// Encode to gob format
err = encoder.Encode(kv)
require.NoError(t, err)
}

t.Cleanup(func() {
require.NoError(t, os.Remove(tmpFile.Name()), "failed to remove temporary file")
})
return tmpFile.Name()
}

// createMockTimeSeriesKV creates a roachpb.KeyValue entry containing time series data
func createMockTimeSeriesKV(
name, source string, timestamp int64, value float64,
) (roachpb.KeyValue, error) {
// Create TimeSeriesData
tsData := tspb.TimeSeriesData{
Name: name,
Source: source,
Datapoints: []tspb.TimeSeriesDatapoint{
{TimestampNanos: timestamp, Value: value},
},
}

// Convert to internal format using 10s resolution
resolution := ts.Resolution10s
idatas, err := tsData.ToInternal(
resolution.SlabDuration(), // 1 hour (3600 * 10^9 ns)
resolution.SampleDuration(), // 10 seconds (10 * 10^9 ns)
true, // columnar format
)
if err != nil {
return roachpb.KeyValue{}, err
}

// Should only be one internal data entry for a single datapoint
if len(idatas) != 1 {
return roachpb.KeyValue{}, fmt.Errorf("expected 1 internal data entry, got %d", len(idatas))
}

idata := idatas[0]

// Create the key
key := ts.MakeDataKey(name, source, resolution, idata.StartTimestampNanos)

// Create the value (protobuf-encoded internal data)
var roachValue roachpb.Value
if err := roachValue.SetProto(&idata); err != nil {
return roachpb.KeyValue{}, err
}

return roachpb.KeyValue{Key: key, Value: roachValue}, nil
}
18 changes: 18 additions & 0 deletions pkg/ts/resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ func (r Resolution) SlabDuration() int64 {
return duration
}

// Duration returns the time.Duration corresponding to the Resolution
func (r Resolution) Duration() time.Duration {
switch r {
case Resolution10s:
return time.Second * 10
case Resolution30m:
return time.Minute * 30
case resolution1ns:
return time.Nanosecond
case resolution50ns:
return time.Nanosecond * 50
default:
// it will never reach here because this is an enum. The user is never
// expected to construct a Resolution value directly. But just in case.
return time.Duration(r)
}
}

// IsRollup returns true if this resolution contains rollup data: statistical
// values about a large number of samples taken over a long period, such as
// the min, max and sum.
Expand Down
Loading