diff --git a/setup.cfg b/setup.cfg index 26837f1..a3b9122 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = Flask-authgen-jwt -version = 1.1.3 +version = 1.2.4 author = Diego Martinez and Guillermo Ortega author_email = gd-code@outlook.com description = JWT authentication and generator for Flask routes @@ -9,7 +9,7 @@ long_description_content_type = text/markdown requires-python = ">=3.9" url = https://github.com/dmtzs/Flask-authgen-jwt project_urls = - Bug Tracker = https://github.com/miguelgrinberg/flask-httpauth/issues + Bug Tracker = https://github.com/dmtzs/Flask-authgen-jwt/issues classifiers = Environment :: Web Environment Intended Audience :: Developers diff --git a/src/flask_authgen_jwt.py b/src/flask_authgen_jwt.py index 6edcd1d..354fe65 100644 --- a/src/flask_authgen_jwt.py +++ b/src/flask_authgen_jwt.py @@ -8,19 +8,19 @@ try: import jwt - from typing import Callable from functools import wraps from base64 import b64decode + from typing import Callable, Optional from flask import request, current_app, abort, make_response, jsonify except ImportError as eImp: print(f"The following import ERROR occurred in {__file__}: {eImp}") class Core(): - basic_auth_callback: dict = None + basic_auth_callback: Callable[[str, str], bool] = None enc_dec_jwt_callback: dict = None get_user_roles_callback: list = None - def enc_dec_jwt_config(self, func) -> Callable: + def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> None: """Decorator to verify the JWT token :param f: function to be decorated :return: the function to wrap should return a dictionary with the following keys: @@ -37,20 +37,15 @@ def verify_dict_config(self, config: str) -> None: for claim in claims: if claim not in self.enc_dec_jwt_callback: self.gen_abort_error(f"The claim {claim} is not in the dictionary", 400) - elif config == "basic_auth": - claims = ["username", "password"] - for claim in claims: - if claim not in self.basic_auth_callback: - self.gen_abort_error(f"The claim {claim} is not in the dictionary", 400) - def verify_user_roles(self, roles: list) -> None: + def verify_user_roles(self, roles: list, user: str) -> None: """Method to verify the user roles if are correct :param roles: list of roles to verify against the user roles callback""" if roles is not None: if self.get_user_roles_callback is None: self.gen_abort_error("get_user_roles decorator and function is not defined is not defined", 500) else: - user_roles = self.get_user_roles_callback + user_roles = self.ensure_sync(self.get_user_roles_callback)(user) # if not set(roles).issubset(set(user_roles)): role_flag = False for role in user_roles: @@ -60,11 +55,12 @@ def verify_user_roles(self, roles: list) -> None: if not role_flag: self.gen_abort_error("User does not have the required roles", 403) - def get_user_roles(self, func) -> Callable: - """Decorator to get the user roles + def get_user_roles(self, func: Callable[[str], list[str]]) -> Callable[[str], list[str]]: + """Decorator to get the user roles by the user that was received from the JWT or basic auth. + To the function you will decorate with this decorator you will have available the user variable :param f: function to be decorated :return: user roles as a list""" - self.get_user_roles_callback = func() + self.get_user_roles_callback = func return func def gen_abort_error(self, error: str, status_code: int) -> None: @@ -83,42 +79,22 @@ def ensure_sync(self, func) -> Callable: return func class GenJwt(Core): - def __init__(self, default_jwt_claims: bool = True, registered_claims_only: bool = True) -> None: + def __init__(self) -> None: self.jwt_fields_attr: dict = None - self.default_jwt_claims: bool = default_jwt_claims - self.registered_claims_only: bool = registered_claims_only - - def __validate_registered_claims(self) -> None: - """ - Method to validate the registered claims if registered_claims_only is True. - Cause this means that the user can only use the registered(standard) claims. - """ - registered_claims = ["iss", "sub", "aud", "exp", "nbf", "iat", "jti"] - for claim in self.jwt_fields_attr: - if claim not in registered_claims: - self.gen_abort_error(f"The claim {claim} is not a registered claim", 400) - def __create_jwt_payload(self) -> dict: + def __create_jwt_payload(self, bauth_credentials: dict) -> dict: """ Method to create the JWT payload but still not encoded :return: JWT payload as a dictionary """ if not self.jwt_fields_attr: self.gen_abort_error("jwt_claims decorator and function is not defined", 500) - if self.registered_claims_only: - self.__validate_registered_claims() - payload = {} - payload.update(self.jwt_fields_attr) - else: - if self.default_jwt_claims and not self.registered_claims_only: - payload = self.basic_auth_callback - payload.update(self.jwt_fields_attr) - else: - payload = self.jwt_fields_attr + payload = bauth_credentials + payload.update(self.jwt_fields_attr) return payload - def __verify_basic_auth(self) -> None: + def __dec_set_basic_auth(self) -> None: """ Method to decode and verify the basic auth credentials in the expected format """ @@ -131,13 +107,19 @@ def __verify_basic_auth(self) -> None: credentials = credentials.split(":") if len(credentials) != 2: self.gen_abort_error("Authorization header must be Basic with user and password only", 400) - self.verify_dict_config("basic_auth") - username = self.basic_auth_callback["username"] - password = self.basic_auth_callback["password"] - if credentials[0] != username or credentials[1] != password: - self.gen_abort_error("User or password is not correct", 401) + username = credentials[0] + password = credentials[1] + bauth_credentials = { + "username": username, + "password": password + } + if self.basic_auth_callback: + return self.ensure_sync(self.basic_auth_callback)( + username, password), bauth_credentials + else: + self.gen_abort_error("basic_auth decorator and function is not defined", 500) - def __encode_jwt(self, payload) -> tuple[str, None]: + def __encode_jwt(self, payload) -> Optional[str]: """ Method to encode the JWT token using the key and algorithm specified in the enc_dec_jwt_config decorator that returns the dictionary with the configuration. @@ -153,7 +135,7 @@ def __encode_jwt(self, payload) -> tuple[str, None]: encoded_token = None return encoded_token - def jwt_claims(self, func) -> Callable: + def jwt_claims(self, func: Callable[[None], dict]) -> None: """Decorator to add the claims to the JWT payload, default fields are: - username: username of the user - password: password of the user @@ -168,13 +150,12 @@ def jwt_claims(self, func) -> Callable: :return: the function to wrap should return a dictionary with the extra fields""" self.jwt_fields_attr = func() - def get_basic_auth_credentials(self, func) -> Callable: + def verify_bauth_credentials(self, func: Callable[[str, str], bool]) -> Callable[[str, str], bool]: """Decorator to get the basic auth credentials - :param f: function to be decorated, should return a dictionary with the following keys: - - username: username of the user - - password: password of the user - :return: the function to wrap that returns the dictionary specified above""" - self.basic_auth_callback = func() + :param f: function to be decorated, should return a boolean: + :return: the function to wrap that returns a boolean, True if the credentials are correct, False if not + User should implement the function to validate the credentials and return the correct boolean""" + self.basic_auth_callback = func return func def generate_jwt(self, func=None, roles=None): @@ -186,10 +167,13 @@ def wrapper(*args, **kwargs): if self.enc_dec_jwt_callback is None: self.gen_abort_error("get_decode_jwt_attributes decorator and function to verify password and username is not set", 500) else: - self.__verify_basic_auth() - jwt_payload = self.__create_jwt_payload() - token = self.__encode_jwt(jwt_payload) - self.verify_user_roles(roles) + grant_credentials_access = self.__dec_set_basic_auth() + if grant_credentials_access[0]: + self.verify_user_roles(roles, grant_credentials_access[1]["username"]) + jwt_payload = self.__create_jwt_payload(grant_credentials_access[1]) + token = self.__encode_jwt(jwt_payload) + else: + self.gen_abort_error("The credentials are not correct", 401) return self.ensure_sync(func)(token, *args, **kwargs) return wrapper @@ -201,10 +185,10 @@ class DecJwt(Core): token: dict = None def __init__(self, token_as_attr: bool = False) -> None: self.token_as_attr: bool = token_as_attr - self.credentials_success_callback: dict = None - self.get_jwt_claims_to_verify_callback: list = None + self.credentials_success_callback: bool = None + self.get_jwt_claims_to_verify_callback: list[str] = None - def __decode_jwt(self) -> tuple[str, None]: + def __decode_jwt(self) -> Optional[str]: """ Decode the JWT token using the key and algorithm specified in the enc_dec_jwt_config decorator that returns the dictionary with the configuration. @@ -239,6 +223,12 @@ def __verify_token(self, token) -> None: self.gen_abort_error(f"The claim {claim} is not in the token", 400) if len(token) < 1: self.gen_abort_error("Invalid token", 401) + if ("username" not in token) or ("password" not in token): + self.gen_abort_error("Invalid token", 401) + keys_to_validate = self.get_jwt_claims_to_verify_callback + for key in keys_to_validate: + if key not in token: + self.gen_abort_error("Credentials to validate for authentication inside token are not correct", 401) def __authenticate_credentials(self, token) -> None: """ @@ -247,13 +237,11 @@ def __authenticate_credentials(self, token) -> None: """ if self.credentials_success_callback is None: self.gen_abort_error("get_credentials_success decorator is not set", 500) - else: - keys_to_validate = self.get_jwt_claims_to_verify_callback.keys() - for key in keys_to_validate: - if self.credentials_success_callback[key] != token[key]: - self.gen_abort_error("Credentials to validate for authentication inside token are not correct", 401) + username_jwt = token["username"] + password_jwt = token["password"] + return self.ensure_sync(self.credentials_success_callback)(username_jwt, password_jwt) - def __set_token_as_attr(self, token) -> None: + def __set_token_as_attr(self, token: dict) -> None: """ Method to set the token as an attribute of the class :param token: token to set as attribute @@ -261,20 +249,19 @@ def __set_token_as_attr(self, token) -> None: if self.token_as_attr: self.token = token - def get_jwt_claims_to_verify(self, func) -> Callable: + def get_jwt_claims_to_verify(self, func: Callable[[None], list[str]]) -> None: """Decorator to get the claims to verify in the token :param func: function to be decorated, should return a list of the claims to verify :return: the function to wrap that returns the a boolean field""" self.get_jwt_claims_to_verify_callback = func() - return func - def verify_jwt_credentials(self, func) -> Callable: + def verify_jwt_credentials(self, func) -> Callable[[str, str], dict]: """Decorator to get the credentials from database or whatever part to verify the token fields later :param func: function to be decorated :return: the function to wrap that returns the dictionary with the credentials. the dictionary keys of this decorator should be the same as the claims of the token that you want to validate""" - self.credentials_success_callback = func() + self.credentials_success_callback = func return func def login_required(self, func=None, roles=None): @@ -288,8 +275,11 @@ def wrapper(*args, **kwargs): else: token = self.__decode_jwt() self.__verify_token(token) - self.verify_user_roles(roles) - self.__authenticate_credentials(token) + self.verify_user_roles(roles, token["username"]) + + grant_access = self.__authenticate_credentials(token) + if not grant_access: + self.gen_abort_error("The credentials are not correct", 401) self.__set_token_as_attr(token) return self.ensure_sync(func)(*args, **kwargs)