Skip to content

Commit

Permalink
add rewardProcessor to payout pos reward (debugging)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonzg committed Mar 21, 2023
1 parent 78d0e50 commit 11714e3
Show file tree
Hide file tree
Showing 6 changed files with 4,611 additions and 477 deletions.
113 changes: 74 additions & 39 deletions init.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var PoolLogger = require("./libs/logUtil.js");
var CliListener = require("./libs/cliListener.js");
var PoolWorker = require("./libs/poolWorker.js");
var PaymentProcessor = require("./libs/paymentProcessor.js");
var rewardProcessor = require("./libs/rewardProcessor.js");
var Website = require("./libs/website.js");
var ProfitSwitch = require("./libs/profitSwitch.js");

Expand All @@ -31,7 +32,7 @@ var poolConfigs;

var logger = new PoolLogger({
logLevel: portalConfig.logLevel,
logColors: portalConfig.logColors
logColors: portalConfig.logColors,
});

try {
Expand Down Expand Up @@ -80,6 +81,9 @@ if (cluster.isWorker) {
case "pool":
new PoolWorker(logger);
break;
case "rewardProcessor":
new rewardProcessor(logger);
break;
case "paymentProcessor":
new PaymentProcessor(logger);
break;
Expand All @@ -95,14 +99,14 @@ if (cluster.isWorker) {
}

//Read all pool configs from pool_configs and join them with their coin profile
var buildPoolConfigs = function() {
var buildPoolConfigs = function () {
var configs = {};
var configDir = "pool_configs/";

var poolConfigFiles = [];

/* Get filenames of pool config json files that are enabled */
fs.readdirSync(configDir).forEach(function(file) {
fs.readdirSync(configDir).forEach(function (file) {
if (
!fs.existsSync(configDir + file) ||
path.extname(configDir + file) !== ".json"
Expand Down Expand Up @@ -153,7 +157,7 @@ var buildPoolConfigs = function() {
}
}

poolConfigFiles.forEach(function(poolOptions) {
poolConfigFiles.forEach(function (poolOptions) {
poolOptions.coinFileName = poolOptions.coin;

var coinFilePath = "coins/" + poolOptions.coinFileName;
Expand Down Expand Up @@ -217,8 +221,8 @@ var buildPoolConfigs = function() {
return configs;
};

var spawnPoolWorkers = function() {
Object.keys(poolConfigs).forEach(function(coin) {
var spawnPoolWorkers = function () {
Object.keys(poolConfigs).forEach(function (coin) {
var p = poolConfigs[coin];

if (!Array.isArray(p.daemons) || p.daemons.length < 1) {
Expand All @@ -242,7 +246,7 @@ var spawnPoolWorkers = function() {

var serializedConfigs = JSON.stringify(poolConfigs);

var numForks = (function() {
var numForks = (function () {
if (!portalConfig.clustering || !portalConfig.clustering.enabled) return 1;
if (portalConfig.clustering.forks === "auto") return os.cpus().length;
if (!portalConfig.clustering.forks || isNaN(portalConfig.clustering.forks))
Expand All @@ -252,31 +256,31 @@ var spawnPoolWorkers = function() {

var poolWorkers = {};

var createPoolWorker = function(forkId) {
var createPoolWorker = function (forkId) {
var worker = cluster.fork({
workerType: "pool",
forkId: forkId,
pools: serializedConfigs,
portalConfig: JSON.stringify(portalConfig)
portalConfig: JSON.stringify(portalConfig),
});
worker.forkId = forkId;
worker.type = "pool";
poolWorkers[forkId] = worker;
worker
.on("exit", function(code, signal) {
.on("exit", function (code, signal) {
logger.error(
"Master",
"PoolSpawner",
"Fork " + forkId + " died, spawning replacement worker..."
);
setTimeout(function() {
setTimeout(function () {
createPoolWorker(forkId);
}, 2000);
})
.on("message", function(msg) {
.on("message", function (msg) {
switch (msg.type) {
case "banIP":
Object.keys(cluster.workers).forEach(function(id) {
Object.keys(cluster.workers).forEach(function (id) {
if (cluster.workers[id].type === "pool") {
cluster.workers[id].send({ type: "banIP", ip: msg.ip });
}
Expand All @@ -287,7 +291,7 @@ var spawnPoolWorkers = function() {
};

var i = 0;
var spawnInterval = setInterval(function() {
var spawnInterval = setInterval(function () {
createPoolWorker(i);
i++;
if (i === numForks) {
Expand All @@ -305,22 +309,22 @@ var spawnPoolWorkers = function() {
}, 250);
};

var startCliListener = function() {
var startCliListener = function () {
var cliPort = portalConfig.cliPort;

var listener = new CliListener(cliPort);
listener
.on("log", function(text) {
.on("log", function (text) {
logger.debug("Master", "CLI", text);
})
.on("command", function(command, params, options, reply) {
.on("command", function (command, params, options, reply) {
switch (command) {
case "blocknotify":
Object.keys(cluster.workers).forEach(function(id) {
Object.keys(cluster.workers).forEach(function (id) {
cluster.workers[id].send({
type: "blocknotify",
coin: params[0],
hash: params[1]
hash: params[1],
});
});
reply("Pool workers notified");
Expand All @@ -329,7 +333,7 @@ var startCliListener = function() {
processCoinSwitchCommand(params, options, reply);
break;
case "reloadpool":
Object.keys(cluster.workers).forEach(function(id) {
Object.keys(cluster.workers).forEach(function (id) {
cluster.workers[id].send({ type: "reloadpool", coin: params[0] });
});
reply("reloaded pool " + params[0]);
Expand All @@ -342,11 +346,11 @@ var startCliListener = function() {
.start();
};

var processCoinSwitchCommand = function(params, options, reply) {
var processCoinSwitchCommand = function (params, options, reply) {
var logSystem = "CLI";
var logComponent = "coinswitch";

var replyError = function(msg) {
var replyError = function (msg) {
reply(msg);
logger.error(logSystem, logComponent, msg);
};
Expand All @@ -366,7 +370,7 @@ var processCoinSwitchCommand = function(params, options, reply) {
return;
} else if (
options.algorithm &&
!Object.keys(portalConfig.switching).filter(function(s) {
!Object.keys(portalConfig.switching).filter(function (s) {
return portalConfig.switching[s].algorithm === options.algorithm;
})[0]
) {
Expand All @@ -377,7 +381,7 @@ var processCoinSwitchCommand = function(params, options, reply) {
}

var messageCoin = params[0].toLowerCase();
var newCoin = Object.keys(poolConfigs).filter(function(p) {
var newCoin = Object.keys(poolConfigs).filter(function (p) {
return p.toLowerCase() === messageCoin;
})[0];

Expand All @@ -400,7 +404,7 @@ var processCoinSwitchCommand = function(params, options, reply) {
}
}

switchNames.forEach(function(name) {
switchNames.forEach(function (name) {
if (
poolConfigs[newCoin].coin.algorithm !==
portalConfig.switching[name].algorithm
Expand All @@ -417,19 +421,19 @@ var processCoinSwitchCommand = function(params, options, reply) {
return;
}

Object.keys(cluster.workers).forEach(function(id) {
Object.keys(cluster.workers).forEach(function (id) {
cluster.workers[id].send({
type: "coinswitch",
coin: newCoin,
switchName: name
switchName: name,
});
});
});

reply("Switch message sent to pool workers");
};

var startPaymentProcessor = function() {
var startPaymentProcessor = function () {
var enabledForAny = false;
for (var pool in poolConfigs) {
var p = poolConfigs[pool];
Expand All @@ -445,41 +449,70 @@ var startPaymentProcessor = function() {

var worker = cluster.fork({
workerType: "paymentProcessor",
pools: JSON.stringify(poolConfigs)
pools: JSON.stringify(poolConfigs),
});
worker.on("exit", function(code, signal) {
worker.on("exit", function (code, signal) {
logger.error(
"Master",
"Payment Processor",
"Payment processor died, spawning replacement..."
);
setTimeout(function() {
setTimeout(function () {
startPaymentProcessor(poolConfigs);
}, 2000);
});
};

var startWebsite = function() {
var startRewardProcessor = function () {
var enabledForAny = false;
for (var pool in poolConfigs) {
var p = poolConfigs[pool];
var enabled = p.enabled && p.rewardProcessing && p.rewardProcessing.enabled;
if (enabled) {
enabledForAny = true;
break;
}
}

if (!enabledForAny) return;

var worker = cluster.fork({
workerType: "rewardProcessor",
pools: JSON.stringify(poolConfigs),
});
worker.on("exit", function (code, signal) {
logger.error(
"Master",
"Reward Processor",
"Reward processor died, spawning replacement..."
);
setTimeout(function () {
startRewardProcessor(poolConfigs);
}, 2000);
});
};

var startWebsite = function () {
if (!portalConfig.website.enabled) return;

var worker = cluster.fork({
workerType: "website",
pools: JSON.stringify(poolConfigs),
portalConfig: JSON.stringify(portalConfig)
portalConfig: JSON.stringify(portalConfig),
});
worker.on("exit", function(code, signal) {
worker.on("exit", function (code, signal) {
logger.error(
"Master",
"Website",
"Website process died, spawning replacement..."
);
setTimeout(function() {
setTimeout(function () {
startWebsite(portalConfig, poolConfigs);
}, 2000);
});
};

var startProfitSwitch = function() {
var startProfitSwitch = function () {
if (!portalConfig.profitSwitch || !portalConfig.profitSwitch.enabled) {
//logger.error('Master', 'Profit', 'Profit auto switching disabled');
return;
Expand All @@ -488,15 +521,15 @@ var startProfitSwitch = function() {
var worker = cluster.fork({
workerType: "profitSwitch",
pools: JSON.stringify(poolConfigs),
portalConfig: JSON.stringify(portalConfig)
portalConfig: JSON.stringify(portalConfig),
});
worker.on("exit", function(code, signal) {
worker.on("exit", function (code, signal) {
logger.error(
"Master",
"Profit",
"Profit switching process died, spawning replacement..."
);
setTimeout(function() {
setTimeout(function () {
startWebsite(portalConfig, poolConfigs);
}, 2000);
});
Expand All @@ -509,6 +542,8 @@ var startProfitSwitch = function() {

startPaymentProcessor();

startRewardProcessor();

startWebsite();

startProfitSwitch();
Expand Down
Loading

0 comments on commit 11714e3

Please sign in to comment.