Skip to content

Commit

Permalink
Merge pull request #34 from MlDenis/iter16
Browse files Browse the repository at this point in the history
Iter16
  • Loading branch information
MlDenis authored Feb 1, 2024
2 parents 87c7d22 + be1ebe9 commit 3b1ad1c
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 41 deletions.
13 changes: 0 additions & 13 deletions cmd/server/config.go

This file was deleted.

20 changes: 17 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"errors"
"flag"
"fmt"
"io"
"net/http"

"github.com/MlDenis/prometheus_wannabe/internal/converter"
"github.com/MlDenis/prometheus_wannabe/internal/database"
"github.com/MlDenis/prometheus_wannabe/internal/database/postgre"
Expand All @@ -24,10 +21,15 @@ import (
"github.com/MlDenis/prometheus_wannabe/internal/metrics/storage/file"
"github.com/MlDenis/prometheus_wannabe/internal/metrics/storage/memory"
"github.com/MlDenis/prometheus_wannabe/internal/worker"
"go.uber.org/zap"
"io"
"net/http"

"github.com/caarlos0/env/v7"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"

_ "net/http/pprof"
)

const (
Expand All @@ -44,6 +46,16 @@ var compressContentTypes = []string{
"text/xml",
}

type config struct {
Key string `env:"KEY"`
ServerURL string `env:"ADDRESS"`
StoreInterval int `env:"STORE_INTERVAL"`
StoreFile string `env:"STORE_FILE"`
Restore bool `env:"RESTORE"`
DB string `env:"DATABASE_DSN"`
LogLevel zap.AtomicLevel `env:"LOG_LEVEL"`
}

type metricInfoContextKey struct {
key string
}
Expand Down Expand Up @@ -130,8 +142,10 @@ func createConfig() (*config, error) {

func initRouter(metricsStorage storage.MetricsStorage, converter *model.MetricsConverter, htmlPageBuilder html.HTMLPageBuilder, dbStorage database.DataBase) *chi.Mux {
router := chi.NewRouter()

router.Use(middleware.Logger)
router.Use(middleware.Compress(gzip.BestSpeed, compressContentTypes...))
router.Mount("/debug", middleware.Profiler())
router.Route("/update", func(r chi.Router) {
r.With(fillSingleJSONContext, updateMetrics(metricsStorage, converter)).
Post("/", successSingleJSONResponse())
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/MlDenis/prometheus_wannabe

go 1.20
go 1.21

require (
github.com/caarlos0/env/v7 v7.1.0
Expand All @@ -19,7 +19,6 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
github.com/caarlos0/env/v7 v7.1.0 h1:9lzTF5amyQeWHZzuZeKlCb5FWSUxpG1js43mhbY8ozg=
github.com/caarlos0/env/v7 v7.1.0/go.mod h1:LPPWniDUq4JaO6Q41vtlyikhMknqymCLBw0eX4dcH1E=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -18,6 +17,7 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ
github.com/jackc/pgx/v5 v5.4.2 h1:u1gmGDwbdRUZiwisBm/Ky2M14uQyUP65bG8+20nnyrg=
github.com/jackc/pgx/v5 v5.4.2/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
Expand Down Expand Up @@ -52,6 +52,8 @@ github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYm
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
Expand All @@ -71,6 +73,7 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func (a *aggregateMetricsProvider) GetMetrics() <-chan metrics.Metric {
func (a *aggregateMetricsProvider) Update(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)

for i := 0; i < len(a.providers); i++ {
num := i
for _, provider := range a.providers {
provider := provider
eg.Go(func() error {
err := a.providers[num].Update(ctx)
err := provider.Update(ctx)
if err != nil {
return logger.WrapError("update metrics", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/metrics/provider/gopsutil/gopsutilMetricProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ import (
"golang.org/x/sync/errgroup"
)

var cpuInterval = 100 * time.Millisecond
const cpuInterval = 100 * time.Millisecond

// GopsutilMetricsProvider is a provider of Gopsutil metrics.
type GopsutilMetricsProvider struct {
totalMetric metrics.Metric
freeMetric metrics.Metric
cpuUtilizationMetrics map[int]metrics.Metric
cpuUtilizationMetrics []metrics.Metric
}

// NewGopsutilMetricsProvider create new instance of GopsutilMetricsProvider.
func NewGopsutilMetricsProvider() *GopsutilMetricsProvider {
numCPU := runtime.NumCPU()
cpuUtilizationMetrics := make(map[int]metrics.Metric, numCPU)
cpuUtilizationMetrics := make([]metrics.Metric, numCPU)
for i := 0; i < numCPU; i++ {
cpuUtilizationMetrics[i] = types.NewGaugeMetric(fmt.Sprintf("CPUutilization%v", i+1))
}
Expand Down
12 changes: 7 additions & 5 deletions internal/metrics/provider/runtime/runtimeMetricsProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,20 @@ func (p *runtimeMetricsProvider) Update(context.Context) error {
stats := runtime.MemStats{}
runtime.ReadMemStats(&stats)

var err error
for _, metric := range p.metrics {
metricName := metric.GetName()
metricValue, err := getFieldValue(&stats, metricName)
if err != nil {
return logger.WrapError(fmt.Sprintf("get %s runtime metric value", metricName), err)
metricValue, metricErr := getFieldValue(&stats, metricName)
if metricErr != nil {
err = logger.WrapError(fmt.Sprintf("get %s runtime metric value", metricName), metricErr)
logrus.Error(err)
continue
}

metric.SetValue(metricValue)
logrus.Infof("Updated metric: %v. value: %v", metricName, metric.GetStringValue())
}

return nil
return err
}

func (p *runtimeMetricsProvider) GetMetrics() <-chan metrics.Metric {
Expand Down
26 changes: 20 additions & 6 deletions internal/metrics/sendler/http/httpMetricsSendler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"sync"
"time"

"github.com/MlDenis/prometheus_wannabe/internal/logger"
Expand All @@ -27,12 +28,18 @@ type metricsPusherConfig interface {

type httpMetricsPusher struct {
parallelLimit int
client http.Client
client *http.Client
metricsServerURL string
pushTimeout time.Duration
converter *model.MetricsConverter
}

var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

func NewMetricsPusher(config metricsPusherConfig, converter *model.MetricsConverter) (sendler.MetricsPusher, error) {
serverURL, err := normalizeURL(config.MetricsServerURL())
if err != nil {
Expand All @@ -41,7 +48,7 @@ func NewMetricsPusher(config metricsPusherConfig, converter *model.MetricsConver

return &httpMetricsPusher{
parallelLimit: config.ParallelLimit(),
client: http.Client{},
client: &http.Client{},
metricsServerURL: serverURL.String(),
pushTimeout: config.PushMetricsTimeout(),
converter: converter,
Expand Down Expand Up @@ -94,13 +101,16 @@ func (p *httpMetricsPusher) pushMetrics(ctx context.Context, metricsList []metri
modelMetrics[i] = modelMetric
}

var buffer bytes.Buffer
err := json.NewEncoder(&buffer).Encode(modelMetrics)
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()

err := json.NewEncoder(buf).Encode(modelMetrics)
if err != nil {
return logger.WrapError("serialize model request", err)
}

request, err := http.NewRequestWithContext(pushCtx, http.MethodPost, p.metricsServerURL+"/updates", &buffer)
request, err := http.NewRequestWithContext(pushCtx, http.MethodPost, p.metricsServerURL+"/updates", buf)
if err != nil {
return logger.WrapError("create push request", err)
}
Expand All @@ -125,7 +135,11 @@ func (p *httpMetricsPusher) pushMetrics(ctx context.Context, metricsList []metri
}

for _, metric := range metricsList {
logrus.Infof("Pushed metric: %v. value: %v, status: %v", metric.GetName(), metric.GetStringValue(), response.Status)
logrus.WithFields(logrus.Fields{
"metric": metric.GetName(),
"value": metric.GetStringValue(),
"status": response.Status,
}).Info("Pushed metric")
metric.ResetState()
}

Expand Down
3 changes: 3 additions & 0 deletions internal/metrics/storage/file/fileStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (f *fileStorage) Restore(ctx context.Context, metricValues map[string]map[s
func (f *fileStorage) updateMetrics(metricsList []metrics.Metric) error {
// Read and write
return f.workWithFile(os.O_CREATE|os.O_RDWR, func(fileStream *os.File) error {
f.lock.Lock()
defer f.lock.Unlock()

metricsMap := map[string]metrics.Metric{} // contains?
for _, metric := range metricsList {
metricsMap[metric.GetType()+metric.GetName()] = metric
Expand Down
12 changes: 7 additions & 5 deletions internal/metrics/storage/memory/inMemoryStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/MlDenis/prometheus_wannabe/internal/converter"
"github.com/MlDenis/prometheus_wannabe/internal/logger"
"github.com/MlDenis/prometheus_wannabe/internal/metrics"
"github.com/MlDenis/prometheus_wannabe/internal/metrics/storage"
"github.com/MlDenis/prometheus_wannabe/internal/metrics/types"
Expand All @@ -25,6 +24,9 @@ func NewInMemoryStorage() storage.MetricsStorage {
}

func (s *inMemoryStorage) AddMetricValues(ctx context.Context, metricList []metrics.Metric) ([]metrics.Metric, error) {
s.lock.Lock()
defer s.lock.Unlock()

result := make([]metrics.Metric, len(metricList))

for i, metric := range metricList {
Expand Down Expand Up @@ -71,12 +73,12 @@ func (s *inMemoryStorage) GetMetric(ctx context.Context, metricType string, metr
defer s.lock.RUnlock()
metricsByName, ok := s.metricsByType[metricType]
if !ok {
return nil, logger.WrapError(fmt.Sprintf("get metric with type %s", metricType), metrics.ErrMetricNotFound)
return nil, fmt.Errorf("get metric with type %s: %w", metricType, metrics.ErrMetricNotFound)
}

metric, ok := metricsByName[metricName]
if !ok {
return nil, logger.WrapError(fmt.Sprintf("metrics with name %v and types %v not found", metricName, metricType), metrics.ErrMetricNotFound)
return nil, fmt.Errorf("metrics with name %v and types %v not found: %w", metricName, metricType, metrics.ErrMetricNotFound)
}

return metric, nil
Expand All @@ -93,13 +95,13 @@ func (s *inMemoryStorage) Restore(ctx context.Context, metricValues map[string]m
if metricType == "counter" {
metricFactory = types.NewCounterMetric
} else if metricType != "gauge" {
return logger.WrapError(fmt.Sprintf("handle backup metric with type '%s'", metricType), metrics.ErrUnknownMetricType)
return fmt.Errorf("handle backup metric with type '%s': %w", metricType, metrics.ErrUnknownMetricType)
}

for metricName, metricValue := range metricsByType {
value, err := converter.ToFloat64(metricValue)
if err != nil {
return logger.WrapError("parse float metric value", err)
return fmt.Errorf("parse float metric value: %w", err)
}

metricsList, ok := s.metricsByType[metricType]
Expand Down
Binary file added profiles/base.pprof
Binary file not shown.
Binary file added profiles/result.pprof
Binary file not shown.

0 comments on commit 3b1ad1c

Please sign in to comment.