Skip to content

Commit

Permalink
improve cron and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
the-infinity committed Nov 27, 2024
1 parent 105e43f commit 45b33c5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
4 changes: 4 additions & 0 deletions webapp/common/logging/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ class LogMessageType(Enum):
DATABASE_DELETE = 'database-delete'
EXCEPTION = 'exception'
FAILED_SOURCE_HANDLING = 'failed-source-handling'
FAILED_STATIC_SOURCE_HANDLING = 'failed-static-source-handling'
FAILED_REALTIME_SOURCE_HANDLING = 'failed-realtime-source-handling'
FAILED_PARKING_SITE_HANDLING = 'failed-parking-site-handling'
FAILED_STATIC_PARKING_SITE_HANDLING = 'failed-static-parking-site-handling'
FAILED_REALTIME_PARKING_SITE_HANDLING = 'failed-realtime-parking-site-handling'
DUPLICATE_HANDLING = 'duplicate-handling'
MISC = 'misc'

Expand Down
29 changes: 21 additions & 8 deletions webapp/services/import_service/generic/generic_import_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
Use of this source code is governed by an MIT-style license that can be found in the LICENSE.txt.
"""

import traceback
from datetime import datetime, timezone

from flask import Flask
from validataclass.helpers import UnsetValue

from parkapi_sources import ParkAPISources
from parkapi_sources.converters.base_converter.pull import PullConverter
from parkapi_sources.exceptions import ImportParkingSiteException
from parkapi_sources.models import RealtimeParkingSiteInput, StaticParkingSiteInput
from validataclass.helpers import UnsetValue

from webapp.common.logging.models import LogMessageType, LogTag
from webapp.models import ExternalIdentifier, ParkingSite, ParkingSiteHistory, Source, Tag
from webapp.models.parking_site_group import ParkingSiteGroup
Expand Down Expand Up @@ -79,15 +80,15 @@ def update_source_static(self, source_uid: str):
try:
static_parking_site_inputs, static_parking_site_errors = converter.get_static_parking_sites()
except Exception as e:
self.logger.warning(message_type=LogMessageType.FAILED_SOURCE_HANDLING, message=str(e))
self.logger.warning(message_type=LogMessageType.FAILED_STATIC_SOURCE_HANDLING, message=str(e))
source.static_status = SourceStatus.FAILED
self.source_repository.save_source(source)
return

self.handle_static_import_results(source, static_parking_site_inputs, static_parking_site_errors)

for static_parking_site_error in static_parking_site_errors:
self.logger.warning(LogMessageType.FAILED_PARKING_SITE_HANDLING, str(static_parking_site_error))
self.logger.warning(LogMessageType.FAILED_STATIC_PARKING_SITE_HANDLING, str(static_parking_site_error))

source.static_data_updated_at = datetime.now(tz=timezone.utc)
source.static_status = SourceStatus.ACTIVE
Expand All @@ -110,15 +111,15 @@ def update_source_realtime(self, source_uid: str):
try:
realtime_parking_site_inputs, realtime_parking_site_errors = converter.get_realtime_parking_sites()
except Exception as e:
self.logger.warning(message_type=LogMessageType.FAILED_SOURCE_HANDLING, message=str(e))
self.logger.warning(message_type=LogMessageType.FAILED_REALTIME_SOURCE_HANDLING, message=str(e))
source.realtime_status = SourceStatus.FAILED
self.source_repository.save_source(source)
return

self.handle_realtime_import_results(source, realtime_parking_site_inputs, realtime_parking_site_errors)

for realtime_parking_site_error in realtime_parking_site_errors:
self.logger.warning(LogMessageType.FAILED_PARKING_SITE_HANDLING, str(realtime_parking_site_error))
self.logger.warning(LogMessageType.FAILED_REALTIME_PARKING_SITE_HANDLING, str(realtime_parking_site_error))

source.realtime_data_updated_at = datetime.now(tz=timezone.utc)
source.realtime_status = SourceStatus.ACTIVE
Expand Down Expand Up @@ -152,9 +153,15 @@ def handle_static_import_results(
):
existing_parking_site_ids = self.parking_site_repository.fetch_parking_sites_ids_by_source_id(source.id)
for static_parking_site_input in static_parking_site_inputs:
self._save_static_parking_site_input(source, static_parking_site_input, existing_parking_site_ids)
try:
self._save_static_parking_site_input(source, static_parking_site_input, existing_parking_site_ids)
except:
self.logger.warning(
LogMessageType.FAILED_STATIC_SOURCE_HANDLING,
f'Unhandled exception at dataset {static_parking_site_input}: {traceback.format_exc()}',
)

# Delete remaining existing parking sites because they are not in the new dataset
# Delete remaining existing parking sites because they are not in the new dataset
for existing_parking_site_id in existing_parking_site_ids:
existing_parking_site = self.parking_site_repository.fetch_parking_site_by_id(existing_parking_site_id)
self.parking_site_repository.delete_parking_site(existing_parking_site)
Expand Down Expand Up @@ -256,6 +263,12 @@ def handle_realtime_import_results(
self._save_realtime_parking_site_input(source, realtime_parking_site_input)
except ObjectNotFoundException:
realtime_parking_site_error_count += 1
except:
realtime_parking_site_error_count += 1
self.logger.warning(
LogMessageType.FAILED_REALTIME_SOURCE_HANDLING,
f'Unhandled exception at dataset {realtime_parking_site_input}: {traceback.format_exc()}',
)

if len(realtime_parking_site_inputs):
source.realtime_status = SourceStatus.ACTIVE
Expand Down
4 changes: 3 additions & 1 deletion webapp/services/tasks/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import abc
from abc import ABC

from celery.schedules import crontab


class BaseTask(ABC):
@property
Expand All @@ -14,7 +16,7 @@ def name(self) -> str:

@property
@abc.abstractmethod
def run_interval(self) -> int:
def run_interval(self) -> int | crontab:
raise RuntimeError('Not Implemented')

@staticmethod
Expand Down
4 changes: 3 additions & 1 deletion webapp/services/tasks/generic_import_heartbeat_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
Use of this source code is governed by an MIT-style license that can be found in the LICENSE.txt.
"""

from celery.schedules import crontab

from webapp.dependencies import dependencies
from webapp.extensions import celery

Expand All @@ -20,7 +22,7 @@ def task(source: str):


class RunGenericStaticImportTask(BaseTask):
run_interval = 60 * 60 * 24 # 24 hours
run_interval = crontab(hour='1')

@staticmethod
@celery.task()
Expand Down

0 comments on commit 45b33c5

Please sign in to comment.