Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ add_subdirectory(libvswitch)
add_subdirectory(libfdtgen)
add_subdirectory(libtx2bpmp)
add_subdirectory(libplatsupportports)
add_subdirectory(libsharedringbuffer)
26 changes: 26 additions & 0 deletions libsharedringbuffer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright 2022, UNSW
# SPDX-License-Identifier: BSD-2-Clause
#

cmake_minimum_required(VERSION 3.7.2)

project(libsharedringbuffer C)

set(configure_string "")

config_string(
LibsharedringbufferDescCount
LIB_SHARED_RINGBUFFER_DESC_COUNT
"Number of buffer descriptors in the ring buffer"
DEFAULT
512
UNQUOTE
)

mark_as_advanced(LibsharedringbufferDescCount)
add_config_library(shared_ringbuffer "${configure_string}")

add_library(shared_ringbuffer STATIC EXCLUDE_FROM_ALL src/shared_ringbuffer.c)
target_include_directories(shared_ringbuffer PUBLIC include)
target_link_libraries(shared_ringbuffer shared_ringbuffer_Config muslc utils sel4utils)
66 changes: 66 additions & 0 deletions libsharedringbuffer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<!--
Copyright 2022, UNSW
SPDX-License-Identifier: CC-BY-SA-4.0
-->

libsharedringbuffer
-------------------

This directory contains a library implementation of shared ring
buffers for the transportation of data. This is intended to be used as a
communication mechanism between system components for bulk data transfer,
and was originally created as a data plane between an ethernet driver and
network stack for the sDDF. This library doesn't contain any code that
interfaces with seL4. It is expected that the user will provide shared
memory regions and notification/signalling handlers to this library.

To use this library in a project you can link `shared_ringbuffer` in your
target applications CMake configuration.

This libary is intended to be used by both a producer and a consumer. For
example, an ethernet driver produces data for a network stack to consume.
2 separate shared memory regions are required for each ring handle; one
to store available buffers and one to store used buffers. Each ring buffer
contains a separate read index and write index. The reader only ever
increments the read index, and the writer the write index. As read and
writes of a small integer are atomic, we can keep memory consistent
without locks.
The size of the ring buffers can be set with the cmake config option,
`LIB_SHARED_RINGBUFFER_DESC_COUNT` but defaults to 512. The user must
ensure that the shared memory regions handed to the library are of
appropriate size to match this.

Use case
---------

This library is intended to be used with a separate shared memory region,
usually allocated for DMA for a driver. The ring buffers can then contain
pointers into this shared memory, indicating which buffers are in use or
available to be used by either component.
Typically, 2 shared ring buffers are required, with separate structures
required on the recieve path and transmit path. Thus there are 4 regions
of shared memory required: 1 storing pointers to available RX buffers,
1 storing pointers to used RX buffers, 1 storing pointers to TX
buffers, and another storing pointers to available TX buffers.

On initialisation, both the producer and consumer should allocate their
own ring handles (`struct ring_handle`). These data structures simply
store pointers to the actual shared memory regions and are used to
interface with this library. The ring handle should then be passed into
`ring_init` along with 2 shared memory regions, an optional function
pointer to signal the other component, and either 1 or 0 to indicate
whether the read/write indices need to be initialised (note that only one
side of the shared memory regions needs to do this).

After initialisation, a typical use case would look like:
The driver wants to add some buffer that will be read by another component
(for example, a network stack processing incoming packets).

1. The driver dequeues a pointer to an available buffer from the
available ring.
2. Once data is inserted into the buffer (eg. via DMA), the driver
then enqueues the pointer to it into the used ring.
3. The driver can then notify the reciever.
4. Similarly, the reciever dequeues the pointer from the used ring,
processes the data, and once finished, can enqueue it back into
the available ring to be used once more by the driver.
231 changes: 231 additions & 0 deletions libsharedringbuffer/include/shared_ringbuffer/shared_ringbuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
* Copyright 2022, UNSW
*
* SPDX-License-Identifier: BSD-2-Clause
*/

#pragma once

#include <stdint.h>
#include <stddef.h>
#include <sel4utils/sel4_zf_logif.h>
#include <utils/util.h>
#include <shared_ringbuffer/gen_config.h>

/* Function pointer to be used to 'notify' components on either end of the shared memory */
typedef void (*notify_fn)(void);
Copy link
Member

@axel-h axel-h Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this get a context parameter for convenience reasons?


/* Buffer descriptor */
typedef struct buff_desc {
uintptr_t encoded_addr; /* encoded dma addresses */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is agnostic of what this memory is. IN the end, this could also be a void* in the end.

unsigned int len; /* associated memory lengths */
Copy link
Member

@axel-h axel-h Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use size_t here, so it scales with the architecture. Unless there is a strong reason to fix the structure size across architectures.

void *cookie; /* index into client side metadata */
} buff_desc_t;

