Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch audit logs and ensure they are not lost #1080

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
146 changes: 16 additions & 130 deletions src/audit.rs → src/audit/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,136 +16,18 @@
*
*/

use std::{
collections::HashMap,
fmt::{Debug, Display},
};

use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT};
use std::fmt::Display;

use super::option::CONFIG;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::{json, Value};
use chrono::Utc;
use tracing::error;

use ulid::Ulid;
use url::Url;

static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);

// AuditLogger handles sending audit logs to a remote logging system
pub struct AuditLogger {
log_endpoint: Url,
}

impl AuditLogger {
/// Create an audit logger that can be used to capture and push
/// audit logs to the appropriate logging system over HTTP
pub fn new() -> Option<AuditLogger> {
// Try to construct the log endpoint URL by joining the base URL
// with the ingest path, This can fail if the URL is not valid,
// when the base URL is not set or the ingest path is not valid
let log_endpoint = match CONFIG
.parseable
.audit_logger
.as_ref()?
.join("/api/v1/ingest")
{
Ok(url) => url,
Err(err) => {
eprintln!("Couldn't setup audit logger: {err}");
return None;
}
};

Some(AuditLogger { log_endpoint })
}

// Sends the audit log to the configured endpoint with proper authentication
async fn send_log(&self, json: Value) {
let mut req = HTTP_CLIENT
.post(self.log_endpoint.as_str())
.json(&json)
.header("x-p-stream", "audit_log");

// Use basic auth if credentials are configured
if let Some(username) = CONFIG.parseable.audit_username.as_ref() {
req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref())
}

match req.send().await {
Ok(r) => {
if let Err(e) = r.error_for_status() {
error!("{e}")
}
}
Err(e) => error!("Failed to send audit event: {}", e),
}
}
}

// Represents the version of the audit log format
#[non_exhaustive]
#[repr(u8)]
#[derive(Debug, Clone, Copy, Serialize, Default)]
pub enum AuditLogVersion {
// NOTE: default should be latest version
#[default]
V1 = 1,
}

#[derive(Serialize, Default)]
pub struct AuditDetails {
pub version: AuditLogVersion,
pub id: Ulid,
pub generated_at: DateTime<Utc>,
}

#[derive(Serialize, Default)]
pub struct ServerDetails {
pub version: String,
pub deployment_id: Ulid,
}

// Contains information about the actor (user) who performed the action
#[derive(Serialize, Default)]
pub struct ActorDetails {
pub remote_host: String,
pub user_agent: String,
pub username: String,
pub authorization_method: String,
}
use crate::{about::current, storage::StorageMetadata};

// Contains details about the HTTP request that was made
#[derive(Serialize, Default)]
pub struct RequestDetails {
pub stream: String,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub method: String,
pub path: String,
pub protocol: String,
pub headers: HashMap<String, String>,
}

/// Contains information about the response sent back to the client
#[derive(Default, Serialize)]
pub struct ResponseDetails {
pub status_code: u16,
pub error: Option<String>,
}

/// The main audit log structure that combines all audit information
#[derive(Serialize)]
pub struct AuditLog {
pub audit: AuditDetails,
pub parseable_server: ServerDetails,
pub actor: ActorDetails,
pub request: RequestDetails,
pub response: ResponseDetails,
}
use super::{
ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails,
ServerDetails, AUDIT_LOG_TX,
};

/// Builder pattern implementation for constructing audit logs
pub struct AuditLogBuilder {
Expand All @@ -157,7 +39,7 @@ pub struct AuditLogBuilder {
impl Default for AuditLogBuilder {
fn default() -> Self {
AuditLogBuilder {
enabled: AUDIT_LOGGER.is_some(),
enabled: AUDIT_LOG_TX.get().is_some(),
inner: AuditLog {
audit: AuditDetails {
version: AuditLogVersion::V1,
Expand Down Expand Up @@ -288,10 +170,14 @@ impl AuditLogBuilder {
audit_log.audit.generated_at = now;
audit_log.request.end_time = now;

AUDIT_LOGGER
.as_ref()
.unwrap()
.send_log(json!(audit_log))
// NOTE: we are fine with blocking here as user expects audit logs to be sent at all costs
if let Err(e) = AUDIT_LOG_TX
.get()
.expect("Audit logger not initialized")
.send(audit_log)
.await
{
error!("Couldn't send to logger: {e}")
}
}
}
Loading
Loading