feat: merklewatcherv1 (#184)
AB#9402

A service to monitor the recently active tenant massifs and seals.

It will be responsible for posting batches to a topic. The sealer will pick
up the batches, with the assumption that each batch contains *at least* the
tenants active since the last batch, but MAY include redundant entries.
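
As a rough sketch of that contract (the type and field names below are hypothetical, for illustration only; the real message schema is not part of this change):

// Sketch only: illustrates the "at least, possibly redundant" batch semantics.
// TenantActivityBatch and its fields are hypothetical names.
type TenantActivityBatch struct {
	// Tenants active since the previous batch was posted. Guaranteed to be
	// at least the recently active set; MAY contain redundant entries, so
	// the sealer must treat processing as idempotent.
	Tenants []string
}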

In-cluster tests show the idle query costs < 5m. The CPU cost, both
empirically and per the Azure docs, should be fairly constant, and no
worse than proportional to the number of *matched blobs*.

The Azure request cost is 1, and we issue the query once per service
instance per deployment per interval. The interval defaults to 2
seconds. We could probably make that faster.

The time horizon under consideration is currently set to 30 seconds. The
Azure docs suggest that is safe, though super safe would be about a minute
or two.
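
To make the interval and horizon concrete, here is a minimal sketch of the polling loop (listActiveMassifs is a hypothetical callback and the filter handling is assumed; IDTimeHex is the helper added in massifs/idtimestamp.go):

package main // sketch only

import (
	"context"
	"time"

	"github.com/datatrails/forestrie/go-forestrie/massifs"
)

// watch polls on the configured interval and derives an idtimestamp hex value
// for the "active since" horizon. The conversion is lossy, which is fine for a
// pre-filter on the lastid tag.
func watch(ctx context.Context, interval, horizon time.Duration, listActiveMassifs func(context.Context, string) error) error {
	ticker := time.NewTicker(interval) // the service defaults this to 2 seconds
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// the horizon is currently 30 seconds
			sinceID := massifs.IDTimeHex(time.Now().Add(-horizon))
			if err := listActiveMassifs(ctx, sinceID); err != nil {
				return err
			}
		}
	}
}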

Other notables:

snowflakeid was causing unnecessary dependencies for code consuming the
datatrails event api, and it was particularly painful for this change, so
I've provisionally refactored it and moved it to
go-datatrails/massifs/snowflakeid

task helm:test:forestrie was broken

It now correctly infers the --values files and renders the forestrie
chart correctly

The only remaining gap with respect to skaffold.yml is how skaffold
images are specified

review: minors and rootpublisher -> batchpublisher

Co-authored-by: Robin Bryce <[email protected]>
robinbryce and Robin Bryce authored May 14, 2024
1 parent 061ca1c commit ddc7067
Showing 16 changed files with 1,695 additions and 1 deletion.
48 changes: 48 additions & 0 deletions massifs/idtimestamp.go
@@ -9,6 +9,9 @@ import (
"encoding/hex"
"errors"
"strings"
"time"

"github.com/datatrails/forestrie/go-forestrie/massifs/snowflakeid"
)

var (
@@ -30,6 +33,51 @@ func IDTimestampToHex(id uint64, epoch uint8) string {
return hex.EncodeToString(b)
}

// IDTimeHex returns the hex string idtimestamp corresponding to the time
func IDTimeHex(t time.Time) string {
id, epoch := IDTimestampFromTime(t)
return IDTimestampToHex(id, epoch)
}

// IDToTimeParts performs a lossy, but accurate, conversion to seconds and sub-second millis.
func IDToTimeParts(id uint64) (int64, int64, []byte) {

ms := id >> snowflakeid.TimeShift

machineSeq := make([]byte, 4)

binary.BigEndian.PutUint32(machineSeq, uint32(id&(^snowflakeid.TimeMask)))

return int64(ms / 1000), int64(ms % 1000), machineSeq[1:]
}

// IDTimestampFromTime returns the time stamp and epoch implied by the provided time
func IDTimestampFromTime(t time.Time) (uint64, uint8) {
return IDTimeFromUnixTime(t.Unix(), t.Nanosecond()/1000000)
}

// IDTimeFromUnixTime calculates the idtimestamp from the provided time.
// The seconds are assumed to be counted from the unix epoch, and ms is
// truncated to 999.
func IDTimeFromUnixTime(seconds int64, ms int) (uint64, uint8) {

if ms >= 1000 {
ms = 999
}

unixMS := uint64(seconds * 1000)

// The epoch is taken as the floor of seconds over our epoch duration.
// The rounding here is safe for a few hundred years.
epoch := uint8(unixMS / uint64(snowflakeid.EpochMS(1)))

unixMS += uint64(ms)
unixMS -= uint64(snowflakeid.EpochMS(epoch))

msBits := unixMS << snowflakeid.TimeShift
return msBits, epoch
}

// SplitIDTimestampHex accepts a hex encoded, and epoch prefixed, id timestamp string
// Returns:
//
75 changes: 75 additions & 0 deletions massifs/idtimestamp_test.go
@@ -3,6 +3,8 @@ package massifs
import (
"reflect"
"testing"

"github.com/datatrails/forestrie/go-forestrie/massifs/snowflakeid"
)

func TestIDTimestampBytes(t *testing.T) {
@@ -72,3 +74,76 @@ func TestSplitIDTimestampBytes(t *testing.T) {
})
}
}

func TestIDTimeFromUnixTime(t *testing.T) {

epochMS := snowflakeid.EpochMS(1)
// epochSec := epochMS / 1000
type args struct {
seconds int64
ms int
}
tests := []struct {
name string
args args
want uint64
wantErr bool
}{
{
name: "contemporary, epoch 1",
args: args{
seconds: 1715184784,
},
want: (uint64(1715184784*1000) - uint64(epochMS)) << (64 - 40),
},
{
name: "far future (hopefully on a beach or a mountain, along with aubry d grey), epoch 4",
args: args{
seconds: (1715184784*1000 + 3*epochMS) / 1000,
},
// note this longhand form accounts for the various roundings that
// happen in the conversion process. Converting without an explicit
// epoch is always lossy. The method under test is intended for
// constructing query filters when looking for blobs with a lastid
// tag *after* a certain approximate time.
want: uint64((((1715184784*1000+3*epochMS)/1000)*1000)-4*epochMS) << (64 - 40),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := IDTimeFromUnixTime(tt.args.seconds, tt.args.ms)
if got != tt.want {
t.Errorf("IDTimeFromTimeParts() = %v, want %v", got, tt.want)
}
})
}
}

func TestIDToTimeParts(t *testing.T) {
type args struct {
id uint64
}
tests := []struct {
name string
args args
want int64
want1 int64
want2 []byte
}{
{"1 bits", args{(1 << (24 + 10)) | (1 << 8) | 1}, 1, 1024 % 1000, []byte{0, 1, 1}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, got2 := IDToTimeParts(tt.args.id)
if got != tt.want {
t.Errorf("IDToTimeParts() got = %v, want %v", got, tt.want)
}
if got1 != tt.want1 {
t.Errorf("IDToTimeParts() got1 = %v, want %v", got1, tt.want1)
}
if !reflect.DeepEqual(got2, tt.want2) {
t.Errorf("IDToTimeParts() got2 = %v, want %v", got2, tt.want2)
}
})
}
}
2 changes: 1 addition & 1 deletion massifs/massifcontext.go
@@ -6,7 +6,7 @@ import (
"fmt"
"hash"

"github.com/datatrails/forestrie/go-forestrie/merklelog/snowflakeid"
"github.com/datatrails/forestrie/go-forestrie/massifs/snowflakeid"
"github.com/datatrails/forestrie/go-forestrie/mmr"
"github.com/datatrails/go-datatrails-common/logger"
)
79 changes: 79 additions & 0 deletions massifs/pathparse.go
@@ -0,0 +1,79 @@
package massifs

import (
"errors"
"fmt"
"strconv"
"strings"
)

var (
ErrMassifPathFmt = errors.New("invalid massif path")
)

// XXX: NOTE: Just staging these functions here while the open sourcing effort is in flight

// IsMassifPathLike performs a shallow sanity check on a path to see if it could be a massif log path
func IsMassifPathLike(path string) bool {
if !strings.HasPrefix(path, V1MMRTenantPrefix) {
return false
}
if !strings.HasSuffix(path, V1MMRMassifExt) {
return false
}
return true
}

// IsSealPathLike performs a shallow sanity check on a path to see if it could be a massif seal path
func IsSealPathLike(path string) bool {
if !strings.HasPrefix(path, V1MMRTenantPrefix) {
return false
}
if !strings.HasSuffix(path, V1MMRSealSignedRootExt) {
return false
}
return true
}

// ParseMassifPathTenant parses the tenant uuid from a massif storage path
// Performs basic sanity checks
func ParseMassifPathTenant(path string) (string, error) {
if !strings.HasPrefix(path, V1MMRTenantPrefix) {
return "", fmt.Errorf("invalid massif path: %s", path)
}

// the +1 strips the leading /
path = path[len(V1MMRTenantPrefix)+1:]

parts := strings.Split(path, V1MMRPathSep)
if len(parts) == 0 {
return "", fmt.Errorf("invalid massif path: %s", path)
}
// we could parse the uuid, but that seems like over kill
return parts[0], nil
}

// ParseMassifPathNumberExt parses the log file number and extension from the storage path
// Performs basic sanity checks
func ParseMassifPathNumberExt(path string) (int, string, error) {
if !strings.HasPrefix(path, V1MMRTenantPrefix) {
return 0, "", fmt.Errorf("%w: %s", ErrMassifPathFmt, path)
}
parts := strings.Split(path, V1MMRPathSep)
if len(parts) == 0 {
return 0, "", fmt.Errorf("%w: %s", ErrMassifPathFmt, path)
}
base := parts[len(parts)-1]
parts = strings.Split(base, V1MMRExtSep)
if len(parts) != 2 {
return 0, "", fmt.Errorf("%w: base name invalid %s", ErrMassifPathFmt, path)
}
if parts[1] != V1MMRMassifExt && parts[1] != V1MMRSealSignedRootExt {
return 0, "", fmt.Errorf("%w: extension invalid %s", ErrMassifPathFmt, path)
}
number, err := strconv.Atoi(parts[0])
if err != nil {
return 0, "", fmt.Errorf("%w: log file number invalid %s (%v)", ErrMassifPathFmt, path, err)
}
return number, parts[1], nil
}
160 changes: 160 additions & 0 deletions massifs/pathparse_test.go
@@ -0,0 +1,160 @@
package massifs

import (
"testing"
)

func TestParseMassifPathTenant(t *testing.T) {
type args struct {
path string
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
"happy case",
args{"v1/mmrs/tenant/84e0e9e9-d479-4d4e-9e8c-afc19a8fc185/0/massifs/0000000000000000.log"},
"84e0e9e9-d479-4d4e-9e8c-afc19a8fc185",
false,
},
{
"missing prefix",
args{"d479-4d4e-9e8c-afc19a8fc185/0/massifs/0000000000000000.log"},
"",
true,
},
{
"corrupt prefix",
args{"v1/mmrx/tenanx/84e0e9e9-d479-4d4e-9e8c-afc19a8fc185/0/massifs/0000000000000000.log"},
"",
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseMassifPathTenant(tt.args.path)
if (err != nil) != tt.wantErr {
t.Errorf("ParseMassifPathTenant() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("ParseMassifPathTenant() = %v, want %v", got, tt.want)
}
})
}
}

func TestParseMassifPathNumberExt(t *testing.T) {
type args struct {
path string
}
tests := []struct {
name string
args args
want int
want1 string
wantErr bool
}{
// TODO: Add test cases.
{
"happy case",
args{"v1/mmrs/tenant/84e0e9e9-d479-4d4e-9e8c-afc19a8fc185/0/massifs/0000000000000002.log"},
2,
"log",
false,
},
{
"seals happy case",
args{"v1/mmrs/tenant/84e0e9e9-d479-4d4e-9e8c-afc19a8fc185/0/massifseals/0000000000000002.sth"},
2,
"sth",
false,
},

{
"bad log ext",
args{"v1/mmrs/tenant/84e0e9e9-d479-4d4e-9e8c-afc19a8fc185/0/massifs/0000000000000002.lxg"},
0,
"",
true,
},
{
"to few parts in base log name",
args{"v1/mmrs/tenant/84e0e9e9-d479-4d4e-9e8c-afc19a8fc185/0/massifs/0000000000000000"},
0,
"",
true,
},
{
"un parsable log number",
args{"v1/mmrs/tenant/84e0e9e9-d479-4d4e-9e8c-afc19a8fc185/0/massifs/0000000y00z00000.log"},
0,
"",
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, err := ParseMassifPathNumberExt(tt.args.path)
if (err != nil) != tt.wantErr {
t.Errorf("ParseMassifPathNumberExt() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("ParseMassifPathNumberExt() got = %v, want %v", got, tt.want)
}
if got1 != tt.want1 {
t.Errorf("ParseMassifPathNumberExt() got1 = %v, want %v", got1, tt.want1)
}
})
}
}

func TestIsMassifPathLike(t *testing.T) {
type args struct {
path string
}
tests := []struct {
name string
args args
want bool
}{
// notice that the "like" tests are intended as a simplifying pre-filter
{"happy case", args{"v1/mmrs/tenant/log"}, true},
{"negative case 1", args{"v1/mmrs/tenant/lox"}, false},
{"negative case 1", args{"v1/mxrs/tenant/log"}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsMassifPathLike(tt.args.path); got != tt.want {
t.Errorf("IsMassifPathLike() = %v, want %v", got, tt.want)
}
})
}
}

func TestIsSealPathLike(t *testing.T) {
type args struct {
path string
}
tests := []struct {
name string
args args
want bool
}{
// notice that the "like" tests are intended as a simplifying pre-filter
{"happy case", args{"v1/mmrs/tenant/sth"}, true},
{"negative case 1", args{"v1/mmrs/tenant/sty"}, false},
{"negative case 1", args{"v1/mxrs/tenant/sth"}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsSealPathLike(tt.args.path); got != tt.want {
t.Errorf("IsSealPathLike() = %v, want %v", got, tt.want)
}
})
}
}