#!/usr/bin/env python3
"""
EPUB Converter - Compiles translated HTML files into EPUB format
Supports extraction of translated titles from chapter content
"""
import os
import sys
import io
import json
import mimetypes
import re
import zipfile
import unicodedata
import html as html_module
from xml.etree import ElementTree as ET
from typing import Dict, List, Tuple, Optional, Callable
from ebooklib import epub, ITEM_DOCUMENT
from bs4 import BeautifulSoup
from metadata_batch_translator import enhance_epub_compiler
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    from unified_api_client import UnifiedClient
except ImportError:
    UnifiedClient = None
# Configure stdout for UTF-8
def configure_utf8_output():
    """Configure stdout for UTF-8 encoding"""
    try:
        if hasattr(sys.stdout, 'reconfigure'):
            sys.stdout.reconfigure(encoding='utf-8', errors='ignore')
    except AttributeError:
        if sys.stdout is None:
            devnull = open(os.devnull, "wb")
            sys.stdout = io.TextIOWrapper(devnull, encoding='utf-8', errors='ignore')
        elif hasattr(sys.stdout, 'buffer'):
            try:
                sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='ignore')
            except Exception:
                pass


# Global configuration
configure_utf8_output()
_global_log_callback = None


def set_global_log_callback(callback: Optional[Callable]):
    """Set the global log callback for module-level functions"""
    global _global_log_callback
    _global_log_callback = callback


def log(message: str):
    """Module-level logging that works with or without callback"""
    if _global_log_callback:
        _global_log_callback(message)
    else:
        print(message)
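
# Minimal usage sketch (the GUI queue below is hypothetical, not part of this
# module): any one-argument callable can serve as the log sink.
#
#     import queue
#     gui_queue = queue.Queue()
#     set_global_log_callback(gui_queue.put)   # route logs to the queue
#     log("compiling...")                      # goes to gui_queue, not stdout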
class HTMLEntityDecoder:
    """Handles comprehensive HTML entity decoding with full Unicode support"""

    # Comprehensive entity replacement dictionary
    ENTITY_MAP = {
        # Quotation marks and apostrophes
        '&quot;': '"', '&#34;': '"',
        '&apos;': "'", '&#39;': "'",
        '&lsquo;': '\u2018', '&rsquo;': '\u2019',
        '&ldquo;': '\u201c', '&rdquo;': '\u201d',
        '&sbquo;': '‚', '&bdquo;': '„',
        '&lsaquo;': '‹', '&rsaquo;': '›',
        '&laquo;': '«', '&raquo;': '»',
        # Spaces and dashes
        '&nbsp;': '\u00a0', '&ensp;': '\u2002',
        '&emsp;': '\u2003', '&thinsp;': '\u2009',
        '&hairsp;': '\u200a', '&zwnj;': '\u200c',
        '&zwj;': '\u200d', '&lrm;': '\u200e',
        '&rlm;': '\u200f',
        '&ndash;': '–', '&mdash;': '—',
        '&minus;': '−', '&hyphen;': '‐',
        # Common symbols
        '&hellip;': '…', '&#8230;': '…',
        '&bull;': '•', '&#8226;': '•',
        '&middot;': '·', '&#183;': '·',
        '&sect;': '§', '&para;': '¶',
        '&dagger;': '†', '&Dagger;': '‡',
        '&loz;': '◊', '&diams;': '♦',
        '&clubs;': '♣', '&hearts;': '♥',
        '&spades;': '♠',
        # Currency symbols
        '&cent;': '¢', '&pound;': '£',
        '&yen;': '¥', '&euro;': '€',
        '&curren;': '¤',
        # Mathematical symbols
        '&plusmn;': '±', '&times;': '×',
        '&divide;': '÷', '&frasl;': '⁄',
        '&permil;': '‰', '&pertenk;': '‱',
        '&prime;': '\u2032', '&Prime;': '\u2033',
        '&infin;': '∞', '&empty;': '∅',
        '&nabla;': '∇', '&partial;': '∂',
        '&sum;': '∑', '&prod;': '∏',
        '&int;': '∫', '&radic;': '√',
        '&asymp;': '≈', '&ne;': '≠',
        '&equiv;': '≡', '&le;': '≤',
        '&ge;': '≥', '&sub;': '⊂',
        '&sup;': '⊃', '&nsub;': '⊄',
        '&sube;': '⊆', '&supe;': '⊇',
        # Intellectual property
        '&copy;': '©', '&#169;': '©',
        '&reg;': '®', '&#174;': '®',
        '&trade;': '™', '&#8482;': '™',
    }
    # Common encoding fixes
    ENCODING_FIXES = {
        # UTF-8 decoded as Latin-1
        'â€™': "'", 'â€œ': '"', 'â€\x9d': '"',
        'â€“': '–', 'â€”': '—',
        'Â\xa0': ' ', 'ÂÂ': '',
        'Ã¢': 'â', 'Ã©': 'é', 'Ã¨': 'è',
        'Ã¤': 'ä', 'Ã¶': 'ö', 'Ã¼': 'ü',
        'Ã±': 'ñ', 'Ã§': 'ç',
        # Common mojibake patterns
        'â€™': "'", 'â€œ': '"', 'â€': '"',
        'â€”': '—', 'â€“': '–',
        'â€¦': '…', 'â€¢': '•',
        'â„¢': '™', 'Â©': '©', 'Â®': '®',
        # Windows-1252 interpreted as UTF-8
        'â€˜': '\u2018', 'â€™': '\u2019',
        'â€œ': '\u201c', 'â€': '\u201d',
        'â€¢': '•', 'â€“': '–', 'â€”': '—',
    }
    @classmethod
    def decode(cls, text: str) -> str:
        """Comprehensive HTML entity decoding - PRESERVES UNICODE"""
        if text is None:
            return ""
        if not isinstance(text, str):
            text = str(text)
        if not text:
            return text
        # Fix common encoding issues first
        for bad, good in cls.ENCODING_FIXES.items():
            text = text.replace(bad, good)
        # Multiple passes to handle nested/double-encoded entities
        max_passes = 3
        for _ in range(max_passes):
            prev_text = text
            # Use html module for standard decoding (this handles &lt;, &gt;, etc.)
            text = html_module.unescape(text)
            if text == prev_text:
                break
        # Apply any remaining entity replacements
        for entity, char in cls.ENTITY_MAP.items():
            text = text.replace(entity, char)
        return text
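
    # Sketch of the multi-pass behavior on double-encoded input (assumes the
    # maps above):
    #     HTMLEntityDecoder.decode('&amp;hellip;')   # pass 1 -> '&hellip;', pass 2 -> '…'
    #     HTMLEntityDecoder.decode('&nbsp;&mdash;')  # -> '\xa0—' (NBSP + em dash)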
    @staticmethod
    def _decode_decimal(match):
        """Decode decimal HTML entity"""
        try:
            code = int(match.group(1))
            if XMLValidator.is_valid_char_code(code):
                return chr(code)
        except Exception:
            pass
        return match.group(0)

    @staticmethod
    def _decode_hex(match):
        """Decode hexadecimal HTML entity"""
        try:
            code = int(match.group(1), 16)
            if XMLValidator.is_valid_char_code(code):
                return chr(code)
        except Exception:
            pass
        return match.group(0)
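
    # These two are regex callbacks; a sketch of how they can be wired up:
    #     text = re.sub(r'&#(\d+);', HTMLEntityDecoder._decode_decimal, text)
    #     text = re.sub(r'&#x([0-9a-fA-F]+);', HTMLEntityDecoder._decode_hex, text)
    # Invalid codepoints (per XMLValidator) are left as the original entity text.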
class XMLValidator:
    """Handles XML validation and character checking"""

    @staticmethod
    def is_valid_char_code(codepoint: int) -> bool:
        """Check if a codepoint is valid for XML"""
        return (
            codepoint == 0x9 or
            codepoint == 0xA or
            codepoint == 0xD or
            (0x20 <= codepoint <= 0xD7FF) or
            (0xE000 <= codepoint <= 0xFFFD) or
            (0x10000 <= codepoint <= 0x10FFFF)
        )

    @staticmethod
    def is_valid_char(c: str) -> bool:
        """Check if a character is valid for XML"""
        return XMLValidator.is_valid_char_code(ord(c))

    @staticmethod
    def clean_for_xml(text: str) -> str:
        """Remove invalid XML characters"""
        return ''.join(c for c in text if XMLValidator.is_valid_char(c))
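
# Sketch: XML 1.0 forbids most C0 control characters; CJK text passes through.
#     XMLValidator.clean_for_xml('第\x08一章')   # -> '第一章'
#     XMLValidator.is_valid_char_code(0x1F)     # -> False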
class ContentProcessor:
    """Handles content cleaning and processing - UPDATED WITH UNICODE PRESERVATION"""

    @staticmethod
    def safe_escape(text: str) -> str:
        """Escape XML special characters for use in XHTML titles/attributes"""
        if text is None:
            return ""
        if not isinstance(text, str):
            try:
                text = str(text)
            except Exception:
                return ""
        # Use html.escape to handle &, <, > and quotes; then escape single quotes
        escaped = html_module.escape(text, quote=True)
        escaped = escaped.replace("'", "&#39;")
        return escaped
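
# Sketch of the escaping behavior:
#     ContentProcessor.safe_escape('Tom & "Jerry" <3')
#     # -> 'Tom &amp; &quot;Jerry&quot; &lt;3'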
class TitleExtractor:
    """Handles extraction of titles from HTML content - UPDATED WITH UNICODE PRESERVATION"""

    @staticmethod
    def extract_from_html(html_content: str, chapter_num: Optional[int] = None,
                          filename: Optional[str] = None) -> Tuple[str, float]:
        """Extract title from HTML content with confidence score - KEEP ALL HEADERS INCLUDING NUMBERS"""
        try:
            # Decode entities first - PRESERVES UNICODE
            html_content = HTMLEntityDecoder.decode(html_content)
            soup = BeautifulSoup(html_content, 'lxml', from_encoding='utf-8')
            candidates = []
            # Strategy 1: <title> tag (highest confidence)
            title_tag = soup.find('title')
            if title_tag and title_tag.string:
                title_text = HTMLEntityDecoder.decode(title_tag.string.strip())
                if title_text and len(title_text) > 0 and title_text.lower() not in ['untitled', 'chapter', 'document']:
                    candidates.append((title_text, 0.95, "title_tag"))
            # Strategy 2: h1 tags (very high confidence)
            h1_tags = soup.find_all('h1')
            for i, h1 in enumerate(h1_tags[:3]):  # Check first 3 h1 tags
                text = HTMLEntityDecoder.decode(h1.get_text(strip=True))
                if text and len(text) < 300:
                    # First h1 gets highest confidence
                    confidence = 0.9 if i == 0 else 0.85
                    candidates.append((text, confidence, f"h1_tag_{i+1}"))
            # Strategy 3: h2 tags (high confidence)
            h2_tags = soup.find_all('h2')
            for i, h2 in enumerate(h2_tags[:3]):  # Check first 3 h2 tags
                text = HTMLEntityDecoder.decode(h2.get_text(strip=True))
                if text and len(text) < 250:
                    # First h2 gets highest confidence among h2s
                    confidence = 0.8 if i == 0 else 0.75
                    candidates.append((text, confidence, f"h2_tag_{i+1}"))
            # Strategy 4: h3 tags (moderate confidence)
            h3_tags = soup.find_all('h3')
            for i, h3 in enumerate(h3_tags[:3]):  # Check first 3 h3 tags
                text = HTMLEntityDecoder.decode(h3.get_text(strip=True))
                if text and len(text) < 200:
                    confidence = 0.7 if i == 0 else 0.65
                    candidates.append((text, confidence, f"h3_tag_{i+1}"))
            # Strategy 5: Bold text in first elements (lower confidence)
            first_elements = soup.find_all(['p', 'div'])[:5]
            for elem in first_elements:
                for bold in elem.find_all(['b', 'strong'])[:2]:  # Limit to first 2 bold items
                    bold_text = HTMLEntityDecoder.decode(bold.get_text(strip=True))
                    if bold_text and 2 <= len(bold_text) <= 150:
                        candidates.append((bold_text, 0.6, "bold_text"))
            # Strategy 6: Center-aligned text (common for chapter titles)
            center_elements = soup.find_all(['center', 'div', 'p'],
                                            attrs={'align': 'center'}) or \
                soup.find_all(['div', 'p'],
                              style=lambda x: x and 'text-align' in x and 'center' in x)
            for center in center_elements[:3]:  # Check first 3 centered elements
                text = HTMLEntityDecoder.decode(center.get_text(strip=True))
                if text and 2 <= len(text) <= 200:
                    candidates.append((text, 0.65, "centered_text"))
            # Strategy 7: All-caps text (common for titles in older books)
            for elem in soup.find_all(['h1', 'h2', 'h3', 'p', 'div'])[:10]:
                text = elem.get_text(strip=True)
                # Check if text is mostly uppercase
                if text and len(text) > 2 and text.isupper():
                    decoded_text = HTMLEntityDecoder.decode(text)
                    # Keep it as-is (don't convert to title case automatically)
                    candidates.append((decoded_text, 0.55, "all_caps_text"))
            # Strategy 8: Patterns in first paragraph
            first_p = soup.find('p')
            if first_p:
                p_text = HTMLEntityDecoder.decode(first_p.get_text(strip=True))
                # Look for "Chapter X: Title" patterns
                chapter_pattern = re.match(
                    r'^(Chapter\s+[\dIVXLCDM]+\s*[:\-\u2013\u2014]\s*)(.{2,100})(?:\.|$)',
                    p_text, re.IGNORECASE
                )
                if chapter_pattern:
                    # Extract just the title part after "Chapter X:"
                    title_part = chapter_pattern.group(2).strip()
                    if title_part:
                        candidates.append((title_part, 0.8, "paragraph_pattern_title"))
                    # Also add the full "Chapter X: Title" as a lower confidence option
                    full_title = chapter_pattern.group(0).strip().rstrip('.')
                    candidates.append((full_title, 0.75, "paragraph_pattern_full"))
                elif len(p_text) <= 100 and len(p_text) > 2:
                    # Short first paragraph might be the title
                    candidates.append((p_text, 0.4, "paragraph_standalone"))
            # Strategy 9: Filename
            if filename:
                filename_match = re.search(r'response_\d+_(.+?)\.html', filename)
                if filename_match:
                    filename_title = filename_match.group(1).replace('_', ' ').title()
                    if len(filename_title) > 2:
                        candidates.append((filename_title, 0.3, "filename"))
            # Filter and rank candidates
            if candidates:
                unique_candidates = {}
                for title, confidence, source in candidates:
                    # Clean the title but keep roman numerals and short titles
                    title = TitleExtractor.clean_title(title)
                    # Don't reject short titles (like "III", "IX") - they're valid!
                    if title and len(title) > 0:
                        # Don't apply is_valid_title check too strictly
                        # Roman numerals and chapter numbers are valid titles
                        if title not in unique_candidates or unique_candidates[title][1] < confidence:
                            unique_candidates[title] = (title, confidence, source)
                if unique_candidates:
                    sorted_candidates = sorted(unique_candidates.values(), key=lambda x: x[1], reverse=True)
                    best_title, best_confidence, best_source = sorted_candidates[0]
                    # Log what we found for debugging
                    log(f"[DEBUG] Best title candidate: '{best_title}' (confidence: {best_confidence:.2f}, source: {best_source})")
                    return best_title, best_confidence
            # Fallback - only use generic chapter number if we really found nothing
            if chapter_num:
                return f"Chapter {chapter_num}", 0.1
            return "Untitled Chapter", 0.0
        except Exception as e:
            log(f"[WARNING] Error extracting title: {e}")
            if chapter_num:
                return f"Chapter {chapter_num}", 0.1
            return "Untitled Chapter", 0.0
    @staticmethod
    def clean_title(title: str) -> str:
        """Clean and normalize extracted title - PRESERVE SHORT TITLES LIKE ROMAN NUMERALS"""
        if not title:
            return ""
        # Remove any [tag] patterns first
        # title = re.sub(r'\[(title|skill|ability|spell|detect|status|class|level|stat|buff|debuff|item|quest)[^\]]*?\]', '', title)
        # Decode entities - PRESERVES UNICODE
        title = HTMLEntityDecoder.decode(title)
        # Remove HTML tags
        title = re.sub(r'<[^>]+>', '', title)
        # Normalize spaces
        title = re.sub(r'[\xa0\u2000-\u200a\u202f\u205f\u3000]+', ' ', title)
        title = re.sub(r'\s+', ' ', title).strip()
        # Remove leading/trailing punctuation EXCEPT for roman numeral dots
        # Don't strip trailing dots from roman numerals like "III." or "IX."
        if not re.match(r'^[IVXLCDM]+\.?$', title, re.IGNORECASE):
            title = re.sub(r'^[][(){}\s\-\u2013\u2014:;,.|/\\]+', '', title).strip()
            title = re.sub(r'[][(){}\s\-\u2013\u2014:;,.|/\\]+$', '', title).strip()
        # Remove quotes if they wrap the entire title
        quote_pairs = [
            ('"', '"'), ("'", "'"),
            ('\u201c', '\u201d'), ('\u2018', '\u2019'),  # Smart quotes
            ('«', '»'), ('‹', '›'),  # Guillemets
        ]
        for open_q, close_q in quote_pairs:
            if title.startswith(open_q) and title.endswith(close_q):
                title = title[len(open_q):-len(close_q)].strip()
                break
        # Normalize Unicode - PRESERVES READABILITY
        title = unicodedata.normalize('NFC', title)
        # Remove zero-width characters
        title = re.sub(r'[\u200b\u200c\u200d\u200e\u200f\ufeff]', '', title)
        # Final cleanup
        title = ' '.join(title.split())
        # Truncate if too long
        if len(title) > 150:
            truncated = title[:147]
            last_space = truncated.rfind(' ')
            if last_space > 100:
                truncated = truncated[:last_space]
            title = truncated + "..."
        return title
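
    # Sketch: wrapping quotes removed, whitespace collapsed, roman numerals kept.
    #     TitleExtractor.clean_title('“Chapter 3:  The   Door”')  # -> 'Chapter 3: The Door'
    #     TitleExtractor.clean_title('IX.')                       # -> 'IX.' (trailing dot kept)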
    @staticmethod
    def is_valid_title(title: str) -> bool:
        """Check if extracted title is valid - ACCEPT SHORT TITLES LIKE ROMAN NUMERALS"""
        if not title:
            return False
        # Accept any non-empty title after cleaning
        # Don't reject roman numerals or short titles
        # Only reject truly invalid patterns
        invalid_patterns = [
            r'^untitled$',  # Just "untitled"
            r'^chapter$',   # Just "chapter" without a number
            r'^document$',  # Just "document"
        ]
        for pattern in invalid_patterns:
            if re.match(pattern, title.lower().strip()):
                return False
        # Skip obvious filler phrases
        filler_phrases = [
            'click here', 'read more', 'continue reading', 'next chapter',
            'previous chapter', 'table of contents', 'back to top'
        ]
        title_lower = title.lower().strip()
        if any(phrase in title_lower for phrase in filler_phrases):
            return False
        # Accept everything else, including roman numerals and short titles
        return True
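
# Sketch of the overall extraction contract (confidence is heuristic):
#     title, conf = TitleExtractor.extract_from_html(
#         '<html><head><title>第三章 夜明け</title></head><body><p>...</p></body></html>')
#     # -> ('第三章 夜明け', 0.95) via the <title> strategy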
class XHTMLConverter:
    """Handles XHTML conversion and compliance"""

    @staticmethod
    def ensure_compliance(html_content: str, title: str = "Chapter",
                          css_links: Optional[List[str]] = None) -> str:
        """Ensure HTML content is XHTML-compliant while PRESERVING story tags"""
        try:
            import html
            import re
            # Add debug at the very start
            log(f"[DEBUG] Processing chapter: {title}")
            log(f"[DEBUG] Input HTML length: {len(html_content)}")
            # Unescape HTML entities but PRESERVE &lt; and &gt; so fake angle brackets in narrative
            # text don't become real tags (which breaks parsing across paragraphs like the sample).
            if any(ent in html_content for ent in ['&amp;', '&quot;', '&#', '&lt;', '&gt;']):
                log("[DEBUG] Unescaping HTML entities (preserving &lt; and &gt;)")
                # Temporarily protect &lt; and &gt; (both cases) from unescaping
                placeholder_lt = '\ue000'
                placeholder_gt = '\ue001'
                html_content = html_content.replace('&lt;', placeholder_lt).replace('&LT;', placeholder_lt)
                html_content = html_content.replace('&gt;', placeholder_gt).replace('&GT;', placeholder_gt)
                # Unescape remaining entities
                html_content = html.unescape(html_content)
                # Restore protected angle bracket entities
                html_content = html_content.replace(placeholder_lt, '&lt;').replace(placeholder_gt, '&gt;')
            # Strip out ANY existing DOCTYPE, XML declaration, or html wrapper
            # We only want the body content
            log("[DEBUG] Extracting body content")
            # Try to extract just body content
            body_match = re.search(r'<body[^>]*>(.*?)</body>', html_content, re.DOTALL | re.IGNORECASE)
            if body_match:
                html_content = body_match.group(1)
                log("[DEBUG] Extracted body content")
            else:
                # No body tags, strip any DOCTYPE/html tags if present
                html_content = re.sub(r'<\?xml[^>]*\?>', '', html_content)
                html_content = re.sub(r'<!DOCTYPE[^>]*>', '', html_content)
                html_content = re.sub(r'</?html[^>]*>', '', html_content)
                html_content = re.sub(r'<head[^>]*>.*?</head>', '', html_content, flags=re.DOTALL)
                log("[DEBUG] Stripped wrapper tags")

            # Now process the content normally
            # Fix broken attributes with ="" pattern
            def fix_broken_attributes_only(match):
                tag_content = match.group(0)
                if '=""' in tag_content and tag_content.count('=""') > 2:
                    tag_match = re.match(r'<(\w+)', tag_content)
                    if tag_match:
                        tag_name = tag_match.group(1)
                        words = re.findall(r'(\w+)=""', tag_content)
                        if words:
                            content = ' '.join(words)
                            return f'<{tag_name}>{content}</{tag_name}>'
                    return ''
                return tag_content

            html_content = re.sub(r'<[^>]*?=""[^>]*?>', fix_broken_attributes_only, html_content)
            # Sanitize attributes that contain a colon (:) but are NOT valid namespaces.
            # Example: <status effects:="" high="" temperature="" unconscious=""></status>
            # becomes: <status data-effects="" high="" temperature="" unconscious=""></status>
            def _sanitize_colon_attributes_in_tags(text: str) -> str:
                # Process only inside start tags; skip closing tags, comments, doctypes, processing instructions
                def _process_tag(tag_match):
                    tag = tag_match.group(0)
                    if tag.startswith('</') or tag.startswith('<!') or tag.startswith('<?'):
                        return tag

                    def _attr_repl(m):
                        before, name, eqval = m.group(1), m.group(2), m.group(3)
                        lname = name.lower()
                        # Preserve known namespace attributes
                        if (
                            lname.startswith('xml:') or lname.startswith('xlink:') or lname.startswith('epub:') or
                            lname == 'xmlns' or lname.startswith('xmlns:')
                        ):
                            return m.group(0)
                        if ':' not in name:
                            return m.group(0)
                        # Replace colon(s) with dashes and prefix with data-
                        safe = re.sub(r'[:]+', '-', name).strip('-')
                        safe = re.sub(r'[^A-Za-z0-9_.-]', '-', safe) or 'attr'
                        if not safe.startswith('data-'):
                            safe = 'data-' + safe
                        return f'{before}{safe}{eqval}'

                    # Replace attributes with colon in the name (handles both single and double quoted values)
                    tag = re.sub(r'(\s)([A-Za-z_:][A-Za-z0-9_.:-]*:[A-Za-z0-9_.:-]*)(\s*=\s*(?:"[^"]*"|\'[^\']*\'))', _attr_repl, tag)
                    return tag

                return re.sub(r'<[^>]+>', _process_tag, text)

            html_content = _sanitize_colon_attributes_in_tags(html_content)
            # Convert only "story tags" whose TAG NAME contains a colon (e.g., <System:Message>),
            # but DO NOT touch valid HTML/SVG tags where colons appear in attributes (e.g., style="color:red" or xlink:href)
            # and DO NOT touch namespaced tags like <svg:rect>.
            allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}

            def _escape_story_tag(match):
                full_tag = match.group(0)   # Entire <...> or </...>
                tag_name = match.group(1)   # The tag name possibly containing ':'
                prefix = tag_name.split(':', 1)[0].lower()
                # If this is a known namespace prefix (e.g., svg:rect), leave it alone
                if prefix in allowed_ns_prefixes:
                    return full_tag
                # Otherwise, treat as a story/fake tag and replace angle brackets with Chinese brackets
                return full_tag.replace('<', '《').replace('>', '》')

            # Escape invalid story tags (tag names containing ':') so they render literally with angle brackets.
            def _escape_story_tag_entities(m):
                tagname = m.group(1)
                prefix = tagname.split(':', 1)[0].lower()
                if prefix in allowed_ns_prefixes:
                    return m.group(0)
                tag_text = m.group(0)
                return tag_text.replace('<', '&lt;').replace('>', '&gt;')

            # Apply in order: self-closing, opening, closing
            html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, html_content)
            html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, html_content)
            html_content = re.sub(r'</([A-Za-z][\w.-]*:[\w.-]*)\s*>', _escape_story_tag_entities, html_content)
            # Parse with lxml (wrap in a div so the fragment has a single root)
            from lxml import html as lxml_html, etree
            parser = lxml_html.HTMLParser(recover=True)
            doc = lxml_html.fragment_fromstring(f"<div>{html_content}</div>", parser=parser)
            # Get the content back
            body_xhtml = etree.tostring(doc, method='xml', encoding='unicode')
            # Remove the wrapper div we added
            body_xhtml = re.sub(r'^<div[^>]*>|</div>$', '', body_xhtml)
            # Optionally replace angle-bracket entities with Chinese brackets
            # Default behavior: keep them as entities (&lt; &gt;) so the output preserves the original text
            bracket_style = os.getenv('ANGLE_BRACKET_OUTPUT', 'entity').lower()
            if '&lt;' in body_xhtml or '&gt;' in body_xhtml:
                if bracket_style in ('cjk', 'chinese', 'cjk_brackets'):
                    body_xhtml = body_xhtml.replace('&lt;', '《').replace('&gt;', '》')
                # else: keep as entities
            # Build our own clean XHTML document
            return XHTMLConverter._build_xhtml(title, body_xhtml, css_links)
        except Exception as e:
            log(f"[WARNING] Failed to ensure XHTML compliance: {e}")
            import traceback
            log(f"[DEBUG] Full traceback:\n{traceback.format_exc()}")
            log(f"[DEBUG] Failed chapter title: {title}")
            log(f"[DEBUG] First 500 chars of input: {html_content[:500] if html_content else 'EMPTY'}")
            return XHTMLConverter._build_fallback_xhtml(title)
    @staticmethod
    def _build_xhtml(title: str, body_content: str, css_links: Optional[List[str]] = None) -> str:
        """Build XHTML document"""
        if not body_content.strip():
            body_content = '<p>Empty chapter</p>'
        title = ContentProcessor.safe_escape(title)
        body_content = XHTMLConverter._ensure_xml_safe_readable(body_content)
        xml_declaration = '<?xml version="1.0" encoding="utf-8"?>'
        doctype = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">'
        xhtml_parts = [
            xml_declaration,
            doctype,
            '<html xmlns="http://www.w3.org/1999/xhtml" xmlns:epub="http://www.idpf.org/2007/ops">',
            '<head>',
            '<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />',
            f'<title>{title}</title>'
        ]
        if css_links:
            for css_link in css_links:
                if css_link.startswith('<link'):
                    href_match = re.search(r'href="([^"]+)"', css_link)
                    if href_match:
                        css_link = href_match.group(1)
                    else:
                        continue
                xhtml_parts.append(f'<link rel="stylesheet" type="text/css" href="{ContentProcessor.safe_escape(css_link)}" />')
        xhtml_parts.extend([
            '</head>',
            '<body>',
            body_content,
            '</body>',
            '</html>'
        ])
        return '\n'.join(xhtml_parts)
    @staticmethod
    def _ensure_xml_safe_readable(content: str) -> str:
        """Ensure content is XML-safe"""
        content = re.sub(
            r'&(?!(?:'
            r'[a-zA-Z][a-zA-Z0-9]{0,30};|'
            r'#[0-9]{1,7};|'
            r'#x[0-9a-fA-F]{1,6};'
            r'))',
            '&amp;',
            content
        )
        return content
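
    # Sketch: only bare ampersands get escaped; existing references are kept.
    #     XHTMLConverter._ensure_xml_safe_readable('A & B &amp; &#169;')
    #     # -> 'A &amp; B &amp; &#169;'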
    @staticmethod
    def _build_fallback_xhtml(title: str) -> str:
        """Build minimal fallback XHTML"""
        safe_title = re.sub(r'[<>&"\']+', '', str(title))
        if not safe_title:
            safe_title = "Chapter"
        return f'''<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>{ContentProcessor.safe_escape(safe_title)}</title>
</head>
<body>
<p>Error processing content. Please check the source file.</p>
</body>
</html>'''
    @staticmethod
    def validate(content: str) -> str:
        """Validate and fix XHTML content - WITH DEBUGGING"""
        import re
        # Ensure XML declaration
        if not content.strip().startswith('<?xml'):
            content = '<?xml version="1.0" encoding="utf-8"?>\n' + content
        # Remove control characters
        content = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', content)
        # Fix unescaped ampersands
        content = re.sub(
            r'&(?!(?:'
            r'amp|lt|gt|quot|apos|'
            r'[a-zA-Z][a-zA-Z0-9]{1,31}|'
            r'#[0-9]{1,7}|'
            r'#x[0-9a-fA-F]{1,6}'
            r');)',
            '&amp;',
            content
        )
        # Fix unquoted attributes
        try:
            content = re.sub(r'<([^>]+)\s+(\w+)=([^\s"\'>]+)([>\s])', r'<\1 \2="\3"\4', content)
        except re.error:
            pass  # Skip if regex fails

        # Sanitize invalid colon-containing attribute names (preserve XML/xlink/epub/xmlns)
        def _sanitize_colon_attrs_in_content(text: str) -> str:
            def _process_tag(m):
                tag = m.group(0)
                if tag.startswith('</') or tag.startswith('<!') or tag.startswith('<?'):
                    return tag

                def _attr_repl(am):
                    before, name, eqval = am.group(1), am.group(2), am.group(3)
                    lname = name.lower()
                    if (
                        lname.startswith('xml:') or lname.startswith('xlink:') or lname.startswith('epub:') or
                        lname == 'xmlns' or lname.startswith('xmlns:')
                    ):
                        return am.group(0)
                    if ':' not in name:
                        return am.group(0)
                    safe = re.sub(r'[:]+', '-', name).strip('-')
                    safe = re.sub(r'[^A-Za-z0-9_.-]', '-', safe) or 'attr'
                    if not safe.startswith('data-'):
                        safe = 'data-' + safe
                    return f'{before}{safe}{eqval}'

                return re.sub(r'(\s)([A-Za-z_:][A-Za-z0-9_.:-]*:[A-Za-z0-9_.:-]*)(\s*=\s*(?:"[^"]*"|\'[^\']*\'))', _attr_repl, tag)

            return re.sub(r'<[^>]+>', _process_tag, text)

        content = _sanitize_colon_attrs_in_content(content)

        # Escape invalid story tags so they render literally with angle brackets in output
        allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}

        def _escape_story_tag_entities(m):
            tagname = m.group(1)
            prefix = tagname.split(':', 1)[0].lower()
            if prefix in allowed_ns_prefixes:
                return m.group(0)
            tag_text = m.group(0)
            return tag_text.replace('<', '&lt;').replace('>', '&gt;')

        # Apply in order: self-closing, opening, closing
        content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, content)
        content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, content)
        content = re.sub(r'</([A-Za-z][\w.-]*:[\w.-]*)\s*>', _escape_story_tag_entities, content)
        # Clean for XML
        content = XMLValidator.clean_for_xml(content)
        # Try to parse for validation
        try:
            ET.fromstring(content.encode('utf-8'))
        except ET.ParseError as e:
            log(f"[WARNING] XHTML validation failed: {e}")
            # DEBUG: Show what's at the error location
            match = re.search(r'line (\d+), column (\d+)', str(e))
            if match:
                line_num = int(match.group(1))
                col_num = int(match.group(2))
                lines = content.split('\n')
                log(f"[DEBUG] Error at line {line_num}, column {col_num}")
                log(f"[DEBUG] Total lines in content: {len(lines)}")
                if line_num <= len(lines):
                    problem_line = lines[line_num - 1]
                    log(f"[DEBUG] Full problem line: {problem_line!r}")
                    # Show the problem area
                    if col_num <= len(problem_line):
                        # Show 40 characters before and after
                        start = max(0, col_num - 40)
                        end = min(len(problem_line), col_num + 40)
                        log(f"[DEBUG] Context around error: {problem_line[start:end]!r}")
                        log(f"[DEBUG] Character at column {col_num}: {problem_line[col_num-1]!r} (U+{ord(problem_line[col_num-1]):04X})")
                        # Show 5 characters before and after with hex
                        for i in range(max(0, col_num-5), min(len(problem_line), col_num+5)):
                            char = problem_line[i]
                            marker = " <-- ERROR" if i == col_num-1 else ""
                            log(f"[DEBUG] Col {i+1}: {char!r} (U+{ord(char):04X}){marker}")
                    else:
                        log(f"[DEBUG] Column {col_num} is beyond line length {len(problem_line)}")
                else:
                    log(f"[DEBUG] Line {line_num} doesn't exist (only {len(lines)} lines)")
                    # Show last few lines
                    for i in range(max(0, len(lines)-3), len(lines)):
                        log(f"[DEBUG] Line {i+1}: {lines[i][:100]!r}...")
            # Try to recover
            content = XHTMLConverter._attempt_recovery(content, e)
        return content
    @staticmethod
    def _attempt_recovery(content: str, error: ET.ParseError) -> str:
        """Attempt to recover from XML parse errors - ENHANCED"""
        try:
            # Use BeautifulSoup to fix structure
            soup = BeautifulSoup(content, 'lxml')
            # Ensure we have proper XHTML structure
            if not soup.find('html'):
                new_soup = BeautifulSoup('<html xmlns="http://www.w3.org/1999/xhtml"></html>', 'lxml')
                html_tag = new_soup.html
                for child in list(soup.children):
                    html_tag.append(child)
                soup = new_soup
            # Ensure we have head and body
            if not soup.find('head'):
                head = soup.new_tag('head')
                meta = soup.new_tag('meta')
                meta['http-equiv'] = 'Content-Type'
                meta['content'] = 'text/html; charset=utf-8'
                head.append(meta)
                title_tag = soup.new_tag('title')
                title_tag.string = 'Chapter'
                head.append(title_tag)
                if soup.html:
                    soup.html.insert(0, head)
            if not soup.find('body'):
                body = soup.new_tag('body')
                if soup.html:
                    for child in list(soup.html.children):
                        if child.name not in ['head', 'body']:
                            body.append(child.extract())
                    soup.html.append(body)
            # Convert back to string
            recovered = str(soup)
            # Ensure proper XML declaration
            if not recovered.strip().startswith('<?xml'):
                recovered = '<?xml version="1.0" encoding="utf-8"?>\n' + recovered
            # Add DOCTYPE if missing
            if '<!DOCTYPE' not in recovered:
                lines = recovered.split('\n')
                lines.insert(1, '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">')
                recovered = '\n'.join(lines)
            # Final validation
            ET.fromstring(recovered.encode('utf-8'))
            log("[INFO] Successfully recovered XHTML")
            return recovered
        except Exception as recovery_error:
            log(f"[WARNING] Recovery attempt failed: {recovery_error}")
            # Last resort: use fallback
            return XHTMLConverter._build_fallback_xhtml("Chapter")
class FileUtils:
    """File handling utilities"""

    @staticmethod
    def sanitize_filename(filename: str, allow_unicode: bool = False) -> str:
        """Sanitize filename for safety"""
        if allow_unicode:
            filename = unicodedata.normalize('NFC', filename)
            replacements = {
                '/': '_', '\\': '_', ':': '_', '*': '_',
                '?': '_', '"': '_', '<': '_', '>': '_',
                '|': '_', '\0': '_',
            }
            for old, new in replacements.items():
                filename = filename.replace(old, new)
            filename = ''.join(char for char in filename if ord(char) >= 32 or ord(char) == 9)
        else:
            filename = unicodedata.normalize('NFKD', filename)
            try:
                filename = filename.encode('ascii', 'ignore').decode('ascii')
            except Exception:
                filename = ''.join(c if ord(c) < 128 else '_' for c in filename)
            replacements = {
                '/': '_', '\\': '_', ':': '_', '*': '_',
                '?': '_', '"': '_', '<': '_', '>': '_',
                '|': '_', '\n': '_', '\r': '_', '\t': '_',
                '&': '_and_', '#': '_num_', ' ': '_',
            }
            for old, new in replacements.items():
                filename = filename.replace(old, new)
            filename = ''.join(char for char in filename if ord(char) >= 32)
            filename = re.sub(r'_+', '_', filename)
            filename = filename.strip('_')
        # Limit length
        name, ext = os.path.splitext(filename)
        if len(name) > 100:
            name = name[:100]
        if not name or name == '_':
            name = 'file'
        return name + ext
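
    # Sketch of both modes:
    #     FileUtils.sanitize_filename('第1章: 旅立ち.html', allow_unicode=True)
    #     # -> '第1章_ 旅立ち.html'
    #     FileUtils.sanitize_filename('A & B #2.html')
    #     # -> 'A_and_B_num_2.html'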
    @staticmethod
    def ensure_bytes(content) -> bytes:
        """Ensure content is bytes"""
        if content is None:
            return b''
        if isinstance(content, bytes):
            return content
        if not isinstance(content, str):
            content = str(content)
        return content.encode('utf-8')
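
# Sketch:
#     FileUtils.ensure_bytes('字')    # -> b'\xe5\xad\x97'
#     FileUtils.ensure_bytes(None)   # -> b''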
class EPUBCompiler:
    """Main EPUB compilation class"""

    def __init__(self, base_dir: str, log_callback: Optional[Callable] = None):
        self.base_dir = os.path.abspath(base_dir)
        self.log_callback = log_callback
        self.output_dir = self.base_dir
        self.images_dir = os.path.join(self.output_dir, "images")
        self.css_dir = os.path.join(self.output_dir, "css")
        self.fonts_dir = os.path.join(self.output_dir, "fonts")
        self.metadata_path = os.path.join(self.output_dir, "metadata.json")
        self.attach_css_to_chapters = os.getenv('ATTACH_CSS_TO_CHAPTERS', '0') == '1'  # Default to '0' (disabled)
        self.max_workers = int(os.environ.get("EXTRACTION_WORKERS", "4"))
        self.log(f"[INFO] Using {self.max_workers} workers for parallel processing")
        # Track auxiliary (non-chapter) HTML files to include in spine but omit from TOC
        self.auxiliary_html_files: set[str] = set()
        # SVG rasterization settings
        self.rasterize_svg = os.getenv('RASTERIZE_SVG_FALLBACK', '1') == '1'
        try:
            import cairosvg  # noqa: F401
            self._cairosvg_available = True
        except Exception:
            self._cairosvg_available = False
        # Set global log callback
        set_global_log_callback(log_callback)
        # Translation features
        self.html_dir = self.output_dir  # For compatibility
        self.translate_titles = os.getenv('TRANSLATE_BOOK_TITLE', '1') == '1'
        # Initialize API client if needed
        self.api_client = None
        if self.translate_titles or os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1':
            model = os.getenv('MODEL')
            api_key = os.getenv('API_KEY')
            if model and api_key and UnifiedClient:
                self.api_client = UnifiedClient(api_key=api_key, model=model, output_dir=self.output_dir)
            elif model and api_key and not UnifiedClient:
                self.log("Warning: UnifiedClient module not available, translation features disabled")
        # Enhance with translation features
        enhance_epub_compiler(self)
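
    # Environment knobs read by __init__ (shown with their defaults):
    #     ATTACH_CSS_TO_CHAPTERS=0   EXTRACTION_WORKERS=4   RASTERIZE_SVG_FALLBACK=1
    #     TRANSLATE_BOOK_TITLE=1     BATCH_TRANSLATE_HEADERS=0
    #     MODEL / API_KEY together enable the optional UnifiedClient.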
    def log(self, message: str):
        """Log a message"""
        if self.log_callback:
            self.log_callback(message)
        else:
            print(message)
    def compile(self):
        """Main compilation method"""
        try:
            # Debug: Check what metadata enhancement was done
            self.log("[DEBUG] Checking metadata translation setup...")
            self.log(f"[DEBUG] Has api_client: {hasattr(self, 'api_client') and self.api_client is not None}")
            self.log(f"[DEBUG] Has metadata_translator: {hasattr(self, 'metadata_translator')}")
            self.log(f"[DEBUG] Has translate_metadata_fields: {hasattr(self, 'translate_metadata_fields')}")
            if hasattr(self, 'translate_metadata_fields'):
                self.log(f"[DEBUG] translate_metadata_fields content: {self.translate_metadata_fields}")
                enabled_fields = [k for k, v in self.translate_metadata_fields.items() if v]
                self.log(f"[DEBUG] Enabled metadata fields: {enabled_fields}")
            # Pre-flight check
            if not self._preflight_check():
                return
            # Analyze chapters FIRST to get the structure
            chapter_titles_info = self._analyze_chapters()
            # Debug: Check if batch translation is enabled
            self.log(f"[DEBUG] Batch translation enabled: {getattr(self, 'batch_translate_headers', False)}")
            self.log(f"[DEBUG] Has header translator: {hasattr(self, 'header_translator')}")
            self.log(f"[DEBUG] EPUB_PATH env: {os.getenv('EPUB_PATH', 'NOT SET')}")
            self.log(f"[DEBUG] HTML dir: {self.html_dir}")
            # Extract source headers AND current titles if batch translation is enabled
            source_headers = {}
            current_titles = {}
            if (hasattr(self, 'batch_translate_headers') and self.batch_translate_headers and
                    hasattr(self, 'header_translator') and self.header_translator):
                # Check if the extraction method exists
                if hasattr(self, '_extract_source_headers_and_current_titles'):
                    # Use the new extraction method
                    source_headers, current_titles = self._extract_source_headers_and_current_titles()
                    self.log(f"[DEBUG] Extraction complete: {len(source_headers)} source, {len(current_titles)} current")
                else:
                    self.log("⚠️ Missing _extract_source_headers_and_current_titles method!")
            # Batch translate headers if we have source headers
            translated_headers = {}
            if source_headers and hasattr(self, 'header_translator') and self.header_translator:
                # Check if translated_headers.txt already exists
                translations_file = os.path.join(self.output_dir, "translated_headers.txt")
                if os.path.exists(translations_file):
                    # File exists - skip translation entirely
                    self.log("📁 Found existing translated_headers.txt - skipping header translation")
                    # No need to parse or do anything else
                else:
                    # No existing file - proceed with translation
                    self.log("🌐 Batch translating chapter headers...")
                    try:
                        # Check if the translator has been initialized properly
                        if not hasattr(self.header_translator, 'client') or not self.header_translator.client:
                            self.log("⚠️ Header translator not properly initialized, skipping batch translation")
                        else:
                            self.log(f"📚 Found {len(source_headers)} headers to translate")
                            self.log(f"📚 Found {len(current_titles)} current titles in HTML files")
                            # Debug: Show a few examples
                            for num in list(source_headers.keys())[:3]:
                                self.log(f"   Example - Chapter {num}: {source_headers[num]}")
                            # Translate headers with current titles info
                            translated_headers = self.header_translator.translate_and_save_headers(
                                html_dir=self.html_dir,
                                headers_dict=source_headers,
                                batch_size=getattr(self, 'headers_per_batch', 400),
                                output_dir=self.output_dir,
                                update_html=getattr(self, 'update_html_headers', True),
                                save_to_file=getattr(self, 'save_header_translations', True),
                                current_titles=current_titles  # Pass current titles for exact replacement
                            )
                            # Update chapter_titles_info with translations
                            if translated_headers:
                                self.log("\n📝 Updating chapter titles in EPUB structure...")
                                for chapter_num, translated_title in translated_headers.items():
                                    if chapter_num in chapter_titles_info:
                                        # Keep the original confidence and method, just update the title
                                        orig_title, confidence, method = chapter_titles_info[chapter_num]
                                        chapter_titles_info[chapter_num] = (translated_title, confidence, method)
                                        self.log(f"✓ Chapter {chapter_num}: {source_headers.get(chapter_num, 'Unknown')} → {translated_title}")
                                    else:
                                        # Add new entry if not in chapter_titles_info
                                        chapter_titles_info[chapter_num] = (translated_title, 1.0, 'batch_translation')
                                        self.log(f"✓ Added Chapter {chapter_num}: {translated_title}")
                    except Exception as e:
                        self.log(f"⚠️ Batch translation failed: {e}")
                        import traceback
                        self.log(traceback.format_exc())
                        # Continue with compilation even if translation fails
            else:
                if not source_headers:
                    self.log("⚠️ No source headers found, skipping batch translation")
                elif not hasattr(self, 'header_translator'):
                    self.log("⚠️ No header translator available")
            # Find HTML files
            html_files = self._find_html_files()
            if not html_files:
                raise Exception("No translated chapters found to compile into EPUB")
            # Load metadata
            metadata = self._load_metadata()
            # Translate metadata if configured
            if hasattr(self, 'metadata_translator') and self.metadata_translator:
                if hasattr(self, 'translate_metadata_fields') and any(self.translate_metadata_fields.values()):
                    self.log("🌐 Translating metadata fields...")
                    try:
                        translated_metadata = self.metadata_translator.translate_metadata(
                            metadata,
                            self.translate_metadata_fields,
                            mode=getattr(self, 'metadata_translation_mode', 'together')
                        )
                        # Preserve original values
                        for field in self.translate_metadata_fields:
                            if field in metadata and field in translated_metadata:
                                if metadata[field] != translated_metadata[field]:
                                    translated_metadata[f'original_{field}'] = metadata[field]
                        metadata = translated_metadata
                    except Exception as e:
                        self.log(f"⚠️ Metadata translation failed: {e}")
                        # Continue with original metadata
            # Create EPUB book
            book = self._create_book(metadata)
            # Process all components
            spine = []
            toc = []
            # Add CSS
            css_items = self._add_css_files(book)
            # Add fonts
            self._add_fonts(book)
            # Process images and cover
            processed_images, cover_file = self._process_images()
            # Add images to book
            self._add_images_to_book(book, processed_images, cover_file)
            # Add cover page if exists
            if cover_file:
                cover_page = self._create_cover_page(book, cover_file, processed_images, css_items, metadata)
                if cover_page:
                    spine.insert(0, cover_page)
            # Process chapters with updated titles
            chapters_added = self._process_chapters(
                book, html_files, chapter_titles_info,
                css_items, processed_images, spine, toc, metadata
            )
            if chapters_added == 0:
                raise Exception("No chapters could be added to the EPUB")
            # Add optional gallery (unless disabled)
            disable_gallery = os.environ.get('DISABLE_EPUB_GALLERY', '0') == '1'
            if disable_gallery:
                self.log("📷 Image gallery disabled by user preference")
            else:
                gallery_images = [img for img in processed_images.values() if img != cover_file]
                if gallery_images:
                    self.log(f"📷 Creating image gallery with {len(gallery_images)} images...")
                    gallery_page = self._create_gallery_page(book, gallery_images, css_items, metadata)
                    spine.append(gallery_page)
                    toc.append(gallery_page)
                else:
                    self.log("📷 No images found for gallery")
            # Finalize book
            self._finalize_book(book, spine, toc, cover_file)
            # Write EPUB
            self._write_epub(book, metadata)
            # Show summary
            self._show_summary(chapter_titles_info, css_items)
        except Exception as e:
            self.log(f"❌ EPUB compilation failed: {e}")
            raise
    def _fix_encoding_issues(self, content: str) -> str:
        """Convert smart quotes and other Unicode punctuation to ASCII."""
        # Convert smart quotes to regular quotes and other punctuation
        fixes = {
            '’': "'",    # Right single quotation mark
            '‘': "'",    # Left single quotation mark
            '“': '"',    # Left double quotation mark
            '”': '"',    # Right double quotation mark
            '—': '-',    # Em dash to hyphen
            '–': '-',    # En dash to hyphen
            '…': '...',  # Ellipsis to three dots
        }
        for bad, good in fixes.items():
            if bad in content:
                content = content.replace(bad, good)
                # self.log(f"[DEBUG] Replaced {bad!r} with {good!r}")
        return content
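
    # Sketch:
    #     self._fix_encoding_issues('“Wait—don’t…”')  # -> '"Wait-don\'t..."'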
    def _preflight_check(self) -> bool:
        """Pre-flight check before compilation with progressive fallback"""
        # Check if we have standard files
        if self._has_standard_files():
            # Use original strict check
            return self._preflight_check_strict()
        else:
            # Use progressive check for non-standard files
            result = self._preflight_check_progressive()
            return result is not None
    def _has_standard_files(self) -> bool:
        """Check if directory contains standard response_ files"""
        if not os.path.exists(self.base_dir):
            return False
        html_exts = ('.html', '.xhtml', '.htm')
        html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)]
        response_files = [f for f in html_files if f.startswith('response_')]
        return len(response_files) > 0
    def _preflight_check_strict(self) -> bool:
        """Original strict pre-flight check - for standard files"""
        self.log("\n📋 Pre-flight Check")
        self.log("=" * 50)
        issues = []
        if not os.path.exists(self.base_dir):
            issues.append(f"Directory does not exist: {self.base_dir}")
            return False
        html_files = [f for f in os.listdir(self.base_dir) if f.endswith('.html')]
        response_files = [f for f in html_files if f.startswith('response_')]
        if not html_files:
            issues.append("No HTML files found in directory")
        elif not response_files:
            issues.append(f"Found {len(html_files)} HTML files but none start with 'response_'")
        else:
            self.log(f"✅ Found {len(response_files)} chapter files")
        if not os.path.exists(self.metadata_path):
            self.log("⚠️ No metadata.json found (will use defaults)")
        else:
            self.log("✅ Found metadata.json")
        for subdir in ['css', 'images', 'fonts']:
            path = os.path.join(self.base_dir, subdir)
            if os.path.exists(path):
                count = len(os.listdir(path))
                self.log(f"✅ Found {subdir}/ with {count} files")
        if issues:
            self.log("\n❌ Pre-flight check FAILED:")
            for issue in issues:
                self.log(f"   • {issue}")
            return False
        self.log("\n✅ Pre-flight check PASSED")
        return True
    def _preflight_check_progressive(self) -> Optional[dict]:
        """Progressive pre-flight check for non-standard files"""
        self.log("\n📋 Starting Progressive Pre-flight Check")
        self.log("=" * 50)
        # Critical check - always required
        if not os.path.exists(self.base_dir):
            self.log(f"❌ CRITICAL: Directory does not exist: {self.base_dir}")
            return None
        # Phase 1: Try strict mode (response_ files) - already checked in caller
        # Phase 2: Try relaxed mode (any HTML files)
        self.log("\n[Phase 2] Checking for any HTML files...")
        html_exts = ('.html', '.xhtml', '.htm')
        html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)]
        if html_files:
            self.log(f"✅ Found {len(html_files)} HTML files:")
            # Show first 5 files as examples
            for i, f in enumerate(html_files[:5]):
                self.log(f"   • {f}")
            if len(html_files) > 5:
                self.log(f"   ... and {len(html_files) - 5} more")
            self._check_optional_resources()
            self.log("\n⚠️ Pre-flight check PASSED with warnings (relaxed mode)")
            return {'success': True, 'mode': 'relaxed'}
        # Phase 3: No HTML files at all
        self.log("❌ No HTML files found in directory")
        self.log("\n[Phase 3] Checking directory contents...")
        all_files = os.listdir(self.base_dir)
        self.log(f"📁 Directory contains {len(all_files)} total files")
        # Look for any potential content
        potential_content = [f for f in all_files if not f.startswith('.')]
        if potential_content:
            self.log("⚠️ Found non-HTML files:")
            for i, f in enumerate(potential_content[:5]):
                self.log(f"   • {f}")
            if len(potential_content) > 5:
                self.log(f"   ... and {len(potential_content) - 5} more")
            self.log("\n⚠️ BYPASSING standard checks - compilation may fail!")
            return {'success': True, 'mode': 'bypass'}
        self.log("\n❌ Directory appears to be empty")
        return None
    def _check_optional_resources(self):
        """Check for optional resources (metadata, CSS, images, fonts)"""
        self.log("\n📁 Checking optional resources:")
        if os.path.exists(self.metadata_path):
            self.log("✅ Found metadata.json")
        else:
            self.log("⚠️ No metadata.json found (will use defaults)")
        resources_found = False
        for subdir in ['css', 'images', 'fonts']:
            path = os.path.join(self.base_dir, subdir)
            if os.path.exists(path):
                items = os.listdir(path)
                if items:
                    self.log(f"✅ Found {subdir}/ with {len(items)} files")
                    resources_found = True
                else:
                    self.log(f"📁 Found {subdir}/ (empty)")
        if not resources_found:
            self.log("⚠️ No resource directories found (CSS/images/fonts)")
    def _analyze_chapters(self) -> Dict[int, Tuple[str, float, str]]:
        """Analyze chapter files and extract titles using parallel processing"""
        self.log("\n📖 Extracting translated titles from chapter files...")
        chapter_info = {}
        sorted_files = self._find_html_files()
        if not sorted_files:
            self.log("⚠️ No translated chapter files found!")
            return chapter_info
        self.log(f"📖 Analyzing {len(sorted_files)} translated chapter files for titles...")
        self.log(f"🔧 Using {self.max_workers} parallel workers")

        def analyze_single_file(idx_filename):
            """Worker function to analyze a single file"""
            idx, filename = idx_filename
            file_path = os.path.join(self.output_dir, filename)
            try:
                # Read and process file
                with open(file_path, 'r', encoding='utf-8') as f:
                    raw_html_content = f.read()
                # Decode HTML entities
                import html
                html_content = html.unescape(raw_html_content)
                html_content = self._fix_encoding_issues(html_content)
                html_content = HTMLEntityDecoder.decode(html_content)
                # Extract title
                title, confidence = TitleExtractor.extract_from_html(
                    html_content, idx, filename
                )
                return idx, (title, confidence, filename)
            except Exception as e:
                return idx, (f"Chapter {idx}", 0.0, filename), str(e)

        # Process files in parallel using environment variable worker count
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            futures = {
                executor.submit(analyze_single_file, (idx, filename)): idx
                for idx, filename in enumerate(sorted_files)
            }
            # Collect results as they complete
            completed = 0
            for future in as_completed(futures):
                try:
                    result = future.result()
                    completed += 1
                    if len(result) == 2:  # Success
                        idx, info = result
                        chapter_info[idx] = info
                        # Log progress
                        title, confidence, filename = info
                        indicator = "✅" if confidence > 0.7 else "🟡" if confidence > 0.4 else "🔴"
                        self.log(f"   [{completed}/{len(sorted_files)}] {indicator} Chapter {idx}: '{title}' (confidence: {confidence:.2f})")
                    else:  # Error
                        idx, info, error = result
                        chapter_info[idx] = info
                        self.log(f"❌ [{completed}/{len(sorted_files)}] Error processing chapter {idx}: {error}")
                except Exception as e:
                    idx = futures[future]
                    self.log(f"❌ Failed to process chapter {idx}: {e}")
                    chapter_info[idx] = (f"Chapter {idx}", 0.0, sorted_files[idx])
        return chapter_info
    def _process_chapters(self, book: epub.EpubBook, html_files: List[str],
                          chapter_titles_info: Dict[int, Tuple[str, float, str]],
                          css_items: List[epub.EpubItem], processed_images: Dict[str, str],
                          spine: List, toc: List, metadata: dict) -> int:
        """Process chapters using parallel processing with AGGRESSIVE DEBUGGING"""
        chapters_added = 0
        self.log(f"\n{'='*80}")
        self.log("📚 STARTING CHAPTER PROCESSING")
        self.log(f"📚 Total files to process: {len(html_files)}")
        self.log(f"🔧 Using {self.max_workers} parallel workers")
        self.log(f"📂 Output directory: {self.output_dir}")
        self.log(f"{'='*80}")
        # Debug chapter titles info
        self.log(f"\n[DEBUG] Chapter titles info has {len(chapter_titles_info)} entries")
        for num in list(chapter_titles_info.keys())[:5]:
            title, conf, method = chapter_titles_info[num]
            self.log(f"   Chapter {num}: {title[:50]}... (conf: {conf}, method: {method})")
        # Prepare chapter data
        chapter_data = []
        for idx, filename in enumerate(html_files):
            chapter_num = idx
            if chapter_num not in chapter_titles_info and (chapter_num + 1) in chapter_titles_info:
                chapter_num = idx + 1
            chapter_data.append((chapter_num, filename))
            # Debug specific problem chapters
            if 49 <= chapter_num <= 56:
                self.log(f"[DEBUG] Problem chapter found: {chapter_num} -> {filename}")
| def process_chapter_content(data): | |
| """Worker function to process chapter content with FULL DEBUGGING""" | |
| chapter_num, filename = data | |
| path = os.path.join(self.output_dir, filename) | |
| # Debug tracking for problem chapters | |
| is_problem_chapter = 49 <= chapter_num <= 56 | |
| try: | |
| if is_problem_chapter: | |
| self.log(f"\n[DEBUG] {'*'*60}") | |
| self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {chapter_num}: {filename}") | |
| self.log(f"[DEBUG] Full path: {path}") | |
| # Check file exists | |
| if not os.path.exists(path): | |
| error_msg = f"File does not exist: {path}" | |
| self.log(f"[ERROR] {error_msg}") | |
| raise FileNotFoundError(error_msg) | |
| # Get file size | |
| file_size = os.path.getsize(path) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] File size: {file_size} bytes") | |
| # Read and decode | |
| raw_content = self._read_and_decode_html_file(path) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Raw content length after reading: {len(raw_content) if raw_content else 'NULL'}") | |
| if raw_content: | |
| self.log(f"[DEBUG] First 200 chars: {raw_content[:200]}") | |
| # Fix encoding | |
| raw_content = self._fix_encoding_issues(raw_content) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Content length after encoding fix: {len(raw_content) if raw_content else 'NULL'}") | |
| if not raw_content or not raw_content.strip(): | |
| error_msg = f"Empty content after reading/decoding: {filename}" | |
| if is_problem_chapter: | |
| self.log(f"[ERROR] {error_msg}") | |
| raise ValueError(error_msg) | |
| # Extract main content | |
| if not filename.startswith('response_'): | |
| before_len = len(raw_content) | |
| raw_content = self._extract_main_content(raw_content, filename) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Content extraction: {before_len} -> {len(raw_content)} chars") | |
| # Get title | |
| title = self._get_chapter_title(chapter_num, filename, raw_content, chapter_titles_info) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Chapter title: {title}") | |
| # Prepare CSS links | |
| css_links = [f"css/{item.file_name.split('/')[-1]}" for item in css_items] | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] CSS links: {css_links}") | |
| # XHTML conversion - THE CRITICAL PART | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Starting XHTML conversion...") | |
| xhtml_content = XHTMLConverter.ensure_compliance(raw_content, title, css_links) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] XHTML content length: {len(xhtml_content) if xhtml_content else 'NULL'}") | |
| if xhtml_content: | |
| self.log(f"[DEBUG] XHTML first 300 chars: {xhtml_content[:300]}") | |
| # Process images | |
| xhtml_content = self._process_chapter_images(xhtml_content, processed_images) | |
| # Validate | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Starting validation...") | |
| final_content = XHTMLConverter.validate(xhtml_content) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Final content length: {len(final_content)}") | |
| # Final XML validation | |
| try: | |
| ET.fromstring(final_content.encode('utf-8')) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] XML validation PASSED") | |
| except ET.ParseError as e: | |
| if is_problem_chapter: | |
| self.log(f"[ERROR] XML validation FAILED: {e}") | |
| # Show the exact error location | |
| lines = final_content.split('\n') | |
| import re | |
| match = re.search(r'line (\d+), column (\d+)', str(e)) | |
| if match: | |
| line_num = int(match.group(1)) | |
| if line_num <= len(lines): | |
| self.log(f"[ERROR] Problem line {line_num}: {lines[line_num-1][:100]}") | |
| # Create fallback | |
| final_content = XHTMLConverter._build_fallback_xhtml(title) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Using fallback XHTML") | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Chapter processing SUCCESSFUL") | |
| self.log(f"[DEBUG] {'*'*60}\n") | |
| return { | |
| 'num': chapter_num, | |
| 'filename': filename, | |
| 'title': title, | |
| 'content': final_content, | |
| 'success': True | |
| } | |
| except Exception as e: | |
| import traceback | |
| tb = traceback.format_exc() | |
| if is_problem_chapter: | |
| self.log(f"[ERROR] {'!'*60}") | |
| self.log(f"[ERROR] CHAPTER {chapter_num} PROCESSING FAILED") | |
| self.log(f"[ERROR] Exception type: {type(e).__name__}") | |
| self.log(f"[ERROR] Exception: {e}") | |
| self.log(f"[ERROR] Full traceback:\n{tb}") | |
| self.log(f"[ERROR] {'!'*60}\n") | |
| return { | |
| 'num': chapter_num, | |
| 'filename': filename, | |
| 'title': chapter_titles_info.get(chapter_num, (f"Chapter {chapter_num}", 0, ""))[0], | |
| 'error': str(e), | |
| 'traceback': tb, | |
| 'success': False | |
| } | |
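| # Illustrative result shapes returned by process_chapter_content (values are examples only): | |
| #   success: {'num': 50, 'filename': 'response_050_x.html', 'title': 'Chapter 50', 'content': '<?xml ...', 'success': True} | |
| #   failure: {'num': 50, 'filename': 'response_050_x.html', 'title': 'Chapter 50', 'error': '...', 'traceback': '...', 'success': False} | |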
| # Process in parallel | |
| processed_chapters = [] | |
| completed = 0 | |
| self.log(f"\n[DEBUG] Starting parallel processing...") | |
| with ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
| futures = { | |
| executor.submit(process_chapter_content, data): data[0] | |
| for data in chapter_data | |
| } | |
| for future in as_completed(futures): | |
| try: | |
| result = future.result() | |
| if result: | |
| processed_chapters.append(result) | |
| completed += 1 | |
| # Extra logging for problem chapters | |
| if 49 <= result['num'] <= 56: | |
| if result['success']: | |
| self.log(f" [{completed}/{len(chapter_data)}] ✅ PROBLEM CHAPTER PROCESSED: {result['num']} - {result['title']}") | |
| else: | |
| self.log(f" [{completed}/{len(chapter_data)}] ❌ PROBLEM CHAPTER FAILED: {result['num']} - {result['filename']}") | |
| self.log(f" Error: {result['error']}") | |
| else: | |
| if result['success']: | |
| self.log(f" [{completed}/{len(chapter_data)}] ✅ Processed: {result['title']}") | |
| else: | |
| self.log(f" [{completed}/{len(chapter_data)}] ❌ Failed: {result['filename']} - {result['error']}") | |
| except Exception as e: | |
| completed += 1 | |
| chapter_num = futures[future] | |
| self.log(f" [{completed}/{len(chapter_data)}] ❌ Exception processing chapter {chapter_num}: {e}") | |
| import traceback | |
| self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}") | |
| # Sort by chapter number to maintain order | |
| processed_chapters.sort(key=lambda x: x['num']) | |
| # Debug what we have | |
| self.log(f"\n[DEBUG] Processed {len(processed_chapters)} chapters") | |
| failed_chapters = [c for c in processed_chapters if not c['success']] | |
| if failed_chapters: | |
| self.log(f"[WARNING] {len(failed_chapters)} chapters failed:") | |
| for fc in failed_chapters: | |
| self.log(f" - Chapter {fc['num']}: {fc['filename']} - {fc.get('error', 'Unknown error')}") | |
| # Add chapters to book in order (this must be sequential) | |
| self.log("\n📦 Adding chapters to EPUB structure...") | |
| for chapter_data in processed_chapters: | |
| # Debug for problem chapters | |
| if 49 <= chapter_data['num'] <= 56: | |
| self.log(f"[DEBUG] Adding problem chapter {chapter_data['num']} to EPUB...") | |
| if chapter_data['success']: | |
| try: | |
| # Create EPUB chapter | |
| import html | |
| chapter = epub.EpubHtml( | |
| title=html.unescape(chapter_data['title']), | |
| file_name=os.path.basename(chapter_data['filename']), | |
| lang=metadata.get("language", "en") | |
| ) | |
| chapter.content = FileUtils.ensure_bytes(chapter_data['content']) | |
| if self.attach_css_to_chapters: | |
| for css_item in css_items: | |
| chapter.add_item(css_item) | |
| # Add to book | |
| book.add_item(chapter) | |
| spine.append(chapter) | |
| # Include auxiliary files in spine but omit from TOC | |
| base_name = os.path.basename(chapter_data['filename']) | |
| if hasattr(self, 'auxiliary_html_files') and base_name in self.auxiliary_html_files: | |
| self.log(f" 🛈 Added auxiliary page to spine (not in TOC): {base_name}") | |
| else: | |
| toc.append(chapter) | |
| chapters_added += 1 | |
| if 49 <= chapter_data['num'] <= 56: | |
| self.log(f" ✅ ADDED PROBLEM CHAPTER {chapter_data['num']}: '{chapter_data['title']}'") | |
| else: | |
| if base_name in getattr(self, 'auxiliary_html_files', set()): | |
| self.log(f" ✅ Added auxiliary page (spine only): '{base_name}'") | |
| else: | |
| self.log(f" ✅ Added chapter {chapter_data['num']}: '{chapter_data['title']}'") | |
| except Exception as e: | |
| self.log(f" ❌ Failed to add chapter {chapter_data['num']} to book: {e}") | |
| import traceback | |
| self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}") | |
| # Add error placeholder | |
| self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata) | |
| chapters_added += 1 | |
| else: | |
| self.log(f" ⚠️ Adding error placeholder for chapter {chapter_data['num']}") | |
| # Add error placeholder | |
| self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata) | |
| chapters_added += 1 | |
| self.log(f"\n{'='*80}") | |
| self.log(f"✅ CHAPTER PROCESSING COMPLETE") | |
| self.log(f"✅ Added {chapters_added} chapters to EPUB") | |
| self.log(f"{'='*80}\n") | |
| return chapters_added | |
| def _add_error_chapter_from_data(self, book, chapter_data, spine, toc, metadata): | |
| """Helper to add an error placeholder chapter""" | |
| try: | |
| title = chapter_data.get('title', f"Chapter {chapter_data['num']}") | |
| chapter = epub.EpubHtml( | |
| title=title, | |
| file_name=f"chapter_{chapter_data['num']:03d}.xhtml", | |
| lang=metadata.get("language", "en") | |
| ) | |
| error_content = f"""<?xml version="1.0" encoding="utf-8"?> | |
| <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd"> | |
| <html xmlns="http://www.w3.org/1999/xhtml"> | |
| <head><title>{ContentProcessor.safe_escape(title)}</title></head> | |
| <body> | |
| <h1>{ContentProcessor.safe_escape(title)}</h1> | |
| <p>Error loading chapter content.</p> | |
| <p>File: {ContentProcessor.safe_escape(chapter_data.get('filename', 'unknown'))}</p> | |
| <p>Error: {ContentProcessor.safe_escape(chapter_data.get('error', 'unknown error'))}</p> | |
| </body> | |
| </html>""" | |
| chapter.content = error_content.encode('utf-8') | |
| book.add_item(chapter) | |
| spine.append(chapter) | |
| toc.append(chapter) | |
| except Exception as e: | |
| self.log(f" ❌ Failed to add error placeholder: {e}") | |
| def _get_chapter_order_from_opf(self) -> Optional[Dict[str, int]]: | |
| """Get chapter order from content.opf or source EPUB | |
| Returns dict mapping original_filename -> chapter_number, or None if no source is available | |
| """ | |
| # First, try to find content.opf in the current directory | |
| opf_path = os.path.join(self.output_dir, "content.opf") | |
| if os.path.exists(opf_path): | |
| self.log("✅ Found content.opf - using for chapter ordering") | |
| return self._parse_opf_file(opf_path) | |
| # If not found, try to extract from source EPUB | |
| source_epub = os.getenv('EPUB_PATH') | |
| if source_epub and os.path.exists(source_epub): | |
| self.log(f"📚 Extracting chapter order from source EPUB: {source_epub}") | |
| return self._extract_order_from_epub(source_epub) | |
| # Fallback to translation_progress.json if available | |
| progress_file = os.path.join(self.output_dir, "translation_progress.json") | |
| if os.path.exists(progress_file): | |
| self.log("📄 Using translation_progress.json for chapter order") | |
| return self._get_order_from_progress_file(progress_file) | |
| return None | |
| def _parse_opf_file(self, opf_path: str) -> Optional[Dict[str, int]]: | |
| """Parse content.opf to get chapter order from the spine | |
| Returns dict mapping original_filename -> chapter_number, or None on parse failure | |
| """ | |
| try: | |
| tree = ET.parse(opf_path) | |
| root = tree.getroot() | |
| # Handle namespaces | |
| ns = {'opf': 'http://www.idpf.org/2007/opf'} | |
| if root.tag.startswith('{'): | |
| # Extract default namespace | |
| default_ns = root.tag[1:root.tag.index('}')] | |
| ns = {'opf': default_ns} | |
| # Get manifest to map IDs to files | |
| manifest = {} | |
| for item in root.findall('.//opf:manifest/opf:item', ns): | |
| item_id = item.get('id') | |
| href = item.get('href') | |
| media_type = item.get('media-type', '') | |
| # Only include HTML/XHTML files | |
| if item_id and href and ('html' in media_type.lower() or href.endswith(('.html', '.xhtml', '.htm'))): | |
| # Get just the filename without path | |
| filename = os.path.basename(href) | |
| manifest[item_id] = filename | |
| # Get spine order | |
| filename_to_order = {} | |
| chapter_num = 0 # Start from 0 for array indexing | |
| spine = root.find('.//opf:spine', ns) | |
| if spine is not None: | |
| # Build dynamic skip list; allow cover when TRANSLATE_COVER_HTML is enabled | |
| skip_list = ['nav', 'toc', 'contents'] | |
| if os.environ.get('TRANSLATE_COVER_HTML', '0') != '1': | |
| skip_list.append('cover') | |
| for itemref in spine.findall('opf:itemref', ns): | |
| idref = itemref.get('idref') | |
| if idref and idref in manifest: | |
| filename = manifest[idref] | |
| # Skip navigation documents; optionally skip cover | |
| if not any(skip in filename.lower() for skip in skip_list): | |
| filename_to_order[filename] = chapter_num | |
| self.log(f" Chapter {chapter_num}: {filename}") | |
| chapter_num += 1 | |
| return filename_to_order | |
| except Exception as e: | |
| self.log(f"⚠️ Error parsing content.opf: {e}") | |
| import traceback | |
| self.log(traceback.format_exc()) | |
| return None | |
| def _extract_order_from_epub(self, epub_path: str) -> Optional[Dict[str, int]]: | |
| """Extract chapter order (original_filename -> chapter_number) from a source EPUB file""" | |
| try: | |
| import zipfile | |
| with zipfile.ZipFile(epub_path, 'r') as zf: | |
| # Find content.opf (might be in different locations) | |
| opf_file = None | |
| for name in zf.namelist(): | |
| if name.endswith('content.opf'): | |
| opf_file = name | |
| break | |
| if not opf_file: | |
| # Try META-INF/container.xml to find content.opf | |
| try: | |
| container = zf.read('META-INF/container.xml') | |
| # Parse container.xml to find content.opf location | |
| container_tree = ET.fromstring(container) | |
| rootfile = container_tree.find('.//{urn:oasis:names:tc:opendocument:xmlns:container}rootfile') | |
| if rootfile is not None: | |
| opf_file = rootfile.get('full-path') | |
| except Exception: | |
| pass | |
| if opf_file: | |
| opf_content = zf.read(opf_file) | |
| # Save temporarily and parse | |
| temp_opf = os.path.join(self.output_dir, "temp_content.opf") | |
| with open(temp_opf, 'wb') as f: | |
| f.write(opf_content) | |
| result = self._parse_opf_file(temp_opf) | |
| # Clean up temp file | |
| if os.path.exists(temp_opf): | |
| os.remove(temp_opf) | |
| return result | |
| except Exception as e: | |
| self.log(f"⚠️ Error extracting from EPUB: {e}") | |
| return None | |
| def _find_html_files(self) -> List[str]: | |
| """Find HTML files using OPF-based ordering when available""" | |
| self.log(f"\n[DEBUG] Scanning directory: {self.output_dir}") | |
| # Get all HTML files in directory | |
| all_files = os.listdir(self.output_dir) | |
| html_extensions = ('.html', '.htm', '.xhtml') | |
| html_files = [f for f in all_files if f.lower().endswith(html_extensions)] | |
| if not html_files: | |
| self.log("[ERROR] No HTML files found!") | |
| return [] | |
| # Try to get authoritative order from OPF/EPUB | |
| opf_order = self._get_chapter_order_from_opf() | |
| if opf_order: | |
| self.log("✅ Using authoritative chapter order from OPF/EPUB") | |
| self.log(f"[DEBUG] OPF entries (first 5): {list(opf_order.items())[:5]}") | |
| # Create mapping based on core filename (strip response_ and strip ALL extensions) | |
| ordered_files = [] | |
| unmapped_files = [] | |
| def strip_all_ext(name: str) -> str: | |
| # Remove all trailing known extensions | |
| core = name | |
| while True: | |
| parts = core.rsplit('.', 1) | |
| if len(parts) == 2 and parts[1].lower() in ['html', 'htm', 'xhtml', 'xml']: | |
| core = parts[0] | |
| else: | |
| break | |
| return core | |
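| # For example: strip_all_ext('chapter_001.html')       -> 'chapter_001' | |
| #              strip_all_ext('chapter_001.html.xhtml') -> 'chapter_001' | |
| #              strip_all_ext('notes.txt')              -> 'notes.txt'  (unknown extension kept) | |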
| for output_file in html_files: | |
| core_name = output_file[9:] if output_file.startswith('response_') else output_file | |
| core_name = strip_all_ext(core_name) | |
| matched = False | |
| for opf_name, chapter_order in opf_order.items(): | |
| opf_file = opf_name.split('/')[-1] | |
| opf_core = strip_all_ext(opf_file) | |
| if core_name == opf_core: | |
| ordered_files.append((chapter_order, output_file)) | |
| self.log(f" Mapped: {output_file} -> {opf_name} (order: {chapter_order})") | |
| matched = True | |
| break | |
| if not matched: | |
| unmapped_files.append(output_file) | |
| self.log(f" ⚠️ Could not map: {output_file} (core: {core_name})") | |
| if ordered_files: | |
| # Sort by chapter order and extract just the filenames | |
| ordered_files.sort(key=lambda x: x[0]) | |
| final_order = [f for _, f in ordered_files] | |
| # Append any unmapped files at the end | |
| if unmapped_files: | |
| self.log(f"⚠️ Adding {len(unmapped_files)} unmapped files at the end") | |
| final_order.extend(sorted(unmapped_files)) | |
| # Mark non-response unmapped files as auxiliary (omit from TOC) | |
| aux = {f for f in unmapped_files if not f.startswith('response_')} | |
| # If the cover-translation override (TRANSLATE_COVER_HTML) is enabled, do NOT treat cover.html as auxiliary | |
| if os.environ.get('TRANSLATE_COVER_HTML', '0') == '1': | |
| aux = {f for f in aux if os.path.splitext(os.path.basename(f))[0].lower() != 'cover'} | |
| self.auxiliary_html_files = aux | |
| else: | |
| self.auxiliary_html_files = set() | |
| self.log(f"✅ Successfully ordered {len(final_order)} chapters using OPF") | |
| return final_order | |
| else: | |
| self.log("⚠️ Could not map any files using OPF order, falling back to pattern matching") | |
| # Fallback to original pattern matching logic | |
| self.log("⚠️ No OPF/EPUB found or mapping failed, using filename pattern matching") | |
| # First, try to find response_ files | |
| response_files = [f for f in html_files if f.startswith('response_')] | |
| if response_files: | |
| # Sort response_ files as primary chapters | |
| main_files = list(response_files) | |
| self.log(f"[DEBUG] Found {len(response_files)} response_ files") | |
| # Check if files have -h- pattern | |
| if any('-h-' in f for f in response_files): | |
| # Use special sorting for -h- pattern | |
| def extract_h_number(filename): | |
| match = re.search(r'-h-(\d+)', filename) | |
| if match: | |
| return int(match.group(1)) | |
| return 999999 | |
| main_files.sort(key=extract_h_number) | |
| else: | |
| # Use numeric sorting for standard response_ files | |
| def extract_number(filename): | |
| match = re.match(r'response_(\d+)_', filename) | |
| if match: | |
| return int(match.group(1)) | |
| return 0 | |
| main_files.sort(key=extract_number) | |
| # Append non-response files as auxiliary pages (not in TOC) | |
| aux_files = sorted([f for f in html_files if not f.startswith('response_')]) | |
| if aux_files: | |
| aux_set = set(aux_files) | |
| # If the cover-translation override is enabled, ensure cover.html is not marked auxiliary | |
| if os.environ.get('TRANSLATE_COVER_HTML', '0') == '1': | |
| aux_set = {f for f in aux_set if os.path.splitext(os.path.basename(f))[0].lower() != 'cover'} | |
| self.auxiliary_html_files = aux_set | |
| self.log(f"[DEBUG] Appending {len(aux_set)} auxiliary HTML file(s) (not in TOC): {list(aux_set)[:5]}") | |
| else: | |
| self.auxiliary_html_files = set() | |
| return main_files + aux_files | |
| else: | |
| # Progressive sorting for non-standard files | |
| html_files.sort(key=self.get_robust_sort_key) | |
| # No response_ files -> treat none as auxiliary | |
| self.auxiliary_html_files = set() | |
| return html_files | |
| def _read_and_decode_html_file(self, file_path: str) -> str: | |
| """Read HTML file and decode entities, preserving < and > as text. | |
| This prevents narrative angle-bracket text from becoming bogus tags.""" | |
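| # Illustrative round trip for double-encoded input, assuming the entity patterns below: | |
| #   input:   '&amp;lt;System&amp;gt;' | |
| #   shield:  '\ue000System\ue001'  (placeholders survive html.unescape) | |
| #   restore: '&lt;System&gt;'      (stays literal text instead of becoming a bogus <System> tag) | |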
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| if not content: | |
| return content | |
| import re | |
| import html | |
| # Placeholders for angle bracket entities | |
| LT_PLACEHOLDER = "\ue000" | |
| GT_PLACEHOLDER = "\ue001" | |
| # Patterns for common representations of < and > | |
| _lt_entity_patterns = [r'&lt;', r'&amp;lt;', r'&#0*60;', r'&#x0*3[cC];'] | |
| _gt_entity_patterns = [r'&gt;', r'&amp;gt;', r'&#0*62;', r'&#x0*3[eE];'] | |
| def protect_angle_entities(s: str) -> str: | |
| # Replace all forms of < and > with placeholders so unescape won't turn them into real < > | |
| for pat in _lt_entity_patterns: | |
| s = re.sub(pat, LT_PLACEHOLDER, s) | |
| for pat in _gt_entity_patterns: | |
| s = re.sub(pat, GT_PLACEHOLDER, s) | |
| return s | |
| max_iterations = 5 | |
| for _ in range(max_iterations): | |
| prev_content = content | |
| # Protect before each pass in case of double-encoded entities | |
| content = protect_angle_entities(content) | |
| # html.unescape handles all standard HTML entities (except our placeholders) | |
| content = html.unescape(content) | |
| if content == prev_content: | |
| break | |
| # Restore placeholders back to entities so they remain literal text in XHTML | |
| content = content.replace(LT_PLACEHOLDER, '&lt;').replace(GT_PLACEHOLDER, '&gt;') | |
| return content | |
| def _process_single_chapter(self, book: epub.EpubBook, num: int, filename: str, | |
| chapter_titles_info: Dict[int, Tuple[str, float, str]], | |
| css_items: List[epub.EpubItem], processed_images: Dict[str, str], | |
| spine: List, toc: List, metadata: dict) -> bool: | |
| """Process a single chapter with COMPREHENSIVE debugging""" | |
| path = os.path.join(self.output_dir, filename) | |
| # Flag for extra debugging on problem chapters | |
| is_problem_chapter = 49 <= num <= 56 | |
| is_response_file = filename.startswith('response_') | |
| try: | |
| if is_problem_chapter: | |
| self.log(f"\n{'='*70}") | |
| self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {num}") | |
| self.log(f"[DEBUG] Filename: {filename}") | |
| self.log(f"[DEBUG] Is response file: {is_response_file}") | |
| self.log(f"[DEBUG] Full path: {path}") | |
| # Check file exists and size | |
| if not os.path.exists(path): | |
| self.log(f"[ERROR] File does not exist: {path}") | |
| return False | |
| file_size = os.path.getsize(path) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] File size: {file_size} bytes") | |
| if file_size == 0: | |
| self.log(f"[ERROR] File is empty (0 bytes): {filename}") | |
| return False | |
| # Read and decode | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Reading and decoding file...") | |
| raw_content = self._read_and_decode_html_file(path) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Raw content length: {len(raw_content) if raw_content else 'NULL'}") | |
| if raw_content: | |
| # Show first and last parts | |
| self.log(f"[DEBUG] First 300 chars of raw content:") | |
| self.log(f" {raw_content[:300]!r}") | |
| self.log(f"[DEBUG] Last 300 chars of raw content:") | |
| self.log(f" {raw_content[-300:]!r}") | |
| # Check for common issues | |
| if '&lt;' in raw_content[:500]: | |
| self.log(f"[DEBUG] Found &lt; entities in content") | |
| if '&gt;' in raw_content[:500]: | |
| self.log(f"[DEBUG] Found &gt; entities in content") | |
| if '&lt;Official' in raw_content[:500] or '&lt;System' in raw_content[:500]: | |
| self.log(f"[DEBUG] Found story tags in content") | |
| # Fix encoding issues | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Fixing encoding issues...") | |
| before_fix = len(raw_content) if raw_content else 0 | |
| raw_content = self._fix_encoding_issues(raw_content) | |
| after_fix = len(raw_content) if raw_content else 0 | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Encoding fix: {before_fix} -> {after_fix} chars") | |
| if before_fix != after_fix: | |
| self.log(f"[DEBUG] Content changed during encoding fix") | |
| if not raw_content or not raw_content.strip(): | |
| self.log(f"[WARNING] Chapter {num} is empty after decoding/encoding fix") | |
| if is_problem_chapter: | |
| self.log(f"[ERROR] Problem chapter {num} has no content!") | |
| return False | |
| # Extract main content if needed | |
| if not filename.startswith('response_'): | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Extracting main content (not a response file)...") | |
| before_extract = len(raw_content) | |
| raw_content = self._extract_main_content(raw_content, filename) | |
| after_extract = len(raw_content) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Content extraction: {before_extract} -> {after_extract} chars") | |
| if after_extract < before_extract / 2: | |
| self.log(f"[WARNING] Lost more than 50% of content during extraction!") | |
| self.log(f"[DEBUG] Content after extraction (first 300 chars):") | |
| self.log(f" {raw_content[:300]!r}") | |
| else: | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Skipping content extraction for response file") | |
| self.log(f"[DEBUG] Response file content structure:") | |
| # Check what's in a response file | |
| if '<body>' in raw_content: | |
| self.log(f" Has <body> tag") | |
| if '<html>' in raw_content: | |
| self.log(f" Has <html> tag") | |
| if '<!DOCTYPE' in raw_content: | |
| self.log(f" Has DOCTYPE declaration") | |
| # Check for any obvious issues | |
| if raw_content.strip().startswith('Error'): | |
| self.log(f"[ERROR] Response file starts with 'Error'") | |
| if 'failed' in raw_content[:500].lower(): | |
| self.log(f"[WARNING] Response file contains 'failed' in first 500 chars") | |
| # Get chapter title | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Getting chapter title...") | |
| title = self._get_chapter_title(num, filename, raw_content, chapter_titles_info) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Chapter title: {title!r}") | |
| if title == f"Chapter {num}" or title.startswith("Chapter"): | |
| self.log(f"[WARNING] Using generic title, couldn't extract proper title") | |
| # Prepare CSS links | |
| css_links = [f"css/{item.file_name.split('/')[-1]}" for item in css_items] | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] CSS links: {css_links}") | |
| # XHTML conversion - CRITICAL PART | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Starting XHTML conversion...") | |
| self.log(f"[DEBUG] Content length before XHTML: {len(raw_content)}") | |
| xhtml_content = XHTMLConverter.ensure_compliance(raw_content, title, css_links) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] XHTML conversion complete") | |
| self.log(f"[DEBUG] XHTML content length: {len(xhtml_content) if xhtml_content else 'NULL'}") | |
| if xhtml_content: | |
| # Check if it's the fallback | |
| if 'Error processing content' in xhtml_content: | |
| self.log(f"[ERROR] Got fallback XHTML - conversion failed!") | |
| else: | |
| self.log(f"[DEBUG] XHTML first 400 chars:") | |
| self.log(f" {xhtml_content[:400]!r}") | |
| # Process chapter images | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Processing chapter images...") | |
| xhtml_content = self._process_chapter_images(xhtml_content, processed_images) | |
| # Validate final content | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Validating final XHTML...") | |
| final_content = XHTMLConverter.validate(xhtml_content) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Validation complete") | |
| self.log(f"[DEBUG] Final content length: {len(final_content)}") | |
| # Check for fallback again | |
| if 'Error processing content' in final_content: | |
| self.log(f"[ERROR] Final content is fallback error page!") | |
| # Create chapter object | |
| import html | |
| chapter = epub.EpubHtml( | |
| title=html.unescape(title), | |
| file_name=os.path.basename(filename), | |
| lang=metadata.get("language", "en") | |
| ) | |
| chapter.content = FileUtils.ensure_bytes(final_content) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Chapter object created") | |
| self.log(f"[DEBUG] Chapter content size: {len(chapter.content)} bytes") | |
| # Attach CSS if configured | |
| if self.attach_css_to_chapters: | |
| for css_item in css_items: | |
| chapter.add_item(css_item) | |
| if is_problem_chapter: | |
| self.log(f"[DEBUG] Attached {len(css_items)} CSS files") | |
| # Add to book | |
| book.add_item(chapter) | |
| spine.append(chapter) | |
| toc.append(chapter) | |
| if is_problem_chapter: | |
| self.log(f"[SUCCESS] Problem chapter {num} successfully added to EPUB!") | |
| self.log(f"{'='*70}\n") | |
| else: | |
| self.log(f" ✓ Chapter {num}: {title}") | |
| return True | |
| except Exception as e: | |
| import traceback | |
| tb = traceback.format_exc() | |
| self.log(f"\n{'!'*70}") | |
| self.log(f"[ERROR] Failed to process chapter {num}: {filename}") | |
| self.log(f"[ERROR] Exception type: {type(e).__name__}") | |
| self.log(f"[ERROR] Exception message: {e}") | |
| if is_problem_chapter: | |
| self.log(f"[ERROR] PROBLEM CHAPTER {num} FAILED!") | |
| self.log(f"[ERROR] Full traceback:") | |
| self.log(tb) | |
| # Try to identify the exact failure point | |
| if 'ensure_compliance' in tb: | |
| self.log(f"[ERROR] Failed during XHTML compliance") | |
| elif 'validate' in tb: | |
| self.log(f"[ERROR] Failed during validation") | |
| elif '_extract_main_content' in tb: | |
| self.log(f"[ERROR] Failed during content extraction") | |
| elif '_read_and_decode' in tb: | |
| self.log(f"[ERROR] Failed during file reading/decoding") | |
| self.log(f"{'!'*70}\n") | |
| # Add error chapter as fallback | |
| self._add_error_chapter(book, num, title if 'title' in locals() else f"Chapter {num}", | |
| spine, toc, metadata, str(e)) | |
| return False | |
| def _get_chapter_title(self, num: int, filename: str, content: str, | |
| chapter_titles_info: Dict[int, Tuple[str, float, str]]) -> str: | |
| """Get chapter title with fallbacks - uses position-based numbering""" | |
| title = None | |
| confidence = 0.0 | |
| # Primary source: pre-analyzed title using position-based number | |
| if num in chapter_titles_info: | |
| title, confidence, stored_filename = chapter_titles_info[num] | |
| # Re-extract if low confidence or missing | |
| if not title or confidence < 0.5: | |
| backup_title, backup_confidence = TitleExtractor.extract_from_html(content, num, filename) | |
| if backup_confidence > confidence: | |
| title = backup_title | |
| confidence = backup_confidence | |
| # Clean and validate | |
| if title: | |
| title = TitleExtractor.clean_title(title) | |
| if not TitleExtractor.is_valid_title(title): | |
| title = None | |
| # Fallback for non-standard files | |
| if not title and not filename.startswith('response_'): | |
| # Try enhanced extraction methods for web-scraped content | |
| title = self._fallback_title_extraction(content, filename, num) | |
| # Final fallback - use position-based chapter number | |
| if not title: | |
| title = f"Chapter {num}" | |
| return title | |
| def get_robust_sort_key(self, filename): | |
| """Extract chapter/sequence number using multiple patterns""" | |
| # Pattern 1: -h-NUMBER (the primary pattern for this pipeline's output files) | |
| match = re.search(r'-h-(\d+)', filename) | |
| if match: | |
| return (1, int(match.group(1))) | |
| # Pattern 2: chapter-NUMBER or chapter_NUMBER or chapterNUMBER | |
| match = re.search(r'chapter[-_\s]?(\d+)', filename, re.IGNORECASE) | |
| if match: | |
| return (2, int(match.group(1))) | |
| # Pattern 3: ch-NUMBER or ch_NUMBER or chNUMBER | |
| match = re.search(r'\bch[-_\s]?(\d+)\b', filename, re.IGNORECASE) | |
| if match: | |
| return (3, int(match.group(1))) | |
| # Pattern 4: response_NUMBER_ (if response_ prefix exists) | |
| if filename.startswith('response_'): | |
| match = re.match(r'response_(\d+)[-_]', filename) | |
| if match: | |
| return (4, int(match.group(1))) | |
| # Pattern 5: book_NUMBER, story_NUMBER, part_NUMBER, section_NUMBER | |
| match = re.search(r'(?:book|story|part|section)[-_\s]?(\d+)', filename, re.IGNORECASE) | |
| if match: | |
| return (5, int(match.group(1))) | |
| # Pattern 6: split_NUMBER (Calibre pattern) | |
| match = re.search(r'split_(\d+)', filename) | |
| if match: | |
| return (6, int(match.group(1))) | |
| # Pattern 7: Just NUMBER.html (like 1.html, 2.html) | |
| match = re.match(r'^(\d+)\.(?:html?|xhtml)$', filename) | |
| if match: | |
| return (7, int(match.group(1))) | |
| # Pattern 8: -NUMBER at end before extension | |
| match = re.search(r'-(\d+)\.(?:html?|xhtml)$', filename) | |
| if match: | |
| return (8, int(match.group(1))) | |
| # Pattern 9: _NUMBER at end before extension | |
| match = re.search(r'_(\d+)\.(?:html?|xhtml)$', filename) | |
| if match: | |
| return (9, int(match.group(1))) | |
| # Pattern 10: (NUMBER) in parentheses anywhere | |
| match = re.search(r'\((\d+)\)', filename) | |
| if match: | |
| return (10, int(match.group(1))) | |
| # Pattern 11: [NUMBER] in brackets anywhere | |
| match = re.search(r'\[(\d+)\]', filename) | |
| if match: | |
| return (11, int(match.group(1))) | |
| # Pattern 12: page-NUMBER or p-NUMBER or pg-NUMBER | |
| match = re.search(r'(?:page|pg?)[-_\s]?(\d+)', filename, re.IGNORECASE) | |
| if match: | |
| return (12, int(match.group(1))) | |
| # Pattern 13: Any file ending with NUMBER before extension | |
| match = re.search(r'(\d+)\.(?:html?|xhtml)$', filename) | |
| if match: | |
| return (13, int(match.group(1))) | |
| # Pattern 14: Roman numerals (I, II, III, IV, etc.) | |
| # Match a non-empty run of roman characters; the old grouped pattern could match an empty string | |
| roman_pattern = r'\b([MDCLXVI]+)\b' | |
| match = re.search(roman_pattern, filename) | |
| if match: | |
| roman = match.group(1) | |
| # Convert roman to number | |
| roman_dict = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000} | |
| val = 0 | |
| for i in range(len(roman)): | |
| if i > 0 and roman_dict[roman[i]] > roman_dict[roman[i-1]]: | |
| val += roman_dict[roman[i]] - 2 * roman_dict[roman[i-1]] | |
| else: | |
| val += roman_dict[roman[i]] | |
| return (14, val) | |
| # Pattern 15: First significant number found | |
| numbers = re.findall(r'\d+', filename) | |
| if numbers: | |
| # Skip common year numbers (1900-2099) unless it's the only number | |
| significant_numbers = [int(n) for n in numbers if not (1900 <= int(n) <= 2099)] | |
| if significant_numbers: | |
| return (15, significant_numbers[0]) | |
| elif numbers: | |
| return (15, int(numbers[0])) | |
| # Final fallback: alphabetical | |
| return (99, filename) | |
| def _extract_chapter_number(self, filename: str, default_idx: int) -> int: | |
| """Extract chapter number using multiple patterns""" | |
| # Pattern 1: -h-NUMBER (checked first, before response_NUMBER_, since -h- files embed the true chapter number) | |
| match = re.search(r'-h-(\d+)', filename) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 2: response_NUMBER_ (standard pattern) | |
| match = re.match(r"response_(\d+)_", filename) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 3: chapter-NUMBER, chapter_NUMBER, chapterNUMBER | |
| match = re.search(r'chapter[-_\s]?(\d+)', filename, re.IGNORECASE) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 4: ch-NUMBER, ch_NUMBER, chNUMBER | |
| match = re.search(r'\bch[-_\s]?(\d+)\b', filename, re.IGNORECASE) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 5: Just NUMBER.html (like 127.html) | |
| match = re.match(r'^(\d+)\.(?:html?|xhtml)$', filename) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 6: _NUMBER at end before extension | |
| match = re.search(r'_(\d+)\.(?:html?|xhtml)$', filename) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 7: -NUMBER at end before extension | |
| match = re.search(r'-(\d+)\.(?:html?|xhtml)$', filename) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 8: (NUMBER) in parentheses | |
| match = re.search(r'\((\d+)\)', filename) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 9: [NUMBER] in brackets | |
| match = re.search(r'\[(\d+)\]', filename) | |
| if match: | |
| return int(match.group(1)) | |
| # Pattern 10: Use the sort key logic | |
| sort_key = self.get_robust_sort_key(filename) | |
| if isinstance(sort_key[1], int) and sort_key[1] > 0: | |
| return sort_key[1] | |
| # Final fallback: use position + 1 | |
| return default_idx + 1 | |
| def _extract_main_content(self, html_content: str, filename: str) -> str: | |
| """Extract main content from web-scraped HTML pages | |
| This method tries to find the actual chapter content within a full webpage | |
| """ | |
| try: | |
| # For web-scraped content, try to extract just the chapter part | |
| # Common patterns for chapter content containers | |
| content_patterns = [ | |
| # Look for specific class names commonly used for content | |
| (r'<div[^>]*class="[^"]*(?:chapter-content|entry-content|epcontent|post-content|content-area|main-content)[^"]*"[^>]*>(.*?)</div>', re.DOTALL | re.IGNORECASE), | |
| # Look for article tags with content | |
| (r'<article[^>]*>(.*?)</article>', re.DOTALL | re.IGNORECASE), | |
| # Look for main tags | |
| (r'<main[^>]*>(.*?)</main>', re.DOTALL | re.IGNORECASE), | |
| # Look for specific id patterns | |
| (r'<div[^>]*id="[^"]*(?:content|chapter|post)[^"]*"[^>]*>(.*?)</div>', re.DOTALL | re.IGNORECASE), | |
| ] | |
| for pattern, flags in content_patterns: | |
| match = re.search(pattern, html_content, flags) | |
| if match: | |
| extracted = match.group(1) | |
| # Make sure we got something substantial | |
| if len(extracted.strip()) > 100: | |
| self.log(f"📄 Extracted main content using pattern for {filename}") | |
| return extracted | |
| # If no patterns matched, check if this looks like a full webpage | |
| if '<html' in html_content.lower() and '<body' in html_content.lower(): | |
| # Try to extract body content | |
| body_match = re.search(r'<body[^>]*>(.*?)</body>', html_content, re.DOTALL | re.IGNORECASE) | |
| if body_match: | |
| self.log(f"📄 Extracted body content for {filename}") | |
| return body_match.group(1) | |
| # If all else fails, return original content | |
| self.log(f"📄 Using original content for {filename}") | |
| return html_content | |
| except Exception as e: | |
| self.log(f"⚠️ Content extraction failed for {filename}: {e}") | |
| return html_content | |
| def _fallback_title_extraction(self, content: str, filename: str, num: int) -> Optional[str]: | |
| """Fallback title extraction for when TitleExtractor fails | |
| This handles web-scraped pages and other non-standard formats | |
| """ | |
| # Try filename-based extraction first (often more reliable for web scrapes) | |
| filename_title = self._extract_title_from_filename_fallback(filename, num) | |
| if filename_title: | |
| return filename_title | |
| # Try HTML content extraction with patterns TitleExtractor might miss | |
| html_title = self._extract_title_from_html_fallback(content, num) | |
| if html_title: | |
| return html_title | |
| return None | |
| def _extract_title_from_html_fallback(self, content: str, num: int) -> Optional[str]: | |
| """Fallback HTML title extraction for web-scraped content""" | |
| # Look for title patterns that TitleExtractor might miss | |
| # Specifically for web-scraped novel sites | |
| patterns = [ | |
| # Title tags with site separators | |
| r'<title[^>]*>([^|–\-]+?)(?:\s*[|–\-]\s*[^<]+)?</title>', | |
| # Specific class patterns from novel sites | |
| r'<div[^>]*class="[^"]*cat-series[^"]*"[^>]*>([^<]+)</div>', | |
| r'<h1[^>]*class="[^"]*entry-title[^"]*"[^>]*>([^<]+)</h1>', | |
| r'<span[^>]*class="[^"]*chapter-title[^"]*"[^>]*>([^<]+)</span>', | |
| # Meta property patterns | |
| r'<meta[^>]*property="og:title"[^>]*content="([^"]+)"', | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, content, re.IGNORECASE) | |
| if match: | |
| title = match.group(1).strip() | |
| # Decode HTML entities | |
| title = HTMLEntityDecoder.decode(title) | |
| # Additional cleanup for web-scraped content | |
| title = re.sub(r'\s+', ' ', title) # Normalize whitespace | |
| title = title.strip() | |
| # Validate it's reasonable | |
| if 3 < len(title) < 200 and title.lower() != 'untitled': | |
| self.log(f"📝 Fallback extracted title from HTML: '{title}'") | |
| return title | |
| return None | |
| def _extract_title_from_filename_fallback(self, filename: str, num: int) -> Optional[str]: | |
| """Fallback filename title extraction""" | |
| # Remove extension | |
| base_name = re.sub(r'\.(html?|xhtml)$', '', filename, flags=re.IGNORECASE) | |
| # Web-scraped filename patterns | |
| patterns = [ | |
| # "theend-chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7" | |
| r'(?:theend|story|novel)[-_]chapter[-_](\d+)[-_](.+)', | |
| # "chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7" | |
| r'chapter[-_](\d+)[-_](.+)', | |
| # "ch127-title" -> "Chapter 127 - Title" | |
| r'ch[-_]?(\d+)[-_](.+)', | |
| # Just the title part after number | |
| r'^\d+[-_](.+)', | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, base_name, re.IGNORECASE) | |
| if match: | |
| if match.lastindex == 2: # Pattern with chapter number and title | |
| chapter_num = match.group(1) | |
| title_part = match.group(2) | |
| else: # Pattern with just title | |
| chapter_num = str(num) | |
| title_part = match.group(1) | |
| # Clean up the title part | |
| title_part = title_part.replace('-', ' ').replace('_', ' ') | |
| # Capitalize properly | |
| words = title_part.split() | |
| title_part = ' '.join(word.capitalize() if len(word) > 2 else word for word in words) | |
| title = f"Chapter {chapter_num} - {title_part}" | |
| self.log(f"📝 Fallback extracted title from filename: '{title}'") | |
| return title | |
| return None | |
| def _load_metadata(self) -> dict: | |
| """Load metadata from JSON file""" | |
| if os.path.exists(self.metadata_path): | |
| try: | |
| with open(self.metadata_path, 'r', encoding='utf-8') as f: | |
| metadata = json.load(f) | |
| self.log("[DEBUG] Metadata loaded successfully") | |
| return metadata | |
| except Exception as e: | |
| self.log(f"[WARNING] Failed to load metadata.json: {e}") | |
| else: | |
| self.log("[WARNING] metadata.json not found, using defaults") | |
| return {} | |
| def _create_book(self, metadata: dict) -> epub.EpubBook: | |
| """Create and configure EPUB book with complete metadata""" | |
| book = epub.EpubBook() | |
| # Set identifier | |
| book.set_identifier(metadata.get("identifier", f"translated-{os.path.basename(self.base_dir)}")) | |
| # Fix encoding issues in titles before using them | |
| if metadata.get('title'): | |
| metadata['title'] = self._fix_encoding_issues(metadata['title']) | |
| if metadata.get('original_title'): | |
| metadata['original_title'] = self._fix_encoding_issues(metadata['original_title']) | |
| # Determine title | |
| book_title = self._determine_book_title(metadata) | |
| book.set_title(book_title) | |
| # Set language | |
| book.set_language(metadata.get("language", "en")) | |
| # Store original title as alternative metadata (not as another dc:title) | |
| # This prevents EPUB readers from getting confused about which title to display | |
| if metadata.get('original_title') and metadata.get('original_title') != book_title: | |
| # Use 'alternative' field instead of 'title' to avoid display issues | |
| book.add_metadata('DC', 'alternative', metadata['original_title']) | |
| # Also store in a custom field for reference | |
| book.add_metadata('calibre', 'original_title', metadata['original_title']) | |
| self.log(f"[INFO] Stored original title as alternative: {metadata['original_title']}") | |
| # Set author/creator | |
| if metadata.get("creator"): | |
| book.add_author(metadata["creator"]) | |
| self.log(f"[INFO] Set author: {metadata['creator']}") | |
| # ADD DESCRIPTION - This is what Calibre looks for | |
| if metadata.get("description"): | |
| # Clean the description of any HTML entities | |
| description = HTMLEntityDecoder.decode(str(metadata["description"])) | |
| book.add_metadata('DC', 'description', description) | |
| self.log(f"[INFO] Set description: {description[:100]}..." if len(description) > 100 else f"[INFO] Set description: {description}") | |
| # Add publisher | |
| if metadata.get("publisher"): | |
| book.add_metadata('DC', 'publisher', metadata["publisher"]) | |
| self.log(f"[INFO] Set publisher: {metadata['publisher']}") | |
| # Add publication date | |
| if metadata.get("date"): | |
| book.add_metadata('DC', 'date', metadata["date"]) | |
| self.log(f"[INFO] Set date: {metadata['date']}") | |
| # Add rights/copyright | |
| if metadata.get("rights"): | |
| book.add_metadata('DC', 'rights', metadata["rights"]) | |
| self.log(f"[INFO] Set rights: {metadata['rights']}") | |
| # Add subject/genre/tags | |
| if metadata.get("subject"): | |
| if isinstance(metadata["subject"], list): | |
| for subject in metadata["subject"]: | |
| book.add_metadata('DC', 'subject', subject) | |
| self.log(f"[INFO] Added subject: {subject}") | |
| else: | |
| book.add_metadata('DC', 'subject', metadata["subject"]) | |
| self.log(f"[INFO] Set subject: {metadata['subject']}") | |
| # Add series information if available | |
| if metadata.get("series"): | |
| # Calibre uses a custom metadata field for series | |
| book.add_metadata('calibre', 'series', metadata["series"]) | |
| self.log(f"[INFO] Set series: {metadata['series']}") | |
| # Add series index if available | |
| if metadata.get("series_index"): | |
| book.add_metadata('calibre', 'series_index', str(metadata["series_index"])) | |
| self.log(f"[INFO] Set series index: {metadata['series_index']}") | |
| # Add custom metadata for translator info | |
| if metadata.get("translator"): | |
| book.add_metadata('DC', 'contributor', metadata["translator"], {'role': 'translator'}) | |
| self.log(f"[INFO] Set translator: {metadata['translator']}") | |
| # Add source information | |
| if metadata.get("source"): | |
| book.add_metadata('DC', 'source', metadata["source"]) | |
| self.log(f"[INFO] Set source: {metadata['source']}") | |
| # Add any ISBN if available | |
| if metadata.get("isbn"): | |
| book.add_metadata('DC', 'identifier', f"ISBN:{metadata['isbn']}", {'scheme': 'ISBN'}) | |
| self.log(f"[INFO] Set ISBN: {metadata['isbn']}") | |
| # Add coverage (geographic/temporal scope) if available | |
| if metadata.get("coverage"): | |
| book.add_metadata('DC', 'coverage', metadata["coverage"]) | |
| self.log(f"[INFO] Set coverage: {metadata['coverage']}") | |
| # Add any custom metadata that might be in the JSON | |
| # This handles any additional fields that might be present | |
| custom_metadata_fields = [ | |
| 'contributor', 'format', 'relation', 'type' | |
| ] | |
| for field in custom_metadata_fields: | |
| if metadata.get(field): | |
| book.add_metadata('DC', field, metadata[field]) | |
| self.log(f"[INFO] Set {field}: {metadata[field]}") | |
| return book | |
| def _determine_book_title(self, metadata: dict) -> str: | |
| """Determine the book title from metadata""" | |
| # Try translated title | |
| if metadata.get('title') and str(metadata['title']).strip(): | |
| title = str(metadata['title']).strip() | |
| self.log(f"✅ Using translated title: '{title}'") | |
| return title | |
| # Try original title | |
| if metadata.get('original_title') and str(metadata['original_title']).strip(): | |
| title = str(metadata['original_title']).strip() | |
| self.log(f"⚠️ Using original title: '{title}'") | |
| return title | |
| # Fallback to directory name | |
| title = os.path.basename(self.base_dir) | |
| self.log(f"📁 Using directory name: '{title}'") | |
| return title | |
| def _create_default_css(self) -> str: | |
| """Create default CSS for proper chapter formatting""" | |
| return """ | |
| /* Default EPUB CSS */ | |
| body { | |
| margin: 1em; | |
| padding: 0; | |
| font-family: serif; | |
| line-height: 1.6; | |
| } | |
| h1, h2, h3, h4, h5, h6 { | |
| font-weight: bold; | |
| margin-top: 1em; | |
| margin-bottom: 0.5em; | |
| page-break-after: avoid; | |
| } | |
| h1 { | |
| font-size: 1.5em; | |
| text-align: center; | |
| margin-top: 2em; | |
| margin-bottom: 2em; | |
| } | |
| p { | |
| margin: 1em 0; | |
| text-indent: 0; | |
| } | |
| img { | |
| max-width: 100%; | |
| height: auto; | |
| display: block; | |
| margin: 1em auto; | |
| } | |
| /* Prevent any overlay issues */ | |
| * { | |
| position: static !important; | |
| z-index: auto !important; | |
| } | |
| /* Remove any floating elements */ | |
| .title, [class*="title"] { | |
| position: static !important; | |
| float: none !important; | |
| background: transparent !important; | |
| } | |
| """ | |
| def _add_css_files(self, book: epub.EpubBook) -> List[epub.EpubItem]: | |
| """Add CSS files to book""" | |
| css_items = [] | |
| # First, add a default CSS to ensure proper formatting | |
| default_css = epub.EpubItem( | |
| uid="css_default", | |
| file_name="css/default.css", | |
| media_type="text/css", | |
| content=FileUtils.ensure_bytes(self._create_default_css()) | |
| ) | |
| book.add_item(default_css) | |
| css_items.append(default_css) | |
| self.log("✅ Added default CSS") | |
| # Then add user CSS files | |
| if not os.path.isdir(self.css_dir): | |
| return css_items | |
| css_files = [f for f in sorted(os.listdir(self.css_dir)) if f.endswith('.css')] | |
| self.log(f"[DEBUG] Found {len(css_files)} CSS files") | |
| for css_file in css_files: | |
| css_path = os.path.join(self.css_dir, css_file) | |
| try: | |
| with open(css_path, 'r', encoding='utf-8') as f: | |
| css_content = f.read() | |
| css_item = epub.EpubItem( | |
| uid=f"css_{css_file}", | |
| file_name=f"css/{css_file}", | |
| media_type="text/css", | |
| content=FileUtils.ensure_bytes(css_content) | |
| ) | |
| book.add_item(css_item) | |
| css_items.append(css_item) | |
| self.log(f"✅ Added CSS: {css_file}") | |
| except Exception as e: | |
| self.log(f"[WARNING] Failed to add CSS {css_file}: {e}") | |
| return css_items | |
| def _add_fonts(self, book: epub.EpubBook): | |
| """Add font files to book""" | |
| if not os.path.isdir(self.fonts_dir): | |
| return | |
| for font_file in os.listdir(self.fonts_dir): | |
| font_path = os.path.join(self.fonts_dir, font_file) | |
| if not os.path.isfile(font_path): | |
| continue | |
| try: | |
| mime_type = 'application/font-woff' | |
| if font_file.endswith('.ttf'): | |
| mime_type = 'font/ttf' | |
| elif font_file.endswith('.otf'): | |
| mime_type = 'font/otf' | |
| elif font_file.endswith('.woff2'): | |
| mime_type = 'font/woff2' | |
| with open(font_path, 'rb') as f: | |
| book.add_item(epub.EpubItem( | |
| uid=f"font_{font_file}", | |
| file_name=f"fonts/{font_file}", | |
| media_type=mime_type, | |
| content=f.read() | |
| )) | |
| self.log(f"✅ Added font: {font_file}") | |
| except Exception as e: | |
| self.log(f"[WARNING] Failed to add font {font_file}: {e}") | |
| def _process_images(self) -> Tuple[Dict[str, str], Optional[str]]: | |
| """Process images using parallel processing""" | |
| processed_images = {} | |
| cover_file = None | |
| try: | |
| # Find the images directory | |
| actual_images_dir = None | |
| possible_dirs = [ | |
| self.images_dir, | |
| os.path.join(self.base_dir, "images"), | |
| os.path.join(self.output_dir, "images"), | |
| ] | |
| for test_dir in possible_dirs: | |
| self.log(f"[DEBUG] Checking for images in: {test_dir}") | |
| if os.path.isdir(test_dir): | |
| files = os.listdir(test_dir) | |
| if files: | |
| self.log(f"[DEBUG] Found {len(files)} files in {test_dir}") | |
| actual_images_dir = test_dir | |
| break | |
| if not actual_images_dir: | |
| self.log("[WARNING] No images directory found or directory is empty") | |
| return processed_images, cover_file | |
| self.images_dir = actual_images_dir | |
| self.log(f"[INFO] Using images directory: {self.images_dir}") | |
| # Get list of files to process | |
| image_files = sorted(os.listdir(self.images_dir)) | |
| self.log(f"🖼️ Processing {len(image_files)} potential images with {self.max_workers} workers") | |
| def process_single_image(img): | |
| """Worker function to process a single image""" | |
| path = os.path.join(self.images_dir, img) | |
| if not os.path.isfile(path): | |
| return None | |
| # Check MIME type | |
| ctype, _ = mimetypes.guess_type(path) | |
| # If MIME type detection fails, check extension | |
| if not ctype: | |
| ext = os.path.splitext(img)[1].lower() | |
| mime_map = { | |
| '.jpg': 'image/jpeg', | |
| '.jpeg': 'image/jpeg', | |
| '.png': 'image/png', | |
| '.gif': 'image/gif', | |
| '.bmp': 'image/bmp', | |
| '.webp': 'image/webp', | |
| '.svg': 'image/svg+xml' | |
| } | |
| ctype = mime_map.get(ext) | |
| if ctype and ctype.startswith("image"): | |
| safe_name = FileUtils.sanitize_filename(img, allow_unicode=False) | |
| # Ensure extension | |
| if not os.path.splitext(safe_name)[1]: | |
| ext = os.path.splitext(img)[1] | |
| if ext: | |
| safe_name += ext | |
| elif ctype == 'image/jpeg': | |
| safe_name += '.jpg' | |
| elif ctype == 'image/png': | |
| safe_name += '.png' | |
| # Special handling for SVG: rasterize to PNG fallback for reader compatibility | |
| if ctype == 'image/svg+xml' and self.rasterize_svg and self._cairosvg_available: | |
| try: | |
| from cairosvg import svg2png | |
| png_name = os.path.splitext(safe_name)[0] + '.png' | |
| png_path = os.path.join(self.images_dir, png_name) | |
| # Generate PNG only if not already present | |
| if not os.path.exists(png_path): | |
| svg2png(url=path, write_to=png_path) | |
| self.log(f" 🖼️ Rasterized SVG → PNG: {img} -> {png_name}") | |
| # Return the PNG as the image to include | |
| return (png_name, png_name, 'image/png') | |
| except Exception as e: | |
| self.log(f"[WARNING] SVG rasterization failed for {img}: {e}") | |
| # Fall back to adding the raw SVG | |
| return (img, safe_name, ctype) | |
| return (img, safe_name, ctype) | |
| else: | |
| return None | |
| # Process images in parallel | |
| with ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
| futures = [executor.submit(process_single_image, img) for img in image_files] | |
| completed = 0 | |
| for future in as_completed(futures): | |
| try: | |
| result = future.result() | |
| completed += 1 | |
| if result: | |
| original, safe, ctype = result | |
| processed_images[original] = safe | |
| self.log(f" [{completed}/{len(image_files)}] ✅ Processed: {original} -> {safe}") | |
| else: | |
| self.log(f" [{completed}/{len(image_files)}] ⏭️ Skipped non-image file") | |
| except Exception as e: | |
| completed += 1 | |
| self.log(f" [{completed}/{len(image_files)}] ❌ Failed to process image: {e}") | |
| # Find cover (sequential - quick operation) | |
| # Respect user preference to disable automatic cover creation | |
| disable_auto_cover = os.environ.get('DISABLE_AUTOMATIC_COVER_CREATION', '0') == '1' | |
| if processed_images and not disable_auto_cover: | |
| cover_prefixes = ['cover', 'front'] | |
| for original_name, safe_name in processed_images.items(): | |
| name_lower = original_name.lower() | |
| if any(name_lower.startswith(prefix) for prefix in cover_prefixes): | |
| cover_file = safe_name | |
| self.log(f"📔 Found cover image: {original_name} -> {cover_file}") | |
| break | |
| if not cover_file: | |
| cover_file = next(iter(processed_images.values())) | |
| self.log(f"📔 Using first image as cover: {cover_file}") | |
| self.log(f"✅ Processed {len(processed_images)} images successfully") | |
| except Exception as e: | |
| self.log(f"[ERROR] Error processing images: {e}") | |
| import traceback | |
| self.log(f"[DEBUG] Traceback: {traceback.format_exc()}") | |
| return processed_images, cover_file | |
| def _add_images_to_book(self, book: epub.EpubBook, processed_images: Dict[str, str], | |
| cover_file: Optional[str]): | |
| """Add images to book using parallel processing for reading files""" | |
| # Filter out cover image | |
| images_to_add = [(orig, safe) for orig, safe in processed_images.items() | |
| if safe != cover_file] | |
| if not images_to_add: | |
| self.log("No images to add (besides cover)") | |
| return | |
| self.log(f"📚 Adding {len(images_to_add)} images to EPUB with {self.max_workers} workers") | |
| def read_image_file(image_data): | |
| """Worker function to read image file""" | |
| original_name, safe_name = image_data | |
| img_path = os.path.join(self.images_dir, original_name) | |
| try: | |
| ctype, _ = mimetypes.guess_type(img_path) | |
| if not ctype: | |
| ctype = "image/jpeg" # Default fallback | |
| with open(img_path, 'rb') as f: | |
| content = f.read() | |
| return { | |
| 'original': original_name, | |
| 'safe': safe_name, | |
| 'ctype': ctype, | |
| 'content': content, | |
| 'success': True | |
| } | |
| except Exception as e: | |
| return { | |
| 'original': original_name, | |
| 'safe': safe_name, | |
| 'error': str(e), | |
| 'success': False | |
| } | |
| # Read all images in parallel | |
| image_data_list = [] | |
| with ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
| futures = [executor.submit(read_image_file, img_data) for img_data in images_to_add] | |
| completed = 0 | |
| for future in as_completed(futures): | |
| try: | |
| result = future.result() | |
| completed += 1 | |
| if result['success']: | |
| image_data_list.append(result) | |
| self.log(f" [{completed}/{len(images_to_add)}] ✅ Read: {result['original']}") | |
| else: | |
| self.log(f" [{completed}/{len(images_to_add)}] ❌ Failed: {result['original']} - {result['error']}") | |
| except Exception as e: | |
| completed += 1 | |
| self.log(f" [{completed}/{len(images_to_add)}] ❌ Exception reading image: {e}") | |
| # Add images to book sequentially (required by ebooklib) | |
| self.log("\n📦 Adding images to EPUB structure...") | |
| added = 0 | |
| for img_data in image_data_list: | |
| try: | |
| book.add_item(epub.EpubItem( | |
| uid=img_data['safe'], | |
| file_name=f"images/{img_data['safe']}", | |
| media_type=img_data['ctype'], | |
| content=img_data['content'] | |
| )) | |
| added += 1 | |
| self.log(f" ✅ Added: {img_data['original']}") | |
| except Exception as e: | |
| self.log(f" ❌ Failed to add {img_data['original']} to EPUB: {e}") | |
| self.log(f"✅ Successfully added {added}/{len(images_to_add)} images to EPUB") | |
| def _create_cover_page(self, book: epub.EpubBook, cover_file: str, | |
| processed_images: Dict[str, str], css_items: List[epub.EpubItem], | |
| metadata: dict) -> Optional[epub.EpubHtml]: | |
| """Create cover page""" | |
| # Find original filename | |
| original_cover = None | |
| for orig, safe in processed_images.items(): | |
| if safe == cover_file: | |
| original_cover = orig | |
| break | |
| if not original_cover: | |
| return None | |
| cover_path = os.path.join(self.images_dir, original_cover) | |
| try: | |
| with open(cover_path, 'rb') as f: | |
| cover_data = f.read() | |
| # Add cover image | |
| cover_img = epub.EpubItem( | |
| uid="cover-image", | |
| file_name=f"images/{cover_file}", | |
| media_type=mimetypes.guess_type(cover_path)[0] or "image/jpeg", | |
| content=cover_data | |
| ) | |
| book.add_item(cover_img) | |
| # Set cover metadata | |
| cover_img.properties = ["cover-image"] | |
| book.add_metadata('http://purl.org/dc/elements/1.1/', 'cover', 'cover-image') | |
| # Create cover page | |
| cover_page = epub.EpubHtml( | |
| title="Cover", | |
| file_name="cover.xhtml", | |
| lang=metadata.get("language", "en") | |
| ) | |
| # Build the cover XHTML directly; the markup is simple and fully controlled, | |
| # so routing it through ensure_compliance is unnecessary | |
| cover_content = f'''<?xml version="1.0" encoding="utf-8"?> | |
| <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd"> | |
| <html xmlns="http://www.w3.org/1999/xhtml"> | |
| <head> | |
| <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> | |
| <title>Cover</title> | |
| </head> | |
| <body> | |
| <div style="text-align: center;"> | |
| <img src="images/{cover_file}" alt="Cover" style="max-width: 100%; height: auto;" /> | |
| </div> | |
| </body> | |
| </html>''' | |
| cover_page.content = cover_content.encode('utf-8') | |
| # Associate CSS with cover page if needed | |
| if self.attach_css_to_chapters: | |
| for css_item in css_items: | |
| cover_page.add_item(css_item) | |
| book.add_item(cover_page) | |
| self.log(f"✅ Set cover image: {cover_file}") | |
| return cover_page | |
| except Exception as e: | |
| self.log(f"[WARNING] Failed to add cover: {e}") | |
| return None | |
| def _process_chapter_images(self, xhtml_content: str, processed_images: Dict[str, str]) -> str: | |
| """Process image paths and inline SVG in chapter content. | |
| - Rewrites <img src> to use images/ paths and prefers PNG fallback for SVGs. | |
| - Converts inline <svg> elements to <img src="data:image/png;base64,..."> when CairoSVG is available. | |
| """ | |
| try: | |
| soup = BeautifulSoup(xhtml_content, 'lxml') | |
| changed = False | |
| # Debug: Log what images we're looking for | |
| self.log(f"[DEBUG] Processing chapter images. Available images: {list(processed_images.keys())}") | |
| # 1) Handle <img> tags that reference files | |
| for img in soup.find_all('img'): | |
| src = img.get('src', '') | |
| if not src: | |
| self.log(f"[WARNING] Image tag with no src attribute found") | |
| continue | |
| # Get the base filename - handle various path formats | |
| # Remove query parameters first | |
| clean_src = src.split('?')[0] | |
| basename = os.path.basename(clean_src) | |
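| # e.g. src="../images/pic.jpg?v=2" -> clean_src="../images/pic.jpg" -> basename="pic.jpg" | |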
| # Debug: Log what we're looking for | |
| self.log(f"[DEBUG] Looking for image: {basename} (from src: {src})") | |
| # Look up the safe name | |
| if basename in processed_images: | |
| safe_name = processed_images[basename] | |
| new_src = f"images/{safe_name}" | |
| if src != new_src: | |
| self.log(f"[DEBUG] Updating image src: {src} -> {new_src}") | |
| img['src'] = new_src | |
| changed = True | |
| else: | |
| # Try without extension variations | |
| name_without_ext = os.path.splitext(basename)[0] | |
| found = False | |
| for original_name, safe_name in processed_images.items(): | |
| if os.path.splitext(original_name)[0] == name_without_ext: | |
| new_src = f"images/{safe_name}" | |
| self.log(f"[DEBUG] Found image by name match: {src} -> {new_src}") | |
| img['src'] = new_src | |
| changed = True | |
| found = True | |
| break | |
| if not found: | |
| self.log(f"[WARNING] Image not found in processed_images: {basename}") | |
| # Still update the path to use images/ prefix if it doesn't have it | |
| if not src.startswith('images/'): | |
| img['src'] = f"images/{basename}" | |
| changed = True | |
| # Ensure alt attribute exists (required for XHTML) | |
| if not img.get('alt'): | |
| img['alt'] = '' | |
| changed = True | |
| # 2) Convert inline SVG wrappers that point to raster images into plain <img> | |
| # Example: <svg ...><image xlink:href="../images/00002.jpeg"/></svg> | |
| for svg_tag in soup.find_all('svg'): | |
| try: | |
| image_child = svg_tag.find('image') | |
| if image_child: | |
| href = ( | |
| image_child.get('xlink:href') or | |
| image_child.get('href') or | |
| image_child.get('{http://www.w3.org/1999/xlink}href') | |
| ) | |
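| # The href may be parsed as a literal "xlink:href" attribute, a plain SVG2 | |
| # "href", or a Clark-notation key, depending on how the parser handled the namespace | |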
| if href: | |
| clean_href = href.split('?')[0] | |
| basename = os.path.basename(clean_href) | |
| # Map to processed image name | |
| if basename in processed_images: | |
| safe_name = processed_images[basename] | |
| else: | |
| name_wo = os.path.splitext(basename)[0] | |
| safe_name = None | |
| for orig, safe in processed_images.items(): | |
| if os.path.splitext(orig)[0] == name_wo: | |
| safe_name = safe | |
| break | |
| new_src = f"images/{safe_name}" if safe_name else f"images/{basename}" | |
| new_img = soup.new_tag('img') | |
| new_img['src'] = new_src | |
| new_img['alt'] = svg_tag.get('aria-label') or svg_tag.get('title') or '' | |
| new_img['style'] = 'width:100%; height:auto; display:block;' | |
| svg_tag.replace_with(new_img) | |
| changed = True | |
| self.log(f"[DEBUG] Rewrote inline SVG<image> to <img src='{new_src}'>") | |
| except Exception as e: | |
| self.log(f"[WARNING] Failed to rewrite inline SVG wrapper: {e}") | |
| # 3) Convert remaining inline <svg> (complex vector art) to PNG data URIs if possible | |
| if self.rasterize_svg and self._cairosvg_available: | |
| try: | |
| from cairosvg import svg2png | |
| import base64 | |
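| # Converting to a base64 data URI keeps the rasterized artwork inline in the | |
| # chapter, so no extra image item needs to be registered in the manifest | |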
| for svg_tag in soup.find_all('svg'): | |
| try: | |
| svg_markup = str(svg_tag) | |
| png_bytes = svg2png(bytestring=svg_markup.encode('utf-8')) | |
| b64 = base64.b64encode(png_bytes).decode('ascii') | |
| alt_text = svg_tag.get('aria-label') or svg_tag.get('title') or '' | |
| new_img = soup.new_tag('img') | |
| new_img['src'] = f'data:image/png;base64,{b64}' | |
| new_img['alt'] = alt_text | |
| new_img['style'] = 'width:100%; height:auto; display:block;' | |
| svg_tag.replace_with(new_img) | |
| changed = True | |
| self.log("[DEBUG] Converted inline <svg> to PNG data URI") | |
| except Exception as e: | |
| self.log(f"[WARNING] Failed to rasterize inline SVG: {e}") | |
| except Exception: | |
| # CairoSVG import or conversion setup failed; leave remaining SVGs untouched | |
| pass | |
| if changed: | |
| # Return the modified content | |
| return str(soup) | |
| return xhtml_content | |
| except Exception as e: | |
| self.log(f"[WARNING] Failed to process images in chapter: {e}") | |
| return xhtml_content | |
| def _create_gallery_page(self, book: epub.EpubBook, images: List[str], | |
| css_items: List[epub.EpubItem], metadata: dict) -> epub.EpubHtml: | |
| """Create image gallery page - FIXED to avoid escaping HTML tags""" | |
| gallery_page = epub.EpubHtml( | |
| title="Gallery", | |
| file_name="gallery.xhtml", | |
| lang=metadata.get("language", "en") | |
| ) | |
| # Build the gallery body content | |
| gallery_body_parts = ['<h1>Image Gallery</h1>'] | |
| for img in images: | |
| # Escape the filename for use in the alt attribute to keep the XHTML valid | |
| alt_text = ContentProcessor.safe_escape(img) | |
| gallery_body_parts.append( | |
| f'<div style="text-align: center; margin: 20px;">' | |
| f'<img src="images/{img}" alt="{alt_text}" />' | |
| f'</div>' | |
| ) | |
| gallery_body_content = '\n'.join(gallery_body_parts) | |
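| # Each entry renders as a centered block, e.g. (illustrative filename): | |
| # <div style="text-align: center; margin: 20px;"><img src="images/0001.jpg" alt="0001.jpg" /></div> | |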
| # Build XHTML directly without going through ensure_compliance | |
| # which might escape our HTML tags | |
| css_links = [f"css/{item.file_name.split('/')[-1]}" for item in css_items] | |
| # Build the complete XHTML document manually | |
| xhtml_content = f'''<?xml version="1.0" encoding="utf-8"?> | |
| <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd"> | |
| <html xmlns="http://www.w3.org/1999/xhtml" xmlns:epub="http://www.idpf.org/2007/ops"> | |
| <head> | |
| <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> | |
| <title>Gallery</title>''' | |
| # Add CSS links | |
| for css_link in css_links: | |
| xhtml_content += f'\n<link rel="stylesheet" type="text/css" href="{css_link}" />' | |
| xhtml_content += f''' | |
| </head> | |
| <body> | |
| {gallery_body_content} | |
| </body> | |
| </html>''' | |
| # Validate the XHTML | |
| validated_content = XHTMLConverter.validate(xhtml_content) | |
| # Set the content | |
| gallery_page.content = FileUtils.ensure_bytes(validated_content) | |
| # Associate CSS with gallery page | |
| if self.attach_css_to_chapters: | |
| for css_item in css_items: | |
| gallery_page.add_item(css_item) | |
| book.add_item(gallery_page) | |
| return gallery_page | |
| def _create_nav_content(self, toc_items, book_title="Book"): | |
| """Create navigation content manually""" | |
| nav_content = '''<?xml version="1.0" encoding="utf-8"?> | |
| <!DOCTYPE html> | |
| <html xmlns="http://www.w3.org/1999/xhtml" xmlns:epub="http://www.idpf.org/2007/ops"> | |
| <head> | |
| <title>Table of Contents</title> | |
| </head> | |
| <body> | |
| <nav epub:type="toc" id="toc"> | |
| <h1>Table of Contents</h1> | |
| <ol>''' | |
| # The toc_items are already sorted properly by _finalize_book | |
| # Don't re-sort them here - just use them as-is | |
| for item in toc_items: | |
| if hasattr(item, 'title') and hasattr(item, 'file_name'): | |
| nav_content += f'\n<li><a href="{item.file_name}">{ContentProcessor.safe_escape(item.title)}</a></li>' | |
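| # Example of an emitted entry (illustrative values): | |
| # <li><a href="chapter_001.xhtml">Chapter 1</a></li> | |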
| nav_content += ''' | |
| </ol> | |
| </nav> | |
| </body> | |
| </html>''' | |
| return nav_content | |
| def _get_order_from_progress_file(self, progress_file: str) -> Optional[Dict[str, int]]: | |
| """Get chapter order from translation_progress.json | |
| Returns dict mapping original_basename -> 0-based chapter index, | |
| or None if no usable mapping is found | |
| """ | |
| try: | |
| with open(progress_file, 'r', encoding='utf-8') as f: | |
| progress_data = json.load(f) | |
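| # Illustrative progress file shape (assumed; keys are chapter numbers): | |
| # {"chapters": {"1": {"original_basename": "chapter001.html"}, ...}} | |
| # would map to {"chapter001.html": 0, ...} | |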
| filename_to_order = {} | |
| # Extract chapter order from progress data | |
| chapters = progress_data.get('chapters', {}) | |
| for chapter_key, chapter_info in chapters.items(): | |
| # Get the original basename from progress data | |
| original_basename = chapter_info.get('original_basename', '') | |
| if original_basename: | |
| # Map to chapter position (key is usually the chapter number) | |
| try: | |
| chapter_num = int(chapter_key) | |
| filename_to_order[original_basename] = chapter_num - 1 # Convert to 0-based | |
| self.log(f" Progress mapping: {original_basename} -> Chapter {chapter_num}") | |
| except (ValueError, TypeError): | |
| pass | |
| return filename_to_order if filename_to_order else None | |
| except Exception as e: | |
| self.log(f"⚠️ Error reading translation_progress.json: {e}") | |
| return None | |
| def _finalize_book(self, book: epub.EpubBook, spine: List, toc: List, | |
| cover_file: Optional[str]): | |
| """Finalize book structure""" | |
| # Check if we should use NCX-only | |
| use_ncx_only = os.environ.get('FORCE_NCX_ONLY', '0') == '1' | |
| # Check if first item in spine is a cover | |
| has_cover = False | |
| cover_item = None | |
| if spine: | |
| first_item = spine[0] | |
| if hasattr(first_item, 'title') and first_item.title == "Cover": | |
| has_cover = True | |
| cover_item = first_item | |
| spine = spine[1:] # Remove cover from spine temporarily | |
| # DEBUG: Log what we have before sorting | |
| self.log("\n[DEBUG] Before sorting TOC:") | |
| self.log("Spine order:") | |
| for idx, item in enumerate(spine): | |
| if hasattr(item, 'file_name') and hasattr(item, 'title'): | |
| self.log(f" Spine[{idx}]: {item.file_name} -> {item.title}") | |
| #self.log("\nTOC order (before sorting):") | |
| for idx, item in enumerate(toc): | |
| if hasattr(item, 'file_name') and hasattr(item, 'title'): | |
| self.log(f" TOC[{idx}]: {item.file_name} -> {item.title}") | |
| # CRITICAL FIX: Sort TOC to match spine order | |
| # Create a mapping of file_name to spine position | |
| spine_order = {} | |
| for idx, item in enumerate(spine): | |
| if hasattr(item, 'file_name'): | |
| spine_order[item.file_name] = idx | |
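| # e.g. spine_order = {"chapter_001.xhtml": 0, "chapter_002.xhtml": 1, ...} (illustrative names) | |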
| # Sort the TOC based on spine order | |
| sorted_toc = [] | |
| unsorted_items = [] | |
| for toc_item in toc: | |
| if hasattr(toc_item, 'file_name'): | |
| if toc_item.file_name in spine_order: | |
| sorted_toc.append((spine_order[toc_item.file_name], toc_item)) | |
| else: | |
| # Items not in spine (like gallery) go at the end | |
| unsorted_items.append(toc_item) | |
| else: | |
| unsorted_items.append(toc_item) | |
| # Sort by spine position | |
| sorted_toc.sort(key=lambda x: x[0]) | |
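| # list.sort is stable, so entries that map to the same spine position keep | |
| # their original relative order | |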
| # Extract just the items (remove the sort key) | |
| final_toc = [item for _, item in sorted_toc] | |
| # Add any unsorted items at the end (like gallery) | |
| final_toc.extend(unsorted_items) | |
| # DEBUG: Log after sorting | |
| self.log("\nTOC order (after sorting to match spine):") | |
| for idx, item in enumerate(final_toc): | |
| if hasattr(item, 'file_name') and hasattr(item, 'title'): | |
| self.log(f" TOC[{idx}]: {item.file_name} -> {item.title}") | |
| # Set the sorted TOC | |
| book.toc = final_toc | |
| # Add NCX | |
| ncx = epub.EpubNcx() | |
| book.add_item(ncx) | |
| if use_ncx_only: | |
| self.log(f"[INFO] NCX-only navigation forced - {len(final_toc)} chapters") | |
| # Build final spine: Cover (if exists) → Chapters | |
| final_spine = [] | |
| if has_cover: | |
| final_spine.append(cover_item) | |
| final_spine.extend(spine) | |
| book.spine = final_spine | |
| self.log("📖 Using EPUB 3.3 with NCX navigation only") | |
| if has_cover: | |
| self.log("📖 Reading order: Cover → Chapters") | |
| else: | |
| self.log("📖 Reading order: Chapters") | |
| else: | |
| # Normal EPUB3 processing with Nav | |
| self.log(f"[INFO] EPUB3 format - {len(final_toc)} chapters") | |
| # Create Nav with manual content using SORTED TOC | |
| nav = epub.EpubNav() | |
| nav.content = self._create_nav_content(final_toc, book.title).encode('utf-8') | |
| nav.uid = 'nav' | |
| nav.file_name = 'nav.xhtml' | |
| book.add_item(nav) | |
| # Build final spine: Cover (if exists) → Nav → Chapters | |
| final_spine = [] | |
| if has_cover: | |
| final_spine.append(cover_item) | |
| final_spine.append(nav) | |
| final_spine.extend(spine) | |
| book.spine = final_spine | |
| self.log("📖 Using EPUB3 format with full navigation") | |
| if has_cover: | |
| self.log("📖 Reading order: Cover → Table of Contents → Chapters") | |
| else: | |
| self.log("📖 Reading order: Table of Contents → Chapters") | |
| def _write_epub(self, book: epub.EpubBook, metadata: dict): | |
| """Write EPUB file with automatic format selection""" | |
| # Determine output filename | |
| book_title = book.title | |
| if book_title and book_title != os.path.basename(self.output_dir): | |
| safe_filename = FileUtils.sanitize_filename(book_title, allow_unicode=True) | |
| out_path = os.path.join(self.output_dir, f"{safe_filename}.epub") | |
| else: | |
| base_name = os.path.basename(self.output_dir) | |
| out_path = os.path.join(self.output_dir, f"{base_name}.epub") | |
| self.log(f"\n[DEBUG] Writing EPUB to: {out_path}") | |
| # Always write as EPUB3 | |
| try: | |
| opts = {'epub3': True} | |
| epub.write_epub(out_path, book, opts) | |
| self.log("[SUCCESS] Written as EPUB 3.3") | |
| except Exception as e: | |
| self.log(f"[ERROR] Write failed: {e}") | |
| raise | |
| # Verify the file | |
| if os.path.exists(out_path): | |
| file_size = os.path.getsize(out_path) | |
| if file_size > 0: | |
| self.log(f"✅ EPUB created: {out_path}") | |
| self.log(f"📊 File size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)") | |
| self.log("📝 Format: EPUB 3.3") | |
| else: | |
| raise Exception("EPUB file is empty") | |
| else: | |
| raise Exception("EPUB file was not created") | |
| def _show_summary(self, chapter_titles_info: Dict[int, Tuple[str, float, str]], | |
| css_items: List[epub.EpubItem]): | |
| """Show compilation summary""" | |
| if chapter_titles_info: | |
| high = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if conf > 0.7) | |
| medium = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if 0.4 < conf <= 0.7) | |
| low = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if conf <= 0.4) | |
| self.log(f"\n📊 Title Extraction Summary:") | |
| self.log(f" • High confidence: {high} chapters") | |
| self.log(f" • Medium confidence: {medium} chapters") | |
| self.log(f" • Low confidence: {low} chapters") | |
| if css_items: | |
| self.log(f"\n✅ Successfully embedded {len(css_items)} CSS files") | |
| # Gallery status | |
| if os.environ.get('DISABLE_EPUB_GALLERY', '0') == '1': | |
| self.log("\n📷 Image Gallery: Disabled by user preference") | |
| self.log("\n📱 Compatibility Notes:") | |
| self.log(" • XHTML 1.1 compliant") | |
| self.log(" • All tags properly closed") | |
| self.log(" • Special characters escaped") | |
| self.log(" • Extracted translated titles") | |
| self.log(" • Enhanced entity decoding") | |
| # Main entry point | |
| def compile_epub(base_dir: str, log_callback: Optional[Callable] = None): | |
| """Compile translated HTML files into EPUB""" | |
| compiler = EPUBCompiler(base_dir, log_callback) | |
| compiler.compile() | |
| # Legacy alias | |
| fallback_compile_epub = compile_epub | |
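| # Example usage (illustrative path): | |
| # compile_epub("/path/to/translation_output", log_callback=print) | |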
| if __name__ == "__main__": | |
| if len(sys.argv) < 2: | |
| print("Usage: python epub_converter.py <directory_path>") | |
| sys.exit(1) | |
| directory_path = sys.argv[1] | |
| try: | |
| compile_epub(directory_path) | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| sys.exit(1) | |