Skip to content

Commit

Permalink
WIP: Implementing BinCodeSerAndDeser for queries and responses
Browse files Browse the repository at this point in the history
  • Loading branch information
deven96 committed Jun 2, 2024
1 parent bb01067 commit f6d001e
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 66 deletions.
2 changes: 1 addition & 1 deletion ahnlich/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ itertools.workspace = true
clap.workspace = true
env_logger.workspace = true
log.workspace = true
thiserror = "1.0"
types = { path = "../types", version = "*" }

tokio = { version = "1.37.0", features = ["net", "macros", "io-util", "rt-multi-thread", "sync"] }

[dev-dependencies]
Expand Down
9 changes: 8 additions & 1 deletion ahnlich/server/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
use thiserror::Error;
use types::keyval::StoreName;
use types::metadata::MetadataKey;

/// TODO: Move to shared rust types so library can deserialize it from the TCP response
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
#[derive(Error, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum ServerError {
#[error("Predicate {0} not found in store, attempt reindexing with predicate")]
PredicateNotFound(MetadataKey),
#[error("Store {0} not found")]
StoreNotFound(StoreName),
#[error("Store {0} already exists")]
StoreAlreadyExists(StoreName),
#[error("Store dimension is [{store_dimension}], input dimension of [{input_dimension}] was specified")]
StoreDimensionMismatch {
store_dimension: usize,
input_dimension: usize,
},
#[error("Could not deserialize query, error is {0}")]
QueryDeserializeError(String),
}
29 changes: 27 additions & 2 deletions ahnlich/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use types::bincode::BinCodeSerAndDeser;
use types::bincode::LENGTH_HEADER_SIZE;
use types::query::Query;
use types::query::ServerQuery;
use types::server::ServerResponse;
use types::server::ServerResult;

