@@ -13,7 +13,7 @@
 #include <linux/btf_ids.h>
 #include <asm/rqspinlock.h>
 
-#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
+#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_RB_OVERWRITE)
 
 /* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
 #define RINGBUF_PGOFF \
@@ -30,6 +30,7 @@ struct bpf_ringbuf {
         u64 mask;
         struct page **pages;
         int nr_pages;
+        bool overwrite_mode;
         rqspinlock_t spinlock ____cacheline_aligned_in_smp;
         /* For user-space producer ring buffers, an atomic_t busy bit is used
          * to synchronize access to the ring buffers in the kernel, rather than
@@ -72,6 +73,8 @@ struct bpf_ringbuf {
          */
         unsigned long consumer_pos __aligned(PAGE_SIZE);
         unsigned long producer_pos __aligned(PAGE_SIZE);
+        /* points to the record right after the last overwritten one */
+        unsigned long overwrite_pos;
         unsigned long pending_pos;
         char data[] __aligned(PAGE_SIZE);
 };
@@ -166,7 +169,7 @@ static void bpf_ringbuf_notify(struct irq_work *work)
  * considering that the maximum value of data_sz is (4GB - 1), there
  * will be no overflow, so just note the size limit in the comments.
  */
-static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
+static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node, bool overwrite_mode)
 {
         struct bpf_ringbuf *rb;
 
@@ -183,17 +186,25 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
         rb->consumer_pos = 0;
         rb->producer_pos = 0;
         rb->pending_pos = 0;
+        rb->overwrite_mode = overwrite_mode;
 
         return rb;
 }
 
 static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 {
+        bool overwrite_mode = false;
         struct bpf_ringbuf_map *rb_map;
 
         if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
                 return ERR_PTR(-EINVAL);
 
+        if (attr->map_flags & BPF_F_RB_OVERWRITE) {
+                if (attr->map_type == BPF_MAP_TYPE_USER_RINGBUF)
+                        return ERR_PTR(-EINVAL);
+                overwrite_mode = true;
+        }
+
         if (attr->key_size || attr->value_size ||
             !is_power_of_2(attr->max_entries) ||
             !PAGE_ALIGNED(attr->max_entries))
@@ -205,7 +216,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 
         bpf_map_init_from_attr(&rb_map->map, attr);
 
-        rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
+        rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node, overwrite_mode);
         if (!rb_map->rb) {
                 bpf_map_area_free(rb_map);
                 return ERR_PTR(-ENOMEM);
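
For context, a minimal user-space sketch of creating a ring buffer in overwrite mode via libbpf's bpf_map_create(). BPF_F_RB_OVERWRITE is the UAPI flag this patch introduces, so it must be visible in the installed headers; the map name and size here are arbitrary:

#include <bpf/bpf.h>
#include <bpf/libbpf.h>
#include <stdio.h>

int main(void)
{
        /* BPF_F_RB_OVERWRITE comes from this series' UAPI update */
        LIBBPF_OPTS(bpf_map_create_opts, opts,
                    .map_flags = BPF_F_RB_OVERWRITE);
        int fd;

        /* key_size/value_size must be 0; max_entries is the data area
         * size in bytes and must be a page-aligned power of 2, per the
         * checks in ringbuf_map_alloc() above
         */
        fd = bpf_map_create(BPF_MAP_TYPE_RINGBUF, "ow_rb", 0, 0,
                            4096 * 8, &opts);
        if (fd < 0) {
                perror("bpf_map_create");
                return 1;
        }
        return 0;
}

Note that the same flag on a BPF_MAP_TYPE_USER_RINGBUF is rejected with -EINVAL, as the hunk above shows: overwrite mode only makes sense for kernel-side producers.
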
@@ -293,13 +304,25 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma
         return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
 }
 
+/* Return an estimate of the available data in the ring buffer.
+ * Note: the returned value can exceed the actual ring buffer size because the
+ * function is not synchronized with the producer. The producer acquires the
+ * ring buffer's spinlock, but this function does not.
+ */
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
-        unsigned long cons_pos, prod_pos;
+        unsigned long cons_pos, prod_pos, over_pos;
 
         cons_pos = smp_load_acquire(&rb->consumer_pos);
-        prod_pos = smp_load_acquire(&rb->producer_pos);
-        return prod_pos - cons_pos;
+
+        if (unlikely(rb->overwrite_mode)) {
+                over_pos = smp_load_acquire(&rb->overwrite_pos);
+                prod_pos = smp_load_acquire(&rb->producer_pos);
+                return prod_pos - max(cons_pos, over_pos);
+        } else {
+                prod_pos = smp_load_acquire(&rb->producer_pos);
+                return prod_pos - cons_pos;
+        }
 }
 
 static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
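
A minimal BPF-side sketch of sampling the fill level that ringbuf_avail_data_sz() computes, using the existing bpf_ringbuf_query() helper with BPF_RB_AVAIL_DATA. The map size and attach point are arbitrary; as the comment above notes, the value is a racy estimate:

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
        __uint(type, BPF_MAP_TYPE_RINGBUF);
        __uint(max_entries, 4096 * 8);
        __uint(map_flags, BPF_F_RB_OVERWRITE); /* flag from this patch */
} rb SEC(".maps");

SEC("tracepoint/sched/sched_switch")
int sample_fill(void *ctx)
{
        /* racy estimate; can transiently exceed the ring size because
         * the query does not take the producer spinlock
         */
        __u64 avail = bpf_ringbuf_query(&rb, BPF_RB_AVAIL_DATA);

        bpf_printk("ringbuf avail bytes: %llu", avail);
        return 0;
}

char LICENSE[] SEC("license") = "GPL";
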
@@ -402,11 +425,41 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
         return (void *)((addr & PAGE_MASK) - off);
 }
 
+static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
+                                  unsigned long new_prod_pos,
+                                  unsigned long cons_pos,
+                                  unsigned long pend_pos)
+{
+        /* no space if the span from the oldest not-yet-committed record
+         * to the newest record exceeds (ringbuf_size - 1)
+         */
+        if (new_prod_pos - pend_pos > rb->mask)
+                return false;
+
+        /* ok, we have space in overwrite mode */
+        if (unlikely(rb->overwrite_mode))
+                return true;
+
+        /* no space if producer position advances more than (ringbuf_size - 1)
+         * ahead of consumer position when not in overwrite mode
+         */
+        if (new_prod_pos - cons_pos > rb->mask)
+                return false;
+
+        return true;
+}
+
+static u32 bpf_ringbuf_round_up_hdr_len(u32 hdr_len)
+{
+        hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
+        return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
+}
+
 static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 {
-        unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
+        unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos, flags;
         struct bpf_ringbuf_hdr *hdr;
-        u32 len, pg_off, tmp_size, hdr_len;
+        u32 len, pg_off, hdr_len;
 
         if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
                 return NULL;
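
The position fields are free-running counters that are only masked when indexing the data area, so the unsigned subtractions in bpf_ringbuf_has_space() stay correct across wraparound. A standalone sketch with hypothetical numbers, not kernel code, to illustrate the two checks:

#include <stdbool.h>
#include <stdio.h>

/* hypothetical stand-ins for the kernel fields used above */
struct rb_state {
        unsigned long mask;     /* ring size - 1 */
        bool overwrite_mode;
};

static bool has_space(const struct rb_state *rb, unsigned long new_prod_pos,
                      unsigned long cons_pos, unsigned long pend_pos)
{
        if (new_prod_pos - pend_pos > rb->mask)
                return false;   /* uncommitted records span the whole ring */
        if (rb->overwrite_mode)
                return true;    /* allowed to overrun the consumer */
        return new_prod_pos - cons_pos <= rb->mask;
}

int main(void)
{
        struct rb_state ow = { .mask = 4095, .overwrite_mode = true };
        struct rb_state df = { .mask = 4095, .overwrite_mode = false };

        /* producer 8 KiB ahead of a stalled consumer, everything
         * committed (pend_pos == new_prod_pos): overwrite mode accepts,
         * default mode refuses
         */
        printf("%d %d\n", has_space(&ow, 8192, 0, 8192),
               has_space(&df, 8192, 0, 8192));        /* prints: 1 0 */
        return 0;
}
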
@@ -429,24 +482,40 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
                 hdr_len = READ_ONCE(hdr->len);
                 if (hdr_len & BPF_RINGBUF_BUSY_BIT)
                         break;
-                tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
-                tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
-                pend_pos += tmp_size;
+                pend_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
         }
         rb->pending_pos = pend_pos;
 
-        /* check for out of ringbuf space:
-         * - by ensuring producer position doesn't advance more than
-         *   (ringbuf_size - 1) ahead
-         * - by ensuring oldest not yet committed record until newest
-         *   record does not span more than (ringbuf_size - 1)
-         */
-        if (new_prod_pos - cons_pos > rb->mask ||
-            new_prod_pos - pend_pos > rb->mask) {
+        if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
                 raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
                 return NULL;
         }
 
+        /* In overwrite mode, advance overwrite_pos when the ring buffer is
+         * full. The key points are to stay on record boundaries and to
+         * consume enough records to fit the new one.
+         */
+        if (unlikely(rb->overwrite_mode)) {
+                over_pos = rb->overwrite_pos;
+                while (new_prod_pos - over_pos > rb->mask) {
+                        hdr = (void *)rb->data + (over_pos & rb->mask);
+                        hdr_len = READ_ONCE(hdr->len);
+                        /* The bpf_ringbuf_has_space() check above ensures we
+                         * won't step over a record currently being worked on
+                         * by another producer.
+                         */
+                        over_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
+                }
+                /* The smp_store_release(&rb->producer_pos, new_prod_pos) at
+                 * the end of this function ensures that when the consumer
+                 * sees the updated rb->producer_pos, it also sees the updated
+                 * rb->overwrite_pos; a consumer that reads overwrite_pos
+                 * after smp_load_acquire(&rb->producer_pos) always gets a
+                 * valid value.
+                 */
+                WRITE_ONCE(rb->overwrite_pos, over_pos);
+        }
+
         hdr = (void *)rb->data + (prod_pos & rb->mask);
         pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
         hdr->len = size | BPF_RINGBUF_BUSY_BIT;
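
A minimal C11 sketch, not kernel code, of the release/acquire pairing the comment above describes: the producer's WRITE_ONCE() of overwrite_pos is ordered before its smp_store_release() of producer_pos, so a consumer that acquire-loads producer_pos first cannot observe a stale overwrite_pos:

#include <stdatomic.h>

static _Atomic unsigned long producer_pos;
static _Atomic unsigned long overwrite_pos;

/* producer side: runs under rb->spinlock in the patch */
static void publish(unsigned long over, unsigned long prod)
{
        /* analogue of WRITE_ONCE(rb->overwrite_pos, over_pos) */
        atomic_store_explicit(&overwrite_pos, over, memory_order_relaxed);
        /* analogue of smp_store_release(&rb->producer_pos, ...):
         * orders the overwrite_pos store before it
         */
        atomic_store_explicit(&producer_pos, prod, memory_order_release);
}

/* consumer side */
static unsigned long snapshot(unsigned long *over)
{
        /* analogue of smp_load_acquire(&rb->producer_pos) */
        unsigned long prod = atomic_load_explicit(&producer_pos,
                                                  memory_order_acquire);

        /* ordered after the acquire load: cannot be older than the
         * value published before the matching release store
         */
        *over = atomic_load_explicit(&overwrite_pos, memory_order_relaxed);
        return prod;
}
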
@@ -576,6 +645,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
                 return smp_load_acquire(&rb->consumer_pos);
         case BPF_RB_PROD_POS:
                 return smp_load_acquire(&rb->producer_pos);
+        case BPF_RB_OVERWRITE_POS:
+                return smp_load_acquire(&rb->overwrite_pos);
         default:
                 return 0;
         }
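
Finally, a BPF-side sketch of how a consumer might combine the new BPF_RB_OVERWRITE_POS query with the existing ones to locate the oldest record that has not been overwritten. BPF_RB_OVERWRITE_POS is added by this patch; the map size and attach point are arbitrary:

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
        __uint(type, BPF_MAP_TYPE_RINGBUF);
        __uint(max_entries, 4096 * 8);
        __uint(map_flags, BPF_F_RB_OVERWRITE); /* flag from this patch */
} rb SEC(".maps");

SEC("perf_event")
int readable_window(void *ctx)
{
        /* read producer_pos first (acquire), so overwrite_pos is at
         * least as new; see the ordering comment in
         * __bpf_ringbuf_reserve() above
         */
        __u64 prod = bpf_ringbuf_query(&rb, BPF_RB_PROD_POS);
        __u64 over = bpf_ringbuf_query(&rb, BPF_RB_OVERWRITE_POS);
        __u64 cons = bpf_ringbuf_query(&rb, BPF_RB_CONS_POS);
        __u64 oldest = cons > over ? cons : over;

        /* records in [oldest, prod) have not been overwritten */
        bpf_printk("window: [%llu, %llu)", oldest, prod);
        return 0;
}

char LICENSE[] SEC("license") = "GPL";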