diff --git a/Cargo.lock b/Cargo.lock index ec75d775..859c6cf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,20 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "acto" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c372578ce4215ccf94ec3f3585fbb6a902e47d07b064ff8a55d850ffb5025e" +dependencies = [ + "parking_lot", + "pin-project-lite", + "rustc_version", + "smol_str", + "tokio", + "tracing", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -227,6 +241,61 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.1.0", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backoff" version = "0.4.0" @@ -300,6 +369,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -486,7 +564,7 @@ version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.87", @@ -538,7 +616,7 @@ dependencies = [ "encode_unicode", "lazy_static", "libc", - "unicode-width", + "unicode-width 0.1.14", "windows-sys 0.52.0", ] @@ -946,7 +1024,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.87", @@ -1422,6 +1500,12 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -1440,6 +1524,30 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hickory-proto" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07698b8420e2f0d6447a436ba999ec85d8fbf2a398bbd737b82cac4a2e96e512" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.4.0", + "ipnet", + "once_cell", + "rand", + "thiserror 1.0.69", + "tinyvec", + "tokio", + "tracing", + "url", +] + [[package]] name = 
"hickory-proto" version = "0.25.0-alpha.2" @@ -1474,7 +1582,7 @@ checksum = "46c110355b5703070d9e29c344d79818a7cde3de9c27fc35750defea6074b0ad" dependencies = [ "cfg-if", "futures-util", - "hickory-proto", + "hickory-proto 0.25.0-alpha.2", "ipconfig", "lru-cache", "once_cell", @@ -1785,6 +1893,16 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "idna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "0.5.0" @@ -1849,15 +1967,15 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.17.8" +version = "0.17.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" +checksum = "cbf675b85ed934d3c67b5c5469701eec7db22689d0a2139d856e0925fa28b281" dependencies = [ "console", - "instant", "number_prefix", "portable-atomic", - "unicode-width", + "unicode-width 0.2.0", + "web-time", ] [[package]] @@ -1905,10 +2023,48 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "iroh" +version = "0.28.1" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" +dependencies = [ + "anyhow", + "async-channel", + "bytes", + "cc", + "derive_more", + "futures-lite 2.5.0", + "futures-util", + "iroh-base", + "iroh-io", + "iroh-metrics", + "iroh-net", + "iroh-node-util", + "iroh-quinn", + "iroh-relay", + "iroh-router", + "nested_enum_utils", + "num_cpus", + "parking_lot", + "postcard", + "quic-rpc", + "quic-rpc-derive", + "ref-cast", + "serde", + "serde-error", + "strum 0.25.0", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", + "url", +] + [[package]] name = "iroh-base" 
version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#1479b45f28edafe55ca890ce8c1a8b9d5d982914" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" dependencies = [ "aead", "anyhow", @@ -1966,6 +2122,7 @@ dependencies = [ "hex", "http-body", "indicatif", + "iroh", "iroh-base", "iroh-io", "iroh-metrics", @@ -1984,7 +2141,7 @@ dependencies = [ "quic-rpc-derive", "rand", "range-collections", - "rcgen", + "rcgen 0.12.1", "redb 1.5.1", "redb 2.2.0", "ref-cast", @@ -1996,7 +2153,7 @@ dependencies = [ "serde_json", "serde_test", "smallvec", - "strum", + "strum 0.26.3", "tempfile", "testdir", "testresult", @@ -2025,7 +2182,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#1479b45f28edafe55ca890ce8c1a8b9d5d982914" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" dependencies = [ "anyhow", "erased_set", @@ -2045,9 +2202,10 @@ dependencies = [ [[package]] name = "iroh-net" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh?branch=main#1479b45f28edafe55ca890ce8c1a8b9d5d982914" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" dependencies = [ "anyhow", + "axum", "backoff", "base64 0.22.1", "bytes", @@ -2063,7 +2221,7 @@ dependencies = [ "genawaiter", "governor", "hex", - "hickory-proto", + "hickory-proto 0.25.0-alpha.2", "hickory-resolver", "hostname", "http 1.1.0", @@ -2073,6 +2231,7 @@ dependencies = [ "igd-next", "iroh-base", "iroh-metrics", + "iroh-net-report", "iroh-quinn", "iroh-quinn-proto", "iroh-quinn-udp", @@ -2091,7 +2250,7 @@ dependencies = [ "portmapper", "postcard", "rand", - "rcgen", + "rcgen 0.13.1", "reqwest", "ring", "rtnetlink", @@ -2100,9 +2259,10 @@ dependencies = [ "serde", "smallvec", "socket2", - "strum", + "strum 0.26.3", "stun-rs", "surge-ping", + 
"swarm-discovery", "thiserror 1.0.69", "time", "tokio", @@ -2122,6 +2282,52 @@ dependencies = [ "z32", ] +[[package]] +name = "iroh-net-report" +version = "0.28.0" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" +dependencies = [ + "anyhow", + "bytes", + "derive_more", + "futures-buffered", + "futures-lite 2.5.0", + "hickory-resolver", + "iroh-base", + "iroh-metrics", + "iroh-relay", + "netwatch", + "portmapper", + "rand", + "reqwest", + "rustls", + "surge-ping", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "iroh-node-util" +version = "0.28.0" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" +dependencies = [ + "anyhow", + "futures-lite 2.5.0", + "iroh-net", + "nested_enum_utils", + "quic-rpc", + "quic-rpc-derive", + "serde", + "serde-error", + "strum 0.26.3", + "tempfile", + "tokio", + "tracing", +] + [[package]] name = "iroh-quinn" version = "0.12.0" @@ -2174,7 +2380,7 @@ dependencies = [ [[package]] name = "iroh-relay" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#1479b45f28edafe55ca890ce8c1a8b9d5d982914" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" dependencies = [ "anyhow", "base64 0.22.1", @@ -2188,7 +2394,7 @@ dependencies = [ "futures-util", "governor", "hex", - "hickory-proto", + "hickory-proto 0.25.0-alpha.2", "hickory-resolver", "hostname", "http 1.1.0", @@ -2204,7 +2410,7 @@ dependencies = [ "pin-project", "postcard", "rand", - "rcgen", + "rcgen 0.13.1", "regex", "reqwest", "ring", @@ -2234,7 +2440,7 @@ dependencies = [ [[package]] name = "iroh-router" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#1479b45f28edafe55ca890ce8c1a8b9d5d982914" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" dependencies = [ 
"anyhow", "futures-buffered", @@ -2310,9 +2516,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.162" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "libm" @@ -2423,6 +2629,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md5" version = "0.7.0" @@ -2580,7 +2792,7 @@ dependencies = [ [[package]] name = "netwatch" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#1479b45f28edafe55ca890ce8c1a8b9d5d982914" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" dependencies = [ "anyhow", "bytes", @@ -3106,7 +3318,7 @@ checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" [[package]] name = "portmapper" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#1479b45f28edafe55ca890ce8c1a8b9d5d982914" +source = "git+https://github.com/n0-computer/iroh?branch=main#4e3b4312381350de5ac29a79ce4df2ebf433744a" dependencies = [ "anyhow", "base64 0.22.1", @@ -3338,6 +3550,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc623a188942fc875926f7baeb2cb08ed4288b64f29072656eb051e360ee7623" dependencies = [ "anyhow", + "bincode", "derive_more", "educe", "flume", @@ -3345,10 +3558,12 @@ dependencies = [ "futures-sink", "futures-util", "hex", + "iroh-quinn", "pin-project", "serde", "slab", "tokio", + "tokio-serde", "tokio-util", "tracing", ] @@ -3514,6 +3729,19 @@ dependencies = [ "yasna", ] +[[package]] +name = "rcgen" +version = "0.13.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "54077e1872c46788540de1ea3d7f4ccb1983d12f9aa909b234468676c1a36779" +dependencies = [ + "pem", + "ring", + "rustls-pki-types", + "time", + "yasna", +] + [[package]] name = "redb" version = "1.5.1" @@ -3652,7 +3880,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tokio-rustls", "tower-service", @@ -3770,9 +3998,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.40" +version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" +checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ "bitflags 2.6.0", "errno", @@ -3783,9 +4011,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.16" +version = "0.23.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e" +checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "log", "once_cell", @@ -3909,9 +4137,9 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" dependencies = [ "windows-sys 0.59.0", ] @@ -4031,9 +4259,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -4041,6 +4269,16 @@ dependencies = [ "serde", ] +[[package]] +name = 
"serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.8" @@ -4180,6 +4418,12 @@ dependencies = [ "serde", ] +[[package]] +name = "smol_str" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fad6c857cbab2627dcf01ec85a623ca4e7dcb5691cbaa3d7fb7653671f0d09c9" + [[package]] name = "socket2" version = "0.5.7" @@ -4301,13 +4545,35 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9426b2a0c03e6cc2ea8dbc0168dbbf943f88755e409fb91bcb8f6a268305f4a" +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" +dependencies = [ + "strum_macros 0.25.3", +] + [[package]] name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros", + "strum_macros 0.26.4", +] + +[[package]] +name = "strum_macros" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.87", ] [[package]] @@ -4316,7 +4582,7 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "rustversion", @@ -4369,6 +4635,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "swarm-discovery" +version = "0.2.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "39769914108ae68e261d85ceac7bce7095947130f79c29d4535e9b31fc702a40" +dependencies = [ + "acto", + "anyhow", + "hickory-proto 0.24.1", + "rand", + "socket2", + "tokio", + "tracing", +] + [[package]] name = "syn" version = "1.0.109" @@ -4402,6 +4683,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.1" @@ -4650,7 +4937,7 @@ dependencies = [ "num-bigint", "pem", "proc-macro2", - "rcgen", + "rcgen 0.12.1", "reqwest", "ring", "rustls", @@ -4665,6 +4952,21 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "tokio-serde" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466" +dependencies = [ + "bincode", + "bytes", + "educe", + "futures-core", + "futures-sink", + "pin-project", + "serde", +] + [[package]] name = "tokio-stream" version = "0.1.16" @@ -4756,6 +5058,28 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -4922,6 +5246,12 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -4961,9 +5291,9 @@ dependencies = [ [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna 1.0.3", diff --git a/Cargo.toml b/Cargo.toml index 24d5aa79..b24cb3de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,22 @@ rust-version = "1.76" [dependencies] anyhow = { version = "1" } async-channel = "2.3.1" -bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false } +bao-tree = { version = "0.13", features = [ + "tokio_fsm", + "validate", +], default-features = false } bytes = { version = "1.7", features = ["serde"] } chrono = "0.4.31" clap = { version = "4.5.20", features = ["derive"], optional = true } -console = { version = "0.15.8", optional = true } -derive_more = { version = "1.0.0", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] } +derive_more = { version = "1.0.0", features = [ + "debug", + "display", + "deref", + "deref_mut", + "from", + "try_into", + "into", +] } futures-buffered = "0.2.4" futures-lite = "2.3" futures-util = { version = "0.3.30", optional = true } @@ -38,14 +48,18 @@ num_cpus = "1.15.0" oneshot = "0.1.8" parking_lot = { version = "0.12.1", optional = true } portable-atomic = { version = "1", optional = true } -postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } +postcard = { version = "1", default-features = false, features 
= [ + "alloc", + "use-std", + "experimental-derive", +] } quic-rpc = { version = "0.15.1", optional = true } quic-rpc-derive = { version = "0.15.0", optional = true } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } rand = "0.8" range-collections = "0.4.0" redb = { version = "2.2.0", optional = true } -redb_v1 = { package = "redb", version = "1.5.1", optional = true } +redb_v1 = { package = "redb", version = "1.5.1", optional = true } ref-cast = { version = "1.0.23", optional = true } reflink-copy = { version = "0.1.8", optional = true } self_cell = "1.0.1" @@ -61,9 +75,14 @@ tracing = "0.1" tracing-futures = "0.2.5" walkdir = { version = "2.5.0", optional = true } +# Examples +iroh = { version = "0.28", optional = true } +console = { version = "0.15.8", optional = true } + [dev-dependencies] http-body = "1.0" iroh-test = { version = "0.28" } +iroh-net = { version = "0.28", features = ["test-utils"] } futures-buffered = "0.2.4" proptest = "1.0.0" serde_json = "1.0.107" @@ -78,14 +97,26 @@ futures-util = "0.3.30" testdir = "0.9.1" [features] -default = ["fs-store", "rpc", "net_protocol"] +default = ["fs-store", "rpc", "net_protocol", "example-iroh"] downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"] net_protocol = ["downloader"] fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] metrics = ["iroh-metrics/metrics"] redb = ["dep:redb"] cli = ["rpc", "dep:clap", "dep:indicatif", "dep:console"] -rpc = ["dep:quic-rpc", "dep:quic-rpc-derive", "dep:nested_enum_utils", "dep:strum", "dep:futures-util", "dep:ref-cast", "dep:portable-atomic", "dep:walkdir", "downloader"] +rpc = [ + "dep:quic-rpc", + "dep:quic-rpc-derive", + "dep:nested_enum_utils", + "dep:strum", + "dep:futures-util", + "dep:ref-cast", + "dep:portable-atomic", + "dep:walkdir", + "downloader", +] + +example-iroh = ["dep:iroh", "dep:clap", "dep:indicatif", "dep:console"] [package.metadata.docs.rs] all-features = true @@ -100,6 +131,22 @@ name = 
"fetch-fsm" [[example]] name = "fetch-stream" +[[example]] +name = "hello-world-fetch" +required-features = ["example-iroh"] + +[[example]] +name = "hello-world-provide" +required-features = ["example-iroh"] + +[[example]] +name = "local-swarm-discovery" +required-features = ["example-iroh"] + +[[example]] +name = "custom-protocol" +required-features = ["example-iroh"] + [lints.rust] missing_debug_implementations = "warn" @@ -134,4 +181,4 @@ iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } - +iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" } diff --git a/examples/custom-protocol.rs b/examples/custom-protocol.rs new file mode 100644 index 00000000..d285f23c --- /dev/null +++ b/examples/custom-protocol.rs @@ -0,0 +1,306 @@ +//! Example for adding a custom protocol to a iroh node. +//! +//! We are building a very simple custom protocol here, and make our iroh nodes speak this protocol +//! in addition to the built-in protocols (blobs, gossip, docs). +//! +//! Our custom protocol allows querying the blob store of other nodes for text matches. For +//! this, we keep a very primitive index of the UTF-8 text of our blobs. +//! +//! The example is contrived - we only use memory nodes, and our database is a hashmap in a mutex, +//! and our queries just match if the query string appears as-is in a blob. +//! Nevertheless, this shows how powerful systems can be built with custom protocols by also using +//! the existing iroh protocols (blobs in this case). +//! +//! ## Usage +//! +//! In one terminal, run +//! +//! cargo run --example custom-protocol --features=examples -- listen "hello-world" "foo-bar" "hello-moon" +//! +//! This spawns an iroh nodes with three blobs. 
It will print the node's node id. +//! +//! In another terminal, run +//! +//! cargo run --example custom-protocol --features=examples -- query hello +//! +//! Replace with the node id from above. This will connect to the listening node with our +//! custom protocol and query for the string `hello`. The listening node will return a list of +//! blob hashes that contain `hello`. We will then download all these blobs with iroh-blobs, +//! and then print a list of the hashes with their content. +//! +//! For this example, this will print: +//! +//! moobakc6gao3ufmk: hello moon +//! 25eyd35hbigiqc4n: hello world +//! +//! That's it! Follow along in the code below, we added a bunch of comments to explain things. + +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use anyhow::Result; +use clap::Parser; +use futures_lite::future::Boxed as BoxedFuture; +use iroh::{ + net::{ + endpoint::{get_remote_node_id, Connecting}, + Endpoint, NodeId, + }, + router::ProtocolHandler, +}; +use iroh_base::hash::Hash; +use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::MemClient, util::local_pool::LocalPool}; +use iroh_router::Router; +use tracing_subscriber::{prelude::*, EnvFilter}; + +#[derive(Debug, Parser)] +pub struct Cli { + #[clap(subcommand)] + command: Command, +} + +#[derive(Debug, Parser)] +pub enum Command { + /// Spawn a node in listening mode. + Listen { + /// Each text string will be imported as a blob and inserted into the search database. + text: Vec, + }, + /// Query a remote node for data and print the results. + Query { + /// The node id of the node we want to query. + node_id: NodeId, + /// The text we want to match. + query: String, + }, +} + +/// Each custom protocol is identified by its ALPN string. +/// +/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake, +/// and the connection is aborted unless both nodes pass the same bytestring. 
+const ALPN: &[u8] = b"iroh-example/text-search/0"; + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); + let args = Cli::parse(); + + // Build a in-memory node. For production code, you'd want a persistent node instead usually. + let endpoint = Endpoint::builder().bind().await?; + let builder = Router::builder(endpoint); + let local_pool = LocalPool::default(); + let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); + let blobs_client = blobs.client(); + + // Build our custom protocol handler. The `builder` exposes access to various subsystems in the + // iroh node. In our case, we need a blobs client and the endpoint. + let proto = BlobSearch::new(blobs_client.clone(), builder.endpoint().clone()); + + // Add our protocol, identified by our ALPN, to the node, and spawn the node. + let builder = builder.accept(ALPN, proto.clone()); + let node = builder.spawn().await?; + + match args.command { + Command::Listen { text } => { + let node_id = node.endpoint().node_id(); + println!("our node id: {node_id}"); + + // Insert the text strings as blobs and index them. + for text in text.into_iter() { + proto.insert_and_index(text).await?; + } + + // Wait for Ctrl-C to be pressed. + tokio::signal::ctrl_c().await?; + } + Command::Query { node_id, query } => { + // Query the remote node. + // This will send the query over our custom protocol, read hashes on the reply stream, + // and download each hash over iroh-blobs. + let hashes = proto.query_remote(node_id, &query).await?; + + // Print out our query results. + for hash in hashes { + read_and_print(&blobs_client, hash).await?; + } + } + } + + node.shutdown().await?; + + Ok(()) +} + +#[derive(Debug, Clone)] +struct BlobSearch { + blobs: MemClient, + endpoint: Endpoint, + index: Arc>>, +} + +impl ProtocolHandler for BlobSearch { + /// The `accept` method is called for each incoming connection for our ALPN. 
+ /// + /// The returned future runs on a newly spawned tokio task, so it can run as long as + /// the connection lasts. + fn accept(self: Arc, connecting: Connecting) -> BoxedFuture> { + // We have to return a boxed future from the handler. + Box::pin(async move { + // Wait for the connection to be fully established. + let connection = connecting.await?; + // We can get the remote's node id from the connection. + let node_id = get_remote_node_id(&connection)?; + println!("accepted connection from {node_id}"); + + // Our protocol is a simple request-response protocol, so we expect the + // connecting peer to open a single bi-directional stream. + let (mut send, mut recv) = connection.accept_bi().await?; + + // We read the query from the receive stream, while enforcing a max query length. + let query_bytes = recv.read_to_end(64).await?; + + // Now, we can perform the actual query on our local database. + let query = String::from_utf8(query_bytes)?; + let hashes = self.query_local(&query); + + // We want to return a list of hashes. We do the simplest thing possible, and just send + // one hash after the other. Because the hashes have a fixed size of 32 bytes, this is + // very easy to parse on the other end. + for hash in hashes { + send.write_all(hash.as_bytes()).await?; + } + + // By calling `finish` on the send stream we signal that we will not send anything + // further, which makes the receive stream on the other end terminate. + send.finish()?; + // By calling stopped we wait until the remote iroh Endpoint has acknowledged + // all data. This does not mean the remote application has received all data + // from the Endpoint. + send.stopped().await?; + Ok(()) + }) + } +} + +impl BlobSearch { + /// Create a new protocol handler. + pub fn new(blobs: MemClient, endpoint: Endpoint) -> Arc { + Arc::new(Self { + blobs, + endpoint, + index: Default::default(), + }) + } + + /// Query a remote node, download all matching blobs and print the results. 
+ pub async fn query_remote(&self, node_id: NodeId, query: &str) -> Result> { + // Establish a connection to our node. + // We use the default node discovery in iroh, so we can connect by node id without + // providing further information. + let conn = self.endpoint.connect(node_id, ALPN).await?; + + // Open a bi-directional in our connection. + let (mut send, mut recv) = conn.open_bi().await?; + + // Send our query. + send.write_all(query.as_bytes()).await?; + + // Finish the send stream, signalling that no further data will be sent. + // This makes the `read_to_end` call on the accepting side terminate. + send.finish()?; + // By calling stopped we wait until the remote iroh Endpoint has acknowledged all + // data. This does not mean the remote application has received all data from the + // Endpoint. + send.stopped().await?; + + // In this example, we simply collect all results into a vector. + // For real protocols, you'd usually want to return a stream of results instead. + let mut out = vec![]; + + // The response is sent as a list of 32-byte long hashes. + // We simply read one after the other into a byte buffer. + let mut hash_bytes = [0u8; 32]; + loop { + // Read 32 bytes from the stream. + match recv.read_exact(&mut hash_bytes).await { + // FinishedEarly means that the remote side did not send further data, + // so in this case we break our loop. + Err(quinn::ReadExactError::FinishedEarly(_)) => break, + // Other errors are connection errors, so we bail. + Err(err) => return Err(err.into()), + Ok(_) => {} + }; + // Upcast the raw bytes to the `Hash` type. + let hash = Hash::from_bytes(hash_bytes); + // Download the content via iroh-blobs. + self.blobs.download(hash, node_id.into()).await?.await?; + // Add the blob to our local database. + self.add_to_index(hash).await?; + out.push(hash); + } + Ok(out) + } + + /// Query the local database. + /// + /// Returns the list of hashes of blobs which contain `query` literally. 
+ pub fn query_local(&self, query: &str) -> Vec { + let db = self.index.lock().unwrap(); + db.iter() + .filter_map(|(text, hash)| text.contains(query).then_some(*hash)) + .collect::>() + } + + /// Insert a text string into the database. + /// + /// This first imports the text as a blob into the iroh blob store, and then inserts a + /// reference to that hash in our (primitive) text database. + pub async fn insert_and_index(&self, text: String) -> Result { + let hash = self.blobs.add_bytes(text.into_bytes()).await?.hash; + self.add_to_index(hash).await?; + Ok(hash) + } + + /// Index a blob which is already in our blob store. + /// + /// This only indexes complete blobs that are smaller than 1KiB. + /// + /// Returns `true` if the blob was indexed. + async fn add_to_index(&self, hash: Hash) -> Result { + let mut reader = self.blobs.read(hash).await?; + // Skip blobs larger than 1KiB. + if reader.size() > 1024 * 1024 { + return Ok(false); + } + let bytes = reader.read_to_bytes().await?; + match String::from_utf8(bytes.to_vec()) { + Ok(text) => { + let mut db = self.index.lock().unwrap(); + db.insert(text, hash); + Ok(true) + } + Err(_err) => Ok(false), + } + } +} + +/// Read a blob from the local blob store and print it to STDOUT. +async fn read_and_print(blobs: &MemClient, hash: Hash) -> Result<()> { + let content = blobs.read_to_bytes(hash).await?; + let message = String::from_utf8(content.to_vec())?; + println!("{}: {message}", hash.fmt_short()); + Ok(()) +} + +/// Set the RUST_LOG env var to one of {debug,info,warn} to see logging. +fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} diff --git a/examples/hello-world-fetch.rs b/examples/hello-world-fetch.rs new file mode 100644 index 00000000..9466d373 --- /dev/null +++ b/examples/hello-world-fetch.rs @@ -0,0 +1,91 @@ +//! 
An example that fetches an iroh blob and prints the contents. +//! Will only work with blobs and collections that contain text, and is meant as a companion to the `hello-world-get` examples. +//! +//! This is using an in memory database and a random node id. +//! Run the `provide` example, which will give you instructions on how to run this example. +use std::{env, str::FromStr}; + +use anyhow::{bail, ensure, Context, Result}; +use iroh::base::ticket::BlobTicket; +use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool, BlobFormat}; +use iroh_net::Endpoint; +use iroh_router::Router; +use tracing_subscriber::{prelude::*, EnvFilter}; + +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info +pub fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); + println!("\n'Hello World' fetch example!"); + // get the ticket + let args: Vec = env::args().collect(); + + if args.len() != 2 { + bail!("expected one argument [BLOB_TICKET]\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example hello-world-provide`"); + } + + // deserialize ticket string into a ticket + let ticket = + BlobTicket::from_str(&args[1]).context("failed parsing blob ticket\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example hello-world-provide`")?; + + // create a new node + let endpoint = Endpoint::builder().bind().await?; + let builder = Router::builder(endpoint); + let local_pool = LocalPool::default(); + let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); + let node = builder.spawn().await?; + let blobs_client = blobs.client(); + + println!("fetching hash: {}", ticket.hash()); + println!("node id: {}", 
node.endpoint().node_id()); + println!("node listening addresses:"); + let addrs = node.endpoint().node_addr().await?; + for addr in addrs.direct_addresses() { + println!("\t{:?}", addr); + } + println!( + "node relay server url: {:?}", + node.endpoint() + .home_relay() + .expect("a default relay url should be provided") + .to_string() + ); + + // If the `BlobFormat` is `Raw`, we have the hash for a single blob, and simply need to read the blob using the `blobs` API on the client to get the content. + ensure!( + ticket.format() == BlobFormat::Raw, + "'Hello World' example expects to fetch a single blob, but the ticket indicates a collection.", + ); + + // `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress + // on the state of your download. + let download_stream = blobs_client + .download(ticket.hash(), ticket.node_addr().clone()) + .await?; + + // You can also just `await` the stream, which will poll the `DownloadProgress` stream for you. + let outcome = download_stream.await.context("unable to download hash")?; + + println!( + "\ndownloaded {} bytes from node {}", + outcome.downloaded_size, + ticket.node_addr().node_id + ); + + // Get the content we have just fetched from the iroh database. + + let bytes = blobs_client.read_to_bytes(ticket.hash()).await?; + let s = std::str::from_utf8(&bytes).context("unable to parse blob as as utf-8 string")?; + println!("{s}"); + + Ok(()) +} diff --git a/examples/hello-world-provide.rs b/examples/hello-world-provide.rs new file mode 100644 index 00000000..3fdfb8cf --- /dev/null +++ b/examples/hello-world-provide.rs @@ -0,0 +1,65 @@ +//! The smallest possible example to spin up a node and serve a single blob. +//! +//! This is using an in memory database and a random node id. +//! run this example from the project root: +//! 
$ cargo run --example hello-world-provide +use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; +use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool}; +use iroh_net::Endpoint; +use iroh_router::Router; +use tracing_subscriber::{prelude::*, EnvFilter}; + +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info +pub fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + setup_logging(); + println!("'Hello World' provide example!"); + + // create a new node + let endpoint = Endpoint::builder().bind().await?; + let builder = Router::builder(endpoint); + let local_pool = LocalPool::default(); + let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); + let blobs_client = blobs.client(); + let node = builder.spawn().await?; + + // add some data and remember the hash + let res = blobs_client.add_bytes("Hello, world!").await?; + + // create a ticket + let mut addr = node.endpoint().node_addr().await?; + addr.apply_options(AddrInfoOptions::RelayAndAddresses); + let ticket = BlobTicket::new(addr, res.hash, res.format)?; + + // print some info about the node + println!("serving hash: {}", ticket.hash()); + println!("node id: {}", ticket.node_addr().node_id); + println!("node listening addresses:"); + for addr in ticket.node_addr().direct_addresses() { + println!("\t{:?}", addr); + } + println!( + "node relay server url: {:?}", + ticket + .node_addr() + .relay_url() + .expect("a default relay url should be provided") + .to_string() + ); + // print the ticket, containing all the above information + println!("\nin another terminal, run:"); + println!("\t cargo run --example hello-world-fetch {}", ticket); + // block until SIGINT is received (ctrl+c) + 
tokio::signal::ctrl_c().await?; + node.shutdown().await?; + Ok(()) +} diff --git a/examples/local-swarm-discovery.rs b/examples/local-swarm-discovery.rs new file mode 100644 index 00000000..f02f4789 --- /dev/null +++ b/examples/local-swarm-discovery.rs @@ -0,0 +1,271 @@ +//! Example that runs an iroh node with local node discovery and no relay server +//! +//! Run the following command to run the "accept" side, that hosts the content: +//! $ cargo run --example local_swarm_discovery --features="discovery-local-network" -- accept [FILE_PATH] +//! Wait for output that looks like the following: +//! $ cargo run --example local_swarm_discovery --features="discovery-local-network" -- connect [NODE_ID] [HASH] -o [FILE_PATH] +//! Run that command on another machine in the same local network, replacing [FILE_PATH] with the path on which you want to save the transferred content. +use std::path::PathBuf; + +use anyhow::ensure; +use clap::{Parser, Subcommand}; +use iroh::{ + base::{hash::Hash, key::SecretKey}, + net::{discovery::local_swarm_discovery::LocalSwarmDiscovery, key::PublicKey, NodeAddr}, +}; +use iroh_blobs::{ + net_protocol::Blobs, rpc::client::blobs::WrapOption, util::local_pool::LocalPool, +}; +use iroh_net::{Endpoint, RelayMode}; +use iroh_router::Router; +use tracing_subscriber::{prelude::*, EnvFilter}; + +use self::progress::show_download_progress; + +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info +pub fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} + +#[derive(Debug, Parser)] +#[command(version, about)] +pub struct Cli { + #[clap(subcommand)] + command: Commands, +} + +#[derive(Subcommand, Clone, Debug)] +pub enum Commands { + /// Launch an iroh node and provide the content at the given path + Accept { + /// path to the file you want to provide + path: PathBuf, + }, + /// Get the node_id and 
hash string from a node running accept in the local network + /// Download the content from that node. + Connect { + /// Node ID of a node on the local network + node_id: PublicKey, + /// Hash of content you want to download from the node + hash: Hash, + /// save the content to a file + #[clap(long, short)] + out: Option, + }, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + setup_logging(); + let cli = Cli::parse(); + + let key = SecretKey::generate(); + let discovery = LocalSwarmDiscovery::new(key.public())?; + + println!("Starting iroh node with local node discovery..."); + // create a new node + let endpoint = Endpoint::builder() + .secret_key(key) + .discovery(Box::new(discovery)) + .relay_mode(RelayMode::Disabled) + .bind() + .await?; + let builder = Router::builder(endpoint); + let local_pool = LocalPool::default(); + let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); + let node = builder.spawn().await?; + let blobs_client = blobs.client(); + + match &cli.command { + Commands::Accept { path } => { + if !path.is_file() { + println!("Content must be a file."); + node.shutdown().await?; + return Ok(()); + } + let absolute = path.canonicalize()?; + println!("Adding {} as {}...", path.display(), absolute.display()); + let stream = blobs_client + .add_from_path( + absolute, + true, + iroh_blobs::util::SetTagOption::Auto, + WrapOption::NoWrap, + ) + .await?; + let outcome = stream.finish().await?; + println!("To fetch the blob:\n\tcargo run --example local_swarm_discovery --features=\"local-swarm-discovery\" -- connect {} {} -o [FILE_PATH]", node.endpoint().node_id(), outcome.hash); + tokio::signal::ctrl_c().await?; + node.shutdown().await?; + std::process::exit(0); + } + Commands::Connect { node_id, hash, out } => { + println!("NodeID: {}", node.endpoint().node_id()); + let mut stream = blobs_client + .download(*hash, NodeAddr::new(*node_id)) + .await?; + 
show_download_progress(*hash, &mut stream).await?; + if let Some(path) = out { + let absolute = std::env::current_dir()?.join(path); + ensure!(!absolute.is_dir(), "output must not be a directory"); + tracing::info!( + "exporting {hash} to {} -> {}", + path.display(), + absolute.display() + ); + let stream = blobs_client + .export( + *hash, + absolute, + iroh_blobs::store::ExportFormat::Blob, + iroh_blobs::store::ExportMode::Copy, + ) + .await?; + stream.await?; + } + } + } + Ok(()) +} + +mod progress { + use anyhow::{bail, Result}; + use console::style; + use futures_lite::{Stream, StreamExt}; + use indicatif::{ + HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, + ProgressStyle, + }; + use iroh_blobs::{ + get::{db::DownloadProgress, progress::BlobProgress, Stats}, + Hash, + }; + + pub async fn show_download_progress( + hash: Hash, + mut stream: impl Stream> + Unpin, + ) -> Result<()> { + eprintln!("Fetching: {}", hash); + let mp = MultiProgress::new(); + mp.set_draw_target(ProgressDrawTarget::stderr()); + let op = mp.add(make_overall_progress()); + let ip = mp.add(make_individual_progress()); + op.set_message(format!("{} Connecting ...\n", style("[1/3]").bold().dim())); + let mut seq = false; + while let Some(x) = stream.next().await { + match x? 
{ + DownloadProgress::InitialState(state) => { + if state.connected { + op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim())); + } + if let Some(count) = state.root.child_count { + op.set_message(format!( + "{} Downloading {} blob(s)\n", + style("[3/3]").bold().dim(), + count + 1, + )); + op.set_length(count + 1); + op.reset(); + op.set_position(state.current.map(u64::from).unwrap_or(0)); + seq = true; + } + if let Some(blob) = state.get_current() { + if let Some(size) = blob.size { + ip.set_length(size.value()); + ip.reset(); + match blob.progress { + BlobProgress::Pending => {} + BlobProgress::Progressing(offset) => ip.set_position(offset), + BlobProgress::Done => ip.finish_and_clear(), + } + if !seq { + op.finish_and_clear(); + } + } + } + } + DownloadProgress::FoundLocal { .. } => {} + DownloadProgress::Connected => { + op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim())); + } + DownloadProgress::FoundHashSeq { children, .. } => { + op.set_message(format!( + "{} Downloading {} blob(s)\n", + style("[3/3]").bold().dim(), + children + 1, + )); + op.set_length(children + 1); + op.reset(); + seq = true; + } + DownloadProgress::Found { size, child, .. } => { + if seq { + op.set_position(child.into()); + } else { + op.finish_and_clear(); + } + ip.set_length(size); + ip.reset(); + } + DownloadProgress::Progress { offset, .. } => { + ip.set_position(offset); + } + DownloadProgress::Done { .. } => { + ip.finish_and_clear(); + } + DownloadProgress::AllDone(Stats { + bytes_read, + elapsed, + .. 
+ }) => { + op.finish_and_clear(); + eprintln!( + "Transferred {} in {}, {}/s", + HumanBytes(bytes_read), + HumanDuration(elapsed), + HumanBytes((bytes_read as f64 / elapsed.as_secs_f64()) as u64) + ); + break; + } + DownloadProgress::Abort(e) => { + bail!("download aborted: {}", e); + } + } + } + Ok(()) + } + fn make_overall_progress() -> ProgressBar { + let pb = ProgressBar::hidden(); + pb.enable_steady_tick(std::time::Duration::from_millis(100)); + pb.set_style( + ProgressStyle::with_template( + "{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len}", + ) + .unwrap() + .progress_chars("#>-"), + ); + pb + } + + fn make_individual_progress() -> ProgressBar { + let pb = ProgressBar::hidden(); + pb.enable_steady_tick(std::time::Duration::from_millis(100)); + pb.set_style( + ProgressStyle::with_template("{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})") + .unwrap() + .with_key( + "eta", + |state: &ProgressState, w: &mut dyn std::fmt::Write| { + write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap() + }, + ) + .progress_chars("#>-"), + ); + pb + } +} diff --git a/src/lib.rs b/src/lib.rs index 886d1d74..6c3063a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,3 +56,6 @@ pub use crate::util::{Tag, TempTag}; /// Block size used by iroh, 2^4*1024 = 16KiB pub const IROH_BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4); + +#[doc(inline)] +pub use crate::protocol::ALPN; diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 9caff14e..c9ef4360 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -5,6 +5,7 @@ use std::{ collections::BTreeMap, + fmt::Debug, sync::{Arc, OnceLock}, }; @@ -31,6 +32,20 @@ use crate::{ HashAndFormat, TempTag, }; +// pub type ProtectCb = Box) -> BoxFuture<()> + Send + Sync>; +// +// #[derive(derive_more::Debug)] +// enum GcState { +// Initial(#[debug(skip)] Vec), +// Started(#[allow(dead_code)] Option>), +// } +// +// impl Default for GcState { +// fn 
default() -> Self { +// Self::Initial(Vec::new()) +// } +// } + #[derive(Debug)] pub struct Blobs { rt: LocalPoolHandle, @@ -97,8 +112,66 @@ impl BlobBatches { } } +/// Builder for the Blobs protocol handler +#[derive(Debug)] +pub struct Builder { + store: S, + events: Option, + gc_config: Option, +} + +impl Builder { + /// Set the event sender for the blobs protocol. + pub fn events(mut self, value: EventSender) -> Self { + self.events = Some(value); + self + } + + pub fn gc_config(mut self, value: crate::store::GcConfig) -> Self { + self.gc_config = Some(value); + self + } + + /// Build the Blobs protocol handler. + /// You need to provide a local pool handle and an endpoint. + pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc> { + let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone()); + Arc::new(Blobs::new( + self.store, + rt.clone(), + self.events.unwrap_or_default(), + downloader, + endpoint.clone(), + )) + } +} + +impl Blobs { + /// Create a new memory-backed Blobs protocol handler. + pub fn memory() -> Builder { + Builder { + store: crate::store::mem::Store::new(), + events: None, + gc_config: None, + } + } +} + +impl Blobs { + /// Load a persistent Blobs protocol handler from a path. 
+ pub async fn persistent( + path: impl AsRef, + ) -> anyhow::Result> { + Ok(Builder { + store: crate::store::fs::Store::load(path).await?, + events: None, + gc_config: None, + }) + } +} + impl Blobs { - pub fn new_with_events( + pub fn new( store: S, rt: LocalPoolHandle, events: EventSender, @@ -125,10 +198,52 @@ impl Blobs { &self.rt } + pub fn downloader(&self) -> &Downloader { + &self.downloader + } + pub fn endpoint(&self) -> &Endpoint { &self.endpoint } + // pub fn add_protected(&self, cb: ProtectCb) -> Result<()> { + // let mut state = self.gc_state.lock().unwrap(); + // match &mut *state { + // GcState::Initial(cbs) => { + // cbs.push(cb); + // } + // GcState::Started(_) => { + // anyhow::bail!("cannot add protected blobs after gc has started"); + // } + // } + // Ok(()) + // } + // + // pub fn start_gc(&self, config: GcConfig) -> Result<()> { + // let mut state = self.gc_state.lock().unwrap(); + // let protected = match state.deref_mut() { + // GcState::Initial(items) => std::mem::take(items), + // GcState::Started(_) => anyhow::bail!("gc already started"), + // }; + // let protected = Arc::new(protected); + // let protected_cb = move || { + // let protected = protected.clone(); + // async move { + // let mut set = BTreeSet::new(); + // for cb in protected.iter() { + // cb(&mut set).await; + // } + // set + // } + // }; + // let store = self.store.clone(); + // let run = self + // .rt + // .spawn(move || async move { store.gc_run(config, protected_cb).await }); + // *state = GcState::Started(Some(run)); + // Ok(()) + // } + pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> { self.batches.lock().await } @@ -267,6 +382,65 @@ impl Blobs { } } +// trait BlobsInner: Debug + Send + Sync + 'static { +// fn shutdown(self: Arc) -> BoxedFuture<()>; +// fn accept(self: Arc, conn: Connecting) -> BoxedFuture>; +// fn client(self: Arc) -> MemClient; +// fn local_pool_handle(&self) -> &LocalPoolHandle; +// fn downloader(&self) -> &Downloader; 
+// } + +// #[derive(Debug)] +// struct Blobs2 { +// inner: Arc, +// } + +// impl Blobs2 { +// fn client(&self) -> MemClient { +// self.inner.clone().client() +// } + +// fn local_pool_handle(&self) -> &LocalPoolHandle { +// self.inner.local_pool_handle() +// } + +// fn downloader(&self) -> &Downloader { +// self.inner.downloader() +// } +// } + +// impl BlobsInner for Blobs { +// fn shutdown(self: Arc) -> BoxedFuture<()> { +// ProtocolHandler::shutdown(self) +// } + +// fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { +// ProtocolHandler::accept(self, conn) +// } + +// fn client(self: Arc) -> MemClient { +// Blobs::client(self) +// } + +// fn local_pool_handle(&self) -> &LocalPoolHandle { +// self.rt() +// } + +// fn downloader(&self) -> &Downloader { +// self.downloader() +// } +// } + +// impl ProtocolHandler for Blobs2 { +// fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { +// self.inner.clone().accept(conn) +// } + +// fn shutdown(self: Arc) -> BoxedFuture<()> { +// self.inner.clone().shutdown() +// } +// } + impl ProtocolHandler for Blobs { fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { Box::pin(async move { diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 64c4d705..2f45357f 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -989,7 +989,11 @@ pub struct DownloadOptions { #[cfg(test)] mod tests { - use iroh_net::NodeId; + use std::{path::Path, time::Duration}; + + use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; + use iroh_net::{key::SecretKey, test_utils::DnsPkarrServer, NodeId, RelayMode}; + use node::Node; use rand::RngCore; use testresult::TestResult; use tokio::{io::AsyncWriteExt, sync::mpsc}; @@ -1001,12 +1005,14 @@ mod tests { //! 
An iroh node that just has the blobs transport use std::{path::Path, sync::Arc}; - use iroh_net::{NodeAddr, NodeId}; - use quic_rpc::transport::{Connector, Listener}; + use iroh_net::{Endpoint, NodeAddr, NodeId}; + use iroh_router::Router; use tokio_util::task::AbortOnDropHandle; use super::RpcService; use crate::{ + downloader::Downloader, + net_protocol::Blobs, provider::{CustomEventSender, EventSender}, rpc::client::{blobs, tags}, util::local_pool::LocalPool, @@ -1028,26 +1034,65 @@ mod tests { pub struct Builder { store: S, events: EventSender, + endpoint: Option, } impl Builder { /// Sets the event sender pub fn blobs_events(self, events: impl CustomEventSender) -> Self { - Builder { - store: self.store, + Self { events: events.into(), + ..self + } + } + + /// Set an endpoint builder + pub fn endpoint(self, endpoint: iroh_net::endpoint::Builder) -> Self { + Self { + endpoint: Some(endpoint), + ..self } } /// Spawns the node pub async fn spawn(self) -> anyhow::Result { - let (client, router, rpc_task, _local_pool) = - setup_router(self.store, self.events).await?; + let store = self.store; + let events = self.events; + let endpoint = self + .endpoint + .unwrap_or_else(|| Endpoint::builder().discovery_n0()) + .bind() + .await?; + let local_pool = LocalPool::single(); + let mut router = Router::builder(endpoint.clone()); + + // Setup blobs + let downloader = + Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone()); + let blobs = Arc::new(Blobs::new( + store.clone(), + local_pool.handle().clone(), + events, + downloader, + endpoint.clone(), + )); + router = router.accept(crate::ALPN, blobs.clone()); + + // Build the router + let router = router.spawn().await?; + + // Setup RPC + let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); + let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed(); + let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| { + blobs.clone().handle_rpc_request(msg, chan) + }); + let 
client = quic_rpc::RpcClient::new(controller).boxed(); Ok(Node { router, client, - _rpc_task: AbortOnDropHandle::new(rpc_task), - _local_pool, + _rpc_task, + _local_pool: local_pool, }) } } @@ -1058,6 +1103,7 @@ mod tests { Builder { store: crate::store::mem::Store::new(), events: Default::default(), + endpoint: None, } } @@ -1068,6 +1114,7 @@ mod tests { Ok(Builder { store: crate::store::fs::Store::load(path).await?, events: Default::default(), + endpoint: None, }) } @@ -1096,66 +1143,6 @@ mod tests { tags::Client::new(self.client.clone()) } } - - async fn setup_router( - store: S, - events: EventSender, - ) -> anyhow::Result<( - RpcClient, - iroh_router::Router, - tokio::task::JoinHandle<()>, - LocalPool, - )> { - let endpoint = iroh_net::Endpoint::builder().discovery_n0().bind().await?; - let local_pool = LocalPool::single(); - let mut router = iroh_router::Router::builder(endpoint.clone()); - - // Setup blobs - let downloader = crate::downloader::Downloader::new( - store.clone(), - endpoint.clone(), - local_pool.handle().clone(), - ); - let blobs = Arc::new(crate::net_protocol::Blobs::new_with_events( - store.clone(), - local_pool.handle().clone(), - events, - downloader, - endpoint.clone(), - )); - router = router.accept(crate::protocol::ALPN.to_vec(), blobs.clone()); - - // Build the router - let router = router.spawn().await?; - - // Setup RPC - let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); - let controller = controller.boxed(); - let internal_rpc = internal_rpc.boxed(); - let internal_rpc = quic_rpc::RpcServer::new(internal_rpc); - - let rpc_server_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move { - loop { - let request = internal_rpc.accept().await; - match request { - Ok(accepting) => { - let blobs = blobs.clone(); - tokio::task::spawn(async move { - let (msg, chan) = accepting.read_first().await.unwrap(); - blobs.handle_rpc_request(msg, chan).await.unwrap(); - }); - } - Err(err) => { - tracing::warn!("rpc 
error: {:?}", err); - } - } - } - }); - - let client = quic_rpc::RpcClient::new(controller); - - Ok((client, router, rpc_server_task, local_pool)) - } } #[tokio::test] @@ -1801,4 +1788,148 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_ticket_multiple_addrs() -> TestResult<()> { + let _guard = iroh_test::logging::setup(); + + let node = Node::memory().spawn().await?; + let hash = node + .blobs() + .add_bytes(Bytes::from_static(b"hello")) + .await? + .hash; + + let mut addr = node.node_addr().await?; + addr.apply_options(AddrInfoOptions::RelayAndAddresses); + let ticket = BlobTicket::new(addr, hash, BlobFormat::Raw)?; + println!("addrs: {:?}", ticket.node_addr().info); + assert!(!ticket.node_addr().info.direct_addresses.is_empty()); + Ok(()) + } + + #[tokio::test] + async fn test_node_add_blob_stream() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + use std::io::Cursor; + let node = Node::memory().spawn().await?; + + let blobs = node.blobs(); + let input = vec![2u8; 1024 * 256]; // 265kb so actually streaming, chunk size is 64kb + let reader = Cursor::new(input.clone()); + let progress = blobs.add_reader(reader, SetTagOption::Auto).await?; + let outcome = progress.finish().await?; + let hash = outcome.hash; + let output = blobs.read_to_bytes(hash).await?; + assert_eq!(input, output.to_vec()); + Ok(()) + } + + #[tokio::test] + async fn test_node_add_tagged_blob_event() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = Node::memory().spawn().await?; + + let _got_hash = tokio::time::timeout(Duration::from_secs(10), async move { + let mut stream = node + .blobs() + .add_from_path( + Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"), + false, + SetTagOption::Auto, + WrapOption::NoWrap, + ) + .await?; + + while let Some(progress) = stream.next().await { + match progress? { + crate::provider::AddProgress::AllDone { hash, .. 
} => { + return Ok(hash); + } + crate::provider::AddProgress::Abort(e) => { + anyhow::bail!("Error while adding data: {e}"); + } + _ => {} + } + } + anyhow::bail!("stream ended without providing data"); + }) + .await + .context("timeout")? + .context("get failed")?; + + Ok(()) + } + + #[tokio::test] + async fn test_download_via_relay() -> Result<()> { + let _guard = iroh_test::logging::setup(); + let (relay_map, relay_url, _guard) = iroh_net::test_utils::run_relay_server().await?; + + let endpoint1 = iroh_net::Endpoint::builder() + .relay_mode(RelayMode::Custom(relay_map.clone())) + .insecure_skip_relay_cert_verify(true); + let node1 = Node::memory().endpoint(endpoint1).spawn().await?; + let endpoint2 = iroh_net::Endpoint::builder() + .relay_mode(RelayMode::Custom(relay_map.clone())) + .insecure_skip_relay_cert_verify(true); + let node2 = Node::memory().endpoint(endpoint2).spawn().await?; + let AddOutcome { hash, .. } = node1.blobs().add_bytes(b"foo".to_vec()).await?; + + // create a node addr with only a relay URL, no direct addresses + let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url); + node2.blobs().download(hash, addr).await?.await?; + assert_eq!( + node2 + .blobs() + .read_to_bytes(hash) + .await + .context("get")? 
+ .as_ref(), + b"foo" + ); + Ok(()) + } + + #[tokio::test] + #[ignore = "flaky"] + async fn test_download_via_relay_with_discovery() -> Result<()> { + let _guard = iroh_test::logging::setup(); + let (relay_map, _relay_url, _guard) = iroh_net::test_utils::run_relay_server().await?; + let dns_pkarr_server = DnsPkarrServer::run().await?; + + let secret1 = SecretKey::generate(); + let endpoint1 = iroh_net::Endpoint::builder() + .relay_mode(RelayMode::Custom(relay_map.clone())) + .insecure_skip_relay_cert_verify(true) + .dns_resolver(dns_pkarr_server.dns_resolver()) + .secret_key(secret1.clone()) + .discovery(dns_pkarr_server.discovery(secret1)); + let node1 = Node::memory().endpoint(endpoint1).spawn().await?; + let secret2 = SecretKey::generate(); + let endpoint2 = iroh_net::Endpoint::builder() + .relay_mode(RelayMode::Custom(relay_map.clone())) + .insecure_skip_relay_cert_verify(true) + .dns_resolver(dns_pkarr_server.dns_resolver()) + .secret_key(secret2.clone()) + .discovery(dns_pkarr_server.discovery(secret2)); + let node2 = Node::memory().endpoint(endpoint2).spawn().await?; + let hash = node1.blobs().add_bytes(b"foo".to_vec()).await?.hash; + + // create a node addr with node id only + let addr = NodeAddr::new(node1.node_id()); + node2.blobs().download(hash, addr).await?.await?; + assert_eq!( + node2 + .blobs() + .read_to_bytes(hash) + .await + .context("get")? + .as_ref(), + b"foo" + ); + Ok(()) + } }