[COST-3571] generate daily reports (#184)
* only gather data for full days on initial CR creation

* files are packaged after 96 hours of data are collected

* when upgrading the operator, move and package the old files and regenerate data for the latest day

* add a method for copying files instead of defaulting to moving files

* report files generated before midnight are copied and packaged. the files remain in the data dir so we can append the rest of the day's data to the file

* after midnight, the files are moved out of the data dir and packaged (see the sketch below)
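
A minimal, self-contained sketch of the midnight rule described in the last two bullets. The real check lives in the controller diff below, where `endTime.Hour() == HOURS_IN_DAY` selects between the `packaging.CopyFiles` and `packaging.MoveFiles` actions; the helper name here is illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// fullDayCollected reports whether the collection window ends on the
// day's final hour (hours run 0-23), meaning a complete day of data
// has been gathered.
func fullDayCollected(endTime time.Time) bool {
	const hoursInDay = 23 // mirrors HOURS_IN_DAY in the controller
	return endTime.Hour() == hoursInDay
}

func main() {
	beforeMidnight := time.Date(2023, 9, 8, 14, 0, 0, 0, time.UTC)
	endOfDay := time.Date(2023, 9, 8, 23, 0, 0, 0, time.UTC)

	// Before the final hour: copy report files when packaging, so they
	// stay in the data dir and the rest of the day's data can be appended.
	fmt.Println(fullDayCollected(beforeMidnight)) // false -> CopyFiles

	// Final hour reached: a full day exists, so move the files out.
	fmt.Println(fullDayCollected(endOfDay)) // true -> MoveFiles
}
```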
maskarb committed Sep 8, 2023
1 parent 881c060 commit fc6b035
Showing 15 changed files with 786 additions and 337 deletions.
21 changes: 6 additions & 15 deletions .github/workflows/ci.yaml
@@ -15,11 +15,11 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Checkout repository
uses: actions/checkout@v3.1.0
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: 1.18
- name: golangci-lint
@@ -31,27 +31,18 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3.1.0
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Display build environment
run: printenv

- name: Setup Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
stable: 'true'
go-version: 1.18 # The Go version to download (if necessary) and use.

- name: Cache Go modules
uses: actions/cache@v2
with:
path: |
~/go/pkg/mod
key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }}


- name: Install kubebuilder
run : |
os=$(go env GOOS)
@@ -80,7 +71,7 @@ jobs:
- name: Checkout
# this checkout is required for the coverage report. If we don't do this, then
# the uploaded report is invalid and codecov doesn't know how to process it.
uses: actions/checkout@v3.1.0
uses: actions/checkout@v3
with:
fetch-depth: 0

@@ -90,7 +81,7 @@ jobs:
name: coverage

- name: Upload coverage
uses: codecov/codecov-action@v3.1.1
uses: codecov/codecov-action@v3
with:
file: ./cover.out
flags: unittests
10 changes: 4 additions & 6 deletions collector/collector.go
@@ -6,6 +6,7 @@
package collector

import (
"errors"
"fmt"
"regexp"
"sort"
@@ -33,6 +34,8 @@ var (
statusTimeFormat = "2006-01-02 15:04:05"

log = logr.Log.WithName("collector")

ErrNoData = errors.New("no data to collect")
)

type mappedCSVStruct map[string]csvStruct
@@ -197,10 +200,8 @@ func GenerateReports(cr *metricscfgv1beta1.MetricsConfig, dirCfg *dirconfig.Dire

if len(nodeResults) <= 0 {
log.Info("no data to report")
cr.Status.Reports.DataCollected = false
cr.Status.Reports.DataCollectionMessage = "No data to report for the hour queried."
// there is no data for the hour queried. Return nothing
return nil
return ErrNoData
}
for node, val := range nodeResults {
resourceID := getResourceID(val["provider_id"].(string))
@@ -248,9 +249,6 @@

//################################################################################################################

cr.Status.Reports.DataCollected = true
cr.Status.Reports.DataCollectionMessage = ""

return nil
}

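The new `ErrNoData` sentinel lets `GenerateReports` signal "nothing to collect" without treating it as a failure; the callers in this commit compare against it with `==`, which is equivalent to `errors.Is` for an unwrapped sentinel. A minimal sketch of the calling pattern (the helper name is illustrative only):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/project-koku/koku-metrics-operator/collector"
)

// handleReportErr treats the no-data sentinel as a benign outcome and
// anything else as a real failure.
func handleReportErr(err error) error {
	if errors.Is(err, collector.ErrNoData) {
		return nil // no data for the hour queried; nothing to do
	}
	if err != nil {
		return fmt.Errorf("generating reports: %w", err)
	}
	return nil
}
```
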
6 changes: 1 addition & 5 deletions collector/collector_test.go
@@ -356,13 +356,9 @@ func TestGenerateReportsNoNodeData(t *testing.T) {
},
TimeSeries: &fakeTimeRange,
}
if err := GenerateReports(fakeCR, fakeDirCfg, fakeCollector); err != nil {
if err := GenerateReports(fakeCR, fakeDirCfg, fakeCollector); err != nil && err != ErrNoData {
t.Errorf("Failed to generate reports: %v", err)
}
wanted := "No data to report for the hour queried."
if fakeCR.Status.Reports.DataCollectionMessage != wanted {
t.Errorf("Status not updated correctly: got %s want %s", fakeCR.Status.Reports.DataCollectionMessage, wanted)
}
filelist, err := ioutil.ReadDir(filepath.Join("test_files", "test_reports"))
if err != nil {
t.Fatalf("Failed to read expected reports dir")
123 changes: 77 additions & 46 deletions controllers/kokumetricsconfig_controller.go
@@ -39,6 +39,8 @@ import (
"github.com/project-koku/koku-metrics-operator/storage"
)

const HOURS_IN_DAY int = 23 // first hour is 0: 0 -> 23 == 24 hrs

var (
GitCommit string

@@ -67,11 +69,11 @@ type MetricsConfigReconciler struct {
InCluster bool
Namespace string

apiReader client.Reader
cvClientBuilder cv.ClusterVersionBuilder
promCollector *collector.PrometheusCollector
disablePreviousDataCollection bool
overrideSecretPath bool
apiReader client.Reader
cvClientBuilder cv.ClusterVersionBuilder
promCollector *collector.PrometheusCollector
initialDataCollection bool
overrideSecretPath bool
}

type previousAuthValidation struct {
@@ -426,20 +428,20 @@ func checkSource(r *MetricsConfigReconciler, handler *sources.SourceHandler, cr
}
}

func packageFiles(p *packaging.FilePackager) {
func packageFiles(p *packaging.FilePackager, cr *metricscfgv1beta1.MetricsConfig) {
log := log.WithName("packageAndUpload")

// if it's time to package
if !checkCycle(log, *p.CR.Status.Upload.UploadCycle, p.CR.Status.Packaging.LastSuccessfulPackagingTime, "file packaging") {
if !checkCycle(log, *cr.Status.Upload.UploadCycle, cr.Status.Packaging.LastSuccessfulPackagingTime, "file packaging") {
return
}

// Package and split the payload if necessary
p.CR.Status.Packaging.PackagingError = ""
if err := p.PackageReports(); err != nil {
cr.Status.Packaging.PackagingError = ""
if err := p.PackageReports(cr); err != nil {
log.Error(err, "PackageReports failed")
// update the CR packaging error status
p.CR.Status.Packaging.PackagingError = err.Error()
cr.Status.Packaging.PackagingError = err.Error()
}
}

@@ -546,9 +548,7 @@ func configurePVC(r *MetricsConfigReconciler, req ctrl.Request, cr *metricscfgv1

if strings.Contains(cr.Status.Storage.VolumeType, "EmptyDir") {
cr.Status.Storage.VolumeMounted = false
if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
}
r.updateStatusAndLogError(ctx, cr)
return &ctrl.Result{}, fmt.Errorf("PVC not mounted")
}
return nil, nil
@@ -594,9 +594,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// set the cluster ID & return if there are errors
if err := setClusterID(r, cr); err != nil {
log.Error(err, "failed to obtain clusterID")
if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
}
r.updateStatusAndLogError(ctx, cr)
return ctrl.Result{}, err
}

@@ -610,6 +608,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// so we need to package the old files before generating new reports.
// We set this packaging time to zero so that the next call to packageFiles
// will force file packaging to occur.
log.Info("commit changed, resetting packaging time to force packaging of old data files")
cr.Status.Packaging.LastSuccessfulPackagingTime = metav1.Time{}
cr.Status.OperatorCommit = GitCommit
}
@@ -623,9 +622,11 @@
}
}

startTime, endTime := getTimeRange(ctx, r, cr)

packager := &packaging.FilePackager{
CR: cr,
DirCfg: dirCfg,
DirCfg: dirCfg,
FilesAction: packaging.MoveFiles,
}

// if packaging time is zero but there are files in the data dir, this is an upgraded operator.
@@ -635,44 +636,70 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
files, err := dirCfg.Reports.GetFiles()
if err == nil && len(files) > 0 {
log.Info("packaging files from an old operator version")
packageFiles(packager)
packageFiles(packager, cr)
// after packaging files after an upgrade, truncate the start time so we recollect
// all of today's data. This ensures that today's report contains any new report changes.
startTime = startTime.Truncate(24 * time.Hour)
}
}

// attempt to collect prometheus stats and create reports
if err := getPromCollector(r, cr); err != nil {
log.Error(err, "failed to get prometheus connection")
log.Info("failed to get prometheus connection", "error", err)
r.updateStatusAndLogError(ctx, cr)
return ctrl.Result{RequeueAfter: time.Minute * 2}, err // give things a break and try again in 2 minutes
}
originalStartTime, endTime := getTimeRange(ctx, r, cr)
startTime := originalStartTime
for startTime.Before(endTime) {
t := startTime
timeRange := promv1.Range{
Start: t,
End: t.Add(59*time.Minute + 59*time.Second),
Step: time.Minute,

for start := startTime; !start.After(endTime); start = start.AddDate(0, 0, 1) {
t := start
hours := int(endTime.Sub(t).Hours())
if hours > HOURS_IN_DAY {
hours = HOURS_IN_DAY
}
collectPromStats(r, cr, dirCfg, timeRange)
if startTime.Sub(originalStartTime) == 48*time.Hour {
// after collecting 48 hours of data, package the report to compress the files
for i := 0; i <= hours; i++ {
timeRange := promv1.Range{
Start: t,
End: t.Add(59*time.Minute + 59*time.Second),
Step: time.Minute,
}
if err := collectPromStats(r, cr, dirCfg, timeRange); err != nil {
if err == collector.ErrNoData && t.Hour() == 0 && t.Day() != endTime.Day() && r.initialDataCollection {
// if there is no data for the first hour of the day, and we are doing the
// initial data collection, skip to the next day so we avoid collecting
// partial data for a full day. This ensures we are generating a full daily
// report upon initial ingest.
log.Info("skipping data collection for day", "datetime", timeRange.Start)
break
}
}
t = t.Add(1 * time.Hour)
}

if r.initialDataCollection && t.Sub(startTime).Hours() == 96 {
// only perform these steps during the initial data collection.
// after collecting 96 hours of data, package the report to compress the files
// packaging is guarded by this LastSuccessfulPackagingTime, so setting it to
// zero enables packaging to occur throughout this loop
log.Info("collected 96 hours of data, resetting packaging time to force packaging")
cr.Status.Packaging.LastSuccessfulPackagingTime = metav1.Time{}
packageFiles(packager)
originalStartTime = startTime
}
startTime = startTime.Add(1 * time.Hour)
if err := r.Status().Update(ctx, cr); err != nil {
// it's not critical to handle this error. We update the status here to show progress
// if this loop takes a long time to complete. A missed update here does not impact
// data collection here.
log.Info("failed to update MetricsConfig status")
packageFiles(packager, cr)
startTime = t
// update status to show progress
r.updateStatusAndLogError(ctx, cr)
}
}

r.initialDataCollection = false
packager.FilesAction = packaging.CopyFiles
if endTime.Hour() == HOURS_IN_DAY && !cr.Status.Prometheus.LastQuerySuccessTime.Equal(&metav1.Time{Time: startTime}) {
// when we've reached the end of the day, force packaging to occur to generate the daily report
log.Info("collected a full day of data, resetting packaging time to force packaging")
cr.Status.Packaging.LastSuccessfulPackagingTime = metav1.Time{}
packager.FilesAction = packaging.MoveFiles
}

// package report files
packageFiles(packager)
packageFiles(packager, cr)

// Initial returned result -> requeue reconcile after 5 min.
// This result is replaced if upload or status update results in error.
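
One subtle point in the upgrade path above: `startTime.Truncate(24 * time.Hour)` rounds the timestamp down to a multiple of 24 hours counted from Go's zero time, which coincides with midnight UTC (the time package ignores leap seconds), so the whole current day is recollected. A quick illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Date(2023, 9, 8, 14, 37, 0, 0, time.UTC)
	// Truncate operates on the absolute time, not the wall-clock fields,
	// so for UTC timestamps a 24h truncation snaps to UTC midnight.
	fmt.Println(ts.Truncate(24 * time.Hour)) // 2023-09-08 00:00:00 +0000 UTC
}
```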
@@ -693,9 +720,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// obtain credentials token/basic & return if there are authentication credential errors
if err := setAuthentication(r, authConfig, cr, req.NamespacedName); err != nil {
if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
}
r.updateStatusAndLogError(ctx, cr)
return ctrl.Result{}, err
}

@@ -727,7 +752,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// remove old reports if maximum report count has been exceeded
if err := packager.TrimPackages(); err != nil {
if err := packager.TrimPackages(cr); err != nil {
result = ctrl.Result{}
errors = append(errors, err)
}
@@ -740,7 +765,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
cr.Status.Packaging.PackagedFiles = uploadFiles

if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
log.Info("failed to update MetricsConfig status", "error", err)
result = ctrl.Result{}
errors = append(errors, err)
}
Expand All @@ -757,6 +782,12 @@ func (r *MetricsConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *MetricsConfigReconciler) updateStatusAndLogError(ctx context.Context, cr *metricscfgv1beta1.MetricsConfig) {
if err := r.Status().Update(ctx, cr); err != nil {
log.Info("failed to update MetricsConfig status", "error", err)
}
}

// concatErrs combines all the errors into one error
func concatErrs(errors ...error) error {
var err error