
Commit 96c2eed

Merge remote-tracking branch 'upstream/master' into 2-prometheus-label-filters
2 parents: f5b99fe + 267c26e

23 files changed: +890 −157 lines

cmd/e2e-test/carbon-clickhouse.go

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ type CarbonClickhouse struct {
 
 func (c *CarbonClickhouse) Start(testDir, clickhouseURL string) (string, error) {
     if len(c.Version) == 0 {
-        c.Version = "0.11.4"
+        c.Version = "latest"
     }
     if len(c.DockerImage) == 0 {
         c.DockerImage = "ghcr.io/go-graphite/carbon-clickhouse"

cmd/e2e-test/checks.go

Lines changed: 32 additions & 2 deletions
@@ -11,6 +11,7 @@ import (
     "strings"
     "time"
 
+    "github.com/go-graphite/protocol/carbonapi_v3_pb"
     "github.com/lomik/graphite-clickhouse/helper/client"
     "github.com/lomik/graphite-clickhouse/helper/datetime"
     "github.com/lomik/graphite-clickhouse/helper/tests/compare"
@@ -271,6 +272,24 @@ func compareRender(errors *[]string, name, url string, actual, expected []client
     }
 }
 
+func parseFilteringFunctions(strFilteringFuncs []string) ([]*carbonapi_v3_pb.FilteringFunction, error) {
+    res := make([]*carbonapi_v3_pb.FilteringFunction, 0, len(strFilteringFuncs))
+    for _, strFF := range strFilteringFuncs {
+        strFFSplit := strings.Split(strFF, "(")
+        if len(strFFSplit) != 2 {
+            return nil, fmt.Errorf("could not parse filtering function: %s", strFF)
+        }
+        name := strFFSplit[0]
+        args := strings.Split(strFFSplit[1], ",")
+        for i := range args {
+            args[i] = strings.TrimSpace(args[i])
+            args[i] = strings.Trim(args[i], ")'")
+        }
+        res = append(res, &carbonapi_v3_pb.FilteringFunction{Name: name, Arguments: args})
+    }
+    return res, nil
+}
+
 func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, defaultPreision time.Duration) []string {
     var errors []string
     httpClient := http.Client{
@@ -280,7 +299,18 @@ func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, d
     from := datetime.TimestampTruncate(check.from, defaultPreision)
     until := datetime.TimestampTruncate(check.until, defaultPreision)
     for _, format := range check.Formats {
-        if url, result, respHeader, err := client.Render(&httpClient, address, format, check.Targets, from, until); err == nil {
+
+        var filteringFunctions []*carbonapi_v3_pb.FilteringFunction
+        if format == client.FormatPb_v3 {
+            var err error
+            filteringFunctions, err = parseFilteringFunctions(check.FilteringFunctions)
+            if err != nil {
+                errors = append(errors, err.Error())
+                continue
+            }
+        }
+
+        if url, result, respHeader, err := client.Render(&httpClient, address, format, check.Targets, filteringFunctions, check.MaxDataPoints, from, until); err == nil {
             id := requestId(respHeader)
             name := ""
             if check.ErrorRegexp != "" {
@@ -303,7 +333,7 @@ func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, d
         if check.CacheTTL > 0 && check.ErrorRegexp == "" {
             // second query must be find-cached
             name = "cache"
-            if url, result, respHeader, err = client.Render(&httpClient, address, format, check.Targets, from, until); err == nil {
+            if url, result, respHeader, err = client.Render(&httpClient, address, format, check.Targets, filteringFunctions, check.MaxDataPoints, from, until); err == nil {
                 compareRender(&errors, name, url, result, check.result, true, respHeader, check.CacheTTL)
             } else {
                 errStr := strings.TrimRight(err.Error(), "\n")
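
Note on the new helper (the input below is a hypothetical illustration, not a fixture from the test suite): parseFilteringFunctions splits each string on "(", uses the left part as the function name, and trims spaces, single quotes, and the closing ")" from every comma-separated argument.

// Sketch only: expected behaviour of parseFilteringFunctions for a made-up input.
ffs, err := parseFilteringFunctions([]string{"highestCurrent('test.metric.*', 2)"})
if err != nil {
    panic(err)
}
fmt.Println(ffs[0].Name)      // highestCurrent
fmt.Println(ffs[0].Arguments) // [test.metric.* 2]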

cmd/e2e-test/e2etesting.go

Lines changed: 12 additions & 7 deletions
@@ -59,13 +59,15 @@ type Metric struct {
 }
 
 type RenderCheck struct {
-    Name        string              `toml:"name"`
-    Formats     []client.FormatType `toml:"formats"`
-    From        string              `toml:"from"`
-    Until       string              `toml:"until"`
-    Targets     []string            `toml:"targets"`
-    Timeout     time.Duration       `toml:"timeout"`
-    DumpIfEmpty []string            `toml:"dump_if_empty"`
+    Name               string              `toml:"name"`
+    Formats            []client.FormatType `toml:"formats"`
+    From               string              `toml:"from"`
+    Until              string              `toml:"until"`
+    Targets            []string            `toml:"targets"`
+    MaxDataPoints      int64               `toml:"max_data_points"`
+    FilteringFunctions []string            `toml:"filtering_functions"`
+    Timeout            time.Duration       `toml:"timeout"`
+    DumpIfEmpty        []string            `toml:"dump_if_empty"`
 
     Optimize []string `toml:"optimize"` // optimize tables before run tests
 
@@ -338,6 +340,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
             zap.String("clickhouse config", clickhouseDir),
             zap.String("graphite-clickhouse config", gch.ConfigTpl),
             zap.Strings("targets", check.Targets),
+            zap.Strings("filtering_functions", check.FilteringFunctions),
             zap.String("from_raw", check.From),
             zap.String("until_raw", check.Until),
             zap.Int64("from", check.from),
@@ -361,6 +364,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
             zap.String("clickhouse config", clickhouseDir),
             zap.String("graphite-clickhouse config", gch.ConfigTpl),
             zap.Strings("targets", check.Targets),
+            zap.Strings("filtering_functions", check.FilteringFunctions),
             zap.String("from_raw", check.From),
             zap.String("until_raw", check.Until),
             zap.Int64("from", check.from),
@@ -377,6 +381,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
             zap.String("clickhouse config", clickhouseDir),
             zap.String("graphite-clickhouse config", gch.ConfigTpl),
             zap.Strings("targets", check.Targets),
+            zap.Strings("filtering_functions", check.FilteringFunctions),
             zap.String("from_raw", check.From),
             zap.String("until_raw", check.Until),
             zap.Int64("from", check.from),
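
Note: for orientation, a hypothetical check exercising the two new fields could be built like this (the name, target, and filtering function are made up; in the e2e TOML files they map to the max_data_points and filtering_functions keys shown in the struct tags above):

// Illustrative values only, not taken from the repository's test cases.
check := RenderCheck{
    Name:               "filtering-functions-smoke",
    Formats:            []client.FormatType{client.FormatPb_v3},
    Targets:            []string{"test.metric.*"},
    MaxDataPoints:      100,
    FilteringFunctions: []string{"highestCurrent('test.metric.*', 2)"},
}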

cmd/graphite-clickhouse-client/main.go

Lines changed: 9 additions & 1 deletion
@@ -5,9 +5,11 @@ import (
     "fmt"
     "net/http"
     "os"
+    "strconv"
     "strings"
     "time"
 
+    "github.com/go-graphite/protocol/carbonapi_v3_pb"
     "github.com/lomik/graphite-clickhouse/helper/client"
     "github.com/lomik/graphite-clickhouse/helper/datetime"
 )
@@ -31,6 +33,7 @@ func main() {
     address := flag.String("address", "http://127.0.0.1:9090", "Address of graphite-clickhouse server")
     fromStr := flag.String("from", "0", "from")
     untilStr := flag.String("until", "", "until")
+    maxDataPointsStr := flag.String("maxDataPoints", "1048576", "Maximum amount of datapoints in response")
 
     metricsFind := flag.String("find", "", "Query for /metrics/find/ , valid formats are carbonapi_v3_pb. protobuf, pickle")
 
@@ -71,6 +74,11 @@ func main() {
         fmt.Printf("invalid until: %s\n", *untilStr)
         os.Exit(1)
     }
+    maxDataPoints, err := strconv.ParseInt(*maxDataPointsStr, 10, 64)
+    if err != nil {
+        fmt.Printf("invalid maxDataPoints: %s\n", *maxDataPointsStr)
+        os.Exit(1)
+    }
 
     httpClient := http.Client{
         Timeout: *timeout,
@@ -182,7 +190,7 @@ func main() {
         if formatRender == client.FormatDefault {
             formatRender = client.FormatPb_v3
         }
-        queryRaw, r, respHeader, err := client.Render(&httpClient, *address, formatRender, targets, int64(from), int64(until))
+        queryRaw, r, respHeader, err := client.Render(&httpClient, *address, formatRender, targets, []*carbonapi_v3_pb.FilteringFunction{}, maxDataPoints, int64(from), int64(until))
         if respHeader != nil {
            fmt.Printf("Responce header: %+v\n", respHeader)
         }

finder/tagged.go

Lines changed: 4 additions & 1 deletion
@@ -291,7 +291,10 @@ func ParseTaggedConditions(conditions []string, config *config.Config, autocompl
         case "=":
             terms[i].Op = TaggedTermEq
             terms[i].HasWildcard = where.HasWildcard(terms[i].Value)
-            if !terms[i].HasWildcard {
+            // special case when using useCarbonBehaviour = true
+            // which matches everything that does not have that tag
+            emptyValue := config.FeatureFlags.UseCarbonBehavior && terms[i].Value == ""
+            if !terms[i].HasWildcard && !emptyValue {
                 nonWildcards++
             }
         case "!=":

helper/clickhouse/clickhouse.go

Lines changed: 4 additions & 1 deletion
@@ -72,7 +72,10 @@ func extractClickhouseError(e string) (int, string) {
             return http.StatusServiceUnavailable, "Storage configuration error"
         }
     }
-    return http.StatusInternalServerError, "Storage error"
+    if strings.HasPrefix(e, "clickhouse response status 404: Code: 60. DB::Exception: Table default.") {
+        return http.StatusServiceUnavailable, "Storage default tables damaged"
+    }
+    return http.StatusServiceUnavailable, "Storage unavailable"
 }
 
 func HandleError(w http.ResponseWriter, err error) (status int, queueFail bool) {

helper/clickhouse/clickhouse_test.go

Lines changed: 7 additions & 2 deletions
@@ -34,10 +34,15 @@ func Test_extractClickhouseError(t *testing.T) {
             wantStatus:  http.StatusForbidden,
             wantMessage: "Storage read limit for memory",
         },
+        {
+            errStr:      "clickhouse response status 404: Code: 60. DB::Exception: Table default.graphite_index does not exist. (UNKNOWN_TABLE) (version 23.12.6.19 (official build))\n",
+            wantStatus:  http.StatusServiceUnavailable,
+            wantMessage: "Storage default tables damaged",
+        },
         {
             errStr:      "Other error",
-            wantStatus:  http.StatusInternalServerError,
-            wantMessage: "Storage error",
+            wantStatus:  http.StatusServiceUnavailable,
+            wantMessage: "Storage unavailable",
         },
     }
     for _, tt := range tests {

helper/client/render.go

Lines changed: 13 additions & 9 deletions
@@ -37,7 +37,7 @@ type Metric struct {
 
 // Render do /metrics/find/ request
 // Valid formats are carbonapi_v3_pb. protobuf, pickle, json
-func Render(client *http.Client, address string, format FormatType, targets []string, from, until int64) (string, []Metric, http.Header, error) {
+func Render(client *http.Client, address string, format FormatType, targets []string, filteringFunctions []*protov3.FilteringFunction, maxDataPoints, from, until int64) (string, []Metric, http.Header, error) {
     rUrl := "/render/"
     if format == FormatDefault {
         format = FormatPb_v3
@@ -56,6 +56,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
     }
     fromStr := strconv.FormatInt(from, 10)
     untilStr := strconv.FormatInt(until, 10)
+    maxDataPointsStr := strconv.FormatInt(maxDataPoints, 10)
 
     u, err := url.Parse(address + rUrl)
     if err != nil {
@@ -77,10 +78,12 @@ func Render(client *http.Client, address string, format FormatType, targets []st
     }
     for i, target := range targets {
         r.Metrics[i] = protov3.FetchRequest{
-            Name:           target,
-            StartTime:      from,
-            StopTime:       until,
-            PathExpression: target,
+            Name:            target,
+            StartTime:       from,
+            StopTime:        until,
+            PathExpression:  target,
+            FilterFunctions: filteringFunctions,
+            MaxDataPoints:   maxDataPoints,
         }
     }
 
@@ -93,10 +96,11 @@ func Render(client *http.Client, address string, format FormatType, targets []st
         }
     case FormatPb_v2, FormatProtobuf, FormatPickle, FormatJSON:
         v := url.Values{
-            "format": []string{format.String()},
-            "from":   []string{fromStr},
-            "until":  []string{untilStr},
-            "target": targets,
+            "format":        []string{format.String()},
+            "from":          []string{fromStr},
+            "until":         []string{untilStr},
+            "target":        targets,
+            "maxDataPoints": []string{maxDataPointsStr},
         }
         u.RawQuery = v.Encode()
     default:
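
Note: a minimal sketch of calling the updated client with the new arguments (address, target, time range, timeout, and the filtering function are assumed values for illustration):

// Sketch only; error handling and result processing are kept minimal.
ffs := []*carbonapi_v3_pb.FilteringFunction{
    {Name: "highestCurrent", Arguments: []string{"test.metric.*", "2"}},
}
httpClient := http.Client{Timeout: 10 * time.Second}
_, metrics, _, err := client.Render(&httpClient, "http://127.0.0.1:9090",
    client.FormatPb_v3, []string{"test.metric.*"}, ffs, 100, 1700000000, 1700003600)
if err != nil {
    panic(err)
}
fmt.Println(len(metrics)) // series returned for the target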

render/data/data.go

Lines changed: 6 additions & 2 deletions
@@ -54,10 +54,14 @@ func (d *Data) GetAggregation(id uint32) (string, error) {
     if err != nil {
         return function, err
     }
-    if function == "any" || function == "anyLast" {
+    switch function {
+    case "any":
+        return "first", nil
+    case "anyLast":
         return "last", nil
+    default:
+        return function, nil
     }
-    return function, nil
 }
 
 // data wraps Data and adds asynchronous processing of data
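
Note: in effect, the ClickHouse aggregations "any" and "anyLast" are now reported as the Graphite consolidation functions "first" and "last" (previously both mapped to "last"), and every other aggregation passes through unchanged. A standalone sketch of the same mapping:

// Mirrors the switch in GetAggregation above; the function name here is illustrative.
func toGraphiteConsolidation(fn string) string {
    switch fn {
    case "any":
        return "first"
    case "anyLast":
        return "last"
    default:
        return fn
    }
}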

render/data/query.go

Lines changed: 14 additions & 3 deletions
@@ -5,6 +5,7 @@ import (
     "crypto/tls"
     "errors"
     "fmt"
+    "net/http"
     "os"
     "strings"
     "sync"
@@ -15,6 +16,7 @@ import (
 
     "github.com/lomik/graphite-clickhouse/config"
     "github.com/lomik/graphite-clickhouse/helper/clickhouse"
+    "github.com/lomik/graphite-clickhouse/helper/errs"
     "github.com/lomik/graphite-clickhouse/helper/rollup"
     "github.com/lomik/graphite-clickhouse/metrics"
     "github.com/lomik/graphite-clickhouse/pkg/dry"
@@ -145,7 +147,11 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error {
     // carbonlink request
     carbonlinkResponseRead := queryCarbonlink(ctx, carbonlink, cond.metricsUnreverse)
 
-    cond.prepareLookup()
+    err = cond.prepareLookup()
+    if err != nil {
+        logger.Error("prepare_lookup", zap.Error(err))
+        return errs.NewErrorWithCode(err.Error(), http.StatusBadRequest)
+    }
     cond.setStep(q.cStep)
     if cond.step < 1 {
         return ErrSetStepTimeout
@@ -279,7 +285,7 @@ func (c *conditions) prepareMetricsLists() {
         }
     }
 
-func (c *conditions) prepareLookup() {
+func (c *conditions) prepareLookup() error {
     age := uint32(dry.Max(0, time.Now().Unix()-c.From))
     c.aggregations = make(map[string][]string)
     c.appliedFunctions = make(map[string][]string)
@@ -295,7 +301,11 @@ func (c *conditions) prepareLookup() {
         // Currently it just finds the first target matching the metric
         // to avoid making multiple request for every type of aggregation for a given metric.
         for _, alias := range c.AM.Get(c.metricsUnreverse[i]) {
-            if requestedAgg := c.GetRequestedAggregation(alias.Target); requestedAgg != "" {
+            requestedAgg, err := c.GetRequestedAggregation(alias.Target)
+            if err != nil {
+                return fmt.Errorf("failed to choose appropriate aggregation for '%s': %s", alias.Target, err.Error())
+            }
+            if requestedAgg != "" {
                 agg = rollup.AggrMap[requestedAgg]
                 c.appliedFunctions[alias.Target] = []string{graphiteConsolidationFunction}
                 break
@@ -330,6 +340,7 @@ func (c *conditions) prepareLookup() {
             mm.WriteString(c.metricsRequested[i] + "\n")
         }
     }
+    return nil
 }
 
 var ErrSetStepTimeout = errors.New("unexpected error, setStep timeout")
