Add failed-import batch archiving to aid debugging #24

Merged · 6 commits · Nov 11, 2024
96 changes: 81 additions & 15 deletions app.py
@@ -1,8 +1,10 @@
import boto3
import datetime
from environs import Env
import json
import logging
import os
import shutil
import signal
import urllib.parse
from django.db import transaction # type: ignore
@@ -21,6 +23,9 @@

stats = Instrumentation()

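# environs reads typed settings from the process environment,
# plus a local .env file when one is present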
env = Env()
env.read_env()


def process_import_function(event, context):
"""
@@ -40,6 +45,15 @@ def process_import_function(event, context):
}
}

Example SQS Message:
{
'bucket': 'openstates-realtime-bills',
'file_archiving_enabled': False,
'file_path': 'ny/bill_184e17ec-919f-11ef-b06c-0a58a9feac02.json',
'jurisdiction_id': 'ocd-jurisdiction/country:us/state:ny/government',
'jurisdiction_name': 'New York'
}

"""

datadir = "/tmp/"
@@ -50,12 +64,16 @@ def process_import_function(event, context):
unique_jurisdictions = {}

# Get the uploaded file's information
messages = batch_retrieval_from_sqs()
sqs_fetch_batch_size = env.int("SQS_FETCH_BATCH_SIZE", 600)
sqs_delete_fetched_messages = env.bool("SQS_DELETE_FETCHED_MESSAGES", True)
messages = batch_retrieval_from_sqs(
sqs_fetch_batch_size, sqs_delete_fetched_messages
)
if not messages:
return

bucket = messages[0].get("bucket")
file_archiving_enabled = os.environ.get("FILE_ARCHIVING_ENABLED")
file_archiving_enabled = env.bool("FILE_ARCHIVING_ENABLED", False)
for message in messages:
bucket = message.get("bucket")
key = message.get("file_path")
@@ -114,6 +132,12 @@ def process_import_function(event, context):
for abbreviation, juris in unique_jurisdictions.items():
file_paths = juris["keys"]
jur_id = juris["id"]
if len(file_paths) == 0:
logger.error(
f"Skipping import of {jur_id}: "
f"its file_paths list is empty"
)
continue
logger.info(f"importing {jur_id}...")
try:
do_import(jur_id, f"{datadir}{abbreviation}")
@@ -131,15 +155,23 @@
file_archiving_enabled
and isinstance(file_archiving_enabled, bool)
) or file_archiving_enabled == "True":
archive_files(bucket, file_paths)
archive_individual_files(bucket, file_paths, filedir)

# delete object from original bucket
for file_path in file_paths:
s3_client.delete_object(Bucket=bucket, Key=file_path)
logger.info(f"Deleted files :: {file_paths}")
except Exception as e:
# Snapshot the files we were processing, for debugging
failed_import_dir = f"{datadir}{abbreviation}"
# Zip failed_import_dir and upload it so the failed batch can be inspected
archive_key = archive_jurisdiction_file_folder(
abbreviation, bucket, datadir, failed_import_dir
)

logger.error(
f"Error importing jurisdiction {jur_id}: {e}"
f"Error importing jurisdiction {jur_id}, "
f"stored snapshot of import dir as {archive_key}, error: {e}"
) # noqa: E501
continue

@@ -164,9 +196,33 @@ def remove_duplicate_message(items):
return filtered_items


def archive_files(bucket, all_keys, dest="archive"):
def archive_jurisdiction_file_folder(
jurisdiction_abbreviation, bucket, tmp_folder_path, file_folder_path
):
# Make a zip file of the jurisdiction's source data
now = datetime.datetime.now()
zip_filename = f"{jurisdiction_abbreviation}-{now.isoformat()}"
zip_filepath = os.path.join(tmp_folder_path, zip_filename)
# shutil.make_archive places the directory's files at the root of the
# zip (the enclosing folder itself is not included as a prefix) and
# appends the ".zip" extension to the base name.
archive_filename = shutil.make_archive(
zip_filepath, "zip", file_folder_path
)

# Upload to archive section of S3 bucket
s3_destination_key = f"archive/{zip_filename}.zip"
s3_resource.meta.client.upload_file(
archive_filename, bucket, s3_destination_key
)

return s3_destination_key
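As a side note, a minimal sketch of the shutil.make_archive behavior the comments above rely on; the paths and filenames here are hypothetical:

    import shutil
    import zipfile

    # Zip the contents of /tmp/ny; shutil appends ".zip" to the base name
    # and returns the full archive path, e.g. "/tmp/ny-snapshot.zip".
    archive_path = shutil.make_archive("/tmp/ny-snapshot", "zip", "/tmp/ny")

    # Member names are relative to the root_dir argument, so the "ny"
    # folder itself does not appear as a prefix inside the archive.
    with zipfile.ZipFile(archive_path) as zf:
        print(zf.namelist())  # e.g. ["bill_abc.json", "bill_def.json"]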


def archive_individual_files(bucket, all_keys, dest="archive"):
"""
Archive the processed files to avoid possible race conditions.

We currently use `meta.client.copy` instead of `client.copy` because
it performs copies across multiple threads, which suits the batched
copies we do here.
@@ -198,15 +254,15 @@ def archive_files(bucket, all_keys, dest="archive"):
continue

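For reference, a minimal sketch of the managed copy used in archive_individual_files above; the key names are hypothetical, while the bucket name is taken from the example message in the docstring:

    import boto3

    s3 = boto3.resource("s3")
    # Managed copy: boto3 decides between a single CopyObject call and a
    # threaded multipart copy, based on object size.
    s3.meta.client.copy(
        CopySource={"Bucket": "openstates-realtime-bills", "Key": "ny/bill.json"},
        Bucket="openstates-realtime-bills",
        Key="archive/ny/bill.json",
    )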

def retrieve_messages_from_queue():
def retrieve_messages_from_queue(delete_after_fetch=True):
"""
Get the file paths from the SQS.
"""

# Create SQS client
sqs = boto3.client("sqs")

sqs_url = os.environ.get("SQS_QUEUE_URL")
sqs_url = env.str("SQS_QUEUE_URL")

# Receive message from SQS queue
response = sqs.receive_message(
@@ -227,26 +283,31 @@ def retrieve_messages_from_queue():

receipt_handle = message["ReceiptHandle"]

# Delete received message from queue
sqs.delete_message(QueueUrl=sqs_url, ReceiptHandle=receipt_handle)
logger.debug(f"Received and deleted message: {receipt_handle}")
if delete_after_fetch:
# Delete received message from queue
sqs.delete_message(
QueueUrl=sqs_url, ReceiptHandle=receipt_handle
)
logger.debug(f"Received and deleted message: {receipt_handle}")
else:
logger.debug(
f"Received message (no deletion): {receipt_handle}"
)
return message_bodies
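The receive parameters are collapsed in the hunk above; a minimal sketch of the receive/delete round trip this function performs, with assumed parameter values and a placeholder queue URL:

    import boto3

    sqs = boto3.client("sqs")
    queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/realtime-bills"

    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,  # SQS hard limit per receive call
    )
    for message in response.get("Messages", []):
        print(message["Body"])
        # Deleting acknowledges the message; skipping this leaves it to
        # reappear after the queue's visibility timeout expires.
        sqs.delete_message(
            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
        )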


def batch_retrieval_from_sqs(batch_size=600):
def batch_retrieval_from_sqs(batch_size=600, delete_after_fetch=True):
"""
Retrieve messages from SQS in batches
"""
msg = []

# SQS allows a maximum of 10 messages to be retrieved at a time
for _ in range(batch_size // 10):
msg.extend(retrieve_messages_from_queue())
msg.extend(retrieve_messages_from_queue(delete_after_fetch))
filtered_messages = remove_duplicate_message(msg)

logger.info(
f"message_count: {len(filtered_messages)} received & deleted from SQS"
)
logger.info(f"message_count: {len(filtered_messages)} received from SQS")
return filtered_messages
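A usage sketch: because SQS caps each receive at 10 messages, the default batch_size of 600 translates into 60 receive calls. A non-destructive fetch for debugging might look like:

    # Peek at up to 50 queued messages without deleting them; they become
    # visible again once the queue's visibility timeout expires.
    messages = batch_retrieval_from_sqs(batch_size=50, delete_after_fetch=False)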


@@ -326,3 +387,8 @@ def do_import(jurisdiction_id: str, datadir: str) -> None:
Jurisdiction.objects.filter(id=jurisdiction_id).update(
latest_bill_update=datetime.datetime.utcnow()
)


# Allow running the import end-to-end locally, e.g. for debugging
if __name__ == "__main__":
process_import_function({}, {})
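For local debugging, a minimal sketch of how the entry point above might be driven, with placeholder values for the environment variables this module now reads (a .env file picked up by env.read_env() would work the same way):

    import os

    # Hypothetical settings; none of these values are real.
    os.environ.setdefault(
        "SQS_QUEUE_URL",
        "https://sqs.us-east-1.amazonaws.com/123456789012/realtime-bills",
    )
    os.environ.setdefault("SQS_FETCH_BATCH_SIZE", "600")
    os.environ.setdefault("SQS_DELETE_FETCHED_MESSAGES", "true")
    os.environ.setdefault("FILE_ARCHIVING_ENABLED", "false")

    from app import process_import_function  # noqa: E402

    process_import_function({}, {})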
55 changes: 54 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -14,6 +14,7 @@ textract = "^1.6.5"
extract-msg = "0.28.7"
black = "^24.4.2"
setuptools = "69.5.1"
environs = "^11.1.0"


[build-system]