Skip to content

Commit

Permalink
ops: Added OTEL metrics cutoff latency and frame in/out latencies
Browse files Browse the repository at this point in the history
  • Loading branch information
clemlesne committed Dec 13, 2024
1 parent f2095f5 commit 34ca384
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
11 changes: 11 additions & 0 deletions app/helpers/call_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from app.helpers.monitoring import (
SpanAttributeEnum,
call_answer_latency,
call_cutoff_latency,
gauge_set,
tracer,
)
Expand Down Expand Up @@ -177,6 +178,9 @@ async def _stop_callback() -> None:
"""
Triggered when the audio buffer needs to be cleared.
"""
# Report the cutoff latency
start = time.monotonic()

# Cancel previous chat
if last_chat:
last_chat.cancel()
Expand All @@ -196,6 +200,12 @@ async def _stop_callback() -> None:
stt_buffer.clear()
stt_complete_gate.clear()

# Report the cutoff latency
gauge_set(
metric=call_cutoff_latency,
value=time.monotonic() - start,
)

async def _commit_answer(
wait: bool,
tool_blacklist: set[str] = set(),
Expand Down Expand Up @@ -229,6 +239,7 @@ async def _response_callback(_retry: bool = False) -> None:
If the recognition is empty, retry the recognition once. Otherwise, process the response.
"""
# Report the answer latency
nonlocal answer_start
answer_start = time.monotonic()

Expand Down
11 changes: 10 additions & 1 deletion app/helpers/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ class SpanMeterEnum(str, Enum):
"""Echo cancellation missed frames."""
CALL_AEC_DROPED = "call.aec.droped"
"""Echo cancellation dropped frames."""
CALL_CUTOFF_LATENCY = "call.cutoff.latency"
"""Cutoff latency in seconds."""
CALL_FRAMES_IN_LATENCY = "call.frames.in.latency"
"""Audio frames in latency in seconds."""
CALL_FRAMES_OUT_LATENCY = "call.frames.out.latency"
"""Audio frames out latency in seconds."""

def counter(
self,
Expand Down Expand Up @@ -109,9 +115,12 @@ def gauge(
)

# Init metrics
call_answer_latency = SpanMeterEnum.CALL_ANSWER_LATENCY.gauge("s")
call_aec_droped = SpanMeterEnum.CALL_AEC_DROPED.counter("frames")
call_aec_missed = SpanMeterEnum.CALL_AEC_MISSED.counter("frames")
call_answer_latency = SpanMeterEnum.CALL_ANSWER_LATENCY.gauge("s")
call_cutoff_latency = SpanMeterEnum.CALL_CUTOFF_LATENCY.gauge("s")
call_frames_in_latency = SpanMeterEnum.CALL_FRAMES_IN_LATENCY.gauge("s")
call_frames_out_latency = SpanMeterEnum.CALL_FRAMES_OUT_LATENCY.gauge("s")


def gauge_set(
Expand Down
40 changes: 39 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import time
from base64 import b64decode, b64encode
from contextlib import asynccontextmanager, suppress
from datetime import timedelta
Expand Down Expand Up @@ -59,7 +60,13 @@
from app.helpers.config import CONFIG
from app.helpers.http import aiohttp_session, azure_transport
from app.helpers.logging import logger
from app.helpers.monitoring import SpanAttributeEnum, tracer
from app.helpers.monitoring import (
SpanAttributeEnum,
call_frames_in_latency,
call_frames_out_latency,
gauge_set,
tracer,
)
from app.helpers.pydantic_types.phone_numbers import PhoneNumber
from app.helpers.resources import resources_dir
from app.models.call import CallGetModel, CallInitiateModel, CallStateModel
Expand Down Expand Up @@ -611,30 +618,50 @@ async def _consume_audio() -> None:
Consume audio data from the WebSocket.
"""
logger.debug("Audio data consumer started")

# Loop until the WebSocket is disconnected
with suppress(WebSocketDisconnect):
start: float | None = None
async for event in websocket.iter_json():
# TODO: Handle configuration event (audio format, sample rate, etc.)
# Skip non-audio events
if "kind" not in event or event["kind"] != "AudioData":
continue

# Filter out silent audio
audio_data: dict[str, Any] = event.get("audioData", {})
audio_base64: str | None = audio_data.get("data", None)
audio_silent: bool | None = audio_data.get("silent", True)
if audio_silent or not audio_base64:
continue

# Queue audio
await audio_in.put(b64decode(audio_base64))

# Report the frames in latency and reset the timer
if start:
gauge_set(
metric=call_frames_in_latency,
value=time.monotonic() - start,
)
start = time.monotonic()

logger.debug("Audio data consumer stopped")

async def _send_audio() -> None:
"""
Send audio data to the WebSocket
"""
logger.debug("Audio data sender started")

# Loop until the WebSocket is disconnected
with suppress(WebSocketDisconnect):
start: float | None = None
while True:
# Get audio
audio_data = await audio_out.get()
audio_out.task_done()

# Send audio
if isinstance(audio_data, bytes):
await websocket.send_json(
Expand All @@ -645,6 +672,7 @@ async def _send_audio() -> None:
},
}
)

# Stop audio
elif audio_data is False:
logger.debug("Stop audio event received, stopping audio")
Expand All @@ -655,6 +683,16 @@ async def _send_audio() -> None:
}
)

# Report the frames out latency and reset the timer
if start:
gauge_set(
metric=call_frames_out_latency,
value=time.monotonic() - start,
)
start = time.monotonic()

logger.debug("Audio data sender stopped")

async with get_scheduler() as scheduler:
await asyncio.gather(
# Consume audio from the WebSocket
Expand Down

0 comments on commit 34ca384

Please sign in to comment.