Skip to content

Commit

Permalink
passing build with ha_tracker now generated by csproto
Browse files Browse the repository at this point in the history
  • Loading branch information
francoposa committed Jan 19, 2025
1 parent 8f192ee commit 30bad7a
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 533 deletions.
18 changes: 17 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ images: ## Print all image names.
@echo > /dev/null

# Generating proto code is automated.
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print)
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print | grep -v 'pkg/distributor/ha_tracker.proto')
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))

# Generating OTLP translation code is automated.
Expand Down Expand Up @@ -292,6 +292,22 @@ protos: $(PROTO_GOS)

GENERATE_FILES ?= true

PROTO_DEFS_CSPROTO := ./pkg/distributor/ha_tracker.proto
PROTO_GOS_CSPROTO := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS_CSPROTO))

# this doesn't work I don't know why yet
#.PHONY: protos-csproto
#protos-csproto: $(PROTO_GOS_CSPROTO)
# @for name in $(PROTO_DEFS_CSPROTO); do \
# protoc \
# -I . \
# -I ./pkg/distributor/ \
# --go_out=paths=source_relative:. \
# --fastmarshal_out=apiversion=v2,paths=source_relative:. \
# --go-grpc_out=require_unimplemented_servers=false,paths=source_relative:. \
# $${name}; \
# done

%.pb.go: %.proto
ifeq ($(GENERATE_FILES),true)
protoc -I $(GOPATH)/src:./vendor/github.com/gogo/protobuf:./vendor:./$(@D):./pkg/storegateway/storepb --gogoslick_out=plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5477,7 +5477,7 @@ func preparePartitionsRing(cfg prepConfig, ingesters []*mockIngester) *ring.Part

// Add all ingesters are partition owners.
for _, ingester := range ingesters {
desc.AddOrUpdateOwner(ingester.instanceID(), ring.OwnerActive, ingester.partitionID(), timeBeforeShuffleShardingLookbackPeriod)
desc.AddOrUpdateOwner(ingester.instanceID(), ring.OwnerState_OwnerActive, ingester.partitionID(), timeBeforeShuffleShardingLookbackPeriod)
}

return desc
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/timestamp"
"google.golang.org/protobuf/proto"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
Expand Down
176 changes: 176 additions & 0 deletions pkg/distributor/ha_tracker.pb.fm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// GENERATED CODE - DO NOT EDIT
// This file was generated by protoc-gen-fastmarshal

package distributor

import (
"fmt"
"sync/atomic"
"github.com/CrowdStrike/csproto"
)

//------------------------------------------------------------------------------
// Custom Protobuf size/marshal/unmarshal code for ReplicaDesc

// Size calculates and returns the size, in bytes, required to hold the contents of m using the Protobuf
// binary encoding.
func (m *ReplicaDesc) Size() int {
// nil message is always 0 bytes
if m == nil {
return 0
}
// return cached size, if present
if csz := int(atomic.LoadInt32(&m.sizeCache)); csz > 0 {
return csz
}
// calculate and cache
var sz, l int
_ = l // avoid unused variable

// Replica (string,optional)
if l = len(m.Replica); l > 0 {
sz += csproto.SizeOfTagKey(1) + csproto.SizeOfVarint(uint64(l)) + l
}
// ReceivedAt (int64,optional)
if m.ReceivedAt != 0 {
sz += csproto.SizeOfTagKey(2) + csproto.SizeOfVarint(uint64(m.ReceivedAt))
}
// DeletedAt (int64,optional)
if m.DeletedAt != 0 {
sz += csproto.SizeOfTagKey(3) + csproto.SizeOfVarint(uint64(m.DeletedAt))
}
// ElectedAt (int64,optional)
if m.ElectedAt != 0 {
sz += csproto.SizeOfTagKey(4) + csproto.SizeOfVarint(uint64(m.ElectedAt))
}
// ElectedChanges (int64,optional)
if m.ElectedChanges != 0 {
sz += csproto.SizeOfTagKey(5) + csproto.SizeOfVarint(uint64(m.ElectedChanges))
}
// cache the size so it can be re-used in Marshal()/MarshalTo()
atomic.StoreInt32(&m.sizeCache, int32(sz))
return sz
}

