4 changes: 4 additions & 0 deletions .gitignore
@@ -124,3 +124,7 @@ override.tf.json
# Ignore CLI configuration files
.terraformrc
terraform.rc

# AI files to temporarily ignore, until we're ready to commit them
CLAUDE.md
/.cursor
10 changes: 10 additions & 0 deletions .prettierrc.json
@@ -0,0 +1,10 @@
{
"overrides": [
{
"files": ".cursor/rules/*.mdc",
"options": {
"parser": "markdown"
}
}
]
}
2 changes: 2 additions & 0 deletions Makefile
@@ -241,6 +241,8 @@ ifneq ($(REGENERATE),never)
app/functions/helmless/default-values.yaml: helm/values.yaml $(wildcard helm/*.yaml helm/templates/*.yaml helm/templates/*.tpl helm/*.yaml)
$(HELM) show values ./helm | $(PRETTIER) --stdin-filepath $@ > $@

generate: app/functions/helmless/default-values.yaml

bin/cloudzero-helmless: app/functions/helmless/default-values.yaml

# Add the embedded defaults file to dependencies
23 changes: 15 additions & 8 deletions app/config/gator/settings.go
@@ -43,11 +43,12 @@ type Settings struct {
Region string `yaml:"region" env:"CSP_REGION" env-description:"cloud service provider region"`
ClusterName string `yaml:"cluster_name" env:"CLUSTER_NAME" env-description:"name of the cluster to monitor"`

Server Server `yaml:"server"`
Logging Logging `yaml:"logging"`
Database Database `yaml:"database"`
Cloudzero Cloudzero `yaml:"cloudzero"`
Metrics Metrics `yaml:"metrics"`
Server Server `yaml:"server"`
Logging Logging `yaml:"logging"`
Database Database `yaml:"database"`
Cloudzero Cloudzero `yaml:"cloudzero"`
Metrics Metrics `yaml:"metrics"`
Certificate Certificate `yaml:"certificate"`

mu sync.Mutex
}
@@ -59,16 +60,21 @@ type Metrics struct {
ObservabilityLabels []filter.FilterEntry `yaml:"observability_labels"`
}

type Certificate struct {
Cert string `yaml:"cert" env:"CERT_PATH" env-description:"path to TLS certificate file"`
Key string `yaml:"key" env:"KEY_PATH" env-description:"path to TLS key file"`
}

type Logging struct {
Level string `yaml:"level" default:"info" env:"LOG_LEVEL" env-description:"logging level such as debug, info, error"`
Capture bool `yaml:"capture" default:"true" env:"LOG_CAPTURE" env-description:"whether to persist logs to disk or not"`
}

type Database struct {
StoragePath string `yaml:"storage_path" default:"/cloudzero/data" env:"DATABASE_STORAGE_PATH" env-description:"location where to write database"`
MaxRecords int `yaml:"max_records" default:"1000000" env:"MAX_RECORDS_PER_FILE" env-description:"maximum records per file"`
MaxRecords int `yaml:"max_records" default:"1500000" env:"MAX_RECORDS_PER_FILE" env-description:"maximum records per file"`
CompressionLevel int `yaml:"compression_level" default:"8" env:"DATABASE_COMPRESS_LEVEL" env-description:"compression level for database files"`
CostMaxInterval time.Duration `yaml:"cost_max_interval" default:"10m" env:"COST_MAX_INTERVAL" env-description:"maximum interval to wait before flushing cost metrics"`
CostMaxInterval time.Duration `yaml:"cost_max_interval" default:"30m" env:"COST_MAX_INTERVAL" env-description:"maximum interval to wait before flushing cost metrics"`
ObservabilityMaxInterval time.Duration `yaml:"observability_max_interval" default:"10m" env:"OBSERVABILITY_MAX_INTERVAL" env-description:"maximum interval to wait before flushing observability metrics"`

PurgeRules PurgeRules `yaml:"purge_rules"`
@@ -82,8 +88,9 @@ type PurgeRules struct {
}

type Server struct {
Mode string `yaml:"mode" default:"http" env:"SERVER_MODE" env-description:"server mode such as http, https"`
Mode string `yaml:"mode" default:"http" env:"SERVER_MODE" env-description:"server mode such as http, https, dual"`
Port uint `yaml:"port" default:"8080" env:"SERVER_PORT" env-description:"server port"`
TLSPort uint `yaml:"tls_port" default:"8443" env:"SERVER_TLS_PORT" env-description:"server TLS port"`
Profiling bool `yaml:"profiling" default:"false" env:"SERVER_PROFILING" env-description:"enable profiling"`
ReconnectFrequency int `yaml:"reconnect_frequency" default:"16" env:"SERVER_RECONNECT_FREQUENCY" env-description:"how frequently to close HTTP connections from clients, to distribute the load. 0=never, otherwise 1/N probability."`
}
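
The settings above add a Certificate block, a "dual" server mode, and a TLS port, but this diff does not include the listener wiring. What follows is only a hedged sketch of how those fields could be consumed, assuming a serve helper living alongside the Settings type; the function and its structure are illustrative, not part of this PR.

package config

import (
	"fmt"
	"net/http"
)

// serve is a hypothetical sketch, not code from this PR: it assumes that
// "http", "https", and "dual" select which listeners to start.
func serve(cfg *Settings, handler http.Handler) error {
	errCh := make(chan error, 2)

	if cfg.Server.Mode == "http" || cfg.Server.Mode == "dual" {
		go func() {
			// Plain HTTP on Server.Port (default 8080).
			errCh <- http.ListenAndServe(fmt.Sprintf(":%d", cfg.Server.Port), handler)
		}()
	}
	if cfg.Server.Mode == "https" || cfg.Server.Mode == "dual" {
		go func() {
			// TLS on Server.TLSPort (default 8443), using the new
			// Certificate.Cert and Certificate.Key file paths.
			errCh <- http.ListenAndServeTLS(fmt.Sprintf(":%d", cfg.Server.TLSPort),
				cfg.Certificate.Cert, cfg.Certificate.Key, handler)
		}()
	}

	// Return the first listener error; real wiring would likely add
	// graceful shutdown via a context.
	return <-errCh
}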
50 changes: 50 additions & 0 deletions app/domain/metric_collector.go
@@ -65,6 +65,14 @@ var (
},
[]string{},
)
// Custom metric for HPA scaling based on cost metrics shipping progress
costMetricsShippingProgress = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: types.ObservabilityMetric("cost_metrics_shipping_progress"),
Help: "Progress towards cost metrics shipping goal (ratio of currentPending/targetProgress), where targetProgress = (elapsedTime/costMaxInterval) * maxRecords, 1.0 = 100% of expected rate",
Review comment (Contributor):
I'm wondering if we should add a "buffer" or scale factor, eg (currentPending/targetProgress)*1.05
Nevermind, this is handled properly by targetValue: "900m"
Why am I still posting this comment? Not sure, just to say, this all looks great to me. Nice work!

Reply (Contributor, Author):
I'm also working on a simplification where we just base it off of metrics/minute on a 2-minute sliding window, which also lets us do some other fun stuff...

},
[]string{},
)
)
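
Worked through with the defaults this PR sets (MaxRecords of 1,500,000 and a CostMaxInterval of 30m), the formula in the Help string behaves as below; the snippet is purely illustrative, with an assumed pending count, and is not code from this PR.

package main

import (
	"fmt"
	"time"
)

func main() {
	maxRecords := 1_500_000             // Database.MaxRecords default in this PR
	costMaxInterval := 30 * time.Minute // Database.CostMaxInterval default in this PR
	elapsed := 15 * time.Minute         // assumed time since the last cost flush
	currentPending := 600_000           // assumed records currently pending

	// targetProgress = (elapsedTime / costMaxInterval) * maxRecords = 750,000
	targetProgress := float64(elapsed.Milliseconds()) / float64(costMaxInterval.Milliseconds()) * float64(maxRecords)

	// progress = currentPending / targetProgress = 600,000 / 750,000 = 0.8
	progress := float64(currentPending) / targetProgress

	fmt.Printf("target=%.0f progress=%.2f\n", targetProgress, progress)
}

A value of 1.0 means pending records are accumulating exactly fast enough to hit MaxRecords at the end of CostMaxInterval, which is presumably why the review thread's targetValue: "900m" acts as a built-in buffer: the HPA would start scaling out once the ratio climbs past 0.9.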

// MetricCollector is responsible for collecting and flushing metrics.
@@ -173,6 +181,9 @@ func (d *MetricCollector) PutMetrics(ctx context.Context, contentType, encodingT
return stats, err
}

// Update the shipping progress metric for HPA scaling
d.updateShippingProgressMetric()

// In order to reduce the amount of time until the server starts seeing
// data, we perform a first flush 🍵 of the cost metrics immediately
// upon receipt.
@@ -193,6 +204,41 @@ func (d *MetricCollector) PutMetrics(ctx context.Context, contentType, encodingT
return stats, nil
}

// updateShippingProgressMetric calculates and updates the shipping progress metric
// for HPA scaling based on time-based expected progress versus actual pending records.
func (d *MetricCollector) updateShippingProgressMetric() {
currentPending := d.costStore.Pending()
maxRecords := d.settings.Database.MaxRecords
elapsedTime := d.costStore.ElapsedTime()
costMaxInterval := d.settings.Database.CostMaxInterval

// Calculate time-based target progress using the correct formula:
// targetProgress = (elapsedTime / costMaxInterval) * maxRecords
// progress = currentPending / targetProgress

// Convert costMaxInterval to milliseconds for calculation
costMaxIntervalMs := costMaxInterval.Milliseconds()

// Calculate expected number of records at this point in time
var progress float64
if elapsedTime == 0 || costMaxIntervalMs == 0 {
// At the very beginning or with invalid interval, use simple ratio
progress = float64(currentPending) / float64(maxRecords)
} else {
// Calculate time-based expected progress
targetProgress := (float64(elapsedTime) / float64(costMaxIntervalMs)) * float64(maxRecords)

if targetProgress == 0 {
// Avoid division by zero at the very start
progress = 0.0
} else {
progress = float64(currentPending) / targetProgress
}
}

costMetricsShippingProgress.WithLabelValues().Set(progress)
}

type metricCounter map[string]map[string]int

func (m metricCounter) Add(metricName string, metricValue string) {
@@ -207,6 +253,10 @@ func (d *MetricCollector) Flush(ctx context.Context) error {
if err := d.costStore.Flush(); err != nil {
return err
}

// Update the shipping progress metric after flushing
d.updateShippingProgressMetric()

return d.observabilityStore.Flush()
}

151 changes: 151 additions & 0 deletions app/domain/metric_collector_test.go
@@ -40,6 +40,8 @@ func TestPutMetrics(t *testing.T) {
storage := mocks.NewMockStore(ctrl)
storage.EXPECT().Put(ctx, gomock.Any()).Return(nil)
storage.EXPECT().Flush().Return(nil)
storage.EXPECT().Pending().Return(0).AnyTimes() // For the shipping progress metric
storage.EXPECT().ElapsedTime().Return(int64(10000)).AnyTimes() // For the time-based shipping progress metric
d, err := domain.NewMetricCollector(&cfg, mockClock, storage, nil)
require.NoError(t, err)
defer d.Close()
@@ -55,6 +57,8 @@
storage := mocks.NewMockStore(ctrl)
storage.EXPECT().Put(ctx, gomock.Any()).Return(nil)
storage.EXPECT().Flush().Return(nil)
storage.EXPECT().Pending().Return(0).AnyTimes() // For the shipping progress metric
storage.EXPECT().ElapsedTime().Return(int64(10000)).AnyTimes() // For the time-based shipping progress metric
d, err := domain.NewMetricCollector(&cfg, mockClock, storage, nil)
require.NoError(t, err)
defer d.Close()
@@ -74,3 +78,150 @@
assert.NotNil(t, stats)
})
}

func TestCostMetricsShippingProgressMetric(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

initialTime := time.Date(2023, 10, 1, 12, 0, 0, 0, time.UTC)
mockClock := mocks.NewMockClock(initialTime)

ctx := context.Background()

tests := []struct {
name string
maxRecords int
pendingRecords int
elapsedTimeMs int64
expectedProgress float64
expectMetricUpdateCalled bool
}{
{
name: "zero pending records",
maxRecords: 1000,
pendingRecords: 0,
elapsedTimeMs: 10000, // 10 seconds
expectedProgress: 0.0,
expectMetricUpdateCalled: true,
},
{
name: "10 seconds elapsed with expected rate",
maxRecords: 1000,
pendingRecords: 6, // Expected: (10000/1800000) * 1000 = 5.56, actual: 6, so 6/5.56 ≈ 1.08
elapsedTimeMs: 10000,
expectedProgress: 1.08,
expectMetricUpdateCalled: true,
},
{
name: "30 seconds elapsed with expected rate",
maxRecords: 1000,
pendingRecords: 17, // Expected: (30000/1800000) * 1000 = 16.67, actual: 17, so 17/16.67 ≈ 1.02
elapsedTimeMs: 30000,
expectedProgress: 1.02,
expectMetricUpdateCalled: true,
},
{
name: "5 minutes elapsed with expected rate",
maxRecords: 1000,
pendingRecords: 167, // Expected: (300000/1800000) * 1000 = 166.67, actual: 167, so 167/166.67 ≈ 1.0
elapsedTimeMs: 300000,
expectedProgress: 1.0,
expectMetricUpdateCalled: true,
},
{
name: "15 minutes elapsed with expected rate",
maxRecords: 1500000,
pendingRecords: 750000, // Expected: (900000/1800000) * 1500000 = 750000, actual: 750000, so 750000/750000 = 1.0
elapsedTimeMs: 900000,
expectedProgress: 1.0,
expectMetricUpdateCalled: true,
},
{
name: "30 minutes elapsed (full interval)",
maxRecords: 1000,
pendingRecords: 1000, // Expected: (1800000/1800000) * 1000 = 1000, actual: 1000, so 1000/1000 = 1.0
elapsedTimeMs: 1800000,
expectedProgress: 1.0,
expectMetricUpdateCalled: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := config.Settings{
CloudAccountID: "123456789012",
Region: "us-west-2",
ClusterName: "testcluster",
Database: config.Database{
MaxRecords: tt.maxRecords,
CostMaxInterval: 30 * time.Minute, // 30 minutes = 1800000 milliseconds
},
}

// Create a mock store that implements all needed methods
mockStore := mocks.NewMockStore(ctrl)

if tt.expectMetricUpdateCalled {
mockStore.EXPECT().Pending().Return(tt.pendingRecords).AnyTimes()
mockStore.EXPECT().ElapsedTime().Return(tt.elapsedTimeMs).AnyTimes()
}

mockStore.EXPECT().Put(ctx, gomock.Any()).Return(nil).AnyTimes()
mockStore.EXPECT().Flush().Return(nil).AnyTimes()

d, err := domain.NewMetricCollector(&cfg, mockClock, mockStore, nil)
require.NoError(t, err)
defer d.Close()

// Create test metric data
payload, _, _, err := testdata.BuildWriteRequest(testdata.WriteRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
require.NoError(t, err)

// Process metrics to trigger the progress metric update
stats, err := d.PutMetrics(ctx, "application/x-protobuf", "snappy", payload)
assert.NoError(t, err)
assert.NotNil(t, stats) // matches the NotNil expectation in TestPutMetrics above
})
}
}
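
One gap in the table-driven test above: expectedProgress is defined but never compared against the gauge. A hedged sketch of such an assertion, assuming the test could somehow reach the (unexported) costMetricsShippingProgress gauge, might lean on prometheus/client_golang's testutil package; none of this is code from the PR.

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
)

// assertProgressGauge is hypothetical, not part of this PR; testutil.ToFloat64
// reads the current value of a collector that exposes exactly one metric.
func assertProgressGauge(t *testing.T, want float64) {
	t.Helper()
	got := testutil.ToFloat64(costMetricsShippingProgress.WithLabelValues())
	assert.InDelta(t, want, got, 0.01)
}

Called as assertProgressGauge(t, tt.expectedProgress) at the end of each subtest, it would tie the expected values in the table to the metric the HPA actually sees.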

func TestUpdateShippingProgressMetric(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

initialTime := time.Date(2023, 10, 1, 12, 0, 0, 0, time.UTC)
mockClock := mocks.NewMockClock(initialTime)

ctx := context.Background()

t.Run("flush triggers metric update", func(t *testing.T) {
cfg := config.Settings{
CloudAccountID: "123456789012",
Region: "us-west-2",
ClusterName: "testcluster",
Database: config.Database{
MaxRecords: 1000,
CostMaxInterval: 30 * time.Minute,
},
}

// Create mock stores for both cost and observability
mockCostStore := mocks.NewMockStore(ctrl)
mockObservabilityStore := mocks.NewMockStore(ctrl)

// Expect Pending() and ElapsedTime() to be called when Flush() is called
mockCostStore.EXPECT().Pending().Return(0).AnyTimes()
mockCostStore.EXPECT().ElapsedTime().Return(int64(10000)).AnyTimes()

mockCostStore.EXPECT().Flush().Return(nil)
mockObservabilityStore.EXPECT().Flush().Return(nil)

d, err := domain.NewMetricCollector(&cfg, mockClock, mockCostStore, mockObservabilityStore)
require.NoError(t, err)
defer d.Close()

// Call flush to trigger metric update
err = d.Flush(ctx)
assert.NoError(t, err)
})
}