/* Circular buffer containing descriptors */
typedef struct ring_buffer {
uint32_t write_idx;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be size_t also and not fix is to 32-bit (even is 32-bit might be practically sufficient.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that size_t would be reserved for lengths/sizes of things. This is just an index into the array

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For portable code, my rule of thumb usually is, that it's either unsigned int (as C's native integer) or size_t (to support the maximum range, which might be more than unsigned int covers) then. If specific fix-size types are used, this should have a good reason or some explanation that we want to ensure something - but this would also become compile time asserts then.
When it comes to structures, a reason might be that the structure's memory layout is important because it get's serialized/deserialized and thus has to work across different architectures.

uint32_t read_idx;
buff_desc_t buffers[CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT];
} ring_buffer_t;

/* A ring handle for enqueing/dequeuing into */
typedef struct ring_handle {
ring_buffer_t *used_ring;
ring_buffer_t *avail_ring;
/* Function to be used to signal that work is queued in the used_ring */
notify_fn notify;
} ring_handle_t;

/**
* Initialise the shared ring buffer.
*
* @param ring ring handle to use.
* @param avail pointer to available ring in shared memory.
* @param used pointer to 'used' ring in shared memory.
* @param notify function pointer used to notify the other user.
* @param buffer_init 1 indicates the read and write indices in shared memory need to be initialised.
* 0 inidicates they do not. Only one side of the shared memory regions needs to do this.
*/
void ring_init(ring_handle_t *ring, ring_buffer_t *avail, ring_buffer_t *used, notify_fn notify, int buffer_init);

/**
* Check if the ring buffer is empty.
*
* @param ring ring buffer to check.
*
* @return true indicates the buffer is empty, false otherwise.
*/
static inline int ring_empty(ring_buffer_t *ring)
{
return !((ring->write_idx - ring->read_idx) % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have the modulo operation here? Isn't this simply:

return (ring->write_idx == ring->read_idx);

}

/**
* Check if the ring buffer is full
*
* @param ring ring buffer to check.
*
* @return true indicates the buffer is full, false otherwise.
*/
static inline int ring_full(ring_buffer_t *ring)
{
return !((ring->write_idx - ring->read_idx + 1) % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sam here, why the modulo operation, this will even work for overflows:

return CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT == (ring->write_idx - ring->read_idx);

}

/**
* Notify the other user of changes to the shared ring buffers.
*
* @param ring the ring handle used.
*
*/
static inline void notify(ring_handle_t *ring)
{
return ring->notify();
}

/**
* Enqueue an element to a ring buffer
*
* @param ring Ring buffer to enqueue into.
* @param buffer address into shared memory where data is stored.
* @param len length of data inside the buffer above.
* @param cookie optional pointer to data required on dequeueing.
*
* @return -1 when ring is empty, 0 on success.
*/
static inline int enqueue(ring_buffer_t *ring, uintptr_t buffer, unsigned int len, void *cookie)
{
if (ring_full(ring)) {
ZF_LOGE("Ring full");
return -1;
}

ring->buffers[ring->write_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].encoded_addr = buffer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the code redundancy a bit (even if the compiler might optimize this anyway)

buff_desc_t *buff_desc = &(ring->buffers[ring->write_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT]);
buff_desc->encoded_addr = buffer
buff_desc->len = len;
buff_desc->cookie = cookie;

ring->buffers[ring->write_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].len = len;
ring->buffers[ring->write_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].cookie = cookie;
ring->write_idx++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that this increment isn't after the release barrier? Otherwise I think it could potentially complete before the writes to the buff_desc do and cause a reader to read too early.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, sorry i think this was an accident. Thanks for pointing it out!


THREAD_MEMORY_RELEASE();

return 0;
}

/**
* Dequeue an element to a ring buffer.
*
* @param ring Ring buffer to Dequeue from.
* @param buffer pointer to the address of where to store buffer address.
* @param len pointer to variable to store length of data dequeueing.
* @param cookie pointer optional pointer to data required on dequeueing.
*
* @return -1 when ring is empty, 0 on success.
*/
static inline int dequeue(ring_buffer_t *ring, uintptr_t *addr, unsigned int *len, void **cookie)
{
if (ring_empty(ring)) {
ZF_LOGF("Ring is empty");
return -1;
}

*addr = ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].encoded_addr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

buff_desc_t *buff_desc = &(ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT]);
*addr = buff_desc->encoded_addr;
...

*len = ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].len;
*cookie = ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].cookie;

