Skip to content

Commit

Permalink
server: Add associated types Response and Exception to Service trait (#…
Browse files Browse the repository at this point in the history
…265)

* trait Service: Make server-side response optional

* Delete redundant type parameters

* Make Response and Exception associated types

* Add changelog entry
  • Loading branch information
uklotzde authored Jul 21, 2024
1 parent 8fb50bc commit 55c327a
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 105 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@

# Changelog

## v0.14.0 (Unreleased)

### Breaking Changes

- Server: Added associated types `Response` and `Exception` to `Service` trait.

## v0.13.1 (2024-06-25)

- Fix: Do not panic on disconnect.
Expand Down
43 changes: 22 additions & 21 deletions examples/rtu-over-tcp-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,37 @@ struct ExampleService {

impl tokio_modbus::server::Service for ExampleService {
type Request = SlaveRequest<'static>;
type Future = future::Ready<Result<Response, Exception>>;
type Response = Response;
type Exception = Exception;
type Future = future::Ready<Result<Self::Response, Self::Exception>>;

fn call(&self, req: Self::Request) -> Self::Future {
println!("{}", req.slave);
match req.request {
Request::ReadInputRegisters(addr, cnt) => future::ready(
let res = match req.request {
Request::ReadInputRegisters(addr, cnt) => {
register_read(&self.input_registers.lock().unwrap(), addr, cnt)
.map(Response::ReadInputRegisters),
),
Request::ReadHoldingRegisters(addr, cnt) => future::ready(
.map(Response::ReadInputRegisters)
}
Request::ReadHoldingRegisters(addr, cnt) => {
register_read(&self.holding_registers.lock().unwrap(), addr, cnt)
.map(Response::ReadHoldingRegisters),
),
Request::WriteMultipleRegisters(addr, values) => future::ready(
.map(Response::ReadHoldingRegisters)
}
Request::WriteMultipleRegisters(addr, values) => {
register_write(&mut self.holding_registers.lock().unwrap(), addr, &values)
.map(|_| Response::WriteMultipleRegisters(addr, values.len() as u16)),
),
Request::WriteSingleRegister(addr, value) => future::ready(
register_write(
&mut self.holding_registers.lock().unwrap(),
addr,
std::slice::from_ref(&value),
)
.map(|_| Response::WriteSingleRegister(addr, value)),
),
.map(|_| Response::WriteMultipleRegisters(addr, values.len() as u16))
}
Request::WriteSingleRegister(addr, value) => register_write(
&mut self.holding_registers.lock().unwrap(),
addr,
std::slice::from_ref(&value),
)
.map(|_| Response::WriteSingleRegister(addr, value)),
_ => {
println!("SERVER: Exception::IllegalFunction - Unimplemented function code in request: {req:?}");
future::ready(Err(Exception::IllegalFunction))
Err(Exception::IllegalFunction)
}
}
};
future::ready(res)
}
}

Expand Down
4 changes: 3 additions & 1 deletion examples/rtu-server-address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ struct Service {

impl tokio_modbus::server::Service for Service {
type Request = SlaveRequest<'static>;
type Future = future::Ready<Result<Response, Exception>>;
type Response = Response;
type Exception = Exception;
type Future = future::Ready<Result<Self::Response, Self::Exception>>;

fn call(&self, req: Self::Request) -> Self::Future {
if req.slave != self.slave.into() {
Expand Down
4 changes: 3 additions & 1 deletion examples/rtu-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ struct Service;

impl tokio_modbus::server::Service for Service {
type Request = SlaveRequest<'static>;
type Future = future::Ready<Result<Response, Exception>>;
type Response = Response;
type Exception = Exception;
type Future = future::Ready<Result<Self::Response, Self::Exception>>;

fn call(&self, req: Self::Request) -> Self::Future {
match req.request {
Expand Down
43 changes: 22 additions & 21 deletions examples/tcp-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,36 @@ struct ExampleService {

impl tokio_modbus::server::Service for ExampleService {
type Request = Request<'static>;
type Future = future::Ready<Result<Response, Exception>>;
type Response = Response;
type Exception = Exception;
type Future = future::Ready<Result<Self::Response, Self::Exception>>;

fn call(&self, req: Self::Request) -> Self::Future {
match req {
Request::ReadInputRegisters(addr, cnt) => future::ready(
let res = match req {
Request::ReadInputRegisters(addr, cnt) => {
register_read(&self.input_registers.lock().unwrap(), addr, cnt)
.map(Response::ReadInputRegisters),
),
Request::ReadHoldingRegisters(addr, cnt) => future::ready(
.map(Response::ReadInputRegisters)
}
Request::ReadHoldingRegisters(addr, cnt) => {
register_read(&self.holding_registers.lock().unwrap(), addr, cnt)
.map(Response::ReadHoldingRegisters),
),
Request::WriteMultipleRegisters(addr, values) => future::ready(
.map(Response::ReadHoldingRegisters)
}
Request::WriteMultipleRegisters(addr, values) => {
register_write(&mut self.holding_registers.lock().unwrap(), addr, &values)
.map(|_| Response::WriteMultipleRegisters(addr, values.len() as u16)),
),
Request::WriteSingleRegister(addr, value) => future::ready(
register_write(
&mut self.holding_registers.lock().unwrap(),
addr,
std::slice::from_ref(&value),
)
.map(|_| Response::WriteSingleRegister(addr, value)),
),
.map(|_| Response::WriteMultipleRegisters(addr, values.len() as u16))
}
Request::WriteSingleRegister(addr, value) => register_write(
&mut self.holding_registers.lock().unwrap(),
addr,
std::slice::from_ref(&value),
)
.map(|_| Response::WriteSingleRegister(addr, value)),
_ => {
println!("SERVER: Exception::IllegalFunction - Unimplemented function code in request: {req:?}");
future::ready(Err(Exception::IllegalFunction))
Err(Exception::IllegalFunction)
}
}
};
future::ready(res)
}
}

Expand Down
43 changes: 22 additions & 21 deletions examples/tls-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,36 @@ struct ExampleService {

impl tokio_modbus::server::Service for ExampleService {
type Request = Request<'static>;
type Future = future::Ready<Result<Response, Exception>>;
type Response = Response;
type Exception = Exception;
type Future = future::Ready<Result<Self::Response, Self::Exception>>;

fn call(&self, req: Self::Request) -> Self::Future {
match req {
Request::ReadInputRegisters(addr, cnt) => future::ready(
let res = match req {
Request::ReadInputRegisters(addr, cnt) => {
register_read(&self.input_registers.lock().unwrap(), addr, cnt)
.map(Response::ReadInputRegisters),
),
Request::ReadHoldingRegisters(addr, cnt) => future::ready(
.map(Response::ReadInputRegisters)
}
Request::ReadHoldingRegisters(addr, cnt) => {
register_read(&self.holding_registers.lock().unwrap(), addr, cnt)
.map(Response::ReadHoldingRegisters),
),
Request::WriteMultipleRegisters(addr, values) => future::ready(
.map(Response::ReadHoldingRegisters)
}
Request::WriteMultipleRegisters(addr, values) => {
register_write(&mut self.holding_registers.lock().unwrap(), addr, &values)
.map(|_| Response::WriteMultipleRegisters(addr, values.len() as u16)),
),
Request::WriteSingleRegister(addr, value) => future::ready(
register_write(
&mut self.holding_registers.lock().unwrap(),
addr,
std::slice::from_ref(&value),
)
.map(|_| Response::WriteSingleRegister(addr, value)),
),
.map(|_| Response::WriteMultipleRegisters(addr, values.len() as u16))
}
Request::WriteSingleRegister(addr, value) => register_write(
&mut self.holding_registers.lock().unwrap(),
addr,
std::slice::from_ref(&value),
)
.map(|_| Response::WriteSingleRegister(addr, value)),
_ => {
println!("SERVER: Exception::IllegalFunction - Unimplemented function code in request: {req:?}");
future::ready(Err(Exception::IllegalFunction))
Err(Exception::IllegalFunction)
}
}
};
future::ready(res)
}
}

Expand Down
29 changes: 7 additions & 22 deletions src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,6 @@ impl From<ExceptionResponse> for ResponsePdu {
}
}

impl From<Result<Response, ExceptionResponse>> for ResponsePdu {
fn from(from: Result<Response, ExceptionResponse>) -> Self {
ResponsePdu(from.map(Into::into).map_err(Into::into))
}
}

#[cfg(any(
feature = "rtu-over-tcp-server",
feature = "rtu-server",
Expand All @@ -463,22 +457,13 @@ pub(crate) struct OptionalResponsePdu(pub(crate) Option<ResponsePdu>);
feature = "rtu-server",
feature = "tcp-server"
))]
impl<T> From<Option<T>> for OptionalResponsePdu
where
T: Into<ResponsePdu>,
{
fn from(from: Option<T>) -> Self {
Self(from.map(Into::into))
}
}

#[cfg(any(feature = "rtu-server", feature = "tcp-server"))]
impl<T> From<T> for OptionalResponsePdu
where
T: Into<ResponsePdu>,
{
fn from(from: T) -> Self {
Self(Some(from.into()))
impl From<Result<Option<Response>, ExceptionResponse>> for OptionalResponsePdu {
fn from(from: Result<Option<Response>, ExceptionResponse>) -> Self {
match from {
Ok(None) => Self(None),
Ok(Some(response)) => Self(Some(response.into())),
Err(exception) => Self(Some(exception.into())),
}
}
}

Expand Down
17 changes: 11 additions & 6 deletions src/server/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
ExceptionResponse, OptionalResponsePdu,
},
server::service::Service,
Exception, Response,
};

use super::Terminated;
Expand Down Expand Up @@ -44,6 +45,8 @@ impl Server {
where
S: Service + Send + Sync + 'static,
S::Request: From<RequestAdu<'static>> + Send,
S::Response: Into<Option<Response>>,
S::Exception: Into<Exception>,
{
let framed = Framed::new(self.serial, ServerCodec::default());
process(framed, service).await
Expand All @@ -57,6 +60,8 @@ impl Server {
where
S: Service + Send + Sync + 'static,
S::Request: From<RequestAdu<'static>> + Send,
S::Response: Into<Option<Response>>,
S::Exception: Into<Exception>,
X: Future<Output = ()> + Sync + Send + Unpin + 'static,
{
let framed = Framed::new(self.serial, ServerCodec::default());
Expand All @@ -73,13 +78,12 @@ impl Server {
}

/// frame wrapper around the underlying service's responses to forwarded requests
async fn process<S, Req>(
mut framed: Framed<SerialStream, ServerCodec>,
service: S,
) -> io::Result<()>
async fn process<S>(mut framed: Framed<SerialStream, ServerCodec>, service: S) -> io::Result<()>
where
S: Service<Request = Req> + Send + Sync + 'static,
S: Service + Send + Sync + 'static,
S::Request: From<RequestAdu<'static>> + Send,
S::Response: Into<Option<Response>>,
S::Exception: Into<Exception>,
{
loop {
let Some(request) = framed.next().await.transpose()? else {
Expand All @@ -92,9 +96,10 @@ where
let OptionalResponsePdu(Some(response_pdu)) = service
.call(request.into())
.await
.map(Into::into)
.map_err(|e| ExceptionResponse {
function: fc,
exception: e,
exception: e.into(),
})
.into()
else {
Expand Down
22 changes: 17 additions & 5 deletions src/server/rtu_over_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
ExceptionResponse, OptionalResponsePdu,
},
server::service::Service,
Exception, Response,
};

use super::Terminated;
Expand Down Expand Up @@ -75,6 +76,8 @@ impl Server {
where
S: Service + Send + Sync + 'static,
S::Request: From<RequestAdu<'static>> + Send,
S::Response: Into<Option<Response>>,
S::Exception: Into<Exception>,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
OnConnected: Fn(TcpStream, SocketAddr) -> F,
F: Future<Output = io::Result<Option<(S, T)>>>,
Expand Down Expand Up @@ -115,6 +118,8 @@ impl Server {
where
S: Service + Send + Sync + 'static,
S::Request: From<RequestAdu<'static>> + Send,
S::Response: Into<Option<Response>>,
S::Exception: Into<Exception>,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
X: Future<Output = ()> + Sync + Send + Unpin + 'static,
OnConnected: Fn(TcpStream, SocketAddr) -> F,
Expand All @@ -134,10 +139,12 @@ impl Server {
}

/// The request-response loop spawned by [`serve_until`] for each client
async fn process<S, T, Req>(mut framed: Framed<T, ServerCodec>, service: S) -> io::Result<()>
async fn process<S, T>(mut framed: Framed<T, ServerCodec>, service: S) -> io::Result<()>
where
S: Service<Request = Req> + Send + Sync + 'static,
S: Service + Send + Sync + 'static,
S::Request: From<RequestAdu<'static>> + Send,
S::Response: Into<Option<Response>>,
S::Exception: Into<Exception>,
T: AsyncRead + AsyncWrite + Unpin,
{
loop {
Expand All @@ -151,9 +158,10 @@ where
let OptionalResponsePdu(Some(response_pdu)) = service
.call(request.into())
.await
.map(Into::into)
.map_err(|e| ExceptionResponse {
function: fc,
exception: e,
exception: e.into(),
})
.into()
else {
Expand Down Expand Up @@ -220,7 +228,9 @@ mod tests {

impl Service for DummyService {
type Request = Request<'static>;
type Future = future::Ready<Result<Response, Exception>>;
type Response = Response;
type Exception = Exception;
type Future = future::Ready<Result<Self::Response, Self::Exception>>;

fn call(&self, _: Self::Request) -> Self::Future {
future::ready(Ok(self.response.clone()))
Expand Down Expand Up @@ -253,7 +263,9 @@ mod tests {

impl Service for DummyService {
type Request = Request<'static>;
type Future = future::Ready<Result<Response, Exception>>;
type Response = Response;
type Exception = Exception;
type Future = future::Ready<Result<Self::Response, Self::Exception>>;

fn call(&self, _: Self::Request) -> Self::Future {
future::ready(Ok(self.response.clone()))
Expand Down
Loading

0 comments on commit 55c327a

Please sign in to comment.