Skip to content

Commit

Permalink
Merge pull request #128 from quickwit-oss/ddelemeny/refactor-timefiel…
Browse files Browse the repository at this point in the history
…d-init

Use goroutine/chan to init, avoid hard failure
  • Loading branch information
ddelemeny authored May 9, 2024
2 parents ad87ff5 + 6ebe4d9 commit b97978b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 43 deletions.
8 changes: 7 additions & 1 deletion pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
)

type ReadyStatus struct {
IsReady bool
Err error
}

type DatasourceInfo struct {
ID int64
HTTPClient *http.Client
URL string
Database string
ConfiguredFields ConfiguredFields
MaxConcurrentShardRequests int64
IsReady bool
ReadyStatus chan ReadyStatus
ShouldInit bool
}

type ConfiguredFields struct {
Expand Down
109 changes: 67 additions & 42 deletions pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,46 +104,62 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
Database: index,
MaxConcurrentShardRequests: int64(maxConcurrentShardRequests),
ConfiguredFields: configuredFields,
IsReady: false,
}
return &QuickwitDatasource{dsInfo: model}, nil
}

// Network dependent datasource initialization.
// This is not done in the "constructor" function to allow saving the ds
// even if the server is not responsive.
func (ds *QuickwitDatasource) initDatasource(force bool) error {
if ds.dsInfo.IsReady && !force {
return nil
}

indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient)
if err != nil {
return fmt.Errorf("failed to get index metadata : %w", err)
}

if len(indexMetadataList) == 0 {
return fmt.Errorf("no index found for %s", ds.dsInfo.Database)
}

timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList)
if nil != err {
return err
ReadyStatus: make(chan es.ReadyStatus, 1),
ShouldInit: true,
}

ds.dsInfo.ConfiguredFields.TimeField = timeField
ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat
ds := &QuickwitDatasource{dsInfo: model}

ds.dsInfo.IsReady = true
return nil
// Create an initialization goroutine
go func(ds *QuickwitDatasource, readyStatus chan<- es.ReadyStatus) {
var status es.ReadyStatus = es.ReadyStatus{
IsReady: false,
Err: nil,
}
for {
// Will retry init everytime the channel is consumed until ready
if !status.IsReady || ds.dsInfo.ShouldInit {
qwlog.Debug("Initializing Datasource")
status.IsReady = true
status.Err = nil

indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient)
if err != nil {
status.IsReady = false
status.Err = fmt.Errorf("failed to get index metadata : %w", err)
} else if len(indexMetadataList) == 0 {
status.IsReady = false
status.Err = fmt.Errorf("no index found for %s", ds.dsInfo.Database)
} else {
timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList)
if nil != err {
status.IsReady = false
status.Err = err
} else if "" == timeField {
status.IsReady = false
status.Err = fmt.Errorf("timefield is empty for %s", ds.dsInfo.Database)
} else if "" == timeOutputFormat {
status.Err = fmt.Errorf("timefield's output_format is empty, logs timestamps will not be parsed correctly for %s", ds.dsInfo.Database)
}

ds.dsInfo.ConfiguredFields.TimeField = timeField
ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat
ds.dsInfo.ShouldInit = false
}
}
readyStatus <- status
}
}(ds, model.ReadyStatus)
return ds, nil
}

// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
// created. As soon as datasource settings change detected by SDK old datasource instance will
// be disposed and a new one will be created using NewSampleDatasource factory function.
func (ds *QuickwitDatasource) Dispose() {
// Clean up datasource instance resources.
// TODO
// FIXME: The ReadyStatus channel should probably be closed here, but doing it
// causes odd calls to healthcheck to fail. Needs investigation
// close(ds.dsInfo.ReadyStatus)
}

// CheckHealth handles health checks sent from Grafana to the plugin.
Expand All @@ -152,28 +168,37 @@ func (ds *QuickwitDatasource) Dispose() {
// a datasource is working as expected.
func (ds *QuickwitDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
res := &backend.CheckHealthResult{}
res.Status = backend.HealthStatusOk
res.Message = "plugin is running"

if err := ds.initDatasource(true); err != nil {
res.Status = backend.HealthStatusError
res.Message = fmt.Errorf("Failed to initialize datasource: %w", err).Error()
return res, nil
}
ds.dsInfo.ShouldInit = true
status := <-ds.dsInfo.ReadyStatus

if ds.dsInfo.ConfiguredFields.TimeField == "" || ds.dsInfo.ConfiguredFields.TimeOutputFormat == "" {
if nil != status.Err {
res.Status = backend.HealthStatusError
res.Message = fmt.Errorf("Failed to initialize datasource: %w", status.Err).Error()
} else if "" == ds.dsInfo.ConfiguredFields.TimeField {
res.Status = backend.HealthStatusError
res.Message = fmt.Sprintf("timefield is missing from index config \"%s\"", ds.dsInfo.Database)
return res, nil
} else if "" == ds.dsInfo.ConfiguredFields.TimeOutputFormat {
res.Status = backend.HealthStatusError
res.Message = fmt.Sprintf("timefield's output_format is missing from index config \"%s\"", ds.dsInfo.Database)
}
qwlog.Debug(res.Message)

res.Status = backend.HealthStatusOk
res.Message = "plugin is running"
return res, nil
}

func (ds *QuickwitDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
// Ensure ds is initialized, we need timestamp infos
if err := ds.initDatasource(false); err != nil {
return &backend.QueryDataResponse{}, fmt.Errorf("Failed to initialize datasource")
status := <-ds.dsInfo.ReadyStatus
if !status.IsReady {
qwlog.Debug(fmt.Errorf("Datasource initialization failed: %w", status.Err).Error())
response := &backend.QueryDataResponse{
Responses: backend.Responses{},
}
response.Responses["__qwQueryDataError"] = backend.ErrDataResponse(backend.StatusInternal, "Datasource initialization failed")
return response, nil
}

return queryData(ctx, req.Queries, &ds.dsInfo)
Expand Down

0 comments on commit b97978b

Please sign in to comment.