Skip to content

Adding support for Seqpacket #5197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/vmm/src/devices/virtio/net/tap.rs
Original file line number Diff line number Diff line change
@@ -241,10 +241,17 @@ pub mod tests {
generated::ifreq__bindgen_ty_1::default().ifrn_name.len()
});

// Empty name - The tap should be named "tap0" by default
// Empty name - The tap should be named by the kernel (e.g., "tap0", "tap1", etc.)
let tap = Tap::open_named("").unwrap();
assert_eq!(b"tap0\0\0\0\0\0\0\0\0\0\0\0\0", &tap.if_name);
assert_eq!("tap0", tap.if_name_as_str());
let tap_name_str = tap.if_name_as_str();

// Check that it starts with "tap" and the remainder is numeric.
assert!(
Regex::new(r"^tap\d+$").unwrap().is_match(tap_name_str),
"Generated tap name '{}' does not match expected pattern",
tap_name_str
);


// Test using '%d' to have the kernel assign an unused name,
// and that we correctly copy back that generated name
111 changes: 60 additions & 51 deletions src/vmm/src/devices/virtio/rng/device.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use aws_lc_rs::rand;
use vm_memory::GuestMemoryError;
use vmm_sys_util::eventfd::EventFd;

use super::metrics::METRICS;
use super::metrics::EntropyMetricsPerDevice;
use super::{RNG_NUM_QUEUES, RNG_QUEUE};
use crate::devices::DeviceError;
use crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDevice};
@@ -113,14 +113,15 @@ impl Entropy {
}

fn handle_one(&mut self) -> Result<u32, EntropyError> {
let global = EntropyMetricsPerDevice::alloc("global".to_string());
// If guest provided us with an empty buffer just return directly
if self.buffer.is_empty() {
return Ok(0);
}

let mut rand_bytes = vec![0; self.buffer.len() as usize];
rand::fill(&mut rand_bytes).inspect_err(|_| {
METRICS.host_rng_fails.inc();
global.host_rng_fails.inc();
})?;

// It is ok to unwrap here. We are writing `iovec.len()` bytes at offset 0.
@@ -129,12 +130,13 @@ impl Entropy {
}

fn process_entropy_queue(&mut self) {
let global = EntropyMetricsPerDevice::alloc("global".to_string());
let mut used_any = false;
while let Some(desc) = self.queues[RNG_QUEUE].pop() {
// This is safe since we checked in the event handler that the device is activated.
let mem = self.device_state.mem().unwrap();
let index = desc.index;
METRICS.entropy_event_count.inc();
global.entropy_event_count.inc();

// SAFETY: This descriptor chain points to a single `DescriptorChain` memory buffer,
// no other `IoVecBufferMut` object points to the same `DescriptorChain` at the same
@@ -151,33 +153,33 @@ impl Entropy {
// to handle once we do have budget.
if !self.rate_limit_request(u64::from(self.buffer.len())) {
debug!("entropy: throttling entropy queue");
METRICS.entropy_rate_limiter_throttled.inc();
global.entropy_rate_limiter_throttled.inc();
self.queues[RNG_QUEUE].undo_pop();
break;
}

self.handle_one().unwrap_or_else(|err| {
error!("entropy: {err}");
METRICS.entropy_event_fails.inc();
global.entropy_event_fails.inc();
0
})
}
Err(err) => {
error!("entropy: Could not parse descriptor chain: {err}");
METRICS.entropy_event_fails.inc();
global.entropy_event_fails.inc();
0
}
};

match self.queues[RNG_QUEUE].add_used(index, bytes) {
Ok(_) => {
used_any = true;
METRICS.entropy_bytes.add(bytes.into());
global.entropy_bytes.add(bytes.into());
}
Err(err) => {
error!("entropy: Could not add used descriptor to queue: {err}");
Self::rate_limit_replenish_request(&mut self.rate_limiter, bytes.into());
METRICS.entropy_event_fails.inc();
global.entropy_event_fails.inc();
// If we are not able to add a buffer to the used queue, something
// is probably seriously wrong, so just stop processing additional
// buffers
@@ -189,33 +191,34 @@ impl Entropy {
if used_any {
self.signal_used_queue().unwrap_or_else(|err| {
error!("entropy: {err:?}");
METRICS.entropy_event_fails.inc()
global.entropy_event_fails.inc()
});
}
}

/// Handles a notification on the entropy queue's eventfd.
///
/// The event is consumed first; a failed read is logged and counted in
/// `entropy_event_fails`. Otherwise the queue is processed, unless the rate
/// limiter is currently blocked, in which case only
/// `rate_limiter_event_count` is incremented.
pub(crate) fn process_entropy_queue_event(&mut self) {
    // All counters go to the shared "global" entry of the metrics registry.
    let global = EntropyMetricsPerDevice::alloc("global".to_string());
    if let Err(err) = self.queue_events[RNG_QUEUE].read() {
        error!("Failed to read entropy queue event: {err}");
        global.entropy_event_fails.inc();
    } else if !self.rate_limiter.is_blocked() {
        // We are not throttled, handle the entropy queue
        self.process_entropy_queue();
    } else {
        global.rate_limiter_event_count.inc();
    }
}

pub(crate) fn process_rate_limiter_event(&mut self) {
METRICS.rate_limiter_event_count.inc();
global.rate_limiter_event_count.inc();
match self.rate_limiter.event_handler() {
Ok(_) => {
// There might be enough budget now to process entropy requests.
self.process_entropy_queue();
}
Err(err) => {
error!("entropy: Failed to handle rate-limiter event: {err:?}");
METRICS.entropy_event_fails.inc();
global.entropy_event_fails.inc();
}
}
}
@@ -291,13 +294,15 @@ impl VirtioDevice for Entropy {
}

fn activate(&mut self, mem: GuestMemoryMmap) -> Result<(), ActivateError> {
let global = EntropyMetricsPerDevice::alloc("global".to_string());

for q in self.queues.iter_mut() {
q.initialize(&mem)
.map_err(ActivateError::QueueMemoryError)?;
}

self.activate_event.write(1).map_err(|_| {
METRICS.activate_fails.inc();
global.activate_fails.inc();
ActivateError::EventFd
})?;
self.device_state = DeviceState::Activated(mem);
@@ -454,6 +459,7 @@ mod tests {

#[test]
fn test_entropy_event() {
let global = EntropyMetricsPerDevice::alloc("global".to_string());
let mem = create_virtio_mem();
let mut th = VirtioTestHelper::<Entropy>::new(&mem, default_entropy());

@@ -462,29 +468,29 @@ mod tests {
// Add a read-only descriptor (this should fail)
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, 0)]);

let entropy_event_fails = METRICS.entropy_event_fails.count();
let entropy_event_count = METRICS.entropy_event_count.count();
let entropy_bytes = METRICS.entropy_bytes.count();
let host_rng_fails = METRICS.host_rng_fails.count();
let entropy_event_fails = global.entropy_event_fails.count();
let entropy_event_count = global.entropy_event_count.count();
let entropy_bytes = global.entropy_bytes.count();
let host_rng_fails = global.host_rng_fails.count();
assert_eq!(th.emulate_for_msec(100).unwrap(), 1);
assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails + 1);
assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 1);
assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes);
assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails);
assert_eq!(global.entropy_event_fails.count(), entropy_event_fails + 1);
assert_eq!(global.entropy_event_count.count(), entropy_event_count + 1);
assert_eq!(global.entropy_bytes.count(), entropy_bytes);
assert_eq!(global.host_rng_fails.count(), host_rng_fails);

// Add two good descriptors
th.add_desc_chain(RNG_QUEUE, 0, &[(1, 10, VIRTQ_DESC_F_WRITE)]);
th.add_desc_chain(RNG_QUEUE, 100, &[(2, 20, VIRTQ_DESC_F_WRITE)]);

let entropy_event_fails = METRICS.entropy_event_fails.count();
let entropy_event_count = METRICS.entropy_event_count.count();
let entropy_bytes = METRICS.entropy_bytes.count();
let host_rng_fails = METRICS.host_rng_fails.count();
let entropy_event_fails = global.entropy_event_fails.count();
let entropy_event_count = global.entropy_event_count.count();
let entropy_bytes = global.entropy_bytes.count();
let host_rng_fails = global.host_rng_fails.count();
assert_eq!(th.emulate_for_msec(100).unwrap(), 1);
assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails);
assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 2);
assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 30);
assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails);
assert_eq!(global.entropy_event_fails.count(), entropy_event_fails);
assert_eq!(global.entropy_event_count.count(), entropy_event_count + 2);
assert_eq!(global.entropy_bytes.count(), entropy_bytes + 30);
assert_eq!(global.host_rng_fails.count(), host_rng_fails);

