Skip to content

Commit

Permalink
code tag
Browse files Browse the repository at this point in the history
  • Loading branch information
computerphilosopher committed Aug 5, 2019
1 parent 42d796a commit 8b997d7
Show file tree
Hide file tree
Showing 6 changed files with 650 additions and 98 deletions.
168 changes: 101 additions & 67 deletions cmd_in_second.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,69 @@
#include <stdlib.h>
#include <fcntl.h>

#define LOG_PER_TIMER 500
#define LOG_LENGTH 400
#define IP_LENGTH 16
#define KEY_LENGTH 256

typedef enum cmd_in_second_state{
typedef enum cmd_in_second_state {
NOT_STARTED,
ON_LOGGING,
ON_FLUSHING
}state;
} state;

typedef struct cmd_in_second_log{
typedef struct cmd_in_second_log {
char key[KEY_LENGTH];
char client_ip[IP_LENGTH];
int32_t timer_idx;
}logtype;
} logtype;

typedef struct cmd_in_second_buffer {
logtype *ring;
int32_t front;
int32_t rear;
int32_t capacity;
}buffertype;
} buffertype;

typedef struct cmd_in_second_timer{
struct timeval* times;
int32_t size;
int32_t counter;
}timertype;
typedef struct cmd_in_second_timer {
struct timeval* ring;
int32_t front;
int32_t rear;
int32_t capacity;
int32_t circular_counter;
} timertype;

struct cmd_in_second {
char cmd[10];
char collection_name[4];
char cmd[20];
char collection_name[10];
struct cmd_in_second_buffer buffer;
int32_t bulk_limit;
timertype timer;
int32_t bulk_limit;
int32_t log_per_timer;
state cur_state;
};

static EXTENSION_LOGGER_DESCRIPTOR *mc_logger;
static struct cmd_in_second this;

static bool is_bulk_cmd()
{
const logtype* front = &this.buffer.ring[this.buffer.front];
const logtype* rear = &this.buffer.ring[this.buffer.rear];
const bool timer_empty = this.timer.front == this.timer.rear;

struct timeval front_time = this.timer.times[front->timer_idx];
struct timeval rear_time = this.timer.times[rear->timer_idx];
if (timer_empty) {
return false;
}

const struct timeval *front_time = &this.timer.ring[this.timer.front];
const struct timeval *rear_time = &this.timer.ring[this.timer.rear];

return rear_time.tv_sec - front_time.tv_sec <= 1;
return rear_time->tv_sec - front_time->tv_sec <= 1;
}

static void get_whole_cmd(char* whole_cmd)
{
if (strlen(this.collection_name)) {
sprintf(whole_cmd, "%s %s", this.collection_name, this.cmd);
snprintf(whole_cmd, 20, "%s %s", this.collection_name, this.cmd);
return;
}
sprintf(whole_cmd, "%s", this.cmd);
snprintf(whole_cmd, 15, "%s", this.cmd);
}

static bool buffer_empty()
Expand All @@ -79,64 +82,74 @@ static bool buffer_empty()

static void* buffer_flush_thread()
{
int32_t fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644);
const int32_t fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644);

if (fd < 0) {
mc_logger->log(EXTENSION_LOG_WARNING, NULL,
"Can't open cmd_in_second log file: %s\n", "cmd_in_second.log");
perror("Can't open cmd_in_second log file: cmd_in_second.log");
return NULL;
}

char whole_cmd[20] = "";
get_whole_cmd(whole_cmd);

buffertype* buffer = &this.buffer;
timertype* timer = &this.timer;

int32_t timer_idx = -1;

char* log_str = (char*)malloc(LOG_LENGTH * this.bulk_limit * sizeof(char));

if (log_str == NULL) {
mc_logger->log(EXTENSION_LOG_WARNING, NULL, "Can't allocate memory");
perror("Can't allocate memory");
return NULL;
}

int32_t expected_write_length = 0;
char whole_cmd[20] = "";
get_whole_cmd(whole_cmd);

const size_t whole_cmd_len = strlen(whole_cmd);
const int32_t whitespaces = 3;

size_t expected_write_length = 0;
int32_t circular_log_counter = 0;