#[derive(Debug)]
pub struct Server {
Expand Down Expand Up @@ -79,15 +85,34 @@ impl ServerTask {
async fn process(self) -> IoResult<()> {
self.stream.readable().await?;
let mut reader = BufReader::new(self.stream);
let mut length_buf = [0u8; types::query::LENGTH_HEADER_SIZE];
let mut length_buf = [0u8; LENGTH_HEADER_SIZE];
loop {
reader.read_exact(&mut length_buf).await?;
let data_length = u64::from_be_bytes(length_buf);
let mut data = vec![0u8; data_length as usize];
reader.read_exact(&mut data).await?;
reader.get_mut().write_all(&data).await?;
// TODO: Add trace here to catch whenever queries could not be deserialized at all
if let Ok(queries) = ServerQuery::deserialize(false, &data) {
// TODO: Pass in store_handler and use to respond to queries
let results = Self::handle(queries.into_inner());
if let Ok(binary_results) = results.serialize() {
reader.get_mut().write_all(&binary_results).await?;
}
}
}
}

fn handle(queries: Vec<Query>) -> ServerResult {
let mut result = ServerResult::with_capacity(queries.len());
for query in queries {
result.push(match query {
Query::Ping => Ok(ServerResponse::Pong),
Query::InfoServer => Ok(ServerResponse::Unit),
_ => Err("Response not implemented".to_string()),
})
}
result
}
}

#[cfg(test)]
Expand Down
34 changes: 25 additions & 9 deletions ahnlich/server/tests/server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use server::cli::ServerConfig;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::time::{timeout, Duration};
use types::query::deserialize_queries;
use types::bincode::BinCodeSerAndDeser;
use types::query::Query;
use types::query::SerializedQuery;
use types::query::ServerQuery;
use types::server::ServerResponse;
use types::server::ServerResult;

#[tokio::test]
async fn test_run_server_echos() {
Expand All @@ -22,20 +24,34 @@ async fn test_run_server_echos() {
let mut reader = BufReader::new(stream);

// Message to send
let message = vec![Query::InfoServer, Query::Ping];
let serialized_message = SerializedQuery::from_queries(&message).unwrap();
let message = ServerQuery::from_queries(&[Query::InfoServer, Query::Ping]);
let serialized_message = message.serialize().unwrap();

// Send the message
reader.write_all(serialized_message.data()).await.unwrap();

let mut response = vec![0u8; serialized_message.length()];
reader.write_all(&serialized_message).await.unwrap();

// get length of response
let mut length_header = [0u8; types::bincode::LENGTH_HEADER_SIZE];
timeout(
Duration::from_secs(1),
reader.read_exact(&mut length_header),
)
.await
.unwrap()
.unwrap();
let data_length = u64::from_be_bytes(length_header);
let mut response = vec![0u8; data_length as usize];

timeout(Duration::from_secs(1), reader.read_exact(&mut response))
.await
.unwrap()
.unwrap();

let deserialized = deserialize_queries(&response).unwrap();
let mut expected = ServerResult::with_capacity(2);
expected.push(Ok(ServerResponse::Unit));
expected.push(Ok(ServerResponse::Pong));

let response = ServerResult::deserialize(false, &response).unwrap();

assert_eq!(message, deserialized);
assert_eq!(response, expected);
}
39 changes: 39 additions & 0 deletions ahnlich/types/src/bincode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use bincode::config::DefaultOptions;
use bincode::config::Options;
use serde::Deserialize;
use serde::Serialize;

pub const LENGTH_HEADER_SIZE: usize = 8;

/// - Length encoding must use fixed int and not var int
/// - Endianess must be Big Endian.
/// - First 8 bytes must contain length of the entire vec of response or queries
///
/// Used to serialize and deserialize queries and responses into bincode
pub trait BinCodeSerAndDeser<'a>
where
Self: Serialize + Deserialize<'a>,
{
fn serialize(&self) -> Result<Vec<u8>, bincode::Error> {
let config = DefaultOptions::new()
.with_fixint_encoding()
.with_big_endian();
let serialized_data = config.serialize(self)?;
let data_length = serialized_data.len() as u64;
// serialization appends the length buffer to be read first
let mut buffer = Vec::with_capacity(LENGTH_HEADER_SIZE + serialized_data.len());
buffer.extend(&data_length.to_be_bytes());
buffer.extend(&serialized_data);
Ok(buffer)
}

fn deserialize(has_length_header: bool, bytes: &'a [u8]) -> Result<Self, bincode::Error> {
let config = DefaultOptions::new()
.with_fixint_encoding()
.with_big_endian();
if has_length_header {
return config.deserialize(&bytes[LENGTH_HEADER_SIZE..]);
}
config.deserialize(bytes)
}
}
12 changes: 12 additions & 0 deletions ahnlich/types/src/keyval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ use ndarray::Array1;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap as StdHashMap;
use std::fmt;

/// Name of a Store
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct StoreName(pub String);

impl fmt::Display for StoreName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

/// A store value for now is a simple key value pair of strings
pub type StoreValue = StdHashMap<MetadataKey, MetadataValue>;

Expand All @@ -25,6 +32,11 @@ impl PartialEq for StoreKey {
if self.0.shape() != other.0.shape() {
return false;
}
// std::f64::EPSILON adheres to the IEEE 754 standard and we use it here to determine when
// two Array1<f64> are extremely similar to the point where the differences are neglible.
// We can modify to allow for greater precision, however we currently only
// use it for PartialEq and not for it's distinctive properties. For that, within the
// server we defer to using StoreKeyId whenever we want to compare distinctive Array1<f64>
self.0
.iter()
.zip(other.0.iter())
Expand Down
17 changes: 2 additions & 15 deletions ahnlich/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
pub mod bincode;
pub mod keyval;
pub mod metadata;
pub mod predicate;
pub mod query;
pub mod server;
pub mod similarity;

pub fn add(left: usize, right: usize) -> usize {
left + right
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}
8 changes: 8 additions & 0 deletions ahnlich/types/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use serde::Deserialize;
use serde::Serialize;
use std::fmt;
/// New types for store metadata key and values
#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(transparent)]
Expand All @@ -9,6 +10,13 @@ impl MetadataKey {
Self(input)
}
}

impl fmt::Display for MetadataKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(transparent)]
pub struct MetadataValue(String);
Expand Down
57 changes: 19 additions & 38 deletions ahnlich/types/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use bincode::config::DefaultOptions;
use bincode::config::Options;
use std::collections::HashSet;
use std::num::NonZeroUsize;

use crate::bincode::BinCodeSerAndDeser;
use crate::keyval::{StoreKey, StoreName, StoreValue};
use crate::metadata::MetadataKey;
use crate::predicate::PredicateCondition;
use crate::similarity::Algorithm;
use serde::Deserialize;
use serde::Serialize;

pub const LENGTH_HEADER_SIZE: usize = 8;
/// All possible queries for the server to respond to
///
///
Expand Down Expand Up @@ -68,48 +66,31 @@ pub enum Query {
Ping,
}

#[derive(Debug)]
pub struct SerializedQuery {
data: Vec<u8>,
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServerQuery {
queries: Vec<Query>,
}

impl SerializedQuery {
/// Sends an array of queries with their length encoding first specified in 8 bytes
/// Enforces default configuration for bincode serialization
pub fn from_queries(queries: &[Query]) -> Result<Self, bincode::Error> {
// TODO: Make shareable across serialize and deserialize as perhaps a config stored on heap
// initialized via once_cell. Currently cannot be done as Limit must be specified for the
// Options trait and the Limit enum is not made public outside bincode
let config = DefaultOptions::new()
.with_fixint_encoding()
.with_big_endian();
let serialized_data = config.serialize(queries)?;
let data_length = serialized_data.len() as u64;
impl ServerQuery {
pub fn with_capacity(len: usize) -> Self {
Self {
queries: Vec::with_capacity(len),
}
}

let mut buffer = Vec::with_capacity(LENGTH_HEADER_SIZE + serialized_data.len());
buffer.extend(&data_length.to_be_bytes());
buffer.extend(&serialized_data);
Ok(SerializedQuery { data: buffer })
pub fn push(&mut self, entry: Query) {
self.queries.push(entry)
}

pub fn data(&self) -> &[u8] {
&self.data
pub fn from_queries(queries: &[Query]) -> Self {
Self {
queries: queries.to_vec(),
}
}

pub fn length(&self) -> usize {
let mut length_buf = [0u8; LENGTH_HEADER_SIZE];
length_buf.copy_from_slice(&self.data[0..LENGTH_HEADER_SIZE]);
let length = u64::from_be_bytes(length_buf);
length as usize
pub fn into_inner(self) -> Vec<Query> {
self.queries
}
}

/// Receives an array of bytes with their length encoding first specified in 8 bytes
/// Uses default configuration for bincode deserialization to attempt to retrieve queries
pub fn deserialize_queries(queries: &[u8]) -> Result<Vec<Query>, bincode::Error> {
let config = DefaultOptions::new()
.with_fixint_encoding()
.with_big_endian();
let deserialized_data = config.deserialize(queries)?;
Ok(deserialized_data)
}
impl BinCodeSerAndDeser<'_> for ServerQuery {}
40 changes: 40 additions & 0 deletions ahnlich/types/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::bincode::BinCodeSerAndDeser;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashSet;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ServerResponse {
// Unit variant for no action
Unit,
Pong,
// List of connected clients. Potentially outdated at the point of read
ClientList(HashSet<ConnectedClient>),
// TODO: Define return types for queries, e.t.c
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ConnectedClient {
pub address: String,
}

// ServerResult: Given that an array of queries are sent in, we expect that an array of responses
// be returned each being a potential error
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServerResult {
results: Vec<Result<ServerResponse, String>>,
}

impl BinCodeSerAndDeser<'_> for ServerResult {}

impl ServerResult {
pub fn with_capacity(len: usize) -> Self {
Self {
results: Vec::with_capacity(len),
}
}

pub fn push(&mut self, entry: Result<ServerResponse, String>) {
self.results.push(entry)
}
}

0 comments on commit f6d001e

Please sign in to comment.