th.add_desc_chain(
RNG_QUEUE,
@@ -496,34 +502,36 @@ mod tests {
],
);

let entropy_event_fails = METRICS.entropy_event_fails.count();
let entropy_event_count = METRICS.entropy_event_count.count();
let entropy_bytes = METRICS.entropy_bytes.count();
let host_rng_fails = METRICS.host_rng_fails.count();
let entropy_event_fails = global.entropy_event_fails.count();
let entropy_event_count = global.entropy_event_count.count();
let entropy_bytes = global.entropy_bytes.count();
let host_rng_fails = global.host_rng_fails.count();
assert_eq!(th.emulate_for_msec(100).unwrap(), 1);
assert_eq!(METRICS.entropy_event_fails.count(), entropy_event_fails);
assert_eq!(METRICS.entropy_event_count.count(), entropy_event_count + 1);
assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 512);
assert_eq!(METRICS.host_rng_fails.count(), host_rng_fails);
assert_eq!(global.entropy_event_fails.count(), entropy_event_fails);
assert_eq!(global.entropy_event_count.count(), entropy_event_count + 1);
assert_eq!(global.entropy_bytes.count(), entropy_bytes + 512);
assert_eq!(global.host_rng_fails.count(), host_rng_fails);
}

/// A rate-limiter event handled by a freshly activated device must bump
/// `entropy_event_fails` on the "global" metrics entry by exactly one.
#[test]
fn test_bad_rate_limiter_event() {
    let global = EntropyMetricsPerDevice::alloc("global".to_string());
    let mem = create_virtio_mem();
    let mut th = VirtioTestHelper::<Entropy>::new(&mem, default_entropy());

    th.activate_device(&mem);
    let mut dev = th.device();

    check_metric_after_block!(
        &global.entropy_event_fails,
        1,
        dev.process_rate_limiter_event()
    );
}

