Skip to content

Commit

Permalink
♻️ Split off blocking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
rster2002 committed May 11, 2024
1 parent f91d330 commit aecfd9f
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 114 deletions.
9 changes: 8 additions & 1 deletion ed-journals/src/modules.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
pub mod blockers;
/// Contains modules for working with journal log files providing readers for individual files or
/// all files at once.
pub mod logs;
pub mod journal_dir;

/// Contains structs and enums which are shared across events. Things like commodity and material
/// names, ship types, exobiology data etc. can be found here.
pub mod shared;

/// Provides different implementations for blocking current execution, whether it be synchronous or
/// asynchronous.
mod blockers;

/// Provides some utility functions and macros that are used internally.
mod utils;
43 changes: 43 additions & 0 deletions ed-journals/src/modules/blockers/async_blocker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,46 @@
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{channel, Sender};

#[derive(Debug, Clone)]
pub struct AsyncBlocker {
waiting_sender: Arc<Mutex<(Option<Sender<()>>,)>>,
}

impl AsyncBlocker {
pub fn new() -> Self {
AsyncBlocker {
waiting_sender: Arc::new(Mutex::new((None,))),
}
}

pub async fn block(&self) {
let (sender, mut receiver) = channel(2);

{
let mut guard = self.waiting_sender.lock()
.expect("to gotten lock");

guard.0 = Some(sender);
}

receiver.recv().await
.expect("Failed to perform async block");
}

pub fn unblock_blocking(&self) {
let mut guard = self.waiting_sender
.lock()
.expect("Should have been locked");

if let Some(sender) = guard.0.as_ref() {
if sender.is_closed() {
return;
}

sender.blocking_send(())
.expect("Failed to send");

guard.0 = None;
}
}
}
4 changes: 2 additions & 2 deletions ed-journals/src/modules/blockers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
mod sync_blocker;
mod async_blocker;
pub mod sync_blocker;
pub mod async_blocker;
36 changes: 35 additions & 1 deletion ed-journals/src/modules/blockers/sync_blocker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::Thread;

#[derive(Debug, Clone)]
pub struct SyncBlocker {

waiting_thread: Arc<Mutex<(Option<Thread>,)>>,
}

impl SyncBlocker {
pub fn new() -> Self {
SyncBlocker {
waiting_thread: Arc::new(Mutex::new((None,))),
}
}

pub fn unblock(&self) {
{
let mut guard = self.waiting_thread.lock().expect("to have gotten a lock");

guard.0 = Some(thread::current());
}

thread::park();
}

pub fn block(&self) {
let mut guard = self.waiting_thread
.lock()
.expect("Should have been locked");

if let Some(thread) = guard.0.as_ref() {
thread.unpark();
guard.0 = None;
};
}
}
56 changes: 27 additions & 29 deletions ed-journals/src/modules/logs/async/live_log_dir_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,33 @@ use tokio::sync::mpsc::{channel, Sender};
use crate::logs::{LogDir, LogDirError, LogFile};
use crate::logs::content::LogEvent;
use crate::logs::r#async::LogFileReader;
use crate::modules::blockers::async_blocker::AsyncBlocker;
use crate::modules::logs::r#async::LogFileReaderError;
use crate::modules::logs::LogFileError;

