diff --git a/app.py b/app.py
index bf55843..41ea715 100644
--- a/app.py
+++ b/app.py
@@ -1,8 +1,10 @@
 import boto3
 import datetime
+from environs import Env
 import json
 import logging
 import os
+import shutil
 import signal
 import urllib.parse
 from django.db import transaction  # type: ignore
@@ -21,6 +23,9 @@
 stats = Instrumentation()
 
+env = Env()
+env.read_env()
+
 
 def process_import_function(event, context):
     """
 
@@ -40,6 +45,15 @@ def process_import_function(event, context):
         }
     }
 
+    Example SQS Message:
+    {
+        'bucket': 'openstates-realtime-bills',
+        'file_archiving_enabled': False,
+        'file_path': 'ny/bill_184e17ec-919f-11ef-b06c-0a58a9feac02.json',
+        'jurisdiction_id': 'ocd-jurisdiction/country:us/state:ny/government',
+        'jurisdiction_name': 'New York'
+    }
+
     """
 
     datadir = "/tmp/"
@@ -50,12 +64,16 @@ def process_import_function(event, context):
     unique_jurisdictions = {}
 
     # Get the uploaded file's information
-    messages = batch_retrieval_from_sqs()
+    sqs_fetch_batch_size = env.int("SQS_FETCH_BATCH_SIZE", 600)
+    sqs_delete_fetched_messages = env.bool("SQS_DELETE_FETCHED_MESSAGES", True)
+    messages = batch_retrieval_from_sqs(
+        sqs_fetch_batch_size, sqs_delete_fetched_messages
+    )
     if not messages:
         return
 
     bucket = messages[0].get("bucket")
-    file_archiving_enabled = os.environ.get("FILE_ARCHIVING_ENABLED")
+    file_archiving_enabled = env.bool("FILE_ARCHIVING_ENABLED", False)
     for message in messages:
         bucket = message.get("bucket")
         key = message.get("file_path")
@@ -114,6 +132,12 @@ def process_import_function(event, context):
     for abbreviation, juris in unique_jurisdictions.items():
         file_paths = juris["keys"]
         jur_id = juris["id"]
+        if len(file_paths) == 0:
+            logger.error(
+                f"Was about to import {jur_id} with an "
+                f"empty file_paths list; skipping it"
+            )
+            continue
         logger.info(f"importing {jur_id}...")
         try:
             do_import(jur_id, f"{datadir}{abbreviation}")
@@ -131,15 +155,23 @@ def process_import_function(event, context):
                 file_archiving_enabled and isinstance(file_archiving_enabled, bool)
             ) or file_archiving_enabled == "True":
-                archive_files(bucket, file_paths)
+                archive_individual_files(bucket, file_paths, filedir)
                 # delete object from original bucket
                 for file_path in file_paths:
                     s3_client.delete_object(Bucket=bucket, Key=file_path)
                 logger.info(f"Deleted files :: {file_paths}")
 
         except Exception as e:
+            # Create a zip of the files we processed, for debugging
+            failed_import_dir = f"{datadir}{abbreviation}"
+            # Upload a zip file that contains the directory failed_import_dir
+            archive_key = archive_jurisdiction_file_folder(
+                abbreviation, bucket, datadir, failed_import_dir
+            )
+
             logger.error(
-                f"Error importing jurisdiction {jur_id}: {e}"
+                f"Error importing jurisdiction {jur_id}; "
+                f"stored snapshot of import dir as {archive_key}; error: {e}"
             )  # noqa: E501
             continue
@@ -164,9 +196,33 @@ def remove_duplicate_message(items):
     return filtered_items
 
 
