Skip to content

Commit

Permalink
Merge pull request #26 from ysa23/chore/graphite-batching
Browse files Browse the repository at this point in the history
Chore: Batch reporting for Graphite
  • Loading branch information
ysa23 committed Feb 3, 2022
2 parents 72e75f2 + ea9bf90 commit 417d711
Show file tree
Hide file tree
Showing 13 changed files with 691 additions and 121 deletions.
11 changes: 11 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
root = true

[*]
indent_style = space
indent_size = 2
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[*.md]
trim_trailing_whitespace = false
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,24 @@ const { Metrics, GraphiteReporter } = require('metrics-reporter');
const graphiteHost = '1.1.1.1'; // Graphite server IP address
const graphitePort = 8125; // Optional - port number. Defaults to 8125
const spacePrefix = 'My.Project'; // Optional - prefix to all metrics spaces
const tags = { tag1: 'value1' }; // Optional - key-value pairs to be appanded to all the metrics reported
const tags = { tag1: 'value1' }; // Optional - key-value pairs to be appanded to all the metrics reported
const batch = true; // Optional - Default `true` - Indicates that metrics will be sent in batches
const maxBufferSize = 500; // Optional - Default `1000` - Size of the buffer for sending batched messages. When buffer is filled it is flushed immediately
const flushInterval = 1000; // Optional - Default `1000` (1s) - Time in milliseconds. Indicates how often the buffer is flushed in case batch = true

const graphiteReporter = new GraphiteReporter({
host: graphiteHost,
port: graphitePort,
prefix: spacePrefix,
tags,
batch,
maxBufferSize,
flushInterval,
});

const metrics = new Metrics([graphiteReporter], errorCallback);

