Feat: support read file for kufu file system
yangsoon committed Mar 12, 2023
1 parent 8a5df10 commit b254f0c
Showing 8 changed files with 223 additions and 143 deletions.
150 changes: 29 additions & 121 deletions src/db/local.rs → src/db/manager.rs
@@ -1,124 +1,12 @@
use super::{FSManger, Storage};
use super::Bucket::*;
use super::{FSManger, SledDb, Storage};
use crate::db::utils::*;
use crate::error::Error::{DentryAttrNotFound, InodeAttrNotFound};
use crate::error::Error::{ClusterObjectDataNotFound, DentryAttrNotFound, InodeAttrNotFound};
use crate::fuse::core::{time_now, DentryAttributes, FileKind, InodeAttributes};
use crate::{ClusterObject, Result};
use kube::core::DynamicObject;
use crate::Result;
use sled::IVec;
use sled::Transactional;
use sled::{Db, IVec, Tree};
use std::{collections::HashMap, path::Path};
use tracing::*;
use Bucket::*;

#[derive(Clone, Hash, PartialEq, Eq)]
pub enum Bucket {
RIndex,
Inode,
Dentry,
Data,
}

impl AsRef<[u8]> for Bucket {
fn as_ref(&self) -> &[u8] {
match self {
RIndex => "reverse-index".as_bytes(),
Inode => "inode".as_bytes(),
Dentry => "dentry".as_bytes(),
Data => "data".as_bytes(),
}
}
}

#[allow(dead_code)]
#[derive(Clone)]
pub struct SledDb {
db: Db,
buckets: HashMap<Bucket, Tree>,
}

#[allow(dead_code)]
impl SledDb {
pub fn new(path: impl AsRef<Path>) -> Result<SledDb> {
let db = sled::open(path)?;
clean_one_time_buckets(&db)?;
Ok(SledDb {
db: db.clone(),
buckets: HashMap::from([
(Bucket::RIndex, db.open_tree(Bucket::RIndex)?),
(Bucket::Inode, db.open_tree(Bucket::Inode)?),
(Bucket::Dentry, db.open_tree(Bucket::Dentry)?),
(Bucket::Data, db.open_tree(Bucket::Data)?),
]),
})
}

pub fn mount_gvr(&self, cluster_obj: ClusterObject) -> Result<u64> {
let parent_inode = self.mount_gvk(&cluster_obj)?;
let key = get_resource_full_key(&cluster_obj);
let value: IVec = (&cluster_obj).try_into()?;
self.mount_file(key, parent_inode, value)
}

pub fn update_gvr(&self, cluster_obj: ClusterObject) -> Result<()> {
let key = get_resource_full_key(&cluster_obj);
let value: IVec = (&cluster_obj).try_into()?;
self.edit_file(key, value)
}

pub fn mount_gvk(&self, cluster_obj: &ClusterObject) -> Result<u64> {
let api_path = get_resource_api_key(cluster_obj);
let parent_inode = ivec_to_u64(
self.get_bucket(RIndex)
.get(get_parent_resource_full_key(cluster_obj))?
.unwrap(),
);
self.mount_dir(api_path, parent_inode)
}
}

impl Storage for SledDb {
fn add(&self, cluster_obj: ClusterObject) -> Result<()> {
if self.has(&cluster_obj)? {
return self.update(cluster_obj);
}
// info!("impl Storage for SledDb add: {:?}", cluster_obj.meta);
self.mount_gvr(cluster_obj)?;
Ok(())
}

fn update(&self, cluster_obj: ClusterObject) -> Result<()> {
let key = get_resource_full_key(&cluster_obj);
let value: IVec = (&cluster_obj).try_into()?;
self.get_bucket(Data).insert(key, value)?;
Ok(())
}

fn get(&self, cluster_obj: ClusterObject) -> Result<Option<DynamicObject>> {
let key = get_resource_full_key(&cluster_obj);
let value = self.buckets.get(&Data).unwrap().get(key)?;
match value {
Some(v) => Ok(Some(serde_yaml::from_slice(v.as_ref())?)),
None => Ok(None),
}
}

fn delete(&self, cluster_obj: ClusterObject) -> Result<()> {
let key = get_resource_full_key(&cluster_obj);
self.get_bucket(Data).remove(key)?;
Ok(())
}

fn get_bucket(&self, name: Bucket) -> &Tree {
self.buckets.get(&name).unwrap()
}

fn has(&self, cluster_obj: &ClusterObject) -> Result<bool> {
let exist = self
.get_bucket(RIndex)
.contains_key(&get_resource_full_key(cluster_obj))?;
Ok(exist)
}
}
use std::path::Path;

impl FSManger for SledDb {
fn mount_dir(&self, path: impl AsRef<Path>, parent_inode: u64) -> Result<u64> {
@@ -127,7 +15,7 @@ impl FSManger for SledDb {

if self.get_bucket(RIndex).contains_key(key.clone())? {
let inode = self.get_bucket(RIndex).get(key.clone())?.unwrap();
return Ok(ivec_to_u64(inode));
return Ok(ivec_to_u64(&inode));
}
let next_inode = handle_next_inode();
let inode_attr: IVec = InodeAttributes::new_dict(next_inode.0).into();
@@ -161,9 +49,14 @@
}

fn mount_file(&self, path: impl AsRef<Path>, parent_inode: u64, content: IVec) -> Result<u64> {
let next_inode = handle_next_inode();
let name = extract_name(path.as_ref());
let key = into_string(path.as_ref());
let next_inode = if self.get_bucket(RIndex).contains_key(key.clone())? {
let inode = self.get_bucket(RIndex).get(key.clone())?.unwrap();
(ivec_to_u64(&inode), inode)
} else {
handle_next_inode()
};
let inode_attr: IVec = InodeAttributes::new_file(next_inode.0, content.len() as u64).into();
self.join_dir(parent_inode, next_inode.0, name, FileKind::File)?;
(
@@ -230,7 +123,7 @@ impl FSManger for SledDb {
Ok(attr)
}

fn get_inode(&self, inode: u64) -> Result<InodeAttributes> {
fn get_inode_attr(&self, inode: u64) -> Result<InodeAttributes> {
let inode_bucket = self.get_bucket(Inode);
let inode_key = u64_to_ivec(inode);
if !inode_bucket.contains_key(inode_key.clone())? {
@@ -245,4 +138,19 @@
self.get_bucket(Inode).insert(u64_to_ivec(inode), value)?;
Ok(())
}

fn get_inode(&self, key: String) -> Result<u64> {
let inode = self.get_bucket(RIndex).get(key)?.unwrap();
Ok(ivec_to_u64(&inode))
}

fn get_data(&self, inode: u64) -> Result<IVec> {
let data_bucket = self.get_bucket(Data);
let obj_key = u64_to_ivec(inode);
if !data_bucket.contains_key(&obj_key)? {
return Err(ClusterObjectDataNotFound(inode));
}
let data = data_bucket.get(&obj_key)?.unwrap();
return Ok(data);
}
}
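
The new get_inode and get_data methods above resolve a path key to an inode through the reverse-index bucket and then fetch the serialized object from the data bucket. Below is a minimal sketch of that two-step lookup using sled directly; the tree names mirror the Bucket variants, but the temporary database, the sample path key, the YAML payload, and the big-endian inode encoding are illustrative assumptions rather than this crate's exact key format.

```rust
// Sketch: path key -> inode (reverse-index tree), then inode -> bytes (data tree).
// The sample key and YAML payload are made up for illustration.
use sled::IVec;

fn main() -> sled::Result<()> {
    let db = sled::Config::new().temporary(true).open()?;
    let rindex = db.open_tree("reverse-index")?; // path key -> inode
    let data = db.open_tree("data")?; // inode -> serialized object

    // Pretend a file was mounted at this key with inode 42.
    let inode: u64 = 42;
    rindex.insert("default/pod/nginx.yaml", &inode.to_be_bytes()[..])?;
    data.insert(&inode.to_be_bytes()[..], "apiVersion: v1\nkind: Pod\n".as_bytes())?;

    // Equivalent of get_inode: look the key up in the reverse index.
    let inode_ivec: IVec = rindex.get("default/pod/nginx.yaml")?.expect("key indexed");
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&inode_ivec); // panics if the stored value is not 8 bytes
    let resolved = u64::from_be_bytes(buf);

    // Equivalent of get_data: fetch the stored bytes for that inode.
    let content = data.get(&resolved.to_be_bytes()[..])?.expect("data stored");
    println!("{}", String::from_utf8_lossy(&content));
    Ok(())
}
```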
23 changes: 17 additions & 6 deletions src/db/mod.rs
@@ -1,16 +1,25 @@
pub mod local;
pub mod manager;
pub mod storage;
pub mod utils;
use std::path::Path;

pub use local::*;
use sled::{IVec, Tree};
pub use storage::*;

use crate::fuse::core::DentryAttributes;
use crate::fuse::core::FileKind;
use crate::fuse::core::InodeAttributes;
use crate::ClusterObject;
use crate::Result;
use kube::core::DynamicObject;
use sled::{IVec, Tree};
use std::path::Path;

#[derive(Clone, Hash, PartialEq, Eq)]
pub enum Bucket {
RIndex,
Inode,
Dentry,
Data,
}

pub trait Storage: Sync + Send {
fn add(&self, cluster_obj: ClusterObject) -> Result<()>;
fn get(&self, cluster_obj: ClusterObject) -> Result<Option<DynamicObject>>;
@@ -26,6 +35,8 @@ pub trait FSManger: Sync + Send {
fn edit_file(&self, path: impl AsRef<Path>, content: IVec) -> Result<()>;
fn join_dir(&self, parent_inode: u64, inode: u64, name: String, kind: FileKind) -> Result<()>;
fn get_dentry(&self, inode: u64) -> Result<DentryAttributes>;
fn get_inode(&self, inode: u64) -> Result<InodeAttributes>;
fn get_inode_attr(&self, inode: u64) -> Result<InodeAttributes>;
fn update_inode(&self, inode: u64, attr: InodeAttributes) -> Result<()>;
fn get_inode(&self, key: String) -> Result<u64>;
fn get_data(&self, inode: u64) -> Result<IVec>;
}
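
Since the commit is about serving file reads, a caller of get_data will typically still have to honor the offset and size of the read request before replying. A hedged sketch of that slicing, with both ends clamped to the file length, follows; the helper name and the sample content are illustrative and not part of this crate.

```rust
// Sketch: slice the full object bytes returned by get_data(inode) for a
// read(offset, size) request, clamping both ends to the file length.
fn read_slice(data: &[u8], offset: i64, size: u32) -> &[u8] {
    let start = (offset.max(0) as usize).min(data.len());
    let end = (start + size as usize).min(data.len());
    &data[start..end]
}

fn main() {
    let content = b"apiVersion: v1\nkind: Pod\n";
    // 10 bytes starting at offset 15 lands exactly on the second line.
    assert_eq!(read_slice(content, 15, 10), b"kind: Pod\n");
    // Reads past the end are clamped instead of panicking.
    assert_eq!(read_slice(content, 20, 100), b" Pod\n");
    println!("{}", String::from_utf8_lossy(read_slice(content, 0, 10)));
}
```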
131 changes: 131 additions & 0 deletions src/db/storage.rs
@@ -0,0 +1,131 @@
use super::Bucket::*;
use super::{Bucket, FSManger, Storage};
use crate::db::utils::*;
use crate::error::Error::MockParentDirError;
use crate::{ClusterObject, Result};
use kube::core::DynamicObject;
use kube::discovery::Scope::*;
use sled::{Db, IVec, Tree};
use std::{collections::HashMap, path::Path};

impl AsRef<[u8]> for Bucket {
fn as_ref(&self) -> &[u8] {
match self {
RIndex => "reverse-index".as_bytes(),
Inode => "inode".as_bytes(),
Dentry => "dentry".as_bytes(),
Data => "data".as_bytes(),
}
}
}

#[allow(dead_code)]
#[derive(Clone)]
pub struct SledDb {
db: Db,
buckets: HashMap<Bucket, Tree>,
}

#[allow(dead_code)]
impl SledDb {
pub fn new(path: impl AsRef<Path>) -> Result<SledDb> {
let db = sled::open(path)?;
clean_one_time_buckets(&db)?;
Ok(SledDb {
db: db.clone(),
buckets: HashMap::from([
(Bucket::RIndex, db.open_tree(Bucket::RIndex)?),
(Bucket::Inode, db.open_tree(Bucket::Inode)?),
(Bucket::Dentry, db.open_tree(Bucket::Dentry)?),
(Bucket::Data, db.open_tree(Bucket::Data)?),
]),
})
}

pub fn mount_gvr(&self, cluster_obj: &ClusterObject) -> Result<u64> {
let parent_inode = self.mount_gvk(cluster_obj)?;
let key = get_resource_full_key(cluster_obj);
let value: IVec = (cluster_obj).try_into()?;
let file_key = format!("{}.yaml", &key);
match cluster_obj.scope() {
Namespaced => self.mount_file(&file_key, parent_inode, value),
Cluster => {
self.mount_dir(&key, parent_inode)?;
self.mount_file(&file_key, parent_inode, value)
}
}
}

pub fn update_gvr(&self, cluster_obj: ClusterObject) -> Result<()> {
let key = get_resource_full_key(&cluster_obj);
let value: IVec = (&cluster_obj).try_into()?;
self.edit_file(key, value)
}

pub fn mount_gvk(&self, cluster_obj: &ClusterObject) -> Result<u64> {
let api_path = get_resource_api_key(cluster_obj);
let parent_path = get_parent_resource_full_key(cluster_obj);
let parent_inode = if self.get_bucket(RIndex).contains_key(parent_path.clone())? {
let p_inode = self
.get_bucket(RIndex)
.get(get_parent_resource_full_key(cluster_obj))?
.unwrap();
ivec_to_u64(&p_inode)
} else {
match cluster_obj.scope() {
Namespaced => {
// TODO: replace default to cluster
self.mount_dir(
parent_path.clone(),
self.get_inode("default/namespace".to_string())?,
)?
}
Cluster => return Err(MockParentDirError(api_path.clone())),
}
};
self.mount_dir(api_path, parent_inode)
}
}

impl Storage for SledDb {
fn add(&self, cluster_obj: ClusterObject) -> Result<()> {
if self.has(&cluster_obj)? {
return self.update(cluster_obj);
}
self.mount_gvr(&cluster_obj)?;
Ok(())
}

fn update(&self, cluster_obj: ClusterObject) -> Result<()> {
let key = get_resource_full_key(&cluster_obj);
let value: IVec = (&cluster_obj).try_into()?;
self.get_bucket(Data).insert(key, value)?;
Ok(())
}

fn get(&self, cluster_obj: ClusterObject) -> Result<Option<DynamicObject>> {
let key = get_resource_full_key(&cluster_obj);
let value = self.buckets.get(&Data).unwrap().get(key)?;
match value {
Some(v) => Ok(Some(serde_yaml::from_slice(v.as_ref())?)),
None => Ok(None),
}
}

fn delete(&self, cluster_obj: ClusterObject) -> Result<()> {
let key = get_resource_full_key(&cluster_obj);
self.get_bucket(Data).remove(key)?;
Ok(())
}

fn get_bucket(&self, name: Bucket) -> &Tree {
self.buckets.get(&name).unwrap()
}

fn has(&self, cluster_obj: &ClusterObject) -> Result<bool> {
let exist = self
.get_bucket(RIndex)
.contains_key(&get_resource_full_key(cluster_obj))?;
Ok(exist)
}
}
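
mount_gvr above branches on the discovery scope: a namespaced object is mounted as a single <key>.yaml file under the directory created by mount_gvk, while a cluster-scoped object additionally gets a directory of its own before the YAML file is written. A small sketch of that key layout is below; the Scope enum and path strings are stand-ins for the crate's get_resource_full_key helpers, not its actual API.

```rust
// Sketch: derive the keys mount_gvr would mount for an object, mirroring the
// Namespaced/Cluster branch above. The path layout is an illustrative guess.
enum Scope {
    Namespaced,
    Cluster,
}

/// Returns the YAML file key and, for cluster-scoped objects, the extra
/// directory key that gets mounted alongside it.
fn mount_keys(scope: &Scope, gvk_dir: &str, name: &str) -> (String, Option<String>) {
    let full_key = format!("{gvk_dir}/{name}");
    let file_key = format!("{full_key}.yaml");
    match scope {
        Scope::Namespaced => (file_key, None),
        Scope::Cluster => (file_key, Some(full_key)),
    }
}

fn main() {
    println!("{:?}", mount_keys(&Scope::Namespaced, "default/pod", "nginx"));
    println!("{:?}", mount_keys(&Scope::Cluster, "node", "worker-1"));
}
```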
(The remaining 5 changed files are not shown.)
