Skip to content

Commit

Permalink
daemon/defer: use async-signal-safe formatting on SIGALRM
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Feb 19, 2025
1 parent 62be654 commit c6339da
Showing 1 changed file with 157 additions and 11 deletions.
168 changes: 157 additions & 11 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,146 @@ V6_CONF = {1, V6_PREFIXES_CNT, V6_PREFIXES, V6_RATE_MULT, V6_SUBPRIO};
// payload is counted either as part of session wire buffer (for stream) or as part of iter ctx (for datagrams)


/// Async-signal-safe snprintf-like formatting function, it supports:
///   * %% prints a literal '%';
///   * %s takes (char *), NULL is printed as "(null)";
///   * %u takes unsigned, %NUMu allowed for padding with spaces or zeroes;
///   * %x takes unsigned, %NUMx allowed;
///   * %f takes double, behaves like %.3f (NaNs and values too large give "[ERR]");
///   * %r takes (struct sockaddr *), printed as addr#port for AF_INET and AF_INET6
///     and as the socket path for AF_UNIX.
/// Anything unsupported (unknown directive, lone '%' at the end of fmt,
/// unknown address family) is printed as "[ERR]".
/// Padding width is limited by an internal buffer (at most 49 characters per number).
///
/// At most size-1 characters are stored, always followed by '\0' (unless size is zero);
/// output that does not fit is silently dropped.
/// Returns the number of characters stored, not counting the terminating '\0'.
int sigsafe_format(char *str, size_t size, const char *fmt, ...) {
	if (size == 0) return 0; // not even room for the terminating '\0'

	char *strp = str; // ptr just after last written char
	char *const stre = str + size; // ptr just after str buffer
	static const char digits[] = "0123456789abcdef";
	va_list ap;
	va_start(ap, fmt);
	// Invariant: strp < stre, i.e. there is always room left for the terminator.
	while (*fmt && (stre - strp > 1)) {
		const char *append_str = NULL;
		ptrdiff_t append_len = -1; // negative means "use strlen(append_str)"
		bool mod_zero = false;
		int mod_int = 0;
		unsigned base = 10;
		char tmpstr[50];

		if (*fmt != '%') { // literal text up to the next directive
			const char *perc = strchr(fmt, '%');
			append_str = fmt;
			append_len = perc ? (perc - fmt) : (ptrdiff_t)strlen(fmt);
			fmt += append_len;
		} else {
			fmt++; // skip '%'
			// Consume modifier digits until a conversion character sets append_str.
			while (!append_str) {
				const char c = *fmt;
				if (c != '\0') fmt++; // never step behind the end of fmt
				switch (c) {
				case '%': // %%
					append_str = "%";
					break;
				case 's': // just %s
					append_str = va_arg(ap, char *);
					if (!append_str) append_str = "(null)"; // NULL would keep this loop running
					break;
				case 'x': // %x, %#x, %0#x
					base = 16;
					// fallthrough
				case 'u': { // %u, %#u, %0#u
					unsigned num = va_arg(ap, unsigned);
					char *sp = tmpstr + sizeof(tmpstr);
					*--sp = '\0';
					do { // at least one digit, even for zero
						*--sp = digits[num % base];
						num /= base;
						mod_int--;
					} while (num > 0);
					// pad to the requested width, but never outside of tmpstr
					while ((mod_int-- > 0) && (sp > tmpstr)) {
						*--sp = mod_zero ? '0' : ' ';
					}
					append_str = sp;
				} break;
				case 'f': { // just %f, behaves like %.3f
					double valf = va_arg(ap, double);
					const char *sign = "";
					if (valf < 0) { sign = "-"; valf = -valf; }
					if (!(valf < (double)(unsigned)-1)) {
						// NaN or integer part not representable in unsigned;
						// the conversion below would be undefined
						append_str = "[ERR]";
						break;
					}
					uint64_t vali = valf * 1000 + 0.5; // round to thousandths
					// %u reads unsigned, so the 64-bit values have to be narrowed explicitly
					strp += sigsafe_format(strp, stre - strp, "%s%u.%03u", sign,
							(unsigned)(vali / 1000), (unsigned)(vali % 1000));
					append_str = "";
				} break;
				case 'r': { // just %r, takes (struct sockaddr *)
					const struct sockaddr *addr = va_arg(ap, struct sockaddr *);
					if (!addr) {
						append_str = "(null)";
						break;
					}
					switch (addr->sa_family) {
					case AF_UNIX: {
						const struct sockaddr_un *addru = (const struct sockaddr_un *)addr;
						// sun_path needn't be '\0'-terminated when it fills the whole array
						const char *end = memchr(addru->sun_path, '\0', sizeof(addru->sun_path));
						append_str = addru->sun_path;
						append_len = end ? (end - addru->sun_path)
								: (ptrdiff_t)sizeof(addru->sun_path);
					} break;
					case AF_INET: {
						const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
						const uint8_t *ipv4 = (const uint8_t *)&addr4->sin_addr;
						// port is stored in network byte order (big endian)
						const uint8_t *port = (const uint8_t *)&addr4->sin_port;
						strp += sigsafe_format(strp, stre - strp, "%u.%u.%u.%u#%u",
								(unsigned)ipv4[0], (unsigned)ipv4[1],
								(unsigned)ipv4[2], (unsigned)ipv4[3],
								((unsigned)port[0] << 8) | port[1]);
						append_str = "";
					} break;
					case AF_INET6: {
						const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
						const uint8_t *ipv6 = (const uint8_t *)&addr6->sin6_addr;
						// port is stored in network byte order (big endian)
						const uint8_t *port = (const uint8_t *)&addr6->sin6_port;
						// maximal zero-filled gap begin (incl.) and end (excl.);
						// the initial length of 2 bytes means that a single zero group is never shortened
						int mzb = -2, mze = 0;
						{ // find longest gap
							int zb = 0, ze = 0;
							for (int i = 0; i < 16; i += 2) {
								if (ipv6[i] || ipv6[i+1]) continue;
								if (i == ze) { // current gap continues
									ze += 2;
								} else { // new gap starts, evaluate the previous one
									if (ze - zb > mze - mzb) {
										mzb = zb; mze = ze;
									}
									zb = i; ze = i + 2;
								}
							}
							if (ze - zb > mze - mzb) {
								mzb = zb; mze = ze;
							}
						}
						// even i: print the group starting at byte i; odd i: print ':'
						for (int i = -!mzb; i < 15; i++) {
							// after ':' (possibly for i=-1), skip the gap
							// and continue with ':' (possibly for i=15)
							if (i == mzb) i = mze - 1;
							if (i % 2 != 0) {
								// keep the last byte of the buffer for '\0'
								if (stre - strp > 1) *strp++ = ':';
							} else {
								strp += sigsafe_format(strp, stre - strp, "%x",
										((unsigned)ipv6[i] << 8) | ipv6[i+1]);
							}
						}
						strp += sigsafe_format(strp, stre - strp, "#%u",
								((unsigned)port[0] << 8) | port[1]);
						append_str = "";
					} break;
					default: // unsupported family; must set append_str to finish the directive
						append_str = "[ERR]";
						break;
					}
				} break;
				default:
					if (('0' <= c) && (c <= '9')) {
						if ((mod_int == 0) && (c == '0')) {
							mod_zero = true;
						} else if (mod_int < (int)sizeof(tmpstr)) {
							// larger widths can't be satisfied anyway; avoids int overflow
							mod_int = mod_int * 10 + (c - '0');
						}
					} else {
						// unknown directive or '%' at the very end of fmt
						append_str = "[ERR]";
					}
					break;
				}
			}
		}

		// append to str (without \0), keeping one byte for the terminator
		size_t len = (append_len >= 0) ? (size_t)append_len : strlen(append_str);
		const size_t avail = stre - strp - 1;
		if (len > avail) len = avail;
		memcpy(strp, append_str, len);
		strp += len;
	}
	*strp = '\0';
	va_end(ap);
	return strp - str;
}