graphiteReporter.close(); // close should be called when the application terminates
```

#### DataDog
Expand Down
2 changes: 2 additions & 0 deletions examples/graphite.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ function timeout(ms) {
// eslint-disable-next-line no-await-in-loop
await timeout(10);
}

await timeout(10000);
})();
23 changes: 16 additions & 7 deletions network/socket.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
const dgram = require('dgram');

module.exports = function Socket({
function Socket({
port, host, batch = true, maxBufferSize = 1000, flushInterval = 1000,
}) {
if (!port) {
throw new TypeError('port is mandatory');
}
if (!host) {
throw new TypeError('host is mandatory');
}
validate({ name: 'port', value: port, type: 'number' });
validate({ name: 'host', value: host, type: 'string' });
validate({ name: 'batch', value: batch, type: 'boolean' });
validate({ name: 'maxBufferSize', value: maxBufferSize, type: 'number' });
validate({ name: 'flushInterval', value: flushInterval, type: 'number' });

const socket = dgram.createSocket('udp4');
socket.unref();
Expand Down Expand Up @@ -89,4 +88,14 @@ module.exports = function Socket({
callback();
});
}
}

function validate({ name, value, type }) {
if (value === undefined || value === null) throw new TypeError(`${name} is missing`);
// eslint-disable-next-line valid-typeof
if (typeof value !== type) throw new TypeError(`${name} is not a ${type}: ${value}: ${typeof value}`);
}

module.exports = {
Socket,
};
96 changes: 81 additions & 15 deletions network/socket.test.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,93 @@
const dgram = require('dgram');
const { when } = require('jest-when');
const Socket = require('./socket');
const { Socket } = require('./socket');

jest.mock('dgram');

describe('socket', () => {
describe('Socket', () => {
describe('constructor', () => {
it.each([undefined, null])('should throw when port is %s', port => {
stubCreateSocket();
describe('validations', () => {
it.each([
['undefined', undefined],
['null', null],
['string', 'strings'],
['array', ['a', 'b']],
['object', { key: 'value' }],
['function', () => {}]])('should throw when port is %s', (title, port) => {
stubCreateSocket();

expect(() => new Socket({
host: '127.0.0.1',
port,
})).toThrow(TypeError);
});
expect(() => new Socket({
host: '127.0.0.1',
port,
})).toThrow(TypeError);
});

it.each([undefined, null])('should throw when host is %s', host => {
stubCreateSocket();
it.each([
['undefined', undefined],
['null', null],
['number', 1],
['array', ['a', 'b']],
['object', { key: 'value' }],
['function', () => {}],
])('should throw when host is %s', (title, host) => {
stubCreateSocket();

expect(() => new Socket({
port: 1234,
host,
})).toThrow(TypeError);
expect(() => new Socket({
port: 1234,
host,
})).toThrow(TypeError);
});

it.each([
['null', null],
['number', 1],
['string', 'strings'],
['array', ['a', 'b']],
['object', { key: 'value' }],
['function', () => {}],
])('should throw when batch is %s', (title, batch) => {
stubCreateSocket();

expect(() => new Socket({
port: 1234,
host: '127.0.0.1',
batch,
})).toThrow(TypeError);
});

it.each([
['null', null],
['boolean', true],
['string', 'strings'],
['array', ['a', 'b']],
['object', { key: 'value' }],
['function', () => {}],
])('should throw when maxBufferSize is %s', (title, maxBufferSize) => {
stubCreateSocket();

expect(() => new Socket({
port: 1234,
host: '127.0.0.1',
maxBufferSize,
})).toThrow(TypeError);
});

it.each([
['null', null],
['boolean', true],
['string', 'strings'],
['array', ['a', 'b']],
['object', { key: 'value' }],
['function', () => {}],
])('should throw when flushInterval is %s', (title, flushInterval) => {
stubCreateSocket();

expect(() => new Socket({
port: 1234,
host: '127.0.0.1',
flushInterval,
})).toThrow(TypeError);
});
});

it('should create a socket', () => {
Expand Down
71 changes: 71 additions & 0 deletions network/statsd-socket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
const { Socket } = require('./socket');

const redundantDotsRegex = new RegExp('\\.\\.+', 'g');

function StatsdSocket({
port = 8125,
host,
batch = true,
maxBufferSize = 1000,
flushInterval = 1000,
tags: defaultTags,
prefix,
}) {
if (defaultTags && (Array.isArray(defaultTags) || typeof defaultTags !== 'object')) throw new TypeError('tags should be an object');

const metricPrefix = typeof prefix === 'string' && prefix.length ? removeRedundantDots(`${prefix}.`) : '';

const socket = new Socket({
host, port, batch, maxBufferSize, flushInterval,
});

function send({
key, value, type, tags, callback,
}) {
validate({ name: 'key', value: key, type: 'string' });
validate({ name: 'value', value, type: 'number' });
validate({ name: 'type', value: type, type: 'string' });
if (tags && (Array.isArray(tags) || typeof tags !== 'object')) throw new TypeError('tags should be an object');
if (callback && typeof callback !== 'function') throw new TypeError('callback should be a function');

const metric = `${metricPrefix}${key}:${value}|${type}${stringifyTags(tags)}`;

socket.send({ message: metric, callback });
}

function validate({ name, value, type }) {
if (value === undefined || value === null || (typeof value === 'string' && value === '')) throw new TypeError(`${name} is missing`);
// eslint-disable-next-line valid-typeof
if (typeof value !== type) throw new TypeError(`${name} is not a ${type}: ${value}: ${typeof value}`);
}

function close() {
socket.close();
}

function stringifyTags(tags) {
if (!tags && !defaultTags) {
return '';
}

const allTags = {
...defaultTags,
...tags,
};

return `|#${Object.entries(allTags).map(([key, value]) => `${key}:${value}`).join(',')}`;
}

return {
send,
close,
};
}

function removeRedundantDots(str) {
return str.replace(redundantDotsRegex, '.');
}

module.exports = {
StatsdSocket,
};
Loading

0 comments on commit 417d711

Please sign in to comment.