1
1
import os
2
2
import io
3
- from typing import Callable , Optional
3
+ from typing import Optional
4
4
import logging
5
5
import secrets
6
6
from cryptography .hazmat .primitives .ciphers import Cipher , algorithms , modes
7
7
import utils
8
- from dataclasses import dataclass , field
9
- import zlib
10
- import struct
11
- from typing import Literal
12
-
13
-
14
- @dataclass
15
- class Fraction :
16
- """Dataclass to represent a fraction"""
17
-
18
- magic : int
19
- index : int
20
- iv : bytes
21
- _crc : int = field (init = False , repr = False )
22
- data : bytes
23
-
24
- def header_to_bytes (
25
- self , endianess : Literal ["big" , "little" ] = "little" , crc = True
26
- ) -> bytes :
27
- """
28
- Convert the header information of the fraction to bytes
29
-
30
- endianess: Endianess to use (big, little)
31
- crc: Include CRC in the returned data (default: True)
32
- """
33
- end = ">" if endianess == "big" else "<"
34
- fmt = f"{ end } II16sI" if crc else f"{ end } II16s"
35
-
36
- args = [fmt , self .magic , self .index , self .iv ]
37
- if crc :
38
- args .append (self ._crc )
39
-
40
- header_data = struct .pack (* args );
41
- logging .info (f"Header data: { header_data .hex ()} " )
42
- return header_data
43
-
44
- def calculate_crc (self ) -> None :
45
- """Calculate the CRC checksum of the fraction"""
46
- crc_data = self .header_to_bytes (crc = False ) + self .data
47
- self ._crc = zlib .crc32 (crc_data )
48
-
49
- @property
50
- def crc (self ) -> int :
51
- if not self ._crc :
52
- self .calculate_crc ()
53
-
54
- return self ._crc
55
-
56
- @property
57
- def data_size (self ) -> int :
58
- return len (self .data )
59
-
60
- def __post_init__ (self ) -> None :
61
- self .calculate_crc ()
62
-
8
+ from fraction import Fraction
63
9
64
10
class Fractionator :
65
11
MAGIC : int = 0xDEADBEEF
@@ -82,16 +28,33 @@ def __init__(
82
28
self ._key : Optional [bytes ] = Fractionator .validate_aes_key (
83
29
key
84
30
) # AES-256 cryptographic key
31
+ self ._mode = modes .CFB
32
+ self ._algorithm = algorithms .AES256
33
+ self ._cipher = None
34
+
35
+ @property
36
+ def cipher (self ) -> Cipher :
37
+ if not self ._cipher :
38
+ if not self ._key or not self ._iv :
39
+ raise ValueError (f"Missing key or IV (_key:{ self ._key } , _iv:{ self ._iv } )" )
40
+
41
+ self ._cipher = Cipher (self ._algorithm (self ._key ), self ._mode (self ._iv ))
42
+
43
+ return self ._cipher
85
44
86
45
def open_reading_stream (self ) -> None :
87
46
"""
88
47
Opens a reading stream to the file specified in self._path.
89
48
If a stream is already open, this function has no effect
90
49
"""
91
- if self ._buf_reader is None or self ._buf_reader .closed :
92
- self ._buf_reader = open (self ._path , "rb" )
93
- logging .debug (f"Opened reading stream to { self ._path } ." )
94
- return
50
+ try :
51
+ if self ._buf_reader is None or self ._buf_reader .closed :
52
+ self ._buf_reader = open (self ._path , "rb" )
53
+ logging .debug (f"Opened reading stream to { self ._path } ." )
54
+ return
55
+ except FileNotFoundError as err :
56
+ logging .error (f"File not found: { self ._path } " )
57
+ raise err
95
58
96
59
def _make_fraction (self , index : int ) -> None :
97
60
"""Read from the object-file and generate a fraction"""
@@ -103,19 +66,23 @@ def _make_fraction(self, index: int) -> None:
103
66
Fractionator .CHUNK_SIZE
104
67
) # don't use peek, as it does not advance the position in the file
105
68
106
- # Generate an IV and encrypt the chunk
107
- self ._iv = secrets .token_bytes (
108
- 16
109
- ) # initialization vector for AES-256 encryption
110
- encrypted_data = self .do_aes_operation (data , True ) # encrypt chunk
111
-
69
+ # generate iv and encrypt the chunk
70
+ encrypted_data = self ._encrypt_chunk (data )
71
+
112
72
# Create a fraction instance and add it to self._fractions
113
73
fraction = Fraction (
114
74
magic = Fractionator .MAGIC , index = index , iv = self ._iv , data = encrypted_data
115
75
)
116
76
self ._fractions .append (fraction )
117
77
logging .debug (f"Created fraction #{ fraction .index } " )
118
78
79
+ def _encrypt_chunk (self , data : bytes ) -> bytes :
80
+ self ._iv = self ._generate_iv ()
81
+ return self .do_aes_operation (data , True )
82
+
83
+ def _generate_iv (self ) -> bytes :
84
+ return secrets .token_bytes (16 )
85
+
119
86
def make_fractions (self ) -> None :
120
87
"""Iterate through the Fractionator object file specified in self._path and generate Fraction objects"""
121
88
size = os .path .getsize (self ._path )
@@ -163,11 +130,11 @@ def load_backup(self, backup_path: str):
163
130
with open (backup_path , "r" ) as f :
164
131
self .fraction_paths = [line .strip () for line in f ]
165
132
logging .debug (
166
- f"[debug: _load_backup] Loaded { len (self .fraction_paths )} paths from backup."
133
+ f"Loaded { len (self .fraction_paths )} paths from backup."
167
134
)
168
135
169
136
except OSError as e :
170
- logging .error (f"[error: _load_backup] Failed to load backup: { e } " )
137
+ logging .error (f"Failed to load backup: { e } " )
171
138
return []
172
139
173
140
def _clean_fraction (self , path : str ):
@@ -190,12 +157,13 @@ def clean_fractions(self) -> None:
190
157
self .fraction_paths = []
191
158
logging .info ("Done." )
192
159
160
+
193
161
def do_aes_operation (self , data : bytes , op : bool ) -> bytes :
194
162
"""Perform an AES-256 operation on given data (encryption [op=True]/decryption [op=False])"""
195
163
if not self ._key or not self ._iv :
196
164
raise ValueError (f"Missing key or IV (_key:{ self ._key } , _iv:{ self ._iv } )" )
197
165
198
- cipher = Cipher ( algorithms . AES ( self ._key ), modes . OFB ( self . _iv ))
166
+ cipher = self .cipher
199
167
operator = cipher .encryptor () if op else cipher .decryptor ()
200
168
201
169
return operator .update (data ) + operator .finalize ()
0 commit comments