Skip to content

Commit

Permalink
Feat: Add Elastic Security rules and Kibana saved object support for …
Browse files Browse the repository at this point in the history
…ES|QL (#67)

* feat(esql): add basic siem_rule template

* reoder rules fields, fix index

* add kibana_ndjson

* fix format doc

* cleanup

* fix unit tests

* add unit tests

* add esql_connect tests
  • Loading branch information
m4dh4t authored Aug 26, 2024
1 parent 75c1513 commit 0504df3
Show file tree
Hide file tree
Showing 3 changed files with 1,324 additions and 8 deletions.
295 changes: 287 additions & 8 deletions sigma/backends/elasticsearch/elasticsearch_esql.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from sigma.conversion.state import ConversionState
from sigma.rule import SigmaRule
from sigma.rule import SigmaRule, SigmaRuleTag
from sigma.correlations import SigmaCorrelationRule
from sigma.conversion.base import TextQueryBackend
from sigma.conditions import ConditionItem, ConditionAND, ConditionOR, ConditionNOT
from sigma.types import SigmaCompareExpression, SigmaRegularExpression, SigmaRegularExpressionFlag
from sigma.types import SigmaCompareExpression
from sigma.data.mitre_attack import mitre_attack_tactics, mitre_attack_techniques
import sigma
import re
from typing import ClassVar, Dict, Tuple, Pattern, List, Any
import json
from typing import ClassVar, Dict, Tuple, Pattern, List, Iterable, Optional

class ESQLBackend(TextQueryBackend):
"""ES|QL backend."""
Expand All @@ -19,13 +21,12 @@ class ESQLBackend(TextQueryBackend):
name : ClassVar[str] = "ES|QL backend"
formats : Dict[str, str] = {
"default": "Plain ES|QL queries",

"kibana_ndjson": "Kibana ES|QL queries in NDJSON Format.",
"siem_rule": "Elastic Security ES|QL queries as SIEM Rules in JSON Format.",
"siem_rule_ndjson": "Elastic Security ES|QL queries as SIEM Rules in NDJSON Format."
}
requires_pipeline : bool = True

query_expression : ClassVar[str] = "from {state[index]} | where {query}"
state_defaults : ClassVar[Dict[str, str]] = { "index": "*" }

precedence : ClassVar[Tuple[ConditionItem, ConditionItem, ConditionItem]] = (ConditionNOT, ConditionAND, ConditionOR)
group_expression : ClassVar[str] = "({expr})" # Expression for precedence override grouping as format string with {expr} placeholder

Expand Down Expand Up @@ -166,6 +167,27 @@ class ESQLBackend(TextQueryBackend):
"stats": "| where event_type_count {op} {count}"
}

def __init__(
self,
processing_pipeline: Optional[
"sigma.processing.pipeline.ProcessingPipeline"
] = None,
collect_errors: bool = False,
schedule_interval: int = 5,
schedule_interval_unit: str = "m",
**kwargs,
):
super().__init__(processing_pipeline, collect_errors, **kwargs)
self.schedule_interval = schedule_interval
self.schedule_interval_unit = schedule_interval_unit
self.severity_risk_mapping = {
"INFORMATIONAL": 1,
"LOW": 21,
"MEDIUM": 47,
"HIGH": 73,
"CRITICAL": 99,
}