#[test]
fn test_bandwidth_rate_limiter() {
let global = EntropyMetricsPerDevice::alloc("global".to_string());
let mem = create_virtio_mem();
// Rate Limiter with 4000 bytes / sec allowance and no initial burst allowance
let device = Entropy::new(RateLimiter::new(4000, 0, 1000, 0, 0, 0).unwrap()).unwrap();
@@ -535,7 +543,7 @@ mod tests {
// buffer should be processed normally
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]);
check_metric_after_block!(
METRICS.entropy_bytes,
global.entropy_bytes,
4000,
th.device().process_entropy_queue()
);
@@ -551,12 +559,12 @@ mod tests {
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]);
th.add_desc_chain(RNG_QUEUE, 1, &[(1, 1000, VIRTQ_DESC_F_WRITE)]);
check_metric_after_block!(
METRICS.entropy_bytes,
global.entropy_bytes,
4000,
th.device().process_entropy_queue()
);
check_metric_after_block!(
METRICS.entropy_rate_limiter_throttled,
global.entropy_rate_limiter_throttled,
1,
th.device().process_entropy_queue()
);
@@ -565,12 +573,13 @@ mod tests {
// 250 msec should give enough time for replenishing 1000 bytes worth of tokens.
// Give it an extra 100 ms just to be sure the timer event reaches us from the kernel.
std::thread::sleep(Duration::from_millis(350));
check_metric_after_block!(METRICS.entropy_bytes, 1000, th.emulate_for_msec(100));
check_metric_after_block!(global.entropy_bytes, 1000, th.emulate_for_msec(100));
assert!(!th.device().rate_limiter().is_blocked());
}

#[test]
fn test_ops_rate_limiter() {
let global = EntropyMetricsPerDevice::alloc("global".to_string());
let mem = create_virtio_mem();
// Rate Limiter with unlimited bandwidth and allowance for 1 operation every 100 msec,
// (10 ops/sec), without initial burst.
@@ -583,7 +592,7 @@ mod tests {
// so this should succeed.
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 4000, VIRTQ_DESC_F_WRITE)]);
check_metric_after_block!(
METRICS.entropy_bytes,
global.entropy_bytes,
4000,
th.device().process_entropy_queue()
);
@@ -593,30 +602,30 @@ mod tests {
std::thread::sleep(Duration::from_millis(1000));

// First one should succeed
let entropy_bytes = METRICS.entropy_bytes.count();
let entropy_bytes = global.entropy_bytes.count();
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, VIRTQ_DESC_F_WRITE)]);
check_metric_after_block!(METRICS.entropy_bytes, 64, th.emulate_for_msec(100));
assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 64);
check_metric_after_block!(global.entropy_bytes, 64, th.emulate_for_msec(100));
assert_eq!(global.entropy_bytes.count(), entropy_bytes + 64);
// The rate limiter is not blocked yet.
assert!(!th.device().rate_limiter().is_blocked());
// But immediately asking another operation should block it because we have 1 op every 100
// msec.
th.add_desc_chain(RNG_QUEUE, 0, &[(0, 64, VIRTQ_DESC_F_WRITE)]);
check_metric_after_block!(
METRICS.entropy_rate_limiter_throttled,
global.entropy_rate_limiter_throttled,
1,
th.emulate_for_msec(50)
);
// Entropy bytes count should not have increased.
assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 64);
assert_eq!(global.entropy_bytes.count(), entropy_bytes + 64);
// After 100 msec (plus 50 msec for ensuring the event reaches us from the kernel), the
// timer of the rate limiter should fire saying that there's now more tokens available
check_metric_after_block!(
METRICS.rate_limiter_event_count,
global.rate_limiter_event_count,
1,
th.emulate_for_msec(150)
);
// The rate limiter event should have processed the pending buffer as well
assert_eq!(METRICS.entropy_bytes.count(), entropy_bytes + 128);
assert_eq!(global.entropy_bytes.count(), entropy_bytes + 128);
}
}
228 changes: 213 additions & 15 deletions src/vmm/src/devices/virtio/rng/metrics.rs
Original file line number Diff line number Diff line change
@@ -38,16 +38,63 @@ use serde::{Serialize, Serializer};

use crate::logger::SharedIncMetric;

/// Stores aggregated entropy metrics
pub(super) static METRICS: EntropyDeviceMetrics = EntropyDeviceMetrics::new();
use std::sync::{Arc, RwLock};
use std::collections::BTreeMap;

/// Aggregates and serializes the per-device entropy metrics.
///
/// Emits one `entropy_<id>` map entry for every device registered in
/// `METRICS`, followed by a single `entropy` entry that holds the aggregate
/// of all of them.
///
/// # Panics
///
/// Panics if the `METRICS` lock is poisoned.
pub fn flush_metrics<S: Serializer>(serializer: S) -> Result<S::Ok, S::Error> {
    let entropy_metrics = METRICS.read().unwrap();
    let metrics_len = entropy_metrics.metrics.len();
    // +1 to accommodate the aggregate entropy metrics
    let mut seq = serializer.serialize_map(Some(1 + metrics_len))?;

    let mut entropy_aggregated: EntropyDeviceMetrics = EntropyDeviceMetrics::default();

    for (name, metrics) in entropy_metrics.metrics.iter() {
        let devn = format!("entropy_{}", name);
        // serialization will flush the metrics so aggregate before it.
        let m: &EntropyDeviceMetrics = metrics;
        entropy_aggregated.aggregate(m);
        seq.serialize_entry(&devn, m)?;
    }
    seq.serialize_entry("entropy", &entropy_aggregated)?;
    seq.end()
}

