Fixes #33: processors can be suspended during the initialization phase.
Signed-off-by: rafa-be <[email protected]>
rafa-be committed Oct 25, 2024
1 parent bcf7cce commit 6446012
Showing 11 changed files with 140 additions and 99 deletions.
2 changes: 1 addition & 1 deletion scaler/about.py
@@ -1 +1 @@
__version__ = "1.8.11"
__version__ = "1.8.12"
10 changes: 6 additions & 4 deletions scaler/worker/agent/heartbeat_manager.py
@@ -56,19 +56,21 @@ async def routine(self):
# already sent heartbeat, expecting heartbeat echo, so not sending
return

if self._worker_process.status() in {psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD}:
await self._processor_manager.on_failing_task(self._worker_process.status())

processors = self._processor_manager.processors()
num_suspended_processors = self._processor_manager.num_suspended_processors()

for processor_holder in processors:
status = processor_holder.process().status()
if status in {psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD}:
await self._processor_manager.on_failing_processor(processor_holder.processor_id(), status)

await self._connector_external.send(
WorkerHeartbeat.new_msg(
Resource.new_msg(int(self._agent_process.cpu_percent() * 10), self._agent_process.memory_info().rss),
psutil.virtual_memory().available,
self._worker_task_manager.get_queued_size() - num_suspended_processors,
self._latency_us,
self._processor_manager.task_lock(),
self._processor_manager.can_accept_task(),
[self.__get_processor_status_from_holder(processor) for processor in processors],
)
)
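
Below is a minimal, runnable sketch (POSIX, psutil installed) of the per-processor liveness check this hunk introduces: the agent polls each processor's OS process and reports any that turned zombie or dead. The plain dict of processes and report_failing_processor are illustrative stand-ins for the ProcessorHolder list and the async on_failing_processor callback, not Scaler's actual API.

import multiprocessing
import time
from typing import Dict

import psutil


def _exit_immediately() -> None:
    pass


def report_failing_processor(processor_id: bytes, status: str) -> None:
    # Stand-in for ProcessorManager.on_failing_processor().
    print(f"processor {processor_id!r} is {status}")


def check_processors(processes: Dict[bytes, multiprocessing.Process]) -> None:
    # Mirrors the loop in the hunk above: inspect each processor's OS process status.
    for processor_id, process in processes.items():
        status = psutil.Process(process.pid).status()
        if status in {psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD}:
            report_failing_processor(processor_id, status)


if __name__ == "__main__":
    child = multiprocessing.Process(target=_exit_immediately)
    child.start()
    time.sleep(0.5)  # the child has exited but is not joined yet, so it shows up as a zombie
    check_processors({b"processor-1": child})
    child.join()
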
18 changes: 7 additions & 11 deletions scaler/worker/agent/mixins.py
@@ -58,31 +58,31 @@ def on_object_response(self, request: ObjectResponse):
raise NotImplementedError()

@abc.abstractmethod
async def acquire_task_active_lock(self):
def can_accept_task(self) -> bool:
raise NotImplementedError()

@abc.abstractmethod
async def on_task(self, task: Task) -> bool:
async def wait_until_can_accept_task(self):
raise NotImplementedError()

@abc.abstractmethod
def on_cancel_task(self, task_id: bytes) -> Optional[Task]:
async def on_task(self, task: Task) -> bool:
raise NotImplementedError()

@abc.abstractmethod
async def on_failing_task(self, error: str):
async def on_cancel_task(self, task_id: bytes) -> Optional[Task]:
raise NotImplementedError()

@abc.abstractmethod
def on_suspend_task(self, task_id: bytes) -> bool:
async def on_failing_processor(self, processor_id: bytes, process_status: str):
raise NotImplementedError()

@abc.abstractmethod
def on_resume_task(self, task_id: bytes) -> bool:
async def on_suspend_task(self, task_id: bytes) -> bool:
raise NotImplementedError()

@abc.abstractmethod
def restart_current_processor(self, reason: str):
def on_resume_task(self, task_id: bytes) -> bool:
raise NotImplementedError()

@abc.abstractmethod
@@ -109,10 +109,6 @@ def processors(self) -> List[ProcessorHolder]:
def num_suspended_processors(self) -> int:
raise NotImplementedError()

@abc.abstractmethod
def task_lock(self) -> bool:
raise NotImplementedError()


class ProfilingManager(metaclass=abc.ABCMeta):
@abc.abstractmethod
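
The manager interface now pairs a synchronous can_accept_task() with an awaitable wait_until_can_accept_task(), in place of the old task_lock() / acquire_task_active_lock(). The sketch below shows one hypothetical way such a pair can be consumed by a dispatch loop; DummyProcessorManager and dispatch are illustrative only and are not Scaler's agent code.

import asyncio


class DummyProcessorManager:
    """Toy implementation of the can_accept_task / wait_until_can_accept_task pair."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._running = 0
        self._slot_freed = asyncio.Event()
        self._slot_freed.set()

    def can_accept_task(self) -> bool:
        return self._running < self._capacity

    async def wait_until_can_accept_task(self) -> None:
        while not self.can_accept_task():
            self._slot_freed.clear()
            await self._slot_freed.wait()

    async def on_task(self, task: bytes) -> bool:
        self._running += 1
        # Pretend the task finishes 100 ms later and frees its slot.
        asyncio.get_running_loop().call_later(0.1, self._finish)
        return True

    def _finish(self) -> None:
        self._running -= 1
        self._slot_freed.set()


async def dispatch(manager: DummyProcessorManager, tasks) -> None:
    for task in tasks:
        await manager.wait_until_can_accept_task()
        await manager.on_task(task)
        print(f"dispatched {task!r}")


if __name__ == "__main__":
    asyncio.run(dispatch(DummyProcessorManager(capacity=2), [b"t1", b"t2", b"t3"]))
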
8 changes: 8 additions & 0 deletions scaler/worker/agent/processor/processor.py
@@ -41,6 +41,7 @@ def __init__(
event_loop: str,
address: ZMQConfig,
resume_event: Optional[EventType],
resumed_event: Optional[EventType],
garbage_collect_interval_seconds: int,
trim_memory_threshold_bytes: int,
logging_paths: Tuple[str, ...],
@@ -52,6 +53,7 @@ def __init__(
self._address = address

self._resume_event = resume_event
self._resumed_event = resumed_event

self._garbage_collect_interval_seconds = garbage_collect_interval_seconds
self._trim_memory_threshold_bytes = trim_memory_threshold_bytes
@@ -108,8 +110,14 @@ def __interrupt(self, *args):

def __suspend(self, *args):
assert self._resume_event is not None
assert self._resumed_event is not None

self._resume_event.wait() # stops any computation in the main thread until the event is triggered

# Ensures the processor agent knows we stopped waiting on `_resume_event`, so as to avoid a re-entrant wait
# on the event.
self._resumed_event.set()

def __run_forever(self):
try:
self._connector.send(ProcessorInitialized.new_msg())
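
The handshake added here can be reproduced in isolation: the child parks its main thread on resume_event inside a signal handler, then sets resumed_event so the parent knows the wait has returned. The sketch below (POSIX only) assumes SIGUSR1 as the suspend signal and uses a ready event where the real processor sends a ProcessorInitialized message; both are illustrative choices, not Scaler's code.

import multiprocessing
import os
import signal
import time


def worker(ready_event, resume_event, resumed_event):
    def handle_suspend(*_args):
        resume_event.wait()   # park the main thread until the agent resumes us
        resumed_event.set()   # tell the agent we are no longer waiting on resume_event

    signal.signal(signal.SIGUSR1, handle_suspend)
    ready_event.set()  # analogous to ProcessorInitialized: only now is it safe to suspend
    for i in range(5):
        print(f"child working, step {i}")
        time.sleep(0.2)


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    ready, resume, resumed = ctx.Event(), ctx.Event(), ctx.Event()
    child = ctx.Process(target=worker, args=(ready, resume, resumed))
    child.start()

    ready.wait()                        # only suspend once the child has set up its handler
    os.kill(child.pid, signal.SIGUSR1)  # soft suspend: the child parks inside its handler
    time.sleep(0.5)                     # suspended window: no "step" lines are printed here

    resume.set()     # let the handler return ...
    resumed.wait()   # ... and wait until the child has actually left resume_event.wait()
    child.join()

The second event is what lets the agent know it is safe to suspend again, avoiding a re-entrant wait on resume_event.
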
27 changes: 17 additions & 10 deletions scaler/worker/agent/processor_holder.py
@@ -1,10 +1,9 @@
import asyncio
import logging
import os
import signal
from multiprocessing import Event
from typing import Optional, Tuple

import multiprocessing
import psutil

from scaler.io.config import DEFAULT_PROCESSOR_KILL_DELAY_SECONDS
@@ -26,19 +25,22 @@ def __init__(
):
self._processor_id: Optional[bytes] = None
self._task: Optional[Task] = None
self._initialized = asyncio.Event()
self._suspended = False

self._hard_suspend = hard_suspend
if hard_suspend:
self._resume_event = None
self._resumed_event = None
else:
self._resume_event = Event()
context = multiprocessing.get_context("spawn")
self._resume_event = context.Event()
self._resumed_event = context.Event()

self._processor = Processor(
event_loop=event_loop,
address=address,
resume_event=self._resume_event,
resumed_event=self._resumed_event,
garbage_collect_interval_seconds=garbage_collect_interval_seconds,
trim_memory_threshold_bytes=trim_memory_threshold_bytes,
logging_paths=logging_paths,
@@ -59,14 +61,10 @@ def processor_id(self) -> bytes:
return self._processor_id

def initialized(self) -> bool:
return self._initialized.is_set()
return self._processor_id is not None

def wait_initialized(self):
return self._initialized.wait()

def set_initialized(self, processor_id: bytes):
def initialize(self, processor_id: bytes):
self._processor_id = processor_id
self._initialized.set()

def task(self) -> Optional[Task]:
return self._task
@@ -81,6 +79,7 @@ def suspend(self):
assert self._processor is not None
assert self._task is not None
assert self._suspended is False
assert self.initialized()

if self._hard_suspend:
self.__send_signal("SIGSTOP")
@@ -92,7 +91,9 @@ def suspend(self):
# See https://github.com/Citi/scaler/issues/14

assert self._resume_event is not None
assert self._resumed_event is not None
self._resume_event.clear()
self._resumed_event.clear()

self.__send_signal(SUSPEND_SIGNAL)

@@ -106,8 +107,14 @@ def resume(self):
self.__send_signal("SIGCONT")
else:
assert self._resume_event is not None
assert self._resumed_event is not None

self._resume_event.set()

# Waits until the processor actually resumes processing. This prevents a future call to `suspend()` while
# the processor has not yet returned from its `_resume_event.wait()` call (which causes a re-entrant error
# on Linux).
self._resumed_event.wait()

self._suspended = False

def kill(self):
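
For contrast with the event-based soft suspend, the hard-suspend branch kept in this file freezes the whole processor process with SIGSTOP and thaws it with SIGCONT. A minimal POSIX sketch, with an illustrative worker function:

import multiprocessing
import os
import signal
import time


def busy_worker() -> None:
    for i in range(10):
        print(f"step {i}")
        time.sleep(0.2)


if __name__ == "__main__":
    child = multiprocessing.Process(target=busy_worker)
    child.start()
    time.sleep(0.5)

    os.kill(child.pid, signal.SIGSTOP)  # hard suspend: the kernel stops the process entirely
    time.sleep(1.0)                     # no "step" lines are printed during this window
    os.kill(child.pid, signal.SIGCONT)  # resume exactly where it was stopped

    child.join()

Unlike the soft path, SIGSTOP cannot be caught by the child, so no handler or event handshake is involved.
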
