Skip to content

Commit

Permalink
Implementing store creation and adding option to error if exists
Browse files Browse the repository at this point in the history
  • Loading branch information
deven96 committed Jun 3, 2024
1 parent 03a38e9 commit 3018c9f
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 12 deletions.
15 changes: 6 additions & 9 deletions ahnlich/server/src/engine/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use types::keyval::StoreName;
use types::keyval::StoreValue;
use types::metadata::MetadataKey;
use types::predicate::PredicateCondition;
use types::server::StoreInfo;
use types::similarity::Algorithm;
/// A hash of Store key, this is more preferable when passing around references as arrays can be
/// potentially larger
Expand Down Expand Up @@ -58,14 +59,6 @@ impl From<&StoreKey> for StoreKeyId {
}
}

/// StoreInfo just shows store name, size and length
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct StoreInfo {
pub name: StoreName,
pub len: usize,
pub size_in_bytes: usize,
}

/// StoreUpsert shows how many entries were inserted and updated during a store add call
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct StoreUpsert {
Expand Down Expand Up @@ -210,12 +203,13 @@ impl StoreHandler {
.collect()
}

/// Matches CREATE - Creates a store if not exist, else return an error
/// Matches CREATESTORE - Creates a store if not exist, else return an error
pub(crate) fn create_store(
&self,
store_name: StoreName,
dimension: NonZeroUsize,
predicates: Vec<MetadataKey>,
error_if_exists: bool,
) -> Result<(), ServerError> {
if self
.stores
Expand All @@ -225,6 +219,7 @@ impl StoreHandler {
&self.stores.guard(),
)
.is_err()
&& error_if_exists
{
return Err(ServerError::StoreAlreadyExists(store_name));
}
Expand Down Expand Up @@ -465,6 +460,7 @@ mod tests {
StoreName(store_name.to_string()),
NonZeroUsize::new(size).unwrap(),
predicates,
true,
)
});
handle
Expand Down Expand Up @@ -492,6 +488,7 @@ mod tests {
StoreName(store_name.to_string()),
NonZeroUsize::new(size).unwrap(),
predicates,
true,
)
});
handle
Expand Down
21 changes: 20 additions & 1 deletion ahnlich/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ impl Server {
// TODO: Add cleanup for instance persistence
// just cancelling the token alone does not give enough time for each task to shutdown,
// there must be other measures in place to ensure proper cleanup
if let Err(_) = self
if self
.shutdown_token
.shutdown_with_limit(Duration::from_secs(10))
.await
.is_err()
{
log::error!("Server shutdown took longer than timeout");
}
Expand Down Expand Up @@ -176,6 +177,24 @@ impl ServerTask {
Query::Ping => Ok(ServerResponse::Pong),
Query::InfoServer => Ok(ServerResponse::InfoServer(self.server_info())),
Query::ListClients => Ok(ServerResponse::ClientList(self.client_handler.list())),
Query::ListStores => {
Ok(ServerResponse::StoreList(self.store_handler.list_stores()))
}
Query::CreateStore {
store,
dimension,
create_predicates,
error_if_exists,
} => self
.store_handler
.create_store(
store,
dimension,
create_predicates.into_iter().collect(),
error_if_exists,
)
.map(|_| ServerResponse::Unit)
.map_err(|e| format!("{e}")),
_ => Err("Response not implemented".to_string()),
})
}
Expand Down
69 changes: 69 additions & 0 deletions ahnlich/server/tests/server_test.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use futures::future::join_all;
use server::cli::ServerConfig;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::time::SystemTime;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::time::{timeout, Duration};
use types::bincode::BinCodeSerAndDeser;
use types::keyval::StoreName;
use types::query::Query;
use types::query::ServerQuery;
use types::server::ConnectedClient;
use types::server::ServerInfo;
use types::server::ServerResponse;
use types::server::ServerResult;
use types::server::StoreInfo;

