diff --git a/pkg/quickwit/client/client.go b/pkg/quickwit/client/client.go index 39221f0..f24bce7 100644 --- a/pkg/quickwit/client/client.go +++ b/pkg/quickwit/client/client.go @@ -26,9 +26,10 @@ type DatasourceInfo struct { } type ConfiguredFields struct { - TimeField string - LogMessageField string - LogLevelField string + TimeField string + TimeOutputFormat string + LogMessageField string + LogLevelField string } // Client represents a client which can interact with elasticsearch api diff --git a/pkg/quickwit/error_handling_test.go b/pkg/quickwit/error_handling_test.go index ecfed5e..7ce7c21 100644 --- a/pkg/quickwit/error_handling_test.go +++ b/pkg/quickwit/error_handling_test.go @@ -39,9 +39,10 @@ func TestErrorAvgMissingField(t *testing.T) { `) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: Rfc3339, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields) @@ -71,9 +72,10 @@ func TestErrorAvgMissingFieldNoDetailedErrors(t *testing.T) { `) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: Rfc3339, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields) @@ -117,9 +119,10 @@ func TestErrorTooManyDateHistogramBuckets(t *testing.T) { `) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: Rfc3339, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, err := queryDataTestWithResponseCode(query, 200, response, configuredFields) require.NoError(t, err) @@ -154,9 +157,10 @@ func TestNonElasticError(t *testing.T) { response := []byte(`Access to the database is forbidden`) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: Rfc3339, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, err := queryDataTestWithResponseCode(query, 403, response, configuredFields) diff --git a/pkg/quickwit/models.go b/pkg/quickwit/models.go index be99ea4..c88e8db 100644 --- a/pkg/quickwit/models.go +++ b/pkg/quickwit/models.go @@ -122,3 +122,13 @@ func describeMetric(metricType, field string) string { } return text + " " + field } + +const ( + Iso8601 string = "iso8601" + Rfc2822 string = "rfc2822" + Rfc3339 string = "rfc3339" + TimestampSecs string = "unix_timestamp_secs" + TimestampMillis string = "unix_timestamp_millis" + TimestampMicros string = "unix_timestamp_micros" + TimestampNanos string = "unix_timestamp_nanos" +) diff --git a/pkg/quickwit/querydata_test.go b/pkg/quickwit/querydata_test.go index 3661c8d..7f30da4 100644 --- a/pkg/quickwit/querydata_test.go +++ b/pkg/quickwit/querydata_test.go @@ -144,9 +144,10 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int, func queryDataTest(queriesBytes []byte, responseBytes []byte) (queryDataTestResult, error) { configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: Rfc3339, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } return queryDataTestWithResponseCode(queriesBytes, 200, responseBytes, configuredFields) } diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 8ab25a1..4650230 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -58,6 +58,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc } timeField, toOk := jsonData["timeField"].(string) + timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string) logLevelField, ok := jsonData["logLevelField"].(string) if !ok { @@ -91,17 +92,18 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc maxConcurrentShardRequests = 256 } - if !toOk { - timeField, err = GetTimestampField(index, settings.URL, httpCli) + if !toOk || !tofOk { + timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli) if nil != err { return nil, err } } configuredFields := es.ConfiguredFields{ - TimeField: timeField, - LogLevelField: logLevelField, - LogMessageField: logMessageField, + TimeField: timeField, + TimeOutputFormat: timeOutputFormat, + LogLevelField: logLevelField, + LogMessageField: logMessageField, } model := es.DatasourceInfo{ diff --git a/pkg/quickwit/response_parser.go b/pkg/quickwit/response_parser.go index 16acecd..124ec90 100644 --- a/pkg/quickwit/response_parser.go +++ b/pkg/quickwit/response_parser.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "golang.org/x/exp/slices" es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson" @@ -246,7 +247,7 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str if propName == configuredFields.TimeField { timeVector := make([]*time.Time, size) for i, doc := range docs { - timeValue, err := ParseToTime(doc["sort"].([]any)[0]) + timeValue, err := ParseToTime(doc[configuredFields.TimeField], configuredFields.TimeOutputFormat) if err != nil { continue } @@ -293,13 +294,39 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str // Parses a value into Time given a timeOutputFormat. The conversion // only works with float64 as this is what we get when parsing a response. // TODO: understand why we get a float64? -func ParseToTime(value interface{}) (time.Time, error) { - typed_value, ok := value.(float64) - if !ok { - return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format") +func ParseToTime(value interface{}, timeOutputFormat string) (time.Time, error) { + + if timeOutputFormat == Iso8601 || timeOutputFormat == Rfc3339 { + value_string := value.(string) + timeValue, err := time.Parse(time.RFC3339, value_string) + if err != nil { + return time.Time{}, err + } + return timeValue, nil + } else if timeOutputFormat == Rfc2822 { + value_string := value.(string) + timeValue, err := time.Parse(time.RFC822Z, value_string) + if err != nil { + return time.Time{}, err + } + return timeValue, nil + } else if slices.Contains([]string{TimestampSecs, TimestampMillis, TimestampMicros, TimestampNanos}, timeOutputFormat) { + typed_value, ok := value.(float64) + if !ok { + return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format") + } + int64_value := int64(typed_value) + if timeOutputFormat == TimestampSecs { + return time.Unix(int64_value, 0), nil + } else if timeOutputFormat == TimestampMillis { + return time.Unix(0, int64_value*1_000_000), nil + } else if timeOutputFormat == TimestampMicros { + return time.Unix(0, int64_value*1_000), nil + } else if timeOutputFormat == TimestampNanos { + return time.Unix(0, int64_value), nil + } } - int64_value := int64(typed_value) - return time.Unix(0, int64_value), nil + return time.Time{}, fmt.Errorf("timeOutputFormat not supported yet %s", timeOutputFormat) } func processBuckets(aggs map[string]interface{}, target *Query, diff --git a/pkg/quickwit/response_parser_qw_test.go b/pkg/quickwit/response_parser_qw_test.go index 1b2e374..b75aa86 100644 --- a/pkg/quickwit/response_parser_qw_test.go +++ b/pkg/quickwit/response_parser_qw_test.go @@ -59,9 +59,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) { `) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: TimestampNanos, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields) frames := result.response.Responses["A"].Frames @@ -125,9 +126,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) { `) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: TimestampMicros, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields) frames := result.response.Responses["A"].Frames @@ -191,9 +193,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) { `) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: TimestampMillis, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields) frames := result.response.Responses["A"].Frames @@ -257,9 +260,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) { `) configuredFields := es.ConfiguredFields{ - TimeField: "testtime", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: TimestampSecs, + TimeField: "testtime", + LogMessageField: "line", + LogLevelField: "lvl", } result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields) frames := result.response.Responses["A"].Frames @@ -278,7 +282,7 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) { func TestConvertToTime(t *testing.T) { t.Run("Test parse unix timestamps nanosecs of float type", func(t *testing.T) { inputValue := interface{}(1234567890000000000.0) - value, _ := ParseToTime(inputValue) + value, _ := ParseToTime(inputValue, "unix_timestamp_nanos") require.Equal(t, time.Unix(1234567890, 0), value) }) } diff --git a/pkg/quickwit/response_parser_test.go b/pkg/quickwit/response_parser_test.go index 44e068f..bbdf1ea 100644 --- a/pkg/quickwit/response_parser_test.go +++ b/pkg/quickwit/response_parser_test.go @@ -3228,9 +3228,10 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) configuredFields := es.ConfiguredFields{ - TimeField: "@timestamp", - LogMessageField: "line", - LogLevelField: "lvl", + TimeOutputFormat: Rfc3339, + TimeField: "@timestamp", + LogMessageField: "line", + LogLevelField: "lvl", } timeRange := backend.TimeRange{ From: from, diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go index 45a0313..e3b3de8 100644 --- a/pkg/quickwit/timestamp_infos.go +++ b/pkg/quickwit/timestamp_infos.go @@ -12,7 +12,8 @@ import ( type QuickwitIndexMetadata struct { IndexConfig struct { DocMapping struct { - TimestampField string `json:"timestamp_field"` + TimestampField string `json:"timestamp_field"` + FieldMappings []FieldMappings `json:"field_mappings"` } `json:"doc_mapping"` } `json:"index_config"` } @@ -37,21 +38,21 @@ func NewErrorCreationPayload(statusCode int, message string) error { // TODO: refactor either by using a timestamp alias suppprted by quickwit // or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint // /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1 -func GetTimestampField(index string, qwickwitUrl string, cli *http.Client) (string, error) { +func GetTimestampFieldInfos(index string, qwickwitUrl string, cli *http.Client) (string, string, error) { if strings.Contains(index, "*") || strings.Contains(index, ",") { return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli) } return GetTimestampFieldFromIndex(index, qwickwitUrl, cli) } -func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, error) { +func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, string, error) { mappingEndpointUrl := qwickwitUrl + "/indexes/" + index qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) r, err := cli.Get(mappingEndpointUrl) if err != nil { errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) qwlog.Error(errMsg) - return "", err + return "", "", err } statusCode := r.StatusCode @@ -59,7 +60,7 @@ func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Clie if statusCode < 200 || statusCode >= 400 { errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(statusCode, errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) } defer r.Body.Close() @@ -67,20 +68,20 @@ func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Clie if err != nil { errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(statusCode, errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) } return DecodeTimestampFieldFromIndexConfig(body) } -func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, error) { +func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, string, error) { mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_patterns=" + indexPattern qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) r, err := cli.Get(mappingEndpointUrl) if err != nil { errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) qwlog.Error(errMsg) - return "", err + return "", "", err } statusCode := r.StatusCode @@ -88,7 +89,7 @@ func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, if statusCode < 200 || statusCode >= 400 { errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(statusCode, errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) } defer r.Body.Close() @@ -96,48 +97,78 @@ func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, if err != nil { errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(statusCode, errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) } return DecodeTimestampFieldFromIndexConfigs(body) } -func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, error) { +func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, string, error) { var payload []QuickwitIndexMetadata err := json.Unmarshal(body, &payload) if err != nil { errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(500, errMsg) + return "", "", NewErrorCreationPayload(500, errMsg) } + var refTimestampFieldName string = "" + var refTimestampOutputFormat string = "" var timestampFieldName string = "" + var timestampOutputFormat string = "" + for _, indexMetadata := range payload { - if timestampFieldName == "" { - timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField + timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField + timestampOutputFormat, _ = FindTimeStampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings) + + if refTimestampFieldName == "" { + refTimestampFieldName = timestampFieldName + refTimestampOutputFormat = timestampOutputFormat continue } - if timestampFieldName != indexMetadata.IndexConfig.DocMapping.TimestampField { - errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s and %s", timestampFieldName, indexMetadata.IndexConfig.DocMapping.TimestampField) + if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat { + errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(400, errMsg) + return "", "", NewErrorCreationPayload(400, errMsg) } } - qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) - return timestampFieldName, nil + qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s, timestamptOutputFormat = %s", timestampFieldName, timestampOutputFormat)) + return timestampFieldName, timestampOutputFormat, nil } -func DecodeTimestampFieldFromIndexConfig(body []byte) (string, error) { +func DecodeTimestampFieldFromIndexConfig(body []byte) (string, string, error) { var payload QuickwitIndexMetadata err := json.Unmarshal(body, &payload) if err != nil { errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) qwlog.Error(errMsg) - return "", NewErrorCreationPayload(500, errMsg) + return "", "", NewErrorCreationPayload(500, errMsg) } timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + timestampFieldFormat, _ := FindTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings) qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) - return timestampFieldName, nil + return timestampFieldName, timestampFieldFormat, nil +} + +func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) { + if nil == fieldMappings { + return "", false + } + + for _, field := range fieldMappings { + fieldName := field.Name + if nil != parentName { + fieldName = fmt.Sprintf("%s.%s", *parentName, fieldName) + } + + if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat { + return *field.OutputFormat, true + } else if field.Type == "object" && nil != field.FieldMappings { + return FindTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings) + } + } + + return "", false } diff --git a/pkg/quickwit/timestamp_infos_test.go b/pkg/quickwit/timestamp_infos_test.go index 632d966..379b95b 100644 --- a/pkg/quickwit/timestamp_infos_test.go +++ b/pkg/quickwit/timestamp_infos_test.go @@ -7,31 +7,57 @@ import ( ) func TestDecodeTimestampFieldInfos(t *testing.T) { - t.Run("Test decode timestam field infos", func(t *testing.T) { + t.Run("Test decode timestamp field infos", func(t *testing.T) { t.Run("Test decode simple fields", func(t *testing.T) { // Given query := []byte(` { - "version": "0.6", - "index_config": { + "version": "0.6", + "index_config": { "version": "0.6", "doc_mapping": { - "timestamp_field": "timestamp", - "mode": "dynamic", - "tokenizers": [] + "timestamp_field": "timestamp", + "mode": "dynamic", + "tokenizers": [], + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "timestamp", + "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "output_format": "rfc3339", + "stored": true + } + ] }, "retention": null - }, - "sources": [] + }, + "sources": [] } `) // When - timestampFieldName, err := DecodeTimestampFieldFromIndexConfig(query) + timestampFieldName, timestampOutputFormat, err := DecodeTimestampFieldFromIndexConfig(query) // Then require.NoError(t, err) require.Equal(t, timestampFieldName, "timestamp") + require.Equal(t, timestampOutputFormat, "rfc3339") }) t.Run("Test decode from list of index config", func(t *testing.T) { @@ -39,21 +65,21 @@ func TestDecodeTimestampFieldInfos(t *testing.T) { query := []byte(` [ { - "version": "0.6", - "index_config": { + "version": "0.6", + "index_config": { "doc_mapping": { - "timestamp_field": "sub.timestamp" + "timestamp_field": "sub.timestamp" }, "indexing_settings": {}, "retention": null - }, - "sources": [] + }, + "sources": [] } ] `) // When - timestampFieldName, err := DecodeTimestampFieldFromIndexConfigs(query) + timestampFieldName, _, err := DecodeTimestampFieldFromIndexConfigs(query) // Then require.NoError(t, err) @@ -65,24 +91,24 @@ func TestDecodeTimestampFieldInfos(t *testing.T) { query := []byte(` [ { - "version": "0.6", - "index_config": { + "version": "0.6", + "index_config": { "doc_mapping": { - "timestamp_field": "sub.timestamp" + "timestamp_field": "sub.timestamp" }, "indexing_settings": {}, "retention": null - }, - "sources": [] + }, + "sources": [] }, { "version": "0.6", "index_config": { - "doc_mapping": { + "doc_mapping": { "timestamp_field": "sub.timestamp2" - }, - "indexing_settings": {}, - "retention": null + }, + "indexing_settings": {}, + "retention": null }, "sources": [] } @@ -90,7 +116,7 @@ func TestDecodeTimestampFieldInfos(t *testing.T) { `) // When - _, err := DecodeTimestampFieldFromIndexConfigs(query) + _, _, err := DecodeTimestampFieldFromIndexConfigs(query) // Then require.Error(t, err)