Skip to content

Commit 9392eb3

Browse files
committed
Give the ListTempTags rpc call the stream treatment as well!
1 parent d1277fb commit 9392eb3

File tree

6 files changed

+75
-25
lines changed

6 files changed

+75
-25
lines changed

src/api/proto.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
//! The file system store is quite complex and optimized, so to get started take a look at
1515
//! the much simpler memory store.
1616
use std::{
17+
collections::HashSet,
1718
fmt::{self, Debug},
1819
future::{Future, IntoFuture},
1920
io,
@@ -131,7 +132,7 @@ pub enum Request {
131132
RenameTag(RenameTagRequest),
132133
#[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
133134
CreateTag(CreateTagRequest),
134-
#[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
135+
#[rpc(tx = mpsc::Sender<ListTempTagsItem>)]
135136
ListTempTags(ListTempTagsRequest),
136137
#[rpc(tx = oneshot::Sender<TempTag>)]
137138
CreateTempTag(CreateTempTagRequest),
@@ -406,6 +407,34 @@ impl IrpcStreamItem for ListTagsItem {
406407
}
407408
}
408409

410+
pub struct ListTempTagsProgress {
411+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTempTagsItem>>>,
412+
}
413+
414+
impl IntoFuture for ListTempTagsProgress {
415+
fn into_future(self) -> Self::IntoFuture {
416+
Box::pin(self.inner.try_collect())
417+
}
418+
419+
type IntoFuture = future::Boxed<Self::Output>;
420+
421+
type Output = super::Result<HashSet<HashAndFormat>>;
422+
}
423+
424+
impl ListTempTagsProgress {
425+
pub(super) fn new(
426+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListTempTagsItem>>> + Send + 'static,
427+
) -> Self {
428+
Self {
429+
inner: Box::pin(fut),
430+
}
431+
}
432+
433+
pub fn stream(self) -> impl Stream<Item = super::Result<HashAndFormat>> {
434+
self.inner.into_stream()
435+
}
436+
}
437+
409438
pub struct ListTagsProgress {
410439
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTagsItem>>>,
411440
}
@@ -493,6 +522,37 @@ pub struct CreateTempTagRequest {
493522
#[derive(Debug, Serialize, Deserialize)]
494523
pub struct ListTempTagsRequest;
495524

525+
#[derive(Debug, Serialize, Deserialize)]
526+
pub enum ListTempTagsItem {
527+
Item(HashAndFormat),
528+
Error(super::Error),
529+
Done,
530+
}
531+
532+
impl IrpcStreamItem for ListTempTagsItem {
533+
type Error = super::Error;
534+
type Item = HashAndFormat;
535+
536+
fn into_result_opt(self) -> Option<Result<HashAndFormat, super::Error>> {
537+
match self {
538+
ListTempTagsItem::Item(item) => Some(Ok(item)),
539+
ListTempTagsItem::Done => None,
540+
ListTempTagsItem::Error(err) => Some(Err(err)),
541+
}
542+
}
543+
544+
fn from_result(item: std::result::Result<HashAndFormat, super::Error>) -> Self {
545+
match item {
546+
Ok(i) => Self::Item(i),
547+
Err(e) => Self::Error(e),
548+
}
549+
}
550+
551+
fn done() -> Self {
552+
Self::Done
553+
}
554+
}
555+
496556
/// Rename a tag atomically
497557
#[derive(Debug, Serialize, Deserialize)]
498558
pub struct RenameTagRequest {

src/api/tags.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! The main entry point is the [`Tags`] struct.
44
use std::ops::RangeBounds;
55

6-
use n0_future::{Stream, StreamExt};
6+
use n0_future::StreamExt;
77
use ref_cast::RefCast;
88
use tracing::trace;
99

@@ -17,7 +17,7 @@ use super::{
1717
ApiClient, Tag, TempTag,
1818
};
1919
use crate::{
20-
api::proto::{ListTagsProgress, ListTempTagsRequest},
20+
api::proto::{ListTagsProgress, ListTempTagsProgress, ListTempTagsRequest},
2121
HashAndFormat,
2222
};
2323

@@ -33,11 +33,10 @@ impl Tags {
3333
Self::ref_cast(sender)
3434
}
3535

36-
pub async fn list_temp_tags(&self) -> irpc::Result<impl Stream<Item = HashAndFormat>> {
36+
pub fn list_temp_tags(&self) -> ListTempTagsProgress {
3737
let options = ListTempTagsRequest;
3838
trace!("{:?}", options);
39-
let res = self.client.rpc(options).await?;
40-
Ok(n0_future::stream::iter(res))
39+
ListTempTagsProgress::new(self.client.server_streaming(options, 32))
4140
}
4241

4342
/// List all tags with options.

src/store/fs.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ use crate::{
123123
},
124124
util::{
125125
channel::oneshot,
126+
irpc::MpscSenderExt,
126127
temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
127128
ChunkRangesExt,
128129
},
@@ -520,7 +521,7 @@ impl Actor {
520521
Command::ListTempTags(cmd) => {
521522
trace!("{cmd:?}");
522523
let tts = self.temp_tags.list();
523-
cmd.tx.send(tts).await.ok();
524+
cmd.tx.forward_iter(tts.into_iter().map(Ok)).await.ok();
524525
}
525526
Command::ImportBytes(cmd) => {
526527
trace!("{cmd:?}");
@@ -1417,13 +1418,13 @@ impl FsStore {
14171418
#[cfg(test)]
14181419
pub mod tests {
14191420
use core::panic;
1420-
use std::collections::{HashMap, HashSet};
1421+
use std::{collections::HashMap, future::IntoFuture};
14211422

14221423
use bao_tree::{
14231424
io::{outboard::PreOrderMemOutboard, round_up_to_chunks_groups},
14241425
ChunkRanges,
14251426
};
1426-
use n0_future::{stream, Stream, StreamExt};
1427+
use n0_future::{stream, Stream};
14271428
use testresult::TestResult;
14281429
use walkdir::WalkDir;
14291430

@@ -1973,23 +1974,13 @@ pub mod tests {
19731974
let batch = store.blobs().batch().await?;
19741975
let tt1 = batch.temp_tag(Hash::new("foo")).await?;
19751976
let tt2 = batch.add_slice("boo").await?;
1976-
let tts = store
1977-
.tags()
1978-
.list_temp_tags()
1979-
.await?
1980-
.collect::<HashSet<_>>()
1981-
.await;
1977+
let tts = store.tags().list_temp_tags().into_future().await?;
19821978
assert!(tts.contains(tt1.hash_and_format()));
19831979
assert!(tts.contains(tt2.hash_and_format()));
19841980
drop(batch);
19851981
store.sync_db().await?;
19861982
store.wait_idle().await?;
1987-
let tts = store
1988-
.tags()
1989-
.list_temp_tags()
1990-
.await?
1991-
.collect::<HashSet<_>>()
1992-
.await;
1983+
let tts = store.tags().list_temp_tags().await?;
19931984
// temp tag went out of scope, so it does not work anymore
19941985
assert!(!tts.contains(tt1.hash_and_format()));
19951986
assert!(!tts.contains(tt2.hash_and_format()));

src/store/fs/gc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ pub(super) async fn gc_mark_task(
5555
roots.insert(info.hash_and_format());
5656
}
5757
trace!("traversing temp roots");
58-
let mut tts = store.tags().list_temp_tags().await?;
58+
let mut tts = store.tags().list_temp_tags().stream();
5959
while let Some(tt) = tts.next().await {
6060
trace!("adding temp root {:?}", tt);
61-
roots.insert(tt);
61+
roots.insert(tt?);
6262
}
6363
for HashAndFormat { hash, format } in roots {
6464
// we need to do this for all formats except raw

src/store/mem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ impl Actor {
324324
Command::ListTempTags(cmd) => {
325325
trace!("{cmd:?}");
326326
let tts = self.temp_tags.list();
327-
cmd.tx.send(tts).await.ok();
327+
cmd.tx.forward_iter(tts.into_iter().map(Ok)).await.ok();
328328
}
329329
Command::ListBlobs(cmd) => {
330330
let ListBlobsMsg { tx, .. } = cmd;

src/store/readonly_mem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ impl Actor {
205205
.ok();
206206
}
207207
Command::ListTempTags(cmd) => {
208-
cmd.tx.send(Vec::new()).await.ok();
208+
cmd.tx.forward_iter(std::iter::empty()).await.ok();
209209
}
210210
Command::SyncDb(cmd) => {
211211
cmd.tx.send(Ok(())).await.ok();

0 commit comments

Comments
 (0)