Spaces:
Sleeping
Sleeping
File size: 17,143 Bytes
c6a4584 a715158 e5ab292 a715158 44e015c c6a4584 a715158 e5ab292 a715158 728e522 a715158 c6a4584 728e522 a715158 c6a4584 a715158 241e4a1 e5ab292 a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 728e522 e5ab292 a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 241e4a1 e5ab292 a715158 e5ab292 44e015c a715158 e5ab292 44e015c 3f55f46 44e015c a715158 c6a4584 44e015c a715158 e5ab292 a715158 44e015c e5ab292 728e522 a715158 241e4a1 a715158 241e4a1 44e015c 241e4a1 a715158 e5ab292 a715158 e5ab292 728e522 e5ab292 728e522 e5ab292 a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 44e015c 728e522 44e015c 3f55f46 44e015c 728e522 a715158 44e015c 3f55f46 44e015c 728e522 44e015c 728e522 44e015c e5ab292 44e015c 3f55f46 44e015c a715158 e5ab292 a715158 44e015c a715158 44e015c a715158 e5ab292 a715158 e5ab292 a715158 e5ab292 44e015c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# core/audio_streamer.py
"""
Fetches the live radio stream, transcodes it with FFmpeg, and distributes it.
ENHANCED WITH DETAILED DEBUGGING AND EXPLICIT TIMEOUTS.
"""
import asyncio
import logging
import aiohttp
import ssl
from config import CHUNK_SIZE, SAMPLE_RATE, BYTES_PER_SAMPLE
from core.connection_manager import ConnectionManager
# Module-level logger; the channel name is the fixed string "audio_streamer"
# rather than one derived from __name__.
logger = logging.getLogger("audio_streamer")
# Seconds the PCM reader waits for room in the ASR queue; when the wait times
# out the reader discards everything currently queued instead of blocking.
QUEUE_PUT_TIMEOUT = 5.0
class AudioStreamer:
    """Fetch a live radio stream, relay it to clients and feed PCM to ASR.

    Each streaming session downloads the radio URL over HTTP, broadcasts the
    raw encoded bytes to the station's clients through ``manager``, pipes the
    same bytes into an FFmpeg subprocess that decodes them to mono 16-bit PCM
    at ``SAMPLE_RATE``, and pushes ``(pcm_chunk, stream_time)`` tuples onto
    ``pcm_queue``.  ``run_fetching_loop`` keeps opening new sessions, with
    exponential backoff between failed attempts, until it is cancelled.
    """

    # Seconds to wait for a cancelled PCM reader task to finish.
    _READER_CANCEL_TIMEOUT = 2.0
    # Seconds to wait for FFmpeg to exit after terminate() before kill().
    _FFMPEG_TERMINATE_TIMEOUT = 5.0

    def __init__(
        self,
        pcm_queue: asyncio.Queue,
        manager: ConnectionManager,
        radio_url: str,
        station_name: str,
    ):
        """Store the collaborators; no network or subprocess work happens here.

        Args:
            pcm_queue: Queue that receives ``(pcm_bytes, stream_time_seconds)``
                tuples produced by the PCM reader.
            manager: Object whose ``broadcast_to_station`` is used to send
                config/error messages and raw audio chunks to clients.
            radio_url: URL of the radio stream to download.
            station_name: Station key passed to ``broadcast_to_station``.
        """
        self.pcm_queue = pcm_queue
        self.manager = manager
        self.radio_url = radio_url
        self.station_name = station_name
        # Task running _pcm_reader_loop for the current FFmpeg process, if any.
        self._pcm_reader_task: asyncio.Task | None = None
        logger.info(f"Audio Streamer initialized for station: {radio_url}")

    async def _pcm_reader_loop(self, proc: asyncio.subprocess.Process):
        """Reads PCM data from FFmpeg's stdout and queues it for ASR.

        Runs until FFmpeg's stdout reaches EOF, the task is cancelled, or an
        unexpected error occurs.  Every queued item is a tuple of the PCM
        bytes and the stream time (seconds since this reader started) at
        which the chunk begins.  Exceptions, including cancellation, are
        logged and swallowed so the task always finishes normally.
        """
        logger.info("[PCM_READER] Reader started for PID %s", proc.pid)
        logger.info(f"[PCM_READER] Task started. Reading from FFmpeg PID {proc.pid} stdout.")
        server_stream_time = 0.0
        try:
            while True:
                logger.debug("[PCM_READER] Awaiting proc.stdout.read(%s)...", CHUNK_SIZE)
                pcm_chunk = await proc.stdout.read(CHUNK_SIZE)
                if not pcm_chunk:
                    logger.warning("[PCM_READER] Reached end of FFmpeg stream (read 0 bytes). Exiting loop.")
                    break

                # Duration of this chunk in seconds of (mono) audio.
                chunk_duration = len(pcm_chunk) / (SAMPLE_RATE * BYTES_PER_SAMPLE)
                try:
                    logger.debug(
                        "[PCM_READER] PCM queue size is %s/%s before put.",
                        self.pcm_queue.qsize(),
                        self.pcm_queue.maxsize,
                    )
                    await asyncio.wait_for(
                        self.pcm_queue.put((pcm_chunk, server_stream_time)),
                        timeout=QUEUE_PUT_TIMEOUT,
                    )
                    logger.debug(
                        "[PCM_READER] Successfully queued chunk for ASR at stream time %.2fs.",
                        server_stream_time,
                    )
                except asyncio.TimeoutError:
                    logger.error(
                        f"[PCM_READER] DEADLOCK ALERT: Failed to put PCM chunk into ASR queue within {QUEUE_PUT_TIMEOUT}s. "
                        "The ASR service may be stuck. Clearing queue to prevent total stall."
                    )
                    # Drop the backlog (and the chunk that failed to enqueue)
                    # so FFmpeg's stdout keeps being drained.
                    while not self.pcm_queue.empty():
                        self.pcm_queue.get_nowait()
                except Exception as q_err:
                    logger.error(f"[PCM_READER] Error putting to queue: {q_err}", exc_info=True)
                    break

                # Stream time advances even when the chunk was dropped, so
                # later timestamps stay aligned with the audio position.
                server_stream_time += chunk_duration
        except asyncio.CancelledError:
            logger.info("[PCM_READER] Task cancelled.")
        except Exception as e:
            logger.error(f"[PCM_READER] 💥 Task failed unexpectedly: {e}", exc_info=True)
        finally:
            logger.warning("[PCM_READER] Task finished.")
            logger.warning("[PCM_READER] Reader exited for PID %s", proc.pid)

    @staticmethod
    def _client_mime(content_type: str) -> str:
        """Map an HTTP Content-Type header to the MIME type sent to clients."""
        lowered = content_type.lower()
        if 'aac' in lowered:
            return 'audio/aac'
        if 'ogg' in lowered:
            return 'audio/ogg'
        return 'audio/mpeg'

    @staticmethod
    def _backoff_seconds(consecutive_failures: int) -> int:
        """Return the delay before the next attempt: 5, 10, 20, 40, then 60 s."""
        if consecutive_failures <= 0:
            return 5
        # Clamp the exponent: 5 * 2**4 already exceeds the 60 s cap, and this
        # keeps the power from growing without bound on long outages.
        return min(5 * (2 ** min(consecutive_failures - 1, 4)), 60)

    async def _stop_pcm_reader(self) -> None:
        """Cancel the current PCM reader task, if running, and wait briefly."""
        task = self._pcm_reader_task
        if task is None or task.done():
            return
        task.cancel()
        # asyncio.wait() never re-raises the child's CancelledError, so the
        # reader's cancellation cannot be mistaken for our own.
        _, pending = await asyncio.wait({task}, timeout=self._READER_CANCEL_TIMEOUT)
        if pending:
            logger.warning("[FETCHER] PCM reader task did not cancel in time.")

    async def _cleanup_session(self, proc) -> None:
        """Stop the PCM reader and make sure the FFmpeg process is gone.

        Args:
            proc: The session's FFmpeg ``asyncio.subprocess.Process``, or
                ``None`` when the session failed before FFmpeg was started.
        """
        logger.warning("[FETCHER] Cleaning up current session...")
        await self._stop_pcm_reader()
        if proc is None or proc.returncode is not None:
            return

        logger.info(f"[FETCHER] Terminating FFmpeg process PID: {proc.pid}")
        try:
            proc.terminate()
        except ProcessLookupError:
            # The process exited between the returncode check and the signal.
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout=self._FFMPEG_TERMINATE_TIMEOUT)
        except asyncio.TimeoutError:
            logger.warning("[FETCHER] FFmpeg PID %s ignored terminate(); killing it.", proc.pid)
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
        logger.info(f"[FETCHER] FFmpeg process PID: {proc.pid} terminated.")

    async def _stream_session(self, connector, timeout, headers, connected_message: str) -> None:
        """Run one complete streaming session and always clean up afterwards.

        Opens the radio URL, announces the client MIME type, starts FFmpeg
        and its PCM reader, then pumps every downloaded chunk to the clients
        and into FFmpeg's stdin until the HTTP stream ends or FFmpeg closes
        its pipe.  Returns normally once the PCM reader has finished.

        Args:
            connector: ``aiohttp.TCPConnector`` for this session; the
                ``ClientSession`` created here closes it on exit.
            timeout: ``aiohttp.ClientTimeout`` for the session.
            headers: HTTP headers sent with the request.
            connected_message: Text logged once the stream answers with 200.

        Raises:
            aiohttp.ClientError: If the stream responds with a non-200 status
                or the HTTP transfer fails.
            Exception: Anything else raised while connecting or starting
                FFmpeg propagates to the caller after cleanup has run.
        """
        proc = None
        try:
            async with aiohttp.ClientSession(
                timeout=timeout,
                connector=connector,
                headers=headers,
            ) as session:
                async with session.get(self.radio_url) as response:
                    if response.status != 200:
                        raise aiohttp.ClientError(f"Radio stream returned status: {response.status}")
                    logger.info(connected_message)

                    mime = self._client_mime(response.headers.get('Content-Type', ''))
                    logger.info(f"[FETCHER] Determined client MIME type: {mime}")
                    # NOTE(review): broadcast_to_station is invoked without
                    # await, exactly as before - confirm it is synchronous.
                    self.manager.broadcast_to_station(
                        self.station_name, {"type": "config", "payload": {"mime": mime}}
                    )

                    proc = await asyncio.create_subprocess_exec(
                        'ffmpeg', '-i', 'pipe:0', '-f', 's16le', '-ar', str(SAMPLE_RATE),
                        '-ac', '1', '-loglevel', 'warning', 'pipe:1',
                        stdin=asyncio.subprocess.PIPE,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.DEVNULL,
                    )
                    logger.info("[FETCHER] Started FFmpeg process. PID: %s", proc.pid)

                    # Cancel a leftover reader (if any) before spawning the
                    # one bound to the new process.
                    await self._stop_pcm_reader()
                    logger.info("[FETCHER] Spawning reader for PID %s", proc.pid)
                    self._pcm_reader_task = asyncio.create_task(self._pcm_reader_loop(proc))

                    async for chunk in response.content.iter_chunked(CHUNK_SIZE):
                        self.manager.broadcast_to_station(self.station_name, chunk)
                        logger.debug("[FETCHER] Broadcasted %d raw bytes to clients.", len(chunk))
                        try:
                            logger.debug("[FETCHER] Writing %d bytes to FFmpeg stdin...", len(chunk))
                            # StreamWriter.write() only buffers and must run
                            # on the event loop thread; drain() applies the
                            # back-pressure.
                            proc.stdin.write(chunk)
                            await proc.stdin.drain()
                            logger.debug("[FETCHER] FFmpeg stdin drained successfully.")
                        except (BrokenPipeError, ConnectionResetError) as pipe_err:
                            logger.warning(
                                f"[FETCHER] FFmpeg process closed stdin pipe unexpectedly: {pipe_err}. Ending stream processing."
                            )
                            break

                    logger.warning("[FETCHER] Radio stream chunk iteration ended. Closing FFmpeg stdin.")
                    if proc.stdin and not proc.stdin.is_closing():
                        proc.stdin.close()

                    # Closing stdin lets FFmpeg flush and exit, which ends
                    # the reader at EOF on stdout.
                    logger.info("[FETCHER] Waiting for PCM reader task to complete...")
                    await self._pcm_reader_task
                    logger.info("[FETCHER] PCM reader task has completed.")
        finally:
            await self._cleanup_session(proc)

    async def run_fetching_loop(self):
        """The main loop for fetching and processing the radio stream.

        Repeatedly runs streaming sessions until the task is cancelled.
        After every attempt it sleeps once: 5 s after a clean session,
        otherwise an exponential backoff (5, 10, 20, 40, 60 s) based on the
        number of consecutive failed attempts.  On cancellation the current
        session is cleaned up and the coroutine returns normally without
        re-raising ``CancelledError``.
        """
        logger.info("[FETCHER] Main fetcher task started.")
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}

        # SSL context with relaxed verification for problematic streams:
        # hostname checking and certificate validation are both disabled.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        # Only connecting is time-limited; the stream itself is open-ended.
        timeout = aiohttp.ClientTimeout(total=None, connect=30, sock_read=None, sock_connect=30)

        # Consecutive failed attempts drive the exponential backoff.
        consecutive_failures = 0
        max_consecutive_failures = 5

        try:
            while True:
                try:
                    logger.info(f"[FETCHER] Attempting to connect to radio stream: {self.radio_url}")
                    connector = aiohttp.TCPConnector(
                        ssl=ssl_context,
                        limit=100,
                        limit_per_host=30,
                        ttl_dns_cache=300,
                        use_dns_cache=False,
                    )
                    await self._stream_session(
                        connector,
                        timeout,
                        headers,
                        "✅ [FETCHER] Successfully connected to radio stream.",
                    )
                    # Reset the failure counter once a session ran to
                    # completion without raising.
                    consecutive_failures = 0
                except aiohttp.ClientConnectorError as conn_err:
                    consecutive_failures += 1
                    error_msg = str(conn_err)
                    logger.error(f"[FETCHER] Connection error: {error_msg}")
                    if "Connect call failed" in error_msg or "Connection refused" in error_msg:
                        logger.warning(
                            f"[FETCHER] Connection refused for {self.radio_url}. This may indicate the stream is offline or the URL is incorrect."
                        )
                    elif "Name or service not known" in error_msg:
                        logger.warning(
                            f"[FETCHER] DNS resolution failed for {self.radio_url}. Check the URL or network connectivity."
                        )
                    backoff_time = self._backoff_seconds(consecutive_failures)
                    logger.info(
                        f"[FETCHER] Consecutive failures: {consecutive_failures}. Backing off for {backoff_time} seconds..."
                    )
                    if consecutive_failures >= max_consecutive_failures:
                        logger.error(
                            f"[FETCHER] Too many consecutive failures ({consecutive_failures}). Consider checking the stream URL."
                        )
                        # Tell connected clients that the stream is unreachable.
                        self.manager.broadcast_to_station(self.station_name, {
                            "type": "error",
                            "payload": {
                                "message": f"Unable to connect to stream after {consecutive_failures} attempts. The stream may be offline."
                            },
                        })
                except ssl.SSLError as ssl_err:
                    logger.warning(f"[FETCHER] SSL error occurred: {ssl_err}. Trying without SSL verification...")
                    # Retry once immediately with aiohttp's ssl=False setting.
                    try:
                        connector = aiohttp.TCPConnector(ssl=False)
                        await self._stream_session(
                            connector,
                            timeout,
                            headers,
                            "✅ [FETCHER] Successfully connected to radio stream (SSL disabled).",
                        )
                        consecutive_failures = 0
                    except asyncio.CancelledError:
                        raise
                    except Exception as retry_err:
                        consecutive_failures += 1
                        logger.error(f"[FETCHER] Retry with disabled SSL also failed: {retry_err}")
                except asyncio.TimeoutError:
                    consecutive_failures += 1
                    logger.warning("[FETCHER] Connection to radio stream timed out.")
                except aiohttp.ClientError as client_err:
                    consecutive_failures += 1
                    logger.error(f"[FETCHER] Client error: {client_err}")
                except asyncio.CancelledError:
                    # Handled once, outside the loop, so shutdown is prompt.
                    raise
                except Exception as e:
                    consecutive_failures += 1
                    logger.error(f"[FETCHER] 💥 Fetcher loop failed: {e}. Reconnecting...", exc_info=True)

                # Exactly one wait per attempt; session cleanup has already
                # run inside _stream_session.
                backoff_time = self._backoff_seconds(consecutive_failures)
                logger.info(f"[FETCHER] Cleanup complete. Waiting {backoff_time} seconds before next action.")
                await asyncio.sleep(backoff_time)
        except asyncio.CancelledError:
            logger.info("[FETCHER] Main fetcher task cancelled.")