Merge pull request #24 from openstates/archive-failed-jurisdiction-import/DATA-5013

Add failed-import batch archiving to aid debugging
jessemortenson authored Nov 11, 2024
2 parents 0db1c01 + 851cc0c commit bf13ab4
Showing 3 changed files with 136 additions and 16 deletions.
96 changes: 81 additions & 15 deletions app.py
@@ -1,8 +1,10 @@
import boto3
import datetime
from environs import Env
import json
import logging
import os
import shutil
import signal
import urllib.parse
from django.db import transaction # type: ignore
@@ -21,6 +23,9 @@

stats = Instrumentation()

env = Env()
env.read_env()


def process_import_function(event, context):
"""
@@ -40,6 +45,15 @@ def process_import_function(event, context):
}
}
Example SQS Message:
{
'bucket': 'openstates-realtime-bills',
'file_archiving_enabled': False,
'file_path': 'ny/bill_184e17ec-919f-11ef-b06c-0a58a9feac02.json',
'jurisdiction_id': 'ocd-jurisdiction/country:us/state:ny/government',
'jurisdiction_name': 'New York'
}
"""

datadir = "/tmp/"
@@ -50,12 +64,16 @@ def process_import_function(event, context):
unique_jurisdictions = {}

# Get the uploaded file's information
messages = batch_retrieval_from_sqs()
sqs_fetch_batch_size = env.int("SQS_FETCH_BATCH_SIZE", 600)
sqs_delete_fetched_messages = env.bool("SQS_DELETE_FETCHED_MESSAGES", True)
messages = batch_retrieval_from_sqs(
sqs_fetch_batch_size, sqs_delete_fetched_messages
)
if not messages:
return

bucket = messages[0].get("bucket")
file_archiving_enabled = os.environ.get("FILE_ARCHIVING_ENABLED")
file_archiving_enabled = env.bool("FILE_ARCHIVING_ENABLED", False)
for message in messages:
bucket = message.get("bucket")
key = message.get("file_path")
@@ -114,6 +132,12 @@ def process_import_function(event, context):
for abbreviation, juris in unique_jurisdictions.items():
file_paths = juris["keys"]
jur_id = juris["id"]
if len(file_paths) == 0:
logger.error(
f"Was about to do an import of {jur_id} "
f"with an empty file_paths list, skipping it"
)
continue
logger.info(f"importing {jur_id}...")
try:
do_import(jur_id, f"{datadir}{abbreviation}")
@@ -131,15 +155,23 @@
file_archiving_enabled
and isinstance(file_archiving_enabled, bool)
) or file_archiving_enabled == "True":
archive_files(bucket, file_paths)
archive_individual_files(bucket, file_paths, filedir)

# delete object from original bucket
for file_path in file_paths:
s3_client.delete_object(Bucket=bucket, Key=file_path)
logger.info(f"Deleted files :: {file_paths}")
except Exception as e:
# Create zip of files we processed for debugging
failed_import_dir = f"{datadir}{abbreviation}"
# upload zip file that contains the directory failed_import_dir
archive_key = archive_jurisdiction_file_folder(
abbreviation, bucket, datadir, failed_import_dir
)

logger.error(
f"Error importing jurisdiction {jur_id}: {e}"
f"Error importing jurisdiction {jur_id}, "
f"stored snapshot of import dir as {archive_key}, error: {e}"
) # noqa: E501
continue

@@ -164,9 +196,33 @@ def remove_duplicate_message(items):
return filtered_items


def archive_files(bucket, all_keys, dest="archive"):
def archive_jurisdiction_file_folder(
jurisdiction_abbreviation, bucket, tmp_folder_path, file_folder_path
):
# Make a zip file of the jurisdiction's source data
now = datetime.datetime.now()
zip_filename = f"{jurisdiction_abbreviation}-{now.isoformat()}"
zip_filepath = os.path.join(tmp_folder_path, zip_filename)
# shutil puts all the files at the root of the zip archive;
# it does not include the enclosing folder itself,
# and it appends the ".zip" extension
archive_filename = shutil.make_archive(
zip_filepath, "zip", file_folder_path
)

# Upload to archive section of S3 bucket
s3_destination_key = f"archive/{zip_filename}.zip"
s3_resource.meta.client.upload_file(
archive_filename, bucket, s3_destination_key
)

return s3_destination_key
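
# A minimal sketch (paths illustrative) of the make_archive semantics
# noted above: if file_folder_path "/tmp/ny" contains bill_a.json, then
#   archive = shutil.make_archive("/tmp/ny-2024-11-11T00:00:00", "zip", "/tmp/ny")
# returns "/tmp/ny-2024-11-11T00:00:00.zip", whose root holds
# bill_a.json directly, with no enclosing "ny/" folder.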


def archive_individual_files(bucket, all_keys, dest="archive"):
"""
Archive the processed files to avoid possible race conditions.
We currently use `meta.client.copy` instead of `client.copy` because
it can copy multiple files across multiple threads, which suits our
batched workload.
@@ -198,15 +254,15 @@ def archive_files(bucket, all_keys, dest="archive"):
continue
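
# A hedged sketch of the copy performed in the loop above (the full body
# is collapsed in this diff; the destination key layout is an assumption):
#   copy_source = {"Bucket": bucket, "Key": key}
#   s3_resource.meta.client.copy(copy_source, bucket, f"{dest}/{key}")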


def retrieve_messages_from_queue():
def retrieve_messages_from_queue(delete_after_fetch=True):
"""
Get the file paths from the SQS.
"""

# Create SQS client
sqs = boto3.client("sqs")

sqs_url = os.environ.get("SQS_QUEUE_URL")
sqs_url = env.str("SQS_QUEUE_URL")

# Receive message from SQS queue
response = sqs.receive_message(
@@ -227,26 +283,31 @@ def retrieve_messages_from_queue():

receipt_handle = message["ReceiptHandle"]

# Delete received message from queue
sqs.delete_message(QueueUrl=sqs_url, ReceiptHandle=receipt_handle)
logger.debug(f"Received and deleted message: {receipt_handle}")
if delete_after_fetch:
# Delete received message from queue
sqs.delete_message(
QueueUrl=sqs_url, ReceiptHandle=receipt_handle
)
logger.debug(f"Received and deleted message: {receipt_handle}")
else:
logger.debug(
f"Received message (no deletion): {receipt_handle}"
)
return message_bodies


def batch_retrieval_from_sqs(batch_size=600):
def batch_retrieval_from_sqs(batch_size=600, delete_after_fetch=True):
"""
Retrieve messages from SQS in batches
"""
msg = []

# SQS allows a maximum of 10 messages to be retrieved at a time
for _ in range(batch_size // 10):
msg.extend(retrieve_messages_from_queue())
msg.extend(retrieve_messages_from_queue(delete_after_fetch))
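# With the default batch_size of 600, the loop above makes
# 600 // 10 == 60 receive_message calls of up to 10 messages each.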
filtered_messages = remove_duplicate_message(msg)

logger.info(
f"message_count: {len(filtered_messages)} received & deleted from SQS"
)
logger.info(f"message_count: {len(filtered_messages)} received from SQS")
return filtered_messages


@@ -326,3 +387,8 @@ def do_import(jurisdiction_id: str, datadir: str) -> None:
Jurisdiction.objects.filter(id=jurisdiction_id).update(
latest_bill_update=datetime.datetime.utcnow()
)


# run process_import_function if main
if __name__ == "__main__":
process_import_function({}, {})
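
The environs-based settings introduced above can be supplied via a `.env` file, which `env.read_env()` loads automatically. A minimal sketch for local debugging (the queue URL is a placeholder):

SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/openstates-realtime
SQS_FETCH_BATCH_SIZE=600
SQS_DELETE_FETCHED_MESSAGES=true
FILE_ARCHIVING_ENABLED=false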
55 changes: 54 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -14,6 +14,7 @@ textract = "^1.6.5"
extract-msg = "0.28.7"
black = "^24.4.2"
setuptools = "69.5.1"
environs = "^11.1.0"


[build-system]
