diff --git a/README.md b/README.md index 0ac8dbe..35029e5 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,8 @@ The available ISO 3166 alpha-2 country codes are listed in [here](https://www.ib ISO continent codes are: `AF` - Africa, `AN` - Antarctica, `AS` - Asia, `EU` - Europe, `NA` - North America, `OC` - Oceania and `SA` - South America. +### Check access on timeout + Without additional configuration, the middleware will check the user's access on every request. This can slow down the site. To avoid this, you can use the `FORBID_TIMEOUT` variable to set the cache timeout in seconds. When the timeout expires, the middleware will check the user's access again. @@ -80,6 +82,12 @@ expires, the middleware will check the user's access again. FORBID_TIMEOUT = 60 * 10 ``` +### Detect usage of a VPN + +If you want to detect the usage of a VPN, you can use the `FORBID_VPN` variable. When this variable is set to `True`, +the middleware will check if the user's timezone matches the timezone the IP address belongs to. If the timezones do not +match, the user will be considered in the usage of a VPN and forbidden to access the site. + ## Contribute Any contribution is welcome. If you have any ideas or suggestions, feel free to open an issue or a pull request. And diff --git a/src/django_forbid/__init__.py b/src/django_forbid/__init__.py index 27fdca4..81f0fde 100644 --- a/src/django_forbid/__init__.py +++ b/src/django_forbid/__init__.py @@ -1 +1 @@ -__version__ = "0.0.3" +__version__ = "0.0.4" diff --git a/src/django_forbid/access.py b/src/django_forbid/access.py index df3e5a3..16dcc35 100644 --- a/src/django_forbid/access.py +++ b/src/django_forbid/access.py @@ -43,38 +43,57 @@ def __init__(self): for territory in getattr(settings, self.territories, []): self.rules.append(ContinentRule(territory.upper())) - def grants(self, ip_address): + def accessible(self, city): """Checks if the IP address is in the white zone.""" - city = self.geoip.city(ip_address) return any(map(lambda rule: rule(city), self.rules)) + def grants(self, city): + """Checks if the IP address is permitted.""" + raise NotImplementedError + class PermitAccess(Access): countries = "WHITELIST_COUNTRIES" territories = "WHITELIST_TERRITORIES" - def grants(self, ip_address): + def grants(self, city): """Checks if the IP address is permitted.""" - try: - return not self.rules or super().grants(ip_address) - except AddressNotFoundError: - return getattr(settings, "DEBUG", False) + return not self.rules or self.accessible(city) class ForbidAccess(Access): countries = "FORBIDDEN_COUNTRIES" territories = "FORBIDDEN_TERRITORIES" - def grants(self, ip_address): + def grants(self, city): """Checks if the IP address is forbidden.""" - try: - return not self.rules or not super().grants(ip_address) - except AddressNotFoundError: - return getattr(settings, "DEBUG", False) + return not self.rules or not self.accessible(city) -def grants_access(ip_address): +def grants_access(request, ip_address): """Checks if the IP address is in the white zone.""" - if ForbidAccess().grants(ip_address): - return PermitAccess().grants(ip_address) - return False + try: + city = Access.geoip.city(ip_address) + + # Saves the timezone in the session for + # comparing it with the timezone in the + # POST request sent from user's browser + # to detect if the user is using VPN. + timezone = city.get("time_zone") + request.session["tz"] = timezone + + # First, checks if the IP address is not + # forbidden. If it is, False is returned + # otherwise, checks if the IP address is + # permitted. + if ForbidAccess().grants(city): + return PermitAccess().grants(city) + return False + except (AddressNotFoundError, Exception): + # This happens when the IP address is not + # in the GeoIP2 database. Usually, this + # happens when the IP address is a local. + return not any([ + ForbidAccess().rules, + PermitAccess().rules, + ]) or getattr(settings, "DEBUG", False) diff --git a/src/django_forbid/detect.py b/src/django_forbid/detect.py new file mode 100644 index 0000000..13dd59b --- /dev/null +++ b/src/django_forbid/detect.py @@ -0,0 +1,58 @@ +import json +import re + +from django.conf import settings +from django.http import HttpResponse +from django.http import HttpResponseForbidden +from django.shortcuts import redirect +from django.shortcuts import render + + +def detect_vpn(get_response, request): + response_attributes = ("content", "charset", "status", "reason") + + def erase_response_attributes(): + for attr in response_attributes: + request.session.pop(attr) + + if any([ + # The session key is checked to avoid + # redirect loops in development mode. + not request.session.has_key("tz"), + # Checks if FORBID_VPN is False or not set. + not getattr(settings, "FORBID_VPN", False), + # Checks if the request is an AJAX request. + not re.search( + r"\w+\/(?:html|xhtml\+xml|xml)", + request.META.get("HTTP_ACCEPT"), + ), + ]): + return get_response(request) + + if all(map(request.session.has_key, ("tz", *response_attributes))): + # Handles if the user's timezone differs from the + # one determined by GeoIP API. If so, VPN is used. + if request.POST.get("timezone", "N/A") != request.session.get("tz"): + erase_response_attributes() + if hasattr(settings, "FORBIDDEN_URL"): + return redirect(settings.FORBIDDEN_URL) + return HttpResponseForbidden() + + # Restores the response from the session. + response = HttpResponse(**{attr: request.session.get(attr) for attr in response_attributes}) + if hasattr(response, "headers"): + response.headers = json.loads(request.session.get("headers")) + erase_response_attributes() + return response + + # Gets the response and saves attributes in the session to restore it later. + response = get_response(request) + if hasattr(response, "headers"): + # In older versions of Django, HttpResponse does not have headers attribute. + request.session["headers"] = json.dumps(dict(response.headers)) + request.session["content"] = response.content.decode(response.charset) + request.session["charset"] = response.charset + request.session["status"] = response.status_code + request.session["reason"] = response.reason_phrase + + return render(request, "timezone.html", status=302) diff --git a/src/django_forbid/middleware.py b/src/django_forbid/middleware.py index f35ba6b..cd7b0f1 100644 --- a/src/django_forbid/middleware.py +++ b/src/django_forbid/middleware.py @@ -6,6 +6,7 @@ from django.utils.timezone import utc from .access import grants_access +from .detect import detect_vpn class ForbidMiddleware: @@ -24,13 +25,13 @@ def __call__(self, request): # Checks if access is not timed out yet. if acss - request.session.get("ACCESS") < settings.FORBID_TIMEOUT: - return self.get_response(request) + return detect_vpn(self.get_response, request) # Checks if access is granted when timeout is reached. - if grants_access(address.split(",")[0].strip()): + if grants_access(request, address.split(",")[0].strip()): acss = datetime.utcnow().replace(tzinfo=utc) request.session["ACCESS"] = acss.timestamp() - return self.get_response(request) + return detect_vpn(self.get_response, request) # Redirects to forbidden page if URL is set. if hasattr(settings, "FORBIDDEN_URL"): diff --git a/src/django_forbid/templates/timezone.html b/src/django_forbid/templates/timezone.html new file mode 100644 index 0000000..6242038 --- /dev/null +++ b/src/django_forbid/templates/timezone.html @@ -0,0 +1,19 @@ +
+ + + + \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 427ce53..2aab0d0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,9 @@ -from django.conf import settings from pathlib import Path +import pytest +from django.conf import settings +from django.http import HttpResponse + def pytest_configure(): settings.configure( @@ -10,6 +13,17 @@ def pytest_configure(): MIDDLEWARE=[ "django_forbid.middleware.ForbidMiddleware" ], + TEMPLATES=[{"BACKEND": "django.template.backends.django.DjangoTemplates", "APP_DIRS": True}], # The `pathlib.Path` support was added after Django 3.0. GEOIP_PATH=(Path(__file__).parent / "geoip").as_posix(), ) + + +@pytest.fixture +def get_response(): + """A dummy view function.""" + + def get_response_mock(_): + return HttpResponse() + + return get_response_mock diff --git a/tests/test_grants_access.py b/tests/test_grants_access.py index 667ec12..d3ce67b 100644 --- a/tests/test_grants_access.py +++ b/tests/test_grants_access.py @@ -1,82 +1,100 @@ +from django.test import RequestFactory from django.test import override_settings from django_forbid.access import grants_access +factory = RequestFactory() +request = factory.get("/") +request.session = {} + +LOCALHOST = "localhost" +IP_LOCAL1 = "0.0.0.0" +IP_LOCAL2 = "127.0.0.1" +IP_LONDON = "212.102.63.59" + def test_access_without_configuration(): """If no configuration is provided, access is granted everywhere.""" - assert grants_access("doesnt-matter") + assert grants_access(request, LOCALHOST) + + +@override_settings(FORBID_VPN=True) +def test_access_forbid_vpn(): + """If VPN detection is enabled, access is granted everywhere.""" + assert grants_access(request, LOCALHOST) @override_settings(WHITELIST_COUNTRIES=["US"], DEBUG=True) def test_access_from_localhost_development_mode(): """In development mode, access is granted from localhost.""" - assert grants_access("127.0.0.1") - assert grants_access("localhost") + assert grants_access(request, IP_LOCAL1) + assert grants_access(request, IP_LOCAL2) + assert grants_access(request, LOCALHOST) @override_settings(WHITELIST_COUNTRIES=["US"]) def test_access_from_localhost_production_mode(): """In production mode, access is not granted from localhost.""" - assert not grants_access("127.0.0.1") - assert not grants_access("localhost") + assert not grants_access(request, IP_LOCAL1) + assert not grants_access(request, IP_LOCAL2) + assert not grants_access(request, LOCALHOST) @override_settings(WHITELIST_COUNTRIES=["GB"]) def test_access_from_gb_when_gb_in_countries_whitelist(): """Access is granted from GB when GB is in the counties' whitelist.""" - assert grants_access("212.102.63.59") + assert grants_access(request, IP_LONDON) @override_settings(WHITELIST_COUNTRIES=["US"]) def test_access_from_gb_when_gb_not_in_countries_whitelist(): """Access is not granted from GB when GB is not in the counties' whitelist.""" - assert not grants_access("212.102.63.59") + assert not grants_access(request, IP_LONDON) @override_settings(WHITELIST_TERRITORIES=["EU"]) def test_access_from_gb_when_eu_in_continent_whitelist(): """Access is granted from GB when EU is in the continents' whitelist.""" - assert grants_access("212.102.63.59") + assert grants_access(request, IP_LONDON) @override_settings(WHITELIST_TERRITORIES=["US"]) def test_access_from_gb_when_gb_not_in_continent_whitelist(): """Access is not granted from GB when EU is not in the continents' whitelist.""" - assert not grants_access("212.102.63.59") + assert not grants_access(request, IP_LONDON) @override_settings(FORBIDDEN_COUNTRIES=["GB"]) def test_access_from_gb_when_gb_in_forbidden_countries(): """Access is not granted from GB when GB is in the forbidden list.""" - assert not grants_access("212.102.63.59") + assert not grants_access(request, IP_LONDON) @override_settings(FORBIDDEN_COUNTRIES=["RU"]) def test_access_from_gb_when_gb_not_in_forbidden_countries(): """Access is granted from GB when GB is not in the forbidden list.""" - assert grants_access("212.102.63.59") + assert grants_access(request, IP_LONDON) @override_settings(FORBIDDEN_TERRITORIES=["EU"]) def test_access_from_gb_when_eu_in_forbidden_territories(): """Access is not granted from GB when EU is in the forbidden list.""" - assert not grants_access("212.102.63.59") + assert not grants_access(request, IP_LONDON) @override_settings(FORBIDDEN_TERRITORIES=["AS"]) def test_access_from_gb_when_eu_not_in_forbidden_territories(): """Access is granted from GB when EU is not in the forbidden list.""" - assert grants_access("212.102.63.59") + assert grants_access(request, IP_LONDON) @override_settings(WHITELIST_TERRITORIES=["EU"], FORBIDDEN_COUNTRIES=["GB"]) def test_mix_config_access_from_gb_when_eu_in_whitelist_but_gb_is_forbidden(): """Access is not granted from GB when EU is in the whitelist but GB is forbidden.""" - assert not grants_access("212.102.63.59") + assert not grants_access(request, IP_LONDON) @override_settings(WHITELIST_COUNTRIES=["GB"], FORBIDDEN_COUNTRIES=["GB"]) def test_mix_config_access_from_gb_when_gb_in_both_lists(): """Access is not granted from GB when GB is in both lists.""" - assert not grants_access("212.102.63.59") + assert not grants_access(request, IP_LONDON) diff --git a/tests/test_vpn_detection.py b/tests/test_vpn_detection.py new file mode 100644 index 0000000..f5b8d82 --- /dev/null +++ b/tests/test_vpn_detection.py @@ -0,0 +1,111 @@ +from django.test import RequestFactory +from django.test import override_settings + +from django_forbid.access import grants_access +from django_forbid.detect import detect_vpn + +LOCALHOST = "localhost" +IP_LONDON = "212.102.63.59" +IP_ZURICH = "146.70.99.178" + + +class SessionStore(dict): + def has_key(self, key): + return key in self + + +class WSGIRequest: + def __init__(self, accept): + self.accept = accept + self.session = SessionStore() + + def get(self): + request = RequestFactory().get("/") + request.session = self.session + request.META["HTTP_ACCEPT"] = self.accept + return request + + def post(self, data): + request = RequestFactory().post("/", data) + request.session = self.session + request.META["HTTP_ACCEPT"] = self.accept + return request + + +class Detector: + def __init__(self, get_response, ajax=False): + access = "*/*" if ajax else "text/html" + self.request = WSGIRequest(access) + self.get_response = get_response + + def request_resource(self, ip_address=""): + """Sends a request to the server to access a resource""" + request = self.request.get() + grants_access(request, ip_address) + return detect_vpn(self.get_response, request) + + def request_access(self): + """Simulates the request sent by the user browser to the server""" + request = self.request.post({"timezone": "Europe/London"}) + return detect_vpn(self.get_response, request) + + +@override_settings(FORBID_VPN=True) +def test_detect_when_using_localhost(get_response): + """It should give access to the user when using localhost""" + detector = Detector(get_response) + response = detector.request_resource(LOCALHOST) + assert response.status_code == 200 + + +@override_settings(FORBID_VPN=True) +def test_detect_when_using_localhost_ajax(get_response): + """It should give access to the user when request is done by AJAX""" + detector = Detector(get_response, True) + response = detector.request_resource(LOCALHOST) + assert response.status_code == 200 + + +@override_settings(FORBID_VPN=True) +def test_detect_when_using_nonlocal_ip(get_response): + """User should pass through two requests to access the resource""" + detector = Detector(get_response) + response = detector.request_resource(IP_LONDON) + assert response.status_code == 302 + response = detector.request_access() + assert response.status_code == 200 + + +@override_settings(FORBID_VPN=True) +def test_detect_when_using_vpn(get_response): + """User should be forbidden to access the resource when using VPN""" + detector = Detector(get_response) + response = detector.request_resource(IP_ZURICH) + assert response.status_code == 302 + response = detector.request_access() + assert response.status_code == 403 + + +@override_settings(FORBID_VPN=True) +def test_detect_when_turns_off_vpn_after_using(get_response): + """User should get access to the resource when VPN is turned off""" + detector = Detector(get_response) + response = detector.request_resource(IP_ZURICH) + assert response.status_code == 302 + response = detector.request_access() + assert response.status_code == 403 + + # Turn off VPN - back to London + detector = Detector(get_response) + response = detector.request_resource(IP_LONDON) + assert response.status_code == 302 + response = detector.request_access() + assert response.status_code == 200 + + +@override_settings(FORBID_VPN=True) +def test_detect_when_using_nonlocal_ip_ajax(get_response): + """It should give access to the user when request is done by AJAX""" + detector = Detector(get_response, True) + response = detector.request_resource(IP_LONDON) + assert response.status_code == 200