Skip to content

Commit 47f88f3

Browse files
committed
Added first steps of hash event handler
1 parent ec87f06 commit 47f88f3

File tree

5 files changed

+174
-28
lines changed

5 files changed

+174
-28
lines changed

src/db.rs

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// Copyright (C) 2024, Achiefs.
2+
13
use crate::appconfig;
24
use crate::utils;
35
use crate::dbfile::*;
@@ -65,7 +67,7 @@ impl DB {
6567
pub fn is_empty(&self) -> bool {
6668
let connection = self.open();
6769
let result = connection.query_row("SELECT * FROM files LIMIT 1", [], |_row| Ok(0));
68-
self.close(connection);
70+
self.close(connection);
6971
match result {
7072
Ok(_v) => false,
7173
Err(e) => {
@@ -76,7 +78,7 @@ impl DB {
7678
true
7779
}
7880
}
79-
}
81+
}
8082
}
8183

8284
// ------------------------------------------------------------------------
@@ -85,11 +87,13 @@ impl DB {
8587
let connection = self.open();
8688
let result = connection.execute(
8789
"CREATE TABLE IF NOT EXISTS files (
88-
id INTEGER PRIMARY KEY,
90+
dbid INTEGER PRIMARY KEY,
91+
id TEXT PRIMARY KEY,
8992
timestamp TEXT NOT NULL,
9093
hash TEXT NOT NULL,
9194
path TEXT NOT NULL UNIQUE,
92-
size INTEGER)",
95+
size INTEGER,
96+
PRIMARY KEY(dbid, id) )",
9397
(),
9498
);
9599
match result {
@@ -122,14 +126,15 @@ impl DB {
122126
"SELECT * FROM files WHERE path = ?1 LIMIT 1",
123127
[path.clone()],
124128
|row| Ok(DBFile {
125-
id: row.get(0).unwrap(),
126-
timestamp: row.get(1).unwrap(),
127-
hash: row.get(2).unwrap(),
128-
path: row.get(3).unwrap(),
129-
size: row.get(4).unwrap()
129+
dbid: row.get(0).unwrap(),
130+
id: row.get(1).unwrap(),
131+
timestamp: row.get(2).unwrap(),
132+
hash: row.get(3).unwrap(),
133+
path: row.get(4).unwrap(),
134+
size: row.get(5).unwrap()
130135
})
131136
);
132-
137+
133138
let data = match result {
134139
Ok(d) => Ok(d),
135140
Err(e) => {
@@ -149,17 +154,18 @@ impl DB {
149154

150155
// ------------------------------------------------------------------------
151156

152-
pub fn get_file_by_id(&self, id: u64) -> DBFile {
157+
pub fn get_file_by_id(&self, dbid: u64) -> DBFile {
153158
let connection = self.open();
154159
let data = connection.query_row(
155-
"SELECT * FROM files WHERE id = ?1 LIMIT 1",
156-
[id],
160+
"SELECT * FROM files WHERE dbid = ?1 LIMIT 1",
161+
[dbid],
157162
|row| Ok(DBFile {
158-
id: row.get(0).unwrap(),
159-
timestamp: row.get(1).unwrap(),
160-
hash: row.get(2).unwrap(),
161-
path: row.get(3).unwrap(),
162-
size: row.get(4).unwrap()
163+
dbid: row.get(0).unwrap(),
164+
id: row.get(1).unwrap(),
165+
timestamp: row.get(2).unwrap(),
166+
hash: row.get(3).unwrap(),
167+
path: row.get(4).unwrap(),
168+
size: row.get(5).unwrap()
163169
})
164170
).unwrap();
165171

@@ -185,8 +191,8 @@ impl DB {
185191
None => String::new()
186192
};
187193

188-
let query = format!("UPDATE files SET {}, {}, {} WHERE id = {}",
189-
timestamp_str, hash_str, size_str, dbfile.id);
194+
let query = format!("UPDATE files SET {}, {}, {} WHERE dbid = {}",
195+
timestamp_str, hash_str, size_str, dbfile.dbid);
190196

191197
let mut statement = connection.prepare(&query).unwrap();
192198
let result = statement.execute([]);
@@ -205,11 +211,12 @@ impl DB {
205211
"SELECT * from files").unwrap();
206212
let files = query.query_map([], |row|{
207213
Ok(DBFile {
208-
id: row.get(0).unwrap(),
209-
timestamp: row.get(1).unwrap(),
210-
hash: row.get(2).unwrap(),
211-
path: row.get(3).unwrap(),
212-
size: row.get(4).unwrap(),
214+
dbid: row.get(0).unwrap(),
215+
id: row.get(1).unwrap(),
216+
timestamp: row.get(2).unwrap(),
217+
hash: row.get(3).unwrap(),
218+
path: row.get(4).unwrap(),
219+
size: row.get(5).unwrap(),
213220
})
214221
}).unwrap();
215222

src/dbfile.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// Copyright (C) 2024, Achiefs.
2+
13
use crate::utils;
24
use crate::hash;
35
use crate::appconfig::*;
@@ -13,7 +15,8 @@ pub struct DBFileError {
1315
}
1416

1517
pub struct DBFile {
16-
pub id: u64,
18+
pub dbid: u64,
19+
pub id: String,
1720
pub timestamp: String,
1821
pub hash: String,
1922
pub path: String,
@@ -64,6 +67,7 @@ impl fmt::Debug for DBFileError {
6467
impl fmt::Debug for DBFile {
6568
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result{
6669
f.debug_tuple("")
70+
.field(&self.dbid)
6771
.field(&self.id)
6872
.field(&self.timestamp)
6973
.field(&self.hash)
@@ -98,7 +102,8 @@ impl DBFile {
98102
};
99103

100104
DBFile {
101-
id: 0,
105+
dbid: 0,
106+
id: utils::get_uuid(),
102107
timestamp: utils::get_current_time_millis(),
103108
hash,
104109
path: String::from(path),
@@ -110,7 +115,8 @@ impl DBFile {
110115

111116
pub fn clone(&self) -> Self {
112117
DBFile {
113-
id: self.id,
118+
dbid: self.dbid,
119+
id: self.id.clone(),
114120
timestamp: self.timestamp.clone(),
115121
hash: self.hash.clone(),
116122
path: self.path.clone(),

src/hashevent.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright (C) 2024, Achiefs.
2+
3+
use crate::appconfig;
4+
use crate::appconfig::*;
5+
use crate::dbfile::*;
6+
7+
use log::*;
8+
use std::fs::OpenOptions;
9+
use serde_json::{json, to_string};
10+
use std::io::Write;
11+
use reqwest::Client;
12+
use std::time::Duration;
13+
14+
pub struct HashEvent {
15+
dbfile: DBFile
16+
}
17+
18+
impl HashEvent {
19+
pub fn new(dbfile: DBFile) -> Self {
20+
HashEvent {
21+
dbfile
22+
}
23+
}
24+
25+
// ------------------------------------------------------------------------
26+
27+
fn log(&self, file: String) {
28+
let mut events_file = OpenOptions::new()
29+
.create(true)
30+
.append(true)
31+
.open(file)
32+
.expect("(hashevent::log) Unable to open events log file.");
33+
34+
match writeln!(events_file, "{}", self.format_json()) {
35+
Ok(_d) => debug!("Hash event log written"),
36+
Err(e) => error!("Hash event could not be written, Err: [{}]", e)
37+
};
38+
}
39+
40+
// ------------------------------------------------------------------------
41+
42+
async fn send(&self, cfg: AppConfig) {
43+
use time::OffsetDateTime;
44+
45+
let event = self.get_json();
46+
let current_date = OffsetDateTime::now_utc();
47+
let index = format!("fim-{}-{}-{}", current_date.year(), current_date.month() as u8, current_date.day() );
48+
49+
// Splunk endpoint integration
50+
if cfg.endpoint_type == "Splunk" {
51+
let data = json!({
52+
"source": cfg.node,
53+
"sourcetype": "_json",
54+
"event": event,
55+
"index": "fim_events"
56+
});
57+
debug!("Sending received event to Splunk integration, event: {}", data);
58+
let request_url = format!("{}/services/collector/event", cfg.endpoint_address);
59+
let client = Client::builder()
60+
.danger_accept_invalid_certs(cfg.insecure)
61+
.timeout(Duration::from_secs(30))
62+
.build().unwrap();
63+
match client
64+
.post(request_url)
65+
.header("Authorization", format!("Splunk {}", cfg.endpoint_token))
66+
.json(&data)
67+
.send()
68+
.await {
69+
Ok(response) => debug!("Response received: {:?}",
70+
response.text().await.unwrap()),
71+
Err(e) => debug!("Error on request: {:?}", e)
72+
}
73+
// Elastic endpoint integration
74+
} else {
75+
let request_url = format!("{}/{}/_doc/{}", cfg.endpoint_address, index, self.dbfile.id);
76+
let client = Client::builder()
77+
.danger_accept_invalid_certs(cfg.insecure)
78+
.timeout(Duration::from_secs(30))
79+
.build().unwrap();
80+
match client
81+
.post(request_url)
82+
.basic_auth(cfg.endpoint_user, Some(cfg.endpoint_pass))
83+
.json(&event)
84+
.send()
85+
.await {
86+
Ok(response) => debug!("Response received: {:?}",
87+
response.text().await.unwrap()),
88+
Err(e) => debug!("Error on request: {:?}", e)
89+
}
90+
}
91+
}
92+
93+
// ------------------------------------------------------------------------
94+
95+
pub async fn process(&self, cfg: AppConfig) {
96+
match cfg.get_events_destination().as_str() {
97+
appconfig::BOTH_MODE => {
98+
self.log(cfg.get_events_file());
99+
self.send(cfg).await;
100+
},
101+
appconfig::NETWORK_MODE => {
102+
self.send(cfg).await;
103+
},
104+
_ => self.log(cfg.get_events_file())
105+
}
106+
}
107+
108+
// ------------------------------------------------------------------------
109+
110+
fn format_json(&self) -> String { to_string(&self.get_json()).unwrap() }
111+
112+
// ------------------------------------------------------------------------
113+
114+
fn get_json(&self) -> serde_json::Value {
115+
json!({
116+
"dbfile.id": self.dbfile.id.clone(),
117+
"dbfile.timestamp": self.dbfile.timestamp.clone(),
118+
"dbfile.hash": self.dbfile.hash.clone(),
119+
"dbfile.path": self.dbfile.path.clone(),
120+
"dbfile.size": self.dbfile.size.clone()
121+
})
122+
}
123+
}

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ mod init;
3939
mod db;
4040
mod dbfile;
4141
mod scanner;
42+
mod hashevent;
4243

4344
// ----------------------------------------------------------------------------
4445

src/scanner.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1+
// Copyright (C) 2024, Achiefs.
2+
13
use crate::db;
24
use crate::dbfile::*;
35
use crate::utils;
46
use crate::appconfig::AppConfig;
7+
use crate::hashevent::HashEvent;
58

69
use walkdir::WalkDir;
710
use log::*;
811

12+
// Temporal
13+
use tokio::runtime::Runtime;
14+
915
pub fn scan_path(cfg: AppConfig, root: String) {
1016
let db = db::DB::new();
1117
for res in WalkDir::new(root) {
@@ -42,6 +48,9 @@ pub fn check_changes(cfg: AppConfig, root: String) {
4248
Some(utils::get_current_time_millis()),
4349
Some(hash),
4450
Some(metadata.len()));
51+
let event = HashEvent::new(dbfile);
52+
let rt = Runtime::new().unwrap();
53+
rt.block_on(event.process(cfg.clone()));
4554
// Trigger new event
4655
}
4756
},

0 commit comments

Comments
 (0)