Skip to content

Commit 6380631

Browse files
committed
Merge branch 'tcp-bugfix-decoder' into 'master'
TCP input: fix bug when detecting decoder of connection See merge request monitoring/ipfixcol2!19
2 parents f259d54 + c8b105d commit 6380631

File tree

10 files changed

+50
-39
lines changed

10 files changed

+50
-39
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ endif()
1515
# Versions and other informations
1616
set(IPFIXCOL_VERSION_MAJOR 2)
1717
set(IPFIXCOL_VERSION_MINOR 7)
18-
set(IPFIXCOL_VERSION_PATCH 0)
18+
set(IPFIXCOL_VERSION_PATCH 1)
1919
set(IPFIXCOL_VERSION
2020
${IPFIXCOL_VERSION_MAJOR}.${IPFIXCOL_VERSION_MINOR}.${IPFIXCOL_VERSION_PATCH})
2121

src/plugins/input/tcp/src/Acceptor.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,18 @@
2626
#include <ipfixcol2.h> // ipx_ctx_t, ipx_strerror, IPX_CTX_WARNING
2727

2828
#include "ClientManager.hpp" // ClientManager
29-
#include "DecoderFactory.hpp" // DecoderFactory
3029
#include "Config.hpp" // Config
3130
#include "UniqueFd.hpp" // UniqueFd
3231
#include "IpAddress.hpp" // IpAddress, IpVersion
3332

