File size: 17,143 Bytes
c6a4584
a715158
 
e5ab292
a715158
 
 
 
44e015c
c6a4584
a715158
 
e5ab292
 
a715158
 
728e522
a715158
 
c6a4584
728e522
a715158
c6a4584
a715158
 
 
241e4a1
e5ab292
a715158
 
 
e5ab292
a715158
e5ab292
a715158
e5ab292
a715158
e5ab292
728e522
e5ab292
a715158
e5ab292
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a715158
e5ab292
a715158
e5ab292
a715158
e5ab292
 
 
241e4a1
e5ab292
a715158
 
e5ab292
44e015c
a715158
e5ab292
44e015c
 
 
 
 
 
3f55f46
44e015c
 
 
 
a715158
 
 
 
c6a4584
44e015c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a715158
 
 
e5ab292
a715158
 
 
 
44e015c
 
e5ab292
728e522
a715158
 
 
 
241e4a1
a715158
 
241e4a1
 
 
 
 
44e015c
 
 
 
241e4a1
 
 
a715158
 
e5ab292
a715158
e5ab292
728e522
e5ab292
728e522
e5ab292
 
a715158
e5ab292
a715158
 
e5ab292
 
 
a715158
e5ab292
 
a715158
 
e5ab292
 
a715158
e5ab292
44e015c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
728e522
44e015c
 
 
 
 
3f55f46
44e015c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
728e522
a715158
44e015c
 
 
 
 
 
 
 
 
 
 
 
 
3f55f46
44e015c
 
 
 
 
 
 
 
 
728e522
44e015c
728e522
44e015c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e5ab292
44e015c
 
3f55f46
44e015c
 
 
 
 
a715158
e5ab292
a715158
44e015c
a715158
44e015c
 
 
a715158
e5ab292
 
a715158
 
e5ab292
a715158
 
e5ab292
 
44e015c
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# core/audio_streamer.py
"""
Fetches the live radio stream, transcodes it with FFmpeg, and distributes it.
ENHANCED WITH DETAILED DEBUGGING AND EXPLICIT TIMEOUTS.
"""
import asyncio
import logging
import aiohttp
import ssl
from config import CHUNK_SIZE, SAMPLE_RATE, BYTES_PER_SAMPLE
from core.connection_manager import ConnectionManager

logger = logging.getLogger("audio_streamer")
QUEUE_PUT_TIMEOUT = 5.0

