Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] feat: reduce allocs & improve throughput #998

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions v2/pkg/engine/datasource/httpclient/nethttpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,6 @@ func makeHTTPRequest(client *http.Client, ctx context.Context, url, method, head
}

if !enableTrace {
if response.ContentLength > 0 {
StarpTech marked this conversation as resolved.
Show resolved Hide resolved
out.Grow(int(response.ContentLength))
} else {
out.Grow(1024 * 4)
}
_, err = out.ReadFrom(respReader)
return
}
Expand Down
42 changes: 42 additions & 0 deletions v2/pkg/engine/resolve/fetch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package resolve

import (
"bytes"
"encoding/json"
"slices"
"sync"

"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
)
Expand All @@ -20,6 +22,42 @@ type Fetch interface {
FetchKind() FetchKind
Dependencies() FetchDependencies
DataSourceInfo() DataSourceInfo

GetBuffer() *bytes.Buffer
ReportResponseSize(out *bytes.Buffer)
}

// FetchBufferSizeCalculator calculates the right size for a buffer based on the previous 64 fetches
jensneuse marked this conversation as resolved.
Show resolved Hide resolved
// Instead of using a buffer with a random default size and growing it to the right cap
// FetchBufferSizeCalculator uses information about previous fetches to suggest a reasonable size
// Overall, this has shown to reduce bytes.growSlice operations to almost zero in hot paths
type FetchBufferSizeCalculator struct {
mux sync.RWMutex
count int
total int
}

func (f *FetchBufferSizeCalculator) GetBuffer() *bytes.Buffer {
f.mux.RLock()
defer f.mux.RUnlock()
if f.count == 0 {
return bytes.NewBuffer(make([]byte, 0, 1024*4))
}
size := f.total / f.count
return bytes.NewBuffer(make([]byte, 0, size))
}

func (f *FetchBufferSizeCalculator) ReportResponseSize(out *bytes.Buffer) {
f.mux.Lock()
defer f.mux.Unlock()
inc := out.Cap()
if f.count > 64 { // reset after 64 fetches
f.total = inc
f.count = 1
} else {
f.count++
f.total += inc
}
}

