Skip to content

Commit

Permalink
Merge pull request #68 from suredone/cache-and-load
Browse files Browse the repository at this point in the history
Load calculation fixes / qrlCache fixes
  • Loading branch information
ryanwitt authored Aug 21, 2024
2 parents 9cbd368 + 0be5198 commit d915f41
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export async function processMessages (queues, callback, options) {
const freememFactor = Math.min(1, Math.max(0, remainingMemory / memoryThreshold))

// Load
const oneMinuteLoad = loadavg()[0]
const oneMinuteLoad = systemMonitor.getLoad()
const loadPerCore = oneMinuteLoad / cores
const loadFactor = 1 - Math.min(1, Math.max(0, loadPerCore / 3))

Expand Down
7 changes: 5 additions & 2 deletions src/qrlCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ export async function qrlCacheGet (qname) {
// debug({ cmd })
const result = await client.send(cmd)
// debug('result', result)
if (!result) throw new QueueDoesNotExist(qname)
if (!result) {
qrlCacheInvalidate(qname)
throw new QueueDoesNotExist(qname)

Check warning on line 71 in src/qrlCache.js

View check run for this annotation

Codecov / codecov/patch

src/qrlCache.js#L70-L71

Added lines #L70 - L71 were not covered by tests
}
const { QueueUrl: qrl } = result
// debug('getQueueUrl returned', data)
qcache.set(qname, qrl)
Expand All @@ -79,7 +82,7 @@ export async function qrlCacheGet (qname) {
// Immediately updates the cache
//
export function qrlCacheSet (qname, qrl) {
qcache.set(qname, qrl)
if (qrl) qcache.set(qname, qrl)
// debug('qcache', Object.keys(qcache), 'set', qname, ' => ', qcache[qname])
}

Expand Down
61 changes: 56 additions & 5 deletions src/scheduler/systemMonitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
* backpressure.
*/

import os from 'os'

export class SystemMonitor {
constructor (reportCallback, reportSeconds = 1) {
this.reportCallback = reportCallback || console.log
this.reportSeconds = reportSeconds
this.measurements = []
this.latencies = []
this.oneMinuteLoad = os.loadavg()[0]
this.instantaneousLoad = this.oneMinuteLoad
this.measure()
this.reportLatency()
}
Expand All @@ -16,15 +20,20 @@ export class SystemMonitor {
clearTimeout(this.measureTimeout)
const start = new Date()
this.measureTimeout = setTimeout(() => {
const latency = new Date() - start
this.measurements.push(latency)
if (this.measurements.length > 1000) this.measurements.shift()
this.measureLatency(start)
this.measureLoad()
this.measure()
})
}

measureLatency (start) {
const latency = new Date() - start
this.latencies.push(latency)
if (this.latencies.length > 1000) this.latencies.shift()
}

getLatency () {
return this.measurements.length ? this.measurements.reduce((a, b) => a + b, 0) / this.measurements.length : 0
return this.latencies.length ? this.latencies.reduce((a, b) => a + b, 0) / this.latencies.length : 0
}

reportLatency () {
Expand All @@ -37,6 +46,48 @@ export class SystemMonitor {
}, this.reportSeconds * 1000)
}

/**
* Measures load over the last five seconds instead of being averaged over one
* minute. This lets the scheduler respond much faster to dips in load.
*
* Theory:
*
* The Linux kernel calculates the moving average something like:
* A_1 = A_0 * e + A_now (1 - e)
* Where:
* - A_now is the number of processes active/waiting
* - A_1 is the new one-minute load average after the measurement of A_now
* - A_0 is the previous one-minute average
* - e is 1884/2048.
*
* Solving this for A_now, which we want to access, we get:
* A_now = (A_1 - A_0 * e) / (1 - e)
*
* We use this formula below to extract A_now when we detect a change in A_1.
*
* Note: this code assums that we are observing the average often enough to
* detect each change. So you have to call it at least every 5 seconds. 1
* second is better to reduce latency of detecting the change.
*/

measureLoad () {
const [newLoad, ] = os.loadavg()
const previousLoad = this.oneMinuteLoad
if (previousLoad !== newLoad) {
const e = 1884 / 2048 // see include/linux/sched/loadavg.h
const active = (newLoad - previousLoad * e) / (1 - e)
// We take the min here so that spikes up in load are averaged out. We
// care about detecting spikes downward so we can allow more jobs to run.
this.instantaneousLoad = Math.min(active, newLoad)
this.oneMinuteLoad = newLoad
console.log({ newLoad, previousLoad, active, instantaneousLoad: this.instantaneousLoad, oneMinuteLoad: this.oneMinuteLoad })
}
}

getLoad() {
return this.instantaneousLoad
}

shutdown () {
clearTimeout(this.measureTimeout)
clearTimeout(this.reportTimeout)
Expand Down

0 comments on commit d915f41

Please sign in to comment.