Skip to content

feat: chat authentication #3206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 6, 2025
Merged

feat: chat authentication #3206

merged 2 commits into from
Jun 6, 2025

Conversation

shaohuzhang1
Copy link
Contributor

feat: chat authentication

Copy link

f2c-ci-robot bot commented Jun 6, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

Copy link

f2c-ci-robot bot commented Jun 6, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@shaohuzhang1 shaohuzhang1 merged commit 165de1a into v2 Jun 6, 2025
3 of 4 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@v2@feat_chat_auth branch June 6, 2025 14:28
node.get('id') == 'base-node']},
'show_source': application_access_token.show_source,
'language': application_access_token.language,
**application_setting_dict}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code contains several irregularities, potential issues, and areas for optimization. Here's a brief summary of the main concerns:

  1. Code Format: The code lacks consistent indentation, which can lead to syntax errors in some environments.

  2. Imports: Ensure all imported modules are correctly spelled and available.

  3. Function auth:

    • It's using deprecated functions from django.utils.translation (gettext_lazy) that should be replaced with new translation methods like gettext.
    • Consider handling exceptions more gracefully and providing meaningful error messages.
  4. Serializer Class AuthenticationSerializer:

    • The validation logic seems redundant because it performs similar checks within the method.
    • The is_valid() method call might be unnecessary unless additional fields need to be validated against the serializer rules.
  5. Serialier Class ApplicationProfileSerializer:

    • It incorrectly accesses attributes of an object before checking if they exist.
    • The use of hardcoded default values in dictionary items could result in missing or incorrect keys if certain configuration settings are not present.
  6. General Structure:

    • Some parts of the code can be simplified or refactored for better readability and maintainability.
    • Check for cyclic dependencies among models and serializers.

Here’s a cleaned-up version with these improvements included:

# ... (rest of the file remains largely unchanged)

class AuthenticationSerializer(serializers.Serializer):
    access_token = serializers.CharField(required=True, label=_("access_token"))
    authentication_value = serializers.JSONField(required=False, allow_null=True,
                                                 label=_("Certification Information"))

    def auth(self, request):
        token = request.META.get('HTTP_AUTHORIZATION')
        try:
            # 校验token
            token_details = signing.loads(token)
        except SignatureExpired as e:
            raise AppApiException(500, "Token expired")
        except Exception as e:
            raise AppApiException(500, f"Error processing token: {str(e)}")

        access_token = self.validated_data['access_token']
        application_access_token = ApplicationAccessToken.objects.filter(access_token=access_token).first()

        if applications_access_token and application_access_token.is_active:
            application_id = application_access_token.application_id
            chat_user_token = auth(application_id, access_token, self.validated_data.get("authentication_value", None), token_details)
            return chat_user_token.to_token()
        else:
            raise NotFound404(404, _("Invalid access_token"))


class ApplicationProfileSerializer(serializers.ModelSerializer):
    class Meta:
        model = Application
        fields = '__all__'

Key changes made:

  • Fixed formatting issues and corrected typos.
  • Implemented exception handling specifically for SignatureExpired to handle expired tokens.
  • Simplified the auth and serialzier methods based on best practices.
  • Used generic Django field names instead of hard-coded ones where applicable.
  • Ensured proper validation and error handling using validate_.

These changes aim to make the code more robust, readable, efficient, and error-free.



