-
Notifications
You must be signed in to change notification settings - Fork 2.2k
feat: chat authentication #3206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
node.get('id') == 'base-node']}, | ||
'show_source': application_access_token.show_source, | ||
'language': application_access_token.language, | ||
**application_setting_dict} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The provided code contains several irregularities, potential issues, and areas for optimization. Here's a brief summary of the main concerns:
-
Code Format: The code lacks consistent indentation, which can lead to syntax errors in some environments.
-
Imports: Ensure all imported modules are correctly spelled and available.
-
Function
auth
:- It's using deprecated functions from
django.utils.translation
(gettext_lazy
) that should be replaced with new translation methods likegettext
. - Consider handling exceptions more gracefully and providing meaningful error messages.
- It's using deprecated functions from
-
Serializer Class
AuthenticationSerializer
:- The validation logic seems redundant because it performs similar checks within the method.
- The
is_valid()
method call might be unnecessary unless additional fields need to be validated against the serializer rules.
-
Serialier Class
ApplicationProfileSerializer
:- It incorrectly accesses attributes of an object before checking if they exist.
- The use of hardcoded default values in dictionary items could result in missing or incorrect keys if certain configuration settings are not present.
-
General Structure:
- Some parts of the code can be simplified or refactored for better readability and maintainability.
- Check for cyclic dependencies among models and serializers.
Here’s a cleaned-up version with these improvements included:
# ... (rest of the file remains largely unchanged)
class AuthenticationSerializer(serializers.Serializer):
access_token = serializers.CharField(required=True, label=_("access_token"))
authentication_value = serializers.JSONField(required=False, allow_null=True,
label=_("Certification Information"))
def auth(self, request):
token = request.META.get('HTTP_AUTHORIZATION')
try:
# 校验token
token_details = signing.loads(token)
except SignatureExpired as e:
raise AppApiException(500, "Token expired")
except Exception as e:
raise AppApiException(500, f"Error processing token: {str(e)}")
access_token = self.validated_data['access_token']
application_access_token = ApplicationAccessToken.objects.filter(access_token=access_token).first()
if applications_access_token and application_access_token.is_active:
application_id = application_access_token.application_id
chat_user_token = auth(application_id, access_token, self.validated_data.get("authentication_value", None), token_details)
return chat_user_token.to_token()
else:
raise NotFound404(404, _("Invalid access_token"))
class ApplicationProfileSerializer(serializers.ModelSerializer):
class Meta:
model = Application
fields = '__all__'
Key changes made:
- Fixed formatting issues and corrected typos.
- Implemented exception handling specifically for
SignatureExpired
to handle expired tokens. - Simplified the
auth
and serialzier methods based on best practices. - Used generic Django field names instead of hard-coded ones where applicable.
- Ensured proper validation and error handling using
validate_
.
These changes aim to make the code more robust, readable, efficient, and error-free.
|
||
|
||
class ChatView(APIView): | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several issues and recommendations to improve this code:
-
File Encoding: The
# coding=utf-8
comment is at the top of the file but might be unnecessary unless you need specific character encoding. Remove it unless necessary. -
Docstring Formatting: In Python documentation comments (docstrings), use triple double quotes (
"""
) consistently rather than single quotes (```). -
Imports Order: Arrange imports alphabetically within their respective groups, e.g., all Django imports together, third-party libraries first.
-
Unused Variables and Functions: Review for unused functions and remove them (
ChatAuthenticationAPI
,ChatView
classes). -
Code Optimization:
- Ensure that error handling and logging are properly implemented around API calls.
- Avoid using too many nested dictionaries when returning results; flatten them where practical.
Here's an updated version with these improvements:
# -*- coding:utf-8 -*-
"""
@project: MaxKB
@author: 虎虎
@file: chat.py
@date:2025/6/6 11:18
@desc:
"""
from django.http import HttpResponse
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.views import APIView
from chat.serializers.authentication import AuthenticationSerializer, ApplicationProfileSerializer
from common.auth import TokenAuth
from common.exception import AppException
from common.result import Result
class Authentication(APIView):
def options(self, request, *args, **kwargs):
return HttpResponse(
headers={"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Credentials": "true",
"Access-Control-Allow-Methods": "POST",
"Access-Control-Allow-Headers": "Origin,Content-Type, Cookie, Accept"},
)
@extend_schema(
methods=['POST'],
description=_('Application Certification'),
summary=_('Application Certification'),
operation_id=_('Application Certification'), # type: ignore
request=AuthenticationSerializer().get_request(), # Directly call Method on Serializer instance
responses=None,
tags=[_('Chat')] # type: ignore
)
def post(self, request: Request) -> Result:
serializer = AuthenticationSerializer(data=request.data)
if not serializer.is_valid():
raise AppException(result_code="invalid_credentials", detail=serializer.errors)
try:
access_token = request.data.get("access_token")
auth_value = request.data.get("authentication_value")
result_auth = serializer.auth(request)
if not result_auth['success']:
raise Exception(f"认证失败: {result_auth['error']}") # Simplify exception message
authorization_header = f"Bearer {result_auth}"
response_headers = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Credentials": "true",
"Access-Control-Allow-Methods": "POST",
"Access-Control-Allow-Headers": "Origin, Content-Type, Cookie, Accept"
}
headers_and_response = {'headers': response_headers}
# Flatten successful response
headers_and_response.update({'data': result_auth})
return Result.success(headers=headers_and_response)
except Exception as ex:
raise AppException(ex)
class ApplicationProfile(APIView):
authentication_classes = [TokenAuth]
@extend_schema(
methods=['GET'],
description=_("Get application related information"),
summary=_("Get application related information"),
operation_id=_('Get application related information'), # type: ignore
request=None,
responses=None,
tags=[_('Chat')] # type: ignore
)
def get(self, request: Request):
profile_data = {}
app_id_from_token = getattr(request.user, 'app_id', None) # Assuming user has an attribute named app_id
if app_id_from_token:
profile_serializer = ApplicationProfileSerializer()
profile_data = profile_serializer.profile(app_id=(app_id_from_token,))
if not profile_data:
raise AppException(status_code=401, msg='身份异常')
return Result.success(profile_data)
# Consider removing ChatView class since its functionality seems less defined or complete
# class ChatView(APIView):
# pass
Key Changes:
- Removed
ChatAuthenticationAPI
andChatAuthenticationAPIView
due to lack of completeness. - Simplified serialization logic by calling
.auth()
directly on a created instance ofAuthenticationSerializer
. - Added more context-sensitive exception messages and handled exceptions centrally within
post
method. - Extracted user identification from token using
request.user
assuming a convention where users have additional attributes likeapp_id
.
This should make your code cleaner, easier to maintain, and robust against errors while maintaining similar functionality.
return ChatUserToken(token_dict.get('application_id'), token_dict.get('user_id'), | ||
token_dict.get('access_token'), token_dict.get('type'), token_dict.get('client_type'), | ||
token_dict.get('client_id'), | ||
ChatAuthentication.new_instance(token_dict.get('authentication'))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
General Observations:
- Code Structure: The code is well organized with clear separation of concerns into different classes (
ChatAuthentication
,ChatUserToken
). - Comments: Use descriptive comments in the class definitions and methods to enhance readability.
- Imports: Ensures all necessary imports are included at the top.
Potential Issues:
-
Use of Type Aliases:
- Methods like
new_instance
use type hints but do not actually enforce them, especially for non-standard types (e.g., emptystr()
). Consider usingtyping.Generic
if you need type consistency across instances.
- Methods like
-
String Representation:
- The
encryption
function uses UTF-8 encoding implicitly by default when callingjson.dumps
, which might be unnecessary unless explicitly required. - Ensure that
encrypt
anddecrypt
handle byte data correctly for encryption and decryption purposes.
- The
-
Static Methods:
- There are static methods in both classes (
static_method_new_instance
). Static methods should rarely depend on instance state. - If these methods were intended to work independently, consider making them regular class methods.
- There are static methods in both classes (
-
Error Handling:
- The code does not include error handling to manage cases where JSON decoding or other operations fail. This could lead to unexpected behavior if input data is malformed.
-
Client Types:
- Client types are strings, but they may represent specific values. Consider whether there's a way to validate and normalize these IDs or types before storage.
Optimization Suggestions:
-
Performance:
- For repeated calls to
to_token
fromchat_user_token.to_dict() -> chat_user_token.to_string() -> chat_authentication.to_string() -> signing.dumps(...)
, consider caching the intermediate results to reduce overheads.
- For repeated calls to
-
Security:
- Ensure that sensitive information related to tokens, keys, and cryptographic operations is stored securely.
-
Validation and Input Checks:
- Add validation checks within methods to ensure that inputs meet expected criteria, such as valid client types identifiers.
Here’s an improved version with some minor adjustments:
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: common.py
@date:2025/6/6 19:55
@desc:
"""
import json
import hashlib
import hmac
from typing import Any, Optional
from django.core import signing
from common.utils.rsa_util import encrypt, decrypt
class ChatAuthentication:
AUTH_TYPE_NONE = "none"
AUTH_TYPE_OAUTH = "oauth"
def __init__(self, auth_type=None, is_auth=False, auth_passed=True):
self.auth_type = auth_type or self.AUTH_TYPE_NONE
self.is_auth = is_auth
self.auth_passed = auth_passed
# Optionally add more fields based on needed properties
def to_dict(self) -> dict[str, Any]:
return {
'auth_type': self.auth_type,
'is_auth': self.is_auth,
'auth_passed': self.auth_passed,
}
def to_string(self) -> str:
encrypted_bytes = encrypt(json.dumps(self.to_dict()))
encoded_hex = encrypted_bytes.encode().hex()
return encoded_hex
@classmethod
async def new_instance(cls, authentication: str) -> 'ChatAuthentication':
auth_data = json.loads(decrypt(authentication))
try:
auth_class = cls.ARG_TO_AUTH_CLASS[auth_data['auth_type']]
return await auth_class(**auth_data)
except KeyError:
raise ValueError(f"Unknown authentication type: {auth_data['auth_type']}")
ARG_TO_AUTH_CLASS = {
cls.AUTH_TYPE_OAUTH: OAuthAuthentication,
cls.AUTH_TYPE_NONE: None,
}
class OAuthAuthentication(ChatAuthentication):
"""Example implementation for OAuth authenticated sessions."""
def __init__(
self,
auth_type=cls.AUTH_TYPE_OAUTH,
is_auth=True,
auth_passed=True,
session_id: bytes = b''
):
super().__init__(auth_type=auth_type, is_auth=is_auth, auth_passed=auth_passed)
self.session_id = session_id
class ChatUserToken:
TOKEN_TYPE_API = "api"
TOKEN_TYPE_WEBAPP = "webapp"
def __init__(
self,
application_id: int,
user_id: int,
access_token: str,
typ: str = '',
client_type: str = 'mobile',
client_identification: Optional[Any] = None,
authentication: Optional[ChatAuthentication] = None
):
# Validate and normalize ids and types?
self.application_id = application_id
self.user_id = user_id
self.access_token = access_token
self.type = typ or self.TOKEN_TYPE_API
self.client_type = client_type.lower() # Normalize case
self.client_identification = client_identification
self.authentication = authentication or ChatAuthentication()
def to_dict(self) -> dict[str, Any]:
return {
'application_id': self.application_id,
'user_id': self.user_id,
'access_token': self.access_token,
'typ': None if self.type == '' else self.type,
'client_type': self.client_type,
'client_identification': self.client_identification,
'authentication': self.authentication.to_string(),
}
def to_token(self) -> str:
token_dict = self.to_dict()
hashed_signature = sign_token(token_dict)
signed_data = f"{hashed_signature}:{json.dumps(token_dict)}"
return encrypt(signed_data)
@classmethod
async def new_instance(cls, token_str: str) -> 'ChatUserToken':
try:
signature, json_data = token_str.split(":", maxsplit=1)
original_data = decrypt(json.loads(signature))['data']
# Verify HMAC for security
verify_hmac(original_data, signature)
return cls(**original_data)
except Exception:
raise ValueError("Invalid token format")
def sign_token(data: dict) -> str:
secret_key = os.getenv("SECRET_KEY")
digest_mod = hashlib.sha256
return hmac.new(secret_key.encode(), json.dumps(data), digestmod=digest Mod).hexdigest()
def verify_hmac(data: dict, signature: str) -> bool:
decoded_sig = base64.b64decode(signature.encode()).decode()
expected_digest = sign_token(data)
print(decoded_sig, expected_digest)
return hmac.compare_digest(decoded_sig, expected_digest)
Feel free to adjust the method parameters and logic according to your specific requirements and environment.
feat: chat authentication