Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking XPASS #2391

Merged
merged 5 commits into from
Apr 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 53 additions & 16 deletions tests/appsec/test_blocking_addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,21 @@ class Test_Blocking_request_method:
"""Test if blocking is supported on server.request.method address"""

def setup_blocking(self):
self.rm_req_block = weblog.request("OPTIONS")
if not hasattr(self, "rm_req_block") or self.rm_req_block is None:
self.rm_req_block = weblog.request("OPTIONS")

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
assert self.rm_req_block.status_code == 403
interfaces.library.assert_waf_attack(self.rm_req_block, rule="tst-037-006")

def setup_non_blocking(self):
self.setup_blocking()
self.rm_req_nonblock = weblog.request("GET")

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
assert self.rm_req_nonblock.status_code == 200

def setup_blocking_before(self):
Expand Down Expand Up @@ -272,9 +275,11 @@ class Test_Blocking_request_uri:
"""Test if blocking is supported on server.request.uri.raw address"""

def setup_blocking(self):
self.rm_req_block1 = self.ruri_req = weblog.get("/waf/foo.git")
if not hasattr(self, "rm_req_block1") or self.rm_req_block1 is None:
self.rm_req_block1 = self.ruri_req = weblog.get("/waf/foo.git")
# query parameters are part of uri
self.rm_req_block2 = weblog.get("/waf?foo=.git")
if not hasattr(self, "rm_req_block2") or self.rm_req_block2 is None:
self.rm_req_block2 = weblog.get("/waf?foo=.git")

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -283,10 +288,12 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-002")

def setup_non_blocking(self):
self.setup_blocking()
self.rm_req_nonblock1 = weblog.get("/waf/legit")

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
assert self.rm_req_nonblock1.status_code == 200

def setup_blocking_uri_raw(self):
Expand Down Expand Up @@ -321,8 +328,10 @@ class Test_Blocking_request_path_params:
"""Test if blocking is supported on server.request.path_params address"""

def setup_blocking(self):
self.rm_req_block1 = weblog.get("/params/AiKfOeRcvG45")
self.rm_req_block2 = weblog.get("/waf/AiKfOeRcvG45")
if not hasattr(self, "rm_req_block1") or self.rm_req_block1 is None:
self.rm_req_block1 = weblog.get("/params/AiKfOeRcvG45")
if not hasattr(self, "rm_req_block2") or self.rm_req_block2 is None:
self.rm_req_block2 = weblog.get("/waf/AiKfOeRcvG45")

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -331,11 +340,13 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-007")

def setup_non_blocking(self):
self.setup_blocking()
# query parameters are not a part of path parameters
self.rm_req_nonblock = weblog.get("/waf/noharm?value=AiKfOeRcvG45")

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
assert self.rm_req_nonblock.status_code == 200

def setup_blocking_before(self):
Expand All @@ -362,8 +373,10 @@ class Test_Blocking_request_query:
"""Test if blocking is supported on server.request.query address"""

def setup_blocking(self):
self.rm_req_block1 = weblog.get("/waf", params={"foo": "xtrace"})
self.rm_req_block2 = weblog.get("/waf?foo=xtrace")
if not hasattr(self, "rm_req_block1") or self.rm_req_block1 is None:
self.rm_req_block1 = weblog.get("/waf", params={"foo": "xtrace"})
if not hasattr(self, "rm_req_block2") or self.rm_req_block2 is None:
self.rm_req_block2 = weblog.get("/waf?foo=xtrace")

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -372,13 +385,15 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-001")

def setup_non_blocking(self):
self.setup_blocking()
# path parameters are not a part of query parameters
self.rm_req_nonblock1 = weblog.get("/waf/xtrace")
# query parameters are blocking only on value not parameter name
self.rm_req_nonblock2 = weblog.get("/waf?xtrace=foo")

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
for response in (self.rm_req_nonblock1, self.rm_req_nonblock2):
assert response.status_code == 200

Expand Down Expand Up @@ -406,8 +421,10 @@ class Test_Blocking_request_headers:
"""Test if blocking is supported on server.request.headers.no_cookies address"""

def setup_blocking(self):
self.rm_req_block1 = weblog.get("/waf", headers={"foo": "asldhkuqwgervf"})
self.rm_req_block2 = weblog.get("/waf", headers={"Accept-Language": "asldhkuqwgervf"})
if not hasattr(self, "rm_req_block1") or self.rm_req_block1 is None:
self.rm_req_block1 = weblog.get("/waf", headers={"foo": "asldhkuqwgervf"})
if not hasattr(self, "rm_req_block2") or self.rm_req_block2 is None:
self.rm_req_block2 = weblog.get("/waf", headers={"Accept-Language": "asldhkuqwgervf"})

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -416,13 +433,15 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-003")

def setup_non_blocking(self):
self.setup_blocking()
# query parameters are not a part of headers
self.rm_req_nonblock1 = weblog.get("/waf?value=asldhkuqwgervf")
# header parameters are blocking only on value not parameter name
self.rm_req_nonblock2 = weblog.get("/waf", headers={"asldhkuqwgervf": "foo"})

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
for response in (self.rm_req_nonblock1, self.rm_req_nonblock2):
assert response.status_code == 200

Expand Down Expand Up @@ -450,8 +469,10 @@ class Test_Blocking_request_cookies:
"""Test if blocking is supported on server.request.cookies address"""

