diff --git a/nmdc_automation/api/nmdcapi.py b/nmdc_automation/api/nmdcapi.py
index 6347e2a2..15415152 100755
--- a/nmdc_automation/api/nmdcapi.py
+++ b/nmdc_automation/api/nmdcapi.py
@@ -16,7 +16,10 @@
 import logging
 from tenacity import retry, wait_exponential, stop_after_attempt
 
-logging.basicConfig(level=logging.INFO)
+logging_level = os.getenv("NMDC_LOG_LEVEL", logging.DEBUG)
+logging.basicConfig(
+    level=logging_level, format="%(asctime)s %(levelname)s: %(message)s"
+)
 logger = logging.getLogger(__name__)
 
 SECONDS_IN_DAY = 86400
@@ -74,11 +77,7 @@ def _get_token(self, *args, **kwargs):
 
         return _get_token
 
-    @retry(
-        wait=wait_exponential(multiplier=4, min=8, max=120),
-        stop=stop_after_attempt(6),
-        reraise=True,
-    )
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     def get_token(self):
         """
         Get a token using a client id/secret.
@@ -120,12 +119,13 @@ def get_token(self):
             "Content-Type": "application/json",
             "Authorization": "Bearer %s" % (self.token),
         }
-        logging.info(f"New token expires at {self.expires_at}")
+        logging.debug(f"New token expires at {self.expires_at}")
         return response_body
 
     def get_header(self):
         return self.header
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def minter(self, id_type, informed_by=None):
         url = f"{self._base_url}pids/mint"
@@ -143,6 +143,7 @@ def minter(self, id_type, informed_by=None):
             raise ValueError("Failed to bind metadata to pid")
         return id
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def mint(self, ns, typ, ct):
         """
@@ -155,8 +156,11 @@ def mint(self, ns, typ, ct):
         url = self._base_url + "ids/mint"
         d = {"populator": "", "naa": ns, "shoulder": typ, "number": ct}
         resp = requests.post(url, headers=self.header, data=json.dumps(d))
+        if not resp.ok:
+            resp.raise_for_status()
         return resp.json()
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def get_object(self, obj, decode=False):
         """
@@ -164,6 +168,8 @@ def get_object(self, obj, decode=False):
         """
         url = "%sobjects/%s" % (self._base_url, obj)
         resp = requests.get(url, headers=self.header)
+        if not resp.ok:
+            resp.raise_for_status()
         data = resp.json()
         if decode and "description" in data:
             try:
@@ -173,6 +179,8 @@
 
         return data
 
+
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def create_object(self, fn, description, dataurl):
         """
@@ -214,8 +222,11 @@ def create_object(self, fn, description, dataurl):
             "self_uri": "todo",
         }
         resp = requests.post(url, headers=self.header, data=json.dumps(d))
+        if not resp.ok:
+            resp.raise_for_status()
         return resp.json()
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def post_objects(self, obj_data):
         url = self._base_url + "workflows/workflow_executions"
@@ -225,23 +236,29 @@ def post_objects(self, obj_data):
         resp.raise_for_status()
         return resp.json()
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def set_type(self, obj, typ):
         url = "%sobjects/%s/types" % (self._base_url, obj)
         d = [typ]
         resp = requests.put(url, headers=self.header, data=json.dumps(d))
+        if not resp.ok:
+            resp.raise_for_status()
         return resp.json()
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def bump_time(self, obj):
         url = "%sobjects/%s" % (self._base_url, obj)
         now = datetime.today().isoformat()
-
         d = {"created_time": now}
         resp = requests.patch(url, headers=self.header, data=json.dumps(d))
+        if not resp.ok:
+            resp.raise_for_status()
         return resp.json()
 
     # TODO test that this concatenates multi-page results
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def list_jobs(self, filt=None, max=100) -> List[dict]:
         url = "%sjobs?max_page_size=%s" % (self._base_url, max)
@@ -259,8 +276,6 @@ def list_jobs(self, filt=None, max=100) -> List[dict]:
         except Exception as e:
             logging.error(f"Failed to parse response: {resp.text}")
             raise e
-
-
         if "resources" not in response_json:
             logging.warning(str(response_json))
             break
@@ -270,15 +285,19 @@ def list_jobs(self, filt=None, max=100) -> List[dict]:
             url = orig_url + "&page_token=%s" % (response_json["next_page_token"])
         return results
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
-    def get_job(self, job):
-        url = "%sjobs/%s" % (self._base_url, job)
+    def get_job(self, job_id: str):
+        url = "%sjobs/%s" % (self._base_url, job_id)
         resp = requests.get(url, headers=self.header)
+        if not resp.ok:
+            resp.raise_for_status()
         return resp.json()
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
-    def claim_job(self, job):
-        url = "%sjobs/%s:claim" % (self._base_url, job)
+    def claim_job(self, job_id: str):
+        url = "%sjobs/%s:claim" % (self._base_url, job_id)
         resp = requests.post(url, headers=self.header)
         if resp.status_code == 409:
             claimed = True
@@ -302,6 +321,7 @@ def _page_query(self, url):
             url = orig_url + "&page_token=%s" % (resp["next_page_token"])
         return results
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def list_objs(self, filt=None, max_page_size=40):
         url = "%sobjects?max_page_size=%d" % (self._base_url, max_page_size)
@@ -310,6 +330,7 @@ def list_objs(self, filt=None, max_page_size=40):
         results = self._page_query(url)
         return results
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def list_ops(self, filt=None, max_page_size=40):
         url = "%soperations?max_page_size=%d" % (self._base_url, max_page_size)
@@ -329,12 +350,16 @@ def list_ops(self, filt=None, max_page_size=40):
             url = orig_url + "&page_token=%s" % (resp["next_page_token"])
         return results
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def get_op(self, opid):
         url = "%soperations/%s" % (self._base_url, opid)
         resp = requests.get(url, headers=self.header)
+        if not resp.ok:
+            resp.raise_for_status()
        return resp.json()
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def update_op(self, opid, done=None, results=None, meta=None):
         """
