Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ring: add GetWithOptions method to adjust per call behavior #632

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion ring/replication_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
)

type ReplicationStrategy interface {
// Filter out unhealthy instances and checks if there're enough instances
// Filter out unhealthy instances and checks if there are enough instances
// for an operation to succeed. Returns an error if there are not enough
// instances.
Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error)

// SupportsExpandedReplication returns true for replication strategies that
// support increasing the replication factor beyond a single instance per zone,
// false otherwise.
SupportsExpandedReplication() bool
Comment on lines +14 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know whether this will break clients that have their own ReplicationStrategy implementations? I'm wondering if we should instead add a new interface and check for it with a type assertion in getReplicationSetForKey.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will break existing ReplicationStrategy implementations but it's an obvious and simple fix. We could add a new type but I'm not sure what that gets us besides not requiring changes from people that have their own ReplicationStrategy implementations.

}

type defaultReplicationStrategy struct{}
Expand Down Expand Up @@ -70,6 +75,14 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
return instances, len(instances) - minSuccess, nil
}

// SupportsExpandedReplication implements ReplicationStrategy. It always returns
// false for defaultReplicationStrategy.
func (s *defaultReplicationStrategy) SupportsExpandedReplication() bool {
// defaultReplicationStrategy assumes that a single instance per zone is returned and that
// it can treat replication factor as equivalent to the number of zones. This doesn't work
// when a per-call replication factor increases it beyond the configured replication factor
// and the number of zones.
return false
}

type ignoreUnhealthyInstancesReplicationStrategy struct{}

func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy {
Expand Down Expand Up @@ -101,6 +114,10 @@ func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []Instanc
return instances, len(instances) - 1, nil
}

// SupportsExpandedReplication implements ReplicationStrategy. It always returns
// true for ignoreUnhealthyInstancesReplicationStrategy.
func (r *ignoreUnhealthyInstancesReplicationStrategy) SupportsExpandedReplication() bool {
return true
}

// IsHealthy reports whether instance is healthy for op at time now. It delegates
// to the instance's own IsHealthy check, passing the ring's configured heartbeat
// timeout.
func (r *Ring) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool {
return instance.IsHealthy(op, r.cfg.HeartbeatTimeout, now)
}
Expand Down
149 changes: 102 additions & 47 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,41 @@ const (
GetBufferSize = 5
)

// Options are the result of Option instances that can be used to modify Ring.GetWithOptions behavior.
// The zero value carries no override and no buffers.
type Options struct {
// ReplicationFactor is the per-call replication factor set by WithReplicationFactor.
// Values <= 0, or below the ring's configured replication factor, are replaced by
// the configured one when the lookup runs.
ReplicationFactor int
// BufDescs is the caller-supplied instance buffer set by WithBuffers; nil when
// WithBuffers was not used.
BufDescs []InstanceDesc
// BufHosts is the caller-supplied hosts buffer set by WithBuffers; nil when
// WithBuffers was not used.
BufHosts []string
// BufZones is the caller-supplied zones buffer set by WithBuffers.
// NOTE(review): Ring.GetWithOptions forwards only BufDescs and BufHosts, so this
// field appears to be unused there — confirm whether it is still needed.
BufZones []string
}

// Option can be used to modify Ring behavior when calling Ring.GetWithOptions.
// Each Option is a function that mutates the Options value it is given.
type Option func(opts *Options)

// WithBuffers returns an Option that records the supplied buffers in Options so
// that they are used for the call, avoiding allocations.
func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option {
	return func(o *Options) {
		// Store all three buffers as-is; nothing is copied.
		o.BufDescs, o.BufHosts, o.BufZones = bufDescs, bufHosts, bufZones
	}
}

// WithReplicationFactor returns an Option that sets Options.ReplicationFactor to
// replication, overriding the default replication factor for a single call.
func WithReplicationFactor(replication int) Option {
	return func(o *Options) { o.ReplicationFactor = replication }
}

// collectOptions applies every Option in opts, in order, to a zero-valued
// Options and returns the result. Later options overwrite fields set by earlier
// ones.
func collectOptions(opts ...Option) Options {
	var collected Options
	for i := range opts {
		opts[i](&collected)
	}
	return collected
}

// ReadRing represents the read interface to the ring.
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet.
type ReadRing interface {
Expand All @@ -42,13 +77,17 @@ type ReadRing interface {
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more Option instances to change the behavior of the method call.
GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error)

// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy instances is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contains all healthy instances
// The resulting ReplicationSet doesn't necessarily contain all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
Expand Down Expand Up @@ -421,19 +460,44 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste
}

// Get returns n (or more) instances which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, _ []string) (ReplicationSet, error) {
// Note that we purposefully aren't calling GetWithOptions here since the closures it
// uses result in heap allocations which we specifically avoid in this method since it's
// called in hot loops.
return r.getReplicationSetForKey(key, op, bufDescs, bufHosts, r.cfg.ReplicationFactor)
}

