Skip to content

Commit

Permalink
Add migration server metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Aug 30, 2022
1 parent 7aef280 commit 8a33385
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 3 deletions.
43 changes: 43 additions & 0 deletions dbs/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/dmwm/dbs2go/utils"
Expand Down Expand Up @@ -63,6 +64,27 @@ DBS migration status codes (in sync with old python migration server):
FAILED -> IN PROGRESS (3 -> 1) is allowed for retrying and retry count +1.
*/

// TotalPending represents total number pending migration requests
var TotalPending uint64

// TotalInProgress represents total number in progress migration requests
var TotalInProgress uint64

// TotalCompleted represents total number of completed migration requests
var TotalCompleted uint64

// TotalFailed represents total number of failed migration requests
var TotalFailed uint64

// TotalTermFailed represents total number of terminally failed migration requests
var TotalTermFailed uint64

// TotalExistInDB represents total number of exist in db migration requests
var TotalExistInDB uint64

// TotalQueued represents total number of queued migration requests
var TotalQueued uint64

// MigrationAsyncTimeout defines timeout of asynchrounous migration request process
var MigrationAsyncTimeout int

Expand Down Expand Up @@ -1325,10 +1347,31 @@ func migrationHost(mid int64) (string, error) {
return hostname, nil
}

// updateMigrationStatusMetrics updates metrics about migration statuses
func updateMigrationStatusMetrics(mrec MigrationRequest) {
status := mrec.MIGRATION_STATUS
if status == IN_PROGRESS {
atomic.AddUint64(&TotalInProgress, 1)
} else if status == PENDING {
atomic.AddUint64(&TotalPending, 1)
} else if status == COMPLETED {
atomic.AddUint64(&TotalCompleted, 1)
} else if status == FAILED {
atomic.AddUint64(&TotalFailed, 1)
} else if status == TERM_FAILED {
atomic.AddUint64(&TotalTermFailed, 1)
} else if status == EXIST_IN_DB {
atomic.AddUint64(&TotalExistInDB, 1)
} else if status == QUEUED {
atomic.AddUint64(&TotalQueued, 1)
}
}

