Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
clemlesne committed Dec 11, 2024
2 parents 69cb05d + 7be0d54 commit 069a919
Show file tree
Hide file tree
Showing 19 changed files with 494 additions and 283 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ Conversation options are represented as features. They can be configured from Ap
| `recognition_retry_max` | The maximum number of retries for voice recognition. | `int` | 2 |
| `recording_enabled` | Whether call recording is enabled. | `bool` | false |
| `slow_llm_for_chat` | Whether to use the slow LLM for chat. | `bool` | false |
| `vad_cutoff_timeout_ms` | The cutoff timeout for voice activity detection in milliseconds. | `int` | 300 |
| `vad_cutoff_timeout_ms` | The cutoff timeout for voice activity detection in milliseconds. | `int` | 150 |
| `vad_silence_timeout_ms` | The timeout for phone silence in milliseconds. | `int` | 500 |
| `vad_threshold` | The threshold for voice activity detection. | `float` | 0.5 |

Expand Down
7 changes: 1 addition & 6 deletions app/helpers/cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import functools
from collections import OrderedDict
from collections.abc import AsyncGenerator, Awaitable
Expand Down Expand Up @@ -33,11 +32,7 @@ def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs) -> Awaitable:
# Create a cache key from args and kwargs, using frozenset for kwargs to ensure hashability
key = (
asyncio.get_event_loop(),
args,
frozenset(kwargs.items()),
)
key = (args, frozenset(kwargs.items()))

