Skip to content

Commit

Permalink
Fix: Handling on windash and base64offset modifier
Browse files Browse the repository at this point in the history
  • Loading branch information
andurin committed Jun 19, 2024
1 parent cebb793 commit 24fe2c5
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 49 deletions.
73 changes: 49 additions & 24 deletions sigma/backends/elasticsearch/elasticsearch_eql.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ class EqlBackend(TextQueryBackend):
# Characters quoted in addition to wildcards and string quote
# add_escaped: ClassVar[str] = '+-=&|!(){}[]<>^"~*?:\\/ '
add_escaped: ClassVar[str] = '\n\r\t\\"'
bool_values: ClassVar[
Dict[bool, str]
] = { # Values to which boolean values are mapped.
True: "true",
False: "false",
}
bool_values: ClassVar[Dict[bool, str]] = (
{ # Values to which boolean values are mapped.
True: "true",
False: "false",
}
)

# Regular expressions
# Regular expression query as format string with placeholders {field} and {regex}
Expand Down Expand Up @@ -203,6 +203,25 @@ def is_ip(self, value: ConditionFieldEqualsValueExpression) -> bool:
except ValueError:
return False

def convert_condition_field_eq_expansion(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Any:
"""
Convert each value of the expansion with the field from the containing condition and OR-link
all converted subconditions.
"""
or_cond = ConditionOR(
[
ConditionFieldEqualsValueExpression(cond.field, value)
for value in cond.value.values
],
cond.source,
)
if self.decide_convert_condition_as_in_expression(or_cond, state):
return self.convert_condition_as_in_expression(or_cond, state)
else:
return self.convert_condition_or(cond, state)

def convert_condition_field_eq_val_str(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]: # pragma: no cover
Expand Down Expand Up @@ -371,9 +390,11 @@ def finalize_query_siem_rule(
},
"params": {
"author": [rule.author] if rule.author is not None else [],
"description": rule.description
if rule.description is not None
else "No description",
"description": (
rule.description
if rule.description is not None
else "No description"
),
"ruleId": str(rule.id),
"falsePositives": rule.falsepositives,
"from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
Expand All @@ -384,13 +405,15 @@ def finalize_query_siem_rule(
"from": "1m",
},
"maxSignals": 100,
"riskScore": self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21,
"riskScore": (
self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21
),
"riskScoreMapping": [],
"severity": str(rule.level.name).lower()
if rule.level is not None
else "low",
"severity": (
str(rule.level.name).lower() if rule.level is not None else "low"
),
"severityMapping": [],
"threat": list(self.finalize_output_threat_model(rule.tags)),
"to": "now",
Expand Down Expand Up @@ -432,9 +455,9 @@ def finalize_query_siem_rule_ndjson(
"throttle": "no_actions",
"interval": f"{self.schedule_interval}{self.schedule_interval_unit}",
"author": [rule.author] if rule.author is not None else [],
"description": rule.description
if rule.description is not None
else "No description",
"description": (
rule.description if rule.description is not None else "No description"
),
"rule_id": str(rule.id),
"false_positives": rule.falsepositives,
"from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
Expand All @@ -445,13 +468,15 @@ def finalize_query_siem_rule_ndjson(
"from": "1m",
},
"max_signals": 100,
"risk_score": self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21,
"risk_score": (
self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21
),
"risk_score_mapping": [],
"severity": str(rule.level.name).lower()
if rule.level is not None
else "low",
"severity": (
str(rule.level.name).lower() if rule.level is not None else "low"
),
"severity_mapping": [],
"threat": list(self.finalize_output_threat_model(rule.tags)),
"tags": [f"{n.namespace}-{n.name}" for n in rule.tags],
Expand Down
75 changes: 50 additions & 25 deletions sigma/backends/elasticsearch/elasticsearch_lucene.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
import json
from typing import Iterable, ClassVar, Dict, List, Optional, Pattern, Tuple, Union
from typing import Iterable, ClassVar, Dict, List, Optional, Pattern, Tuple, Union, Any

from sigma.conversion.state import ConversionState
from sigma.rule import SigmaRule, SigmaRuleTag
Expand Down Expand Up @@ -81,12 +81,12 @@ class LuceneBackend(TextQueryBackend):
wildcard_single: ClassVar[str] = "?"
# Characters quoted in addition to wildcards and string quote
add_escaped: ClassVar[str] = '+-=&|!(){}[]<>^"~*?:\\/ '
bool_values: ClassVar[
Dict[bool, str]
] = { # Values to which boolean values are mapped.
True: "true",
False: "false",
}
bool_values: ClassVar[Dict[bool, str]] = (
{ # Values to which boolean values are mapped.
True: "true",
False: "false",
}
)

# Regular expressions
# Regular expression query as format string with placeholders {field} and {regex}
Expand Down Expand Up @@ -210,6 +210,25 @@ def convert_condition_field_eq_val_cidr(
else:
return super().convert_condition_field_eq_val_cidr(cond, state)

def convert_condition_field_eq_expansion(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Any:
"""
Convert each value of the expansion with the field from the containing condition and OR-link
all converted subconditions.
"""
or_cond = ConditionOR(
[
ConditionFieldEqualsValueExpression(cond.field, value)
for value in cond.value.values
],
cond.source,
)
if self.decide_convert_condition_as_in_expression(or_cond, state):
return self.convert_condition_as_in_expression(or_cond, state)
else:
return self.convert_condition_or(cond, state)

def compare_precedence(self, outer: ConditionItem, inner: ConditionItem) -> bool:
"""Override precedence check for null field conditions."""
if isinstance(inner, ConditionNOT) and LuceneBackend._is_field_null_condition(
Expand Down Expand Up @@ -386,9 +405,11 @@ def finalize_query_siem_rule(
},
"params": {
"author": [rule.author] if rule.author is not None else [],
"description": rule.description
if rule.description is not None
else "No description",
"description": (
rule.description
if rule.description is not None
else "No description"
),
"ruleId": str(rule.id),
"falsePositives": rule.falsepositives,
"from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
Expand All @@ -399,13 +420,15 @@ def finalize_query_siem_rule(
"from": "1m",
},
"maxSignals": 100,
"riskScore": self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21,
"riskScore": (
self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21
),
"riskScoreMapping": [],
"severity": str(rule.level.name).lower()
if rule.level is not None
else "low",
"severity": (
str(rule.level.name).lower() if rule.level is not None else "low"
),
"severityMapping": [],
"threat": list(self.finalize_output_threat_model(rule.tags)),
"to": "now",
Expand Down Expand Up @@ -447,9 +470,9 @@ def finalize_query_siem_rule_ndjson(
"throttle": "no_actions",
"interval": f"{self.schedule_interval}{self.schedule_interval_unit}",
"author": [rule.author] if rule.author is not None else [],
"description": rule.description
if rule.description is not None
else "No description",
"description": (
rule.description if rule.description is not None else "No description"
),
"rule_id": str(rule.id),
"false_positives": rule.falsepositives,
"from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
Expand All @@ -460,13 +483,15 @@ def finalize_query_siem_rule_ndjson(
"from": "1m",
},
"max_signals": 100,
"risk_score": self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21,
"risk_score": (
self.severity_risk_mapping[rule.level.name]
if rule.level is not None
else 21
),
"risk_score_mapping": [],
"severity": str(rule.level.name).lower()
if rule.level is not None
else "low",
"severity": (
str(rule.level.name).lower() if rule.level is not None else "low"
),
"severity_mapping": [],
"threat": list(self.finalize_output_threat_model(rule.tags)),
"tags": [f"{n.namespace}-{n.name}" for n in rule.tags],
Expand Down
44 changes: 44 additions & 0 deletions tests/test_backend_elasticsearch_eql.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,50 @@ def test_eql_angle_brackets(eql_backend: EqlBackend):
]


def test_elasticsearch_eql_windash(eql_backend: EqlBackend):
assert (
eql_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldname|windash:
- "-param-name"
condition: sel
"""
)
)
== ['any where fieldname like~ ("-param-name", "/param-name")']
)


def test_elasticsearch_eql_windash_contains(eql_backend: EqlBackend):
assert (
eql_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldname|windash|contains:
- "-param-name"
condition: sel
"""
)
)
== ['any where fieldname like~ ("*-param-name*", "*/param-name*")']
)


def test_elasticsearch_eqlapi(eql_backend: EqlBackend):
"""Test for NDJSON output with embedded query string query."""
rule = SigmaCollection.from_yaml(
Expand Down
46 changes: 46 additions & 0 deletions tests/test_backend_elasticsearch_lucene.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,52 @@ def test_lucene_angle_brackets(lucene_backend: LuceneBackend):
]


def test_lucene_windash(lucene_backend: LuceneBackend):
"""Test for DSL output with < or > in the values"""
assert (
lucene_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldname|windash:
- "-param-name"
condition: sel
"""
)
)
== ["fieldname:(\\-param\\-name OR \\/param\\-name)"]
)


def test_lucene_windash_contains(lucene_backend: LuceneBackend):
"""Test for DSL output with < or > in the values"""
assert (
lucene_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldname|windash|contains:
- " -param-name "
condition: sel
"""
)
)
== ["fieldname:(*\\ \\-param\\-name\\ * OR *\\ \\/param\\-name\\ *)"]
)


def test_elasticsearch_ndjson_lucene(lucene_backend: LuceneBackend):
"""Test for NDJSON output with embedded query string query."""
rule = SigmaCollection.from_yaml(
Expand Down

0 comments on commit 24fe2c5

Please sign in to comment.