Skip to content

Commit

Permalink
perf: Remove defer feature update
Browse files Browse the repository at this point in the history
This is totally an anti-pattern. It overloads the scheduler queue within a few seconds and slows down the whole request by 5-10 seconds.
  • Loading branch information
clemlesne committed Dec 14, 2024
1 parent d9604bc commit 42286af
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 211 deletions.
8 changes: 3 additions & 5 deletions app/helpers/call_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ async def on_call_connected(
_handle_recording(
call=call,
client=client,
scheduler=scheduler,
server_call_id=server_call_id,
), # Second, start recording the call
)
Expand Down Expand Up @@ -235,7 +234,7 @@ async def on_automation_recognize_error(
logger.info(
"Timeout, retrying language selection (%s/%s)",
call.recognition_retry,
await recognition_retry_max(scheduler),
await recognition_retry_max(),
)
await _handle_ivr_language(
call=call,
Expand Down Expand Up @@ -321,7 +320,7 @@ async def _pre_recognize_error(
Returns True if the call should continue, False if it should end.
"""
# Voice retries are exhausted, end call
if call.recognition_retry >= await recognition_retry_max(scheduler):
if call.recognition_retry >= await recognition_retry_max():
logger.info("Timeout, ending call")
return False

Expand Down Expand Up @@ -793,15 +792,14 @@ async def _handle_ivr_language(
async def _handle_recording(
call: CallStateModel,
client: CallAutomationClient,
scheduler: Scheduler,
server_call_id: str,
) -> None:
"""
Start recording the call.
Feature activation is checked before starting the recording.
"""
if not await recording_enabled(scheduler):
if not await recording_enabled():
return

assert CONFIG.communication_services.recording_container_url
Expand Down
17 changes: 7 additions & 10 deletions app/helpers/call_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ async def _response_callback(_retry: bool = False) -> None:
in_callback=aec.pull_audio,
out_callback=stt_client.push_audio,
response_callback=_response_callback,
scheduler=scheduler,
stop_callback=_stop_callback,
timeout_callback=_timeout_callback,
)
Expand Down Expand Up @@ -307,10 +306,10 @@ def _loading_task() -> asyncio.Task:
# Timeouts
soft_timeout_triggered = False
soft_timeout_task = asyncio.create_task(
asyncio.sleep(await answer_soft_timeout_sec(scheduler))
asyncio.sleep(await answer_soft_timeout_sec())
)
hard_timeout_task = asyncio.create_task(
asyncio.sleep(await answer_hard_timeout_sec(scheduler))
asyncio.sleep(await answer_hard_timeout_sec())
)

def _clear_tasks() -> None:
Expand Down Expand Up @@ -340,7 +339,7 @@ def _clear_tasks() -> None:
if hard_timeout_task.done():
logger.warning(
"Hard timeout of %ss reached",
await answer_hard_timeout_sec(scheduler),
await answer_hard_timeout_sec(),
)
# Clean up
_clear_tasks()
Expand All @@ -352,7 +351,7 @@ def _clear_tasks() -> None:
if soft_timeout_task.done() and not soft_timeout_triggered:
logger.warning(
"Soft timeout of %ss reached",
await answer_soft_timeout_sec(scheduler),
await answer_soft_timeout_sec(),
)
soft_timeout_triggered = True
# Never store the error message in the call history, it has caused hallucinations in the LLM
Expand Down Expand Up @@ -501,7 +500,6 @@ async def _content_callback(buffer: str) -> None:
async for delta in completion_stream(
max_tokens=160, # Lowest possible value for 90% of the cases, if not sufficient, retry will be triggered, 100 tokens ~= 75 words, 20 words ~= 1 sentence, 6 sentences ~= 160 tokens
messages=call.messages,
scheduler=scheduler,
system=system,
tools=tools,
):
Expand Down Expand Up @@ -619,7 +617,6 @@ async def _process_audio_for_vad( # noqa: PLR0913
in_callback: Callable[[], Awaitable[tuple[bytes, bool]]],
out_callback: Callable[[bytes], None],
response_callback: Callable[[], Awaitable[None]],
scheduler: Scheduler,
stop_callback: Callable[[], Awaitable[None]],
timeout_callback: Callable[[], Awaitable[None]],
) -> None:
Expand All @@ -643,7 +640,7 @@ async def _wait_for_silence() -> None:
"""
# Wait before flushing
nonlocal stop_task
timeout_ms = await vad_silence_timeout_ms(scheduler)
timeout_ms = await vad_silence_timeout_ms()
await asyncio.sleep(timeout_ms / 1000)

# Cancel the clear TTS task
Expand All @@ -656,7 +653,7 @@ async def _wait_for_silence() -> None:
await response_callback()

# Wait for silence and trigger timeout
timeout_sec = await phone_silence_timeout_sec(scheduler)
timeout_sec = await phone_silence_timeout_sec()
while True:
# Stop this time if the call played a message
timeout_start = datetime.now(UTC)
Expand Down Expand Up @@ -685,7 +682,7 @@ async def _wait_for_stop() -> None:
"""
Stop the TTS if user speaks for too long.
"""
timeout_ms = await vad_cutoff_timeout_ms(scheduler)
timeout_ms = await vad_cutoff_timeout_ms()

# Wait before clearing the TTS queue
await asyncio.sleep(timeout_ms / 1000)
Expand Down
5 changes: 2 additions & 3 deletions app/helpers/call_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,7 @@ async def pull_recognition(self) -> str:
try:
await asyncio.wait_for(
self._stt_complete_gate.wait(),
timeout=await recognition_stt_complete_timeout_ms(self._scheduler)
/ 1000,
timeout=await recognition_stt_complete_timeout_ms() / 1000,
)
except TimeoutError:
logger.debug("Complete recognition timeout, using partial recognition")
Expand Down Expand Up @@ -856,7 +855,7 @@ async def _rms_speech_detection(self, voice: np.ndarray) -> bool:
# Calculate Root Mean Square (RMS)
rms = np.sqrt(np.mean(voice**2))
# Get VAD threshold, divide by 10 to more usability from user side, as RMS is in range 0-1 and a detection of 0.1 is a good maximum threshold
threshold = await vad_threshold(self._scheduler) / 10
threshold = await vad_threshold() / 10
return rms >= threshold

async def _process_one(self, input_pcm: bytes) -> None:
Expand Down
59 changes: 19 additions & 40 deletions app/helpers/features.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import TypeVar, cast

from aiojobs import Scheduler
from azure.appconfiguration.aio import AzureAppConfigurationClient
from azure.core.exceptions import ResourceNotFoundError

Expand All @@ -16,55 +15,51 @@
T = TypeVar("T", bool, int, float, str)


async def answer_hard_timeout_sec(scheduler: Scheduler) -> int:
async def answer_hard_timeout_sec() -> int:
"""
The hard timeout for the bot answer in secs.
"""
return await _default(
default=60,
key="answer_hard_timeout_sec",
scheduler=scheduler,
type_res=int,
)


async def answer_soft_timeout_sec(scheduler: Scheduler) -> int:
async def answer_soft_timeout_sec() -> int:
"""
The soft timeout for the bot answer in secs.
"""
return await _default(
default=30,
key="answer_soft_timeout_sec",
scheduler=scheduler,
type_res=int,
)


async def callback_timeout_hour(scheduler: Scheduler) -> int:
async def callback_timeout_hour() -> int:
"""
The timeout for a callback in hours. Set 0 to disable.
"""
return await _default(
default=24,
key="callback_timeout_hour",
scheduler=scheduler,
type_res=int,
)


async def phone_silence_timeout_sec(scheduler: Scheduler) -> int:
async def phone_silence_timeout_sec() -> int:
"""
Amount of silence in secs to trigger a warning message from the assistant.
"""
return await _default(
default=20,
key="phone_silence_timeout_sec",
scheduler=scheduler,
type_res=int,
)


async def vad_threshold(scheduler: Scheduler) -> float:
async def vad_threshold() -> float:
"""
The threshold for voice activity detection. Between 0.1 and 1.
"""
Expand All @@ -73,88 +68,80 @@ async def vad_threshold(scheduler: Scheduler) -> float:
key="vad_threshold",
max_incl=1,
min_incl=0.1,
scheduler=scheduler,
type_res=float,
)


async def vad_silence_timeout_ms(scheduler: Scheduler) -> int:
async def vad_silence_timeout_ms() -> int:
"""
Silence to trigger voice activity detection in milliseconds.
"""
return await _default(
default=500,
key="vad_silence_timeout_ms",
scheduler=scheduler,
type_res=int,
)


async def vad_cutoff_timeout_ms(scheduler: Scheduler) -> int:
async def vad_cutoff_timeout_ms() -> int:
"""
The cutoff timeout for voice activity detection in milliseconds.
"""
return await _default(
default=250,
key="vad_cutoff_timeout_ms",
scheduler=scheduler,
type_res=int,
)


async def recording_enabled(scheduler: Scheduler) -> bool:
async def recording_enabled() -> bool:
"""
Whether call recording is enabled.
"""
return await _default(
default=False,
key="recording_enabled",
scheduler=scheduler,
type_res=bool,
)


async def slow_llm_for_chat(scheduler: Scheduler) -> bool:
async def slow_llm_for_chat() -> bool:
"""
Whether to use the slow LLM for chat.
"""
return await _default(
default=True,
key="slow_llm_for_chat",
scheduler=scheduler,
type_res=bool,
)


async def recognition_retry_max(scheduler: Scheduler) -> int:
async def recognition_retry_max() -> int:
"""
The maximum number of retries for voice recognition. Minimum of 1.
"""
return await _default(
default=3,
key="recognition_retry_max",
min_incl=1,
scheduler=scheduler,
type_res=int,
)


async def recognition_stt_complete_timeout_ms(scheduler: Scheduler) -> int:
async def recognition_stt_complete_timeout_ms() -> int:
"""
The timeout for STT completion in milliseconds.
"""
return await _default(
default=100,
key="recognition_stt_complete_timeout_ms",
scheduler=scheduler,
type_res=int,
)


async def _default( # noqa: PLR0913
async def _default(
default: T,
key: str,
scheduler: Scheduler,
type_res: type[T],
max_incl: T | None = None,
min_incl: T | None = None,
Expand All @@ -165,7 +152,6 @@ async def _default( # noqa: PLR0913
# Get the setting
res = await _get(
key=key,
scheduler=scheduler,
type_res=type_res,
)
if res:
Expand Down Expand Up @@ -207,11 +193,7 @@ def _validate(
return res


async def _get(
key: str,
scheduler: Scheduler,
type_res: type[T],
) -> T | None:
async def _get(key: str, type_res: type[T]) -> T | None:
"""
Get a setting from the App Configuration service.
"""
Expand All @@ -224,15 +206,6 @@ async def _get(
value=cached.decode(),
)

# Defer the update
await scheduler.spawn(_refresh(cache_key, key))
return


async def _refresh(
cache_key: str,
key: str,
) -> T | None:
# Try live
try:
async with await _use_client() as client:
Expand All @@ -253,6 +226,12 @@ async def _refresh(
value=res,
)

# Return value
return _parse(
type_res=type_res,
value=res,
)


@async_lru_cache()
async def _use_client() -> AzureAppConfigurationClient:
Expand Down
5 changes: 2 additions & 3 deletions app/helpers/llm_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def new_claim(

# Store the last message and use it at first message of the new claim
self.call = await _db.call_create(
call=CallStateModel(
CallStateModel(
initiate=self.call.initiate.model_copy(),
voice_id=self.call.voice_id,
messages=[
Expand All @@ -101,8 +101,7 @@ async def new_claim(
# Reinsert the last message, using more will add the user message asking to create the new claim and the assistant can loop on it sometimes
self.call.messages[-1],
],
),
scheduler=self.scheduler,
)
)
return "Claim, reminders and messages reset"

Expand Down
Loading

0 comments on commit 42286af

Please sign in to comment.