Skip to content

Commit

Permalink
Merge pull request #31 from ysa23/feat/error-callback-for-reporters
Browse files Browse the repository at this point in the history
Feat: Use a single error callback per reporter
  • Loading branch information
ysa23 committed Feb 18, 2022
2 parents ceb0eae + 946840f commit 4a0c833
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 174 deletions.
97 changes: 68 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,17 @@ const stringReporter = new StringReporter({ action: metricString => {
}});
const consoleReporter = new ConsoleReporter();

const metrics = new Metrics({ reporters: [stringReporter, consoleReporter], errback: errorCallback });
const reporters = [ // Array of reporters to trigger when a metrics should be reported
stringReporter, consoleReporter
];
const errback = (err) => { // Optional - A function to be called when an error occurs
console.error(err);
};

const metrics = new Metrics({
reporters,
errback
});
```

### Reporting Metrics
Expand All @@ -71,6 +81,7 @@ const metric = metrics.space('http').space('requests'); // http.requests
#### Execution time
Use the `meter` method on a `Space` to report execution time of a function:
```javascript
// Callback function
const wrapper = metrics.space('users.get').meter(function(userIds, callback) {
// read users from database
callback(...);
Expand Down Expand Up @@ -150,7 +161,9 @@ const metrics = new Metrics({
}
});
```
The error callback receives a single parameter - an Error instance. The callback will be triggered when any error occurs during the metrics reporting
The error callback receives a single parameter - an Error instance. The callback will be triggered when any error occurs during the metrics reporting.

**Please note:** Some reporters require their own error handler. Make sure to initialize it as well.

### Reporters
Metrics comes with several built-in reporters
Expand All @@ -161,21 +174,25 @@ const { Metrics, GraphiteReporter } = require('metrics-reporter');

const graphiteHost = '1.1.1.1'; // Graphite server IP address
const graphitePort = 8125; // Optional - port number. Defaults to 8125
const spacePrefix = 'My.Project'; // Optional - prefix to all metrics spaces
const spacePrefix = 'My.Project'; // Optional - prefix to all metrics spaces
const tags = { tag1: 'value1' }; // Optional - key-value pairs to be appanded to all the metrics reported
const batch = true; // Optional - Default `true` - Indicates that metrics will be sent in batches
const maxBufferSize = 500; // Optional - Default `1000` - Size of the buffer for sending batched messages. When buffer is filled it is flushed immediately
const flushInterval = 1000; // Optional - Default `1000` (1s) - Time in milliseconds. Indicates how often the buffer is flushed in case batch = true
const flushInterval = 1000; // Optional - Default `1000` (1s) - Time in milliseconds. Indicates how often the buffer is flushed in case batch = true
const errback = (err) => { // Optional - function to be triggered when an error occurs
console.error(err)
};

const graphiteReporter = new GraphiteReporter({
host: graphiteHost,
port: graphitePort,
prefix: spacePrefix,
tags,
batch,
maxBufferSize,
flushInterval,
});
host: graphiteHost,
port: graphitePort,
prefix: spacePrefix,
tags,
batch,
maxBufferSize,
flushInterval,
errback,
});

const metrics = new Metrics({ reporters: [graphiteReporter] });

Expand All @@ -189,20 +206,24 @@ const { Metrics, DataDogReporter } = require('metrics-reporter');

const agentHost = '1.1.1.1'; // DataDog agent IP address
const port = 8125; // Optional - Default `8125` - port number. Defaults to 8125
const spacePrefix = 'My.Project'; // Optional - prefix to all metrics spaces
const spacePrefix = 'My.Project'; // Optional - prefix to all metrics spaces
const batch = true; // Optional - Default `true` - Indicates that metrics will be sent in batches
const maxBufferSize = 500; // Optional - Default `1000` - Size of the buffer for sending batched messages. When buffer is filled it is flushed immediately
const flushInterval = 1000; // Optional - Default `1000` (1s) - Time in milliseconds. Indicates how often the buffer is flushed in case batch = true
const flushInterval = 1000; // Optional - Default `1000` (1s) - Time in milliseconds. Indicates how often the buffer is flushed in case batch = true
const tags = { tag1: 'value1' }; // Optional - key-value pairs to be appanded to all the metrics reported
const errback = (err) => { // Optional - function to be triggered when an error occurs
console.error(err)
};

const datadogReporter = new DataDogReporter({
host: agentHost,
port,
prefix: spacePrefix,
batch,
maxBufferSize,
flushInterval,
tags,
host: agentHost,
port,
prefix: spacePrefix,
batch,
maxBufferSize,
flushInterval,
tags,
errback,
});

const metrics = new Metrics({ reporters: [datadogReporter] });
Expand Down Expand Up @@ -262,29 +283,43 @@ A reporter must contain three methods:
The methods get the following parameters:
* `key` (mandatory) - the metric to report
* `value` (mandatory) - the value to report (ms, count or increment for example)
* `tags` (optional) - an object that contains the tags to report on the metric as properties
* `errorCallback` (optional) - a callback function to be triggered when an error occurs within the reporter.
* `tags` (optional) - an object that contains the tags to report on the metric as properties

For example, lets see how to implement a reporter for redis:
```js
const client = require('redis').createClient();

module.exports = function RedisReporter(channel) {
function report(key, val, tags, errorCallback) {
function RedisReporter({
channel,
errback
}) {
function report(key, val, tags) {
client.publish(channel, JSON.stringify({ key, value: val, tags }));
}

function value(key, val, tags, errorCallback) {
client.set(key, val, errorCallback);
function value(key, val, tags) {
client.set(key, val, (err) => {
if (!err || !errback) {
return;
}

errback(err);
});
}

function increment(key, value, tags, errorCallback) {
function increment(key, value, tags) {
const multi = client.multi();
for(let i = 0; i < value; i++) {
multi.incr(key);
}

multi.exec(errorCallback);
multi.exec((err) => {
if (!err || !errback) {
return;
}

errback(err);
});
}

return {
Expand All @@ -293,6 +328,10 @@ module.exports = function RedisReporter(channel) {
increment,
}
};

module.exports = {
RedisReporter,
};
```
The new reporter will publish a message to a specified channel in redis when a metric is reported.

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"datadog",
"DogStatsD"
],
"version": "0.11.0",
"version": "0.12.0",
"repository": {
"type": "git",
"url": "https://github.com/ysa23/metrics-js"
Expand Down
5 changes: 4 additions & 1 deletion src/metrics.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
const { validate } = require('./validation/validator');
const { Space } = require('./space');

function Metrics({ reporters, errback }) {
if (!reporters || !Array.isArray(reporters) || reporters.length === 0) throw new TypeError('reporters is missing or empty');
if (errback && typeof errback !== 'function') throw new TypeError('errback must be a function');
validate({
name: 'errback', value: errback, type: 'function', required: false,
});

if (!reporters.every(r => r && typeof r.report === 'function' && typeof r.value === 'function' && typeof r.increment === 'function')) {
throw new TypeError('must pass valid reporters with a `report` function');
Expand Down
2 changes: 1 addition & 1 deletion src/metrics.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ describe('Metrics', () => {
const reporters = [new InMemoryReporter({ buffer: [] })];

expect(() => new Metrics({ reporters, errback }))
.toThrow('errback must be a function');
.toThrow(TypeError);
});

it('should create a metrics object', () => {
Expand Down
40 changes: 13 additions & 27 deletions src/network/socket.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
const dgram = require('dgram');
const { validate } = require('../validation/validator');

function Socket({
port, host, batch = true, maxBufferSize = 1000, flushInterval = 1000,
port, host, batch = true, maxBufferSize = 1000, flushInterval = 1000, errback,
}) {
validate({ name: 'port', value: port, type: 'number' });
validate({ name: 'host', value: host, type: 'string' });
validate({ name: 'batch', value: batch, type: 'boolean' });
validate({ name: 'maxBufferSize', value: maxBufferSize, type: 'number' });
validate({ name: 'flushInterval', value: flushInterval, type: 'number' });
validate({
name: 'errback', value: errback, type: 'function', required: false,
});

const socket = dgram.createSocket('udp4');
socket.unref();
Expand All @@ -22,18 +26,15 @@ function Socket({
interval.unref();
}

function send({ message, callback }) {
function send({ message }) {
if (!message) {
throw new TypeError('message is mandatory');
}
if (callback && typeof callback !== 'function') {
throw new TypeError('callback should be a function');
}

if (batch === true) {
append({ message, callback });
append({ message });
} else {
sendImmediate({ message, callback });
sendImmediate({ message });
}
}

Expand All @@ -44,8 +45,8 @@ function Socket({
}
}

function append({ message, callback }) {
buffer.push({ message, callback });
function append({ message }) {
buffer.push({ message });
bufferSize += message.length;

if (bufferSize > maxBufferSize) {
Expand All @@ -59,33 +60,24 @@ function Socket({
}

const bufferedMessage = buffer.map(x => x.message).join('\n');
const callbacks = buffer.map(x => x.callback);
// We capture the messages to send first to avoid concurrency issues for handling the buffer.
// If we purge it after, new messages added to the buffer won't be sent, or worse, resent.
bufferSize = 0;
buffer = [];

sendImmediate({
message: bufferedMessage,
callback: err => {
callbacks.filter(cb => cb).forEach(cb => cb(err));
},
});
}

function sendImmediate({ message, callback }) {
function sendImmediate({ message }) {
const bytes = Buffer.from(message);
socket.send(bytes, 0, bytes.length, port, host, err => {
if (!callback) {
return;
}

if (err) {
callback(err);
if (!errback || !err) {
return;
}

callback();
errback(err);
});
}

Expand All @@ -95,12 +87,6 @@ function Socket({
};
}

function validate({ name, value, type }) {
if (value === undefined || value === null) throw new TypeError(`${name} is missing`);
// eslint-disable-next-line valid-typeof
if (typeof value !== type) throw new TypeError(`${name} is not a ${type}: ${value}: ${typeof value}`);
}

module.exports = {
Socket,
};
Loading

0 comments on commit 4a0c833

Please sign in to comment.