@@ -357,8 +382,11 @@ def update_op(self, opid, done=None, results=None, meta=None):
             d["metadata"] = cur["metadata"]
             d["metadata"]["extra"] = meta
         resp = requests.patch(url, headers=self.header, data=json.dumps(d))
+        if not resp.ok:
+            resp.raise_for_status()
         return resp.json()
 
+    @retry(wait=wait_exponential(multiplier=4, min=8, max=120), stop=stop_after_attempt(6), reraise=True)
     @refresh_token
     def run_query(self, query):
         url = "%squeries:run" % self._base_url
diff --git a/nmdc_automation/run_process/run_import.py b/nmdc_automation/run_process/run_import.py
index 6301b3aa..f2972c84 100644
--- a/nmdc_automation/run_process/run_import.py
+++ b/nmdc_automation/run_process/run_import.py
@@ -13,7 +13,7 @@
 from nmdc_automation.api import NmdcRuntimeApi
 
-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger(__name__)
 
 
diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py
index d15f6c02..f535a4d4 100644
--- a/nmdc_automation/workflow_automation/watch_nmdc.py
+++ b/nmdc_automation/workflow_automation/watch_nmdc.py
@@ -12,6 +12,7 @@
 import importlib.resources
 from functools import lru_cache
 import traceback
+import os
 
 from nmdc_schema.nmdc import Database
 from nmdc_automation.api import NmdcRuntimeApi
@@ -23,8 +24,12 @@
 DEFAULT_STATE_DIR = Path(__file__).parent / "_state"
 DEFAULT_STATE_FILE = DEFAULT_STATE_DIR / "state.json"
 INITIAL_STATE = {"jobs": []}
+
+logging_level = os.getenv("NMDC_LOG_LEVEL", logging.DEBUG)
+logging.basicConfig(
+    level=logging_level, format="%(asctime)s %(levelname)s: %(message)s"
+)
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
 
 
 class FileHandler:
diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py
index 667acbc1..056f455f 100755
--- a/nmdc_automation/workflow_automation/wfutils.py
+++ b/nmdc_automation/workflow_automation/wfutils.py
@@ -22,6 +22,11 @@
 DEFAULT_MAX_RETRIES = 2
 
+logging_level = os.getenv("NMDC_LOG_LEVEL", logging.DEBUG)
+logging.basicConfig(
+    level=logging_level, format="%(asctime)s %(levelname)s: %(message)s"
+)
+logger = logging.getLogger(__name__)
 
 class JobRunnerABC(ABC):
     """Abstract base class for job runners"""
@@ -132,7 +137,7 @@ def generate_submission_files(self) -> Dict[str, Any]:
                 "workflowInputs": open(_json_tmp(self._generate_workflow_inputs()), "rb"),
                 "labels": open(_json_tmp(self._generate_workflow_labels()), "rb"),
             }
         except Exception as e:
-            logging.error(f"Failed to generate submission files: {e}")
+            logger.error(f"Failed to generate submission files: {e}")
             self._cleanup_files(list(files.values()))
             raise e
         return files
