From ba295ef20ce7d2b8f3c62927131f27f4a72d9070 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 7 Mar 2024 09:48:53 +0100 Subject: [PATCH] first version of the metadata storage --- examples/metadata_bench.rs | 23 +++- src/dictionary.rs | 1 + src/lib.rs | 2 +- src/metadata/mod.rs | 144 +++++++++++++++++++------- src/metadata/structure/coordinates.rs | 0 src/metadata/structure/mod.rs | 36 +++++++ src/storage/layout/metadata.rs | 102 ++++++++++++++++++ src/storage/layout/mod.rs | 1 + 8 files changed, 264 insertions(+), 45 deletions(-) create mode 100644 src/metadata/structure/coordinates.rs create mode 100644 src/metadata/structure/mod.rs create mode 100644 src/storage/layout/metadata.rs diff --git a/examples/metadata_bench.rs b/examples/metadata_bench.rs index 9eb6645..df2efe8 100644 --- a/examples/metadata_bench.rs +++ b/examples/metadata_bench.rs @@ -1,16 +1,29 @@ use remote_hdt::error::RemoteHDTError; use remote_hdt::metadata::Metadata; -use remote_hdt::storage::params::Serialization; use remote_hdt::storage::params::Backend; use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::layout::metadata::MetadataLayout; + + +use remote_hdt::storage::params::ChunkingStrategy; fn main() -> Result<(), RemoteHDTError> { - - let rdf_path = ""; + let rdf_path = "resources/1-lubm.ttl"; let metadata_path = ""; + let zarr_path = "1-lubm-metadata.zarr"; let fields = vec!["X_pos", "Y_pos"]; - let mut metadata: Metadata = Metadata::new(Serialization::Zarr); - metadata.serialize(rdf_path, ReferenceSystem::SPO,metadata_path,fields).unwrap(); + let mut metadata = Metadata::new( MetadataLayout,Serialization::Zarr); + metadata + .serialize( + Backend::FileSystem(zarr_path), + rdf_path, + ChunkingStrategy::Sharding(1024), + ReferenceSystem::SPO, + + metadata_path, + fields) + .unwrap(); Ok(()) } diff --git a/src/dictionary.rs b/src/dictionary.rs index d58d06c..6f42c7f 100644 --- a/src/dictionary.rs +++ b/src/dictionary.rs @@ -51,6 +51,7 @@ impl Dictionary { predicates: Set::new(hash_to_set(predicates)).unwrap(), objects: Set::new(hash_to_set(objects)).unwrap(), } + } pub fn subjects_size(&self) -> usize { diff --git a/src/lib.rs b/src/lib.rs index ecb6ec0..be8d49d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,6 @@ pub mod dictionary; mod engine; pub mod error; mod io; +pub mod metadata; pub mod storage; mod utils; -pub mod metadata; diff --git a/src/metadata/mod.rs b/src/metadata/mod.rs index e2b3c00..2ee7aeb 100644 --- a/src/metadata/mod.rs +++ b/src/metadata/mod.rs @@ -1,82 +1,148 @@ - - - use std::default; use std::path::PathBuf; use std::str::FromStr; +use std::collections::HashSet; +use std::sync::Arc; +use serde_json::Map; -use crate::dictionary::Dictionary; +use crate::storage::layout; +use crate::storage::layout::metadata::MetadataLayout; +use crate::storage::layout::tabular::TabularLayout; +use crate::utils::rdf_to_value; +use crate::dictionary::Dictionary; use crate::io::Graph; use crate::io::RdfParser; -use crate::storage::params::Serialization; +use crate::error::RemoteHDTError; +use crate::storage::layout::Layout; use crate::storage::params::Backend; +use crate::storage::params::Dimensionality; use crate::storage::params::ReferenceSystem; -use crate::error::RemoteHDTError; +use crate::storage::params::Serialization; +use crate::storage::params::ChunkingStrategy; use fcsd::Set; +use zarrs::array::Array; +use zarrs::array::ArrayBuilder; use zarrs::opendal::raw::oio::StreamExt; use zarrs::opendal::services::Fs; use zarrs::opendal::services::Http; use zarrs::opendal::Operator; use zarrs::storage::store::OpendalStore; -use zarrs::array::Array; - +use super::utils::hash_to_set; pub type MetadataResult = Result; +pub mod structure; -pub struct Metadata { - flatten_graph: Vec<(u32, u32, u32)>, +const ARRAY_NAME: &str = "/group/RemoteHDT"; // TODO: parameterize this + +pub struct Metadata { + flatten_graph: Vec<(String)>, serialization: Serialization, - dictionary : Dictionary, - array: Option> + dictionary: Dictionary, + array: Option>, + dimensionality: Dimensionality, + layout: Box>, } - -impl Metadata{ - pub fn new( serialization: Serialization) -> Self { +impl Metadata { + pub fn new( layout: impl Layout + 'static, serialization: Serialization) -> Self { Metadata { - flatten_graph: Vec::<(u32, u32, u32)>::default(), - serialization: serialization, - dictionary: Dictionary::default(), - array: None, + flatten_graph: Vec::::default(), + serialization: serialization, + dictionary: Dictionary::default(), + array: None, + dimensionality: Default::default(), + layout: Box::new(layout), } } - - - - - pub fn serialize(&mut self, rdf_path: &str, reference_system: ReferenceSystem, metadata_path: &str, fields: Vec<&str>) -> MetadataResult<&mut Self>{ - let graph_vector: Graph; + pub fn serialize<'a>( + &mut self, + store: Backend<'a>, + rdf_path: &str, + chunking_strategy: ChunkingStrategy, + reference_system: ReferenceSystem, + + metadata_path: &str, + fields: Vec<&str>, + ) -> MetadataResult<&mut Self> { + + let operator = match store { + Backend::FileSystem(path) => { + let mut builder = Fs::default(); + let path = PathBuf::from_str(path)?; + + match path.exists() { + true => return Err(RemoteHDTError::PathExists), + false => { + let path = match path.into_os_string().into_string() { + Ok(string) => string, + Err(_) => return Err(RemoteHDTError::OsPathToString), + }; + builder.root(&path); + } + } + + Operator::new(builder)?.finish() + } + Backend::HTTP(_) => return Err(RemoteHDTError::ReadOnlyBackend), + }; + + + // 2. We can create the FileSystemStore appropiately + let store = Arc::new(OpendalStore::new(operator.blocking())); - match RdfParser::parse(rdf_path, &reference_system) { + let graph = match RdfParser::parse(rdf_path, &reference_system) { Ok((graph, dictionary)) => { - graph_vector = graph; self.dictionary = dictionary; + self.dimensionality = Dimensionality::new(&self.dictionary, &graph); + graph } Err(_) => return Err(RemoteHDTError::RdfParse), }; + //Flatten the graph into triples let mut count = 0; - for i in graph_vector.iter() { - for j in i.iter(){ - self.flatten_graph.push((count, j.0, j.1)) + for i in graph.iter() { + for j in i.iter() { + self.flatten_graph.push(format!["{};{};{}",count, j.0, j.1]) } - count +=1; + count += 1; } + //TODO: change the implementation so it is only done here the flatten + let triples:HashSet<_> = self.flatten_graph.clone().into_iter().collect(); + let subjects = self.dictionary.subjects(); + let predicates = self.dictionary.predicates(); + let objects = self.dictionary.objects(); + + let arr = ArrayBuilder::new( + self.layout.shape(&self.dimensionality), + self.layout.data_type(), + self.layout + .chunk_shape(chunking_strategy, &self.dimensionality), + self.layout.fill_value(), + ) + .dimension_names(self.layout.dimension_names(&reference_system)) + .array_to_bytes_codec(self.layout.array_to_bytes_codec(&self.dimensionality)?) + .attributes({ + let mut attributes = Map::new(); + attributes.insert("triples".into(), rdf_to_value(Set::new(hash_to_set(triples)).unwrap())); + attributes.insert("subjects".into(), rdf_to_value(subjects)); + attributes.insert("predicates".into(), rdf_to_value(predicates)); + attributes.insert("objects".into(), rdf_to_value(objects)); + attributes.insert("reference_system".into(), reference_system.as_ref().into()); + attributes + }) + .build(store, ARRAY_NAME)?; + + arr.store_metadata()?; + self.layout.serialize(arr, graph)?; Ok(self) } - - - - - - - -} \ No newline at end of file +} diff --git a/src/metadata/structure/coordinates.rs b/src/metadata/structure/coordinates.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/metadata/structure/mod.rs b/src/metadata/structure/mod.rs new file mode 100644 index 0000000..f5a66bc --- /dev/null +++ b/src/metadata/structure/mod.rs @@ -0,0 +1,36 @@ +use parking_lot::Mutex; +use sprs::TriMat; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use zarrs::array::codec::ArrayToBytesCodecTraits; +use zarrs::array::Array; +use zarrs::array::ChunkGrid; +use zarrs::array::DataType; +use zarrs::array::DimensionName; +use zarrs::array::FillValue; +use zarrs::array_subset::ArraySubset; +use zarrs::storage::store::OpendalStore; + +use crate::dictionary::Dictionary; +use crate::error::RemoteHDTError; +use crate::io::Graph; +use crate::utils::columns_per_shard; +use crate::utils::rows_per_shard; +use crate::utils::value_to_term; + +use super::ChunkingStrategy; +use super::Dimensionality; +use super::ReferenceSystem; + + +type ArrayToBytesCodec = Box; + +pub mod coordinates; + + +pub trait StructureOps { + +} + +pub trait Structure: StructureOps { +} diff --git a/src/storage/layout/metadata.rs b/src/storage/layout/metadata.rs new file mode 100644 index 0000000..ed2e509 --- /dev/null +++ b/src/storage/layout/metadata.rs @@ -0,0 +1,102 @@ +use std::num::NonZeroU64; + +use parking_lot::Mutex; +use sprs::TriMat; +use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder; +use zarrs::array::codec::ArrayToBytesCodecTraits; +use zarrs::array::codec::GzipCodec; +use zarrs::array::ChunkGrid; +use zarrs::array::DataType; +use zarrs::array::DimensionName; +use zarrs::array::FillValue; + +use super::ChunkingStrategy; +use super::Dimensionality; +use super::ReferenceSystem; +use super::StorageResult; + +use crate::io::Graph; +use crate::storage::layout::LayoutOps; +use crate::storage::Layout; + +type Chunk = (u32, u32, u32); + +pub struct MetadataLayout; + +impl Layout for MetadataLayout { + fn shape(&self, dimensionality: &Dimensionality) -> Vec { + vec![dimensionality.get_graph_size(), 3] + } + + fn data_type(&self) -> DataType { + DataType::UInt32 + } + + fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid { + vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value + } + + fn fill_value(&self) -> FillValue { + FillValue::from(0u32) + } + + fn dimension_names(&self, _: &ReferenceSystem) -> Option> { + Some(vec![ + DimensionName::new("Triples"), + DimensionName::new("Fields"), + ]) + } + + fn array_to_bytes_codec( + &self, + _: &Dimensionality, + ) -> StorageResult> { + let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![1, 3].try_into()?); + sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]); + Ok(Box::new(sharding_codec_builder.build())) + } +} + +impl LayoutOps for MetadataLayout { + fn graph_iter(&self, graph: Graph) -> Vec { + graph + .iter() + .enumerate() + .flat_map(|(first_term, triples)| { + + let count = 0; + triples + .iter() + .map(|&(second_term, third_term)| { + (count as u32, 1, 1) + }) + .collect::>() + }) + .collect::>() + } + + fn store_chunk_elements(&self, chunk: &[Chunk], _: usize) -> Vec { + let mut ans = Vec::new(); + for &(first_term, second_term, third_term) in chunk { + ans.push(first_term); + ans.push(second_term); + ans.push(third_term); + } + ans + } + + fn retrieve_chunk_elements( + &mut self, + matrix: &Mutex>, + first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick? + chunk: &[usize], + ) { + matrix + .lock() + .add_triplet(chunk[0], chunk[2], chunk[1] as usize); + } + + fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize { + dimensionality.first_term_size * dimensionality.third_term_size + } +} diff --git a/src/storage/layout/mod.rs b/src/storage/layout/mod.rs index a50e2a8..e1fd552 100644 --- a/src/storage/layout/mod.rs +++ b/src/storage/layout/mod.rs @@ -28,6 +28,7 @@ type ArrayToBytesCodec = Box; pub mod matrix; pub mod tabular; +pub mod metadata; pub trait LayoutOps { fn retrieve_attributes(&mut self, arr: &Array) -> StorageResult {