Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add separate read connection for server and batch share inserts #15

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,15 @@ for (const seedPeer of config.seedPeers) {
}

if (config.poolServer.enabled) {
const poolServer = new PoolServer($.consensus, config.pool, config.poolServer.port, config.poolServer.mySqlPsw, config.poolServer.mySqlHost, config.poolServer.sslKeyPath, config.poolServer.sslCertPath);
const poolServer = new PoolServer(
$.consensus,
config.pool,
config.poolServer.port,
config.poolServer.mySqlPsw,
config.poolServer.mySqlHost,
config.poolServer.mySqlReadHost,
config.poolServer.sslKeyPath,
config.poolServer.sslCertPath);

if (config.poolMetricsServer.enabled) {
$.metricsServer = new MetricsServer(config.poolServer.sslKeyPath, config.poolServer.sslCertPath, config.poolMetricsServer.port, config.poolMetricsServer.password);
Expand Down
7 changes: 6 additions & 1 deletion server.sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
// Password of the MySQL pool_server user.
mySqlPsw: 'password',

// Host the MySQL database runs on.
// Host URL the MySQL database runs on.
// Default: 'localhost'
mySqlHost: 'localhost'

// (Optional) A URL for a read-only endpoint of the MySQL server. Useful for
// scaling horizontally.
// Default: 'localhost'
//mySqlReadHost: 'localhost'
},

// General mining pool configuration
Expand Down
6 changes: 3 additions & 3 deletions spec/PoolAgent.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ describe('PoolAgent', () => {
it('does not count shares onto old blocks (smart mode)', (done) => {
(async () => {
const consensus = await Nimiq.Consensus.volatileFull();
const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', '', '', '');
const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', '', '', '', '');
await poolServer.start();

let fixFakeTime = 0;
Expand Down Expand Up @@ -135,7 +135,7 @@ describe('PoolAgent', () => {
it('bans clients with too many invalid shares', (done) => {
(async () => {
const consensus = await Nimiq.Consensus.volatileFull();
const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', '', '', '');
const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', '', '', '', '');
await poolServer.start();
const time = new Nimiq.Time();
spyOn(time, 'now').and.callFake(() => 0);
Expand Down Expand Up @@ -171,7 +171,7 @@ describe('PoolAgent', () => {
const clientAddress = keyPair.publicKey.toAddress();

const consensus = await Nimiq.Consensus.volatileFull();
const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', 'localhost', '', '');
const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', 'localhost', 'localhost', '', '');
await poolServer.start();
const poolAgent = new PoolAgent(poolServer, { close: () => {}, send: () => {} }, Nimiq.NetAddress.fromIP('1.2.3.4'));
spyOn(poolAgent, '_regenerateNonce').and.callFake(() => { poolAgent._nonce = 42 });
Expand Down
6 changes: 4 additions & 2 deletions src/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ const DEFAULT_CONFIG = /** @type {Config} */ {
sslCertPath: null,
sslKeyPath: null,
mySqlPsw: null,
mySqlHost: null
mySqlHost: null,
mySqlReadHost: null
},
poolService: {
enabled: false,
Expand Down Expand Up @@ -134,7 +135,8 @@ const CONFIG_TYPES = {
certPath: 'string',
keyPath: 'string',
mySqlPsw: 'string',
mySqlHost: 'string'
mySqlHost: 'string',
mySqlReadHost: 'string'
}
},
poolService: {
Expand Down
98 changes: 83 additions & 15 deletions src/PoolServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@ const fs = require('fs');
const PoolAgent = require('./PoolAgent.js');
const Helper = require('./Helper.js');

/**
* @typedef Deferred
* @property {Promise} promise
* @property {Function} resolve
* @property {Function} reject
*/

class PoolServer extends Nimiq.Observable {
/**
* @param {Nimiq.FullConsensus} consensus
* @param {PoolConfig} config
* @param {number} port
* @param {string} mySqlPsw
* @param {string} mySqlHost
* @param {string} mySqlWriteHost
* @param {string} [mySqlReadHost]
* @param {string} sslKeyPath
* @param {string} sslCertPath
*/
constructor(consensus, config, port, mySqlPsw, mySqlHost, sslKeyPath, sslCertPath) {
constructor(consensus, config, port, mySqlPsw, mySqlWriteHost, mySqlReadHost, sslKeyPath, sslCertPath) {
super();

/** @type {Nimiq.FullConsensus} */
Expand All @@ -39,7 +47,10 @@ class PoolServer extends Nimiq.Observable {
this._mySqlPsw = mySqlPsw;

/** @type {string} */
this._mySqlHost = mySqlHost;
this._mySqlReadHost = mySqlReadHost;

/** @type {string} */
this._mySqlWriteHost = mySqlWriteHost;

/** @type {string} */
this._sslKeyPath = sslKeyPath;
Expand Down Expand Up @@ -74,6 +85,18 @@ class PoolServer extends Nimiq.Observable {
/** @type {number} */
this._averageHashrate = 0;

/** @type {Array[]} */
this._queuedShares = [];

/** @type {number} */
this._lastShareInsert = 0;

/** @type {number} */
this._lastKnownHeight = 0;

/** @type {Deferred} */
this._sharesDeferred = { };

/** @type {boolean} */
this._started = false;

Expand All @@ -91,13 +114,22 @@ class PoolServer extends Nimiq.Observable {
this._currentLightHead = this.consensus.blockchain.head.toLight();
await this._updateTransactions();

this.connectionPool = await mysql.createPool({
host: this._mySqlHost,
this.writePool = await mysql.createPool({
host: this._mySqlWriteHost,
user: 'pool_server',
password: this._mySqlPsw,
database: 'pool'
});

this.readPool = !this._mySqlReadHost
? this.writePool
: await mysql.createPool({
host: this._mySqlReadHost,
user: 'pool_server',
password: this._mySqlPsw,
database: 'pool'
});

this._wss = PoolServer.createServer(this.port, this._sslKeyPath, this._sslCertPath);
this._wss.on('connection', (ws, req) => this._onConnection(ws, req));

Expand Down Expand Up @@ -276,10 +308,45 @@ class PoolServer extends Nimiq.Observable {
* @param {Nimiq.Hash} shareHash
*/
async storeShare(userId, deviceId, prevHash, prevHashHeight, difficulty, shareHash) {
let prevHashId = await Helper.getStoreBlockId(this.connectionPool, prevHash, prevHashHeight);
const query = "INSERT INTO share (user, device, datetime, prev_block, difficulty, hash) VALUES (?, ?, ?, ?, ?, ?)";
const queryArgs = [userId, deviceId, Date.now(), prevHashId, difficulty, shareHash.serialize()];
await this.connectionPool.execute(query, queryArgs);
let prevHashId;
if (prevHashHeight > this._lastKnownHeight) {
prevHashId = await Helper.getStoreBlockId(this.writePool, prevHash, prevHashHeight);
this._lastKnownHeight = prevHashHeight;
} else {
prevHashId = await Helper.getBlockId(this.readPool, prevHash);
}

this._queuedShares.push([userId, deviceId, Date.now(), prevHashId, difficulty, shareHash.serialize()]);

if (!this._sharesDeferred.promise) {
this._sharesDeferred.promise = new Promise((resolve, reject) => {
this._sharesDeferred.resolve = resolve;
this._sharesDeferred.reject = reject;
});
}

if (this._lastShareInsert < Date.now() - PoolServer.INSERT_INTERVAL) {
this._sharesDeferred.promise = null;
return this._storeShares().then(this._sharesDeferred.resolve, this._sharesDeferred.reject);
}

clearTimeout(this._shareTimeout);
this._shareTimeout = setTimeout(() => {
this._storeShares().then(this._sharesDeferred.resolve, this._sharesDeferred.reject);
}, PoolServer.INSERT_INTERVAL);

return this._sharesDeferred.promise;
}

_storeShares() {
this._lastShareInsert = Date.now();
clearTimeout(this._shareTimeout);

const shares = this._queuedShares.splice(0, this._queuedShares.length);
const query = `INSERT INTO share (user, device, datetime, prev_block, difficulty, hash)
VALUES ${shares.map(() => '(?,?,?,?,?,?)').join(',')}`;
const queryArgs = [].concat.apply([], shares);
return this.writePool.execute(query, queryArgs);
}

/**
Expand All @@ -290,7 +357,7 @@ class PoolServer extends Nimiq.Observable {
async containsShare(user, shareHash) {
const query = "SELECT * FROM share WHERE user=? AND hash=?";
const queryArgs = [user, shareHash.serialize()];
const [rows, fields] = await this.connectionPool.execute(query, queryArgs);
const [rows, fields] = await this.readPool.execute(query, queryArgs);
return rows.length > 0;
}

Expand All @@ -300,7 +367,7 @@ class PoolServer extends Nimiq.Observable {
* @returns {Promise<number>}
*/
async getUserBalance(userId, includeVirtual = false) {
return await Helper.getUserBalance(this._config, this.connectionPool, userId, this.consensus.blockchain.height, includeVirtual);
return await Helper.getUserBalance(this._config, this.readPool, userId, this.consensus.blockchain.height, includeVirtual);
}

/**
Expand All @@ -309,7 +376,7 @@ class PoolServer extends Nimiq.Observable {
async storePayoutRequest(userId) {
const query = "INSERT IGNORE INTO payout_request (user) VALUES (?)";
const queryArgs = [userId];
await this.connectionPool.execute(query, queryArgs);
await this.writePool.execute(query, queryArgs);
}

/**
Expand All @@ -318,7 +385,7 @@ class PoolServer extends Nimiq.Observable {
*/
async hasPayoutRequest(userId) {
const query = `SELECT * FROM payout_request WHERE user=?`;
const [rows, fields] = await this.connectionPool.execute(query, [userId]);
const [rows, fields] = await this.readPool.execute(query, [userId]);
return rows.length > 0;
}

Expand All @@ -327,8 +394,8 @@ class PoolServer extends Nimiq.Observable {
* @returns {Promise.<number>}
*/
async getStoreUserId(addr) {
await this.connectionPool.execute("INSERT IGNORE INTO user (address) VALUES (?)", [addr.toBase64()]);
const [rows, fields] = await this.connectionPool.execute("SELECT id FROM user WHERE address=?", [addr.toBase64()]);
await this.writePool.execute("INSERT IGNORE INTO user (address) VALUES (?)", [addr.toBase64()]);
const [rows, fields] = await this.writePool.execute("SELECT id FROM user WHERE address=?", [addr.toBase64()]);
return rows[0].id;
}

Expand Down Expand Up @@ -403,5 +470,6 @@ class PoolServer extends Nimiq.Observable {
PoolServer.DEFAULT_BAN_TIME = 1000 * 60 * 10; // 10 minutes
PoolServer.UNBAN_IPS_INTERVAL = 1000 * 60; // 1 minute
PoolServer.HASHRATE_INTERVAL = 1000 * 60; // 1 minute
PoolServer.INSERT_INTERVAL = 1000; // 1 second

module.exports = exports = PoolServer;