Skip to content

Support Django 5.1 and DRF 3.15.2 #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hostpolicy/api/permissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from rest_framework.permissions import IsAuthenticated, SAFE_METHODS

from mreg.api.permissions import user_is_superuser, user_in_settings_group
from mreg.api.permissions import user_object_is_superuser, user_in_settings_group

from mreg.models.host import Host
from mreg.models.network import NetGroupRegexPermission
Expand All @@ -12,7 +12,7 @@ def user_is_hostpolicy_adminuser(user):


def is_super_or_hostpolicy_admin(user):
return user_is_superuser(user) or user_is_hostpolicy_adminuser(user)
return user_object_is_superuser(user) or user_is_hostpolicy_adminuser(user)


class IsSuperOrHostPolicyAdminOrReadOnly(IsAuthenticated):
Expand Down
5 changes: 0 additions & 5 deletions mreg/api/errors.py

This file was deleted.

32 changes: 32 additions & 0 deletions mreg/api/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from rest_framework import (exceptions, status)

from typing import Any

class CustomAPIExceptionError(exceptions.APIException):
def __init__(self, detail: Any = None):
detail = {"ERROR": detail if detail is not None else self.default_detail}
super().__init__(detail)


class ValidationError400(CustomAPIExceptionError):
status_code = 400
default_detail:str = 'Bad Request'

class ValidationError403(CustomAPIExceptionError):
status_code = status.HTTP_403_FORBIDDEN
default_detail:str = 'Forbidden'

class ValidationError404(CustomAPIExceptionError):
status_code = status.HTTP_404_NOT_FOUND
default_detail:str = 'Not Found'

class ValidationError409(CustomAPIExceptionError):
status_code = status.HTTP_409_CONFLICT
default_detail:str = 'Conflict'

class InternalServerError500(CustomAPIExceptionError):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail:str = 'Internal Server Error'

class NoIpAddressesError404(ValidationError404):
default_detail:str = 'No free ip addresses found in the network.'
61 changes: 36 additions & 25 deletions mreg/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
from rest_framework import exceptions
from rest_framework.permissions import IsAuthenticated, SAFE_METHODS

from typing import cast, TYPE_CHECKING

from mreg.api.v1.serializers import HostSerializer
from mreg.models.host import HostGroup
from mreg.models.network import NetGroupRegexPermission, Network

if TYPE_CHECKING:
from mreg.models.auth import User

NETWORK_ADMIN_GROUP = 'NETWORK_ADMIN_GROUP'
SUPERUSER_GROUP = 'SUPERUSER_GROUP'
ADMINUSER_GROUP = 'ADMINUSER_GROUP'
Expand Down Expand Up @@ -36,24 +41,23 @@ def _list_in_list(list_a, list_b):
return any(i in list_b for i in list_a)


def user_is_superuser(user):
def user_object_is_superuser(user: "User") -> bool:
return user_in_settings_group(user, 'SUPERUSER_GROUP')


def user_is_adminuser(user):
def user_object_is_adminuser(user: "User") -> bool:
return user_in_settings_group(user, 'ADMINUSER_GROUP')


def user_is_group_adminuser(user):
def user_object_is_group_adminuser(user: "User") -> bool:
return user_in_settings_group(user, 'GROUPADMINUSER_GROUP')

def user_object_is_network_adminuser(user: "User") -> bool:
return user_in_settings_group(user, 'NETWORK_ADMIN_GROUP')

def is_super_or_admin(user):
return user_is_superuser(user) or user_is_adminuser(user)

def is_super_or_admin(user: "User") -> bool:
return user_object_is_superuser(user) or user_object_is_adminuser(user)

def is_super_or_group_admin(user):
return user_is_superuser(user) or user_is_group_adminuser(user)
def is_super_or_group_admin(user: "User") -> bool:
return user_object_is_superuser(user) or user_object_is_group_adminuser(user)


class IsAuthenticatedAndReadOnly(IsAuthenticated):
Expand Down Expand Up @@ -84,7 +88,7 @@ def has_permission(self, request, view):
return False
if request.method in SAFE_METHODS:
return True
return is_super_or_admin(request.user)
return is_super_or_admin(cast("User", request.user))