def setup_blocking(self):
self.rm_req_block1 = weblog.get("/waf", cookies={"foo": "jdfoSDGFkivRG_234"})
self.rm_req_block2 = weblog.get("/waf", cookies={"Accept-Language": "jdfoSDGFkivRG_234"})
if not hasattr(self, "rm_req_block1") or self.rm_req_block1 is None:
self.rm_req_block1 = weblog.get("/waf", cookies={"foo": "jdfoSDGFkivRG_234"})
if not hasattr(self, "rm_req_block2") or self.rm_req_block2 is None:
self.rm_req_block2 = weblog.get("/waf", cookies={"Accept-Language": "jdfoSDGFkivRG_234"})

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -460,13 +481,15 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-008")

def setup_non_blocking(self):
self.setup_blocking()
# headers parameters are not a part of cookies
self.rm_req_nonblock1 = weblog.get("/waf", headers={"foo": "jdfoSDGFkivRG_234"})
# cookies parameters are blocking only on value not parameter name
self.rm_req_nonblock2 = weblog.get("/waf", headers={"jdfoSDGFkivRG_234": "foo"})

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
for response in (self.rm_req_nonblock1, self.rm_req_nonblock2):
assert response.status_code == 200

Expand Down Expand Up @@ -494,8 +517,10 @@ class Test_Blocking_request_body:
"""Test if blocking is supported on server.request.body address for urlencoded body"""

def setup_blocking(self):
self.rm_req_block1 = weblog.post("/waf", data={"value1": "bsldhkuqwgervf"})
self.rm_req_block2 = weblog.post("/waf", data={"foo": "bsldhkuqwgervf"})
if not hasattr(self, "rm_req_block1") or self.rm_req_block1 is None:
self.rm_req_block1 = weblog.post("/waf", data={"value1": "bsldhkuqwgervf"})
if not hasattr(self, "rm_req_block2") or self.rm_req_block2 is None:
self.rm_req_block2 = weblog.post("/waf", data={"foo": "bsldhkuqwgervf"})

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -504,6 +529,7 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-004")

def setup_non_blocking(self):
self.setup_blocking()
# raw body are never parsed
self.rm_req_nonblock1 = weblog.post(
"/waf", data=b'\x00{"value3": "bsldhkuqwgervf"}\xFF', headers={"content-type": "application/octet-stream"}
Expand All @@ -512,10 +538,12 @@ def setup_non_blocking(self):

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
assert self.rm_req_nonblock1.status_code == 200
assert self.rm_req_nonblock2.status_code == 200

def setup_non_blocking_plain_text(self):
self.setup_blocking()
self.rm_req_nonblock_plain_text = weblog.post(
"/waf", data=b'{"value4": "bsldhkuqwgervf"}', headers={"content-type": "text/plain"}
)
Expand All @@ -525,6 +553,7 @@ def setup_non_blocking_plain_text(self):
reason="Blocks on text/plain if parsed to a String",
)
def test_non_blocking_plain_text(self):
self.test_blocking()
# TODO: This test is pending a better definition of when text/plain is considered parsed body,
# which depends on application logic.
assert self.rm_req_nonblock_plain_text.status_code == 200
Expand Down Expand Up @@ -552,7 +581,8 @@ class Test_Blocking_response_status:
"""Test if blocking is supported on server.response.status address"""

def setup_blocking(self):
self.rm_req_block = {status: weblog.get(f"/tag_value/anything/{status}") for status in (415, 416, 417, 418)}
if not hasattr(self, "rm_req_block") or self.rm_req_block is None:
self.rm_req_block = {status: weblog.get(f"/tag_value/anything/{status}") for status in (415, 416, 417, 418)}

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -561,10 +591,12 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-005")

def setup_non_blocking(self):
self.setup_blocking()
self.rm_req_nonblock = {status: weblog.get(f"/tag_value/anything/{status}") for status in (411, 412, 413, 414)}

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
for code, response in self.rm_req_nonblock.items():
assert response.status_code == code, response.request.url

Expand All @@ -576,8 +608,10 @@ class Test_Blocking_response_headers:
"""Test if blocking is supported on server.response.headers.no_cookies address"""

def setup_blocking(self):
self.rm_req_block1 = weblog.get(f"/tag_value/anything/200?content-language=en-us")
self.rm_req_block2 = weblog.get(f"/tag_value/anything/200?content-language=krypton")
if not hasattr(self, "rm_req_block1") or self.rm_req_block1 is None:
self.rm_req_block1 = weblog.get(f"/tag_value/anything/200?content-language=en-us")
if not hasattr(self, "rm_req_block2") or self.rm_req_block2 is None:
self.rm_req_block2 = weblog.get(f"/tag_value/anything/200?content-language=krypton")

def test_blocking(self):
"""Test if requests that should be blocked are blocked"""
Expand All @@ -586,11 +620,13 @@ def test_blocking(self):
interfaces.library.assert_waf_attack(response, rule="tst-037-009")

def setup_non_blocking(self):
self.setup_blocking()
self.rm_req_nonblock1 = weblog.get(f"/tag_value/anything/200?content-color=en-us")
self.rm_req_nonblock2 = weblog.get(f"/tag_value/anything/200?content-language=fr")

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
for response in (self.rm_req_nonblock1, self.rm_req_nonblock2):
assert response.status_code == 200

Expand All @@ -606,6 +642,7 @@ def test_blocking(self):

def test_non_blocking(self):
"""Test if requests that should not be blocked are not blocked"""
self.test_blocking()
assert False, "TODO"

def test_blocking_before(self):
Expand Down
Loading