Add failed-import batch archiving to aid debugging
jessemortenson committed Oct 24, 2024
1 parent d55c98d commit 583a0ff
Showing 1 changed file with 61 additions and 11 deletions.
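
The failed-import path added here zips the jurisdiction's import directory and uploads it under an archive/ prefix so a broken batch can be inspected after the fact. As a hedged illustration (the helper name, destination path, and example key below are placeholders, not part of the commit), pulling one of those snapshots back down for debugging might look like:

# Illustrative only: fetch a failed-import snapshot produced by this commit's
# archive_jurisdiction_file_folder() and unpack it locally for inspection.
import shutil

import boto3  # assumed available, as in the Lambda runtime


def download_failed_import_archive(
    bucket: str, archive_key: str, dest_dir: str = "/tmp/debug"
) -> str:
    """Download a failed-import zip from S3 and extract it into dest_dir."""
    s3 = boto3.client("s3")
    local_zip = f"{dest_dir}.zip"
    s3.download_file(bucket, archive_key, local_zip)
    shutil.unpack_archive(local_zip, dest_dir, "zip")
    return dest_dir


# Placeholder values; real keys look like "archive/<abbreviation>-<iso timestamp>.zip"
# download_failed_import_archive(
#     "openstates-realtime-bills", "archive/ny-2024-10-24T12:00:00.zip"
# )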
72 changes: 61 additions & 11 deletions app.py
@@ -3,6 +3,7 @@
import json
import logging
import os
import shutil
import signal
import urllib.parse
from django.db import transaction # type: ignore
@@ -40,6 +41,15 @@ def process_import_function(event, context):
}
}
Example SQS Message:
{
'bucket': 'openstates-realtime-bills',
'file_archiving_enabled': False,
'file_path': 'ny/bill_184e17ec-919f-11ef-b06c-0a58a9feac02.json',
'jurisdiction_id': 'ocd-jurisdiction/country:us/state:ny/government',
'jurisdiction_name': 'New York'
}
"""

datadir = "/tmp/"
@@ -50,7 +60,14 @@
unique_jurisdictions = {}

# Get the uploaded file's information
messages = batch_retrieval_from_sqs()
sqs_fetch_batch_size = int(os.environ.get("SQS_FETCH_BATCH_SIZE", 600))
# Delete fetched messages unless the env var explicitly disables it ("false" or "0")
delete_env_value = os.environ.get("SQS_DELETE_FETCHED_MESSAGES", "true")
sqs_delete_fetched_messages = str(delete_env_value).lower() not in ("false", "0")
messages = batch_retrieval_from_sqs(sqs_fetch_batch_size, sqs_delete_fetched_messages)
if not messages:
return

@@ -114,6 +131,9 @@ def process_import_function(event, context):
for abbreviation, juris in unique_jurisdictions.items():
file_paths = juris["keys"]
jur_id = juris["id"]
if len(file_paths) == 0:
logger.error(f"Was about to do an import of {jur_id} with an empty file_paths list, skipping it")
continue
logger.info(f"importing {jur_id}...")
try:
do_import(jur_id, f"{datadir}{abbreviation}")
@@ -131,15 +151,20 @@
file_archiving_enabled
and isinstance(file_archiving_enabled, bool)
) or file_archiving_enabled == "True":
archive_files(bucket, file_paths)
archive_individual_files(bucket, file_paths, filedir)

# delete object from original bucket
for file_path in file_paths:
s3_client.delete_object(Bucket=bucket, Key=file_path)
logger.info(f"Deleted files :: {file_paths}")
except Exception as e:
# Create a zip snapshot of the files we were importing, for debugging
failed_import_dir = f"{datadir}{abbreviation}"
# Upload a zip archive containing the contents of failed_import_dir
archive_key = archive_jurisdiction_file_folder(abbreviation, bucket, datadir, failed_import_dir)

logger.error(
f"Error importing jurisdiction {jur_id}: {e}"
f"Error importing jurisdiction {jur_id}, stored snapshot of import dir as {archive_key}, error: {e}"
) # noqa: E501
continue

@@ -164,9 +189,26 @@ def remove_duplicate_message(items):
return filtered_items


def archive_files(bucket, all_keys, dest="archive"):
def archive_jurisdiction_file_folder(jurisdiction_abbreviation, bucket, tmp_folder_path, file_folder_path):
# Make a zip file of the jurisdiction's source data
now = datetime.datetime.now()
zip_filename = f"{jurisdiction_abbreviation}-{now.isoformat()}"
zip_filepath = os.path.join(tmp_folder_path, zip_filename)
# shutil.make_archive places the directory's files at the root of the zip (the folder
# itself is not included in the archive) and appends the ".zip" extension to the filename
archive_filename = shutil.make_archive(zip_filepath, 'zip', file_folder_path)

# Upload to archive section of S3 bucket
s3_destination_key = f"archive/{zip_filename}.zip"
s3_resource.meta.client.upload_file(archive_filename, bucket, s3_destination_key)

return s3_destination_key


def archive_individual_files(bucket, all_keys, dest="archive"):
"""
Archive the processed file to avoid possible scenarios of race conditions.
We currently use `meta.client.copy` instead of `client.copy` b/c
it can copy multiple files via multiple threads, since we have batching
in view.
@@ -198,7 +240,7 @@ def archive_files(bucket, all_keys, dest="archive"):
continue


def retrieve_messages_from_queue():
def retrieve_messages_from_queue(delete_after_fetch=True):
"""
Get the file paths from the SQS.
"""
@@ -227,25 +269,28 @@ def retrieve_messages_from_queue():

receipt_handle = message["ReceiptHandle"]

# Delete received message from queue
sqs.delete_message(QueueUrl=sqs_url, ReceiptHandle=receipt_handle)
logger.debug(f"Received and deleted message: {receipt_handle}")
if delete_after_fetch:
# Delete received message from queue
sqs.delete_message(QueueUrl=sqs_url, ReceiptHandle=receipt_handle)
logger.debug(f"Received and deleted message: {receipt_handle}")
else:
logger.debug(f"Received message (no deletion): {receipt_handle}")
return message_bodies


def batch_retrieval_from_sqs(batch_size=600):
def batch_retrieval_from_sqs(batch_size=600, delete_after_fetch=True):
"""
Retrieve messages from SQS in batches
"""
msg = []

# SQS allows a maximum of 10 messages to be retrieved at a time
for _ in range(batch_size // 10):
msg.extend(retrieve_messages_from_queue())
msg.extend(retrieve_messages_from_queue(delete_after_fetch))
filtered_messages = remove_duplicate_message(msg)

logger.info(
f"message_count: {len(filtered_messages)} received & deleted from SQS"
f"message_count: {len(filtered_messages)} received from SQS"
)
return filtered_messages

@@ -326,3 +371,8 @@ def do_import(jurisdiction_id: str, datadir: str) -> None:
Jurisdiction.objects.filter(id=jurisdiction_id).update(
latest_bill_update=datetime.datetime.utcnow()
)


# run process_import_function if main
if __name__ == "__main__":
process_import_function({}, {})
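
The new `__main__` block means the import loop can also be exercised outside Lambda. A minimal local-run sketch, assuming AWS credentials and the queue/bucket settings the module reads elsewhere are already configured (the env var values shown are examples only):

# Illustrative local invocation; assumes AWS credentials plus the SQS/S3
# configuration the module expects are already present in the environment.
import os

os.environ["SQS_FETCH_BATCH_SIZE"] = "100"           # fetch up to 100 messages this run
os.environ["SQS_DELETE_FETCHED_MESSAGES"] = "false"  # keep messages on the queue while testing

from app import process_import_function

process_import_function({}, {})  # event/context are unused placeholders here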
