Skip to content

Commit

Permalink
Merge pull request #36 from dmtzs/development
Browse files Browse the repository at this point in the history
Development to master
  • Loading branch information
dmtzs committed Nov 8, 2022
2 parents af1171d + 7528ac5 commit 16c3648
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 72 deletions.
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = Flask-authgen-jwt
version = 1.1.3
version = 1.2.4
author = Diego Martinez and Guillermo Ortega
author_email = [email protected]
description = JWT authentication and generator for Flask routes
Expand All @@ -9,7 +9,7 @@ long_description_content_type = text/markdown
requires-python = ">=3.9"
url = https://github.com/dmtzs/Flask-authgen-jwt
project_urls =
Bug Tracker = https://github.com/miguelgrinberg/flask-httpauth/issues
Bug Tracker = https://github.com/dmtzs/Flask-authgen-jwt/issues
classifiers =
Environment :: Web Environment
Intended Audience :: Developers
Expand Down
130 changes: 60 additions & 70 deletions src/flask_authgen_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

try:
import jwt
from typing import Callable
from functools import wraps
from base64 import b64decode
from typing import Callable, Optional
from flask import request, current_app, abort, make_response, jsonify
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

class Core():
basic_auth_callback: dict = None
basic_auth_callback: Callable[[str, str], bool] = None
enc_dec_jwt_callback: dict = None
get_user_roles_callback: list = None

def enc_dec_jwt_config(self, func) -> Callable:
def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> None:
"""Decorator to verify the JWT token
:param f: function to be decorated
:return: the function to wrap should return a dictionary with the following keys:
Expand All @@ -37,20 +37,15 @@ def verify_dict_config(self, config: str) -> None:
for claim in claims:
if claim not in self.enc_dec_jwt_callback:
self.gen_abort_error(f"The claim {claim} is not in the dictionary", 400)
elif config == "basic_auth":
claims = ["username", "password"]
for claim in claims:
if claim not in self.basic_auth_callback:
self.gen_abort_error(f"The claim {claim} is not in the dictionary", 400)

def verify_user_roles(self, roles: list) -> None:
def verify_user_roles(self, roles: list, user: str) -> None:
"""Method to verify the user roles if are correct
:param roles: list of roles to verify against the user roles callback"""
if roles is not None:
if self.get_user_roles_callback is None:
self.gen_abort_error("get_user_roles decorator and function is not defined is not defined", 500)
else:
user_roles = self.get_user_roles_callback
user_roles = self.ensure_sync(self.get_user_roles_callback)(user)
# if not set(roles).issubset(set(user_roles)):
role_flag = False
for role in user_roles:
Expand All @@ -60,11 +55,12 @@ def verify_user_roles(self, roles: list) -> None:
if not role_flag:
self.gen_abort_error("User does not have the required roles", 403)

def get_user_roles(self, func) -> Callable:
"""Decorator to get the user roles
def get_user_roles(self, func: Callable[[str], list[str]]) -> Callable[[str], list[str]]:
"""Decorator to get the user roles by the user that was received from the JWT or basic auth.
To the function you will decorate with this decorator you will have available the user variable
:param f: function to be decorated
:return: user roles as a list"""
self.get_user_roles_callback = func()
self.get_user_roles_callback = func
return func

def gen_abort_error(self, error: str, status_code: int) -> None:
Expand All @@ -83,42 +79,22 @@ def ensure_sync(self, func) -> Callable:
return func

class GenJwt(Core):
def __init__(self, default_jwt_claims: bool = True, registered_claims_only: bool = True) -> None:
def __init__(self) -> None:
self.jwt_fields_attr: dict = None
self.default_jwt_claims: bool = default_jwt_claims
self.registered_claims_only: bool = registered_claims_only

