Skip to content

Commit

Permalink
Revert to using timestamp output format
Browse files Browse the repository at this point in the history
  • Loading branch information
ddelemeny committed Mar 18, 2024
1 parent 957a6f8 commit 8a3c88b
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 93 deletions.
7 changes: 4 additions & 3 deletions pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type DatasourceInfo struct {
}

type ConfiguredFields struct {
TimeField string
LogMessageField string
LogLevelField string
TimeField string
TimeOutputFormat string
LogMessageField string
LogLevelField string
}

// Client represents a client which can interact with elasticsearch api
Expand Down
28 changes: 16 additions & 12 deletions pkg/quickwit/error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ func TestErrorAvgMissingField(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}

result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields)
Expand Down Expand Up @@ -71,9 +72,10 @@ func TestErrorAvgMissingFieldNoDetailedErrors(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}

result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields)
Expand Down Expand Up @@ -117,9 +119,10 @@ func TestErrorTooManyDateHistogramBuckets(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, err := queryDataTestWithResponseCode(query, 200, response, configuredFields)
require.NoError(t, err)
Expand Down Expand Up @@ -154,9 +157,10 @@ func TestNonElasticError(t *testing.T) {
response := []byte(`Access to the database is forbidden`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}

result, err := queryDataTestWithResponseCode(query, 403, response, configuredFields)
Expand Down
10 changes: 10 additions & 0 deletions pkg/quickwit/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,13 @@ func describeMetric(metricType, field string) string {
}
return text + " " + field
}

const (
Iso8601 string = "iso8601"
Rfc2822 string = "rfc2822"
Rfc3339 string = "rfc3339"
TimestampSecs string = "unix_timestamp_secs"
TimestampMillis string = "unix_timestamp_millis"
TimestampMicros string = "unix_timestamp_micros"
TimestampNanos string = "unix_timestamp_nanos"
)
7 changes: 4 additions & 3 deletions pkg/quickwit/querydata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,10 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int,

func queryDataTest(queriesBytes []byte, responseBytes []byte) (queryDataTestResult, error) {
configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
return queryDataTestWithResponseCode(queriesBytes, 200, responseBytes, configuredFields)
}
12 changes: 7 additions & 5 deletions pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
}

timeField, toOk := jsonData["timeField"].(string)
timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string)

logLevelField, ok := jsonData["logLevelField"].(string)
if !ok {
Expand Down Expand Up @@ -91,17 +92,18 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
maxConcurrentShardRequests = 256
}

if !toOk {
timeField, err = GetTimestampField(index, settings.URL, httpCli)
if !toOk || !tofOk {
timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
if nil != err {
return nil, err
}
}

configuredFields := es.ConfiguredFields{
TimeField: timeField,
LogLevelField: logLevelField,
LogMessageField: logMessageField,
TimeField: timeField,
TimeOutputFormat: timeOutputFormat,
LogLevelField: logLevelField,
LogMessageField: logMessageField,
}

model := es.DatasourceInfo{
Expand Down
41 changes: 34 additions & 7 deletions pkg/quickwit/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"golang.org/x/exp/slices"

es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client"
"github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson"
Expand Down Expand Up @@ -246,7 +247,7 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str
if propName == configuredFields.TimeField {
timeVector := make([]*time.Time, size)
for i, doc := range docs {
timeValue, err := ParseToTime(doc["sort"].([]any)[0])
timeValue, err := ParseToTime(doc[configuredFields.TimeField], configuredFields.TimeOutputFormat)
if err != nil {
continue
}
Expand Down Expand Up @@ -293,13 +294,39 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str
// Parses a value into Time given a timeOutputFormat. The conversion
// only works with float64 as this is what we get when parsing a response.
// TODO: understand why we get a float64?
func ParseToTime(value interface{}) (time.Time, error) {
typed_value, ok := value.(float64)
if !ok {
return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format")
func ParseToTime(value interface{}, timeOutputFormat string) (time.Time, error) {

if timeOutputFormat == Iso8601 || timeOutputFormat == Rfc3339 {
value_string := value.(string)
timeValue, err := time.Parse(time.RFC3339, value_string)
if err != nil {
return time.Time{}, err
}
return timeValue, nil
} else if timeOutputFormat == Rfc2822 {
value_string := value.(string)
timeValue, err := time.Parse(time.RFC822Z, value_string)
if err != nil {
return time.Time{}, err
}
return timeValue, nil
} else if slices.Contains([]string{TimestampSecs, TimestampMillis, TimestampMicros, TimestampNanos}, timeOutputFormat) {
typed_value, ok := value.(float64)
if !ok {
return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format")
}
int64_value := int64(typed_value)
if timeOutputFormat == TimestampSecs {
return time.Unix(int64_value, 0), nil
} else if timeOutputFormat == TimestampMillis {
return time.Unix(0, int64_value*1_000_000), nil
} else if timeOutputFormat == TimestampMicros {
return time.Unix(0, int64_value*1_000), nil
} else if timeOutputFormat == TimestampNanos {
return time.Unix(0, int64_value), nil
}
}
int64_value := int64(typed_value)
return time.Unix(0, int64_value), nil
return time.Time{}, fmt.Errorf("timeOutputFormat not supported yet %s", timeOutputFormat)
}

func processBuckets(aggs map[string]interface{}, target *Query,
Expand Down
30 changes: 17 additions & 13 deletions pkg/quickwit/response_parser_qw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampNanos,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand Down Expand Up @@ -125,9 +126,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampMicros,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand Down Expand Up @@ -191,9 +193,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampMillis,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand Down Expand Up @@ -257,9 +260,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampSecs,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand All @@ -278,7 +282,7 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
func TestConvertToTime(t *testing.T) {
t.Run("Test parse unix timestamps nanosecs of float type", func(t *testing.T) {
inputValue := interface{}(1234567890000000000.0)
value, _ := ParseToTime(inputValue)
value, _ := ParseToTime(inputValue, "unix_timestamp_nanos")
require.Equal(t, time.Unix(1234567890, 0), value)
})
}
7 changes: 4 additions & 3 deletions pkg/quickwit/response_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3228,9 +3228,10 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
configuredFields := es.ConfiguredFields{
TimeField: "@timestamp",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "@timestamp",
LogMessageField: "line",
LogLevelField: "lvl",
}
timeRange := backend.TimeRange{
From: from,
Expand Down
Loading

0 comments on commit 8a3c88b

Please sign in to comment.