Skip to content

Commit

Permalink
Merge pull request #20 from wakscord/add-testcode-20230801
Browse files Browse the repository at this point in the history
Alarm sender 위주로 테스트 코드 추가 및 리팩토링
  • Loading branch information
cdw8431 authored Aug 5, 2023
2 parents cb65817 + 79688e3 commit 570f088
Show file tree
Hide file tree
Showing 30 changed files with 563 additions and 143 deletions.
31 changes: 17 additions & 14 deletions app/alarm/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
from enum import Enum

from app.common.exceptions import AppException


class ParseInvalidArgumentException(AppException):
class AlarmSendFailedException(AppException):
def __init__(self, message: str):
self.message = message

def __str__(self) -> str:
return f"올바른 요청 인자가 아닙니다, ({self.message})"

return f"요청이 실패했습니다, ({self.message})"

class ParseInvalidFormatException(AppException):
def __init__(self, message: str):
self.message = message

class RateLimitException(AppException):
def __str__(self) -> str:
return f"올바른 요청 형식이 아닙니다, ({self.message})"
return "너무 많은 요청을 보내서 요청이 실패했습니다."


class AlarmSendFailedException(AppException):
def __init__(self, message: str):
self.message = message
class UnsubscriberException(AppException):
def __init__(self, unsubscriber: str | None):
self.unsubscriber = unsubscriber

def __str__(self) -> str:
return f"요청이 실패했습니다, ({self.message})"
return f"구독을 해지한 유저 입니다, (key: {self.unsubscriber})"


class RateLimitException(AppException):
def __str__(self) -> str:
return "너무 많은 요청을 보내서 요청이 실패했습니다."
class RequestExc(Enum):
UNKNOWN = "전송에 실패했습니다"
AIOHTTP_CLIENT_CONN_ERROR = "클라이언트 커넥션 에러가 발생했습니다"

@staticmethod
def get_message(exc: "RequestExc") -> str:
return exc.value
15 changes: 0 additions & 15 deletions app/alarm/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@

class AlarmRepository(ABC):
_PROXIES_KEY = "proxies"
_UNSUBSCRIBERS_KEY = "unsubscribers"

@abstractmethod
async def get_unsubscribers(self) -> set[str]:
raise NotImplementedError

@abstractmethod
async def add_unsubscriber(self, unsubscriber: str) -> None:
raise NotImplementedError

@abstractmethod
async def get_least_usage_proxy(self) -> str | None:
Expand All @@ -24,12 +15,6 @@ class AlarmRedisRepository(AlarmRepository):
def __init__(self, session: Redis):
self._session = session

async def get_unsubscribers(self) -> set[str]:
return await self._session.smembers(self._UNSUBSCRIBERS_KEY)

async def add_unsubscriber(self, unsubscriber: str) -> None:
await self._session.sadd(self._UNSUBSCRIBERS_KEY, unsubscriber)

async def get_least_usage_proxy(self) -> str | None:
least_usage_proxy: list[str] = await self._session.zrange(self._PROXIES_KEY, start=0, end=0)
if not least_usage_proxy:
Expand Down
23 changes: 9 additions & 14 deletions app/alarm/response_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@

from app.alarm.constants import DISCORD_WEBHOOK_URL
from app.alarm.dtos import SendResponseDTO
from app.alarm.exceptions import AlarmSendFailedException, RateLimitException
from app.alarm.repository import AlarmRepository
from app.alarm.exceptions import AlarmSendFailedException, RateLimitException, UnsubscriberException


class AlarmResponseValidator:
def __init__(self, repo: AlarmRepository):
self._repo = repo
@classmethod
async def validate(cls, response: SendResponseDTO) -> None:
if cls._is_success(response.status):
return

async def is_done(self, response: SendResponseDTO) -> bool:
if self._is_success(response.status):
return True
if cls._is_unsubscribe(response.status):
unsubscriber = cls._parse_unsubscriber(response.url)
raise UnsubscriberException(unsubscriber)

if self._is_unsubscribe(response.status):
unsubscriber = self._parse_unsubscriber(response.url)
if unsubscriber:
await self._repo.add_unsubscriber(unsubscriber)
return True

