Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/backend/bisheng/api/services/api_key_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from fastapi import Request
from typing import Optional
import threading

from bisheng.database.models.api_key import ApiKeyDao
from bisheng.database.models.user import UserDao


class ApiKeyAuth:
"""API Key认证类"""

@staticmethod
def get_api_key_from_request(request: Request) -> Optional[str]:
"""从请求中提取API Key"""
# 优先从Header中获取
api_key = request.headers.get("X-API-Key") or request.headers.get("Authorization")

if api_key and api_key.startswith("Bearer "):
api_key = api_key[7:] # 移除 "Bearer " 前缀

# 如果Header中没有,从查询参数中获取
if not api_key:
api_key = request.query_params.get("api_key")

return api_key

@staticmethod
def authenticate_api_key(api_key: str) -> Optional[dict]:
"""验证API Key并返回用户信息"""
if not api_key:
return None

# 验证API Key
api_key_obj = ApiKeyDao.validate_api_key(api_key)
if not api_key_obj:
return None

# 获取用户信息
user = UserDao.get_user(api_key_obj.user_id)
if not user or user.delete == 1: # 用户不存在或被禁用
return None

# 异步更新使用统计
threading.Thread(target=ApiKeyDao.update_usage, args=(api_key_obj.id,)).start()

# 构造用户信息
from bisheng.api.services.user_service import gen_user_role

role, _ = gen_user_role(user)

return {
"user_name": user.user_name,
"user_id": user.user_id,
"role": role
}


def get_current_user_by_api_key(request: Request) -> Optional[dict]:
"""通过API Key获取当前用户"""
api_key = ApiKeyAuth.get_api_key_from_request(request)
if not api_key:
return None

return ApiKeyAuth.authenticate_api_key(api_key)


66 changes: 66 additions & 0 deletions src/backend/bisheng/api/services/audit_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,69 @@ def get_chat_messages(cls, chat_list: List[AppChatList]) -> List[AppChatList]:
chat.messages = [message for message in chat_messages
if message.category != WorkflowEventType.UserInput.value]
return chat_list

@staticmethod
def create_api_key(user: UserPayload, ip: str, api_key):
"""记录创建API Key的审计日志"""
logger.info(f"act=create_api_key user={user.user_name} ip={ip} key_name={api_key.key_name}")

# 获取用户所属的分组
user_groups = UserGroupDao.get_user_group(user.user_id)
group_ids = [one.group_id for one in user_groups]

audit_log = AuditLog(
operator_id=user.user_id,
operator_name=user.user_name,
group_ids=group_ids,
system_id=SystemId.SYSTEM.value,
event_type=EventType.CREATE_API_KEY.value,
object_type=ObjectType.API_KEY_CONF.value,
object_id=str(api_key.id),
object_name=api_key.key_name,
ip_address=ip,
)
AuditLogDao.insert_audit_logs([audit_log])

@staticmethod
def update_api_key(user: UserPayload, ip: str, api_key):
"""记录更新API Key的审计日志"""
logger.info(f"act=update_api_key user={user.user_name} ip={ip} key_name={api_key.key_name}")

# 获取用户所属的分组
user_groups = UserGroupDao.get_user_group(user.user_id)
group_ids = [one.group_id for one in user_groups]

audit_log = AuditLog(
operator_id=user.user_id,
operator_name=user.user_name,
group_ids=group_ids,
system_id=SystemId.SYSTEM.value,
event_type=EventType.UPDATE_API_KEY.value,
object_type=ObjectType.API_KEY_CONF.value,
object_id=str(api_key.id),
object_name=api_key.key_name,
ip_address=ip,
)
AuditLogDao.insert_audit_logs([audit_log])

@staticmethod
def delete_api_key(user: UserPayload, ip: str, api_key):
"""记录删除API Key的审计日志"""
logger.info(f"act=delete_api_key user={user.user_name} ip={ip} key_name={api_key.key_name}")

# 获取用户所属的分组
user_groups = UserGroupDao.get_user_group(user.user_id)
group_ids = [one.group_id for one in user_groups]

