Skip to content

Commit

Permalink
chore: Change IpfsOptions visibility, remove UninitializedIpfs::{empt…
Browse files Browse the repository at this point in the history
…y, with_opt} (#294)
  • Loading branch information
dariusc93 authored Aug 28, 2024
1 parent 490ab28 commit 6a6d12d
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 173 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- refactor: Reference `Block` in `BlockStore::put_block`. [PR 272](https://github.com/dariusc93/rust-ipfs/pull/272)
- feat: Passthrough timeout to WantSession::new. [PR 265](https://github.com/dariusc93/rust-ipfs/pull/265)
- chore: Update libp2p to 0.54. [PR 289](https://github.com/dariusc93/rust-ipfs/pull/289)
- chore: Change IpfsOptions visibility, remove UninitializedIpfs::{empty, with_opt}. [PR 294](https://github.com/dariusc93/rust-ipfs/pull/294)

# 0.11.21
- chore: Put libp2p-webrtc-websys behind feature.
Expand Down
2 changes: 1 addition & 1 deletion src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ fn resolve_local<'a>(
let (cid, data) = block.into_inner();
Ok(resolve_local_dagpb(
cid,
data.into(),
data,
segment,
segments.peek().is_none(),
cache,
Expand Down
25 changes: 1 addition & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl PartialEq for StorageType {
impl Eq for StorageType {}

/// Ipfs node options used to configure the node to be created with [`UninitializedIpfs`].
pub struct IpfsOptions {
struct IpfsOptions {
/// The path of the ipfs repo (blockstore and datastore).
///
/// This is always required but can be any path with in-memory backends. The filesystem backend
Expand Down Expand Up @@ -529,29 +529,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
}
}

/// New uninitualized instance without any listener addresses
#[deprecated(
note = "UninitializedIpfs::empty will be removed in the future. Use UninitializedIpfs::new()"
)]
pub fn empty() -> Self {
Self::new()
}

/// Configures a new UninitializedIpfs with from the given options and optionally a span.
/// If the span is not given, it is defaulted to `tracing::trace_span!("ipfs")`.
///
/// The span is attached to all operations called on the later created `Ipfs` along with all
/// operations done in the background task as well as tasks spawned by the underlying
/// `libp2p::Swarm`.
#[deprecated(
note = "UninitializedIpfs::with_opt will be removed in the future. Use UninitializedIpfs::new()"
)]
pub fn with_opt(options: IpfsOptions) -> Self {
let mut opt = Self::new();
opt.options = options;
opt
}

/// Set default listening unspecified ipv4 and ipv6 addresseses for tcp and udp/quic
pub fn set_default_listener(self) -> Self {
self.add_listening_addrs(vec![
Expand Down
2 changes: 1 addition & 1 deletion src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ where
C: NetworkBehaviour,
<C as NetworkBehaviour>::ToSwarm: Debug + Send,
{
pub fn new(
pub(crate) fn new(
keypair: &Keypair,
options: &IpfsOptions,
repo: &Repo,
Expand Down
2 changes: 1 addition & 1 deletion src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl Default for SwarmConfig {
#[allow(deprecated)]
//TODO: use libp2p::SwarmBuilder
/// Creates a new IPFS swarm.
pub fn create_swarm<C>(
pub(crate) fn create_swarm<C>(
keypair: &Keypair,
options: &IpfsOptions,
repo: &Repo,
Expand Down
12 changes: 4 additions & 8 deletions src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,15 +709,11 @@ impl Repo {
futures::pin_mut!(notified_fut);

match futures::future::select(block_fut, notified_fut).await {
Either::Left((Ok(Ok(block)), _)) => return Ok::<_, Error>(block),
Either::Left((Ok(Err(e)), _)) => {
return Err::<_, Error>(anyhow::anyhow!("{e}"))
}
Either::Left((Err(e), _)) => return Err::<_, Error>(e.into()),
Either::Left((Ok(Ok(block)), _)) => Ok::<_, Error>(block),
Either::Left((Ok(Err(e)), _)) => Err::<_, Error>(anyhow::anyhow!("{e}")),
Either::Left((Err(e), _)) => Err::<_, Error>(e.into()),
Either::Right(((), _)) => {
return Err::<_, Error>(anyhow::anyhow!(
"request for {cid} has been cancelled"
))
Err::<_, Error>(anyhow::anyhow!("request for {cid} has been cancelled"))
}
}
}
Expand Down
188 changes: 50 additions & 138 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,43 +341,25 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
Bootstrap(Err(BootstrapError::Timeout { .. })) => {
warn!("kad: timed out while trying to bootstrap");

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to bootstrap"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to bootstrap"
)));
}
}
GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => {
if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Ok(KadResult::Peers(
peers.iter().map(|info| info.peer_id).collect(),
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
if !peers.iter().any(|info| info.peer_id == peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"Could not locate peer"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Ok(KadResult::Peers(
peers.iter().map(|info| info.peer_id).collect(),
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
if !peers.iter().any(|info| info.peer_id == peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"Could not locate peer"
)));
}
}
}
Expand All @@ -390,26 +372,17 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
// don't mention the key here, as this is just the id of our node
warn!("kad: timed out while trying to find all closest peers");

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
}
}
Expand Down Expand Up @@ -438,17 +411,10 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to get providers for {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!("timed out while trying to get providers for the given key")));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to get providers for the given key"
)));
}
}
StartProviding(Ok(AddProviderOk { key })) => {
Expand All @@ -459,19 +425,10 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to provide {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to provide the record"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to provide the record"
)));
}
}
RepublishProvider(Ok(AddProviderOk { key })) => {
Expand Down Expand Up @@ -503,17 +460,8 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: couldn't find record {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
}
GetRecord(Err(GetRecordError::QuorumFailed {
Expand All @@ -527,34 +475,16 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
quorum, key
);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
}
GetRecord(Err(GetRecordError::Timeout { key })) => {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to get key {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
}
PutRecord(Ok(PutRecordOk { key }))
Expand All @@ -578,19 +508,10 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
quorum, key
);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: quorum failed when trying to put the record"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: quorum failed when trying to put the record"
)));
}
}
PutRecord(Err(PutRecordError::Timeout {
Expand All @@ -601,20 +522,11 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to put record {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to put record {}",
key
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to put record {}",
key
)));
}
}
RepublishRecord(Err(PutRecordError::Timeout {
Expand Down
1 change: 1 addition & 0 deletions unixfs/benches/ingest-tar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn ingest_tar(bytes: &[u8], buffer: &mut Vec<u8>, path: &mut String) {

let len = buffer.len();

#[allow(clippy::needless_borrows_for_generic_args)]
let mh = Multihash::wrap(
multihash_codetable::Code::Sha2_256.into(),
&Sha256::digest(&buffer),
Expand Down
1 change: 1 addition & 0 deletions unixfs/src/dir/builder/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl PostOrderIterator {

buffer.truncate(size);

#[allow(clippy::needless_borrows_for_generic_args)]
let mh = Multihash::wrap(Code::Sha2_256.into(), &Sha256::digest(&buffer)).unwrap();
let cid = Cid::new_v0(mh).expect("sha2_256 is the correct multihash for cidv0");

Expand Down

0 comments on commit 6a6d12d

Please sign in to comment.