elif self._is_rate_limit(response.status):
elif cls._is_rate_limit(response.status):
raise RateLimitException()

message = f"status_code: {response.status}, body: {await response.text()}"
Expand Down
81 changes: 44 additions & 37 deletions app/alarm/sender.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,89 @@
import asyncio
import math
import traceback
from typing import Callable
from typing import Callable, Coroutine

import aiohttp
import orjson
from aiohttp import BasicAuth, ClientResponse, ClientSession

from app.alarm.constants import DEFAULT_RETRY_AFTER, DEFAULT_RETRY_ATTEMPT, DISCORD_WEBHOOK_URL
from app.alarm.dtos import SendResponseDTO
from app.alarm.exceptions import AlarmSendFailedException, RateLimitException
from app.alarm.repository import AlarmRedisRepository
from app.alarm.exceptions import AlarmSendFailedException, RateLimitException, RequestExc, UnsubscriberException
from app.alarm.repository import AlarmRepository
from app.alarm.response_validator import AlarmResponseValidator
from app.common.logger import logger
from app.common.settings import settings
from app.retry.rate_limiter import RetryRateLimiter
from app.unsubscriber.repository import UnsubscriberRepository


class AlarmSender:
class AlarmService:
_headers = {"Content-Type": "application/json"}

def __init__(self, repo: AlarmRedisRepository, retry_rate_limiter: RetryRateLimiter):
self._repo = repo
self.retry_rate_limiter = retry_rate_limiter
def __init__(
self,
alarm_repo: AlarmRepository,
unsubscriber_repo: UnsubscriberRepository,
):
self._alarm_repo = alarm_repo
self._unsubscriber_repo = unsubscriber_repo

async def send(self, subscribers: list[str], message: dict) -> None:
unsubscribers: set[str] = await self._repo.get_unsubscribers()
active_subscribers = self._exclude_unsubscribers(subscribers, unsubscribers)
await self._send(active_subscribers, message)
async def send(self, subscribers: list[str], message: bytes) -> list[Coroutine]:
should_retry_alarms = []
chunked_subscribers_list = self._chunk_subscribers(subscribers, settings.MAX_CONCURRENT)

async def _send(self, subscribers: set[str], message: dict) -> None:
data = orjson.dumps(message)
chunked_subscribers_list = self._chunk_subscribers(list(subscribers), settings.MAX_CONCURRENT)
for chunked_subscribers in chunked_subscribers_list:
proxy = await self._repo.get_least_usage_proxy()
async with aiohttp.ClientSession(headers=self._headers) as session:
alarms = [
self._request(session, url=f"{DISCORD_WEBHOOK_URL}{key}", data=data, proxy=proxy)
for key in chunked_subscribers
]
result = await asyncio.gather(*alarms)
failed_subscribers: list[str] = await self._send(chunked_subscribers, message)
if not failed_subscribers:
continue
should_retry_alarms.extend(await self._create_retry_task(failed_subscribers, message))

failed_alarms = [alarm for alarm in result if alarm]
if failed_alarms:
proxy = await self._repo.get_least_usage_proxy()
retry_alarms = [self._retry(url=subscriber, data=data, proxy=proxy) for subscriber in failed_alarms]
self.retry_rate_limiter.add_alarms(retry_alarms)
return should_retry_alarms

async def _send(self, subscribers: list[str], message: bytes) -> list[str]:
proxy = await self._alarm_repo.get_least_usage_proxy()
async with aiohttp.ClientSession(headers=self._headers) as session:
alarms = [
self._request(session, url=f"{DISCORD_WEBHOOK_URL}{key}", data=message, proxy=proxy)
for key in subscribers
]
responses = await asyncio.gather(*alarms)
return [response for response in responses if response]

