Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update download Module to Process Data by Laboratory COD and Project Subfolder #431

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Code contributions to the release:
- Add Validation for Dropdown Columns: Notify Users of Invalid Entries in build-schema module [#423](https://github.com/BU-ISCIII/relecov-tools/pull/423)
- Test SFTP Login by Updating Port Assignment in wrapper_manager [#426](https://github.com/BU-ISCIII/relecov-tools/pull/426)
- Update Test Data for new Schema & Modify JSON Filepaths in read-bioinfo-metadata [#427](https://github.com/BU-ISCIII/relecov-tools/pull/427)
- Update download Module to Process Data by Laboratory COD and Project Subfolder [#431](https://github.com/BU-ISCIII/relecov-tools/pull/431)

#### Fixes

Expand Down
10 changes: 10 additions & 0 deletions relecov_tools/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,22 @@ def relecov_tools_cli(verbose, log_file):
default=None,
help="Flag: Select which folders will be targeted giving [paths] or via prompt",
)
@click.option(
"-s",
"--subfolder",
is_flag=False,
flag_value="ALL",
default="RELECOV",
help="Flag: Specify which subfolder to process (default: RELECOV)",
)
def download(
user,
password,
conf_file,
download_option,
output_location,
target_folders,
subfolder,
):
"""Download files located in sftp server."""
download_manager = relecov_tools.download_manager.DownloadManager(
Expand All @@ -188,6 +197,7 @@ def download(
download_option,
output_location,
target_folders,
subfolder,
)
try:
download_manager.execute_process()
Expand Down
2 changes: 1 addition & 1 deletion relecov_tools/conf/bioinfo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"content": {
"per_sgene_ambiguous": "S-Gene_Ambiguous_Percentage",
"per_sgene_coverage": "S-Gene_Coverage_Percentage",
"per_ldmutations": "% LDMutations",
"per_ldmutations": "%LDMutations",
"number_of_sgene_frameshifts": "S-Gene_Frameshifts",
"number_of_unambiguous_bases": "Total_Unambiguous_Bases",
"number_of_Ns": "Total_Ns_count"
Expand Down
2 changes: 1 addition & 1 deletion relecov_tools/conf/configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@
},
"sftp_handle": {
"sftp_connection": {
"sftp_server": "sftprelecov.isciii.es",
"sftp_server": "sftpgenvigies.isciii.es",
"sftp_port": "22"
},
"metadata_processing": {
Expand Down
177 changes: 124 additions & 53 deletions relecov_tools/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
download_option=None,
output_location=None,
target_folders=None,
subfolder="RELECOV",
):
"""Initializes the sftp object"""
log.info("Initiating download process")
Expand All @@ -56,6 +57,7 @@ def __init__(
sftp_user = user
sftp_passwd = passwd
self.target_folders = target_folders
self.subfolder = subfolder
self.allowed_download_options = config_json.get_topic_data(
"sftp_handle", "allowed_download_options"
)
Expand Down Expand Up @@ -145,13 +147,14 @@ def create_local_folder(self, folder):
"""
log.info("Creating folder %s to download files", folder)
platform_storage_folder = self.platform_storage_folder
if platform_storage_folder == folder:
local_folder_path = platform_storage_folder
else:
folder = folder.strip("_tmp_processing")
local_folder_path = os.path.join(platform_storage_folder, folder)
path_parts = folder.split("/")
cod_folder = path_parts[0]
batch_folder = path_parts[-1].replace("_tmp_processing", "")
local_folder_path = os.path.join(
platform_storage_folder, cod_folder, batch_folder
)
os.makedirs(local_folder_path, exist_ok=True)
log.info("created the folder to download files %s", local_folder_path)
log.info("Created the folder to download files %s", local_folder_path)
return local_folder_path

def get_remote_folder_files(self, folder, local_folder, file_list):
Expand Down Expand Up @@ -584,30 +587,35 @@ def validate_remote_files(self, remote_folder, local_folder):
return sample_files_dict, local_meta_file

def delete_remote_files(self, remote_folder, files=None, skip_seqs=False):
"""Delete files from remote folder
"""Delete files from remote folder.

Args:
remote_folder (str): path to folder in remote repository
files (list(str), optional): list of target filenames in remote repository
skip_seqs (bool, optional): Skip sequencing files based on extension
remote_folder (str): Path to folder in remote repository.
files (list(str), optional): List of target filenames in remote repository.
skip_seqs (bool, optional): Skip sequencing files based on extension.
"""
log.info(f"Deleting files in remote {remote_folder}.")
stderr.print(f"[blue]Deleting files in remote {remote_folder}...")
all_files = self.relecov_sftp.get_file_list(remote_folder)
file_paths = {os.path.basename(f): f for f in all_files}
if files is None:
files_to_remove = self.relecov_sftp.get_file_list(remote_folder)
files_to_remove = all_files
else:
files_to_remove = files
for file in files_to_remove:
if skip_seqs is True:
if file.endswith(tuple(self.allowed_file_ext)):
continue
try:
self.relecov_sftp.remove_file(
os.path.join(remote_folder, os.path.basename(file))
)
except (IOError, PermissionError) as e:
log.error(f"Could not delete remote file {file}: {e}")
stderr.print(f"Could not delete remote file {file}. Error: {e}")
if skip_seqs and file.endswith(tuple(self.allowed_file_ext)):
continue
matched_path = file_paths.get(os.path.basename(file))
if matched_path:
try:
self.relecov_sftp.remove_file(matched_path)
except (IOError, PermissionError) as e:
log.error(f"Could not delete remote file {matched_path}: {e}")
stderr.print(
f"[red]Could not delete remote file {matched_path}. Error: {e}"
)
else:
stderr.print(f"[red]File not found before deletion: {file}")
return

def rename_remote_folder(self, remote_folder):
Expand Down Expand Up @@ -676,21 +684,20 @@ def move_processing_fastqs(self, folders_with_metadata):
for folder, files in folders_with_metadata.items():
self.current_folder = folder.split("/")[0]
successful_files = []
for file in files:
unique_files = list(set(files))
for file in unique_files:
if not file.endswith(tuple(self.allowed_file_ext)):
continue

file_dest = os.path.join(folder, os.path.basename(file))
if file_dest in successful_files:
continue
try:
# Paramiko.SSHClient.sftp_open does not have a method to copy files
self.relecov_sftp.rename_file(file, file_dest)
successful_files.append(file_dest)
except OSError:
if file in folders_with_metadata[folder]:
error_text = "File named %s already in %s. Skipped"
self.include_warning(error_text % (file, self.current_folder))
else:
error_text = "Error while moving file %s"
self.include_error(error_text % file)
except OSError as e:
log.error(f"Error moving file {file} to {file_dest}: {e}")
stderr.print(f"[red]Error moving file {file} to {file_dest}: {e}")
folders_with_metadata[folder] = successful_files
return folders_with_metadata

Expand Down Expand Up @@ -794,9 +801,11 @@ def merge_metadata(self, meta_sheet=None, *metadata_tables):
if meta_sheet:
merged_df[meta_sheet] = concat(
[merged_df[meta_sheet], table[meta_sheet]], ignore_index=True
)
).drop_duplicates()
else:
merged_df = concat([merged_df, table], ignore_index=True)
merged_df = concat(
[merged_df, table], ignore_index=True
).drop_duplicates()
return merged_df

def excel_to_df(self, excel_file, metadata_sheet, header_flag):
Expand Down Expand Up @@ -905,8 +914,9 @@ def pre_validate_folder(folder, folder_files):
# Taking the main folder for each lab as reference for merge and logs
main_folder = folder.split("/")[0]
self.current_folder = main_folder
temporal_foldername = "_".join([date_and_time, "tmp_processing"])
temp_folder = os.path.join(main_folder, temporal_foldername)
tmp_folder_parent = os.path.join(main_folder, self.subfolder)
temporal_foldername = f"{date_and_time}_tmp_processing"
temp_folder = os.path.join(tmp_folder_parent, temporal_foldername)
# Get every file except the excel ones as they are going to be merged
filelist = [fi for fi in target_folders[folder] if not fi.endswith(".xlsx")]
if not folders_with_metadata.get(temp_folder):
Expand Down Expand Up @@ -963,7 +973,7 @@ def pre_validate_folder(folder, folder_files):
return clean_target_folders, processed_folders

def select_target_folders(self):
"""Find the selected folders in remote if given, else select every folder
"""Find the selected folders in remote if given, else select every folder.

Returns:
folders_to_process (dict(str:list)): Dictionary with folders and their files
Expand Down Expand Up @@ -991,19 +1001,27 @@ def select_target_folders(self):
folders_to_process = {}
for targeted_folder in target_folders:
try:
full_folders = self.relecov_sftp.list_remote_folders(
subfolders = self.relecov_sftp.list_remote_folders(
targeted_folder, recursive=True
)
except (FileNotFoundError, OSError) as e:
log.error(f"Error during sftp listing. {targeted_folder} skipped:", e)
continue
for folder in full_folders:
list_files = self.relecov_sftp.get_file_list(folder)
if list_files:
folders_to_process[folder] = list_files
else:
log.info("%s is empty", folder)
continue

for folder in subfolders:
if folder.startswith(f"{targeted_folder}/{self.subfolder}"):
full_path = os.path.normpath(folder)
try:
list_files = self.relecov_sftp.get_file_list(full_path)

if list_files:
folders_to_process[full_path] = list_files

except FileNotFoundError:
log.error(
f"Subfolder {self.subfolder} not found in {targeted_folder}"
)
continue
if len(folders_to_process) == 0:
log.info("Exiting process, folders were empty.")
log.error("There are no files in the selected folders.")
Expand Down Expand Up @@ -1287,13 +1305,32 @@ def execute_process(self):
target_folders = self.select_target_folders()
if self.download_option == "delete_only":
log.info("Initiating delete_only process")
processed_folders = target_folders.keys()
processed_folders = list(target_folders.keys())
all_folders = []
for folder in processed_folders:
subfolders = self.relecov_sftp.list_remote_folders(
folder, recursive=True
)
all_folders.extend(subfolders)
all_folders.extend(processed_folders)
project_folders = {
folder for folder in processed_folders if folder.count("/") == 1
}
all_folders = sorted(
set(all_folders) - project_folders,
key=lambda x: x.count("/"),
reverse=True,
)
for folder in all_folders:
self.current_folder = folder
self.delete_remote_files(folder)
self.clean_remote_folder(folder)
log.info(f"Delete process finished in {folder}")
stderr.print(f"Delete process finished in {folder}")
for project_folder in project_folders:
self.delete_remote_files(project_folder)
log.info(f"Cleaned project folder: {project_folder}")
stderr.print(f"Cleaned project folder: {project_folder}")
else:
target_folders, processed_folders = self.merge_subfolders(target_folders)
self.download(target_folders)
Expand All @@ -1305,25 +1342,59 @@ def execute_process(self):
self.logsum.create_error_summary(called_module="download")
else:
log.info("Process log summary was empty. Not generated.")
processed_folders = list(
set(os.path.normpath(folder) for folder in processed_folders)
)
# If download_option is "download_clean", remove
# sftp folder content after download is finished
if self.download_option == "download_clean":
for folder in processed_folders:
self.delete_remote_files(folder, skip_seqs=True)
self.clean_remote_folder(folder)
normal_folders = {
folder
for folder in processed_folders
if "_tmp_processing" not in folder
}
for folder in normal_folders:
if self.relecov_sftp.get_file_list(folder):
self.delete_remote_files(folder, skip_seqs=True)
self.clean_remote_folder(folder)
folders_to_clean = copy.deepcopy(self.finished_folders)
for folder, downloaded_files in folders_to_clean.items():
self.delete_remote_files(folder, files=downloaded_files)
self.delete_remote_files(folder, skip_seqs=True)
self.clean_remote_folder(folder)
stderr.print(f"Delete process finished in remote {folder}")
log.info(f"Delete process finished in remote {folder}")
if self.relecov_sftp.get_file_list(folder):
self.delete_remote_files(folder, files=downloaded_files)
self.clean_remote_folder(folder)
log.info(f"Delete process finished in remote {folder}")

invalid_folders = [
key for key in target_folders if key not in folders_to_clean
]
for folder in invalid_folders:
self.rename_remote_folder(folder)
log.info("Renamed tmp processing folder: %s", folder)
log.info(f"Renamed tmp processing folder: {folder}")

cleaned_folders = []
for folder in processed_folders:
normalized_folder = os.path.normpath(folder)
if normalized_folder not in cleaned_folders:
cleaned_folders.append(normalized_folder)

for folder in cleaned_folders:
if self.relecov_sftp.get_file_list(folder):
all_subfolders = self.relecov_sftp.list_remote_folders(
folder, recursive=True
)
all_subfolders.sort(key=lambda x: x.count("/"), reverse=True)

for subfolder in all_subfolders:
if not self.relecov_sftp.get_file_list(subfolder):
self.clean_remote_folder(subfolder)
log.info(
f"Checked and removed empty subfolder: {subfolder}"
)

if not self.relecov_sftp.get_file_list(folder):
self.clean_remote_folder(folder)
log.info(f"Checked and removed empty folder: {folder}")

log.info("Finished download module execution")
stderr.print("Finished execution")
return
16 changes: 10 additions & 6 deletions relecov_tools/sftp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,16 @@ def get_file_list(self, folder_name):
"""
log.info("Listing files in %s", folder_name)
file_list = []
content_list = self.sftp.listdir_attr(folder_name)
file_list = [
os.path.join(folder_name, content.filename)
for content in content_list
if stat.S_ISREG(content.st_mode)
]
try:
content_list = self.sftp.listdir_attr(folder_name)
for content in content_list:
full_path = os.path.join(folder_name, content.filename)
if stat.S_ISDIR(content.st_mode):
file_list.extend(self.get_file_list(full_path))
elif stat.S_ISREG(content.st_mode):
file_list.append(full_path)
except FileNotFoundError:
log.error(f"Folder not found: {folder_name}")
return file_list

@reconnect_if_fail(n_times=3, sleep_time=30)
Expand Down
Loading