def convert_correlation_search(
self,
rule: SigmaCorrelationRule,
Expand All @@ -176,6 +198,10 @@ def convert_correlation_search(
for rule_reference in rule.rules
for state in rule_reference.rule.get_conversion_states()
]

# Deduplicate sources using via set
sources = list(set(sources))

if "*" in sources:
return super().convert_correlation_search(rule, sources="*", **kwargs)
else:
Expand All @@ -186,4 +212,257 @@ def convert_correlation_search_multi_rule_query_postprocess(self, query: str) ->

def convert_correlation_typing_query_postprocess(self, query: str) -> str:
return self.convert_correlation_search_multi_rule_query_postprocess(query)
### Correlation end ###

### Correlation end ###

def finalize_query_default(
self, rule: SigmaRule, query: str, index: int, state: ConversionState
) -> str:
return f"from {state.processing_state.get('index', '*')} | where {query}"

def finalize_query_kibana_ndjson(
self, rule: SigmaRule, query: str, index: int, state: ConversionState
) -> Dict:
# TODO: implement the per-query output for the output format kibana here. Usually, the
# generated query is embedded into a template, e.g. a JSON format with additional
# information from the Sigma rule.
index = state.processing_state.get("index", "*")
return {
"attributes": {
"columns": [],
"description": rule.description if rule.description is not None else "No description",
"grid": {},
"hideChart": False,
"isTextBasedQuery": True,
"kibanaSavedObjectMeta": {
"searchSourceJSON": str(
json.dumps(
{
"query": {
"esql": f"from {index} | where {query}"
},
"index": {
"title": index,
"timeFieldName": "@timestamp",
"sourceFilters": [],
"type": "esql",
"fieldFormats": {},
"runtimeFieldMap": {},
"allowNoIndex": False,
"name": index,
"allowHidden": False,
},
"filter": [],
}
)
),
},
"sort": [["@timestamp", "desc"]],
"timeRestore": False,
"title": f"SIGMA - {rule.title}",
"usesAdHocDataView": False
},
"id": str(rule.id),
"managed": False,
"references": [],
"type": "search",
"typeMigrationVersion": "10.2.0",
}

def finalize_output_kibana_ndjson(self, queries: List[Dict]) -> List[List[Dict]]:
# TODO: implement the output finalization for all generated queries for the format kibana
# here. Usually, the single generated queries are embedded into a structure, e.g. some
# JSON or XML that can be imported into the SIEM.
return list(queries)

def finalize_output_threat_model(self, tags: List[SigmaRuleTag]) -> Iterable[Dict]:
attack_tags = [t for t in tags if t.namespace == "attack"]
if not len(attack_tags) >= 2:
return []

techniques = [
tag.name.upper() for tag in attack_tags if re.match(r"[tT]\d{4}", tag.name)
]
tactics = [
tag.name.lower()
for tag in attack_tags
if not re.match(r"[tT]\d{4}", tag.name)
]

for tactic, technique in zip(tactics, techniques):
if (
not tactic or not technique
): # Only add threat if tactic and technique is known
continue

try:
if "." in technique: # Contains reference to Mitre Att&ck subtechnique
sub_technique = technique
technique = technique[0:5]
sub_technique_name = mitre_attack_techniques[sub_technique]

sub_techniques = [
{
"id": sub_technique,
"reference": f"https://attack.mitre.org/techniques/{sub_technique.replace('.', '/')}",
"name": sub_technique_name,
}
]
else:
sub_techniques = []

tactic_id = [
id
for (id, name) in mitre_attack_tactics.items()
if name == tactic.replace("_", "-")
][0]
technique_name = mitre_attack_techniques[technique]
except (IndexError, KeyError):
# Occurs when Sigma Mitre Att&ck list is out of date
continue

yield {
"tactic": {
"id": tactic_id,
"reference": f"https://attack.mitre.org/tactics/{tactic_id}",
"name": tactic.title().replace("_", " "),
},
"framework": "MITRE ATT&CK",
"technique": [
{
"id": technique,
"reference": f"https://attack.mitre.org/techniques/{technique}",
"name": technique_name,
"subtechnique": sub_techniques,
}
],
}

for tag in attack_tags:
tags.remove(tag)

def finalize_query_siem_rule(
self, rule: SigmaRule, query: str, index: int, state: ConversionState
) -> Dict:
"""
Create SIEM Rules in JSON Format. These rules could be imported into Kibana using the
Create Rule API https://www.elastic.co/guide/en/kibana/current/create-rule-api.html
This API (and generated data) is NOT the same like importing Detection Rules via:
Kibana -> Security -> Alerts -> Manage Rules -> Import
If you want to have a nice importable NDJSON File for the Security Rule importer
use pySigma Format 'siem_rule_ndjson' instead.
"""

return {
"name": f"SIGMA - {rule.title}",
"tags": [f"{n.namespace}-{n.name}" for n in rule.tags],
"enabled": True,
"consumer": "siem",
"throttle": None,
"schedule": {
"interval": f"{self.schedule_interval}{self.schedule_interval_unit}"
},
"params": {
"author": [rule.author] if rule.author is not None else [],
"description": (
rule.description
if rule.description is not None
else "No description"
),
"ruleId": str(rule.id),
"falsePositives": rule.falsepositives,
"from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
"immutable": False,
"license": "DRL",
"outputIndex": "",
"meta": {
"from": "1m",
},
"maxSignals": 100,
"relatedIntegrations": [],
"requiredFields": [],
"riskScore": (
self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21
),
"riskScoreMapping": [],
"setup": "",
"severity": (
str(rule.level.name).lower() if rule.level is not None else "low"
),
"severityMapping": [],
"threat": list(self.finalize_output_threat_model(rule.tags)),
"to": "now",
"references": rule.references,
"version": 1,
"exceptionsList": [],
"type": "esql",
"language": "esql",
"query": f"from {state.processing_state.get('index', '*')} [metadata _id, _index, _version] | where {query}",
},
"rule_type_id": "siem.esqlRule",
"notify_when": "onActiveAlert",
"actions": [],
}

def finalize_output_siem_rule(self, queries: List[Dict]) -> List[List[Dict]]:
return list(queries)

def finalize_query_siem_rule_ndjson(
self, rule: SigmaRule, query: str, index: int, state: ConversionState
) -> Dict:
"""
Generating SIEM/Detection Rules in NDJSON Format. Compatible with
https://www.elastic.co/guide/en/security/current/rules-ui-management.html#import-export-rules-ui
"""

return {
"id": str(rule.id),
"name": f"SIGMA - {rule.title}",
"tags": [f"{n.namespace}-{n.name}" for n in rule.tags],
"interval": f"{self.schedule_interval}{self.schedule_interval_unit}",
"enabled": True,
"description": (
rule.description if rule.description is not None else "No description"
),
"risk_score": (
self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21
),
"severity": (
str(rule.level.name).lower() if rule.level is not None else "low"
),
"note": "",
"license": "DRL",
"output_index": "",
"meta": {
"from": "1m",
},
"investigation_fields": {},
"author": [rule.author] if rule.author is not None else [],
"false_positives": rule.falsepositives,
"from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
"rule_id": str(rule.id),
"max_signals": 100,
"risk_score_mapping": [],
"severity_mapping": [],
"threat": list(self.finalize_output_threat_model(rule.tags)),
"to": "now",
"references": rule.references,
"version": 1,
"exceptions_list": [],
"immutable": False,
"related_integrations": [],
"required_fields": [],
"setup": "",
"type": "esql",
"language": "esql",
"query": f"from {state.processing_state.get('index', '*')} [metadata _id, _index, _version] | where {query}",
"actions": [],
}

def finalize_output_siem_rule_ndjson(self, queries: List[Dict]) -> List[List[Dict]]:
return list(queries)
Loading

0 comments on commit 0504df3

Please sign in to comment.