THREAD_MEMORY_RELEASE();
ring->read_idx++;
Comment on lines +135 to +136
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it'd end up making a noticeable performance difference, but there's an option for these two lines to be combined into __atomic_store_n(&ring->read_idx, ring->read_idx+1, __ATOMIC_RELEASE); which allows the compiler to generate slightly different instructions on architectures like aarch64 where there is a store-release instruction (stlr) which means an additional dmb ish isn't required as shown in the small assembly examples below which are generated from aarch64-linux-gnu-gcc:

  4005a8:       d5033bbf        dmb     ish
  4005ac:       90000462        adrp    x2, 48c000 <object.0+0x28>
  .. <cut 2 unrelated instructions>
  4005b8:       b9400841        ldr     w1, [x2, #8]
  4005bc:       11000421        add     w1, w1, #0x1
  4005c0:       b9000841        str     w1, [x2, #8]

  4005b0:       b9400801        ldr     w1, [x0, #8]
  4005b4:       11000421        add     w1, w1, #0x1
  4005b8:       889ffc41        stlr    w1, [x2]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not really have a requirement for __atomic_store_n()? There is no race for incrementing the value, there is only one writer. Also, does this guarantee that all other writes are finish before also, or do we still need a barrier to play safe to enforce this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still a plain store, but with release semantics. So it's not allowed to be re-ordered with any memory operations that come before it in program-order.


return 0;
}

/**
* Enqueue an element into an available ring buffer.
* This indicates the buffer address parameter is currently available for use.
*
* @param ring Ring handle to enqueue into.
* @param buffer address into shared memory where data is stored.
* @param len length of data inside the buffer above.
* @param cookie optional pointer to data required on dequeueing.
*
* @return -1 when ring is empty, 0 on success.
*/
static inline int enqueue_avail(ring_handle_t *ring, uintptr_t addr, unsigned int len, void *cookie)
{
return enqueue(ring->avail_ring, addr, len, cookie);
}

/**
* Enqueue an element into a used ring buffer.
* This indicates the buffer address parameter is currently in use.
*
* @param ring Ring handle to enqueue into.
* @param buffer address into shared memory where data is stored.
* @param len length of data inside the buffer above.
* @param cookie optional pointer to data required on dequeueing.
*
* @return -1 when ring is empty, 0 on success.
*/
static inline int enqueue_used(ring_handle_t *ring, uintptr_t addr, unsigned int len, void *cookie)
{
return enqueue(ring->used_ring, addr, len, cookie);
}

/**
* Dequeue an element from an available ring buffer.
*
* @param ring Ring handle to dequeue from.
* @param buffer pointer to the address of where to store buffer address.
* @param len pointer to variable to store length of data dequeueing.
* @param cookie pointer optional pointer to data required on dequeueing.
*
* @return -1 when ring is empty, 0 on success.
*/
static inline int dequeue_avail(ring_handle_t *ring, uintptr_t *addr, unsigned int *len, void **cookie)
{
return dequeue(ring->avail_ring, addr, len, cookie);
}

/**
* Dequeue an element from a used ring buffer.
*
* @param ring Ring handle to dequeue from.
* @param buffer pointer to the address of where to store buffer address.
* @param len pointer to variable to store length of data dequeueing.
* @param cookie pointer optional pointer to data required on dequeueing.
*
* @return -1 when ring is empty, 0 on success.
*/
static inline int dequeue_used(ring_handle_t *ring, uintptr_t *addr, unsigned int *len, void **cookie)
{
return dequeue(ring->used_ring, addr, len, cookie);
}

/**
* Dequeue an element from a ring buffer.
* This function is intended for use by the driver, to collect a pointer
* into this structure to be passed around as a cookie.
*
* @param ring Ring buffer to dequeue from.
* @param addr pointer to the address of where to store buffer address.
* @param len pointer to variable to store length of data dequeueing.
* @param cookie pointer to store a pointer to this particular entry.
*
* @return -1 when ring is empty, 0 on success.
*/
static int driver_dequeue(ring_buffer_t *ring, uintptr_t *addr, unsigned int *len, void **cookie)
{
if (!((ring->write_idx - ring->read_idx) % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could simply use our helper function:

Suggested change
if (!((ring->write_idx - ring->read_idx) % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT)) {
if (ring_empty(ring) {

ZF_LOGW("write idx = %d, read idx = %d", ring->write_idx, ring->read_idx);
Copy link
Member

@axel-h axel-h Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a debug log printed afterwards (to have the context when reading the logs) or just merge into the next log.

ZF_LOGW("Ring is empty");
return -1;
}

*addr = ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].encoded_addr;
*len = ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].len;
*cookie = &ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
*cookie = &ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT];
*cookie = &ring->buffers[ring->read_idx % CONFIG_LIB_SHARED_RINGBUFFER_DESC_COUNT].cookie;


THREAD_MEMORY_RELEASE();
ring->read_idx++;

return 0;
}
20 changes: 20 additions & 0 deletions libsharedringbuffer/src/shared_ringbuffer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2022, UNSW
* SPDX-License-Identifier: BSD-2-Clause
*/

#include <shared_ringbuffer/shared_ringbuffer.h>

void ring_init(ring_handle_t *ring, ring_buffer_t *avail, ring_buffer_t *used, notify_fn notify, int buffer_init)
{
ring->used_ring = used;
ring->avail_ring = avail;
ring->notify = notify;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we consider passing NULL for any of these an error actually? We could put an assert here for development convenience, so this is caught and avoids potentially having to debug strange errors later.


if (buffer_init) {
ring->used_ring->write_idx = 0;
ring->used_ring->read_idx = 0;
ring->avail_ring->write_idx = 0;
ring->avail_ring->read_idx = 0;
}
}