3433
namespace tcp_in {
3534

36-
Acceptor::Acceptor(ClientManager &clients, DecoderFactory factory, ipx_ctx_t *ctx) :
35+
Acceptor::Acceptor(ClientManager &clients, ipx_ctx_t *ctx) :
3736
m_epoll(),
3837
m_sockets(),
3938
m_pipe_in(),
4039
m_pipe_out(),
4140
m_clients(clients),
42-
m_factory(std::move(factory)),
4341
m_thread(),
4442
m_ctx(ctx)
4543
{
@@ -231,8 +229,7 @@ void Acceptor::mainloop() {
231229
}
232230

233231
try {
234-
auto decoder = m_factory.detect_decoder(new_sd.get());
235-
m_clients.add_connection(std::move(new_sd), std::move(decoder));
232+
m_clients.add_connection(std::move(new_sd));
236233
} catch (std::exception &ex) {
237234
IPX_CTX_ERROR(m_ctx, "Acceptor: %s", ex.what());
238235
}

src/plugins/input/tcp/src/Acceptor.hpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,10 @@ class Acceptor {
3232
* @brief Creates the acceptor thread.
3333
*
3434
* @param clients Reference to client manager.
35-
* @param factory Initialized decoder factory.
3635
* @param config File configuration.
3736
* @param ctx The plugin context.
3837
*/
39-
Acceptor(ClientManager &clients, DecoderFactory factory, ipx_ctx_t *ctx);
38+
Acceptor(ClientManager &clients, ipx_ctx_t *ctx);
4039

4140
// force that acceptor stays in its original memory (so that `this` pointer stays valid on the
4241
// other thread)
@@ -79,7 +78,6 @@ class Acceptor {
7978

8079
/** Accepted clients. */
8180
ClientManager &m_clients;
82-
DecoderFactory m_factory;
8381
std::thread m_thread;
8482
ipx_ctx_t *m_ctx;
8583
};

src/plugins/input/tcp/src/ClientManager.cpp

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@
2525
#include <ipfixcol2.h> // ipx_strerror, ipx_ctx_t, ipx_session, IPX_CTX_INFO, IPX_CTX_WARNING
2626

2727
#include "Connection.hpp" // Connection
28-
#include "Decoder.hpp" // Decoder
2928
#include "UniqueFd.hpp" // UniqueFd
3029

3130
namespace tcp_in {
3231

33-
ClientManager::ClientManager(ipx_ctx_t *ctx) :
32+
ClientManager::ClientManager(ipx_ctx_t *ctx, DecoderFactory factory) :
3433
m_ctx(ctx),
3534
m_epoll(),
3635
m_mutex(),
37-
m_connections()
36+
m_connections(),
37+
m_factory(std::move(factory))
3838
{}
3939

40-
void ClientManager::add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder) {
40+
void ClientManager::add_connection(UniqueFd fd) {
4141
const char *err_str;
4242

4343
// get the flags and set it to non-blocking mode
@@ -57,17 +57,12 @@ void ClientManager::add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder
5757

5858
int borrowed_fd = fd.get();
5959

60-
std::unique_ptr<Connection> connection(new Connection(std::move(fd), std::move(decoder)));
60+
std::unique_ptr<Connection> connection(new Connection(std::move(fd), m_factory, m_ctx));
6161

6262
auto net = &connection->get_session()->tcp.net;
6363
std::array<char, INET6_ADDRSTRLEN> src_addr_str{};
6464
inet_ntop(net->l3_proto, &net->addr_src, src_addr_str.begin(), src_addr_str.size());
6565
IPX_CTX_INFO(m_ctx, "New exporter connected from '%s'.", src_addr_str.begin());
66-
IPX_CTX_INFO(
67-
m_ctx,
68-
"Using %s Decoder for the new connection",
69-
connection->get_decoder().get_name()
70-
);
7166

7267
std::lock_guard<std::mutex> lock(m_mutex);
7368

src/plugins/input/tcp/src/ClientManager.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#include <ipfixcol2.h> // ipx_session, ipx_ctx_t
1919

2020
#include "Connection.hpp" // Connection
21-
#include "Decoder.hpp" // Decoder
21+
#include "DecoderFactory.hpp" // Decoder
2222
#include "UniqueFd.hpp" // UniqueFd
2323
#include "Epoll.hpp" // Epoll
2424

@@ -31,15 +31,14 @@ class ClientManager {
3131
* @brief Creates client manager with no clients.
3232
* @throws when fails to create epoll
3333
*/
34-
ClientManager(ipx_ctx_t *ctx);
34+
ClientManager(ipx_ctx_t *ctx, DecoderFactory factory);
3535

3636
/**
3737
* @brief Adds connection to the vector and epoll.
3838
* @param fd file descriptor of the new tcp connection.
39-
* @param decoder decoder for the connection.
4039
* @throws when fails to add the new connection to epoll or when fails to create new session.
4140
*/
42-
void add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder);
41+
void add_connection(UniqueFd fd);
4342

4443
/**
4544
* @brief Removes connection from the vector based on its session. This is safe only for the
@@ -69,6 +68,7 @@ class ClientManager {
6968
Epoll m_epoll;
7069
std::mutex m_mutex;
7170
std::vector<std::unique_ptr<Connection>> m_connections;
71+
DecoderFactory m_factory;
7272
};
7373

7474
} // namespace tcp_in

src/plugins/input/tcp/src/Connection.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,11 @@
2626

2727
namespace tcp_in {
2828

29-
Connection::Connection(UniqueFd fd, std::unique_ptr<Decoder> decoder) :
30-
m_session(nullptr),
29+
Connection::Connection(UniqueFd fd, DecoderFactory &factory, ipx_ctx *ctx) :
3130
m_fd(std::move(fd)),
32-
m_new_connnection(true),
33-
m_decoder(std::move(decoder))
31+
m_factory(factory),
32+
m_ctx(ctx)
3433
{
35-
if (!m_decoder) {
36-
throw std::runtime_error("Decoder was null.");
37-
}
3834
const char *err_str;
3935

4036
sockaddr_storage src_addr;
@@ -96,6 +92,19 @@ Connection::Connection(UniqueFd fd, std::unique_ptr<Decoder> decoder) :
9692
}
9793

9894
bool Connection::receive(ipx_ctx_t *ctx) {
95+
if (!m_decoder) {
96+
m_decoder = m_factory.detect_decoder(m_fd.get());
97+
if (!m_decoder) {
98+
return true;
99+
}
100+
101+
IPX_CTX_INFO(
102+
m_ctx,
103+
"Using %s Decoder for the new connection",
104+
m_decoder->get_name()
105+
);
106+
}
107+
99108
auto &buffer = m_decoder->decode();
100109
buffer.process_decoded([=](ByteVector &&msg) { send_msg(ctx, std::move(msg)); });
101110
return !buffer.is_eof_reached();

src/plugins/input/tcp/src/Connection.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "ByteVector.hpp" // ByteVector
1818
#include "Decoder.hpp" // Decoder
19+
#include "DecoderFactory.hpp"
1920
#include "UniqueFd.hpp" // UniqueFd
2021

2122
namespace tcp_in {
@@ -26,10 +27,10 @@ class Connection {
2627
/**
2728
* @brief Creates new connection with this TCP connection file descriptor.
2829
* @param fd File descriptor of the new tcp connection.
29-
* @param decoder Decoder to use in this connection.
30+
* @param factory The decoder factory to decide the decoder of this connection
3031
* @throws when fails to create new session
3132
*/
32-
Connection(UniqueFd fd, std::unique_ptr<Decoder> decoder);
33+
Connection(UniqueFd fd, DecoderFactory& factory, ipx_ctx_t *ctx);
3334

3435
Connection(const Connection &) = delete;
3536

@@ -69,13 +70,18 @@ class Connection {
6970
private:
7071
void send_msg(ipx_ctx_t *ctx, ByteVector &&msg);
7172

72-
ipx_session *m_session;
7373
/** TCP file descriptor */
7474
UniqueFd m_fd;
75+
/** Decoder factory */
76+
DecoderFactory &m_factory;
77+
/** Plugin context for logging */
78+
ipx_ctx_t *m_ctx;
79+
/** The session identifier */
80+
ipx_session *m_session = nullptr;
7581
/** true if this connection didn't receive any full messages, otherwise false. */
76-
bool m_new_connnection;
82+
bool m_new_connnection = true;
7783
/** selected decoder or nullptr. */
78-
std::unique_ptr<Decoder> m_decoder;
84+
std::unique_ptr<Decoder> m_decoder = nullptr;
7985
};
8086

8187
} // namespace tcp_in

src/plugins/input/tcp/src/DecoderFactory.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ std::unique_ptr<Decoder> DecoderFactory::detect_decoder(int fd) {
3939

4040
std::array<uint8_t, MAX_MAGIC_LEN> buf{};
4141

42-
auto res = recv(fd, buf.begin(), buf.size(), MSG_PEEK | MSG_WAITALL);
42+
auto res = recv(fd, buf.begin(), buf.size(), MSG_PEEK | MSG_DONTWAIT);
43+
if (res == EAGAIN || res == EWOULDBLOCK) {
44+
// Not enough data yet
45+
return nullptr;
46+
}
47+
4348
if (res == -1) {
4449
const char *err_msg;
4550
ipx_strerror(errno, err_msg);

src/plugins/input/tcp/src/DecoderFactory.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ class DecoderFactory {
2626
* constructs it. This function may block if the decoder cannot be determined without recieving
2727
* more data.
2828
* @param fd TCP stream file descriptor
29-
* @return Instance of the correct decoder, nullptr no decoder matches the data.
29+
* @throws std::runtime_error on socket error or if no decoder matches the data
30+
* @return Instance of the correct decoder, nullptr if there is not enough data to decide the decoder yet
3031
*/
3132
std::unique_ptr<Decoder> detect_decoder(int fd);
3233

src/plugins/input/tcp/src/Plugin.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ namespace tcp_in {
2424

2525
Plugin::Plugin(ipx_ctx_t *ctx, Config &config) :
2626
m_ctx(ctx),
27-
m_clients(ctx),
28-
m_acceptor(m_clients, DecoderFactory(), ctx)
27+
m_clients(ctx, DecoderFactory()),
28+
m_acceptor(m_clients, ctx)
2929
{
3030
m_acceptor.bind_addresses(config);
3131
m_acceptor.start();

0 commit comments

Comments
 (0)