Skip to content

Commit

Permalink
Merge pull request #41 from dmtzs/development
Browse files Browse the repository at this point in the history
Development to master
  • Loading branch information
dmtzs committed Nov 17, 2022
2 parents 04c4c85 + 3c78fc1 commit 714dc2a
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 6 deletions.
11 changes: 11 additions & 0 deletions api_example/app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
try:
from flask import Flask
import flask_authgen_jwt
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

app = Flask(__name__)
gen_auth = flask_authgen_jwt.GenJwt()
auth = flask_authgen_jwt.DecJwt()

from app import routes
82 changes: 82 additions & 0 deletions api_example/app/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
try:
import datetime as dt
from app import app, auth, gen_auth
from flask import Response, make_response, jsonify
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

@gen_auth.enc_dec_jwt_config
@auth.enc_dec_jwt_config
def test_creds() -> dict:
decode_attributes = {
"key": "secret",
"algorithm": "HS256",
}
return decode_attributes

@gen_auth.personal_credentials_field
@auth.personal_credentials_field
def personal_credentials_field() -> tuple[str, str]:
return "per_username", "per_password"

@gen_auth.verify_bauth_credentials
def get_basic_auth_credentials2(username: str, password: str) -> dict:
# Use the username and password to authenticate the user in the way you want-
# and return true if the user is authenticated
if username == "admin2" and password == "passwd2":
return True
else:
return False

@auth.get_user_roles
@gen_auth.get_user_roles
def my_roles(username: str) -> list[str]:
# Use username to get roles from database
print(f"username in roles: {username}")
return ["admin", "user"]

@auth.get_jwt_claims_to_verify
def get_jwt_claims_to_verify() -> list[str]:
# return ["exp", "iat", "nbf"]
return ["exp", "iat"]

@gen_auth.jwt_claims
def jwt_claims() -> dict:
claims = {
"exp": dt.datetime.now(tz=dt.timezone.utc) + dt.timedelta(seconds=30),
"iat": dt.datetime.now(tz=dt.timezone.utc)
}
return claims

@auth.verify_jwt_credentials#TODO: Checar si debe cambiar o no
def creds(username_jwt: str, password_jwt: str) -> bool:
my_dict = {
"username_jwt": username_jwt,
"password_jwt": password_jwt
}
return True
# return False

# -------------Endpoints-------------
@app.route("/")
@auth.login_required(roles=["admin", "eder"])
def index():
return Response("Todo bien"), 200

@app.route("/generate_token", methods=["POST"])
@gen_auth.generate_jwt(roles=["eder", "user"])
def gen_token(token):
response = {
"status": "success",
"token": token
}
return make_response(jsonify(response)), 200

@app.route("/temp")
def temp():
test = (("val1", "hola"), ("val2", "prueba2"))
response = {
"message": "solo prueba",
"test_data": test
}
return make_response(jsonify(response))
19 changes: 19 additions & 0 deletions api_example/run_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
try:
from app import app
from gevent.pywsgi import WSGIServer
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

if __name__== "__main__":
try:
# -----------------Dev mode-----------------
app.run(host= "127.0.0.1", port= 5000, debug= True)
# debug= True for apply changes made into the files without restarting the flask server

# -----------------Prod mode----------------
#appServer= WSGIServer(("127.0.0.1", 5000), app)
#appServer.serve_forever()
except Exception as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")
finally:
print("Finishing program")
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = Flask-authgen-jwt
version = 1.2.4
version = 2.0.0
author = Diego Martinez and Guillermo Ortega
author_email = [email protected]
description = JWT authentication and generator for Flask routes
Expand Down
46 changes: 41 additions & 5 deletions src/flask_authgen_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Core():
basic_auth_callback: Callable[[str, str], bool] = None
enc_dec_jwt_callback: dict = None
get_user_roles_callback: list = None
personal_credentials: tuple[str, str] = None

def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> Callable[[None], dict]:
"""Decorator to verify the JWT token
Expand All @@ -29,6 +30,23 @@ def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> Callable[[None], d
self.enc_dec_jwt_callback = func()
return func

def personal_credentials_field(self, func: Callable[[None], tuple[str, str]]) -> Callable[[None], tuple[str, str]]:
"""
Decorator to set the personal credentials, if youu dont want to use username and password inside the token
then with this you can return a tuple in which the first element is the username and the second is the password
but as you want to name that respective fields so the library will validate using the fields you set
:param func: function to be decorated
:return: the tuple with the username and password with personal names
:Example:
@dec_jwt.personal_credentials_field
def get_personal_credentials():
return "my_username_personal_name_field", "my_password_personal_name_field"
"""
self.personal_credentials = func()
return func

def verify_dict_config(self, config: str) -> None:
"""Method that veryfies the JWT configuration generator and for basic auth
:param config: str to identify which configuration to verify"""
Expand Down Expand Up @@ -89,6 +107,11 @@ def __create_jwt_payload(self, bauth_credentials: dict) -> dict:
"""
if not self.jwt_fields_attr:
self.gen_abort_error("jwt_claims decorator and function is not defined", 500)
if self.personal_credentials is not None:
bauth_credentials[self.personal_credentials[0]] = bauth_credentials["username"]
bauth_credentials[self.personal_credentials[1]] = bauth_credentials["password"]
del bauth_credentials["username"]
del bauth_credentials["password"]
payload = bauth_credentials
payload.update(self.jwt_fields_attr)

Expand Down Expand Up @@ -226,8 +249,14 @@ def __verify_token(self, token: dict) -> None:
self.gen_abort_error(f"The claim {claim} is not in the token", 400)
if len(token) < 1:
self.gen_abort_error("Invalid token", 401)
if ("username" not in token) or ("password" not in token):
self.gen_abort_error("Invalid token", 401)
if self.personal_credentials is not None:
per_username = self.personal_credentials[0]
per_password = self.personal_credentials[1]
if (per_username not in token) or (per_password not in token):
self.gen_abort_error("Invalid token", 401)
else:
if ("username" not in token) or ("password" not in token):
self.gen_abort_error("Invalid token", 401)
keys_to_validate = self.get_jwt_claims_to_verify_callback
for key in keys_to_validate:
if key not in token:
Expand All @@ -240,8 +269,12 @@ def __authenticate_credentials(self, token: dict) -> bool:
"""
if self.credentials_success_callback is None:
self.gen_abort_error("get_credentials_success decorator is not set", 500)
username_jwt = token["username"]
password_jwt = token["password"]
if self.personal_credentials is None:
username_jwt = token["username"]
password_jwt = token["password"]
else:
username_jwt = token[self.personal_credentials[0]]
password_jwt = token[self.personal_credentials[1]]
return self.ensure_sync(self.credentials_success_callback)(username_jwt, password_jwt)

def __set_token_as_attr(self, token: dict) -> None:
Expand Down Expand Up @@ -282,11 +315,14 @@ def wrapper(*args, **kwargs):
else:
token = self.__decode_jwt()
self.__verify_token(token)
self.verify_user_roles(roles, token["username"])

grant_access = self.__authenticate_credentials(token)
if not grant_access:
self.gen_abort_error("The credentials are not correct", 401)
if self.personal_credentials is not None:
self.verify_user_roles(roles, token[self.personal_credentials[0]])
else:
self.verify_user_roles(roles, token["username"])
self.__set_token_as_attr(token)

return self.ensure_sync(func)(*args, **kwargs)
Expand Down

0 comments on commit 714dc2a

Please sign in to comment.