/// Registry of entropy device metrics, keyed by device id.
#[derive(Debug)]
pub struct EntropyMetricsPerDevice {
    /// Shared metrics handle for each registered device id.
    pub metrics: BTreeMap<String, Arc<EntropyDeviceMetrics>>,
}

impl EntropyMetricsPerDevice {
    /// Returns the shared `EntropyDeviceMetrics` registered under `iface_id`,
    /// inserting a default-initialized entry first if none exists yet.
    ///
    /// An existing entry is never replaced, so counters already accumulated
    /// under that id are preserved.
    ///
    /// # Panics
    ///
    /// Panics if the `METRICS` lock is poisoned.
    pub fn alloc(iface_id: String) -> Arc<EntropyDeviceMetrics> {
        let mut registry = METRICS.write().unwrap();
        let slot = registry
            .metrics
            .entry(iface_id)
            .or_insert_with(|| Arc::new(EntropyDeviceMetrics::default()));
        Arc::clone(slot)
    }
}

/// Registry of entropy metrics, keyed by device id.
///
/// A `static` initializer must be const-evaluable, so the map starts out
/// empty (`RwLock::new` and `BTreeMap::new` are `const fn`; `insert`,
/// `to_string` and `Arc::new` are not). The "global" entry is created by the
/// first `EntropyMetricsPerDevice::alloc("global".to_string())` call.
static METRICS: RwLock<EntropyMetricsPerDevice> = RwLock::new(EntropyMetricsPerDevice {
    metrics: BTreeMap::new(),
});

#[derive(Debug, Serialize)]
pub(super) struct EntropyDeviceMetrics {
/// Number of device activation failures
@@ -86,15 +133,166 @@ pub mod tests {
use crate::logger::IncMetric;

/// Registers five entropy devices, bumps three counters on each and checks
/// that the values are visible through the registry.
#[test]
fn test_rng_dev_metrics() {
    // Acquire and release each lock once; `unwrap` panics right away if an
    // earlier panic poisoned the registry.
    drop(METRICS.read().unwrap());
    drop(METRICS.write().unwrap());

    for i in 0..5 {
        let devn = format!("entropy{}", i);
        let metrics = EntropyMetricsPerDevice::alloc(devn);
        metrics.activate_fails.inc();
        metrics.entropy_bytes.add(10);
        metrics.host_rng_fails.add(5);
    }

    for i in 0..5 {
        let devn = format!("entropy{}", i);
        let registry = METRICS.read().unwrap();
        let metrics = registry.metrics.get(&devn).unwrap();
        // `>=` because another test may update the same entry.
        assert!(metrics.activate_fails.count() >= 1);
        assert!(metrics.entropy_bytes.count() >= 10);
        assert_eq!(metrics.host_rng_fails.count(), 5);
    }
}

/// Exercises a single registry entry: allocation, lookup and counter updates.
#[test]
fn test_single_rng_metrics() {
    // Use "entropy0" so that this test shares an entry with
    // `test_rng_dev_metrics`, which allocates the same id.
    let devn = "entropy0";

    // Acquire and release each lock once; `unwrap` panics right away if an
    // earlier panic poisoned the registry.
    drop(METRICS.read().unwrap());
    drop(METRICS.write().unwrap());

    let metrics = EntropyMetricsPerDevice::alloc(String::from(devn));
    // The entry must be reachable through the registry as well.
    METRICS.read().unwrap().metrics.get(devn).unwrap();

    metrics.activate_fails.inc();
    let activate_fails = metrics.activate_fails.count();
    assert!(activate_fails > 0, "{}", activate_fails);
    // Only this test and `test_rng_dev_metrics` update activate_fails for
    // "entropy0", each of them once up to this point.
    assert!(activate_fails <= 2, "{}", activate_fails);

    metrics.activate_fails.inc();
    metrics.entropy_bytes.add(5);
    assert!(metrics.entropy_bytes.count() >= 5);
}
}
23 changes: 22 additions & 1 deletion src/vmm/src/devices/virtio/vsock/csm/connection.rs
Original file line number Diff line number Diff line change
@@ -95,6 +95,7 @@ use crate::devices::virtio::vsock::metrics::METRICS;
use crate::devices::virtio::vsock::packet::{VsockPacketHeader, VsockPacketRx, VsockPacketTx};
use crate::logger::IncMetric;
use crate::utils::wrap_usize_to_u32;
use crate::vmm_config::vsock::VsockSocketType;

/// Trait that vsock connection backends need to implement.
///
@@ -139,6 +140,9 @@ pub struct VsockConnection<S: VsockConnectionBackend> {
/// Instant when this connection should be scheduled for immediate termination, due to some
/// timeout condition having been fulfilled.
expiry: Option<Instant>,
/// Manages type of connection to determine whether to use S backend or buffer
socket_type: VsockSocketType,
seqpacket_buf: Option<Vec<u8>>,
}

impl<S> VsockChannel for VsockConnection<S>
@@ -509,6 +513,7 @@ where
local_port: u32,
peer_port: u32,
peer_buf_alloc: u32,
socket_type: VsockSocketType::Stream,
) -> Self {
Self {
local_cid,
@@ -525,6 +530,12 @@ where
last_fwd_cnt_to_peer: Wrapping(0),
pending_rx: PendingRxSet::from(PendingRx::Response),
expiry: None,
socket_type,
seqpacket_buf: if socket_type == VsockSocketType::SeqPacket {
Some(Vec::new())
} else {
None
},
}
}

