Skip to content

Commit

Permalink
feat: Add gauge and counters for AEC and answer latency
Browse files Browse the repository at this point in the history
  • Loading branch information
clemlesne committed Dec 13, 2024
1 parent cafe7f3 commit 59e0d68
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 76 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,18 @@ Enhance the LLM’s accuracy and domain adaptation by integrating historical dat
4. Validate improvements: Test the updated model against sample scenarios and measure key performance indicators (e.g. user satisfaction, call duration, resolution rate) to confirm that adjustments have led to meaningful enhancements.
5. Monitor, iterate, and A/B test: Regularly reassess the model’s performance, integrate newly gathered data, and apply further fine-tuning as needed. Leverage [built-in feature configurations to A/B test (App Configuration Experimentation)](https://learn.microsoft.com/en-us/azure/azure-app-configuration/concept-experimentation) different versions of the model, ensuring responsible, data-driven decisions and continuous optimization over time.

### Monitoring the application

The application sends traces and metrics to Azure Application Insights. You can monitor the application from the Azure portal or through the API.

This includes application behavior, database queries, and external service calls. Plus, LLM metrics (latency, token usage, prompts content, raw response) from [OpenLLMetry](https://github.com/traceloop/openllmetry), following the [semantic conventions for OpenAI operations](https://opentelemetry.io/docs/specs/semconv/gen-ai/openai/#openai-spans).

Additionally, custom metrics (viewable in Application Insights > Metrics) are published, notably:

- `call.aec.droped`, number of times the echo cancellation dropped the voice completely.
- `call.aec.missed`, number of times the echo cancellation failed to remove the echo in time.
- `call.answer.latency`, time between the end of the user voice and the start of the bot voice.

## Q&A

### What will this cost?
Expand Down
18 changes: 9 additions & 9 deletions app/helpers/call_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from app.helpers.features import recognition_retry_max, recording_enabled
from app.helpers.llm_worker import completion_sync
from app.helpers.logging import logger
from app.helpers.monitoring import SpanAttributes, span_attribute, tracer
from app.helpers.monitoring import SpanAttributeEnum, tracer
from app.models.call import CallStateModel
from app.models.message import (
ActionEnum as MessageActionEnum,
Expand Down Expand Up @@ -229,7 +229,7 @@ async def on_automation_recognize_error(
logger.warning("Unknown context %s, no action taken", contexts)

# Enrich span
span_attribute(SpanAttributes.CALL_CHANNEL, "ivr")
SpanAttributeEnum.CALL_CHANNEL.attribute("ivr")

# Retry IVR recognition
logger.info(
Expand Down Expand Up @@ -348,7 +348,7 @@ async def on_play_started(
logger.debug("Play started")

# Enrich span
span_attribute(SpanAttributes.CALL_CHANNEL, "voice")
SpanAttributeEnum.CALL_CHANNEL.attribute("voice")

# Update last interaction
async with _db.call_transac(
Expand All @@ -374,7 +374,7 @@ async def on_automation_play_completed(
logger.debug("Play completed")

# Enrich span
span_attribute(SpanAttributes.CALL_CHANNEL, "voice")
SpanAttributeEnum.CALL_CHANNEL.attribute("voice")

# Update last interaction
async with _db.call_transac(
Expand Down Expand Up @@ -414,7 +414,7 @@ async def on_play_error(error_code: int) -> None:
logger.debug("Play failed")

# Enrich span
span_attribute(SpanAttributes.CALL_CHANNEL, "voice")
SpanAttributeEnum.CALL_CHANNEL.attribute("voice")

# Suppress known errors
# See: https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/communication-services/how-tos/call-automation/play-action.md
Expand Down Expand Up @@ -452,8 +452,8 @@ async def on_ivr_recognized(
logger.info("IVR recognized: %s", label)

# Enrich span
span_attribute(SpanAttributes.CALL_CHANNEL, "ivr")
span_attribute(SpanAttributes.CALL_MESSAGE, label)
SpanAttributeEnum.CALL_CHANNEL.attribute("ivr")
SpanAttributeEnum.CALL_MESSAGE.attribute(label)

# Parse language from label
try:
Expand Down Expand Up @@ -517,8 +517,8 @@ async def on_sms_received(
logger.info("SMS received from %s: %s", call.initiate.phone_number, message)

# Enrich span
span_attribute(SpanAttributes.CALL_CHANNEL, "sms")
span_attribute(SpanAttributes.CALL_MESSAGE, message)
SpanAttributeEnum.CALL_CHANNEL.attribute("sms")
SpanAttributeEnum.CALL_MESSAGE.attribute(message)

# Add the SMS to the call history
async with _db.call_transac(
Expand Down
37 changes: 30 additions & 7 deletions app/helpers/call_llm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime, timedelta
from functools import wraps
Expand All @@ -12,7 +13,7 @@
from openai import APIError

from app.helpers.call_utils import (
EchoCancellationStream,
AECStream,
handle_media,
handle_realtime_tts,
tts_sentence_split,
Expand All @@ -34,7 +35,12 @@
completion_stream,
)
from app.helpers.logging import logger
from app.helpers.monitoring import SpanAttributes, span_attribute, tracer
from app.helpers.monitoring import (
SpanAttributeEnum,
call_answer_latency,
gauge_set,
tracer,
)
from app.models.call import CallStateModel
from app.models.message import (
ActionEnum as MessageAction,
Expand Down Expand Up @@ -63,11 +69,12 @@ async def load_llm_chat( # noqa: PLR0913, PLR0915
# Init language recognition
stt_buffer: list[str] = [] # Temporary buffer for recognition
stt_complete_gate = asyncio.Event() # Gate to wait for the recognition
aec = EchoCancellationStream(
aec = AECStream(
sample_rate=audio_sample_rate,
scheduler=scheduler,
)
audio_reference: asyncio.Queue[bytes] = asyncio.Queue()
answer_start: float | None = None

async def _send_in_to_aec() -> None:
"""
Expand All @@ -83,8 +90,21 @@ async def _send_out_to_aec() -> None:
Forward the TTS to the echo cancellation and output.
"""
while True:
# Consume the audio
out_chunck = await audio_reference.get()
audio_reference.task_done()

# Report the answer latency and reset the timer
nonlocal answer_start
if answer_start:
# Enrich span
gauge_set(
metric=call_answer_latency,
value=time.monotonic() - answer_start,
)
answer_start = None

# Forward the audio
await asyncio.gather(
# First, send the audio to the output
audio_out.put(out_chunck),
Expand Down Expand Up @@ -209,6 +229,9 @@ async def _response_callback(_retry: bool = False) -> None:
If the recognition is empty, retry the recognition once. Otherwise, process the response.
"""
nonlocal answer_start
answer_start = time.monotonic()

# Wait the complete recognition for 50ms maximum
try:
await asyncio.wait_for(stt_complete_gate.wait(), timeout=0.05)
Expand All @@ -226,7 +249,7 @@ async def _response_callback(_retry: bool = False) -> None:
await asyncio.sleep(0.2)
return await _response_callback(_retry=True)

# Stop any previous response
# Stop any previous response, but keep the metrics
await _stop_callback()

# Add it to the call history and update last interaction
Expand Down Expand Up @@ -302,8 +325,8 @@ async def _continue_chat( # noqa: PLR0915, PLR0913
Returns the updated call model.
"""
# Add span attributes
span_attribute(SpanAttributes.CALL_CHANNEL, "voice")
span_attribute(SpanAttributes.CALL_MESSAGE, call.messages[-1].content)
SpanAttributeEnum.CALL_CHANNEL.attribute("voice")
SpanAttributeEnum.CALL_MESSAGE.attribute(call.messages[-1].content)

# Reset recognition retry counter
async with _db.call_transac(
Expand Down Expand Up @@ -665,7 +688,7 @@ async def _content_callback(buffer: str) -> None:
# TODO: Refacto and simplify
async def _process_audio_for_vad( # noqa: PLR0913
call: CallStateModel,
echo_cancellation: EchoCancellationStream,
echo_cancellation: AECStream,
out_stream: PushAudioInputStream,
response_callback: Callable[[], Awaitable[None]],
scheduler: Scheduler,
Expand Down
18 changes: 16 additions & 2 deletions app/helpers/call_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from app.helpers.features import vad_threshold
from app.helpers.identity import token
from app.helpers.logging import logger
from app.helpers.monitoring import call_aec_droped, call_aec_missed, counter_add
from app.models.call import CallStateModel
from app.models.message import (
MessageModel,
Expand Down Expand Up @@ -622,7 +623,7 @@ async def use_stt_client(
client.stop_continuous_recognition_async()


class EchoCancellationStream:
class AECStream:
"""
Real-time audio stream with echo cancellation.
Expand Down Expand Up @@ -773,7 +774,11 @@ async def _ensure_stream(self, input_pcm: bytes) -> None:

# If the processing is delayed, return the original input
except TimeoutError:
logger.warning("Echo processing timeout, returning input")
# Enrich span
counter_add(
metric=call_aec_missed,
value=1,
)
await self._output_queue.put((input_pcm, False))

async def process_stream(self) -> None:
Expand Down Expand Up @@ -820,12 +825,21 @@ async def pull_audio(self) -> tuple[bytes, bool]:
Returns a tuple with the echo-cancelled PCM audio and a boolean flag indicating if the user was speaking.
"""
# Fetch output audio
try:
return await asyncio.wait_for(
fut=self._output_queue.get(),
timeout=self._packet_duration_ms
/ 1000
* 1.5, # Allow temporary small latency
)

# If the processing is delayed, return an empty packet
except TimeoutError:
# Enrich span
counter_add(
metric=call_aec_droped,
value=1,
)
# Return empty packet
return self._empty_packet, False
12 changes: 6 additions & 6 deletions app/helpers/llm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from app.helpers.cache import async_lru_cache
from app.helpers.logging import logger
from app.helpers.monitoring import SpanAttributes, span_attribute, tracer
from app.helpers.monitoring import SpanAttributeEnum, tracer
from app.models.call import CallStateModel
from app.models.message import ToolModel

Expand Down Expand Up @@ -105,7 +105,7 @@ async def execute(
# Update tool
tool.content = res
# Enrich span
span_attribute(SpanAttributes.TOOL_RESULT, tool.content)
SpanAttributeEnum.TOOL_RESULT.attribute(tool.content)
return

# Try to fix JSON args to catch LLM hallucinations
Expand All @@ -128,12 +128,12 @@ async def execute(
f"Bad arguments, available are {functions}. Please try again."
)
# Enrich span
span_attribute(SpanAttributes.TOOL_RESULT, tool.content)
SpanAttributeEnum.TOOL_RESULT.attribute(tool.content)
return

# Enrich span
span_attribute(SpanAttributes.TOOL_ARGS, json.dumps(args))
span_attribute(SpanAttributes.TOOL_NAME, name)
SpanAttributeEnum.TOOL_ARGS.attribute(json.dumps(args))
SpanAttributeEnum.TOOL_NAME.attribute(name)

# Execute the function
try:
Expand All @@ -160,7 +160,7 @@ async def execute(
# Update tool
tool.content = res
# Enrich span
span_attribute(SpanAttributes.TOOL_RESULT, tool.content)
SpanAttributeEnum.TOOL_RESULT.attribute(tool.content)

@cache
def _available_functions(
Expand Down
Loading

0 comments on commit 59e0d68

Please sign in to comment.