From d2a65d98e74cc06fddf3967243da30c1a104c485 Mon Sep 17 00:00:00 2001
From: Erik Wolfsohn
Date: Thu, 15 Aug 2024 04:15:29 -0700
Subject: [PATCH] GISAID covCLI workflow: fix metadata formatting and output
 file parsing; add error handling to prevent submission failures

---
 gisaid_handler.py | 60 ++++++++++++++++++++++++++++++++++-------------
 seqsender.py      |  2 ++
 settings.py       |  8 ++++---
 upload_log.py     | 50 +++++++++++++++++++++++++++++++--------
 4 files changed, 91 insertions(+), 29 deletions(-)

diff --git a/gisaid_handler.py b/gisaid_handler.py
index 06e65fc..aaa0e76 100755
--- a/gisaid_handler.py
+++ b/gisaid_handler.py
@@ -26,16 +26,22 @@ def create_gisaid_files(organism: str, database: str, submission_name: str, subm
     # Get column names for gisaid submission only
     gisaid_df = metadata.filter(regex=GISAID_REGEX).copy()
     gisaid_df.columns = gisaid_df.columns.str.replace("gs-","").str.strip()
-    #Add required GISAID fields
+    # Add required GISAID fields
+    # covCLI returns an error when authors or collection_date are capitalized
     if organism in ["COV", "POX", "ARBO"]:
-        gisaid_df = gisaid_df.rename(columns = {"sample_name": "virus_name", "authors": "Authors", "collection_date": "Collection_Date"})
+        gisaid_df = gisaid_df.rename(columns = {"sample_name": "virus_name"})
+        if organism in ["POX", "ARBO"]:
+            gisaid_df = gisaid_df.rename(columns = {"authors": "Authors", "collection_date": "Collection_Date"})
         if organism == "COV":
             prefix_name = "covv_"
         else:
             prefix_name = organism.lower() + "_"
         gisaid_df = gisaid_df.add_prefix(prefix_name)
         gisaid_df["submitter"] = config_dict["Username"]
-        gisaid_df["fn"] = ""
+        # Copy FASTA headers from the sequence_name column into the fn column
+        gisaid_df["fn"] = gisaid_df["covv_sequence_name"]
+        # Drop the original column so it does not cause issues later
+        gisaid_df.drop(columns=['covv_sequence_name'], inplace=True)
         first_cols = ["submitter", "fn", (prefix_name + "virus_name")]
     elif "FLU" in organism:
         gisaid_df = gisaid_df.rename(columns = {"authors": "Authors", "collection_date": "Collection_Date"})
@@ -51,6 +57,9 @@ def create_gisaid_files(organism: str, database: str, submission_name: str, subm
     # Restructure column order
     last_cols = [col for col in gisaid_df.columns if col not in first_cols]
     gisaid_df = gisaid_df[first_cols + last_cols]
+    # If a submission fails and a new one is initiated without cleaning up metadata.csv/orig_metadata.csv,
+    # the column prefix doubles up. Stripping it here works for now; proper housekeeping may be added later.
+    gisaid_df.columns = gisaid_df.columns.str.replace('^covv_covv_', 'covv_', regex=True)
     # Create submission files
     file_handler.save_csv(df=gisaid_df, file_path=submission_dir, file_name="metadata.csv")
     shutil.copy(os.path.join(submission_dir, "metadata.csv"), os.path.join(submission_dir, "orig_metadata.csv"))
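Note on the prefix cleanup above: re-running create_gisaid_files against a leftover metadata.csv applies add_prefix a second time, which is what produces the covv_covv_ columns. A minimal pandas sketch (with hypothetical column names) of the failure mode and the recovery:

```python
import pandas as pd

# Hypothetical metadata resembling what create_gisaid_files receives.
df = pd.DataFrame({"virus_name": ["hCoV-19/USA/XYZ-123/2024"], "location": ["North America / USA"]})

# First run: every column gains the "covv_" prefix.
df = df.add_prefix("covv_")

# A failed submission leaves metadata.csv behind; a re-run prefixes it again.
df = df.add_prefix("covv_")
print(df.columns.tolist())  # ['covv_covv_virus_name', 'covv_covv_location']

# The cleanup line from the patch strips the doubled prefix back to one.
df.columns = df.columns.str.replace("^covv_covv_", "covv_", regex=True)
print(df.columns.tolist())  # ['covv_virus_name', 'covv_location']
```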
@@ -68,10 +77,12 @@ def process_gisaid_log(log_file: str, submission_dir: str) -> pd.DataFrame:
     while line:
         # If accession generated record it
         # Pattern options: "msg:": "; _" OR : ; _
-        if re.match("(?i)(\W|^)(\"msg\":\s*\"\S+.*;\s*(EPI_ISL|EPI_ID)_\d{6,}\"|(epi_id|epi_isl_id):\s*\S.*;\s*(EPI_ISL_|EPI)\d+)(\W|$)", line):
-            gisaid_string = re.sub("(\W|^)\"msg\":\s*\"", "", line)
-            gisaid_string_list: List[str] = gisaid_string.strip().replace("\"", "").split(";")
-            sample_name = re.sub("(epi_isl_id|epi_id):\s*", "", gisaid_string_list[0].strip())
+        # Changed re.match to re.search so tokens that are not at the start of the line still match.
+        # Changed the regex to accept any number of digits after EPI_.
+        if re.search("(?i)(\W|^)(\"msg\":\s*\"\S+.*;\s*(EPI_ISL|EPI_ID)_\d*\"|(epi_id|epi_isl_id):\s*\S.*;\s*(EPI_ISL_|EPI)\d+)(\W|$)", line):
+            gisaid_string = re.findall(r'(?:[a-zA-Z0-9_-]+(?:/[a-zA-Z0-9_-]+)+|EPI_\w*)', line)
+            gisaid_string = ' '.join(gisaid_string)
+            gisaid_string_list: List[str] = gisaid_string.split(' ')
+            sample_name = gisaid_string_list[0].strip()
             accession = gisaid_string_list[1].strip()
             if re.match("EPI_ISL_\d+", accession):
                 gisaid_isolate_log.append({"gs-sample_name":sample_name, "gisaid_accession_epi_isl_id":accession})
@@ -80,9 +91,22 @@
         line = file.readline().strip()
     gisaid_isolate_df = pd.DataFrame(gisaid_isolate_log)
     gisaid_segment_df = pd.DataFrame(gisaid_segment_log)
-    upload_log.update_submission_status_csv(submission_dir=submission_dir.replace("/GISAID", "/"), update_database="GISAID", update_df=gisaid_isolate_df)
-    upload_log.update_submission_status_csv(submission_dir=submission_dir.replace("/GISAID", "/"), update_database="GISAID", update_df=gisaid_segment_df)
-    gisaid_isolate_df = gisaid_isolate_df[~gisaid_isolate_df["gisaid_accession_epi_isl_id"].str.contains("EPI_ISL_\d{6,}", regex = True, na = False)].copy()
+
+    # Avoid passing an empty data frame to update_submission_status_csv, which previously caused failures.
+    # Report which types of GISAID entities were found.
+    if not gisaid_isolate_df.empty and not gisaid_segment_df.empty:
+        print("GISAID isolates and GISAID segments found.")
+        upload_log.update_submission_status_csv(submission_dir=submission_dir.replace("/GISAID", "/"), update_database="GISAID", update_df=gisaid_isolate_df)
+        upload_log.update_submission_status_csv(submission_dir=submission_dir.replace("/GISAID", "/"), update_database="GISAID", update_df=gisaid_segment_df)
+    elif not gisaid_isolate_df.empty:
+        print("GISAID isolates found.")
+        upload_log.update_submission_status_csv(submission_dir=submission_dir.replace("/GISAID", "/"), update_database="GISAID", update_df=gisaid_isolate_df)
+    elif not gisaid_segment_df.empty:
+        print("GISAID segments found.")
+        upload_log.update_submission_status_csv(submission_dir=submission_dir.replace("/GISAID", "/"), update_database="GISAID", update_df=gisaid_segment_df)
+    else:
+        print("Warning: no GISAID isolates or segments found")
+    gisaid_isolate_df = gisaid_isolate_df[~gisaid_isolate_df["gisaid_accession_epi_isl_id"].str.contains("EPI_ISL_\d*", regex = True, na = False)].copy()
     return gisaid_isolate_df[["gs-sample_name"]]
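For reference, the new token extraction in process_gisaid_log pulls slash-delimited virus names and EPI_* accessions out of a log line in one pass. A small sketch against a hypothetical covCLI log line (the real covCLI output format may differ):

```python
import re

# Hypothetical covCLI log line; actual covCLI output may be shaped differently.
line = '{"code": "epi_isl_id", "msg": "hCoV-19/USA/XYZ-123/2024; EPI_ISL_19001234"}'

# Same pattern as process_gisaid_log: slash-delimited names OR EPI_* accessions.
tokens = re.findall(r'(?:[a-zA-Z0-9_-]+(?:/[a-zA-Z0-9_-]+)+|EPI_\w*)', line)
print(tokens)  # ['hCoV-19/USA/XYZ-123/2024', 'EPI_ISL_19001234']

sample_name, accession = tokens[0], tokens[1]
```

Because re.search scans the whole line, the JSON wrapping around the message no longer prevents a match the way the anchored re.match did.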

# Submit to GISAID
@@ -120,6 +144,7 @@ def submit_gisaid(organism: str, submission_dir: str, submission_name: str, conf
         while not os.path.exists(log_file):
             time.sleep(10)
         # Check submission log to see if all samples are uploaded successfully
+        # Check if not_submitted_df has contents before trying to process it
         not_submitted_df = process_gisaid_log(log_file=log_file, submission_dir=submission_dir)
         # If submission completed, no more attempts
         if not_submitted_df.empty:
@@ -132,10 +157,13 @@
             if "FLU" in organism:
                 column_name = "Isolate_Name"
             elif "COV" in organism:
-                column_name = "virus_name"
+                column_name = "covv_virus_name"
             metadata_df = metadata_df.merge(not_submitted_df, how="inner", left_on=column_name, right_on="gs-sample_name")
-            fasta_names = metadata_df["gs-sequence_name"].tolist()
-            metadata_df = metadata_df.drop(columns=["gs-sample_name", "gs-sequence_name"])
+            fasta_names = metadata_df["sequence_name"].tolist()
+            if "sequence_name" in metadata_df.columns:
+                metadata_df.rename(columns={"sequence_name": "fn"}, inplace=True)
+            metadata_df = metadata_df.drop(columns=["gs-sample_name"])
+            metadata_df.columns = metadata_df.columns.str.replace('^covv_covv_', 'covv_', regex=True)
             metadata_df.to_csv(orig_metadata, header = True, index = False)
             fasta_dict = []
             with open(orig_fasta, "r") as fsa:
@@ -163,7 +191,7 @@ def update_gisaid_files(organism: str, submission_dir: str, submission_status_fi
     orig_fasta = os.path.join(submission_dir, "orig_sequence.fsa")
     # Filter out genbank that has accession number
     genbank_status_df = status_df[status_df["genbank-status"].str.contains("processed-ok", na=False)].copy()
-    gisaid_status_df = genbank_status_df[["gs-sample_name", "gs-sequence_name"]]
+    gisaid_status_df = genbank_status_df[["gs-sample_name", "sequence_name"]]
     # Add required gisaid fields
     metadata_df = pd.read_csv(metadata, header = 0, dtype = str, engine = "python", encoding="utf-8", index_col=False)
     if "FLU" in organism:
@@ -171,8 +199,8 @@
     elif "COV" in organism:
         column_name = "virus_name"
     metadata_df = metadata_df.merge(gisaid_status_df, how="inner", left_on=column_name, right_on="gs-sample_name")
-    fasta_names = metadata_df["gs-sequence_name"].tolist()
-    metadata_df = metadata_df.drop(columns=["gs-sample_name", "gs-sequence_name"])
+    fasta_names = metadata_df["sequence_name"].tolist()
+    metadata_df = metadata_df.drop(columns=["gs-sample_name", "sequence_name"])
     metadata_df.to_csv(orig_metadata, header = True, index = False)
     fasta_dict = []
     with open(orig_fasta, "r") as fsa:
diff --git a/seqsender.py b/seqsender.py
index 33e8aac..1455823 100755
--- a/seqsender.py
+++ b/seqsender.py
@@ -25,6 +25,8 @@
 from settings import VERSION
 
+import sys
+
 # Define current time
 STARTTIME = datetime.now()

diff --git a/settings.py b/settings.py
index 73d9278..b1f0b7d 100755
--- a/settings.py
+++ b/settings.py
@@ -54,12 +54,14 @@
 # Genbank metadata regex
 GENBANK_REGEX = "^gb-sample_name$"
 
+# sequence_name is also added to the GenBank regexes below because the GISAID workflow needs it; the GenBank workflow has not been retested with this change yet
 # GenBank source file metadata regex
-GENBANK_REGEX_SRC = "^gb-sample_name$|^src-|^bioproject$|^organism$|^collection_date$"
+GENBANK_REGEX_SRC = "^sequence_name$|^gb-sample_name$|^src-|^bioproject$|^organism$|^collection_date$"
 
 # GenBank comment file metadata regex
-GENBANK_REGEX_CMT = "^gb-sample_name$|^cmt-|^organism$|^collection_date$"
+GENBANK_REGEX_CMT = "^sequence_name$|^gb-sample_name$|^cmt-|^organism$|^collection_date$"
 
 ##### GISAID settings #####
 # GISAID metadata regex
-GISAID_REGEX = "^gs-|^collection_date$|^authors"
+# ^sequence_name$ is added here because the column was being filtered out too early, before the GISAID workflow could use it
+GISAID_REGEX = "^gs-|^sequence_name$|^collection_date$|^authors$"
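The settings change above matters because the handlers select database-specific columns by regex (metadata.filter(regex=GISAID_REGEX)) before anything else runs. A quick sketch with hypothetical column names showing how the widened GISAID_REGEX keeps sequence_name in play:

```python
import pandas as pd

# Hypothetical metadata with columns for several databases.
metadata = pd.DataFrame(columns=[
    "gs-sample_name", "gs-location", "sequence_name",
    "gb-sample_name", "collection_date", "authors",
])

GISAID_REGEX = "^gs-|^sequence_name$|^collection_date$|^authors$"

# Without ^sequence_name$ in the pattern, the column is dropped here,
# before create_gisaid_files can copy it into the fn column.
gisaid_df = metadata.filter(regex=GISAID_REGEX)
print(gisaid_df.columns.tolist())
# ['gs-sample_name', 'gs-location', 'sequence_name', 'collection_date', 'authors']
```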
diff --git a/upload_log.py b/upload_log.py
index d960c4a..6874096 100755
--- a/upload_log.py
+++ b/upload_log.py
@@ -87,7 +87,10 @@ def validate_submission_status_df(metadata: pd.DataFrame, database: List[str]) -
         sys.exit(1)
 
 # Update values of existing submission_status.csv file
-def update_submission_status_csv(submission_dir: str, update_database: str, update_df: pd.DataFrame):
+def update_submission_status_csv(submission_dir: str, update_database: str, update_df: pd.DataFrame, update_count: dict = {}):
+    # Track updates per database; the mutable default dict is intentional so the count persists across calls
+    if update_database not in update_count:
+        update_count[update_database] = 0
     # Pop off directory if inside database directory
     if os.path.split(submission_dir)[1] in ["BIOSAMPLE", "SRA", "GENBANK", "GISAID"]:
         submission_dir = os.path.split(submission_dir)[0]
@@ -99,17 +102,31 @@ def update_submission_status_csv(submission_dir: str, update_database: str, upda
     validate_submission_status_df(metadata=df, database=all_databases)
     if f"{SAMPLE_NAME_DATABASE_PREFIX[update_database]}sample_name" in update_df:
         sample_column = (SAMPLE_NAME_DATABASE_PREFIX[update_database] + "sample_name")
+        df = df.set_index(sample_column, drop = False)
+        update_df = update_df.set_index(sample_column)
+        df.update(update_df)
+        df = df.reset_index(drop = True)
+        update_count[update_database] += 1
+        validate_submission_status_df(metadata=df, database=all_databases)
+        file_handler.save_csv(df=df, file_path=submission_status_file)
     elif f"{SAMPLE_NAME_DATABASE_PREFIX[update_database]}segment_name" in update_df:
         sample_column = (SAMPLE_NAME_DATABASE_PREFIX[update_database] + "segment_name")
+        df = df.set_index(sample_column, drop = False)
+        update_df = update_df.set_index(sample_column)
+        df.update(update_df)
+        df = df.reset_index(drop = True)
+        update_count[update_database] += 1
+        validate_submission_status_df(metadata=df, database=all_databases)
+        file_handler.save_csv(df=df, file_path=submission_status_file)
     else:
-        print(f"Error: Unable to update 'submission_status.csv' for '{update_database}' at '{submission_dir}'.", file=sys.stderr)
-        sys.exit(1)
-    df = df.set_index(sample_column, drop = False)
-    update_df = update_df.set_index(sample_column)
-    df.update(update_df)
-    df = df.reset_index(drop = True)
-    validate_submission_status_df(metadata=df, database=all_databases)
-    file_handler.save_csv(df=df, file_path=submission_status_file)
+        # No longer exits when an update is not possible, since update_df can legitimately be empty
+        # (e.g. a submission with only isolates or only segments).
+        # Alerts the user until this function performs at least one successful update during a submission workflow.
+        # process_gisaid_log was also changed so this situation should rarely come up.
+        if update_count[update_database] == 0:
+            print(f"Error: Unable to update 'submission_status.csv' for '{update_database}' at '{submission_dir}'. The log file may be empty.", file=sys.stderr)
+            # sys.exit(1)
+
+    return update_count
 
 # Create new row in submission_log.csv for database submission
 def create_submission_log(database: str, organism: str, submission_name: str, submission_dir: str, database_dir: str, config_file: str, submission_status: str, submission_id: str, submission_type: str) -> None:
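The repeated set_index/update/reset_index block above is the core of the status merge: rows are matched on the sample (or segment) column rather than by position. A self-contained sketch with hypothetical sample names:

```python
import pandas as pd

# Hypothetical submission status table and a partial update from a parsed log.
df = pd.DataFrame({
    "gs-sample_name": ["sample1", "sample2"],
    "gisaid_accession_epi_isl_id": ["", ""],
})
update_df = pd.DataFrame({
    "gs-sample_name": ["sample2"],
    "gisaid_accession_epi_isl_id": ["EPI_ISL_19001234"],
})

# Align both frames on the key column so update() matches rows by sample name,
# then restore the default integer index before saving.
df = df.set_index("gs-sample_name", drop=False)
df.update(update_df.set_index("gs-sample_name"))
df = df.reset_index(drop=True)
print(df)
#   gs-sample_name gisaid_accession_epi_isl_id
# 0        sample1
# 1        sample2            EPI_ISL_19001234
```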
@@ -134,7 +151,20 @@
     df.loc[len(df)] = new_entry # type: ignore
     # Remove duplicates and keep latest update
     df = df.drop_duplicates(subset = ["Submission_Name", "Organism", "Database", "Submission_Type", "Config_File"], keep = "last", ignore_index = True)
-    file_handler.save_csv(df=df, file_path=submission_dir, file_name="submission_log.csv")
+    # Ran into permission issues writing to submission_dir even though all other outputs wrote successfully
+    # to submission_dir/submission_files and submission_dir/submission_files/GISAID.
+    # Probably uncommon, but this handles the error and writes the submission info to stdout instead;
+    # a permissions check for the entire submission directory tree may be added later.
+    try:
+        file_handler.save_csv(df=df, file_path=submission_dir, file_name="submission_log.csv")
+        print(f"Successfully updated submission_log.csv at '{submission_dir}'.")
+    except PermissionError:
+        print(f"Permission error: Unable to write 'submission_log.csv' at '{submission_dir}'. Please check file and directory permissions.", file=sys.stderr)
+        print("Attempting to write submission log to stdout instead.")
+        df.to_csv(sys.stdout, header=True, index=False)
+        sys.exit(1)
+    except Exception as e:
+        print(f"An unexpected error occurred while trying to save 'submission_log.csv': {e}", file=sys.stderr)
+        sys.exit(1)
 
 # Update values of existing submission_log.csv file
 def update_submission_log(database: str, organism: str, submission_name: str, submission_log_dir: str, submission_dir: str, submission_status: str, submission_id: str, submission_type: str) -> None:
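Finally, a condensed sketch of the save fallback added to create_submission_log, using a hypothetical save_log helper in place of file_handler.save_csv:

```python
import sys
import pandas as pd

def save_log(df: pd.DataFrame, path: str) -> None:
    # Try to persist the log; if the directory is not writable, dump the
    # table to stdout so accession information is not silently lost.
    try:
        df.to_csv(path, header=True, index=False)
    except PermissionError:
        print(f"Permission error: unable to write '{path}'.", file=sys.stderr)
        print("Writing submission log to stdout instead.")
        df.to_csv(sys.stdout, header=True, index=False)
        sys.exit(1)
```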