while (!buffer_empty()) {

logtype front = buffer->ring[buffer->front++];
const logtype front = buffer->ring[buffer->front];
buffer->front = (buffer->front+1) % buffer->capacity;

char time_str[50] = "";

if (front.timer_idx != timer_idx) {
if (circular_log_counter == 0) {

const struct timeval* front_time = &timer->times[front.timer_idx];
const struct timeval* front_time = &timer->ring[timer->front];
const struct tm *lt = localtime((time_t*)&front_time->tv_sec);

sprintf(time_str, "%04d-%02d-%02d %02d:%02d:%02d.%06d\n", lt ->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec, (int32_t)front_time->tv_usec);
timer_idx = front.timer_idx;
timer->front = (timer->front+1) % timer->capacity;

if (lt == NULL) {
perror("localtime failed");
continue;
}

sprintf(time_str, "%04d-%02d-%02d %02d:%02d:%02d.%06d\n", lt ->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday,
lt->tm_hour, lt->tm_min, lt->tm_sec, (int32_t)front_time->tv_usec);
expected_write_length += 27;
}

char log[LOG_LENGTH] = "";
sprintf(log, "%s%s %s %s\n", time_str, whole_cmd, front.key, front.client_ip);
snprintf(log, LOG_LENGTH, "%s%s %s %s\n", time_str, whole_cmd, front.key, front.client_ip);
strncat(log_str, log, LOG_LENGTH);

expected_write_length += LOG_LENGTH;
expected_write_length += whole_cmd_len + strlen(front.key) + strlen(front.client_ip) + whitespaces;
circular_log_counter = (circular_log_counter+1) % this.log_per_timer;
}

if (write(fd, log_str, expected_write_length) != expected_write_length) {
mc_logger->log(EXTENSION_LOG_WARNING, NULL, "write length is difference to expectation.");
perror("write length is difference to expectation.");
}

close(fd);

free(log_str);

free(this.timer.times);
this.timer.times = NULL;
free(this.timer.ring);
this.timer.ring = NULL;

free(this.buffer.ring);
this.buffer.ring = NULL;
Expand All @@ -148,19 +161,19 @@ static void* buffer_flush_thread()

static int32_t buffer_flush()
{

this.cur_state = ON_FLUSHING;

int32_t ret = 0;
pthread_t tid;
pthread_attr_t attr;

int32_t ret = 0;

if (pthread_attr_init(&attr) != 0 ||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
(ret = pthread_create(&tid, &attr, buffer_flush_thread, NULL)) != 0)
{
mc_logger->log(EXTENSION_LOG_WARNING, NULL,
"Can't create buffer flush thread: %s\n", strerror(ret));
perror("Can't create buffer flush thread");
return ret;
}

return ret;
Expand All @@ -171,6 +184,9 @@ static void buffer_add(const logtype* log)

struct cmd_in_second_buffer* buffer = &this.buffer;

buffer->ring[buffer->rear] = *log;
buffer->rear = (buffer->rear+1) % buffer->capacity;

const bool buffer_full = (buffer->rear+1) % buffer->capacity == buffer->front;

if (buffer_full) {
Expand All @@ -181,41 +197,55 @@ static void buffer_add(const logtype* log)
buffer->front = (buffer->front+1) % buffer->capacity;
}

buffer->ring[buffer->rear] = *log;
buffer->rear = (buffer->rear+1) % buffer->capacity;
}

static void timer_add()
{
struct cmd_in_second_timer* timer = &this.timer;

const bool timer_full = (timer->rear+1) % timer->capacity == timer->front;

if (timer_full) {
timer->front = (timer->front+1) % timer->capacity;
}

if (gettimeofday(&timer->ring[timer->rear], NULL) == -1) {
perror("gettimeofday failed");
return;
};

timer->rear = (timer->rear+1) % timer->capacity;
}

static bool is_cmd_to_log(const char* collection_name, const char* cmd)
{
return strcmp(this.collection_name, collection_name) == 0 && strcmp(this.cmd, cmd) == 0;
return strcmp(this.collection_name, collection_name) == 0 &&
strcmp(this.cmd, cmd) == 0;
}

bool cmd_in_second_write(const char* collection_name, const char* cmd, const char* key, const char* client_ip)
bool cmd_in_second_write(const char* collection_name, const char* cmd,
const char* key, const char* client_ip)
{
if (this.cur_state != ON_LOGGING || !is_cmd_to_log(collection_name, cmd)) {
return false;
}

timertype *timer = &this.timer;

logtype log = {"", "", 0};
logtype log = {"", ""};
snprintf(log.client_ip, IP_LENGTH, "%s", client_ip);
snprintf(log.key, KEY_LENGTH, "%s", key);

if (timer->counter == 0) {
timer->size++;
gettimeofday(&timer->times[timer->size-1], NULL);
if (this.timer.circular_counter == 0) {
timer_add();
}

log.timer_idx = timer->size-1;

buffer_add(&log);
timer->counter = (timer->counter+1) % LOG_PER_TIMER;
this.timer.circular_counter = (this.timer.circular_counter+1) % this.log_per_timer;

return true;
}

int32_t cmd_in_second_start(const char* collection_name, const char* cmd, const int32_t bulk_limit)
int32_t cmd_in_second_start(const char* collection_name, const char* cmd,
const int32_t bulk_limit)
{

if (this.cur_state != NOT_STARTED) {
Expand All @@ -233,17 +263,21 @@ int32_t cmd_in_second_start(const char* collection_name, const char* cmd, const
return CMD_IN_SECOND_NO_MEM;
}

this.timer.size = 0;
this.timer.counter = 0;
this.timer.times = (struct timeval*)malloc(bulk_limit * sizeof(struct timeval));
this.log_per_timer = bulk_limit / 10 + (bulk_limit % 10 != 0);
this.timer.capacity = this.log_per_timer + 1;
this.timer.front = 0;
this.timer.rear = 0;
this.timer.circular_counter = 0;

this.timer.ring = (struct timeval*)malloc(this.timer.capacity * sizeof(struct timeval));

if (this.timer.times == NULL) {
if (this.timer.ring == NULL) {
free(this.buffer.ring);
return CMD_IN_SECOND_NO_MEM;
}

sprintf(this.collection_name, "%s", collection_name);
sprintf(this.cmd, "%s", cmd);
snprintf(this.collection_name, 5, "%s", collection_name);
snprintf(this.cmd, 15, "%s", cmd);

this.cur_state = ON_LOGGING;

Expand Down
2 changes: 1 addition & 1 deletion include/memcached/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct iovec {
#define SUPPORT_BOP_SMGET
#define JHPARK_OLD_SMGET_INTERFACE
#define MAX_EFLAG_COMPARE_COUNT 100

#define CMD_IN_SECOND 1

#ifdef __cplusplus
extern "C" {
Expand Down
Loading

0 comments on commit 8b997d7

Please sign in to comment.