Skip to content

Commit

Permalink
[eclipse-uprotocol#122] Add abstract Communication Layer API traits
Browse files Browse the repository at this point in the history
Added traits for publishing messages, sending notifications and
invoking remote service operations. These traits represent the
uProtocol Communication Layer API that uEntities can use to
interact with each other using uProtocol.
  • Loading branch information
sophokles73 committed Jun 18, 2024
1 parent 3a50104 commit 5469eef
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 537 deletions.
53 changes: 53 additions & 0 deletions src/communication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{error::Error, fmt::Display};

pub use notification::{NotificationError, NotificationListener, Notifier};
pub use pubsub::{PubSubError, Publisher, Subscriber};
pub use rpc::{RpcClient, RpcServer, ServiceInvocationError};

mod notification;
mod pubsub;
mod rpc;

/// An error indicating a problem with registering or unregistering a message listener.
#[derive(Debug)]
pub enum RegistrationError {
/// Indicates that the maximum number of listeners supported by the Transport Layer implementation
/// has already been registered.
MaxListenersExceeded,
/// Indicates that no listener is registered for given pattern URIs.
NoSuchListener,
/// Indicates that the underlying Transport Layer implementation does not support registration and
/// notification of message handlers.
PushDeliveryMethodNotSupported,
}

impl Display for RegistrationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RegistrationError::MaxListenersExceeded => {
f.write_str("maximum number of listeners has been reached")
}
RegistrationError::NoSuchListener => {
f.write_str("no listener registered for given pattern")
}
RegistrationError::PushDeliveryMethodNotSupported => f.write_str(
"the underlying transport implementation does not support the push delivery method",
),
}
}
}

impl Error for RegistrationError {}
101 changes: 101 additions & 0 deletions src/communication/notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{error::Error, fmt::Display, sync::Arc};

use async_trait::async_trait;

use crate::communication::RegistrationError;
use crate::{UListener, UMessage, UStatus, UUri};

/// An error indicating a problem with sending a notification to another uEntity.
#[derive(Debug)]
pub enum NotificationError {
/// Indicates that the given message cannot be sent because it is not a [valid Notification message](crate::NotificationValidator).
InvalidArgument(String),
/// Indicates an unspecific error that occurred at the Transport Layer while trying to send a notification.
NotifyError(UStatus),
}

impl Display for NotificationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NotificationError::InvalidArgument(s) => {
f.write_fmt(format_args!("invalid argument: {}", s))
}
NotificationError::NotifyError(s) => {
f.write_fmt(format_args!("failed to send notification: {}", s))
}
}
}
}

impl Error for NotificationError {}

/// A client for sending Notification messages to a uEntity.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait Notifier: Send + Sync {
/// Sends a notification to a uEntity.
///
/// # Errors
///
/// Returns an error if the given message is not a valid
/// [uProtocol Notification message](`crate::NotificationValidator`).
async fn notify(&self, notification: UMessage) -> Result<(), NotificationError>;
}

/// A client for listening to Notification messages sent to this uEntity.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait NotificationListener: Send + Sync {
/// Starts listening to notifications that origin from uEntities matching a given pattern.
///
/// More than one handler can be registered for the same pattern.
/// The same handler can be registered for multiple patterns.
///
/// # Arguments
///
/// * `origin_filter` - The pattern defining the origin addresses of interest.
/// * `listener` - The handler to invoke for each notification that has been sent from a uEntity
/// [matching the given pattern](`crate::UUri::matches`).
///
/// # Errors
///
/// Returns an error if the listener cannot be registered.
async fn start_listening(
&self,
origin_filter: &UUri,
listener: Arc<dyn UListener>,
) -> Result<(), RegistrationError>;

/// Unregisters a previously [registered handler](`Self::start_listening`) for listening to notifications.
///
/// # Arguments
///
/// * `origin_filter` - The pattern that the handler had been registered for.
/// * `listener` - The handler to unregister.
///
/// # Errors
///
/// Returns an error if the listener cannot be unregistered.
async fn stop_listening(
&self,
origin_filter: &UUri,
listener: Arc<dyn UListener>,
) -> Result<(), RegistrationError>;
}
99 changes: 99 additions & 0 deletions src/communication/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{error::Error, fmt::Display, sync::Arc};

use async_trait::async_trait;

use crate::communication::RegistrationError;
use crate::{UListener, UMessage, UStatus, UUri};

/// An error indicating a problem with publishing a message to a topic.
#[derive(Debug)]
pub enum PubSubError {
/// Indicates that the given message cannot be sent because it is not a [valid Publish message](crate::PublishValidator).
InvalidArgument(String),
/// Indicates an unspecific error that occurred at the Transport Layer while trying to publish a message.
PublishError(UStatus),
}

impl Display for PubSubError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PubSubError::InvalidArgument(s) => f.write_fmt(format_args!("invalid argument: {}", s)),
PubSubError::PublishError(s) => {
f.write_fmt(format_args!("failed to publish message: {}", s))
}
}
}
}

