Skip to content

Commit

Permalink
Merge pull request #47 from dmtzs/development
Browse files Browse the repository at this point in the history
Development to master
  • Loading branch information
dmtzs authored Nov 27, 2022
2 parents 8a128c0 + 7e3cb1f commit 5a78644
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Personal files to ignore
readFileAsBinary/
my_tests/
private-key.pem
public-key.pem

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
12 changes: 12 additions & 0 deletions api_example/rsa_with_passwd/app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
try:
from flask import Flask
from . import flask_authgen_jwt
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

app = Flask(__name__)
app.config["JSON_SORT_KEYS"] = False
gen_auth = flask_authgen_jwt.GenJwt(rsa_encrypt=True)
auth = flask_authgen_jwt.DecJwt()

from app import routes
101 changes: 101 additions & 0 deletions api_example/rsa_with_passwd/app/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
try:
import datetime as dt
from typing import Union
from app import app, auth, gen_auth
from flask import Response, make_response, jsonify
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

@gen_auth.enc_dec_jwt_config
def encode_private_key() -> dict[str, Union[str, bytes]]:
file_private = "./private-key.pem"
private_key = open(file_private).read().encode("ascii")

encode_attributes = {
"key": private_key,# In this case key should be your private key
"algorithm": "RS256",
"passphrase": b"your password"
}
return encode_attributes

@auth.enc_dec_jwt_config
def decode_pub_key() -> dict[str, Union[str, bytes]]:
file_public = "./public-key.pem"
public_key = open(file_public).read().encode("ascii")

decode_attributes = {
"key": public_key,# In this case key should be your public key
"algorithm": "RS256"
}
return decode_attributes

@gen_auth.personal_credentials_field
@auth.personal_credentials_field
def personal_credentials_field() -> tuple[str, str]:
return "per_username", "per_password"

@gen_auth.verify_bauth_credentials
def get_basic_auth_credentials2(username: str, password: str) -> dict:
# Use the username and password to authenticate the user in the way you want-
# and return true if the user is authenticated
if username == "admin2" and password == "passwd2":
return True
else:
return False

@auth.get_user_roles
@gen_auth.get_user_roles
def my_roles(username: str) -> list[str]:
# Use username to get roles from database
print(f"username in roles: {username}")
return ["admin", "user"]

@auth.get_jwt_claims_to_verify
def get_jwt_claims_to_verify() -> list[str]:
# return ["exp", "iat", "nbf"]
return ["exp", "iat"]

@gen_auth.jwt_claims
def jwt_claims() -> dict:
claims = {
"exp": dt.datetime.now(tz=dt.timezone.utc) + dt.timedelta(seconds=30),
"iat": dt.datetime.now(tz=dt.timezone.utc)
}
return claims

@auth.verify_jwt_credentials
def creds(username_jwt: str, password_jwt: str) -> bool:
my_dict = {
"username_jwt": username_jwt,
"password_jwt": password_jwt
}
return True
# return False

# -------------Endpoints-------------
@app.route("/")
@auth.login_required(roles=["admin", "eder"])
def index() -> Response:
return make_response("Todo bien", 200)

@app.route("/generate_token", methods=["POST"])
@gen_auth.generate_jwt(roles=["eder", "user"])
def gen_token(token) -> Response:
response = {
"status": "success",
"token": token
}
if token is None:
response["status"] = "error"
response["message_error"] = "Token was not generated correctly, please verify"
return make_response(jsonify(response), 500)
return make_response(jsonify(response)), 200

@app.route("/temp")
def temp() -> Response:
test = (("val1", "hola"), ("val2", "prueba2"))
response = {
"message": "solo prueba",
"test_data": test
}
return make_response(jsonify(response))
File renamed without changes.
11 changes: 11 additions & 0 deletions api_example/rsa_without_passwd/app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
try:
from flask import Flask
from . import flask_authgen_jwt
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

app = Flask(__name__)
gen_auth = flask_authgen_jwt.GenJwt()
auth = flask_authgen_jwt.DecJwt()

from app import routes
96 changes: 96 additions & 0 deletions api_example/rsa_without_passwd/app/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
try:
import datetime as dt
from typing import Union
from app import app, auth, gen_auth
from flask import Response, make_response, jsonify
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

