Skip to content

Commit e5e436f

Browse files
committed
stats: use scheduler for queue processing instead of thread
1 parent 817fe5a commit e5e436f

File tree

5 files changed

+31
-25
lines changed

5 files changed

+31
-25
lines changed

src/init.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2302,6 +2302,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
23022302
int nStatsPeriod = std::min(std::max((int)args.GetIntArg("-statsperiod", DEFAULT_STATSD_PERIOD), MIN_STATSD_PERIOD), MAX_STATSD_PERIOD);
23032303
node.scheduler->scheduleEvery(std::bind(&PeriodicStats, std::ref(node)), std::chrono::seconds{nStatsPeriod});
23042304
}
2305+
::g_stats_client->Schedule(*node.scheduler);
23052306

23062307
// ********************************************************* Step 11: import blocks
23072308

src/stats/client.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <random.h>
1111
#include <stats/rawsender.h>
1212
#include <sync.h>
13+
#include <util/check.h>
1314
#include <util/strencodings.h>
1415
#include <util/system.h>
1516
#include <util/translation.h>
@@ -49,6 +50,8 @@ class StatsdClientImpl final : public StatsdClient
4950
~StatsdClientImpl() = default;
5051

5152
public:
53+
void Schedule(CScheduler& scheduler) override;
54+
5255
bool dec(std::string_view key, float sample_rate) override { return count(key, -1, sample_rate); }
5356
bool inc(std::string_view key, float sample_rate) override { return count(key, 1, sample_rate); }
5457
bool count(std::string_view key, int64_t delta, float sample_rate) override { return _send(key, delta, STATSD_METRIC_COUNT, sample_rate); }
@@ -185,6 +188,11 @@ StatsdClientImpl::StatsdClientImpl(const std::string& host, uint16_t port, bool
185188
LogPrintf("%s: Initialized to transmit stats to %s:%d\n", __func__, host, port);
186189
}
187190