impl Error for PubSubError {}

/// A client for publishing messages to a topic.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait Publisher: Send + Sync {
/// Publishes a message to a topic.
///
/// # Errors
///
/// Returns an error if the given message is not a valid
/// [uProtocol Publish message](`crate::PublishValidator`).
async fn publish(&self, message: UMessage) -> Result<(), PubSubError>;
}

/// A client for subscribing to topics.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait Subscriber: Send + Sync {
/// Registers a handler to invoke for messages that have been published to topics matching a given pattern.
///
/// More than one handler can be registered for the same pattern.
/// The same handler can be registered for multiple patterns.
///
/// # Arguments
///
/// * `topic_filter` - The pattern defining the topics of interest.
/// * `listener` - The handler to invoke for each message that has been published to a topic
/// [matching the given pattern](`crate::UUri::matches`).
///
/// # Errors
///
/// Returns an error if the listener cannot be registered.
async fn subscribe(
&self,
topic_filter: &UUri,
listener: Arc<dyn UListener>,
) -> Result<(), RegistrationError>;

/// Unregisters a previously [registered handler](`Self::subscribe`).
///
/// # Arguments
///
/// * `topic_filter` - The UUri pattern that the handler had been registered for.
/// * `listener` - The handler to unregister.
///
/// # Errors
///
/// Returns an error if the listener cannot be unregistered.
async fn unsubscribe(
&self,
topic_filter: &UUri,
listener: Arc<dyn UListener>,
) -> Result<(), RegistrationError>;
}
87 changes: 87 additions & 0 deletions src/communication/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::error::Error;
use std::fmt::Display;
use std::sync::Arc;

use async_trait::async_trait;

use crate::communication::RegistrationError;
use crate::{UMessage, UStatus, UUri};

/// An error indicating a problem with publishing a message to a topic.
#[derive(Debug)]
pub enum ServiceInvocationError {
/// Indicates that the given message cannot be sent because it is not a [valid Publish message](crate::PublishValidator).
InvalidArgument(String),
/// Indicates an unspecific error that occurred at the Transport Layer while trying to publish a message.
RpcError(UStatus),
}

impl Display for ServiceInvocationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ServiceInvocationError::InvalidArgument(s) => {
f.write_fmt(format_args!("invalid argument: {}", s))
}
ServiceInvocationError::RpcError(s) => {
f.write_fmt(format_args!("failed to invoke service operation: {}", s))
}
}
}
}

impl Error for ServiceInvocationError {}

/// A client for invoking RPC methods.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait RpcClient: Send + Sync {
/// Invokes a method on a service.
///
/// # Arguments
///
/// * `request` - The request message to be sent to the server.
///
/// # Returns
///
/// Returns the response message if the invocation was successful.
///
/// # Errors
///
/// Returns an error if invocation fails or the given message is not a valid RPC Request message.
async fn invoke_method(&self, request: UMessage) -> Result<UMessage, ServiceInvocationError>;
}

/// A server for exposing RPC endpoints.
///
/// Please refer to the
/// [Communication Layer API Specifications](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc).
#[async_trait]
pub trait RpcServer: Send + Sync {
async fn register_endpoint(
&self,
source_filter: &UUri,
method: &UUri,
listener: Arc<dyn RpcClient>,
) -> Result<(), RegistrationError>;
async fn unregister_endpoint(
&self,
source_filter: &UUri,
method: &UUri,
listener: Arc<dyn RpcClient>,
) -> Result<(), RegistrationError>;
}
9 changes: 4 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//!
//! ## Library contents
//!
//! * `rpc` module, which offers wrappers for dealing with uProtocol payload in the context of RPC method invocation
//! * `communication` module, which defines uProtocol's Communication Layer API for publishing and subscribing to topics and invoking RPC methods.
//! * `uattributes` module, with uProtocol message attribute types and validators
//! * `umessage` module, which defines the uProtocol core message type and provides related convenience functionality
//! * `upayload` module, which defines payload representation for uProtocol messages
Expand All @@ -43,13 +43,12 @@
//! * [Eclipse-uProtocol Core API types](https://github.com/eclipse-uprotocol/up-core-api)

// up_core_api types used and augmented by up_rust - symbols re-exported to toplevel, errors are module-specific
mod rpc;
pub use rpc::{RpcClient, RpcClientResult, RpcResult};
pub mod communication;

mod uattributes;
pub use uattributes::{
PublishValidator, RequestValidator, ResponseValidator, UAttributesValidator,
UAttributesValidators,
NotificationValidator, PublishValidator, RequestValidator, ResponseValidator,
UAttributesValidator, UAttributesValidators,
};
pub use uattributes::{UAttributes, UAttributesError, UMessageType, UPayloadFormat, UPriority};

Expand Down
18 changes: 0 additions & 18 deletions src/rpc.rs

This file was deleted.

5 changes: 0 additions & 5 deletions src/rpc/README.md

This file was deleted.

Loading

0 comments on commit 5469eef

Please sign in to comment.