async def _request(self, session: ClientSession, url: str, data: bytes, proxy: str | None) -> str | None:
proxy_auth = BasicAuth(settings.PROXY_USER, settings.PROXY_PASSWORD) if proxy else None
try:
response: ClientResponse = await session.post(url=url, data=data, proxy=proxy, proxy_auth=proxy_auth)
response_dto = SendResponseDTO(url=response.url, status=response.status, text=response.text)
await AlarmResponseValidator(self._repo).is_done(response_dto)
await AlarmResponseValidator.validate(response_dto)
return None
except UnsubscriberException as exc:
if exc.unsubscriber:
await self._unsubscriber_repo.add_unsubscriber(exc.unsubscriber)
except (RateLimitException, AlarmSendFailedException) as exc:
logger.warning(exc)
except aiohttp.ClientConnectionError as exc:
logger.warning(f"클라이언트 커넥션 에러가 발생했습니다, (exception: {exc})")
exc_message = RequestExc.get_message(RequestExc.AIOHTTP_CLIENT_CONN_ERROR)
logger.warning(f"{exc_message}, (exception: {exc})")
except Exception as exc:
logger.warning(f"전송에 실패했습니다, (exception: {exc}\n{traceback.format_exc()})")
exc_message = RequestExc.get_message(RequestExc.UNKNOWN)
logger.warning(f"{exc_message}, (exception: {exc}\n{traceback.format_exc()})")
return url

async def _retry(self, url: str, data: bytes, proxy: str | None) -> None:
async def _create_retry_task(self, failed_subscribers: list[str], message: bytes) -> list[Coroutine]:
proxy = await self._alarm_repo.get_least_usage_proxy()
return [self._retry(url=subscriber, message=message, proxy=proxy) for subscriber in failed_subscribers]

async def _retry(self, url: str, message: bytes, proxy: str | None) -> None:
remain_retry_attempt = DEFAULT_RETRY_ATTEMPT
async with aiohttp.ClientSession(headers=self._headers) as session:
while True:
is_success = not await self._request(session, url=url, data=data, proxy=proxy)
is_success = not await self._request(session, url=url, data=message, proxy=proxy)
if is_success or not remain_retry_attempt:
break
remain_retry_attempt -= 1
current_retry_attempt = DEFAULT_RETRY_ATTEMPT - remain_retry_attempt
await asyncio.sleep(current_retry_attempt * DEFAULT_RETRY_AFTER)

@staticmethod
def _exclude_unsubscribers(subscribers: list[str], unsubscribers: set[str]) -> set[str]:
return set(subscribers) - unsubscribers

@staticmethod
def _chunk_subscribers(subscribers: list[str], max_concurrent: int) -> list[list[str]]:
chunk_len: int = math.ceil(len(subscribers) / max_concurrent)
Expand Down
12 changes: 9 additions & 3 deletions app/common/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from redis.asyncio import BlockingConnectionPool, Redis

from app.alarm.repository import AlarmRedisRepository
from app.alarm.sender import AlarmSender
from app.alarm.sender import AlarmService
from app.common.settings import settings
from app.node.manager import NodeManager
from app.retry.rate_limiter import RetryRateLimiter
from app.unsubscriber.repository import UnsubscriberRedisRepository
from app.unsubscriber.service import UnsubscriberService


class CacheContainer(containers.DeclarativeContainer):
Expand All @@ -27,6 +29,10 @@ class AppContainer(containers.DeclarativeContainer):
cache_session = CacheContainer.redis_session

retry_rate_limiter = providers.Singleton(RetryRateLimiter)
alarm_repository = providers.Singleton(AlarmRedisRepository, session=cache_session)
alarm_sender = providers.Singleton(AlarmSender, repo=alarm_repository, retry_rate_limiter=retry_rate_limiter)
node_manager = providers.Singleton(NodeManager, node_id=settings.NODE_ID, session=cache_session)

alarm_repo = providers.Singleton(AlarmRedisRepository, session=cache_session)
unsubscriber_repo = providers.Singleton(UnsubscriberRedisRepository, session=cache_session)

alarm_service = providers.Singleton(AlarmService, alarm_repo=alarm_repo, unsubscriber_repo=unsubscriber_repo)
unsubscriber_service = providers.Singleton(UnsubscriberService, repo=unsubscriber_repo)
6 changes: 3 additions & 3 deletions app/common/process_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
def process_status_handler(func): # type: ignore
@functools.wraps(func)
async def wrapper(*args, **kwargs): # type: ignore
process_status_manager.start()
manager.start()
result = await func(*args, **kwargs)

