From 27f148a5e44d532b9ea6397d3e25e7e3cf1f9676 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Mon, 21 Oct 2024 11:54:30 -0600 Subject: [PATCH 01/15] Add support for deletes in the memberlist backed KV store --- Makefile | 8 +- kv/memberlist/kv.pb.go | 125 +++++++++++++++++++--- kv/memberlist/kv.proto | 5 + kv/memberlist/memberlist_client.go | 133 +++++++++++++++++++----- kv/memberlist/memberlist_client_test.go | 10 +- kv/memberlist/status.gohtml | 4 + server/fake_server.pb.go | 86 +++++++-------- 7 files changed, 274 insertions(+), 97 deletions(-) diff --git a/Makefile b/Makefile index bda2b2452..f8d8d74b8 100644 --- a/Makefile +++ b/Makefile @@ -6,14 +6,14 @@ DONT_FIND := -name vendor -prune -o -name .git -prune -o -name .cache -prune -o PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) # Download the proper protoc version for Darwin (osx) and Linux. -# If you need windows for some reason it's at https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/protoc-3.6.1-win32.zip +# If you need windows for some reason it's at https://github.com/protocolbuffers/protobuf/releases/download/v28.2/protoc-28.2-win64.zip UNAME_S := $(shell uname -s) -PROTO_PATH := https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/ +PROTO_PATH := https://github.com/protocolbuffers/protobuf/releases/download/v28.2/ ifeq ($(UNAME_S), Linux) - PROTO_ZIP=protoc-3.6.1-linux-x86_64.zip + PROTO_ZIP=protoc-28.2-linux-x86_64.zip endif ifeq ($(UNAME_S), Darwin) - PROTO_ZIP=protoc-3.6.1-osx-x86_64.zip + PROTO_ZIP=protoc-28.2-osx-x86_64.zip endif GO_MODS=$(shell find . 
$(DONT_FIND) -type f -name 'go.mod' -print) diff --git a/kv/memberlist/kv.pb.go b/kv/memberlist/kv.pb.go index 4c2eb9265..2080e9789 100644 --- a/kv/memberlist/kv.pb.go +++ b/kv/memberlist/kv.pb.go @@ -76,6 +76,10 @@ type KeyValuePair struct { Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // ID of the codec used to write the value Codec string `protobuf:"bytes,3,opt,name=codec,proto3" json:"codec,omitempty"` + // Is this Key marked for deletion? + Deleted bool `protobuf:"varint,4,opt,name=deleted,proto3" json:"deleted,omitempty"` + // When was the key last updated? + UpdateTimeMillis int64 `protobuf:"varint,5,opt,name=update_time_millis,json=updateTimeMillis,proto3" json:"update_time_millis,omitempty"` } func (m *KeyValuePair) Reset() { *m = KeyValuePair{} } @@ -131,6 +135,20 @@ func (m *KeyValuePair) GetCodec() string { return "" } +func (m *KeyValuePair) GetDeleted() bool { + if m != nil { + return m.Deleted + } + return false +} + +func (m *KeyValuePair) GetUpdateTimeMillis() int64 { + if m != nil { + return m.UpdateTimeMillis + } + return 0 +} + func init() { proto.RegisterType((*KeyValueStore)(nil), "memberlist.KeyValueStore") proto.RegisterType((*KeyValuePair)(nil), "memberlist.KeyValuePair") @@ -139,22 +157,25 @@ func init() { func init() { proto.RegisterFile("kv.proto", fileDescriptor_2216fe83c9c12408) } var fileDescriptor_2216fe83c9c12408 = []byte{ - // 236 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xc8, 0x2e, 0xd3, 0x2b, - 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x4d, 0xcd, 0x4d, 0x4a, 0x2d, 0xca, 0xc9, 0x2c, 0x2e, - 0x91, 0xd2, 0x4d, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xcf, 0x4f, - 0xcf, 0xd7, 0x07, 0x2b, 0x49, 0x2a, 0x4d, 0x03, 0xf3, 0xc0, 0x1c, 0x30, 0x0b, 0xa2, 0x55, 0xc9, - 0x9e, 0x8b, 0xd7, 0x3b, 0xb5, 0x32, 0x2c, 0x31, 0xa7, 0x34, 0x35, 0xb8, 0x24, 0xbf, 0x28, 0x55, - 0x48, 0x8f, 0x8b, 0xb5, 0x20, 0x31, 0xb3, 0xa8, 
0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x48, - 0x42, 0x0f, 0x61, 0xb6, 0x1e, 0x4c, 0x65, 0x40, 0x62, 0x66, 0x51, 0x10, 0x44, 0x99, 0x92, 0x0f, - 0x17, 0x0f, 0xb2, 0xb0, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, - 0x67, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5, 0x5a, 0x06, 0x92, 0x96, 0x60, 0x52, 0x60, 0xd4, 0xe0, - 0x09, 0x82, 0x70, 0x40, 0xa2, 0xc9, 0xf9, 0x29, 0xa9, 0xc9, 0x12, 0xcc, 0x60, 0x95, 0x10, 0x8e, - 0x93, 0xc9, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, - 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, - 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, - 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x24, 0x36, 0xb0, 0x5f, 0x8c, 0x01, 0x01, - 0x00, 0x00, 0xff, 0xff, 0x7a, 0x22, 0xdf, 0xec, 0x12, 0x01, 0x00, 0x00, + // 288 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0x31, 0x4b, 0xc3, 0x40, + 0x18, 0x86, 0xef, 0x33, 0x56, 0xeb, 0x59, 0xa1, 0x1c, 0x0e, 0x87, 0xc3, 0x47, 0xe8, 0x94, 0x41, + 0x53, 0x50, 0x77, 0xc1, 0x55, 0x04, 0x89, 0xe2, 0x5a, 0x92, 0xe6, 0xb3, 0x1e, 0xcd, 0x71, 0x25, + 0xbd, 0x14, 0xba, 0xf9, 0x13, 0x1c, 0xfc, 0x11, 0xfe, 0x14, 0xc7, 0x8e, 0x1d, 0xed, 0x65, 0x71, + 0xec, 0x4f, 0x90, 0x5c, 0x28, 0xba, 0xbd, 0xcf, 0xfb, 0x3e, 0x77, 0xc3, 0xc7, 0xbb, 0xd3, 0x45, + 0x3c, 0x2b, 0x8d, 0x35, 0x82, 0x6b, 0xd2, 0x19, 0x95, 0x85, 0x9a, 0xdb, 0xb3, 0x8b, 0x89, 0xb2, + 0xaf, 0x55, 0x16, 0x8f, 0x8d, 0x1e, 0x4e, 0xcc, 0xc4, 0x0c, 0xbd, 0x92, 0x55, 0x2f, 0x9e, 0x3c, + 0xf8, 0xd4, 0x3e, 0x1d, 0xdc, 0xf0, 0x93, 0x3b, 0x5a, 0x3e, 0xa7, 0x45, 0x45, 0x8f, 0xd6, 0x94, + 0x24, 0x62, 0xde, 0x99, 0xa5, 0xaa, 0x9c, 0x4b, 0x08, 0x83, 0xe8, 0xf8, 0x52, 0xc6, 0x7f, 0x7f, + 0xc7, 0x3b, 0xf3, 0x21, 0x55, 0x65, 0xd2, 0x6a, 0x83, 0x0f, 0xe0, 0xbd, 0xff, 0xbd, 0xe8, 0xf3, + 0x60, 0x4a, 0x4b, 0x09, 0x21, 0x44, 0x47, 0x49, 0x13, 0xc5, 
0x29, 0xef, 0x2c, 0x9a, 0x59, 0xee, + 0x85, 0x10, 0xf5, 0x92, 0x16, 0x9a, 0x76, 0x6c, 0x72, 0x1a, 0xcb, 0xc0, 0x9b, 0x2d, 0x08, 0xc9, + 0x0f, 0x73, 0x2a, 0xc8, 0x52, 0x2e, 0xf7, 0x43, 0x88, 0xba, 0xc9, 0x0e, 0xc5, 0x39, 0x17, 0xd5, + 0x2c, 0x4f, 0x2d, 0x8d, 0xac, 0xd2, 0x34, 0xd2, 0xaa, 0x28, 0xd4, 0x5c, 0x76, 0x42, 0x88, 0x82, + 0xa4, 0xdf, 0x2e, 0x4f, 0x4a, 0xd3, 0xbd, 0xef, 0x6f, 0xaf, 0x57, 0x1b, 0x64, 0xeb, 0x0d, 0xb2, + 0xed, 0x06, 0xe1, 0xcd, 0x21, 0x7c, 0x3a, 0x84, 0x2f, 0x87, 0xb0, 0x72, 0x08, 0xdf, 0x0e, 0xe1, + 0xc7, 0x21, 0xdb, 0x3a, 0x84, 0xf7, 0x1a, 0xd9, 0xaa, 0x46, 0xb6, 0xae, 0x91, 0x65, 0x07, 0xfe, + 0x28, 0x57, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe0, 0x1f, 0xee, 0xce, 0x5b, 0x01, 0x00, 0x00, } func (this *KeyValueStore) Equal(that interface{}) bool { @@ -214,6 +235,12 @@ func (this *KeyValuePair) Equal(that interface{}) bool { if this.Codec != that1.Codec { return false } + if this.Deleted != that1.Deleted { + return false + } + if this.UpdateTimeMillis != that1.UpdateTimeMillis { + return false + } return true } func (this *KeyValueStore) GoString() string { @@ -232,11 +259,13 @@ func (this *KeyValuePair) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 9) s = append(s, "&memberlist.KeyValuePair{") s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") s = append(s, "Codec: "+fmt.Sprintf("%#v", this.Codec)+",\n") + s = append(s, "Deleted: "+fmt.Sprintf("%#v", this.Deleted)+",\n") + s = append(s, "UpdateTimeMillis: "+fmt.Sprintf("%#v", this.UpdateTimeMillis)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -305,6 +334,21 @@ func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.UpdateTimeMillis != 0 { + i = encodeVarintKv(dAtA, i, uint64(m.UpdateTimeMillis)) + i-- + dAtA[i] = 0x28 + } + if m.Deleted { + i-- + if m.Deleted { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- 
+ dAtA[i] = 0x20 + } if len(m.Codec) > 0 { i -= len(m.Codec) copy(dAtA[i:], m.Codec) @@ -373,6 +417,12 @@ func (m *KeyValuePair) Size() (n int) { if l > 0 { n += 1 + l + sovKv(uint64(l)) } + if m.Deleted { + n += 2 + } + if m.UpdateTimeMillis != 0 { + n += 1 + sovKv(uint64(m.UpdateTimeMillis)) + } return n } @@ -405,6 +455,8 @@ func (this *KeyValuePair) String() string { `Key:` + fmt.Sprintf("%v", this.Key) + `,`, `Value:` + fmt.Sprintf("%v", this.Value) + `,`, `Codec:` + fmt.Sprintf("%v", this.Codec) + `,`, + `Deleted:` + fmt.Sprintf("%v", this.Deleted) + `,`, + `UpdateTimeMillis:` + fmt.Sprintf("%v", this.UpdateTimeMillis) + `,`, `}`, }, "") return s @@ -631,6 +683,45 @@ func (m *KeyValuePair) Unmarshal(dAtA []byte) error { } m.Codec = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Deleted", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Deleted = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UpdateTimeMillis", wireType) + } + m.UpdateTimeMillis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UpdateTimeMillis |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipKv(dAtA[iNdEx:]) diff --git a/kv/memberlist/kv.proto b/kv/memberlist/kv.proto index cc5f12463..b2e513b07 100644 --- a/kv/memberlist/kv.proto +++ b/kv/memberlist/kv.proto @@ -19,4 +19,9 @@ message KeyValuePair { // ID of the codec used to write the value string codec = 3; + + // Is this Key marked for deletion? + bool deleted = 4; + // When was the key last updated? 
+ int64 update_time_millis = 5; } diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 452798e04..b1d892daa 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -72,8 +72,14 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) { } // Delete is part of kv.Client interface. -func (c *Client) Delete(_ context.Context, _ string) error { - return errors.New("memberlist does not support Delete") +func (c *Client) Delete(ctx context.Context, key string) error { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return err + } + + c.kv.Delete(key) + return nil } // CAS is part of kv.Client interface @@ -155,7 +161,8 @@ type KVConfig struct { RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"` // Remove LEFT ingesters from ring after this timeout. - LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` + LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` + ObsoleteEntriesTimeout time.Duration `yaml:"obsolete_entries_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` @@ -330,6 +337,9 @@ type ValueDesc struct { // ID of codec used to write this value. Only used when sending full state. 
CodecID string + + Deleted bool + UpdateTime time.Time } func (v ValueDesc) Clone() (result ValueDesc) { @@ -344,6 +354,8 @@ type valueUpdate struct { value []byte codec codec.Codec messageSize int + deleted bool + updateTime time.Time } func (v ValueDesc) String() string { @@ -508,6 +520,9 @@ func (m *KV) running(ctx context.Context) error { tickerChan = t.C } + obsoleteEntriesTicker := time.NewTicker(m.cfg.PushPullInterval) + defer obsoleteEntriesTicker.Stop() + logger := log.With(m.logger, "phase", "periodic_rejoin") for { select { @@ -521,6 +536,11 @@ func (m *KV) running(ctx context.Context) error { level.Warn(logger).Log("msg", "re-joining memberlist cluster failed", "err", err, "next_try_in", m.cfg.RejoinInterval) } + case <-obsoleteEntriesTicker.C: + // cleanupObsoleteEntries is normally called during push/pull, but if there are no other + // nodes to push/pull with, we can call it periodically to make sure we remove unused entries from memory. + m.cleanupObsoleteEntries() + case <-ctx.Done(): return nil } @@ -1005,6 +1025,20 @@ func (m *KV) notifyWatchersSync(key string) { } } +func (m *KV) Delete(key string) { + m.storeMu.Lock() + defer m.storeMu.Unlock() + + val, ok := m.store[key] + if !ok || val.Deleted { + return + } + + val.Deleted = true + val.UpdateTime = time.Now() + m.store[key] = val +} + // CAS implements Compare-And-Set/Swap operation. // // CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately. 
@@ -1035,7 +1069,7 @@ outer: } } - change, newver, retry, err := m.trySingleCas(key, codec, f) + change, newver, retry, updateTime, err := m.trySingleCas(key, codec, f) if err != nil { level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry) @@ -1050,7 +1084,7 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - m.broadcastNewValue(key, change, newver, codec, true) + m.broadcastNewValue(key, change, newver, codec, true, false, updateTime) } return nil @@ -1067,50 +1101,52 @@ outer: // returns change, error (or nil, if CAS succeeded), and whether to retry or not. // returns errNoChangeDetected if merge failed to detect change in f's output. -func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, error) { +func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, time.Time, error) { val, ver, err := m.get(key, codec) if err != nil { - return nil, 0, false, fmt.Errorf("failed to get value: %v", err) + return nil, 0, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) } out, retry, err := f(val) if err != nil { - return nil, 0, retry, fmt.Errorf("fn returned error: %v", err) + return nil, 0, retry, time.Time{}, fmt.Errorf("fn returned error: %v", err) } if out == nil { // no change to be done - return nil, 0, false, nil + return nil, 0, false, time.Time{}, nil } // Don't even try incomingValue, ok := out.(Mergeable) if !ok || incomingValue == nil { - return nil, 0, retry, fmt.Errorf("invalid type: %T, expected Mergeable", out) + return nil, 0, retry, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) } // To support detection of removed items from value, we will only allow CAS operation to // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. 
// Supplied function may have kept a reference to the returned "incoming value". // If KV store will keep this value as well, it needs to make a clone. - change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec) + ut := time.Now() + + change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec, false, ut) if err == errVersionMismatch { - return nil, 0, retry, err + return nil, 0, retry, time.Time{}, err } if err != nil { - return nil, 0, retry, fmt.Errorf("merge failed: %v", err) + return nil, 0, retry, time.Time{}, fmt.Errorf("merge failed: %v", err) } if newver == 0 { // CAS method reacts on this error - return nil, 0, retry, errNoChangeDetected + return nil, 0, retry, time.Time{}, errNoChangeDetected } - return change, newver, retry, nil + return change, newver, retry, ut, nil } -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) { +func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) { if locallyGenerated && m.State() != services.Running { level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) return @@ -1123,7 +1159,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID()} + kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID(), Deleted: deleted, UpdateTimeMillis: updateTimeMillis(updateTime)} pairData, err := kvPair.Marshal() if err != nil { level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err) @@ -1200,7 +1236,7 @@ func (m *KV) NotifyMsg(msg []byte) { ch := m.getKeyWorkerChannel(kvPair.Key) select { - case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}: + case ch <- 
valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg), deleted: kvPair.Deleted, updateTime: updateTime(kvPair.UpdateTimeMillis)}: default: m.numberOfDroppedMessages.Inc() level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key) @@ -1227,7 +1263,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { select { case update := <-workerCh: // we have a value update! Let's merge it with our current version for given key - mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec) + mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) changes := []string(nil) if mod != nil { @@ -1252,7 +1288,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { m.notifyWatchers(key) // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec, false) + m.broadcastNewValue(key, mod, version, update.codec, false, update.deleted, update.updateTime) } case <-m.shutdown: @@ -1326,6 +1362,8 @@ func (m *KV) LocalState(_ bool) []byte { kvPair.Key = key kvPair.Value = encoded kvPair.Codec = val.CodecID + kvPair.Deleted = val.Deleted + kvPair.UpdateTimeMillis = updateTimeMillis(val.UpdateTime) ser, err := kvPair.Marshal() if err != nil { @@ -1407,8 +1445,13 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { continue } + updateTime := updateTime(kvPair.UpdateTimeMillis) + if updateTime.IsZero() { + updateTime = time.Now() + } + // we have both key and value, try to merge it with our state - change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime) changes := []string(nil) if change != nil { @@ -1427,7 +1470,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", 
kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec, false) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false, kvPair.Deleted, updateTime) } } @@ -1436,7 +1479,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { } } -func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { decodedValue, err := codec.Decode(incomingData) if err != nil { return nil, 0, fmt.Errorf("failed to decode value: %v", err) @@ -1448,14 +1491,14 @@ func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec. } // No need to clone this "incomingValue", since we have just decoded it from bytes, and won't be using it. - return m.mergeValueForKey(key, incomingValue, false, 0, codec) + return m.mergeValueForKey(key, incomingValue, false, 0, codec, deleted, updateTime) } // Merges incoming value with value we have in our store. Returns "a change" that can be sent to other // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. 
+func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1496,10 +1539,19 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue } newVersion := curr.Version + 1 + newUpdateTime := curr.UpdateTime + newDeleted := curr.Deleted + + if !updateTime.IsZero() && updateTime.After(newUpdateTime) { + newUpdateTime = updateTime + newDeleted = deleted + } m.store[key] = ValueDesc{ - value: result, - Version: newVersion, - CodecID: codec.CodecID(), + value: result, + Version: newVersion, + CodecID: codec.CodecID(), + Deleted: newDeleted, + UpdateTime: newUpdateTime, } // The "changes" returned by Merge() can contain references to the "result" @@ -1584,6 +1636,17 @@ func (m *KV) deleteSentReceivedMessages() { m.receivedMessagesSize = 0 } +func (m *KV) cleanupObsoleteEntries() { + m.storeMu.Lock() + defer m.storeMu.Unlock() + + for k, v := range m.store { + if v.Deleted && time.Since(v.UpdateTime) > m.cfg.ObsoleteEntriesTimeout { + delete(m.store, k) + } + } +} + func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Message, int) { msgs = append(msgs, msg) size += msg.Size @@ -1595,3 +1658,17 @@ func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Mes return msgs, size } + +func updateTime(val int64) time.Time { + if val == 0 { + return time.Time{} + } + return time.UnixMilli(val) +} + +func updateTimeMillis(ts time.Time) int64 { + if ts.IsZero() { + return 0 + } + return ts.UnixMilli() +} diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 47e8b3a8f..7aed20ad0 100644 ---
a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1686,11 +1686,11 @@ func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) // Check that locally-generated broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue. - kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false) - kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false) - kv.broadcastNewValue("local", smallUpdate, 1, codec, true) - kv.broadcastNewValue("local", bigUpdate, 2, codec, true) - kv.broadcastNewValue("local", mediumUpdate, 3, codec, true) + kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false, false, time.Now()) + kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false, false, time.Now()) + kv.broadcastNewValue("local", smallUpdate, 1, codec, true, false, time.Now()) + kv.broadcastNewValue("local", bigUpdate, 2, codec, true, false, time.Now()) + kv.broadcastNewValue("local", mediumUpdate, 3, codec, true, false, time.Now()) err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP memberlist_client_messages_in_broadcast_queue Number of user messages in the broadcast queue diff --git a/kv/memberlist/status.gohtml b/kv/memberlist/status.gohtml index 6f845b6e0..524acb809 100644 --- a/kv/memberlist/status.gohtml +++ b/kv/memberlist/status.gohtml @@ -22,6 +22,8 @@ Key Codec Version + Deleted + Update Time Actions @@ -32,6 +34,8 @@ {{ $k }} {{ $v.CodecID }} {{ $v.Version }} + {{ $v.Deleted }} + {{ $v.UpdateTime }} json | json-pretty diff --git a/server/fake_server.pb.go b/server/fake_server.pb.go index 4bb2d5a1f..f54d50ba9 100644 --- a/server/fake_server.pb.go +++ b/server/fake_server.pb.go @@ -7,10 +7,10 @@ import ( context "context" fmt "fmt" proto "github.com/gogo/protobuf/proto" - empty "github.com/golang/protobuf/ptypes/empty" grpc "google.golang.org/grpc" codes 
"google.golang.org/grpc/codes" status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" io "io" math "math" math_bits "math/bits" @@ -236,12 +236,12 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type FakeServerClient interface { - Succeed(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) - FailWithError(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) - FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*empty.Empty, error) - Sleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) - StreamSleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) - ReturnProxyProtoCallerIP(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) + Succeed(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) + FailWithError(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) + FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + Sleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) + StreamSleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) + ReturnProxyProtoCallerIP(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) } type fakeServerClient struct { @@ -252,8 +252,8 @@ func NewFakeServerClient(cc *grpc.ClientConn) FakeServerClient { return &fakeServerClient{cc} } -func (c *fakeServerClient) Succeed(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { - out 
:= new(empty.Empty) +func (c *fakeServerClient) Succeed(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/Succeed", in, out, opts...) if err != nil { return nil, err @@ -261,8 +261,8 @@ func (c *fakeServerClient) Succeed(ctx context.Context, in *empty.Empty, opts .. return out, nil } -func (c *fakeServerClient) FailWithError(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { - out := new(empty.Empty) +func (c *fakeServerClient) FailWithError(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/FailWithError", in, out, opts...) if err != nil { return nil, err @@ -270,8 +270,8 @@ func (c *fakeServerClient) FailWithError(ctx context.Context, in *empty.Empty, o return out, nil } -func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*empty.Empty, error) { - out := new(empty.Empty) +func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/FailWithHTTPError", in, out, opts...) if err != nil { return nil, err @@ -279,8 +279,8 @@ func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHT return out, nil } -func (c *fakeServerClient) Sleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { - out := new(empty.Empty) +func (c *fakeServerClient) Sleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/Sleep", in, out, opts...) 
if err != nil { return nil, err @@ -288,7 +288,7 @@ func (c *fakeServerClient) Sleep(ctx context.Context, in *empty.Empty, opts ...g return out, nil } -func (c *fakeServerClient) StreamSleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) { +func (c *fakeServerClient) StreamSleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) { stream, err := c.cc.NewStream(ctx, &_FakeServer_serviceDesc.Streams[0], "/server.FakeServer/StreamSleep", opts...) if err != nil { return nil, err @@ -304,7 +304,7 @@ func (c *fakeServerClient) StreamSleep(ctx context.Context, in *empty.Empty, opt } type FakeServer_StreamSleepClient interface { - Recv() (*empty.Empty, error) + Recv() (*emptypb.Empty, error) grpc.ClientStream } @@ -312,15 +312,15 @@ type fakeServerStreamSleepClient struct { grpc.ClientStream } -func (x *fakeServerStreamSleepClient) Recv() (*empty.Empty, error) { - m := new(empty.Empty) +func (x *fakeServerStreamSleepClient) Recv() (*emptypb.Empty, error) { + m := new(emptypb.Empty) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } -func (c *fakeServerClient) ReturnProxyProtoCallerIP(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) { +func (c *fakeServerClient) ReturnProxyProtoCallerIP(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) { out := new(ProxyProtoIPResponse) err := c.cc.Invoke(ctx, "/server.FakeServer/ReturnProxyProtoCallerIP", in, out, opts...) if err != nil { @@ -331,34 +331,34 @@ func (c *fakeServerClient) ReturnProxyProtoCallerIP(ctx context.Context, in *emp // FakeServerServer is the server API for FakeServer service. 
type FakeServerServer interface { - Succeed(context.Context, *empty.Empty) (*empty.Empty, error) - FailWithError(context.Context, *empty.Empty) (*empty.Empty, error) - FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*empty.Empty, error) - Sleep(context.Context, *empty.Empty) (*empty.Empty, error) - StreamSleep(*empty.Empty, FakeServer_StreamSleepServer) error - ReturnProxyProtoCallerIP(context.Context, *empty.Empty) (*ProxyProtoIPResponse, error) + Succeed(context.Context, *emptypb.Empty) (*emptypb.Empty, error) + FailWithError(context.Context, *emptypb.Empty) (*emptypb.Empty, error) + FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*emptypb.Empty, error) + Sleep(context.Context, *emptypb.Empty) (*emptypb.Empty, error) + StreamSleep(*emptypb.Empty, FakeServer_StreamSleepServer) error + ReturnProxyProtoCallerIP(context.Context, *emptypb.Empty) (*ProxyProtoIPResponse, error) } // UnimplementedFakeServerServer can be embedded to have forward compatible implementations. 
type UnimplementedFakeServerServer struct { } -func (*UnimplementedFakeServerServer) Succeed(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { +func (*UnimplementedFakeServerServer) Succeed(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Succeed not implemented") } -func (*UnimplementedFakeServerServer) FailWithError(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { +func (*UnimplementedFakeServerServer) FailWithError(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method FailWithError not implemented") } -func (*UnimplementedFakeServerServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*empty.Empty, error) { +func (*UnimplementedFakeServerServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method FailWithHTTPError not implemented") } -func (*UnimplementedFakeServerServer) Sleep(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { +func (*UnimplementedFakeServerServer) Sleep(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Sleep not implemented") } -func (*UnimplementedFakeServerServer) StreamSleep(req *empty.Empty, srv FakeServer_StreamSleepServer) error { +func (*UnimplementedFakeServerServer) StreamSleep(req *emptypb.Empty, srv FakeServer_StreamSleepServer) error { return status.Errorf(codes.Unimplemented, "method StreamSleep not implemented") } -func (*UnimplementedFakeServerServer) ReturnProxyProtoCallerIP(ctx context.Context, req *empty.Empty) (*ProxyProtoIPResponse, error) { +func (*UnimplementedFakeServerServer) ReturnProxyProtoCallerIP(ctx context.Context, req *emptypb.Empty) (*ProxyProtoIPResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ReturnProxyProtoCallerIP 
not implemented") } @@ -367,7 +367,7 @@ func RegisterFakeServerServer(s *grpc.Server, srv FakeServerServer) { } func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(empty.Empty) + in := new(emptypb.Empty) if err := dec(in); err != nil { return nil, err } @@ -379,13 +379,13 @@ func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func( FullMethod: "/server.FakeServer/Succeed", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).Succeed(ctx, req.(*empty.Empty)) + return srv.(FakeServerServer).Succeed(ctx, req.(*emptypb.Empty)) } return interceptor(ctx, in, info, handler) } func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(empty.Empty) + in := new(emptypb.Empty) if err := dec(in); err != nil { return nil, err } @@ -397,7 +397,7 @@ func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec FullMethod: "/server.FakeServer/FailWithError", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).FailWithError(ctx, req.(*empty.Empty)) + return srv.(FakeServerServer).FailWithError(ctx, req.(*emptypb.Empty)) } return interceptor(ctx, in, info, handler) } @@ -421,7 +421,7 @@ func _FakeServer_FailWithHTTPError_Handler(srv interface{}, ctx context.Context, } func _FakeServer_Sleep_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(empty.Empty) + in := new(emptypb.Empty) if err := dec(in); err != nil { return nil, err } @@ -433,13 +433,13 @@ func _FakeServer_Sleep_Handler(srv interface{}, ctx context.Context, dec func(in FullMethod: "/server.FakeServer/Sleep", } handler := 
func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).Sleep(ctx, req.(*empty.Empty)) + return srv.(FakeServerServer).Sleep(ctx, req.(*emptypb.Empty)) } return interceptor(ctx, in, info, handler) } func _FakeServer_StreamSleep_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(empty.Empty) + m := new(emptypb.Empty) if err := stream.RecvMsg(m); err != nil { return err } @@ -447,7 +447,7 @@ func _FakeServer_StreamSleep_Handler(srv interface{}, stream grpc.ServerStream) } type FakeServer_StreamSleepServer interface { - Send(*empty.Empty) error + Send(*emptypb.Empty) error grpc.ServerStream } @@ -455,12 +455,12 @@ type fakeServerStreamSleepServer struct { grpc.ServerStream } -func (x *fakeServerStreamSleepServer) Send(m *empty.Empty) error { +func (x *fakeServerStreamSleepServer) Send(m *emptypb.Empty) error { return x.ServerStream.SendMsg(m) } func _FakeServer_ReturnProxyProtoCallerIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(empty.Empty) + in := new(emptypb.Empty) if err := dec(in); err != nil { return nil, err } @@ -472,7 +472,7 @@ func _FakeServer_ReturnProxyProtoCallerIP_Handler(srv interface{}, ctx context.C FullMethod: "/server.FakeServer/ReturnProxyProtoCallerIP", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).ReturnProxyProtoCallerIP(ctx, req.(*empty.Empty)) + return srv.(FakeServerServer).ReturnProxyProtoCallerIP(ctx, req.(*emptypb.Empty)) } return interceptor(ctx, in, info, handler) } From 338f61a98ff7e2d0530fb8960deb31cece5c67d6 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Mon, 21 Oct 2024 13:49:07 -0600 Subject: [PATCH 02/15] fix tests --- kv/memberlist/memberlist_client_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kv/memberlist/memberlist_client_test.go 
b/kv/memberlist/memberlist_client_test.go index 7aed20ad0..679ff07f0 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -564,7 +564,7 @@ func defaultKVConfig(i int) KVConfig { cfg.GossipInterval = 100 * time.Millisecond cfg.GossipNodes = 10 - cfg.PushPullInterval = 5 * time.Second + cfg.PushPullInterval = 30 * time.Second cfg.TCPTransport = TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), @@ -653,7 +653,6 @@ func TestMultipleClientsWithSameLabelWithClusterLabelVerification(t *testing.T) cfg := defaultKVConfig(i) cfg.ClusterLabel = label - return cfg } @@ -1303,6 +1302,7 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { TCPTransport: TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), }, + PushPullInterval: 30 * time.Second, } // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. cfg.RetransmitMult = 1 @@ -1372,6 +1372,7 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { TCPTransport: TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), }, + PushPullInterval: 30 * time.Second, } // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. 
cfg.RetransmitMult = 1 @@ -1518,6 +1519,7 @@ func TestDelegateMethodsDontCrashBeforeKVStarts(t *testing.T) { cfg.TCPTransport = TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), } + cfg.PushPullInterval = 30 * time.Second kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) @@ -1561,6 +1563,7 @@ func TestMetricsRegistration(t *testing.T) { cfg := KVConfig{} cfg.Codecs = append(cfg.Codecs, c) + cfg.PushPullInterval = 30 * time.Second reg := prometheus.NewPedanticRegistry() kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg) @@ -1670,6 +1673,7 @@ func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. cfg.RetransmitMult = 1 cfg.Codecs = append(cfg.Codecs, codec) + cfg.PushPullInterval = 30 * time.Second reg := prometheus.NewRegistry() kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg) @@ -1729,6 +1733,7 @@ func TestRaceBetweenStoringNewValueForKeyAndUpdatingIt(t *testing.T) { cfg.TCPTransport = TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), } + cfg.PushPullInterval = 30 * time.Second kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) @@ -1809,6 +1814,7 @@ func TestNotificationDelay(t *testing.T) { // We're going to trigger sends manually, so effectively disable the automatic send interval. 
const hundredYears = 100 * 365 * 24 * time.Hour cfg.NotifyInterval = hundredYears + cfg.PushPullInterval = 30 * time.Second kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) watchChan := make(chan string, 16) From 66fb2b13aad196f2e714d737a53114087ec990b9 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Mon, 21 Oct 2024 14:03:47 -0600 Subject: [PATCH 03/15] fix go mod --- Makefile | 8 ++-- server/fake_server.pb.go | 86 ++++++++++++++++++++-------------------- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/Makefile b/Makefile index f8d8d74b8..bda2b2452 100644 --- a/Makefile +++ b/Makefile @@ -6,14 +6,14 @@ DONT_FIND := -name vendor -prune -o -name .git -prune -o -name .cache -prune -o PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) # Download the proper protoc version for Darwin (osx) and Linux. -# If you need windows for some reason it's at https://github.com/protocolbuffers/protobuf/releases/download/v28.2/protoc-28.2-win64.zip +# If you need windows for some reason it's at https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/protoc-3.6.1-win32.zip UNAME_S := $(shell uname -s) -PROTO_PATH := https://github.com/protocolbuffers/protobuf/releases/download/v28.2/ +PROTO_PATH := https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/ ifeq ($(UNAME_S), Linux) - PROTO_ZIP=protoc-28.2-linux-x86_64.zip + PROTO_ZIP=protoc-3.6.1-linux-x86_64.zip endif ifeq ($(UNAME_S), Darwin) - PROTO_ZIP=protoc-28.2-osx-x86_64.zip + PROTO_ZIP=protoc-3.6.1-osx-x86_64.zip endif GO_MODS=$(shell find . 
$(DONT_FIND) -type f -name 'go.mod' -print) diff --git a/server/fake_server.pb.go b/server/fake_server.pb.go index f54d50ba9..4bb2d5a1f 100644 --- a/server/fake_server.pb.go +++ b/server/fake_server.pb.go @@ -7,10 +7,10 @@ import ( context "context" fmt "fmt" proto "github.com/gogo/protobuf/proto" + empty "github.com/golang/protobuf/ptypes/empty" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" io "io" math "math" math_bits "math/bits" @@ -236,12 +236,12 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type FakeServerClient interface { - Succeed(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) - FailWithError(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) - FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) - Sleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) - StreamSleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) - ReturnProxyProtoCallerIP(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) + Succeed(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) + FailWithError(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) + FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*empty.Empty, error) + Sleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) + StreamSleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) + 
ReturnProxyProtoCallerIP(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) } type fakeServerClient struct { @@ -252,8 +252,8 @@ func NewFakeServerClient(cc *grpc.ClientConn) FakeServerClient { return &fakeServerClient{cc} } -func (c *fakeServerClient) Succeed(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) +func (c *fakeServerClient) Succeed(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/Succeed", in, out, opts...) if err != nil { return nil, err @@ -261,8 +261,8 @@ func (c *fakeServerClient) Succeed(ctx context.Context, in *emptypb.Empty, opts return out, nil } -func (c *fakeServerClient) FailWithError(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) +func (c *fakeServerClient) FailWithError(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/FailWithError", in, out, opts...) if err != nil { return nil, err @@ -270,8 +270,8 @@ func (c *fakeServerClient) FailWithError(ctx context.Context, in *emptypb.Empty, return out, nil } -func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) +func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHTTPErrorRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/FailWithHTTPError", in, out, opts...) 
if err != nil { return nil, err @@ -279,8 +279,8 @@ func (c *fakeServerClient) FailWithHTTPError(ctx context.Context, in *FailWithHT return out, nil } -func (c *fakeServerClient) Sleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) +func (c *fakeServerClient) Sleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) err := c.cc.Invoke(ctx, "/server.FakeServer/Sleep", in, out, opts...) if err != nil { return nil, err @@ -288,7 +288,7 @@ func (c *fakeServerClient) Sleep(ctx context.Context, in *emptypb.Empty, opts .. return out, nil } -func (c *fakeServerClient) StreamSleep(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) { +func (c *fakeServerClient) StreamSleep(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (FakeServer_StreamSleepClient, error) { stream, err := c.cc.NewStream(ctx, &_FakeServer_serviceDesc.Streams[0], "/server.FakeServer/StreamSleep", opts...) 
if err != nil { return nil, err @@ -304,7 +304,7 @@ func (c *fakeServerClient) StreamSleep(ctx context.Context, in *emptypb.Empty, o } type FakeServer_StreamSleepClient interface { - Recv() (*emptypb.Empty, error) + Recv() (*empty.Empty, error) grpc.ClientStream } @@ -312,15 +312,15 @@ type fakeServerStreamSleepClient struct { grpc.ClientStream } -func (x *fakeServerStreamSleepClient) Recv() (*emptypb.Empty, error) { - m := new(emptypb.Empty) +func (x *fakeServerStreamSleepClient) Recv() (*empty.Empty, error) { + m := new(empty.Empty) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } -func (c *fakeServerClient) ReturnProxyProtoCallerIP(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) { +func (c *fakeServerClient) ReturnProxyProtoCallerIP(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*ProxyProtoIPResponse, error) { out := new(ProxyProtoIPResponse) err := c.cc.Invoke(ctx, "/server.FakeServer/ReturnProxyProtoCallerIP", in, out, opts...) if err != nil { @@ -331,34 +331,34 @@ func (c *fakeServerClient) ReturnProxyProtoCallerIP(ctx context.Context, in *emp // FakeServerServer is the server API for FakeServer service. 
type FakeServerServer interface { - Succeed(context.Context, *emptypb.Empty) (*emptypb.Empty, error) - FailWithError(context.Context, *emptypb.Empty) (*emptypb.Empty, error) - FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*emptypb.Empty, error) - Sleep(context.Context, *emptypb.Empty) (*emptypb.Empty, error) - StreamSleep(*emptypb.Empty, FakeServer_StreamSleepServer) error - ReturnProxyProtoCallerIP(context.Context, *emptypb.Empty) (*ProxyProtoIPResponse, error) + Succeed(context.Context, *empty.Empty) (*empty.Empty, error) + FailWithError(context.Context, *empty.Empty) (*empty.Empty, error) + FailWithHTTPError(context.Context, *FailWithHTTPErrorRequest) (*empty.Empty, error) + Sleep(context.Context, *empty.Empty) (*empty.Empty, error) + StreamSleep(*empty.Empty, FakeServer_StreamSleepServer) error + ReturnProxyProtoCallerIP(context.Context, *empty.Empty) (*ProxyProtoIPResponse, error) } // UnimplementedFakeServerServer can be embedded to have forward compatible implementations. 
type UnimplementedFakeServerServer struct { } -func (*UnimplementedFakeServerServer) Succeed(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { +func (*UnimplementedFakeServerServer) Succeed(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Succeed not implemented") } -func (*UnimplementedFakeServerServer) FailWithError(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { +func (*UnimplementedFakeServerServer) FailWithError(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method FailWithError not implemented") } -func (*UnimplementedFakeServerServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*emptypb.Empty, error) { +func (*UnimplementedFakeServerServer) FailWithHTTPError(ctx context.Context, req *FailWithHTTPErrorRequest) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method FailWithHTTPError not implemented") } -func (*UnimplementedFakeServerServer) Sleep(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { +func (*UnimplementedFakeServerServer) Sleep(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Sleep not implemented") } -func (*UnimplementedFakeServerServer) StreamSleep(req *emptypb.Empty, srv FakeServer_StreamSleepServer) error { +func (*UnimplementedFakeServerServer) StreamSleep(req *empty.Empty, srv FakeServer_StreamSleepServer) error { return status.Errorf(codes.Unimplemented, "method StreamSleep not implemented") } -func (*UnimplementedFakeServerServer) ReturnProxyProtoCallerIP(ctx context.Context, req *emptypb.Empty) (*ProxyProtoIPResponse, error) { +func (*UnimplementedFakeServerServer) ReturnProxyProtoCallerIP(ctx context.Context, req *empty.Empty) (*ProxyProtoIPResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ReturnProxyProtoCallerIP 
not implemented") } @@ -367,7 +367,7 @@ func RegisterFakeServerServer(s *grpc.Server, srv FakeServerServer) { } func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) + in := new(empty.Empty) if err := dec(in); err != nil { return nil, err } @@ -379,13 +379,13 @@ func _FakeServer_Succeed_Handler(srv interface{}, ctx context.Context, dec func( FullMethod: "/server.FakeServer/Succeed", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).Succeed(ctx, req.(*emptypb.Empty)) + return srv.(FakeServerServer).Succeed(ctx, req.(*empty.Empty)) } return interceptor(ctx, in, info, handler) } func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) + in := new(empty.Empty) if err := dec(in); err != nil { return nil, err } @@ -397,7 +397,7 @@ func _FakeServer_FailWithError_Handler(srv interface{}, ctx context.Context, dec FullMethod: "/server.FakeServer/FailWithError", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).FailWithError(ctx, req.(*emptypb.Empty)) + return srv.(FakeServerServer).FailWithError(ctx, req.(*empty.Empty)) } return interceptor(ctx, in, info, handler) } @@ -421,7 +421,7 @@ func _FakeServer_FailWithHTTPError_Handler(srv interface{}, ctx context.Context, } func _FakeServer_Sleep_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) + in := new(empty.Empty) if err := dec(in); err != nil { return nil, err } @@ -433,13 +433,13 @@ func _FakeServer_Sleep_Handler(srv interface{}, ctx context.Context, dec func(in FullMethod: "/server.FakeServer/Sleep", } handler := 
func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).Sleep(ctx, req.(*emptypb.Empty)) + return srv.(FakeServerServer).Sleep(ctx, req.(*empty.Empty)) } return interceptor(ctx, in, info, handler) } func _FakeServer_StreamSleep_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(emptypb.Empty) + m := new(empty.Empty) if err := stream.RecvMsg(m); err != nil { return err } @@ -447,7 +447,7 @@ func _FakeServer_StreamSleep_Handler(srv interface{}, stream grpc.ServerStream) } type FakeServer_StreamSleepServer interface { - Send(*emptypb.Empty) error + Send(*empty.Empty) error grpc.ServerStream } @@ -455,12 +455,12 @@ type fakeServerStreamSleepServer struct { grpc.ServerStream } -func (x *fakeServerStreamSleepServer) Send(m *emptypb.Empty) error { +func (x *fakeServerStreamSleepServer) Send(m *empty.Empty) error { return x.ServerStream.SendMsg(m) } func _FakeServer_ReturnProxyProtoCallerIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) + in := new(empty.Empty) if err := dec(in); err != nil { return nil, err } @@ -472,7 +472,7 @@ func _FakeServer_ReturnProxyProtoCallerIP_Handler(srv interface{}, ctx context.C FullMethod: "/server.FakeServer/ReturnProxyProtoCallerIP", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FakeServerServer).ReturnProxyProtoCallerIP(ctx, req.(*emptypb.Empty)) + return srv.(FakeServerServer).ReturnProxyProtoCallerIP(ctx, req.(*empty.Empty)) } return interceptor(ctx, in, info, handler) } From 7880fdfd0a154deaf09ff16478d281faed18e2cb Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Wed, 23 Oct 2024 09:27:08 -0600 Subject: [PATCH 04/15] add delete test --- kv/memberlist/memberlist_client.go | 5 ++- kv/memberlist/memberlist_client_test.go | 52 ++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git 
a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index b1d892daa..ed0c3d595 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -195,6 +195,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", cfg.AbortIfJoinFails, "If this node fails to join memberlist cluster, abort.") f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.") f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.") + f.DurationVar(&cfg.ObsoleteEntriesTimeout, prefix+"memberlist.obsolete-entries-timeout", mlDefaults.PushPullInterval, "How long to keep obsolete entries in the KV store.") f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 20*time.Second, "Timeout for leaving memberlist cluster.") f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", mlDefaults.GossipInterval, "How often to gossip.") f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", mlDefaults.GossipNodes, "How many nodes to gossip to.") @@ -324,7 +325,7 @@ type Message struct { Changes []string // List of changes in this message (as computed by *this* node). } -// ValueDesc stores the value along with it's codec and local version. +// ValueDesc stores the value along with its codec and local version. type ValueDesc struct { // We store the decoded value here to prevent decoding the entire state for every // update we receive. 
Whilst the updates are small and fast to decode, @@ -520,7 +521,7 @@ func (m *KV) running(ctx context.Context) error { tickerChan = t.C } - obsoleteEntriesTicker := time.NewTicker(m.cfg.PushPullInterval) + obsoleteEntriesTicker := time.NewTicker(m.cfg.ObsoleteEntriesTimeout) defer obsoleteEntriesTicker.Stop() logger := log.With(m.logger, "phase", "periodic_rejoin") diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 679ff07f0..d2b27aabe 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -564,7 +564,8 @@ func defaultKVConfig(i int) KVConfig { cfg.GossipInterval = 100 * time.Millisecond cfg.GossipNodes = 10 - cfg.PushPullInterval = 30 * time.Second + cfg.PushPullInterval = 5 * time.Second + cfg.ObsoleteEntriesTimeout = 5 * time.Second cfg.TCPTransport = TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), @@ -574,6 +575,55 @@ func defaultKVConfig(i int) KVConfig { return cfg } +func TestDelete(t *testing.T) { + c := dataCodec{} + + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: getLocalhostAddrs(), + BindPort: 0, // randomize ports + } + cfg.GossipNodes = 1 + cfg.GossipInterval = 100 * time.Millisecond + cfg.PushPullInterval = 1 * time.Second + cfg.ObsoleteEntriesTimeout = 1 * time.Second + cfg.ClusterLabelVerificationDisabled = true + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + kv, err := NewClient(mkv, c) + require.NoError(t, err) + + const key = "test" + + val := get(t, kv, key) + if val != nil { + t.Error("Expected nil, got:", val) + } + + err = cas(kv, key, updateFn("test")) + require.NoError(t, err) + + err = kv.Delete(context.Background(), key) + if err != nil { + 
t.Fatalf("Failed to delete key %s: %v", key, err) + } + t.Log("Deleted key", key) + + time.Sleep(1100 * time.Millisecond) // wait for obsolete entries to be removed + ctx := context.Background() + val, err = kv.Get(ctx, key) + + if val != nil { + t.Errorf("Expected nil, got: %v", val) + } + t.Log("Key", key, "is nil") +} + func TestMultipleClients(t *testing.T) { t.Parallel() From 9bad76ad22dd92ab31e3dcc79fc5a11b6957a9ac Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Fri, 25 Oct 2024 10:19:45 -0600 Subject: [PATCH 05/15] fix double lock, update tests --- kv/memberlist/memberlist_client.go | 15 ++++++++++----- kv/memberlist/memberlist_client_test.go | 17 ++++------------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index ed0c3d595..571d18b34 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -250,7 +250,7 @@ type KV struct { gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes // KV Store. 
- storeMu sync.Mutex + storeMu sync.RWMutex store map[string]ValueDesc // Codec registry @@ -521,8 +521,13 @@ func (m *KV) running(ctx context.Context) error { tickerChan = t.C } - obsoleteEntriesTicker := time.NewTicker(m.cfg.ObsoleteEntriesTimeout) - defer obsoleteEntriesTicker.Stop() + var obsoleteEntriesTickerChan <-chan time.Time + if m.cfg.ObsoleteEntriesTimeout > 0 { + obsoleteEntriesTicker := time.NewTicker(m.cfg.ObsoleteEntriesTimeout) + defer obsoleteEntriesTicker.Stop() + + obsoleteEntriesTickerChan = obsoleteEntriesTicker.C + } logger := log.With(m.logger, "phase", "periodic_rejoin") for { @@ -537,7 +542,7 @@ func (m *KV) running(ctx context.Context) error { level.Warn(logger).Log("msg", "re-joining memberlist cluster failed", "err", err, "next_try_in", m.cfg.RejoinInterval) } - case <-obsoleteEntriesTicker.C: + case <-obsoleteEntriesTickerChan: // cleanupObsoleteEntries is normally called during push/pull, but if there are no other // nodes to push/pull with, we can call it periodically to make sure we remove unused entries from memory. 
m.cleanupObsoleteEntries() @@ -1639,7 +1644,7 @@ func (m *KV) deleteSentReceivedMessages() { func (m *KV) cleanupObsoleteEntries() { m.storeMu.Lock() - defer m.storeMu.Lock() + defer m.storeMu.Unlock() for k, v := range m.store { if v.Deleted && time.Since(v.UpdateTime) > m.cfg.ObsoleteEntriesTimeout { diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index d2b27aabe..5aaec9431 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -576,6 +576,8 @@ func defaultKVConfig(i int) KVConfig { } func TestDelete(t *testing.T) { + t.Parallel() + c := dataCodec{} var cfg KVConfig @@ -586,7 +588,6 @@ func TestDelete(t *testing.T) { } cfg.GossipNodes = 1 cfg.GossipInterval = 100 * time.Millisecond - cfg.PushPullInterval = 1 * time.Second cfg.ObsoleteEntriesTimeout = 1 * time.Second cfg.ClusterLabelVerificationDisabled = true cfg.Codecs = []codec.Codec{c} @@ -612,16 +613,13 @@ func TestDelete(t *testing.T) { if err != nil { t.Fatalf("Failed to delete key %s: %v", key, err) } - t.Log("Deleted key", key) - time.Sleep(1100 * time.Millisecond) // wait for obsolete entries to be removed - ctx := context.Background() - val, err = kv.Get(ctx, key) + time.Sleep(2 * time.Second) // wait for obsolete entries to be removed + val = get(t, kv, key) if val != nil { t.Errorf("Expected nil, got: %v", val) } - t.Log("Key", key, "is nil") } func TestMultipleClients(t *testing.T) { @@ -1352,7 +1350,6 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { TCPTransport: TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), }, - PushPullInterval: 30 * time.Second, } // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. 
cfg.RetransmitMult = 1 @@ -1422,7 +1419,6 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { TCPTransport: TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), }, - PushPullInterval: 30 * time.Second, } // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. cfg.RetransmitMult = 1 @@ -1569,7 +1565,6 @@ func TestDelegateMethodsDontCrashBeforeKVStarts(t *testing.T) { cfg.TCPTransport = TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), } - cfg.PushPullInterval = 30 * time.Second kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) @@ -1613,7 +1608,6 @@ func TestMetricsRegistration(t *testing.T) { cfg := KVConfig{} cfg.Codecs = append(cfg.Codecs, c) - cfg.PushPullInterval = 30 * time.Second reg := prometheus.NewPedanticRegistry() kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg) @@ -1723,7 +1717,6 @@ func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. cfg.RetransmitMult = 1 cfg.Codecs = append(cfg.Codecs, codec) - cfg.PushPullInterval = 30 * time.Second reg := prometheus.NewRegistry() kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg) @@ -1783,7 +1776,6 @@ func TestRaceBetweenStoringNewValueForKeyAndUpdatingIt(t *testing.T) { cfg.TCPTransport = TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), } - cfg.PushPullInterval = 30 * time.Second kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) @@ -1864,7 +1856,6 @@ func TestNotificationDelay(t *testing.T) { // We're going to trigger sends manually, so effectively disable the automatic send interval. 
const hundredYears = 100 * 365 * 24 * time.Hour cfg.NotifyInterval = hundredYears - cfg.PushPullInterval = 30 * time.Second kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) watchChan := make(chan string, 16) From 2f74f8301ca14c9e51aec6b19c2d4d10c123986b Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Fri, 25 Oct 2024 10:36:27 -0600 Subject: [PATCH 06/15] updated changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f9cb11448..7e21c690e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -236,6 +236,7 @@ * [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. #601 * [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525 * [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592 +* [ENHANCEMENT] Memberlist: Implemented the `Delete` operation in the memberlist backed KV store. How frequently deleted entries are cleaned up is specified by the `-memberlist.obsolete-entries-timeout` flag. * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 From 19fe7121251000413d1c3fd917af52670bf7bba2 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Fri, 25 Oct 2024 10:37:10 -0600 Subject: [PATCH 07/15] add PR number to changelog entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e21c690e..05f190c84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -236,7 +236,7 @@ * [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. 
#601 * [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525 * [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592 -* [ENHANCEMENT] Memberlist: Implemented the `Delete` operation in the memberlist backed KV store. How frequently deleted entries are cleaned up is specified by the `-memberlist.obsolete-entries-timeout` flag. +* [ENHANCEMENT] Memberlist: Implemented the `Delete` operation in the memberlist backed KV store. How frequently deleted entries are cleaned up is specified by the `-memberlist.obsolete-entries-timeout` flag. #612 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 From c3ae993560752b5056e97401d622b892451a274e Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Tue, 14 Jan 2025 15:26:21 -0700 Subject: [PATCH 08/15] added new fields to message --- kv/memberlist/memberlist_client.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 571d18b34..d86bd7fb5 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -1280,9 +1280,11 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { Time: time.Now(), Size: update.messageSize, Pair: KeyValuePair{ - Key: key, - Value: update.value, - Codec: update.codec.CodecID(), + Key: key, + Value: update.value, + Codec: update.codec.CodecID(), + Deleted: update.deleted, + UpdateTimeMillis: update.updateTime.UnixMilli(), }, Version: version, Changes: changes, @@ -1596,6 +1598,7 @@ func (m *KV) storeCopy() map[string]ValueDesc { } return result } + func (m *KV) 
addReceivedMessage(msg Message) { if m.cfg.MessageHistoryBufferBytes == 0 { return From c776db1058e6c6fd4a15af2c6b07d95bcd8e9aad Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Tue, 14 Jan 2025 18:18:56 -0700 Subject: [PATCH 09/15] broadcast change from delete function --- kv/memberlist/memberlist_client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index d86bd7fb5..638af4f2e 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -1043,6 +1043,9 @@ func (m *KV) Delete(key string) { val.Deleted = true val.UpdateTime = time.Now() m.store[key] = val + + m.notifyWatchers(key) + m.broadcastNewValue(key, val.value, val.Version, m.GetCodec(val.CodecID), true, true, val.UpdateTime) } // CAS implements Compare-And-Set/Swap operation. From 2c78dbae7a97c6839b289a93b4066c27253a1dbe Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Tue, 14 Jan 2025 19:00:07 -0700 Subject: [PATCH 10/15] added some debug logging --- kv/memberlist/memberlist_client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 638af4f2e..d1d10b51b 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -545,6 +545,7 @@ func (m *KV) running(ctx context.Context) error { case <-obsoleteEntriesTickerChan: // cleanupObsoleteEntries is normally called during push/pull, but if there are no other // nodes to push/pull with, we can call it periodically to make sure we remove unused entries from memory. 
+ level.Debug(m.logger).Log("msg", "initiating cleanup of obsolete entries") m.cleanupObsoleteEntries() case <-ctx.Done(): @@ -1044,6 +1045,7 @@ func (m *KV) Delete(key string) { val.UpdateTime = time.Now() m.store[key] = val + level.Debug(m.logger).Log("msg", "marked key for deletion", "key", key) m.notifyWatchers(key) m.broadcastNewValue(key, val.value, val.Version, m.GetCodec(val.CodecID), true, true, val.UpdateTime) } @@ -1554,6 +1556,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue newDeleted := curr.Deleted if !updateTime.IsZero() && updateTime.After(newUpdateTime) { + level.Debug(m.logger).Log("msg", "setting new update time and delete value", "key", key, "updateTime", updateTime, "deleted", deleted) newUpdateTime = updateTime newDeleted = deleted } @@ -1654,6 +1657,7 @@ func (m *KV) cleanupObsoleteEntries() { for k, v := range m.store { if v.Deleted && time.Since(v.UpdateTime) > m.cfg.ObsoleteEntriesTimeout { + level.Debug(m.logger).Log("msg", "deleting entry from KV store", "key", k) delete(m.store, k) } } From cad126f6a3ebc12e693b3c0265f5a989a1fc6ef2 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Tue, 14 Jan 2025 19:28:47 -0700 Subject: [PATCH 11/15] merge value in delete function --- kv/memberlist/memberlist_client.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index d1d10b51b..78d476080 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -1034,7 +1034,6 @@ func (m *KV) notifyWatchersSync(key string) { func (m *KV) Delete(key string) { m.storeMu.Lock() - defer m.storeMu.Unlock() val, ok := m.store[key] if !ok || val.Deleted { @@ -1044,10 +1043,24 @@ func (m *KV) Delete(key string) { val.Deleted = true val.UpdateTime = time.Now() m.store[key] = val + m.storeMu.Unlock() + + c := m.GetCodec(val.CodecID) + if c == nil { + level.Warn(m.logger).Log("msg", "invalid 
codec", "codec_id", val.CodecID) + } - level.Debug(m.logger).Log("msg", "marked key for deletion", "key", key) - m.notifyWatchers(key) - m.broadcastNewValue(key, val.value, val.Version, m.GetCodec(val.CodecID), true, true, val.UpdateTime) + change, newver, err := m.mergeValueForKey(key, nil, false, 0, c, true, val.UpdateTime) + if err != nil { + level.Warn(m.logger).Log("msg", "failed to delete key", "key", key, "err", err) + return + } + + if newver > 0 { + level.Debug(m.logger).Log("msg", "broadcasting key deletion", "key", key, "version", newver) + m.notifyWatchers(key) + m.broadcastNewValue(key, change, newver, c, false, true, val.UpdateTime) + } } // CAS implements Compare-And-Set/Swap operation. From 1580b58772cd9c98d257cba73017edc12e1af90d Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Wed, 15 Jan 2025 10:21:39 -0700 Subject: [PATCH 12/15] fixes for delete op --- kv/memberlist/memberlist_client.go | 114 +++++++++++++---------------- 1 file changed, 52 insertions(+), 62 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 78d476080..83542b0e8 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -78,8 +78,7 @@ func (c *Client) Delete(ctx context.Context, key string) error { return err } - c.kv.Delete(key) - return nil + return c.kv.Delete(key) } // CAS is part of kv.Client interface @@ -1032,12 +1031,12 @@ func (m *KV) notifyWatchersSync(key string) { } } -func (m *KV) Delete(key string) { +func (m *KV) Delete(key string) error { m.storeMu.Lock() val, ok := m.store[key] if !ok || val.Deleted { - return + return nil } val.Deleted = true @@ -1047,20 +1046,20 @@ func (m *KV) Delete(key string) { c := m.GetCodec(val.CodecID) if c == nil { - level.Warn(m.logger).Log("msg", "invalid codec", "codec_id", val.CodecID) + return fmt.Errorf("invalid codec: %s", val.CodecID) } - change, newver, err := m.mergeValueForKey(key, nil, false, 0, c, true, val.UpdateTime) + change, 
newver, deleted, updated, err := m.mergeValueForKey(key, nil, false, 0, val.CodecID, true, time.Now()) if err != nil { - level.Warn(m.logger).Log("msg", "failed to delete key", "key", key, "err", err) - return + return err } if newver > 0 { - level.Debug(m.logger).Log("msg", "broadcasting key deletion", "key", key, "version", newver) m.notifyWatchers(key) - m.broadcastNewValue(key, change, newver, c, false, true, val.UpdateTime) + m.broadcastNewValue(key, change, newver, c, false, deleted, updated) } + + return nil } // CAS implements Compare-And-Set/Swap operation. @@ -1093,7 +1092,7 @@ outer: } } - change, newver, retry, updateTime, err := m.trySingleCas(key, codec, f) + change, newver, retry, deleted, updated, err := m.trySingleCas(key, codec, f) if err != nil { level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry) @@ -1108,13 +1107,13 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - m.broadcastNewValue(key, change, newver, codec, true, false, updateTime) + m.broadcastNewValue(key, change, newver, codec, true, deleted, updated) } return nil } - if lastError == errVersionMismatch { + if errors.Is(lastError, errVersionMismatch) { // this is more likely error than version mismatch. lastError = errTooManyRetries } @@ -1125,49 +1124,47 @@ outer: // returns change, error (or nil, if CAS succeeded), and whether to retry or not. // returns errNoChangeDetected if merge failed to detect change in f's output. 
-func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, time.Time, error) { +func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, bool, time.Time, error) { val, ver, err := m.get(key, codec) if err != nil { - return nil, 0, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) + return nil, 0, false, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) } out, retry, err := f(val) if err != nil { - return nil, 0, retry, time.Time{}, fmt.Errorf("fn returned error: %v", err) + return nil, 0, retry, false, time.Time{}, fmt.Errorf("fn returned error: %v", err) } if out == nil { // no change to be done - return nil, 0, false, time.Time{}, nil + return nil, 0, false, false, time.Time{}, nil } // Don't even try incomingValue, ok := out.(Mergeable) if !ok || incomingValue == nil { - return nil, 0, retry, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) + return nil, 0, retry, false, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) } // To support detection of removed items from value, we will only allow CAS operation to // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. // Supplied function may have kept a reference to the returned "incoming value". // If KV store will keep this value as well, it needs to make a clone. 
- ut := time.Now() - - change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec, false, ut) + change, newver, deleted, updated, err := m.mergeValueForKey(key, incomingValue, true, ver, codec.CodecID(), false, time.Now()) if err == errVersionMismatch { - return nil, 0, retry, time.Time{}, err + return nil, 0, retry, false, time.Time{}, err } if err != nil { - return nil, 0, retry, time.Time{}, fmt.Errorf("merge failed: %v", err) + return nil, 0, retry, false, time.Time{}, fmt.Errorf("merge failed: %v", err) } if newver == 0 { // CAS method reacts on this error - return nil, 0, retry, time.Time{}, errNoChangeDetected + return nil, 0, retry, deleted, updated, errNoChangeDetected } - return change, newver, retry, ut, nil + return change, newver, retry, deleted, updated, nil } func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) { @@ -1287,7 +1284,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { select { case update := <-workerCh: // we have a value update! 
Let's merge it with our current version for given key - mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) + mod, version, deleted, updated, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) changes := []string(nil) if mod != nil { @@ -1298,11 +1295,9 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { Time: time.Now(), Size: update.messageSize, Pair: KeyValuePair{ - Key: key, - Value: update.value, - Codec: update.codec.CodecID(), - Deleted: update.deleted, - UpdateTimeMillis: update.updateTime.UnixMilli(), + Key: key, + Value: update.value, + Codec: update.codec.CodecID(), }, Version: version, Changes: changes, @@ -1313,8 +1308,8 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { } else if version > 0 { m.notifyWatchers(key) - // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec, false, update.deleted, update.updateTime) + // Don't resend original message, but only changes, if any. 
+ m.broadcastNewValue(key, mod, version, update.codec, false, deleted, updated) } case <-m.shutdown: @@ -1471,13 +1466,8 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { continue } - updateTime := updateTime(kvPair.UpdateTimeMillis) - if updateTime.IsZero() { - updateTime = time.Now() - } - // we have both key and value, try to merge it with our state - change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime) + change, newver, deleted, updated, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime(kvPair.UpdateTimeMillis)) changes := []string(nil) if change != nil { @@ -1496,7 +1486,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec, false, kvPair.Deleted, updateTime) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false, deleted, updated) } } @@ -1505,26 +1495,26 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { } } -func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { +func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, bool, time.Time, error) { decodedValue, err := codec.Decode(incomingData) if err != nil { - return nil, 0, fmt.Errorf("failed to decode value: %v", err) + return nil, 0, false, time.Time{}, fmt.Errorf("failed to decode value: %v", err) } incomingValue, ok := decodedValue.(Mergeable) if !ok { - return nil, 0, fmt.Errorf("expected Mergeable, got: %T", decodedValue) + return nil, 0, false, time.Time{}, fmt.Errorf("expected Mergeable, got: %T", decodedValue) } // No need to clone this "incomingValue", since we have just decoded it from bytes, and won't 
be using it. - return m.mergeValueForKey(key, incomingValue, false, 0, codec, deleted, updateTime) + return m.mergeValueForKey(key, incomingValue, false, 0, codec.CodecID(), deleted, updateTime) } // Merges incoming value with value we have in our store. Returns "a change" that can be sent to other // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. -func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { +func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (change Mergeable, newVersion uint, newDeleted bool, newUpdated time.Time, err error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1534,16 +1524,25 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue curr := m.store[key] // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. if casVersion > 0 && curr.Version != casVersion { - return nil, 0, errVersionMismatch + return nil, 0, false, time.Time{}, errVersionMismatch } result, change, err := computeNewValue(incomingValue, incomingValueRequiresClone, curr.value, casVersion > 0) if err != nil { - return nil, 0, err + return nil, 0, false, time.Time{}, err + } + + newVersion = curr.Version + 1 + newUpdated = curr.UpdateTime + newDeleted = curr.Deleted + + if !updateTime.IsZero() && updateTime.After(newUpdated) { + newUpdated = updateTime + newDeleted = deleted } // No change, don't store it. 
- if change == nil || len(change.MergeContent()) == 0 { - return nil, 0, nil + if (change == nil || len(change.MergeContent()) == 0) && curr.Deleted == newDeleted { + return nil, 0, curr.Deleted, curr.UpdateTime, nil } if m.cfg.LeftIngestersTimeout > 0 { @@ -1560,32 +1559,23 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue // RemoveTombstones twice with same limit should be noop. change.RemoveTombstones(limit) if len(change.MergeContent()) == 0 { - return nil, 0, nil + return nil, 0, curr.Deleted, curr.UpdateTime, nil } } - newVersion := curr.Version + 1 - newUpdateTime := curr.UpdateTime - newDeleted := curr.Deleted - - if !updateTime.IsZero() && updateTime.After(newUpdateTime) { - level.Debug(m.logger).Log("msg", "setting new update time and delete value", "key", key, "updateTime", updateTime, "deleted", deleted) - newUpdateTime = updateTime - newDeleted = deleted - } m.store[key] = ValueDesc{ value: result, Version: newVersion, - CodecID: codec.CodecID(), + CodecID: codecID, Deleted: newDeleted, - UpdateTime: newUpdateTime, + UpdateTime: newUpdated, } // The "changes" returned by Merge() can contain references to the "result" // state. Therefore, make sure we clone it before releasing the lock. 
change = change.Clone() - return change, newVersion, nil + return change, newVersion, newDeleted, newUpdated, nil } // returns [result, change, error] From 318cf911785f6409efd6d731079c8434f4f7a0c4 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Wed, 15 Jan 2025 10:57:59 -0700 Subject: [PATCH 13/15] fixe for handling nil change --- kv/memberlist/memberlist_client.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 83542b0e8..41c267574 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -1557,9 +1557,11 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue // Note that "result" and "change" may actually be the same Mergeable. That is why we // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling // RemoveTombstones twice with same limit should be noop. - change.RemoveTombstones(limit) - if len(change.MergeContent()) == 0 { - return nil, 0, curr.Deleted, curr.UpdateTime, nil + if change != nil { + change.RemoveTombstones(limit) + if len(change.MergeContent()) == 0 { + return nil, 0, curr.Deleted, curr.UpdateTime, nil + } } } @@ -1573,8 +1575,9 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue // The "changes" returned by Merge() can contain references to the "result" // state. Therefore, make sure we clone it before releasing the lock. 
- change = change.Clone() - + if change != nil { + change = change.Clone() + } return change, newVersion, newDeleted, newUpdated, nil } From fda64eab28bc336f4fe867da42cf25dc7ab12521 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Wed, 15 Jan 2025 15:39:45 -0700 Subject: [PATCH 14/15] handle nil changes --- kv/memberlist/memberlist_client.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 41c267574..dec174974 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -1039,8 +1039,6 @@ func (m *KV) Delete(key string) error { return nil } - val.Deleted = true - val.UpdateTime = time.Now() m.store[key] = val m.storeMu.Unlock() @@ -1049,7 +1047,7 @@ func (m *KV) Delete(key string) error { return fmt.Errorf("invalid codec: %s", val.CodecID) } - change, newver, deleted, updated, err := m.mergeValueForKey(key, nil, false, 0, val.CodecID, true, time.Now()) + change, newver, deleted, updated, err := m.mergeValueForKey(key, val.value, false, 0, val.CodecID, true, time.Now()) if err != nil { return err } @@ -1173,7 +1171,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - data, err := codec.Encode(change) + data, err := handlePossibleNilEncode(codec, change) if err != nil { level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err) m.numberOfBroadcastMessagesDropped.Inc() @@ -1188,18 +1186,19 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } + mergedChanges := handlePossibleNilMergeContent(change) m.addSentMessage(Message{ Time: time.Now(), Size: len(pairData), Pair: kvPair, Version: version, - Changes: change.MergeContent(), + Changes: mergedChanges, }) l := len(pairData) b := ringBroadcast{ key: key, - content: change.MergeContent(), + content: mergedChanges, version: version, msg: 
pairData, finished: func(ringBroadcast) { @@ -1694,3 +1693,19 @@ func updateTimeMillis(ts time.Time) int64 { } return ts.UnixMilli() } + +func handlePossibleNilEncode(codec codec.Codec, change Mergeable) ([]byte, error) { + if change == nil { + return []byte{}, nil + } + + return codec.Encode(change) +} + +func handlePossibleNilMergeContent(change Mergeable) []string { + if change == nil { + return []string{} + } + + return change.MergeContent() +} From 78c84ee50267ce7af2d06c526692a6b1279dc145 Mon Sep 17 00:00:00 2001 From: Vernon Miller Date: Thu, 16 Jan 2025 14:44:59 -0700 Subject: [PATCH 15/15] addressed feedback --- kv/memberlist/memberlist_client.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index dec174974..fafa5f6e7 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -161,7 +161,7 @@ type KVConfig struct { // Remove LEFT ingesters from ring after this timeout. LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` - ObsoleteEntriesTimeout time.Duration `yaml:"obsolete_entries_timeout" category:"advanced"` + ObsoleteEntriesTimeout time.Duration `yaml:"obsolete_entries_timeout" category:"experimental"` // Timeout used when leaving the memberlist cluster. LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` @@ -338,7 +338,10 @@ type ValueDesc struct { // ID of codec used to write this value. Only used when sending full state. CodecID string - Deleted bool + // Deleted is used to mark the value as deleted. The value is removed from the KV store after `ObsoleteEntriesTimeout`. + Deleted bool + + // UpdateTime keeps track of the last time the value was updated. UpdateTime time.Time } @@ -1039,7 +1042,6 @@ func (m *KV) Delete(key string) error { return nil } - m.store[key] = val m.storeMu.Unlock() c := m.GetCodec(val.CodecID)