Use with ring buffer? #6
Hi, I just found your lib a few days ago and have been playing around with it; it has been pretty nice. However, I noticed that this lib only contains unbounded channels, and I was wondering how hard it would be to add a ring buffer, to create fast bounded channels?
You want a broadcast bounded queue/channel? In short, your data is stored in chunks (buckets/packets, call them what you like). As soon as a chunk is fully read (all readers have left it), it is disposed. See here.. And as I understand it, there is no performance benefit from using a fixed number of chunks. So... why do you want them bounded? P.S. I think it is possible to have some ROUGH counter (an unread-chunks counter) on top of the queue, which you could use to block writing or something... And why do you want it as a channel? How do you use it?
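A minimal sketch of that rough-counter idea, under the assumption that the counter wraps an existing unbounded queue and is only approximate; the `BoundedWriter` type and the closure-based push are stand-ins, not chute's actual API:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical wrapper: an approximate "unread items" counter layered on top
// of an unbounded queue, used only to push back on writers.
struct BoundedWriter<Q> {
    queue: Q,            // stand-in for the real writer handle
    unread: AtomicUsize, // incremented by writers, decremented by the reader
    soft_limit: usize,
}

impl<Q> BoundedWriter<Q> {
    // Returns false (and writes nothing) when the rough counter says "too full".
    fn try_push<T>(&self, value: T, push: impl FnOnce(&Q, T)) -> bool {
        if self.unread.load(Ordering::Relaxed) >= self.soft_limit {
            return false;
        }
        push(&self.queue, value);
        self.unread.fetch_add(1, Ordering::Relaxed);
        true
    }

    // Called by the reader after consuming `n` messages; the count is allowed
    // to be slightly off, which is why the limit above is only "soft".
    fn mark_read(&self, n: usize) {
        self.unread.fetch_sub(n, Ordering::Relaxed);
    }
}
```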
So I have been running some basic tests; I am actually looking for something like MPSC. The idea is that instead of allocating chunks, we would have one single ring buffer that reuses itself, so there would be no need to drop and create new chunks. I have been running some basic tests with the unbounded MPMC from this lib, and it does pretty well. I am pushing about 5k events per second, and if I just arbitrarily increase BLOCK_SIZE, let's say to 200k, the max latency on delivered events stays under 0.1ms, except at the times when a new block needs to be allocated and an old one dropped, when latency spikes. With the default 4k size, basically every couple of seconds the max latency on passing events jumps to roughly 1ms (correlated with new block creation), which is not very ideal for my case. In theory, an MPSC over a simple bounded ring buffer would resolve the problem: since it does not need to allocate new blocks in between, I expect the max latency on event passing to be more or less stable. The idea would be just to track the head for the producers and the tail for the consumer. I have a rough ring buffer MPSC drafted (probably not correct), but as a proof of concept the latency is way lower and more stable, at least for the MPSC scenario. Also, in my case I know for sure that the producers will always be slower than the consumer, so there is no chance of the producers overrunning it. In theory it should be:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;

const CACHE_LINE_SIZE: usize = 64;

#[repr(align(64))]
struct Slot<T> {
    data: UnsafeCell<MaybeUninit<T>>,
    stamp: AtomicUsize,
}

#[repr(align(64))]
struct AlignedAtomicUsize(AtomicUsize);

pub struct RingBuffer<T> {
    buffer: Box<[Slot<T>]>,
    head: AlignedAtomicUsize, // Producers update this
    tail: AlignedAtomicUsize, // Single consumer updates this
    capacity: usize,
    mask: usize,
}

unsafe impl<T: Send> Sync for RingBuffer<T> {}

impl<T> RingBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        let cap = capacity.next_power_of_two();
        let mut buffer = Vec::with_capacity(cap);
        for i in 0..cap {
            buffer.push(Slot {
                data: UnsafeCell::new(MaybeUninit::uninit()),
                stamp: AtomicUsize::new(i),
            });
        }
        Self {
            buffer: buffer.into_boxed_slice(),
            head: AlignedAtomicUsize(AtomicUsize::new(0)),
            tail: AlignedAtomicUsize(AtomicUsize::new(0)),
            capacity: cap,
            mask: cap - 1,
        }
    }

    #[inline]
    pub fn push(&self, value: T) -> Result<(), T> {
        let mut head = self.head.0.load(Ordering::Relaxed);
        let capacity = self.capacity;
        loop {
            let index = head & self.mask;
            let stamp = self.buffer[index].stamp.load(Ordering::Acquire);
            if stamp == head {
                match self.head.0.compare_exchange_weak(
                    head,
                    head + 1,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write value
                        unsafe {
                            (*self.buffer[index].data.get()).write(value);
                        }
                        // Publish to consumer
                        self.buffer[index].stamp.store(head + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(new_head) => head = new_head,
                }
            } else {
                // Check capacity using up-to-date tail
                let tail = self.tail.0.load(Ordering::Acquire);
                if head.wrapping_sub(tail) >= capacity {
                    return Err(value);
                }
                head = self.head.0.load(Ordering::Relaxed);
            }
        }
    }

    #[inline]
    pub fn pop(&self) -> Option<T> {
        let tail = self.tail.0.load(Ordering::Relaxed);
        let index = tail & self.mask;
        // Check if slot has data
        let stamp = self.buffer[index].stamp.load(Ordering::Acquire);
        if stamp != tail + 1 {
            return None;
        }
        // Read value
        let value = unsafe {
            ptr::read((*self.buffer[index].data.get()).as_ptr())
        };
        // Update stamp and tail
        self.buffer[index].stamp.store(tail + self.capacity, Ordering::Release);
        self.tail.0.store(tail + 1, Ordering::Release);
        Some(value)
    }

    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.head.0.load(Ordering::Relaxed) == self.tail.0.load(Ordering::Relaxed)
    }
}

impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        let head = self.head.0.get_mut();
        let tail = self.tail.0.get_mut();
        for i in *tail..*head {
            let index = i & self.mask;
            unsafe {
                ptr::drop_in_place((*self.buffer[index].data.get()).as_mut_ptr());
            }
        }
    }
}
```

Ideally I am looking to achieve low latency on passing data in between. Still trying to figure out all the nuances.
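For reference, a minimal sketch of how the draft above could be exercised with scoped threads (several producers, one consumer). The capacity, producer count, and message counts are arbitrary, and since the draft is a proof of concept this is an illustration rather than a correctness test:

```rust
// Assumes the RingBuffer<T> drafted above is in scope.
fn ring_buffer_smoke_test() {
    let rb: RingBuffer<u64> = RingBuffer::new(1024);
    std::thread::scope(|s| {
        // Several producers push until their values are accepted.
        for p in 0..4u64 {
            let rb = &rb;
            s.spawn(move || {
                for i in 0..10_000u64 {
                    let mut v = p * 1_000_000 + i;
                    // Spin until there is room (push hands the value back on a full buffer).
                    while let Err(returned) = rb.push(v) {
                        v = returned;
                        std::hint::spin_loop();
                    }
                }
            });
        }
        // Single consumer drains everything the producers wrote.
        s.spawn(|| {
            let mut received = 0u64;
            while received < 4 * 10_000 {
                if rb.pop().is_some() {
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
        });
    });
}
```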
If alloc/dealloc is actually the problem, I can fix that. There is no need for a ring buffer; it is enough to have N blocks in an object pool (see the sketch after this comment).
What is the size of your object? I don't believe a 4k block alloc/dealloc could cost a millisecond... Or is that not on "desktop" hardware? BUT! The mpmc writer does spinlock on new block creation... Maybe that causes the spikes? How many writers do you have? If you could localize your case and post it as an example, I think I could pin down the problem.
As you can guess from "known limitations", this is already assumed and required.
With that implementation you can't write SIMULTANEOUSLY from multiple producers.
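A minimal sketch of the object-pool idea mentioned above, assuming blocks are plain heap allocations; the `Block` type, its 4k payload, and the `Mutex<Vec<...>>` free list are illustrative stand-ins, not the library's actual internals:

```rust
use std::sync::Mutex;

// Hypothetical fixed-size block; stands in for the queue's real chunk type.
struct Block {
    data: [u8; 4096],
}

// A tiny pool that keeps up to `max_pooled` blocks around for reuse,
// so steady-state operation never touches the allocator.
struct BlockPool {
    free: Mutex<Vec<Box<Block>>>,
    max_pooled: usize,
}

impl BlockPool {
    fn new(max_pooled: usize) -> Self {
        Self { free: Mutex::new(Vec::with_capacity(max_pooled)), max_pooled }
    }

    // Reuse a pooled block if one is available, otherwise allocate a fresh one.
    fn acquire(&self) -> Box<Block> {
        self.free
            .lock()
            .unwrap()
            .pop()
            .unwrap_or_else(|| Box::new(Block { data: [0u8; 4096] }))
    }

    // Return a block to the pool; drop it if the pool is already full.
    fn release(&self, block: Box<Block>) {
        let mut free = self.free.lock().unwrap();
        if free.len() < self.max_pooled {
            free.push(block);
        }
    }
}
```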
On the other hand, if your object has a costly destructor, that could cause spikes...
Looks like there is actually a special case for MPSC: the single consumer is allowed to destruct objects as it goes...
Yes, that makes sense!
That should probably do it: having a pool of objects removes the need to create and destruct the blocks.
Most likely the data size is relatively big; I am passing over book events that can be a few KB in raw size. So I am pretty sure it plays a role in the higher latency when passing data, but the spikes themselves are a bit arbitrary: usually everything is fine for a period of time, and then suddenly there is a spike in the max latency (I roughly correlated it with block creation/destruction, as increasing the block size increased the period without spikes). I am running roughly 10-30 producers depending on workload, always a single consumer.
Yes, that is the case in the example above; it is kind of a proof of concept that definitely has some issues. I have crafted a simple example that takes the data size out of the picture and sends roughly the same number of messages as in my system, without any benchmarking libs etc., to keep it a bit closer to my real-world scenario; it is more about just running it and observing the logs:

```rust
use std::{sync::Arc, time::{Duration, Instant}};
use chute::LendingReader;

pub struct BasicEvent {
    pub stream: String,
    pub create_time: Instant,
}

fn main() {
    let mut queue: Arc<chute::mpmc::Queue<BasicEvent>> = chute::mpmc::Queue::new();
    std::thread::scope(|s| {
        // start reader
        let mut reader = queue.reader();
        s.spawn(move || {
            let mut current_time = Instant::now();
            let mut max_latency = Duration::new(0, 0);
            let mut message_counter = 0;
            loop {
                let msg = loop {
                    if let Some(msg) = reader.next() {
                        break msg;
                    }
                };
                message_counter += 1;
                let latency = msg.create_time.elapsed();
                if max_latency < latency {
                    max_latency = latency
                }
                if current_time.elapsed() > Duration::new(10, 0) {
                    println!(
                        "Events {} in 10s max latency {:?}",
                        message_counter, max_latency
                    );
                    current_time = Instant::now();
                    max_latency = Duration::new(0, 0);
                    message_counter = 0;
                }
            }
        });
        // simple example of 30 producers
        for _ in 0..30 {
            let mut writer = queue.writer();
            s.spawn(move || loop {
                writer.push(BasicEvent {
                    stream: String::from("some basic value, could be large"),
                    create_time: Instant::now(),
                });
                std::thread::sleep(Duration::from_millis(10));
            });
        }
    })
}
```

Run with:

I expect the max latency to be somewhat flat, as the event size is always the same; there may be small variation due to other things running on the system, but not jitter in the range of milliseconds:
Note, those numbers are from my Mac machine and running on other systems will give different results, but I believe regardless of what system you run it on, you should see the jitter where some rounds finish with a much higher latency than the average.
OK, thanks! I'll try your example in a few days. But I bet that the mass-called String destructor is the main time consumer.
I tried your example with the default 4k block and a 128 block. I would say there is no jitter on Windows 10 with an i7-4771. For the 128 block:
4k has somewhat higher latency, but it looks like there is no jitter either. Try replacing String with ArrayString. In your experiments with the ring buffer, latency was better because you never actually destruct your objects.
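A sketch of that suggestion, assuming the arrayvec crate provides the ArrayString; the 64-byte capacity is arbitrary. The point is that the event then carries no heap allocation and needs no destructor work:

```rust
use arrayvec::ArrayString;
use std::time::Instant;

// Event with inline, fixed-capacity string storage instead of a heap-allocated String.
// Dropping it is free: the type does not need a destructor.
pub struct BasicEvent {
    pub stream: ArrayString<64>, // capacity chosen arbitrarily for illustration
    pub create_time: Instant,
}

fn make_event() -> BasicEvent {
    BasicEvent {
        // from() returns Err if the text exceeds the capacity; unwrap is fine for a fixed test string.
        stream: ArrayString::from("some basic value").unwrap(),
        create_time: Instant::now(),
    }
}
```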
I replaced … Though I'll probably experiment with an object pool too.
Swapping to ArrayString seems to have improved the delivery latency. I still have a bit of jitter on some iterations; I believe it has something to do with the data I am passing through and the point of block creation/destruction. Just out of curiosity, have you tried bumping the block size ridiculously large, maybe 256k or more, just to see if the general latency is lower between block creations/destructions? The throughput of 30k per 10s is just a start; ideally I am planning to push up to 500k per second and need to keep jitter to a minimum.

Also, regarding the memory leak: I was not able to detect it running the variation of the RingBuffer with some fixes from above; memory seems stable to me. Just to make sure, I increased the throughput to 600k per 10s (larger messages than in the examples) and observed the memory usage of the whole program for roughly half a day (I might be missing something, I'm pretty new to Rust so far and need to figure out the tooling for proper memory/CPU profiling, etc.). I will play a bit more with different message structures to remove heap allocation and see if that lets me push the queue to the necessary limit with low jitter. I have also set up an EC2 instance just to test these variations, as my Mac is definitely not helping here; it seems like some of the jitter was due to the Mac itself.
I have also run a variation of the program with MallocStackLogging, which resulted in the following:
Going to see if I can get a heap snapshot more easily and look through it.
What kind of Mac? Arm with 16C/32T?
You know what, now I see the latency spikes, on a 256k block (but by design blocks should be small).
You don't destruct your objects at all when you go on another lap. That's a guaranteed memory leak for String/Vec. Though it is OK for ArrayString and other !needs_drop objects.
For tests in the cloud, you need a dedicated instance (a whole MACHINE), otherwise it will fluctuate by itself. I forgot to mention: make sure you have enabled the compiler flag for this (there is one for ALL modern platforms). It is enabled by default on Windows, but I have no idea what the situation is on other platforms.
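A quick way to check the !needs_drop property for a given message type is std::mem::needs_drop; a minimal sketch comparing String with an arrayvec ArrayString:

```rust
fn main() {
    // A heap-allocated String has a destructor that must run, or memory leaks.
    assert!(std::mem::needs_drop::<String>());
    // An inline ArrayString (arrayvec crate) has no destructor at all,
    // so a queue that never drops old slots cannot leak it.
    assert!(!std::mem::needs_drop::<arrayvec::ArrayString<64>>());
}
```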
Sorry, I was wrong about your RingBuffer. You do destroy objects on pop().
You know, I don't completely understand how your RingBuffer works. If it works better for you, you should use it then. According to your test, you are almost never in a situation where writers write SIMULTANEOUSLY, and you don't need broadcasting at all.
Could you please try this branch on your machine for latency? It should reuse up to 2 blocks, which should work close to double-buffering. A block has some "initialization" cost, which is zeroing BLOCK_SIZE bits. Though I don't see a significant (if any) difference on my machine.
Thanks! I will give it a try in a few days; I'll try to run it over the weekend!
You know, it is possible that you see latency not because of allocation/deallocation/block initialization, but because you have a great number of hardware threads. It is possible to have latency of up to min(T, 64) messages if you get unlucky. Maybe not latency per se: your reader will see 64 messages at once.
You see, when a writer finishes a message write, it atomically raises a bit in the bitblock. But as you can see, the reader can only read contiguously written messages. That is actually a performance gain, since it is more cache-friendly, but it may cause some latency if, let's say, 64 threads write messages at once and it so happens that the first message finishes writing last. 64 is the bitblock size. It looks like it is possible to make it configurable as u8/u16/u32/u64; that would reduce the latency to 8/16/32/64 messages max, but the reader would do atomic reads more often. P.S. But I think in general that should not happen, since for it to have a significant effect a thread would have to be interrupted right in the middle of a message write.
I think it is possible to have an unordered reader.
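To illustrate the mechanism described above (this is an illustration, not the library's actual code): writers set one bit per finished message, and a reader that only consumes the contiguous prefix stops at the first unset bit, so one slow writer can hold back up to 63 already-written messages behind it:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// One bit per slot in a 64-message bitblock; bit i == 1 means slot i is fully written.
struct BitBlock {
    ready: AtomicU64,
}

impl BitBlock {
    // Writer: publish slot `i` after its message data has been written.
    fn mark_written(&self, i: u32) {
        self.ready.fetch_or(1 << i, Ordering::Release);
    }

    // Reader: number of messages readable as a contiguous prefix.
    // If slots 1..=5 are written but slot 0 is not, this returns 0,
    // which is exactly the latency effect described above.
    fn contiguous_ready(&self) -> u32 {
        self.ready.load(Ordering::Acquire).trailing_ones()
    }
}

fn main() {
    let bb = BitBlock { ready: AtomicU64::new(0) };
    bb.mark_written(1);
    bb.mark_written(2);
    assert_eq!(bb.contiguous_ready(), 0); // slot 0 still pending holds everything back
    bb.mark_written(0);
    assert_eq!(bb.contiguous_ready(), 3); // now slots 0..=2 are readable
}
```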
I managed to make that unordered reader for mpmc! See https://github.com/tower120/chute/tree/unordered_reader. Use … Alas, I don't see any difference in latency in your benchmark on my hardware. Though when increasing bandwidth to ~5*10^6 messages/sec, the unordered reader has observably lower latency. Those numbers are too high for my machine, since most of the time the CPU is doing thread-switching instead of meaningful work (only 4 threads can simultaneously write to memory on an i7-4771), and I assume only under such conditions could a thread be interrupted in the middle of a message write. On the bright side, it does not reduce queue bandwidth (at least for me). Please try it, and object_pool, when you have time. Also, for benchmark purposes, make sure your MessageObject is !needs_drop.