// Debug log line for the DEFER group, prefixed with " | "; NOT async-signal-safe.
#define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)

// Async-signal-safe logging: the message is formatted into a stack buffer of max_size bytes
// (longer output is truncated) and passed to write() on fd 1 or 2 depending on kr_log_target.
// Uses NON-STANDARD format string, see sigsafe_format above.
// Wrapped in do { } while (0) so that the macro acts as a single statement (terminate it with ';').
#define SIGSAFE_LOG(max_size, ...) do { \
		char msg[(max_size)]; \
		int len = sigsafe_format(msg, sizeof(msg), "[defer ] "__VA_ARGS__); \
		write(kr_log_target == LOG_TARGET_STDOUT ? 1 : 2, msg, len); \
	} while (0)
// The same as SIGSAFE_LOG with " | " prefix, but only if debug logging of the DEFER group is enabled.
#define SIGSAFE_VERBOSE_LOG(max_size, ...) do { \
		if (kr_log_is_debug(DEFER, NULL)) { SIGSAFE_LOG(max_size, " | " __VA_ARGS__); } \
	} while (0)


struct defer {
size_t capacity;
kru_price_t max_decay;
Expand Down Expand Up @@ -247,7 +385,8 @@ static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *
uint16_t *out_load, uint8_t *out_prefix)
{
uint16_t loads[kru_conf->prefixes_cnt];
KRU.load_multi_prefix((struct kru *)defer->kru, kr_now(),
const uint64_t now = kr_now(); // NOLINT(bugprone-signal-handler), uv_now only reads uint64_t
KRU.load_multi_prefix((struct kru *)defer->kru, now,
kru_conf->namespace, key, kru_conf->prefixes, prices, kru_conf->prefixes_cnt, loads);

int priority = 0;
Expand Down Expand Up @@ -299,10 +438,16 @@ void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)

uint16_t load;
uint8_t prefix;
if (nsec > 500 * 1000) {
uv_update_time(uv_default_loop()); // NOLINT(bugprone-signal-handler)
// on Linux, it just calls clock_gettime(CLOCK_MONOTONIC[_COARSE], ...) and sets value for uv_now (kr_now);
// libuv probably updates time just once per loop by itself
}
kru_charge_classify(kru_conf, key, prices, &load, &prefix);

VERBOSE_LOG(" %s ADD %4.3f ms * %.2f -> load: %d on /%d\n",
kr_straddr(&addr->ip), nsec / 1000000.0, pf16 / (float)(1<<16), load, prefix);
SIGSAFE_VERBOSE_LOG(KR_STRADDR_MAXLEN + 100,
" %r ADD %f ms * %f -> load: %u on /%u\n",
&addr->ip, nsec / 1000000.0, pf16 / (float)(1<<16), load, prefix);
}