def __validate_registered_claims(self) -> None:
"""
Method to validate the registered claims if registered_claims_only is True.
Cause this means that the user can only use the registered(standard) claims.
"""
registered_claims = ["iss", "sub", "aud", "exp", "nbf", "iat", "jti"]
for claim in self.jwt_fields_attr:
if claim not in registered_claims:
self.gen_abort_error(f"The claim {claim} is not a registered claim", 400)

def __create_jwt_payload(self) -> dict:
def __create_jwt_payload(self, bauth_credentials: dict) -> dict:
"""
Method to create the JWT payload but still not encoded
:return: JWT payload as a dictionary
"""
if not self.jwt_fields_attr:
self.gen_abort_error("jwt_claims decorator and function is not defined", 500)
if self.registered_claims_only:
self.__validate_registered_claims()
payload = {}
payload.update(self.jwt_fields_attr)
else:
if self.default_jwt_claims and not self.registered_claims_only:
payload = self.basic_auth_callback
payload.update(self.jwt_fields_attr)
else:
payload = self.jwt_fields_attr
payload = bauth_credentials
payload.update(self.jwt_fields_attr)

return payload

def __verify_basic_auth(self) -> None:
def __dec_set_basic_auth(self) -> None:
"""
Method to decode and verify the basic auth credentials in the expected format
"""
Expand All @@ -131,13 +107,19 @@ def __verify_basic_auth(self) -> None:
credentials = credentials.split(":")
if len(credentials) != 2:
self.gen_abort_error("Authorization header must be Basic with user and password only", 400)
self.verify_dict_config("basic_auth")
username = self.basic_auth_callback["username"]
password = self.basic_auth_callback["password"]
if credentials[0] != username or credentials[1] != password:
self.gen_abort_error("User or password is not correct", 401)
username = credentials[0]
password = credentials[1]
bauth_credentials = {
"username": username,
"password": password
}
if self.basic_auth_callback:
return self.ensure_sync(self.basic_auth_callback)(
username, password), bauth_credentials
else:
self.gen_abort_error("basic_auth decorator and function is not defined", 500)

def __encode_jwt(self, payload) -> tuple[str, None]:
def __encode_jwt(self, payload) -> Optional[str]:
"""
Method to encode the JWT token using the key and algorithm specified in the enc_dec_jwt_config decorator
that returns the dictionary with the configuration.
Expand All @@ -153,7 +135,7 @@ def __encode_jwt(self, payload) -> tuple[str, None]:
encoded_token = None
return encoded_token

def jwt_claims(self, func) -> Callable:
def jwt_claims(self, func: Callable[[None], dict]) -> None:
"""Decorator to add the claims to the JWT payload, default fields are:
- username: username of the user
- password: password of the user
Expand All @@ -168,13 +150,12 @@ def jwt_claims(self, func) -> Callable:
:return: the function to wrap should return a dictionary with the extra fields"""
self.jwt_fields_attr = func()

def get_basic_auth_credentials(self, func) -> Callable:
def verify_bauth_credentials(self, func: Callable[[str, str], bool]) -> Callable[[str, str], bool]:
"""Decorator to get the basic auth credentials
:param f: function to be decorated, should return a dictionary with the following keys:
- username: username of the user
- password: password of the user
:return: the function to wrap that returns the dictionary specified above"""
self.basic_auth_callback = func()
:param f: function to be decorated, should return a boolean:
:return: the function to wrap that returns a boolean, True if the credentials are correct, False if not
User should implement the function to validate the credentials and return the correct boolean"""
self.basic_auth_callback = func
return func

def generate_jwt(self, func=None, roles=None):
Expand All @@ -186,10 +167,13 @@ def wrapper(*args, **kwargs):
if self.enc_dec_jwt_callback is None:
self.gen_abort_error("get_decode_jwt_attributes decorator and function to verify password and username is not set", 500)
else:
self.__verify_basic_auth()
jwt_payload = self.__create_jwt_payload()
token = self.__encode_jwt(jwt_payload)
self.verify_user_roles(roles)
grant_credentials_access = self.__dec_set_basic_auth()
if grant_credentials_access[0]:
self.verify_user_roles(roles, grant_credentials_access[1]["username"])
jwt_payload = self.__create_jwt_payload(grant_credentials_access[1])
token = self.__encode_jwt(jwt_payload)
else:
self.gen_abort_error("The credentials are not correct", 401)

