Skip to content

Commit

Permalink
Update upc to 0.3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
surban committed Nov 7, 2023
1 parent 8322d71 commit 8925184
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 39 deletions.
4 changes: 2 additions & 2 deletions aggligator-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ tokio-tungstenite = { version = "0.20", features = [
], optional = true }
url = { version = "2", optional = true }
axum-server = { version = "0.5", optional = true }
upc = { version = "0.2.4", optional = true }
usb-gadget = { version = "0.4", optional = true }
upc = { version = "0.3.1", optional = true }
usb-gadget = { version = "0.5", optional = true }
rusb = { version = "0.9", optional = true }
gethostname = { version = "0.4", optional = true }

Expand Down
43 changes: 6 additions & 37 deletions aggligator-util/src/transport/usb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ const TIMEOUT: Duration = Duration::from_secs(1);
#[cfg_attr(docsrs, doc(cfg(feature = "usb-host")))]
mod host {
use async_trait::async_trait;
use bytes::Bytes;
use futures::{sink, stream};
use futures::TryStreamExt;
use rusb::{Context, Device, Hotplug, HotplugBuilder, Registration, UsbContext};
use std::{
any::Any,
Expand Down Expand Up @@ -298,24 +297,9 @@ mod host {
}
let Some(dev) = dev else { return Err(Error::new(ErrorKind::NotFound, "USB device gone")) };

let (tx, rx) = upc::host::connect(&dev, tag.interface, &[])
.await
.map_err(|err| Error::new(ErrorKind::ConnectionRefused, err))?;
let (tx, rx) = upc::host::connect(&dev, tag.interface, &[]).await?;

let tx_sink = sink::unfold(tx, |tx, data: Bytes| async move {
tx.send(data.to_vec()).await.map_err(|err| Error::new(ErrorKind::ConnectionReset, err))?;
Ok(tx)
});

let rx_stream = stream::try_unfold(rx, |mut rx| async move {
match rx.recv().await {
Ok(data) => Ok(Some((data.into(), rx))),
Err(rusb::Error::Pipe) => Ok(None),
Err(err) => Err(Error::new(ErrorKind::ConnectionReset, err)),
}
});

Ok(TxRxBox::new(tx_sink, rx_stream).into())
Ok(TxRxBox::new(tx.into_sink(), rx.into_stream().map_ok(|p| p.freeze())).into())
}
}
}
Expand All @@ -329,15 +313,14 @@ pub use host::*;
mod device {
use aggligator::control::Direction;
use async_trait::async_trait;
use bytes::Bytes;
use core::fmt;
use futures::{sink, stream};
use futures::TryStreamExt;
use std::{
any::Any,
cmp::Ordering,
ffi::{OsStr, OsString},
hash::{Hash, Hasher},
io::{ErrorKind, Result},
io::Result,
};
use tokio::sync::{mpsc, Mutex};
use upc::device::UpcFunction;
Expand Down Expand Up @@ -418,21 +401,7 @@ mod device {

loop {
let (tx, rx) = upc_function.accept().await?;

let tx_sink = sink::unfold(tx, |tx, data: Bytes| async move {
tx.send(data.to_vec()).await?;
Ok(tx)
});

let rx_stream = stream::try_unfold(rx, |mut rx| async move {
match rx.recv().await {
Ok(data) => Ok(Some((data.into(), rx))),
Err(err) if err.kind() == ErrorKind::ConnectionReset => Ok(None),
Err(err) => Err(err),
}
});

let tx_rx = TxRxBox::new(tx_sink, rx_stream);
let tx_rx = TxRxBox::new(tx.into_sink(), rx.into_stream().map_ok(|p| p.freeze()));

let tag = IncomingUsbLinkTag { udc: self.udc_name.clone() };

Expand Down

0 comments on commit 8925184

Please sign in to comment.