-def archive_files(bucket, all_keys, dest="archive"):
+def archive_jurisdiction_file_folder(
+    jurisdiction_abbreviation, bucket, tmp_folder_path, file_folder_path
+):
+    # Make a zip file of the jurisdiction's source data
+    now = datetime.datetime.now()
+    zip_filename = f"{jurisdiction_abbreviation}-{now.isoformat()}"
+    zip_filepath = os.path.join(tmp_folder_path, zip_filename)
+    # shutil puts all the files at the root level of the zip archive;
+    # it does not include the enclosing folder itself in the contents,
+    # and it appends the ".zip" extension to the filename
+    archive_filename = shutil.make_archive(
+        zip_filepath, "zip", file_folder_path
+    )
+
+    # Upload to the archive section of the S3 bucket
+    s3_destination_key = f"archive/{zip_filename}.zip"
+    s3_resource.meta.client.upload_file(
+        archive_filename, bucket, s3_destination_key
+    )
+
+    return s3_destination_key
+
+
+def archive_individual_files(bucket, all_keys, dest="archive"):
     """
     Archive the processed file to avoid possible scenarios of race conditions.
+
     We currently use `meta.client.copy` instead of `client.copy` b/c
     it can copy multiple files via multiple threads,
     since we have batching in view.
@@ -198,7 +254,7 @@ def archive_files(bucket, all_keys, dest="archive"):
         continue
 
 
-def retrieve_messages_from_queue():
+def retrieve_messages_from_queue(delete_after_fetch=True):
     """
     Get the file paths from the SQS.
     """
@@ -206,7 +262,7 @@ def retrieve_messages_from_queue():
     # Create SQS client
     sqs = boto3.client("sqs")
 
-    sqs_url = os.environ.get("SQS_QUEUE_URL")
+    sqs_url = env.str("SQS_QUEUE_URL")
 
     # Receive message from SQS queue
     response = sqs.receive_message(
@@ -227,13 +283,20 @@ def retrieve_messages_from_queue():
         receipt_handle = message["ReceiptHandle"]
 
-        # Delete received message from queue
-        sqs.delete_message(QueueUrl=sqs_url, ReceiptHandle=receipt_handle)
-        logger.debug(f"Received and deleted message: {receipt_handle}")
+        if delete_after_fetch:
+            # Delete received message from queue
+            sqs.delete_message(
+                QueueUrl=sqs_url, ReceiptHandle=receipt_handle
+            )
+            logger.debug(f"Received and deleted message: {receipt_handle}")
+        else:
+            logger.debug(
+                f"Received message (no deletion): {receipt_handle}"
+            )
 
     return message_bodies
 
 
-def batch_retrieval_from_sqs(batch_size=600):
+def batch_retrieval_from_sqs(batch_size=600, delete_after_fetch=True):
     """
     Retrieve messages from SQS in batches
     """
@@ -241,12 +304,10 @@ def batch_retrieval_from_sqs(batch_size=600):
 
     # SQS allows a maximum of 10 messages to be retrieved at a time
     for _ in range(batch_size // 10):
-        msg.extend(retrieve_messages_from_queue())
+        msg.extend(retrieve_messages_from_queue(delete_after_fetch))
 
     filtered_messages = remove_duplicate_message(msg)
-    logger.info(
-        f"message_count: {len(filtered_messages)} received & deleted from SQS"
-    )
+    logger.info(f"message_count: {len(filtered_messages)} received from SQS")
 
     return filtered_messages
 
@@ -326,3 +387,8 @@ def do_import(jurisdiction_id: str, datadir: str) -> None:
     Jurisdiction.objects.filter(id=jurisdiction_id).update(
         latest_bill_update=datetime.datetime.utcnow()
     )
+
+
+# Run process_import_function when executed as a script
+if __name__ == "__main__":
+    process_import_function({}, {})
diff --git a/poetry.lock b/poetry.lock
index f117f27..a1d8ea8 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -458,6 +458,26 @@ files = [
     {file = "ebcdic-1.1.1-py2.py3-none-any.whl", hash = "sha256:33b4cb729bc2d0bf46cc1847b0e5946897cb8d3f53520c5b9aa5fa98d7e735f1"},
 ]
 
+[[package]]
+name = "environs"
+version = "11.1.0"
+description = "simplified environment variable parsing"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "environs-11.1.0-py3-none-any.whl", hash = "sha256:64a95eb490370ea042c761203ff3b8c4b432f799dc20d623de9b00fae21efa16"},
+    {file = "environs-11.1.0.tar.gz", hash = "sha256:8035fe255de3a095156ba79a1b76ff576ce60367817fe20c20bbe67869c8c7b2"},
+]
+
+[package.dependencies]
+marshmallow = ">=3.13.0"
+python-dotenv = "*"
+
+[package.extras]
+dev = ["environs[tests]", "pre-commit (>=3.5,<5.0)", "tox"]
+django = ["dj-database-url", "dj-email-url", "django-cache-url"]
+tests = ["environs[django]", "pytest"]
+
 [[package]]
 name = "et-xmlfile"
 version = "1.1.0"
@@ -1134,6 +1154,25 @@ files = [
     {file = "markupsafe-3.0.1.tar.gz", hash = "sha256:3e683ee4f5d0fa2dde4db77ed8dd8a876686e3fc417655c2ece9a90576905344"},
 ]
 
+[[package]]
+name = "marshmallow"
+version = "3.23.1"
+description = "A lightweight library for converting complex datatypes to and from native Python datatypes."
+optional = false
+python-versions = ">=3.9"
+files = [
+    {file = "marshmallow-3.23.1-py3-none-any.whl", hash = "sha256:fece2eb2c941180ea1b7fcbd4a83c51bfdd50093fdd3ad2585ee5e1df2508491"},
+    {file = "marshmallow-3.23.1.tar.gz", hash = "sha256:3a8dfda6edd8dcdbf216c0ede1d1e78d230a6dc9c5a088f58c4083b974a0d468"},
+]
+
+[package.dependencies]
+packaging = ">=17.0"
+
+[package.extras]
+dev = ["marshmallow[tests]", "pre-commit (>=3.5,<5.0)", "tox"]
+docs = ["alabaster (==1.0.0)", "autodocsumm (==0.2.14)", "sphinx (==8.1.3)", "sphinx-issues (==5.0.0)", "sphinx-version-warning (==1.1.2)"]
+tests = ["pytest", "simplejson"]
+
 [[package]]
 name = "matplotlib-inline"
 version = "0.1.7"
@@ -1789,6 +1828,20 @@ files = [
 [package.dependencies]
 six = ">=1.5"
 
+[[package]]
+name = "python-dotenv"
+version = "1.0.1"
+description = "Read key-value pairs from a .env file and set them as environment variables"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "python-dotenv-1.0.1.tar.gz", hash = "sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca"},
+    {file = "python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a"},
+]
+
+[package.extras]
+cli = ["click (>=5.0)"]
+
 [[package]]
 name = "python-pptx"
 version = "0.6.23"
@@ -2350,4 +2403,4 @@ wheel = "*"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9"
-content-hash = "d6320db60d693f54bd8ef46e64e1011cf711a8de789384578027b1f44b843134"
+content-hash = "a2444c94d717eee02b6b66db44247809046648a69e23b902cd24f31126a3c910"
diff --git a/pyproject.toml b/pyproject.toml
index 12173a6..5eb5ee8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,6 +14,7 @@
 textract = "^1.6.5"
 extract-msg = "0.28.7"
 black = "^24.4.2"
 setuptools = "69.5.1"
+environs = "^11.1.0"
 
 [build-system]