Skip to content

Commit

Permalink
refactor: reference hashed queries in channel path
Browse files Browse the repository at this point in the history
  • Loading branch information
amlmtl committed Nov 23, 2023
1 parent d8933fb commit 483b02c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 142 deletions.
89 changes: 19 additions & 70 deletions internal/plugin/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
package query

import (
"errors"
"bytes"
"crypto/sha256"
"fmt"
"grafana-esp-plugin/internal/plugin/server"
"net/url"
"regexp"
"strconv"
"strings"
)

const CHANNEL_PATH_REGEX_PATTERN string = `^[A-z0-9_\-/=.]*$`

type Query struct {
ServerUrl url.URL
ProjectName string
Expand All @@ -41,72 +39,23 @@ func New(s server.Server, projectName string, cqName string, windowName string,
}
}

func FromChannelPath(channelPath string) (*Query, error) {
splitChannelPath := strings.Split(channelPath, "/")
if len(splitChannelPath) < 7 {
return nil, errors.New("invalid stream channel path length received")
}
channelType := splitChannelPath[0]
if channelType != "stream" {
return nil, errors.New("invalid stream channel path type received")
}
scheme := splitChannelPath[1]
host := splitChannelPath[2]
portString := splitChannelPath[3]
projectName := splitChannelPath[4]
cqName := splitChannelPath[5]
windowName := splitChannelPath[6]
intervalString := splitChannelPath[7]
maxEventsString := splitChannelPath[8]
fields := splitChannelPath[9:]

port, err := strconv.ParseUint(portString, 10, 16)
if err != nil {
return nil, err
}
isTls := scheme == "wss"
server, err := server.New(isTls, host, uint16(port))
if err != nil {
return nil, err
}
serverUrl := server.GetUrl()
interval, err := strconv.ParseUint(intervalString, 10, 64)
if err != nil {
return nil, err
}
maxEvents, err := strconv.ParseUint(maxEventsString, 10, 64)
if err != nil {
return nil, err
}

return &Query{
ServerUrl: serverUrl,
ProjectName: projectName,
CqName: cqName,
WindowName: windowName,
EventInterval: interval,
MaxEvents: maxEvents,
Fields: fields,
}, nil
func (q *Query) ToChannelPath() string {
hashString := q.calcHashString()
channelPath := fmt.Sprintf("stream/%s", hashString)
return channelPath
}

func (q Query) ToChannelPath() (*string, error) {
channelPath := fmt.Sprintf("stream/%s/%s/%s/%s/%s/%s/%d/%d/%s",
url.PathEscape(q.ServerUrl.Scheme),
url.PathEscape(q.ServerUrl.Hostname()),
q.ServerUrl.Port(),
url.PathEscape(q.ProjectName),
url.PathEscape(q.CqName),
url.PathEscape(q.WindowName),
q.EventInterval,
q.MaxEvents,
strings.Join(q.Fields, "/"),
)

// The Channel class depends on its path matching this arbitrary regex, so validate it here and prevent silent failures.
if !regexp.MustCompile(CHANNEL_PATH_REGEX_PATTERN).MatchString(channelPath) {
return nil, fmt.Errorf(`channel path "%s" must match %s pattern`, channelPath, CHANNEL_PATH_REGEX_PATTERN)
}

return &channelPath, nil
func (q *Query) calcHashString() string {
b := bytes.Join([][]byte{
[]byte(q.ServerUrl.String()),
[]byte(q.ProjectName),
[]byte(q.CqName),
[]byte(q.WindowName),
[]byte(strconv.Itoa(int(q.EventInterval))),
[]byte(strconv.Itoa(int(q.MaxEvents))),
[]byte(strings.Join(q.Fields, "/")),
}, []byte{0})
hashSum := sha256.Sum256(b)

return fmt.Sprintf("%x", hashSum)
}
79 changes: 14 additions & 65 deletions internal/plugin/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package query

import (
"grafana-esp-plugin/internal/plugin/server"
"testing"
)

Expand All @@ -15,68 +16,30 @@ type equalityAssertion struct {
}

func createQuery(t *testing.T) Query {
channelPath := "stream/wss/host/12345/project/cq/window"

q, err := FromChannelPath(channelPath)
s, err := server.FromUrlString("wss://host:12345")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

return *q
}

func TestQueryFromChannelPath(t *testing.T) {
q := createQuery(t)

equalityAssertions := []equalityAssertion{
{"wss", q.ServerUrl.Scheme},
{"host:12345", q.ServerUrl.Host},
{"project", q.ProjectName},
{"cq", q.CqName},
{"window", q.WindowName},
}

for _, equalityAssertion := range equalityAssertions {
expected := equalityAssertion.Expected
actual := equalityAssertion.Actual

if expected != actual {
t.Errorf("expected %v, got %v", expected, actual)
}
}
}
q := New(*s, "project", "cq", "window", 1, 2, []string{}, nil)

func TestQueryFromChannelPathError(t *testing.T) {
invalidChannelPaths := []string{
"foo/wss/host/12345/project/cq/window",
"stream/wss/host/12345/project/cq",
"stream/wss/host/12345/project/cq/window/foo",
"stream/wss/host/65536/project/cq/window",
"stream/wss/%%%/12345/project/cq/window",
}

for _, channelPath := range invalidChannelPaths {
q, err := FromChannelPath(channelPath)
if err == nil {
t.Errorf("expected error, got %v", err)
}

if q != nil {
t.Errorf("expected nil, got %v", q)
}
}
return *q
}

func TestQueryToChannelPath(t *testing.T) {
q := createQuery(t)
q1 := createQuery(t)

cp, err := q.ToChannelPath()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
q2 := createQuery(t)
authHeader := "Bearer foo"
q2.AuthorizationHeader = &authHeader

q3 := createQuery(t)
q3.ProjectName = "foo"

equalityAssertions := []equalityAssertion{
{"stream/wss/host/12345/project/cq/window", *cp},
{"stream/408e4ac72d899e96e5d777c2e9e74939a6257a15d2b365d55221763589b03c4e", q1.ToChannelPath()},
{"stream/408e4ac72d899e96e5d777c2e9e74939a6257a15d2b365d55221763589b03c4e", q2.ToChannelPath()},
{"stream/025b476e61af885aa2b86c50a37da4411d92ac1cc6a8956171145ccb9812aca8", q3.ToChannelPath()},
}

for _, equalityAssertion := range equalityAssertions {
Expand All @@ -88,17 +51,3 @@ func TestQueryToChannelPath(t *testing.T) {
}
}
}

func TestQueryToChannelPathError(t *testing.T) {
q := createQuery(t)
q.ProjectName = "../project"

cp, err := q.ToChannelPath()
if err == nil {
t.Errorf("expected error, got: %v", err)
}

if cp != nil {
t.Errorf("expected nil, got %v", cp)
}
}
11 changes: 4 additions & 7 deletions pkg/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,11 @@ func (d *SampleDatasource) query(_ context.Context, datasourceUid string, qdto q

q := query.New(*s, qdto.ProjectName, qdto.CqName, qdto.WindowName, qdto.Interval, qdto.MaxDataPoints, qdto.Fields, authorizationHeader)

channelPath, err := q.ToChannelPath()
if err != nil {
return handleQueryError("invalid channel path", err)
}
channelPath := q.ToChannelPath()

d.channelQueryMap.Set(*channelPath, q)
d.channelQueryMap.Set(channelPath, q)

log.DefaultLogger.Debug("Received query", "path", *channelPath)
log.DefaultLogger.Debug("Received query", "path", channelPath)

// If query called with streaming on then return a channel
// to subscribe on a client-side and consume updates from a plugin.
Expand All @@ -165,7 +162,7 @@ func (d *SampleDatasource) query(_ context.Context, datasourceUid string, qdto q
channel := live.Channel{
Scope: live.ScopeDatasource,
Namespace: datasourceUid,
Path: *channelPath,
Path: channelPath,
}

frame := data.NewFrame("response")
Expand Down

0 comments on commit 483b02c

Please sign in to comment.