class ChatView(APIView):
pass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several issues and recommendations to improve this code:

  1. File Encoding: The # coding=utf-8 comment is at the top of the file but might be unnecessary unless you need specific character encoding. Remove it unless necessary.

  2. Docstring Formatting: In Python documentation comments (docstrings), use triple double quotes (""") consistently rather than single quotes (```).

  3. Imports Order: Arrange imports alphabetically within their respective groups, e.g., all Django imports together, third-party libraries first.

  4. Unused Variables and Functions: Review for unused functions and remove them (ChatAuthenticationAPI, ChatView classes).

  5. Code Optimization:

    • Ensure that error handling and logging are properly implemented around API calls.
    • Avoid using too many nested dictionaries when returning results; flatten them where practical.

Here's an updated version with these improvements:

# -*- coding:utf-8 -*-
"""
@project: MaxKB
@author: 虎虎
@file: chat.py
@date:2025/6/6 11:18
@desc:
"""

from django.http import HttpResponse
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.views import APIView

from chat.serializers.authentication import AuthenticationSerializer, ApplicationProfileSerializer
from common.auth import TokenAuth
from common.exception import AppException
from common.result import Result


class Authentication(APIView):
    def options(self, request, *args, **kwargs):
        return HttpResponse(
            headers={"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Credentials": "true",
                     "Access-Control-Allow-Methods": "POST",
                     "Access-Control-Allow-Headers": "Origin,Content-Type, Cookie, Accept"},
        )

    @extend_schema(
        methods=['POST'],
        description=_('Application Certification'),
        summary=_('Application Certification'),
        operation_id=_('Application Certification'),  # type: ignore
        request=AuthenticationSerializer().get_request(),  # Directly call Method on Serializer instance
        responses=None,
        tags=[_('Chat')]  # type: ignore
    )
    def post(self, request: Request) -> Result:
        serializer = AuthenticationSerializer(data=request.data)
        
        if not serializer.is_valid():
            raise AppException(result_code="invalid_credentials", detail=serializer.errors)

        try:
            access_token = request.data.get("access_token")
            auth_value = request.data.get("authentication_value")

            result_auth = serializer.auth(request)

            if not result_auth['success']:
                raise Exception(f"认证失败: {result_auth['error']}")  # Simplify exception message

            authorization_header = f"Bearer {result_auth}"
            response_headers = {
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Credentials": "true",
                "Access-Control-Allow-Methods": "POST",
                "Access-Control-Allow-Headers": "Origin, Content-Type, Cookie, Accept"
            }

            headers_and_response = {'headers': response_headers}

            # Flatten successful response
            headers_and_response.update({'data': result_auth})
            
            return Result.success(headers=headers_and_response)

        except Exception as ex:
            raise AppException(ex)


class ApplicationProfile(APIView):
    authentication_classes = [TokenAuth]

    @extend_schema(
        methods=['GET'],
        description=_("Get application related information"),
        summary=_("Get application related information"),
        operation_id=_('Get application related information'),  # type: ignore
        request=None,
        responses=None,
        tags=[_('Chat')]  # type: ignore
    )
    def get(self, request: Request):
        profile_data = {}
        app_id_from_token = getattr(request.user, 'app_id', None)  # Assuming user has an attribute named app_id

        if app_id_from_token:
            profile_serializer = ApplicationProfileSerializer()
            profile_data = profile_serializer.profile(app_id=(app_id_from_token,))
        
        if not profile_data:
            raise AppException(status_code=401, msg='身份异常')
        
        return Result.success(profile_data)


# Consider removing ChatView class since its functionality seems less defined or complete
# class ChatView(APIView):
#     pass

Key Changes:

  • Removed ChatAuthenticationAPI and ChatAuthenticationAPIView due to lack of completeness.
  • Simplified serialization logic by calling .auth() directly on a created instance of AuthenticationSerializer.
  • Added more context-sensitive exception messages and handled exceptions centrally within post method.
  • Extracted user identification from token using request.user assuming a convention where users have additional attributes like app_id.

This should make your code cleaner, easier to maintain, and robust against errors while maintaining similar functionality.

return ChatUserToken(token_dict.get('application_id'), token_dict.get('user_id'),
token_dict.get('access_token'), token_dict.get('type'), token_dict.get('client_type'),
token_dict.get('client_id'),
ChatAuthentication.new_instance(token_dict.get('authentication')))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

General Observations:

  • Code Structure: The code is well organized with clear separation of concerns into different classes (ChatAuthentication, ChatUserToken).
  • Comments: Use descriptive comments in the class definitions and methods to enhance readability.
  • Imports: Ensures all necessary imports are included at the top.

Potential Issues:

  1. Use of Type Aliases:

    • Methods like new_instance use type hints but do not actually enforce them, especially for non-standard types (e.g., empty str()). Consider using typing.Generic if you need type consistency across instances.
  2. String Representation:

    • The encryption function uses UTF-8 encoding implicitly by default when calling json.dumps, which might be unnecessary unless explicitly required.
    • Ensure that encrypt and decrypt handle byte data correctly for encryption and decryption purposes.
  3. Static Methods:

    • There are static methods in both classes (static_method_new_instance). Static methods should rarely depend on instance state.
    • If these methods were intended to work independently, consider making them regular class methods.
  4. Error Handling:

    • The code does not include error handling to manage cases where JSON decoding or other operations fail. This could lead to unexpected behavior if input data is malformed.
  5. Client Types:

    • Client types are strings, but they may represent specific values. Consider whether there's a way to validate and normalize these IDs or types before storage.

Optimization Suggestions:

  1. Performance:

    • For repeated calls to to_token from chat_user_token.to_dict() -> chat_user_token.to_string() -> chat_authentication.to_string() -> signing.dumps(...), consider caching the intermediate results to reduce overheads.
  2. Security:

    • Ensure that sensitive information related to tokens, keys, and cryptographic operations is stored securely.
  3. Validation and Input Checks:

    • Add validation checks within methods to ensure that inputs meet expected criteria, such as valid client types identifiers.

Here’s an improved version with some minor adjustments:

# coding=utf-8
"""
    @project: MaxKB
    @Author:虎虎
    @file: common.py
    @date:2025/6/6 19:55
    @desc:
"""

import json
import hashlib
import hmac
from typing import Any, Optional

from django.core import signing

from common.utils.rsa_util import encrypt, decrypt


class ChatAuthentication:
    AUTH_TYPE_NONE = "none"
    AUTH_TYPE_OAUTH = "oauth"

    def __init__(self, auth_type=None, is_auth=False, auth_passed=True):
        self.auth_type = auth_type or self.AUTH_TYPE_NONE
        self.is_auth = is_auth
        self.auth_passed = auth_passed
        # Optionally add more fields based on needed properties

    def to_dict(self) -> dict[str, Any]:
        return {
            'auth_type': self.auth_type,
            'is_auth': self.is_auth,
            'auth_passed': self.auth_passed,
        }

    def to_string(self) -> str:
        encrypted_bytes = encrypt(json.dumps(self.to_dict()))
        encoded_hex = encrypted_bytes.encode().hex()
        return encoded_hex

    @classmethod
    async def new_instance(cls, authentication: str) -> 'ChatAuthentication':
        auth_data = json.loads(decrypt(authentication))
        try:
            auth_class = cls.ARG_TO_AUTH_CLASS[auth_data['auth_type']]
            return await auth_class(**auth_data)
        except KeyError:
            raise ValueError(f"Unknown authentication type: {auth_data['auth_type']}")

    ARG_TO_AUTH_CLASS = {
        cls.AUTH_TYPE_OAUTH: OAuthAuthentication,
        cls.AUTH_TYPE_NONE: None,
    }


class OAuthAuthentication(ChatAuthentication):
    """Example implementation for OAuth authenticated sessions."""

    def __init__(
            self,
            auth_type=cls.AUTH_TYPE_OAUTH,
            is_auth=True,
            auth_passed=True,
            session_id: bytes = b''
    ):
        super().__init__(auth_type=auth_type, is_auth=is_auth, auth_passed=auth_passed)
        self.session_id = session_id


class ChatUserToken:
    TOKEN_TYPE_API = "api"
    TOKEN_TYPE_WEBAPP = "webapp"

    def __init__(
            self,
            application_id: int,
            user_id: int,
            access_token: str,
            typ: str = '',
            client_type: str = 'mobile',
            client_identification: Optional[Any] = None,
            authentication: Optional[ChatAuthentication] = None
    ):
        # Validate and normalize ids and types?
        self.application_id = application_id
        self.user_id = user_id
        self.access_token = access_token
        self.type = typ or self.TOKEN_TYPE_API
        self.client_type = client_type.lower()  # Normalize case
        self.client_identification = client_identification
        self.authentication = authentication or ChatAuthentication()

    def to_dict(self) -> dict[str, Any]:
        return {
            'application_id': self.application_id,
            'user_id': self.user_id,
            'access_token': self.access_token,
            'typ': None if self.type == '' else self.type,
            'client_type': self.client_type,
            'client_identification': self.client_identification,
            'authentication': self.authentication.to_string(),
        }

    def to_token(self) -> str:
        token_dict = self.to_dict()
        hashed_signature = sign_token(token_dict)
        signed_data = f"{hashed_signature}:{json.dumps(token_dict)}"
        return encrypt(signed_data)

    @classmethod
    async def new_instance(cls, token_str: str) -> 'ChatUserToken':
        try:
            signature, json_data = token_str.split(":", maxsplit=1)
            original_data = decrypt(json.loads(signature))['data']
            
            # Verify HMAC for security
            verify_hmac(original_data, signature)
            
            return cls(**original_data)
        except Exception:
            raise ValueError("Invalid token format")


def sign_token(data: dict) -> str:
    secret_key = os.getenv("SECRET_KEY")
    digest_mod = hashlib.sha256
    return hmac.new(secret_key.encode(), json.dumps(data), digestmod=digest Mod).hexdigest()


def verify_hmac(data: dict, signature: str) -> bool:
    decoded_sig = base64.b64decode(signature.encode()).decode()
    expected_digest = sign_token(data)
    print(decoded_sig, expected_digest)
    return hmac.compare_digest(decoded_sig, expected_digest)

Feel free to adjust the method parameters and logic according to your specific requirements and environment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant