Skip to content

Commit f280781

Browse files
committed
deactivate legacy wildcard support
1 parent 4138018 commit f280781

File tree

9 files changed

+167
-27
lines changed

9 files changed

+167
-27
lines changed

pkg/cache/v3/delta_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
2020
"github.com/envoyproxy/go-control-plane/pkg/log"
2121
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
22+
"github.com/envoyproxy/go-control-plane/pkg/server/config"
2223
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2324
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
2425
)
@@ -39,7 +40,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
3940
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
4041
for _, typ := range testTypes {
4142
watches[typ] = make(chan cache.DeltaResponse, 1)
42-
subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil)
43+
subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil, config.NewOpts(), typ)
4344
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
4445
Node: &core.Node{
4546
Id: "node",
@@ -120,7 +121,7 @@ func TestDeltaRemoveResources(t *testing.T) {
120121

121122
for _, typ := range testTypes {
122123
watches[typ] = make(chan cache.DeltaResponse, 1)
123-
sub := stream.NewDeltaSubscription(nil, nil, nil)
124+
sub := stream.NewDeltaSubscription(nil, nil, nil, config.NewOpts(), typ)
124125
subscriptions[typ] = &sub
125126
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
126127
// functionality. This means we should receive all resources back without requesting a subscription by name.
@@ -210,7 +211,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
210211
},
211212
TypeUrl: rsrc.EndpointType,
212213
ResourceNamesSubscribe: []string{clusterName},
213-
}, stream.NewDeltaSubscription([]string{clusterName}, nil, nil), responses)
214+
}, stream.NewDeltaSubscription([]string{clusterName}, nil, nil, config.NewOpts(), rsrc.EndpointType), responses)
214215

215216
require.NoError(t, err)
216217
defer cancel()
@@ -227,7 +228,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
227228

228229
// Create a non-buffered channel that will block sends.
229230
watchCh := make(chan cache.DeltaResponse)
230-
sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil)
231+
sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil, config.NewOpts(), rsrc.EndpointType)
231232
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
232233
Node: &core.Node{
233234
Id: key,
@@ -277,7 +278,7 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
277278
},
278279
TypeUrl: typ,
279280
ResourceNamesSubscribe: names[typ],
280-
}, stream.NewDeltaSubscription(names[typ], nil, nil), responses)
281+
}, stream.NewDeltaSubscription(names[typ], nil, nil, config.NewOpts(), typ), responses)
281282
require.NoError(t, err)
282283

283284
// Cancel the watch

pkg/cache/v3/linear_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
3333
"github.com/envoyproxy/go-control-plane/pkg/log"
3434
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
35+
"github.com/envoyproxy/go-control-plane/pkg/server/config"
3536
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
3637
)
3738

