Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rabbitmq/src/urabbitmq/client_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ std::vector<std::string> ParseHosts(const formats::json::Value& secdist_doc) {
const auto hosts_json = secdist_doc["hosts"];
auto hosts = hosts_json.As<std::vector<std::string>>();

UINVARIANT(!hosts.empty(), "Empty list of hosts in clickhouse secdist");
UINVARIANT(!hosts.empty(), "Empty list of hosts in rabbitmq secdist");
const auto unique_count = std::unordered_set<std::string>{hosts.begin(), hosts.end()}.size();

UINVARIANT(unique_count == hosts.size(), "Hosts are not unique in clickhouse secdist");
UINVARIANT(unique_count == hosts.size(), "Hosts are not unique in rabbitmq secdist");

return hosts;
}
Expand Down
4 changes: 3 additions & 1 deletion rabbitmq/src/urabbitmq/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ ConnectionPool::ConnectionPool(
// Already logged in base class
}

monitor_.Start("connection_pool_monitor", {{kPoolMonitorInterval}}, [this] { RunMonitor(); });
monitor_.Start("connection_pool_monitor", {{kPoolMonitorInterval}, {}, logging::Level::kDebug}, [this] {
RunMonitor();
});
}

ConnectionPool::~ConnectionPool() {
Expand Down
22 changes: 21 additions & 1 deletion rabbitmq/src/urabbitmq/impl/amqp_connection_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,23 @@ void AmqpConnectionHandler::onReady(AMQP::Connection*) {
connection_ready_event_.Send();
}

uint16_t AmqpConnectionHandler::onNegotiate(AMQP::Connection* connection, uint16_t interval) {
// we accept the suggestion from the server, but if the interval is smaller
// that one minute, we will use a one minute interval instead
if (interval < 60) interval = 60;

heartbeats_.Start(
fmt::format("rabbitmq_heartbeats_for_{}:{}", address_.hostname(), address_.port()),
std::chrono::seconds(interval),
[this, connection] {
if (!IsBroken()) connection->heartbeat();
}
);

// return the interval that we want to use
return interval;
}

void AmqpConnectionHandler::OnConnectionCreated(AmqpConnection* connection, engine::Deadline deadline) {
reader_.Start(connection);

Expand All @@ -169,7 +186,10 @@ void AmqpConnectionHandler::OnConnectionCreated(AmqpConnection* connection, engi

void AmqpConnectionHandler::OnConnectionDestruction() { reader_.Stop(); }

void AmqpConnectionHandler::Invalidate() { broken_ = true; }
void AmqpConnectionHandler::Invalidate() {
heartbeats_.Stop();
broken_ = true;
}

bool AmqpConnectionHandler::IsBroken() const { return broken_.load(); }

Expand Down
6 changes: 6 additions & 0 deletions rabbitmq/src/urabbitmq/impl/amqp_connection_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include <amqpcpp.h>

#include "userver/utils/periodic_task.hpp"

USERVER_NAMESPACE_BEGIN

namespace engine::io {
Expand Down Expand Up @@ -61,6 +63,8 @@ class AmqpConnectionHandler final : public AMQP::ConnectionHandler {

void onReady(AMQP::Connection* connection) override;

uint16_t onNegotiate(AMQP::Connection* connection, uint16_t interval) override;

void OnConnectionCreated(AmqpConnection* connection, engine::Deadline deadline);
void OnConnectionDestruction();

Expand Down Expand Up @@ -90,6 +94,8 @@ class AmqpConnectionHandler final : public AMQP::ConnectionHandler {

std::atomic<bool> is_ready_{false};
std::optional<std::string> error_;

utils::PeriodicTask heartbeats_;
};

} // namespace impl
Expand Down
Loading