|
1 | 1 | import logging |
2 | 2 | import os |
3 | | -import shutil |
4 | 3 | import time |
5 | 4 | import zipfile |
6 | 5 | from datetime import datetime |
7 | | -from multiprocessing import Pool |
8 | | -from pathlib import Path |
9 | 6 | from tempfile import TemporaryDirectory |
10 | | -from typing import List, Tuple, Union |
| 7 | +from typing import Dict, List, Optional, Union |
11 | 8 |
|
12 | 9 | import requests |
13 | 10 | from requests.exceptions import ConnectionError, Timeout |
|
25 | 22 | REQUEST_TIMEOUT = 600 |
26 | 23 |
|
27 | 24 |
|
class UploadDirectoryInfo:
    """A named directory node holding uploaded-file tokens and nested subdirectories."""

    def __init__(
        self,
        name: str,
        files: Optional[List[str]] = None,
        directories: Optional[List["UploadDirectoryInfo"]] = None,
    ):
        # Omitted arguments get a fresh list per instance; a list supplied by
        # the caller is stored as-is (not copied).
        if files is None:
            files = []
        if directories is None:
            directories = []
        self.name = name
        self.files = files
        self.directories = directories

    def serialize(self) -> Dict:
        """Return this node and everything below it as nested plain dicts and lists."""
        file_entries = [{"token": token} for token in self.files]
        child_entries = [child.serialize() for child in self.directories]
        return {
            "name": self.name,
            "files": file_entries,
            "directories": child_entries,
        }
| 42 | + |
| 43 | + |
28 | 44 | def parse_datetime_string(string: str) -> Union[datetime, str]: |
29 | 45 | time_formats = ["%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S.%fZ"] |
30 | 46 | for t in time_formats: |
@@ -138,51 +154,108 @@ def _upload_blob(file_path: str, model_type: str) -> str: |
138 | 154 | return response["token"] |
139 | 155 |
|
140 | 156 |
|
def zip_file(args: Tuple[Path, Path, Path]) -> int:
    """Append a single file to a zip archive and report the file's size.

    Parameters
    ==========
    args: tuple of (file to add, path of the archive, base directory). The entry is
        stored uncompressed under the file's path relative to the base directory.
    :return: size of the file in bytes, read before the archive is opened.
    """
    member, archive_path, base_dir = args
    entry_name = member.relative_to(base_dir)
    member_size = member.stat().st_size
    with zipfile.ZipFile(
        archive_path, mode="a", compression=zipfile.ZIP_STORED, allowZip64=True
    ) as archive:
        archive.write(member, arcname=entry_name)
    return member_size
148 | | - |
149 | | - |
def zip_files(source_path_obj: Path, zip_path: Path) -> List[int]:
    """Add every regular file below a directory to a zip archive.

    Parameters
    ==========
    source_path_obj: directory whose files are archived (searched recursively).
    zip_path: archive to write; opened in append mode, so an existing archive is extended.
    :return: size in bytes of each archived file, in the order the files were added.
    """
    # List the files before the archive is opened so the set of members is
    # fixed up front.
    files = [file for file in source_path_obj.rglob("*") if file.is_file()]

    sizes: List[int] = []
    # All members go through ONE archive handle in ONE process. Several
    # processes appending to the same zip at once each rewrite the central
    # directory on close, which can drop entries or corrupt the archive.
    # Opening the handle once also produces a valid empty archive when the
    # directory contains no files.
    with zipfile.ZipFile(zip_path, "a", zipfile.ZIP_STORED, allowZip64=True) as zipf:
        for file in files:
            sizes.append(file.stat().st_size)
            # Entries are named relative to the source directory.
            zipf.write(file, file.relative_to(source_path_obj))
    return sizes
157 | | - |
158 | | - |
def upload_files(source_path: str, model_type: str) -> List[str]:
    """Stage a file or directory in a temporary location and upload it as one blob.

    A directory is packed into "archive.zip"; a single file is copied unchanged.
    A byte-based progress bar labelled "Zipping" tracks the staging step.

    Parameters
    ==========
    source_path: path of the file or directory to upload.
    model_type: Type of the model that is being uploaded.
    :return: a one-element list with the upload token, or an empty list if the token is falsy.
    :raises ValueError: if source_path is neither a file nor a directory.
    """
    source = Path(source_path)
    with TemporaryDirectory() as scratch:
        scratch_dir = Path(scratch)

        # Work out the progress-bar total before staging anything.
        if source.is_dir():
            total_size = sum(entry.stat().st_size for entry in source.rglob("*") if entry.is_file())
        elif source.is_file():
            total_size = source.stat().st_size
        else:
            path_error_message = "The source path does not point to a valid file or directory."
            raise ValueError(path_error_message)

        with tqdm(total=total_size, desc="Zipping", unit="B", unit_scale=True, unit_divisor=1024) as pbar:
            if source.is_dir():
                archive = scratch_dir / "archive.zip"
                for archived_size in zip_files(source, archive):
                    pbar.update(archived_size)
                upload_path = str(archive)
            elif source.is_file():
                staged_copy = scratch_dir / source.name
                shutil.copy(source, staged_copy)
                pbar.update(staged_copy.stat().st_size)
                upload_path = str(staged_copy)

        # Upload while the temporary directory (and the staged file in it) still exists.
        token = _upload_blob(upload_path, model_type)
        return [token] if token else []
