Skip to content

Commit

Permalink
Update flask_authgen_jwt.py and setup.cfg
Browse files Browse the repository at this point in the history
Correction of all bugs in the library but because of that was changed significant parts of the original functionality of the same library and in setup was changed the version and a error in the issues URL
  • Loading branch information
dmtzs committed Nov 8, 2022
1 parent f2420fc commit 7528ac5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 52 deletions.
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = Flask-authgen-jwt
version = 1.1.3
version = 1.2.4
author = Diego Martinez and Guillermo Ortega
author_email = [email protected]
description = JWT authentication and generator for Flask routes
Expand All @@ -9,7 +9,7 @@ long_description_content_type = text/markdown
requires-python = ">=3.9"
url = https://github.com/dmtzs/Flask-authgen-jwt
project_urls =
Bug Tracker = https://github.com/miguelgrinberg/flask-httpauth/issues
Bug Tracker = https://github.com/dmtzs/Flask-authgen-jwt/issues
classifiers =
Environment :: Web Environment
Intended Audience :: Developers
Expand Down
90 changes: 40 additions & 50 deletions src/flask_authgen_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

try:
import jwt
from typing import Callable
from functools import wraps
from base64 import b64decode
from typing import Callable, Optional
from flask import request, current_app, abort, make_response, jsonify
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

class Core():
basic_auth_callback: dict = None
basic_auth_callback: Callable[[str, str], bool] = None
enc_dec_jwt_callback: dict = None
get_user_roles_callback: list = None

def enc_dec_jwt_config(self, func) -> Callable:
def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> None:
"""Decorator to verify the JWT token
:param f: function to be decorated
:return: the function to wrap should return a dictionary with the following keys:
Expand Down Expand Up @@ -55,7 +55,7 @@ def verify_user_roles(self, roles: list, user: str) -> None:
if not role_flag:
self.gen_abort_error("User does not have the required roles", 403)

def get_user_roles(self, func) -> Callable:
def get_user_roles(self, func: Callable[[str], list[str]]) -> Callable[[str], list[str]]:
"""Decorator to get the user roles by the user that was received from the JWT or basic auth.
To the function you will decorate with this decorator you will have available the user variable
:param f: function to be decorated
Expand All @@ -79,38 +79,18 @@ def ensure_sync(self, func) -> Callable:
return func

class GenJwt(Core):
def __init__(self, default_jwt_claims: bool = True, registered_claims_only: bool = True) -> None:
def __init__(self) -> None:
self.jwt_fields_attr: dict = None
self.default_jwt_claims: bool = default_jwt_claims
self.registered_claims_only: bool = registered_claims_only

def __validate_registered_claims(self) -> None:
"""
Method to validate the registered claims if registered_claims_only is True.
Cause this means that the user can only use the registered(standard) claims.
"""
registered_claims = ["iss", "sub", "aud", "exp", "nbf", "iat", "jti"]
for claim in self.jwt_fields_attr:
if claim not in registered_claims:
self.gen_abort_error(f"The claim {claim} is not a registered claim", 400)

def __create_jwt_payload(self) -> dict:
def __create_jwt_payload(self, bauth_credentials: dict) -> dict:
"""
Method to create the JWT payload but still not encoded
:return: JWT payload as a dictionary
"""
if not self.jwt_fields_attr:
self.gen_abort_error("jwt_claims decorator and function is not defined", 500)
if self.registered_claims_only:
self.__validate_registered_claims()
payload = {}
payload.update(self.jwt_fields_attr)
else:
if self.default_jwt_claims and not self.registered_claims_only:
payload = self.basic_auth_callback
payload.update(self.jwt_fields_attr)
else:
payload = self.jwt_fields_attr
payload = bauth_credentials
payload.update(self.jwt_fields_attr)

return payload

Expand All @@ -129,13 +109,17 @@ def __dec_set_basic_auth(self) -> None:
self.gen_abort_error("Authorization header must be Basic with user and password only", 400)
username = credentials[0]
password = credentials[1]
bauth_credentials = {
"username": username,
"password": password
}
if self.basic_auth_callback:
return self.ensure_sync(self.basic_auth_callback)(
username, password), username
username, password), bauth_credentials
else:
self.gen_abort_error("basic_auth decorator and function is not defined", 500)

