From 4e0b42f5fd7692bad0937a09725ae04f4daa04bc Mon Sep 17 00:00:00 2001 From: Michael Contreras Date: Tue, 5 May 2020 20:07:23 -0400 Subject: [PATCH] pool: un-batch payouts Send payments one at a time --- src/pool.c | 172 +++++++++++++++++++++++------------------------------ 1 file changed, 74 insertions(+), 98 deletions(-) diff --git a/src/pool.c b/src/pool.c index 3f7da04..2a465c1 100644 --- a/src/pool.c +++ b/src/pool.c @@ -1915,10 +1915,17 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) else log_info("Payout transfer successful"); + payment_t *payment = (payment_t*) callback->data; + if (!payment) + { + log_error("Error: null payment"); + return; + } + int rc; char *err; - MDB_txn *txn; - MDB_cursor *cursor; + MDB_txn *txn = NULL; + MDB_cursor *cursor = NULL; /* First, updated balance(s) */ if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) @@ -1934,50 +1941,50 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) mdb_txn_abort(txn); goto cleanup; } - payment_t *payment = (payment_t*) callback->data; - for (; payment->amount; payment++) + + MDB_cursor_op op = MDB_SET; + MDB_val key = {ADDRESS_MAX, (void*)payment->address}; + MDB_val val; + + rc = mdb_cursor_get(cursor, &key, &val, op); + if (rc == MDB_NOTFOUND) { - MDB_cursor_op op = MDB_SET; - MDB_val key = {ADDRESS_MAX, (void*)payment->address}; - MDB_val val; - rc = mdb_cursor_get(cursor, &key, &val, op); - if (rc == MDB_NOTFOUND) - { - log_error("Payment made to non-existent address"); - continue; - } - else if (rc != 0 && rc != MDB_NOTFOUND) - { - err = mdb_strerror(rc); - log_error("%s", err); - continue; - } - uint64_t current_amount = *(uint64_t*)val.mv_data; + log_error("Payment made to non-existent address"); + goto skip_balance; + } + else if (rc != 0 && rc != MDB_NOTFOUND) + { + err = mdb_strerror(rc); + log_error("%s", err); + goto skip_balance; + } + uint64_t current_amount = *(uint64_t*)val.mv_data; - if (current_amount >= payment->amount) - { - current_amount -= payment->amount; - } - else - { - log_error("Payment was more than balance: %"PRIu64" > %"PRIu64, - payment->amount, current_amount); - current_amount = 0; - } + if (current_amount >= payment->amount) + { + current_amount -= payment->amount; + } + else + { + log_error("Payment was more than balance: %"PRIu64" > %"PRIu64, + payment->amount, current_amount); + current_amount = 0; + } - if (error) - { - log_warn("Error seen on transfer for %s with amount %"PRIu64, - payment->address, payment->amount); - } - MDB_val new_val = {sizeof(current_amount), (void*)¤t_amount}; - rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); - if (rc != 0) - { - err = mdb_strerror(rc); - log_error("%s", err); - } + if (error) + { + log_warn("Error seen on transfer for %s with amount %"PRIu64, + payment->address, payment->amount); + } + MDB_val new_val = {sizeof(current_amount), (void*)¤t_amount}; + rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); + if (rc != 0) + { + err = mdb_strerror(rc); + log_error("%s", err); } + +skip_balance: if ((rc = mdb_txn_commit(txn)) != 0) { err = mdb_strerror(rc); @@ -2001,18 +2008,13 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) goto cleanup; } time_t now = time(NULL); - payment = (payment_t*) callback->data; - for (; payment->amount; payment++) + + payment->timestamp = now; + MDB_val p_val = {sizeof(payment_t), payment}; + if ((rc = mdb_cursor_put(cursor, &key, &p_val, MDB_APPENDDUP)) != 0) { - payment->timestamp = now; - MDB_val key = {ADDRESS_MAX, (void*)payment->address}; - MDB_val val = {sizeof(payment_t), payment}; - if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0) - { - err = mdb_strerror(rc); - log_error("Error putting payment: %s", err); - continue; - } + err = mdb_strerror(rc); + log_error("Error putting payment: %s", err); } if ((rc = mdb_txn_commit(txn)) != 0) { @@ -2050,13 +2052,6 @@ send_payments(void) return rc; } - size_t payments_count = 0; - size_t payments_max_count = 25; - size_t payments_size = payments_max_count * sizeof(payment_t); - payment_t *payments = (payment_t*) calloc(1, payments_size); - payment_t *payment = payments; - payment_t *end_payment = payment + payments_max_count; - MDB_cursor_op op = MDB_FIRST; while (1) { @@ -2075,52 +2070,33 @@ send_payments(void) log_info("Sending payment: %"PRIu64", %.8s", amount, address); - strncpy(payment->address, address, ADDRESS_MAX-1); - payment->amount = amount; - payments_count++; + // free'd in rpc_callback_free + payment_t *payment = (payment_t*) calloc(1, sizeof (payment_t)); - if (++payment == end_payment) - { - payments_size <<= 1; - payments = (payment_t*) realloc(payments, payments_size); - payment = payments + payments_max_count; - memset(payment, 0, sizeof(payment_t) * payments_max_count); - payments_max_count <<= 1; - end_payment = payments + payments_max_count; + if (!payment) { + log_error("send_payments: OOM: alloc payments"); + return -1; } - } - mdb_cursor_close(cursor); - mdb_txn_abort(txn); - if (payments_count) - { - size_t body_size = 160 * payments_count + 128; - char body[body_size]; - char *start = body; - char *end = body + body_size; - start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":" + strncpy(payment->address, address, ADDRESS_MAX-1); + payment->amount = amount; + + char body[160+128]; + const size_t body_size = sizeof(body); + snprintf(body, body_size, + "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":" "\"transfer_split\",\"params\":{" - "\"ring_size\":11,\"destinations\":[", end); - for (size_t i=0; iaddress, end); - start = stecpy(start, "\",\"amount\":", end); - sprintf(start, "%"PRIu64"}", p->amount); - start = body + strlen(body); - if (i != payments_count -1) - start = stecpy(start, ",", end); - else - start = stecpy(start, "]}}", end); - } + "\"destinations\":[" + "{\"address\":\"%s\",\"amount\":""%"PRIu64"}" + "]}}", + payment->address, payment->amount); log_trace(body); rpc_callback_t *cb = rpc_callback_new( - rpc_on_wallet_transferred, payments); + rpc_on_wallet_transferred, payment); rpc_wallet_request(pool_base, body, cb); } - else - free(payments); + mdb_cursor_close(cursor); + mdb_txn_abort(txn); return 0; }