@@ -144,7 +149,7 @@ def _cleanup_files(self, files: List[Union[tempfile.NamedTemporaryFile, tempfile
                 file.close()
                 os.unlink(file.name)
             except Exception as e:
-                logging.error(f"Failed to cleanup file: {e}")
+                logger.error(f"Failed to cleanup file: {e}")
 
     def submit_job(self, force: bool = False) -> Optional[str]:
         """
@@ -154,7 +159,7 @@ def submit_job(self, force: bool = False) -> Optional[str]:
         """
         status = self.workflow.last_status
         if status in self.NO_SUBMIT_STATES and not force:
-            logging.info(f"Job {self.job_id} in state {status}, skipping submission")
+            logger.info(f"Job {self.job_id} in state {status}, skipping submission")
             return
         cleanup_files = []
         try:
@@ -165,12 +170,12 @@ def submit_job(self, force: bool = False) -> Optional[str]:
             response.raise_for_status()
             self.metadata = response.json()
             self.job_id = self.metadata["id"]
-            logging.info(f"Submitted job {self.job_id}")
+            logger.info(f"Submitted job {self.job_id}")
         else:
-            logging.info(f"Dry run: skipping job submission")
+            logger.info(f"Dry run: skipping job submission")
             self.job_id = "dry_run"
 
-        logging.info(f"Job {self.job_id} submitted")
+        logger.info(f"Job {self.job_id} submitted")
         start_time = datetime.now(pytz.utc).isoformat()
         # update workflow state
         self.workflow.done = False
@@ -179,7 +184,7 @@ def submit_job(self, force: bool = False) -> Optional[str]:
             self.workflow.update_state({"last_status": "Submitted"})
             return self.job_id
         except Exception as e:
-            logging.error(f"Failed to submit job: {e}")
+            logger.error(f"Failed to submit job: {e}")
             raise e
         finally:
             self._cleanup_files(cleanup_files)
@@ -191,7 +196,7 @@ def get_job_status(self) -> str:
         status_url = f"{self.service_url}/{self.workflow.cromwell_jobid}/status"
         # There can be a delay between submitting a job and it
         # being available in Cromwell so handle 404 errors
-        logging.debug(f"Getting job status from {status_url}")
+        logger.debug(f"Getting job status from {status_url}")
         try:
             response = requests.get(status_url)
             response.raise_for_status()
@@ -355,9 +360,9 @@ def fetch_release_file(self, filename: str, suffix: str = None) -> str:
         Download a release file from the Git repository and save it as a temporary file.
         Note: the temporary file is not deleted automatically.
         """
-        logging.debug(f"Fetching release file: {filename}")
+        logger.debug(f"Fetching release file: {filename}")
         url = self._build_release_url(filename)
-        logging.debug(f"Fetching release file from URL: {url}")
+        logger.debug(f"Fetching release file from URL: {url}")
         # download the file as a stream to handle large files
         response = requests.get(url, stream=True)
         try:
@@ -371,9 +376,9 @@
 
     def _build_release_url(self, filename: str) -> str:
         """Build the URL for a release file in the Git repository."""
-        logging.debug(f"Building release URL for {filename}")
+        logger.debug(f"Building release URL for {filename}")
         release = self.config["release"]
-        logging.debug(f"Release: {release}")
+        logger.debug(f"Release: {release}")
         base_url = self.config["git_repo"].rstrip("/")
         url = f"{base_url}{self.GIT_RELEASES_PATH}/{release}/{filename}"
         return url
@@ -388,7 +393,7 @@ def _write_stream_to_file(self, response: requests.Response, file: tempfile.Name
         except Exception as e:
             # clean up the temporary file
             Path(file.name).unlink(missing_ok=True)
-            logging.error(f"Error writing stream to file: {e}")
+            logger.error(f"Error writing stream to file: {e}")
             raise e
 
 
@@ -508,16 +513,16 @@ def make_data_objects(self, output_dir: Union[str, Path] = None) -> List[DataObj
         for output_spec in self.workflow.data_outputs:
            # specs are defined in the workflow.yaml file under Outputs
             output_key = f"{self.workflow.input_prefix}.{output_spec['output']}"
-            logging.info(f"Processing output {output_key}")
+            logger.info(f"Processing output {output_key}")
             # get the full path to the output file from the job_runner
             output_file_path = Path(self.job.outputs[output_key])
-            logging.info(f"Output file path: {output_file_path}")
+            logger.info(f"Output file path: {output_file_path}")
             if output_key not in self.job.outputs:
                 if output_spec.get("optional"):
-                    logging.debug(f"Optional output {output_key} not found in job outputs")
+                    logger.debug(f"Optional output {output_key} not found in job outputs")
                     continue
                 else:
-                    logging.warning(f"Required output {output_key} not found in job outputs")
+                    logger.warning(f"Required output {output_key} not found in job outputs")
                     continue
@@ -531,7 +536,7 @@ def make_data_objects(self, output_dir: Union[str, Path] = None) -> List[DataObj
                 # copy the file to the output directory
                 shutil.copy(output_file_path, new_output_file_path)
             else:
-                logging.warning(f"Output directory not provided, not copying {output_file_path} to output directory")
+                logger.warning(f"Output directory not provided, not copying {output_file_path} to output directory")
 
             # create a DataObject object
             data_object = DataObject(
@@ -562,7 +567,7 @@ def make_workflow_execution(self, data_objects: List[DataObject]) -> WorkflowExe
             if attr_val.startswith("{outputs."):
                 match = re.match(pattern, attr_val)
                 if not match:
-                    logging.warning(f"Invalid output reference {attr_val}")
+                    logger.warning(f"Invalid output reference {attr_val}")
                     continue
                 logical_names.add(match.group(1))
                 field_names.add(match.group(2))
@@ -579,7 +584,7 @@ def make_workflow_execution(self, data_objects: List[DataObject]) -> WorkflowExe
             if field_name in data:
                 wf_dict[field_name] = data[field_name]
             else:
-                logging.warning(f"Field {field_name} not found in {data_path}")
+                logger.warning(f"Field {field_name} not found in {data_path}")
 
         wfe = workflow_process_factory(wf_dict)
         return wfe