def __encode_jwt(self, payload) -> tuple[str, None]:
def __encode_jwt(self, payload) -> Optional[str]:
"""
Method to encode the JWT token using the key and algorithm specified in the enc_dec_jwt_config decorator
that returns the dictionary with the configuration.
Expand All @@ -151,7 +135,7 @@ def __encode_jwt(self, payload) -> tuple[str, None]:
encoded_token = None
return encoded_token

def jwt_claims(self, func) -> Callable:
def jwt_claims(self, func: Callable[[None], dict]) -> None:
"""Decorator to add the claims to the JWT payload, default fields are:
- username: username of the user
- password: password of the user
Expand All @@ -166,7 +150,7 @@ def jwt_claims(self, func) -> Callable:
:return: the function to wrap should return a dictionary with the extra fields"""
self.jwt_fields_attr = func()

def get_basic_auth_credentials(self, func) -> Callable:
def verify_bauth_credentials(self, func: Callable[[str, str], bool]) -> Callable[[str, str], bool]:
"""Decorator to get the basic auth credentials
:param f: function to be decorated, should return a boolean:
:return: the function to wrap that returns a boolean, True if the credentials are correct, False if not
Expand All @@ -184,9 +168,9 @@ def wrapper(*args, **kwargs):
self.gen_abort_error("get_decode_jwt_attributes decorator and function to verify password and username is not set", 500)
else:
grant_credentials_access = self.__dec_set_basic_auth()
if grant_credentials_access:
self.verify_user_roles(roles, grant_credentials_access[1])
jwt_payload = self.__create_jwt_payload()
if grant_credentials_access[0]:
self.verify_user_roles(roles, grant_credentials_access[1]["username"])
jwt_payload = self.__create_jwt_payload(grant_credentials_access[1])
token = self.__encode_jwt(jwt_payload)
else:
self.gen_abort_error("The credentials are not correct", 401)
Expand All @@ -201,10 +185,10 @@ class DecJwt(Core):
token: dict = None
def __init__(self, token_as_attr: bool = False) -> None:
self.token_as_attr: bool = token_as_attr
self.credentials_success_callback: dict = None
self.get_jwt_claims_to_verify_callback: list = None
self.credentials_success_callback: bool = None
self.get_jwt_claims_to_verify_callback: list[str] = None

def __decode_jwt(self) -> tuple[str, None]:
def __decode_jwt(self) -> Optional[str]:
"""
Decode the JWT token using the key and algorithm specified in the enc_dec_jwt_config decorator
that returns the dictionary with the configuration.
Expand Down Expand Up @@ -239,6 +223,12 @@ def __verify_token(self, token) -> None:
self.gen_abort_error(f"The claim {claim} is not in the token", 400)
if len(token) < 1:
self.gen_abort_error("Invalid token", 401)
if ("username" not in token) or ("password" not in token):
self.gen_abort_error("Invalid token", 401)
keys_to_validate = self.get_jwt_claims_to_verify_callback
for key in keys_to_validate:
if key not in token:
self.gen_abort_error("Credentials to validate for authentication inside token are not correct", 401)

def __authenticate_credentials(self, token) -> None:
"""
Expand All @@ -247,34 +237,31 @@ def __authenticate_credentials(self, token) -> None:
"""
if self.credentials_success_callback is None:
self.gen_abort_error("get_credentials_success decorator is not set", 500)
else:
keys_to_validate = self.get_jwt_claims_to_verify_callback.keys()
for key in keys_to_validate:
if self.credentials_success_callback[key] != token[key]:
self.gen_abort_error("Credentials to validate for authentication inside token are not correct", 401)
username_jwt = token["username"]
password_jwt = token["password"]
return self.ensure_sync(self.credentials_success_callback)(username_jwt, password_jwt)

def __set_token_as_attr(self, token) -> None:
def __set_token_as_attr(self, token: dict) -> None:
"""
Method to set the token as an attribute of the class
:param token: token to set as attribute
"""
if self.token_as_attr:
self.token = token

def get_jwt_claims_to_verify(self, func) -> Callable:
def get_jwt_claims_to_verify(self, func: Callable[[None], list[str]]) -> None:
"""Decorator to get the claims to verify in the token
:param func: function to be decorated, should return a list of the claims to verify
:return: the function to wrap that returns the a boolean field"""
self.get_jwt_claims_to_verify_callback = func()
return func

def verify_jwt_credentials(self, func) -> Callable:
def verify_jwt_credentials(self, func) -> Callable[[str, str], dict]:
"""Decorator to get the credentials from database or whatever part
to verify the token fields later
:param func: function to be decorated
:return: the function to wrap that returns the dictionary with the credentials.
the dictionary keys of this decorator should be the same as the claims of the token that you want to validate"""
self.credentials_success_callback = func()
self.credentials_success_callback = func
return func

def login_required(self, func=None, roles=None):
Expand All @@ -288,8 +275,11 @@ def wrapper(*args, **kwargs):
else:
token = self.__decode_jwt()
self.__verify_token(token)
self.verify_user_roles(roles)# TODO: Cambiar como el de arriba, mandar user y lo mismo para el metodo que decodifica el jwt
self.__authenticate_credentials(token)
self.verify_user_roles(roles, token["username"])

grant_access = self.__authenticate_credentials(token)
if not grant_access:
self.gen_abort_error("The credentials are not correct", 401)
self.__set_token_as_attr(token)

return self.ensure_sync(func)(*args, **kwargs)
Expand Down

0 comments on commit 7528ac5

Please sign in to comment.