/// The async variant of [super::blocking::LiveLogDirReader]. Watches the whole journal dir and
/// reads all files. Once all historic files have been read the current read will only resolve once
/// the newest log file is changed at which it will read the active log file and return the entry.
///
/// ```no_run
/// use std::path::PathBuf;
/// use ed_journals::logs::r#async::LiveLogDirReader;
///
/// let path = PathBuf::from("somePath");
///
/// let mut live_dir_reader = LiveLogDirReader::open(path)
/// .unwrap();
///
/// // At first this will read all existing lines from the journal logs, after which it will wait
/// // until it detects new entries in the latest log file.
/// while let Some(entry) = live_dir_reader.next().await {
/// // Do something with the entry
/// }
/// ```
#[derive(Debug)]
pub struct LiveLogDirReader {
waiting_sender: Arc<Mutex<(Option<Sender<()>>,)>>,
blocker: AsyncBlocker,
// waiting_sender: Arc<Mutex<(Option<Sender<()>>,)>>,
dir: LogDir,
current_file: Option<LogFile>,
current_reader: Option<LogFileReader>,
Expand Down Expand Up @@ -43,30 +64,17 @@ pub enum LiveLogDirReaderError {

impl LiveLogDirReader {
pub fn open<P: AsRef<Path>>(dir_path: P) -> Result<LiveLogDirReader, LiveLogDirReaderError> {
let waiting_sender = Arc::new(Mutex::new((None::<Sender<()>>,)));
let waiting_sender_local = waiting_sender.clone();
let blocker = AsyncBlocker::new();
let local_blocker = blocker.clone();

let mut watcher = notify::recommended_watcher(move |res| {
let mut guard = waiting_sender_local
.lock()
.expect("Should have been locked");

if let Some(sender) = guard.0.as_ref() {
if sender.is_closed() {
return;
}

sender.blocking_send(())
.expect("Failed to send");

guard.0 = None;
}
local_blocker.unblock_blocking();
})?;

watcher.watch(dir_path.as_ref(), RecursiveMode::NonRecursive)?;

Ok(LiveLogDirReader {
waiting_sender,
blocker,
dir: LogDir::new(dir_path.as_ref().to_path_buf())?,
current_file: None,
current_reader: None,
Expand Down Expand Up @@ -122,17 +130,7 @@ impl LiveLogDirReader {
let reader = self.current_reader.as_mut()?;

let Some(result) = reader.next().await else {
let (sender, mut receiver) = channel(2);

{
let mut guard = self.waiting_sender.lock()
.expect("to gotten lock");

guard.0 = Some(sender);
}

receiver.recv().await?;

self.blocker.block().await;
continue;
};

Expand Down
38 changes: 7 additions & 31 deletions ed-journals/src/modules/logs/async/live_log_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use std::sync::atomic::{AtomicBool, Ordering};
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use thiserror::Error;
use tokio::fs::File;
use tokio::sync::mpsc::{channel, Sender};
use crate::logs::r#async::LogFileReaderError;
use crate::logs::content::LogEvent;
use crate::modules::blockers::async_blocker::AsyncBlocker;
use crate::modules::logs::r#async::LogFileReader;

#[derive(Debug)]
pub struct LiveLogFileReader {
waiting_sender: Arc<Mutex<(Option<Sender<()>>,)>>,
blocker: AsyncBlocker,
journal_file_reader: LogFileReader,
watcher: RecommendedWatcher,
active: Arc<AtomicBool>,
Expand All @@ -34,30 +34,17 @@ impl LiveLogFileReader {

let journal_file_reader = LogFileReader::new(file);

let waiting_sender = Arc::new(Mutex::new((None::<Sender<()>>,)));
let waiting_sender_local = waiting_sender.clone();
let blocker = AsyncBlocker::new();
let local_blocker = blocker.clone();

let mut watcher = notify::recommended_watcher(move |res| {
let mut guard = waiting_sender_local
.lock()
.expect("Should have been locked");

if let Some(sender) = guard.0.as_ref() {
if sender.is_closed() {
return;
}

sender.blocking_send(())
.expect("Failed to send");

guard.0 = None;
}
local_blocker.unblock_blocking();
})?;

watcher.watch(&path, RecursiveMode::NonRecursive)?;

Ok(LiveLogFileReader {
waiting_sender,
blocker,
journal_file_reader,
watcher,
active: Arc::new(AtomicBool::new(true)),
Expand All @@ -72,18 +59,7 @@ impl LiveLogFileReader {

match self.journal_file_reader.next().await {
Some(value) => return Some(value),
None => {
let (sender, mut receiver) = channel(2);

{
let mut guard = self.waiting_sender.lock()
.expect("to gotten lock");

guard.0 = Some(sender);
}

receiver.recv().await?;
}
None => self.blocker.block().await,
}
}
}
Expand Down
30 changes: 10 additions & 20 deletions ed-journals/src/modules/logs/blocking/live_log_dir_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::logs::blocking::LogFileReader;
use crate::logs::content::LogEvent;
use crate::logs::blocking::LogFileReaderError;
use crate::logs::log_file::{LogFile, LogFileError};
use crate::modules::blockers::sync_blocker::SyncBlocker;

/// Watches the whole journal dir and reads all files. Once all historic files have been read it
/// will block the current thread until the newest log file is changed at which it will read the
Expand All @@ -34,7 +35,8 @@ use crate::logs::log_file::{LogFile, LogFileError};
/// ```
#[derive(Debug)]
pub struct LiveLogDirReader {
waiting_thread: Arc<Mutex<(Option<Thread>,)>>,
blocker: SyncBlocker,
// waiting_thread: Arc<Mutex<(Option<Thread>,)>>,
dir: LogDir,
current_file: Option<LogFile>,
current_reader: Option<LogFileReader>,
Expand Down Expand Up @@ -63,24 +65,19 @@ pub enum LiveLogDirReaderError {

impl LiveLogDirReader {
pub fn open(dir_path: PathBuf) -> Result<LiveLogDirReader, LiveLogDirReaderError> {
let waiting_thread = Arc::new(Mutex::new((None::<Thread>,)));
let waiting_thread_local = waiting_thread.clone();
let blocker = SyncBlocker::new();
let local_blocker = blocker.clone();
// let waiting_thread = Arc::new(Mutex::new((None::<Thread>,)));
// let waiting_thread_local = waiting_thread.clone();

let mut watcher = notify::recommended_watcher(move |_| {
let mut guard = waiting_thread_local
.lock()
.expect("Should have been locked");

if let Some(thread) = guard.0.as_ref() {
thread.unpark();
guard.0 = None;
};
local_blocker.unblock();
})?;

watcher.watch(&dir_path, RecursiveMode::NonRecursive)?;

Ok(Self {
waiting_thread,
blocker,
dir: LogDir::new(dir_path)?,
current_file: None,
current_reader: None,
Expand Down Expand Up @@ -160,14 +157,7 @@ impl Iterator for LiveLogDirReader {
let reader = self.current_reader.as_mut()?;

let Some(result) = reader.next() else {
{
let mut guard = self.waiting_thread.lock().expect("to have gotten a lock");

guard.0 = Some(thread::current());
}

thread::park();

self.blocker.block();
continue;
};

Expand Down
Loading

0 comments on commit aecfd9f

Please sign in to comment.