Load calculation fixes / qrlCache fixes #68

Merged
merged 1 commit on Aug 21, 2024

2 changes: 1 addition & 1 deletion src/consumer.js
@@ -136,7 +136,7 @@ export async function processMessages (queues, callback, options) {
const freememFactor = Math.min(1, Math.max(0, remainingMemory / memoryThreshold))

// Load
- const oneMinuteLoad = loadavg()[0]
+ const oneMinuteLoad = systemMonitor.getLoad()
const loadPerCore = oneMinuteLoad / cores
const loadFactor = 1 - Math.min(1, Math.max(0, loadPerCore / 3))

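The hunk above swaps the raw one-minute load average for the SystemMonitor's faster-reacting getLoad() when computing loadFactor. The rest of processMessages is collapsed in this view, so the sketch below only illustrates how the two clamped factors could gate how many more jobs to start; the Math.min combination and the maxConcurrentJobs / activeJobs parameters are assumptions, not the repository's actual API.

```js
// Minimal sketch, not the repository's code: one plausible way the clamped
// factors from the hunk above could throttle message processing.
import os from 'os'

function availableSlots (systemMonitor, opts) {
  const { maxConcurrentJobs, activeJobs, remainingMemory, memoryThreshold } = opts
  const cores = os.cpus().length

  // 1 when plenty of memory remains, 0 at or below the threshold
  const freememFactor = Math.min(1, Math.max(0, remainingMemory / memoryThreshold))

  // 1 at zero load, 0 once load per core reaches 3 (same clamp as the diff)
  const loadPerCore = systemMonitor.getLoad() / cores
  const loadFactor = 1 - Math.min(1, Math.max(0, loadPerCore / 3))

  // Assumption: scale concurrency by the most constrained resource
  const capacity = Math.floor(maxConcurrentJobs * Math.min(freememFactor, loadFactor))
  return Math.max(0, capacity - activeJobs)
}
```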
7 changes: 5 additions & 2 deletions src/qrlCache.js
@@ -66,7 +66,10 @@ export async function qrlCacheGet (qname) {
// debug({ cmd })
const result = await client.send(cmd)
// debug('result', result)
- if (!result) throw new QueueDoesNotExist(qname)
+ if (!result) {
+   qrlCacheInvalidate(qname)
+   throw new QueueDoesNotExist(qname)
+ }
const { QueueUrl: qrl } = result
// debug('getQueueUrl returned', data)
qcache.set(qname, qrl)
@@ -79,7 +82,7 @@
// Immediately updates the cache
//
export function qrlCacheSet (qname, qrl) {
- qcache.set(qname, qrl)
+ if (qrl) qcache.set(qname, qrl)
// debug('qcache', Object.keys(qcache), 'set', qname, ' => ', qcache[qname])
}

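Two behaviors change in qrlCache.js: a failed GetQueueUrl now drops any cached entry for that queue name before QueueDoesNotExist is thrown, and qrlCacheSet ignores falsy URLs so they can never be cached. A hedged caller-side sketch follows; the import path, and whether QueueDoesNotExist is exported from this module, are assumptions.

```js
// Sketch only: how a caller benefits from the two fixes above.
import { qrlCacheGet, qrlCacheSet, QueueDoesNotExist } from './qrlCache.js'

async function tryGetQueueUrl (qname) {
  try {
    return await qrlCacheGet(qname)
  } catch (err) {
    if (err instanceof QueueDoesNotExist) {
      // Because the failed lookup invalidated the cache entry, a later
      // qrlCacheGet(qname) (e.g. after the queue is created) resolves fresh
      // from SQS instead of reusing a stale cached value.
      return undefined
    }
    throw err
  }
}

// The new guard in qrlCacheSet means a falsy URL is a no-op rather than a
// value that poisons the cache for every subsequent lookup:
qrlCacheSet('my-queue', undefined)
```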
61 changes: 56 additions & 5 deletions src/scheduler/systemMonitor.js
@@ -3,11 +3,15 @@
* backpressure.
*/

import os from 'os'

export class SystemMonitor {
constructor (reportCallback, reportSeconds = 1) {
this.reportCallback = reportCallback || console.log
this.reportSeconds = reportSeconds
- this.measurements = []
+ this.latencies = []
+ this.oneMinuteLoad = os.loadavg()[0]
+ this.instantaneousLoad = this.oneMinuteLoad
this.measure()
this.reportLatency()
}
@@ -16,15 +20,20 @@ export class SystemMonitor {
clearTimeout(this.measureTimeout)
const start = new Date()
this.measureTimeout = setTimeout(() => {
- const latency = new Date() - start
- this.measurements.push(latency)
- if (this.measurements.length > 1000) this.measurements.shift()
+ this.measureLatency(start)
+ this.measureLoad()
this.measure()
})
}

measureLatency (start) {
const latency = new Date() - start
this.latencies.push(latency)
if (this.latencies.length > 1000) this.latencies.shift()
}

getLatency () {
- return this.measurements.length ? this.measurements.reduce((a, b) => a + b, 0) / this.measurements.length : 0
+ return this.latencies.length ? this.latencies.reduce((a, b) => a + b, 0) / this.latencies.length : 0
}

reportLatency () {
@@ -37,6 +46,48 @@
}, this.reportSeconds * 1000)
}

/**
* Measures load over the last five seconds instead of being averaged over one
* minute. This lets the scheduler respond much faster to dips in load.
*
* Theory:
*
* The Linux kernel calculates the moving average something like:
* A_1 = A_0 * e + A_now * (1 - e)
* Where:
* - A_now is the number of processes active/waiting
* - A_1 is the new one-minute load average after the measurement of A_now
* - A_0 is the previous one-minute average
* - e is 1884/2048.
*
* Solving this for A_now, which we want to access, we get:
* A_now = (A_1 - A_0 * e) / (1 - e)
*
* We use this formula below to extract A_now when we detect a change in A_1.
*
* Note: this code assumes that we are observing the average often enough to
* detect each change, so it has to be called at least every 5 seconds; every
* 1 second is better, to reduce the latency of detecting the change.
*/

measureLoad () {
const [newLoad] = os.loadavg()
const previousLoad = this.oneMinuteLoad
if (previousLoad !== newLoad) {
const e = 1884 / 2048 // see include/linux/sched/loadavg.h
const active = (newLoad - previousLoad * e) / (1 - e)
// We take the min here so that spikes up in load are averaged out. We
// care about detecting spikes downward so we can allow more jobs to run.
this.instantaneousLoad = Math.min(active, newLoad)
this.oneMinuteLoad = newLoad
console.log({ newLoad, previousLoad, active, instantaneousLoad: this.instantaneousLoad, oneMinuteLoad: this.oneMinuteLoad })
}
}

getLoad() {
return this.instantaneousLoad
}

shutdown () {
clearTimeout(this.measureTimeout)
clearTimeout(this.reportTimeout)
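The inversion in measureLoad() can be checked numerically. The kernel folds the current run-queue size into the one-minute average as A_1 = A_0 * e + A_now * (1 - e), so given two consecutive published averages, A_now = (A_1 - A_0 * e) / (1 - e). Below is a minimal standalone sketch with made-up numbers; the helper names are illustrative only.

```js
// Sketch only: verifies the round trip used by measureLoad() above.
const e = 1884 / 2048 // ≈ 0.9199, the kernel's per-5-second decay for the 1-minute average

// Forward: how the kernel updates the average every ~5 seconds.
const kernelUpdate = (prevAvg, activeNow) => prevAvg * e + activeNow * (1 - e)

// Inverse: what measureLoad() computes when it sees the average change.
const recoverActive = (prevAvg, newAvg) => (newAvg - prevAvg * e) / (1 - e)

// Example: the machine was averaging 1.00, then work drains and only ~0.38
// tasks are runnable during the next sample.
const prevAvg = 1.0
const newAvg = kernelUpdate(prevAvg, 0.38)

console.log(newAvg.toFixed(3))                         // "0.950" (slow-moving average)
console.log(recoverActive(prevAvg, newAvg).toFixed(3)) // "0.380" (recovered instantaneous load)

// instantaneousLoad therefore reflects the dip on the very next sample, while
// the raw one-minute average would take tens of seconds to decay toward it.
```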