@@ -223,7 +224,7 @@ func hashResource(t *testing.T, resource types.Resource) string {
223224

224225
func createWildcardDeltaWatch(t *testing.T, initialReq bool, c *LinearCache, w chan DeltaResponse) {
225226
t.Helper()
226-
sub := stream.NewDeltaSubscription(nil, nil, nil)
227+
sub := stream.NewDeltaSubscription(nil, nil, nil, config.NewOpts(), testType)
227228
req := &DeltaRequest{TypeUrl: testType}
228229
if !initialReq {
229230
req.ResponseNonce = "1"
@@ -237,7 +238,7 @@ func createWildcardDeltaWatch(t *testing.T, initialReq bool, c *LinearCache, w c
237238
}
238239

239240
func subFromRequest(req *Request) stream.Subscription {
240-
return stream.NewSotwSubscription(req.GetResourceNames())
241+
return stream.NewSotwSubscription(req.GetResourceNames(), config.NewOpts(), req.GetTypeUrl())
241242
}
242243

243244
// This method represents the expected behavior of client and servers regarding the request and the subscription.
@@ -250,7 +251,7 @@ func updateFromSotwResponse(resp Response, sub *stream.Subscription, req *Reques
250251
}
251252

252253
func subFromDeltaRequest(req *DeltaRequest) stream.Subscription {
253-
return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions())
254+
return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions(), config.NewOpts(), req.GetTypeUrl())
254255
}
255256

256257
func TestLinearInitialResources(t *testing.T) {

pkg/cache/v3/simple_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
3737
"github.com/envoyproxy/go-control-plane/pkg/log"
3838
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
39+
"github.com/envoyproxy/go-control-plane/pkg/server/config"
3940
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
4041
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
4142
)
@@ -54,7 +55,7 @@ func (group) ID(node *core.Node) string {
5455
}
5556

5657
func subFromRequest(req *cache.Request) stream.Subscription {
57-
return stream.NewSotwSubscription(req.GetResourceNames())
58+
return stream.NewSotwSubscription(req.GetResourceNames(), config.NewOpts(), req.GetTypeUrl())
5859
}
5960

6061
// This method represents the expected behavior of client and servers regarding the request and the subscription.
@@ -601,7 +602,7 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) {
601602
ResourceNames: []string{"rtds"},
602603
TypeUrl: rsrc.RuntimeType,
603604
}
604-
ss := stream.NewSotwSubscription([]string{"rtds"})
605+
ss := stream.NewSotwSubscription([]string{"rtds"}, config.NewOpts(), rsrc.RuntimeType)
605606
ss.SetReturnedResources(map[string]string{"cluster": "abcdef"})
606607
responder := make(chan cache.Response)
607608
_, err := c.CreateWatch(req, ss, responder)

pkg/server/config/config.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ type Opts struct {
99
Ordered bool
1010

1111
Logger log.Logger
12+
13+
// If true, deactivate legacy wildcard mode for all resource types
14+
legacyWildcardDeactivated bool
15+
16+
// Deactivate legacy wildcard mode for specific resource types
17+
legacyWildcardDeactivatedTypes map[string]struct{}
1218
}
1319

1420
func NewOpts() Opts {
@@ -18,6 +24,16 @@ func NewOpts() Opts {
1824
}
1925
}
2026

27+
// LegacyWildcardDeactivated returns whether legacy wildcard mode is deactivated for all resource types
28+
func (o Opts) LegacyWildcardDeactivated() bool {
29+
return o.legacyWildcardDeactivated
30+
}
31+
32+
// LegacyWildcardDeactivatedTypes returns the set of resource types for which legacy wildcard mode is deactivated
33+
func (o Opts) LegacyWildcardDeactivatedTypes() map[string]struct{} {
34+
return o.legacyWildcardDeactivatedTypes
35+
}
36+
2137
// Each xDS implementation should implement their own functional opts.
2238
// It is recommended that config values be added in this package specifically,
2339
// but the individual opts functions should be in their respective
@@ -28,3 +44,29 @@ func NewOpts() Opts {
2844
//
2945
// this allows for easy inference as to which opt applies to what implementation.
3046
type XDSOption func(*Opts)
47+
48+
// DeactivateLegacyWildcard deactivates legacy wildcard mode for all resource types.
49+
// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long
50+
// as there is no request made with resources or explicit wildcard requests on the same stream.
51+
// When deactivated, empty requests are treated as a request with no subscriptions to any resource.
52+
// This is recommended for when you are using the go-control-plane to serve grpc-xds clients.
53+
// These clients never want to treat an empty request as a wildcard subscription.
54+
func DeactivateLegacyWildcard() XDSOption {
55+
return func(o *Opts) {
56+
o.legacyWildcardDeactivated = true
57+
}
58+
}
59+
60+
// DeactivateLegacyWildcardForTypes deactivates legacy wildcard mode for specific resource types.
61+
// In legacy wildcard mode, empty requests to a stream, are treated as wildcard requests as long
62+
// as there is no request made with resources or explicit wildcard requests on the same stream.
63+
// When deactivated, empty requests are treated as a request with no subscriptions to any resource.
64+
func DeactivateLegacyWildcardForTypes(types []string) XDSOption {
65+
return func(o *Opts) {
66+
typeMap := make(map[string]struct{}, len(types))
67+
for _, t := range types {
68+
typeMap[t] = struct{}{}
69+
}
70+
o.legacyWildcardDeactivatedTypes = typeMap
71+
}
72+
}

pkg/server/delta/v3/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
218218
// We also set the subscription as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe).
219219
// If the subscription starts with this legacy mode, adding new resources will not unsubscribe from wildcard.
220220
// It can still be done by explicitly unsubscribing from "*"
221-
watch.subscription = stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions())
221+
watch.subscription = stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions(), s.opts, typeURL)
222222
} else {
223223
watch.Cancel()
224224

pkg/server/sotw/v3/ads.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
107107
subscription.SetResourceSubscription(req.GetResourceNames())
108108
} else {
109109
s.opts.Logger.Debugf("[sotw ads] New subscription for type %s and stream %d", typeURL, sw.ID)
110-
subscription = stream.NewSotwSubscription(req.GetResourceNames())
110+
subscription = stream.NewSotwSubscription(req.GetResourceNames(), s.opts, typeURL)
111111
}
112112

