Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
badrogger committed Dec 16, 2024
1 parent 3ff53f8 commit 1b8e970
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 34 deletions.
52 changes: 21 additions & 31 deletions core/schains/firewall/nftables.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,15 @@ def create_table(self) -> None:

def add_schain_drop_rule(self, first_port: int, last_port: int) -> None:
expr = [

Check warning on line 66 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L66

Added line #L66 was not covered by tests
{
"match": {
"left": {
"payload": {
"protocol": "tcp",
"field": "dport"
{
'match': {
'op': '==',
'left': {'payload': {'protocol': 'tcp', 'field': 'dport'}},
'right': {'range': [first_port, last_port]},
}
},
"op": "==",
"right": {'range': [first_port, last_port]}
}
},
{'counter': None},
{"drop": None}
},
{'counter': None},
{'drop': None},
]

if self.expr_to_rule(expr) not in self.get_rules_by_policy(policy='drop'):
Expand Down Expand Up @@ -178,16 +173,16 @@ def add_rule(self, rule: SChainRule) -> None:
raise NFTablesCmdFailedError(f'Failed to add allow rule: {error}')

Check warning on line 173 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L171-L173

Added lines #L171 - L173 were not covered by tests

@classmethod
def rule_to_expr(cls, rule: SChainRule) -> list:
def rule_to_expr(cls, rule: SChainRule, counter: bool = True) -> list:
expr = []

Check warning on line 177 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L177

Added line #L177 was not covered by tests

if rule.first_ip:
if rule.last_ip == rule.first_ip:
expr.append(

Check warning on line 181 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L179-L181

Added lines #L179 - L181 were not covered by tests
{
'match': {
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'op': '==',
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'right': f'{rule.first_ip}',
}
}
Expand All @@ -196,8 +191,8 @@ def rule_to_expr(cls, rule: SChainRule) -> list:
expr.append(

Check warning on line 191 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L191

Added line #L191 was not covered by tests
{
'match': {
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'op': '==',
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'right': {'range': [f'{rule.first_ip}', f'{rule.last_ip}']},
}
}
Expand All @@ -207,14 +202,17 @@ def rule_to_expr(cls, rule: SChainRule) -> list:
expr.append(

Check warning on line 202 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L201-L202

Added lines #L201 - L202 were not covered by tests
{
'match': {
'left': {'payload': {'protocol': 'tcp', 'field': 'dport'}},
'op': '==',
'left': {'payload': {'protocol': 'tcp', 'field': 'dport'}},
'right': rule.port,
}
}
)

expr.extend([{'counter': None}, {'accept': None}])
if counter:
expr.append({'counter': None})

Check warning on line 213 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L212-L213

Added lines #L212 - L213 were not covered by tests

expr.append({'accept': None})
return expr

Check warning on line 216 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L215-L216

Added lines #L215 - L216 were not covered by tests

@classmethod
Expand All @@ -237,16 +235,9 @@ def expr_to_rule(self, expr: list) -> None:
if any([port, first_ip, last_ip]):
return SChainRule(port=port, first_ip=first_ip, last_ip=last_ip)

Check warning on line 236 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L235-L236

Added lines #L235 - L236 were not covered by tests

@classmethod
def expr_equals(cls, expr_a: list[dict], expr_b: list[dict]) -> bool:
for item_a, item_b in zip(sorted(expr_a), sorted(expr_b)):
if 'counter' not in item_a and item_a != item_b:
return False
return True

def remove_rule(self, rule: SChainRule) -> None:
if self.has_rule(rule):
expr = self.rule_to_expr(rule)
expr = self.rule_to_expr(rule, counter=False)

Check warning on line 240 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L239-L240

Added lines #L239 - L240 were not covered by tests

output = None
rc, output, error = self.run_cmd(f'list chain {self.FAMILY} {self.table} {self.chain}')
Expand All @@ -255,15 +246,14 @@ def remove_rule(self, rule: SChainRule) -> None:

current_rules = json.loads(output)

Check warning on line 247 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L247

Added line #L247 was not covered by tests

logger.info('HERE HERE %s', expr)
logger.info('HERE current rules %s', current_rules)
handle = None
for item in current_rules.get('nftables', []):
if 'rule' in item:
rule_data = item['rule']
logger.info('HERE HERE 2 %s', rule_data['expr'])
logger.info('HERE HERE 3 %s', expr)
if self.expr_equals(rule_data.get('expr'), expr):
rule_expr = list(

Check warning on line 253 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L249-L253

Added lines #L249 - L253 were not covered by tests
filter(lambda statement: 'counter' not in statement, rule_data['expr'])
)
if expr == rule_expr:
handle = rule_data.get('handle')
break

Check warning on line 258 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L256-L258

Added lines #L256 - L258 were not covered by tests

Expand Down
6 changes: 3 additions & 3 deletions tests/firewall/nftables_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ def nf_test_tables():

@pytest.fixture
def filter_table(nf_test_tables):
print(nf_test_tables.cmd('add table inet filter'))
print(nf_test_tables.cmd('add table inet firewall'))


@pytest.fixture
def custom_chain(nf_test_tables, filter_table):
nf_test_tables.cmd('add chain inet filter test-chain')
nf_test_tables.cmd('add chain inet firewall test-chain')
return 'test-chain'


Expand All @@ -35,7 +35,7 @@ def test_nftables_controller(custom_chain):
assert nft_controller.has_rule(rule_a)
assert nft_controller.has_rule(rule_b)
rules = list(nft_controller.rules)
assert rules == sorted([rule_b, rule_a])
assert sorted(rules) == sorted([rule_b, rule_a]), (rules, sorted([rule_b, rule_a]))
nft_controller.remove_rule(rule_a)
assert not nft_controller.has_rule(rule_a)
assert nft_controller.has_rule(rule_b)
Expand Down

0 comments on commit 1b8e970

Please sign in to comment.