Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: un-batch payouts #38

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 74 additions & 98 deletions src/pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -1915,10 +1915,17 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
else
log_info("Payout transfer successful");

payment_t *payment = (payment_t*) callback->data;
if (!payment)
{
log_error("Error: null payment");
return;
}

int rc;
char *err;
MDB_txn *txn;
MDB_cursor *cursor;
MDB_txn *txn = NULL;
MDB_cursor *cursor = NULL;

/* First, updated balance(s) */
if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
Expand All @@ -1934,50 +1941,50 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn);
goto cleanup;
}
payment_t *payment = (payment_t*) callback->data;
for (; payment->amount; payment++)

MDB_cursor_op op = MDB_SET;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val;

rc = mdb_cursor_get(cursor, &key, &val, op);
if (rc == MDB_NOTFOUND)
{
MDB_cursor_op op = MDB_SET;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val;
rc = mdb_cursor_get(cursor, &key, &val, op);
if (rc == MDB_NOTFOUND)
{
log_error("Payment made to non-existent address");
continue;
}
else if (rc != 0 && rc != MDB_NOTFOUND)
{
err = mdb_strerror(rc);
log_error("%s", err);
continue;
}
uint64_t current_amount = *(uint64_t*)val.mv_data;
log_error("Payment made to non-existent address");
goto skip_balance;
}
else if (rc != 0 && rc != MDB_NOTFOUND)
{
err = mdb_strerror(rc);
log_error("%s", err);
goto skip_balance;
}
uint64_t current_amount = *(uint64_t*)val.mv_data;

if (current_amount >= payment->amount)
{
current_amount -= payment->amount;
}
else
{
log_error("Payment was more than balance: %"PRIu64" > %"PRIu64,
payment->amount, current_amount);
current_amount = 0;
}
if (current_amount >= payment->amount)
{
current_amount -= payment->amount;
}
else
{
log_error("Payment was more than balance: %"PRIu64" > %"PRIu64,
payment->amount, current_amount);
current_amount = 0;
}

if (error)
{
log_warn("Error seen on transfer for %s with amount %"PRIu64,
payment->address, payment->amount);
}
MDB_val new_val = {sizeof(current_amount), (void*)&current_amount};
rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
if (rc != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
}
if (error)
{
log_warn("Error seen on transfer for %s with amount %"PRIu64,
payment->address, payment->amount);
}
MDB_val new_val = {sizeof(current_amount), (void*)&current_amount};
rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
if (rc != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
}

skip_balance:
if ((rc = mdb_txn_commit(txn)) != 0)
{
err = mdb_strerror(rc);
Expand All @@ -2001,18 +2008,13 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
goto cleanup;
}
time_t now = time(NULL);
payment = (payment_t*) callback->data;
for (; payment->amount; payment++)

payment->timestamp = now;
MDB_val p_val = {sizeof(payment_t), payment};
if ((rc = mdb_cursor_put(cursor, &key, &p_val, MDB_APPENDDUP)) != 0)
{
payment->timestamp = now;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val = {sizeof(payment_t), payment};
if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0)
{
err = mdb_strerror(rc);
log_error("Error putting payment: %s", err);
continue;
}
err = mdb_strerror(rc);
log_error("Error putting payment: %s", err);
}
if ((rc = mdb_txn_commit(txn)) != 0)
{
Expand Down Expand Up @@ -2050,13 +2052,6 @@ send_payments(void)
return rc;
}

size_t payments_count = 0;
size_t payments_max_count = 25;
size_t payments_size = payments_max_count * sizeof(payment_t);
payment_t *payments = (payment_t*) calloc(1, payments_size);
payment_t *payment = payments;
payment_t *end_payment = payment + payments_max_count;

MDB_cursor_op op = MDB_FIRST;
while (1)
{
Expand All @@ -2075,52 +2070,33 @@ send_payments(void)

log_info("Sending payment: %"PRIu64", %.8s", amount, address);

strncpy(payment->address, address, ADDRESS_MAX-1);
payment->amount = amount;
payments_count++;
// free'd in rpc_callback_free
payment_t *payment = (payment_t*) calloc(1, sizeof (payment_t));

if (++payment == end_payment)
{
payments_size <<= 1;
payments = (payment_t*) realloc(payments, payments_size);
payment = payments + payments_max_count;
memset(payment, 0, sizeof(payment_t) * payments_max_count);
payments_max_count <<= 1;
end_payment = payments + payments_max_count;
if (!payment) {
log_error("send_payments: OOM: alloc payments");
return -1;
}
}
mdb_cursor_close(cursor);
mdb_txn_abort(txn);

if (payments_count)
{
size_t body_size = 160 * payments_count + 128;
char body[body_size];
char *start = body;
char *end = body + body_size;
start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":"
strncpy(payment->address, address, ADDRESS_MAX-1);
payment->amount = amount;

char body[160+128];
const size_t body_size = sizeof(body);
snprintf(body, body_size,
"{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":"
"\"transfer_split\",\"params\":{"
"\"ring_size\":11,\"destinations\":[", end);
for (size_t i=0; i<payments_count; i++)
{
payment_t *p = &payments[i];
start = stecpy(start, "{\"address\":\"", end);
start = stecpy(start, p->address, end);
start = stecpy(start, "\",\"amount\":", end);
sprintf(start, "%"PRIu64"}", p->amount);
start = body + strlen(body);
if (i != payments_count -1)
start = stecpy(start, ",", end);
else
start = stecpy(start, "]}}", end);
}
"\"destinations\":["
"{\"address\":\"%s\",\"amount\":""%"PRIu64"}"
"]}}",
payment->address, payment->amount);
log_trace(body);
rpc_callback_t *cb = rpc_callback_new(
rpc_on_wallet_transferred, payments);
rpc_on_wallet_transferred, payment);
rpc_wallet_request(pool_base, body, cb);
}
else
free(payments);
mdb_cursor_close(cursor);
mdb_txn_abort(txn);

return 0;
}
Expand Down