Skip to content

Commit

Permalink
async sink config param and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gabime committed Dec 28, 2024
1 parent 345af1c commit a7298c5
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 195 deletions.
55 changes: 28 additions & 27 deletions bench/async_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
//
#include <atomic>
#include <iostream>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <locale>
#include <algorithm>

#include "spdlog/sinks/async_sink.h"
#include "spdlog/sinks/basic_file_sink.h"
Expand All @@ -24,20 +26,9 @@ using namespace spdlog::sinks;

void bench_mt(int howmany, std::shared_ptr<spdlog::logger> log, int thread_count);

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4996) // disable fopen warning under msvc
#endif // _MSC_VER

int count_lines(const char *filename) {
int counter = 0;
auto *infile = fopen(filename, "r");
int ch = 0;
while (EOF != (ch = getc(infile))) {
if ('\n' == ch) counter++;
}
fclose(infile);
return counter;
std::ifstream ifs(filename);
return std::count(std::istreambuf_iterator(ifs), std::istreambuf_iterator<char>(), '\n');
}

void verify_file(const char *filename, int expected_count) {
Expand Down Expand Up @@ -66,10 +57,10 @@ int main(int argc, char *argv[]) {

try {
spdlog::set_pattern("[%^%l%$] %v");
// if (argc == 1) {
// spdlog::info("Usage: {} <message_count> <threads> <q_size> <iterations>", argv[0]);
// return 0;
// }
if (argc > 1 && (std::string(argv[1]) == "-h" || std::string(argv[1]) == "--help")) {
spdlog::info("Usage: {} <message_count> <threads> <q_size> <iterations>", argv[0]);
return 0;
}

if (argc > 1) howmany = atoi(argv[1]);
if (argc > 2) threads = atoi(argv[2]);
Expand All @@ -82,6 +73,11 @@ int main(int argc, char *argv[]) {
}

if (argc > 4) iters = atoi(argv[4]);
// validate all argc values
if (howmany < 1 || threads < 1 || queue_size < 1 || iters < 1) {
spdlog::error("Invalid input values");
exit(1);
}

auto slot_size = sizeof(details::async_log_msg);
spdlog::info("-------------------------------------------------");
Expand All @@ -98,25 +94,30 @@ int main(int argc, char *argv[]) {
spdlog::info("Queue Overflow Policy: block");
spdlog::info("*********************************");
for (int i = 0; i < iters; i++) {
auto async_sink = std::make_shared<async_sink_mt>(queue_size);
auto file_sink = std::make_shared<basic_file_sink_mt>(filename, true);
async_sink->add_sink(std::move(file_sink));
auto logger = std::make_shared<spdlog::logger>("async_logger", std::move(async_sink));
bench_mt(howmany, std::move(logger), threads);
verify_file(filename, howmany);
{
auto file_sink = std::make_shared<basic_file_sink_mt>(filename, true);
auto cfg = async_sink::config();
cfg.queue_size = queue_size;
cfg.sinks.push_back(std::move(file_sink));
auto async_sink = std::make_shared<sinks::async_sink>(cfg);
auto logger = std::make_shared<spdlog::logger>("async_logger", std::move(async_sink));
bench_mt(howmany, std::move(logger), threads);
}
//verify_file(filename, howmany); // in separate scope to ensure logger is destroyed and all logs were written
}

spdlog::info("");
spdlog::info("*********************************");
spdlog::info("Queue Overflow Policy: overrun");
spdlog::info("*********************************");
// do same test but discard the oldest if queue is full instead of blocking
filename = "logs/basic_async-overrun.log";
for (int i = 0; i < iters; i++) {
auto async_sink = std::make_shared<async_sink_mt>(queue_size);
async_sink->set_overflow_policy(async_sink_mt::overflow_policy::overrun_oldest);
async_sink::config cfg;
cfg.policy = async_sink::overflow_policy::overrun_oldest;
cfg.queue_size = queue_size;
auto file_sink = std::make_shared<basic_file_sink_mt>(filename, true);
async_sink->add_sink(std::move(file_sink));
cfg.sinks.push_back(std::move(file_sink));
auto async_sink = std::make_shared<sinks::async_sink>(cfg);
auto logger = std::make_shared<spdlog::logger>("async_logger", std::move(async_sink));
bench_mt(howmany, std::move(logger), threads);
}
Expand Down
77 changes: 40 additions & 37 deletions include/spdlog/sinks/async_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <thread>
#include <vector>

#include "../details/async_log_msg.h"
#include "dist_sink.h"
#include "sink.h"

// async_sink is a sink that sends log messages to a dist_sink in a separate thread using a queue.
// The worker thread dequeues the messages and sends them to the dist_sink to perform the actual logging.
// The worker thread is terminated when the async_sink is destroyed.
// Once the sink is destroyed, the worker thread empties the queue and exits.

namespace spdlog::details { // forward declaration
template <typename T>
Expand All @@ -20,61 +22,62 @@ class mpmc_blocking_queue;
namespace spdlog {
namespace sinks {

template <typename Mutex>
class async_sink final : public dist_sink<Mutex> {
class async_sink final : public sink {
public:
using base_t = dist_sink<Mutex>;
using async_log_msg = details::async_log_msg;
using queue_t = details::mpmc_blocking_queue<async_log_msg>;
enum { default_queue_size = 8192, max_queue_size = 1024 * 1024 * 10 };

// Async overflow policy - block by default.
enum class overflow_policy : std::uint8_t {
block, // Block until the log message can be enqueued (default).
overrun_oldest, // Overrun the oldest message in the queue if full.
discard_new // Discard the log message if the queue is full
};

async_sink(size_t queue_size, std::function<void()> on_thread_start, std::function<void()> on_thread_stop);
~async_sink() override;
enum { default_queue_size = 8192, max_queue_size = 10 * 1024 * 1024 };

struct config {
size_t queue_size = default_queue_size;
overflow_policy policy = overflow_policy::block;
std::vector<std::shared_ptr<sink>> sinks;
std::function<void()> on_thread_start = nullptr;
std::function<void()> on_thread_stop = nullptr;
};

explicit async_sink(config async_config);

// create an async_sink with one backend sink
template <typename Sink, typename... SinkArgs>
static std::shared_ptr<async_sink> with_sink(SinkArgs &&...sink_args) {
config cfg{};
cfg.sinks.emplace_back(std::make_shared<Sink>(std::forward<SinkArgs>(sink_args)...));
return std::make_shared<async_sink>(cfg);
}

async_sink();
explicit async_sink(size_t queue_size);
async_sink(std::function<void()> on_thread_start, std::function<void()> on_thread_stop);
async_sink(const async_sink &) = delete;
async_sink &operator=(const async_sink &) = delete;
async_sink(async_sink &&) = default;
async_sink &operator=(async_sink &&) = default;
~async_sink() override;

void set_overflow_policy(overflow_policy policy);
[[nodiscard]] overflow_policy get_overflow_policy() const;
// sink interface implementation
void log(const details::log_msg &msg) override;
void flush() override;
void set_pattern(const std::string &pattern) override;
void set_formatter(std::unique_ptr<formatter> sink_formatter) override;

// async sink specific methods
[[nodiscard]] size_t get_overrun_counter() const;
void reset_overrun_counter() const;

[[nodiscard]] size_t get_discard_counter() const;
void reset_discard_counter() const;
[[nodiscard]] const config &get_config() const;

private:
void sink_it_(const details::log_msg &msg) override;
void flush_() override;
void send_message_(async_log_msg::type msg_type, const details::log_msg &msg);
void backend_loop_();
using async_log_msg = details::async_log_msg;
using queue_t = details::mpmc_blocking_queue<async_log_msg>;

std::atomic<overflow_policy> overflow_policy_ = overflow_policy::block;
void send_message_(async_log_msg::type msg_type, const details::log_msg &msg) const;
void backend_loop_() const;
void backend_log_(const details::log_msg &msg) const;
void backend_flush_() const;

config config_;
std::unique_ptr<queue_t> q_;
std::thread worker_thread_;
};

using async_sink_mt = async_sink<std::mutex>;
using async_sink_st = async_sink<details::null_mutex>;

} // namespace sinks

class logger;
template <typename... SinkArgs>
std::shared_ptr<logger> create_async(std::string logger_name, SinkArgs &&...sink_args) {
auto async_sink = std::make_shared<sinks::async_sink_mt>(std::forward<SinkArgs>(sink_args)...);
return std::make_shared<logger>(std::move(logger_name), std::move(async_sink));
}
} // namespace spdlog
120 changes: 50 additions & 70 deletions src/sinks/async_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,99 +3,71 @@

#include "spdlog/sinks/async_sink.h"

#include <cassert>
#include <memory>
#include <mutex>
#include <cassert>

#include "spdlog/details/mpmc_blocking_q.h"
#include "spdlog/common.h"
#include "spdlog/details/mpmc_blocking_q.h"
#include "spdlog/pattern_formatter.h"
#include "spdlog/spdlog.h"

namespace spdlog {
namespace sinks {

template <typename Mutex>
async_sink<Mutex>::async_sink(size_t queue_size, std::function<void()> on_thread_start, std::function<void()> on_thread_stop)
: base_t() {
if (queue_size == 0 || queue_size > max_queue_size) {
async_sink::async_sink(config async_config)
: config_(std::move(async_config)) {
if (config_.queue_size == 0 || config_.queue_size > max_queue_size) {
throw spdlog_ex("async_sink: invalid queue size");
}
// printf("........... Allocating queue: slot: %zu X %zu bytes ====> %lld KB ..............\n",
// queue_size, sizeof(details::async_log_msg), (sizeof(details::async_log_msg) * queue_size)/1024);
q_ = std::make_unique<queue_t>(queue_size);

worker_thread_ = std::thread([this, on_thread_start, on_thread_stop] {
if (on_thread_start) on_thread_start();
q_ = std::make_unique<queue_t>(config_.queue_size);
worker_thread_ = std::thread([this] {
if (config_.on_thread_start) config_.on_thread_start();
this->backend_loop_();
if (on_thread_stop) on_thread_stop();
if (config_.on_thread_stop) config_.on_thread_stop();
});
}

template <typename Mutex>
async_sink<Mutex>::~async_sink() {
async_sink::~async_sink() {
try {
q_->enqueue(async_log_msg(async_log_msg::type::terminate));
worker_thread_.join();
} catch (...) {
printf("Exception in ~async_sink()\n");
}
};

template <typename Mutex>
async_sink<Mutex>::async_sink()
: async_sink(default_queue_size, nullptr, nullptr) {}
void async_sink::log(const details::log_msg &msg) { send_message_(async_log_msg::type::log, msg); }

template <typename Mutex>
async_sink<Mutex>::async_sink(size_t queue_size)
: async_sink(queue_size, nullptr, nullptr) {}
void async_sink::flush() { send_message_(async_log_msg::type::flush, details::log_msg()); }

template <typename Mutex>
async_sink<Mutex>::async_sink(std::function<void()> on_thread_start, std::function<void()> on_thread_stop)
: async_sink(default_queue_size, on_thread_start, on_thread_stop) {}
void async_sink::set_pattern(const std::string &pattern) { set_formatter(std::make_unique<pattern_formatter>(pattern)); }

template <typename Mutex>
void async_sink<Mutex>::sink_it_(const details::log_msg &msg) {
send_message_(async_log_msg::type::log, msg);
void async_sink::set_formatter(std::unique_ptr<formatter> formatter) {
const auto &sinks = config_.sinks;
for (auto it = sinks.begin(); it != sinks.end(); ++it) {
if (std::next(it) == sinks.end()) {
// last element - we can move it.
(*it)->set_formatter(std::move(formatter));
break; // to prevent clang-tidy warning
}
(*it)->set_formatter(formatter->clone());
}
}

size_t async_sink::get_overrun_counter() const { return q_->overrun_counter(); }

template <typename Mutex>
void async_sink<Mutex>::set_overflow_policy(overflow_policy policy) {
overflow_policy_ = policy;
}

template <typename Mutex>
typename async_sink<Mutex>::overflow_policy async_sink<Mutex>::get_overflow_policy() const {
return overflow_policy_;
}
void async_sink::reset_overrun_counter() const { q_->reset_overrun_counter(); }

template <typename Mutex>
size_t async_sink<Mutex>::get_overrun_counter() const {
return q_->overrun_counter();
}
size_t async_sink::get_discard_counter() const { return q_->discard_counter(); }

template <typename Mutex>
void async_sink<Mutex>::reset_overrun_counter() const {
q_->reset_overrun_counter();
}
void async_sink::reset_discard_counter() const { q_->reset_discard_counter(); }

template <typename Mutex>
size_t async_sink<Mutex>::get_discard_counter() const {
return q_->discard_counter();
}
const async_sink::config &async_sink::get_config() const { return config_; }

template <typename Mutex>
void async_sink<Mutex>::reset_discard_counter() const {
q_->reset_discard_counter();
}

template <typename Mutex>
void async_sink<Mutex>::flush_() {
send_message_(async_log_msg::type::flush, details::log_msg());
}

template <typename Mutex>
void async_sink<Mutex>::send_message_(async_log_msg::type msg_type, const details::log_msg &msg) {
switch (overflow_policy_) {
// private methods
void async_sink::send_message_(async_log_msg::type msg_type, const details::log_msg &msg) const {
switch (config_.policy) {
case overflow_policy::block:
q_->enqueue(async_log_msg(msg_type, msg));
break;
Expand All @@ -111,17 +83,16 @@ void async_sink<Mutex>::send_message_(async_log_msg::type msg_type, const detail
}
}

template <typename Mutex>
void async_sink<Mutex>::backend_loop_() {
void async_sink::backend_loop_() const {
details::async_log_msg incoming_msg;
for (;;) {
q_->dequeue(incoming_msg);
switch (incoming_msg.message_type()) {
case async_log_msg::type::log:
base_t::sink_it_(incoming_msg);
backend_log_(incoming_msg);
break;
case async_log_msg::type::flush:
base_t::flush_();
backend_flush_();
break;
case async_log_msg::type::terminate:
return;
Expand All @@ -131,10 +102,19 @@ void async_sink<Mutex>::backend_loop_() {
}
}

void async_sink::backend_log_(const details::log_msg &msg) const {
for (const auto &sink : config_.sinks) {
if (sink->should_log(msg.log_level)) {
sink->log(msg);
}
}
}

void async_sink::backend_flush_() const {
for (const auto &sink : config_.sinks) {
sink->flush();
}
}

} // namespace sinks
} // namespace spdlog

// template instantiations
#include "spdlog/details/null_mutex.h"
template class SPDLOG_API spdlog::sinks::async_sink<std::mutex>;
template class SPDLOG_API spdlog::sinks::async_sink<spdlog::details::null_mutex>;
Loading

0 comments on commit a7298c5

Please sign in to comment.