@@ -535,6 +546,7 @@ where
peer_cid: u64,
local_port: u32,
peer_port: u32,
socket_type: VsockSocketType,
) -> Self {
Self {
local_cid,
@@ -551,6 +563,12 @@ where
last_fwd_cnt_to_peer: Wrapping(0),
pending_rx: PendingRxSet::from(PendingRx::Request),
expiry: None,
socket_type,
seqpacket_buf: if socket_type == VsockSocketType::SeqPacket {
Some(Vec::new())
} else {
None
},
}
}

@@ -874,6 +892,7 @@ mod tests {
handler_ctx.device.queues[TXQ_INDEX].pop().unwrap(),
)
.unwrap();
let socket_type = VsockSocketType::Stream;
let conn = match conn_state {
ConnState::PeerInit => VsockConnection::<TestStream>::new_peer_init(
stream,
@@ -882,9 +901,10 @@ mod tests {
LOCAL_PORT,
PEER_PORT,
PEER_BUF_ALLOC,
socket_type,
),
ConnState::LocalInit => VsockConnection::<TestStream>::new_local_init(
stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT,
stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT, socket_type,
),
ConnState::Established => {
let mut conn = VsockConnection::<TestStream>::new_peer_init(
@@ -894,6 +914,7 @@ mod tests {
LOCAL_PORT,
PEER_PORT,
PEER_BUF_ALLOC,
socket_type,
);
assert!(conn.has_pending_rx());
conn.recv_pkt(&mut rx_pkt).unwrap();
16 changes: 10 additions & 6 deletions src/vmm/src/devices/virtio/vsock/device.rs
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ use crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDev
use crate::devices::virtio::generated::virtio_config::{VIRTIO_F_IN_ORDER, VIRTIO_F_VERSION_1};
use crate::devices::virtio::queue::Queue as VirtQueue;
use crate::devices::virtio::vsock::VsockError;
use crate::devices::virtio::vsock::metrics::METRICS;
use crate::devices::virtio::vsock::metrics::VsockMetricsPerDevice;
use crate::logger::IncMetric;
use crate::utils::byte_order;
use crate::vstate::memory::{Bytes, GuestMemoryMmap};
@@ -241,11 +241,12 @@ where
// connections and the guest_cid configuration field is fetched again. Existing listen sockets
// remain but their CID is updated to reflect the current guest_cid.
pub fn send_transport_reset_event(&mut self) -> Result<(), DeviceError> {
let global = VsockMetricsPerDevice::alloc("global".to_string());
// This is safe since we checked in the caller function that the device is activated.
let mem = self.device_state.mem().unwrap();

let head = self.queues[EVQ_INDEX].pop().ok_or_else(|| {
METRICS.ev_queue_event_fails.inc();
global.ev_queue_event_fails.inc();
DeviceError::VsockError(VsockError::EmptyQueue)
})?;

@@ -301,6 +302,7 @@ where
}

fn read_config(&self, offset: u64, data: &mut [u8]) {
let global = VsockMetricsPerDevice::alloc("global".to_string());
match offset {
0 if data.len() == 8 => byte_order::write_le_u64(data, self.cid()),
0 if data.len() == 4 => {
@@ -310,7 +312,7 @@ where
byte_order::write_le_u32(data, ((self.cid() >> 32) & 0xffff_ffff) as u32)
}
_ => {
METRICS.cfg_fails.inc();
global.cfg_fails.inc();
warn!(
"vsock: virtio-vsock received invalid read request of {} bytes at offset {}",
data.len(),
@@ -321,7 +323,8 @@ where
}

fn write_config(&mut self, offset: u64, data: &[u8]) {
METRICS.cfg_fails.inc();
let global = VsockMetricsPerDevice::alloc("global".to_string());
global.cfg_fails.inc();
warn!(
"vsock: guest driver attempted to write device config (offset={:#x}, len={:#x})",
offset,
@@ -330,21 +333,22 @@ where
}

fn activate(&mut self, mem: GuestMemoryMmap) -> Result<(), ActivateError> {
let global = VsockMetricsPerDevice::alloc("global".to_string());
for q in self.queues.iter_mut() {
q.initialize(&mem)
.map_err(ActivateError::QueueMemoryError)?;
}

if self.queues.len() != defs::VSOCK_NUM_QUEUES {
METRICS.activate_fails.inc();
global.activate_fails.inc();
return Err(ActivateError::QueueMismatch {
expected: defs::VSOCK_NUM_QUEUES,
got: self.queues.len(),
});
}

if self.activate_evt.write(1).is_err() {
METRICS.activate_fails.inc();
global.activate_fails.inc();
return Err(ActivateError::EventFd);
}

21 changes: 12 additions & 9 deletions src/vmm/src/devices/virtio/vsock/event_handler.rs
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ use vmm_sys_util::epoll::EventSet;
use super::VsockBackend;
use super::device::{EVQ_INDEX, RXQ_INDEX, TXQ_INDEX, Vsock};
use crate::devices::virtio::device::VirtioDevice;
use crate::devices::virtio::vsock::metrics::METRICS;
use crate::devices::virtio::vsock::metrics::VsockMetricsPerDevice;
use crate::logger::IncMetric;

impl<B> Vsock<B>
@@ -47,37 +47,39 @@ where
const PROCESS_NOTIFY_BACKEND: u32 = 4;

/// Handles an event on the RX queue's eventfd.
///
/// Any event set other than `EventSet::IN`, or a failure to read the
/// eventfd, is logged and counted in `rx_queue_event_fails`. When the backend
/// has pending RX data, `process_rx()` is run and `rx_queue_event_count` is
/// incremented.
///
/// Returns the result of `process_rx()` if it ran, `false` otherwise.
pub fn handle_rxq_event(&mut self, evset: EventSet) -> bool {
    let global = VsockMetricsPerDevice::alloc("global".to_string());
    if evset != EventSet::IN {
        warn!("vsock: rxq unexpected event {:?}", evset);
        global.rx_queue_event_fails.inc();
        return false;
    }

    let mut raise_irq = false;
    if let Err(err) = self.queue_events[RXQ_INDEX].read() {
        error!("Failed to get vsock rx queue event: {:?}", err);
        global.rx_queue_event_fails.inc();
    } else if self.backend.has_pending_rx() {
        raise_irq |= self.process_rx();
        global.rx_queue_event_count.inc();
    }
    raise_irq
}

pub fn handle_txq_event(&mut self, evset: EventSet) -> bool {
let global = VsockMetricsPerDevice::alloc("global".to_string());
if evset != EventSet::IN {
warn!("vsock: txq unexpected event {:?}", evset);
METRICS.tx_queue_event_fails.inc();
global.tx_queue_event_fails.inc();
return false;
}

let mut raise_irq = false;
if let Err(err) = self.queue_events[TXQ_INDEX].read() {
error!("Failed to get vsock tx queue event: {:?}", err);
METRICS.tx_queue_event_fails.inc();
global.tx_queue_event_fails.inc();
} else {
raise_irq |= self.process_tx();
METRICS.tx_queue_event_count.inc();
global.tx_queue_event_count.inc();
// The backend may have queued up responses to the packets we sent during
// TX queue processing. If that happened, we need to fetch those responses
// and place them into RX buffers.
@@ -89,15 +91,16 @@ where
}

/// Handles an event on the event queue's eventfd.
///
/// Any event set other than `EventSet::IN`, or a failure to read the
/// eventfd, is logged and counted in `ev_queue_event_fails`. Always returns
/// `false`.
pub fn handle_evq_event(&mut self, evset: EventSet) -> bool {
    let global = VsockMetricsPerDevice::alloc("global".to_string());
    if evset != EventSet::IN {
        warn!("vsock: evq unexpected event {:?}", evset);
        global.ev_queue_event_fails.inc();
        return false;
    }

    if let Err(err) = self.queue_events[EVQ_INDEX].read() {
        error!("Failed to consume vsock evq event: {:?}", err);
        global.ev_queue_event_fails.inc();
    }
    false
}
238 changes: 225 additions & 13 deletions src/vmm/src/devices/virtio/vsock/metrics.rs
Original file line number Diff line number Diff line change
@@ -41,16 +41,66 @@ use serde::{Serialize, Serializer};

use crate::logger::SharedIncMetric;

use std::sync::{Arc, RwLock};
use std::collections::BTreeMap;

/// Stores aggregate metrics of all Vsock connections/actions
pub(super) static METRICS: VsockDeviceMetrics = VsockDeviceMetrics::new();
// pub(super) static METRICS: VsockDeviceMetrics = VsockDeviceMetrics::new();

/// Called by METRICS.flush(), this function facilitates serialization of vsock device metrics.
/// This function facilitates aggregation and serialization of
/// per device vsock metrics. (Can also handle singular)
pub fn flush_metrics<S: Serializer>(serializer: S) -> Result<S::Ok, S::Error> {
let mut seq = serializer.serialize_map(Some(1))?;
seq.serialize_entry("vsock", &METRICS)?;
let vsock_metrics = METRICS.read().unwrap();
let metrics_len = vsock_metrics.metrics.len();
// +1 to accomodate aggregate net metrics
let mut seq = serializer.serialize_map(Some(1 + metrics_len))?;

let mut vsock_aggregated: VsockDeviceMetrics = VsockDeviceMetrics::default();

for (name, metrics) in vsock_metrics.metrics.iter() {
let devn = format!("vsock_{}", name);
// serialization will flush the metrics so aggregate before it.
let m: &VsockDeviceMetrics = metrics;
vsock_aggregated.aggregate(m);
seq.serialize_entry(&devn, m)?;
}
seq.serialize_entry("vsock", &vsock_aggregated)?;
seq.end()
}

/// Registry of vsock device metrics, keyed by device id.
#[derive(Debug)]
pub struct VsockMetricsPerDevice {
    /// Shared metrics handle for each registered device id.
    pub metrics: BTreeMap<String, Arc<VsockDeviceMetrics>>,
}

impl VsockMetricsPerDevice {
    /// Returns the shared `VsockDeviceMetrics` registered under `iface_id`,
    /// inserting a default-initialized entry first if none exists yet.
    ///
    /// An existing entry is never replaced, so counters already accumulated
    /// under that id are preserved.
    ///
    /// # Panics
    ///
    /// Panics if the `METRICS` lock is poisoned.
    pub fn alloc(iface_id: String) -> Arc<VsockDeviceMetrics> {
        let mut registry = METRICS.write().unwrap();
        let slot = registry
            .metrics
            .entry(iface_id)
            .or_insert_with(|| Arc::new(VsockDeviceMetrics::default()));
        Arc::clone(slot)
    }
}

/// Registry of vsock metrics, keyed by device id.
///
/// A `static` initializer must be const-evaluable, so the map starts out
/// empty (`RwLock::new` and `BTreeMap::new` are `const fn`; `insert`,
/// `to_string` and `Arc::new` are not). The "global" entry is created by the
/// first `VsockMetricsPerDevice::alloc("global".to_string())` call.
static METRICS: RwLock<VsockMetricsPerDevice> = RwLock::new(VsockMetricsPerDevice {
    metrics: BTreeMap::new(),
});

/// Vsock-related metrics.
#[derive(Debug, Serialize)]
pub(super) struct VsockDeviceMetrics {
@@ -130,16 +180,178 @@ pub mod tests {
use super::*;
use crate::logger::IncMetric;

// Simple test to test ability to handle different devices based on some id
// Mimics the behavior and test of per-device structure in network devices.
#[test]
fn test_vsock_dev_metrics() {
// NOTE(review): the statements from here down to the
// `assert_eq!(vsock_metrics.conns_added.count(), 1)` line look like the
// superseded test body: they call `VsockDeviceMetrics::new()` and serialize
// `METRICS` directly, while `METRICS` is now an `RwLock` registry. Confirm
// and remove.
let vsock_metrics: VsockDeviceMetrics = VsockDeviceMetrics::new();
let vsock_metrics_local: String = serde_json::to_string(&vsock_metrics).unwrap();
// the 1st serialize flushes the metrics and resets values to 0 so that
// we can compare the values with local metrics.
serde_json::to_string(&METRICS).unwrap();
let vsock_metrics_global: String = serde_json::to_string(&METRICS).unwrap();
assert_eq!(vsock_metrics_local, vsock_metrics_global);
vsock_metrics.conns_added.inc();
assert_eq!(vsock_metrics.conns_added.count(), 1);
// Acquire and release each lock once; `unwrap` panics if the lock is poisoned.
drop(METRICS.read().unwrap());
drop(METRICS.write().unwrap());

// Register three devices and record one added connection on each.
for i in 0..3 {
let devn: String = format!("vsock{}", i);
VsockMetricsPerDevice::alloc(devn.clone());
METRICS
.read()
.unwrap()
.metrics
.get(&devn)
.unwrap()
.conns_added
.inc();
}
// Five more connections on "vsock1" (6 in total) ...
METRICS
.read()
.unwrap()
.metrics
.get("vsock1")
.unwrap()
.conns_added
.add(5);
// ... and one activation failure on "vsock2".
METRICS
.read()
.unwrap()
.metrics
.get("vsock2")
.unwrap()
.activate_fails
.inc();

// NOTE(review): no `Serialize` impl for `VsockMetricsPerDevice` is visible in
// this file; the `vsock_<id>` / `vsock` keys read below are the ones
// `flush_metrics` emits. Confirm this call actually serializes through it.
let json_output = serde_json::to_string(&*METRICS.read().unwrap()).unwrap();

// Optional: print JSON to visually verify structure
println!("{}", json_output);

// NOTE(review): the lookups below assume each metric serializes as an object
// with a "count" field — confirm against the metric type's `Serialize` impl.
let parsed: serde_json::Value = serde_json::from_str(&json_output).unwrap();
let a_count = parsed["vsock_vsock0"]["conns_added"]["count"].as_u64().unwrap();
let b_count = parsed["vsock_vsock1"]["conns_added"]["count"].as_u64().unwrap();
let c_count = parsed["vsock_vsock2"]["conns_added"]["count"].as_u64().unwrap();
let a_count_2 = parsed["vsock_vsock0"]["activate_fails"]["count"].as_u64().unwrap();
let c_count_2 = parsed["vsock_vsock2"]["activate_fails"]["count"].as_u64().unwrap();
let aggregated = parsed["vsock"]["conns_added"]["count"].as_u64().unwrap();

// NOTE(review): "vsock0" is also used by `test_vsock_default`, and the
// aggregate sums every entry in the registry, so the exact values below assume
// no other test has touched these counters — confirm under parallel test runs.
assert_eq!(a_count, 1);
assert_eq!(b_count, 6);
assert_eq!(c_count, 1);
assert_eq!(a_count_2, 0);
assert_eq!(c_count_2, 1);
assert_eq!(aggregated, 8);

// The zero checks below presumably rely on serialization having reset the
// counters (see the "serialization will flush the metrics" comment in
// `flush_metrics`) — verify.
drop(METRICS.read().unwrap());
assert_eq!(METRICS
.read()
.unwrap()
.metrics
.get("vsock0")
.unwrap()
.conns_added
.count(), 0);
assert_eq!(METRICS
.read()
.unwrap()
.metrics
.get("vsock1")
.unwrap()
.conns_added
.count(), 0);

// NOTE(review): the two increments below are not followed by any assertion.
METRICS
.read()
.unwrap()
.metrics
.get("vsock0")
.unwrap()
.activate_fails
.inc();

METRICS
.read()
.unwrap()
.metrics
.get("vsock0")
.unwrap()
.rx_bytes_count
.inc();

}

// Exercises a single registry entry end to end: allocation, lookup through
// the registry and counter updates on the returned handle.
#[test]
fn test_vsock_default() {
    // "vsock0" is also used by `test_vsock_dev_metrics`, so both tests operate
    // on the same entry.
    let devn = "vsock0";

    // Acquire and release each lock once; `unwrap` panics if it is poisoned.
    drop(METRICS.read().unwrap());
    drop(METRICS.write().unwrap());

    // Register the device and make sure the registry can find it.
    let metrics = VsockMetricsPerDevice::alloc(String::from(devn));
    assert!(METRICS.read().unwrap().metrics.get(devn).is_some());

    // A single increment must be observable right away.
    metrics.activate_fails.inc();
    let count = metrics.activate_fails.count();
    assert!(
        count > 0,
        "Expected activate_fails count > 0 but got {}",
        count
    );

    // At most two tests are expected to have bumped this counter so far.
    assert!(
        count <= 2,
        "Expected activate_fails count <= 2 but got {}",
        count
    );

    // Further updates on the same entry.
    metrics.activate_fails.inc();
    metrics.rx_bytes_count.add(5);

    let rx_count = metrics.rx_bytes_count.count();
    assert!(
        rx_count >= 5,
        "Expected rx_bytes_count >= 5 but got {}",
        rx_count
    );
}
}
8 changes: 6 additions & 2 deletions src/vmm/src/devices/virtio/vsock/unix/muxer.rs
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ use super::{MuxerConnection, VsockUnixBackendError, defs};
use crate::devices::virtio::vsock::metrics::METRICS;
use crate::devices::virtio::vsock::packet::{VsockPacketRx, VsockPacketTx};
use crate::logger::IncMetric;
use crate::vmm_config::vsock::VsockSocketType;

/// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map,
/// keyed by a `ConnMapKey` object.
@@ -108,6 +109,8 @@ pub struct VsockMuxer {
local_port_set: HashSet<u32>,
/// The last used host-side port.
local_port_last: u32,
/// Type of Socket (Either Stream or SeqPacket)
socket_type: VsockSocketType,
}

impl VsockChannel for VsockMuxer {
@@ -303,7 +306,7 @@ impl VsockBackend for VsockMuxer {}

impl VsockMuxer {
/// Muxer constructor.
pub fn new(cid: u64, host_sock_path: String) -> Result<Self, VsockUnixBackendError> {
pub fn new(cid: u64, host_sock_path: String, socket_type: VsockSocketType) -> Result<Self, VsockUnixBackendError> {
// Open/bind on the host Unix socket, so we can accept host-initiated
// connections.
let host_sock = UnixListener::bind(&host_sock_path)
@@ -321,6 +324,7 @@ impl VsockMuxer {
killq: MuxerKillQ::new(),
local_port_last: (1u32 << 30) - 1,
local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
socket_type,
};

// Listen on the host initiated socket, for incoming connections.
@@ -849,7 +853,7 @@ mod tests {
)
.unwrap();

let muxer = VsockMuxer::new(PEER_CID, get_file(name)).unwrap();
let muxer = VsockMuxer::new(PEER_CID, get_file(name), VsockSocketType::Stream).unwrap();
Self {
_vsock_test_ctx: vsock_test_ctx,
rx_pkt,
21 changes: 19 additions & 2 deletions src/vmm/src/vmm_config/vsock.rs
Original file line number Diff line number Diff line change
@@ -32,6 +32,20 @@ pub struct VsockDeviceConfig {
pub guest_cid: u32,
/// Path to local unix socket.
pub uds_path: String,
// Type of socket being used
#[serde(default = "default_socket_type")]
pub socket_type : VsockSocketType,
}

/// Serde fallback for the `socket_type` field of `VsockDeviceConfig`, used when
/// the field is absent from the deserialized input.
///
/// Delegates to [`VsockSocketType::default`] so the default is defined in one place.
fn default_socket_type() -> VsockSocketType {
    VsockSocketType::default()
}

/// Kind of socket selected for a vsock device.
///
/// Serialized in lowercase, i.e. as `"stream"` or `"seqpacket"`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum VsockSocketType {
    /// Stream socket; this is the default.
    #[default]
    Stream,
    /// Sequenced-packet socket.
    SeqPacket,
}

#[derive(Debug)]
@@ -47,6 +61,7 @@ impl From<&VsockAndUnixPath> for VsockDeviceConfig {
vsock_id: None,
guest_cid: u32::try_from(vsock_lock.cid()).unwrap(),
uds_path: vsock.uds_path.clone(),
socket_type: vsock_lock.socket_type()
}
}
}
@@ -99,7 +114,7 @@ impl VsockBuilder {
pub fn create_unixsock_vsock(
cfg: VsockDeviceConfig,
) -> Result<Vsock<VsockUnixBackend>, VsockConfigError> {
let backend = VsockUnixBackend::new(u64::from(cfg.guest_cid), cfg.uds_path)?;
let backend = VsockUnixBackend::new(u64::from(cfg.guest_cid), cfg.uds_path, cfg.socket_type)?;

Vsock::new(u64::from(cfg.guest_cid), backend).map_err(VsockConfigError::CreateVsockDevice)
}
@@ -122,6 +137,7 @@ pub(crate) mod tests {
vsock_id: None,
guest_cid: 3,
uds_path: tmp_sock_file.as_path().to_str().unwrap().to_string(),
socket_type: VsockSocketType::Stream,
}
}

@@ -168,10 +184,11 @@ pub(crate) mod tests {
fn test_set_device() {
let mut vsock_builder = VsockBuilder::new();
let mut tmp_sock_file = TempFile::new().unwrap();
let socket_type = VsockSocketType::Stream;
tmp_sock_file.remove().unwrap();
let vsock = Vsock::new(
0,
VsockUnixBackend::new(1, tmp_sock_file.as_path().to_str().unwrap().to_string())
VsockUnixBackend::new(1, tmp_sock_file.as_path().to_str().unwrap().to_string(), socket_type)
.unwrap(),
)
.unwrap();