if key in cache:
# Move the recently accessed key to the end (most recently used)
Expand Down
8 changes: 5 additions & 3 deletions app/helpers/call_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async def on_call_connected(
_handle_recording(
call=call,
client=client,
scheduler=scheduler,
server_call_id=server_call_id,
), # Second, start recording the call
)
Expand Down Expand Up @@ -234,7 +235,7 @@ async def on_automation_recognize_error(
logger.info(
"Timeout, retrying language selection (%s/%s)",
call.recognition_retry,
await recognition_retry_max(),
await recognition_retry_max(scheduler),
)
await _handle_ivr_language(
call=call,
Expand Down Expand Up @@ -320,7 +321,7 @@ async def _pre_recognize_error(
Returns True if the call should continue, False if it should end.
"""
# Voice retries are exhausted, end call
if call.recognition_retry >= await recognition_retry_max():
if call.recognition_retry >= await recognition_retry_max(scheduler):
logger.info("Timeout, ending call")
return False

Expand Down Expand Up @@ -792,14 +793,15 @@ async def _handle_ivr_language(
async def _handle_recording(
call: CallStateModel,
client: CallAutomationClient,
scheduler: Scheduler,
server_call_id: str,
) -> None:
"""
Start recording the call.
Feature activation is checked before starting the recording.
"""
if not await recording_enabled():
if not await recording_enabled(scheduler):
return

assert CONFIG.communication_services.recording_container_url
Expand Down
50 changes: 27 additions & 23 deletions app/helpers/call_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import UTC, datetime, timedelta
from functools import wraps

from aiojobs import Job, Scheduler
from aiojobs import Scheduler
from azure.cognitiveservices.speech import (
SpeechSynthesizer,
)
Expand Down Expand Up @@ -65,6 +65,7 @@ async def load_llm_chat( # noqa: PLR0913, PLR0915
stt_complete_gate = asyncio.Event() # Gate to wait for the recognition
aec = EchoCancellationStream(
sample_rate=audio_sample_rate,
scheduler=scheduler,
)
audio_reference: asyncio.Queue[bytes] = asyncio.Queue()

Expand Down Expand Up @@ -111,9 +112,9 @@ def _complete_stt_callback(text: str) -> None:
stt_buffer.append("")
# Store the recognition
stt_buffer[-1] = text
logger.debug("Complete recognition: %s", stt_buffer)
# Add a new buffer for the next partial recognition
stt_buffer.append("")
logger.debug("Complete recognition: %s", stt_buffer)

# Open the recognition gate
stt_complete_gate.set()
Expand All @@ -131,7 +132,7 @@ def _complete_stt_callback(text: str) -> None:
) as tts_client,
):
# Build scheduler
last_response: Job | None = None
last_chat: asyncio.Task | None = None

async def _timeout_callback() -> None:
"""
Expand All @@ -156,25 +157,25 @@ async def _stop_callback() -> None:
"""
Triggered when the audio buffer needs to be cleared.
"""
# Close previous response now
if last_response:
await last_response.close(timeout=0)
# Cancel previous chat
if last_chat:
last_chat.cancel()

# Stop TTS, clear the buffer and send a stop signal
# Stop TTS task
tts_client.stop_speaking_async()

# Reset the recognition
stt_buffer.clear()
stt_complete_gate.clear()

# Clear the audio buffer
# Clear the out buffer
while not audio_out.empty():
audio_out.get_nowait()
audio_out.task_done()

# Send a stop signal
await audio_out.put(False)

# Reset STT buffer and recognition gate
stt_buffer.clear()
stt_complete_gate.clear()

async def _commit_answer(
wait: bool,
tool_blacklist: set[str] | None = None,
Expand All @@ -185,8 +186,8 @@ async def _commit_answer(
Start the chat task and wait for its response if needed. Task is stored in `last_chat` shared variable.
"""
# Start chat task
nonlocal last_response
last_response = await scheduler.spawn(
nonlocal last_chat
last_chat = asyncio.create_task(
_continue_chat(
call=call,
client=automation_client,
Expand All @@ -200,7 +201,7 @@ async def _commit_answer(

# Wait for its response
if wait:
await last_response.wait()
await last_chat

async def _response_callback(_retry: bool = False) -> None:
"""
Expand Down Expand Up @@ -271,10 +272,11 @@ async def _response_callback(_retry: bool = False) -> None:
# Detect VAD
_process_audio_for_vad(
call=call,
stop_callback=_stop_callback,
echo_cancellation=aec,
out_stream=stt_stream,
response_callback=_response_callback,
scheduler=scheduler,
stop_callback=_stop_callback,
timeout_callback=_timeout_callback,
),
)
Expand Down Expand Up @@ -354,10 +356,10 @@ def _loading_task() -> asyncio.Task:
# Timeouts
soft_timeout_triggered = False
soft_timeout_task = asyncio.create_task(
asyncio.sleep(await answer_soft_timeout_sec())
asyncio.sleep(await answer_soft_timeout_sec(scheduler))
)
hard_timeout_task = asyncio.create_task(
asyncio.sleep(await answer_hard_timeout_sec())
asyncio.sleep(await answer_hard_timeout_sec(scheduler))
)

def _clear_tasks() -> None:
Expand Down Expand Up @@ -387,7 +389,7 @@ def _clear_tasks() -> None:
if hard_timeout_task.done():
logger.warning(
"Hard timeout of %ss reached",
await answer_hard_timeout_sec(),
await answer_hard_timeout_sec(scheduler),
)
# Clean up
_clear_tasks()
Expand All @@ -399,7 +401,7 @@ def _clear_tasks() -> None:
if soft_timeout_task.done() and not soft_timeout_triggered:
logger.warning(
"Soft timeout of %ss reached",
await answer_soft_timeout_sec(),
await answer_soft_timeout_sec(scheduler),
)
soft_timeout_triggered = True
# Never store the error message in the call history, it has caused hallucinations in the LLM
Expand Down Expand Up @@ -548,6 +550,7 @@ async def _content_callback(buffer: str) -> None:
async for delta in completion_stream(
max_tokens=160, # Lowest possible value for 90% of the cases, if not sufficient, retry will be triggered, 100 tokens ~= 75 words, 20 words ~= 1 sentence, 6 sentences ~= 160 tokens
messages=call.messages,
scheduler=scheduler,
system=system,
tools=tools,
):
Expand Down Expand Up @@ -659,6 +662,7 @@ async def _process_audio_for_vad( # noqa: PLR0913
echo_cancellation: EchoCancellationStream,
out_stream: PushAudioInputStream,
response_callback: Callable[[], Awaitable[None]],
scheduler: Scheduler,
stop_callback: Callable[[], Awaitable[None]],
timeout_callback: Callable[[], Awaitable[None]],
) -> None:
Expand All @@ -682,7 +686,7 @@ async def _wait_for_silence() -> None:
"""
# Wait before flushing
nonlocal stop_task
timeout_ms = await vad_silence_timeout_ms()
timeout_ms = await vad_silence_timeout_ms(scheduler)
await asyncio.sleep(timeout_ms / 1000)

# Cancel the clear TTS task
Expand All @@ -695,7 +699,7 @@ async def _wait_for_silence() -> None:
await response_callback()

# Wait for silence and trigger timeout
timeout_sec = await phone_silence_timeout_sec()
timeout_sec = await phone_silence_timeout_sec(scheduler)
while True:
# Stop this time if the call played a message
timeout_start = datetime.now(UTC)
Expand Down Expand Up @@ -724,7 +728,7 @@ async def _wait_for_stop() -> None:
"""
Stop the TTS if user speaks for too long.
"""
timeout_ms = await vad_cutoff_timeout_ms()
timeout_ms = await vad_cutoff_timeout_ms(scheduler)

# Wait before clearing the TTS queue
await asyncio.sleep(timeout_ms / 1000)
Expand Down
Loading

0 comments on commit 069a919

Please sign in to comment.