// Marshal converts the contents of m to the Protobuf binary encoding and returns the result or an error.
func (m *ReplicaDesc) Marshal() ([]byte, error) {
siz := m.Size()
if siz == 0 {
return []byte{}, nil
}
buf := make([]byte, siz)
err := m.MarshalTo(buf)
return buf, err
}

// MarshalTo converts the contents of m to the Protobuf binary encoding and writes the result to dest.
func (m *ReplicaDesc) MarshalTo(dest []byte) error {
// nil message == no-op
if m == nil {
return nil
}
var (
enc = csproto.NewEncoder(dest)
buf []byte
err error
extVal interface{}
)
// ensure no unused variables
_ = enc
_ = buf
_ = err
_ = extVal

// Replica (1,string,optional)
if len(m.Replica) > 0 {
enc.EncodeString(1, m.Replica)
}
// ReceivedAt (2,int64,optional)
if m.ReceivedAt != 0 {
enc.EncodeInt64(2, m.ReceivedAt)
}
// DeletedAt (3,int64,optional)
if m.DeletedAt != 0 {
enc.EncodeInt64(3, m.DeletedAt)
}
// ElectedAt (4,int64,optional)
if m.ElectedAt != 0 {
enc.EncodeInt64(4, m.ElectedAt)
}
// ElectedChanges (5,int64,optional)
if m.ElectedChanges != 0 {
enc.EncodeInt64(5, m.ElectedChanges)
}
return nil
}

// Unmarshal decodes a binary encoded Protobuf message from p and populates m with the result.
func (m *ReplicaDesc) Unmarshal(p []byte) error {
m.Reset()
if len(p) == 0 {
return nil
}
dec := csproto.NewDecoder(p)
for dec.More() {
tag, wt, err := dec.DecodeTag()
if err != nil {
return err
}
switch tag {
case 1: // Replica (string,optional)
if wt != csproto.WireTypeLengthDelimited {
return fmt.Errorf("incorrect wire type %v for field 'replica' (tag=1), expected 2 (length-delimited)", wt)
}
if s, err := dec.DecodeString(); err != nil {
return fmt.Errorf("unable to decode string value for field 'replica' (tag=1): %w", err)
} else {
m.Replica = s
}

case 2: // ReceivedAt (int64,optional)
if wt != csproto.WireTypeVarint {
return fmt.Errorf("incorrect wire type %v for tag field 'received_at' (tag=2), expected 0 (varint)", wt)
}
if v, err := dec.DecodeInt64(); err != nil {
return fmt.Errorf("unable to decode int64 value for field 'received_at' (tag=2): %w", err)
} else {
m.ReceivedAt = v
}
case 3: // DeletedAt (int64,optional)
if wt != csproto.WireTypeVarint {
return fmt.Errorf("incorrect wire type %v for tag field 'deleted_at' (tag=3), expected 0 (varint)", wt)
}
if v, err := dec.DecodeInt64(); err != nil {
return fmt.Errorf("unable to decode int64 value for field 'deleted_at' (tag=3): %w", err)
} else {
m.DeletedAt = v
}
case 4: // ElectedAt (int64,optional)
if wt != csproto.WireTypeVarint {
return fmt.Errorf("incorrect wire type %v for tag field 'elected_at' (tag=4), expected 0 (varint)", wt)
}
if v, err := dec.DecodeInt64(); err != nil {
return fmt.Errorf("unable to decode int64 value for field 'elected_at' (tag=4): %w", err)
} else {
m.ElectedAt = v
}
case 5: // ElectedChanges (int64,optional)
if wt != csproto.WireTypeVarint {
return fmt.Errorf("incorrect wire type %v for tag field 'elected_changes' (tag=5), expected 0 (varint)", wt)
}
if v, err := dec.DecodeInt64(); err != nil {
return fmt.Errorf("unable to decode int64 value for field 'elected_changes' (tag=5): %w", err)
} else {
m.ElectedChanges = v
}

default:
if skipped, err := dec.Skip(tag, wt); err != nil {
return fmt.Errorf("invalid operation skipping tag %v: %w", tag, err)
} else {
m.unknownFields = append(m.unknownFields, skipped...)
}
}
}
return nil
}
Loading

0 comments on commit 30bad7a

Please sign in to comment.