Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bspark/bulk bop insert #235

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
5 changes: 4 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ memcached_SOURCES = \
cmdlog.h \
lqdetect.c \
lqdetect.h \
trace.h
trace.h \
cmd_in_second.c \
cmd_in_second.h

memcached_LDFLAGS =-R '$(libdir)'
memcached_CFLAGS = @PROFILER_FLAGS@ ${AM_CFLAGS}
memcached_DEPENDENCIES = libmcd_util.la
Expand Down
313 changes: 313 additions & 0 deletions cmd_in_second.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
#include "cmd_in_second.h"
#include "include/memcached/extension.h"
#include <string.h>
#include <stdint.h>
#include <assert.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/uio.h>
#include <pthread.h>
#include <stdlib.h>
#include <fcntl.h>

#define LOG_LENGTH 400
#define IP_LENGTH 16
#define KEY_LENGTH 256

typedef enum cmd_in_second_state {
NOT_STARTED,
ON_LOGGING,
ON_FLUSHING
} state;

typedef struct cmd_in_second_log {
char key[KEY_LENGTH];
char client_ip[IP_LENGTH];
} logtype;

typedef struct cmd_in_second_buffer {
logtype* ring;
int32_t front;
int32_t rear;
int32_t capacity;
} buffertype;

typedef struct cmd_in_second_timer {
struct timeval* ring;
int32_t front;
int32_t rear;
int32_t capacity;
int32_t last_elem_idx;
int32_t circular_counter;
} timertype;

struct cmd_in_second {
char cmd[20];
char collection_name[10];
struct cmd_in_second_buffer buffer;
timertype timer;
int32_t bulk_limit;
int32_t log_per_timer;
state cur_state;
pthread_mutex_t lock;
};

static struct cmd_in_second this;

static bool is_bulk_cmd()
{
const bool timer_empty = this.timer.front == this.timer.rear;

if (timer_empty) {
return false;
}

const struct timeval* front_time = &this.timer.ring[this.timer.front];
const struct timeval* last_time = &this.timer.ring[this.timer.last_elem_idx];

//printf("%d\n", this.timer.last_elem_idx);
//assert(0);

return last_time->tv_sec - front_time->tv_sec <= 1;
}

static void get_whole_cmd(char* whole_cmd)
{
if (strlen(this.collection_name)) {
snprintf(whole_cmd, 20, "%s %s", this.collection_name, this.cmd);
return;
}
snprintf(whole_cmd, 15, "%s", this.cmd);
}

static bool buffer_empty()
{
return this.buffer.front == this.buffer.rear;
}

static void* buffer_flush_thread()
{
const int32_t fd = open("cmd_in_second.log", O_CREAT | O_WRONLY | O_TRUNC, 0644);

if (fd < 0) {
perror("Can't open cmd_in_second log file: cmd_in_second.log");
return NULL;
}

buffertype* const buffer = &this.buffer;
timertype* const timer = &this.timer;

char* log_str = (char*)malloc(LOG_LENGTH * this.bulk_limit * sizeof(char));

if (log_str == NULL) {
perror("Can't allocate memory");
return NULL;
}

char whole_cmd[20] = "";
get_whole_cmd(whole_cmd);

const size_t whole_cmd_len = strlen(whole_cmd);
const int32_t whitespaces = 3;

size_t expected_write_length = 0;
int32_t circular_log_counter = 0;

while (!buffer_empty()) {

const logtype front = buffer->ring[buffer->front];
buffer->front = (buffer->front+1) % buffer->capacity;

char time_str[50] = "";

if (circular_log_counter == 0) {

const struct timeval* front_time = &timer->ring[timer->front];
const struct tm* lt = localtime((time_t*)&front_time->tv_sec);

timer->front = (timer->front+1) % timer->capacity;

if (lt == NULL) {
perror("localtime failed");
continue;
}

sprintf(time_str, "%04d-%02d-%02d %02d:%02d:%02d.%06d\n", lt ->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday,
lt->tm_hour, lt->tm_min, lt->tm_sec, (int32_t)front_time->tv_usec);
expected_write_length += 27;
}

char log[LOG_LENGTH] = "";
snprintf(log, LOG_LENGTH, "%s%s %s %s\n", time_str, whole_cmd, front.key, front.client_ip);
strncat(log_str, log, LOG_LENGTH);

expected_write_length += whole_cmd_len + strlen(front.key) + strlen(front.client_ip) + whitespaces;
circular_log_counter = (circular_log_counter+1) % this.log_per_timer;
}

if (write(fd, log_str, expected_write_length) != expected_write_length) {
perror("write length is difference to expectation.");
}

close(fd);

free(log_str);

free(this.timer.ring);
this.timer.ring = NULL;

free(this.buffer.ring);
this.buffer.ring = NULL;

this.cur_state = NOT_STARTED;

return NULL;
}

static int32_t buffer_flush()
{

pthread_t tid;
pthread_attr_t attr;

int32_t ret = 0;

if (pthread_attr_init(&attr) != 0 ||
computerphilosopher marked this conversation as resolved.
Show resolved Hide resolved
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
(ret = pthread_create(&tid, &attr, buffer_flush_thread, NULL)) != 0)
{
perror("Can't create buffer flush thread");
return ret;
}

return ret;
}