// updateMigrationStatus updates migration status and increment retry count of
// migration record.
func updateMigrationStatus(mrec MigrationRequest, status int) error {
log.Printf("update migration request %d to status %d", mrec.MIGRATION_REQUEST_ID, status)
updateMigrationStatusMetrics(mrec)
tmplData := make(Record)
tmplData["Owner"] = DBOWNER
stm, err := LoadTemplateSQL("update_migration_status", tmplData)
Expand Down
5 changes: 5 additions & 0 deletions dbs/migration_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dbs
import (
"database/sql"
"log"
"sync/atomic"
"time"

"github.com/dmwm/dbs2go/utils"
Expand All @@ -26,6 +27,9 @@ var MigrationCleanupOffset int64
// MigrationRetries specifies total number of migration retries
var MigrationRetries int64

// TotalMigrationRequests counts total number of migration requests processed by this server
var TotalMigrationRequests uint64

// MigrationServer represent migration server.
// it accepts migration process timeout used by ProcessMigration API and
// exit channel
Expand Down Expand Up @@ -77,6 +81,7 @@ func MigrationServer(interval, timeout int, ch <-chan bool) {
params["migration_request_id"] = r.MIGRATION_REQUEST_ID
api.Params = params
time0 := time.Now()
atomic.AddUint64(&TotalMigrationRequests, 1)
api.ProcessMigration()
log.Printf("migration process %+v finished in %v", params, time.Since(time0))
}
Expand Down
54 changes: 51 additions & 3 deletions web/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ type Metrics struct {
DBStats sql.DBStats `json:"dbstats"` // metrics about database
MaxDBConnections uint64 `json:"maxDBConnections"` // max number of DB connections
MaxIdleConnections uint64 `json:"maxIdleConnections"` // max number of idle DB connections

// Migration server metrics
MigrationRequests uint64 `json:"migrationRequests"` // total number of migration requests across all services
MigrationPending uint64 `json:"migrationPending"` // total number of pending migration requests across all services
MigrationInProgress uint64 `json:"migrationInProgress"` // total number of in progress migration requests across all services
MigrationFailed uint64 `json:"migrationFailed"` // total number of failed migration requests across all services
MigrationTermFailed uint64 `json:"migrationTermFailed"` // total number of term failed migration requests across all services
MigrationCompleted uint64 `json:"migrationCompleted"` // total number of completed migration requests across all services
MigrationQueued uint64 `json:"migrationQueued"` // total number of queued migration requests across all services
MigrationExistInDB uint64 `json:"migrationExistInDB"` // total number of exist in db migration requests across all services
}

func metrics() Metrics {
Expand Down Expand Up @@ -175,10 +185,16 @@ func metrics() Metrics {
metrics.RPSLogical = float64(rstat.NumLogicalCores-NumLogicalCores) / lapse
metrics.RPSPhysical = float64(rstat.NumPhysicalCores-NumPhysicalCores) / lapse

// update time stamp
MetricsLastUpdateTime = time.Now()
// migration server metrics
metrics.MigrationRequests = dbs.TotalMigrationRequests
metrics.MigrationPending = dbs.TotalPending
metrics.MigrationInProgress = dbs.TotalInProgress
metrics.MigrationFailed = dbs.TotalFailed
metrics.MigrationTermFailed = dbs.TotalTermFailed
metrics.MigrationCompleted = dbs.TotalCompleted
metrics.MigrationQueued = dbs.TotalQueued
metrics.MigrationExistInDB = dbs.TotalExistInDB

// update request stats metrics
rstat.Update()

return metrics
Expand Down Expand Up @@ -392,6 +408,38 @@ func promMetrics(prefix string) string {
out += fmt.Sprintf("# TYPE %s_max_lifetime_closed counter\n", prefix)
out += fmt.Sprintf("%s_max_lifetime_closed %v\n", prefix, data.DBStats.MaxLifetimeClosed)

// migration server metrics
out += fmt.Sprintf("# HELP %s_migration_requests reports total number of migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_migration_requests counter\n", prefix)
out += fmt.Sprintf("%s_migration_requests %v\n", prefix, data.MigrationRequests)

out += fmt.Sprintf("# HELP %s_pending reports total number of pending migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_pending counter\n", prefix)
out += fmt.Sprintf("%s_migration_pending %v\n", prefix, data.MigrationPending)

out += fmt.Sprintf("# HELP %s_in_progress reports total number of in progress migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_in_progress counter\n", prefix)
out += fmt.Sprintf("%s_migration_in_progress %v\n", prefix, data.MigrationInProgress)

out += fmt.Sprintf("# HELP %s_failed reports total number of failed migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_failed counter\n", prefix)
out += fmt.Sprintf("%s_migration_failed %v\n", prefix, data.MigrationFailed)

out += fmt.Sprintf("# HELP %s_term_failed reports total number of term failed migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_term_failed counter\n", prefix)
out += fmt.Sprintf("%s_migration_term_failed %v\n", prefix, data.MigrationTermFailed)

out += fmt.Sprintf("# HELP %s_completed reports total number of completed migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_completed counter\n", prefix)
out += fmt.Sprintf("%s_migration_completed %v\n", prefix, data.MigrationCompleted)

out += fmt.Sprintf("# HELP %s_queued reports total number of queued migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_queued counter\n", prefix)
out += fmt.Sprintf("%s_migration_queued %v\n", prefix, data.MigrationQueued)

out += fmt.Sprintf("# HELP %s_exist_in_db reports total number of exist in db migration requests\n", prefix)
out += fmt.Sprintf("# TYPE %s_exist_in_db counter\n", prefix)
out += fmt.Sprintf("%s_migration_exist_in_db %v\n", prefix, data.MigrationExistInDB)
return out
}

Expand Down

0 comments on commit 8a33385

Please sign in to comment.