-
Notifications
You must be signed in to change notification settings - Fork 69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ring: add GetWithOptions method to adjust per call behavior #632
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,41 @@ const ( | |
GetBufferSize = 5 | ||
) | ||
|
||
// Options are the result of Option instances that can be used to modify Ring.GetWithOptions behavior. | ||
type Options struct { | ||
ReplicationFactor int | ||
BufDescs []InstanceDesc | ||
BufHosts []string | ||
BufZones []string | ||
} | ||
|
||
// Option can be used to modify Ring behavior when calling Ring.GetWithOptions | ||
type Option func(opts *Options) | ||
|
||
// WithBuffers creates an Option that will cause the given buffers to be used, avoiding allocations. | ||
func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option { | ||
return func(opts *Options) { | ||
opts.BufDescs = bufDescs | ||
opts.BufHosts = bufHosts | ||
opts.BufZones = bufZones | ||
} | ||
} | ||
|
||
// WithReplicationFactor creates an Option that overrides the default replication factor for a single call. | ||
func WithReplicationFactor(replication int) Option { | ||
return func(opts *Options) { | ||
opts.ReplicationFactor = replication | ||
} | ||
} | ||
|
||
func collectOptions(opts ...Option) Options { | ||
final := Options{} | ||
for _, opt := range opts { | ||
opt(&final) | ||
} | ||
return final | ||
} | ||
|
||
// ReadRing represents the read interface to the ring. | ||
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. | ||
type ReadRing interface { | ||
|
@@ -42,13 +77,17 @@ type ReadRing interface { | |
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). | ||
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) | ||
|
||
// GetWithOptions returns n (or more) instances which form the replicas for the given key | ||
// with 0 or more Option instances to change the behavior of the method call. | ||
GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) | ||
|
||
// GetAllHealthy returns all healthy instances in the ring, for the given operation. | ||
// This function doesn't check if the quorum is honored, so doesn't fail if the number | ||
// of unhealthy instances is greater than the tolerated max unavailable. | ||
GetAllHealthy(op Operation) (ReplicationSet, error) | ||
|
||
// GetReplicationSetForOperation returns all instances where the input operation should be executed. | ||
// The resulting ReplicationSet doesn't necessarily contains all healthy instances | ||
// The resulting ReplicationSet doesn't necessarily contain all healthy instances | ||
// in the ring, but could contain the minimum set of instances required to execute | ||
// the input operation. | ||
GetReplicationSetForOperation(op Operation) (ReplicationSet, error) | ||
|
@@ -421,19 +460,44 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste | |
} | ||
|
||
// Get returns n (or more) instances which form the replicas for the given key. | ||
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) { | ||
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, _ []string) (ReplicationSet, error) { | ||
// Note that we purposefully aren't calling GetWithOptions here since the closures it | ||
// uses result in heap allocations which we specifically avoid in this method since it's | ||
// called in hot loops. | ||
return r.getReplicationSetForKey(key, op, bufDescs, bufHosts, r.cfg.ReplicationFactor) | ||
} | ||
|
||
// GetWithOptions returns n (or more) instances which form the replicas for the given key | ||
// with 0 or more options to change the behavior of the method call. | ||
func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) { | ||
options := collectOptions(opts...) | ||
return r.getReplicationSetForKey(key, op, options.BufDescs, options.BufHosts, options.ReplicationFactor) | ||
} | ||
|
||
func (r *Ring) getReplicationSetForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int) (ReplicationSet, error) { | ||
r.mtx.RLock() | ||
defer r.mtx.RUnlock() | ||
if r.ringDesc == nil || len(r.ringTokens) == 0 { | ||
return ReplicationSet{}, ErrEmptyRing | ||
} | ||
|
||
instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil) | ||
if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor { | ||
replicationFactor = r.cfg.ReplicationFactor | ||
} | ||
|
||
// Not all replication strategies support increasing the replication factor beyond | ||
// the number of zones available. Return an error unless a ReplicationStrategy has | ||
// explicitly opted into supporting this. | ||
if replicationFactor > r.cfg.ReplicationFactor && !r.strategy.SupportsExpandedReplication() { | ||
return ReplicationSet{}, fmt.Errorf("per-call replication factor %d cannot exceed the configured replication factor %d with this replication strategy", replicationFactor, r.cfg.ReplicationFactor) | ||
} | ||
|
||
instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, replicationFactor, nil) | ||
if err != nil { | ||
return ReplicationSet{}, err | ||
} | ||
|
||
healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) | ||
healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, replicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) | ||
if err != nil { | ||
return ReplicationSet{}, err | ||
} | ||
|
@@ -447,21 +511,31 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, | |
// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy. | ||
// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early. | ||
// This function needs to be called with read lock on the ring. | ||
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { | ||
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { | ||
var ( | ||
n = r.cfg.ReplicationFactor | ||
n = replicationFactor | ||
instances = bufDescs[:0] | ||
start = searchToken(r.ringTokens, key) | ||
iterations = 0 | ||
maxZones = len(r.ringTokensByZone) | ||
maxInstances = len(r.ringDesc.Ingesters) | ||
|
||
// We use a slice instead of a map because it's faster to search within a | ||
// slice than lookup a map for a very low number of items. | ||
// slice than lookup a map for a very low number of items, we only expect | ||
// to have low single-digit number of hosts. | ||
distinctHosts = bufHosts[:0] | ||
distinctZones = bufZones[:0] | ||
|
||
// TODO: Do we need to pass this in to avoid allocations? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to replace passing |
||
hostsPerZone = make(map[string]int) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about this. Can't we still use a slice with a struct? Have you ran |
||
targetHostsPerZone = max(1, replicationFactor/maxZones) | ||
) | ||
for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ { | ||
|
||
for i := start; len(distinctHosts) < min(maxInstances, n) && iterations < len(r.ringTokens); i++ { | ||
// If we have the target number of instances in all zones, stop looking. | ||
if r.cfg.ZoneAwarenessEnabled && haveTargetHostsInAllZones(hostsPerZone, targetHostsPerZone, maxZones) { | ||
break | ||
} | ||
|
||
iterations++ | ||
// Wrap i around in the ring. | ||
i %= len(r.ringTokens) | ||
|
@@ -478,9 +552,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance | |
continue | ||
} | ||
|
||
// Ignore if the instances don't have a zone set. | ||
// If we already have the required number of instances for this zone, skip. | ||
if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { | ||
if slices.Contains(distinctZones, info.Zone) { | ||
if hostsPerZone[info.Zone] >= targetHostsPerZone { | ||
continue | ||
} | ||
} | ||
|
@@ -493,9 +567,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance | |
if op.ShouldExtendReplicaSetOnState(instance.State) { | ||
n++ | ||
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { | ||
// We should only add the zone if we are not going to extend, | ||
// as we want to extend the instance in the same AZ. | ||
distinctZones = append(distinctZones, info.Zone) | ||
// We should only increment the count for this zone if we are not going to | ||
// extend, as we want to extend the instance in the same AZ. | ||
hostsPerZone[info.Zone]++ | ||
} | ||
|
||
include, keepGoing := true, true | ||
|
@@ -512,6 +586,20 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance | |
return instances, nil | ||
} | ||
|
||
func haveTargetHostsInAllZones(hostsByZone map[string]int, targetHostsPerZone int, maxZones int) bool { | ||
if len(hostsByZone) != maxZones { | ||
return false | ||
} | ||
|
||
for _, count := range hostsByZone { | ||
if count < targetHostsPerZone { | ||
return false | ||
} | ||
} | ||
|
||
return true | ||
} | ||
|
||
// GetAllHealthy implements ReadRing. | ||
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { | ||
r.mtx.RLock() | ||
|
@@ -1332,36 +1420,3 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool { | |
|
||
// All states are healthy, no states extend replica set. | ||
var allStatesRingOperation = Operation(0x0000ffff) | ||
|
||
// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance. | ||
func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was only called from tests. I've moved most of this logic to the test in question ( |
||
r.mtx.RLock() | ||
defer r.mtx.RUnlock() | ||
|
||
if r.ringDesc == nil || len(r.ringTokens) == 0 { | ||
return 0, ErrEmptyRing | ||
} | ||
|
||
// Instance is not in this ring, it can't own any key. | ||
if _, ok := r.ringDesc.Ingesters[instanceID]; !ok { | ||
return 0, nil | ||
} | ||
|
||
owned := 0 | ||
for _, tok := range keys { | ||
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) { | ||
if foundInstanceID == instanceID { | ||
// If we've found our instance, we can stop. | ||
return true, false | ||
} | ||
return false, true | ||
}) | ||
if err != nil { | ||
return 0, err | ||
} | ||
if len(i) > 0 { | ||
owned++ | ||
} | ||
} | ||
return owned, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we know if this won't break clients with their own
ReplicationStrategy
implementations? I'm wondering if we shouldn't add a new interface and check with type assertions ingetReplicationSetForKey
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will break existing
ReplicationStrategy
implementations but it's an obvious and simple fix. We could add a new type but I'm not sure what that gets us besides not requiring changes from people that have their ownReplicationStrategy
implementations.