113113
cancel, err := s.cache.CreateWatch(req, subscription, respChan)

pkg/server/sotw/v3/xds.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
128128
subscription.SetResourceSubscription(req.GetResourceNames())
129129
} else {
130130
s.opts.Logger.Debugf("[sotw] New subscription for type %s and stream %d", typeURL, sw.ID)
131-
subscription = stream.NewSotwSubscription(req.GetResourceNames())
131+
subscription = stream.NewSotwSubscription(req.GetResourceNames(), s.opts, typeURL)
132132
}
133133

134134
responder := make(chan cache.Response, 1)

pkg/server/stream/v3/subscription.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package stream
22

3+
import "github.com/envoyproxy/go-control-plane/pkg/server/config"
4+
35
const (
46
explicitWildcard = "*"
57
)
@@ -9,10 +11,10 @@ type Subscription struct {
911
// wildcard indicates if the subscription currently has a wildcard watch.
1012
wildcard bool
1113

12-
// allowLegacyWildcard indicates that the stream never provided any resource
13-
// and is de facto wildcard.
14-
// As soon as a resource or an explicit subscription to wildcard is provided,
15-
// this flag will be set to false
14+
// allowLegacyWildcard indicates that an empty request should be treated as a wildcard request.
15+
// If in the stream at some point subscribes explicitly a resource or a explicitly makes wildcard
16+
// request, then subsequent empty requests should not be treated as wildcard and so this
17+
// flag will be set to false.
1618
allowLegacyWildcard bool
1719

1820
// subscribedResourceNames provides the resources explicitly requested by the client
@@ -24,10 +26,26 @@ type Subscription struct {
2426
}
2527

2628
// newSubscription initializes a subscription state.
27-
func newSubscription(wildcard bool, initialResourceVersions map[string]string) Subscription {
29+
func newSubscription(emptyRequest bool, initialResourceVersions map[string]string, opts config.Opts, typeURL string) Subscription {
30+
allowLegacyWildcard := emptyRequest
31+
32+
if opts.LegacyWildcardDeactivated() {
33+
allowLegacyWildcard = false
34+
} else if typeMap := opts.LegacyWildcardDeactivatedTypes(); len(typeMap) > 0 {
35+
if _, found := typeMap[typeURL]; found {
36+
allowLegacyWildcard = false
37+
}
38+
}
39+
40+
// By default we set the subscription as a wildcard only if the request was empty
41+
// and in legacy mode. Later on when we actually process the request, if the request
42+
// was non-empty, it may have an explicit wildcard subscription, in which case
43+
// we will set the wildcard field on the subscription accordingly.
44+
wildcard := emptyRequest && allowLegacyWildcard
45+
2846
state := Subscription{
2947
wildcard: wildcard,
30-
allowLegacyWildcard: wildcard,
48+
allowLegacyWildcard: allowLegacyWildcard,
3149
subscribedResourceNames: map[string]struct{}{},
3250
returnedResources: initialResourceVersions,
3351
}
@@ -39,8 +57,8 @@ func newSubscription(wildcard bool, initialResourceVersions map[string]string) S
3957
return state
4058
}
4159

42-
func NewSotwSubscription(subscribed []string) Subscription {
43-
sub := newSubscription(len(subscribed) == 0, nil)
60+
func NewSotwSubscription(subscribed []string, opts config.Opts, typeURL string) Subscription {
61+
sub := newSubscription(len(subscribed) == 0, nil, opts, typeURL)
4462
sub.SetResourceSubscription(subscribed)
4563
return sub
4664
}
@@ -90,8 +108,8 @@ func (s *Subscription) SetResourceSubscription(subscribed []string) {
90108
s.subscribedResourceNames = subscribedResources
91109
}
92110

93-
func NewDeltaSubscription(subscribed, unsubscribed []string, initialResourceVersions map[string]string) Subscription {
94-
sub := newSubscription(len(subscribed) == 0, initialResourceVersions)
111+
func NewDeltaSubscription(subscribed, unsubscribed []string, initialResourceVersions map[string]string, opts config.Opts, typeURL string) Subscription {
112+
sub := newSubscription(len(subscribed) == 0, initialResourceVersions, opts, typeURL)
95113
sub.UpdateResourceSubscriptions(subscribed, unsubscribed)
96114
return sub
97115
}

pkg/server/stream/v3/subscription_test.go

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/assert"
7+
8+
"github.com/envoyproxy/go-control-plane/pkg/server/config"
79
)
810

911
func TestSotwSubscriptions(t *testing.T) {
1012
t.Run("legacy mode properly handled", func(t *testing.T) {
11-
sub := NewSotwSubscription([]string{})
13+
sub := NewSotwSubscription([]string{}, config.NewOpts(), "")
1214
assert.True(t, sub.IsWildcard())
1315

1416
// Requests always set empty in legacy mode
@@ -35,7 +37,7 @@ func TestSotwSubscriptions(t *testing.T) {
3537

3638
t.Run("new wildcard mode from start", func(t *testing.T) {
3739
// A resource is provided so the subscription was created in wildcard
38-
sub := NewSotwSubscription([]string{"*"})
40+
sub := NewSotwSubscription([]string{"*"}, config.NewOpts(), "")
3941
assert.True(t, sub.IsWildcard())
4042
assert.Empty(t, sub.SubscribedResources())
4143

@@ -73,7 +75,7 @@ func TestSotwSubscriptions(t *testing.T) {
7375

7476
func TestDeltaSubscriptions(t *testing.T) {
7577
t.Run("legacy mode properly handled", func(t *testing.T) {
76-
sub := NewDeltaSubscription([]string{}, []string{}, map[string]string{"resource": "version"})
78+
sub := NewDeltaSubscription([]string{}, []string{}, map[string]string{"resource": "version"}, config.NewOpts(), "")
7779
assert.True(t, sub.IsWildcard())
7880
assert.Empty(t, sub.SubscribedResources())
7981
assert.Equal(t, map[string]string{"resource": "version"}, sub.ReturnedResources())
@@ -102,7 +104,7 @@ func TestDeltaSubscriptions(t *testing.T) {
102104

103105
t.Run("new wildcard mode", func(t *testing.T) {
104106
// A resource is provided so the subscription was created in wildcard
105-
sub := NewDeltaSubscription([]string{"*"}, []string{}, map[string]string{"resource": "version"})
107+
sub := NewDeltaSubscription([]string{"*"}, []string{}, map[string]string{"resource": "version"}, config.NewOpts(), "")
106108
assert.True(t, sub.IsWildcard())
107109
assert.Empty(t, sub.SubscribedResources())
108110

@@ -157,3 +159,78 @@ func TestDeltaSubscriptions(t *testing.T) {
157159
assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources())
158160
})
159161
}
162+
163+
func TestSotwSubscriptionsWithDeactivatedLegacyWildcard(t *testing.T) {
164+
t.Run("deactivate for all types", func(t *testing.T) {
165+
opts := config.NewOpts()
166+
deactivateOpt := config.DeactivateLegacyWildcard()
167+
deactivateOpt(&opts)
168+
169+
// Create subscription with empty resource list (would normally be legacy wildcard)
170+
sub := NewSotwSubscription([]string{}, opts, "type.googleapis.com/envoy.config.cluster.v3.Cluster")
171+
172+
// With deactivated legacy wildcard, subscription should NOT be wildcard initially
173+
// because allowLegacyWildcard=false means empty list doesn't trigger legacy behavior
174+
assert.False(t, sub.IsWildcard())
175+
176+
// Set empty resources - should remain non-wildcard
177+
sub.SetResourceSubscription([]string{})
178+
assert.False(t, sub.IsWildcard())
179+
180+
// Can still explicitly subscribe to wildcard
181+
sub.SetResourceSubscription([]string{"*"})
182+
assert.True(t, sub.IsWildcard())
183+
})
184+
}
185+
186+
func TestSotwSubscriptionsWithDeactivatedLegacyWildcardForTypes(t *testing.T) {
187+
t.Run("deactivate for specific type", func(t *testing.T) {
188+
opts := config.NewOpts()
189+
clusterType := "type.googleapis.com/envoy.config.cluster.v3.Cluster"
190+
endpointType := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
191+
192+
deactivateOpt := config.DeactivateLegacyWildcardForTypes([]string{clusterType})
193+
deactivateOpt(&opts)
194+
195+
// Create subscription for deactivated type
196+
subCluster := NewSotwSubscription([]string{}, opts, clusterType)
197+
// Should NOT be wildcard because legacy wildcard is deactivated for this type
198+
assert.False(t, subCluster.IsWildcard())
199+
200+
// Setting empty resources should remain non-wildcard
201+
subCluster.SetResourceSubscription([]string{})
202+
assert.False(t, subCluster.IsWildcard())
203+
204+
// Create subscription for non-deactivated type
205+
subEndpoint := NewSotwSubscription([]string{}, opts, endpointType)
206+
assert.True(t, subEndpoint.IsWildcard())
207+
208+
// Setting empty resources should maintain legacy wildcard for this type
209+
subEndpoint.SetResourceSubscription([]string{})
210+
assert.True(t, subEndpoint.IsWildcard())
211+
})
212+
213+
t.Run("deactivate for multiple types", func(t *testing.T) {
214+
opts := config.NewOpts()
215+
clusterType := "type.googleapis.com/envoy.config.cluster.v3.Cluster"
216+
endpointType := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
217+
routeType := "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"
218+
219+
deactivateOpt := config.DeactivateLegacyWildcardForTypes([]string{clusterType, endpointType})
220+
deactivateOpt(&opts)
221+
222+
// Both cluster and endpoint should have legacy wildcard deactivated
223+
subCluster := NewSotwSubscription([]string{}, opts, clusterType)
224+
subCluster.SetResourceSubscription([]string{})
225+
assert.False(t, subCluster.IsWildcard())
226+
227+
subEndpoint := NewSotwSubscription([]string{}, opts, endpointType)
228+
subEndpoint.SetResourceSubscription([]string{})
229+
assert.False(t, subEndpoint.IsWildcard())
230+
231+
// Route should still have legacy wildcard enabled
232+
subRoute := NewSotwSubscription([]string{}, opts, routeType)
233+
subRoute.SetResourceSubscription([]string{})
234+
assert.True(t, subRoute.IsWildcard())
235+
})
236+
}

0 commit comments

Comments
 (0)