from datetime import datetime import json import os import pytz import traceback class UserHistoryManager: def __init__(self): """Initialize history record manager""" self.history_file = "user_history.json" print(f"Initializing UserHistoryManager with file: {os.path.abspath(self.history_file)}") self._init_file() def _init_file(self): """Initialize JSON file""" try: if not os.path.exists(self.history_file): print(f"Creating new history file: {self.history_file}") with open(self.history_file, 'w', encoding='utf-8') as f: json.dump([], f) else: print(f"History file exists: {self.history_file}") # Added a check for empty file before loading if os.path.getsize(self.history_file) > 0: with open(self.history_file, 'r', encoding='utf-8') as f: data = json.load(f) print(f"Current history entries: {len(data)}") else: print("History file is empty.") except Exception as e: print(f"Error in _init_file: {str(e)}") print(traceback.format_exc()) def save_history(self, user_preferences: dict = None, results: list = None, search_type: str = "criteria", description: str = None, user_description: str = None) -> bool: """ Save search history with complete result data """ try: taipei_tz = pytz.timezone('Asia/Taipei') current_time = datetime.now(taipei_tz) history_entry = { "timestamp": current_time.strftime("%Y-%m-%d %H:%M:%S"), "search_type": search_type } description_text = user_description or description if search_type == "description" and description_text: history_entry["user_description"] = description_text[:200] + "..." if len(description_text) > 200 else description_text def _to_float(x, default=0.0): try: return float(x) except Exception: return default def _to_int(x, default=0): try: return int(x) except Exception: return default if results and isinstance(results, list): processed_results = [] for i, r in enumerate(results[:15], start=1): processed_results.append({ "breed": str(r.get("breed", "Unknown")), "rank": _to_int(r.get("rank", i)), # 先拿 overall_score,沒有就退 final_score,都轉成 float "overall_score": _to_float(r.get("overall_score", r.get("final_score", 0))), # 描述搜尋常見附加分,也一併安全轉型 "semantic_score": _to_float(r.get("semantic_score", 0)), "comparative_bonus": _to_float(r.get("comparative_bonus", 0)), "lifestyle_bonus": _to_float(r.get("lifestyle_bonus", 0)), "size": str(r.get("size", "Unknown")), }) history_entry["results"] = processed_results if user_preferences: history_entry["preferences"] = { 'living_space': user_preferences.get('living_space'), 'exercise_time': user_preferences.get('exercise_time'), 'grooming_commitment': user_preferences.get('grooming_commitment'), 'experience_level': user_preferences.get('experience_level'), 'has_children': user_preferences.get('has_children'), 'noise_tolerance': user_preferences.get('noise_tolerance'), 'size_preference': user_preferences.get('size_preference') } try: history = [] if os.path.exists(self.history_file) and os.path.getsize(self.history_file) > 0: with open(self.history_file, 'r', encoding='utf-8') as f: history = json.load(f) except json.JSONDecodeError as e: print(f"JSON decode error when reading history: {str(e)}") backup_file = f"{self.history_file}.backup.{int(datetime.now().timestamp())}" if os.path.exists(self.history_file): os.rename(self.history_file, backup_file) print(f"Backed up corrupted file to {backup_file}") history = [] history.append(history_entry) history = history[-20:] # Keep recent 20 entries temp_file = f"{self.history_file}.tmp" try: with open(temp_file, 'w', encoding='utf-8') as f: json.dump(history, f, ensure_ascii=False, indent=2) os.rename(temp_file, self.history_file) except Exception as e: if os.path.exists(temp_file): os.remove(temp_file) raise print(f"Successfully saved history entry for {search_type} search.") return True except Exception as e: print(f"Error saving history: {str(e)}") print(traceback.format_exc()) return False # get_history, clear_all_history, and format_history_for_display methods remain the same as you provided def get_history(self) -> list: """Get search history""" try: print("Attempting to read history") # Debug # Check if file exists and is not empty if not os.path.exists(self.history_file): print("History file does not exist, creating empty file") with open(self.history_file, 'w', encoding='utf-8') as f: json.dump([], f) return [] # Check file size if os.path.getsize(self.history_file) == 0: print("History file is empty, initializing with empty array") with open(self.history_file, 'w', encoding='utf-8') as f: json.dump([], f) return [] # Try to read with error recovery try: with open(self.history_file, 'r', encoding='utf-8') as f: content = f.read().strip() if not content: print("File content is empty, returning empty list") return [] data = json.loads(content) print(f"Read {len(data)} history entries") # Debug return data if isinstance(data, list) else [] except json.JSONDecodeError as je: print(f"JSON decode error: {str(je)}") print(f"Corrupted content near position {je.pos}") # Backup corrupted file and create new one backup_file = f"{self.history_file}.backup" os.rename(self.history_file, backup_file) print(f"Backed up corrupted file to {backup_file}") with open(self.history_file, 'w', encoding='utf-8') as f: json.dump([], f) return [] except Exception as e: print(f"Error reading history: {str(e)}") print(traceback.format_exc()) return [] def clear_all_history(self) -> bool: """Clear all history records""" try: print("Attempting to clear all history") # Debug with open(self.history_file, 'w', encoding='utf-8') as f: json.dump([], f) print("History cleared successfully") # Debug return True except Exception as e: print(f"Error clearing history: {str(e)}") print(traceback.format_exc()) return False def format_history_for_display(self) -> str: """ Format history records for HTML display Returns: str: Formatted HTML string """ try: history = self.get_history() if not history: return """
No search history yet
Error loading history records: {str(e)}