# core/summarizer_service.py (FINAL VERSION)
"""
Handles background text summarization using a GGUF model with llama-cpp-python.
"""
import asyncio
import logging
from datetime import datetime, timezone
from llama_cpp import Llama
from config import SUMMARIZER_MODEL_DIR
from core.connection_manager import ConnectionManager
from performance_config import PERF_CONFIG
import os

logger = logging.getLogger(__name__)

# --- Tunable Parameters - Dynamic based on performance mode ---
SUMMARY_TRIGGER_THRESHOLD = PERF_CONFIG["summary_threshold"]
MAX_INPUT_CHARS = PERF_CONFIG["llm_context_size"]
CONTEXT_KEEPALIVE_CHARS = min(100, MAX_INPUT_CHARS // 20)  # Dynamic based on context size


def _tail(seq, count):
    """Return the last ``count`` items of ``seq`` (a str or list).

    A plain ``seq[-count:]`` returns the *whole* sequence when ``count`` is 0,
    because ``-0 == 0``. This helper returns an empty sequence of the same
    type in that case, which is what "keep the last 0 items" should mean.
    """
    return seq[-count:] if count > 0 else seq[:0]


class SummarizerService:
    """
    Manages text aggregation and periodic summarization for a single station.

    Text arrives on ``text_queue`` as ``(text, utterance_id)`` pairs. Once the
    accumulated transcript exceeds ``SUMMARY_TRIGGER_THRESHOLD`` characters, a
    summary is generated in an executor thread and broadcast through the
    connection manager together with the IDs of the contributing utterances.
    """

    def __init__(self, text_queue: asyncio.Queue, manager: ConnectionManager, station_name: str):
        self.text_queue = text_queue
        self.manager = manager
        self.station_name = station_name
        self.transcript_buffer = ""
        self.utterance_id_buffer = []  # Buffer for utterance IDs
        self.llm = None  # Set by _load_model(); stays None if loading fails
        self.summarization_in_progress = False

    def _load_model(self):
        """Loads the Llama.cpp model. This is a blocking operation.

        On failure the error is logged and ``self.llm`` is left as ``None``;
        no exception is propagated to the caller.
        """
        logger.info("Loading summarizer model... This may take a moment.")
        try:
            # NOTE(review): repeat_penalty is normally a per-request sampling
            # option and low_vram may not exist in every llama-cpp-python
            # release -- confirm both actually take effect as constructor
            # arguments in the installed version.
            self.llm = Llama(
                model_path=SUMMARIZER_MODEL_DIR,
                n_ctx=MAX_INPUT_CHARS,
                n_gpu_layers=0,
                verbose=False,
                n_threads=PERF_CONFIG["llm_threads"],
                repeat_penalty=1.1,  # Slightly reduced from 1.2
                n_batch=64,  # Added for optimization
                low_vram=True,  # Added to save memory
            )
            logger.info("Summarizer model loaded successfully.")
        except Exception as e:
            logger.error(f"Failed to load summarizer model: {e}", exc_info=True)
            self.llm = None

    def _generate_summary(self, text_to_summarize: str) -> str:
        """
        Generates a summary from the given text.
        This is a blocking, CPU-intensive operation.

        Returns the stripped model output, or a fixed fallback message when
        the model is not loaded or the completion call raises.
        """
        if not self.llm:
            return "Summarizer model is not available."

        messages = [
            {
                "role": "system",
                "content": (
                    "You are a broadcast summarizer. "
                    "Your task is to provide a brief, neutral, and concise summary of the live radio transcript. "
                    "Capture the main points. Do not add any preamble like 'Here is a summary'."
                ),
            },
            {
                "role": "user",
                "content": f"Please summarize the following transcript:\n\n---\n{text_to_summarize}\n---",
            },
        ]

        try:
            output = self.llm.create_chat_completion(messages=messages, max_tokens=400)
            summary = output['choices'][0]['message']['content'].strip()
            return summary
        except Exception as e:
            logger.error(f"Error during summary generation: {e}")
            return "Error generating summary."

    async def _summarize_and_publish(self, loop):
        """Summarize the current transcript buffer and broadcast the result.

        Runs the blocking generation in the default executor, broadcasts the
        summary, then trims both buffers down to a small tail that is carried
        over as context for the next summary.
        """
        self.summarization_in_progress = True
        try:
            text_snapshot = self.transcript_buffer[-MAX_INPUT_CHARS:]
            source_ids_snapshot = self.utterance_id_buffer[:]

            logger.info(f"Triggering summary for {self.station_name} with {len(text_snapshot)} chars.")
            self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "summarizing"}})

            summary = await loop.run_in_executor(None, self._generate_summary, text_snapshot)
            logger.info(f"Generated summary for {self.station_name}: {summary}")

            timestamp_utc = datetime.now(timezone.utc).isoformat()
            self.manager.broadcast_to_station(self.station_name, {
                "type": "summary",
                "payload": {
                    "summary": summary,
                    "station": self.station_name,
                    "timestamp": timestamp_utc,
                    "source_utterance_ids": source_ids_snapshot
                }
            })

            # Keep only a short tail of text, and a proportional share of the
            # utterance IDs. _tail() is used instead of a bare [-n:] slice so
            # that n == 0 really drops everything rather than keeping it all.
            self.transcript_buffer = _tail(self.transcript_buffer, CONTEXT_KEEPALIVE_CHARS)
            keep_ratio = CONTEXT_KEEPALIVE_CHARS / len(text_snapshot) if text_snapshot else 0
            num_ids_to_keep = int(len(source_ids_snapshot) * keep_ratio)
            self.utterance_id_buffer = _tail(self.utterance_id_buffer, num_ids_to_keep)

            self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "ready"}})
        finally:
            # Always clear the flag, including on errors and cancellation.
            self.summarization_in_progress = False

    async def run_summarization_loop(self):
        """
        Main loop to listen for new text and trigger summarization.

        Loads the model first (in an executor thread) and returns early if
        that fails. Otherwise runs until the task is cancelled; any other
        exception is logged and the loop resumes after a 5 second pause.
        """
        loop = asyncio.get_running_loop()

        # NOTE(review): broadcast_to_station is called without ``await``
        # throughout this class. That is only correct if it is a regular
        # (non-coroutine) method -- confirm against ConnectionManager.
        self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "loading"}})
        await loop.run_in_executor(None, self._load_model)

        if not self.llm:
            logger.error("Cannot start summarization loop; model failed to load.")
            self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "error", "message": "Model failed to load."}})
            return

        self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "ready"}})
        logger.info(f"🎤 Summarizer task started for station: {self.station_name}")

        while True:
            item_fetched = False
            failed = False
            try:
                item = await self.text_queue.get()
                item_fetched = True
                new_text, utterance_id = item

                self.transcript_buffer += f" {new_text}"
                self.utterance_id_buffer.append(utterance_id)
                logger.debug(f"[{self.station_name}] Transcript buffer is now {len(self.transcript_buffer)} chars.")

                if len(self.transcript_buffer) > SUMMARY_TRIGGER_THRESHOLD and not self.summarization_in_progress:
                    await self._summarize_and_publish(loop)
                else:
                    logger.debug(f"[{self.station_name}] Not enough text to summarize or summarization already in progress.")
            except asyncio.CancelledError:
                logger.info(f"🎤 Summarizer task for {self.station_name} cancelled.")
                break
            except Exception as e:
                logger.error(f"💥 Summarizer task for {self.station_name} failed: {e}", exc_info=True)
                self.summarization_in_progress = False
                failed = True
            finally:
                # Acknowledge every dequeued item exactly once, even when
                # processing failed, so that queue.join() cannot hang.
                if item_fetched:
                    self.text_queue.task_done()

            if failed:
                # Back off briefly before retrying after an unexpected error.
                await asyncio.sleep(5)