audit_log = AuditLog(
operator_id=user.user_id,
operator_name=user.user_name,
group_ids=group_ids,
system_id=SystemId.SYSTEM.value,
event_type=EventType.DELETE_API_KEY.value,
object_type=ObjectType.API_KEY_CONF.value,
object_id=str(api_key.id),
object_name=api_key.key_name,
ip_address=ip,
)
AuditLogDao.insert_audit_logs([audit_log])
25 changes: 16 additions & 9 deletions src/backend/bisheng/api/services/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from bisheng.api.errcode.user import (UserLoginOfflineError, UserNameAlreadyExistError,
UserNeedGroupAndRoleError)
from bisheng.api.JWT import ACCESS_TOKEN_EXPIRE_TIME
from bisheng.api.services.api_key_auth import ApiKeyAuth
from bisheng.api.utils import md5_hash
from bisheng.api.v1.schemas import CreateUserReq
from bisheng.cache.redis import redis_client
Expand Down Expand Up @@ -285,16 +286,22 @@ def get_assistant_list_by_access(role_id: int, name: str, page_num: int, page_si
}


async def get_login_user(authorize: AuthJWT = Depends()) -> UserPayload:
async def get_login_user(request: Request, authorize: AuthJWT = Depends()) -> UserPayload:
"""
获取当前登录的用户
"""
# 校验是否过期,过期则直接返回http 状态码的 401
authorize.jwt_required()

current_user = json.loads(authorize.get_jwt_subject())
user = UserPayload(**current_user)

# 获取API Key
api_key = ApiKeyAuth.get_api_key_from_request(request)
user = None
if api_key:
user = ApiKeyAuth.authenticate_api_key(api_key)
user = UserPayload(**user) if user else None
if not user or not api_key:
# 校验是否过期,过期则直接返回http 状态码的 401
authorize.jwt_required()

current_user = json.loads(authorize.get_jwt_subject())
user = UserPayload(**current_user)
# 判断是否允许多点登录
if not settings.get_system_login_method().allow_multi_login:
# 获取access_token
Expand All @@ -305,11 +312,11 @@ async def get_login_user(authorize: AuthJWT = Depends()) -> UserPayload:
return user


