# extract_glossary_from_epub.py import os import json import argparse import zipfile import time import sys import tiktoken import threading import queue import ebooklib import re from ebooklib import epub from chapter_splitter import ChapterSplitter from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Tuple from unified_api_client import UnifiedClient, UnifiedClientError # Fix for PyInstaller - handle stdout reconfigure more carefully if sys.platform.startswith("win"): try: # Try to reconfigure if the method exists if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding="utf-8", errors="replace") except (AttributeError, ValueError): # If reconfigure doesn't work, try to set up UTF-8 another way import io import locale if sys.stdout and hasattr(sys.stdout, 'buffer'): sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') MODEL = os.getenv("MODEL", "gemini-2.0-flash") def interruptible_sleep(duration, check_stop_fn, interval=0.1): """Sleep that can be interrupted by stop request""" elapsed = 0 while elapsed < duration: if check_stop_fn and check_stop_fn(): # Add safety check for None return False # Interrupted sleep_time = min(interval, duration - elapsed) time.sleep(sleep_time) elapsed += sleep_time return True # Completed normally def cancel_all_futures(futures): """Cancel all pending futures immediately""" cancelled_count = 0 for future in futures: if not future.done() and future.cancel(): cancelled_count += 1 return cancelled_count def create_client_with_multi_key_support(api_key, model, output_dir, config): """Create a UnifiedClient with multi API key support if enabled""" # Check if multi API key mode is enabled use_multi_keys = config.get('use_multi_api_keys', False) # Set environment variables for UnifiedClient to pick up if use_multi_keys and 'multi_api_keys' in config and config['multi_api_keys']: print("๐Ÿ”‘ Multi API Key mode enabled for glossary extraction") # Set environment variables that UnifiedClient will read os.environ['USE_MULTI_API_KEYS'] = '1' os.environ['MULTI_API_KEYS'] = json.dumps(config['multi_api_keys']) os.environ['FORCE_KEY_ROTATION'] = '1' if config.get('force_key_rotation', True) else '0' os.environ['ROTATION_FREQUENCY'] = str(config.get('rotation_frequency', 1)) print(f" โ€ข Keys configured: {len(config['multi_api_keys'])}") print(f" โ€ข Force rotation: {config.get('force_key_rotation', True)}") print(f" โ€ข Rotation frequency: every {config.get('rotation_frequency', 1)} request(s)") else: # Ensure multi-key mode is disabled in environment os.environ['USE_MULTI_API_KEYS'] = '0' # Create UnifiedClient normally - it will check environment variables return UnifiedClient(api_key=api_key, model=model, output_dir=output_dir) def send_with_interrupt(messages, client, temperature, max_tokens, stop_check_fn, chunk_timeout=None): """Send API request with interrupt capability and optional timeout retry""" result_queue = queue.Queue() def api_call(): try: start_time = time.time() result = client.send(messages, temperature=temperature, max_tokens=max_tokens, context='glossary') elapsed = time.time() - start_time result_queue.put((result, elapsed)) except Exception as e: result_queue.put(e) api_thread = threading.Thread(target=api_call) api_thread.daemon = True api_thread.start() timeout = chunk_timeout if chunk_timeout is not None else 86400 check_interval = 0.1 elapsed = 0 while elapsed < timeout: try: # Check for results with shorter timeout result = result_queue.get(timeout=check_interval) if isinstance(result, Exception): raise result if isinstance(result, tuple): api_result, api_time = result if chunk_timeout and api_time > chunk_timeout: if hasattr(client, '_in_cleanup'): client._in_cleanup = True if hasattr(client, 'cancel_current_operation'): client.cancel_current_operation() raise UnifiedClientError(f"API call took {api_time:.1f}s (timeout: {chunk_timeout}s)") return api_result return result except queue.Empty: if stop_check_fn(): # More aggressive cancellation print("๐Ÿ›‘ Stop requested - cancelling API call immediately...") # Set cleanup flag if hasattr(client, '_in_cleanup'): client._in_cleanup = True # Try to cancel the operation if hasattr(client, 'cancel_current_operation'): client.cancel_current_operation() # Don't wait for the thread to finish - just raise immediately raise UnifiedClientError("Glossary extraction stopped by user") elapsed += check_interval # Timeout occurred if hasattr(client, '_in_cleanup'): client._in_cleanup = True if hasattr(client, 'cancel_current_operation'): client.cancel_current_operation() raise UnifiedClientError(f"API call timed out after {timeout} seconds") # Parse token limit from environment variable (same logic as translation) def parse_glossary_token_limit(): """Parse token limit from environment variable""" env_value = os.getenv("GLOSSARY_TOKEN_LIMIT", "1000000").strip() if not env_value or env_value == "": return None, "unlimited" if env_value.lower() == "unlimited": return None, "unlimited" if env_value.isdigit() and int(env_value) > 0: limit = int(env_value) return limit, str(limit) # Default fallback return 1000000, "1000000 (default)" MAX_GLOSSARY_TOKENS, GLOSSARY_LIMIT_STR = parse_glossary_token_limit() # Global stop flag for GUI integration _stop_requested = False def set_stop_flag(value): """Set the global stop flag""" global _stop_requested _stop_requested = value # When clearing the stop flag, also clear the multi-key environment variable if not value: os.environ['TRANSLATION_CANCELLED'] = '0' # Also clear UnifiedClient global flag try: import unified_api_client if hasattr(unified_api_client, 'UnifiedClient'): unified_api_client.UnifiedClient._global_cancelled = False except: pass def is_stop_requested(): """Check if stop was requested""" global _stop_requested return _stop_requested # โ”€โ”€โ”€ resilient tokenizer setup โ”€โ”€โ”€ try: enc = tiktoken.encoding_for_model(MODEL) except Exception: try: enc = tiktoken.get_encoding("cl100k_base") except Exception: enc = None def count_tokens(text: str) -> int: if enc: return len(enc.encode(text)) # crude fallback: assume ~1 token per 4 chars return max(1, len(text) // 4) from ebooklib import epub from bs4 import BeautifulSoup from unified_api_client import UnifiedClient from typing import List, Dict import re PROGRESS_FILE = "glossary_progress.json" def remove_honorifics(name): """Remove common honorifics from names""" if not name: return name # Check if honorifics filtering is disabled if os.getenv('GLOSSARY_DISABLE_HONORIFICS_FILTER', '0') == '1': return name.strip() # Modern Korean honorifics korean_honorifics = [ '๋‹˜', '์”จ', '์”จ๋Š”', '๊ตฐ', '์–‘', '์„ ์ƒ๋‹˜', '์„ ์ƒ', '์‚ฌ์žฅ๋‹˜', '์‚ฌ์žฅ', '๊ณผ์žฅ๋‹˜', '๊ณผ์žฅ', '๋Œ€๋ฆฌ๋‹˜', '๋Œ€๋ฆฌ', '์ฃผ์ž„๋‹˜', '์ฃผ์ž„', '์ด์‚ฌ๋‹˜', '์ด์‚ฌ', '๋ถ€์žฅ๋‹˜', '๋ถ€์žฅ', '์ฐจ์žฅ๋‹˜', '์ฐจ์žฅ', 'ํŒ€์žฅ๋‹˜', 'ํŒ€์žฅ', '์‹ค์žฅ๋‹˜', '์‹ค์žฅ', '๊ต์ˆ˜๋‹˜', '๊ต์ˆ˜', '๋ฐ•์‚ฌ๋‹˜', '๋ฐ•์‚ฌ', '์›์žฅ๋‹˜', '์›์žฅ', 'ํšŒ์žฅ๋‹˜', 'ํšŒ์žฅ', '์†Œ์žฅ๋‹˜', '์†Œ์žฅ', '์ „๋ฌด๋‹˜', '์ „๋ฌด', '์ƒ๋ฌด๋‹˜', '์ƒ๋ฌด', '์ด์‚ฌ์žฅ๋‹˜', '์ด์‚ฌ์žฅ' ] # Archaic/Historical Korean honorifics korean_archaic = [ '๊ณต', '์˜น', '์–ด๋ฅธ', '๋‚˜๋ฆฌ', '๋‚˜์œผ๋ฆฌ', '๋Œ€๊ฐ', '์˜๊ฐ', '๋งˆ๋‹˜', '๋งˆ๋งˆ', '๋Œ€๊ตฐ', '๊ตฐ', '์˜น์ฃผ', '๊ณต์ฃผ', '์™•์ž', '์„ธ์ž', '์˜์• ', '์˜์‹', '๋„๋ น', '๋‚ญ์ž', '๋‚ญ๊ตฐ', '์„œ๋ฐฉ', '์˜๊ฐ๋‹˜', '๋Œ€๊ฐ๋‹˜', '๋งˆ๋‹˜', '์•„์”จ', '๋„๋ จ๋‹˜', '์•„๊ฐ€์”จ', '๋‚˜์œผ๋ฆฌ', '์ง„์‚ฌ', '์ฒจ์ง€', '์˜์˜์ •', '์ขŒ์˜์ •', '์šฐ์˜์ •', 'ํŒ์„œ', '์ฐธํŒ', '์ •์Šน', '๋Œ€์›๊ตฐ' ] # Modern Japanese honorifics japanese_honorifics = [ 'ใ•ใ‚“', 'ใ•ใพ', 'ๆง˜', 'ใใ‚“', 'ๅ›', 'ใกใ‚ƒใ‚“', 'ใ›ใ‚“ใ›ใ„', 'ๅ…ˆ็”Ÿ', 'ใฉใฎ', 'ๆฎฟ', 'ใŸใ‚“', 'ใดใ‚‡ใ‚“', 'ใฝใ‚“', 'ใกใ‚“', 'ใ‚Šใ‚“', 'ใ›ใ‚“ใฑใ„', 'ๅ…ˆ่ผฉ', 'ใ“ใ†ใฏใ„', 'ๅพŒ่ผฉ', 'ใ—', 'ๆฐ', 'ใตใ˜ใ‚“', 'ๅคซไบบ', 'ใ‹ใกใ‚‡ใ†', '่ชฒ้•ท', 'ใถใกใ‚‡ใ†', '้ƒจ้•ท', 'ใ—ใ‚ƒใกใ‚‡ใ†', '็คพ้•ท' ] # Archaic/Historical Japanese honorifics japanese_archaic = [ 'ใฉใฎ', 'ๆฎฟ', 'ใŸใ„ใ‚†ใ†', 'ๅคงๅคซ', 'ใใฟ', 'ๅ…ฌ', 'ใ‚ใใ‚“', 'ๆœ่‡ฃ', 'ใŠใฟ', '่‡ฃ', 'ใ‚€ใ‚‰ใ˜', '้€ฃ', 'ใฟใ“ใจ', 'ๅ‘ฝ', 'ๅฐŠ', 'ใฒใ‚', 'ๅงซ', 'ใฟใ‚„', 'ๅฎฎ', 'ใŠใ†', '็Ž‹', 'ใ“ใ†', 'ไพฏ', 'ใฏใ', 'ไผฏ', 'ใ—', 'ๅญ', 'ใ ใ‚“', '็”ท', 'ใ˜ใ‚‡', 'ๅฅณ', 'ใฒใ“', 'ๅฝฆ', 'ใฒใ‚ใฟใ“', 'ๅงซๅพกๅญ', 'ใ™ใ‚ใ‚‰ใฟใ“ใจ', 'ๅคฉ็š‡', 'ใใ•ใ', 'ๅŽ', 'ใฟใ‹ใฉ', 'ๅธ' ] # Modern Chinese honorifics chinese_honorifics = [ 'ๅ…ˆ็”Ÿ', 'ๅฅณๅฃซ', 'ๅฐๅง', '่€ๅธˆ', 'ๅธˆๅ‚…', 'ๅคงไบบ', 'ๅ…ฌ', 'ๅ›', 'ๆ€ป', '่€ๆ€ป', '่€ๆฟ', '็ป็†', 'ไธปไปป', 'ๅค„้•ฟ', '็ง‘้•ฟ', '่‚ก้•ฟ', 'ๆ•™ๆŽˆ', 'ๅšๅฃซ', '้™ข้•ฟ', 'ๆ ก้•ฟ', 'ๅŒๅฟ—', 'ๅธˆๅ…„', 'ๅธˆๅง', 'ๅธˆๅผŸ', 'ๅธˆๅฆน', 'ๅญฆ้•ฟ', 'ๅญฆๅง', 'ๅ‰่พˆ', '้˜ไธ‹' ] # Archaic/Historical Chinese honorifics chinese_archaic = [ 'ๅ…ฌ', 'ไพฏ', 'ไผฏ', 'ๅญ', '็”ท', '็Ž‹', 'ๅ›', 'ๅฟ', 'ๅคงๅคซ', 'ๅฃซ', '้™›ไธ‹', 'ๆฎฟไธ‹', '้˜ไธ‹', '็ˆท', '่€็ˆท', 'ๅคงไบบ', 'ๅคซไบบ', 'ๅจ˜ๅจ˜', 'ๅ…ฌๅญ', 'ๅ…ฌไธป', '้ƒกไธป', 'ไธ–ๅญ', 'ๅคชๅญ', '็š‡ไธŠ', '็š‡ๅŽ', '่ดตๅฆƒ', 'ๅจ˜ๅญ', '็›ธๅ…ฌ', 'ๅฎ˜ไบบ', '้ƒŽๅ›', 'ๅฐๅง', 'ๅง‘ๅจ˜', 'ๅ…ฌๅ…ฌ', 'ๅฌทๅฌท', 'ๅคงไพ ', 'ๅฐ‘ไพ ', 'ๅ‰่พˆ', 'ๆ™š่พˆ', 'ๅœจไธ‹', '่ถณไธ‹', 'ๅ…„ๅฐ', 'ไปๅ…„', '่ดคๅผŸ', '่€ๅคซ', '่€ๆœฝ', 'ๆœฌๅบง', 'ๆœฌๅฐŠ', '็œŸไบบ', 'ไธŠไบบ', 'ๅฐŠ่€…' ] # Combine all honorifics all_honorifics = ( korean_honorifics + korean_archaic + japanese_honorifics + japanese_archaic + chinese_honorifics + chinese_archaic ) # Remove honorifics from the end of the name name_cleaned = name.strip() # Sort by length (longest first) to avoid partial matches sorted_honorifics = sorted(all_honorifics, key=len, reverse=True) for honorific in sorted_honorifics: if name_cleaned.endswith(honorific): name_cleaned = name_cleaned[:-len(honorific)].strip() # Only remove one honorific per pass break return name_cleaned def set_output_redirect(log_callback=None): """Redirect print statements to a callback function for GUI integration""" if log_callback: import sys import io class CallbackWriter: def __init__(self, callback): self.callback = callback self.buffer = "" def write(self, text): if text.strip(): self.callback(text.strip()) def flush(self): pass sys.stdout = CallbackWriter(log_callback) def load_config(path: str) -> Dict: with open(path, 'r', encoding='utf-8') as f: cfg = json.load(f) # override context_limit_chapters if GUI passed GLOSSARY_CONTEXT_LIMIT env_limit = os.getenv("GLOSSARY_CONTEXT_LIMIT") if env_limit is not None: try: cfg['context_limit_chapters'] = int(env_limit) except ValueError: pass # keep existing config value on parse error # override temperature if GUI passed GLOSSARY_TEMPERATURE env_temp = os.getenv("GLOSSARY_TEMPERATURE") if env_temp is not None: try: cfg['temperature'] = float(env_temp) except ValueError: pass # keep existing config value on parse error return cfg def get_custom_entry_types(): """Get custom entry types configuration from environment""" try: types_json = os.getenv('GLOSSARY_CUSTOM_ENTRY_TYPES', '{}') result = json.loads(types_json) # If empty, return defaults if not result: return { 'character': {'enabled': True, 'has_gender': True}, 'term': {'enabled': True, 'has_gender': False} } return result except: # Default configuration return { 'character': {'enabled': True, 'has_gender': True}, 'term': {'enabled': True, 'has_gender': False} } def save_glossary_json(glossary: List[Dict], output_path: str): """Save glossary in the new simple format with automatic sorting by type""" # Get custom types for sorting order custom_types = get_custom_entry_types() # Create sorting order: character=0, term=1, others alphabetically starting from 2 type_order = {'character': 0, 'term': 1} other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) for i, t in enumerate(other_types): type_order[t] = i + 2 # Sort glossary by type order, then by raw_name sorted_glossary = sorted(glossary, key=lambda x: ( type_order.get(x.get('type', 'term'), 999), # Unknown types go last x.get('raw_name', '').lower() )) with open(output_path, 'w', encoding='utf-8') as f: json.dump(sorted_glossary, f, ensure_ascii=False, indent=2) def save_glossary_csv(glossary: List[Dict], output_path: str): """Save glossary in CSV or token-efficient format based on environment variable""" import csv csv_path = output_path.replace('.json', '.csv') # Get custom types for sorting order and gender info custom_types = get_custom_entry_types() # Create sorting order type_order = {'character': 0, 'term': 1} other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) for i, t in enumerate(other_types): type_order[t] = i + 2 # Sort glossary sorted_glossary = sorted(glossary, key=lambda x: ( type_order.get(x.get('type', 'term'), 999), x.get('raw_name', '').lower() )) # Check if we should use legacy CSV format use_legacy_format = os.getenv('GLOSSARY_USE_LEGACY_CSV', '0') == '1' if use_legacy_format: # LEGACY CSV FORMAT with open(csv_path, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) # Build header row header = ['type', 'raw_name', 'translated_name', 'gender'] # Add any custom fields to header custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]') try: custom_fields = json.loads(custom_fields_json) header.extend(custom_fields) except: custom_fields = [] # Write header row writer.writerow(header) # Write data rows for entry in sorted_glossary: entry_type = entry.get('type', 'term') type_config = custom_types.get(entry_type, {}) # Base row: type, raw_name, translated_name row = [entry_type, entry.get('raw_name', ''), entry.get('translated_name', '')] # Add gender only if type supports it if type_config.get('has_gender', False): row.append(entry.get('gender', '')) # Add custom field values for field in custom_fields: row.append(entry.get(field, '')) # Count how many fields we SHOULD have expected_fields = 4 + len(custom_fields) # type, raw_name, translated_name, gender + custom fields # Only trim if we have MORE than expected (extra trailing empties) while len(row) > expected_fields and row[-1] == '': row.pop() # Ensure minimum required fields (type, raw_name, translated_name) while len(row) < 3: row.append('') # Write row writer.writerow(row) print(f"โœ… Saved legacy CSV format: {csv_path}") else: # NEW TOKEN-EFFICIENT FORMAT (DEFAULT) # Group entries by type grouped_entries = {} for entry in sorted_glossary: entry_type = entry.get('type', 'term') if entry_type not in grouped_entries: grouped_entries[entry_type] = [] grouped_entries[entry_type].append(entry) # Get custom fields configuration custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]') try: custom_fields = json.loads(custom_fields_json) except: custom_fields = [] # Write as plain text format for token efficiency with open(csv_path, 'w', encoding='utf-8') as f: # Write header f.write("Glossary: Characters, Terms, and Important Elements\n\n") # Process each type group for entry_type in sorted(grouped_entries.keys(), key=lambda x: type_order.get(x, 999)): entries = grouped_entries[entry_type] type_config = custom_types.get(entry_type, {}) # Write section header section_name = entry_type.upper() + 'S' if not entry_type.upper().endswith('S') else entry_type.upper() f.write(f"=== {section_name} ===\n") # Write entries for this type with indentation for entry in entries: # Build the entry line raw_name = entry.get('raw_name', '') translated_name = entry.get('translated_name', '') # Start with asterisk and name line = f"* {translated_name} ({raw_name})" # Add gender if applicable and not Unknown if type_config.get('has_gender', False): gender = entry.get('gender', '') if gender and gender != 'Unknown': line += f" [{gender}]" # Add custom field values if they exist custom_field_parts = [] for field in custom_fields: value = entry.get(field, '').strip() if value: # For description fields, add as continuation if field.lower() in ['description', 'notes', 'details']: line += f": {value}" else: custom_field_parts.append(f"{field}: {value}") # Add other custom fields in parentheses if custom_field_parts: line += f" ({', '.join(custom_field_parts)})" # Write the line f.write(line + "\n") # Add blank line between sections f.write("\n") print(f"โœ… Saved token-efficient glossary: {csv_path}") # Print summary for both formats type_counts = {} for entry_type in grouped_entries: type_counts[entry_type] = len(grouped_entries[entry_type]) total = sum(type_counts.values()) print(f" Total entries: {total}") for entry_type, count in type_counts.items(): print(f" - {entry_type}: {count} entries") def extract_chapters_from_epub(epub_path: str) -> List[str]: chapters = [] items = [] # Add this helper function def is_html_document(item): """Check if an EPUB item is an HTML document""" if hasattr(item, 'media_type'): return item.media_type in [ 'application/xhtml+xml', 'text/html', 'application/html+xml', 'text/xml' ] # Fallback for items that don't have media_type if hasattr(item, 'get_name'): name = item.get_name() return name.lower().endswith(('.html', '.xhtml', '.htm')) return False try: # Add stop check before reading if is_stop_requested(): return [] book = epub.read_epub(epub_path) # Replace the problematic line with media type checking items = [item for item in book.get_items() if is_html_document(item)] except Exception as e: print(f"[Warning] Manifest load failed, falling back to raw EPUB scan: {e}") try: with zipfile.ZipFile(epub_path, 'r') as zf: names = [n for n in zf.namelist() if n.lower().endswith(('.html', '.xhtml'))] for name in names: # Add stop check in loop if is_stop_requested(): return chapters try: data = zf.read(name) items.append(type('X', (), { 'get_content': lambda self, data=data: data, 'get_name': lambda self, name=name: name, 'media_type': 'text/html' # Add media_type for consistency })()) except Exception: print(f"[Warning] Could not read zip file entry: {name}") except Exception as ze: print(f"[Fatal] Cannot open EPUB as zip: {ze}") return chapters for item in items: # Add stop check before processing each chapter if is_stop_requested(): return chapters try: raw = item.get_content() soup = BeautifulSoup(raw, 'html.parser') text = soup.get_text("\n", strip=True) if text: chapters.append(text) except Exception as e: name = item.get_name() if hasattr(item, 'get_name') else repr(item) print(f"[Warning] Skipped corrupted chapter {name}: {e}") return chapters def trim_context_history(history: List[Dict], limit: int, rolling_window: bool = False) -> List[Dict]: """ Handle context history with either reset or rolling window mode Args: history: List of conversation history limit: Maximum number of exchanges to keep rolling_window: Whether to use rolling window mode """ # Count current exchanges current_exchanges = len(history) # Handle based on mode if limit > 0 and current_exchanges >= limit: if rolling_window: # Rolling window: keep the most recent exchanges print(f"๐Ÿ”„ Rolling glossary context window: keeping last {limit} chapters") # Keep only the most recent exchanges history = history[-(limit-1):] if limit > 1 else [] else: # Reset mode (original behavior) print(f"๐Ÿ”„ Reset glossary context after {limit} chapters") return [] # Return empty to reset context # Convert to message format trimmed = [] for entry in history: trimmed.append({"role": "user", "content": entry["user"]}) trimmed.append({"role": "assistant", "content": entry["assistant"]}) return trimmed def load_progress() -> Dict: if os.path.exists(PROGRESS_FILE): with open(PROGRESS_FILE, 'r', encoding='utf-8') as f: return json.load(f) return {"completed": [], "glossary": [], "context_history": []} def parse_api_response(response_text: str) -> List[Dict]: """Parse API response to extract glossary entries - handles custom types""" entries = [] # Get enabled types from custom configuration custom_types = get_custom_entry_types() enabled_types = [t for t, cfg in custom_types.items() if cfg.get('enabled', True)] # First try JSON parsing try: # Clean up response text cleaned_text = response_text.strip() # Remove markdown code blocks if present if '```json' in cleaned_text or '```' in cleaned_text: import re code_block_match = re.search(r'```(?:json)?\s*(.*?)\s*```', cleaned_text, re.DOTALL) if code_block_match: cleaned_text = code_block_match.group(1) # Try to find JSON array or object import re json_match = re.search(r'[\[\{].*[\]\}]', cleaned_text, re.DOTALL) if json_match: json_str = json_match.group(0) data = json.loads(json_str) if isinstance(data, list): for item in data: if isinstance(item, dict): # Check if entry type is enabled entry_type = item.get('type', '').lower() # Handle legacy format where type is the key if not entry_type: for type_name in enabled_types: if type_name in item: entry_type = type_name fixed_entry = { 'type': type_name, 'raw_name': item.get(type_name, ''), 'translated_name': item.get('translated_name', '') } # Add gender if type supports it if custom_types.get(type_name, {}).get('has_gender', False): fixed_entry['gender'] = item.get('gender', 'Unknown') # Copy other fields for k, v in item.items(): if k not in [type_name, 'translated_name', 'gender', 'type', 'raw_name']: fixed_entry[k] = v entries.append(fixed_entry) break else: # Standard format with type field if entry_type in enabled_types: entries.append(item) return entries elif isinstance(data, dict): # Handle single entry entry_type = data.get('type', '').lower() if entry_type in enabled_types: return [data] # Check for wrapper for key in ['entries', 'glossary', 'characters', 'terms', 'data']: if key in data and isinstance(data[key], list): return parse_api_response(json.dumps(data[key])) return [] except (json.JSONDecodeError, AttributeError) as e: print(f"[Debug] JSON parsing failed: {e}") pass # CSV-like format parsing lines = response_text.strip().split('\n') for line in lines: line = line.strip() if not line or line.startswith('#'): continue # Skip header lines if 'type' in line.lower() and 'raw_name' in line.lower(): continue # Parse CSV parts = [] current_part = [] in_quotes = False for char in line + ',': if char == '"': in_quotes = not in_quotes elif char == ',' and not in_quotes: parts.append(''.join(current_part).strip()) current_part = [] else: current_part.append(char) if parts and parts[-1] == '': parts = parts[:-1] if len(parts) >= 3: entry_type = parts[0].lower() # Check if type is enabled if entry_type not in enabled_types: continue entry = { 'type': entry_type, 'raw_name': parts[1], 'translated_name': parts[2] } # Add gender if type supports it and it's provided type_config = custom_types.get(entry_type, {}) if type_config.get('has_gender', False) and len(parts) > 3 and parts[3]: entry['gender'] = parts[3] elif type_config.get('has_gender', False): entry['gender'] = 'Unknown' # Add any custom fields custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]') try: custom_fields = json.loads(custom_fields_json) start_idx = 4 # Always 4, not conditional for i, field in enumerate(custom_fields): if len(parts) > start_idx + i: field_value = parts[start_idx + i] if field_value: # Only add if not empty entry[field] = field_value except: pass entries.append(entry) return entries def validate_extracted_entry(entry): """Validate that extracted entry has required fields and enabled type""" if 'type' not in entry: return False # Check if type is enabled custom_types = get_custom_entry_types() entry_type = entry.get('type', '').lower() if entry_type not in custom_types: return False if not custom_types[entry_type].get('enabled', True): return False # Must have raw_name and translated_name if 'raw_name' not in entry or not entry['raw_name']: return False if 'translated_name' not in entry or not entry['translated_name']: return False return True def build_prompt(chapter_text: str) -> tuple: """Build the extraction prompt with custom types - returns (system_prompt, user_prompt)""" custom_prompt = os.getenv('GLOSSARY_SYSTEM_PROMPT', '').strip() if not custom_prompt: # If no custom prompt, create a default custom_prompt = """Extract all character names and important terms from the text. {fields} Only include entries that appear in the text. Return the data in the exact format specified above.""" # Check if the prompt contains {fields} placeholder if '{fields}' in custom_prompt: # Get enabled types custom_types = get_custom_entry_types() enabled_types = [(t, cfg) for t, cfg in custom_types.items() if cfg.get('enabled', True)] # Get custom fields custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]') try: custom_fields = json.loads(custom_fields_json) except: custom_fields = [] # Build fields specification based on what the prompt expects # Check if the prompt mentions CSV or JSON to determine format if 'CSV' in custom_prompt.upper(): # CSV format fields_spec = [] # Show the header format header_parts = ['type', 'raw_name', 'translated_name', 'gender'] if custom_fields: header_parts.extend(custom_fields) fields_spec.append(','.join(header_parts)) # Show examples for each type for type_name, type_config in enabled_types: example_parts = [type_name, '', ''] # Add gender field if type_config.get('has_gender', False): example_parts.append('') else: example_parts.append('') # Empty for non-character types # Add custom field placeholders for field in custom_fields: example_parts.append(f'<{field} value>') fields_spec.append(','.join(example_parts)) fields_str = '\n'.join(fields_spec) else: # JSON format (default) fields_spec = [] fields_spec.append("Extract entities and return as a JSON array.") fields_spec.append("Each entry must be a JSON object with these exact fields:") fields_spec.append("") for type_name, type_config in enabled_types: fields_spec.append(f"For {type_name}s:") fields_spec.append(f' "type": "{type_name}" (required)') fields_spec.append(' "raw_name": the name in original language/script (required)') fields_spec.append(' "translated_name": English translation or romanization (required)') if type_config.get('has_gender', False): fields_spec.append(' "gender": "Male", "Female", or "Unknown" (required for characters)') fields_spec.append("") # Add custom fields info if custom_fields: fields_spec.append("Additional custom fields to include:") for field in custom_fields: fields_spec.append(f' "{field}": appropriate value') fields_spec.append("") # Add example if enabled_types: fields_spec.append("Example output format:") fields_spec.append('[') examples = [] if 'character' in [t[0] for t in enabled_types]: example = ' {"type": "character", "raw_name": "็”ฐไธญๅคช้ƒŽ", "translated_name": "Tanaka Taro", "gender": "Male"' for field in custom_fields: example += f', "{field}": "example value"' example += '}' examples.append(example) if 'term' in [t[0] for t in enabled_types]: example = ' {"type": "term", "raw_name": "ๆฑไบฌ้ง…", "translated_name": "Tokyo Station"' for field in custom_fields: example += f', "{field}": "example value"' example += '}' examples.append(example) fields_spec.append(',\n'.join(examples)) fields_spec.append(']') fields_str = '\n'.join(fields_spec) # Replace {fields} placeholder system_prompt = custom_prompt.replace('{fields}', fields_str) else: # No {fields} placeholder - use the prompt as-is system_prompt = custom_prompt # Remove any {chapter_text} placeholders from system prompt system_prompt = system_prompt.replace('{chapter_text}', '') system_prompt = system_prompt.replace('{{chapter_text}}', '') system_prompt = system_prompt.replace('{text}', '') system_prompt = system_prompt.replace('{{text}}', '') # Strip any trailing "Text:" or similar system_prompt = system_prompt.rstrip() if system_prompt.endswith('Text:'): system_prompt = system_prompt[:-5].rstrip() # User prompt is just the chapter text user_prompt = chapter_text return (system_prompt, user_prompt) def skip_duplicate_entries(glossary): """ Skip entries with duplicate raw names using fuzzy matching. Returns deduplicated list maintaining first occurrence of each unique raw name. """ import difflib # Get fuzzy threshold from environment fuzzy_threshold = float(os.getenv('GLOSSARY_FUZZY_THRESHOLD', '0.9')) seen_raw_names = [] # List of (cleaned_name, original_entry) tuples deduplicated = [] skipped_count = 0 for entry in glossary: # Get raw_name and clean it raw_name = entry.get('raw_name', '') if not raw_name: continue # Remove honorifics for comparison (unless disabled) cleaned_name = remove_honorifics(raw_name) # Check for fuzzy matches with seen names is_duplicate = False for seen_clean, seen_original in seen_raw_names: similarity = difflib.SequenceMatcher(None, cleaned_name.lower(), seen_clean.lower()).ratio() if similarity >= fuzzy_threshold: skipped_count += 1 print(f"[Skip] Duplicate entry: {raw_name} (cleaned: {cleaned_name}) - {similarity*100:.1f}% match with {seen_original}") is_duplicate = True break if not is_duplicate: # Add to seen list and keep the entry seen_raw_names.append((cleaned_name, entry.get('raw_name', ''))) deduplicated.append(entry) if skipped_count > 0: print(f"โญ๏ธ Skipped {skipped_count} duplicate entries (threshold: {fuzzy_threshold:.2f})") print(f"โœ… Kept {len(deduplicated)} unique entries") return deduplicated # Batch processing functions def process_chapter_batch(chapters_batch: List[Tuple[int, str]], client: UnifiedClient, config: Dict, contextual_enabled: bool, history: List[Dict], ctx_limit: int, rolling_window: bool, check_stop, chunk_timeout: int = None) -> List[Dict]: """ Process a batch of chapters in parallel with improved interrupt support """ temp = float(os.getenv("GLOSSARY_TEMPERATURE") or config.get('temperature', 0.1)) env_max_output = os.getenv("MAX_OUTPUT_TOKENS") if env_max_output and env_max_output.isdigit(): mtoks = int(env_max_output) else: mtoks = config.get('max_tokens', 4196) results = [] with ThreadPoolExecutor(max_workers=len(chapters_batch)) as executor: futures = {} for idx, chap in chapters_batch: if check_stop(): break # Get system and user prompts system_prompt, user_prompt = build_prompt(chap) # Build messages correctly with system and user prompts if not contextual_enabled: msgs = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] else: msgs = [{"role": "system", "content": system_prompt}] \ + trim_context_history(history, ctx_limit, rolling_window) \ + [{"role": "user", "content": user_prompt}] # Submit to thread pool future = executor.submit( process_single_chapter_api_call, idx, chap, msgs, client, temp, mtoks, check_stop, chunk_timeout ) futures[future] = (idx, chap) # Process results with better cancellation for future in as_completed(futures): # Removed timeout - let futures complete if check_stop(): print("๐Ÿ›‘ Stop detected - cancelling all pending operations...") # Cancel all pending futures immediately cancelled = cancel_all_futures(list(futures.keys())) if cancelled > 0: print(f"โœ… Cancelled {cancelled} pending API calls") # Shutdown executor immediately executor.shutdown(wait=False) break idx, chap = futures[future] try: result = future.result(timeout=0.5) # Short timeout on result retrieval # Ensure chap is added to result here if not already present if 'chap' not in result: result['chap'] = chap results.append(result) except Exception as e: if "stopped by user" in str(e).lower(): print(f"โœ… Chapter {idx+1} stopped by user") else: print(f"Error processing chapter {idx+1}: {e}") results.append({ 'idx': idx, 'data': [], 'resp': "", 'chap': chap, 'error': str(e) }) # Sort results by chapter index results.sort(key=lambda x: x['idx']) return results def process_single_chapter_api_call(idx: int, chap: str, msgs: List[Dict], client: UnifiedClient, temp: float, mtoks: int, stop_check_fn, chunk_timeout: int = None) -> Dict: """Process a single chapter API call with thread-safe payload handling""" # APPLY INTERRUPTIBLE THREADING DELAY FIRST thread_delay = float(os.getenv("THREAD_SUBMISSION_DELAY_SECONDS", "0.5")) if thread_delay > 0: # Check if we need to wait (same logic as unified_api_client) if hasattr(client, '_thread_submission_lock') and hasattr(client, '_last_thread_submission_time'): with client._thread_submission_lock: current_time = time.time() time_since_last = current_time - client._last_thread_submission_time if time_since_last < thread_delay: sleep_time = thread_delay - time_since_last thread_name = threading.current_thread().name # PRINT BEFORE THE DELAY STARTS print(f"๐Ÿงต [{thread_name}] Applying thread delay: {sleep_time:.1f}s for Chapter {idx+1}") # Interruptible sleep - check stop flag every 0.1 seconds elapsed = 0 check_interval = 0.1 while elapsed < sleep_time: if stop_check_fn(): print(f"๐Ÿ›‘ Threading delay interrupted by stop flag") raise UnifiedClientError("Glossary extraction stopped by user during threading delay") sleep_chunk = min(check_interval, sleep_time - elapsed) time.sleep(sleep_chunk) elapsed += sleep_chunk client._last_thread_submission_time = time.time() if not hasattr(client, '_thread_submission_count'): client._thread_submission_count = 0 client._thread_submission_count += 1 start_time = time.time() print(f"[BATCH] Starting API call for Chapter {idx+1} at {time.strftime('%H:%M:%S')}") # Thread-safe payload directory thread_name = threading.current_thread().name thread_id = threading.current_thread().ident thread_dir = os.path.join("Payloads", "glossary", f"{thread_name}_{thread_id}") os.makedirs(thread_dir, exist_ok=True) try: # Save request payload before API call payload_file = os.path.join(thread_dir, f"chapter_{idx+1}_request.json") with open(payload_file, 'w', encoding='utf-8') as f: json.dump({ 'chapter': idx + 1, 'messages': msgs, 'temperature': temp, 'max_tokens': mtoks, 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') }, f, indent=2, ensure_ascii=False) # Use send_with_interrupt for API call raw = send_with_interrupt( messages=msgs, client=client, temperature=temp, max_tokens=mtoks, stop_check_fn=stop_check_fn, chunk_timeout=chunk_timeout ) # Handle the response - it might be a tuple or a string if raw is None: print(f"โš ๏ธ API returned None for chapter {idx+1}") return { 'idx': idx, 'data': [], 'resp': "", 'chap': chap, 'error': "API returned None" } if isinstance(raw, tuple): resp = raw[0] if raw[0] is not None else "" elif isinstance(raw, str): resp = raw elif hasattr(raw, 'content'): resp = raw.content if raw.content is not None else "" elif hasattr(raw, 'text'): resp = raw.text if raw.text is not None else "" else: resp = str(raw) if raw is not None else "" # Ensure resp is never None if resp is None: resp = "" # Save the raw response in thread-safe location response_file = os.path.join(thread_dir, f"chapter_{idx+1}_response.txt") with open(response_file, "w", encoding="utf-8", errors="replace") as f: f.write(resp) # Parse response using the new parser data = parse_api_response(resp) # More detailed debug logging print(f"[BATCH] Chapter {idx+1} - Raw response length: {len(resp)} chars") print(f"[BATCH] Chapter {idx+1} - Parsed {len(data)} entries before validation") # Filter out invalid entries valid_data = [] for entry in data: if validate_extracted_entry(entry): # Clean the raw_name if 'raw_name' in entry: entry['raw_name'] = entry['raw_name'].strip() valid_data.append(entry) else: print(f"[BATCH] Chapter {idx+1} - Invalid entry: {entry}") elapsed = time.time() - start_time print(f"[BATCH] Completed Chapter {idx+1} in {elapsed:.1f}s at {time.strftime('%H:%M:%S')} - Extracted {len(valid_data)} valid entries") return { 'idx': idx, 'data': valid_data, 'resp': resp, 'chap': chap, # Include the chapter text in the result 'error': None } except UnifiedClientError as e: print(f"[Error] API call interrupted/failed for chapter {idx+1}: {e}") return { 'idx': idx, 'data': [], 'resp': "", 'chap': chap, # Include chapter even on error 'error': str(e) } except Exception as e: print(f"[Error] Unexpected error for chapter {idx+1}: {e}") import traceback print(f"[Error] Traceback: {traceback.format_exc()}") return { 'idx': idx, 'data': [], 'resp': "", 'chap': chap, # Include chapter even on error 'error': str(e) } # Update main function to support batch processing: def main(log_callback=None, stop_callback=None): """Modified main function that can accept a logging callback and stop callback""" if log_callback: set_output_redirect(log_callback) # Set up stop checking def check_stop(): if stop_callback and stop_callback(): print("โŒ Glossary extraction stopped by user request.") return True return is_stop_requested() start = time.time() # Handle both command line and GUI calls if '--epub' in sys.argv: # Command line mode parser = argparse.ArgumentParser(description='Extract glossary from EPUB/TXT') parser.add_argument('--epub', required=True, help='Path to EPUB/TXT file') parser.add_argument('--output', required=True, help='Output glossary path') parser.add_argument('--config', help='Config file path') args = parser.parse_args() epub_path = args.epub else: # GUI mode - get from environment epub_path = os.getenv("EPUB_PATH", "") if not epub_path and len(sys.argv) > 1: epub_path = sys.argv[1] # Create args object for GUI mode import types args = types.SimpleNamespace() args.epub = epub_path args.output = os.getenv("OUTPUT_PATH", "glossary.json") args.config = os.getenv("CONFIG_PATH", "config.json") is_text_file = epub_path.lower().endswith('.txt') if is_text_file: # Import text processor from extract_glossary_from_txt import extract_chapters_from_txt chapters = extract_chapters_from_txt(epub_path) file_base = os.path.splitext(os.path.basename(epub_path))[0] else: # Existing EPUB code chapters = extract_chapters_from_epub(epub_path) epub_base = os.path.splitext(os.path.basename(epub_path))[0] file_base = epub_base # If user didn't override --output, derive it from the EPUB filename: if args.output == 'glossary.json': args.output = f"{file_base}_glossary.json" # ensure we have a Glossary subfolder next to the JSON/MD outputs glossary_dir = os.path.join(os.path.dirname(args.output), "Glossary") os.makedirs(glossary_dir, exist_ok=True) # override the moduleโ€level PROGRESS_FILE to include epub name global PROGRESS_FILE PROGRESS_FILE = os.path.join( glossary_dir, f"{file_base}_glossary_progress.json" ) config = load_config(args.config) # Get API key from environment variables (set by GUI) or config file api_key = (os.getenv("API_KEY") or os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_OR_Gemini_API_KEY") or os.getenv("GEMINI_API_KEY") or config.get('api_key')) # Get model from environment or config model = os.getenv("MODEL") or config.get('model', 'gemini-1.5-flash') # Define output directory (use current directory as default) out = os.path.dirname(args.output) if hasattr(args, 'output') else os.getcwd() # Use the variables we just retrieved client = create_client_with_multi_key_support(api_key, model, out, config) # Check for batch mode batch_enabled = os.getenv("BATCH_TRANSLATION", "0") == "1" batch_size = int(os.getenv("BATCH_SIZE", "5")) conservative_batching = os.getenv("CONSERVATIVE_BATCHING", "0") == "1" print(f"[DEBUG] BATCH_TRANSLATION = {os.getenv('BATCH_TRANSLATION')} (enabled: {batch_enabled})") print(f"[DEBUG] BATCH_SIZE = {batch_size}") print(f"[DEBUG] CONSERVATIVE_BATCHING = {os.getenv('CONSERVATIVE_BATCHING')} (enabled: {conservative_batching})") if batch_enabled: print(f"๐Ÿš€ Glossary batch mode enabled with size: {batch_size}") print(f"๐Ÿ“‘ Note: Glossary extraction uses direct batching (not affected by conservative batching setting)") #API call delay api_delay = float(os.getenv("SEND_INTERVAL_SECONDS", "2")) print(f"โฑ๏ธ API call delay: {api_delay} seconds") # Get compression factor from environment compression_factor = float(os.getenv("COMPRESSION_FACTOR", "1.0")) print(f"๐Ÿ“ Compression Factor: {compression_factor}") # Initialize chapter splitter with compression factor chapter_splitter = ChapterSplitter(model_name=model, compression_factor=compression_factor) # Get temperature from environment or config temp = float(os.getenv("GLOSSARY_TEMPERATURE") or config.get('temperature', 0.1)) env_max_output = os.getenv("MAX_OUTPUT_TOKENS") if env_max_output and env_max_output.isdigit(): mtoks = int(env_max_output) print(f"[DEBUG] Output Token Limit: {mtoks} (from GUI)") else: mtoks = config.get('max_tokens', 4196) print(f"[DEBUG] Output Token Limit: {mtoks} (from config)") # Get context limit from environment or config ctx_limit = int(os.getenv("GLOSSARY_CONTEXT_LIMIT") or config.get('context_limit_chapters', 3)) # Parse chapter range from environment chapter_range = os.getenv("CHAPTER_RANGE", "").strip() range_start = None range_end = None if chapter_range and re.match(r"^\d+\s*-\s*\d+$", chapter_range): range_start, range_end = map(int, chapter_range.split("-", 1)) print(f"๐Ÿ“Š Chapter Range Filter: {range_start} to {range_end}") elif chapter_range: print(f"โš ๏ธ Invalid chapter range format: {chapter_range} (use format: 5-10)") # Log settings format_parts = ["type", "raw_name", "translated_name", "gender"] custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]') try: custom_fields = json.loads(custom_fields_json) if custom_fields: format_parts.extend(custom_fields) except: pass print(f"๐Ÿ“‘ Glossary Format: Simple ({', '.join(format_parts)})") # Check honorifics filter toggle honorifics_disabled = os.getenv('GLOSSARY_DISABLE_HONORIFICS_FILTER', '0') == '1' if honorifics_disabled: print("๐Ÿ“‘ Honorifics Filtering: โŒ DISABLED") else: print("๐Ÿ“‘ Honorifics Filtering: โœ… ENABLED") # Log custom fields custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]') try: custom_fields = json.loads(custom_fields_json) if custom_fields: print(f"๐Ÿ“‘ Custom Fields: {', '.join(custom_fields)}") except: pass # Check if custom prompt is being used if os.getenv('GLOSSARY_SYSTEM_PROMPT'): print("๐Ÿ“‘ Using custom extraction prompt") else: print("๐Ÿ“‘ Using default extraction prompt") if is_text_file: from extract_glossary_from_txt import extract_chapters_from_txt chapters = extract_chapters_from_txt(args.epub) else: chapters = extract_chapters_from_epub(args.epub) if not chapters: print("No chapters found. Exiting.") return # Check for stop before starting processing if check_stop(): return prog = load_progress() completed = prog['completed'] glossary = prog['glossary'] history = prog['context_history'] total_chapters = len(chapters) # Get both settings contextual_enabled = os.getenv('CONTEXTUAL', '1') == '1' rolling_window = os.getenv('GLOSSARY_HISTORY_ROLLING', '0') == '1' # Count chapters that will be processed with range filter chapters_to_process = [] for idx, chap in enumerate(chapters): # Skip if chapter is outside the range if range_start is not None and range_end is not None: chapter_num = idx + 1 # 1-based chapter numbering if not (range_start <= chapter_num <= range_end): continue if idx not in completed: chapters_to_process.append((idx, chap)) if len(chapters_to_process) < total_chapters: print(f"๐Ÿ“Š Processing {len(chapters_to_process)} out of {total_chapters} chapters") # Get chunk timeout from environment chunk_timeout = int(os.getenv("CHUNK_TIMEOUT", "900")) # 15 minutes default # Process chapters based on mode if batch_enabled and len(chapters_to_process) > 0: # BATCH MODE: Process in batches with per-entry saving total_batches = (len(chapters_to_process) + batch_size - 1) // batch_size for batch_num in range(total_batches): # Check for stop at the beginning of each batch if check_stop(): print(f"โŒ Glossary extraction stopped at batch {batch_num+1}") # Apply deduplication before stopping if glossary: print("๐Ÿ”€ Applying deduplication and sorting before exit...") glossary[:] = skip_duplicate_entries(glossary) # Sort glossary custom_types = get_custom_entry_types() type_order = {'character': 0, 'term': 1} other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) for i, t in enumerate(other_types): type_order[t] = i + 2 glossary.sort(key=lambda x: ( type_order.get(x.get('type', 'term'), 999), x.get('raw_name', '').lower() )) save_progress(completed, glossary, history) save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) print(f"โœ… Saved {len(glossary)} deduplicated entries before exit") return # Get current batch batch_start = batch_num * batch_size batch_end = min(batch_start + batch_size, len(chapters_to_process)) current_batch = chapters_to_process[batch_start:batch_end] print(f"\n๐Ÿ”„ Processing Batch {batch_num+1}/{total_batches} (Chapters: {[idx+1 for idx, _ in current_batch]})") print(f"[BATCH] Submitting {len(current_batch)} chapters for parallel processing...") batch_start_time = time.time() # Process batch in parallel BUT handle results as they complete temp = float(os.getenv("GLOSSARY_TEMPERATURE") or config.get('temperature', 0.1)) env_max_output = os.getenv("MAX_OUTPUT_TOKENS") if env_max_output and env_max_output.isdigit(): mtoks = int(env_max_output) else: mtoks = config.get('max_tokens', 4196) batch_entry_count = 0 with ThreadPoolExecutor(max_workers=len(current_batch)) as executor: futures = {} # Submit all chapters in the batch for idx, chap in current_batch: if check_stop(): # Apply deduplication before breaking if glossary: print("๐Ÿ”€ Applying deduplication before stopping...") glossary[:] = skip_duplicate_entries(glossary) save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) break # Get system and user prompts system_prompt, user_prompt = build_prompt(chap) # Build messages if not contextual_enabled: msgs = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] else: msgs = [{"role": "system", "content": system_prompt}] \ + trim_context_history(history, ctx_limit, rolling_window) \ + [{"role": "user", "content": user_prompt}] # Submit to thread pool future = executor.submit( process_single_chapter_api_call, idx, chap, msgs, client, temp, mtoks, check_stop, chunk_timeout ) futures[future] = (idx, chap) # Small yield to keep GUI responsive when submitting many tasks if idx % 5 == 0: time.sleep(0.001) # Small yield to keep GUI responsive when submitting many tasks if idx % 5 == 0: time.sleep(0.001) # Process results AS THEY COMPLETE, not all at once for future in as_completed(futures): if check_stop(): print("๐Ÿ›‘ Stop detected - cancelling all pending operations...") cancelled = cancel_all_futures(list(futures.keys())) if cancelled > 0: print(f"โœ… Cancelled {cancelled} pending API calls") # Apply deduplication before stopping if glossary: print("๐Ÿ”€ Applying deduplication and sorting before exit...") glossary[:] = skip_duplicate_entries(glossary) # Sort glossary custom_types = get_custom_entry_types() type_order = {'character': 0, 'term': 1} other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) for i, t in enumerate(other_types): type_order[t] = i + 2 glossary.sort(key=lambda x: ( type_order.get(x.get('type', 'term'), 999), x.get('raw_name', '').lower() )) save_progress(completed, glossary, history) save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) print(f"โœ… Saved {len(glossary)} deduplicated entries before exit") executor.shutdown(wait=False) break idx, chap = futures[future] try: result = future.result(timeout=0.5) # Process this chapter's results immediately data = result.get('data', []) resp = result.get('resp', '') error = result.get('error') if error: print(f"[Chapter {idx+1}] Error: {error}") completed.append(idx) continue # Process and save entries IMMEDIATELY as each chapter completes if data and len(data) > 0: total_ent = len(data) batch_entry_count += total_ent for eidx, entry in enumerate(data, start=1): elapsed = time.time() - start # Get entry info entry_type = entry.get("type", "?") raw_name = entry.get("raw_name", "?") trans_name = entry.get("translated_name", "?") print(f'[Chapter {idx+1}/{total_chapters}] [{eidx}/{total_ent}] ({elapsed:.1f}s elapsed) โ†’ {entry_type}: {raw_name} ({trans_name})') # Add entry immediately WITHOUT deduplication glossary.append(entry) # Save immediately after EACH entry save_progress(completed, glossary, history) save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) completed.append(idx) # Add to history if contextual is enabled if contextual_enabled and resp and chap: system_prompt, user_prompt = build_prompt(chap) history.append({"user": user_prompt, "assistant": resp}) except Exception as e: if "stopped by user" in str(e).lower(): print(f"โœ… Chapter {idx+1} stopped by user") else: print(f"Error processing chapter {idx+1}: {e}") completed.append(idx) batch_elapsed = time.time() - batch_start_time print(f"[BATCH] Batch {batch_num+1} completed in {batch_elapsed:.1f}s total") # After batch completes, apply deduplication and sorting if batch_entry_count > 0: print(f"\n๐Ÿ”€ Applying deduplication and sorting after batch {batch_num+1}/{total_batches}") original_size = len(glossary) # Apply deduplication to entire glossary glossary[:] = skip_duplicate_entries(glossary) # Sort glossary by type and name custom_types = get_custom_entry_types() type_order = {'character': 0, 'term': 1} other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) for i, t in enumerate(other_types): type_order[t] = i + 2 glossary.sort(key=lambda x: ( type_order.get(x.get('type', 'term'), 999), x.get('raw_name', '').lower() )) deduplicated_size = len(glossary) removed = original_size - deduplicated_size if removed > 0: print(f"โœ… Removed {removed} duplicates (fuzzy threshold: {os.getenv('GLOSSARY_FUZZY_THRESHOLD', '0.90')})") print(f"๐Ÿ“Š Glossary size: {deduplicated_size} unique entries") # Save final deduplicated and sorted glossary save_progress(completed, glossary, history) save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) # Print batch summary if batch_entry_count > 0: print(f"\n๐Ÿ“Š Batch {batch_num+1}/{total_batches} Summary:") print(f" โ€ข Chapters processed: {len(current_batch)}") print(f" โ€ข Total entries extracted: {batch_entry_count}") print(f" โ€ข Glossary size: {len(glossary)} unique entries") # Handle context history if contextual_enabled: if not rolling_window and len(history) >= ctx_limit and ctx_limit > 0: print(f"๐Ÿ”„ Resetting glossary context (reached {ctx_limit} chapter limit)") history = [] prog['context_history'] = [] # Add delay between batches (but not after the last batch) if batch_num < total_batches - 1: print(f"\nโฑ๏ธ Waiting {api_delay}s before next batch...") if not interruptible_sleep(api_delay, check_stop, 0.1): print(f"โŒ Glossary extraction stopped during delay") # Apply deduplication before stopping if glossary: print("๐Ÿ”€ Applying deduplication and sorting before exit...") glossary[:] = skip_duplicate_entries(glossary) # Sort glossary custom_types = get_custom_entry_types() type_order = {'character': 0, 'term': 1} other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) for i, t in enumerate(other_types): type_order[t] = i + 2 glossary.sort(key=lambda x: ( type_order.get(x.get('type', 'term'), 999), x.get('raw_name', '').lower() )) save_progress(completed, glossary, history) save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) print(f"โœ… Saved {len(glossary)} deduplicated entries before exit") return else: # SEQUENTIAL MODE: Original behavior for idx, chap in enumerate(chapters): # Check for stop at the beginning of each chapter if check_stop(): print(f"โŒ Glossary extraction stopped at chapter {idx+1}") return # Apply chapter range filter if range_start is not None and range_end is not None: chapter_num = idx + 1 # 1-based chapter numbering if not (range_start <= chapter_num <= range_end): # Check if this is from a text file is_text_chapter = hasattr(chap, 'filename') and chap.get('filename', '').endswith('.txt') terminology = "Section" if is_text_chapter else "Chapter" print(f"[SKIP] {terminology} {chapter_num} - outside range filter") continue if idx in completed: # Check if processing text file chapters is_text_chapter = hasattr(chap, 'filename') and chap.get('filename', '').endswith('.txt') terminology = "section" if is_text_chapter else "chapter" print(f"Skipping {terminology} {idx+1} (already processed)") continue print(f"๐Ÿ”„ Processing Chapter {idx+1}/{total_chapters}") # Check if history will reset on this chapter if contextual_enabled and len(history) >= ctx_limit and ctx_limit > 0 and not rolling_window: print(f" ๐Ÿ“Œ Glossary context will reset after this chapter (current: {len(history)}/{ctx_limit} chapters)") try: # Get system and user prompts from build_prompt system_prompt, user_prompt = build_prompt(chap) if not contextual_enabled: # No context at all msgs = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] else: # Use context with trim_context_history handling the mode msgs = [{"role": "system", "content": system_prompt}] \ + trim_context_history(history, ctx_limit, rolling_window) \ + [{"role": "user", "content": user_prompt}] total_tokens = sum(count_tokens(m["content"]) for m in msgs) # READ THE TOKEN LIMIT env_value = os.getenv("MAX_INPUT_TOKENS", "1000000").strip() if not env_value or env_value == "": token_limit = None limit_str = "unlimited" elif env_value.isdigit() and int(env_value) > 0: token_limit = int(env_value) limit_str = str(token_limit) else: token_limit = 1000000 limit_str = "1000000 (default)" print(f"[DEBUG] Glossary prompt tokens = {total_tokens} / {limit_str}") # Check if we're over the token limit and need to split if token_limit is not None and total_tokens > token_limit: print(f"โš ๏ธ Chapter {idx+1} exceeds token limit: {total_tokens} > {token_limit}") print(f"๐Ÿ“„ Using ChapterSplitter to split into smaller chunks...") # Calculate available tokens for content system_tokens = chapter_splitter.count_tokens(system_prompt) context_tokens = sum(chapter_splitter.count_tokens(m["content"]) for m in trim_context_history(history, ctx_limit, rolling_window)) safety_margin = 1000 available_tokens = token_limit - system_tokens - context_tokens - safety_margin # Since glossary extraction works with plain text, wrap it in a simple HTML structure chapter_html = f"

