From 572e53b598148a55297528ee5e5e18c36962aa32 Mon Sep 17 00:00:00 2001 From: hanzhaoyang Date: Thu, 22 Feb 2024 18:38:37 +0800 Subject: [PATCH] Fix: the concurrent number of zstd encoders in k8s container and support zstd configuration re-initialization(#92) --- go.mod | 1 + go.sum | 3 +++ util/compression/flags.go | 22 ++++++++++++++++++++++ util/compression/flags_test.go | 11 +++++++++++ util/compression/types.go | 7 +++++++ util/compression/zstd.go | 4 +++- 6 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 util/compression/flags.go create mode 100644 util/compression/flags_test.go diff --git a/go.mod b/go.mod index 0d31ca7..ee2f127 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 + go.uber.org/automaxprocs v1.5.3 k8s.io/api v0.26.3 k8s.io/apimachinery v0.26.3 k8s.io/apiserver v0.26.3 diff --git a/go.sum b/go.sum index da44516..ba9f8ed 100644 --- a/go.sum +++ b/go.sum @@ -264,6 +264,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= @@ -360,6 +361,8 @@ go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= +go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= diff --git a/util/compression/flags.go b/util/compression/flags.go new file mode 100644 index 0000000..5a2b9ff --- /dev/null +++ b/util/compression/flags.go @@ -0,0 +1,22 @@ +package compression + +import ( + _ "go.uber.org/automaxprocs" + + "github.com/spf13/pflag" + "runtime" +) + +var ( + EncoderLevel = 2 + EnCoderConcurrent = runtime.GOMAXPROCS(0) +) + +func AddFlags(fs *pflag.FlagSet) { + AddZStdEncoderFlags(fs) +} + +func AddZStdEncoderFlags(fs *pflag.FlagSet) { + fs.IntVar(&EncoderLevel, "zstd-encoder-level", EncoderLevel, "specifies a predefined compression level") + fs.IntVar(&EnCoderConcurrent, "zstd-encoder-concurrent", EnCoderConcurrent, "set the concurrency, meaning the maximum number of encoders to run concurrently") +} diff --git a/util/compression/flags_test.go b/util/compression/flags_test.go new file mode 100644 index 0000000..ca565f4 --- /dev/null +++ b/util/compression/flags_test.go @@ -0,0 +1,11 @@ +package compression + +import ( + "testing" + + "github.com/spf13/pflag" +) + +func TestFlags(t *testing.T) { + AddFlags(pflag.NewFlagSet("test", pflag.PanicOnError)) +} diff --git a/util/compression/types.go b/util/compression/types.go index 493fa24..82a4ba3 100644 --- a/util/compression/types.go +++ b/util/compression/types.go @@ -50,3 +50,10 @@ func init() { c.init() } } + +func Reinitialize() { + // reinitialize compressors + for _, c := range compressors { + c.init() + } +} diff --git a/util/compression/zstd.go b/util/compression/zstd.go index c29f985..545ad49 100644 --- a/util/compression/zstd.go +++ b/util/compression/zstd.go @@ -66,7 +66,9 @@ func (c *zstdCompressor) init() { // no compression: 38826717 ns 1.00x // gzip: 94855264 ns 2.44x // zstd: 48524197 ns 1.25x - zstd.WithEncoderLevel(zstd.SpeedDefault), + zstd.WithEncoderLevel(zstd.EncoderLevel(EncoderLevel)), + + zstd.WithEncoderConcurrency(EnCoderConcurrent), // TODO(charlie0129): give a dictionary to compressor to get even more improvements. // // Since we are dealing with highly-specialized small JSON data, a dictionary will