# core/audio_streamer.py
"""
Fetches the live radio stream, transcodes it to PCM with FFmpeg, and distributes it:
raw encoded chunks go to connected clients, decoded PCM goes to the ASR queue.
Includes detailed debug logging and explicit timeouts.
"""
import asyncio
import logging
import aiohttp
import ssl
from config import CHUNK_SIZE, SAMPLE_RATE, BYTES_PER_SAMPLE
from core.connection_manager import ConnectionManager
logger = logging.getLogger("audio_streamer")
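# Maximum time to wait when handing a PCM chunk to the ASR queue before treating the
# downstream consumer as stalled (see the deadlock handling in _pcm_reader_loop).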
QUEUE_PUT_TIMEOUT = 5.0
class AudioStreamer:
def __init__(self, pcm_queue: asyncio.Queue, manager: ConnectionManager, radio_url: str, station_name: str):
self.pcm_queue = pcm_queue
self.manager = manager
self.radio_url = radio_url
self.station_name = station_name
self._pcm_reader_task: asyncio.Task | None = None
logger.info(f"Audio Streamer initialized for station: {radio_url}")
async def _pcm_reader_loop(self, proc: asyncio.subprocess.Process):
"""Reads PCM data from FFmpeg's stdout and queues it for ASR."""
logger.info("[PCM_READER] Reader started for PID %s", proc.pid)
logger.info(f"[PCM_READER] Task started. Reading from FFmpeg PID {proc.pid} stdout.")
server_stream_time = 0.0
try:
while True:
logger.debug(f"[PCM_READER] Awaiting proc.stdout.read({CHUNK_SIZE})...")
pcm_chunk = await proc.stdout.read(CHUNK_SIZE)
if len(pcm_chunk) == 0:
logger.warning("[PCM_READER] Reached end of FFmpeg stream (read 0 bytes). Exiting loop.")
break
# logger.info(f"[PCM_READER] Read {len(pcm_chunk)} bytes of PCM data from FFmpeg.")
chunk_duration = len(pcm_chunk) / (SAMPLE_RATE * BYTES_PER_SAMPLE)
try:
q_size = self.pcm_queue.qsize()
logger.debug(f"[PCM_READER] PCM queue size is {q_size}/{self.pcm_queue.maxsize} before put.")
await asyncio.wait_for(
self.pcm_queue.put((pcm_chunk, server_stream_time)),
timeout=QUEUE_PUT_TIMEOUT
)
logger.debug(f"[PCM_READER] Successfully queued chunk for ASR at stream time {server_stream_time:.2f}s.")
except asyncio.TimeoutError:
logger.error(
f"[PCM_READER] DEADLOCK ALERT: Failed to put PCM chunk into ASR queue within {QUEUE_PUT_TIMEOUT}s. "
"The ASR service may be stuck. Clearing queue to prevent total stall."
)
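                    # Drop the backlog so the reader can keep pace with the live stream;
                    # the audio in the discarded chunks is simply lost to the ASR consumer.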
while not self.pcm_queue.empty():
self.pcm_queue.get_nowait()
except Exception as q_err:
logger.error(f"[PCM_READER] Error putting to queue: {q_err}", exc_info=True)
break
server_stream_time += chunk_duration
except asyncio.CancelledError:
logger.info("[PCM_READER] Task cancelled.")
except Exception as e:
logger.error(f"[PCM_READER] 💥 Task failed unexpectedly: {e}", exc_info=True)
finally:
logger.warning("[PCM_READER] Task finished.")
logger.warning("[PCM_READER] Reader exited for PID %s", proc.pid)
async def run_fetching_loop(self):
"""The main loop for fetching and processing the radio stream."""
logger.info("[FETCHER] Main fetcher task started.")
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
# Create SSL context with relaxed verification for problematic streams
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
        # No total or read timeout: the stream is long-lived. Only the connect phases are bounded.
timeout = aiohttp.ClientTimeout(total=None, connect=30, sock_read=None, sock_connect=30)
# Track consecutive failures to implement exponential backoff
consecutive_failures = 0
max_consecutive_failures = 5
while True:
            proc = None
            reconnect = True  # cleared when the task is cancelled so the finally block skips the backoff sleep
try:
logger.info(f"[FETCHER] Attempting to connect to radio stream: {self.radio_url}")
# Try with SSL first (for HTTPS streams)
connector = aiohttp.TCPConnector(
ssl=ssl_context,
limit=100,
limit_per_host=30,
                    ttl_dns_cache=300,
                    use_dns_cache=True,  # the TTL above only applies when the DNS cache is enabled
)
async with aiohttp.ClientSession(
timeout=timeout,
connector=connector,
headers=headers
) as session:
async with session.get(self.radio_url) as response:
if response.status != 200:
raise aiohttp.ClientError(f"Radio stream returned status: {response.status}")
logger.info("✅ [FETCHER] Successfully connected to radio stream.")
content_type = response.headers.get('Content-Type', '').lower()
mime = 'audio/mpeg'
if 'aac' in content_type:
mime = 'audio/aac'
elif 'ogg' in content_type:
mime = 'audio/ogg'
logger.info(f"[FETCHER] Determined client MIME type: {mime}")
self.manager.broadcast_to_station(self.station_name, {"type": "config", "payload": {"mime": mime}})
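                        # FFmpeg reads the encoded stream on stdin and writes raw PCM to stdout:
                        # -f s16le = signed 16-bit little-endian samples, -ar = sample rate, -ac 1 = mono.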
proc = await asyncio.create_subprocess_exec(
'ffmpeg', '-i', 'pipe:0', '-f', 's16le', '-ar', str(SAMPLE_RATE),
'-ac', '1', '-loglevel', 'warning', 'pipe:1',
stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
)
logger.info("[FETCHER] Started FFmpeg process. PID: %s", proc.pid)
# cancel old reader (if any) *before* spawning the new one
if self._pcm_reader_task and not self._pcm_reader_task.done():
self._pcm_reader_task.cancel()
try:
await asyncio.wait_for(self._pcm_reader_task, timeout=2.0)
except asyncio.TimeoutError:
logger.warning("[FETCHER] PCM reader task did not cancel in time.")
# now spawn reader for the *new* process
logger.info("[FETCHER] Spawning reader for PID %s", proc.pid)
self._pcm_reader_task = asyncio.create_task(self._pcm_reader_loop(proc))
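                        # Each incoming chunk now travels two paths: the original encoded bytes go
                        # straight to connected clients, while a copy is piped into FFmpeg, whose
                        # PCM output the reader task above forwards to the ASR queue.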
chunk_count = 0
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
chunk_count += 1
# logger.info(f"[FETCHER] Received chunk #{chunk_count} ({len(chunk)} bytes) from radio stream.")
self.manager.broadcast_to_station(self.station_name, chunk)
logger.debug(f"[FETCHER] Broadcasted {len(chunk)} raw bytes to clients.")
try:
logger.debug(f"[FETCHER] Writing {len(chunk)} bytes to FFmpeg stdin...")
                                # StreamWriter.write() is synchronous; drain() awaits backpressure.
                                proc.stdin.write(chunk)
                                await proc.stdin.drain()
logger.debug("[FETCHER] FFmpeg stdin drained successfully.")
except (BrokenPipeError, ConnectionResetError) as pipe_err:
logger.warning(f"[FETCHER] FFmpeg process closed stdin pipe unexpectedly: {pipe_err}. Ending stream processing.")
break
logger.warning("[FETCHER] Radio stream chunk iteration ended. Closing FFmpeg stdin.")
if proc.stdin and not proc.stdin.is_closing():
proc.stdin.close()
logger.info("[FETCHER] Waiting for PCM reader task to complete...")
await self._pcm_reader_task
logger.info("[FETCHER] PCM reader task has completed.")
                        # Stream iteration ended cleanly; reset the failure counter.
consecutive_failures = 0
except aiohttp.ClientConnectorError as conn_err:
consecutive_failures += 1
error_msg = str(conn_err)
logger.error(f"[FETCHER] Connection error: {error_msg}")
# Check if it's a connection refused error
if "Connect call failed" in error_msg or "Connection refused" in error_msg:
logger.warning(f"[FETCHER] Connection refused for {self.radio_url}. This may indicate the stream is offline or the URL is incorrect.")
elif "Name or service not known" in error_msg:
logger.warning(f"[FETCHER] DNS resolution failed for {self.radio_url}. Check the URL or network connectivity.")
                if consecutive_failures >= max_consecutive_failures:
                    logger.error(f"[FETCHER] Too many consecutive failures ({consecutive_failures}). Consider checking the stream URL.")
                    # Notify clients that the stream appears unreachable
                    self.manager.broadcast_to_station(self.station_name, {
                        "type": "error",
                        "payload": {
                            "message": f"Unable to connect to stream after {consecutive_failures} attempts. The stream may be offline."
                        }
                    })
                # The finally block below applies the exponential backoff before the next attempt,
                # so no extra sleep is needed here.
except ssl.SSLError as ssl_err:
logger.warning(f"[FETCHER] SSL error occurred: {ssl_err}. Trying without SSL verification...")
# Retry with completely disabled SSL verification
try:
connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(timeout=timeout, connector=connector, headers=headers) as session:
async with session.get(self.radio_url) as response:
if response.status != 200:
raise aiohttp.ClientError(f"Radio stream returned status: {response.status}")
logger.info("✅ [FETCHER] Successfully connected to radio stream (SSL disabled).")
content_type = response.headers.get('Content-Type', '').lower()
mime = 'audio/mpeg'
if 'aac' in content_type:
mime = 'audio/aac'
elif 'ogg' in content_type:
mime = 'audio/ogg'
logger.info(f"[FETCHER] Determined client MIME type: {mime}")
self.manager.broadcast_to_station(self.station_name, {"type": "config", "payload": {"mime": mime}})
proc = await asyncio.create_subprocess_exec(
'ffmpeg', '-i', 'pipe:0', '-f', 's16le', '-ar', str(SAMPLE_RATE),
'-ac', '1', '-loglevel', 'warning', 'pipe:1',
stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
)
logger.info("[FETCHER] Started FFmpeg process. PID: %s", proc.pid)
# cancel old reader (if any) *before* spawning the new one
if self._pcm_reader_task and not self._pcm_reader_task.done():
self._pcm_reader_task.cancel()
try:
await asyncio.wait_for(self._pcm_reader_task, timeout=2.0)
                                except asyncio.TimeoutError:
logger.warning("[FETCHER] PCM reader task did not cancel in time.")
# now spawn reader for the *new* process
logger.info("[FETCHER] Spawning reader for PID %s", proc.pid)
self._pcm_reader_task = asyncio.create_task(self._pcm_reader_loop(proc))
chunk_count = 0
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
chunk_count += 1
# logger.info(f"[FETCHER] Received chunk #{chunk_count} ({len(chunk)} bytes) from radio stream.")
self.manager.broadcast_to_station(self.station_name, chunk)
logger.debug(f"[FETCHER] Broadcasted {len(chunk)} raw bytes to clients.")
try:
logger.debug(f"[FETCHER] Writing {len(chunk)} bytes to FFmpeg stdin...")
                                    # StreamWriter.write() is synchronous; drain() awaits backpressure.
                                    proc.stdin.write(chunk)
                                    await proc.stdin.drain()
logger.debug("[FETCHER] FFmpeg stdin drained successfully.")
except (BrokenPipeError, ConnectionResetError) as pipe_err:
logger.warning(f"[FETCHER] FFmpeg process closed stdin pipe unexpectedly: {pipe_err}. Ending stream processing.")
break
logger.warning("[FETCHER] Radio stream chunk iteration ended. Closing FFmpeg stdin.")
if proc.stdin and not proc.stdin.is_closing():
proc.stdin.close()
logger.info("[FETCHER] Waiting for PCM reader task to complete...")
await self._pcm_reader_task
logger.info("[FETCHER] PCM reader task has completed.")
                            # Stream iteration ended cleanly; reset the failure counter.
consecutive_failures = 0
except Exception as retry_err:
consecutive_failures += 1
logger.error(f"[FETCHER] Retry with disabled SSL also failed: {retry_err}")
except asyncio.TimeoutError:
consecutive_failures += 1
logger.warning("[FETCHER] Connection to radio stream timed out.")
await asyncio.sleep(5) # Wait before reconnecting on timeout
except aiohttp.ClientError as client_err:
consecutive_failures += 1
logger.error(f"[FETCHER] Client error: {client_err}")
except asyncio.CancelledError:
logger.info("[FETCHER] Main fetcher task cancelled.")
break
except Exception as e:
consecutive_failures += 1
logger.error(f"[FETCHER] 💥 Fetcher loop failed: {e}. Reconnecting...", exc_info=True)
finally:
logger.warning("[FETCHER] Cleaning up current session...")
if self._pcm_reader_task and not self._pcm_reader_task.done():
self._pcm_reader_task.cancel()
if proc and proc.returncode is None:
logger.info(f"[FETCHER] Terminating FFmpeg process PID: {proc.pid}")
proc.terminate()
await proc.wait()
logger.info(f"[FETCHER] FFmpeg process PID: {proc.pid} terminated.")
                # Exponential backoff between attempts: 5, 10, 20, 40 s, capped at 60 s.
                # Skipped entirely when the task has been cancelled.
                if reconnect:
                    if consecutive_failures > 0:
                        backoff_time = min(5 * (2 ** (consecutive_failures - 1)), 60)
                    else:
                        backoff_time = 5
                    logger.info(f"[FETCHER] Cleanup complete. Waiting {backoff_time} seconds before reconnecting.")
                    await asyncio.sleep(backoff_time)