type FetchItem struct {
Expand Down Expand Up @@ -71,6 +109,7 @@ const (
)

type SingleFetch struct {
FetchBufferSizeCalculator
FetchConfiguration
FetchDependencies
InputTemplate InputTemplate
Expand Down Expand Up @@ -140,6 +179,7 @@ func (_ *SingleFetch) FetchKind() FetchKind {
// allows to join nested fetches to the same subgraph into a single fetch
// representations variable will contain multiple items according to amount of entities matching this query
type BatchEntityFetch struct {
FetchBufferSizeCalculator
FetchDependencies
Input BatchInput
DataSource DataSource
Expand Down Expand Up @@ -182,6 +222,7 @@ func (_ *BatchEntityFetch) FetchKind() FetchKind {
// EntityFetch - represents nested entity fetch on object field
// representations variable will contain single item
type EntityFetch struct {
FetchBufferSizeCalculator
FetchDependencies
Input EntityInput
DataSource DataSource
Expand Down Expand Up @@ -217,6 +258,7 @@ func (_ *EntityFetch) FetchKind() FetchKind {
// Usually, you want to batch fetches within a list, which is the default behavior of SingleFetch
// However, if the data source does not support batching, you can use this fetch to make parallel fetches within a list
type ParallelListItemFetch struct {
FetchBufferSizeCalculator
Fetch *SingleFetch
Traces []*SingleFetch
Trace *DataSourceLoadTrace
Expand Down
32 changes: 20 additions & 12 deletions v2/pkg/engine/resolve/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,38 +155,41 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
switch f := item.Fetch.(type) {
case *SingleFetch:
res := &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
err := l.loadSingleFetch(l.ctx.ctx, f, item, items, res)
if err != nil {
return err
}
f.ReportResponseSize(res.out)
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil && res.loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.statusCode, res.ds, goerrors.Join(res.err, l.ctx.subgraphErrors))
}
return err
case *BatchEntityFetch:
res := &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
err := l.loadBatchEntityFetch(l.ctx.ctx, item, f, items, res)
if err != nil {
return errors.WithStack(err)
}
f.ReportResponseSize(res.out)
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil && res.loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.statusCode, res.ds, goerrors.Join(res.err, l.ctx.subgraphErrors))
}
return err
case *EntityFetch:
res := &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
err := l.loadEntityFetch(l.ctx.ctx, item, f, items, res)
if err != nil {
return errors.WithStack(err)
}
f.ReportResponseSize(res.out)
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil && res.loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.statusCode, res.ds, goerrors.Join(res.err, l.ctx.subgraphErrors))
Expand All @@ -201,11 +204,10 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
for i := range items {
i := i
results[i] = &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
if l.ctx.TracingOptions.Enable {
f.Traces[i] = new(SingleFetch)
*f.Traces[i] = *f.Fetch
f.Traces[i] = f.Fetch
g.Go(func() error {
return l.loadFetch(ctx, f.Traces[i], item, items[i:i+1], results[i])
})
Expand All @@ -220,6 +222,7 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
return errors.WithStack(err)
}
for i := range results {
f.ReportResponseSize(results[i].out)
err = l.mergeResult(item, results[i], items[i:i+1])
if l.ctx.LoaderHooks != nil && results[i].loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(results[i].loaderHookContext, results[i].statusCode, results[i].ds, goerrors.Join(results[i].err, l.ctx.subgraphErrors))
Expand Down Expand Up @@ -369,7 +372,8 @@ func (l *Loader) itemsData(items []*astjson.Value) *astjson.Value {
func (l *Loader) loadFetch(ctx context.Context, fetch Fetch, fetchItem *FetchItem, items []*astjson.Value, res *result) error {
switch f := fetch.(type) {
case *SingleFetch:
res.out = &bytes.Buffer{}
res.out = fetch.GetBuffer()
defer fetch.ReportResponseSize(res.out)
return l.loadSingleFetch(ctx, f, fetchItem, items, res)
case *ParallelListItemFetch:
results := make([]*result, len(items))
Expand All @@ -380,11 +384,10 @@ func (l *Loader) loadFetch(ctx context.Context, fetch Fetch, fetchItem *FetchIte
for i := range items {
i := i
results[i] = &result{
out: &bytes.Buffer{},
out: fetch.GetBuffer(),
}
if l.ctx.TracingOptions.Enable {
f.Traces[i] = new(SingleFetch)
*f.Traces[i] = *f.Fetch
f.Traces[i] = f.Fetch
g.Go(func() error {
return l.loadFetch(ctx, f.Traces[i], fetchItem, items[i:i+1], results[i])
})
Expand All @@ -399,12 +402,17 @@ func (l *Loader) loadFetch(ctx context.Context, fetch Fetch, fetchItem *FetchIte
return errors.WithStack(err)
}
res.nestedMergeItems = results
for i := range results {
fetch.ReportResponseSize(results[i].out)
}
return nil
case *EntityFetch:
res.out = &bytes.Buffer{}
res.out = fetch.GetBuffer()
defer fetch.ReportResponseSize(res.out)
return l.loadEntityFetch(ctx, fetchItem, f, items, res)
case *BatchEntityFetch:
res.out = &bytes.Buffer{}
res.out = fetch.GetBuffer()
defer fetch.ReportResponseSize(res.out)
return l.loadBatchEntityFetch(ctx, fetchItem, f, items, res)
}
return nil
Expand Down
6 changes: 5 additions & 1 deletion v2/pkg/engine/resolve/resolvable.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ func (r *Resolvable) ResolveNode(node Node, data *astjson.Value, out io.Writer)
r.printErr = nil
r.authorizationError = nil
r.errors = r.astjsonArena.NewArray()

defer func() {
// remove references to buffers when no longer needed
r.out = nil
r.errors = nil
}()
hasErrors := r.walkNode(node, data)
if hasErrors {
return fmt.Errorf("error resolving node")
Expand Down
35 changes: 29 additions & 6 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -67,6 +68,8 @@ type Resolver struct {

propagateSubgraphErrors bool
propagateSubgraphStatusCodes bool

resolvableBufferPool *pool.LimitBufferPool
}

func (r *Resolver) SetAsyncErrorWriter(w AsyncErrorWriter) {
Expand Down Expand Up @@ -142,6 +145,8 @@ type ResolverOptions struct {
ResolvableOptions ResolvableOptions
// AllowedCustomSubgraphErrorFields defines which fields are allowed in the subgraph error when in passthrough mode
AllowedSubgraphErrorFields []string
// BufferPoolOptions defines the size & limits of the resolvable buffer pool
BufferPoolOptions pool.LimitBufferPoolOptions
}

// New returns a new Resolver, ctx.Done() is used to cancel all active subscriptions & streams
Expand Down Expand Up @@ -175,6 +180,19 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
allowedErrorFields[field] = struct{}{}
}

if options.BufferPoolOptions.MaxBuffers == 0 {
options.BufferPoolOptions.MaxBuffers = runtime.GOMAXPROCS(-1)
}
if options.BufferPoolOptions.MaxBuffers < 8 {
options.BufferPoolOptions.MaxBuffers = 8
}
if options.BufferPoolOptions.MaxBufferSize == 0 {
options.BufferPoolOptions.MaxBufferSize = 1024 * 1024 * 10 // 10MB
}
if options.BufferPoolOptions.DefaultBufferSize < 1024*8 {
options.BufferPoolOptions.DefaultBufferSize = 1024 * 8 // 8KB
}

resolver := &Resolver{
ctx: ctx,
options: options,
Expand All @@ -188,6 +206,7 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
allowedErrorExtensionFields: allowedExtensionFields,
allowedErrorFields: allowedErrorFields,
resolvableBufferPool: pool.NewLimitBufferPool(ctx, options.BufferPoolOptions),
}
resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency)
for i := 0; i < options.MaxConcurrency; i++ {
Expand Down Expand Up @@ -242,6 +261,7 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
}()

t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)
defer t.resolvable.Reset() // set all references to nil, e.g. pointers to buffers

err := t.resolvable.Init(ctx, data, response.Info.OperationType)
if err != nil {
Expand All @@ -254,15 +274,17 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
return nil, err
}
}

buf := &bytes.Buffer{}
err = t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf)
buf := r.resolvableBufferPool.Get()
defer r.resolvableBufferPool.Put(buf)
err = t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf.Buf)
if err != nil {
return nil, err
}

_, err = buf.WriteTo(writer)
return resp, err
_, err = buf.Buf.WriteTo(writer)
if err != nil {
return nil, err
}
return resp, nil
}

type trigger struct {
Expand All @@ -287,6 +309,7 @@ func (r *Resolver) executeSubscriptionUpdate(ctx *Context, sub *sub, sharedInput
fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID)
}
t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)
defer t.resolvable.Reset() // reset all references

input := make([]byte, len(sharedInput))
copy(input, sharedInput)
Expand Down
Loading
Loading