async def get_admin_user(authorize: AuthJWT = Depends()) -> UserPayload:
async def get_admin_user(request: Request, authorize: AuthJWT = Depends()) -> UserPayload:
"""
获取超级管理账号,非超级管理员用户,抛出异常
"""
login_user = await get_login_user(authorize)
login_user = await get_login_user(request, authorize)
if not login_user.is_admin():
raise UnAuthorizedError.http_exception()
return login_user
2 changes: 1 addition & 1 deletion src/backend/bisheng/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ async def chat(
Authorize._token = t
else:
Authorize.jwt_required(auth_from='websocket', websocket=websocket)
login_user = await get_login_user(Authorize)
login_user = await get_login_user(websocket, Authorize)
user_id = login_user.user_id
if chat_id:
with session_getter() as session:
Expand Down
143 changes: 143 additions & 0 deletions src/backend/bisheng/api/v1/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from bisheng.cache.redis import redis_client
from bisheng.database.base import session_getter
from bisheng.database.models.api_key import ApiKeyDao, ApiKeyCreate, ApiKeyRead, ApiKeyUpdate
from bisheng.database.models.group import GroupDao
from bisheng.database.models.role import Role, RoleCreate, RoleDao, RoleUpdate
from bisheng.database.constants import AdminRole, DefaultRole
Expand Down Expand Up @@ -824,3 +825,145 @@ def md5_hash(string):
md5 = hashlib.md5()
md5.update(string.encode('utf-8'))
return md5.hexdigest()


@router.post('/user/api_key/create', status_code=200)
async def create_api_key(*,
request: Request,
api_key_req: ApiKeyCreate,
login_user: UserPayload = Depends(get_login_user)):
"""
创建API Key
"""
try:
# 检查用户是否已有太多API Key(可选限制)
existing_keys = ApiKeyDao.get_user_api_keys(login_user.user_id)
if len(existing_keys) >= 10: # 限制每个用户最多10个API Key
raise HTTPException(status_code=400, detail='API Key数量已达上限')

# 创建API Key
api_key = ApiKeyDao.create_api_key(login_user.user_id, api_key_req)

# 记录审计日志
AuditLogService.create_api_key(login_user, get_request_ip(request), api_key)

# 返回完整的API Key(只在创建时返回一次)
return resp_200({
'id': api_key.id,
'key_name': api_key.key_name,
'api_key': api_key.api_key, # 完整的key
'expires_at': api_key.expires_at,
'create_time': api_key.create_time
})
except Exception as e:
logger.exception(f'create_api_key error: {e}')
raise HTTPException(status_code=500, detail='创建API Key失败')


@router.get('/user/api_key/list', status_code=200)
async def list_api_keys(login_user: UserPayload = Depends(get_login_user)):
"""
获取用户的API Key列表
"""
try:
api_keys = ApiKeyDao.get_user_api_keys(login_user.user_id)
result = []
for key in api_keys:
key_read = ApiKeyRead(**key.__dict__)
key_read.mask_api_key() # 遮盖API Key
result.append(key_read.__dict__)

return resp_200(result)
except Exception as e:
logger.exception(f'list_api_keys error: {e}')
raise HTTPException(status_code=500, detail='获取API Key列表失败')


@router.patch('/user/api_key/{api_key_id}', status_code=200)
async def update_api_key(*,
request: Request,
api_key_id: int,
api_key_update: ApiKeyUpdate,
login_user: UserPayload = Depends(get_login_user)):
"""
更新API Key
"""
try:
# 检查API Key是否属于当前用户
existing_key = ApiKeyDao.get_api_key_by_id(api_key_id)
if not existing_key or existing_key.user_id != login_user.user_id:
raise HTTPException(status_code=404, detail='API Key不存在')

# 更新API Key
updated_key = ApiKeyDao.update_api_key(api_key_id, api_key_update)
if not updated_key:
raise HTTPException(status_code=404, detail='API Key不存在')

# 记录审计日志
AuditLogService.update_api_key(login_user, get_request_ip(request), updated_key)

# 返回遮盖后的结果
key_read = ApiKeyRead(**updated_key.__dict__)
key_read.mask_api_key()
return resp_200(key_read.__dict__)
except HTTPException:
raise
except Exception as e:
logger.exception(f'update_api_key error: {e}')
raise HTTPException(status_code=500, detail='更新API Key失败')


@router.delete('/user/api_key/{api_key_id}', status_code=200)
async def delete_api_key(*,
request: Request,
api_key_id: int,
login_user: UserPayload = Depends(get_login_user)):
"""
删除API Key
"""
try:
# 检查API Key是否属于当前用户
existing_key = ApiKeyDao.get_api_key_by_id(api_key_id)
if not existing_key or existing_key.user_id != login_user.user_id:
raise HTTPException(status_code=404, detail='API Key不存在')

# 删除API Key
success = ApiKeyDao.delete_api_key(api_key_id)
if not success:
raise HTTPException(status_code=404, detail='API Key不存在')

# 记录审计日志
AuditLogService.delete_api_key(login_user, get_request_ip(request), existing_key)

return resp_200()
except HTTPException:
raise
except Exception as e:
logger.exception(f'delete_api_key error: {e}')
raise HTTPException(status_code=500, detail='删除API Key失败')


@router.get('/user/api_key/{api_key_id}/usage', status_code=200)
async def get_api_key_usage(*,
api_key_id: int,
login_user: UserPayload = Depends(get_login_user)):
"""
获取API Key使用统计
"""
try:
# 检查API Key是否属于当前用户
api_key = ApiKeyDao.get_api_key_by_id(api_key_id)
if not api_key or api_key.user_id != login_user.user_id:
raise HTTPException(status_code=404, detail='API Key不存在')

return resp_200({
'total_uses': api_key.total_uses,
'last_used_at': api_key.last_used_at,
'is_active': api_key.is_active,
'expires_at': api_key.expires_at
})
except HTTPException:
raise
except Exception as e:
logger.exception(f'get_api_key_usage error: {e}')
raise HTTPException(status_code=500, detail='获取API Key使用统计失败')
Loading