@gen_auth.enc_dec_jwt_config
def encode_private_key() -> dict[str, Union[str, bytes]]:
file_private = "./private-key.pem"
private_key = open(file_private).read().encode("ascii")

encode_attributes = {
"key": private_key,# In this case key should be your private key
"algorithm": "RS256"
}
return encode_attributes

@auth.enc_dec_jwt_config
def decode_pub_key() -> dict[str, Union[str, bytes]]:
file_public = "./public-key.pem"
public_key = open(file_public).read().encode("ascii")

decode_attributes = {
"key": public_key,# In this case key should be your public key
"algorithm": "RS256"
}
return decode_attributes

@gen_auth.personal_credentials_field
@auth.personal_credentials_field
def personal_credentials_field() -> tuple[str, str]:
return "per_username", "per_password"

@gen_auth.verify_bauth_credentials
def get_basic_auth_credentials2(username: str, password: str) -> dict:
# Use the username and password to authenticate the user in the way you want-
# and return true if the user is authenticated
if username == "admin2" and password == "passwd2":
return True
else:
return False

@auth.get_user_roles
@gen_auth.get_user_roles
def my_roles(username: str) -> list[str]:
# Use username to get roles from database
print(f"username in roles: {username}")
return ["admin", "user"]

@auth.get_jwt_claims_to_verify
def get_jwt_claims_to_verify() -> list[str]:
# return ["exp", "iat", "nbf"]
return ["exp", "iat"]

@gen_auth.jwt_claims
def jwt_claims() -> dict:
claims = {
"exp": dt.datetime.now(tz=dt.timezone.utc) + dt.timedelta(seconds=30),
"iat": dt.datetime.now(tz=dt.timezone.utc)
}
return claims

@auth.verify_jwt_credentials
def creds(username_jwt: str, password_jwt: str) -> bool:
my_dict = {
"username_jwt": username_jwt,
"password_jwt": password_jwt
}
return True
# return False

# -------------Endpoints-------------
@app.route("/")
@auth.login_required(roles=["admin", "eder"])
def index() -> Response:
return make_response("Todo bien", 200)

@app.route("/generate_token", methods=["POST"])
@gen_auth.generate_jwt(roles=["eder", "user"])
def gen_token(token) -> Response:
response = {
"status": "success",
"token": token
}
return make_response(jsonify(response)), 200

@app.route("/temp")
def temp() -> Response:
test = (("val1", "hola"), ("val2", "prueba2"))
response = {
"message": "solo prueba",
"test_data": test
}
return make_response(jsonify(response))
19 changes: 19 additions & 0 deletions api_example/rsa_without_passwd/run_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
try:
from app import app
from gevent.pywsgi import WSGIServer
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

if __name__== "__main__":
try:
# -----------------Dev mode-----------------
app.run(host= "127.0.0.1", port= 5000, debug= True)
# debug= True for apply changes made into the files without restarting the flask server

# -----------------Prod mode----------------
#appServer= WSGIServer(("127.0.0.1", 5000), app)
#appServer.serve_forever()
except Exception as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")
finally:
print("Finishing program")
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