// GetWithOptions returns n (or more) instances which form the replicas for the
// given key. Zero or more Option values may be passed to adjust this single
// call; they are collected into an Options value whose instance and host buffers
// and replication factor are forwarded to the lookup.
func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) {
	o := collectOptions(opts...)
	return r.getReplicationSetForKey(key, op, o.BufDescs, o.BufHosts, o.ReplicationFactor)
}

func (r *Ring) getReplicationSetForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil)
if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor {
replicationFactor = r.cfg.ReplicationFactor
}

// Not all replication strategies support increasing the replication factor beyond
// the number of zones available. Return an error unless a ReplicationStrategy has
// explicitly opted into supporting this.
if replicationFactor > r.cfg.ReplicationFactor && !r.strategy.SupportsExpandedReplication() {
return ReplicationSet{}, fmt.Errorf("per-call replication factor %d cannot exceed the configured replication factor %d with this replication strategy", replicationFactor, r.cfg.ReplicationFactor)
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, replicationFactor, nil)
if err != nil {
return ReplicationSet{}, err
}

healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, replicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
if err != nil {
return ReplicationSet{}, err
}
Expand All @@ -447,21 +511,31 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy.
// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early.
// This function needs to be called with read lock on the ring.
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
var (
n = r.cfg.ReplicationFactor
n = replicationFactor
instances = bufDescs[:0]
start = searchToken(r.ringTokens, key)
iterations = 0
maxZones = len(r.ringTokensByZone)
maxInstances = len(r.ringDesc.Ingesters)

// We use a slice instead of a map because it's faster to search within a
// slice than lookup a map for a very low number of items.
// slice than lookup a map for a very low number of items, we only expect
// to have low single-digit number of hosts.
distinctHosts = bufHosts[:0]
distinctZones = bufZones[:0]

// TODO: Do we need to pass this in to avoid allocations?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to replace passing bufDescs, bufHosts, and bufZones around everywhere with a single Buffers struct, which could include this map, but that was a pretty invasive and non-backwards-compatible change (not that we have any guarantees in dskit).

hostsPerZone = make(map[string]int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this. Can't we still use a slice with a struct? Have you run BenchmarkRing_Get (maybe also adding a test case there with a different RF)?

targetHostsPerZone = max(1, replicationFactor/maxZones)
)
for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ {

for i := start; len(distinctHosts) < min(maxInstances, n) && iterations < len(r.ringTokens); i++ {
// If we have the target number of instances in all zones, stop looking.
if r.cfg.ZoneAwarenessEnabled && haveTargetHostsInAllZones(hostsPerZone, targetHostsPerZone, maxZones) {
break
}

iterations++
// Wrap i around in the ring.
i %= len(r.ringTokens)
Expand All @@ -478,9 +552,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
continue
}

// Ignore if the instances don't have a zone set.
// If we already have the required number of instances for this zone, skip.
if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
if slices.Contains(distinctZones, info.Zone) {
if hostsPerZone[info.Zone] >= targetHostsPerZone {
continue
}
}
Expand All @@ -493,9 +567,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
if op.ShouldExtendReplicaSetOnState(instance.State) {
n++
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
// We should only add the zone if we are not going to extend,
// as we want to extend the instance in the same AZ.
distinctZones = append(distinctZones, info.Zone)
// We should only increment the count for this zone if we are not going to
// extend, as we want to extend the instance in the same AZ.
hostsPerZone[info.Zone]++
}

include, keepGoing := true, true
Expand All @@ -512,6 +586,20 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
return instances, nil
}

// haveTargetHostsInAllZones reports whether hostsByZone has exactly maxZones
// entries and every entry's count is at least targetHostsPerZone.
func haveTargetHostsInAllZones(hostsByZone map[string]int, targetHostsPerZone int, maxZones int) bool {
	if len(hostsByZone) == maxZones {
		// Every expected zone has an entry; each must have reached the target.
		for zone := range hostsByZone {
			if hostsByZone[zone] < targetHostsPerZone {
				return false
			}
		}
		return true
	}
	// The number of zones seen so far does not match maxZones.
	return false
}

// GetAllHealthy implements ReadRing.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
r.mtx.RLock()
Expand Down Expand Up @@ -1332,36 +1420,3 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool {

// All states are healthy, no states extend replica set.
var allStatesRingOperation = Operation(0x0000ffff)

// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance.
func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was only called from tests. I've moved most of this logic to the test in question (token_range_test.go).

r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringTokens) == 0 {
return 0, ErrEmptyRing
}

// Instance is not in this ring, it can't own any key.
if _, ok := r.ringDesc.Ingesters[instanceID]; !ok {
return 0, nil
}

owned := 0
for _, tok := range keys {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) {
if foundInstanceID == instanceID {
// If we've found our instance, we can stop.
return true, false
}
return false, true
})
if err != nil {
return 0, err
}
if len(i) > 0 {
owned++
}
}
return owned, nil
}
Loading
Loading