Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/app/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from oauth2_metadata.views import (
DynamicClientRegistrationView,
OAuthAuthorizeView,
authorization_server_metadata,
)
from users.views import password_reset_redirect
Expand Down Expand Up @@ -57,6 +58,11 @@
"robots.txt",
TemplateView.as_view(template_name="robots.txt", content_type="text/plain"),
),
path(
"api/v1/oauth/authorize/",
OAuthAuthorizeView.as_view(),
name="oauth-authorize",
),
path(
"o/register/",
DynamicClientRegistrationView.as_view(),
Expand Down
12 changes: 12 additions & 0 deletions api/oauth2_metadata/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@

from oauth2_metadata.services import validate_redirect_uri


class OAuthConsentSerializer(serializers.Serializer): # type: ignore[type-arg]
allow = serializers.BooleanField()
client_id = serializers.CharField()
redirect_uri = serializers.CharField()
response_type = serializers.CharField()
scope = serializers.CharField(required=False, default="mcp")
code_challenge = serializers.CharField()
code_challenge_method = serializers.CharField()
state = serializers.CharField(required=False, allow_blank=True, default="")


# Allow ASCII letters, digits, spaces, hyphens, underscores, dots, and parentheses.
# ASCII-only to prevent Unicode homoglyph spoofing on the consent screen.
_CLIENT_NAME_RE = re.compile(r"^[\w\s.\-()]+$", re.ASCII)
Expand Down
106 changes: 103 additions & 3 deletions api/oauth2_metadata/views.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
from typing import Any
from urllib.parse import urlencode, urlparse, urlunparse

from django.conf import settings
from django.http import HttpRequest, JsonResponse
from django.http import HttpRequest, JsonResponse, QueryDict
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_GET
from oauth2_provider.exceptions import OAuthToolkitError
from oauth2_provider.models import get_application_model
from oauth2_provider.scopes import get_scopes_backend
from oauth2_provider.views.mixins import OAuthLibMixin
from rest_framework import status
from rest_framework import status as drf_status
from rest_framework.permissions import AllowAny
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.throttling import ScopedRateThrottle
from rest_framework.views import APIView

from oauth2_metadata.serializers import DCRRequestSerializer
from oauth2_metadata.serializers import DCRRequestSerializer, OAuthConsentSerializer
from oauth2_metadata.services import create_oauth2_application


Expand Down Expand Up @@ -46,6 +52,100 @@ def authorization_server_metadata(request: HttpRequest) -> JsonResponse:
return JsonResponse(metadata)


class OAuthAuthorizeView(OAuthLibMixin, APIView): # type: ignore[misc]
"""Validate an OAuth authorisation request and process consent decisions."""

permission_classes = [IsAuthenticated]

def get(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""Validate an authorisation request and return application info."""
# Bridge DRF auth to Django request so DOT sees the authenticated user.
request._request.user = request.user

try:
scopes, credentials = self.validate_authorization_request(request._request)
except OAuthToolkitError as e:
oauthlib_error = e.oauthlib_error
return Response(
{
"error": getattr(oauthlib_error, "error", "invalid_request"),
"error_description": getattr(oauthlib_error, "description", str(e)),
},
status=status.HTTP_400_BAD_REQUEST,
)

Application = get_application_model()
application = Application.objects.get(
client_id=credentials["client_id"],
)
all_scopes = get_scopes_backend().get_all_scopes()
scopes_dict: dict[str, str] = {s: all_scopes.get(s, s) for s in scopes}
return Response(
{
"application": {
"name": application.name,
"client_id": application.client_id,
},
"scopes": scopes_dict,
"redirect_uri": credentials.get("redirect_uri", ""),
# skip_authorization is safe to reuse here: this custom view
# always shows the consent screen regardless of this flag.
# We only use it as a trust signal for the frontend UI.
"is_verified": bool(application.skip_authorization),
}
)

def post(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""Process a consent decision and return the redirect URI."""
serializer = OAuthConsentSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data: dict[str, Any] = serializer.validated_data
allow: bool = data.pop("allow")

# Bridge DRF auth to Django request so DOT sees the authenticated user.
request._request.user = request.user

# DOT's validate_authorization_request reads OAuth params from GET
# and also from request.get_full_path() which uses META['QUERY_STRING'].
# This is necessary because DOT's OAuthLibMixin is designed for
# form-based flows where params arrive via GET. If a DOT upgrade
# changes how it reads params, this will need updating.
query = QueryDict(mutable=True)
for key, value in data.items():
query[key] = str(value)
request._request.GET = query # type: ignore[assignment]
request._request.META["QUERY_STRING"] = query.urlencode()

try:
scopes, credentials = self.validate_authorization_request(request._request)
except OAuthToolkitError as e:
oauthlib_error = e.oauthlib_error
return Response(
{
"error": getattr(oauthlib_error, "error", "invalid_request"),
"error_description": getattr(oauthlib_error, "description", str(e)),
},
status=status.HTTP_400_BAD_REQUEST,
)

try:
scopes_str = " ".join(scopes) if isinstance(scopes, list) else scopes
uri, _headers, _body, _status = self.create_authorization_response(
request._request, scopes_str, credentials, allow
)
except OAuthToolkitError:
# User denied access -- build the error redirect manually.
redirect_uri = credentials.get("redirect_uri", data.get("redirect_uri", ""))
state = credentials.get("state", data.get("state", ""))
error_params: dict[str, str] = {"error": "access_denied"}
if state:
error_params["state"] = state
parsed = urlparse(str(redirect_uri))
uri = urlunparse(parsed._replace(query=urlencode(error_params)))

return Response({"redirect_uri": uri})


class DynamicClientRegistrationView(APIView):
"""RFC 7591 Dynamic Client Registration endpoint."""

Expand Down
Loading
Loading