@gen_auth.enc_dec_jwt_config
@auth.enc_dec_jwt_config
def test_creds() -> dict:
decode_attributes = {
def enc_dec_creds() -> dict[str, str]:
enc_dec_attributes = {
"key": "secret",
"algorithm": "HS256",
}
return decode_attributes
return enc_dec_attributes

@gen_auth.personal_credentials_field
@auth.personal_credentials_field
Expand Down
19 changes: 19 additions & 0 deletions api_example/without_rsa/run_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
try:
from app import app
from gevent.pywsgi import WSGIServer
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

if __name__== "__main__":
try:
# -----------------Dev mode-----------------
app.run(host= "127.0.0.1", port= 5000, debug= True)
# debug= True for apply changes made into the files without restarting the flask server

# -----------------Prod mode----------------
#appServer= WSGIServer(("127.0.0.1", 5000), app)
#appServer.serve_forever()
except Exception as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")
finally:
print("Finishing program")
44 changes: 36 additions & 8 deletions src/flask_authgen_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@
import jwt
from functools import wraps
from base64 import b64decode
from typing import Callable, Optional
from datetime import datetime
from typing import Callable, Optional, Union
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from flask import request, current_app, abort, make_response, jsonify
except ImportError as eImp:
print(f"The following import ERROR occurred in {__file__}: {eImp}")

class Core():
basic_auth_callback: Callable[[str, str], bool] = None
enc_dec_jwt_callback: dict = None
enc_dec_jwt_callback: dict[Union[bytes, str]] = None
get_user_roles_callback: list = None
personal_credentials: tuple[str, str] = None

def enc_dec_jwt_config(self, func: Callable[[None], dict]) -> Callable[[None], dict]:
def enc_dec_jwt_config(self, func: Callable[[None], dict[str, Union[bytes, str]]]) -> Callable[[None], dict[str, Union[bytes, str]]]:
"""Decorator to verify the JWT token
:param f: function to be decorated
:return: the function to wrap should return a dictionary with the following keys:
Expand All @@ -48,13 +51,16 @@ def get_personal_credentials():
return func

def verify_dict_config(self, config: str) -> None:
"""Method that veryfies the JWT configuration generator and for basic auth
"""Method that veryfies the JWT configuration generator
:param config: str to identify which configuration to verify"""
if config == "jwt":
claims = ["key", "algorithm"]
for claim in claims:
if claim not in self.enc_dec_jwt_callback:
self.gen_abort_error(f"The claim {claim} is not in the dictionary", 400)
elif config == "rsa_pass":
if "passphrase" not in self.enc_dec_jwt_callback:
self.gen_abort_error("The claim passphrase is not in the dictionary", 400)

def verify_user_roles(self, roles: list, user: str) -> None:
"""Method to verify the user roles if are correct
Expand Down Expand Up @@ -97,10 +103,11 @@ def ensure_sync(self, func: Callable) -> Callable:
return func

class GenJwt(Core):
def __init__(self) -> None:
def __init__(self, rsa_encrypt: bool = False) -> None:
self.jwt_fields_attr: dict = None
self.rsa_encrypt: bool = rsa_encrypt

def __create_jwt_payload(self, bauth_credentials: dict) -> dict:
def __create_jwt_payload(self, bauth_credentials: dict[str, str]) -> dict[str, Union[str, datetime]]:
"""
Method to create the JWT payload but still not encoded
:return: JWT payload as a dictionary
Expand Down Expand Up @@ -152,7 +159,21 @@ def __encode_jwt(self, payload: dict) -> Optional[str]:
key = self.enc_dec_jwt_callback["key"]
algorithm = self.enc_dec_jwt_callback["algorithm"]
try:
encoded_token = jwt.encode(payload, key, algorithm=algorithm)
if algorithm == "HS256":
encoded_token = jwt.encode(payload, key, algorithm=algorithm)
elif algorithm == "RS256":
if self.rsa_encrypt:
self.verify_dict_config("rsa_pass")
passphrase = self.enc_dec_jwt_callback["passphrase"]
private_key = serialization.load_pem_private_key(
key, password=passphrase, backend=default_backend())
encoded_token = jwt.encode(payload, private_key, algorithm=algorithm)
elif not self.rsa_encrypt:
encoded_token = jwt.encode(payload, key, algorithm=algorithm)
else:
message = "The algorithm RS256 is not supported, " \
"please verify the loading of your key or something relationated with the key"
self.gen_abort_error(message, 500)
except Exception as ex:
print(f"The following ERROR occurred in {__file__}: {ex}")
encoded_token = None
Expand Down Expand Up @@ -228,7 +249,14 @@ def __decode_jwt(self) -> Optional[dict]:
key = self.enc_dec_jwt_callback["key"]
algorithm = self.enc_dec_jwt_callback["algorithm"]
try:
decoded_token = jwt.decode(token, key, algorithms=[algorithm])
if algorithm == "HS256":
decoded_token = jwt.decode(token, key, algorithms=[algorithm])
elif algorithm == "RS256":
decoded_token = jwt.decode(token, key, algorithms=[algorithm])
else:
message = "The algorithm RS256 is not supported, " \
"please verify the loading of your key or something relationated with the key"
self.gen_abort_error(message, 500)
except Exception as ex:
print(f"The following ERROR occurred in {__file__}: {ex}")
decoded_token = None
Expand Down

0 comments on commit 5a78644

Please sign in to comment.