static void buffer_add(const logtype* log)
{

struct cmd_in_second_buffer* const buffer = &this.buffer;

buffer->ring[buffer->rear] = *log;
buffer->rear = (buffer->rear+1) % buffer->capacity;

const bool buffer_full = (buffer->rear+1) % buffer->capacity == buffer->front;

if (buffer_full) {
if (is_bulk_cmd()) {
this.cur_state = ON_FLUSHING;
buffer_flush();
return;
}
buffer->front = (buffer->front+1) % buffer->capacity;
}
}

static void timer_add()
{
struct cmd_in_second_timer* const timer = &this.timer;

const bool timer_full = (timer->rear+1) % timer->capacity == timer->front;

if (timer_full) {
timer->front = (timer->front+1) % timer->capacity;
}

if (gettimeofday(&timer->ring[timer->rear], NULL) == -1) {
perror("gettimeofday failed");
return;
};

timer->last_elem_idx = timer->rear;
timer->rear = (timer->rear+1) % timer->capacity;
}

static bool is_cmd_to_log(const char* collection_name, const char* cmd)
{
return strcmp(this.collection_name, collection_name) == 0 &&
strcmp(this.cmd, cmd) == 0;
}

bool cmd_in_second_write(const char* collection_name, const char* cmd,
const char* key, const char* client_ip)
{
pthread_mutex_lock(&this.lock);
if (this.cur_state != ON_LOGGING || !is_cmd_to_log(collection_name, cmd)) {
pthread_mutex_unlock(&this.lock);
return false;
}

logtype log = {"", ""};
snprintf(log.client_ip, IP_LENGTH, "%s", client_ip);
snprintf(log.key, KEY_LENGTH, "%s", key);

if (this.timer.circular_counter == 0) {
timer_add();
}

buffer_add(&log);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

지금하는 방식으로 buffer에 log 삽입하면 데이터 복제가 추가적으로 일어나요.
위에서 log 선언해서 로컬 변수로 log 구조체를 만들었는데 해당 구조체를 buffer에 삽입할 때 다시 한번 복제하고 있어요. 이렇게 log 구조체를 미리 만들고 그것을 복사하여 버퍼에 넣는 것보다는, 이미 buffer에 공간이 있기 때문에 그 공간만을 가져와서 해당 공간에다가 바로 데이터를 복제하는 편이 맞아요.

this.timer.circular_counter = (this.timer.circular_counter+1) % this.log_per_timer;

pthread_mutex_unlock(&this.lock);
return true;
}

void cmd_in_second_init()
computerphilosopher marked this conversation as resolved.
Show resolved Hide resolved
{
assert("test");
computerphilosopher marked this conversation as resolved.
Show resolved Hide resolved
this.cur_state = NOT_STARTED;
pthread_mutex_init(&this.lock, NULL);

this.buffer.front = 0;
computerphilosopher marked this conversation as resolved.
Show resolved Hide resolved
this.buffer.rear = 0;
this.buffer.ring = NULL;

this.timer.front = 0;
this.timer.rear = 0;
this.timer.capacity = 0;
this.timer.circular_counter = 0;
this.timer.last_elem_idx = 0;
this.timer.ring = NULL;
}

int32_t cmd_in_second_start(const char* collection_name, const char* cmd,
computerphilosopher marked this conversation as resolved.
Show resolved Hide resolved
const int32_t bulk_limit)
{

pthread_mutex_lock(&this.lock);

if (this.cur_state != NOT_STARTED) {
pthread_mutex_unlock(&this.lock);
return CMD_IN_SECOND_STARTED_ALREADY;
}

this.bulk_limit = bulk_limit;

this.buffer.capacity = bulk_limit+1;
this.buffer.ring = (logtype*)malloc(this.buffer.capacity * sizeof(logtype));

if (this.buffer.ring == NULL) {
pthread_mutex_unlock(&this.lock);
return CMD_IN_SECOND_NO_MEM;
}

this.log_per_timer = bulk_limit / 10 + (bulk_limit % 10 != 0);
this.timer.capacity = this.log_per_timer + 1;

this.timer.ring = (struct timeval*)malloc(this.timer.capacity * sizeof(struct timeval));

if (this.timer.ring == NULL) {
free(this.buffer.ring);
pthread_mutex_unlock(&this.lock);
return CMD_IN_SECOND_NO_MEM;
}

snprintf(this.collection_name, 5, "%s", collection_name);
snprintf(this.cmd, 15, "%s", cmd);

this.cur_state = ON_LOGGING;

pthread_mutex_unlock(&this.lock);

return CMD_IN_SECOND_START;
}
15 changes: 15 additions & 0 deletions cmd_in_second.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef __CMD_IN_SECOND_
#define __CMD_IN_SECOND_
#endif

#include <stdbool.h>
#include <stdint.h>
#include <sys/time.h>

#define CMD_IN_SECOND_START 0
#define CMD_IN_SECOND_STARTED_ALREADY 1
#define CMD_IN_SECOND_NO_MEM 2

void cmd_in_second_init(void);
int32_t cmd_in_second_start(const char* collection_name, const char* cmd, const int32_t bulk_limit);
bool cmd_in_second_write(const char* collection_name, const char* cmd, const char* key, const char* client_ip);
2 changes: 1 addition & 1 deletion include/memcached/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct iovec {
#define SUPPORT_BOP_SMGET
#define JHPARK_OLD_SMGET_INTERFACE
#define MAX_EFLAG_COMPARE_COUNT 100

#define CMD_IN_SECOND 1
computerphilosopher marked this conversation as resolved.
Show resolved Hide resolved

#ifdef __cplusplus
extern "C" {
Expand Down
Loading