{chap.replace(chr(10)+chr(10), '

')}

" # Use ChapterSplitter to split the chapter chunks = chapter_splitter.split_chapter(chapter_html, available_tokens) print(f"๐Ÿ“„ Chapter split into {len(chunks)} chunks") # Process each chunk chapter_glossary_data = [] # Collect data from all chunks for chunk_html, chunk_idx, total_chunks in chunks: if check_stop(): print(f"โŒ Glossary extraction stopped during chunk {chunk_idx} of chapter {idx+1}") return print(f"๐Ÿ”„ Processing chunk {chunk_idx}/{total_chunks} of Chapter {idx+1}") # Extract text from the chunk HTML from bs4 import BeautifulSoup soup = BeautifulSoup(chunk_html, 'html.parser') chunk_text = soup.get_text(strip=True) # Get system and user prompts for chunk chunk_system_prompt, chunk_user_prompt = build_prompt(chunk_text) # Build chunk messages if not contextual_enabled: chunk_msgs = [ {"role": "system", "content": chunk_system_prompt}, {"role": "user", "content": chunk_user_prompt} ] else: chunk_msgs = [{"role": "system", "content": chunk_system_prompt}] \ + trim_context_history(history, ctx_limit, rolling_window) \ + [{"role": "user", "content": chunk_user_prompt}] # API call for chunk try: chunk_raw = send_with_interrupt( messages=chunk_msgs, client=client, temperature=temp, max_tokens=mtoks, stop_check_fn=check_stop, chunk_timeout=chunk_timeout ) except UnifiedClientError as e: if "stopped by user" in str(e).lower(): print(f"โŒ Glossary extraction stopped during chunk {chunk_idx} API call") return elif "timeout" in str(e).lower(): print(f"โš ๏ธ Chunk {chunk_idx} API call timed out: {e}") continue # Skip this chunk else: print(f"โŒ Chunk {chunk_idx} API error: {e}") continue # Skip this chunk except Exception as e: print(f"โŒ Unexpected error in chunk {chunk_idx}: {e}") continue # Skip this chunk # Process chunk response if chunk_raw is None: print(f"โŒ API returned None for chunk {chunk_idx}") continue # Handle different response types if isinstance(chunk_raw, tuple): chunk_resp = chunk_raw[0] if chunk_raw[0] is not None else "" elif isinstance(chunk_raw, str): chunk_resp = chunk_raw elif hasattr(chunk_raw, 'content'): chunk_resp = chunk_raw.content if chunk_raw.content is not None else "" elif hasattr(chunk_raw, 'text'): chunk_resp = chunk_raw.text if chunk_raw.text is not None else "" else: print(f"โŒ Unexpected response type for chunk {chunk_idx}: {type(chunk_raw)}") chunk_resp = str(chunk_raw) if chunk_raw is not None else "" # Ensure resp is a string if not isinstance(chunk_resp, str): print(f"โš ๏ธ Converting non-string response to string for chunk {chunk_idx}") chunk_resp = str(chunk_resp) if chunk_resp is not None else "" # Check if response is empty if not chunk_resp or chunk_resp.strip() == "": print(f"โš ๏ธ Empty response for chunk {chunk_idx}, skipping...") continue # Save chunk response with thread-safe location thread_name = threading.current_thread().name thread_id = threading.current_thread().ident thread_dir = os.path.join("Payloads", "glossary", f"{thread_name}_{thread_id}") os.makedirs(thread_dir, exist_ok=True) with open(os.path.join(thread_dir, f"chunk_response_chap{idx+1}_chunk{chunk_idx}.txt"), "w", encoding="utf-8", errors="replace") as f: f.write(chunk_resp) # Extract data from chunk chunk_resp_data = parse_api_response(chunk_resp) if not chunk_resp_data: print(f"[Warning] No data found in chunk {chunk_idx}, skipping...") continue # The parse_api_response already returns parsed data, no need to parse again try: # Filter out invalid entries directly from chunk_resp_data valid_chunk_data = [] for entry in chunk_resp_data: if validate_extracted_entry(entry): # Clean the raw_name if 'raw_name' in entry: entry['raw_name'] = entry['raw_name'].strip() valid_chunk_data.append(entry) else: print(f"[Debug] Skipped invalid entry in chunk {chunk_idx}: {entry}") chapter_glossary_data.extend(valid_chunk_data) print(f"โœ… Chunk {chunk_idx}/{total_chunks}: extracted {len(valid_chunk_data)} entries") # Add chunk to history if contextual if contextual_enabled: history.append({"user": chunk_user_prompt, "assistant": chunk_resp}) except Exception as e: print(f"[Warning] Error processing chunk {chunk_idx} data: {e}") continue # Add delay between chunks (but not after last chunk) if chunk_idx < total_chunks: print(f"โฑ๏ธ Waiting {api_delay}s before next chunk...") if not interruptible_sleep(api_delay, check_stop, 0.1): print(f"โŒ Glossary extraction stopped during chunk delay") return # Use the collected data from all chunks data = chapter_glossary_data resp = "" # Combined response not needed for progress tracking print(f"โœ… Chapter {idx+1} processed in {len(chunks)} chunks, total entries: {len(data)}") else: # Original single-chapter processing # Check for stop before API call if check_stop(): print(f"โŒ Glossary extraction stopped before API call for chapter {idx+1}") return try: # Use send_with_interrupt for API call raw = send_with_interrupt( messages=msgs, client=client, temperature=temp, max_tokens=mtoks, stop_check_fn=check_stop, chunk_timeout=chunk_timeout ) except UnifiedClientError as e: if "stopped by user" in str(e).lower(): print(f"โŒ Glossary extraction stopped during API call for chapter {idx+1}") return elif "timeout" in str(e).lower(): print(f"โš ๏ธ API call timed out for chapter {idx+1}: {e}") continue else: print(f"โŒ API error for chapter {idx+1}: {e}") continue except Exception as e: print(f"โŒ Unexpected error for chapter {idx+1}: {e}") continue # Handle response if raw is None: print(f"โŒ API returned None for chapter {idx+1}") continue # Handle different response types if isinstance(raw, tuple): resp = raw[0] if raw[0] is not None else "" elif isinstance(raw, str): resp = raw elif hasattr(raw, 'content'): resp = raw.content if raw.content is not None else "" elif hasattr(raw, 'text'): resp = raw.text if raw.text is not None else "" else: print(f"โŒ Unexpected response type for chapter {idx+1}: {type(raw)}") resp = str(raw) if raw is not None else "" # Ensure resp is a string if not isinstance(resp, str): print(f"โš ๏ธ Converting non-string response to string for chapter {idx+1}") resp = str(resp) if resp is not None else "" # NULL CHECK before checking if response is empty if resp is None: print(f"โš ๏ธ Response is None for chapter {idx+1}, skipping...") continue # Check if response is empty if not resp or resp.strip() == "": print(f"โš ๏ธ Empty response for chapter {idx+1}, skipping...") continue # Save the raw response with thread-safe location thread_name = threading.current_thread().name thread_id = threading.current_thread().ident thread_dir = os.path.join("Payloads", "glossary", f"{thread_name}_{thread_id}") os.makedirs(thread_dir, exist_ok=True) with open(os.path.join(thread_dir, f"response_chap{idx+1}.txt"), "w", encoding="utf-8", errors="replace") as f: f.write(resp) # Parse response using the new parser try: data = parse_api_response(resp) except Exception as e: print(f"โŒ Error parsing response for chapter {idx+1}: {e}") print(f" Response preview: {resp[:200] if resp else 'None'}...") continue # Filter out invalid entries valid_data = [] for entry in data: if validate_extracted_entry(entry): # Clean the raw_name if 'raw_name' in entry: entry['raw_name'] = entry['raw_name'].strip() valid_data.append(entry) else: print(f"[Debug] Skipped invalid entry: {entry}") data = valid_data total_ent = len(data) # Log entries for eidx, entry in enumerate(data, start=1): if check_stop(): print(f"โŒ Glossary extraction stopped during entry processing for chapter {idx+1}") return elapsed = time.time() - start if idx == 0 and eidx == 1: eta = 0 else: avg = elapsed / ((idx * 100) + eidx) eta = avg * (total_chapters * 100 - ((idx * 100) + eidx)) # Get entry info based on new format entry_type = entry.get("type", "?") raw_name = entry.get("raw_name", "?") trans_name = entry.get("translated_name", "?") print(f'[Chapter {idx+1}/{total_chapters}] [{eidx}/{total_ent}] ({elapsed:.1f}s elapsed, ETA {eta:.1f}s) โ†’ {entry_type}: {raw_name} ({trans_name})') # Apply skip logic and save glossary.extend(data) glossary[:] = skip_duplicate_entries(glossary) completed.append(idx) # Only add to history if contextual is enabled if contextual_enabled and 'resp' in locals() and resp: history.append({"user": user_prompt, "assistant": resp}) # Reset history when limit reached without rolling window if not rolling_window and len(history) >= ctx_limit and ctx_limit > 0: print(f"๐Ÿ”„ Resetting glossary context (reached {ctx_limit} chapter limit)") history = [] prog['context_history'] = [] save_progress(completed, glossary, history) save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) # Add delay before next API call (but not after the last chapter) if idx < len(chapters) - 1: # Check if we're within the range or if there are more chapters to process next_chapter_in_range = True if range_start is not None and range_end is not None: next_chapter_num = idx + 2 # idx+1 is current, idx+2 is next next_chapter_in_range = (range_start <= next_chapter_num <= range_end) else: # No range filter, check if next chapter is already completed next_chapter_in_range = (idx + 1) not in completed if next_chapter_in_range: print(f"โฑ๏ธ Waiting {api_delay}s before next chapter...") if not interruptible_sleep(api_delay, check_stop, 0.1): print(f"โŒ Glossary extraction stopped during delay") return # Check for stop after processing chapter if check_stop(): print(f"โŒ Glossary extraction stopped after processing chapter {idx+1}") return except Exception as e: print(f"Error at chapter {idx+1}: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") # Check for stop even after error if check_stop(): print(f"โŒ Glossary extraction stopped after error in chapter {idx+1}") return print(f"Done. Glossary saved to {args.output}") # Also save as CSV format for compatibility try: csv_output = args.output.replace('.json', '.csv') csv_path = os.path.join(glossary_dir, os.path.basename(csv_output)) save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) print(f"Also saved as CSV: {csv_path}") except Exception as e: print(f"[Warning] Could not save CSV format: {e}") def save_progress(completed: List[int], glossary: List[Dict], context_history: List[Dict]): """Save progress to JSON file""" progress_data = { "completed": completed, "glossary": glossary, "context_history": context_history } try: # Use atomic write to prevent corruption temp_file = PROGRESS_FILE + '.tmp' with open(temp_file, 'w', encoding='utf-8') as f: json.dump(progress_data, f, ensure_ascii=False, indent=2) # Replace the old file with the new one if os.path.exists(PROGRESS_FILE): os.remove(PROGRESS_FILE) os.rename(temp_file, PROGRESS_FILE) except Exception as e: print(f"[Warning] Failed to save progress: {e}") # Try direct write as fallback try: with open(PROGRESS_FILE, 'w', encoding='utf-8') as f: json.dump(progress_data, f, ensure_ascii=False, indent=2) except Exception as e2: print(f"[Error] Could not save progress: {e2}") if __name__=='__main__': main()