return self.ensure_sync(func)(token, *args, **kwargs)
return wrapper
Expand All @@ -201,10 +185,10 @@ class DecJwt(Core):
token: dict = None
def __init__(self, token_as_attr: bool = False) -> None:
self.token_as_attr: bool = token_as_attr
self.credentials_success_callback: dict = None
self.get_jwt_claims_to_verify_callback: list = None
self.credentials_success_callback: bool = None
self.get_jwt_claims_to_verify_callback: list[str] = None

def __decode_jwt(self) -> tuple[str, None]:
def __decode_jwt(self) -> Optional[str]:
"""
Decode the JWT token using the key and algorithm specified in the enc_dec_jwt_config decorator
that returns the dictionary with the configuration.
Expand Down Expand Up @@ -239,6 +223,12 @@ def __verify_token(self, token) -> None:
self.gen_abort_error(f"The claim {claim} is not in the token", 400)
if len(token) < 1:
self.gen_abort_error("Invalid token", 401)
if ("username" not in token) or ("password" not in token):
self.gen_abort_error("Invalid token", 401)
keys_to_validate = self.get_jwt_claims_to_verify_callback
for key in keys_to_validate:
if key not in token:
self.gen_abort_error("Credentials to validate for authentication inside token are not correct", 401)

def __authenticate_credentials(self, token) -> None:
"""
Expand All @@ -247,34 +237,31 @@ def __authenticate_credentials(self, token) -> None:
"""
if self.credentials_success_callback is None:
self.gen_abort_error("get_credentials_success decorator is not set", 500)
else:
keys_to_validate = self.get_jwt_claims_to_verify_callback.keys()
for key in keys_to_validate:
if self.credentials_success_callback[key] != token[key]:
self.gen_abort_error("Credentials to validate for authentication inside token are not correct", 401)
username_jwt = token["username"]
password_jwt = token["password"]
return self.ensure_sync(self.credentials_success_callback)(username_jwt, password_jwt)

def __set_token_as_attr(self, token) -> None:
def __set_token_as_attr(self, token: dict) -> None:
"""
Method to set the token as an attribute of the class
:param token: token to set as attribute
"""
if self.token_as_attr:
self.token = token

def get_jwt_claims_to_verify(self, func) -> Callable:
def get_jwt_claims_to_verify(self, func: Callable[[None], list[str]]) -> None:
"""Decorator to get the claims to verify in the token
:param func: function to be decorated, should return a list of the claims to verify
:return: the function to wrap that returns the a boolean field"""
self.get_jwt_claims_to_verify_callback = func()
return func

def verify_jwt_credentials(self, func) -> Callable:
def verify_jwt_credentials(self, func) -> Callable[[str, str], dict]:
"""Decorator to get the credentials from database or whatever part
to verify the token fields later
:param func: function to be decorated
:return: the function to wrap that returns the dictionary with the credentials.
the dictionary keys of this decorator should be the same as the claims of the token that you want to validate"""
self.credentials_success_callback = func()
self.credentials_success_callback = func
return func

def login_required(self, func=None, roles=None):
Expand All @@ -288,8 +275,11 @@ def wrapper(*args, **kwargs):
else:
token = self.__decode_jwt()
self.__verify_token(token)
self.verify_user_roles(roles)
self.__authenticate_credentials(token)
self.verify_user_roles(roles, token["username"])

grant_access = self.__authenticate_credentials(token)
if not grant_access:
self.gen_abort_error("The credentials are not correct", 401)
self.__set_token_as_attr(token)

return self.ensure_sync(func)(*args, **kwargs)
Expand Down

0 comments on commit 16c3648

Please sign in to comment.