|
11 | 11 |
|
12 | 12 | import pika |
13 | 13 | import yaml |
| 14 | +from thefuzz import fuzz |
14 | 15 |
|
15 | 16 | from pyobas import OpenBAS, utils |
16 | 17 |
|
@@ -416,3 +417,51 @@ def listen(self, message_callback: Callable[[Dict], None]) -> None: |
416 | 417 | self.config, self.injector_config, self.injector_logger, message_callback |
417 | 418 | ) |
418 | 419 | self.listen_queue.start() |
| 420 | + |
| 421 | + |
| 422 | +class OpenBASDetectionHelper: |
| 423 | + def __init__(self, logger, relevant_signatures_types) -> None: |
| 424 | + self.logger = logger |
| 425 | + self.relevant_signatures_types = relevant_signatures_types |
| 426 | + |
| 427 | + def match_alert_element_fuzzy(self, signature_value, alert_values, fuzzy_scoring): |
| 428 | + for alert_value in alert_values: |
| 429 | + self.logger.info( |
| 430 | + "Comparing alert value (" + alert_value + ", " + signature_value + ")" |
| 431 | + ) |
| 432 | + ratio = fuzz.ratio(alert_value, signature_value) |
| 433 | + if ratio > fuzzy_scoring: |
| 434 | + self.logger.info("MATCHING! (score: " + str(ratio) + ")") |
| 435 | + return True |
| 436 | + return False |
| 437 | + |
| 438 | + def match_alert_elements(self, signatures, alert_data): |
| 439 | + # Example for alert_data |
| 440 | + # {"process_name": {"list": ["xx", "yy"], "fuzzy": 90}} |
| 441 | + relevant_signatures = [ |
| 442 | + s for s in signatures if s["type"] in self.relevant_signatures_types |
| 443 | + ] |
| 444 | + |
| 445 | + # Matching logics |
| 446 | + signatures_number = len(relevant_signatures) |
| 447 | + matching_number = 0 |
| 448 | + for signature in relevant_signatures: |
| 449 | + alert_data_for_signature = alert_data[signature["type"]] |
| 450 | + signature_result = False |
| 451 | + if alert_data_for_signature["type"] == "fuzzy": |
| 452 | + signature_result = self.match_alert_element_fuzzy( |
| 453 | + signature["value"], |
| 454 | + alert_data_for_signature["data"], |
| 455 | + alert_data_for_signature["score"], |
| 456 | + ) |
| 457 | + elif alert_data_for_signature["type"] == "simple": |
| 458 | + signature_result = signature["value"] in str( |
| 459 | + alert_data_for_signature["data"] |
| 460 | + ) |
| 461 | + |
| 462 | + if signature_result: |
| 463 | + matching_number = matching_number + 1 |
| 464 | + |
| 465 | + if signatures_number == matching_number: |
| 466 | + return True |
| 467 | + return False |
0 commit comments