Skip to content
6 changes: 4 additions & 2 deletions internal/xds/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cdsbalancer
import (
"context"

"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand All @@ -32,8 +33,9 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) ResourceChanged(u *xdsresource.ClusterResourceData, onDone func()) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
func (cw *clusterWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
clusterData := rd.(*xdsresource.ClusterResourceData)
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, clusterData.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

Expand Down
3 changes: 1 addition & 2 deletions internal/xds/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"google.golang.org/grpc/internal/xds/balancer/loadstore"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -236,7 +235,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
}
}
if startNewLoadReport {
var loadStore *lrsclient.LoadStore
var loadStore xdsclient.LoadStore
if b.xdsClient != nil {
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
clientimpl "google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand Down Expand Up @@ -76,12 +77,13 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// ResourceChanged is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) {
func (er *edsDiscoveryMechanism) ResourceChanged(rd clientimpl.ResourceData, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
}

update := rd.(*xdsresource.EndpointsResourceData)
er.mu.Lock()
er.update = &update.Resource
er.mu.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions internal/xds/balancer/loadstore/load_store_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"sync"

"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient"
)

// NewWrapper creates a Wrapper.
Expand Down Expand Up @@ -54,8 +54,8 @@ type Wrapper struct {
// store and perCluster are initialized as nil. They are only set by the
// balancer when LRS is enabled. Before that, all functions to record loads
// are no-op.
store *lrsclient.LoadStore
perCluster *lrsclient.PerClusterReporter
store xdsclient.LoadStore
perCluster xdsclient.PerClusterReporter
}

// UpdateClusterAndService updates the cluster name and eds service for this
Expand All @@ -77,7 +77,7 @@ func (lsw *Wrapper) UpdateClusterAndService(cluster, edsService string) {

// UpdateLoadStore updates the load store for this wrapper. If it is changed
// from before, the perCluster store in this wrapper will also be updated.
func (lsw *Wrapper) UpdateLoadStore(store *lrsclient.LoadStore) {
func (lsw *Wrapper) UpdateLoadStore(store xdsclient.LoadStore) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
Expand Down
11 changes: 7 additions & 4 deletions internal/xds/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package resolver
import (
"context"

"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand All @@ -36,8 +37,9 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
func (l *listenerWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
listenerData := rd.(*xdsresource.ListenerResourceData)
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(listenerData.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

Expand Down Expand Up @@ -68,9 +70,10 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceData, onDone func()) {
func (r *routeConfigWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
rcData := rd.(*xdsresource.RouteConfigResourceData)
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
r.parent.onRouteConfigResourceUpdate(r.resourceName, rcData.Resource)
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
Expand Down
12 changes: 7 additions & 5 deletions internal/xds/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand All @@ -58,7 +59,7 @@ type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err
// XDSClient wraps the methods on the XDSClient which are required by
// the listenerWrapper.
type XDSClient interface {
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
BootstrapConfig() *bootstrap.Config
}

Expand Down Expand Up @@ -386,17 +387,18 @@ type ldsWatcher struct {
name string
}

func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
func (lw *ldsWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
defer onDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
lw.logger.Warningf("Resource %q received update after listener was closed", lw.name)
return
}
listenerData := rd.(*xdsresource.ListenerResourceData)
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, listenerData.Resource)
}
l := lw.parent
ilc := update.Resource.InboundListenerCfg
ilc := listenerData.Resource.InboundListenerCfg
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
// check is done here instead of at the XDSClient layer because of the
Expand Down
8 changes: 5 additions & 3 deletions internal/xds/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

Expand Down Expand Up @@ -135,20 +136,21 @@ type rdsWatcher struct {
canceled bool // eats callbacks if true
}

func (rw *rdsWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceData, onDone func()) {
func (rw *rdsWatcher) ResourceChanged(rd xdsclient.ResourceData, onDone func()) {
defer onDone()
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
return
}
rw.mu.Unlock()
rcData := rd.(*xdsresource.RouteConfigResourceData)
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, rcData.Resource)
}

routeName := rw.routeName
rwu := rdsWatcherUpdate{data: &update.Resource}
rwu := rdsWatcherUpdate{data: &rcData.Resource}
rw.parent.updates[routeName] = rwu
rw.parent.callback(routeName, rwu)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/xds/testutils/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Client struct {
name string
loadReportCh *testutils.Channel
lrsCancelCh *testutils.Channel
loadStore *lrsclient.LoadStore
loadStore xdsclient.LoadStore
bootstrapCfg *bootstrap.Config
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (*stream) Recv() ([]byte, error) {
}

// ReportLoad starts reporting load about clusterName to server.
func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore *lrsclient.LoadStore, cancel func(context.Context)) {
func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore xdsclient.LoadStore, cancel func(context.Context)) {
lrsClient, _ := lrsclient.New(lrsclient.Config{Node: clients.Node{ID: "fake-node-id"}, TransportBuilder: &transportBuilder{}})
xdsC.loadStore, _ = lrsClient.ReportLoad(clients.ServerIdentifier{ServerURI: server.ServerURI()})

Expand All @@ -100,7 +100,7 @@ func (xdsC *Client) WaitForCancelReportLoad(ctx context.Context) error {
}

// LoadStore returns the underlying load data store.
func (xdsC *Client) LoadStore() *lrsclient.LoadStore {
func (xdsC *Client) LoadStore() xdsclient.LoadStore {
return xdsC.loadStore
}

Expand Down
33 changes: 30 additions & 3 deletions internal/xds/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (

v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
)

// XDSClient is a full fledged gRPC client which queries a set of discovery APIs
Expand All @@ -47,13 +48,39 @@ type XDSClient interface {
// During a race (e.g. an xDS response is received while the user is calling
// cancel()), there's a small window where the callback can be called after
// the watcher is canceled. Callers need to handle this case.
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
WatchResource(rType xdsclient.ResourceType, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())

ReportLoad(*bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context))
ReportLoad(*bootstrap.ServerConfig) (LoadStore, func(context.Context))

BootstrapConfig() *bootstrap.Config
}

// PerClusterReporter is the minimal interface callers use to record per-cluster
// load and drops. It mirrors the PerClusterReporter methods from the concrete
// lrsclient.PerClusterReporter but is intentionally tiny.
type PerClusterReporter interface {
// CallStarted records that a call has started for the given locality.
CallStarted(locality clients.Locality)

// CallFinished records that a call has finished for the given locality.
// Pass the error (nil if success) so implementations can update success/failure counters.
CallFinished(locality clients.Locality, err error)

// CallServerLoad reports a numeric load metric for the given locality and cluster.
// (If your code uses a different name/parameters for this, match the concrete type.)
CallServerLoad(locality clients.Locality, clusterName string, value float64)

// CallDropped records a dropped request of the given category.
CallDropped(category string)
}

// LoadStore is the minimal interface returned by ReportLoad. It provides a
// way to get per-cluster reporters and to stop the store.
type LoadStore interface {
ReporterForCluster(clusterName, serviceName string) *lrsclient.PerClusterReporter
Stop(ctx context.Context)
}

// DumpResources returns the status and contents of all xDS resources. It uses
// xDS clients from the default pool.
func DumpResources() *v3statuspb.ClientStatusResponse {
Expand Down
4 changes: 2 additions & 2 deletions internal/xds/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var (
// interface with ref counting so that it can be shared by the xds resolver and
// balancer implementations, across multiple ClientConns and Servers.
type clientImpl struct {
*xdsclient.XDSClient // TODO: #8313 - get rid of embedding, if possible.
xdsClient *xdsclient.XDSClient

// The following fields are initialized at creation time and are read-only
// after that.
Expand Down Expand Up @@ -137,7 +137,7 @@ func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecor
return nil, err
}
c := &clientImpl{
XDSClient: client,
xdsClient: client,
xdsClientConfig: gConfig,
bootstrapConfig: config,
target: target,
Expand Down
5 changes: 2 additions & 3 deletions internal/xds/xdsclient/clientimpl_loadreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/grpctransport"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
)

// ReportLoad starts a load reporting stream to the given server. All load
// reports to the same server share the LRS stream.
//
// It returns a lrsclient.LoadStore for the user to report loads.
func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) {
// It returns a LoadStore for the user to report loads.
func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (LoadStore, func(context.Context)) {
load, err := c.lrsClient.ReportLoad(clients.ServerIdentifier{
ServerURI: server.ServerURI(),
Extensions: grpctransport.ServerIdentifierExtension{
Expand Down
Loading
Loading