diff --git a/examples/ntriples/main.rs b/examples/ntriples/main.rs index 8ab2feb..3469d46 100644 --- a/examples/ntriples/main.rs +++ b/examples/ntriples/main.rs @@ -1,6 +1,6 @@ use remote_hdt::error::RemoteHDTError; use remote_hdt::storage::layout::tabular::TabularLayout; -use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::params::*; use remote_hdt::storage::Storage; pub fn main() -> Result<(), RemoteHDTError> { @@ -8,7 +8,7 @@ pub fn main() -> Result<(), RemoteHDTError> { Backend::FileSystem("root.zarr"), "examples/ntriples/rdf.nt", ChunkingStrategy::Chunk, - ReferenceSystem::SPO, + Optimization::Storage(ReferenceSystem::SPO), )?; Ok(()) diff --git a/examples/rdf_xml/main.rs b/examples/rdf_xml/main.rs index c415701..7682821 100644 --- a/examples/rdf_xml/main.rs +++ b/examples/rdf_xml/main.rs @@ -1,6 +1,6 @@ use remote_hdt::error::RemoteHDTError; use remote_hdt::storage::layout::tabular::TabularLayout; -use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::params::*; use remote_hdt::storage::Storage; pub fn main() -> Result<(), RemoteHDTError> { @@ -8,7 +8,7 @@ pub fn main() -> Result<(), RemoteHDTError> { Backend::FileSystem("root.zarr"), "examples/rdf_xml/rdf.rdf", ChunkingStrategy::Chunk, - ReferenceSystem::SPO, + Optimization::Storage(ReferenceSystem::SPO), )?; Ok(()) diff --git a/examples/serialize.rs b/examples/serialize.rs index 9ca4192..71d23ba 100644 --- a/examples/serialize.rs +++ b/examples/serialize.rs @@ -1,6 +1,6 @@ use remote_hdt::error::RemoteHDTError; use remote_hdt::storage::layout::matrix::MatrixLayout; -use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::params::*; use remote_hdt::storage::Storage; use std::env; use std::time::Instant; @@ -21,7 +21,7 @@ fn main() -> Result<(), RemoteHDTError> { Backend::FileSystem(zarr_path), rdf_path, ChunkingStrategy::Sharding(*shard_size), - ReferenceSystem::SPO, + Optimization::Query, )?; println!("Elapsed time: {:.2?}", before.elapsed()); diff --git a/examples/turtle/main.rs b/examples/turtle/main.rs index 76fe24a..cd97960 100644 --- a/examples/turtle/main.rs +++ b/examples/turtle/main.rs @@ -1,6 +1,6 @@ use remote_hdt::error::RemoteHDTError; use remote_hdt::storage::layout::tabular::TabularLayout; -use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::params::*; use remote_hdt::storage::Storage; pub fn main() -> Result<(), RemoteHDTError> { @@ -8,7 +8,7 @@ pub fn main() -> Result<(), RemoteHDTError> { Backend::FileSystem("root.zarr"), "examples/turtle/rdf.ttl", ChunkingStrategy::Chunk, - ReferenceSystem::SPO, + Optimization::Storage(ReferenceSystem::SPO), )?; Ok(()) diff --git a/src/dictionary.rs b/src/dictionary.rs index d58d06c..58b036b 100644 --- a/src/dictionary.rs +++ b/src/dictionary.rs @@ -7,7 +7,6 @@ use super::utils::hash_to_set; #[derive(Clone)] pub struct Dictionary { - reference_system: ReferenceSystem, subjects: Set, predicates: Set, objects: Set, @@ -16,7 +15,6 @@ pub struct Dictionary { impl Default for Dictionary { fn default() -> Self { Dictionary { - reference_system: ReferenceSystem::SPO, subjects: Set::new(vec!["PlaceHolder"]).unwrap(), predicates: Set::new(vec!["PlaceHolder"]).unwrap(), objects: Set::new(vec!["PlaceHolder"]).unwrap(), @@ -26,13 +24,11 @@ impl Default for Dictionary { impl Dictionary { pub(crate) fn from_vec_str( - reference_system: ReferenceSystem, subjects: &Vec, predicates: &Vec, objects: &Vec, ) -> Self { Dictionary { - reference_system, subjects: Set::new(subjects).unwrap(), predicates: Set::new(predicates).unwrap(), objects: Set::new(objects).unwrap(), @@ -40,13 +36,11 @@ impl Dictionary { } pub(crate) fn from_set_terms( - reference_system: ReferenceSystem, subjects: HashSet, predicates: HashSet, objects: HashSet, ) -> Self { Dictionary { - reference_system, subjects: Set::new(hash_to_set(subjects)).unwrap(), predicates: Set::new(hash_to_set(predicates)).unwrap(), objects: Set::new(hash_to_set(objects)).unwrap(), @@ -77,13 +71,13 @@ impl Dictionary { self.objects.to_owned() } - pub fn get_reference_system(&self) -> ReferenceSystem { - self.reference_system.to_owned() - } - - pub fn get_subject_idx(&self, subject: &str) -> Option { + pub fn get_subject_idx( + &self, + subject: &str, + reference_system: &ReferenceSystem, + ) -> Option { let mut locator = self.subjects.locator(); - match self.reference_system { + match reference_system { ReferenceSystem::PSO | ReferenceSystem::OSP => { locator.run(subject).map(|value| value + 1) } @@ -91,13 +85,21 @@ impl Dictionary { } } - pub fn get_subject_idx_unchecked(&self, subject: &str) -> usize { - self.get_subject_idx(subject).unwrap() + pub fn get_subject_idx_unchecked( + &self, + subject: &str, + reference_system: &ReferenceSystem, + ) -> usize { + self.get_subject_idx(subject, reference_system).unwrap() } - pub fn get_predicate_idx(&self, predicate: &str) -> Option { + pub fn get_predicate_idx( + &self, + predicate: &str, + reference_system: &ReferenceSystem, + ) -> Option { let mut locator = self.predicates.locator(); - match self.reference_system { + match reference_system { ReferenceSystem::SPO | ReferenceSystem::OPS => { locator.run(predicate).map(|value| value + 1) } @@ -105,13 +107,21 @@ impl Dictionary { } } - pub fn get_predicate_idx_unchecked(&self, predicate: &str) -> usize { - self.get_predicate_idx(predicate).unwrap() + pub fn get_predicate_idx_unchecked( + &self, + predicate: &str, + reference_system: &ReferenceSystem, + ) -> usize { + self.get_predicate_idx(predicate, reference_system).unwrap() } - pub fn get_object_idx(&self, object: &str) -> Option { + pub fn get_object_idx( + &self, + object: &str, + reference_system: &ReferenceSystem, + ) -> Option { let mut locator = self.objects.locator(); - match self.reference_system { + match reference_system { ReferenceSystem::SOP | ReferenceSystem::POS => { locator.run(object).map(|value| value + 1) } @@ -119,7 +129,11 @@ impl Dictionary { } } - pub fn get_object_idx_unchecked(&self, object: &str) -> usize { - self.get_object_idx(object).unwrap() + pub fn get_object_idx_unchecked( + &self, + object: &str, + reference_system: &ReferenceSystem, + ) -> usize { + self.get_object_idx(object, reference_system).unwrap() } } diff --git a/src/error.rs b/src/error.rs index a810e88..e6ac04f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,6 +48,8 @@ pub enum RemoteHDTError { ObjectsNotInJSON, #[error("The Reference System has not been serialized properly")] ReferenceSystemNotInJSON, + #[error("The Optimization has not been serialized properly")] + OptimizationNotInJSON, #[error("Error serializing the triples of the Graph")] TripleSerialization, #[error("The provided path is not valid")] diff --git a/src/io/mod.rs b/src/io/mod.rs index 7bf6a33..0d29716 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -50,14 +50,16 @@ trait Backend::Error>> { ReferenceSystem::OSP | ReferenceSystem::OPS => objects.len(), } ]; - let dictionary = - Dictionary::from_set_terms(reference_system.to_owned(), subjects, predicates, objects); + let dictionary = Dictionary::from_set_terms(subjects, predicates, objects); if let Err(err) = Self::parser_fn(path, &mut |triple: Triple| { { - let sidx = dictionary.get_subject_idx_unchecked(&triple.subject.to_string()); - let pidx = dictionary.get_predicate_idx_unchecked(&triple.predicate.to_string()); - let oidx = dictionary.get_object_idx_unchecked(&triple.object.to_string()); + let sidx = dictionary + .get_subject_idx_unchecked(&triple.subject.to_string(), reference_system); + let pidx = dictionary + .get_predicate_idx_unchecked(&triple.predicate.to_string(), reference_system); + let oidx = dictionary + .get_object_idx_unchecked(&triple.object.to_string(), reference_system); match reference_system { ReferenceSystem::SPO => { diff --git a/src/main.rs b/src/main.rs index b476eb9..d0ecff5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use clap::Parser; use remote_hdt::storage::layout::tabular::TabularLayout; use remote_hdt::storage::params::Backend; use remote_hdt::storage::params::ChunkingStrategy; -use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Optimization; use remote_hdt::storage::params::Serialization; use remote_hdt::storage::Storage; use remote_hdt::storage::StorageResult; @@ -13,7 +13,6 @@ struct Args { /// Input RDF file #[arg(short, long)] rdf: String, - /// Output Zarr directory #[arg(short, long, default_value = "root.zarr")] zarr: String, @@ -25,7 +24,7 @@ fn main() -> StorageResult<()> { Backend::FileSystem(&args.zarr), &args.rdf, ChunkingStrategy::Chunk, - ReferenceSystem::SPO, + Optimization::Query, )?; Ok(()) } diff --git a/src/storage/layout/matrix.rs b/src/storage/layout/matrix.rs index f2bc735..f840df6 100644 --- a/src/storage/layout/matrix.rs +++ b/src/storage/layout/matrix.rs @@ -38,7 +38,7 @@ impl Layout for MatrixLayout { fn chunk_shape( &self, - chunking_strategy: ChunkingStrategy, + chunking_strategy: &ChunkingStrategy, dimensionality: &Dimensionality, ) -> ChunkGrid { vec![ diff --git a/src/storage/layout/mod.rs b/src/storage/layout/mod.rs index bca148f..ed48bd8 100644 --- a/src/storage/layout/mod.rs +++ b/src/storage/layout/mod.rs @@ -9,6 +9,7 @@ use zarrs::array::DataType; use zarrs::array::DimensionName; use zarrs::array::FillValue; use zarrs::array_subset::ArraySubset; +use zarrs::group::Group; use zarrs::storage::store::FilesystemStore; use zarrs::storage::ReadableStorageTraits; @@ -19,6 +20,7 @@ use crate::utils::columns_per_shard; use crate::utils::rows_per_shard; use crate::utils::value_to_term; +use super::params::Optimization; use super::ChunkingStrategy; use super::Dimensionality; use super::ReferenceSystem; @@ -31,6 +33,24 @@ pub mod matrix; pub mod tabular; pub trait LayoutOps { + fn retrieve_group_attributes( + &mut self, + group: &Group, + ) -> StorageResult { + // 4. We get the attributes so we can obtain some values that we will need + let attributes = group.attributes(); + + let optimization: Optimization = match attributes.get("optimization") { + Some(reference_system) => reference_system, + None => return Err(RemoteHDTError::OptimizationNotInJSON), + } + .as_str() + .unwrap() + .into(); + + Ok(optimization) + } + fn retrieve_attributes( &mut self, arr: &Array, @@ -51,20 +71,7 @@ pub trait LayoutOps { None => return Err(RemoteHDTError::ObjectsNotInJSON), }); - let reference_system: ReferenceSystem = match attributes.get("reference_system") { - Some(reference_system) => reference_system, - None => return Err(RemoteHDTError::ReferenceSystemNotInJSON), - } - .as_str() - .unwrap() - .into(); - - Ok(Dictionary::from_vec_str( - reference_system, - subjects, - predicates, - objects, - )) + Ok(Dictionary::from_vec_str(subjects, predicates, objects)) } fn serialize(&mut self, arr: &Array, graph: Graph) -> StorageResult<()> { @@ -172,7 +179,7 @@ pub trait Layout: LayoutOps { fn data_type(&self) -> DataType; fn chunk_shape( &self, - chunking_strategy: ChunkingStrategy, + chunking_strategy: &ChunkingStrategy, dimensionality: &Dimensionality, ) -> ChunkGrid; fn fill_value(&self) -> FillValue; diff --git a/src/storage/layout/tabular.rs b/src/storage/layout/tabular.rs index be6c520..7e64554 100644 --- a/src/storage/layout/tabular.rs +++ b/src/storage/layout/tabular.rs @@ -32,7 +32,7 @@ impl Layout for TabularLayout { DataType::UInt32 } - fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid { + fn chunk_shape(&self, chunking_strategy: &ChunkingStrategy, _: &Dimensionality) -> ChunkGrid { vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2047967..6bbfe62 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -6,7 +6,7 @@ use std::sync::atomic::AtomicU32; use std::sync::Arc; use zarrs::array::Array; use zarrs::array::ArrayBuilder; -use zarrs::array_subset::ArraySubset; +use zarrs::group::Group; use zarrs::group::GroupBuilder; use zarrs::storage::store::FilesystemStore; use zarrs::storage::store::HTTPStore; @@ -22,6 +22,7 @@ use self::layout::Layout; use self::params::Backend; use self::params::ChunkingStrategy; use self::params::Dimensionality; +use self::params::Optimization; use self::params::ReferenceSystem; use self::params::Serialization; @@ -33,9 +34,8 @@ pub type ZarrArray = CsMat; type AtomicZarrType = AtomicU32; pub type StorageResult = Result; -const ARRAY_NAME: &str = "/group/RemoteHDT"; // TODO: parameterize this - pub struct Storage { + // TODO: think about which of the fields are actually required dictionary: Dictionary, dimensionality: Dimensionality, layout: Box>, @@ -75,7 +75,7 @@ impl Storage { store: Backend<'a>, rdf_path: &'a str, chunking_strategy: ChunkingStrategy, - reference_system: ReferenceSystem, + optimization: Optimization, // threading_strategy: ThreadingStrategy, TODO: implement this ) -> StorageResult<&mut Self> { let path = match store { @@ -94,7 +94,14 @@ impl Storage { let store = Arc::new(FilesystemStore::new(path)?); // Create a group and write metadata to filesystem - let group = GroupBuilder::new().build(store.clone(), "/group")?; + let group = GroupBuilder::new() + .attributes({ + let mut attributes = Map::new(); + attributes.insert("optimization".into(), optimization.as_ref().into()); + // TODO: move the subjects, predicates and objects to here + attributes + }) + .build(store.clone(), "/group")?; group.store_metadata()?; // TODO: rayon::ThreadPoolBuilder::new() @@ -102,22 +109,52 @@ impl Storage { // .build_global() // .unwrap(); - // 3. Import the RDF dump using `rdf-rs` + match optimization { + Optimization::Query => { + for reference_system in [ + ReferenceSystem::SPO, + ReferenceSystem::PSO, + ReferenceSystem::OPS, + ] { + self.serialize_array(&store, rdf_path, reference_system, &chunking_strategy)? + } + } + Optimization::Storage(ref reference_system) => self.serialize_array( + &store, + rdf_path, + reference_system.to_owned(), + &chunking_strategy, + )?, + } + + Ok(self) + } + + fn serialize_array<'a>( + &mut self, + store: &Arc, + rdf_path: &'a str, + reference_system: ReferenceSystem, + chunking_strategy: &ChunkingStrategy, + ) -> StorageResult<()> { + // Import the RDF dump using `rdf-rs` let graph = match RdfParser::parse(rdf_path, &reference_system) { Ok((graph, dictionary)) => { self.dictionary = dictionary; - self.dimensionality = Dimensionality::new(&self.dictionary, &graph); + self.dimensionality = + Dimensionality::new(&self.dictionary, &graph, &reference_system); graph } Err(_) => return Err(RemoteHDTError::RdfParse), }; - // 4. Build the structure of the Array; as such, several parameters of it are - // tweaked. Namely, the size of the array, the size of the chunks, the name - // of the different dimensions and the default values let subjects = self.dictionary.subjects(); let predicates = self.dictionary.predicates(); let objects = self.dictionary.objects(); + + // Build the structure of the Array; as such, several parameters of it are + // tweaked. Namely, the size of the array, the size of the chunks, the name + // of the different dimensions and the default values let arr = ArrayBuilder::new( self.layout.shape(&self.dimensionality), self.layout.data_type(), @@ -135,15 +172,15 @@ impl Storage { attributes.insert("reference_system".into(), reference_system.as_ref().into()); attributes }) - .build(store.clone(), ARRAY_NAME)?; + .build( + store.clone(), + &format!("/group/{}", reference_system.as_ref()), + )?; arr.store_metadata()?; self.layout.serialize(&arr, graph)?; - let shape = ArraySubset::new_with_ranges(&[0..10, 1..2]); - arr.retrieve_array_subset_elements::(&shape).unwrap(); - - Ok(self) + Ok(()) } pub fn load( @@ -163,11 +200,23 @@ impl Storage { Backend::HTTP(url) => Arc::new(HTTPStore::new(url)?), }; - let arr = Array::new(store, ARRAY_NAME)?; - let dictionary = self.layout.retrieve_attributes(&arr)?; - self.dictionary = dictionary; - self.reference_system = self.dictionary.get_reference_system(); - self.dimensionality = Dimensionality::new(&self.dictionary, &Graph::default()); + let group = Group::new(store.to_owned(), "/group")?; + let optimization = self.layout.retrieve_group_attributes(&group)?; + + match optimization { + Optimization::Query => todo!(), + Optimization::Storage(reference_system) => { + let arr = Array::new(store, &format!("/group/{}", reference_system.as_ref()))?; + let dictionary = self.layout.retrieve_attributes(&arr)?; + self.dictionary = dictionary; + self.reference_system = reference_system; + self.dimensionality = Dimensionality::new( + &self.dictionary, + &Graph::default(), + &self.reference_system, + ); + } + } match self.serialization { Serialization::Zarr => self.array = Some(arr), diff --git a/src/storage/ops.rs b/src/storage/ops.rs index afb7de2..7e7baf1 100644 --- a/src/storage/ops.rs +++ b/src/storage/ops.rs @@ -21,7 +21,10 @@ pub trait Ops { impl Ops for Storage { fn get_subject(&self, subject: &str) -> OpsResult { - let index = match self.dictionary.get_subject_idx(subject) { + let index = match self + .dictionary + .get_subject_idx(subject, &self.reference_system) + { Some(index) => index, None => return Err(OpsError::SubjectNotFound), }; @@ -51,7 +54,10 @@ impl Ops for Storage { } fn get_predicate(&self, predicate: &str) -> OpsResult { - let index = match self.dictionary.get_predicate_idx(predicate) { + let index = match self + .dictionary + .get_predicate_idx(predicate, &self.reference_system) + { Some(index) => index, None => return Err(OpsError::PredicateNotFound), }; @@ -79,7 +85,10 @@ impl Ops for Storage { } fn get_object(&self, object: &str) -> OpsResult { - let index = match self.dictionary.get_object_idx(object) { + let index = match self + .dictionary + .get_object_idx(object, &self.reference_system) + { Some(index) => index, None => return Err(OpsError::ObjectNotFound), }; diff --git a/src/storage/params.rs b/src/storage/params.rs index a21559b..d4dbd5a 100644 --- a/src/storage/params.rs +++ b/src/storage/params.rs @@ -34,19 +34,24 @@ pub enum ReferenceSystem { OPS, } +pub enum Optimization { + Query, + Storage(ReferenceSystem), +} + #[derive(Default)] pub struct Dimensionality { graph_size: Option, pub(crate) first_term_size: usize, - _second_term_size: usize, + // second_term_size: usize, pub(crate) third_term_size: usize, } -impl From for NonZeroU64 { - fn from(value: ChunkingStrategy) -> Self { +impl From<&ChunkingStrategy> for NonZeroU64 { + fn from(value: &ChunkingStrategy) -> Self { match value { ChunkingStrategy::Chunk => NonZeroU64::new(1).unwrap(), - ChunkingStrategy::Sharding(size) => NonZeroU64::new(size).unwrap(), + ChunkingStrategy::Sharding(size) => NonZeroU64::new(*size).unwrap(), ChunkingStrategy::Best => NonZeroU64::new(16).unwrap(), // TODO: set to the number of threads } } @@ -79,24 +84,46 @@ impl From<&str> for ReferenceSystem { } } +impl AsRef for Optimization { + fn as_ref(&self) -> &str { + match self { + Optimization::Query => "query", + Optimization::Storage(reference_system) => reference_system.as_ref(), + } + } +} + +impl From<&str> for Optimization { + fn from(value: &str) -> Self { + match value { + "query" => Optimization::Query, + _ => Optimization::Storage(value.into()), + } + } +} + impl Dimensionality { - pub(crate) fn new(dictionary: &Dictionary, graph: &Graph) -> Self { + pub(crate) fn new( + dictionary: &Dictionary, + graph: &Graph, + reference_system: &ReferenceSystem, + ) -> Self { Dimensionality { graph_size: graph .iter() .map(|triples| triples.len()) .reduce(|acc, a| acc + a), - first_term_size: match dictionary.get_reference_system() { + first_term_size: match reference_system { ReferenceSystem::SPO | ReferenceSystem::SOP => dictionary.subjects_size(), ReferenceSystem::POS | ReferenceSystem::PSO => dictionary.predicates_size(), ReferenceSystem::OPS | ReferenceSystem::OSP => dictionary.objects_size(), }, - _second_term_size: match dictionary.get_reference_system() { - ReferenceSystem::PSO | ReferenceSystem::OSP => dictionary.subjects_size(), - ReferenceSystem::SPO | ReferenceSystem::OPS => dictionary.predicates_size(), - ReferenceSystem::SOP | ReferenceSystem::POS => dictionary.objects_size(), - }, - third_term_size: match dictionary.get_reference_system() { + // second_term_size: match dictionary.get_reference_system() { + // ReferenceSystem::PSO | ReferenceSystem::OSP => dictionary.subjects_size(), + // ReferenceSystem::SPO | ReferenceSystem::OPS => dictionary.predicates_size(), + // ReferenceSystem::SOP | ReferenceSystem::POS => dictionary.objects_size(), + // }, + third_term_size: match reference_system { ReferenceSystem::POS | ReferenceSystem::OPS => dictionary.subjects_size(), ReferenceSystem::SOP | ReferenceSystem::OSP => dictionary.predicates_size(), ReferenceSystem::SPO | ReferenceSystem::PSO => dictionary.objects_size(), diff --git a/tests/common/mod.rs b/tests/common/mod.rs index e97dae3..6e94086 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -3,7 +3,7 @@ use remote_hdt::dictionary::Dictionary; use remote_hdt::storage::params::Backend; use remote_hdt::storage::params::ChunkingStrategy; -use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Optimization; use remote_hdt::storage::Storage; use sprs::CsMat; use sprs::TriMat; @@ -22,7 +22,7 @@ pub fn setup( path: &str, storage: &mut Storage, chunking_strategy: ChunkingStrategy, - reference_system: ReferenceSystem, + optimization: Optimization, ) { if File::open(path).is_err() { storage @@ -30,7 +30,7 @@ pub fn setup( Backend::FileSystem(path), "resources/rdf.nt", chunking_strategy, - reference_system, + optimization, ) .unwrap(); } else {