class IsSuperOrNetworkAdminMember(IsAuthenticated):
Expand All @@ -97,7 +101,7 @@ def has_permission(self, request, view):
import mreg.api.v1.views
if not super().has_permission(request, view):
return False
if user_is_superuser(request.user):
if user_object_is_superuser(cast("User", request.user)):
return True
if request_in_settings_group(request, NETWORK_ADMIN_GROUP):
if isinstance(view, mreg.api.v1.views.NetworkDetail):
Expand All @@ -122,7 +126,7 @@ def has_permission(self, request, view):
return False
if request.method in SAFE_METHODS:
return True
return is_super_or_group_admin(request.user)
return is_super_or_group_admin(cast("User", request.user))


def _deny_superuser_only_names(data=None, name=None, view=None, request=None):
Expand All @@ -135,6 +139,9 @@ def _deny_superuser_only_names(data=None, name=None, view=None, request=None):
if 'host' in data:
name = data['host'].name

if name is None:
return False

# Underscore is allowed for non-superuser in SRV records,
# and for members of <DNS_UNDERSCORE_GROUP> in all records.
if '_' in name and not isinstance(view, (mreg.api.v1.views.SrvDetail,
Expand All @@ -150,8 +157,10 @@ def _deny_superuser_only_names(data=None, name=None, view=None, request=None):
return False


def is_reserved_ip(ip):
network = Network.objects.filter(network__net_contains=ip).first()
def is_reserved_ip(ip, network=None):
if network is None:
network = Network.objects.filter(network__net_contains=ip).first()

if network:
return any(ip == str(i) for i in network.get_reserved_ipaddresses())
return False
Expand All @@ -174,17 +183,18 @@ class IsGrantedNetGroupRegexPermission(IsAuthenticated):
"""

def has_permission(self, request, view):
user = cast("User", request.user)
# This method is called before the view is executed, so
# just do some preliminary checks.
if not super().has_permission(request, view):
return False
if request.method in SAFE_METHODS:
return True
if is_super_or_admin(request.user):
if is_super_or_admin(user):
return True
# Will do do more object checks later, but initially refuse any
# unwarranted requests.
if NetGroupRegexPermission.objects.filter(group__in=request.user.group_list
if NetGroupRegexPermission.objects.filter(group__in=user.group_list
).exists():
return True
return False
Expand All @@ -200,7 +210,7 @@ def has_obj_perm(self, user, obj):
def has_create_permission(self, request, view, validated_serializer):
import mreg.api.v1.views

if user_is_superuser(request.user):
if user_object_is_superuser(request.user):
return True

hostname = None
Expand All @@ -211,7 +221,7 @@ def has_create_permission(self, request, view, validated_serializer):
if 'ipaddress' in data:
if _deny_reserved_ipaddress(data['ipaddress'], request):
return False
if user_is_adminuser(request.user):
if user_object_is_adminuser(request.user):
return True
if isinstance(view, (mreg.api.v1.views.IpaddressList,
mreg.api.v1.views.PtrOverrideList)):
Expand Down Expand Up @@ -245,7 +255,7 @@ def has_create_permission(self, request, view, validated_serializer):
def has_destroy_permission(self, request, view, validated_serializer):
import mreg.api.v1.views

if user_is_superuser(request.user):
if user_object_is_superuser(request.user):
return True
obj = view.get_object()
if isinstance(view, mreg.api.v1.views.HostDetail):
Expand All @@ -261,21 +271,21 @@ def has_destroy_permission(self, request, view, validated_serializer):
if hasattr(obj, 'ipaddress'):
if _deny_reserved_ipaddress(obj.ipaddress, request):
return False
if user_is_adminuser(request.user):
if user_object_is_adminuser(request.user):
return True
return self.has_obj_perm(request.user, obj)

def has_update_permission(self, request, view, validated_serializer):
import mreg.api.v1.views
if user_is_superuser(request.user):
if user_object_is_superuser(request.user):
return True
data = validated_serializer.validated_data
if _deny_superuser_only_names(data=data, view=view, request=request):
return False
if 'ipaddress' in data:
if _deny_reserved_ipaddress(data['ipaddress'], request):
return False
if user_is_adminuser(request.user):
if user_object_is_adminuser(request.user):
return True
obj = view.get_object()
if isinstance(view, mreg.api.v1.views.HostDetail):
Expand Down Expand Up @@ -308,17 +318,18 @@ def _get_hostname_and_ips(self, hostobject):
class HostGroupPermission(IsAuthenticated):

def has_permission(self, request, view):
user = cast("User", request.user)
# This method is called before the view is executed, so
# just do some preliminary checks.
if not super().has_permission(request, view):
return False
if request.method in SAFE_METHODS:
return True
if is_super_or_group_admin(request.user):
if is_super_or_group_admin(user):
return True
# Will do do more object checks later, but initially refuse any
# unwarranted requests.
if HostGroup.objects.filter(owners__name__in=request.user.group_list).exists():
if HostGroup.objects.filter(owners__name__in=user.group_list).exists():
return True
return False

Expand Down
70 changes: 63 additions & 7 deletions mreg/api/v1/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.contrib.auth.models import Group
from django.utils import timezone
from django.db import transaction

from rest_framework import serializers

Expand All @@ -14,8 +15,7 @@

from mreg.utils import (nonify, normalize_mac)
from mreg.validators import (validate_keys, validate_normalizeable_mac_address)
from mreg.api.errors import ValidationError409

from mreg.api.exceptions import ValidationError409, ValidationError400

class ValidationMixin:
"""Provides standard validation of data fields"""
Expand Down Expand Up @@ -363,18 +363,74 @@ class Meta:
model = HostGroup
fields = '__all__'


def _validate_ip_not_in_network_excluded_range(ip):
if ip is None:
return
qs = NetworkExcludedRange.objects.filter(start_ip__lte=ip,
end_ip__gte=ip)
qs = NetworkExcludedRange.objects.filter(start_ip__lte=ip, end_ip__gte=ip)
if qs.exists():
raise serializers.ValidationError(
f"IP {ip} in an excluded range: {qs.first()}")
raise ValidationError400(f"IP {ip} in an excluded range: {qs.first()}")


class LabelSerializer(serializers.ModelSerializer):
class Meta:
model = Label
fields = '__all__'

class HostCreateSerializer(serializers.ModelSerializer):
name = serializers.CharField(required=True)
ipaddress = serializers.CharField(write_only=True, required=False)
network = serializers.CharField(write_only=True, required=False)
allocation_method = serializers.CharField(write_only=True, required=False)

class Meta:
model = Host
fields = ['id', 'name', 'ipaddress', 'network', 'allocation_method']
extra_kwargs = {'id': {'read_only': True}}

def validate_name(self, value):
# Check if name is already in use in Host
if Host.objects.filter(name=value).exists():
raise ValidationError409("name already in use")
# Check if name is already in use as a CNAME
if Cname.objects.filter(name=value).exists():
raise ValidationError409("name already in use as a cname")
return value

def validate_ipaddress(self, value):
try:
ipaddress.ip_address(value)
except ValueError as error:
raise ValidationError400(str(error))

return value

def validate(self, data):
# No need to call super().validate(data) since we're handling all validations here
ipaddress = data.get('ipaddress')
network = data.get('network')
allocation_method = data.get('allocation_method')

# 'ipaddress' and 'network' are mutually exclusive
if ipaddress and network:
raise ValidationError400("'ipaddress' and 'network' is mutually exclusive")

# 'allocation_method' is only allowed with 'network'
if allocation_method and not network:
raise ValidationError400("allocation_method is only allowed with 'network'")

return data

def create(self, validated_data):
ipaddress = validated_data.pop('ipaddress', None)

# Start atomic transaction
with transaction.atomic():
host = Host.objects.create(**validated_data)

if ipaddress:
self.validate_ipaddress(ipaddress)
Ipaddress.objects.create(host=host, ipaddress=ipaddress)
else:
pass

return host
4 changes: 3 additions & 1 deletion mreg/api/v1/tests/test_zonefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ def test_excluding_private_addresses(self):
{'name': 'delta', 'ip': '129.240.130.240', 'private': False},
{'name': 'echo', 'ip': '2001:700:100:4003::29', 'private': False}
]
print("Initial:", self.forward)
for h in testhosts:
self._add_host('{}.{}'.format(h['name'], self.forward.name), h['ip'])
print(self._add_host('{}.{}'.format(h['name'], self.forward.name), h['ip']))
# get the forward zone file, verify it contains both private and non-private addresses
data = self._get_zone(self.forward)
print("Zone after:", data)
for h in testhosts:
self.assertIn(h['ip'], data)
# get the forward zone file but with private addresses excluded, verify it contains only public addresses
Expand Down
Loading
Loading