# core/summarizer_service.py (FINAL VERSION)
"""
Handles background text summarization using a GGUF model with llama-cpp-python.
"""
import asyncio
import logging
from datetime import datetime, timezone
from llama_cpp import Llama
from config import SUMMARIZER_MODEL_DIR
from core.connection_manager import ConnectionManager
from performance_config import PERF_CONFIG
import os
logger = logging.getLogger(__name__)
# --- Tunable Parameters - Dynamic based on performance mode ---
SUMMARY_TRIGGER_THRESHOLD = PERF_CONFIG["summary_threshold"]
MAX_INPUT_CHARS = PERF_CONFIG["llm_context_size"]
CONTEXT_KEEPALIVE_CHARS = min(100, MAX_INPUT_CHARS // 20) # Dynamic based on context size
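
# Illustrative only: this module assumes PERF_CONFIG exposes at least the keys
# below. The actual values live in performance_config.py and vary by mode; the
# numbers here are made up for the example.
#
#   PERF_CONFIG = {
#       "summary_threshold": 2000,   # chars buffered before a summary triggers
#       "llm_context_size": 4096,    # also reused as the char cap on LLM input
#       "llm_threads": 4,
#   }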

class SummarizerService:
    """
    Manages text aggregation and periodic summarization for a single station.
    """
    def __init__(self, text_queue: asyncio.Queue, manager: ConnectionManager, station_name: str):
        self.text_queue = text_queue
        self.manager = manager
        self.station_name = station_name
        self.transcript_buffer = ""
        self.utterance_id_buffer = []  # IDs of the utterances currently held in transcript_buffer
        self.llm = None
        self.summarization_in_progress = False
    def _load_model(self):
        """Loads the Llama.cpp model. This is a blocking operation."""
        logger.info("Loading summarizer model... This may take a moment.")
        try:
            self.llm = Llama(
                model_path=SUMMARIZER_MODEL_DIR,
                # n_ctx is measured in tokens, not characters; sizing it from the
                # char cap over-allocates, so char-truncated input always fits.
                n_ctx=MAX_INPUT_CHARS,
                n_gpu_layers=0,  # CPU-only inference
                verbose=False,
                n_threads=PERF_CONFIG["llm_threads"],
                n_batch=64,  # Smaller batch to reduce memory pressure
                low_vram=True,  # Trade some speed for a lower memory footprint
            )
            logger.info("Summarizer model loaded successfully.")
        except Exception as e:
            logger.error(f"Failed to load summarizer model: {e}", exc_info=True)
            self.llm = None
    def _generate_summary(self, text_to_summarize: str) -> str:
        """
        Generates a summary from the given text. This is a blocking, CPU-intensive operation.
        """
        if not self.llm:
            return "Summarizer model is not available."
        messages = [
            {"role": "system", "content": "You are a broadcast summarizer. Your task is to provide a brief, neutral, and concise summary of the live radio transcript. Capture the main points. Do not add any preamble like 'Here is a summary'."},
            {"role": "user", "content": f"Please summarize the following transcript:\n\n---\n{text_to_summarize}\n---"}
        ]
        try:
            # repeat_penalty is a sampling parameter, so it belongs on the
            # completion call rather than the Llama() constructor.
            output = self.llm.create_chat_completion(
                messages=messages,
                max_tokens=400,
                repeat_penalty=1.1,  # Slightly reduced from 1.2
            )
            # The response follows the OpenAI chat-completion shape:
            # {"choices": [{"message": {"content": "..."}}], ...}
            summary = output['choices'][0]['message']['content'].strip()
            return summary
        except Exception as e:
            logger.error(f"Error during summary generation: {e}")
            return "Error generating summary."
    async def run_summarization_loop(self):
        """
        Main loop to listen for new text and trigger summarization.
        """
        loop = asyncio.get_running_loop()
        self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "loading"}})
        await loop.run_in_executor(None, self._load_model)
        if not self.llm:
            logger.error("Cannot start summarization loop; model failed to load.")
            self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "error", "message": "Model failed to load."}})
            return
        self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "ready"}})
        logger.info(f"🎤 Summarizer task started for station: {self.station_name}")
        while True:
            try:
                new_text, utterance_id = await self.text_queue.get()
                self.transcript_buffer += f" {new_text}"
                self.utterance_id_buffer.append(utterance_id)
                logger.debug(f"[{self.station_name}] Transcript buffer is now {len(self.transcript_buffer)} chars.")
                if len(self.transcript_buffer) > SUMMARY_TRIGGER_THRESHOLD and not self.summarization_in_progress:
                    self.summarization_in_progress = True
                    text_snapshot = self.transcript_buffer[-MAX_INPUT_CHARS:]
                    source_ids_snapshot = self.utterance_id_buffer[:]
                    logger.info(f"Triggering summary for {self.station_name} with {len(text_snapshot)} chars.")
                    self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "summarizing"}})
                    summary = await loop.run_in_executor(None, self._generate_summary, text_snapshot)
                    logger.info(f"Generated summary for {self.station_name}: {summary}")
                    timestamp_utc = datetime.now(timezone.utc).isoformat()
                    self.manager.broadcast_to_station(self.station_name, {
                        "type": "summary",
                        "payload": {
                            "summary": summary,
                            "station": self.station_name,
                            "timestamp": timestamp_utc,
                            "source_utterance_ids": source_ids_snapshot
                        }
                    })
                    # Keep a short tail of the transcript so the next summary has context.
                    self.transcript_buffer = self.transcript_buffer[-CONTEXT_KEEPALIVE_CHARS:]
                    # Trim the ID buffer proportionally. Guard the zero case: a bare
                    # [-0:] slice would keep the entire list instead of clearing it.
                    keep_ratio = CONTEXT_KEEPALIVE_CHARS / len(text_snapshot) if text_snapshot else 0
                    num_ids_to_keep = int(len(source_ids_snapshot) * keep_ratio)
                    self.utterance_id_buffer = self.utterance_id_buffer[-num_ids_to_keep:] if num_ids_to_keep > 0 else []
                    self.manager.broadcast_to_station(self.station_name, {"type": "summary_state", "payload": {"status": "ready"}})
                    self.summarization_in_progress = False
                else:
                    logger.debug(f"[{self.station_name}] Not enough text to summarize or summarization already in progress.")
                self.text_queue.task_done()
            except asyncio.CancelledError:
                logger.info(f"🎤 Summarizer task for {self.station_name} cancelled.")
                break
            except Exception as e:
                logger.error(f"💥 Summarizer task for {self.station_name} failed: {e}", exc_info=True)
                self.summarization_in_progress = False
                await asyncio.sleep(5)
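
# Usage sketch (illustrative only; the real app wires these objects up in its
# startup path, and ConnectionManager's constructor signature is an assumption
# here). The queue must yield (text, utterance_id) tuples, matching the unpack
# in run_summarization_loop.
#
# async def demo():
#     queue: asyncio.Queue = asyncio.Queue()
#     manager = ConnectionManager()
#     service = SummarizerService(queue, manager, station_name="demo-station")
#     task = asyncio.create_task(service.run_summarization_loop())
#     await queue.put(("First transcribed utterance.", "utt-001"))
#     await queue.join()  # each get() is paired with a task_done() in the loop
#     task.cancel()       # exits via the CancelledError branch
#
# asyncio.run(demo())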