scandir and md5 for adlsgen2setup.py #2113
base: main
@@ -1,8 +1,10 @@
import argparse
import asyncio
import datetime
import json
import logging
import os
import hashlib
from typing import Any, Optional

import aiohttp

@@ -56,7 +58,7 @@ def __init__(
        self.data_access_control_format = data_access_control_format
        self.graph_headers: Optional[dict[str, str]] = None

    async def run(self):
    async def run(self, scandirs: bool = False):
        async with self.create_service_client() as service_client:
            logger.info(f"Ensuring {self.filesystem_name} exists...")
            async with service_client.get_file_system_client(self.filesystem_name) as filesystem_client:

@@ -80,15 +82,17 @@ async def run(self):
                    )
                    directories[directory] = directory_client

                logger.info("Uploading scanned files...")
                if scandirs and directory != "/":
                    await self.scan_and_upload_directories(directories, filesystem_client)

                logger.info("Uploading files...")
                for file, file_info in self.data_access_control_format["files"].items():
                    directory = file_info["directory"]
                    if directory not in directories:
                        logger.error(f"File {file} has unknown directory {directory}, exiting...")
                        return
                    await self.upload_file(
                        directory_client=directories[directory], file_path=os.path.join(self.data_directory, file)
                    )
                    await self.upload_file(directory_client=directories[directory], file_path=os.path.join(self.data_directory, file))

                logger.info("Setting access control...")
                for directory, access_control in self.data_access_control_format["directories"].items():

@@ -100,8 +104,7 @@ async def run(self):
                                    f"Directory {directory} has unknown group {group_name} in access control list, exiting"
                                )
                                return
                            await directory_client.update_access_control_recursive(
                                acl=f"group:{groups[group_name]}:r-x"
                            await directory_client.update_access_control_recursive(acl=f"group:{groups[group_name]}:r-x"
                            )
                        if "oids" in access_control:
                            for oid in access_control["oids"]:

@@ -110,15 +113,110 @@ async def run(self):
                for directory_client in directories.values():
                    await directory_client.close()

    async def walk_files(self, src_filepath = "."):
        filepath_list = []

        # This for loop uses the os.walk() function to walk through the files and directories
        # and records the filepaths of the files to a list
        for root, dirs, files in os.walk(src_filepath):

            # iterate through the files currently obtained by os.walk() and
            # create the filepath string for that file and add it to the filepath_list list
            for file in files:
                # Checks to see if the root is '.' and changes it to the correct current
                # working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value.
                if root == '.':
                    root_path = os.getcwd() + "/"
                else:
                    root_path = root

                filepath = os.path.join(root_path, file)

                # Appends filepath to filepath_list if filepath does not currently exist in filepath_list
                if filepath not in filepath_list:
                    filepath_list.append(filepath)

        # Return filepath_list
        return filepath_list

    async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirectoryClient], filesystem_client):
        logger.info("Scanning and uploading files from directories recursively...")
        for directory, directory_client in directories.items():
            directory_path = os.path.join(self.data_directory, directory)

            # Check whether 'scandir' exists and is set to False

Review comment: German comment?
Reply: Ja! will translate that ;)

            if not self.data_access_control_format["directories"][directory].get("scandir", True):
                logger.info(f"Skipping directory {directory} as 'scandir' is set to False")
                continue

            groups = self.data_access_control_format["directories"][directory].get("groups", [])

Review comment: groups is not used?


            # Check if the directory exists before walking it
            if not os.path.exists(directory_path):
                logger.warning(f"Directory does not exist: {directory_path}")
                continue

            # Get all file paths using the walk_files function
            file_paths = await self.walk_files(directory_path)

            # Upload each file collected
            for file_path in file_paths:
                await self.upload_file(directory_client, file_path)
                logger.info(f"Uploaded {file_path} to {directory}")

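For reference, the "scandir" key above is read from the --data-access-control JSON. A hypothetical shape of that structure after json.load, using made-up directory, group, and file names (not taken from this PR or the repo's sample files):

# Hypothetical example only: names are illustrative.
data_access_control_format = {
    "directories": {
        "dir_a": {"groups": ["Group_A"]},  # scanned by --scandirs (scandir defaults to True)
        "dir_b": {"groups": ["Group_B"], "scandir": False},  # skipped by scan_and_upload_directories
        "/": {"groups": ["Group_A", "Group_B"]},
    },
    "files": {
        "example.pdf": {"directory": "dir_a"},  # explicit uploads run regardless of --scandirs
    },
}
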
    def create_service_client(self):
        return DataLakeServiceClient(
            account_url=f"https://{self.storage_account_name}.dfs.core.windows.net", credential=self.credentials
        )

    async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str):
    async def calc_md5(self, path: str) -> str:
        with open(path, "rb") as file:
            return hashlib.md5(file.read()).hexdigest()

    async def check_md5(self, path: str, md5_hash: str) -> bool:
        # if filename ends in .md5 skip
        if path.endswith(".md5"):

Review comment: We actually want to move the storage of the MD5 into the Blob storage metadata for our main prepdocs strategy, as the local md5 is problematic when you switch environments. Would that be possible with ADLS Gen2? I assume it also has the ability to store metadata, given it's a type of Blob storage?

Reply: I've already implemented that and will include it with some additional metadata in the next commit. If I read the… I've also started on prepdocs.sh to make use of the persisted MD5 on the backend once it's "injected" from the data lake, using the datalake strategy, which seems to be unstable on exceptions (it exits the whole loop, which is even more painful if you don't have a resume). MD5 will avoid duplicate index and tokenizing work, but the run will still need to iterate again over all the offsets up to where the ingestion died abnormally. For external systems I use queries with timestamps to find a proper point to resume, but I have no idea if and how the current data lake file walker creates any order, or whether it is completely random. I'm unsure where best to store this: a) in the index, where it would be associated with each chunk, meaning we'd need to locate chunks where there are more than zero results for the fingerprint, updating multiple rows accordingly; b) in /content (blob storage) for inline rendering of the source document as a citation in the browser. This has the benefit of only requiring one update per change, though we'd still need to locate and remove all matching chunks in the index by primary key for consistency. Option (a) seems more logical, doesn't it? By the way, I remember I had trouble with md5 dupes for bigger data amounts, so maybe it's better to use SHA digests? And even more important: how would the above efforts fit into integrated vectorization?

Reply: Blob storage has embedded md5 already, as it seems. Whether that md5 will have the same value as an md5 computed locally from the file is the question.

Reply: The Blob service does have its own md5, but it only computes it for small files, so we would need to compute our own hash. Using sha256 also seems fine if we can store that in the blob metadata.

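A minimal sketch of the metadata-based alternative discussed in this thread (not part of this PR): compute the hash in chunks and keep it in the remote file's metadata rather than in a local .md5 sidecar. calc_sha256 and upload_if_changed are hypothetical names; the sketch assumes azure-storage-file-datalake's async DataLakeFileClient with get_file_properties() and upload_data(..., metadata=...).

import hashlib
import os

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.filedatalake.aio import DataLakeDirectoryClient


def calc_sha256(path: str) -> str:
    # Hash in chunks so large files do not need to fit in memory.
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


async def upload_if_changed(directory_client: DataLakeDirectoryClient, file_path: str) -> bool:
    # Upload file_path only if its hash differs from the "sha256" metadata stored on the remote file.
    file_client = directory_client.get_file_client(os.path.basename(file_path))
    local_hash = calc_sha256(file_path)

    try:
        remote_metadata = (await file_client.get_file_properties()).metadata or {}
    except ResourceNotFoundError:
        remote_metadata = {}  # file has not been uploaded yet

    if remote_metadata.get("sha256") == local_hash:
        return False  # unchanged, skip the upload

    with open(file_path, "rb") as data:
        await file_client.upload_data(data, overwrite=True, metadata={"sha256": local_hash})
    return True
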
            return True

        # if there is a file called .md5 in this directory, see if its updated
        stored_hash = None
        hash_path = f"{path}.md5"
        if os.path.exists(hash_path):
            with open(hash_path, encoding="utf-8") as md5_f:
                stored_hash = md5_f.read()

        if stored_hash and stored_hash.strip() == md5_hash.strip():
            logger.info("Skipping %s, no changes detected.", path)
            return True

        # Write the hash
        with open(hash_path, "w", encoding="utf-8") as md5_f:
            md5_f.write(md5_hash)

        return False

    async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str):
        # Calculate MD5 hash once
        md5_hash = await self.calc_md5(file_path)

        # Check if the file has been uploaded or if it has changed
        if await self.check_md5(file_path, md5_hash):
            logger.info("File %s has already been uploaded, skipping upload.", file_path)
            return  # Skip uploading if the MD5 check indicates no changes

        # Proceed with the upload since the file has changed
        with open(file=file_path, mode="rb") as f:
            file_client = directory_client.get_file_client(file=os.path.basename(file_path))
            await file_client.upload_data(f, overwrite=True)
            last_modified = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
            title = os.path.splitext(os.path.basename(file_path))[0]
            metadata = {
                "md5": md5_hash,
                "category": category,
                "updated": last_modified,
                "title": title
            }
            await file_client.upload_data(f, overwrite=True, metadata=metadata)
            logger.info("File %s uploaded with metadata %s.", file_path, metadata)

    async def create_or_get_group(self, group_name: str):
        group_id = None

@@ -144,6 +242,7 @@ async def create_or_get_group(self, group_name: str):
                # If Unified does not work for you, then you may need the following settings instead:
                # "mailEnabled": False,
                # "mailNickname": group_name,

            }
            async with session.post("https://graph.microsoft.com/v1.0/groups", json=group) as response:
                content = await response.json()

@@ -165,19 +264,19 @@ async def main(args: Any):
        data_access_control_format = json.load(f)
    command = AdlsGen2Setup(
        data_directory=args.data_directory,
        storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"],
        filesystem_name="gptkbcontainer",
        storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"],
        filesystem_name=os.environ["AZURE_ADLS_GEN2_FILESYSTEM"],
        security_enabled_groups=args.create_security_enabled_groups,
        credentials=credentials,
        data_access_control_format=data_access_control_format,
    )
    await command.run()
    await command.run(args.scandirs)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Upload sample data to a Data Lake Storage Gen2 account and associate sample access control lists with it using sample groups",
        epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control ./scripts/sampleacls.json --create-security-enabled-groups <true|false>",
        description="Upload data to a Data Lake Storage Gen2 account and associate access control lists with it using sample groups",
        epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control .azure/${AZURE_ENV_NAME}/docs_acls.json --create-security-enabled-groups <true|false> --scandirs",
    )
    parser.add_argument("data_directory", help="Data directory that contains sample PDFs")
    parser.add_argument(

@@ -190,6 +289,7 @@ async def main(args: Any):
        "--data-access-control", required=True, help="JSON file describing access control for the sample data"
    )
    parser.add_argument("--verbose", "-v", required=False, action="store_true", help="Verbose output")
    parser.add_argument("--scandirs", required=False, action="store_true", help="Scan and upload all files from directories recursively")
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig()

Review comment: Do we only need to set this once? or does it change for every file
Reply: yes, good point