Skip to content

Commit 15e4f0c

Browse files
committed
pkg/cli (tsdump): add interval to metrics before uploading
This commit adds a new `interval` field to the `DatadogSeries` struct to represent the interval in seconds. The interval is encoded in the keys used to store metrics in the TSDB. It can either be 10s or 1800s (30min). Recent metrics use a 10s interval, while older metrics (typically 10 days or older) that have been rolled up use a 30min interval. This commit also adds a new end-to-end test for the tsdump-datadog upload flow. This test runs the tsdump command as a separate process like a user would and mocks the Datadog API server to verify the upload functionality and data transformation logic. Part of: CC-32614 Release note: None
1 parent 4f26eda commit 15e4f0c

File tree

5 files changed

+226
-6
lines changed

5 files changed

+226
-6
lines changed

pkg/cli/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ go_test(
369369
"statement_bundle_test.go",
370370
"statement_diag_test.go",
371371
"tsdump_test.go",
372+
"tsdump_upload_test.go",
372373
"userfiletable_test.go",
373374
"workload_test.go",
374375
"zip_helpers_test.go",
@@ -447,6 +448,7 @@ go_test(
447448
"//pkg/testutils/skip",
448449
"//pkg/testutils/sqlutils",
449450
"//pkg/testutils/testcluster",
451+
"//pkg/ts",
450452
"//pkg/ts/tspb",
451453
"//pkg/util",
452454
"//pkg/util/envutil",

pkg/cli/testdata/tsdump_upload_e2e

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
2+
upload-datadog
3+
cr.node.admission.admitted.elastic-cpu,2025-05-26T08:32:00Z,1,1
4+
cr.node.sql.query.count,2021-01-01T00:00:00Z,1,100.5
5+
cr.node.sql.query.count,2021-01-01T00:00:10Z,1,102.3
6+
cr.store.rocksdb.block.cache.usage,2021-01-01T00:00:00Z,2,75.2
7+
----
8+
{"series":[{"metric":"crdb.tsdump.admission.admitted.elastic-cpu","type":0,"points":[{"timestamp":1748248320,"value":1}],"interval":10,"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]},{"metric":"crdb.tsdump.sql.query.count","type":0,"points":[{"timestamp":1609459200,"value":100.5}],"interval":10,"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]},{"metric":"crdb.tsdump.sql.query.count","type":0,"points":[{"timestamp":1609459210,"value":102.3}],"interval":10,"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]},{"metric":"crdb.tsdump.rocksdb.block.cache.usage","type":0,"points":[{"timestamp":1609459200,"value":75.2}],"interval":10,"resources":null,"tags":["store:2","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"]}]}

pkg/cli/tsdump_upload.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type DatadogSeries struct {
8686
Metric string `json:"metric"`
8787
Type int `json:"type"`
8888
Points []DatadogPoint `json:"points"`
89+
Interval int `json:"interval,omitempty"`
8990
Resources []struct {
9091
Name string `json:"name"`
9192
Type string `json:"type"`
@@ -171,7 +172,7 @@ var getCurrentTime = func() time.Time {
171172
return timeutil.Now()
172173
}
173174

174-
func doDDRequest(req *http.Request) error {
175+
var doDDRequest = func(req *http.Request) error {
175176
resp, err := http.DefaultClient.Do(req)
176177
if err != nil {
177178
return err
@@ -201,7 +202,7 @@ func appendTag(series *DatadogSeries, tagKey, tagValue string) {
201202
}
202203

203204
func (d *datadogWriter) dump(kv *roachpb.KeyValue) (*DatadogSeries, error) {
204-
name, source, _, _, err := ts.DecodeDataKey(kv.Key)
205+
name, source, res, _, err := ts.DecodeDataKey(kv.Key)
205206
if err != nil {
206207
return nil, err
207208
}
@@ -211,10 +212,11 @@ func (d *datadogWriter) dump(kv *roachpb.KeyValue) (*DatadogSeries, error) {
211212
}
212213

213214
series := &DatadogSeries{
214-
Metric: name,
215-
Tags: []string{},
216-
Type: d.resolveMetricType(name),
217-
Points: make([]DatadogPoint, idata.SampleCount()),
215+
Metric: name,
216+
Tags: []string{},
217+
Type: d.resolveMetricType(name),
218+
Points: make([]DatadogPoint, idata.SampleCount()),
219+
Interval: int(res.Duration().Seconds()), // convert from time.Duration to number of seconds.
218220
}
219221

220222
sl := reCrStoreNode.FindStringSubmatch(name)

pkg/cli/tsdump_upload_test.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package cli
7+
8+
import (
9+
"compress/gzip"
10+
"encoding/csv"
11+
"encoding/gob"
12+
"fmt"
13+
"io"
14+
"net/http"
15+
"os"
16+
"strconv"
17+
"strings"
18+
"testing"
19+
"time"
20+
21+
"github.com/cockroachdb/cockroach/pkg/roachpb"
22+
"github.com/cockroachdb/cockroach/pkg/testutils"
23+
"github.com/cockroachdb/cockroach/pkg/ts"
24+
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
25+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
26+
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/datadriven"
28+
"github.com/stretchr/testify/require"
29+
)
30+
31+
func TestTSDumpUploadE2E(t *testing.T) {
32+
defer leaktest.AfterTest(t)()
33+
defer log.Scope(t).Close(t)
34+
defer testutils.TestingHook(&getCurrentTime, func() time.Time {
35+
return time.Date(2024, 11, 14, 0, 0, 0, 0, time.UTC)
36+
})()
37+
38+
datadriven.RunTest(t, "testdata/tsdump_upload_e2e", func(t *testing.T, d *datadriven.TestData) string {
39+
var buf strings.Builder
40+
defer mockDoDDRequest(t, &buf)()
41+
42+
c := NewCLITest(TestCLIParams{})
43+
defer c.Cleanup()
44+
45+
switch d.Cmd {
46+
case "upload-datadog":
47+
dumpFilePath := generateMockTSDumpFromCSV(t, d.Input)
48+
49+
var clusterLabel, apiKey string
50+
if d.HasArg("cluster-label") {
51+
d.ScanArgs(t, "cluster-label", &clusterLabel)
52+
} else {
53+
clusterLabel = "test-cluster"
54+
}
55+
if d.HasArg("api-key") {
56+
d.ScanArgs(t, "api-key", &apiKey)
57+
} else {
58+
apiKey = "dd-api-key"
59+
}
60+
61+
// Run the command
62+
stdout, err := c.RunWithCapture(fmt.Sprintf(
63+
`debug tsdump --format=datadog --dd-api-key="%s" --cluster-label="%s" %s`,
64+
apiKey, clusterLabel, dumpFilePath,
65+
))
66+
t.Log(stdout)
67+
require.NoError(t, err)
68+
return strings.TrimSpace(buf.String())
69+
70+
default:
71+
t.Fatalf("unknown command: %s", d.Cmd)
72+
return ""
73+
}
74+
})
75+
}
76+
77+
func mockDoDDRequest(t *testing.T, w io.Writer) func() {
78+
t.Helper()
79+
80+
return testutils.TestingHook(&doDDRequest, func(req *http.Request) error {
81+
defer req.Body.Close()
82+
83+
reader, err := gzip.NewReader(req.Body)
84+
require.NoError(t, err)
85+
86+
raw, err := io.ReadAll(reader)
87+
require.NoError(t, err)
88+
89+
fmt.Fprintln(w, string(raw))
90+
return nil
91+
})
92+
}
93+
94+
// generateMockTSDumpFromCSV creates a mock tsdump file from CSV input string.
95+
// CSV format: metric_name,timestamp,source,value
96+
// Example: cr.node.admission.admitted.elastic-cpu,2025-05-26T08:32:00Z,1,1
97+
// NOTE: this is the same format generated by the `cockroach tsdump` command
98+
// when --format=csv is used.
99+
func generateMockTSDumpFromCSV(t *testing.T, csvInput string) string {
100+
t.Helper()
101+
102+
// Parse CSV data from input string
103+
reader := csv.NewReader(strings.NewReader(csvInput))
104+
csvData, err := reader.ReadAll()
105+
require.NoError(t, err)
106+
require.Greater(t, len(csvData), 0, "CSV input must have at least one data row")
107+
108+
// Create temporary file
109+
tmpFile, err := os.CreateTemp("", "mock_tsdump_*.gob")
110+
require.NoError(t, err)
111+
defer tmpFile.Close()
112+
113+
// Create gob encoder
114+
encoder := gob.NewEncoder(tmpFile)
115+
116+
// Process each row (no header expected)
117+
for i, row := range csvData {
118+
require.Len(t, row, 4, "CSV row %d must have 4 columns: metric_name,timestamp,source,value", i+1)
119+
120+
metricName := row[0]
121+
timestampStr := row[1]
122+
source := row[2]
123+
valueStr := row[3]
124+
125+
// Parse timestamp (RFC3339 format)
126+
timestamp, err := time.Parse(time.RFC3339, timestampStr)
127+
require.NoError(t, err, "invalid timestamp format in row %d: %s (expected RFC3339)", i+1, timestampStr)
128+
timestampNanos := timestamp.UnixNano()
129+
130+
// Parse value
131+
value, err := strconv.ParseFloat(valueStr, 64)
132+
require.NoError(t, err, "invalid value in row %d: %s", i+1, valueStr)
133+
134+
// Create KeyValue entry for this data point
135+
kv, err := createMockTimeSeriesKV(metricName, source, timestampNanos, value)
136+
require.NoError(t, err)
137+
138+
// Encode to gob format
139+
err = encoder.Encode(kv)
140+
require.NoError(t, err)
141+
}
142+
143+
t.Cleanup(func() {
144+
require.NoError(t, os.Remove(tmpFile.Name()), "failed to remove temporary file")
145+
})
146+
return tmpFile.Name()
147+
}
148+
149+
// createMockTimeSeriesKV creates a roachpb.KeyValue entry containing time series data
150+
func createMockTimeSeriesKV(
151+
name, source string, timestamp int64, value float64,
152+
) (roachpb.KeyValue, error) {
153+
// Create TimeSeriesData
154+
tsData := tspb.TimeSeriesData{
155+
Name: name,
156+
Source: source,
157+
Datapoints: []tspb.TimeSeriesDatapoint{
158+
{TimestampNanos: timestamp, Value: value},
159+
},
160+
}
161+
162+
// Convert to internal format using 10s resolution
163+
resolution := ts.Resolution10s
164+
idatas, err := tsData.ToInternal(
165+
resolution.SlabDuration(), // 1 hour (3600 * 10^9 ns)
166+
resolution.SampleDuration(), // 10 seconds (10 * 10^9 ns)
167+
true, // columnar format
168+
)
169+
if err != nil {
170+
return roachpb.KeyValue{}, err
171+
}
172+
173+
// Should only be one internal data entry for a single datapoint
174+
if len(idatas) != 1 {
175+
return roachpb.KeyValue{}, fmt.Errorf("expected 1 internal data entry, got %d", len(idatas))
176+
}
177+
178+
idata := idatas[0]
179+
180+
// Create the key
181+
key := ts.MakeDataKey(name, source, resolution, idata.StartTimestampNanos)
182+
183+
// Create the value (protobuf-encoded internal data)
184+
var roachValue roachpb.Value
185+
if err := roachValue.SetProto(&idata); err != nil {
186+
return roachpb.KeyValue{}, err
187+
}
188+
189+
return roachpb.KeyValue{Key: key, Value: roachValue}, nil
190+
}

pkg/ts/resolution.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,24 @@ func (r Resolution) SlabDuration() int64 {
9797
return duration
9898
}
9999

100+
// Duration returns the time.Duration corresponding to the Resolution
101+
func (r Resolution) Duration() time.Duration {
102+
switch r {
103+
case Resolution10s:
104+
return time.Second * 10
105+
case Resolution30m:
106+
return time.Minute * 30
107+
case resolution1ns:
108+
return time.Nanosecond
109+
case resolution50ns:
110+
return time.Nanosecond * 50
111+
default:
112+
// it will never reach here because this is an enum. The user is never
113+
// expected to construct a Resolution value directly. But just in case.
114+
return time.Duration(r)
115+
}
116+
}
117+
100118
// IsRollup returns true if this resolution contains rollup data: statistical
101119
// values about a large number of samples taken over a long period, such as
102120
// the min, max and sum.

0 commit comments

Comments
 (0)