scandir and md5 for adlsgen2setup.py #2113

Open
wants to merge 11 commits into base: main
4 changes: 3 additions & 1 deletion docs/login_and_acl.md
@@ -233,7 +233,9 @@ The script performs the following steps:
- Creates example [groups](https://learn.microsoft.com/entra/fundamentals/how-to-manage-groups) listed in the [sampleacls.json](/scripts/sampleacls.json) file.
- Creates a filesystem / container `gptkbcontainer` in the storage account.
- Creates the directories listed in the [sampleacls.json](/scripts/sampleacls.json) file.
- Uploads the sample PDFs referenced in the [sampleacls.json](/scripts/sampleacls.json) file into the appropriate directories.
- Scans the directories recursively for files if you add the `--scandirs` option (off by default) to the argument list and the directory's entry in the [sampleacls.json](/scripts/sampleacls.json) file does not set `"scandir": false` (the default is `true`). A sketch of this decision logic follows the list.
- Calculates an MD5 checksum for each referenced file and compares it with the existing `filename.ext.md5` file. The upload is skipped if the checksums match; otherwise the file is uploaded and the new MD5 value is stored in `filename.ext.md5`.
- Uploads the sample PDFs referenced in the [sampleacls.json](/scripts/sampleacls.json) file, or the files found in folders whose `scandir` option is set to `true`, into the appropriate directories.
- [Recursively sets Access Control Lists (ACLs)](https://learn.microsoft.com/azure/storage/blobs/data-lake-storage-acl-cli) using the information from the [sampleacls.json](/scripts/sampleacls.json) file.
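
For illustration only (not part of the docs change), a minimal sketch of the scan decision described above; `should_scan` and `directory_entry` are hypothetical names:

# A directory is scanned only when --scandirs is passed on the command line AND
# its entry in sampleacls.json does not set "scandir": false (a missing key counts as true).
def should_scan(scandirs_flag: bool, directory_entry: dict) -> bool:
    return scandirs_flag and directory_entry.get("scandir", True)

# Example: should_scan(True, {"groups": ["GPTKB_HRTest"], "scandir": False}) returns False.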

In order to use the sample access control, you need to join these groups in your Microsoft Entra tenant.
126 changes: 113 additions & 13 deletions scripts/adlsgen2setup.py
@@ -1,8 +1,10 @@
import argparse
import asyncio
import datetime
import json
import logging
import os
import hashlib
from typing import Any, Optional

import aiohttp
@@ -56,7 +58,7 @@ def __init__(
self.data_access_control_format = data_access_control_format
self.graph_headers: Optional[dict[str, str]] = None

async def run(self):
async def run(self, scandirs: bool = False):
async with self.create_service_client() as service_client:
logger.info(f"Ensuring {self.filesystem_name} exists...")
async with service_client.get_file_system_client(self.filesystem_name) as filesystem_client:
@@ -80,15 +82,17 @@ async def run(self):
)
directories[directory] = directory_client

logger.info("Uploading scanned files...")
if scandirs and directory != "/":
await self.scan_and_upload_directories(directories, filesystem_client)

logger.info("Uploading files...")
for file, file_info in self.data_access_control_format["files"].items():
directory = file_info["directory"]
if directory not in directories:
logger.error(f"File {file} has unknown directory {directory}, exiting...")
return
await self.upload_file(
directory_client=directories[directory], file_path=os.path.join(self.data_directory, file)
)
await self.upload_file(directory_client=directories[directory], file_path=os.path.join(self.data_directory, file))

logger.info("Setting access control...")
for directory, access_control in self.data_access_control_format["directories"].items():
@@ -100,8 +104,7 @@ async def run(self):
f"Directory {directory} has unknown group {group_name} in access control list, exiting"
)
return
await directory_client.update_access_control_recursive(
acl=f"group:{groups[group_name]}:r-x"
await directory_client.update_access_control_recursive(acl=f"group:{groups[group_name]}:r-x"
)
if "oids" in access_control:
for oid in access_control["oids"]:
@@ -110,15 +113,110 @@
for directory_client in directories.values():
await directory_client.close()

async def walk_files(self, src_filepath = "."):
filepath_list = []

#This for loop uses the os.walk() function to walk through the files and directories
#and records the filepaths of the files to a list
for root, dirs, files in os.walk(src_filepath):

#iterate through the files currently obtained by os.walk() and
#create the filepath string for that file and add it to the filepath_list list
for file in files:
#Checks to see if the root is '.' and changes it to the correct current
#working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value.
if root == '.':
Collaborator: Do we only need to set this once, or does it change for every file?

Contributor Author: Yes, good point.

root_path = os.getcwd() + "/"
else:
root_path = root

filepath = os.path.join(root_path, file)

#Appends filepath to filepath_list if filepath does not currently exist in filepath_list
if filepath not in filepath_list:
filepath_list.append(filepath)

#Return filepath_list
return filepath_list
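
Regarding the review note above (whether the current-working-directory lookup needs to happen per file): a hedged simplification, not part of this diff, that resolves the root once per directory with `os.path.abspath`; `collect_file_paths` is an illustrative name:

import os

def collect_file_paths(src_filepath: str = ".") -> list[str]:
    # Walk src_filepath and return the absolute path of every file found.
    filepaths: list[str] = []
    for root, _dirs, files in os.walk(src_filepath):
        root_path = os.path.abspath(root)  # resolves "." once per directory, not once per file
        for name in files:
            path = os.path.join(root_path, name)
            if path not in filepaths:
                filepaths.append(path)
    return filepaths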

async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirectoryClient], filesystem_client):
logger.info("Scanning and uploading files from directories recursively...")
for directory, directory_client in directories.items():
directory_path = os.path.join(self.data_directory, directory)

# Check whether 'scandir' exists and is set to False
Collaborator: German comment?

Contributor Author: Ja! Will translate that ;)

if not self.data_access_control_format["directories"][directory].get("scandir", True):
logger.info(f"Skipping directory {directory} as 'scandir' is set to False")
continue

groups = self.data_access_control_format["directories"][directory].get("groups", [])
Collaborator: groups is not used?

# Check if the directory exists before walking it
if not os.path.exists(directory_path):
logger.warning(f"Directory does not exist: {directory_path}")
continue

# Get all file paths using the walk_files function
file_paths = await self.walk_files(directory_path)

# Upload each file collected
for file_path in file_paths:
await self.upload_file(directory_client, file_path)
logger.info(f"Uploaded {file_path} to {directory}")

def create_service_client(self):
return DataLakeServiceClient(
account_url=f"https://{self.storage_account_name}.dfs.core.windows.net", credential=self.credentials
)

async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str):
async def calc_md5(self, path: str) -> str:
with open(path, "rb") as file:
return hashlib.md5(file.read()).hexdigest()

async def check_md5(self, path: str, md5_hash: str) -> bool:
# if filename ends in .md5 skip
if path.endswith(".md5"):
Collaborator:

We actually want to move the storage of the MD5 into the Blob storage metadata for our main prepdocs strategy, as the local MD5 is problematic when you switch environments. Would that be possible with ADLS Gen2? I assume it also has the ability to store metadata, given it's a type of Blob storage?
What do you think of that approach, remote MD5?

Contributor Author (@cforce, Nov 5, 2024):

I've already implemented that and will include it with some additional metadata in the next commit. I read the file directly from the source (local storage or blob/data lake storage), compute a new checksum on the fly, and keep it in memory. Then I compare it with the persistent checksum on the target storage (data lake) and update the blob, as well as the checksum in the metadata, only if they differ.

I've also started on prepdocs.sh to make use of the persisted MD5 on the backend once it's "injected" from the data lake using the datalake strategy, which seems to be unstable when exceptions occur (it exits the whole loop, which is even more painful if you don't have a resume). The MD5 avoids duplicate indexing and tokenizing work, but the run still has to iterate again over all the files up to the point where the ingestion died abnormally. For external systems I use queries with timestamps to find a proper point to resume, but I have no idea whether the current data lake file walker imposes any order or is completely random.
Maybe there is a way to walk/iterate along a query based on a metadata field (which I have added), such as the last change date? That way I could persist the last ingestion date seen in another blob and use it as a resume point.

I'm unsure where best to store this. Two options I had in mind:

a) In the index, where it would be associated with each chunk, meaning we'd need to locate chunks where there are more than zero results for the fingerprint and update multiple rows accordingly.

b) In /content (blob storage), used for inline rendering of the source document as a citation in the browser. This has the benefit of only requiring one update per change, though we'd still need to locate and remove all matching chunks in the index by primary key for consistency.

Option (a) seems more logical, doesn't it?

By the way, I remember I had trouble with MD5 duplicates for larger data volumes, so maybe it's better to use SHA digests?

And even more important: how would the above efforts fit into integrated vectorization? Will that work with whatever fingerprint is taken over into the index? I assume it requires a new skill, doesn't it?

Contributor Author (@cforce, Nov 5, 2024):

Blob storage already has an embedded MD5, it seems:

from azure.storage.blob import BlobServiceClient

# Your connection string
connection_string = "your_connection_string"

# Your blob container name and blob name
container_name = "your_container_name"
blob_name = "your_blob_name"

# Initialize a BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Get a reference to the blob
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

# Get the properties of the blob
blob_properties = blob_client.get_blob_properties()

# Retrieve the MD5 hash value from the properties
md5_hash = blob_properties.content_settings.content_md5

print(f"MD5 Hash of the blob: {md5_hash}")

Whether that MD5 has the same value as an MD5 computed locally from the file is the question.
Also, as long as we use a filename (instead of the MD5) as the blob name, we still need to rely on that and can't handle rename scenarios. We also can't easily find out in O(1) whether a blob already exists for the MD5 of the file in our hands. If we used the MD5 as the blob name, the code would be much slimmer, but debugging and browsing files in the Azure portal UI would be a bit more cumbersome. The MD5 would also really need to be unique; in the end I would only trust SHA-256 for that.

Collaborator:

Blob service does have its own MD5, but it only computes it for small files, so we would need to compute our own hash. Using SHA-256 also seems fine if we can store that in the blob metadata.

return True

# if there is a sidecar file "<path>.md5", see if it's up to date
stored_hash = None
hash_path = f"{path}.md5"
if os.path.exists(hash_path):
with open(hash_path, encoding="utf-8") as md5_f:
stored_hash = md5_f.read()

if stored_hash and stored_hash.strip() == md5_hash.strip():
logger.info("Skipping %s, no changes detected.", path)
return True

# Write the hash
with open(hash_path, "w", encoding="utf-8") as md5_f:
md5_f.write(md5_hash)

return False
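
As a follow-up to the review thread above about remote fingerprints: a hedged sketch, not part of this diff, of keeping the hash in Data Lake file metadata instead of a local `.md5` sidecar, using SHA-256 and chunked reads. It assumes the async `DataLakeFileClient` exposes `exists()` and `get_file_properties()` like the sync client, and the metadata key name `"sha256"` is illustrative:

import hashlib
import os

from azure.storage.filedatalake.aio import DataLakeDirectoryClient

async def needs_upload(directory_client: DataLakeDirectoryClient, file_path: str) -> tuple[bool, str]:
    # Hash the local file in chunks so large files do not need to fit in memory.
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    local_hash = digest.hexdigest()

    # Compare against the hash stored in the remote file's metadata, if the file exists.
    file_client = directory_client.get_file_client(os.path.basename(file_path))
    if await file_client.exists():
        properties = await file_client.get_file_properties()
        if (properties.metadata or {}).get("sha256") == local_hash:
            return False, local_hash  # unchanged, skip the upload
    return True, local_hash  # new or changed: upload and store local_hash in the metadata

The caller would then pass `{"sha256": local_hash}` through `upload_data(..., metadata=...)`, the same way `upload_file` below already passes the MD5.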

async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: Optional[str] = None):
# Calculate MD5 hash once
md5_hash = await self.calc_md5(file_path)

# Check if the file has been uploaded or if it has changed
if await self.check_md5(file_path, md5_hash):
logger.info("File %s has already been uploaded, skipping upload.", file_path)
return # Skip uploading if the MD5 check indicates no changes

# Proceed with the upload since the file has changed
with open(file=file_path, mode="rb") as f:
file_client = directory_client.get_file_client(file=os.path.basename(file_path))
last_modified = datetime.datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
title = os.path.splitext(os.path.basename(file_path))[0]
metadata = {
"md5": md5_hash,
"category": category,
"updated": last_modified,
"title": title
}
await file_client.upload_data(f, overwrite=True, metadata=metadata)
logger.info("File %s uploaded with metadata %s.", file_path, metadata)

async def create_or_get_group(self, group_name: str):
group_id = None
@@ -144,6 +242,7 @@ async def create_or_get_group(self, group_name: str):
# If Unified does not work for you, then you may need the following settings instead:
# "mailEnabled": False,
# "mailNickname": group_name,

}
async with session.post("https://graph.microsoft.com/v1.0/groups", json=group) as response:
content = await response.json()
@@ -165,19 +264,19 @@ async def main(args: Any):
data_access_control_format = json.load(f)
command = AdlsGen2Setup(
data_directory=args.data_directory,
storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"],
filesystem_name="gptkbcontainer",
storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"],
filesystem_name=os.environ["AZURE_ADLS_GEN2_FILESYSTEM"],
security_enabled_groups=args.create_security_enabled_groups,
credentials=credentials,
data_access_control_format=data_access_control_format,
)
await command.run()
await command.run(args.scandirs)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Upload sample data to a Data Lake Storage Gen2 account and associate sample access control lists with it using sample groups",
epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control ./scripts/sampleacls.json --create-security-enabled-groups <true|false>",
description="Upload data to a Data Lake Storage Gen2 account and associate access control lists with it using sample groups",
epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control .azure/${AZURE_ENV_NAME}/docs_acls.json --create-security-enabled-groups <true|false> --scandirs",
)
parser.add_argument("data_directory", help="Data directory that contains sample PDFs")
parser.add_argument(
@@ -190,6 +289,7 @@ async def main(args: Any):
"--data-access-control", required=True, help="JSON file describing access control for the sample data"
)
parser.add_argument("--verbose", "-v", required=False, action="store_true", help="Verbose output")
parser.add_argument("--scandirs", required=False, action="store_true", help="Scan and upload all files from directories recursively")
args = parser.parse_args()
if args.verbose:
logging.basicConfig()
10 changes: 8 additions & 2 deletions scripts/sampleacls.json
@@ -21,10 +21,16 @@
},
"directories": {
"employeeinfo": {
"groups": ["GPTKB_HRTest"]
"groups": ["GPTKB_HRTest"],
"scandir": false
},
"benefitinfo": {
"groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"]
"groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"],
"scandir": false
},
"GPT4V_Examples": {
"groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"],
"scandir": true
},
"/": {
"groups": ["GPTKB_AdminTest"]