From 7192a7db1647cbc9179b8a9340fcedd90e597a15 Mon Sep 17 00:00:00 2001 From: rafa-be Date: Tue, 22 Oct 2024 14:32:34 +0200 Subject: [PATCH] The worker's heart-beat manager watches all worker processes, not only the active one. Signed-off-by: rafa-be --- scaler/worker/agent/heartbeat_manager.py | 23 ++++++++++++++--------- scaler/worker/agent/mixins.py | 4 ---- scaler/worker/agent/processor_manager.py | 7 ------- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/scaler/worker/agent/heartbeat_manager.py b/scaler/worker/agent/heartbeat_manager.py index d3d4510..b85274c 100644 --- a/scaler/worker/agent/heartbeat_manager.py +++ b/scaler/worker/agent/heartbeat_manager.py @@ -14,7 +14,6 @@ class VanillaHeartbeatManager(Looper, HeartbeatManager): def __init__(self): self._agent_process = psutil.Process() - self._worker_process: Optional[psutil.Process] = None self._connector_external: Optional[AsyncConnector] = None self._worker_task_manager: Optional[TaskManager] = None @@ -36,9 +35,6 @@ def register( self._timeout_manager = timeout_manager self._processor_manager = processor_manager - def set_processor_pid(self, process_id: int): - self._worker_process = psutil.Process(process_id) - async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho): if self._start_timestamp_ns == 0: # not handling echo if we didn't send out heartbeat @@ -49,21 +45,23 @@ async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho): self._timeout_manager.update_last_seen_time() async def routine(self): - if self._worker_process is None: + processors = self._processor_manager.processors() + + if len(processors) == 0: return if self._start_timestamp_ns != 0: # already sent heartbeat, expecting heartbeat echo, so not sending return - processors = self._processor_manager.processors() - num_suspended_processors = self._processor_manager.num_suspended_processors() - for processor_holder in processors: status = processor_holder.process().status() if status in {psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD}: await self._processor_manager.on_failing_processor(processor_holder.processor_id(), status) + processors = self._processor_manager.processors() # refreshes for removed dead and zombie processors + num_suspended_processors = self._processor_manager.num_suspended_processors() + await self._connector_external.send( WorkerHeartbeat.new_msg( Resource.new_msg(int(self._agent_process.cpu_percent() * 10), self._agent_process.memory_info().rss), @@ -79,10 +77,17 @@ async def routine(self): @staticmethod def __get_processor_status_from_holder(processor: ProcessorHolder) -> ProcessorStatus: process = processor.process() + + try: + resource = Resource.new_msg(int(process.cpu_percent() * 10), process.memory_info().rss) + except psutil.ZombieProcess: + # Assumes dead processes do not use any resources + resource = Resource.new_msg(0, 0) + return ProcessorStatus.new_msg( processor.pid(), processor.initialized(), processor.task() is not None, processor.suspended(), - Resource.new_msg(int(process.cpu_percent() * 10), process.memory_info().rss), + resource, ) diff --git a/scaler/worker/agent/mixins.py b/scaler/worker/agent/mixins.py index d2acb8a..8cec6de 100644 --- a/scaler/worker/agent/mixins.py +++ b/scaler/worker/agent/mixins.py @@ -15,10 +15,6 @@ class HeartbeatManager(metaclass=abc.ABCMeta): - @abc.abstractmethod - def set_processor_pid(self, process_id: int): - raise NotImplementedError() - @abc.abstractmethod async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho): raise NotImplementedError() diff --git a/scaler/worker/agent/processor_manager.py b/scaler/worker/agent/processor_manager.py index eb19be6..26c559d 100644 --- a/scaler/worker/agent/processor_manager.py +++ b/scaler/worker/agent/processor_manager.py @@ -164,7 +164,6 @@ async def on_failing_processor(self, processor_id: bytes, process_status: str): profile_result = None reason = f"process died {process_status=}" - if holder == self._current_holder: self.__restart_current_processor(reason) else: @@ -189,8 +188,6 @@ async def on_failing_processor(self, processor_id: bytes, process_status: str): TaskResult.new_msg(task_id, TaskStatus.Failed, profile_result.serialize(), [result_object_id]) ) - self.restart_current_processor(f"process died {process_status=}") - async def on_suspend_task(self, task_id: bytes) -> bool: assert self._current_holder is not None holder = self._current_holder @@ -226,8 +223,6 @@ def on_resume_task(self, task_id: bytes) -> bool: self._current_holder = suspended_holder suspended_holder.resume() - self._heartbeat.set_processor_pid(suspended_holder.pid()) - logging.info(f"Worker[{os.getpid()}]: resume Processor[{self._current_holder.pid()}]") return True @@ -271,14 +266,12 @@ def __start_new_processor(self): processor_pid = self._current_holder.pid() - self._heartbeat.set_processor_pid(processor_pid) self._profiling_manager.on_process_start(processor_pid) logging.info(f"Worker[{os.getpid()}]: start Processor[{processor_pid}]") def __kill_processor(self, reason: str, holder: ProcessorHolder): processor_pid = holder.pid() - assert processor_pid is not None self._profiling_manager.on_process_end(processor_pid)