Skip to content

Commit

Permalink
feat(aio_helper): nice subprocesses/threads
Browse files Browse the repository at this point in the history
In order to improve the responsiveness of the main process.

Signed-off-by: Rongrong <[email protected]>
  • Loading branch information
Rongronggg9 committed Jan 1, 2024
1 parent 22a6572 commit 22300b0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Enhancements

- **Set niceness for subprocesses/threads**: (Unix only) Nice subprocesses and/or threads to improve the responsiveness of the main process.

## v2.4.1: Minor enhancements, bug fixes, and Happy New Year!

### Enhancements
Expand Down
6 changes: 6 additions & 0 deletions docs/CHANGELOG.zh.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# 更新日志

## 尚未发布

### 增强

- **为子进程/线程设置 nice 值**: (仅限 Unix) 为子进程和/或线程提升 nice 值以改进主进程的响应性。

## v2.4.1: 次要的增强和 bug 修复,以及新年快乐!

### 增强
Expand Down
32 changes: 27 additions & 5 deletions src/aio_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
Asyncio helper functions.
"""
from __future__ import annotations
from typing import Callable, Union, Optional
from typing import Callable, Optional
from typing_extensions import Literal

import os
from functools import partial
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Future
from time import sleep
from signal import signal, SIGINT, SIGTERM
from collections import deque
from multiprocessing import get_context, current_process

from . import env
from . import env, log

MP_CTX = get_context()

Expand All @@ -33,14 +33,28 @@
aioProcessExecutor: Optional[ProcessPoolExecutor] = None
__aioExecutorsDeque: Optional[deque] = None

logger = log.getLogger('RSStT.aio_helper')


def _process_exit(_signum, _frame):
exit(1)


def _common_initializer():
if hasattr(os, 'nice'):
try:
niceness = os.nice(1) # nice processes/threads
except Exception as e:
logger.warning('Failed to nice subprocess/thread', exc_info=e)
else:
logger.debug(f'The niceness of the subprocess/thread has been set to {niceness}')
logger.debug('Subprocess/thread initialized')


def _process_initializer():
signal(SIGINT, _process_exit)
signal(SIGTERM, _process_exit)
_common_initializer()


def init():
Expand All @@ -52,7 +66,8 @@ def init():
# asyncio executors
aioThreadExecutor = ThreadPoolExecutor(
max_workers=THREAD_POOL_WEIGHT,
thread_name_prefix="rsstt_aio_thread_"
thread_name_prefix="rsstt_aio_thread_",
initializer=_common_initializer
) if THREAD_POOL_WEIGHT else None
aioProcessExecutor = ProcessPoolExecutor(
max_workers=PROCESS_POOL_WEIGHT,
Expand All @@ -67,8 +82,15 @@ def init():
)
) if aioThreadExecutor and aioProcessExecutor else None

# initialize all subprocesses/threads now
# here we use sleep to ensure all subprocesses/threads are launched
futures: list[Future] = []
if aioProcessExecutor and type(MP_CTX).__name__ == 'ForkContext':
[aioProcessExecutor.submit(sleep, 0.01 * (i + 1)) for i in range(PROCESS_POOL_WEIGHT * 2)]
futures.extend(aioProcessExecutor.submit(sleep, 0.01 * (i + 1)) for i in range(PROCESS_POOL_WEIGHT * 2))
if aioThreadExecutor:
futures.extend(aioThreadExecutor.submit(sleep, 0.01 * (i + 1)) for i in range(THREAD_POOL_WEIGHT * 2))
for future in futures:
future.result() # wait for subprocesses/threads


def _get_executor():
Expand Down

0 comments on commit 22300b0

Please sign in to comment.