diff --git a/api_example/app/__init__.py b/api_example/app/__init__.py new file mode 100644 index 0000000..8b6c9f0 --- /dev/null +++ b/api_example/app/__init__.py @@ -0,0 +1,11 @@ +try: + from flask import Flask + import flask_authgen_jwt +except ImportError as eImp: + print(f"The following import ERROR occurred in {__file__}: {eImp}") + +app = Flask(__name__) +gen_auth = flask_authgen_jwt.GenJwt() +auth = flask_authgen_jwt.DecJwt() + +from app import routes \ No newline at end of file diff --git a/api_example/app/routes.py b/api_example/app/routes.py new file mode 100644 index 0000000..4134bff --- /dev/null +++ b/api_example/app/routes.py @@ -0,0 +1,82 @@ +try: + import datetime as dt + from app import app, auth, gen_auth + from flask import Response, make_response, jsonify +except ImportError as eImp: + print(f"The following import ERROR occurred in {__file__}: {eImp}") + +@gen_auth.enc_dec_jwt_config +@auth.enc_dec_jwt_config +def test_creds() -> dict: + decode_attributes = { + "key": "secret", + "algorithm": "HS256", + } + return decode_attributes + +@gen_auth.personal_credentials_field +@auth.personal_credentials_field +def personal_credentials_field() -> tuple[str, str]: + return "per_username", "per_password" + +@gen_auth.verify_bauth_credentials +def get_basic_auth_credentials2(username: str, password: str) -> dict: + # Use the username and password to authenticate the user in the way you want- + # and return true if the user is authenticated + if username == "admin2" and password == "passwd2": + return True + else: + return False + +@auth.get_user_roles +@gen_auth.get_user_roles +def my_roles(username: str) -> list[str]: + # Use username to get roles from database + print(f"username in roles: {username}") + return ["admin", "user"] + +@auth.get_jwt_claims_to_verify +def get_jwt_claims_to_verify() -> list[str]: + # return ["exp", "iat", "nbf"] + return ["exp", "iat"] + +@gen_auth.jwt_claims +def jwt_claims() -> dict: + claims = { + "exp": dt.datetime.now(tz=dt.timezone.utc) + dt.timedelta(seconds=30), + "iat": dt.datetime.now(tz=dt.timezone.utc) + } + return claims + +@auth.verify_jwt_credentials#TODO: Checar si debe cambiar o no +def creds(username_jwt: str, password_jwt: str) -> bool: + my_dict = { + "username_jwt": username_jwt, + "password_jwt": password_jwt + } + return True + # return False + +# -------------Endpoints------------- +@app.route("/") +@auth.login_required(roles=["admin", "eder"]) +def index(): + return Response("Todo bien"), 200 + +@app.route("/generate_token", methods=["POST"]) +@gen_auth.generate_jwt(roles=["eder", "user"]) +def gen_token(token): + response = { + "status": "success", + "token": token + } + return make_response(jsonify(response)), 200 + +@app.route("/temp") +def temp(): + test = (("val1", "hola"), ("val2", "prueba2")) + response = { + "message": "solo prueba", + "test_data": test + } + return make_response(jsonify(response)) \ No newline at end of file diff --git a/api_example/run_app.py b/api_example/run_app.py new file mode 100644 index 0000000..f912a0c --- /dev/null +++ b/api_example/run_app.py @@ -0,0 +1,19 @@ +try: + from app import app + from gevent.pywsgi import WSGIServer +except ImportError as eImp: + print(f"The following import ERROR occurred in {__file__}: {eImp}") + +if __name__== "__main__": + try: + # -----------------Dev mode----------------- + app.run(host= "127.0.0.1", port= 5000, debug= True) + # debug= True for apply changes made into the files without restarting the flask server + + # -----------------Prod mode---------------- + #appServer= WSGIServer(("127.0.0.1", 5000), app) + #appServer.serve_forever() + except Exception as eImp: + print(f"The following import ERROR occurred in {__file__}: {eImp}") + finally: + print("Finishing program") \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index a3b9122..c0a326a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = Flask-authgen-jwt -version = 1.2.4 +version = 2.0.0 author = Diego Martinez and Guillermo Ortega author_email = gd-code@outlook.com description = JWT authentication and generator for Flask routes diff --git a/src/flask_authgen_jwt.py b/src/flask_authgen_jwt.py index 834dac8..dddf91b 100644 --- a/src/flask_authgen_jwt.py +++ b/src/flask_authgen_jwt.py @@ -19,6 +19,7 @@ class Core(): basic_auth_callback: Callable[[str, str], bool] = None enc_dec_jwt_callback: dict = None get_user_roles_callback: list = None + personal_credentials: tuple[str, str] = None def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> Callable[[None], dict]: """Decorator to verify the JWT token @@ -29,6 +30,23 @@ def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> Callable[[None], d self.enc_dec_jwt_callback = func() return func + def personal_credentials_field(self, func: Callable[[None], tuple[str, str]]) -> Callable[[None], tuple[str, str]]: + """ + Decorator to set the personal credentials, if youu dont want to use username and password inside the token + then with this you can return a tuple in which the first element is the username and the second is the password + but as you want to name that respective fields so the library will validate using the fields you set + :param func: function to be decorated + :return: the tuple with the username and password with personal names + + :Example: + @dec_jwt.personal_credentials_field + + def get_personal_credentials(): + return "my_username_personal_name_field", "my_password_personal_name_field" + """ + self.personal_credentials = func() + return func + def verify_dict_config(self, config: str) -> None: """Method that veryfies the JWT configuration generator and for basic auth :param config: str to identify which configuration to verify""" @@ -89,6 +107,11 @@ def __create_jwt_payload(self, bauth_credentials: dict) -> dict: """ if not self.jwt_fields_attr: self.gen_abort_error("jwt_claims decorator and function is not defined", 500) + if self.personal_credentials is not None: + bauth_credentials[self.personal_credentials[0]] = bauth_credentials["username"] + bauth_credentials[self.personal_credentials[1]] = bauth_credentials["password"] + del bauth_credentials["username"] + del bauth_credentials["password"] payload = bauth_credentials payload.update(self.jwt_fields_attr) @@ -226,8 +249,14 @@ def __verify_token(self, token: dict) -> None: self.gen_abort_error(f"The claim {claim} is not in the token", 400) if len(token) < 1: self.gen_abort_error("Invalid token", 401) - if ("username" not in token) or ("password" not in token): - self.gen_abort_error("Invalid token", 401) + if self.personal_credentials is not None: + per_username = self.personal_credentials[0] + per_password = self.personal_credentials[1] + if (per_username not in token) or (per_password not in token): + self.gen_abort_error("Invalid token", 401) + else: + if ("username" not in token) or ("password" not in token): + self.gen_abort_error("Invalid token", 401) keys_to_validate = self.get_jwt_claims_to_verify_callback for key in keys_to_validate: if key not in token: @@ -240,8 +269,12 @@ def __authenticate_credentials(self, token: dict) -> bool: """ if self.credentials_success_callback is None: self.gen_abort_error("get_credentials_success decorator is not set", 500) - username_jwt = token["username"] - password_jwt = token["password"] + if self.personal_credentials is None: + username_jwt = token["username"] + password_jwt = token["password"] + else: + username_jwt = token[self.personal_credentials[0]] + password_jwt = token[self.personal_credentials[1]] return self.ensure_sync(self.credentials_success_callback)(username_jwt, password_jwt) def __set_token_as_attr(self, token: dict) -> None: @@ -282,11 +315,14 @@ def wrapper(*args, **kwargs): else: token = self.__decode_jwt() self.__verify_token(token) - self.verify_user_roles(roles, token["username"]) grant_access = self.__authenticate_credentials(token) if not grant_access: self.gen_abort_error("The credentials are not correct", 401) + if self.personal_credentials is not None: + self.verify_user_roles(roles, token[self.personal_credentials[0]]) + else: + self.verify_user_roles(roles, token["username"]) self.__set_token_as_attr(token) return self.ensure_sync(func)(*args, **kwargs)