#[tokio::test]
async fn test_server_client_info() {
Expand Down Expand Up @@ -54,6 +57,72 @@ async fn test_server_client_info() {
query_server_assert_result(&mut reader, message, expected.clone()).await;
}

#[tokio::test]
async fn test_simple_stores_list() {
let server = server::Server::new(&ServerConfig::default())
.await
.expect("Could not initialize server");
let address = server.local_addr().expect("Could not get local addr");
let _ = tokio::spawn(async move { server.start().await });
// Allow some time for the server to start
tokio::time::sleep(Duration::from_millis(100)).await;
let message = ServerQuery::from_queries(&[Query::ListStores]);
let mut expected = ServerResult::with_capacity(1);
let expected_response = HashSet::from_iter([]);
expected.push(Ok(ServerResponse::StoreList(expected_response)));
let stream = TcpStream::connect(address).await.unwrap();
let mut reader = BufReader::new(stream);
query_server_assert_result(&mut reader, message, expected).await
}

#[tokio::test]
async fn test_create_stores() {
let server = server::Server::new(&ServerConfig::default())
.await
.expect("Could not initialize server");
let address = server.local_addr().expect("Could not get local addr");
let _ = tokio::spawn(async move { server.start().await });
// Allow some time for the server to start
tokio::time::sleep(Duration::from_millis(100)).await;
let message = ServerQuery::from_queries(&[
Query::CreateStore {
store: StoreName("Main".to_string()),
dimension: NonZeroUsize::new(3).unwrap(),
create_predicates: HashSet::new(),
error_if_exists: true,
},
// difference in dimensions don't matter as name is the same so this should error
Query::CreateStore {
store: StoreName("Main".to_string()),
dimension: NonZeroUsize::new(2).unwrap(),
create_predicates: HashSet::new(),
error_if_exists: true,
},
// Should not error despite existing
Query::CreateStore {
store: StoreName("Main".to_string()),
dimension: NonZeroUsize::new(2).unwrap(),
create_predicates: HashSet::new(),
error_if_exists: false,
},
Query::ListStores,
]);
let mut expected = ServerResult::with_capacity(1);
expected.push(Ok(ServerResponse::Unit));
expected.push(Err("Store Main already exists".to_string()));
expected.push(Ok(ServerResponse::Unit));
expected.push(Ok(ServerResponse::StoreList(HashSet::from_iter([
StoreInfo {
name: StoreName("Main".to_string()),
len: 0,
size_in_bytes: 1712,
},
]))));
let stream = TcpStream::connect(address).await.unwrap();
let mut reader = BufReader::new(stream);
query_server_assert_result(&mut reader, message, expected).await
}

#[tokio::test]
async fn test_run_server_echos() {
let server = server::Server::new(&ServerConfig::default())
Expand Down
3 changes: 2 additions & 1 deletion ahnlich/types/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use serde::Serialize;
/// - First 8 bytes must contain length of the entire vec of queries
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Query {
Create {
CreateStore {
store: StoreName,
dimension: NonZeroUsize,
create_predicates: HashSet<MetadataKey>,
error_if_exists: bool,
},
GetKey {
keys: Vec<StoreKey>,
Expand Down
10 changes: 10 additions & 0 deletions ahnlich/types/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::bincode::BinCodeSerAndDeser;
use crate::keyval::StoreName;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashSet;
Expand All @@ -13,10 +14,19 @@ pub enum ServerResponse {
Pong,
// List of connected clients. Potentially outdated at the point of read
ClientList(HashSet<ConnectedClient>),
StoreList(HashSet<StoreInfo>),
InfoServer(ServerInfo),
// TODO: Define return types for queries, e.t.c
}

/// StoreInfo just shows store name, size and length
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StoreInfo {
pub name: StoreName,
pub len: usize,
pub size_in_bytes: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct ServerInfo {
pub address: String,
Expand Down
2 changes: 1 addition & 1 deletion docs/draft.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<!-- - `CONNECT`-->
<!-- - `DISCONNECT`-->
<!-- - `SHUTDOWNSERVER`: shut down basically discounts from all connected clients, performs cleanup before killing the server-->
- `CREATE`: Create a store which must have a unique name with respect to the server.
- `CREATESTORE`: Create a store which must have a unique name with respect to the server.
Create can take in name_of_store, dimensions_of_vectors(immutable) to be stored in that store, ability to create predicate indices
- `GETKEY`: takes in store, key and direct return of key within store matching the input key

Expand Down

0 comments on commit 3018c9f

Please sign in to comment.