Skip to content

Commit 764efde

Browse files
committed
add tests
Signed-off-by: Benny Zlotnik <bzlotnik@redhat.com>
1 parent 04ba986 commit 764efde

2 files changed

Lines changed: 200 additions & 0 deletions

File tree

tests/byte_bounded_memory.rs

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Integration tests for byte-bounded memory behavior
2+
3+
use bytes::Bytes;
4+
use fls::fls::byte_channel::byte_bounded_channel;
5+
use std::time::Duration;
6+
use tokio::time::timeout;
7+
8+
/// Test that backpressure kicks in with large chunks
9+
#[tokio::test]
10+
async fn test_backpressure_with_large_chunks() {
11+
let max_bytes = 128 * 1024; // 128KB limit
12+
let (tx, mut rx) = byte_bounded_channel::<Bytes>(max_bytes, 100);
13+
14+
// Step 1: Fill buffer to capacity
15+
let chunk = Bytes::from(vec![1u8; 128 * 1024]); // Exactly the buffer size
16+
tx.send(chunk).await.unwrap();
17+
18+
// Step 2: Try to send another chunk - this should block due to backpressure
19+
let blocking_chunk = Bytes::from(vec![2u8; 64 * 1024]);
20+
let send_task = tokio::spawn(async move {
21+
tx.send(blocking_chunk).await.unwrap();
22+
"completed"
23+
});
24+
25+
// Step 3: Verify the send is blocked (structural check, not timing)
26+
tokio::time::sleep(Duration::from_millis(10)).await;
27+
assert!(
28+
!send_task.is_finished(),
29+
"Send should be blocked by backpressure"
30+
);
31+
32+
// Step 4: Consume the buffered chunk to free space
33+
let _consumed = rx.recv().await.unwrap();
34+
35+
// Step 5: Now the blocked send should complete
36+
let result = timeout(Duration::from_millis(100), send_task).await;
37+
assert!(result.is_ok(), "Send should unblock after freeing space");
38+
assert_eq!(result.unwrap().unwrap(), "completed");
39+
}
40+
41+
/// Test that small chunks don't artificially limit throughput
42+
#[tokio::test]
43+
async fn test_small_chunks_high_throughput() {
44+
let max_bytes = 64 * 1024; // 64KB limit
45+
let (tx, mut rx) = byte_bounded_channel::<Bytes>(max_bytes, 10000);
46+
47+
let start_time = std::time::Instant::now();
48+
49+
// Producer: send many small chunks quickly
50+
let producer = tokio::spawn(async move {
51+
for i in 0..1000 {
52+
let chunk = Bytes::from(vec![i as u8; 32]); // 32-byte chunks
53+
tx.send(chunk).await.unwrap();
54+
}
55+
});
56+
57+
// Consumer: receive all chunks
58+
let consumer = tokio::spawn(async move {
59+
let mut count = 0;
60+
while let Some(_chunk) = rx.recv().await {
61+
count += 1;
62+
if count >= 1000 {
63+
break;
64+
}
65+
}
66+
count
67+
});
68+
69+
let (_, received_count) = tokio::join!(producer, consumer);
70+
let elapsed = start_time.elapsed();
71+
72+
assert_eq!(received_count.unwrap(), 1000);
73+
// Should complete quickly (small chunks shouldn't be bottlenecked)
74+
assert!(
75+
elapsed < Duration::from_secs(1),
76+
"Small chunks took too long: {:?}",
77+
elapsed
78+
);
79+
}
80+
81+
/// Test backpressure behavior with mixed chunk sizes
82+
#[tokio::test]
83+
async fn test_backpressure_with_mixed_sizes() {
84+
let max_bytes = 256 * 1024; // 256KB limit
85+
let (tx, mut rx) = byte_bounded_channel::<Bytes>(max_bytes, 100);
86+
87+
// Fill buffer with mixed-size chunks
88+
tx.send(Bytes::from(vec![1u8; 128 * 1024])).await.unwrap(); // 128KB
89+
tx.send(Bytes::from(vec![2u8; 64 * 1024])).await.unwrap(); // 64KB
90+
tx.send(Bytes::from(vec![3u8; 32 * 1024])).await.unwrap(); // 32KB
91+
// Total: 224KB (getting close to 256KB limit)
92+
93+
// Try to send another 64KB chunk - this should block
94+
let blocking_chunk = Bytes::from(vec![4u8; 64 * 1024]); // Would exceed limit
95+
let send_task = tokio::spawn(async move {
96+
tx.send(blocking_chunk).await.unwrap();
97+
"completed"
98+
});
99+
100+
// Verify backpressure is working
101+
tokio::time::sleep(Duration::from_millis(10)).await;
102+
assert!(
103+
!send_task.is_finished(),
104+
"Send should be blocked when buffer would exceed limit"
105+
);
106+
107+
// Consume the 128KB chunk to free space
108+
let consumed = rx.recv().await.unwrap();
109+
assert_eq!(consumed.len(), 128 * 1024);
110+
111+
// Now the blocked send should complete
112+
let result = timeout(Duration::from_millis(100), send_task).await;
113+
assert!(result.is_ok(), "Send should unblock after consuming data");
114+
assert_eq!(result.unwrap().unwrap(), "completed");
115+
}
116+
117+
/// Test that oversized single chunks don't deadlock
118+
#[tokio::test]
119+
async fn test_oversized_chunk_no_deadlock() {
120+
let max_bytes = 100 * 1024; // 100KB limit
121+
let (tx, mut rx) = byte_bounded_channel::<Bytes>(max_bytes, 10);
122+
123+
// Send a chunk larger than the buffer
124+
let oversized_chunk = Bytes::from(vec![42u8; 256 * 1024]); // 256KB chunk
125+
126+
let producer = tokio::spawn(async move {
127+
tx.send(oversized_chunk.clone()).await.unwrap();
128+
oversized_chunk
129+
});
130+
131+
let consumer = tokio::spawn(async move { rx.recv().await.unwrap() });
132+
133+
// Should complete without deadlock
134+
let result = timeout(Duration::from_secs(1), async move {
135+
let (sent, received) = tokio::join!(producer, consumer);
136+
(sent.unwrap(), received.unwrap())
137+
})
138+
.await;
139+
140+
assert!(result.is_ok(), "Oversized chunk should not deadlock");
141+
let (sent, received) = result.unwrap();
142+
assert_eq!(sent, received);
143+
}
144+
145+
/// Test property: regardless of chunk pattern, all data flows through correctly
146+
#[tokio::test]
147+
async fn test_chunk_size_independence() {
148+
let max_bytes = 256 * 1024; // 256KB limit
149+
150+
// Test different chunk patterns
151+
let test_cases = vec![
152+
("uniform_small", vec![4096; 100]), // 100 × 4KB
153+
("uniform_large", vec![64 * 1024; 10]), // 10 × 64KB
154+
("mixed", vec![1024, 32 * 1024, 1024, 128 * 1024, 1024]), // Mixed sizes
155+
("single_large", vec![200 * 1024]), // 1 × 200KB
156+
];
157+
158+
for (name, chunk_sizes) in test_cases {
159+
println!("Testing chunk pattern: {}", name);
160+
161+
let (tx, mut rx) = byte_bounded_channel::<Bytes>(max_bytes, 1000);
162+
163+
// Producer: send the chunk pattern
164+
let chunk_pattern = chunk_sizes.clone();
165+
let producer = tokio::spawn(async move {
166+
let mut total_bytes = 0;
167+
for (i, size) in chunk_pattern.iter().enumerate() {
168+
let chunk = Bytes::from(vec![i as u8; *size]);
169+
total_bytes += chunk.len();
170+
tx.send(chunk).await.unwrap();
171+
}
172+
total_bytes
173+
});
174+
175+
// Consumer: receive all chunks
176+
let consumer = tokio::spawn(async move {
177+
let mut total_bytes = 0;
178+
let mut chunk_count = 0;
179+
while let Some(chunk) = rx.recv().await {
180+
total_bytes += chunk.len();
181+
chunk_count += 1;
182+
183+
if chunk_count >= chunk_sizes.len() {
184+
break;
185+
}
186+
}
187+
total_bytes
188+
});
189+
190+
let (sent_bytes, received_bytes) = tokio::join!(producer, consumer);
191+
assert_eq!(
192+
sent_bytes.unwrap(),
193+
received_bytes.unwrap(),
194+
"All bytes should flow through for pattern: {}",
195+
name
196+
);
197+
}
198+
}

tests/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::io::Write;
55
use xz2::write::XzEncoder;
66

77
/// Generate deterministic test data of a given size
8+
#[allow(dead_code)]
89
pub fn create_test_data(size: usize) -> Vec<u8> {
910
// Create a repeating pattern for easier debugging
1011
let pattern = b"TESTDATA";
@@ -28,6 +29,7 @@ pub fn compress_xz(data: &[u8]) -> Vec<u8> {
2829
}
2930

3031
/// Compress data using gzip compression
32+
#[allow(dead_code)]
3133
pub fn compress_gz(data: &[u8]) -> Vec<u8> {
3234
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3335
encoder

0 commit comments

Comments
 (0)