diff --git a/src/include/management.h b/src/include/management.h index 0123e122..e38b286e 100644 --- a/src/include/management.h +++ b/src/include/management.h @@ -58,6 +58,7 @@ extern "C" { #define MANAGEMENT_CLIENT_FD 16 #define MANAGEMENT_SWITCH_TO 17 #define MANAGEMENT_RELOAD 18 +#define MANAGEMENT_REMOVE_FD 19 /** * Read the management header @@ -294,6 +295,16 @@ pgagroal_management_switch_to(SSL* ssl, int socket, char* server); int pgagroal_management_reload(SSL* ssl, int socket); +/** + * Management operation: Remove socket descriptor + * @param slot The slot + * @param socket The socket + * @param pid The pid + * @return 0 upon success, otherwise 1 + */ +int +pgagroal_management_remove_fd(int32_t slot, int socket, pid_t pid); + #ifdef __cplusplus } #endif diff --git a/src/libpgagroal/management.c b/src/libpgagroal/management.c index 03267004..b87bcf89 100644 --- a/src/libpgagroal/management.c +++ b/src/libpgagroal/management.c @@ -137,6 +137,7 @@ pgagroal_management_read_payload(int socket, signed char id, int* payload_i, cha case MANAGEMENT_FLUSH: case MANAGEMENT_KILL_CONNECTION: case MANAGEMENT_CLIENT_DONE: + case MANAGEMENT_REMOVE_FD: if (read_complete(NULL, socket, &buf4[0], sizeof(buf4))) { goto error; @@ -1155,6 +1156,58 @@ pgagroal_management_reload(SSL* ssl, int fd) return 1; } +int +pgagroal_management_remove_fd(int32_t slot, int socket, pid_t pid) +{ + char p[MISC_LENGTH]; + int fd; + char buf[4]; + struct configuration* config; + + config = (struct configuration*)shmem; + + if (atomic_load(&config->states[slot]) == STATE_NOTINIT) + { + return 0; + } + + memset(&p, 0, sizeof(p)); + snprintf(&p[0], sizeof(p), ".s.%d", pid); + + if (pgagroal_connect_unix_socket(config->unix_socket_dir, &p[0], &fd)) + { + pgagroal_log_debug("pgagroal_management_remove_fd: slot %d state %d database %s user %s socket %d pid %d connect: %d", + slot, atomic_load(&config->states[slot]), + config->connections[slot].database, config->connections[slot].username, socket, pid, fd); + errno = 0; + goto error; + } + + if (write_header(NULL, fd, MANAGEMENT_REMOVE_FD, slot)) + { + pgagroal_log_warn("pgagroal_management_remove_fd: write: %d", fd); + errno = 0; + goto error; + } + + pgagroal_write_int32(&buf, socket); + if (write_complete(NULL, fd, &buf, sizeof(buf))) + { + pgagroal_log_warn("pgagroal_management_remove_fd: write: %d %s", fd, strerror(errno)); + errno = 0; + goto error; + } + + pgagroal_disconnect(fd); + + return 0; + +error: + pgagroal_disconnect(fd); + + return 1; +} + static int read_complete(SSL* ssl, int socket, void* buf, size_t size) { diff --git a/src/libpgagroal/pipeline_perf.c b/src/libpgagroal/pipeline_perf.c index 62c7292d..eae37aca 100644 --- a/src/libpgagroal/pipeline_perf.c +++ b/src/libpgagroal/pipeline_perf.c @@ -29,7 +29,9 @@ /* pgagroal */ #include #include +#include #include +#include #include #include @@ -37,6 +39,10 @@ #include #include #include +#include +#include +#include +#include static int performance_initialize(void*, void**, size_t*); static void performance_start(struct ev_loop *loop, struct worker_io*); @@ -61,6 +67,16 @@ struct pipeline performance_pipeline(void) return pipeline; } +static int unix_socket = -1; +static struct ev_io io_mgt; + +static int fds[MAX_NUMBER_OF_CONNECTIONS]; +static bool news[MAX_NUMBER_OF_CONNECTIONS]; + +static void start_mgt(struct ev_loop *loop); +static void shutdown_mgt(struct ev_loop *loop); +static void accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents); + static int performance_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shmem_size) { @@ -70,11 +86,42 @@ performance_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shme static void performance_start(struct ev_loop *loop, struct worker_io* w) { + char p[MISC_LENGTH]; + struct configuration* config; + + config = (struct configuration*)shmem; + + for (int i = 0; i < config->max_connections; i++) + { + fds[i] = config->connections[i].fd; + news[i] = config->connections[i].new; + } + + memset(&p, 0, sizeof(p)); + snprintf(&p[0], sizeof(p), ".s.%d", getpid()); + + if (pgagroal_bind_unix_socket(config->unix_socket_dir, &p[0], &unix_socket)) + { + pgagroal_log_fatal("pgagroal: Could not bind to %s/%s", config->unix_socket_dir, &p[0]); + goto error; + } + + start_mgt(loop); + + return; + +error: + + exit_code = WORKER_FAILURE; + running = 0; + ev_break(loop, EVBREAK_ALL); + return; } static void performance_stop(struct ev_loop *loop, struct worker_io* w) { + shutdown_mgt(loop); } static void @@ -93,6 +140,7 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents) int status = MESSAGE_STATUS_ERROR; struct worker_io* wi = NULL; struct message* msg = NULL; + struct configuration* config = NULL; wi = (struct worker_io*)watcher; @@ -127,7 +175,10 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents) return; client_error: - pgagroal_log_warn("[C] Client error: %s (socket %d status %d)", strerror(errno), wi->client_fd, status); + config = (struct configuration*)shmem; + pgagroal_log_warn("[C] Client error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->client_fd, status); pgagroal_log_message(msg); errno = 0; @@ -137,7 +188,10 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents) return; server_error: - pgagroal_log_warn("[C] Server error: %s (socket %d status %d)", strerror(errno), wi->server_fd, status); + config = (struct configuration*)shmem; + pgagroal_log_warn("[C] Server error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->server_fd, status); pgagroal_log_message(msg); errno = 0; @@ -154,6 +208,7 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents) bool fatal = false; struct worker_io* wi = NULL; struct message* msg = NULL; + struct configuration* config = NULL; wi = (struct worker_io*)watcher; @@ -194,7 +249,10 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents) return; client_error: - pgagroal_log_warn("[S] Client error: %s (socket %d status %d)", strerror(errno), wi->client_fd, status); + config = (struct configuration*)shmem; + pgagroal_log_warn("[S] Client error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->client_fd, status); pgagroal_log_message(msg); errno = 0; @@ -204,7 +262,10 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents) return; server_error: - pgagroal_log_warn("[S] Server error: %s (socket %d status %d)", strerror(errno), wi->server_fd, status); + config = (struct configuration*)shmem; + pgagroal_log_warn("[S] Server error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->server_fd, status); pgagroal_log_message(msg); errno = 0; @@ -213,3 +274,80 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents) ev_break(loop, EVBREAK_ALL); return; } + +static void +start_mgt(struct ev_loop *loop) +{ + memset(&io_mgt, 0, sizeof(struct ev_io)); + ev_io_init(&io_mgt, accept_cb, unix_socket, EV_READ); + ev_io_start(loop, &io_mgt); +} + +static void +shutdown_mgt(struct ev_loop* loop) +{ + char p[MISC_LENGTH]; + struct configuration* config = NULL; + + config = (struct configuration*)shmem; + + memset(&p, 0, sizeof(p)); + snprintf(&p[0], sizeof(p), ".s.%d", getpid()); + + ev_io_stop(loop, &io_mgt); + pgagroal_disconnect(unix_socket); + errno = 0; + pgagroal_remove_unix_socket(config->unix_socket_dir, &p[0]); + errno = 0; +} + +static void +accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) +{ + struct sockaddr_in client_addr; + socklen_t client_addr_length; + int client_fd; + signed char id; + int32_t slot; + int payload_i; + char* payload_s = NULL; + + pgagroal_log_trace("accept_cb: sockfd ready (%d)", revents); + + if (EV_ERROR & revents) + { + pgagroal_log_debug("accept_cb: invalid event: %s", strerror(errno)); + errno = 0; + return; + } + + client_addr_length = sizeof(client_addr); + client_fd = accept(watcher->fd, (struct sockaddr *)&client_addr, &client_addr_length); + if (client_fd == -1) + { + pgagroal_log_debug("accept: %s (%d)", strerror(errno), watcher->fd); + errno = 0; + return; + } + + /* Process internal management request -- f.ex. returning a file descriptor to the pool */ + pgagroal_management_read_header(client_fd, &id, &slot); + pgagroal_management_read_payload(client_fd, id, &payload_i, &payload_s); + + switch (id) + { + case MANAGEMENT_REMOVE_FD: + pgagroal_log_debug("pgagroal: Management remove file descriptor: Slot %d FD %d", slot, payload_i); + if (fds[slot] == payload_i && !news[slot]) + { + pgagroal_disconnect(payload_i); + fds[slot] = 0; + } + break; + default: + pgagroal_log_debug("pgagroal: Unsupported management id: %d", id); + break; + } + + pgagroal_disconnect(client_fd); +} diff --git a/src/libpgagroal/pipeline_session.c b/src/libpgagroal/pipeline_session.c index e2eff47f..5d332cb4 100644 --- a/src/libpgagroal/pipeline_session.c +++ b/src/libpgagroal/pipeline_session.c @@ -29,18 +29,24 @@ /* pgagroal */ #include #include +#include #include +#include #include #include #include #include -#include #include +#include /* system */ #include #include #include +#include +#include +#include +#include static int session_initialize(void*, void**, size_t*); static void session_start(struct ev_loop *loop, struct worker_io*); @@ -53,6 +59,8 @@ static void session_periodic(void); static bool in_tx; static int next_client_message; static int next_server_message; +static int unix_socket = -1; +static struct ev_io io_mgt; #define CLIENT_INIT 0 #define CLIENT_IDLE 1 @@ -65,6 +73,13 @@ struct client_session time_t timestamp; /**< The last used timestamp */ }; +static int fds[MAX_NUMBER_OF_CONNECTIONS]; +static bool news[MAX_NUMBER_OF_CONNECTIONS]; + +static void start_mgt(struct ev_loop *loop); +static void shutdown_mgt(struct ev_loop *loop); +static void accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents); + static void client_active(int); static void client_inactive(int); @@ -123,12 +138,22 @@ session_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shmem_si static void session_start(struct ev_loop *loop, struct worker_io* w) { + char p[MISC_LENGTH]; struct client_session* client; + struct configuration* config; + + config = (struct configuration*)shmem; in_tx = false; next_client_message = 0; next_server_message = 0; + for (int i = 0; i < config->max_connections; i++) + { + fds[i] = config->connections[i].fd; + news[i] = config->connections[i].new; + } + if (pipeline_shmem != NULL) { client = pipeline_shmem + (w->slot * sizeof(struct client_session)); @@ -136,6 +161,26 @@ session_start(struct ev_loop *loop, struct worker_io* w) atomic_store(&client->state, CLIENT_IDLE); client->timestamp = time(NULL); } + + memset(&p, 0, sizeof(p)); + snprintf(&p[0], sizeof(p), ".s.%d", getpid()); + + if (pgagroal_bind_unix_socket(config->unix_socket_dir, &p[0], &unix_socket)) + { + pgagroal_log_fatal("pgagroal: Could not bind to %s/%s", config->unix_socket_dir, &p[0]); + goto error; + } + + start_mgt(loop); + + return; + +error: + + exit_code = WORKER_FAILURE; + running = 0; + ev_break(loop, EVBREAK_ALL); + return; } static void @@ -143,6 +188,8 @@ session_stop(struct ev_loop *loop, struct worker_io* w) { struct client_session* client; + shutdown_mgt(loop); + if (pipeline_shmem != NULL) { client = pipeline_shmem + (w->slot * sizeof(struct client_session)); @@ -320,7 +367,9 @@ session_client(struct ev_loop *loop, struct ev_io *watcher, int revents) return; client_error: - pgagroal_log_warn("[C] Client error: %s (socket %d status %d)", strerror(errno), wi->client_fd, status); + pgagroal_log_warn("[C] Client error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->client_fd, status); pgagroal_log_message(msg); errno = 0; @@ -332,7 +381,9 @@ session_client(struct ev_loop *loop, struct ev_io *watcher, int revents) return; server_error: - pgagroal_log_warn("[C] Server error: %s (socket %d status %d)", strerror(errno), wi->server_fd, status); + pgagroal_log_warn("[C] Server error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->server_fd, status); pgagroal_log_message(msg); errno = 0; @@ -360,6 +411,7 @@ session_server(struct ev_loop *loop, struct ev_io *watcher, int revents) bool fatal = false; struct worker_io* wi = NULL; struct message* msg = NULL; + struct configuration* config = NULL; wi = (struct worker_io*)watcher; @@ -453,7 +505,10 @@ session_server(struct ev_loop *loop, struct ev_io *watcher, int revents) return; client_error: - pgagroal_log_warn("[S] Client error: %s (socket %d status %d)", strerror(errno), wi->client_fd, status); + config = (struct configuration*)shmem; + pgagroal_log_warn("[S] Client error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->client_fd, status); pgagroal_log_message(msg); errno = 0; @@ -465,7 +520,10 @@ session_server(struct ev_loop *loop, struct ev_io *watcher, int revents) return; server_error: - pgagroal_log_warn("[S] Server error: %s (socket %d status %d)", strerror(errno), wi->server_fd, status); + config = (struct configuration*)shmem; + pgagroal_log_warn("[S] Server error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->server_fd, status); pgagroal_log_message(msg); errno = 0; @@ -477,6 +535,83 @@ session_server(struct ev_loop *loop, struct ev_io *watcher, int revents) return; } +static void +start_mgt(struct ev_loop *loop) +{ + memset(&io_mgt, 0, sizeof(struct ev_io)); + ev_io_init(&io_mgt, accept_cb, unix_socket, EV_READ); + ev_io_start(loop, &io_mgt); +} + +static void +shutdown_mgt(struct ev_loop* loop) +{ + char p[MISC_LENGTH]; + struct configuration* config = NULL; + + config = (struct configuration*)shmem; + + memset(&p, 0, sizeof(p)); + snprintf(&p[0], sizeof(p), ".s.%d", getpid()); + + ev_io_stop(loop, &io_mgt); + pgagroal_disconnect(unix_socket); + errno = 0; + pgagroal_remove_unix_socket(config->unix_socket_dir, &p[0]); + errno = 0; +} + +static void +accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) +{ + struct sockaddr_in client_addr; + socklen_t client_addr_length; + int client_fd; + signed char id; + int32_t slot; + int payload_i; + char* payload_s = NULL; + + pgagroal_log_trace("accept_cb: sockfd ready (%d)", revents); + + if (EV_ERROR & revents) + { + pgagroal_log_debug("accept_cb: invalid event: %s", strerror(errno)); + errno = 0; + return; + } + + client_addr_length = sizeof(client_addr); + client_fd = accept(watcher->fd, (struct sockaddr *)&client_addr, &client_addr_length); + if (client_fd == -1) + { + pgagroal_log_debug("accept: %s (%d)", strerror(errno), watcher->fd); + errno = 0; + return; + } + + /* Process internal management request -- f.ex. returning a file descriptor to the pool */ + pgagroal_management_read_header(client_fd, &id, &slot); + pgagroal_management_read_payload(client_fd, id, &payload_i, &payload_s); + + switch (id) + { + case MANAGEMENT_REMOVE_FD: + pgagroal_log_debug("pgagroal: Management remove file descriptor: Slot %d FD %d", slot, payload_i); + if (fds[slot] == payload_i && !news[slot]) + { + pgagroal_disconnect(payload_i); + fds[slot] = 0; + } + break; + default: + pgagroal_log_debug("pgagroal: Unsupported management id: %d", id); + break; + } + + pgagroal_disconnect(client_fd); +} + static void client_active(int slot) { diff --git a/src/libpgagroal/pipeline_transaction.c b/src/libpgagroal/pipeline_transaction.c index f99af925..4e2ed21b 100644 --- a/src/libpgagroal/pipeline_transaction.c +++ b/src/libpgagroal/pipeline_transaction.c @@ -72,6 +72,8 @@ static int next_server_message; static int unix_socket = -1; static int deallocate; static bool fatal; +static int fds[MAX_NUMBER_OF_CONNECTIONS]; +static bool news[MAX_NUMBER_OF_CONNECTIONS]; static struct ev_io io_mgt; static struct worker_io server_io; @@ -123,6 +125,12 @@ transaction_start(struct ev_loop* loop, struct worker_io* w) goto error; } + for (int i = 0; i < config->max_connections; i++) + { + fds[i] = config->connections[i].fd; + news[i] = config->connections[i].new; + } + start_mgt(loop); pgagroal_tracking_event_slot(TRACKER_TX_RETURN_CONNECTION_START, w->slot); @@ -323,7 +331,10 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) return; client_error: - pgagroal_log_warn("[C] Client error: %s (slot %d socket %d status %d)", strerror(errno), slot, wi->client_fd, status); + pgagroal_log_warn("[C] Client error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->client_fd, status); + pgagroal_log_message(msg); errno = 0; exit_code = WORKER_CLIENT_FAILURE; @@ -332,7 +343,10 @@ transaction_client(struct ev_loop* loop, struct ev_io* watcher, int revents) return; server_error: - pgagroal_log_warn("[C] Server error: %s (slot %d socket %d status %d)", strerror(errno), slot, wi->server_fd, status); + pgagroal_log_warn("[C] Server error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->server_fd, status); + pgagroal_log_message(msg); errno = 0; exit_code = WORKER_SERVER_FAILURE; @@ -490,7 +504,10 @@ transaction_server(struct ev_loop *loop, struct ev_io *watcher, int revents) return; client_error: - pgagroal_log_warn("[S] Client error: %s (slot %d socket %d status %d)", strerror(errno), slot, wi->client_fd, status); + pgagroal_log_warn("[S] Client error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->client_fd, status); + pgagroal_log_message(msg); errno = 0; exit_code = WORKER_CLIENT_FAILURE; @@ -499,7 +516,10 @@ transaction_server(struct ev_loop *loop, struct ev_io *watcher, int revents) return; server_error: - pgagroal_log_warn("[S] Server error: %s (slot %d socket %d status %d)", strerror(errno), slot, wi->server_fd, status); + pgagroal_log_warn("[S] Server error (slot %d database %s user %s): %s (socket %d status %d)", + wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username, + strerror(errno), wi->server_fd, status); + pgagroal_log_message(msg); errno = 0; exit_code = WORKER_SERVER_FAILURE; @@ -579,6 +599,15 @@ accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) { case MANAGEMENT_CLIENT_FD: pgagroal_log_debug("pgagroal: Management client file descriptor: Slot %d FD %d", slot, payload_i); + fds[slot] = payload_i; + break; + case MANAGEMENT_REMOVE_FD: + pgagroal_log_debug("pgagroal: Management remove file descriptor: Slot %d FD %d", slot, payload_i); + if (fds[slot] == payload_i && !news[slot]) + { + pgagroal_disconnect(payload_i); + fds[slot] = 0; + } break; default: pgagroal_log_debug("pgagroal: Unsupported management id: %d", id); diff --git a/src/libpgagroal/worker.c b/src/libpgagroal/worker.c index e4bd2686..ebf27ba9 100644 --- a/src/libpgagroal/worker.c +++ b/src/libpgagroal/worker.c @@ -285,7 +285,7 @@ signal_cb(struct ev_loop *loop, ev_signal *w, int revents) si = (struct signal_info*)w; - pgagroal_log_debug("pgagroal: signal for slot %d", si->slot); + pgagroal_log_debug("pgagroal: signal %d for slot %d", si->signal.signum, si->slot); exit_code = WORKER_SHUTDOWN; running = 0; diff --git a/src/main.c b/src/main.c index 9d193c51..08999360 100644 --- a/src/main.c +++ b/src/main.c @@ -1210,10 +1210,7 @@ accept_main_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) } else if (pid > 0) { - if (config->pipeline == PIPELINE_TRANSACTION) - { - add_client(pid); - } + add_client(pid); } else { @@ -1308,6 +1305,14 @@ accept_mgt_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) pgagroal_log_debug("pgagroal: Management kill connection: Slot %d", slot); if (known_fds[slot] == payload_i) { + struct client* c = clients; + + while (c != NULL) + { + pgagroal_management_remove_fd(slot, payload_i, c->pid); + c = c->next; + } + pgagroal_disconnect(payload_i); known_fds[slot] = 0; } @@ -1406,11 +1411,8 @@ accept_mgt_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) break; case MANAGEMENT_CLIENT_DONE: pgagroal_log_debug("pgagroal: Management client done"); - if (config->pipeline == PIPELINE_TRANSACTION) - { - pid_t p = (pid_t)payload_i; - remove_client(p); - } + pid_t p = (pid_t)payload_i; + remove_client(p); break; case MANAGEMENT_SWITCH_TO: pgagroal_log_debug("pgagroal: Management switch to");