class AudioStreamer:
    def __init__(self, pcm_queue: asyncio.Queue, manager: ConnectionManager, radio_url: str, station_name: str):
        self.pcm_queue = pcm_queue
        self.manager = manager
        self.radio_url = radio_url
        self.station_name = station_name
        self._pcm_reader_task: asyncio.Task | None = None
        logger.info(f"Audio Streamer initialized for station: {radio_url}")

    async def _pcm_reader_loop(self, proc: asyncio.subprocess.Process):
        """Reads PCM data from FFmpeg's stdout and queues it for ASR."""
        logger.info("[PCM_READER] Reader started for PID %s", proc.pid)
        logger.info(f"[PCM_READER] Task started. Reading from FFmpeg PID {proc.pid} stdout.")
        server_stream_time = 0.0
        try:
            while True:
                logger.debug(f"[PCM_READER] Awaiting proc.stdout.read({CHUNK_SIZE})...")
                pcm_chunk = await proc.stdout.read(CHUNK_SIZE)
                
                if len(pcm_chunk) == 0:
                    logger.warning("[PCM_READER] Reached end of FFmpeg stream (read 0 bytes). Exiting loop.")
                    break
                
                # logger.info(f"[PCM_READER] Read {len(pcm_chunk)} bytes of PCM data from FFmpeg.")

                chunk_duration = len(pcm_chunk) / (SAMPLE_RATE * BYTES_PER_SAMPLE)
                
                try:
                    q_size = self.pcm_queue.qsize()
                    logger.debug(f"[PCM_READER] PCM queue size is {q_size}/{self.pcm_queue.maxsize} before put.")
                    
                    await asyncio.wait_for(
                        self.pcm_queue.put((pcm_chunk, server_stream_time)),
                        timeout=QUEUE_PUT_TIMEOUT
                    )
                    
                    logger.debug(f"[PCM_READER] Successfully queued chunk for ASR at stream time {server_stream_time:.2f}s.")
                except asyncio.TimeoutError:
                    logger.error(
                        f"[PCM_READER] DEADLOCK ALERT: Failed to put PCM chunk into ASR queue within {QUEUE_PUT_TIMEOUT}s. "
                        "The ASR service may be stuck. Clearing queue to prevent total stall."
                    )
                    while not self.pcm_queue.empty():
                        self.pcm_queue.get_nowait()
                except Exception as q_err:
                    logger.error(f"[PCM_READER] Error putting to queue: {q_err}", exc_info=True)
                    break

                server_stream_time += chunk_duration

        except asyncio.CancelledError:
            logger.info("[PCM_READER] Task cancelled.")
        except Exception as e:
            logger.error(f"[PCM_READER] πŸ’₯ Task failed unexpectedly: {e}", exc_info=True)
        finally:
            logger.warning("[PCM_READER] Task finished.")
            logger.warning("[PCM_READER] Reader exited for PID %s", proc.pid)

    async def run_fetching_loop(self):
        """The main loop for fetching and processing the radio stream."""
        logger.info("[FETCHER] Main fetcher task started.")
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        loop = asyncio.get_running_loop()
        
        # Create SSL context with relaxed verification for problematic streams
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        
        # Define timeout for the connection
        timeout = aiohttp.ClientTimeout(total=None, connect=30, sock_read=None, sock_connect=30)
        
        # Track consecutive failures to implement exponential backoff
        consecutive_failures = 0
        max_consecutive_failures = 5

        while True:
            proc = None
            try:
                logger.info(f"[FETCHER] Attempting to connect to radio stream: {self.radio_url}")
                
                # Try with SSL first (for HTTPS streams)
                connector = aiohttp.TCPConnector(
                    ssl=ssl_context,
                    limit=100,
                    limit_per_host=30,
                    ttl_dns_cache=300,
                    use_dns_cache=False,
                )
                
                async with aiohttp.ClientSession(
                    timeout=timeout, 
                    connector=connector,
                    headers=headers
                ) as session:
                    async with session.get(self.radio_url) as response:
                        if response.status != 200:
                            raise aiohttp.ClientError(f"Radio stream returned status: {response.status}")

                        logger.info("βœ… [FETCHER] Successfully connected to radio stream.")
                        content_type = response.headers.get('Content-Type', '').lower()
                        mime = 'audio/mpeg'
                        if 'aac' in content_type:
                            mime = 'audio/aac'
                        elif 'ogg' in content_type:
                            mime = 'audio/ogg'
                        logger.info(f"[FETCHER] Determined client MIME type: {mime}")
                        self.manager.broadcast_to_station(self.station_name, {"type": "config", "payload": {"mime": mime}})

                        proc = await asyncio.create_subprocess_exec(
                            'ffmpeg', '-i', 'pipe:0', '-f', 's16le', '-ar', str(SAMPLE_RATE),
                            '-ac', '1', '-loglevel', 'warning', 'pipe:1',
                            stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, 
                        )

                        logger.info("[FETCHER] Started FFmpeg process. PID: %s", proc.pid)

                        # cancel old reader (if any) *before* spawning the new one
                        if self._pcm_reader_task and not self._pcm_reader_task.done():
                            self._pcm_reader_task.cancel()
                            try:
                                await asyncio.wait_for(self._pcm_reader_task, timeout=2.0)
                            except asyncio.TimeoutError:
                                logger.warning("[FETCHER] PCM reader task did not cancel in time.")

                        # now spawn reader for the *new* process
                        logger.info("[FETCHER] Spawning reader for PID %s", proc.pid)
                        self._pcm_reader_task = asyncio.create_task(self._pcm_reader_loop(proc))

                        chunk_count = 0
                        async for chunk in response.content.iter_chunked(CHUNK_SIZE):
                            chunk_count += 1
                            # logger.info(f"[FETCHER] Received chunk #{chunk_count} ({len(chunk)} bytes) from radio stream.")
                            
                            self.manager.broadcast_to_station(self.station_name, chunk)
                            logger.debug(f"[FETCHER] Broadcasted {len(chunk)} raw bytes to clients.")
                            
                            try:
                                logger.debug(f"[FETCHER] Writing {len(chunk)} bytes to FFmpeg stdin...")
                                await loop.run_in_executor(None, proc.stdin.write, chunk)
                                await proc.stdin.drain()
                                logger.debug("[FETCHER] FFmpeg stdin drained successfully.")
                            except (BrokenPipeError, ConnectionResetError) as pipe_err:
                                logger.warning(f"[FETCHER] FFmpeg process closed stdin pipe unexpectedly: {pipe_err}. Ending stream processing.")
                                break
                        
                        logger.warning("[FETCHER] Radio stream chunk iteration ended. Closing FFmpeg stdin.")
                        if proc.stdin and not proc.stdin.is_closing():
                            proc.stdin.close()
                        
                        logger.info("[FETCHER] Waiting for PCM reader task to complete...")
                        await self._pcm_reader_task
                        logger.info("[FETCHER] PCM reader task has completed.")
                
                # Reset failure counter on successful connection
                consecutive_failures = 0
                
            except aiohttp.ClientConnectorError as conn_err:
                consecutive_failures += 1
                error_msg = str(conn_err)
                logger.error(f"[FETCHER] Connection error: {error_msg}")
                
                # Check if it's a connection refused error
                if "Connect call failed" in error_msg or "Connection refused" in error_msg:
                    logger.warning(f"[FETCHER] Connection refused for {self.radio_url}. This may indicate the stream is offline or the URL is incorrect.")
                elif "Name or service not known" in error_msg:
                    logger.warning(f"[FETCHER] DNS resolution failed for {self.radio_url}. Check the URL or network connectivity.")
                
                # Implement exponential backoff
                backoff_time = min(5 * (2 ** consecutive_failures), 60)  # Max 60 seconds
                logger.info(f"[FETCHER] Consecutive failures: {consecutive_failures}. Backing off for {backoff_time} seconds...")
                
                if consecutive_failures >= max_consecutive_failures:
                    logger.error(f"[FETCHER] Too many consecutive failures ({consecutive_failures}). Consider checking the stream URL.")
                    # Send error message to clients
                    self.manager.broadcast_to_station(self.station_name, {
                        "type": "error", 
                        "payload": {
                            "message": f"Unable to connect to stream after {consecutive_failures} attempts. The stream may be offline."
                        }
                    })
                await asyncio.sleep(backoff_time)
            except ssl.SSLError as ssl_err:
                logger.warning(f"[FETCHER] SSL error occurred: {ssl_err}. Trying without SSL verification...")
                # Retry with completely disabled SSL verification
                try:
                    connector = aiohttp.TCPConnector(ssl=False)
                    async with aiohttp.ClientSession(timeout=timeout, connector=connector, headers=headers) as session:
                        async with session.get(self.radio_url) as response:
                            if response.status != 200:
                                raise aiohttp.ClientError(f"Radio stream returned status: {response.status}")
                            
                            logger.info("βœ… [FETCHER] Successfully connected to radio stream (SSL disabled).")
                            content_type = response.headers.get('Content-Type', '').lower()
                            mime = 'audio/mpeg'
                            if 'aac' in content_type:
                                mime = 'audio/aac'
                            elif 'ogg' in content_type:
                                mime = 'audio/ogg'
                            logger.info(f"[FETCHER] Determined client MIME type: {mime}")
                            self.manager.broadcast_to_station(self.station_name, {"type": "config", "payload": {"mime": mime}})

                            proc = await asyncio.create_subprocess_exec(
                                'ffmpeg', '-i', 'pipe:0', '-f', 's16le', '-ar', str(SAMPLE_RATE),
                                '-ac', '1', '-loglevel', 'warning', 'pipe:1',
                                stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, 
                            )

                            logger.info("[FETCHER] Started FFmpeg process. PID: %s", proc.pid)

                            # cancel old reader (if any) *before* spawning the new one
                            if self._pcm_reader_task and not self._pcm_reader_task.done():
                                self._pcm_reader_task.cancel()
                                try:
                                    await asyncio.wait_for(self._pcm_reader_task, timeout=2.0)
                                except (asyncio.TimeoutError, aiohttp.ServerTimeoutError):
                                    logger.warning("[FETCHER] PCM reader task did not cancel in time.")

                            # now spawn reader for the *new* process
                            logger.info("[FETCHER] Spawning reader for PID %s", proc.pid)
                            self._pcm_reader_task = asyncio.create_task(self._pcm_reader_loop(proc))

                            chunk_count = 0
                            async for chunk in response.content.iter_chunked(CHUNK_SIZE):
                                chunk_count += 1
                                # logger.info(f"[FETCHER] Received chunk #{chunk_count} ({len(chunk)} bytes) from radio stream.")
                                
                                self.manager.broadcast_to_station(self.station_name, chunk)
                                logger.debug(f"[FETCHER] Broadcasted {len(chunk)} raw bytes to clients.")
                                
                                try:
                                    logger.debug(f"[FETCHER] Writing {len(chunk)} bytes to FFmpeg stdin...")
                                    await loop.run_in_executor(None, proc.stdin.write, chunk)
                                    await proc.stdin.drain()
                                    logger.debug("[FETCHER] FFmpeg stdin drained successfully.")
                                except (BrokenPipeError, ConnectionResetError) as pipe_err:
                                    logger.warning(f"[FETCHER] FFmpeg process closed stdin pipe unexpectedly: {pipe_err}. Ending stream processing.")
                                    break
                            
                            logger.warning("[FETCHER] Radio stream chunk iteration ended. Closing FFmpeg stdin.")
                            if proc.stdin and not proc.stdin.is_closing():
                                proc.stdin.close()
                            
                            logger.info("[FETCHER] Waiting for PCM reader task to complete...")
                            await self._pcm_reader_task
                            logger.info("[FETCHER] PCM reader task has completed.")
                            
                    # Reset failure counter on successful connection
                    consecutive_failures = 0
                except Exception as retry_err:
                    consecutive_failures += 1
                    logger.error(f"[FETCHER] Retry with disabled SSL also failed: {retry_err}")
                    
            except asyncio.TimeoutError:
                consecutive_failures += 1
                logger.warning("[FETCHER] Connection to radio stream timed out.")
                await asyncio.sleep(5) # Wait before reconnecting on timeout
                
            except aiohttp.ClientError as client_err:
                consecutive_failures += 1
                logger.error(f"[FETCHER] Client error: {client_err}")
                
            except asyncio.CancelledError:
                logger.info("[FETCHER] Main fetcher task cancelled.")
                break
                
            except Exception as e:
                consecutive_failures += 1
                logger.error(f"[FETCHER] πŸ’₯ Fetcher loop failed: {e}. Reconnecting...", exc_info=True)
                
            finally:
                logger.warning("[FETCHER] Cleaning up current session...")
                if self._pcm_reader_task and not self._pcm_reader_task.done():
                    self._pcm_reader_task.cancel()
                if proc and proc.returncode is None:
                    logger.info(f"[FETCHER] Terminating FFmpeg process PID: {proc.pid}")
                    proc.terminate()
                    await proc.wait()
                    logger.info(f"[FETCHER] FFmpeg process PID: {proc.pid} terminated.")
                
                # Calculate backoff time based on consecutive failures
                if consecutive_failures > 0:
                    backoff_time = min(5 * (2 ** (consecutive_failures - 1)), 60)
                else:
                    backoff_time = 5
                    
                logger.info(f"[FETCHER] Cleanup complete. Waiting {backoff_time} seconds before next action.")
                await asyncio.sleep(backoff_time)