Skip to content

Commit ceffccc

Browse files
committed
fix: merge of subdirectories of flists
1 parent 2a967a1 commit ceffccc

File tree

2 files changed

+154
-54
lines changed

2 files changed

+154
-54
lines changed

rfs/src/main.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,16 @@ struct MergeOptions {
133133
#[clap(action=ArgAction::Append, required = true)]
134134
target_flists: Vec<String>,
135135

136-
#[clap(short, long, default_value_t = String::from("/tmp/cache"))]
136+
#[clap(short, long, default_value_t = String::from("/tmp/cache"))]
137137
cache: String,
138138
}
139139

140140
impl MergeOptions {
141-
142141
fn validate(&self) -> Result<()> {
143142
if self.target_flists.len() < 2 {
144-
return Err(anyhow::anyhow!("At least 2 target file lists are required for merge operation"));
143+
return Err(anyhow::anyhow!(
144+
"At least 2 target file lists are required for merge operation"
145+
));
145146
}
146147
Ok(())
147148
}
@@ -478,9 +479,8 @@ fn config(opts: ConfigOptions) -> Result<()> {
478479
}
479480

480481
fn merge(opts: MergeOptions) -> Result<()> {
481-
482482
opts.validate()?;
483-
483+
484484
let rt = tokio::runtime::Runtime::new()?;
485485

486486
rt.block_on(async move {

rfs/src/merge.rs

Lines changed: 149 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
use crate::{
22
cache::Cache,
33
fungi::{
4-
meta::{FileType, Inode, Walk, WalkVisitor},
4+
meta::{FileType, Inode, Mode, Walk, WalkVisitor},
55
Reader, Result, Writer,
66
},
77
store::{get_router, BlockStore, Router, Store, Stores},
88
};
99
use anyhow::Context;
1010
use hex::ToHex;
11-
use std::path::Path;
11+
use std::collections::{HashMap, HashSet};
12+
use std::path::{Path, PathBuf};
1213
use tokio::io::AsyncReadExt;
1314

15+
const ROOT_PATH: &str = "/";
16+
1417
pub async fn merge<S: Store>(
1518
writer: Writer,
1619
store: S,
@@ -38,12 +41,33 @@ pub async fn merge<S: Store>(
3841
}
3942

4043
let store = store.into();
44+
45+
let mut path_to_inode_map = HashMap::new();
46+
let root_path = PathBuf::from(ROOT_PATH);
47+
48+
let root_inode = Inode {
49+
name: ROOT_PATH.into(),
50+
mode: Mode::new(FileType::Dir, 0o755),
51+
..Default::default()
52+
};
53+
let root_ino = writer.inode(root_inode).await?;
54+
path_to_inode_map.insert(root_path, root_ino);
55+
4156
for target_flist in target_flists {
4257
let reader = Reader::new(&target_flist).await?;
4358
let router = get_router(&reader).await?;
44-
let cache = Cache::new(cache.clone(), router);
59+
let cache_instance = Cache::new(cache.clone(), router);
60+
61+
let mut visited = HashSet::new();
62+
let mut visitor = MergeVisitor {
63+
writer: writer.clone(),
64+
reader: reader.clone(),
65+
store: &store,
66+
cache: cache_instance,
67+
path_to_inode: &mut path_to_inode_map,
68+
visited: &mut visited,
69+
};
4570

46-
let mut visitor = MergeVisitor::new(writer.clone(), reader.clone(), &store, cache);
4771
reader.walk(&mut visitor).await?;
4872
}
4973

@@ -58,24 +82,80 @@ where
5882
reader: Reader,
5983
store: &'a BlockStore<S>,
6084
cache: Cache<Router<Stores>>,
85+
path_to_inode: &'a mut HashMap<PathBuf, u64>,
86+
visited: &'a mut HashSet<u64>,
6187
}
6288

6389
impl<'a, S> MergeVisitor<'a, S>
6490
where
6591
S: Store,
6692
{
67-
pub fn new(
68-
writer: Writer,
69-
reader: Reader,
70-
store: &'a BlockStore<S>,
71-
cache: Cache<Router<Stores>>,
72-
) -> Self {
73-
Self {
74-
writer,
75-
reader,
76-
store,
77-
cache,
93+
async fn ensure_parent_directory(&mut self, path: &Path) -> Result<u64> {
94+
if path.to_str() == Some(ROOT_PATH) {
95+
return Ok(*self.path_to_inode.get(path).unwrap_or(&1));
7896
}
97+
98+
if let Some(ino) = self.path_to_inode.get(path) {
99+
return Ok(*ino);
100+
}
101+
102+
let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
103+
let parent_ino = Box::pin(self.ensure_parent_directory(parent_path)).await?;
104+
105+
let dir_name = path
106+
.file_name()
107+
.map(|name| name.to_string_lossy().to_string())
108+
.unwrap_or_default();
109+
110+
let dir_inode = Inode {
111+
parent: parent_ino,
112+
name: dir_name,
113+
mode: Mode::new(FileType::Dir, 0o755),
114+
..Default::default()
115+
};
116+
117+
let new_ino = self.writer.inode(dir_inode).await?;
118+
self.path_to_inode.insert(path.to_path_buf(), new_ino);
119+
120+
Ok(new_ino)
121+
}
122+
123+
async fn copy_blocks(&mut self, source_ino: u64, dest_ino: u64) -> Result<()> {
124+
let blocks = self.reader.blocks(source_ino).await?;
125+
126+
for block in &blocks {
127+
self.writer.block(dest_ino, &block.id, &block.key).await?;
128+
}
129+
130+
let mut blocks_to_store = Vec::new();
131+
for block in blocks {
132+
if self.store.get(&block).await.is_err() {
133+
blocks_to_store.push(block);
134+
}
135+
}
136+
137+
for block in blocks_to_store {
138+
let (_, mut file) = self.cache.get(&block).await?;
139+
let mut data = Vec::new();
140+
if let Err(e) = file.read_to_end(&mut data).await {
141+
log::error!(
142+
"failed to read block {}: {}",
143+
block.id.as_slice().encode_hex::<String>(),
144+
e
145+
);
146+
return Err(e.into());
147+
}
148+
if let Err(e) = self.store.set(&data).await {
149+
log::error!(
150+
"failed to set block {}: {}",
151+
block.id.as_slice().encode_hex::<String>(),
152+
e
153+
);
154+
return Err(e.into());
155+
}
156+
}
157+
158+
Ok(())
79159
}
80160
}
81161

@@ -84,46 +164,66 @@ impl<'a, S> WalkVisitor for MergeVisitor<'a, S>
84164
where
85165
S: Store,
86166
{
87-
async fn visit(&mut self, _path: &Path, node: &Inode) -> Result<Walk> {
167+
async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk> {
168+
if !self.visited.insert(node.ino) {
169+
return Ok(Walk::Continue);
170+
}
171+
88172
match node.mode.file_type() {
89173
FileType::Dir => {
90-
self.writer.inode(node.clone()).await?;
91-
return Ok(Walk::Continue);
174+
if path.to_str() == Some(ROOT_PATH) {
175+
return Ok(Walk::Continue);
176+
}
177+
178+
let dir_name = path
179+
.file_name()
180+
.map(|name| name.to_string_lossy().to_string())
181+
.unwrap_or_default();
182+
183+
let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
184+
let parent_ino = self.ensure_parent_directory(parent_path).await?;
185+
186+
let dir_node = Inode {
187+
parent: parent_ino,
188+
name: dir_name,
189+
mode: node.mode.clone(),
190+
uid: node.uid,
191+
gid: node.gid,
192+
rdev: node.rdev,
193+
ctime: node.ctime,
194+
mtime: node.mtime,
195+
data: node.data.clone(),
196+
..Default::default()
197+
};
198+
199+
let dir_ino = self.writer.inode(dir_node).await?;
200+
self.path_to_inode.insert(path.to_path_buf(), dir_ino);
92201
}
93202
FileType::Regular => {
94-
let ino = self.writer.inode(node.clone()).await?;
95-
let blocks = self.reader.blocks(node.ino).await?;
203+
let file_name = path
204+
.file_name()
205+
.map(|name| name.to_string_lossy().to_string())
206+
.unwrap_or_default();
96207

97-
for block in &blocks {
98-
self.writer.block(ino, &block.id, &block.key).await?;
99-
}
208+
let parent_path = path.parent().unwrap_or(Path::new(ROOT_PATH));
209+
let parent_ino = self.ensure_parent_directory(parent_path).await?;
100210

101-
let mut blocks_to_store = Vec::new();
102-
for block in blocks {
103-
if self.store.get(&block).await.is_err() {
104-
blocks_to_store.push(block);
105-
}
106-
}
107-
for block in blocks_to_store {
108-
let (_, mut file) = self.cache.get(&block).await?;
109-
let mut data = Vec::new();
110-
if let Err(e) = file.read_to_end(&mut data).await {
111-
log::error!(
112-
"failed to read block {}: {}",
113-
block.id.as_slice().encode_hex::<String>(),
114-
e
115-
);
116-
return Err(e.into());
117-
}
118-
if let Err(e) = self.store.set(&data).await {
119-
log::error!(
120-
"failed to set block {}: {}",
121-
block.id.as_slice().encode_hex::<String>(),
122-
e
123-
);
124-
return Err(e.into());
125-
}
126-
}
211+
let file_node = Inode {
212+
parent: parent_ino,
213+
name: file_name,
214+
size: node.size,
215+
uid: node.uid,
216+
gid: node.gid,
217+
mode: node.mode.clone(),
218+
rdev: node.rdev,
219+
ctime: node.ctime,
220+
mtime: node.mtime,
221+
data: node.data.clone(),
222+
..Default::default()
223+
};
224+
225+
let ino = self.writer.inode(file_node).await?;
226+
self.copy_blocks(node.ino, ino).await?;
127227
}
128228
_ => {
129229
log::warn!("Unknown file type for node: {:?}", node);

0 commit comments

Comments
 (0)