diff --git a/Cargo.lock b/Cargo.lock index d0ffe4a..ca23a77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,36 +185,208 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" +dependencies = [ + "concurrent-queue", + "event-listener 5.2.0", + "event-listener-strategy 0.5.0", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +dependencies = [ + "async-lock 3.3.0", + "async-task", + "concurrent-queue", + "fastrand 2.0.2", + "futures-lite 2.2.0", + "slab", +] + +[[package]] +name = "async-fs" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "279cf904654eeebfa37ac9bb1598880884924aab82e290aa65c9e77a0e142e06" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "blocking", + "futures-lite 1.13.0", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.2.0", + "async-executor", + "async-io 2.3.1", + "async-lock 3.3.0", + "blocking", + "futures-lite 2.2.0", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + [[package]] name = "async-io" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f97ab0c5b00a7cdbe5a371b9a782ee7be1316095885c8a4ea1daf490eb0ef65" dependencies = [ - "async-lock", + "async-lock 3.3.0", "cfg-if", "concurrent-queue", "futures-io", - "futures-lite", + "futures-lite 2.2.0", "parking", - "polling", - "rustix", + "polling 3.4.0", + "rustix 0.38.31", "slab", "tracing", "windows-sys 0.52.0", ] +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + [[package]] name = "async-lock" version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" dependencies = [ - "event-listener", - "event-listener-strategy", + "event-listener 4.0.3", + "event-listener-strategy 0.4.0", + "pin-project-lite", +] + +[[package]] +name = "async-net" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0434b1ed18ce1cf5769b8ac540e33f01fa9471058b5e89da9e06f3c882a8c12f" +dependencies = [ + "async-io 1.13.0", + "blocking", + "futures-lite 1.13.0", +] + +[[package]] 
+name = "async-process" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +dependencies = [ + "async-io 1.13.0", + "async-lock 2.8.0", + "async-signal", + "blocking", + "cfg-if", + "event-listener 3.1.0", + "futures-lite 1.13.0", + "rustix 0.38.31", + "windows-sys 0.48.0", +] + +[[package]] +name = "async-signal" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e47d90f65a225c4527103a8d747001fc56e375203592b25ad103e1ca13124c5" +dependencies = [ + "async-io 2.3.1", + "async-lock 2.8.0", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix 0.38.31", + "signal-hook-registry", + "slab", + "windows-sys 0.48.0", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 2.8.0", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", ] +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + [[package]] name = "async-trait" version = "0.1.77" @@ -226,6 +398,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "asynchronous-codec" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4057f2c32adbb2fc158e22fb38433c8e9bbf76b75a4732c7c0cbaf695fb65568" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "asynchronous-codec" version = "0.7.0" @@ -239,6 +424,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "attohttpc" version = "0.24.1" @@ -294,7 +485,7 @@ name = "beetswap" version = "0.1.0" dependencies = [ "anyhow", - "asynchronous-codec", + "asynchronous-codec 0.7.0", "blockstore", "bytes", "cid", @@ -303,12 +494,16 @@ dependencies = [ "futures", "futures-timer", "hex", + "instant", "libp2p", "libp2p-core", "libp2p-identity", "libp2p-noise", + "libp2p-stream", "libp2p-swarm", + "libp2p-swarm-test", "libp2p-yamux", + "log", "multihash", "multihash-codetable", "quick-protobuf", @@ -387,6 +582,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +dependencies = [ + "async-channel 2.2.0", + "async-lock 3.3.0", + "async-task", + "fastrand 2.0.2", + "futures-io", + "futures-lite 2.2.0", + "piper", + "tracing", +] + [[package]] name = "blockstore" version = "0.4.0" @@ -811,6 +1022,23 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name 
= "event-listener" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "event-listener" version = "4.0.3" @@ -822,16 +1050,52 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "event-listener-strategy" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" dependencies = [ - "event-listener", + "event-listener 4.0.3", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" +dependencies = [ + "event-listener 5.2.0", "pin-project-lite", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + +[[package]] +name = "fastrand" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" + [[package]] name = "fiat-crypto" version = "0.2.5" @@ -912,13 +1176,31 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-lite" version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" dependencies = [ + "fastrand 2.0.2", "futures-core", + "futures-io", + "parking", "pin-project-lite", ] @@ -1096,7 +1378,7 @@ dependencies = [ "ipnet", "once_cell", "rand", - "socket2", + "socket2 0.5.5", "thiserror", "tinyvec", "tokio", @@ -1205,7 +1487,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -1248,7 +1530,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6b0422c86d7ce0e97169cc42e04ae643caf278874a7a3c87b8150a220dc7e1e" dependencies = [ - "async-io", + "async-io 2.3.1", "core-foundation", "fnv", "futures", @@ -1256,6 +1538,7 @@ dependencies = [ "ipnet", "log", "rtnetlink", + "smol", "system-configuration", "tokio", "windows", @@ -1306,6 +1589,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", ] [[package]] @@ -1314,7 +1611,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2", + "socket2 0.5.5", "widestring", "windows-sys 0.48.0", "winreg", @@ -1350,6 +1647,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1466,7 +1772,7 @@ version = "0.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20499a945d2f0221fdc6269b3848892c0f370d2ee3e19c7f65a29d8f860f6126" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.7.0", "either", "futures", "futures-bounded", @@ -1476,7 +1782,7 @@ dependencies = [ "libp2p-swarm", "lru", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.1", "smallvec", "thiserror", "tracing", @@ -1516,7 +1822,7 @@ dependencies = [ "libp2p-swarm", "rand", "smallvec", - "socket2", + "socket2 0.5.5", "tokio", "tracing", "void", @@ -1544,7 +1850,7 @@ version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecd0545ce077f6ea5434bcb76e8d0fe942693b4380aaad0d34a358c2bd05793" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.7.0", "bytes", "curve25519-dalek", "futures", @@ -1564,6 +1870,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-plaintext" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67330af40b67217e746d42551913cfb7ad04c74fa300fb329660a56318590b3f" +dependencies = [ + "asynchronous-codec 0.6.2", + "bytes", + "futures", + "libp2p-core", + "libp2p-identity", + "quick-protobuf", + "quick-protobuf-codec 0.2.0", + "tracing", +] + [[package]] name = "libp2p-quic" version = "0.10.2" @@ -1582,18 +1904,34 @@ dependencies = [ "rand", "ring 0.16.20", "rustls", - "socket2", + "socket2 0.5.5", "thiserror", "tokio", "tracing", ] +[[package]] +name = "libp2p-stream" +version = "0.1.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1f34086b787422d4cf148bc0f0c72557a1cb97811dd001f363af7b1f82057e" +dependencies = [ + "futures", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand", + "tracing", + "void", +] + [[package]] name = "libp2p-swarm" version = "0.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e92532fc3c4fb292ae30c371815c9b10103718777726ea5497abc268a4761866" dependencies = [ + "async-std", "either", "fnv", "futures", @@ -1623,19 +1961,39 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "libp2p-swarm-test" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a73027f1bdabd15d08b2c7954911cd56a6265c476763b2ceb10d9dc5ea4366b2" +dependencies = [ + "async-trait", + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-plaintext", + "libp2p-swarm", + "libp2p-tcp", + "libp2p-yamux", + "rand", + "tracing", +] + [[package]] name = "libp2p-tcp" version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"8b2460fc2748919adff99ecbc1aab296e4579e41f374fb164149bd2c9e529d4c" dependencies = [ + "async-io 1.13.0", "futures", "futures-timer", "if-watch", "libc", "libp2p-core", "libp2p-identity", - "socket2", + "socket2 0.5.5", "tokio", "tracing", ] @@ -1696,6 +2054,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1717,6 +2081,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] [[package]] name = "lru" @@ -1940,6 +2307,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6471bf08e7ac0135876a9581bf3217ef0333c191c128d34878079f42ee150411" dependencies = [ + "async-io 1.13.0", "bytes", "futures", "libc", @@ -2149,6 +2517,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.2", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -2165,6 +2544,22 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "626dec3cac7cc0e1577a2ec3fc496277ec2baa084bebad95bb6fdbfae235f84c" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "polling" version = "3.4.0" @@ -2174,7 +2569,7 @@ dependencies = [ "cfg-if", "concurrent-queue", "pin-project-lite", - "rustix", + "rustix 0.38.31", "tracing", "windows-sys 0.52.0", ] @@ -2295,13 +2690,26 @@ dependencies = [ "byteorder", ] +[[package]] +name = "quick-protobuf-codec" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ededb1cd78531627244d51dd0c7139fbe736c7d57af0092a76f0ffb2f56e98" +dependencies = [ + "asynchronous-codec 0.6.2", + "bytes", + "quick-protobuf", + "thiserror", + "unsigned-varint 0.7.2", +] + [[package]] name = "quick-protobuf-codec" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15a0580ab32b169745d7a39db2ba969226ca16738931be152a3209b409de2474" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.7.0", "bytes", "quick-protobuf", "thiserror", @@ -2351,7 +2759,7 @@ checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" dependencies = [ "bytes", "libc", - "socket2", + "socket2 0.5.5", "tracing", "windows-sys 0.48.0", ] @@ -2514,6 +2922,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322c53fd76a18698f1c27381d58091de3a043d356aa5bd0d510608b565f469a0" dependencies = [ + 
"async-global-executor", "futures", "log", "netlink-packet-route", @@ -2553,6 +2962,20 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.31" @@ -2562,7 +2985,7 @@ dependencies = [ "bitflags 2.4.2", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.13", "windows-sys 0.52.0", ] @@ -2688,6 +3111,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -2712,6 +3144,23 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "smol" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13f2b548cd8447f8de0fdf1c592929f70f4fc7039a05e47404b0d096ec6987a1" +dependencies = [ + "async-channel 1.9.0", + "async-executor", + "async-fs", + "async-io 1.13.0", + "async-lock 2.8.0", + "async-net", + "async-process", + "blocking", + "futures-lite 1.13.0", +] + [[package]] name = "snow" version = "0.9.6" @@ -2729,6 +3178,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.5" @@ -2935,7 +3394,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -3115,6 +3574,10 @@ name = "unsigned-varint" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" +dependencies = [ + "asynchronous-codec 0.6.2", + "bytes", +] [[package]] name = "unsigned-varint" @@ -3157,6 +3620,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74797339c3b98616c009c7c3eb53a0ce41e85c8ec66bd3db96ed132d20cfdee8" + [[package]] name = "version_check" version = "0.9.4" @@ -3169,6 +3638,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "want" version = "0.3.1" @@ -3209,6 +3684,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +dependencies = [ + 
"cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.90" diff --git a/Cargo.toml b/Cargo.toml index 2776704..556e3f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ cid = "0.11" fnv = "1.0.5" futures = "0.3" futures-timer = "3" +instant = "0.1.12" libp2p-core = "0.41" libp2p-identity = "0.2" libp2p-swarm = "0.44" @@ -31,7 +32,10 @@ clap = { version = "4.4", features = ["derive"] } hex = "0.4" libp2p = { version = "0.53", features = ["tokio", "tcp", "identify", "macros"] } libp2p-noise = "0.44" +libp2p-stream = "0.1.0-alpha" +libp2p-swarm-test = "0.3.0" libp2p-yamux = "0.45" +log = "0.4.18" # needed for minimal-versions multihash = "0.19" multihash-codetable = { version = "0.1", features = ["digest", "sha2"] } tokio = { version = "1", features = ["rt", "macros", "time", "sync"] } @@ -39,7 +43,7 @@ tracing-appender = "0.2.2" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } [features] -wasm-bindgen = ["futures-timer/wasm-bindgen"] +wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen"] [package.metadata.docs.rs] rustdoc-args = ["--cfg", "docs_rs"] diff --git a/src/client.rs b/src/client.rs index 9195e19..3841928 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,26 +1,26 @@ use std::collections::{hash_map, VecDeque}; -use std::fmt; use std::mem::take; use std::sync::Arc; use std::task::{ready, Context, Poll}; use std::time::Duration; +use std::{fmt, mem}; use asynchronous_codec::FramedWrite; use blockstore::{Blockstore, BlockstoreError}; use cid::CidGeneric; -use fnv::FnvHashMap; +use fnv::{FnvHashMap, FnvHashSet}; use futures::future::{AbortHandle, Abortable}; use futures::stream::FuturesUnordered; -use futures::task::AtomicWaker; use futures::{FutureExt, SinkExt, StreamExt}; use futures_timer::Delay; +use instant::Instant; use libp2p_core::upgrade::ReadyUpgrade; use libp2p_identity::PeerId; use libp2p_swarm::{ - ConnectionHandlerEvent, NotifyHandler, StreamProtocol, SubstreamProtocol, ToSwarm, + ConnectionHandlerEvent, ConnectionId, NotifyHandler, StreamProtocol, SubstreamProtocol, ToSwarm, }; use smallvec::SmallVec; -use std::sync::Mutex; +use tracing::warn; use crate::incoming_stream::ClientMessage; use crate::message::Codec; @@ -28,10 +28,12 @@ use crate::proto::message::mod_Message::{BlockPresenceType, Wantlist as ProtoWan use crate::proto::message::Message; use crate::utils::{box_future, convert_cid, stream_protocol, BoxFuture}; use crate::wantlist::{Wantlist, WantlistState}; -use crate::StreamRequester; +use crate::{ConnHandlerEvent, StreamRequester}; use crate::{Error, Event, Result, ToBehaviourEvent, ToHandlerEvent}; const SEND_FULL_INTERVAL: Duration = Duration::from_secs(30); +const RECEIVE_REQUEST_TIMEOUT: Duration = Duration::from_secs(1); +const START_SENDING_TIMEOUT: Duration = Duration::from_secs(5); /// ID of an ongoing query. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -74,25 +76,56 @@ where tasks: FuturesUnordered>>, query_abort_handle: FnvHashMap, next_query_id: u64, - waker: Arc, send_full_timer: Delay, new_blocks: Vec<(CidGeneric, Vec)>, } #[derive(Debug)] struct PeerState { - established_connections_num: usize, - sending: Arc>, + /// Keeps track of established connections. + /// + /// A connection is removed from this list if one of the following happens: + /// + /// * Connection closure is triggered. + /// * `ClientConnectionHandler` did not receive the `SendWantlist` request. 
In other + /// words the `RECEIVE_REQUEST_TIMEOUT` is triggered. + /// * `ClientConnectionHandler` failed to allocate a communication channel with the + /// other peer. In other words the `START_SENDING_TIMEOUT` is triggered. + /// * Communication channel with the peer was closed unexpectedly. This can happen for example when + /// the TCP conection is closed. + established_connections: FnvHashSet, + sending_state: SendingState, wantlist: WantlistState, send_full: bool, } +/// Sending state of the `ClientConnectionHandler`. +/// +/// This exists in two different places: +/// +/// * `PeerState` in `ClientBehaviour` +/// * `ClientConnectionHandler` +/// +/// The changes are synchronized via events. See the following on +/// why this designed was chosen: +/// +/// * https://github.com/eigerco/lumina/issues/257 +/// * https://github.com/eigerco/beetswap/pull/36 #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[doc(hidden)] pub enum SendingState { + /// All `ClientConnectionHandler` are ready to send new messages. + /// + /// NOTE: Each peer can have multiple `ClientConnectionHandler`. Ready, - Sending, - Poisoned, + /// `ClientBehaviour` requested to send a message via `ClientConnectionHandler` with `ConnectionId`. + Requested(Instant, ConnectionId), + /// `ClientConnectionHandler` with `ConnectionId` received the request. + RequestReceived(Instant, ConnectionId), + /// `ClientConnectionHandler` with `ConnectionId` started sending the message. + Sending(Instant, ConnectionId), + /// `ClientConnectionHandler` with `ConnectionId` failed to send the message. + Failed(ConnectionId), } impl ClientBehaviour @@ -114,37 +147,47 @@ where tasks: FuturesUnordered::new(), query_abort_handle: FnvHashMap::default(), next_query_id: 0, - waker: Arc::new(AtomicWaker::new()), send_full_timer: Delay::new(SEND_FULL_INTERVAL), new_blocks: Vec::new(), } } - pub(crate) fn new_connection_handler(&mut self, peer: PeerId) -> ClientConnectionHandler { - let peer = self.peers.entry(peer).or_insert_with(|| PeerState { - established_connections_num: 0, - sending: Arc::new(Mutex::new(SendingState::Ready)), + pub(crate) fn new_connection_handler( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + ) -> ClientConnectionHandler { + let peer = self.peers.entry(peer_id).or_insert_with(|| PeerState { + established_connections: FnvHashSet::default(), + sending_state: SendingState::Ready, wantlist: WantlistState::new(), send_full: true, }); - peer.established_connections_num += 1; + peer.established_connections.insert(connection_id); ClientConnectionHandler { + peer_id, + connection_id, + queue: VecDeque::new(), protocol: self.protocol.clone(), - stream_requested: false, - sink: None, - wantlist: None, - sending_state: None, - behaviour_waker: Arc::new(AtomicWaker::new()), + msg: None, + sink_state: SinkState::None, + sending_state: SendingState::Ready, + closing: false, + halted: false, + start_sending_timeout: None, } } - pub(crate) fn on_connection_closed(&mut self, peer: PeerId) { + pub(crate) fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId) { if let hash_map::Entry::Occupied(mut entry) = self.peers.entry(peer) { - entry.get_mut().established_connections_num -= 1; + entry + .get_mut() + .established_connections + .remove(&connection_id); - if entry.get_mut().established_connections_num == 0 { + if entry.get().established_connections.is_empty() { entry.remove(); } } @@ -274,39 +317,68 @@ where } } + pub(crate) fn sending_state_changed(&mut self, peer_id: PeerId, state: SendingState) { + if 
let Some(peer) = self.peers.get_mut(&peer_id) { + peer.sending_state = state; + } + } + fn update_handlers(&mut self) -> bool { let mut handler_updated = false; + let mut peers_without_connection = SmallVec::<[PeerId; 8]>::new(); for (peer, state) in self.peers.iter_mut() { - let mut sending_state = state.sending.lock().unwrap(); - - // Decide if full list is needed or not. - let send_full = match &*sending_state { - SendingState::Sending => { - if Arc::strong_count(&state.sending) == 1 { - // `Sending` state with strong count of 1 can happen only - // when the connection is dropped just before it reads our - // event. In this case we treat is with the same way as `Poisoned` - // state. - true - } else { - // ClientConnectionHandler will wake us when we can retry + // Clear out bad connections. In case of a bad connection we + // must send the full wantlist because we don't know what + // the remote peer has received. + match state.sending_state { + SendingState::Ready => { + // Allowed to send + } + SendingState::Requested(instant, connection_id) => { + if instant.elapsed() < RECEIVE_REQUEST_TIMEOUT { + // Sending in progress continue; } + // Bad connection. + // `ClientConnectionHandler` didn't receive `SendWantlist` request before timeout. + state.established_connections.remove(&connection_id); + state.send_full = true; + state.sending_state = SendingState::Ready; + } + SendingState::RequestReceived(..) => { + // Stream allocation in progress + continue; + } + SendingState::Sending(..) => { + // Sending in progress + continue; } - SendingState::Ready => state.send_full, - // State is poisoned, send full list to recover. - SendingState::Poisoned => true, + SendingState::Failed(connection_id) => { + // Bad connection. + // `ClientConnectionHandler` failed to send wantlist because of network issues. + state.established_connections.remove(&connection_id); + state.send_full = true; + state.sending_state = SendingState::Ready; + } + }; + + let Some(connection_id) = state.established_connections.iter().next().copied() else { + peers_without_connection.push(*peer); + continue; }; - let wantlist = if send_full { + let wantlist = if state.send_full { state.wantlist.generate_proto_full(&self.wantlist) } else { + // NOTE: `generate_proto_update` alters the internal state of `WantlistState` + // each time it is called. So after calling it, any error should be recovered + // with `send_full`, even if the error happens before any byte leaves the wire. state.wantlist.generate_proto_update(&self.wantlist) }; // Allow empty entries to be sent when send_full flag is set. - if send_full { + if state.send_full { // Reset flag state.send_full = false; } else if wantlist.entries.is_empty() { @@ -316,22 +388,25 @@ where self.queue.push_back(ToSwarm::NotifyHandler { peer_id: peer.to_owned(), - handler: NotifyHandler::Any, - event: ToHandlerEvent::SendWantlist(wantlist, state.sending.clone()), + handler: NotifyHandler::One(connection_id), + event: ToHandlerEvent::SendWantlist(wantlist), }); - *sending_state = SendingState::Sending; + state.sending_state = SendingState::Requested(Instant::now(), connection_id); handler_updated = true; } + // Remove dead peers + for peer in peers_without_connection { + self.peers.remove(&peer); + } + // This is true if at least one handler is updated handler_updated } + /// This is polled by `Behaviour`, which is polled by `Swarm`. 
pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll> { - // Update waker - self.waker.register(cx.waker()); - loop { if let Some(ev) = self.queue.pop_front() { return Poll::Ready(ev); @@ -403,55 +478,88 @@ where } pub(crate) struct ClientConnectionHandler { + peer_id: PeerId, + connection_id: ConnectionId, + queue: VecDeque>, protocol: StreamProtocol, - stream_requested: bool, - sink: Option>, - /// Wantlist to be send - wantlist: Option, - /// Sending state of peer. - /// - /// Even if we have multiple concurrent connections with the peer, only - /// one of them will be sending and have this value filled. - sending_state: Option>>, - behaviour_waker: Arc, + msg: Option, + sink_state: SinkState, + sending_state: SendingState, + closing: bool, + halted: bool, + start_sending_timeout: Option, +} + +enum SinkState { + None, + Requested, + Ready(FramedWrite), } impl ClientConnectionHandler { + pub(crate) fn halted(&self) -> bool { + self.halted + } + pub(crate) fn set_stream(&mut self, stream: libp2p_swarm::Stream) { + if self.halted { + return; + } + // Convert `AsyncWrite` stream to `Sink` - self.sink = Some(FramedWrite::new(stream, Codec)); - self.stream_requested = false; + self.sink_state = SinkState::Ready(FramedWrite::new(stream, Codec)); } - pub(crate) fn send_wantlist( - &mut self, - wantlist: ProtoWantlist, - state: Arc>, - ) { - debug_assert!(self.wantlist.is_none()); - debug_assert!(self.sending_state.is_none()); + pub(crate) fn stream_allocation_failed(&mut self) { + if self.halted { + return; + } - self.wantlist = Some(wantlist); - self.sending_state = Some(state); + debug_assert!(matches!(self.sink_state, SinkState::Requested)); + // Reset state to force a new allocation in `poll`. + self.sink_state = SinkState::None; } - fn poll_outgoing_no_stream( + /// Initiate sending of a wantlist to the peer. + pub(crate) fn send_wantlist(&mut self, wantlist: ProtoWantlist) { + if self.halted { + return; + } + + debug_assert!(self.msg.is_none()); + debug_assert!(matches!(self.sending_state, SendingState::Ready)); + + self.msg = Some(Message { + wantlist: Some(wantlist), + ..Message::default() + }); + + self.change_sending_state(SendingState::RequestReceived( + Instant::now(), + self.connection_id, + )); + + // Before reaching the `Sending` state, a stream allocation must happen. + // This can take time or require multiple retries. We specify how much time we + // are willing to wait until `Sending` is reached. + self.start_sending_timeout = Some(Delay::new(START_SENDING_TIMEOUT)); + } + + /// Changes sending state if needed and informs `ClientBehaviour` if there is a change. + fn change_sending_state(&mut self, state: SendingState) { + if self.sending_state != state { + self.sending_state = state; + self.queue + .push_back(ToBehaviourEvent::SendingStateChanged(self.peer_id, state)); + } + } + + fn open_new_substream( &mut self, ) -> Poll< ConnectionHandlerEvent, StreamRequester, ToBehaviourEvent>, > { - // `stream_requested` already checked in `poll_outgoing` - debug_assert!(!self.stream_requested); - // `wantlist` and `sending_state` must be both `Some` or both `None` - debug_assert_eq!(self.wantlist.is_some(), self.sending_state.is_some()); - - if self.wantlist.is_none() { - // Nothing to send - return Poll::Pending; - } - - // There is data to send, so request a new stream. 
- self.stream_requested = true; + self.sink_state = SinkState::Requested; Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new( @@ -461,74 +569,116 @@ impl ClientConnectionHandler { }) } - fn on_sink_error(&mut self) { - self.sink.take(); + fn close_sink_on_error(&mut self, location: &str) { + warn!("sink operation failed, closing: {location}"); + self.sink_state = SinkState::None; + } + + /// This is polled when the `ConnectionHandler` task initiates the closing of the connection. + /// + /// This method needs to return all the remaining events that are going to be send to + /// the behaviour. It is polled in a stream-like fashion and stops when `Poll::Ready(None)` + /// is returned. + /// + /// After reaching this point, `poll` method will never be called again. + pub(crate) fn poll_close(&mut self, cx: &mut Context) -> Poll>> { + if !self.closing { + self.closing = true; + self.msg.take(); + + if let SinkState::Ready(mut sink) = mem::replace(&mut self.sink_state, SinkState::None) + { + // Close the sink but don't await for it. + let _ = sink.poll_close_unpin(cx); + } - if self.wantlist.is_none() { - if let Some(state) = self.sending_state.take() { - *state.lock().unwrap() = SendingState::Poisoned; - self.behaviour_waker.wake(); + // If sending is in progress, then we don't know how much data the other end received + // so we consider this as "failed". + if matches!( + self.sending_state, + SendingState::RequestReceived(..) | SendingState::Sending(..) + ) { + self.change_sending_state(SendingState::Failed(self.connection_id)); } + + self.queue + .push_back(ToBehaviourEvent::ClientClosingConnection( + self.peer_id, + self.connection_id, + )); } + + Poll::Ready(self.queue.pop_front()) } - fn poll_outgoing( - &mut self, - cx: &mut Context, - ) -> Poll< - ConnectionHandlerEvent, StreamRequester, ToBehaviourEvent>, - > { + /// Each connection has its own dedicated task, which polls this method. + pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll> { loop { - if self.stream_requested { - // We can not progress until we have a stream - return Poll::Pending; + if let Some(ev) = self.queue.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(ev)); } - let Some(sink) = self.sink.as_mut() else { - return self.poll_outgoing_no_stream(); - }; + if self.halted { + return Poll::Pending; + } - // Send the ongoing message before we continue to a new one - if ready!(sink.poll_flush_unpin(cx)).is_err() { - // Sink closed unexpectedly, retry - self.on_sink_error(); - continue; + if let Some(delay) = &mut self.start_sending_timeout { + // If we have never reached the `Sending` state within the specified + // time, we abort and halt this connection. + if delay.poll_unpin(cx).is_ready() { + self.start_sending_timeout.take(); + self.msg.take(); + self.close_sink_on_error("start_sending_timeout"); + self.change_sending_state(SendingState::Failed(self.connection_id)); + self.halted = true; + continue; + } } - let Some(wantlist) = self.wantlist.take() else { - // Nothing to send - if let Some(state) = self.sending_state.take() { - *state.lock().unwrap() = SendingState::Ready; - self.behaviour_waker.wake(); + match (&mut self.msg, &mut self.sink_state) { + (None, SinkState::None) => return Poll::Pending, + (Some(_), SinkState::None) => return self.open_new_substream(), + (_, SinkState::Requested) => return Poll::Pending, + (None, SinkState::Ready(sink)) => { + // When `poll_flush` returns `Ok`, it means the sending just finished. 
+ // When `poll_flush` returns `Err`, it means the sending just failed. + if ready!(sink.poll_flush_unpin(cx)).is_err() { + self.close_sink_on_error("poll_flush_unpin"); + self.change_sending_state(SendingState::Failed(self.connection_id)); + continue; + } + + // Sending finished and we have nothing else to send, so we close the stream. + let _ = sink.poll_close_unpin(cx); + self.sink_state = SinkState::None; + self.change_sending_state(SendingState::Ready); } - return Poll::Pending; - }; + (msg @ Some(_), SinkState::Ready(sink)) => { + if ready!(sink.poll_ready_unpin(cx)).is_err() { + self.close_sink_on_error("poll_ready_unpin"); + continue; + } - let message = Message { - wantlist: Some(wantlist), - ..Message::default() - }; + let msg = msg.take().expect("msg is always Some here"); - if sink.start_send_unpin(&message).is_err() { - // Something went wrong, retry - self.on_sink_error(); - } + if sink.start_send_unpin(&msg).is_err() { + self.msg = Some(msg); + self.close_sink_on_error("start_send_unpin"); + continue; + } - // Loop again, so `poll_flush` will be called and register a waker. - } - } + // Stop the timer because sending started + self.start_sending_timeout = None; - pub(crate) fn poll( - &mut self, - cx: &mut Context, - ) -> Poll< - ConnectionHandlerEvent, StreamRequester, ToBehaviourEvent>, - > { - if let Poll::Ready(ready) = self.poll_outgoing(cx) { - return Poll::Ready(ready); - } + self.change_sending_state(SendingState::Sending( + Instant::now(), + self.connection_id, + )); - Poll::Pending + // Loop again, so `poll_flush` will be called and register a waker. + } + } + } } } @@ -538,377 +688,696 @@ impl fmt::Debug for ClientConnectionHandler { } } -impl Drop for ClientConnectionHandler { - fn drop(&mut self) { - if let Some(state) = self.sending_state.take() { - let mut state = state.lock().unwrap(); - - // If sending was never done - if *state == SendingState::Sending { - *state = SendingState::Poisoned; - self.behaviour_waker.wake(); - } - } - } -} - #[cfg(test)] mod tests { use super::*; - use crate::proto::message::mod_Message::mod_Wantlist::WantType; + use crate::cid_prefix::CidPrefix; + use crate::proto::message::mod_Message::mod_Wantlist::{Entry, WantType}; + use crate::proto::message::mod_Message::{Block, BlockPresence, Wantlist}; use crate::test_utils::{cid_of_data, poll_fn_once}; + use crate::Behaviour; + use asynchronous_codec::FramedRead; use blockstore::InMemoryBlockstore; - use std::future::poll_fn; - - #[tokio::test] - async fn get_known_cid() { - let mut client = new_client().await; - - let cid1 = cid_of_data(b"1"); - let query_id1 = client.get(&cid1); - - let cid2 = cid_of_data(b"2"); - let query_id2 = client.get(&cid2); - - for _ in 0..2 { - let ev = poll_fn(|cx| client.poll(cx)).await; - - match ev { - ToSwarm::GenerateEvent(Event::GetQueryResponse { query_id, data }) => { - if query_id == query_id1 { - assert_eq!(data, b"1"); - } else if query_id == query_id2 { - assert_eq!(data, b"2"); - } else { - unreachable!() - } - } - _ => unreachable!(), - } - } - } + use futures::future::{self, Either}; + use libp2p_stream::IncomingStreams; + use libp2p_swarm::Swarm; + use libp2p_swarm_test::SwarmExt; + use std::pin::pin; + use tokio::time::sleep; #[tokio::test] async fn get_unknown_cid_responds_with_have() { - let mut client = new_client().await; - - let peer1 = PeerId::random(); - let mut _conn1 = client.new_connection_handler(peer1); - - let peer2 = PeerId::random(); - let mut _conn2 = client.new_connection_handler(peer2); + let server = 
Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new()); + let mut client = + Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new())); + + let (mut server_control, mut server_incoming_streams) = + connect_to_server(&mut client, server).await; + + // Initial full list sent to server + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![], + full: true, + }), + ..Default::default() + }] + ); let cid1 = cid_of_data(b"x1"); - let _query_id1 = client.get(&cid1); - - // Wantlist will be generated for both peers - for _ in 0..2 { - // wantlist with Have request will be generated - let ev = poll_fn(|cx| client.poll(cx)).await; - let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev); - - assert!(peer_id == peer1 || peer_id == peer2); - assert_eq!(wantlist.entries.len(), 1); - assert!(wantlist.full); - - let entry = &wantlist.entries[0]; - assert_eq!(entry.block, cid1.to_bytes()); - assert!(!entry.cancel); - assert_eq!(entry.wantType, WantType::Have); - assert!(entry.sendDontHave); - - // Mark send state as ready - *send_state.lock().unwrap() = SendingState::Ready; - } - - // Simulate that peer1 responsed with Have - let mut client_msg = ClientMessage::default(); - client_msg - .block_presences - .insert(cid1, BlockPresenceType::Have); - client.process_incoming_message(peer1, client_msg); - - // wantlist with Block request will be generated - let ev = poll_fn(|cx| client.poll(cx)).await; - let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev); - - assert_eq!(peer_id, peer1); - assert_eq!(wantlist.entries.len(), 1); - assert!(!wantlist.full); - - let entry = &wantlist.entries[0]; - assert_eq!(entry.block, cid1.to_bytes()); - assert!(!entry.cancel); - assert_eq!(entry.wantType, WantType::Block); - assert!(entry.sendDontHave); - - // Mark send state as ready - *send_state.lock().unwrap() = SendingState::Ready; + let _query_id1 = client.behaviour_mut().get(&cid1); + + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid1.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + }], + full: false, + }), + ..Default::default() + }] + ); + + send_message_to_client( + &mut server_control, + &mut client, + Message { + wantlist: None, + payload: vec![], + blockPresences: vec![BlockPresence { + cid: cid1.to_bytes(), + type_pb: BlockPresenceType::Have, + }], + pendingBytes: 0, + }, + ) + .await; + + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid1.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Block, + sendDontHave: true, + }], + full: false, + }), + payload: vec![], + blockPresences: vec![], + pendingBytes: 0 + }] + ); } #[tokio::test] async fn get_unknown_cid_responds_with_dont_have() { - let mut client = new_client().await; - - let peer1 = PeerId::random(); - let mut _conn1 = client.new_connection_handler(peer1); - - let peer2 = PeerId::random(); - let mut _conn2 = client.new_connection_handler(peer2); + let server1 = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new()); + let server2 = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new()); + let mut client = + Swarm::new_ephemeral(|_| 
Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new())); + + let (mut server1_control, mut server1_incoming_streams) = + connect_to_server(&mut client, server1).await; + let (_server2_control, mut server2_incoming_streams) = + connect_to_server(&mut client, server2).await; + + // Initial full list sent to server1 + let msgs = collect_incoming_messages(&mut server1_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![], + full: true, + }), + ..Default::default() + }] + ); + + // Initial full list sent to server2 + let msgs = collect_incoming_messages(&mut server2_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![], + full: true, + }), + ..Default::default() + }] + ); let cid1 = cid_of_data(b"x1"); - let _query_id1 = client.get(&cid1); - - // Wantlist will be generated for both peers - for _ in 0..2 { - // wantlist with Have request will be generated - let ev = poll_fn(|cx| client.poll(cx)).await; - let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev); - - assert!(peer_id == peer1 || peer_id == peer2); - assert_eq!(wantlist.entries.len(), 1); - assert!(wantlist.full); - - let entry = &wantlist.entries[0]; - assert_eq!(entry.block, cid1.to_bytes()); - assert!(!entry.cancel); - assert_eq!(entry.wantType, WantType::Have); - assert!(entry.sendDontHave); - - // Mark send state as ready - *send_state.lock().unwrap() = SendingState::Ready; - } - - // Simulate that peer1 responsed with DontHave - let mut client_msg = ClientMessage::default(); - client_msg - .block_presences - .insert(cid1, BlockPresenceType::DontHave); - client.process_incoming_message(peer1, client_msg); - - // Simulate that full wantlist is needed - for peer_state in client.peers.values_mut() { + let _query_id1 = client.behaviour_mut().get(&cid1); + + let expected_msgs = vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid1.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + }], + full: false, + }), + payload: vec![], + blockPresences: vec![], + pendingBytes: 0, + }]; + + let msgs = collect_incoming_messages(&mut server1_incoming_streams, &mut client).await; + assert_eq!(&msgs, &expected_msgs); + + let msgs = collect_incoming_messages(&mut server2_incoming_streams, &mut client).await; + assert_eq!(&msgs, &expected_msgs); + + send_message_to_client( + &mut server1_control, + &mut client, + Message { + wantlist: None, + payload: vec![], + blockPresences: vec![BlockPresence { + cid: cid1.to_bytes(), + type_pb: BlockPresenceType::DontHave, + }], + pendingBytes: 0, + }, + ) + .await; + + // Mark that full wantlist must be send + for peer_state in client.behaviour_mut().client.peers.values_mut() { peer_state.send_full = true; } - for _ in 0..2 { - let ev = poll_fn(|cx| client.poll(cx)).await; - let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev); - - if peer_id == peer1 { - // full wantlist of peer1 will be empty because it alreayd replied with DontHave - assert!(wantlist.entries.is_empty()); - assert!(wantlist.full); - } else if peer_id == peer2 { - assert_eq!(wantlist.entries.len(), 1); - assert!(wantlist.full); - - let entry = &wantlist.entries[0]; - assert_eq!(entry.block, cid1.to_bytes()); - assert!(!entry.cancel); - assert_eq!(entry.wantType, WantType::Have); - assert!(entry.sendDontHave); - } else { - panic!("Unknown peer id"); - } - - // Mark send state as ready - 
*send_state.lock().unwrap() = SendingState::Ready; - } - - // No other events should be produced - assert!(dbg!(poll_fn_once(|cx| client.poll(cx)).await).is_none()); + // `client` sends a full wantlist to `server1` but without the `cid1` because server + // already replied with DontHave. + let msgs = collect_incoming_messages(&mut server1_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![], + full: true, + }), + ..Default::default() + }] + ); + + let msgs = collect_incoming_messages(&mut server2_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid1.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + }], + full: true, + }), + ..Default::default() + }] + ); } #[tokio::test] async fn get_unknown_cid_responds_with_block() { - let mut client = new_client().await; - - let peer = PeerId::random(); - let mut _conn = client.new_connection_handler(peer); - - let cid1 = cid_of_data(b"x1"); - let query_id1 = client.get(&cid1); + let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new()); + let mut client = + Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new())); + + let (mut server_control, mut server_incoming_streams) = + connect_to_server(&mut client, server).await; + + // Initial full list sent to server + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![], + full: true, + }), + ..Default::default() + }] + ); + + let data1 = b"x1"; + let cid1 = cid_of_data(data1); + let query_id1 = client.behaviour_mut().get(&cid1); + + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid1.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + }], + full: false, + }), + payload: vec![], + blockPresences: vec![], + pendingBytes: 0 + }] + ); + + let ev = send_message_to_client_and_wait_beheviour_event( + &mut server_control, + &mut client, + Message { + wantlist: None, + payload: vec![Block { + prefix: CidPrefix::from_cid(&cid1).to_bytes(), + data: data1.to_vec(), + }], + blockPresences: vec![], + pendingBytes: 0, + }, + ) + .await; + + let (query_id, data) = unwrap_get_query_reponse(ev); + assert_eq!(query_id, query_id1); + assert_eq!(data, data1); - // wantlist with Have request will be generated - let ev = poll_fn(|cx| client.poll(cx)).await; - let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev); + // Poll once more for the store to be updated. This does not produce an event. 
+ poll_fn_once(|cx| client.poll_next_unpin(cx)).await; + assert_eq!( + client + .behaviour() + .client + .store + .get(&cid1) + .await + .unwrap() + .unwrap(), + data1 + ); + } - assert_eq!(peer_id, peer); - assert_eq!(wantlist.entries.len(), 1); - assert!(wantlist.full); + #[tokio::test] + async fn update_wantlist() { + let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new()); + let mut client = + Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new())); + + let (_server_control, mut server_incoming_streams) = + connect_to_server(&mut client, server).await; + + // Initial full list sent to server + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![], + full: true, + }), + ..Default::default() + }] + ); - let entry = &wantlist.entries[0]; - assert_eq!(entry.block, cid1.to_bytes()); - assert!(!entry.cancel); - assert_eq!(entry.wantType, WantType::Have); - assert!(entry.sendDontHave); + let cid1 = cid_of_data(b"x1"); + let cid2 = cid_of_data(b"x2"); + let cid3 = cid_of_data(b"x3"); - // Mark send state as ready - *send_state.lock().unwrap() = SendingState::Ready; + let _query_id1 = client.behaviour_mut().get(&cid1); + let _query_id2 = client.behaviour_mut().get(&cid2); + + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![ + Entry { + block: cid2.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + }, + Entry { + block: cid1.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + } + ], + full: false, + }), + ..Default::default() + }] + ); + + let _query_id3 = client.behaviour_mut().get(&cid3); + + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid3.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + }], + full: false, + }), + ..Default::default() + }] + ); + } - // Simulate that peer responsed with a block - let mut client_msg = ClientMessage::default(); - client_msg.blocks.insert(cid1, b"x1".to_vec()); - client.process_incoming_message(peer, client_msg); + #[tokio::test] + async fn request_then_cancel() { + let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new()); + let mut client = + Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new())); + + let (_server_control, mut server_incoming_streams) = + connect_to_server(&mut client, server).await; + + // Initial full list sent to server + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![], + full: true, + }), + ..Default::default() + }] + ); - // Receive an event with the found data - let ev = poll_fn(|cx| client.poll(cx)).await; + let cid1 = cid_of_data(b"x1"); + let cid2 = cid_of_data(b"x2"); - let (query_id, data) = match ev { - ToSwarm::GenerateEvent(Event::GetQueryResponse { query_id, data }) => (query_id, data), - _ => unreachable!(), - }; + let query_id1 = client.behaviour_mut().get(&cid1); + let query_id2 = client.behaviour_mut().get(&cid2); - assert_eq!(query_id, query_id1); - assert_eq!(data, b"x1"); + // This cancel will not generate any 
messages because request was not send yet + client.behaviour_mut().cancel(query_id2); + + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid1.to_bytes(), + priority: 1, + cancel: false, + wantType: WantType::Have, + sendDontHave: true, + },], + full: false, + }), + ..Default::default() + }] + ); - // Poll once more for the store to be updated. This does not produce an event. - poll_fn_once(|cx| client.poll(cx)).await; - assert_eq!(client.store.get(&cid1).await.unwrap().unwrap(), b"x1"); + // This cancel should produce a message for cancelling the request + client.behaviour_mut().cancel(query_id1); + + let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await; + assert_eq!( + msgs, + vec![Message { + wantlist: Some(Wantlist { + entries: vec![Entry { + block: cid1.to_bytes(), + cancel: true, + ..Default::default() + },], + full: false, + }), + ..Default::default() + }] + ); } #[tokio::test] - async fn full_wantlist_then_update() { - let mut client = new_client().await; - - let peer = PeerId::random(); - let mut _conn = client.new_connection_handler(peer); + async fn request_before_connect() { + let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new()); + let mut client = + Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new())); let cid1 = cid_of_data(b"x1"); - let _query_id1 = client.get(&cid1); - let cid2 = cid_of_data(b"x2"); - let _query_id2 = client.get(&cid2); + let cid3 = cid_of_data(b"x3"); - let ev = poll_fn(|cx| client.poll(cx)).await; + let _query_id1 = client.behaviour_mut().get(&cid1); + let query_id2 = client.behaviour_mut().get(&cid2); + let _query_id3 = client.behaviour_mut().get(&cid3); + + // Cancel request of `cid2`. + client.behaviour_mut().cancel(query_id2); + + let (_server_control, mut server_incoming_streams) = + connect_to_server(&mut client, server).await; + + // Initial full list sent to server should contain `cid1` and `cid3`. 
+        let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await;
+        assert_eq!(
+            msgs,
+            vec![Message {
+                wantlist: Some(Wantlist {
+                    entries: vec![
+                        Entry {
+                            block: cid3.to_bytes(),
+                            priority: 1,
+                            cancel: false,
+                            wantType: WantType::Have,
+                            sendDontHave: true,
+                        },
+                        Entry {
+                            block: cid1.to_bytes(),
+                            priority: 1,
+                            cancel: false,
+                            wantType: WantType::Have,
+                            sendDontHave: true,
+                        }
+                    ],
+                    full: true,
+                }),
+                ..Default::default()
+            }]
+        );
+    }
 
-        let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev);
+    #[tokio::test]
+    async fn get_known_cid() {
+        let data1 = b"x1";
+        let cid1 = cid_of_data(data1);
+        let cid2 = cid_of_data(b"x2");
 
-        assert_eq!(peer_id, peer);
-        assert_eq!(wantlist.entries.len(), 2);
-        assert!(wantlist.full);
+        let blockstore = InMemoryBlockstore::<64>::new();
+        blockstore.put_keyed(&cid1, data1).await.unwrap();
+
+        let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
+        let mut client = Swarm::new_ephemeral(move |_| Behaviour::<64, _>::new(blockstore));
+
+        let (_server_control, mut server_incoming_streams) =
+            connect_to_server(&mut client, server).await;
+
+        // Initial full list sent to server
+        let msgs = collect_incoming_messages(&mut server_incoming_streams, &mut client).await;
+        assert_eq!(
+            msgs,
+            vec![Message {
+                wantlist: Some(Wantlist {
+                    entries: vec![],
+                    full: true,
+                }),
+                ..Default::default()
+            }]
+        );
+
+        let query_id1 = client.behaviour_mut().get(&cid1);
+        let _query_id2 = client.behaviour_mut().get(&cid2);
+
+        let (msgs, ev) = collect_incoming_messages_and_behaviour_event(
+            &mut server_incoming_streams,
+            &mut client,
+        )
+        .await;
+
+        // `cid1` is known, so client replies without sending a request.
+        let (query_id, data) = unwrap_get_query_reponse(ev);
+        assert_eq!(query_id, query_id1);
+        assert_eq!(data, data1);
+
+        // `cid2` is not known, so client sends a request.
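+        // The wantlist below therefore contains only `cid2`.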
+        assert_eq!(
+            msgs,
+            vec![Message {
+                wantlist: Some(Wantlist {
+                    entries: vec![Entry {
+                        block: cid2.to_bytes(),
+                        priority: 1,
+                        cancel: false,
+                        wantType: WantType::Have,
+                        sendDontHave: true,
+                    },],
+                    full: false,
+                }),
+                ..Default::default()
+            }]
+        );
+    }
 
-        let entry1 = wantlist
-            .entries
-            .iter()
-            .find(|item| item.block == cid1.to_bytes())
-            .unwrap();
-        assert!(!entry1.cancel);
-        assert_eq!(entry1.wantType, WantType::Have);
-        assert!(entry1.sendDontHave);
-
-        let entry2 = wantlist
-            .entries
-            .iter()
-            .find(|item| item.block == cid2.to_bytes())
+    async fn connect_to_server(
+        client: &mut Swarm<Behaviour<64, InMemoryBlockstore<64>>>,
+        mut server: Swarm<libp2p_stream::Behaviour>,
+    ) -> (libp2p_stream::Control, libp2p_stream::IncomingStreams) {
+        let mut server_control = server.behaviour().new_control();
+        let server_incoming_streams = server_control
+            .accept(StreamProtocol::new("/ipfs/bitswap/1.2.0"))
             .unwrap();
-        assert!(!entry2.cancel);
-        assert_eq!(entry2.wantType, WantType::Have);
-        assert!(entry2.sendDontHave);
-
-        // Mark send state as ready
-        *send_state.lock().unwrap() = SendingState::Ready;
-
-        let cid3 = cid_of_data(b"x3");
-        let _query_id3 = client.get(&cid3);
-
-        let ev = poll_fn(|cx| client.poll(cx)).await;
-        let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev);
-        assert_eq!(peer_id, peer);
-        assert_eq!(wantlist.entries.len(), 1);
-        assert!(!wantlist.full);
+        server.listen().with_memory_addr_external().await;
+        client.connect(&mut server).await;
 
-        let entry = &wantlist.entries[0];
-        assert_eq!(entry.block, cid3.to_bytes());
-        assert!(!entry.cancel);
-        assert_eq!(entry.wantType, WantType::Have);
-        assert!(entry.sendDontHave);
+        // Server can be controlled by `server_control` but it still needs
+        // to be driven by the executor.
+        tokio::spawn(server.loop_on_next());
 
-        // Mark send state as ready
-        *send_state.lock().unwrap() = SendingState::Ready;
+        (server_control, server_incoming_streams)
     }
 
-    #[tokio::test]
-    async fn request_then_cancel() {
-        let mut client = new_client().await;
-
-        let peer = PeerId::random();
-        let mut _conn = client.new_connection_handler(peer);
-
-        let cid1 = cid_of_data(b"x1");
-        let query_id1 = client.get(&cid1);
+    async fn collect_incoming_messages(
+        server_incoming_streams: &mut IncomingStreams,
+        client: &mut Swarm<Behaviour<64, InMemoryBlockstore<64>>>,
+    ) -> Vec<Message> {
+        let server_fut = pin!(async {
+            let (peer_id, stream) = server_incoming_streams.next().await.unwrap();
+            let stream = FramedRead::new(stream, Codec);
+            let msgs = stream.map(|res| res.unwrap()).collect::<Vec<_>>().await;
+            (peer_id, msgs)
+        });
 
-        let cid2 = cid_of_data(b"x2");
-        let query_id2 = client.get(&cid2);
+        let client_peer_id = *client.local_peer_id();
+        let client_fut = pin!(client.next_behaviour_event());
 
-        // This cancel will not generate any messages because request was not send yet
-        client.cancel(query_id2);
+        match future::select(server_fut, client_fut).await {
+            Either::Left(((peer_id, mut msgs), _)) => {
+                assert_eq!(peer_id, client_peer_id);
 
-        // wantlist with Have request will be generated
-        let ev = poll_fn(|cx| client.poll(cx)).await;
-        let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev);
+                // Sort messages for easier testing
+                for msg in &mut msgs {
+                    if let Some(wantlist) = &mut msg.wantlist {
+                        wantlist
+                            .entries
+                            .sort_by(|entry1, entry2| entry1.block.cmp(&entry2.block));
+                    }
 
-        assert_eq!(peer_id, peer);
-        assert_eq!(wantlist.entries.len(), 1);
-        assert!(wantlist.full);
+                    msg.payload
+                        .sort_by(|block1, block2| block1.data.cmp(&block2.data));
+                    msg.blockPresences
+                        .sort_by(|presence1, presence2| presence1.cid.cmp(&presence2.cid));
+                }
 
-        let entry = &wantlist.entries[0];
-        assert_eq!(entry.block, cid1.to_bytes());
-        assert!(!entry.cancel);
-        assert_eq!(entry.wantType, WantType::Have);
-        assert!(entry.sendDontHave);
+                msgs
+            }
+            Either::Right((ev, _)) => panic!("Received behaviour event on client: {ev:?}"),
+        }
+    }
 
-        // Mark send state as ready
-        *send_state.lock().unwrap() = SendingState::Ready;
+    async fn collect_incoming_messages_and_behaviour_event(
+        server_incoming_streams: &mut IncomingStreams,
+        client: &mut Swarm<Behaviour<64, InMemoryBlockstore<64>>>,
+    ) -> (Vec<Message>, Event) {
+        let mut server_fut = async {
+            let (peer_id, stream) = server_incoming_streams.next().await.unwrap();
+            let stream = FramedRead::new(stream, Codec);
+            let msgs = stream.map(|res| res.unwrap()).collect::<Vec<_>>().await;
+            (peer_id, msgs)
+        }
+        .boxed()
+        .fuse();
+
+        let mut msgs = None;
+        let mut ev = None;
+
+        // We need to keep polling `client` even after it generates an event,
+        // otherwise `server_fut` will not progress.
+        while msgs.is_none() || ev.is_none() {
+            tokio::select! {
+                (peer_id, m) = &mut server_fut => {
+                    assert_eq!(peer_id, *client.local_peer_id());
+                    msgs = Some(m);
+                }
+                e = client.next_behaviour_event() => {
+                    assert!(ev.is_none());
+                    ev = Some(e);
+                }
+            }
+        }
 
-        // This cancel should produce a message for cancelling the request
-        client.cancel(query_id1);
+        (msgs.unwrap(), ev.unwrap())
+    }
 
-        // wantlist with Cancel request will be generated
-        let ev = poll_fn(|cx| client.poll(cx)).await;
-        let (peer_id, wantlist, _) = expect_send_wantlist_event(ev);
+    async fn send_message_to_client(
+        server_control: &mut libp2p_stream::Control,
+        client: &mut Swarm<Behaviour<64, InMemoryBlockstore<64>>>,
+        msg: Message,
+    ) {
+        let client_peer_id = *client.local_peer_id();
 
-        assert_eq!(peer_id, peer);
-        assert_eq!(wantlist.entries.len(), 1);
-        assert!(!wantlist.full);
+        let server_fut = pin!(async {
+            let stream = server_control
+                .open_stream(client_peer_id, StreamProtocol::new("/ipfs/bitswap/1.2.0"))
+                .await
+                .unwrap();
+            let mut stream = FramedWrite::new(stream, Codec);
+            stream.send(&msg).await.unwrap();
+            // Wait a bit for the client to process it
+            sleep(Duration::from_millis(10)).await;
+        });
 
-        let entry = &wantlist.entries[0];
-        assert_eq!(entry.block, cid1.to_bytes());
-        assert!(entry.cancel);
+        let client_fut = pin!(client.next_behaviour_event());
 
-        // Mark send state as ready
-        *send_state.lock().unwrap() = SendingState::Ready;
+        match future::select(server_fut, client_fut).await {
+            Either::Left((_, _)) => {}
+            Either::Right((ev, _)) => panic!("Received behaviour event on client: {ev:?}"),
+        }
     }
 
-    async fn blockstore() -> Arc<InMemoryBlockstore<64>> {
-        let store = Arc::new(InMemoryBlockstore::<64>::new());
+    async fn send_message_to_client_and_wait_beheviour_event(
+        server_control: &mut libp2p_stream::Control,
+        client: &mut Swarm<Behaviour<64, InMemoryBlockstore<64>>>,
+        msg: Message,
+    ) -> Event {
+        let client_peer_id = *client.local_peer_id();
 
-        for i in 0..16 {
-            let data = format!("{i}").into_bytes();
-            let cid = cid_of_data(&data);
-            store.put_keyed(&cid, &data).await.unwrap();
-        }
+        let server_fut = pin!(async {
+            let stream = server_control
+                .open_stream(client_peer_id, StreamProtocol::new("/ipfs/bitswap/1.2.0"))
+                .await
+                .unwrap();
+            let mut stream = FramedWrite::new(stream, Codec);
+            stream.send(&msg).await.unwrap();
+        });
 
-        store
-    }
+        let client_fut = pin!(client.next_behaviour_event());
 
-    async fn new_client() -> ClientBehaviour<64, InMemoryBlockstore<64>> {
-        let store = blockstore().await;
-        ClientBehaviour::<64, _>::new(ClientConfig::default(), store, None)
+        future::join(server_fut, client_fut).await.1
     }
 
-    fn expect_send_wantlist_event(
-        ev: ToSwarm<Event, ToHandlerEvent>,
-    ) -> (PeerId, ProtoWantlist, Arc<Mutex<SendingState>>) {
+    fn unwrap_get_query_reponse(ev: Event) -> (QueryId, Vec<u8>) {
         match ev {
-            ToSwarm::NotifyHandler {
-                peer_id,
-                event: ToHandlerEvent::SendWantlist(wantlist, send_state),
-                ..
-            } => (peer_id, wantlist, send_state),
-            ev => panic!("Expecting ToHandlerEvent::SendWantlist, found {ev:?}"),
+            Event::GetQueryResponse { query_id, data } => (query_id, data),
+            ev => panic!("Expected Event::GetQueryResponse, got {ev:?}"),
         }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 65755e3..8a4a538 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,8 +1,8 @@
 #![cfg_attr(docs_rs, feature(doc_cfg))]
 #![doc = include_str!("../README.md")]
 
-use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll};
+use std::sync::Arc;
+use std::task::{ready, Context, Poll};
 
 use blockstore::{Blockstore, BlockstoreError};
 use cid::CidGeneric;
@@ -47,7 +47,7 @@ where
     B: Blockstore + 'static,
 {
     protocol: StreamProtocol,
-    client: ClientBehaviour,
+    pub(crate) client: ClientBehaviour,
     server: ServerBehaviour,
     multihasher: Arc>,
 }
@@ -129,7 +129,7 @@ where
     fn handle_established_inbound_connection(
         &mut self,
-        _connection_id: ConnectionId,
+        connection_id: ConnectionId,
         peer: PeerId,
         _local_addr: &Multiaddr,
         _remote_addr: &Multiaddr,
@@ -137,7 +137,7 @@ where
         Ok(ConnHandler {
             peer,
             protocol: self.protocol.clone(),
-            client_handler: self.client.new_connection_handler(peer),
+            client_handler: self.client.new_connection_handler(peer, connection_id),
             server_handler: self.server.new_connection_handler(peer),
             incoming_streams: SelectAll::new(),
             multihasher: self.multihasher.clone(),
@@ -146,7 +146,7 @@ where
     fn handle_established_outbound_connection(
         &mut self,
-        _connection_id: ConnectionId,
+        connection_id: ConnectionId,
         peer: PeerId,
         _addr: &Multiaddr,
         _role_override: Endpoint,
@@ -154,7 +154,7 @@ where
         Ok(ConnHandler {
             peer,
             protocol: self.protocol.clone(),
-            client_handler: self.client.new_connection_handler(peer),
+            client_handler: self.client.new_connection_handler(peer, connection_id),
             server_handler: self.server.new_connection_handler(peer),
             incoming_streams: SelectAll::new(),
             multihasher: self.multihasher.clone(),
@@ -164,8 +164,12 @@ where
     fn on_swarm_event(&mut self, event: FromSwarm) {
         #[allow(clippy::single_match)]
         match event {
-            FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
-                self.client.on_connection_closed(peer_id);
+            FromSwarm::ConnectionClosed(ConnectionClosed {
+                peer_id,
+                connection_id,
+                ..
+            }) => {
+                self.client.on_connection_closed(peer_id, connection_id);
             }
             _ => {}
         }
@@ -191,6 +195,12 @@ where
                 trace!("received new blocks: {}", blocks.len());
                 self.server.new_blocks_available(blocks);
             }
+            ToBehaviourEvent::SendingStateChanged(peer_id, state) => {
+                self.client.sending_state_changed(peer_id, state);
+            }
+            ToBehaviourEvent::ClientClosingConnection(peer_id, connection_id) => {
+                self.client.on_connection_closed(peer_id, connection_id);
+            }
         }
     }
@@ -222,12 +232,14 @@ where
 pub enum ToBehaviourEvent {
     IncomingMessage(PeerId, IncomingMessage),
     NewBlocksAvailable(Vec<(CidGeneric, Vec<u8>)>),
+    SendingStateChanged(PeerId, SendingState),
+    ClientClosingConnection(PeerId, ConnectionId),
 }
 
 #[derive(Debug)]
 #[doc(hidden)]
 pub enum ToHandlerEvent {
-    SendWantlist(ProtoWantlist, Arc<Mutex<SendingState>>),
+    SendWantlist(ProtoWantlist),
     QueueOutgoingMessages(Vec<(Vec<u8>, Vec<u8>)>),
 }
@@ -248,6 +260,9 @@ pub struct ConnHandler {
     multihasher: Arc>,
 }
 
+pub(crate) type ConnHandlerEvent =
+    ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, StreamRequester, ToBehaviourEvent>;
+
 impl ConnectionHandler for ConnHandler {
     type ToBehaviour = ToBehaviourEvent;
     type FromBehaviour = ToHandlerEvent;
@@ -262,8 +277,8 @@ impl ConnectionHandler for ConnHandler
-            ToHandlerEvent::SendWantlist(wantlist, state) => {
-                self.client_handler.send_wantlist(wantlist, state);
+            ToHandlerEvent::SendWantlist(wantlist) => {
+                self.client_handler.send_wantlist(wantlist);
             }
             ToHandlerEvent::QueueOutgoingMessages(data) => {
                 self.server_handler.queue_messages(data);
@@ -286,6 +301,12 @@ impl ConnectionHandler for ConnHandler
                 StreamRequester::Client => self.client_handler.set_stream(outbound.protocol),
                 StreamRequester::Server => self.server_handler.set_stream(outbound.protocol),
             },
+            ConnectionEvent::DialUpgradeError(outbound) => match outbound.info {
+                StreamRequester::Client => self.client_handler.stream_allocation_failed(),
+                StreamRequester::Server => {
+                    // TODO
+                }
+            },
             ConnectionEvent::FullyNegotiatedInbound(ev) => {
                 let stream = IncomingStream::new(ev.protocol, self.multihasher.clone());
                 self.incoming_streams.push(stream);
@@ -295,20 +316,18 @@ impl ConnectionHandler for ConnHandler
     }
 
     fn connection_keep_alive(&self) -> bool {
-        // TODO
-        true
+        !self.client_handler.halted()
     }
 
-    fn poll_close(&mut self, _cx: &mut Context) -> Poll<Option<Self::ToBehaviour>> {
+    fn poll_close(&mut self, cx: &mut Context) -> Poll<Option<Self::ToBehaviour>> {
+        if let Some(ev) = ready!(self.client_handler.poll_close(cx)) {
+            return Poll::Ready(Some(ev));
+        }
+
         Poll::Ready(None)
     }
 
-    fn poll(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<
-        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
-    > {
+    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ConnHandlerEvent> {
         if let Poll::Ready(Some(msg)) = self.incoming_streams.poll_next_unpin(cx) {
             return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
                 ToBehaviourEvent::IncomingMessage(self.peer, msg),