Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send the state to the Session backend. #468

Merged
merged 7 commits into from
May 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions gotham/src/middleware/session/backend/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ use futures::prelude::*;
use linked_hash_map::LinkedHashMap;
use log::trace;

use crate::middleware::session::backend::{Backend, NewBackend, SessionFuture};
use crate::middleware::session::{SessionError, SessionIdentifier};
use crate::middleware::session::backend::{
Backend, GetSessionFuture, NewBackend, SetSessionFuture,
};
use crate::middleware::session::SessionIdentifier;
use crate::state::State;

/// Type alias for the `MemoryBackend` storage container.
type MemoryMap = Mutex<LinkedHashMap<String, (Instant, Vec<u8>)>>;
Expand Down Expand Up @@ -80,21 +83,22 @@ impl NewBackend for MemoryBackend {
impl Backend for MemoryBackend {
fn persist_session(
&self,
_: &State,
identifier: SessionIdentifier,
content: &[u8],
) -> Result<(), SessionError> {
) -> Pin<Box<SetSessionFuture>> {
match self.storage.lock() {
Ok(mut storage) => {
storage.insert(identifier.value, (Instant::now(), Vec::from(content)));
Ok(())
Box::pin(future::ok(()))
}
Err(PoisonError { .. }) => {
unreachable!("session memory backend lock poisoned, HashMap panicked?")
}
}
}

fn read_session(&self, identifier: SessionIdentifier) -> Pin<Box<SessionFuture>> {
fn read_session(&self, _: &State, identifier: SessionIdentifier) -> Pin<Box<GetSessionFuture>> {
match self.storage.lock() {
Ok(mut storage) => match storage.get_refresh(&identifier.value) {
Some(&mut (ref mut instant, ref value)) => {
Expand All @@ -109,11 +113,11 @@ impl Backend for MemoryBackend {
}
}

fn drop_session(&self, identifier: SessionIdentifier) -> Result<(), SessionError> {
fn drop_session(&self, _: &State, identifier: SessionIdentifier) -> Pin<Box<SetSessionFuture>> {
match self.storage.lock() {
Ok(mut storage) => {
storage.remove(&identifier.value);
Ok(())
future::ok(()).boxed()
}
Err(PoisonError { .. }) => {
unreachable!("session memory backend lock poisoned, HashMap panicked?")
Expand Down Expand Up @@ -221,21 +225,24 @@ mod tests {
fn memory_backend_test() {
let new_backend = MemoryBackend::new(Duration::from_millis(100));
let bytes: Vec<u8> = (0..64).map(|_| rand::random()).collect();
let state = State::new();
let identifier = SessionIdentifier {
value: "totally_random_identifier".to_owned(),
};

new_backend
.new_backend()
.expect("can't create backend for write")
.persist_session(identifier.clone(), &bytes[..])
.expect("failed to persist");
futures::executor::block_on(
new_backend
.new_backend()
.expect("can't create backend for write")
.persist_session(&state, identifier.clone(), &bytes[..]),
)
.expect("failed to persist");

let received = futures::executor::block_on(
new_backend
.new_backend()
.expect("can't create backend for read")
.read_session(identifier),
.read_session(&state, identifier),
)
.expect("no response from backend")
.expect("session data missing");
Expand All @@ -247,6 +254,7 @@ mod tests {
fn memory_backend_refresh_test() {
let new_backend = MemoryBackend::new(Duration::from_millis(100));
let bytes: Vec<u8> = (0..64).map(|_| rand::random()).collect();
let state = State::new();
let identifier = SessionIdentifier {
value: "totally_random_identifier".to_owned(),
};
Expand All @@ -259,13 +267,19 @@ mod tests {
.new_backend()
.expect("can't create backend for write");

backend
.persist_session(identifier.clone(), &bytes[..])
.expect("failed to persist");
futures::executor::block_on(backend.persist_session(
&state,
identifier.clone(),
&bytes[..],
))
.expect("failed to persist");

backend
.persist_session(identifier2.clone(), &bytes2[..])
.expect("failed to persist");
futures::executor::block_on(backend.persist_session(
&state,
identifier2.clone(),
&bytes2[..],
))
.expect("failed to persist");

{
let mut storage = backend.storage.lock().expect("couldn't lock storage");
Expand All @@ -280,7 +294,7 @@ mod tests {
);
}

futures::executor::block_on(backend.read_session(identifier.clone()))
futures::executor::block_on(backend.read_session(&state, identifier.clone()))
.expect("failed to read session");

{
Expand Down
23 changes: 18 additions & 5 deletions gotham/src/middleware/session/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::pin::Pin;
use futures::prelude::*;

use crate::middleware::session::{SessionError, SessionIdentifier};
use crate::state::State;

/// A type which is used to spawn new `Backend` values.
pub trait NewBackend: Sync + Clone + RefUnwindSafe {
Expand All @@ -16,8 +17,11 @@ pub trait NewBackend: Sync + Clone + RefUnwindSafe {
fn new_backend(&self) -> anyhow::Result<Self::Instance>;
}

/// Type alias for the trait objects returned by `Backend`.
pub type SessionFuture = dyn Future<Output = Result<Option<Vec<u8>>, SessionError>> + Send;
/// Type alias for the trait objects that returned by `Backend`.
pub type GetSessionFuture = dyn Future<Output = Result<Option<Vec<u8>>, SessionError>> + Send;

/// Type alias for the trait objects that set the session in the `Backend`.
pub type SetSessionFuture = dyn Future<Output = Result<(), SessionError>> + Send;

/// A `Backend` receives session data and stores it, and recalls the session data subsequently.
///
Expand All @@ -27,17 +31,26 @@ pub trait Backend: Send {
/// Persists a session, either creating a new session or updating an existing session.
fn persist_session(
&self,
state: &State,
identifier: SessionIdentifier,
content: &[u8],
) -> Result<(), SessionError>;
) -> Pin<Box<SetSessionFuture>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for taking so long to review this - I believe this type signature is incorrect? Without any lifetime information, the GetSessionFuture type needs to live for 'static, which it will not do if it accessed the state in an async context. I think this needs to be changed to something like

persist_session<'fut, 'a: 'fut, 'b: 'fut, 'c: 'fut>(
    &'a self,
    state: &'b State,
    identifier: SessionIdentifier,
    content: &'c [u8]
) -> Pin<Box<GetSessionFuture + 'fut>>

Alternatively, it might make sense to let #[async_trait] deal with the lifetimes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, thanks for reviewing. I'll try to get to these shortly.

The last time I used #[async_trait] it resulting in the rust compiler not being able to produce any decent error messages for any errors in the code that implemented those traits. We ended up changing back to returning Futures instead. Do you know if this problem has been resolved? I would be hesitant to use it unless it has..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only time I used #[async_trait] was to run cargo expand on it to see how it worked because I didn't want my proc macro to invoke another proc macro. So I don't know if this has been changed or has ever been a problem. However, I feel like the code would probably be more readable if we don't introduce this many lifetimes, but if you run into any incorrect/unhelpful error messages, just write the code the explicit way. I'm happy with both solutions.


/// Retrieves a session from the underlying storage.
///
/// The returned future will resolve to an `Option<Vec<u8>>` on success, where a value of
/// `None` indicates that the session is not available for use and a new session should be
/// established.
fn read_session(&self, identifier: SessionIdentifier) -> Pin<Box<SessionFuture>>;
fn read_session(
&self,
state: &State,
identifier: SessionIdentifier,
) -> Pin<Box<GetSessionFuture>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above


/// Drops a session from the underlying storage.
fn drop_session(&self, identifier: SessionIdentifier) -> Result<(), SessionError>;
fn drop_session(
&self,
state: &State,
identifier: SessionIdentifier,
) -> Pin<Box<SetSessionFuture>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

}
93 changes: 52 additions & 41 deletions gotham/src/middleware/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ use serde::{Deserialize, Serialize};

use super::cookie::CookieParser;
use super::{Middleware, NewMiddleware};
use crate::handler::{HandlerError, HandlerFuture};
use crate::handler::{HandlerError, HandlerFuture, HandlerResult};
use crate::helpers::http::response::create_empty_response;
use crate::state::{self, FromState, State, StateData};

mod backend;
mod rng;

pub use self::backend::memory::MemoryBackend;
pub use self::backend::{Backend, NewBackend};
pub use self::backend::{Backend, GetSessionFuture, NewBackend, SetSessionFuture};

const SECURE_COOKIE_PREFIX: &str = "__Secure-";
const HOST_COOKIE_PREFIX: &str = "__Host-";
Expand Down Expand Up @@ -227,12 +227,19 @@ impl SessionCookieConfig {
/// # fn main() {
/// # let backend = MemoryBackend::new(Duration::from_secs(1));
/// # let identifier = SessionIdentifier { value: "u0G6KdfckQgkV0qLANZjjNkEHBU".to_owned() };
/// # let session = MySessionType {
/// # items: vec!["a".into(), "b".into(), "c".into()],
/// # };
/// #
/// # let bytes = bincode::serialize(&session).unwrap();
/// # backend.persist_session(identifier.clone(), &bytes[..]).unwrap();
/// # State::with_new(|state| {
/// # let session = MySessionType {
/// # items: vec!["a".into(), "b".into(), "c".into()],
/// # };
/// # let bytes = bincode::serialize(&session).unwrap();
/// # futures::executor::block_on(backend.persist_session(
/// # &state,
/// # identifier.clone(),
/// # &bytes[..]
/// # ))
/// # .unwrap();
/// # });
/// #
/// # let nm = NewSessionMiddleware::new(backend).with_session_type::<MySessionType>();
/// # let nm = Arc::new(nm);
Expand Down Expand Up @@ -285,11 +292,14 @@ where
/// Discards the session, invalidating it for future use and removing the data from the
/// `Backend`.
// TODO: Add test case that covers this.
pub fn discard(self, state: &mut State) -> Result<(), SessionError> {
pub fn discard(
self,
state: &mut State,
) -> Pin<Box<dyn Future<Output = Result<(), SessionError>>>> {
state.put(SessionDropData {
cookie_config: self.cookie_config,
});
self.backend.drop_session(self.identifier)
self.backend.drop_session(&state, self.identifier)
}

// Create a new, blank `SessionData<T>`
Expand Down Expand Up @@ -818,7 +828,7 @@ where
);

self.backend
.read_session(id.clone())
.read_session(&state, id.clone())
.then(move |r| self.load_session_into_state(state, id, r))
.and_then(move |state| chain(state))
.and_then(persist_session::<T>)
Expand Down Expand Up @@ -861,7 +871,7 @@ where

fn persist_session<T>(
(mut state, mut response): (State, Response<Body>),
) -> impl Future<Output = Result<(State, Response<Body>), (State, HandlerError)>>
) -> Pin<Box<dyn Future<Output = HandlerResult> + Send>>
where
T: Default + Serialize + for<'de> Deserialize<'de> + Send + 'static,
{
Expand All @@ -872,7 +882,7 @@ where
state::request_id(&state)
);
reset_cookie(&mut response, session_drop_data);
return future::ok((state, response)).right_future();
return Box::pin(future::ok((state, response)));
}
None => {
trace!(
Expand All @@ -889,14 +899,12 @@ where
}

match session_data.state {
SessionDataState::Dirty => {
write_session(state, response, session_data).left_future()
}
SessionDataState::Clean => future::ok((state, response)).right_future(),
SessionDataState::Dirty => write_session(state, response, session_data),
SessionDataState::Clean => Box::pin(future::ok((state, response))),
}
}
// Session was discarded with `SessionData::discard`, or otherwise removed
None => future::ok((state, response)).right_future(),
None => Box::pin(future::ok((state, response))),
}
}

Expand Down Expand Up @@ -931,7 +939,7 @@ fn write_session<T>(
state: State,
response: Response<Body>,
session_data: SessionData<T>,
) -> impl Future<Output = Result<(State, Response<Body>), (State, HandlerError)>>
) -> Pin<Box<dyn Future<Output = HandlerResult> + Send>>
where
T: Default + Serialize + for<'de> Deserialize<'de> + Send + 'static,
{
Expand All @@ -946,33 +954,33 @@ where

let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);

return future::ok((state, response));
return Box::pin(future::ok((state, response)));
}
};

let identifier = session_data.identifier;
let slice = &bytes[..];

let result = session_data
session_data
.backend
.persist_session(identifier.clone(), slice);

match result {
Ok(_) => {
trace!(
"[{}] persisted session ({}) successfully",
state::request_id(&state),
identifier.value
);
.persist_session(&state, identifier.clone(), slice)
.then(move |result| match result {
Ok(_) => {
trace!(
"[{}] persisted session ({}) successfully",
state::request_id(&state),
identifier.value
);

future::ok((state, response))
}
Err(_) => {
let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
future::ok((state, response))
}
Err(_) => {
let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);

future::ok((state, response))
}
}
future::ok((state, response))
}
})
.boxed()
}

impl<B, T> SessionMiddleware<B, T>
Expand Down Expand Up @@ -1157,6 +1165,7 @@ mod tests {
fn existing_session() {
let nm = NewSessionMiddleware::default().with_session_type::<TestSession>();
let m = nm.new_middleware().unwrap();
let mut state = State::new();

let identifier = m.random_identifier();
// 64 -> 512 bits = (85 * 6 + 2)
Expand All @@ -1168,9 +1177,11 @@ mod tests {
};
let bytes = bincode::serialize(&session).unwrap();

m.backend
.persist_session(identifier.clone(), &bytes)
.unwrap();
futures::executor::block_on(
m.backend
.persist_session(&state, identifier.clone(), &bytes),
)
.unwrap();

let received: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
let r = received.clone();
Expand All @@ -1192,7 +1203,6 @@ mod tests {
.boxed()
};

let mut state = State::new();
let mut headers = HeaderMap::new();
let cookie = Cookie::build("_gotham_session", identifier.value.clone()).finish();
headers.insert(COOKIE, cookie.to_string().parse().unwrap());
Expand All @@ -1211,8 +1221,9 @@ mod tests {
Err((_, e)) => panic!("error: {:?}", e),
}

let state = State::new();
let m = nm.new_middleware().unwrap();
let bytes = futures::executor::block_on(m.backend.read_session(identifier))
let bytes = futures::executor::block_on(m.backend.read_session(&state, identifier))
.unwrap()
.unwrap();
let updated = bincode::deserialize::<TestSession>(&bytes[..]).unwrap();
Expand Down