Skip to content

Commit

Permalink
Added new Encrypted column type for AES256 encrypted columns
Browse files Browse the repository at this point in the history
  • Loading branch information
maximebf committed Sep 6, 2024
1 parent 2be4e41 commit 1ae39f2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sqlorm-py"
version = "0.2.1"
version = "0.2.2"
description = "A new kind or ORM that do not abstract away your database or SQL queries."
authors = ["Maxime Bouroumeau-Fuseau <[email protected]>"]
readme = "README.md"
Expand All @@ -13,6 +13,7 @@ python = "^3.10"
blinker = "^1.8.2"
psycopg = { extras = ["binary"], version = "^3.1.18", optional = true }
mysql-connector-python = { version = "^8.3.0", optional = true }
pycryptodome = { version = "^3.20.0", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^8.0.0"
Expand Down
3 changes: 3 additions & 0 deletions sqlorm/types.py → sqlorm/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def __str__(self):
}
)

from .encrypted import Encrypted

__all__ = (
"SQLType",
"Integer",
Expand All @@ -68,4 +70,5 @@ def __str__(self):
"DateTime",
"JSON",
"Pickle",
"Encrypted"
)
33 changes: 33 additions & 0 deletions sqlorm/types/encrypted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import base64
from Crypto.Cipher import AES
from Crypto import Random
import os
from . import SQLType


class Encrypted(SQLType):
block_size = 16

def __init__(self, key):
super().__init__("text", self.decrypt, self.encrypt)
self.key = key

def encrypt(self, raw):
raw = self.pad(raw).encode()
iv = Random.new().read(AES.block_size)
key = self.key() if callable(self.key) else self.key
cipher = AES.new(key, AES.MODE_CBC, iv)
return base64.b64encode(iv + cipher.encrypt(raw))

def decrypt(self, enc):
enc = base64.b64decode(enc)
iv = enc[:16]
key = self.key() if callable(self.key) else self.key
cipher = AES.new(key, AES.MODE_CBC, iv)
return self.unpad(cipher.decrypt(enc[16:]).decode())

def pad(self, s):
return s + (self.block_size - len(s) % self.block_size) * chr(self.block_size - len(s) % self.block_size)

def unpad(self, s):
return s[:-ord(s[len(s)-1:])]

0 comments on commit 1ae39f2

Please sign in to comment.