Skip to content

Commit f2adbaa

Browse files
committed
datastruct: add async_queue
1 parent bd11f6f commit f2adbaa

File tree

3 files changed

+150
-1
lines changed

3 files changed

+150
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ CC=gcc
2020
CFLAGS=-Wall -pedantic -Ofast -std=gnu11 -I ./include -isystem ./deps/netmap/sys
2121
LDFLAGS=-lJudy
2222
SOURCES=src/eth.c src/exotcp.c src/http11.c src/log.c src/netmap.c src/parser.c \
23-
src/datastruct/hash.c src/datastruct/list.c \
23+
src/datastruct/hash.c src/datastruct/list.c src/datastruct/async_queue.c \
2424
src/exotcp/arp.c src/exotcp/checksum.c src/exotcp/eth.c src/exotcp/icmp.c src/exotcp/ip.c src/exotcp/tcp.c
2525
PARSER=src/parser.c
2626
OBJECTS=$(SOURCES:.c=.o)

include/eth/datastruct/async_queue.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (C) 2015 jibi <[email protected]>
3+
*
4+
* This program is free software; you can redistribute it and/or
5+
* modify it under the terms of the GNU General Public License
6+
* as published by the Free Software Foundation; either version 2
7+
* of the License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program; if not, write to the Free Software
16+
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17+
*/
18+
19+
#ifndef _ETH_DATASTRUCT_ASYNC_QUEUE
20+
#define _ETH_DATASTRUCT_ASYNC_QUEUE
21+
22+
23+
#include <stdint.h>
24+
#include <pthread.h>
25+
26+
typedef struct async_queue_s {
27+
void **items;
28+
29+
uint32_t begin;
30+
uint32_t end;
31+
32+
uint32_t count;
33+
uint32_t size;
34+
35+
pthread_mutex_t rw_lock;
36+
pthread_cond_t full_cond;
37+
} async_queue_t;
38+
39+
async_queue_t *async_queue_new();
40+
void async_queue_push(async_queue_t *q, void *item);
41+
void *async_queue_pop(async_queue_t *q);
42+
43+
#endif

src/datastruct/async_queue.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (C) 2015 jibi <[email protected]>
3+
*
4+
* This program is free software; you can redistribute it and/or
5+
* modify it under the terms of the GNU General Public License
6+
* as published by the Free Software Foundation; either version 2
7+
* of the License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program; if not, write to the Free Software
16+
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17+
*/
18+
19+
#include <stdio.h>
20+
#include <stdlib.h>
21+
#include <string.h>
22+
#include <stdint.h>
23+
#include <stdbool.h>
24+
25+
#include <pthread.h>
26+
27+
#include <eth/datastruct/async_queue.h>
28+
29+
#define ASYNC_QUEUE_INIT_SIZE 2048
30+
31+
static inline void async_queue_grow(async_queue_t *q);
32+
33+
async_queue_t *
34+
async_queue_new()
35+
{
36+
async_queue_t *q = malloc(sizeof(async_queue_t));
37+
38+
q->size = ASYNC_QUEUE_INIT_SIZE;
39+
q->items = malloc(q->size * sizeof(void *));
40+
41+
q->count = 0;
42+
q->begin = 0;
43+
q->end = 0;
44+
45+
pthread_mutex_init(&q->rw_lock, NULL);
46+
pthread_cond_init(&q->full_cond, NULL);
47+
48+
return q;
49+
}
50+
51+
void
52+
async_queue_push(async_queue_t *q, void *item)
53+
{
54+
bool empty;
55+
pthread_mutex_lock(&q->rw_lock);
56+
57+
empty = q->count == 0;
58+
59+
if (q->count == q->size) {
60+
async_queue_grow(q);
61+
}
62+
63+
q->items[q->end] = item;
64+
q->end = (q->end + 1) % q->size;
65+
q->count++;
66+
67+
pthread_mutex_unlock(&q->rw_lock);
68+
69+
if (empty) {
70+
pthread_cond_broadcast(&q->full_cond);
71+
}
72+
}
73+
74+
void *
75+
async_queue_pop(async_queue_t *q)
76+
{
77+
void *item;
78+
79+
pthread_mutex_lock(&q->rw_lock);
80+
81+
while (q->count == 0) {
82+
pthread_cond_wait(&q->full_cond, &q->rw_lock);
83+
}
84+
85+
item = q->items[q->begin];
86+
q->begin = (q->begin + 1) % q->size;
87+
q->count--;
88+
89+
pthread_mutex_unlock(&q->rw_lock);
90+
91+
return item;
92+
}
93+
94+
static inline
95+
void
96+
async_queue_grow(async_queue_t *q)
97+
{
98+
uint32_t new_size = q->size * 2;
99+
void **new_items = malloc(new_size * sizeof(void **));
100+
101+
memcpy(new_items, q->items, q->size);
102+
free(q->items);
103+
104+
q->items = new_items;
105+
q->size = new_size;
106+
}

0 commit comments

Comments
 (0)