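"""extract_glossary_from_epub.py

Extract a glossary of character names and terms from an EPUB or TXT file by
sending chapter text to an LLM through UnifiedClient. Supports GUI and CLI
invocation, batch or sequential processing, contextual history, resumable
progress, honorific stripping, and fuzzy deduplication of extracted names.
"""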
# extract_glossary_from_epub.py
import os
import json
import argparse
import zipfile
import time
import sys
import tiktoken
import threading
import queue
import ebooklib
import re
from ebooklib import epub
from bs4 import BeautifulSoup
from chapter_splitter import ChapterSplitter
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple
from unified_api_client import UnifiedClient, UnifiedClientError

# Fix for PyInstaller - handle stdout reconfigure more carefully
if sys.platform.startswith("win"):
    try:
        # Try to reconfigure if the method exists
        if hasattr(sys.stdout, 'reconfigure'):
            sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    except (AttributeError, ValueError):
        # If reconfigure doesn't work, try to set up UTF-8 another way
        import io
        if sys.stdout and hasattr(sys.stdout, 'buffer'):
            sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')

MODEL = os.getenv("MODEL", "gemini-2.0-flash")

def interruptible_sleep(duration, check_stop_fn, interval=0.1):
    """Sleep that can be interrupted by stop request"""
    elapsed = 0
    while elapsed < duration:
        if check_stop_fn and check_stop_fn():  # check_stop_fn may be None
            return False  # Interrupted
        sleep_time = min(interval, duration - elapsed)
        time.sleep(sleep_time)
        elapsed += sleep_time
    return True  # Completed normally

def cancel_all_futures(futures):
    """Cancel all pending futures immediately"""
    cancelled_count = 0
    for future in futures:
        if not future.done() and future.cancel():
            cancelled_count += 1
    return cancelled_count

def create_client_with_multi_key_support(api_key, model, output_dir, config):
    """Create a UnifiedClient with multi API key support if enabled"""
    # Check if multi API key mode is enabled
    use_multi_keys = config.get('use_multi_api_keys', False)

    # Set environment variables for UnifiedClient to pick up
    if use_multi_keys and 'multi_api_keys' in config and config['multi_api_keys']:
        print("🔑 Multi API Key mode enabled for glossary extraction")
        # Set environment variables that UnifiedClient will read
        os.environ['USE_MULTI_API_KEYS'] = '1'
        os.environ['MULTI_API_KEYS'] = json.dumps(config['multi_api_keys'])
        os.environ['FORCE_KEY_ROTATION'] = '1' if config.get('force_key_rotation', True) else '0'
        os.environ['ROTATION_FREQUENCY'] = str(config.get('rotation_frequency', 1))
        print(f"   • Keys configured: {len(config['multi_api_keys'])}")
        print(f"   • Force rotation: {config.get('force_key_rotation', True)}")
        print(f"   • Rotation frequency: every {config.get('rotation_frequency', 1)} request(s)")
    else:
        # Ensure multi-key mode is disabled in environment
        os.environ['USE_MULTI_API_KEYS'] = '0'

    # Create UnifiedClient normally - it will check the environment variables
    return UnifiedClient(api_key=api_key, model=model, output_dir=output_dir)

def send_with_interrupt(messages, client, temperature, max_tokens, stop_check_fn, chunk_timeout=None):
    """Send an API request with interrupt capability and an optional per-chunk timeout"""
    result_queue = queue.Queue()

    def api_call():
        try:
            start_time = time.time()
            result = client.send(messages, temperature=temperature, max_tokens=max_tokens, context='glossary')
            elapsed = time.time() - start_time
            result_queue.put((result, elapsed))
        except Exception as e:
            result_queue.put(e)

    api_thread = threading.Thread(target=api_call)
    api_thread.daemon = True
    api_thread.start()

    timeout = chunk_timeout if chunk_timeout is not None else 86400
    check_interval = 0.1
    elapsed = 0
    while elapsed < timeout:
        try:
            # Check for results with a short poll timeout
            result = result_queue.get(timeout=check_interval)
            if isinstance(result, Exception):
                raise result
            if isinstance(result, tuple):
                api_result, api_time = result
                if chunk_timeout and api_time > chunk_timeout:
                    if hasattr(client, '_in_cleanup'):
                        client._in_cleanup = True
                    if hasattr(client, 'cancel_current_operation'):
                        client.cancel_current_operation()
                    raise UnifiedClientError(f"API call took {api_time:.1f}s (timeout: {chunk_timeout}s)")
                return api_result
            return result
        except queue.Empty:
            if stop_check_fn():
                # More aggressive cancellation
                print("🛑 Stop requested - cancelling API call immediately...")
                # Set cleanup flag
                if hasattr(client, '_in_cleanup'):
                    client._in_cleanup = True
                # Try to cancel the operation
                if hasattr(client, 'cancel_current_operation'):
                    client.cancel_current_operation()
                # Don't wait for the thread to finish - just raise immediately
                raise UnifiedClientError("Glossary extraction stopped by user")
            elapsed += check_interval

    # Timeout occurred
    if hasattr(client, '_in_cleanup'):
        client._in_cleanup = True
    if hasattr(client, 'cancel_current_operation'):
        client.cancel_current_operation()
    raise UnifiedClientError(f"API call timed out after {timeout} seconds")

# Parse token limit from environment variable (same logic as translation)
def parse_glossary_token_limit():
    """Parse token limit from environment variable"""
    env_value = os.getenv("GLOSSARY_TOKEN_LIMIT", "1000000").strip()

    if not env_value or env_value.lower() == "unlimited":
        return None, "unlimited"
    if env_value.isdigit() and int(env_value) > 0:
        limit = int(env_value)
        return limit, str(limit)

    # Default fallback
    return 1000000, "1000000 (default)"

MAX_GLOSSARY_TOKENS, GLOSSARY_LIMIT_STR = parse_glossary_token_limit()
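# MAX_GLOSSARY_TOKENS is None when the limit is "unlimited"; GLOSSARY_LIMIT_STR
# is the equivalent human-readable string.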

# Global stop flag for GUI integration
_stop_requested = False

def set_stop_flag(value):
    """Set the global stop flag"""
    global _stop_requested
    _stop_requested = value
    # When clearing the stop flag, also clear the cancellation environment flag
    if not value:
        os.environ['TRANSLATION_CANCELLED'] = '0'
        # Also clear UnifiedClient's global cancellation flag
        try:
            import unified_api_client
            if hasattr(unified_api_client, 'UnifiedClient'):
                unified_api_client.UnifiedClient._global_cancelled = False
        except Exception:
            pass

def is_stop_requested():
    """Check if stop was requested"""
    global _stop_requested
    return _stop_requested

# ─── resilient tokenizer setup ───
try:
    enc = tiktoken.encoding_for_model(MODEL)
except Exception:
    try:
        enc = tiktoken.get_encoding("cl100k_base")
    except Exception:
        enc = None

def count_tokens(text: str) -> int:
    if enc:
        return len(enc.encode(text))
    # crude fallback: assume ~1 token per 4 chars
    return max(1, len(text) // 4)
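# Fallback example: without tiktoken, count_tokens("hello world") returns
# max(1, 11 // 4) == 2, since the heuristic assumes ~4 characters per token.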

PROGRESS_FILE = "glossary_progress.json"

def remove_honorifics(name):
    """Remove common honorifics from names"""
    if not name:
        return name

    # Check if honorifics filtering is disabled
    if os.getenv('GLOSSARY_DISABLE_HONORIFICS_FILTER', '0') == '1':
        return name.strip()

    # Modern Korean honorifics
    korean_honorifics = [
        '님', '씨', '씨는', '군', '양', '선생님', '선생', '사장님', '사장',
        '과장님', '과장', '대리님', '대리', '주임님', '주임', '이사님', '이사',
        '부장님', '부장', '차장님', '차장', '팀장님', '팀장', '실장님', '실장',
        '교수님', '교수', '박사님', '박사', '원장님', '원장', '회장님', '회장',
        '소장님', '소장', '전무님', '전무', '상무님', '상무', '이사장님', '이사장'
    ]

    # Archaic/Historical Korean honorifics
    korean_archaic = [
        '공', '옹', '어른', '나리', '나으리', '대감', '영감', '마님', '마마',
        '대군', '군', '옹주', '공주', '왕자', '세자', '영애', '영식', '도령',
        '낭자', '낭군', '서방', '영감님', '대감님', '마님', '아씨', '도련님',
        '아가씨', '나으리', '진사', '첨지', '영의정', '좌의정', '우의정',
        '판서', '참판', '정승', '대원군'
    ]

    # Modern Japanese honorifics
    japanese_honorifics = [
        'さん', 'さま', '様', 'くん', '君', 'ちゃん', 'せんせい', '先生',
        'どの', '殿', 'たん', 'ぴょん', 'ぽん', 'ちん', 'りん', 'せんぱい',
        '先輩', 'こうはい', '後輩', 'し', '氏', 'ふじん', '夫人', 'かちょう',
        '課長', 'ぶちょう', '部長', 'しゃちょう', '社長'
    ]

    # Archaic/Historical Japanese honorifics
    japanese_archaic = [
        'どの', '殿', 'たいゆう', '大夫', 'きみ', '公', 'あそん', '朝臣',
        'おみ', '臣', 'むらじ', '連', 'みこと', '命', '尊', 'ひめ', '姫',
        'みや', '宮', 'おう', '王', 'こう', '侯', 'はく', '伯', 'し', '子',
        'だん', '男', 'じょ', '女', 'ひこ', '彦', 'ひめみこ', '姫御子',
        'すめらみこと', '天皇', 'きさき', '后', 'みかど', '帝'
    ]

    # Modern Chinese honorifics
    chinese_honorifics = [
        '先生', '女士', '小姐', '老师', '师傅', '大人', '公', '君', '总',
        '老总', '老板', '经理', '主任', '处长', '科长', '股长', '教授',
        '博士', '院长', '校长', '同志', '师兄', '师姐', '师弟', '师妹',
        '学长', '学姐', '前辈', '阁下'
    ]

    # Archaic/Historical Chinese honorifics
    chinese_archaic = [
        '公', '侯', '伯', '子', '男', '王', '君', '卿', '大夫', '士',
        '陛下', '殿下', '阁下', '爷', '老爷', '大人', '夫人', '娘娘',
        '公子', '公主', '郡主', '世子', '太子', '皇上', '皇后', '贵妃',
        '娘子', '相公', '官人', '郎君', '小姐', '姑娘', '公公', '嬷嬷',
        '大侠', '少侠', '前辈', '晚辈', '在下', '足下', '兄台', '仁兄',
        '贤弟', '老夫', '老朽', '本座', '本尊', '真人', '上人', '尊者'
    ]

    # Combine all honorifics
    all_honorifics = (
        korean_honorifics + korean_archaic +
        japanese_honorifics + japanese_archaic +
        chinese_honorifics + chinese_archaic
    )

    # Remove honorifics from the end of the name
    name_cleaned = name.strip()

    # Sort by length (longest first) to avoid partial matches
    sorted_honorifics = sorted(all_honorifics, key=len, reverse=True)

    for honorific in sorted_honorifics:
        if name_cleaned.endswith(honorific):
            name_cleaned = name_cleaned[:-len(honorific)].strip()
            # Only remove one honorific per pass
            break

    return name_cleaned
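# Example: remove_honorifics('김철수님') -> '김철수', remove_honorifics('田中さん') -> '田中'.
# Only one trailing honorific (the longest match) is stripped per call.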

def set_output_redirect(log_callback=None):
    """Redirect print statements to a callback function for GUI integration"""
    if log_callback:
        import sys

        class CallbackWriter:
            def __init__(self, callback):
                self.callback = callback
                self.buffer = ""

            def write(self, text):
                if text.strip():
                    self.callback(text.strip())

            def flush(self):
                pass

        sys.stdout = CallbackWriter(log_callback)

def load_config(path: str) -> Dict:
    with open(path, 'r', encoding='utf-8') as f:
        cfg = json.load(f)

    # Override context_limit_chapters if the GUI passed GLOSSARY_CONTEXT_LIMIT
    env_limit = os.getenv("GLOSSARY_CONTEXT_LIMIT")
    if env_limit is not None:
        try:
            cfg['context_limit_chapters'] = int(env_limit)
        except ValueError:
            pass  # keep existing config value on parse error

    # Override temperature if the GUI passed GLOSSARY_TEMPERATURE
    env_temp = os.getenv("GLOSSARY_TEMPERATURE")
    if env_temp is not None:
        try:
            cfg['temperature'] = float(env_temp)
        except ValueError:
            pass  # keep existing config value on parse error

    return cfg

def get_custom_entry_types():
    """Get custom entry types configuration from environment"""
    default_types = {
        'character': {'enabled': True, 'has_gender': True},
        'term': {'enabled': True, 'has_gender': False}
    }
    try:
        types_json = os.getenv('GLOSSARY_CUSTOM_ENTRY_TYPES', '{}')
        result = json.loads(types_json)
        # If empty, return the default configuration
        return result if result else default_types
    except Exception:
        return default_types

def save_glossary_json(glossary: List[Dict], output_path: str):
    """Save glossary in the new simple format with automatic sorting by type"""
    # Get custom types for sorting order
    custom_types = get_custom_entry_types()

    # Create sorting order: character=0, term=1, others alphabetically starting from 2
    type_order = {'character': 0, 'term': 1}
    other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']])
    for i, t in enumerate(other_types):
        type_order[t] = i + 2

    # Sort glossary by type order, then by raw_name
    sorted_glossary = sorted(glossary, key=lambda x: (
        type_order.get(x.get('type', 'term'), 999),  # Unknown types go last
        x.get('raw_name', '').lower()
    ))

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(sorted_glossary, f, ensure_ascii=False, indent=2)

def save_glossary_csv(glossary: List[Dict], output_path: str):
    """Save glossary in CSV or token-efficient format based on environment variable"""
    import csv

    csv_path = output_path.replace('.json', '.csv')

    # Get custom types for sorting order and gender info
    custom_types = get_custom_entry_types()

    # Create sorting order
    type_order = {'character': 0, 'term': 1}
    other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']])
    for i, t in enumerate(other_types):
        type_order[t] = i + 2

    # Sort glossary
    sorted_glossary = sorted(glossary, key=lambda x: (
        type_order.get(x.get('type', 'term'), 999),
        x.get('raw_name', '').lower()
    ))

    # Check if we should use the legacy CSV format
    use_legacy_format = os.getenv('GLOSSARY_USE_LEGACY_CSV', '0') == '1'

    if use_legacy_format:
        # LEGACY CSV FORMAT
        with open(csv_path, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)

            # Build header row
            header = ['type', 'raw_name', 'translated_name', 'gender']

            # Add any custom fields to the header
            custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]')
            try:
                custom_fields = json.loads(custom_fields_json)
                header.extend(custom_fields)
            except Exception:
                custom_fields = []

            # Write header row
            writer.writerow(header)

            # Write data rows
            for entry in sorted_glossary:
                entry_type = entry.get('type', 'term')
                type_config = custom_types.get(entry_type, {})

                # Base row: type, raw_name, translated_name
                row = [entry_type, entry.get('raw_name', ''), entry.get('translated_name', '')]

                # Add gender only if the type supports it
                if type_config.get('has_gender', False):
                    row.append(entry.get('gender', ''))

                # Add custom field values
                for field in custom_fields:
                    row.append(entry.get(field, ''))

                # Count how many fields we SHOULD have:
                # type, raw_name, translated_name, gender + custom fields
                expected_fields = 4 + len(custom_fields)

                # Only trim if we have MORE than expected (extra trailing empties)
                while len(row) > expected_fields and row[-1] == '':
                    row.pop()

                # Ensure minimum required fields (type, raw_name, translated_name)
                while len(row) < 3:
                    row.append('')

                writer.writerow(row)

        print(f"✅ Saved legacy CSV format: {csv_path}")
    else:
        # NEW TOKEN-EFFICIENT FORMAT (DEFAULT)
        # Group entries by type
        grouped_entries = {}
        for entry in sorted_glossary:
            entry_type = entry.get('type', 'term')
            if entry_type not in grouped_entries:
                grouped_entries[entry_type] = []
            grouped_entries[entry_type].append(entry)

        # Get custom fields configuration
        custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]')
        try:
            custom_fields = json.loads(custom_fields_json)
        except Exception:
            custom_fields = []

        # Write as plain text format for token efficiency
        with open(csv_path, 'w', encoding='utf-8') as f:
            # Write header
            f.write("Glossary: Characters, Terms, and Important Elements\n\n")

            # Process each type group
            for entry_type in sorted(grouped_entries.keys(), key=lambda x: type_order.get(x, 999)):
                entries = grouped_entries[entry_type]
                type_config = custom_types.get(entry_type, {})

                # Write section header
                section_name = entry_type.upper() + 'S' if not entry_type.upper().endswith('S') else entry_type.upper()
                f.write(f"=== {section_name} ===\n")

                # Write entries for this type
                for entry in entries:
                    raw_name = entry.get('raw_name', '')
                    translated_name = entry.get('translated_name', '')

                    # Start with an asterisk and the name
                    line = f"* {translated_name} ({raw_name})"

                    # Add gender if applicable and not Unknown
                    if type_config.get('has_gender', False):
                        gender = entry.get('gender', '')
                        if gender and gender != 'Unknown':
                            line += f" [{gender}]"

                    # Add custom field values if they exist
                    custom_field_parts = []
                    for field in custom_fields:
                        value = entry.get(field, '').strip()
                        if value:
                            # For description fields, add as continuation
                            if field.lower() in ['description', 'notes', 'details']:
                                line += f": {value}"
                            else:
                                custom_field_parts.append(f"{field}: {value}")

                    # Add other custom fields in parentheses
                    if custom_field_parts:
                        line += f" ({', '.join(custom_field_parts)})"

                    f.write(line + "\n")

                # Add a blank line between sections
                f.write("\n")

        print(f"✅ Saved token-efficient glossary: {csv_path}")
    # Print summary for both formats (computed from sorted_glossary, since
    # grouped_entries is only built in the token-efficient branch)
    type_counts = {}
    for entry in sorted_glossary:
        entry_type = entry.get('type', 'term')
        type_counts[entry_type] = type_counts.get(entry_type, 0) + 1
    total = sum(type_counts.values())
    print(f"   Total entries: {total}")
    for entry_type, count in type_counts.items():
        print(f"   - {entry_type}: {count} entries")

def extract_chapters_from_epub(epub_path: str) -> List[str]:
    chapters = []
    items = []

    def is_html_document(item):
        """Check if an EPUB item is an HTML document"""
        if hasattr(item, 'media_type'):
            return item.media_type in [
                'application/xhtml+xml',
                'text/html',
                'application/html+xml',
                'text/xml'
            ]
        # Fallback for items that don't have media_type
        if hasattr(item, 'get_name'):
            name = item.get_name()
            return name.lower().endswith(('.html', '.xhtml', '.htm'))
        return False

    try:
        # Check for stop before reading
        if is_stop_requested():
            return []

        book = epub.read_epub(epub_path)
        # Select documents by media type rather than by ebooklib item class
        items = [item for item in book.get_items() if is_html_document(item)]
    except Exception as e:
        print(f"[Warning] Manifest load failed, falling back to raw EPUB scan: {e}")
        try:
            with zipfile.ZipFile(epub_path, 'r') as zf:
                names = [n for n in zf.namelist() if n.lower().endswith(('.html', '.xhtml'))]
                for name in names:
                    # Check for stop inside the loop
                    if is_stop_requested():
                        return chapters
                    try:
                        data = zf.read(name)
                        # Build a minimal stand-in exposing the same interface
                        # as an ebooklib item
                        items.append(type('X', (), {
                            'get_content': lambda self, data=data: data,
                            'get_name': lambda self, name=name: name,
                            'media_type': 'text/html'  # for consistency
                        })())
                    except Exception:
                        print(f"[Warning] Could not read zip file entry: {name}")
        except Exception as ze:
            print(f"[Fatal] Cannot open EPUB as zip: {ze}")
            return chapters

    for item in items:
        # Check for stop before processing each chapter
        if is_stop_requested():
            return chapters
        try:
            raw = item.get_content()
            soup = BeautifulSoup(raw, 'html.parser')
            text = soup.get_text("\n", strip=True)
            if text:
                chapters.append(text)
        except Exception as e:
            name = item.get_name() if hasattr(item, 'get_name') else repr(item)
            print(f"[Warning] Skipped corrupted chapter {name}: {e}")

    return chapters

def trim_context_history(history: List[Dict], limit: int, rolling_window: bool = False) -> List[Dict]:
    """
    Handle context history with either reset or rolling window mode

    Args:
        history: List of conversation history
        limit: Maximum number of exchanges to keep
        rolling_window: Whether to use rolling window mode
    """
    # Count current exchanges
    current_exchanges = len(history)

    # Handle based on mode
    if limit > 0 and current_exchanges >= limit:
        if rolling_window:
            # Rolling window: keep limit-1 of the most recent exchanges so the
            # upcoming exchange still fits within the limit
            print(f"🔄 Rolling glossary context window: keeping last {limit} chapters")
            history = history[-(limit - 1):] if limit > 1 else []
        else:
            # Reset mode (original behavior)
            print(f"🔄 Reset glossary context after {limit} chapters")
            return []  # Return empty to reset context

    # Convert to message format
    trimmed = []
    for entry in history:
        trimmed.append({"role": "user", "content": entry["user"]})
        trimmed.append({"role": "assistant", "content": entry["assistant"]})
    return trimmed
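# Each history entry stores {"user": ..., "assistant": ...}; the return value is
# a flat message list ([{"role": "user", ...}, {"role": "assistant", ...}, ...])
# ready to splice between the system prompt and the next user prompt.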

def load_progress() -> Dict:
    if os.path.exists(PROGRESS_FILE):
        with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {"completed": [], "glossary": [], "context_history": []}

def parse_api_response(response_text: str) -> List[Dict]:
    """Parse API response to extract glossary entries - handles custom types"""
    entries = []

    # Get enabled types from custom configuration
    custom_types = get_custom_entry_types()
    enabled_types = [t for t, cfg in custom_types.items() if cfg.get('enabled', True)]

    # First try JSON parsing
    try:
        # Clean up response text
        cleaned_text = response_text.strip()

        # Remove markdown code blocks if present
        if '```' in cleaned_text:
            code_block_match = re.search(r'```(?:json)?\s*(.*?)\s*```', cleaned_text, re.DOTALL)
            if code_block_match:
                cleaned_text = code_block_match.group(1)

        # Try to find a JSON array or object
        json_match = re.search(r'[\[\{].*[\]\}]', cleaned_text, re.DOTALL)
        if json_match:
            json_str = json_match.group(0)
            data = json.loads(json_str)

            if isinstance(data, list):
                for item in data:
                    if isinstance(item, dict):
                        # Check if the entry type is enabled
                        entry_type = item.get('type', '').lower()

                        # Handle legacy format where the type is the key
                        if not entry_type:
                            for type_name in enabled_types:
                                if type_name in item:
                                    entry_type = type_name
                                    fixed_entry = {
                                        'type': type_name,
                                        'raw_name': item.get(type_name, ''),
                                        'translated_name': item.get('translated_name', '')
                                    }
                                    # Add gender if the type supports it
                                    if custom_types.get(type_name, {}).get('has_gender', False):
                                        fixed_entry['gender'] = item.get('gender', 'Unknown')
                                    # Copy other fields
                                    for k, v in item.items():
                                        if k not in [type_name, 'translated_name', 'gender', 'type', 'raw_name']:
                                            fixed_entry[k] = v
                                    entries.append(fixed_entry)
                                    break
                        else:
                            # Standard format with a type field
                            if entry_type in enabled_types:
                                entries.append(item)
                return entries

            elif isinstance(data, dict):
                # Handle a single entry
                entry_type = data.get('type', '').lower()
                if entry_type in enabled_types:
                    return [data]
                # Check for a wrapper object
                for key in ['entries', 'glossary', 'characters', 'terms', 'data']:
                    if key in data and isinstance(data[key], list):
                        return parse_api_response(json.dumps(data[key]))
                return []
    except (json.JSONDecodeError, AttributeError) as e:
        print(f"[Debug] JSON parsing failed: {e}")

    # Fall back to CSV-like format parsing
    lines = response_text.strip().split('\n')
    for line in lines:
        line = line.strip()
        if not line or line.startswith('#'):
            continue

        # Skip header lines
        if 'type' in line.lower() and 'raw_name' in line.lower():
            continue

        # Parse the line as CSV, honoring quoted fields
        parts = []
        current_part = []
        in_quotes = False
        for char in line + ',':
            if char == '"':
                in_quotes = not in_quotes
            elif char == ',' and not in_quotes:
                parts.append(''.join(current_part).strip())
                current_part = []
            else:
                current_part.append(char)
        if parts and parts[-1] == '':
            parts = parts[:-1]

        if len(parts) >= 3:
            entry_type = parts[0].lower()

            # Check if the type is enabled
            if entry_type not in enabled_types:
                continue

            entry = {
                'type': entry_type,
                'raw_name': parts[1],
                'translated_name': parts[2]
            }

            # Add gender if the type supports it and it's provided
            type_config = custom_types.get(entry_type, {})
            if type_config.get('has_gender', False) and len(parts) > 3 and parts[3]:
                entry['gender'] = parts[3]
            elif type_config.get('has_gender', False):
                entry['gender'] = 'Unknown'

            # Add any custom fields
            custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]')
            try:
                custom_fields = json.loads(custom_fields_json)
                start_idx = 4  # custom fields always start after the gender column
                for i, field in enumerate(custom_fields):
                    if len(parts) > start_idx + i:
                        field_value = parts[start_idx + i]
                        if field_value:  # Only add if not empty
                            entry[field] = field_value
            except Exception:
                pass

            entries.append(entry)

    return entries

def validate_extracted_entry(entry):
    """Validate that extracted entry has required fields and an enabled type"""
    if 'type' not in entry:
        return False

    # Check if the type is enabled
    custom_types = get_custom_entry_types()
    entry_type = entry.get('type', '').lower()

    if entry_type not in custom_types:
        return False
    if not custom_types[entry_type].get('enabled', True):
        return False

    # Must have raw_name and translated_name
    if 'raw_name' not in entry or not entry['raw_name']:
        return False
    if 'translated_name' not in entry or not entry['translated_name']:
        return False

    return True

def build_prompt(chapter_text: str) -> tuple:
    """Build the extraction prompt with custom types - returns (system_prompt, user_prompt)"""
    custom_prompt = os.getenv('GLOSSARY_SYSTEM_PROMPT', '').strip()

    if not custom_prompt:
        # If no custom prompt is set, fall back to a default
        custom_prompt = """Extract all character names and important terms from the text.
{fields}
Only include entries that appear in the text.
Return the data in the exact format specified above."""

    # Check if the prompt contains a {fields} placeholder
    if '{fields}' in custom_prompt:
        # Get enabled types
        custom_types = get_custom_entry_types()
        enabled_types = [(t, cfg) for t, cfg in custom_types.items() if cfg.get('enabled', True)]

        # Get custom fields
        custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]')
        try:
            custom_fields = json.loads(custom_fields_json)
        except Exception:
            custom_fields = []

        # Build the fields specification based on what the prompt expects:
        # if the prompt mentions CSV, emit a CSV spec; otherwise default to JSON
        if 'CSV' in custom_prompt.upper():
            # CSV format
            fields_spec = []

            # Show the header format
            header_parts = ['type', 'raw_name', 'translated_name', 'gender']
            if custom_fields:
                header_parts.extend(custom_fields)
            fields_spec.append(','.join(header_parts))

            # Show examples for each type
            for type_name, type_config in enabled_types:
                example_parts = [type_name, '<name in original language>', '<English translation>']
                # Add gender field
                if type_config.get('has_gender', False):
                    example_parts.append('<Male/Female/Unknown>')
                else:
                    example_parts.append('')  # Empty for non-character types
                # Add custom field placeholders
                for field in custom_fields:
                    example_parts.append(f'<{field} value>')
                fields_spec.append(','.join(example_parts))

            fields_str = '\n'.join(fields_spec)
        else:
            # JSON format (default)
            fields_spec = []
            fields_spec.append("Extract entities and return as a JSON array.")
            fields_spec.append("Each entry must be a JSON object with these exact fields:")
            fields_spec.append("")

            for type_name, type_config in enabled_types:
                fields_spec.append(f"For {type_name}s:")
                fields_spec.append(f'  "type": "{type_name}" (required)')
                fields_spec.append('  "raw_name": the name in original language/script (required)')
                fields_spec.append('  "translated_name": English translation or romanization (required)')
                if type_config.get('has_gender', False):
                    fields_spec.append('  "gender": "Male", "Female", or "Unknown" (required for characters)')
                fields_spec.append("")

            # Add custom fields info
            if custom_fields:
                fields_spec.append("Additional custom fields to include:")
                for field in custom_fields:
                    fields_spec.append(f'  "{field}": appropriate value')
                fields_spec.append("")

            # Add an example
            if enabled_types:
                fields_spec.append("Example output format:")
                fields_spec.append('[')
                examples = []
                if 'character' in [t[0] for t in enabled_types]:
                    example = '  {"type": "character", "raw_name": "田中太郎", "translated_name": "Tanaka Taro", "gender": "Male"'
                    for field in custom_fields:
                        example += f', "{field}": "example value"'
                    example += '}'
                    examples.append(example)
                if 'term' in [t[0] for t in enabled_types]:
                    example = '  {"type": "term", "raw_name": "東京駅", "translated_name": "Tokyo Station"'
                    for field in custom_fields:
                        example += f', "{field}": "example value"'
                    example += '}'
                    examples.append(example)
                fields_spec.append(',\n'.join(examples))
                fields_spec.append(']')

            fields_str = '\n'.join(fields_spec)

        # Replace the {fields} placeholder
        system_prompt = custom_prompt.replace('{fields}', fields_str)
    else:
        # No {fields} placeholder - use the prompt as-is
        system_prompt = custom_prompt

    # Remove any chapter-text placeholders from the system prompt; the chapter
    # is sent as the user message instead
    system_prompt = system_prompt.replace('{chapter_text}', '')
    system_prompt = system_prompt.replace('{{chapter_text}}', '')
    system_prompt = system_prompt.replace('{text}', '')
    system_prompt = system_prompt.replace('{{text}}', '')

    # Strip any trailing "Text:" label
    system_prompt = system_prompt.rstrip()
    if system_prompt.endswith('Text:'):
        system_prompt = system_prompt[:-len('Text:')].rstrip()

    # The user prompt is just the chapter text
    user_prompt = chapter_text

    return (system_prompt, user_prompt)

def skip_duplicate_entries(glossary):
    """
    Skip entries with duplicate raw names using fuzzy matching.
    Returns a deduplicated list keeping the first occurrence of each unique raw name.
    """
    import difflib

    # Get the fuzzy threshold from the environment
    fuzzy_threshold = float(os.getenv('GLOSSARY_FUZZY_THRESHOLD', '0.9'))

    seen_raw_names = []  # List of (cleaned_name, original_raw_name) tuples
    deduplicated = []
    skipped_count = 0

    for entry in glossary:
        # Get raw_name and clean it
        raw_name = entry.get('raw_name', '')
        if not raw_name:
            continue

        # Remove honorifics for comparison (unless disabled)
        cleaned_name = remove_honorifics(raw_name)

        # Check for fuzzy matches against already-seen names
        is_duplicate = False
        for seen_clean, seen_original in seen_raw_names:
            similarity = difflib.SequenceMatcher(None, cleaned_name.lower(), seen_clean.lower()).ratio()
            if similarity >= fuzzy_threshold:
                skipped_count += 1
                print(f"[Skip] Duplicate entry: {raw_name} (cleaned: {cleaned_name}) - {similarity*100:.1f}% match with {seen_original}")
                is_duplicate = True
                break

        if not is_duplicate:
            # Record the name and keep the entry
            seen_raw_names.append((cleaned_name, raw_name))
            deduplicated.append(entry)

    if skipped_count > 0:
        print(f"⏭️ Skipped {skipped_count} duplicate entries (threshold: {fuzzy_threshold:.2f})")
        print(f"✅ Kept {len(deduplicated)} unique entries")

    return deduplicated

# Batch processing functions
def process_chapter_batch(chapters_batch: List[Tuple[int, str]],
                          client: UnifiedClient,
                          config: Dict,
                          contextual_enabled: bool,
                          history: List[Dict],
                          ctx_limit: int,
                          rolling_window: bool,
                          check_stop,
                          chunk_timeout: int = None) -> List[Dict]:
    """
    Process a batch of chapters in parallel with improved interrupt support
    """
    temp = float(os.getenv("GLOSSARY_TEMPERATURE") or config.get('temperature', 0.1))

    env_max_output = os.getenv("MAX_OUTPUT_TOKENS")
    if env_max_output and env_max_output.isdigit():
        mtoks = int(env_max_output)
    else:
        mtoks = config.get('max_tokens', 4196)

    results = []

    with ThreadPoolExecutor(max_workers=len(chapters_batch)) as executor:
        futures = {}

        for idx, chap in chapters_batch:
            if check_stop():
                break

            # Get system and user prompts
            system_prompt, user_prompt = build_prompt(chap)

            # Build the message list, optionally with context history
            if not contextual_enabled:
                msgs = [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ]
            else:
                msgs = [{"role": "system", "content": system_prompt}] \
                    + trim_context_history(history, ctx_limit, rolling_window) \
                    + [{"role": "user", "content": user_prompt}]

            # Submit to the thread pool
            future = executor.submit(
                process_single_chapter_api_call,
                idx, chap, msgs, client, temp, mtoks, check_stop, chunk_timeout
            )
            futures[future] = (idx, chap)

        # Process results with cancellation support; no timeout here so
        # in-flight futures are allowed to complete
        for future in as_completed(futures):
            if check_stop():
                print("🛑 Stop detected - cancelling all pending operations...")
                # Cancel all pending futures immediately
                cancelled = cancel_all_futures(list(futures.keys()))
                if cancelled > 0:
                    print(f"✅ Cancelled {cancelled} pending API calls")
                # Shut the executor down immediately
                executor.shutdown(wait=False)
                break

            idx, chap = futures[future]
            try:
                result = future.result(timeout=0.5)  # Short timeout on result retrieval
                # Ensure the chapter text is present in the result
                if 'chap' not in result:
                    result['chap'] = chap
                results.append(result)
            except Exception as e:
                if "stopped by user" in str(e).lower():
                    print(f"✅ Chapter {idx+1} stopped by user")
                else:
                    print(f"Error processing chapter {idx+1}: {e}")
                results.append({
                    'idx': idx,
                    'data': [],
                    'resp': "",
                    'chap': chap,
                    'error': str(e)
                })

    # Sort results by chapter index
    results.sort(key=lambda x: x['idx'])
    return results

def process_single_chapter_api_call(idx: int, chap: str, msgs: List[Dict],
                                    client: UnifiedClient, temp: float, mtoks: int,
                                    stop_check_fn, chunk_timeout: int = None) -> Dict:
    """Process a single chapter API call with thread-safe payload handling"""
    # Apply the interruptible threading delay first
    thread_delay = float(os.getenv("THREAD_SUBMISSION_DELAY_SECONDS", "0.5"))
    if thread_delay > 0:
        # Check if we need to wait (same logic as unified_api_client)
        if hasattr(client, '_thread_submission_lock') and hasattr(client, '_last_thread_submission_time'):
            with client._thread_submission_lock:
                current_time = time.time()
                time_since_last = current_time - client._last_thread_submission_time

                if time_since_last < thread_delay:
                    sleep_time = thread_delay - time_since_last
                    thread_name = threading.current_thread().name

                    # Print before the delay starts
                    print(f"🧵 [{thread_name}] Applying thread delay: {sleep_time:.1f}s for Chapter {idx+1}")

                    # Interruptible sleep - check the stop flag every 0.1 seconds
                    elapsed = 0
                    check_interval = 0.1
                    while elapsed < sleep_time:
                        if stop_check_fn():
                            print("🛑 Threading delay interrupted by stop flag")
                            raise UnifiedClientError("Glossary extraction stopped by user during threading delay")
                        sleep_chunk = min(check_interval, sleep_time - elapsed)
                        time.sleep(sleep_chunk)
                        elapsed += sleep_chunk

                client._last_thread_submission_time = time.time()
                if not hasattr(client, '_thread_submission_count'):
                    client._thread_submission_count = 0
                client._thread_submission_count += 1

    start_time = time.time()
    print(f"[BATCH] Starting API call for Chapter {idx+1} at {time.strftime('%H:%M:%S')}")

    # Thread-safe payload directory
    thread_name = threading.current_thread().name
    thread_id = threading.current_thread().ident
    thread_dir = os.path.join("Payloads", "glossary", f"{thread_name}_{thread_id}")
    os.makedirs(thread_dir, exist_ok=True)

    try:
        # Save the request payload before the API call
        payload_file = os.path.join(thread_dir, f"chapter_{idx+1}_request.json")
        with open(payload_file, 'w', encoding='utf-8') as f:
            json.dump({
                'chapter': idx + 1,
                'messages': msgs,
                'temperature': temp,
                'max_tokens': mtoks,
                'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
            }, f, indent=2, ensure_ascii=False)

        # Use send_with_interrupt for the API call
        raw = send_with_interrupt(
            messages=msgs,
            client=client,
            temperature=temp,
            max_tokens=mtoks,
            stop_check_fn=stop_check_fn,
            chunk_timeout=chunk_timeout
        )

        # Handle the response - it might be a tuple, a string, or an object
        if raw is None:
            print(f"⚠️ API returned None for chapter {idx+1}")
            return {
                'idx': idx,
                'data': [],
                'resp': "",
                'chap': chap,
                'error': "API returned None"
            }

        if isinstance(raw, tuple):
            resp = raw[0] if raw[0] is not None else ""
        elif isinstance(raw, str):
            resp = raw
        elif hasattr(raw, 'content'):
            resp = raw.content if raw.content is not None else ""
        elif hasattr(raw, 'text'):
            resp = raw.text if raw.text is not None else ""
        else:
            resp = str(raw)

        # Ensure resp is never None
        if resp is None:
            resp = ""

        # Save the raw response in a thread-safe location
        response_file = os.path.join(thread_dir, f"chapter_{idx+1}_response.txt")
        with open(response_file, "w", encoding="utf-8", errors="replace") as f:
            f.write(resp)

        # Parse the response
        data = parse_api_response(resp)

        # Debug logging
        print(f"[BATCH] Chapter {idx+1} - Raw response length: {len(resp)} chars")
        print(f"[BATCH] Chapter {idx+1} - Parsed {len(data)} entries before validation")

        # Filter out invalid entries
        valid_data = []
        for entry in data:
            if validate_extracted_entry(entry):
                # Clean the raw_name
                if 'raw_name' in entry:
                    entry['raw_name'] = entry['raw_name'].strip()
                valid_data.append(entry)
            else:
                print(f"[BATCH] Chapter {idx+1} - Invalid entry: {entry}")

        elapsed = time.time() - start_time
        print(f"[BATCH] Completed Chapter {idx+1} in {elapsed:.1f}s at {time.strftime('%H:%M:%S')} - Extracted {len(valid_data)} valid entries")

        return {
            'idx': idx,
            'data': valid_data,
            'resp': resp,
            'chap': chap,  # Include the chapter text in the result
            'error': None
        }
    except UnifiedClientError as e:
        print(f"[Error] API call interrupted/failed for chapter {idx+1}: {e}")
        return {
            'idx': idx,
            'data': [],
            'resp': "",
            'chap': chap,  # Include chapter even on error
            'error': str(e)
        }
    except Exception as e:
        print(f"[Error] Unexpected error for chapter {idx+1}: {e}")
        import traceback
        print(f"[Error] Traceback: {traceback.format_exc()}")
        return {
            'idx': idx,
            'data': [],
            'resp': "",
            'chap': chap,  # Include chapter even on error
            'error': str(e)
        }

def main(log_callback=None, stop_callback=None):
    """Main entry point; accepts a logging callback and a stop callback for GUI use"""
    if log_callback:
        set_output_redirect(log_callback)

    # Set up stop checking
    def check_stop():
        if stop_callback and stop_callback():
            print("❌ Glossary extraction stopped by user request.")
            return True
        return is_stop_requested()

    start = time.time()

    # Handle both command line and GUI calls
    if '--epub' in sys.argv:
        # Command line mode
        parser = argparse.ArgumentParser(description='Extract glossary from EPUB/TXT')
        parser.add_argument('--epub', required=True, help='Path to EPUB/TXT file')
        parser.add_argument('--output', required=True, help='Output glossary path')
        parser.add_argument('--config', help='Config file path')
        args = parser.parse_args()
        epub_path = args.epub
    else:
        # GUI mode - get everything from the environment
        epub_path = os.getenv("EPUB_PATH", "")
        if not epub_path and len(sys.argv) > 1:
            epub_path = sys.argv[1]

        # Create an args object for GUI mode
        import types
        args = types.SimpleNamespace()
        args.epub = epub_path
        args.output = os.getenv("OUTPUT_PATH", "glossary.json")
        args.config = os.getenv("CONFIG_PATH", "config.json")

    is_text_file = epub_path.lower().endswith('.txt')
    file_base = os.path.splitext(os.path.basename(epub_path))[0]
    # Chapters are extracted once, after the settings are logged below

    # If the user didn't override --output, derive it from the input filename
    if args.output == 'glossary.json':
        args.output = f"{file_base}_glossary.json"

    # Ensure we have a Glossary subfolder next to the JSON/CSV outputs
    glossary_dir = os.path.join(os.path.dirname(args.output), "Glossary")
    os.makedirs(glossary_dir, exist_ok=True)

    # Override the module-level PROGRESS_FILE to include the input file name
    global PROGRESS_FILE
    PROGRESS_FILE = os.path.join(
        glossary_dir,
        f"{file_base}_glossary_progress.json"
    )

    config = load_config(args.config)

    # Get the API key from environment variables (set by the GUI) or the config file
    api_key = (os.getenv("API_KEY") or
               os.getenv("OPENAI_API_KEY") or
               os.getenv("OPENAI_OR_Gemini_API_KEY") or
               os.getenv("GEMINI_API_KEY") or
               config.get('api_key'))

    # Get the model from the environment or config
    model = os.getenv("MODEL") or config.get('model', 'gemini-1.5-flash')

    # Output directory for the client (defaults to the output file's directory)
    out = os.path.dirname(args.output) if hasattr(args, 'output') else os.getcwd()

    client = create_client_with_multi_key_support(api_key, model, out, config)

    # Check for batch mode
    batch_enabled = os.getenv("BATCH_TRANSLATION", "0") == "1"
    batch_size = int(os.getenv("BATCH_SIZE", "5"))
    conservative_batching = os.getenv("CONSERVATIVE_BATCHING", "0") == "1"

    print(f"[DEBUG] BATCH_TRANSLATION = {os.getenv('BATCH_TRANSLATION')} (enabled: {batch_enabled})")
    print(f"[DEBUG] BATCH_SIZE = {batch_size}")
    print(f"[DEBUG] CONSERVATIVE_BATCHING = {os.getenv('CONSERVATIVE_BATCHING')} (enabled: {conservative_batching})")

    if batch_enabled:
        print(f"🚀 Glossary batch mode enabled with size: {batch_size}")
        print("📑 Note: Glossary extraction uses direct batching (not affected by the conservative batching setting)")

    # API call delay
    api_delay = float(os.getenv("SEND_INTERVAL_SECONDS", "2"))
    print(f"⏱️ API call delay: {api_delay} seconds")

    # Get the compression factor from the environment
    compression_factor = float(os.getenv("COMPRESSION_FACTOR", "1.0"))
    print(f"📐 Compression Factor: {compression_factor}")

    # Initialize the chapter splitter with the compression factor
    chapter_splitter = ChapterSplitter(model_name=model, compression_factor=compression_factor)

    # Get temperature from the environment or config
    temp = float(os.getenv("GLOSSARY_TEMPERATURE") or config.get('temperature', 0.1))

    env_max_output = os.getenv("MAX_OUTPUT_TOKENS")
    if env_max_output and env_max_output.isdigit():
        mtoks = int(env_max_output)
        print(f"[DEBUG] Output Token Limit: {mtoks} (from GUI)")
    else:
        mtoks = config.get('max_tokens', 4196)
        print(f"[DEBUG] Output Token Limit: {mtoks} (from config)")

    # Get the context limit from the environment or config
    ctx_limit = int(os.getenv("GLOSSARY_CONTEXT_LIMIT") or config.get('context_limit_chapters', 3))

    # Parse the chapter range from the environment
    chapter_range = os.getenv("CHAPTER_RANGE", "").strip()
    range_start = None
    range_end = None
    if chapter_range and re.match(r"^\d+\s*-\s*\d+$", chapter_range):
        range_start, range_end = map(int, chapter_range.split("-", 1))
        print(f"📊 Chapter Range Filter: {range_start} to {range_end}")
    elif chapter_range:
        print(f"⚠️ Invalid chapter range format: {chapter_range} (use format: 5-10)")

    # Log settings (custom fields are parsed once and reused below)
    format_parts = ["type", "raw_name", "translated_name", "gender"]
    custom_fields_json = os.getenv('GLOSSARY_CUSTOM_FIELDS', '[]')
    try:
        custom_fields = json.loads(custom_fields_json)
    except Exception:
        custom_fields = []
    if custom_fields:
        format_parts.extend(custom_fields)
    print(f"📑 Glossary Format: Simple ({', '.join(format_parts)})")

    # Check the honorifics filter toggle
    honorifics_disabled = os.getenv('GLOSSARY_DISABLE_HONORIFICS_FILTER', '0') == '1'
    if honorifics_disabled:
        print("📑 Honorifics Filtering: ❌ DISABLED")
    else:
        print("📑 Honorifics Filtering: ✅ ENABLED")

    # Log custom fields
    if custom_fields:
        print(f"📑 Custom Fields: {', '.join(custom_fields)}")

    # Check if a custom prompt is being used
    if os.getenv('GLOSSARY_SYSTEM_PROMPT'):
        print("📑 Using custom extraction prompt")
    else:
        print("📑 Using default extraction prompt")

    # Extract chapters (single extraction point for both EPUB and TXT inputs)
    if is_text_file:
        from extract_glossary_from_txt import extract_chapters_from_txt
        chapters = extract_chapters_from_txt(args.epub)
    else:
        chapters = extract_chapters_from_epub(args.epub)

    if not chapters:
        print("No chapters found. Exiting.")
        return

    # Check for stop before starting processing
    if check_stop():
        return

    prog = load_progress()
    completed = prog['completed']
    glossary = prog['glossary']
    history = prog['context_history']
    total_chapters = len(chapters)

    # Get both context settings
    contextual_enabled = os.getenv('CONTEXTUAL', '1') == '1'
    rolling_window = os.getenv('GLOSSARY_HISTORY_ROLLING', '0') == '1'

    # Collect the chapters that pass the range filter and aren't done yet
    chapters_to_process = []
    for idx, chap in enumerate(chapters):
        # Skip if the chapter is outside the range
        if range_start is not None and range_end is not None:
            chapter_num = idx + 1  # 1-based chapter numbering
            if not (range_start <= chapter_num <= range_end):
                continue
        if idx not in completed:
            chapters_to_process.append((idx, chap))

    if len(chapters_to_process) < total_chapters:
        print(f"📊 Processing {len(chapters_to_process)} out of {total_chapters} chapters")

    # Get the chunk timeout from the environment
    chunk_timeout = int(os.getenv("CHUNK_TIMEOUT", "900"))  # 15 minutes default

    # Process chapters based on mode
    if batch_enabled and len(chapters_to_process) > 0:
        # BATCH MODE: Process in batches with per-entry saving

        def dedupe_sort_and_save():
            """Deduplicate, sort, and persist the current glossary state."""
            glossary[:] = skip_duplicate_entries(glossary)
            custom_types = get_custom_entry_types()
            type_order = {'character': 0, 'term': 1}
            other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']])
            for i, t in enumerate(other_types):
                type_order[t] = i + 2
            glossary.sort(key=lambda x: (
                type_order.get(x.get('type', 'term'), 999),
                x.get('raw_name', '').lower()
            ))
            save_progress(completed, glossary, history)
            save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output)))
            save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output)))

        total_batches = (len(chapters_to_process) + batch_size - 1) // batch_size

        for batch_num in range(total_batches):
            # Check for stop at the beginning of each batch
            if check_stop():
                print(f"❌ Glossary extraction stopped at batch {batch_num+1}")
                # Apply deduplication before stopping
                if glossary:
                    print("🔀 Applying deduplication and sorting before exit...")
                    dedupe_sort_and_save()
                    print(f"✅ Saved {len(glossary)} deduplicated entries before exit")
                return

            # Get the current batch
            batch_start = batch_num * batch_size
            batch_end = min(batch_start + batch_size, len(chapters_to_process))
            current_batch = chapters_to_process[batch_start:batch_end]

            print(f"\n🔄 Processing Batch {batch_num+1}/{total_batches} (Chapters: {[idx+1 for idx, _ in current_batch]})")
            print(f"[BATCH] Submitting {len(current_batch)} chapters for parallel processing...")

            batch_start_time = time.time()
            batch_entry_count = 0

            with ThreadPoolExecutor(max_workers=len(current_batch)) as executor:
                futures = {}

                # Submit all chapters in the batch
                for idx, chap in current_batch:
                    if check_stop():
                        # Persist what we have before breaking
                        if glossary:
                            print("🔀 Applying deduplication before stopping...")
                            dedupe_sort_and_save()
                        break

                    # Get system and user prompts
                    system_prompt, user_prompt = build_prompt(chap)

                    # Build the message list
                    if not contextual_enabled:
                        msgs = [
                            {"role": "system", "content": system_prompt},
                            {"role": "user", "content": user_prompt}
                        ]
                    else:
                        msgs = [{"role": "system", "content": system_prompt}] \
                            + trim_context_history(history, ctx_limit, rolling_window) \
                            + [{"role": "user", "content": user_prompt}]

                    # Submit to the thread pool
                    future = executor.submit(
                        process_single_chapter_api_call,
                        idx, chap, msgs, client, temp, mtoks, check_stop, chunk_timeout
                    )
                    futures[future] = (idx, chap)

                    # Small yield to keep the GUI responsive when submitting many tasks
                    if idx % 5 == 0:
                        time.sleep(0.001)

                # Process results AS THEY COMPLETE, not all at once
                for future in as_completed(futures):
                    if check_stop():
                        print("🛑 Stop detected - cancelling all pending operations...")
                        cancelled = cancel_all_futures(list(futures.keys()))
                        if cancelled > 0:
                            print(f"✅ Cancelled {cancelled} pending API calls")
                        # Persist a deduplicated, sorted glossary before stopping
                        if glossary:
                            print("🔀 Applying deduplication and sorting before exit...")
                            dedupe_sort_and_save()
                            print(f"✅ Saved {len(glossary)} deduplicated entries before exit")
                        executor.shutdown(wait=False)
                        break

                    idx, chap = futures[future]
                    try:
                        result = future.result(timeout=0.5)

                        # Process this chapter's results immediately
                        data = result.get('data', [])
                        resp = result.get('resp', '')
                        error = result.get('error')

                        if error:
                            print(f"[Chapter {idx+1}] Error: {error}")
                            completed.append(idx)
                            continue

                        # Save entries immediately as each chapter completes
                        if data and len(data) > 0:
                            total_ent = len(data)
                            batch_entry_count += total_ent

                            for eidx, entry in enumerate(data, start=1):
                                elapsed = time.time() - start

                                # Get entry info
                                entry_type = entry.get("type", "?")
                                raw_name = entry.get("raw_name", "?")
                                trans_name = entry.get("translated_name", "?")

                                print(f'[Chapter {idx+1}/{total_chapters}] [{eidx}/{total_ent}] ({elapsed:.1f}s elapsed) → {entry_type}: {raw_name} ({trans_name})')

                                # Add the entry immediately WITHOUT deduplication;
                                # deduplication runs after the whole batch
                                glossary.append(entry)

                                # Save after EACH entry
                                save_progress(completed, glossary, history)
                                save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output)))
                                save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output)))

                        completed.append(idx)

                        # Add to history if contextual mode is enabled
                        if contextual_enabled and resp and chap:
                            system_prompt, user_prompt = build_prompt(chap)
                            history.append({"user": user_prompt, "assistant": resp})
                    except Exception as e:
                        if "stopped by user" in str(e).lower():
                            print(f"✅ Chapter {idx+1} stopped by user")
                        else:
                            print(f"Error processing chapter {idx+1}: {e}")
                        completed.append(idx)

            batch_elapsed = time.time() - batch_start_time
            print(f"[BATCH] Batch {batch_num+1} completed in {batch_elapsed:.1f}s total")
| # After batch completes, apply deduplication and sorting | |
| if batch_entry_count > 0: | |
| print(f"\n🔀 Applying deduplication and sorting after batch {batch_num+1}/{total_batches}") | |
| original_size = len(glossary) | |
| # Apply deduplication to entire glossary | |
| glossary[:] = skip_duplicate_entries(glossary) | |
| # Sort glossary by type and name | |
| custom_types = get_custom_entry_types() | |
| type_order = {'character': 0, 'term': 1} | |
| other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) | |
| for i, t in enumerate(other_types): | |
| type_order[t] = i + 2 | |
| glossary.sort(key=lambda x: ( | |
| type_order.get(x.get('type', 'term'), 999), | |
| x.get('raw_name', '').lower() | |
| )) | |
| deduplicated_size = len(glossary) | |
| removed = original_size - deduplicated_size | |
| if removed > 0: | |
| print(f"✅ Removed {removed} duplicates (fuzzy threshold: {os.getenv('GLOSSARY_FUZZY_THRESHOLD', '0.90')})") | |
| print(f"📊 Glossary size: {deduplicated_size} unique entries") | |
| # Save final deduplicated and sorted glossary | |
| save_progress(completed, glossary, history) | |
| save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) | |
| save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) | |
| # Print batch summary | |
| if batch_entry_count > 0: | |
| print(f"\n📊 Batch {batch_num+1}/{total_batches} Summary:") | |
| print(f" • Chapters processed: {len(current_batch)}") | |
| print(f" • Total entries extracted: {batch_entry_count}") | |
| print(f" • Glossary size: {len(glossary)} unique entries") | |
| # Handle context history | |
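| # Without a rolling window, history is cleared wholesale once ctx_limit chapters accumulate | |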
| if contextual_enabled: | |
| if not rolling_window and len(history) >= ctx_limit and ctx_limit > 0: | |
| print(f"🔄 Resetting glossary context (reached {ctx_limit} chapter limit)") | |
| history = [] | |
| prog['context_history'] = [] | |
| # Add delay between batches (but not after the last batch) | |
| if batch_num < total_batches - 1: | |
| print(f"\n⏱️ Waiting {api_delay}s before next batch...") | |
| if not interruptible_sleep(api_delay, check_stop, 0.1): | |
| print(f"❌ Glossary extraction stopped during delay") | |
| # Apply deduplication before stopping | |
| if glossary: | |
| print("🔀 Applying deduplication and sorting before exit...") | |
| glossary[:] = skip_duplicate_entries(glossary) | |
| # Sort glossary | |
| custom_types = get_custom_entry_types() | |
| type_order = {'character': 0, 'term': 1} | |
| other_types = sorted([t for t in custom_types.keys() if t not in ['character', 'term']]) | |
| for i, t in enumerate(other_types): | |
| type_order[t] = i + 2 | |
| glossary.sort(key=lambda x: ( | |
| type_order.get(x.get('type', 'term'), 999), | |
| x.get('raw_name', '').lower() | |
| )) | |
| save_progress(completed, glossary, history) | |
| save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) | |
| save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) | |
| print(f"✅ Saved {len(glossary)} deduplicated entries before exit") | |
| return | |
| else: | |
| # SEQUENTIAL MODE: Original behavior | |
| for idx, chap in enumerate(chapters): | |
| # Check for stop at the beginning of each chapter | |
| if check_stop(): | |
| print(f"❌ Glossary extraction stopped at chapter {idx+1}") | |
| return | |
| # Apply chapter range filter | |
| if range_start is not None and range_end is not None: | |
| chapter_num = idx + 1 # 1-based chapter numbering | |
| if not (range_start <= chapter_num <= range_end): | |
| # Check if this is from a text file | |
| is_text_chapter = isinstance(chap, dict) and chap.get('filename', '').endswith('.txt') | |
| terminology = "Section" if is_text_chapter else "Chapter" | |
| print(f"[SKIP] {terminology} {chapter_num} - outside range filter") | |
| continue | |
| if idx in completed: | |
| # Check if processing text file chapters | |
| is_text_chapter = isinstance(chap, dict) and chap.get('filename', '').endswith('.txt') | |
| terminology = "section" if is_text_chapter else "chapter" | |
| print(f"Skipping {terminology} {idx+1} (already processed)") | |
| continue | |
| print(f"🔄 Processing Chapter {idx+1}/{total_chapters}") | |
| # Check if history will reset on this chapter | |
| if contextual_enabled and len(history) >= ctx_limit and ctx_limit > 0 and not rolling_window: | |
| print(f" 📌 Glossary context will reset after this chapter (current: {len(history)}/{ctx_limit} chapters)") | |
| try: | |
| # Get system and user prompts from build_prompt | |
| system_prompt, user_prompt = build_prompt(chap) | |
| if not contextual_enabled: | |
| # No context at all | |
| msgs = [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt} | |
| ] | |
| else: | |
| # Use context with trim_context_history handling the mode | |
| msgs = [{"role": "system", "content": system_prompt}] \ | |
| + trim_context_history(history, ctx_limit, rolling_window) \ | |
| + [{"role": "user", "content": user_prompt}] | |
| total_tokens = sum(count_tokens(m["content"]) for m in msgs) | |
| # READ THE TOKEN LIMIT | |
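| # MAX_INPUT_TOKENS: unset defaults to 1000000, an explicitly empty value means unlimited, and anything non-numeric falls back to the default | |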
| env_value = os.getenv("MAX_INPUT_TOKENS", "1000000").strip() | |
| if not env_value or env_value == "": | |
| token_limit = None | |
| limit_str = "unlimited" | |
| elif env_value.isdigit() and int(env_value) > 0: | |
| token_limit = int(env_value) | |
| limit_str = str(token_limit) | |
| else: | |
| token_limit = 1000000 | |
| limit_str = "1000000 (default)" | |
| print(f"[DEBUG] Glossary prompt tokens = {total_tokens} / {limit_str}") | |
| # Check if we're over the token limit and need to split | |
| if token_limit is not None and total_tokens > token_limit: | |
| print(f"⚠️ Chapter {idx+1} exceeds token limit: {total_tokens} > {token_limit}") | |
| print(f"📄 Using ChapterSplitter to split into smaller chunks...") | |
| # Calculate available tokens for content | |
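| # Content budget = token limit minus system prompt, minus trimmed context history, minus a 1000-token safety margin | |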
| system_tokens = chapter_splitter.count_tokens(system_prompt) | |
| context_tokens = sum(chapter_splitter.count_tokens(m["content"]) for m in trim_context_history(history, ctx_limit, rolling_window)) | |
| safety_margin = 1000 | |
| available_tokens = token_limit - system_tokens - context_tokens - safety_margin | |
| # Since glossary extraction works with plain text, wrap it in a simple HTML structure | |
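| # chr(10) is "\n"; doubling it turns blank-line paragraph breaks into <p> boundaries for the splitter (chr() avoids backslashes inside the f-string expression) | |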
| chapter_html = f"<html><body><p>{chap.replace(chr(10)+chr(10), '</p><p>')}</p></body></html>" | |
| # Use ChapterSplitter to split the chapter | |
| chunks = chapter_splitter.split_chapter(chapter_html, available_tokens) | |
| print(f"📄 Chapter split into {len(chunks)} chunks") | |
| # Process each chunk | |
| chapter_glossary_data = [] # Collect data from all chunks | |
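| # Each chunk is a (chunk_html, chunk_index, total_chunks) tuple; chunk indices are 1-based | |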
| for chunk_html, chunk_idx, total_chunks in chunks: | |
| if check_stop(): | |
| print(f"❌ Glossary extraction stopped during chunk {chunk_idx} of chapter {idx+1}") | |
| return | |
| print(f"🔄 Processing chunk {chunk_idx}/{total_chunks} of Chapter {idx+1}") | |
| # Extract text from the chunk HTML | |
| from bs4 import BeautifulSoup | |
| soup = BeautifulSoup(chunk_html, 'html.parser') | |
| chunk_text = soup.get_text(strip=True) | |
| # Get system and user prompts for chunk | |
| chunk_system_prompt, chunk_user_prompt = build_prompt(chunk_text) | |
| # Build chunk messages | |
| if not contextual_enabled: | |
| chunk_msgs = [ | |
| {"role": "system", "content": chunk_system_prompt}, | |
| {"role": "user", "content": chunk_user_prompt} | |
| ] | |
| else: | |
| chunk_msgs = [{"role": "system", "content": chunk_system_prompt}] \ | |
| + trim_context_history(history, ctx_limit, rolling_window) \ | |
| + [{"role": "user", "content": chunk_user_prompt}] | |
| # API call for chunk | |
| try: | |
| chunk_raw = send_with_interrupt( | |
| messages=chunk_msgs, | |
| client=client, | |
| temperature=temp, | |
| max_tokens=mtoks, | |
| stop_check_fn=check_stop, | |
| chunk_timeout=chunk_timeout | |
| ) | |
| except UnifiedClientError as e: | |
| if "stopped by user" in str(e).lower(): | |
| print(f"❌ Glossary extraction stopped during chunk {chunk_idx} API call") | |
| return | |
| elif "timeout" in str(e).lower(): | |
| print(f"⚠️ Chunk {chunk_idx} API call timed out: {e}") | |
| continue # Skip this chunk | |
| else: | |
| print(f"❌ Chunk {chunk_idx} API error: {e}") | |
| continue # Skip this chunk | |
| except Exception as e: | |
| print(f"❌ Unexpected error in chunk {chunk_idx}: {e}") | |
| continue # Skip this chunk | |
| # Process chunk response | |
| if chunk_raw is None: | |
| print(f"❌ API returned None for chunk {chunk_idx}") | |
| continue | |
| # Handle different response types | |
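| # The client may return a tuple (text first), a bare string, or an object exposing .content or .text | |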
| if isinstance(chunk_raw, tuple): | |
| chunk_resp = chunk_raw[0] if chunk_raw[0] is not None else "" | |
| elif isinstance(chunk_raw, str): | |
| chunk_resp = chunk_raw | |
| elif hasattr(chunk_raw, 'content'): | |
| chunk_resp = chunk_raw.content if chunk_raw.content is not None else "" | |
| elif hasattr(chunk_raw, 'text'): | |
| chunk_resp = chunk_raw.text if chunk_raw.text is not None else "" | |
| else: | |
| print(f"❌ Unexpected response type for chunk {chunk_idx}: {type(chunk_raw)}") | |
| chunk_resp = str(chunk_raw) if chunk_raw is not None else "" | |
| # Ensure resp is a string | |
| if not isinstance(chunk_resp, str): | |
| print(f"⚠️ Converting non-string response to string for chunk {chunk_idx}") | |
| chunk_resp = str(chunk_resp) if chunk_resp is not None else "" | |
| # Check if response is empty | |
| if not chunk_resp or chunk_resp.strip() == "": | |
| print(f"⚠️ Empty response for chunk {chunk_idx}, skipping...") | |
| continue | |
| # Save chunk response with thread-safe location | |
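| # Per-thread subdirectories keep concurrent extractions from clobbering each other's payload dumps | |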
| thread_name = threading.current_thread().name | |
| thread_id = threading.current_thread().ident | |
| thread_dir = os.path.join("Payloads", "glossary", f"{thread_name}_{thread_id}") | |
| os.makedirs(thread_dir, exist_ok=True) | |
| with open(os.path.join(thread_dir, f"chunk_response_chap{idx+1}_chunk{chunk_idx}.txt"), "w", encoding="utf-8", errors="replace") as f: | |
| f.write(chunk_resp) | |
| # Extract data from chunk | |
| chunk_resp_data = parse_api_response(chunk_resp) | |
| if not chunk_resp_data: | |
| print(f"[Warning] No data found in chunk {chunk_idx}, skipping...") | |
| continue | |
| # The parse_api_response already returns parsed data, no need to parse again | |
| try: | |
| # Filter out invalid entries directly from chunk_resp_data | |
| valid_chunk_data = [] | |
| for entry in chunk_resp_data: | |
| if validate_extracted_entry(entry): | |
| # Clean the raw_name | |
| if 'raw_name' in entry: | |
| entry['raw_name'] = entry['raw_name'].strip() | |
| valid_chunk_data.append(entry) | |
| else: | |
| print(f"[Debug] Skipped invalid entry in chunk {chunk_idx}: {entry}") | |
| chapter_glossary_data.extend(valid_chunk_data) | |
| print(f"✅ Chunk {chunk_idx}/{total_chunks}: extracted {len(valid_chunk_data)} entries") | |
| # Add chunk to history if contextual | |
| if contextual_enabled: | |
| history.append({"user": chunk_user_prompt, "assistant": chunk_resp}) | |
| except Exception as e: | |
| print(f"[Warning] Error processing chunk {chunk_idx} data: {e}") | |
| continue | |
| # Add delay between chunks (but not after last chunk) | |
| if chunk_idx < total_chunks: | |
| print(f"⏱️ Waiting {api_delay}s before next chunk...") | |
| if not interruptible_sleep(api_delay, check_stop, 0.1): | |
| print(f"❌ Glossary extraction stopped during chunk delay") | |
| return | |
| # Use the collected data from all chunks | |
| data = chapter_glossary_data | |
| resp = "" # Combined response not needed for progress tracking | |
| print(f"✅ Chapter {idx+1} processed in {len(chunks)} chunks, total entries: {len(data)}") | |
| else: | |
| # Original single-chapter processing | |
| # Check for stop before API call | |
| if check_stop(): | |
| print(f"❌ Glossary extraction stopped before API call for chapter {idx+1}") | |
| return | |
| try: | |
| # Use send_with_interrupt for API call | |
| raw = send_with_interrupt( | |
| messages=msgs, | |
| client=client, | |
| temperature=temp, | |
| max_tokens=mtoks, | |
| stop_check_fn=check_stop, | |
| chunk_timeout=chunk_timeout | |
| ) | |
| except UnifiedClientError as e: | |
| if "stopped by user" in str(e).lower(): | |
| print(f"❌ Glossary extraction stopped during API call for chapter {idx+1}") | |
| return | |
| elif "timeout" in str(e).lower(): | |
| print(f"⚠️ API call timed out for chapter {idx+1}: {e}") | |
| continue | |
| else: | |
| print(f"❌ API error for chapter {idx+1}: {e}") | |
| continue | |
| except Exception as e: | |
| print(f"❌ Unexpected error for chapter {idx+1}: {e}") | |
| continue | |
| # Handle response | |
| if raw is None: | |
| print(f"❌ API returned None for chapter {idx+1}") | |
| continue | |
| # Handle different response types | |
| if isinstance(raw, tuple): | |
| resp = raw[0] if raw[0] is not None else "" | |
| elif isinstance(raw, str): | |
| resp = raw | |
| elif hasattr(raw, 'content'): | |
| resp = raw.content if raw.content is not None else "" | |
| elif hasattr(raw, 'text'): | |
| resp = raw.text if raw.text is not None else "" | |
| else: | |
| print(f"❌ Unexpected response type for chapter {idx+1}: {type(raw)}") | |
| resp = str(raw) if raw is not None else "" | |
| # Ensure resp is a string | |
| if not isinstance(resp, str): | |
| print(f"⚠️ Converting non-string response to string for chapter {idx+1}") | |
| resp = str(resp) if resp is not None else "" | |
| # Check if response is empty | |
| if not resp or resp.strip() == "": | |
| print(f"⚠️ Empty response for chapter {idx+1}, skipping...") | |
| continue | |
| # Save the raw response with thread-safe location | |
| thread_name = threading.current_thread().name | |
| thread_id = threading.current_thread().ident | |
| thread_dir = os.path.join("Payloads", "glossary", f"{thread_name}_{thread_id}") | |
| os.makedirs(thread_dir, exist_ok=True) | |
| with open(os.path.join(thread_dir, f"response_chap{idx+1}.txt"), "w", encoding="utf-8", errors="replace") as f: | |
| f.write(resp) | |
| # Parse response using the new parser | |
| try: | |
| data = parse_api_response(resp) | |
| except Exception as e: | |
| print(f"❌ Error parsing response for chapter {idx+1}: {e}") | |
| print(f" Response preview: {resp[:200] if resp else 'None'}...") | |
| continue | |
| # Filter out invalid entries | |
| valid_data = [] | |
| for entry in data: | |
| if validate_extracted_entry(entry): | |
| # Clean the raw_name | |
| if 'raw_name' in entry: | |
| entry['raw_name'] = entry['raw_name'].strip() | |
| valid_data.append(entry) | |
| else: | |
| print(f"[Debug] Skipped invalid entry: {entry}") | |
| data = valid_data | |
| total_ent = len(data) | |
| # Log entries | |
| for eidx, entry in enumerate(data, start=1): | |
| if check_stop(): | |
| print(f"❌ Glossary extraction stopped during entry processing for chapter {idx+1}") | |
| return | |
| elapsed = time.time() - start | |
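| # ETA heuristic: treats every chapter as roughly 100 entries so per-entry progress maps onto one scale | |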
| if idx == 0 and eidx == 1: | |
| eta = 0 | |
| else: | |
| avg = elapsed / ((idx * 100) + eidx) | |
| eta = avg * (total_chapters * 100 - ((idx * 100) + eidx)) | |
| # Get entry info based on new format | |
| entry_type = entry.get("type", "?") | |
| raw_name = entry.get("raw_name", "?") | |
| trans_name = entry.get("translated_name", "?") | |
| print(f'[Chapter {idx+1}/{total_chapters}] [{eidx}/{total_ent}] ({elapsed:.1f}s elapsed, ETA {eta:.1f}s) → {entry_type}: {raw_name} ({trans_name})') | |
| # Apply skip logic and save | |
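| # Sequential mode dedupes eagerly after every chapter, unlike batch mode, which defers dedup to the end of each batch | |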
| glossary.extend(data) | |
| glossary[:] = skip_duplicate_entries(glossary) | |
| completed.append(idx) | |
| # Only add to history if contextual is enabled (resp is always bound by this point) | |
| if contextual_enabled and resp: | |
| history.append({"user": user_prompt, "assistant": resp}) | |
| # Reset history when limit reached without rolling window | |
| if not rolling_window and len(history) >= ctx_limit and ctx_limit > 0: | |
| print(f"🔄 Resetting glossary context (reached {ctx_limit} chapter limit)") | |
| history = [] | |
| prog['context_history'] = [] | |
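| # Persist after every chapter so an interrupted run can resume from the completed list | |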
| save_progress(completed, glossary, history) | |
| save_glossary_json(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) | |
| save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) | |
| # Add delay before next API call (but not after the last chapter) | |
| if idx < len(chapters) - 1: | |
| # Check if we're within the range or if there are more chapters to process | |
| next_chapter_in_range = True | |
| if range_start is not None and range_end is not None: | |
| next_chapter_num = idx + 2 # idx+1 is current, idx+2 is next | |
| next_chapter_in_range = (range_start <= next_chapter_num <= range_end) | |
| else: | |
| # No range filter, check if next chapter is already completed | |
| next_chapter_in_range = (idx + 1) not in completed | |
| if next_chapter_in_range: | |
| print(f"⏱️ Waiting {api_delay}s before next chapter...") | |
| if not interruptible_sleep(api_delay, check_stop, 0.1): | |
| print(f"❌ Glossary extraction stopped during delay") | |
| return | |
| # Check for stop after processing chapter | |
| if check_stop(): | |
| print(f"❌ Glossary extraction stopped after processing chapter {idx+1}") | |
| return | |
| except Exception as e: | |
| print(f"Error at chapter {idx+1}: {e}") | |
| import traceback | |
| print(f"Full traceback: {traceback.format_exc()}") | |
| # Check for stop even after error | |
| if check_stop(): | |
| print(f"❌ Glossary extraction stopped after error in chapter {idx+1}") | |
| return | |
| print(f"Done. Glossary saved to {args.output}") | |
| # Also save as CSV format for compatibility | |
| try: | |
| # save_glossary_csv derives the .csv name from the .json path (it is called with the .json path everywhere above) | |
| csv_path = os.path.join(glossary_dir, os.path.basename(args.output)).replace('.json', '.csv') | |
| save_glossary_csv(glossary, os.path.join(glossary_dir, os.path.basename(args.output))) | |
| print(f"Also saved as CSV: {csv_path}") | |
| except Exception as e: | |
| print(f"[Warning] Could not save CSV format: {e}") | |
| def save_progress(completed: List[int], glossary: List[Dict], context_history: List[Dict]): | |
| """Save progress to JSON file""" | |
| progress_data = { | |
| "completed": completed, | |
| "glossary": glossary, | |
| "context_history": context_history | |
| } | |
| try: | |
| # Atomic write: dump to a temp file, then swap it into place in one step | |
| temp_file = PROGRESS_FILE + '.tmp' | |
| with open(temp_file, 'w', encoding='utf-8') as f: | |
| json.dump(progress_data, f, ensure_ascii=False, indent=2) | |
| # os.replace() overwrites atomically, even on Windows (remove-then-rename left a window with no progress file at all) | |
| os.replace(temp_file, PROGRESS_FILE) | |
| except Exception as e: | |
| print(f"[Warning] Failed to save progress: {e}") | |
| # Try direct write as fallback | |
| try: | |
| with open(PROGRESS_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(progress_data, f, ensure_ascii=False, indent=2) | |
| except Exception as e2: | |
| print(f"[Error] Could not save progress: {e2}") | |
| if __name__ == '__main__': | |
| main() |