Skip to content

Commit

Permalink
perf: Optimize echo reduction latency
Browse files Browse the repository at this point in the history
  • Loading branch information
clemlesne committed Dec 11, 2024
1 parent 5fe3c42 commit 23b095d
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions app/helpers/call_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ class EchoCancellationStream:
_reference_queue: asyncio.Queue[bytes] = asyncio.Queue()
_sample_rate: int
_scheduler: Scheduler
_empty_packet: bytes

def __init__(
self,
Expand All @@ -654,6 +655,7 @@ def __init__(

self._chunk_size = int(self._sample_rate * self._packet_duration_ms / 1000)
self._packet_size = self._chunk_size * 2 # Each sample is 2 bytes (PCM 16-bit)
self._empty_packet: bytes = b"\x00" * self._packet_size

def _pcm_to_float(self, pcm: bytes) -> np.ndarray:
"""
Expand Down Expand Up @@ -705,11 +707,13 @@ async def _process_one(self, input_pcm: bytes) -> None:
"""
Process one audio chunk with echo cancellation.
"""
# Use silence as the reference if none is available
# Push raw input if reference is empty
if self._reference_queue.empty():
reference_pcm = b"\x00" * self._packet_size
reference_pcm = self._empty_packet

# Reference signal is available
else:
reference_pcm = await self._reference_queue.get()
reference_pcm = self._reference_queue.get_nowait()
self._reference_queue.task_done()

# Convert PCM to float for processing
Expand All @@ -719,6 +723,15 @@ async def _process_one(self, input_pcm: bytes) -> None:
# Update the input buffer with the reference signal
self._update_input_buffer(reference_signal)

# Reference signal is empty, skip noise reduction
if np.all(reference_signal == 0):
# Perform VAD test
input_speaking = await self._rms_speech_detection(input_signal)

# Add processed PCM and metadata to the output queue
self._output_queue.put_nowait((input_pcm, input_speaking))
return

# Apply noise reduction
reduced_signal = reduce_noise(
# Input signal
Expand All @@ -741,7 +754,27 @@ async def _process_one(self, input_pcm: bytes) -> None:
processed_pcm = self._float_to_pcm(reduced_signal)

# Add processed PCM and metadata to the output queue
await self._output_queue.put((processed_pcm, input_speaking))
self._output_queue.put_nowait((processed_pcm, input_speaking))

async def _ensure_stream(self, input_pcm: bytes) -> None:
    """
    Process one chunk with a real-time deadline.

    Runs `_process_one` on `input_pcm` under a timeout of four packet durations (`_packet_duration_ms` is in milliseconds). If the deadline is exceeded, the processing task is cancelled by `asyncio.wait_for` and the unprocessed `input_pcm` is pushed to the output queue with the speech flag set to `False`, so the output stream never stalls.

    Note: `asyncio.wait_for` can only interrupt the processing at an `await` point; synchronous work inside `_process_one` is not preempted.
    """
    # Allow temporary medium latency: 4 packets, converted from ms to seconds
    deadline_s = self._packet_duration_ms / 1000 * 4

    # Process the audio
    try:
        await asyncio.wait_for(
            self._process_one(input_pcm),
            timeout=deadline_s,
        )

    # If the processing is delayed, return the original input.
    # `asyncio.TimeoutError` is caught (rather than the builtin `TimeoutError`)
    # because the two are distinct classes before Python 3.11; from 3.11 on
    # they are the same object, so this works on every version.
    except asyncio.TimeoutError:
        logger.warning("Echo processing timeout, returning input")
        await self._output_queue.put((input_pcm, False))

async def process_stream(self) -> None:
"""
Expand All @@ -756,14 +789,7 @@ async def process_stream(self) -> None:
self._input_queue.task_done()

# Queue the processing
await scheduler.spawn(
asyncio.wait_for(
self._process_one(input_pcm),
timeout=self._packet_duration_ms
/ 1000
* 5, # Allow temporary high latency
)
)
await scheduler.spawn(self._ensure_stream(input_pcm))

async def push_input(self, audio_data: bytes) -> None:
"""
Expand Down Expand Up @@ -794,13 +820,4 @@ async def pull_audio(self) -> tuple[bytes, bool]:
Returns a tuple with the echo-cancelled PCM audio and a boolean flag indicating if the user was speaking.
"""
# return await self._output_queue.get()
try:
return await asyncio.wait_for(
fut=self._output_queue.get(),
timeout=self._packet_duration_ms
/ 1000
* 1.5, # Allow temporary small latency
)
except TimeoutError:
return b"\x00" * self._packet_size, False # Silence PCM chunk and no speech
return await self._output_queue.get()

0 comments on commit 23b095d

Please sign in to comment.