/// Determine priority of the request in [0, QUEUES_CNT - 1];
Expand Down Expand Up @@ -651,21 +796,22 @@ static void defer_alarm(int signum)
uint64_t elapsed = 0;
if (defer_sample_state.is_accounting) {
elapsed = defer_get_stamp() - defer_sample_state.stamp;
VERBOSE_LOG("SIGALRM %s, host %s used %.3f s of cpu time on ongoing operation\n",
SIGSAFE_VERBOSE_LOG(KR_STRADDR_MAXLEN + 100,
"SIGALRM %s, host %r used %f s of cpu time on ongoing operation.\n",
signum ? "received" : "initialized",
kr_straddr(&defer_sample_state.addr.ip), elapsed / 1000000000.0); // XXX
&defer_sample_state.addr.ip, elapsed / 1000000000.0);
} else {
VERBOSE_LOG("SIGALRM %s, no measuring in progress\n",
SIGSAFE_VERBOSE_LOG(KR_STRADDR_MAXLEN + 100,
"SIGALRM %s, no measuring in progress.\n",
signum ? "received" : "initialized");
}
int64_t rest_to_timeout_ms = defer->hard_timeout - elapsed / 1000000; // ms - ns

if (rest_to_timeout_ms <= 0) {
uv_update_time(uv_default_loop()); // TODO more conceptual solution?
defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
kr_log_crit(DEFER, "Host %s used %0.3f s of cpu time continuously, interrupting cresd.\n",
kr_straddr(&defer_sample_state.addr.ip), elapsed / 1000000000.0);
classify(&defer_sample_state.addr, defer_sample_state.stream); // XXX
__sync_synchronize();
SIGSAFE_LOG(KR_STRADDR_MAXLEN + 100,
"Host %r used %f s of cpu time continuously, interrupting kresd.\n",
&defer_sample_state.addr.ip, elapsed / 1000000000.0);
abort();
}
alarm((rest_to_timeout_ms + 999) / 1000);
Expand Down

0 comments on commit c6339da

Please sign in to comment.