Skip to content

Commit

Permalink
The worker's heart-beat manager watches all worker processes, not onl…
Browse files Browse the repository at this point in the history
…y the active one.

Signed-off-by: rafa-be <[email protected]>
  • Loading branch information
rafa-be committed Oct 25, 2024
1 parent 6446012 commit 7192a7d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 20 deletions.
23 changes: 14 additions & 9 deletions scaler/worker/agent/heartbeat_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
class VanillaHeartbeatManager(Looper, HeartbeatManager):
def __init__(self):
self._agent_process = psutil.Process()
self._worker_process: Optional[psutil.Process] = None

self._connector_external: Optional[AsyncConnector] = None
self._worker_task_manager: Optional[TaskManager] = None
Expand All @@ -36,9 +35,6 @@ def register(
self._timeout_manager = timeout_manager
self._processor_manager = processor_manager

def set_processor_pid(self, process_id: int):
self._worker_process = psutil.Process(process_id)

async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho):
if self._start_timestamp_ns == 0:
# not handling echo if we didn't send out heartbeat
Expand All @@ -49,21 +45,23 @@ async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho):
self._timeout_manager.update_last_seen_time()

async def routine(self):
if self._worker_process is None:
processors = self._processor_manager.processors()

if len(processors) == 0:
return

if self._start_timestamp_ns != 0:
# already sent heartbeat, expecting heartbeat echo, so not sending
return

processors = self._processor_manager.processors()
num_suspended_processors = self._processor_manager.num_suspended_processors()

for processor_holder in processors:
status = processor_holder.process().status()
if status in {psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD}:
await self._processor_manager.on_failing_processor(processor_holder.processor_id(), status)

processors = self._processor_manager.processors() # refreshes for removed dead and zombie processors
num_suspended_processors = self._processor_manager.num_suspended_processors()

await self._connector_external.send(
WorkerHeartbeat.new_msg(
Resource.new_msg(int(self._agent_process.cpu_percent() * 10), self._agent_process.memory_info().rss),
Expand All @@ -79,10 +77,17 @@ async def routine(self):
@staticmethod
def __get_processor_status_from_holder(processor: ProcessorHolder) -> ProcessorStatus:
process = processor.process()

try:
resource = Resource.new_msg(int(process.cpu_percent() * 10), process.memory_info().rss)
except psutil.ZombieProcess:
# Assumes dead processes do not use any resources
resource = Resource.new_msg(0, 0)

return ProcessorStatus.new_msg(
processor.pid(),
processor.initialized(),
processor.task() is not None,
processor.suspended(),
Resource.new_msg(int(process.cpu_percent() * 10), process.memory_info().rss),
resource,
)
4 changes: 0 additions & 4 deletions scaler/worker/agent/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@


class HeartbeatManager(metaclass=abc.ABCMeta):
@abc.abstractmethod
def set_processor_pid(self, process_id: int):
raise NotImplementedError()

@abc.abstractmethod
async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho):
raise NotImplementedError()
Expand Down
7 changes: 0 additions & 7 deletions scaler/worker/agent/processor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ async def on_failing_processor(self, processor_id: bytes, process_status: str):
profile_result = None

reason = f"process died {process_status=}"

if holder == self._current_holder:
self.__restart_current_processor(reason)
else:
Expand All @@ -189,8 +188,6 @@ async def on_failing_processor(self, processor_id: bytes, process_status: str):
TaskResult.new_msg(task_id, TaskStatus.Failed, profile_result.serialize(), [result_object_id])
)

self.restart_current_processor(f"process died {process_status=}")

async def on_suspend_task(self, task_id: bytes) -> bool:
assert self._current_holder is not None
holder = self._current_holder
Expand Down Expand Up @@ -226,8 +223,6 @@ def on_resume_task(self, task_id: bytes) -> bool:
self._current_holder = suspended_holder
suspended_holder.resume()

self._heartbeat.set_processor_pid(suspended_holder.pid())

logging.info(f"Worker[{os.getpid()}]: resume Processor[{self._current_holder.pid()}]")

return True
Expand Down Expand Up @@ -271,14 +266,12 @@ def __start_new_processor(self):

processor_pid = self._current_holder.pid()

self._heartbeat.set_processor_pid(processor_pid)
self._profiling_manager.on_process_start(processor_pid)

logging.info(f"Worker[{os.getpid()}]: start Processor[{processor_pid}]")

def __kill_processor(self, reason: str, holder: ProcessorHolder):
processor_pid = holder.pid()
assert processor_pid is not None

self._profiling_manager.on_process_end(processor_pid)

Expand Down

0 comments on commit 7192a7d

Please sign in to comment.