
Use with ring buffer? #6

Open
goriunov opened this issue Jan 27, 2025 · 19 comments

Comments

@goriunov

Hi, I just found your lib a few days ago and have been playing around with it; it has been pretty nice. However, I noticed that this lib only contains unbounded channels. I was wondering how hard it would be to add a ring buffer, to create fast bounded channels?

@tower120
Owner

tower120 commented Jan 28, 2025

You want a bounded broadcast queue/channel?


In short - your data is stored in chunks/buckets/packets/call them what you like. As soon as a chunk is read (all readers have left it), it is disposed. See here.

And as I understand it, there is no performance benefit from using a fixed number of chunks.
I don't remember if disposed chunks are currently reused, but even if not - that is a very minor performance difference.

So ... why do you want them bounded?

P.S. I think it is possible to have some ROUGH counter (an unread-chunks counter) on top of the queue - which you could use to ... block writing or something...
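
For illustration, a minimal sketch of that idea - a rough counter layered on top of an unbounded queue to give producers a soft bound. The SoftBounded wrapper and its methods are hypothetical, not part of chute's API:

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical wrapper: a rough "unread" counter on top of an unbounded queue.
// The count is only approximate, but it is enough to block or back off writers.
struct SoftBounded<Q> {
    queue: Q,
    unread: AtomicUsize,
    limit: usize,
}

impl<Q> SoftBounded<Q> {
    // Producer side: returns false (backpressure) when the rough counter
    // exceeds the limit; the caller may spin, yield, or drop the message.
    fn try_reserve(&self) -> bool {
        if self.unread.load(Ordering::Relaxed) >= self.limit {
            return false;
        }
        self.unread.fetch_add(1, Ordering::Relaxed);
        true
    }

    // Consumer side: call after n messages (or a whole chunk) have been read.
    fn mark_read(&self, n: usize) {
        self.unread.fetch_sub(n, Ordering::Relaxed);
    }
}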


And why do you want it as a channel?

How do you use it?

@goriunov
Author

goriunov commented Jan 28, 2025

So I have been running some basic tests; I am actually looking for something like MPSC.

The idea is that instead of allocating chunks we would have one single RingBuffer that reuses itself, so there would be no need to drop and create new chunks.

I have been running some basic tests with the unbounded mpmc from this lib and it does pretty well. I am pushing about 5k events per second, and if I just arbitrarily increase the BLOCK_SIZE, let's say to 200k, the max latency on delivered events stays under 0.1ms, except when a new block needs to be allocated and the old one dropped, at which point latency spikes. With the default 4k size, roughly every couple of seconds the max latency on passing events jumps to about 1ms (correlated with new block creation), which is not ideal for my case.

In theory an MPSC with a simple bounded RingBuffer would resolve the problem; since it does not need to allocate new blocks in between, I would expect the max latency on event passing to be more or less stable. The idea would be just to track the head for the producers and the tail for the consumer.

I had a rough ring buffer MPSC drafted (probably not correct), but as a proof of concept the latency has in general been way lower and more stable, at least for the MPSC scenario.

Also, in my case I know for sure that the producer will always be slower than the consumer, so there is no chance of having the producer run out. In theory it should be:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;

const CACHE_LINE_SIZE: usize = 64;

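// Vyukov-style slot stamping: a producer may claim a slot when its stamp equals
// the head position it reserved; after writing it sets stamp = head + 1, which
// tells the single consumer (waiting for stamp == tail + 1) that the slot is
// ready. On pop the consumer sets stamp = tail + capacity, freeing the slot for
// the next lap.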
#[repr(align(64))]
struct Slot<T> {
    data: UnsafeCell<MaybeUninit<T>>,
    stamp: AtomicUsize,
}

#[repr(align(64))]
struct AlignedAtomicUsize(AtomicUsize);

pub struct RingBuffer<T> {
    buffer: Box<[Slot<T>]>,
    head: AlignedAtomicUsize,      // Producers update this
    tail: AlignedAtomicUsize,      // Single consumer updates this
    capacity: usize,
    mask: usize,
}

unsafe impl<T: Send> Sync for RingBuffer<T> {}

impl<T> RingBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two();
        let mut buffer = Vec::with_capacity(cap);
        
        for i in 0..cap {
            buffer.push(Slot {
                data: UnsafeCell::new(MaybeUninit::uninit()),
                stamp: AtomicUsize::new(i),
            });
        }

        Self {
            buffer: buffer.into_boxed_slice(),
            head: AlignedAtomicUsize(AtomicUsize::new(0)),
            tail: AlignedAtomicUsize(AtomicUsize::new(0)),
            capacity: cap,
            mask: cap - 1,
        }
    }

    #[inline]
    pub fn push(&self, value: T) -> Result<(), T> {
        let mut head = self.head.0.load(Ordering::Relaxed);
        let capacity = self.capacity;
        
        loop {
            let index = head & self.mask;
            let stamp = self.buffer[index].stamp.load(Ordering::Acquire);
            
            if stamp == head {
                match self.head.0.compare_exchange_weak(
                    head,
                    head + 1,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write value
                        unsafe {
                            (*self.buffer[index].data.get()).write(value);
                        }
                        // Publish to consumer
                        self.buffer[index].stamp.store(head + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(new_head) => head = new_head,
                }
            } else {
                // Check capacity using up-to-date tail
                let tail = self.tail.0.load(Ordering::Acquire);
                if head.wrapping_sub(tail) >= capacity {
                    return Err(value);
                }
                head = self.head.0.load(Ordering::Relaxed);
            }
        }
    }

    #[inline]
    pub fn pop(&self) -> Option<T> {
        let tail = self.tail.0.load(Ordering::Relaxed);
        let index = tail & self.mask;
        
        // Check if slot has data
        let stamp = self.buffer[index].stamp.load(Ordering::Acquire);
        if stamp != tail + 1 {
            return None;
        }

        // Read value
        let value = unsafe {
            ptr::read((*self.buffer[index].data.get()).as_ptr())
        };

        // Update stamp and tail
        self.buffer[index].stamp.store(tail + self.capacity, Ordering::Release);
        self.tail.0.store(tail + 1, Ordering::Release);
        
        Some(value)
    }

    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.head.0.load(Ordering::Relaxed) == self.tail.0.load(Ordering::Relaxed)
    }
}

impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        let head = self.head.0.get_mut();
        let tail = self.tail.0.get_mut();
        
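        // Drop any values that were pushed but never popped (positions tail..head).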
        for i in *tail..*head {
            let index = i & self.mask;
            unsafe {
                ptr::drop_in_place((*self.buffer[index].data.get()).as_mut_ptr());
            }
        }
    }
}

Ideally I am looking to achieve low latency when passing data between threads. Still trying to figure out all the nuances.

@tower120
Owner

tower120 commented Jan 28, 2025

I have been running some basic tests with the unbounded mpmc from this lib and it does pretty well. I am pushing about 5k events per second, and if I just arbitrarily increase the BLOCK_SIZE, let's say to 200k, the max latency on delivered events stays under 0.1ms, except when a new block needs to be allocated and the old one dropped, at which point latency spikes.

If alloc/dealloc is actually a problem - I can fix that. There is no need for a ring buffer. It is enough to have N blocks in an object pool.
After all, an object pool can be viewed as a ring buffer itself.
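
For illustration, a minimal sketch of that "N blocks in an object pool" idea, assuming a hypothetical Block type - chute's real block management differs:

use std::sync::Mutex;

struct Block { /* storage for BLOCK_SIZE messages */ }

struct BlockPool {
    free: Mutex<Vec<Box<Block>>>,
}

impl BlockPool {
    // Reuse a previously disposed block when one is available,
    // falling back to a fresh allocation otherwise.
    fn acquire(&self) -> Box<Block> {
        self.free
            .lock()
            .unwrap()
            .pop()
            .unwrap_or_else(|| Box::new(Block {}))
    }

    // Called once all readers have left a block: return it to the pool
    // instead of deallocating it.
    fn release(&self, block: Box<Block>) {
        self.free.lock().unwrap().push(block);
    }
}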

With the default 4k size, roughly every couple of seconds the max latency on passing events jumps to about 1ms (correlated with new block creation), which is not ideal for my case.

What is the size of your object? I don't believe a 4k block alloc/dealloc could cost a millisecond... Or is that not on "desktop" hardware?


BUT! The mpmc writer does spinlock on new block creation... Maybe that causes the spikes? How many writers do you have?

If you could reduce your case and post it as an example, I think I could localize the problem.


Also, in my case I know for sure that the producer will always be slower than the consumer, so there is no chance of having the producer run out...

As you can guess from the "known limitations", this is already assumed and required.

The idea would be just to track the head for the producers and the tail for the consumer.

With that implementation - you can't write SIMULTANEOUSLY from multiple producers.
You do +1 to the head counter after ANY "write finish" event. If writers finish NOT in the same order as they started, then after one of your +1s the head will point to a not-yet-written "slot". That's why the "atomic bitblocks" are needed - and that's why chute makes a difference.

@tower120
Owner

On the other hand - if your object has a costly destructor, that could cause spikes...
Try REDUCING the block size to 64, 128, 256, 512.

@tower120
Owner

Looks like there is actually a special case for mpsc. A single consumer is allowed to destruct objects as it goes...

@goriunov
Author

goriunov commented Jan 28, 2025

Yes that makes sense!

If alloc/dealloc is actually a problem - I can fix that. There is no need for a ring buffer. It is enough to have N blocks in an object pool.
After all, an object pool can be viewed as a ring buffer itself.

That should probably do it: having a pool of objects removes the need to create and destruct the blocks.

What is the size of your object? I don't believe a 4k block alloc/dealloc could cost a millisecond... Or is that not on "desktop" hardware?

Most likely the data size is relatively big; I am passing book events that could be a few KB in raw size. So I am pretty sure it plays a role in the higher latency when passing data, but the spikes themselves are a bit arbitrary: usually everything is all right for a period of time, and then suddenly there is a spike in the max latency (I roughly correlated it to block creation/destruction, as increasing the block size increased the period without spikes).

I am running roughly 10 - 30 producers depending on workload. Always a single consumer.

With that implementation - you can't write SIMULTANEOUSLY from multiple producers.
You do +1 to the head counter after ANY "write finish" event. If writers finish NOT in the same order as they started, then after one of your +1s the head will point to a not-yet-written "slot". That's why the "atomic bitblocks" are needed - and that's why chute makes a difference.

Yes, that is the case in the example above; it is kind of a proof of concept and definitely has some issues.

I have crafted a simple example, excluding the data size from the picture and sending roughly the same number of messages as in my system. It does not use any benchmarking libs etc., to keep it a bit closer to my real-world scenario - more just running and observing the logs:

use std::{sync::Arc, time::{Duration, Instant}};
use chute::LendingReader;

pub struct BasicEvent {
    pub stream: String,
    pub create_time: Instant,
}

fn main() {
    let queue: Arc<chute::mpmc::Queue<BasicEvent>> = chute::mpmc::Queue::new();

    std::thread::scope(|s| {
        // start reader
        let mut reader = queue.reader();
        s.spawn(move || {
            let mut current_time = Instant::now();
            let mut max_latency = Duration::new(0, 0);
            let mut message_counter = 0;

            loop {
                let msg = loop {
                    if let Some(msg) = reader.next() {
                        break msg;
                    }
                };

                message_counter += 1;

                let latency = msg.create_time.elapsed();
                if max_latency < latency {
                    max_latency = latency
                }

                if current_time.elapsed() > Duration::new(10, 0) {
                    println!(
                        "Events {} in 10s max latency {:?}",
                        message_counter, max_latency
                    );
                    current_time = Instant::now();
                    max_latency = Duration::new(0, 0);
                    message_counter = 0;
                }
            }
        });

        // simple example of 30 producers
        for _ in 0..30 {
            let mut writer = queue.writer();
            s.spawn(move || loop {
                writer.push(BasicEvent {
                    stream: String::from("some basic value, could be large"),
                    create_time: Instant::now(),
                });
                std::thread::sleep(Duration::from_millis(10));
            });
        }
    })
}

run with: cargo run --release

I expect the max latency to be somewhat flat, as the event size is always the same - maybe small variations due to other things running on the system, but not jitter in the range of milliseconds:

// block size set to 128

Events 25503 in 10s max latency 114.292µs
Events 25485 in 10s max latency 374.791µs
Events 25463 in 10s max latency 2.053666ms
Events 25475 in 10s max latency 2.363333ms
Events 25407 in 10s max latency 855.25µs
Events 25443 in 10s max latency 394µs
Events 25414 in 10s max latency 46.791µs
Events 25453 in 10s max latency 133.375µs
Events 25333 in 10s max latency 51.259917ms
Events 25440 in 10s max latency 80µs

Note: those numbers are from my Mac machine and running on other systems will give different results, but I believe regardless of what system you run on, you should see the jitter, where some rounds finish with way higher latency than the average.

@tower120
Owner

tower120 commented Jan 28, 2025

Ok, thanks!

I'll try your example in a few days. But I bet that the mass-called String destructor is the main time consumer.

@tower120
Owner

I tried your example with the default 4k block and a 128 block. I would say there is no jitter on Windows 10, i7-4771.

For 128 block:

Events 28001 in 10s max latency 141.1µs
Events 28400 in 10s max latency 469.4µs
Events 27951 in 10s max latency 131.4µs
Events 28274 in 10s max latency 162.8µs
Events 28508 in 10s max latency 200.3µs
Events 28291 in 10s max latency 134.9µs
Events 28337 in 10s max latency 289.6µs
Events 28592 in 10s max latency 224.2µs

4k has somewhat higher latency, but it looks like there's no jitter either.

Try replacing String in your BasicEvent with something like [u8;256].
In Rust, String is just a Vec<u8> - you are constantly making allocations/deallocations on a hot path.
I assume that the Windows allocator just does a better job than the one on Mac.
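
For example, a minimal sketch of that change (the 256-byte size and the new() helper are just illustrative) - the payload lives inline in the event, so pushing and dropping it never touches the allocator:

use std::time::Instant;

pub struct BasicEvent {
    pub stream: [u8; 256], // fixed-size inline buffer instead of String
    pub create_time: Instant,
}

impl BasicEvent {
    pub fn new(payload: &[u8]) -> Self {
        // Copy (and truncate) the payload into the inline buffer.
        let mut stream = [0u8; 256];
        let n = payload.len().min(stream.len());
        stream[..n].copy_from_slice(&payload[..n]);
        Self { stream, create_time: Instant::now() }
    }
}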

In your experiments with the ring buffer, latency was better because you never actually destruct the BasicEvent object when your "ring" goes on another lap. Looks like you simply leaked memory.

@tower120
Owner

I replaced String with this. With a 128 block, latency became 2 times lower.

Though I'll probably experiment with an object pool too.

@goriunov
Author

goriunov commented Jan 29, 2025

Swapping to ArrayString seems to have improved the delivery latency. There is still a bit of jitter on some iterations; I believe it has something to do with the data I am passing through and the point of block creation/destruction. Just out of curiosity, have you tried bumping the block size to something ridiculously large, maybe 256k or more, just to see if the general latency is lower in between block creations/destructions?

The throughput of 30k per 10s is just a start; ideally I am planning to push up to 500k per second and need to keep jitter to a minimum.

Also, regarding the memory leak, I was not able to detect it running the variation of the RingBuffer with some fixes from above; memory seems stable to me. Just to make sure, I increased the throughput to 600k per 10s (with larger messages than in the examples) and have been observing the memory usage of the whole program for roughly half a day (I might be missing something - I'm pretty new to Rust so far and need to figure out all the tooling for proper memory/CPU profiling etc.).

I will play a bit more with different message structures to remove heap allocation and see if that lets me push the queue to the necessary limit with low jitter. Also, I have set up EC2 just to test these variations, as my Mac is definitely not helping here - it seems like some of the jitter has been due to the Mac itself.

@goriunov
Author

I have also run a variation of the program with MallocStackLogging, which resulted in:

leaks Report Version: 4.0, multi-line stacks
Process 9158: 86352 nodes malloced for 12843 KB
Process 9158: 0 leaks for 0 total leaked bytes.

Going to see if I can get a heap snapshot more easily and look through it.

@tower120
Owner

tower120 commented Jan 29, 2025

What kind of Mac? ARM with 16C/32T?
I test on a 4C/8T CPU - maybe the issues you describe are there, I just don't see them.

Just out of curiosity, have you tried bumping the block size to something ridiculously large, maybe 256k or more, just to see if the general latency is lower in between block creations/destructions?

You know what, now I see latency spikes. On a 256k block (but by design blocks should be small).
I will assume that on some platforms the allocator just doesn't work as well as on Windows.
I'll try to implement an object pool - that should fix it.

Also, regarding the memory leak, I was not able to detect it running the variation of the RingBuffer

You don't destruct your objects at all when you go on another lap. That's a guaranteed memory leak for String/Vec. Though it is OK for ArrayString and other !needs_drop objects.

Also, I have set up EC2 just to test these variations

For tests in the cloud, you need a dedicated instance (a whole MACHINE) - otherwise it will fluctuate by itself.
It is not enough to rent just a dedicated CPU socket, since you are working with memory-bandwidth-sensitive operations,
and all CPUs basically use the same memory.


I forgot to mention - make sure you have the compiler flag for this enabled (there is one for ALL modern platforms). It is enabled by default on Windows, but I have no idea what the defaults are on other platforms.

@tower120
Owner

Sorry, I was wrong about your RingBuffer. You do destroy objects on pop by transferring ownership.

@tower120
Owner

You know, I don't completely understand how your RingBuffer works.
If your algorithm waits on push for the other writer to finish - then I was wrong in saying that your RingBuffer will work incorrectly on simultaneous writes.

If it works better for you - then you should use it. According to your test, you are almost never in a situation where writers write SIMULTANEOUSLY, and you don't need broadcasting at all.

@tower120
Owner

tower120 commented Jan 29, 2025

Could you please try this branch on your machine for latency?

That should re-use up to 2 blocks, which should work close to double-buffering. A block has some "initialization" cost, which is zeroing BLOCK_SIZE bits.

Though I don't see a significant (if any) difference on my machine.

@goriunov
Author

Thanks! I will give it a try in a few days - will try to run it over the weekend!

@tower120
Owner

tower120 commented Jan 31, 2025

You know, it is possible that you see latency not because of allocation/deallocation/block initialization, but because you have a great number of hardware threads. It is possible to have latency of up to min(T, 64) messages, if you get unlucky. Maybe not latency per se - your reader will see 64 messages at once.

chute, being broadcast, is designed for maximum bandwidth first.

You see, when a writer finishes writing a message, it atomically raises a bit in a bitblock.
Each time you ask the reader for the next message, it atomically reads the bitblock and takes its trailing_ones bits - this is the number of contiguous messages that are fully written and ready to read. This allows the reader to read messages in bulk without having to do an atomic read for each message. It does not write anything either.

But as you see, the reader can only read contiguously written messages. This is actually a performance gain, since it is more cache friendly. But it may cause some latency if, let's say, 64 threads write messages at once and it so happens that the first message finishes writing last.

64 is the bitblock size. Looks like it is possible to make it configurable: u8/u16/u32/u64. That would reduce the worst-case latency to 8/16/32/64 messages, but the reader would do atomic reads more often.

P.S. But I think in general that should not happen, since for it to have a significant effect, a thread would have to be interrupted right in the middle of a message write.
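
Purely as an illustration of the trailing_ones idea described above (not chute's actual internals), a tiny sketch of how a reader derives the contiguous ready prefix from a 64-bit bitblock:

// Writers set bit i once message i in this block is completely written.
// The reader may consume exactly the contiguous prefix of set bits.
fn ready_prefix(bitblock: u64) -> u32 {
    bitblock.trailing_ones()
}

fn main() {
    // Messages 0..=2 written, message 3 still in flight, message 4 written:
    let bits: u64 = 0b1_0111;
    assert_eq!(ready_prefix(bits), 3); // only the first 3 can be read in order
}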

@tower120
Owner

tower120 commented Jan 31, 2025

I think it is possible to have an mpsc (and maybe mpmc) version that reads messages out-of-order (within a bitblock). I don't know if it can be called a "queue" after that, though...
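
As a rough illustration of that out-of-order idea (again, just a sketch, not the actual implementation): within one bitblock the reader could take any message whose "written" bit is set and which it has not consumed yet, instead of only the contiguous prefix:

// written: bits set by writers once a message is fully written.
// consumed: bits the reader has already taken from this bitblock.
// Returns the index of some ready-but-unconsumed message, in any order.
fn next_unordered(written: u64, consumed: u64) -> Option<u32> {
    let ready = written & !consumed;
    if ready == 0 {
        None
    } else {
        Some(ready.trailing_zeros())
    }
}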

@tower120
Owner

tower120 commented Feb 1, 2025

I managed to make that unordered reader for mpmc! See https://github.com/tower120/chute/tree/unordered_reader. Use queue.unordered_reader() instead of queue.reader().

Alas, I don't see any difference in latency in your benchmark on my hardware. Though when increasing bandwidth to ~5*10^6 messages/sec, the unordered reader has observably lower latency. Those numbers are too high for my machine, since most of the time the CPU is doing thread-switching instead of meaningful work (only 4 threads can simultaneously write to memory on an i7-4771). And I assume that only under such conditions could a thread be interrupted in the middle of a message write.

On the bright side - it does not reduce queue bandwidth (at least for me).


Please try it, and object_pool, when you have time. Also, for benchmark purposes, make sure your MessageObject is !needs_drop.
