Skip to content

Commit

Permalink
Reuse slices for LIST calls
Browse files Browse the repository at this point in the history
Signed-off-by: Igor Suleymanov <[email protected]>
  • Loading branch information
radiohead committed Mar 27, 2024
1 parent d2cd257 commit 25d6c90
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 16 deletions.
1 change: 1 addition & 0 deletions k8s/client_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (c *ClientRegistry) ClientFor(sch resource.Schema) (resource.Client, error)
config: c.clientConfig,
requestDurations: c.requestDurations,
totalRequests: c.totalRequests,
listParser: NewListParser(),
},
schema: sch,
config: c.clientConfig,
Expand Down
6 changes: 4 additions & 2 deletions k8s/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,8 @@ func TestClient_Client(t *testing.T) {
restClient := getMockClient("http://localhost", testSchema.Group(), testSchema.Version())
client := Client{
client: &groupVersionClient{
client: restClient,
client: restClient,
listParser: NewListParser(),
},
schema: testSchema,
}
Expand Down Expand Up @@ -678,7 +679,8 @@ func getClientTestSetup(schema resource.Schema) (*Client, *testServer) {
client := getMockClient(server.URL, schema.Group(), schema.Version())
return &Client{
client: &groupVersionClient{
client: client,
client: client,
listParser: NewListParser(),
},
schema: schema,
}, &s
Expand Down
3 changes: 2 additions & 1 deletion k8s/gvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type groupVersionClient struct {
config ClientConfig
requestDurations *prometheus.HistogramVec
totalRequests *prometheus.CounterVec
listParser *ListParser
}

func (g *groupVersionClient) get(ctx context.Context, identifier resource.Identifier, plural string,
Expand Down Expand Up @@ -367,7 +368,7 @@ func (g *groupVersionClient) list(
span.SetStatus(codes.Error, err.Error())
return err
}
return rawToListWithParser(bytes, into, options.Limit, itemParser)
return g.listParser.Parse(bytes, into, options.Limit, itemParser)
}

//nolint:revive
Expand Down
1 change: 1 addition & 0 deletions k8s/schemaless.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (s *SchemalessClient) getClient(identifier resource.FullIdentifier) (*group
config: s.clientConfig,
requestDurations: s.requestDurations,
totalRequests: s.totalRequests,
listParser: NewListParser(),
}
return s.clients[gv.Identifier()], nil
}
Expand Down
3 changes: 2 additions & 1 deletion k8s/schemaless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ func getSchemalessClientTestSetup(gvs ...schema.GroupVersion) (*SchemalessClient

for _, gv := range gvs {
client.clients[gv.Identifier()] = &groupVersionClient{
client: getMockClient(server.URL, gv.Group, gv.Version),
client: getMockClient(server.URL, gv.Group, gv.Version),
listParser: NewListParser(),
}
}
return client, &s
Expand Down
66 changes: 57 additions & 9 deletions k8s/translation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"time"

admission "k8s.io/api/admission/v1beta1"
Expand Down Expand Up @@ -132,23 +133,70 @@ func rawToObject(raw []byte, into resource.Object) error {
// ObjectParserFn is a function that parses raw bytes into a resource.Object.
type ObjectParserFn func([]byte) (resource.Object, error)

func rawToListWithParser(raw []byte, into resource.ListObject, respSize int, itemParser ObjectParserFn) error {
var um k8sListWithItems
if respSize > 0 {
um.Items = make([]json.RawMessage, 0, respSize)
// ListParser
type ListParser struct {
pool sync.Pool
}

// NewListParser
func NewListParser() *ListParser {
return &ListParser{
pool: sync.Pool{
New: func() any {
return newK8sListWithItems(0)
},
},
}
}

func newK8sListWithItems(sz int) *k8sListWithItems {
res := &k8sListWithItems{}

if sz < 1 {
sz = 128 // TODO: do we need default size?
}

res.Items = make([]json.RawMessage, 0, sz)
return res
}

func resizeK8sListWithItems(lis *k8sListWithItems, newsz int) *k8sListWithItems {
// Always clear first
clear(lis.Items)
lis.Items = lis.Items[:0]

if newsz < 1 {
return lis
}

if cap(lis.Items)-len(lis.Items) < newsz {
lis.Items = append(make([]json.RawMessage, 0, len(lis.Items)+newsz), lis.Items...)
}

return lis
}

// Parse
func (p *ListParser) Parse(raw []byte, into resource.ListObject, respSize int, itemParser ObjectParserFn) error {
lis, ok := p.pool.Get().(*k8sListWithItems)
if !ok {
lis = newK8sListWithItems(respSize)
} else {
lis = resizeK8sListWithItems(lis, respSize)
}
defer p.pool.Put(lis)

if err := json.Unmarshal(raw, &um); err != nil {
if err := json.Unmarshal(raw, &lis); err != nil {
return err
}

into.SetListMetadata(resource.ListMetadata{
ResourceVersion: um.Metadata.ResourceVersion,
Continue: um.Metadata.Continue,
RemainingItemCount: um.Metadata.RemainingItemCount,
ResourceVersion: lis.Metadata.ResourceVersion,
Continue: lis.Metadata.Continue,
RemainingItemCount: lis.Metadata.RemainingItemCount,
})

for _, item := range um.Items {
for _, item := range lis.Items {
obj, err := itemParser(item)
if err != nil {
into.Clear()
Expand Down
3 changes: 2 additions & 1 deletion k8s/translation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ func TestRawToListWithParser(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := rawToListWithParser(test.raw, test.into, len(test.expectedList.ListItems()), test.parser)
lp := NewListParser()
err := lp.Parse(test.raw, test.into, len(test.expectedList.ListItems()), test.parser)
assert.Equal(t, test.expectedError, err)
assert.Equal(t, test.expectedList.ListMetadata(), test.into.ListMetadata())
// Compare list items as JSON, as the lists are slices of pointers and will be unequal
Expand Down
17 changes: 15 additions & 2 deletions operator/informer_kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func NewKubernetesBasedInformerWithFilters(sch resource.Schema, client ListWatch
return nil, fmt.Errorf("client cannot be nil")
}

resp := listObjectWrapper{
Items: make([]runtime.Object, 0, 128),
}

return &KubernetesBasedInformer{
schema: sch,
ErrorHandler: func(err error) {
Expand All @@ -60,9 +64,18 @@ func NewKubernetesBasedInformerWithFilters(sch resource.Schema, client ListWatch
attribute.String("kind.version", sch.Version()),
attribute.String("namespace", namespace),
)
resp := listObjectWrapper{
Items: make([]runtime.Object, 0, options.Limit),

// Always clear first
clear(resp.Items)
resp.Items = resp.Items[:0]

// Resize after if needed
if newsz := int(options.Limit); newsz > 0 {
if cap(resp.Items)-len(resp.Items) < newsz {
resp.Items = append(make([]runtime.Object, 0, len(resp.Items)+newsz), resp.Items...)
}
}

err := client.ListInto(ctx, namespace, resource.ListOptions{
LabelFilters: labelFilters,
Continue: options.Continue,
Expand Down

0 comments on commit 25d6c90

Please sign in to comment.