Skip to content

Commit

Permalink
Update flask_authgen_jwt.py
Browse files Browse the repository at this point in the history
Added all http status codes using the http library
  • Loading branch information
dmtzs committed Dec 19, 2023
1 parent ac6df26 commit 13e1cfd
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions src/flask_authgen_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def verify_user_roles(self, roles: list, user: str) -> None:
"""
if roles is not None:
if self.get_user_roles_callback is None:
self.gen_abort_error("get_user_roles decorator and function is not defined is not defined", 500)
self.gen_abort_error("get_user_roles decorator and function is not defined is not defined", HTTPStatus.INTERNAL_SERVER_ERROR.value)
else:
user_roles = self.ensure_sync(self.get_user_roles_callback)(user)
# if not set(roles).issubset(set(user_roles)):
Expand All @@ -116,7 +116,7 @@ def verify_user_roles(self, roles: list, user: str) -> None:
role_flag = True
break
if not role_flag:
self.gen_abort_error("User does not have the required roles", 403)
self.gen_abort_error("User does not have the required roles", HTTPStatus.FORBIDDEN.value)

def get_user_roles(self, func: Callable[[str], list[str]]) -> Callable[[str], list[str]]:
"""
Expand Down Expand Up @@ -207,7 +207,7 @@ def __create_jwt_payload(self, bauth_credentials: dict[str, str]) -> dict[str, U
- dict: dictionary with the JWT payload
"""
if not self.jwt_fields_attr:
self.gen_abort_error("jwt_claims decorator and function is not defined", 500)
self.gen_abort_error("jwt_claims decorator and function is not defined", HTTPStatus.INTERNAL_SERVER_ERROR.value)
if self.json_body_token:
if not request.is_json:
self.gen_abort_error("Missing JSON in request or not JSON format sent to endpoint", HTTPStatus.BAD_REQUEST.value)
Expand Down Expand Up @@ -252,7 +252,7 @@ def __dec_set_basic_auth(self) -> Optional[bool]:
return self.ensure_sync(self.basic_auth_callback)(
username, password), bauth_credentials
else:
self.gen_abort_error("basic_auth decorator and function is not defined", 500)
self.gen_abort_error("basic_auth decorator and function is not defined", HTTPStatus.INTERNAL_SERVER_ERROR.value)

def __encode_jwt(self, payload: dict) -> Optional[str]:
"""
Expand Down Expand Up @@ -284,7 +284,7 @@ def __encode_jwt(self, payload: dict) -> Optional[str]:
else:
message = "The algorithm RS256 is not supported, " \
"please verify the loading of your key or something relationated with the key"
self.gen_abort_error(message, 500)
self.gen_abort_error(message, HTTPStatus.INTERNAL_SERVER_ERROR.value)
except Exception as ex:
print(f"The following ERROR occurred in {__file__}: {ex}")
encoded_token = None
Expand Down Expand Up @@ -392,15 +392,15 @@ def wrapper(*args, **kwargs):
- Callable: the function to wrap that returns the encoded token
"""
if self.enc_dec_jwt_callback is None:
self.gen_abort_error("get_decode_jwt_attributes decorator and function to verify password and username is not set", 500)
self.gen_abort_error("get_decode_jwt_attributes decorator and function to verify password and username is not set", HTTPStatus.INTERNAL_SERVER_ERROR.value)
else:
grant_credentials_access = self.__dec_set_basic_auth()
if grant_credentials_access[0]:
self.verify_user_roles(roles, grant_credentials_access[1]["username"])
jwt_payload = self.__create_jwt_payload(grant_credentials_access[1])
token = self.__encode_jwt(jwt_payload)
else:
self.gen_abort_error("The credentials are not correct", 401)
self.gen_abort_error("The credentials are not correct", HTTPStatus.UNAUTHORIZED.value)

return self.ensure_sync(func)(token, *args, **kwargs)
return wrapper
Expand Down Expand Up @@ -462,7 +462,7 @@ def __decode_jwt(self) -> Optional[dict]:
else:
message = "The algorithm RS256 is not supported, " \
"please verify the loading of your key or something relationated with the key"
self.gen_abort_error(message, 500)
self.gen_abort_error(message, HTTPStatus.INTERNAL_SERVER_ERROR.value)
except Exception as ex:
print(f"The following ERROR occurred in {__file__}: {ex}")
decoded_token = None
Expand All @@ -481,27 +481,27 @@ def __verify_token(self, token: dict) -> None:
- None
"""
if token is None:
self.gen_abort_error("Invalid token", 401)
self.gen_abort_error("Invalid token", HTTPStatus.UNAUTHORIZED.value)
else:
if self.get_jwt_claims_to_verify_callback is not None:
claims = self.get_jwt_claims_to_verify_callback
for claim in claims:
if claim not in token:
self.gen_abort_error(f"The claim {claim} is not in the token", HTTPStatus.BAD_REQUEST.value)
if len(token) < 1:
self.gen_abort_error("Invalid token", 401)
self.gen_abort_error("Invalid token", HTTPStatus.UNAUTHORIZED.value)
if self.personal_credentials is not None:
per_username = self.personal_credentials[0]
per_password = self.personal_credentials[1]
if (per_username not in token) or (per_password not in token):
self.gen_abort_error("Invalid token", 401)
self.gen_abort_error("Invalid token", HTTPStatus.UNAUTHORIZED.value)
else:
if ("username" not in token) or ("password" not in token):
self.gen_abort_error("Invalid token", 401)
self.gen_abort_error("Invalid token", HTTPStatus.UNAUTHORIZED.value)
keys_to_validate = self.get_jwt_claims_to_verify_callback
for key in keys_to_validate:
if key not in token:
self.gen_abort_error("Credentials to validate for authentication inside token are not correct", 401)
self.gen_abort_error("Credentials to validate for authentication inside token are not correct", HTTPStatus.UNAUTHORIZED.value)

def __authenticate_credentials(self, token: dict[str, str]) -> bool:
"""
Expand All @@ -514,7 +514,7 @@ def __authenticate_credentials(self, token: dict[str, str]) -> bool:
- bool: True if the credentials are correct, False if not
"""
if self.credentials_success_callback is None:
self.gen_abort_error("get_credentials_success decorator is not set", 500)
self.gen_abort_error("get_credentials_success decorator is not set", HTTPStatus.INTERNAL_SERVER_ERROR.value)
if self.personal_credentials is None:
username_jwt = token["username"]
password_jwt = token["password"]
Expand Down Expand Up @@ -630,14 +630,14 @@ def wrapper(*args, **kwargs) -> Callable[[str], str]:
- Callable: the function to wrap that returns the encoded token
"""
if self.enc_dec_jwt_callback is None:
self.gen_abort_error("get_decode_jwt_attributes decorator and function to verify password and username is not set", 500)
self.gen_abort_error("get_decode_jwt_attributes decorator and function to verify password and username is not set", HTTPStatus.INTERNAL_SERVER_ERROR.value)
else:
token = self.__decode_jwt()
self.__verify_token(token)

grant_access = self.__authenticate_credentials(token)
if not grant_access:
self.gen_abort_error("The credentials are not correct", 401)
self.gen_abort_error("The credentials are not correct", HTTPStatus.UNAUTHORIZED.value)
if self.personal_credentials is not None:
self.verify_user_roles(roles, token[self.personal_credentials[0]])
else:
Expand Down

0 comments on commit 13e1cfd

Please sign in to comment.