Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ impl InstanceSpecifier {
specifier: service_name.to_string(),
})
} else {
Err(Error::ServiceError(ServiceFailedReason::InstanceSpecifierInvalid))
Err(Error::ServiceError(
ServiceFailedReason::InstanceSpecifierInvalid,
))
}
}
}
Expand Down Expand Up @@ -705,9 +707,9 @@ impl<S> SampleContainer<S> {
/// # Errors
/// Returns 'Error::AllocateError' if container is already full.
pub fn push_back(&mut self, new: S) -> Result<()> {
self.inner.push_back(new).map_err(|_| {
Error::AllocateError(AllocationFailureReason::OutOfMemory)
})?;
self.inner
.push_back(new)
.map_err(|_| Error::AllocateError(AllocationFailureReason::OutOfMemory))?;
Ok(())
}

Expand Down Expand Up @@ -736,6 +738,22 @@ impl<S> SampleContainer<S> {
/// Represents a live subscription to an event source with methods for both
/// non-blocking polling and asynchronous waiting for new events.
///
/// # Sample Selection Algorithm
///
/// The internal algorithm for sample delivery works as follows:
/// - Searches for the slot with the largest timestamp in the range `(last_reference_time, upper_limit)`
/// - On each successive call within the same receive operation, `upper_limit` is narrowed to the
/// previously found timestamp, ensuring the next search returns the next-newest sample
/// - After the call completes, `last_reference_time` is advanced to the newest delivered timestamp,
/// preventing those samples from being returned again
///
/// **Example**: If the producer sent samples with timestamps T1 < T2 < T3 before the first receive,
/// and `max_samples = 2`:
/// - 1st iteration: search `(0, MAX)` → finds T3
/// - 2nd iteration: search `(0, T3)` → finds T2
/// - **Result**: Delivers 2 newest samples (T3 and T2). Sample T1 is permanently skipped because
/// `last_reference_time` advances to T3 after this receive call.
///
/// # Type Parameters
/// * `T` - The relocatable event data type
/// * `R` - The runtime managing the subscription
Expand Down
96 changes: 41 additions & 55 deletions score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! that can subscribe to events and receive data samples.
//! It uses FFI to interact with the underlying C++ implementation.
//! Main components include `LolaConsumerInfo`, `SubscribableImpl`,
//! `SubscriberImpl`, `SampleConsumerDiscovery`, and `SampleConsumerBuilder`.
//! `SubscriberImpl`, `LolaConsumerDiscovery`, and `LolaConsumerBuilder`.
//! These components work together to enable consumers to discover services,
//! subscribe to events, and receive data samples in a type-safe manner.
//! The implementation ensures proper memory management and resource handling
Expand All @@ -35,6 +35,7 @@ use core::marker::PhantomData;
use core::mem::ManuallyDrop;
use core::ops::{Deref, DerefMut};
use core::panic;
use core::ptr::NonNull;
use futures::task::{AtomicWaker, Context, Poll};
use std::cmp::Ordering;
use std::pin::Pin;
Expand Down Expand Up @@ -197,7 +198,7 @@ impl std::fmt::Debug for ProxyInstanceManager {
/// it must not expose any mutable access to the proxy instance
/// Or must not provide any method to access the proxy instance directly
pub struct NativeProxyBase {
proxy: *mut ProxyBase, // Stores the proxy instance
proxy: NonNull<ProxyBase>, // Stores the proxy instance
}

//SAFETY: NativeProxyBase is safe to share between threads because:
Expand All @@ -213,7 +214,7 @@ impl Drop for NativeProxyBase {
//SAFETY: It is safe to destroy the proxy because it was created by FFI
// and proxy pointer received at the time of create_proxy called
unsafe {
bridge_ffi_rs::destroy_proxy(self.proxy);
bridge_ffi_rs::destroy_proxy(self.proxy.as_ptr());
}
}
}
Expand All @@ -228,12 +229,10 @@ impl NativeProxyBase {
pub fn new(interface_id: &str, handle: &HandleType) -> Result<Self> {
//SAFETY: It is safe to create the proxy because interface_id and handle are valid
//Handle received at the time of get_avaible_instances called with correct interface_id
let proxy = unsafe { bridge_ffi_rs::create_proxy(interface_id, handle) };
if proxy.is_null() {
return Err(Error::ConsumerError(
ConsumerFailedReason::ProxyCreationFailed,
));
}
let raw_proxy_ptr = unsafe { bridge_ffi_rs::create_proxy(interface_id, handle) };
let proxy = std::ptr::NonNull::new(raw_proxy_ptr).ok_or(Error::ConsumerError(
ConsumerFailedReason::ProxyCreationFailed,
))?;
Ok(Self { proxy })
}
}
Expand All @@ -245,7 +244,7 @@ impl NativeProxyBase {
/// And the proxy event lifetime is managed safely through Drop of the parent proxy instance
/// user can get the raw pointer using 'get_proxy_event_base' method
pub struct NativeProxyEventBase {
proxy_event_ptr: *mut ProxyEventBase,
proxy_event_ptr: NonNull<ProxyEventBase>,
}

//SAFETY: NativeProxyEventBase is to send between threads because:
Expand All @@ -256,26 +255,21 @@ pub struct NativeProxyEventBase {
unsafe impl Send for NativeProxyEventBase {}

impl NativeProxyEventBase {
pub fn new(proxy: *mut ProxyBase, interface_id: &str, identifier: &str) -> Result<Self> {
pub fn new(proxy: &NonNull<ProxyBase>, interface_id: &str, identifier: &str) -> Result<Self> {
//SAFETY: It is safe as we are passing valid proxy pointer and interface id to get event
// proxy pointer is created during consumer creation
let proxy_event_ptr =
unsafe { bridge_ffi_rs::get_event_from_proxy(proxy, interface_id, identifier) };
if proxy_event_ptr.is_null() {
return Err(Error::EventError(EventFailedReason::EventCreationFailed));
}
let raw_event_ptr =
unsafe { bridge_ffi_rs::get_event_from_proxy(proxy.as_ptr(), interface_id, identifier) };
let proxy_event_ptr = std::ptr::NonNull::new(raw_event_ptr)
.ok_or(Error::EventError(EventFailedReason::EventCreationFailed))?;
Ok(Self { proxy_event_ptr })
}

/// Provides access to the underlying proxy event base.
pub fn get_proxy_event_base(&self) -> &ProxyEventBase {
// SAFETY: proxy_event_ptr is valid for the entire lifetime of NativeProxyEventBase
// and was created by FFI during get_event_from_proxy()
unsafe {
self.proxy_event_ptr
.as_ref()
.expect("Event pointer is null")
}
unsafe { self.proxy_event_ptr.as_ref() }
}
}

Expand Down Expand Up @@ -311,7 +305,7 @@ impl<T: CommData + Debug> Subscriber<T, LolaRuntimeImpl> for SubscribableImpl<T>
fn subscribe(self, max_num_samples: usize) -> Result<Self::Subscription> {
let instance_info = self.instance_info.clone();
let event_instance = NativeProxyEventBase::new(
self.proxy_instance.0.proxy,
&self.proxy_instance.0.proxy,
self.instance_info.interface_id,
self.identifier,
)?;
Expand All @@ -335,7 +329,7 @@ impl<T: CommData + Debug> Subscriber<T, LolaRuntimeImpl> for SubscribableImpl<T>
max_num_samples,
instance_info,
waker_storage: Arc::default(),
async_init_status: AtomicBool::new(false),
async_init_status: std::sync::Once::new(),
_proxy: self.proxy_instance.clone(),
_phantom: PhantomData,
})
Expand Down Expand Up @@ -441,7 +435,7 @@ where
max_num_samples: usize,
instance_info: LolaConsumerInfo,
waker_storage: Arc<AtomicWaker>,
async_init_status: AtomicBool,
async_init_status: std::sync::Once,
_proxy: ProxyInstanceManager,
_phantom: PhantomData<T>,
}
Expand All @@ -455,9 +449,8 @@ impl<T: CommData + Debug> Drop for SubscriberImpl<T> {
// and then unsubscribe from the event to clean up resources on the C++ side.
let mut guard = self.event.get_proxy_event();
unsafe {
if self
.async_init_status
.load(std::sync::atomic::Ordering::Relaxed)
if self.async_init_status.is_completed()
// Check if the async receive callback was initialized
{
bridge_ffi_rs::clear_event_receive_handler(guard.deref_mut(), T::ID);
}
Expand Down Expand Up @@ -564,18 +557,11 @@ where
// on the same subscriber instance.
let mut event_guard = self.event.get_proxy_event();
// Initialize the async receive callback only once when the first receive call is made
if !self
.async_init_status
.load(std::sync::atomic::Ordering::Relaxed)
{
if let Err(_e) = self.init_async_receive(&mut event_guard) {
return Err(Error::ReceiveError(
ReceiveFailedReason::InitializationFailed,
));
}
self.async_init_status
.store(true, std::sync::atomic::Ordering::Relaxed);
}
// We are using std::sync::Once to ensure that the callback is set only once.
self.async_init_status.call_once(|| {
Comment thread
bharatGoswami8 marked this conversation as resolved.
self.init_async_receive(&mut event_guard)
.expect("Failed to initialize async receive callback");
});
ReceiveFuture {
event_guard: Some(event_guard),
waker_storage: Arc::clone(&self.waker_storage),
Expand Down Expand Up @@ -687,12 +673,12 @@ impl std::fmt::Debug for DiscoveryStateData {
}
}

pub struct SampleConsumerDiscovery<I> {
pub struct LolaConsumerDiscovery<I> {
pub instance_specifier: InstanceSpecifier,
pub _interface: PhantomData<I>,
}

impl<I: Interface> SampleConsumerDiscovery<I> {
impl<I: Interface> LolaConsumerDiscovery<I> {
pub fn new(_runtime: &LolaRuntimeImpl, instance_specifier: InstanceSpecifier) -> Self {
Self {
instance_specifier,
Expand All @@ -701,12 +687,12 @@ impl<I: Interface> SampleConsumerDiscovery<I> {
}
}

impl<I: Interface + Send> ServiceDiscovery<I, LolaRuntimeImpl> for SampleConsumerDiscovery<I>
impl<I: Interface + Send> ServiceDiscovery<I, LolaRuntimeImpl> for LolaConsumerDiscovery<I>
where
SampleConsumerBuilder<I>: ConsumerBuilder<I, LolaRuntimeImpl>,
LolaConsumerBuilder<I>: ConsumerBuilder<I, LolaRuntimeImpl>,
{
type ConsumerBuilder = SampleConsumerBuilder<I>;
type ServiceEnumerator = Vec<SampleConsumerBuilder<I>>;
type ConsumerBuilder = LolaConsumerBuilder<I>;
type ServiceEnumerator = Vec<LolaConsumerBuilder<I>>;

fn get_available_instances(&self) -> Result<Self::ServiceEnumerator> {
//If ANY Support is added in Lola, then we need to return all available instances
Expand All @@ -726,7 +712,7 @@ where
handle_index,
interface_id: I::INTERFACE_ID,
};
SampleConsumerBuilder {
LolaConsumerBuilder {
instance_info,
_interface: PhantomData,
}
Expand Down Expand Up @@ -848,7 +834,7 @@ impl<I: Interface> Drop for ServiceDiscoveryFuture<I> {
}

impl<I: Interface> Future for ServiceDiscoveryFuture<I> {
type Output = Result<Vec<SampleConsumerBuilder<I>>>;
type Output = Result<Vec<LolaConsumerBuilder<I>>>;

fn poll(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -881,7 +867,7 @@ impl<I: Interface> Future for ServiceDiscoveryFuture<I> {
handle_index,
interface_id: I::INTERFACE_ID,
};
SampleConsumerBuilder {
LolaConsumerBuilder {
instance_info,
_interface: PhantomData,
}
Expand All @@ -895,20 +881,20 @@ impl<I: Interface> Future for ServiceDiscoveryFuture<I> {
}
}

impl<I: Interface> ConsumerBuilder<I, LolaRuntimeImpl> for SampleConsumerBuilder<I> {}
impl<I: Interface> ConsumerBuilder<I, LolaRuntimeImpl> for LolaConsumerBuilder<I> {}

impl<I: Interface> Builder<I::Consumer<LolaRuntimeImpl>> for SampleConsumerBuilder<I> {
impl<I: Interface> Builder<I::Consumer<LolaRuntimeImpl>> for LolaConsumerBuilder<I> {
fn build(self) -> Result<I::Consumer<LolaRuntimeImpl>> {
Ok(Consumer::new(self.instance_info))
}
}

pub struct SampleConsumerBuilder<I: Interface> {
pub struct LolaConsumerBuilder<I: Interface> {
pub instance_info: LolaConsumerInfo,
pub _interface: PhantomData<I>,
}

impl<I: Interface> ConsumerDescriptor<LolaRuntimeImpl> for SampleConsumerBuilder<I> {
impl<I: Interface> ConsumerDescriptor<LolaRuntimeImpl> for LolaConsumerBuilder<I> {
fn get_instance_identifier(&self) -> &InstanceSpecifier {
//if InstanceSpecifier::ANY support enable by lola
//then this API should get InstanceSpecifier from FFI Call
Expand Down Expand Up @@ -1107,11 +1093,11 @@ mod test {

#[test]
fn test_sample_consumer_discovery_creation() {
// Test that SampleConsumerDiscovery can be created with valid interface
// Test that LolaConsumerDiscovery can be created with valid interface
let instance_specifier = com_api_concept::InstanceSpecifier::new("/test/vehicle")
.expect("Failed to create instance specifier");

let _discovery = super::SampleConsumerDiscovery::<TestVehicleInterface>::new(
let _discovery = super::LolaConsumerDiscovery::<TestVehicleInterface>::new(
&super::super::LolaRuntimeImpl {},
instance_specifier,
);
Expand All @@ -1125,7 +1111,7 @@ mod test {
let instance_specifier = com_api_concept::InstanceSpecifier::new("/test/vehicle")
.expect("Failed to create instance specifier");

let discovery = super::SampleConsumerDiscovery::<TestVehicleInterface>::new(
let discovery = super::LolaConsumerDiscovery::<TestVehicleInterface>::new(
&super::super::LolaRuntimeImpl {},
instance_specifier,
);
Expand Down
4 changes: 2 additions & 2 deletions score/mw/com/impl/rust/com-api/com-api-runtime-lola/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ mod consumer;
mod producer;
mod runtime;

pub use consumer::{LolaConsumerInfo, Sample, SampleConsumerDiscovery, SubscribableImpl};
pub use consumer::{LolaConsumerInfo, Sample, LolaConsumerDiscovery, SubscribableImpl};
pub use producer::{
LolaProviderInfo, Publisher, SampleMaybeUninit, SampleMut, SampleProducerBuilder,
LolaProviderInfo, Publisher, SampleMaybeUninit, SampleMut, LolaProducerBuilder,
};
pub use runtime::{LolaRuntimeImpl, RuntimeBuilderImpl};

Expand Down
Loading
Loading