process_status_manager.complete()
manager.complete()
return result

return wrapper
Expand All @@ -33,4 +33,4 @@ def get_current(self) -> ProcessStatus:
return self._status


process_status_manager = ProcessStatusManager()
manager = ProcessStatusManager()
Empty file added app/common/utils/__init__.py
Empty file.
17 changes: 17 additions & 0 deletions app/common/utils/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from app.common.exceptions import AppException


class ParseInvalidArgumentException(AppException):
def __init__(self, message: str):
self.message = message

def __str__(self) -> str:
return f"올바른 요청 인자가 아닙니다, ({self.message})"


class ParseInvalidFormatException(AppException):
def __init__(self, message: str):
self.message = message

def __str__(self) -> str:
return f"올바른 요청 형식이 아닙니다, ({self.message})"
4 changes: 2 additions & 2 deletions app/alarm/task_parser.py → app/common/utils/task_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import orjson
from dacite import from_dict

from app.alarm.exceptions import ParseInvalidArgumentException, ParseInvalidFormatException
from app.common.utils.exceptions import ParseInvalidArgumentException, ParseInvalidFormatException


@dataclass(frozen=True)
Expand All @@ -13,7 +13,7 @@ class AlarmTask:
data: dict


class AlarmTaskParser:
class TaskParser:
def __init__(self, raw_task: tuple[str, str]) -> None:
self._alarm_task = self._parse_raw_task(raw_task)

Expand Down
35 changes: 28 additions & 7 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,51 @@
import asyncio
from typing import Coroutine

from dependency_injector.wiring import Provide, inject
from orjson import orjson

from app.alarm.sender import AlarmSender
from app.alarm.task_parser import AlarmTaskParser
from app.alarm.sender import AlarmService
from app.common.di import AppContainer
from app.common.exceptions import async_exception_handler
from app.common.process_status import process_status_handler
from app.common.utils.task_parser import TaskParser
from app.node.manager import NodeManager
from app.retry.rate_limiter import RetryRateLimiter
from app.unsubscriber.service import UnsubscriberService


@process_status_handler
@inject
async def process_task(task: tuple[str, str], alarm_sender: AlarmSender = Provide[AppContainer.alarm_sender]) -> None:
parser = AlarmTaskParser(task)
await alarm_sender.send(parser.parse_subscribers(), parser.parse_message())
async def process_task(
task: tuple[str, str],
alarm_service: AlarmService = Provide[AppContainer.alarm_service],
unsubscriber_service: UnsubscriberService = Provide[AppContainer.unsubscriber_service],
) -> list[Coroutine]:
parser = TaskParser(task)
request_subscribers = parser.parse_subscribers()
request_message = parser.parse_message()

active_subscribers: list[str] = await unsubscriber_service.exclude_unsubscribers(request_subscribers)
message: bytes = orjson.dumps(request_message)

failed_alarms = await alarm_service.send(active_subscribers, message)
return failed_alarms


@async_exception_handler
@inject
async def run(node_manager: NodeManager = Provide[AppContainer.node_manager]) -> None:
async def run(
node_manager: NodeManager = Provide[AppContainer.node_manager],
retry_rate_limiter: RetryRateLimiter = Provide[AppContainer.retry_rate_limiter],
) -> None:
await node_manager.join_server()
await retry_rate_limiter.watch_retry()

while True:
task = await node_manager.pop_task()
if task:
await process_task(task)
failed_alarms = await process_task(task)
retry_rate_limiter.add_alarms(failed_alarms)


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion app/node/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from redis.asyncio import Redis

from app.common.logger import logger
from app.common.process_status import ProcessStatus, process_status_manager
from app.common.process_status import ProcessStatus
from app.common.process_status import manager as process_status_manager
from app.node.constants import NODE_HEALTH_CHECK_INTERVAL, TASK_POP_INTERVAL


Expand Down
Loading

0 comments on commit 570f088

Please sign in to comment.