def upload_files_and_directories(
    folder: str, model_type: str, quiet: bool = False  # noqa: FBT002, FBT001
) -> UploadDirectoryInfo:
    """Upload a file or a directory tree and describe the result as an UploadDirectoryInfo.

    When the tree holds more than MAX_FILES_TO_UPLOAD files, everything is packed
    into a single zip archive and only that archive is uploaded. Otherwise each
    file is uploaded on its own and its token is recorded under the directory
    node matching its location.

    Parameters
    ==========
    folder: path of the directory (or single file) to upload.
    model_type: Type of the model that is being uploaded.
    quiet: suppress verbose output (default is False)
    :return: node named "archive" holding the archive token when zipped; otherwise
        node named "root" mirroring the directory layout.
    """
    # Count the total number of files
    file_count = sum(len(names) for _, _, names in os.walk(folder))

    if file_count > MAX_FILES_TO_UPLOAD:
        if not quiet:
            logger.info(f"More than {MAX_FILES_TO_UPLOAD} files detected, creating a zip archive...")

        with TemporaryDirectory() as temp_dir:
            archive_path = os.path.join(temp_dir, TEMP_ARCHIVE_FILE)
            with zipfile.ZipFile(archive_path, "w") as archive:
                for dirpath, _, names in os.walk(folder):
                    for name in names:
                        member_path = os.path.join(dirpath, name)
                        # Entries are named relative to the uploaded folder.
                        archive.write(member_path, os.path.relpath(member_path, folder))

            # The archive is closed here but the temporary directory still exists.
            archive_token = _upload_file_or_folder(temp_dir, TEMP_ARCHIVE_FILE, model_type, quiet)
            tokens = [] if archive_token is None else [archive_token]
            return UploadDirectoryInfo(name="archive", files=tokens)

    tree = UploadDirectoryInfo(name="root")

    if os.path.isfile(folder):
        # Directly upload the file if the path is a file
        token = _upload_file_or_folder(os.path.dirname(folder), os.path.basename(folder), model_type, quiet)
        if token:
            tree.files.append(token)
        return tree

    for dirpath, _, names in os.walk(folder):
        # Path of the current folder relative to the base folder
        relative = os.path.relpath(dirpath, folder)
        components = [] if relative == "." else relative.split(os.sep)

        # Descend from the root node, creating any directory node not seen yet
        node = tree
        for component in components:
            child = next((d for d in node.directories if d.name == component), None)
            if child is None:
                child = UploadDirectoryInfo(name=component)
                node.directories.append(child)
            node = child

        # Add file tokens to the current directory node
        for name in names:
            token = _upload_file_or_folder(dirpath, name, model_type, quiet)
            if token:
                node.files.append(token)

    return tree
| 218 | + |
| 219 | + |
| 220 | +def _upload_file_or_folder( |
| 221 | + parent_path: str, |
| 222 | + file_or_folder_name: str, |
| 223 | + model_type: str, |
| 224 | + quiet: bool = False, # noqa: FBT002, FBT001 |
| 225 | +) -> Optional[str]: |
| 226 | + """ |
| 227 | + Uploads a file or each file inside a folder individually from a specified path to a remote service. |
| 228 | + Parameters |
| 229 | + ========== |
| 230 | + parent_path: The parent directory path from where the file or folder is to be uploaded. |
| 231 | + file_or_folder_name: The name of the file or folder to be uploaded. |
| 232 | + dir_mode: The mode to handle directories. Accepts 'zip', 'tar', or other values for skipping. |
| 233 | + model_type: Type of the model that is being uploaded. |
| 234 | + quiet: suppress verbose output (default is False) |
| 235 | + :return: A token if the upload is successful, or None if the file is skipped or the upload fails. |
| 236 | + """ |
| 237 | + full_path = os.path.join(parent_path, file_or_folder_name) |
| 238 | + if os.path.isfile(full_path): |
| 239 | + return _upload_file(file_or_folder_name, full_path, quiet, model_type) |
| 240 | + return None |
| 241 | + |
| 242 | + |
def _upload_file(file_name: str, full_path: str, quiet: bool, model_type: str) -> Optional[str]:  # noqa: FBT001
    """Helper function to upload a single file
    Parameters
    ==========
    file_name: name of the file to upload (used in log messages only)
    full_path: path to the file to upload
    quiet: suppress verbose output
    model_type: Type of the model that is being uploaded.
    :return: the token returned by _upload_blob for the uploaded file
    """
    verbose = not quiet
    if verbose:
        logger.info(f"Starting upload for file {file_name}")

    # The size is read before the upload starts, even in quiet mode.
    size_in_bytes = os.path.getsize(full_path)
    token = _upload_blob(full_path, model_type)
    if verbose:
        logger.info(f"Upload successful: {file_name} ({File.get_size(size_in_bytes)})")
    return token
0 commit comments