diff --git a/app/helpers/call_llm.py b/app/helpers/call_llm.py index ba41c7f..862192d 100644 --- a/app/helpers/call_llm.py +++ b/app/helpers/call_llm.py @@ -38,6 +38,7 @@ from app.helpers.monitoring import ( SpanAttributeEnum, call_answer_latency, + call_cutoff_latency, gauge_set, tracer, ) @@ -177,6 +178,9 @@ async def _stop_callback() -> None: """ Triggered when the audio buffer needs to be cleared. """ + # Report the cutoff latency + start = time.monotonic() + # Cancel previous chat if last_chat: last_chat.cancel() @@ -196,6 +200,12 @@ async def _stop_callback() -> None: stt_buffer.clear() stt_complete_gate.clear() + # Report the cutoff latency + gauge_set( + metric=call_cutoff_latency, + value=time.monotonic() - start, + ) + async def _commit_answer( wait: bool, tool_blacklist: set[str] = set(), @@ -229,6 +239,7 @@ async def _response_callback(_retry: bool = False) -> None: If the recognition is empty, retry the recognition once. Otherwise, process the response. """ + # Report the answer latency nonlocal answer_start answer_start = time.monotonic() diff --git a/app/helpers/monitoring.py b/app/helpers/monitoring.py index 2817cbf..dc47e93 100644 --- a/app/helpers/monitoring.py +++ b/app/helpers/monitoring.py @@ -60,6 +60,12 @@ class SpanMeterEnum(str, Enum): """Echo cancellation missed frames.""" CALL_AEC_DROPED = "call.aec.droped" """Echo cancellation dropped frames.""" + CALL_CUTOFF_LATENCY = "call.cutoff.latency" + """Cutoff latency in seconds.""" + CALL_FRAMES_IN_LATENCY = "call.frames.in.latency" + """Audio frames in latency in seconds.""" + CALL_FRAMES_OUT_LATENCY = "call.frames.out.latency" + """Audio frames out latency in seconds.""" def counter( self, @@ -109,9 +115,12 @@ def gauge( ) # Init metrics -call_answer_latency = SpanMeterEnum.CALL_ANSWER_LATENCY.gauge("s") call_aec_droped = SpanMeterEnum.CALL_AEC_DROPED.counter("frames") call_aec_missed = SpanMeterEnum.CALL_AEC_MISSED.counter("frames") +call_answer_latency = SpanMeterEnum.CALL_ANSWER_LATENCY.gauge("s") +call_cutoff_latency = SpanMeterEnum.CALL_CUTOFF_LATENCY.gauge("s") +call_frames_in_latency = SpanMeterEnum.CALL_FRAMES_IN_LATENCY.gauge("s") +call_frames_out_latency = SpanMeterEnum.CALL_FRAMES_OUT_LATENCY.gauge("s") def gauge_set( diff --git a/app/main.py b/app/main.py index d47ef29..fcaa8e6 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,6 @@ import asyncio import json +import time from base64 import b64decode, b64encode from contextlib import asynccontextmanager, suppress from datetime import timedelta @@ -59,7 +60,13 @@ from app.helpers.config import CONFIG from app.helpers.http import aiohttp_session, azure_transport from app.helpers.logging import logger -from app.helpers.monitoring import SpanAttributeEnum, tracer +from app.helpers.monitoring import ( + SpanAttributeEnum, + call_frames_in_latency, + call_frames_out_latency, + gauge_set, + tracer, +) from app.helpers.pydantic_types.phone_numbers import PhoneNumber from app.helpers.resources import resources_dir from app.models.call import CallGetModel, CallInitiateModel, CallStateModel @@ -611,30 +618,50 @@ async def _consume_audio() -> None: Consume audio data from the WebSocket. """ logger.debug("Audio data consumer started") + + # Loop until the WebSocket is disconnected with suppress(WebSocketDisconnect): + start: float | None = None async for event in websocket.iter_json(): # TODO: Handle configuration event (audio format, sample rate, etc.) # Skip non-audio events if "kind" not in event or event["kind"] != "AudioData": continue + # Filter out silent audio audio_data: dict[str, Any] = event.get("audioData", {}) audio_base64: str | None = audio_data.get("data", None) audio_silent: bool | None = audio_data.get("silent", True) if audio_silent or not audio_base64: continue + # Queue audio await audio_in.put(b64decode(audio_base64)) + # Report the frames in latency and reset the timer + if start: + gauge_set( + metric=call_frames_in_latency, + value=time.monotonic() - start, + ) + start = time.monotonic() + + logger.debug("Audio data consumer stopped") + async def _send_audio() -> None: """ Send audio data to the WebSocket """ logger.debug("Audio data sender started") + + # Loop until the WebSocket is disconnected with suppress(WebSocketDisconnect): + start: float | None = None while True: + # Get audio audio_data = await audio_out.get() audio_out.task_done() + # Send audio if isinstance(audio_data, bytes): await websocket.send_json( @@ -645,6 +672,7 @@ async def _send_audio() -> None: }, } ) + # Stop audio elif audio_data is False: logger.debug("Stop audio event received, stopping audio") @@ -655,6 +683,16 @@ async def _send_audio() -> None: } ) + # Report the frames out latency and reset the timer + if start: + gauge_set( + metric=call_frames_out_latency, + value=time.monotonic() - start, + ) + start = time.monotonic() + + logger.debug("Audio data sender stopped") + async with get_scheduler() as scheduler: await asyncio.gather( # Consume audio from the WebSocket