Skip to content

Commit

Permalink
logger: Add subscriber logic
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick José Pereira <[email protected]>
  • Loading branch information
patrickelectric committed Nov 10, 2024
1 parent ae04ece commit 6d57cb3
Showing 1 changed file with 115 additions and 2 deletions.
117 changes: 115 additions & 2 deletions src/lib/logger/manager.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,87 @@
use std::{
io::{self, Write},
sync::{Arc, Mutex},
};

use ringbuffer::{AllocRingBuffer, RingBuffer};
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::{metadata::LevelFilter, *};
use tracing_log::LogTracer;
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer};
use tracing_subscriber::{
fmt::{self, MakeWriter},
layer::SubscriberExt,
EnvFilter, Layer,
};

use crate::cli;

struct BroadcastWriter {
sender: Sender<String>,
}

impl Write for BroadcastWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let message = String::from_utf8_lossy(buf).to_string();
let _ = self.sender.send(message);
Ok(buf.len())
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

struct BroadcastMakeWriter {
sender: Sender<String>,
}

impl<'a> MakeWriter<'a> for BroadcastMakeWriter {
type Writer = BroadcastWriter;

fn make_writer(&'a self) -> Self::Writer {
BroadcastWriter {
sender: self.sender.clone(),
}
}
}

#[derive(Debug, Default)]
pub struct Manager {
pub process: Option<tokio::task::JoinHandle<()>>,
}

pub struct History {
pub history: AllocRingBuffer<String>,
pub sender: Sender<String>,
}

impl Default for History {
fn default() -> Self {
let (sender, _receiver) = tokio::sync::broadcast::channel(100);
Self {
history: AllocRingBuffer::new(10 * 1024),
sender,
}
}
}

impl History {
pub fn push(&mut self, message: String) {
self.history.push(message.clone());
let _ = self.sender.send(message);
}

pub fn subscribe(&self) -> (Receiver<String>, Vec<String>) {
let reader = self.sender.subscribe();
(reader, self.history.to_vec())
}
}

lazy_static! {
static ref MANAGER: Arc<Mutex<Manager>> = Default::default();
pub static ref HISTORY: Arc<Mutex<History>> = Default::default();
}

// Start logger, should be done inside main
pub fn init() {
// Redirect all logs from libs using "Log"
Expand Down Expand Up @@ -56,21 +134,56 @@ pub fn init() {
.with_thread_names(true)
.with_filter(file_env_filter);

// Configure the server log
let server_env_filter = if cli::manager::is_tracing() {
EnvFilter::new(LevelFilter::TRACE.to_string())
} else {
EnvFilter::new(LevelFilter::DEBUG.to_string())
};
let (tx, mut rx) = tokio::sync::broadcast::channel(100);
let server_layer = fmt::Layer::new()
.with_writer(BroadcastMakeWriter { sender: tx.clone() })
.with_ansi(false)
.with_file(true)
.with_line_number(true)
.with_span_events(fmt::format::FmtSpan::NONE)
.with_target(false)
.with_thread_ids(true)
.with_thread_names(true)
.with_filter(server_env_filter);

let history = HISTORY.clone();
MANAGER.lock().unwrap().process = Some(tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(message) => {
history.lock().unwrap().push(message);
}
Err(error) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
warn!("Error receiving message: {:?}", error);
}
}
}
}));

// Configure the default subscriber
match cli::manager::is_tracy() {
true => {
let tracy_layer = tracing_tracy::TracyLayer::default();
let subscriber = tracing_subscriber::registry()
.with(console_layer)
.with(file_layer)
.with(server_layer)
.with(tracy_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("Unable to set a global subscriber");
}
false => {
let subscriber = tracing_subscriber::registry()
.with(console_layer)
.with(file_layer);
.with(file_layer)
.with(server_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("Unable to set a global subscriber");
}
Expand Down

0 comments on commit 6d57cb3

Please sign in to comment.