191+
void StatsdClientImpl::Schedule(CScheduler& scheduler)
192+
{
193+
Assert(m_sender)->Schedule(scheduler);
194+
}
195+
188196
template <typename T1>
189197
inline bool StatsdClientImpl::_send(std::string_view key, T1 value, std::string_view type, float sample_rate)
190198
{

src/stats/client.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <string_view>
1616

1717
class ArgsManager;
18+
class CScheduler;
1819

1920
/** Default host assumed to be running a Statsd server */
2021
static const std::string DEFAULT_STATSD_HOST{""};
@@ -40,6 +41,9 @@ class StatsdClient
4041
static util::Result<std::unique_ptr<StatsdClient>> make(const ArgsManager& args);
4142
virtual ~StatsdClient() = default;
4243

44+
/* Schedule tasks */
45+
virtual void Schedule(CScheduler& scheduler) {}
46+
4347
/* Statsd-defined APIs */
4448
virtual bool dec(std::string_view key, float sample_rate = 1.f) { return false; }
4549
virtual bool inc(std::string_view key, float sample_rate = 1.f) { return false; }

src/stats/rawsender.cpp

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <logging.h>
99
#include <netaddress.h>
1010
#include <netbase.h>
11+
#include <scheduler.h>
1112
#include <util/sock.h>
1213
#include <util/thread.h>
1314

@@ -38,26 +39,19 @@ RawSender::RawSender(const std::string& host, uint16_t port, bool use_tcp, std::
3839

3940
if (m_interval_ms == 0) {
4041
LogPrint(BCLog::NET, "%s: Send interval is zero, not starting RawSender queueing thread.\n", __func__);
41-
} else {
42-
m_interrupt.reset();
43-
m_thread = std::thread(&util::TraceThread, "rawsender", [this] { QueueThreadMain(); });
4442
}
4543

4644
if (m_use_tcp) {
4745
m_reconn = std::thread(&util::TraceThread, "rawreconnect", [this] { ReconnectThread(); });
4846
}
4947

50-
LogPrint(BCLog::NET, "%s: Started %sinstance sending messages to %s over %s\n", __func__,
51-
m_thread.joinable() ? "threaded " : "", this->ToStringHostPort(), m_use_tcp ? "TCP" : "UDP");
48+
LogPrint(BCLog::NET, "%s: Started instance sending messages to %s over %s\n", __func__, this->ToStringHostPort(),
49+
m_use_tcp ? "TCP" : "UDP");
5250
}
5351

5452
RawSender::~RawSender()
5553
{
5654
// If there are threads, interrupt and stop it
57-
if (m_thread.joinable()) {
58-
m_interrupt();
59-
m_thread.join();
60-
}
6155
if (m_reconn.joinable()) {
6256
m_reconn_interrupt();
6357
m_reconn.join();
@@ -67,6 +61,12 @@ RawSender::~RawSender()
6761
QueueFlush(m_queue);
6862
}
6963

64+
void RawSender::Schedule(CScheduler& scheduler)
65+
{
66+
if (m_interval_ms == 0) return;
67+
scheduler.scheduleEvery([this] { this->QueueThreadMain(); }, std::chrono::milliseconds{m_interval_ms});
68+
}
69+
7070
std::optional<bilingual_str> RawSender::Connect()
7171
{
7272
AssertLockHeld(cs_net);
@@ -167,8 +167,8 @@ std::optional<bilingual_str> RawSender::Send(const RawMessage& msg)
167167
AssertLockNotHeld(cs);
168168
AssertLockNotHeld(cs_net);
169169

170-
// If there is a thread, append to queue
171-
if (m_thread.joinable()) {
170+
// If queueing is enabled, append
171+
if (m_interval_ms > 0) {
172172
WITH_LOCK(cs, QueueAdd(m_queue, msg));
173173
return std::nullopt;
174174
}
@@ -283,14 +283,8 @@ void RawSender::QueueThreadMain()
283283
AssertLockNotHeld(cs);
284284
AssertLockNotHeld(cs_net);
285285

286-
while (!m_interrupt) {
287-
// Swap the queues to commit the existing queue of messages
288-
std::vector<RawMessage> queue;
289-
WITH_LOCK(cs, m_queue.swap(queue));
290-
QueueFlush(queue);
291-
292-
if (!m_interrupt.sleep_for(std::chrono::milliseconds(m_interval_ms))) {
293-
return;
294-
}
295-
}
286+
// Swap the queues to commit the existing queue of messages
287+
std::vector<RawMessage> queue;
288+
WITH_LOCK(cs, m_queue.swap(queue));
289+
QueueFlush(queue);
296290
}

src/stats/rawsender.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <string>
1818
#include <vector>
1919

20+
class CScheduler;
2021
class Sock;
2122

2223
struct RawMessage : public std::vector<uint8_t>
@@ -64,6 +65,8 @@ class RawSender
6465
RawSender& operator=(const RawSender&) = delete;
6566
RawSender(RawSender&&) = delete;
6667

68+
void Schedule(CScheduler& schedule);
69+
6770
//! Request a message to be sent based on configuration (queueing, batching)
6871
std::optional<bilingual_str> Send(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs, !cs_net);
6972

@@ -111,14 +114,10 @@ class RawSender
111114

112115
/* Mutex to protect (batches of) messages queue */
113116
mutable Mutex cs;
114-
/* Interrupt for queue processing thread */
115-
CThreadInterrupt m_interrupt;
116117
/* Interrupt for reconnection thread (TCP only) */
117118
CThreadInterrupt m_reconn_interrupt;
118119
/* Queue of (batches of) messages to be sent */
119120
std::vector<RawMessage> m_queue GUARDED_BY(cs);
120-
/* Thread that processes queue every m_interval_ms */
121-
std::thread m_thread;
122121
/* Reconnection attempt thread (TCP only) */
123122
std::thread m_reconn;
124123
/* Queue of messages to be sent when reconnection succeeds (TCP only) */

0 commit comments

Comments
 (0)