# multi_api_key_manager.py
"""
Multi API Key Manager for Glossarion
Handles multiple API keys with round-robin load balancing and rate limit management
"""
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog
import ttkbootstrap as tb
import json
import os
import threading
import time
import queue
from typing import Dict, List, Optional, Tuple
import requests
from datetime import datetime, timedelta
import logging
from model_options import get_model_options

# Dialog for configuring per-key endpoint
try:
    from individual_endpoint_dialog import IndividualEndpointDialog
except Exception:
    IndividualEndpointDialog = None

logger = logging.getLogger(__name__)


class RateLimitCache:
    """Thread-safe rate limit cache mapping a key id to its cooldown expiry time."""

    def __init__(self):
        self._cache = {}  # key_id -> expiry_time (time.time() based)
        self._lock = threading.Lock()

    def add_rate_limit(self, key_id: str, cooldown_seconds: int):
        """Add a key to rate limit cache for ``cooldown_seconds`` from now."""
        with self._lock:
            self._cache[key_id] = time.time() + cooldown_seconds
            logger.info(f"Added {key_id} to rate limit cache for {cooldown_seconds}s")

    def is_rate_limited(self, key_id: str) -> bool:
        """Check if key is rate limited; an expired entry is removed as a side effect."""
        with self._lock:
            if key_id not in self._cache:
                return False

            if time.time() >= self._cache[key_id]:
                # Expired, remove it
                del self._cache[key_id]
                return False

            return True

    def clear_expired(self):
        """Remove expired entries."""
        with self._lock:
            current_time = time.time()
            expired = [k for k, v in self._cache.items() if current_time >= v]
            for k in expired:
                del self._cache[k]

    def get_remaining_cooldown(self, key_id: str) -> float:
        """Get remaining cooldown time in seconds (0 when the key is not cached)."""
        with self._lock:
            if key_id not in self._cache:
                return 0.0
            remaining = self._cache[key_id] - time.time()
            return max(0.0, remaining)


class APIKeyEntry:
    """Enhanced API key entry with thread-safe operations."""

    def __init__(self, api_key: str, model: str, cooldown: int = 60, enabled: bool = True,
                 google_credentials: Optional[str] = None, azure_endpoint: Optional[str] = None,
                 google_region: Optional[str] = None, azure_api_version: Optional[str] = None,
                 use_individual_endpoint: bool = False):
        self.api_key = api_key
        self.model = model
        self.cooldown = cooldown
        self.enabled = enabled
        self.google_credentials = google_credentials  # Path to Google service account JSON
        self.azure_endpoint = azure_endpoint  # Azure endpoint URL (only used if use_individual_endpoint is True)
        self.google_region = google_region  # Google Cloud region (e.g., us-east5, us-central1)
        self.azure_api_version = azure_api_version or '2025-01-01-preview'  # Azure API version
        self.use_individual_endpoint = use_individual_endpoint  # Toggle to enable/disable individual endpoint
        self.last_error_time = None
        self.error_count = 0
        self.success_count = 0
        self.last_used_time = None
        self.times_used = 0  # Incremented whenever this key is assigned/used
        self.is_cooling_down = False

        # Add lock for thread-safe modifications
        self._lock = threading.Lock()

        # Add test result storage
        self.last_test_result = None
        self.last_test_time = None
        self.last_test_message = None

    def is_available(self) -> bool:
        """Return True when the key is enabled and not inside its cooldown window.

        Clears ``is_cooling_down`` once the cooldown has elapsed.
        """
        with self._lock:
            if not self.enabled:
                return False
            if self.last_error_time and self.is_cooling_down:
                time_since_error = time.time() - self.last_error_time
                if time_since_error < self.cooldown:
                    return False
                else:
                    self.is_cooling_down = False
            return True

    def mark_error(self, error_code: Optional[int] = None):
        """Record an error; a 429 additionally starts the cooldown."""
        with self._lock:
            self.error_count += 1
            self.times_used = getattr(self, 'times_used', 0) + 1
            self.last_error_time = time.time()
            if error_code == 429:
                self.is_cooling_down = True

    def mark_success(self):
        """Record a successful use and reset the error counter."""
        with self._lock:
            self.success_count += 1
            self.times_used = getattr(self, 'times_used', 0) + 1
            self.last_used_time = time.time()
            self.error_count = 0

    def set_test_result(self, result: str, message: Optional[str] = None):
        """Store test result"""
        with self._lock:
            self.last_test_result = result
            self.last_test_time = time.time()
            self.last_test_message = message

    def to_dict(self):
        """Convert to dictionary for saving"""
        return {
            'api_key': self.api_key,
            'model': self.model,
            'cooldown': self.cooldown,
            'enabled': self.enabled,
            'google_credentials': self.google_credentials,
            'azure_endpoint': self.azure_endpoint,
            'google_region': self.google_region,
            'azure_api_version': self.azure_api_version,
            'use_individual_endpoint': self.use_individual_endpoint,
            # Persist times used optionally (non-breaking if ignored elsewhere)
            'times_used': getattr(self, 'times_used', 0)
        }


class APIKeyPool:
    """Thread-safe API key pool with proper rotation"""

    def __init__(self):
        self.keys: List[APIKeyEntry] = []
        self.lock = threading.Lock()  # Guards self.keys / rotation index for list-level operations
        self._rotation_index = 0
        self._thread_assignments = {}  # thread_id -> (key_index, assignment_time)
        self._rate_limit_cache = RateLimitCache()

        self.key_locks = {}  # key_index -> Lock; populated when keys are loaded/added
        self.key_selection_lock = threading.Lock()  # For coordinating key selection across threads

        # Track which keys are currently being used by which threads
        self._keys_in_use = {}  # key_index -> set of thread_ids
        self._usage_lock = threading.Lock()

    def _get_key_lock(self, key_index: int) -> threading.Lock:
        """Return the lock for ``key_index``, registering one if it is missing.

        A plain ``dict.get(index, threading.Lock())`` would hand each caller a
        different throwaway lock for unregistered indices, which protects nothing.
        """
        return self.key_locks.setdefault(key_index, threading.Lock())

    def _track_usage(self, key_index: int, thread_id: int):
        """Record that ``thread_id`` is currently using the key at ``key_index``."""
        with self._usage_lock:
            if key_index not in self._keys_in_use:
                self._keys_in_use[key_index] = set()
            self._keys_in_use[key_index].add(thread_id)

    def load_from_list(self, key_list: List[dict]):
        """Replace the pool contents from a list of key dicts.

        Counters and test results of entries that match an existing
        ``(api_key, model)`` pair are carried over to the new entries.
        """
        with self.lock:
            # Preserve existing counters by mapping old entries by (api_key, model)
            old_map = {}
            for old in getattr(self, 'keys', []):
                key = (getattr(old, 'api_key', ''), getattr(old, 'model', ''))
                old_map[key] = old

            self.keys.clear()
            self.key_locks.clear()  # Clear existing locks

            for i, key_data in enumerate(key_list):
                api_key = key_data.get('api_key', '')
                model = key_data.get('model', '')
                entry = APIKeyEntry(
                    api_key=api_key,
                    model=model,
                    cooldown=key_data.get('cooldown', 60),
                    enabled=key_data.get('enabled', True),
                    google_credentials=key_data.get('google_credentials'),
                    azure_endpoint=key_data.get('azure_endpoint'),
                    google_region=key_data.get('google_region'),
                    azure_api_version=key_data.get('azure_api_version'),
                    use_individual_endpoint=key_data.get('use_individual_endpoint', False)
                )
                # Restore counters if we had this key before.
                # getattr with defaults is used because the old entry may come
                # from a pool shared with other code and lack newer attributes.
                old = old_map.get((api_key, model))
                if old is not None:
                    entry.success_count = getattr(old, 'success_count', entry.success_count)
                    entry.error_count = getattr(old, 'error_count', entry.error_count)
                    entry.times_used = getattr(
                        old, 'times_used',
                        getattr(old, 'success_count', 0) + getattr(old, 'error_count', 0))
                    entry.last_used_time = getattr(old, 'last_used_time', None)
                    entry.last_error_time = getattr(old, 'last_error_time', None)
                    entry.is_cooling_down = getattr(old, 'is_cooling_down', False)
                    entry.last_test_result = getattr(old, 'last_test_result', None)
                    entry.last_test_time = getattr(old, 'last_test_time', None)
                    entry.last_test_message = getattr(old, 'last_test_message', None)
                self.keys.append(entry)
                # Create a lock for each key
                self.key_locks[i] = threading.Lock()

            # Keep rotation index if it still points inside the list
            if getattr(self, '_rotation_index', 0) >= len(self.keys):
                self._rotation_index = 0
            self._keys_in_use.clear()
            logger.info(f"Loaded {len(self.keys)} API keys into pool with individual locks (preserved counters where possible)")

    def get_key_for_thread(self, force_rotation: bool = False,
                           rotation_frequency: int = 1) -> Optional[Tuple[APIKeyEntry, int, str]]:
        """Get a key for the current thread with proper rotation logic.

        Args:
            force_rotation: When True, ignore the thread's existing assignment
                and pick the next key in round-robin order.
            rotation_frequency: Accepted for interface compatibility; not used
                by the selection logic here.

        Returns:
            ``(key, key_index, key_id)``, or ``None`` when the pool is empty or
            no enabled key exists. When every enabled key is cooling down, the
            one with the shortest remaining cooldown is returned anyway.
        """
        thread_id = threading.current_thread().ident
        thread_name = threading.current_thread().name

        # Clear expired rate limits first
        self._rate_limit_cache.clear_expired()

        # Use key_selection_lock for the entire selection process
        with self.key_selection_lock:
            if not self.keys:
                return None

            # Check if thread already has an assignment
            if thread_id in self._thread_assignments and not force_rotation:
                key_index, _assignment_time = self._thread_assignments[thread_id]
                if key_index < len(self.keys):
                    key = self.keys[key_index]
                    key_id = f"Key#{key_index+1} ({key.model})"

                    # Check if the assigned key is still available
                    # Use the key-specific lock for checking availability
                    with self._get_key_lock(key_index):
                        if key.is_available() and not self._rate_limit_cache.is_rate_limited(key_id):
                            logger.debug(f"[Thread-{thread_name}] Reusing assigned {key_id}")
                            self._track_usage(key_index, thread_id)
                            return key, key_index, key_id

                # Remove invalid assignment (key unavailable or index out of range)
                self._thread_assignments.pop(thread_id, None)

            # Find next available key using round-robin
            attempts = 0
            while attempts < len(self.keys):
                # Get current index and immediately increment for next thread
                key_index = self._rotation_index
                self._rotation_index = (self._rotation_index + 1) % len(self.keys)

                key = self.keys[key_index]
                key_id = f"Key#{key_index+1} ({key.model})"

                # Use key-specific lock when checking and modifying key state
                with self._get_key_lock(key_index):
                    if key.is_available() and not self._rate_limit_cache.is_rate_limited(key_id):
                        # Assign to thread
                        self._thread_assignments[thread_id] = (key_index, time.time())

                        # Increment usage counter on assignment
                        key.times_used = getattr(key, 'times_used', 0) + 1

                        self._track_usage(key_index, thread_id)

                        # Clean up old assignments
                        current_time = time.time()
                        expired_threads = [
                            tid for tid, (_, ts) in self._thread_assignments.items()
                            if current_time - ts > 300  # 5 minutes
                        ]
                        for tid in expired_threads:
                            del self._thread_assignments[tid]
                            # Remove from usage tracking
                            with self._usage_lock:
                                for k_idx in list(self._keys_in_use.keys()):
                                    self._keys_in_use[k_idx].discard(tid)
                                    if not self._keys_in_use[k_idx]:
                                        del self._keys_in_use[k_idx]

                        logger.info(f"[Thread-{thread_name}] Assigned {key_id}")
                        # NOTE(review): this sleep runs while key_selection_lock is
                        # held, so concurrent callers are serialized by it - confirm
                        # that staggering is intended.
                        time.sleep(0.5)  # Brief pause to improve retry responsiveness
                        logger.debug("💤 Pausing briefly to improve retry responsiveness after key assignment")
                        return key, key_index, key_id

                attempts += 1

            # No available keys - find one with shortest cooldown
            best_key_index = None
            min_cooldown = float('inf')

            for i, key in enumerate(self.keys):
                if key.enabled:  # At least check if enabled
                    key_id = f"Key#{i+1} ({key.model})"
                    remaining = self._rate_limit_cache.get_remaining_cooldown(key_id)

                    # Also check key's own cooldown
                    if key.is_cooling_down and key.last_error_time:
                        key_cooldown = key.cooldown - (time.time() - key.last_error_time)
                        remaining = max(remaining, key_cooldown)

                    if remaining < min_cooldown:
                        min_cooldown = remaining
                        best_key_index = i

            if best_key_index is not None:
                key = self.keys[best_key_index]
                key_id = f"Key#{best_key_index+1} ({key.model})"
                logger.warning(f"[Thread-{thread_name}] All keys on cooldown, using {key_id} (cooldown: {min_cooldown:.1f}s)")
                self._thread_assignments[thread_id] = (best_key_index, time.time())
                time.sleep(0.5)  # Brief pause to improve retry responsiveness
                logger.debug("💤 Pausing briefly to improve retry responsiveness after cooldown key selection")
                return key, best_key_index, key_id

            logger.error(f"[Thread-{thread_name}] No keys available at all")
            return None

    def mark_key_error(self, key_index: int, error_code: Optional[int] = None):
        """Mark a key as having an error (thread-safe with key-specific lock)"""
        if 0 <= key_index < len(self.keys):
            # Use key-specific lock for this operation
            with self._get_key_lock(key_index):
                # Mark error on the key itself
                self.keys[key_index].mark_error(error_code)

                # Add to rate limit cache if it's a 429
                if error_code == 429:
                    key = self.keys[key_index]
                    key_id = f"Key#{key_index+1} ({key.model})"
                    self._rate_limit_cache.add_rate_limit(key_id, key.cooldown)

                    # key_id only exists in this branch, so the report and the
                    # pause stay here.
                    print(f"Marked key {key_id} with an error code")
                    time.sleep(0.5)  # Brief pause to improve retry responsiveness
                    logger.debug("💤 Pausing briefly to improve retry responsiveness after marking key error")

    def mark_key_success(self, key_index: int):
        """Mark a key as successful (thread-safe with key-specific lock)"""
        if 0 <= key_index < len(self.keys):
            # Use key-specific lock for this operation
            with self._get_key_lock(key_index):
                self.keys[key_index].mark_success()

                key = self.keys[key_index]
                print(f"Marked key {key_index} ({key.model}) as successful")

    def release_thread_assignment(self, thread_id: Optional[int] = None):
        """Release key assignment for a thread (defaults to the calling thread)."""
        if thread_id is None:
            thread_id = threading.current_thread().ident

        with self.key_selection_lock:
            # Remove from assignments
            if thread_id in self._thread_assignments:
                key_index, _ = self._thread_assignments[thread_id]
                del self._thread_assignments[thread_id]

                # Remove from usage tracking
                with self._usage_lock:
                    if key_index in self._keys_in_use:
                        self._keys_in_use[key_index].discard(thread_id)
                        if not self._keys_in_use[key_index]:
                            del self._keys_in_use[key_index]

                print(f"Released key assignment for thread {thread_id}")

    def get_all_keys(self) -> List[APIKeyEntry]:
        """Get all keys in the pool (shallow copy of the list)."""
        with self.lock:
            return self.keys.copy()

    @property
    def current_index(self):
        """Get the current rotation index"""
        with self.lock:
            return self._rotation_index

    @current_index.setter
    def current_index(self, value: int):
        """Set the current rotation index (wrapped to the number of keys)."""
        with self.lock:
            if self.keys:
                self._rotation_index = value % len(self.keys)
            else:
                self._rotation_index = 0

    def add_key(self, key_entry: APIKeyEntry):
        """Add a new key to the pool and register its per-key lock."""
        with self.lock:
            self.keys.append(key_entry)
            # Keys added after load_from_list need their own lock as well.
            self.key_locks[len(self.keys) - 1] = threading.Lock()
            logger.info(f"Added key for model {key_entry.model} to pool")

    def remove_key(self, index: int):
        """Remove a key from the pool by index.

        Thread assignments, per-key locks and usage tracking that refer to later
        keys are shifted down by one so they keep pointing at the same entries.
        Out-of-range indices are ignored.
        """
        with self.lock:
            if not 0 <= index < len(self.keys):
                return

            removed_key = self.keys.pop(index)

            # Clean up any thread assignments for this key
            for thread_id, (key_index, assigned_at) in list(self._thread_assignments.items()):
                if key_index == index:
                    del self._thread_assignments[thread_id]
                elif key_index > index:
                    # Adjust indices for keys after the removed one
                    self._thread_assignments[thread_id] = (key_index - 1, assigned_at)

            # Keep per-key locks aligned with the shifted indices
            self.key_locks = {
                (old_index if old_index < index else old_index - 1): key_lock
                for old_index, key_lock in self.key_locks.items()
                if old_index != index
            }

            # Keep usage tracking aligned with the shifted indices
            with self._usage_lock:
                self._keys_in_use = {
                    (old_index if old_index < index else old_index - 1): users
                    for old_index, users in self._keys_in_use.items()
                    if old_index != index
                }

            # Reset rotation index if needed
            if self._rotation_index >= len(self.keys) and len(self.keys) > 0:
                self._rotation_index = 0

            logger.info(f"Removed key for model {removed_key.model} from pool")


class MultiAPIKeyDialog:
    """Dialog for managing multiple API keys"""

    def __init__(self, parent, translator_gui):
        self.parent = parent
        self.translator_gui = translator_gui
        self.dialog = None
        # Keep a reference for icon image to avoid GC
        self._icon_photo_ref = None
        self.key_pool = APIKeyPool()
        self.tree = None
        self.test_results = queue.Queue()

        # Attempt to bind to UnifiedClient's shared pool so UI reflects live usage
        self._bind_shared_pool()

        # Load existing keys from config
        self._load_keys_from_config()

        # Create and show dialog
        self._create_dialog()

        # Auto-resize to fit content if WindowManager is available
        if hasattr(self.translator_gui, 'wm') and hasattr(self, 'canvas'):
            self.translator_gui.wm.auto_resize_dialog(self.dialog, self.canvas,
                                                      max_width_ratio=0.9, max_height_ratio=1.55)

    def _set_icon(self, window):
        """Set Halgakos.ico as window icon if available."""
        # Best-effort: a missing icon or missing PIL must never break the dialog.
        try:
            base_dir = getattr(self.translator_gui, 'base_dir', os.getcwd())
            ico_path = os.path.join(base_dir, 'Halgakos.ico')
            if os.path.isfile(ico_path):
                try:
                    window.iconbitmap(ico_path)
                except Exception:
                    pass
                # Try iconphoto for better scaling
                try:
                    from PIL import Image, ImageTk
                    img = Image.open(ico_path)
                    if img.mode != 'RGBA':
                        img = img.convert('RGBA')
                    self._icon_photo_ref = ImageTk.PhotoImage(img)
                    window.iconphoto(False, self._icon_photo_ref)
                except Exception:
                    pass
        except Exception:
            pass

    def _bind_shared_pool(self):
        """Bind this dialog to the UnifiedClient's shared APIKeyPool if available.
        If UnifiedClient has no pool yet, register our pool as the shared pool.
        This keeps Times Used and other counters in sync across UI and runtime.
        """
        try:
            from unified_api_client import UnifiedClient
            # If UC already has a pool, use it; otherwise share ours
            if getattr(UnifiedClient, '_api_key_pool', None) is not None:
                self.key_pool = UnifiedClient._api_key_pool
            else:
                UnifiedClient._api_key_pool = self.key_pool
        except Exception:
            # If import fails (early load), continue with local pool
            pass

    def _load_keys_from_config(self):
        """Load API keys from translator GUI config"""
        if hasattr(self.translator_gui, 'config'):
            multi_api_keys = self.translator_gui.config.get('multi_api_keys', [])
            self.key_pool.load_from_list(multi_api_keys)

    def _update_rotation_display(self, *args):
        """Update the rotation description based on settings"""
        if self.force_rotation_var.get():
            freq = self.rotation_frequency_var.get()
            if freq == 1:
                desc = "Keys will rotate on every request (maximum distribution)"
            else:
                desc = f"Keys will rotate every {freq} requests"
        else:
            desc = "Keys will only rotate on errors or rate limits"

        self.rotation_desc_label.config(text=desc)

    def _save_keys_to_config(self):
        """Save API keys and rotation settings to translator GUI config"""
        if hasattr(self.translator_gui, 'config'):
            # Convert keys to list of dicts
            key_list = [key.to_dict() for key in self.key_pool.get_all_keys()]
            self.translator_gui.config['multi_api_keys'] = key_list

            # Save fallback settings
            self.translator_gui.config['use_fallback_keys'] = self.use_fallback_var.get()
            # Update the parent GUI's variable to stay in sync
            if hasattr(self.translator_gui, 'use_fallback_keys_var'):
                self.translator_gui.use_fallback_keys_var.set(self.use_fallback_var.get())
            # Fallback keys are already saved when added/removed

            # Use the current state of the toggle
            self.translator_gui.config['use_multi_api_keys'] = self.enabled_var.get()

            # Save rotation settings
            self.translator_gui.config['force_key_rotation'] = self.force_rotation_var.get()
            self.translator_gui.config['rotation_frequency'] = self.rotation_frequency_var.get()

            # Save config
            self.translator_gui.save_config(show_message=False)

    def _create_dialog(self):
        """Create the main dialog using WindowManager"""
        # Use WindowManager to create scrollable dialog
        if hasattr(self.translator_gui, 'wm'):
            self.dialog, scrollable_frame, self.canvas = self.translator_gui.wm.setup_scrollable(
                self.parent,
                "Multi API Key Manager",
                width=900,
                height=700,
                max_width_ratio=0.9,
                max_height_ratio=1.45
            )
        else:
            # Fallback to regular dialog
            self.dialog = tk.Toplevel(self.parent)
            self.dialog.title("Multi API Key Manager")
            self.dialog.geometry("900x700")
            scrollable_frame = self.dialog
            self.canvas = None

        # Main container with consistent padding
        main_frame = tk.Frame(scrollable_frame, padx=20, pady=20)
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Store references
        self.main_frame = main_frame
        self.scrollable_frame = scrollable_frame

        # Title and description
        title_frame = tk.Frame(main_frame)
        title_frame.pack(fill=tk.X, pady=(0, 10))

        tk.Label(title_frame, text="Multi API Key Management",
                 font=('TkDefaultFont', 16, 'bold')).pack(side=tk.LEFT)

        # Enable/Disable toggle
        self.enabled_var = tk.BooleanVar(value=self.translator_gui.config.get('use_multi_api_keys', False))
        tb.Checkbutton(title_frame, text="Enable Multi-Key Mode",
                       variable=self.enabled_var,
                       bootstyle="round-toggle",
                       command=self._toggle_multi_key_mode).pack(side=tk.RIGHT, padx=(20, 0))

        tk.Label(main_frame,
                 text="Manage multiple API keys with automatic rotation and rate limit handling.\n"
                      "Keys can be rotated automatically to distribute load evenly.\n"
                      "Rate-limited keys are automatically cooled down and skipped in rotation.",
                 font=('TkDefaultFont', 10), fg='gray', justify=tk.LEFT).pack(anchor=tk.W, pady=(0, 15))

        # Rotation settings frame
        rotation_frame = tk.LabelFrame(main_frame, text="Rotation Settings", padx=15, pady=10)
        rotation_frame.pack(fill=tk.X, pady=(0, 15))

        # Force rotation toggle
        rotation_settings = tk.Frame(rotation_frame)
        rotation_settings.pack(fill=tk.X)

        self.force_rotation_var = tk.BooleanVar(value=self.translator_gui.config.get('force_key_rotation', True))
        tb.Checkbutton(rotation_settings, text="Force Key Rotation",
                       variable=self.force_rotation_var,
                       bootstyle="round-toggle",
                       command=self._update_rotation_display).pack(side=tk.LEFT)

        # Rotation frequency
        tk.Label(rotation_settings, text="Every").pack(side=tk.LEFT, padx=(20, 5))
        self.rotation_frequency_var = tk.IntVar(value=self.translator_gui.config.get('rotation_frequency', 1))
        frequency_spinbox = tb.Spinbox(rotation_settings, from_=1, to=100,
                                       textvariable=self.rotation_frequency_var,
                                       width=5,
                                       command=self._update_rotation_display)
        frequency_spinbox.pack(side=tk.LEFT)
        # Disable mouse wheel changing values (use main GUI helper if available)
        try:
            if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
                self.translator_gui.ui.disable_spinbox_mousewheel(frequency_spinbox)
        except Exception:
            pass
        tk.Label(rotation_settings, text="requests").pack(side=tk.LEFT, padx=(5, 0))

        # Rotation description
        self.rotation_desc_label = tk.Label(rotation_frame, text="",
                                            font=('TkDefaultFont', 9), fg='blue')
        self.rotation_desc_label.pack(anchor=tk.W, pady=(5, 0))
        self._update_rotation_display()

        # Add key section
        self._create_add_key_section(main_frame)

        # Separator
        ttk.Separator(main_frame, orient='horizontal').pack(fill=tk.X, pady=15)

        # Key list section
        self._create_key_list_section(main_frame)

        # Create fallback container (hidden by default)
        self._create_fallback_section(main_frame)

        # Button bar at the bottom
        self._create_button_bar(main_frame)

        # Load existing keys into tree
        self._refresh_key_list()

        # Center dialog
        self.dialog.transient(self.parent)

        # Handle window close
        self.dialog.protocol("WM_DELETE_WINDOW", self._on_close)

    def _create_fallback_section(self, parent):
        """Create the fallback keys section at the bottom"""
        # Container that can be hidden
        self.fallback_container = tk.Frame(parent)

        # Always show fallback section (works in both single and multi-key mode)
        self.fallback_container.pack(fill=tk.X, pady=(10, 0))

        # Separator
        ttk.Separator(self.fallback_container, orient='horizontal').pack(fill=tk.X, pady=(0, 10))

        # Main fallback frame
        fallback_frame = tk.LabelFrame(self.fallback_container,
                                       text="Fallback Keys (For Prohibited Content)",
                                       padx=15, pady=15)
        fallback_frame.pack(fill=tk.X)

        # Description
        tk.Label(fallback_frame,
                 text="Configure fallback keys that will be used when content is blocked.\n"
                      "These should use different API keys or models that are less restrictive.\n"
                      "In Multi-Key Mode: tried when main rotation encounters prohibited content.\n"
                      "In Single-Key Mode: tried directly when main key fails, bypassing main key retry.",
                 font=('TkDefaultFont', 10), fg='gray', justify=tk.LEFT).pack(anchor=tk.W, pady=(0, 10))

        # Enable fallback checkbox
        self.use_fallback_var = tk.BooleanVar(value=self.translator_gui.config.get('use_fallback_keys', False))
        tb.Checkbutton(fallback_frame, text="Enable Fallback Keys",
                       variable=self.use_fallback_var,
                       bootstyle="round-toggle",
                       command=self._toggle_fallback_section).pack(anchor=tk.W, pady=(0, 10))

        # Add fallback key section
        add_fallback_frame = tk.Frame(fallback_frame)
        add_fallback_frame.pack(fill=tk.X, pady=(0, 10))

        # Configure grid for more columns
        add_fallback_frame.columnconfigure(1, weight=1)
        add_fallback_frame.columnconfigure(4, weight=1)
        # Don't give weight to column 3 to keep labels close to fields

        # Row 0: Fallback API Key and Model
        tk.Label(add_fallback_frame, text="Fallback API Key:").grid(row=0, column=0, sticky=tk.W, padx=(0, 10), pady=5)
        self.fallback_key_var = tk.StringVar()
        self.fallback_key_entry = tb.Entry(add_fallback_frame, textvariable=self.fallback_key_var, show='*')
        self.fallback_key_entry.grid(row=0, column=1, sticky=tk.EW, pady=5)

        # Toggle fallback visibility
        self.show_fallback_btn = tb.Button(add_fallback_frame, text="👁", width=3,
                                           command=self._toggle_fallback_visibility)
        self.show_fallback_btn.grid(row=0, column=2, padx=5, pady=5)

        # Fallback Model
        tk.Label(add_fallback_frame, text="Model:").grid(row=0, column=3, sticky=tk.W, padx=(20, 10), pady=5)
        self.fallback_model_var = tk.StringVar()
        fallback_models = get_model_options()
        self.fallback_model_combo = tb.Combobox(add_fallback_frame, textvariable=self.fallback_model_var,
                                                values=fallback_models, state='normal')
        self.fallback_model_combo.grid(row=0, column=4, sticky=tk.EW, pady=5)
        # Block mouse wheel on combobox to avoid accidental changes
        try:
            if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
                self.translator_gui.ui.disable_spinbox_mousewheel(self.fallback_model_combo)
        except Exception:
            pass
        # Attach gentle autofill
        self._attach_model_autofill(self.fallback_model_combo, self.fallback_model_var)

        # Add fallback button
        tb.Button(add_fallback_frame, text="Add Fallback Key",
                  command=self._add_fallback_key,
                  bootstyle="info").grid(row=0, column=5, sticky=tk.E, padx=(10, 0), pady=5)

        # Row 1: Google Credentials (optional, discretely styled)
        tk.Label(add_fallback_frame, text="Google Creds:", font=('TkDefaultFont', 8),
                 fg='gray').grid(row=1, column=0, sticky=tk.W, padx=(0, 10), pady=2)
        self.fallback_google_creds_var = tk.StringVar()
        self.fallback_google_creds_entry = tb.Entry(add_fallback_frame,
                                                    textvariable=self.fallback_google_creds_var,
                                                    font=('TkDefaultFont', 7), state='normal')
        self.fallback_google_creds_entry.grid(row=1, column=1, sticky=tk.EW, pady=2)

        # Google credentials browse button (moved closer)
        tb.Button(add_fallback_frame, text="📁", width=3,
                  command=self._browse_fallback_google_credentials,
                  bootstyle="secondary-outline").grid(row=1, column=2, padx=(5, 0), pady=2)

        # Google region field for fallback
        tk.Label(add_fallback_frame, text="Region:", font=('TkDefaultFont', 10),
                 fg='gray').grid(row=1, column=3, sticky=tk.W, padx=(10, 5), pady=2)
        self.fallback_google_region_var = tk.StringVar(value='us-east5')  # Default region
        self.fallback_google_region_entry = tb.Entry(add_fallback_frame,
                                                     textvariable=self.fallback_google_region_var,
                                                     font=('TkDefaultFont', 7), state='normal', width=10)
        self.fallback_google_region_entry.grid(row=1, column=4, sticky=tk.W, pady=2)

        # Row 2: Azure Endpoint (optional, discretely styled)
        tk.Label(add_fallback_frame, text="Azure Endpoint:", font=('TkDefaultFont', 8),
                 fg='gray').grid(row=2, column=0, sticky=tk.W, padx=(0, 10), pady=2)
        self.fallback_azure_endpoint_var = tk.StringVar()
        self.fallback_azure_endpoint_entry = tb.Entry(add_fallback_frame,
                                                      textvariable=self.fallback_azure_endpoint_var,
                                                      font=('TkDefaultFont', 7), state='normal')
        self.fallback_azure_endpoint_entry.grid(row=2, column=1, columnspan=2, sticky=tk.EW, pady=2)

        # Azure API Version for fallback (small dropdown)
        tk.Label(add_fallback_frame, text="API Ver:", font=('TkDefaultFont', 10),
                 fg='gray').grid(row=2, column=3, sticky=tk.W, padx=(10, 5), pady=2)
        self.fallback_azure_api_version_var = tk.StringVar(value='2025-01-01-preview')
        fallback_azure_versions = [
            '2025-01-01-preview',
            '2024-12-01-preview',
            '2024-10-01-preview',
            '2024-08-01-preview',
            '2024-06-01',
            '2024-02-01',
            '2023-12-01-preview'
        ]
        self.fallback_azure_api_version_combo = ttk.Combobox(add_fallback_frame,
                                                             textvariable=self.fallback_azure_api_version_var,
                                                             values=fallback_azure_versions, width=18,
                                                             state='normal', font=('TkDefaultFont', 7))
        self.fallback_azure_api_version_combo.grid(row=2, column=4, sticky=tk.W, pady=2)
        # Block mouse wheel on version combobox
        try:
            if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
                self.translator_gui.ui.disable_spinbox_mousewheel(self.fallback_azure_api_version_combo)
        except Exception:
            pass

        # Fallback keys list
        self._create_fallback_list(fallback_frame)

        # Initially disable if checkbox is unchecked
        self._toggle_fallback_section()

    def _create_fallback_list(self, parent):
        """Create the fallback keys list"""
        list_frame = tk.Frame(parent)
        list_frame.pack(fill=tk.BOTH, expand=True)

        # Label
        tk.Label(list_frame, text="Fallback Keys (tried in order):",
                 font=('TkDefaultFont', 10, 'bold')).pack(anchor=tk.W, pady=(10, 5))

        # Container for tree and buttons
        container = tk.Frame(list_frame)
        container.pack(fill=tk.BOTH, expand=True)

        # Left side: Move buttons
        move_frame = tk.Frame(container)
        move_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5))

        tk.Label(move_frame, text="Order", font=('TkDefaultFont', 9, 'bold')).pack(pady=(0, 5))

        tb.Button(move_frame, text="↑", width=3,
                  command=lambda: self._move_fallback_key('up'),
                  bootstyle="secondary-outline").pack(pady=2)

        tb.Button(move_frame, text="↓", width=3,
                  command=lambda: self._move_fallback_key('down'),
                  bootstyle="secondary-outline").pack(pady=2)

        # Right side: Treeview
        tree_container = tk.Frame(container)
        tree_container.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        # Scrollbar
        scrollbar = ttk.Scrollbar(tree_container, orient=tk.VERTICAL)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # Treeview for fallback keys
        columns = ('Model', 'Status', 'Times Used')
        self.fallback_tree = ttk.Treeview(tree_container, columns=columns,
                                          show='tree headings',
                                          yscrollcommand=scrollbar.set,
                                          height=5)
        self.fallback_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        scrollbar.config(command=self.fallback_tree.yview)

        # An empty event sequence is rejected by Tk ("no events specified in
        # binding"); the handler is a context menu, so bind it to right-click.
        self.fallback_tree.bind('<Button-3>', self._show_fallback_context_menu)

        # Configure columns
        self.fallback_tree.heading('#0', text='API Key', anchor='w')
        self.fallback_tree.column('#0', width=220, minwidth=160, anchor='w')

        self.fallback_tree.heading('Model', text='Model', anchor='w')
        self.fallback_tree.column('Model', width=220, minwidth=140, anchor='w')

        self.fallback_tree.heading('Status', text='Status', anchor='center')
        self.fallback_tree.column('Status', width=120, minwidth=80, anchor='center')

        self.fallback_tree.heading('Times Used', text='Times Used', anchor='center')
        self.fallback_tree.column('Times Used', width=100, minwidth=70, anchor='center')

        # Action buttons - store reference for toggling
        self.fallback_action_frame = tk.Frame(list_frame)
        self.fallback_action_frame.pack(fill=tk.X, pady=(10, 0))

        tb.Button(self.fallback_action_frame, text="Test Selected",
                  command=self._test_selected_fallback,
                  bootstyle="warning").pack(side=tk.LEFT, padx=(0, 5))

        tb.Button(self.fallback_action_frame, text="Test All",
                  command=self._test_all_fallbacks,
                  bootstyle="warning").pack(side=tk.LEFT, padx=5)

        tb.Button(self.fallback_action_frame, text="Remove Selected",
                  command=self._remove_selected_fallback,
                  bootstyle="danger").pack(side=tk.LEFT, padx=5)

        tb.Button(self.fallback_action_frame, text="Clear All",
                  command=self._clear_all_fallbacks,
                  bootstyle="danger-outline").pack(side=tk.LEFT, padx=5)

        # Load existing fallback keys
        self._load_fallback_keys()

    def _test_all_fallbacks(self):
        """Test all fallback keys"""
        fallback_keys = self.translator_gui.config.get('fallback_keys', [])

        if not fallback_keys:
            messagebox.showwarning("Warning", "No fallback keys to test")
            return

        # Update UI to show testing status for all keys
        items = self.fallback_tree.get_children()
        for item in items:
            values = list(self.fallback_tree.item(item, 'values'))
            values[1] = "⏳ Testing..."
            self.fallback_tree.item(item, values=values)

        # Run tests in thread for all fallback keys
        # Ensure UnifiedClient uses the same shared pool instance
        try:
            from unified_api_client import UnifiedClient
            UnifiedClient._api_key_pool = self.key_pool
        except Exception:
            pass
        thread = threading.Thread(target=self._test_all_fallback_keys_batch)
        thread.daemon = True
        thread.start()

    def _test_all_fallback_keys_batch(self):
        """Test all fallback keys in batch.

        Runs on a worker thread: each key is tested in a small thread pool and
        the UI updates are scheduled on the Tk loop through ``dialog.after``.
        """
        from unified_api_client import UnifiedClient
        from concurrent.futures import ThreadPoolExecutor, as_completed

        fallback_keys = self.translator_gui.config.get('fallback_keys', [])
        if not fallback_keys:
            # ThreadPoolExecutor rejects max_workers=0, and the config may have
            # been emptied since the caller checked it.
            return

        def test_single_key(index, key_data):
            """Test a single fallback key; returns (index, success, message)."""
            api_key = key_data.get('api_key', '')
            model = key_data.get('model', '')

            try:
                client = UnifiedClient(
                    api_key=api_key,
                    model=model,
                    output_dir=None
                )

                messages = [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "Say 'API test successful' and nothing else."}
                ]

                response = client.send(
                    messages,
                    temperature=0.7,
                    max_tokens=100
                )

                if response and isinstance(response, tuple):
                    content, _ = response
                    if content and "test successful" in content.lower():
                        return (index, True, "Passed")
            except Exception as e:
                print(f"Fallback key test failed: {e}")
                return (index, False, str(e)[:30])

            return (index, False, "Failed")

        # Test all keys in parallel
        results = []
        with ThreadPoolExecutor(max_workers=min(5, len(fallback_keys))) as executor:
            futures = [
                executor.submit(test_single_key, i, key_data)
                for i, key_data in enumerate(fallback_keys)
            ]

            # Process results as they complete
            for future in as_completed(futures):
                result = future.result()
                if result:
                    results.append(result)
                    index, success, _message = result
                    # Update UI in main thread
                    self.dialog.after(0, lambda idx=index, s=success:
                                      self._update_fallback_test_result(idx, s))

        # Show completion message
        successful = sum(1 for _, success, _ in results if success)
        total = len(fallback_keys)
        self.dialog.after(0, lambda: self._show_status(
            f"Fallback test complete: {successful}/{total} passed"))

    def _show_fallback_context_menu(self, event):
        """Show context menu for fallback keys - includes model name editing"""
        # Select item under cursor
        item = self.fallback_tree.identify_row(event.y)
        if not item:
            return

        # If the clicked item is not in selection, select only it
        if item not in self.fallback_tree.selection():
            self.fallback_tree.selection_set(item)

        # Create context menu
        menu = tk.Menu(self.dialog, tearoff=0)

        # Get index for position info
        index = self.fallback_tree.index(item)
        fallback_keys = self.translator_gui.config.get('fallback_keys', [])
        total = len(fallback_keys)

        # Reorder submenu
        if total > 1:  # Only show reorder if there's more than one key
            reorder_menu = tk.Menu(menu, tearoff=0)
            if index > 0:
                reorder_menu.add_command(label="Move Up",
                                         command=lambda: self._move_fallback_key('up'))
            if index < total - 1:
                reorder_menu.add_command(label="Move Down",
                                         command=lambda: self._move_fallback_key('down'))
            menu.add_cascade(label="Reorder", menu=reorder_menu)
            menu.add_separator()

        # Add Change Model option
        selected_count = len(self.fallback_tree.selection())
        if selected_count > 1:
            menu.add_command(label=f"Change Model ({selected_count} selected)",
                             command=self._change_fallback_model_for_selected)
        else:
            menu.add_command(label="Change Model",
                             command=self._change_fallback_model_for_selected)

        menu.add_separator()

        # Test and Remove options
        menu.add_command(label="Test", command=self._test_selected_fallback)
        menu.add_separator()
        menu.add_command(label="Remove", command=self._remove_selected_fallback)

        if total > 1:
            menu.add_command(label="Clear All", command=self._clear_all_fallbacks)

        # Show menu
        menu.post(event.x_root, event.y_root)

    def _change_fallback_model_for_selected(self):
        """Change model name for selected fallback keys"""
        selected = self.fallback_tree.selection()
        if not selected:
            return

        # Get fallback keys
        fallback_keys = self.translator_gui.config.get('fallback_keys', [])

        # Create simple dialog (same style as main tree)
        dialog = tk.Toplevel(self.dialog)
        dialog.title(f"Change Model for {len(selected)} Fallback Keys")
        dialog.geometry("400x130")
        dialog.transient(self.dialog)
        # Set icon
        self._set_icon(dialog)

        # Center the dialog
        dialog.update_idletasks()
        x = (dialog.winfo_screenwidth() // 2) - (dialog.winfo_width() // 2)
        y = (dialog.winfo_screenheight() // 2) - (dialog.winfo_height() // 2)
        dialog.geometry(f"+{x}+{y}")

        # Main frame
        main_frame = tk.Frame(dialog, padx=20, pady=20)
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Label
        tk.Label(main_frame, text="Enter new model name (press Enter to apply):",
                 font=('TkDefaultFont', 10)).pack(pady=(0, 10))

        # Model entry with dropdown
        model_var = tk.StringVar()

        # Full model list (same as main GUI)
        all_models = get_model_options()

        model_combo = ttk.Combobox(main_frame, values=all_models,
                                   textvariable=model_var, width=45, height=12)
        model_combo.pack(pady=(0, 10))
        # Block mouse wheel on combobox
        try:
            if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
                self.translator_gui.ui.disable_spinbox_mousewheel(model_combo)
        except Exception:
            pass

        # Attach gentle autofill
        self._attach_model_autofill(model_combo, model_var)

        # Get current model from first selected item as default
        selected_indices = [self.fallback_tree.index(item) for item in selected]
        if selected_indices and selected_indices[0] < len(fallback_keys):
            current_model = fallback_keys[selected_indices[0]].get('model', '')
            model_var.set(current_model)
            model_combo.select_range(0, tk.END)  # Select all text for easy replacement

        def apply_change(event=None):
            new_model = model_var.get().strip()
            if new_model:
                # Update all selected fallback keys
                for item in selected:
                    index = self.fallback_tree.index(item)
                    if index < len(fallback_keys):
                        fallback_keys[index]['model'] = new_model

                # Save to config
                self.translator_gui.config['fallback_keys'] = fallback_keys
                self.translator_gui.save_config(show_message=False)

                #
Reload the list self._load_fallback_keys() # Show status self._show_status(f"Changed model to '{new_model}' for {len(selected)} fallback keys") dialog.destroy() # Focus on the combobox model_combo.focus() # Bind Enter key to apply dialog.bind('', apply_change) model_combo.bind('', apply_change) dialog.bind('', lambda e: dialog.destroy()) def _load_fallback_keys(self): """Load fallback keys from config""" fallback_keys = self.translator_gui.config.get('fallback_keys', []) # Clear tree for item in self.fallback_tree.get_children(): self.fallback_tree.delete(item) # Add keys to tree for key_data in fallback_keys: api_key = key_data.get('api_key', '') model = key_data.get('model', '') times_used = int(key_data.get('times_used', 0)) # Mask API key masked_key = api_key[:8] + "..." + api_key[-4:] if len(api_key) > 12 else api_key # Insert into tree self.fallback_tree.insert('', 'end', text=masked_key, values=(model, "Not tested", times_used), tags=('untested',)) # Configure tags self.fallback_tree.tag_configure('untested', foreground='gray') self.fallback_tree.tag_configure('testing', foreground='blue', font=('TkDefaultFont', 10, 'bold')) self.fallback_tree.tag_configure('passed', foreground='green') self.fallback_tree.tag_configure('failed', foreground='red') def _add_fallback_key(self): """Add a new fallback key with optional Google credentials and Azure endpoint""" api_key = self.fallback_key_var.get().strip() model = self.fallback_model_var.get().strip() google_credentials = self.fallback_google_creds_var.get().strip() or None azure_endpoint = self.fallback_azure_endpoint_var.get().strip() or None google_region = self.fallback_google_region_var.get().strip() or None azure_api_version = self.fallback_azure_api_version_var.get().strip() or None if not api_key or not model: messagebox.showerror("Error", "Please enter both API key and model name") return # Get current fallback keys fallback_keys = self.translator_gui.config.get('fallback_keys', []) # Add new key with 
additional fields fallback_keys.append({ 'api_key': api_key, 'model': model, 'google_credentials': google_credentials, 'azure_endpoint': azure_endpoint, 'google_region': google_region, 'azure_api_version': azure_api_version, 'times_used': 0 }) # Save to config self.translator_gui.config['fallback_keys'] = fallback_keys self.translator_gui.save_config(show_message=False) # Clear inputs self.fallback_key_var.set("") self.fallback_model_var.set("") self.fallback_google_creds_var.set("") self.fallback_azure_endpoint_var.set("") self.fallback_google_region_var.set("us-east5") self.fallback_azure_api_version_var.set('2025-01-01-preview') # Reload list self._load_fallback_keys() # Show success extras = [] if google_credentials: extras.append(f"Google: {os.path.basename(google_credentials)}") if azure_endpoint: extras.append(f"Azure: {azure_endpoint[:30]}...") extra_info = f" ({', '.join(extras)})" if extras else "" self._show_status(f"Added fallback key for model: {model}{extra_info}") def _move_fallback_key(self, direction): """Move selected fallback key up or down""" selected = self.fallback_tree.selection() if not selected: return item = selected[0] index = self.fallback_tree.index(item) # Get current fallback keys fallback_keys = self.translator_gui.config.get('fallback_keys', []) if index >= len(fallback_keys): return new_index = index if direction == 'up' and index > 0: new_index = index - 1 elif direction == 'down' and index < len(fallback_keys) - 1: new_index = index + 1 if new_index != index: # Swap keys fallback_keys[index], fallback_keys[new_index] = fallback_keys[new_index], fallback_keys[index] # Save to config self.translator_gui.config['fallback_keys'] = fallback_keys self.translator_gui.save_config(show_message=False) # Reload list self._load_fallback_keys() # Reselect item items = self.fallback_tree.get_children() if new_index < len(items): self.fallback_tree.selection_set(items[new_index]) self.fallback_tree.focus(items[new_index]) def 
_test_selected_fallback(self): """Test selected fallback key""" selected = self.fallback_tree.selection() if not selected: messagebox.showwarning("Warning", "Please select a fallback key to test") return index = self.fallback_tree.index(selected[0]) fallback_keys = self.translator_gui.config.get('fallback_keys', []) if index >= len(fallback_keys): return # Update UI to show testing status immediately items = self.fallback_tree.get_children() if index < len(items): item = items[index] values = list(self.fallback_tree.item(item, 'values')) values[1] = "⏳ Testing..." self.fallback_tree.item(item, values=values) key_data = fallback_keys[index] # Ensure UnifiedClient uses the same shared pool instance try: from unified_api_client import UnifiedClient UnifiedClient._api_key_pool = self.key_pool except Exception: pass # Run test in thread thread = threading.Thread(target=self._test_single_fallback_key, args=(key_data, index)) thread.daemon = True thread.start() def _update_fallback_test_result(self, index, success): """Update fallback tree item with test result and bump times used""" # Increment times_used in config fallback_keys = self.translator_gui.config.get('fallback_keys', []) if index < len(fallback_keys): try: fallback_keys[index]['times_used'] = int(fallback_keys[index].get('times_used', 0)) + 1 # Persist self.translator_gui.config['fallback_keys'] = fallback_keys self.translator_gui.save_config(show_message=False) except Exception: pass items = self.fallback_tree.get_children() if index < len(items): item = items[index] values = list(self.fallback_tree.item(item, 'values')) # Update status if len(values) < 3: values = values + [0] * (3 - len(values)) values[1] = "✅ Passed" if success else "❌ Failed" # Update times used cell try: values[2] = int(values[2]) + 1 except Exception: values[2] = 1 self.fallback_tree.item(item, values=values) def _test_single_fallback_key(self, key_data, index): """Test a single fallback key""" from unified_api_client import 
UnifiedClient api_key = key_data.get('api_key', '') model = key_data.get('model', '') try: client = UnifiedClient( api_key=api_key, model=model, output_dir=None ) messages = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Say 'API test successful' and nothing else."} ] response = client.send( messages, temperature=0.7, max_tokens=100 ) if response and isinstance(response, tuple): content, _ = response if content and "test successful" in content.lower(): # Update tree item to show success self.dialog.after(0, lambda: self._update_fallback_test_result(index, True)) return except Exception as e: print(f"Fallback key test failed: {e}") # Update tree item to show failure self.dialog.after(0, lambda: self._update_fallback_test_result(index, False)) def _remove_selected_fallback(self): """Remove selected fallback key""" selected = self.fallback_tree.selection() if not selected: return if messagebox.askyesno("Confirm", "Remove selected fallback key?"): index = self.fallback_tree.index(selected[0]) # Get current fallback keys fallback_keys = self.translator_gui.config.get('fallback_keys', []) if index < len(fallback_keys): del fallback_keys[index] # Save to config self.translator_gui.config['fallback_keys'] = fallback_keys self.translator_gui.save_config(show_message=False) # Reload list self._load_fallback_keys() self._show_status("Removed fallback key") def _clear_all_fallbacks(self): """Clear all fallback keys""" if not self.fallback_tree.get_children(): return if messagebox.askyesno("Confirm", "Remove ALL fallback keys?"): # Clear fallback keys self.translator_gui.config['fallback_keys'] = [] self.translator_gui.save_config(show_message=False) # Reload list self._load_fallback_keys() self._show_status("Cleared all fallback keys") def _toggle_fallback_section(self): """Enable/disable fallback section based on checkbox""" enabled = self.use_fallback_var.get() if enabled: # Show and enable all fallback widgets state = tk.NORMAL 
# Enable input widgets self.fallback_key_entry.config(state=state) self.fallback_model_combo.config(state=state) self.fallback_google_creds_entry.config(state=state) self.fallback_google_region_entry.config(state=state) self.fallback_azure_endpoint_entry.config(state=state) self.show_fallback_btn.config(state=state) # Enable add button for widget in self.fallback_key_entry.master.winfo_children(): if isinstance(widget, tb.Button) and "Add Fallback" in str(widget.cget('text')): widget.config(state=state) # Show the tree container self.fallback_tree.master.master.pack(fill=tk.BOTH, expand=True) # Show the action buttons frame if hasattr(self, 'fallback_action_frame'): self.fallback_action_frame.pack(fill=tk.X, pady=(10, 0)) else: # Hide and disable all fallback widgets state = tk.DISABLED # Disable input widgets self.fallback_key_entry.config(state=state) self.fallback_model_combo.config(state=state) self.fallback_google_creds_entry.config(state=state) self.fallback_google_region_entry.config(state=state) self.fallback_azure_endpoint_entry.config(state=state) self.show_fallback_btn.config(state=state) # Disable add button for widget in self.fallback_key_entry.master.winfo_children(): if isinstance(widget, tb.Button) and "Add Fallback" in str(widget.cget('text')): widget.config(state=state) # Hide the tree container self.fallback_tree.master.master.pack_forget() # Hide the action buttons frame if hasattr(self, 'fallback_action_frame'): self.fallback_action_frame.pack_forget() # Clear selection self.fallback_tree.selection_remove(*self.fallback_tree.selection()) def _toggle_fallback_visibility(self): """Toggle fallback key visibility""" if self.fallback_key_entry.cget('show') == '*': self.fallback_key_entry.config(show='') self.show_fallback_btn.config(text='🔒') else: self.fallback_key_entry.config(show='*') self.show_fallback_btn.config(text='👁') def _create_button_bar(self, parent): """Create the bottom button bar""" self.button_frame = tk.Frame(parent) 
self.button_frame.pack(fill=tk.X, pady=(20, 0)) # Save button tb.Button(self.button_frame, text="Save & Close", command=self._save_and_close, bootstyle="success").pack(side=tk.RIGHT, padx=(5, 0)) # Cancel button tb.Button(self.button_frame, text="Cancel", command=self._on_close, bootstyle="secondary").pack(side=tk.RIGHT) # Import/Export tb.Button(self.button_frame, text="Import", command=self._import_keys, bootstyle="info-outline").pack(side=tk.LEFT, padx=(0, 5)) tb.Button(self.button_frame, text="Export", command=self._export_keys, bootstyle="info-outline").pack(side=tk.LEFT) def _create_key_list_section(self, parent): """Create the key list section with inline editing and rearrangement controls""" list_frame = tk.LabelFrame(parent, text="API Keys", padx=15, pady=15) list_frame.pack(fill=tk.BOTH, expand=True) # Add primary key indicator frame at the top primary_frame = tk.Frame(list_frame, bg='#FF8C00', relief=tk.RAISED, bd=2) primary_frame.pack(fill=tk.X, pady=(0, 10)) self.primary_key_label = tk.Label(primary_frame, text="⭐ PRIMARY KEY: Position #1 will be used first in rotation ⭐", bg='#FF8C00', fg='white', font=('TkDefaultFont', 11, 'bold'), pady=5) self.primary_key_label.pack(fill=tk.X) # Main container with treeview and controls main_container = tk.Frame(list_frame) main_container.pack(fill=tk.BOTH, expand=True) # Left side: Move buttons move_frame = tk.Frame(main_container) move_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5)) tk.Label(move_frame, text="Reorder", font=('TkDefaultFont', 9, 'bold')).pack(pady=(0, 5)) # Move to top button tb.Button(move_frame, text="⬆⬆", width=3, command=lambda: self._move_key('top'), bootstyle="secondary-outline").pack(pady=2) # Move up button tb.Button(move_frame, text="⬆", width=3, command=lambda: self._move_key('up'), bootstyle="secondary-outline").pack(pady=2) # Move down button tb.Button(move_frame, text="⬇", width=3, command=lambda: self._move_key('down'), bootstyle="secondary-outline").pack(pady=2) # Move to bottom 
button tb.Button(move_frame, text="⬇⬇", width=3, command=lambda: self._move_key('bottom'), bootstyle="secondary-outline").pack(pady=2) # Spacer tk.Frame(move_frame).pack(pady=10) # Position label self.position_label = tk.Label(move_frame, text="", font=('TkDefaultFont', 9), fg='gray') self.position_label.pack() # Right side: Treeview with scrollbar tree_frame = tk.Frame(main_container) tree_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # Scrollbar scrollbar = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # Treeview columns = ('Model', 'Cooldown', 'Status', 'Success', 'Errors', 'Times Used') self.tree = ttk.Treeview(tree_frame, columns=columns, show='tree headings', yscrollcommand=scrollbar.set, height=10) self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.config(command=self.tree.yview) # Configure columns with better widths and anchoring self.tree.heading('#0', text='API Key', anchor='w') self.tree.column('#0', width=180, minwidth=150, anchor='w') self.tree.heading('Model', text='Model', anchor='w') self.tree.column('Model', width=260, minwidth=160, anchor='w') self.tree.heading('Cooldown', text='Cooldown', anchor='center') self.tree.column('Cooldown', width=80, minwidth=60, anchor='center') self.tree.heading('Status', text='Status', anchor='center') self.tree.column('Status', width=160, minwidth=100, anchor='center') self.tree.heading('Success', text='✓', anchor='center') self.tree.column('Success', width=40, minwidth=30, anchor='center') self.tree.heading('Errors', text='✗', anchor='center') self.tree.column('Errors', width=40, minwidth=30, anchor='center') self.tree.heading('Times Used', text='Times Used', anchor='center') self.tree.column('Times Used', width=90, minwidth=60, anchor='center') # Configure tree style for better appearance style = ttk.Style() style.configure("Treeview.Heading", font=('TkDefaultFont', 11, 'bold')) # Bind events for inline editing self.tree.bind('', self._on_click) 
self.tree.bind('', self._show_context_menu) self.tree.bind('<>', self._on_selection_change) # Enable drag and drop self.tree.bind('', self._on_drag_start, add='+') self.tree.bind('', self._on_drag_motion) self.tree.bind('', self._on_drag_release) # Track editing state self.edit_widget = None # Track drag state self.drag_start_item = None self.drag_start_y = None # Action buttons action_frame = tk.Frame(list_frame) action_frame.pack(fill=tk.X, pady=(10, 0)) tb.Button(action_frame, text="Test Selected", command=self._test_selected, bootstyle="warning").pack(side=tk.LEFT, padx=(0, 5)) tb.Button(action_frame, text="Test All", command=self._test_all, bootstyle="warning").pack(side=tk.LEFT, padx=5) tb.Button(action_frame, text="Enable Selected", command=self._enable_selected, bootstyle="success").pack(side=tk.LEFT, padx=5) tb.Button(action_frame, text="Disable Selected", command=self._disable_selected, bootstyle="danger").pack(side=tk.LEFT, padx=5) tb.Button(action_frame, text="Remove Selected", command=self._remove_selected, bootstyle="danger").pack(side=tk.LEFT, padx=5) # Stats label self.stats_label = tk.Label(action_frame, text="", font=('TkDefaultFont', 11), fg='gray') self.stats_label.pack(side=tk.RIGHT) def _create_add_key_section(self, parent): """Create the add key section with Google credentials and Azure endpoint support""" add_frame = tk.LabelFrame(parent, text="Add New API Key", padx=15, pady=15) add_frame.pack(fill=tk.X, pady=(0, 10)) # Grid configuration - expand for more columns add_frame.columnconfigure(1, weight=1) add_frame.columnconfigure(4, weight=1) # Don't give weight to column 3 to keep labels close to fields # Row 0: API Key and Model tk.Label(add_frame, text="API Key:").grid(row=0, column=0, sticky=tk.W, padx=(0, 10), pady=5) self.api_key_var = tk.StringVar() self.api_key_entry = tb.Entry(add_frame, textvariable=self.api_key_var, show='*') self.api_key_entry.grid(row=0, column=1, sticky=tk.EW, pady=5) # Toggle visibility button self.show_key_btn 
= tb.Button(add_frame, text="👁", width=3, command=self._toggle_key_visibility) self.show_key_btn.grid(row=0, column=2, padx=5, pady=5) # Model tk.Label(add_frame, text="Model:").grid(row=0, column=3, sticky=tk.W, padx=(20, 10), pady=5) self.model_var = tk.StringVar() add_models = get_model_options() self.model_combo = tb.Combobox(add_frame, textvariable=self.model_var, values=add_models, state='normal') self.model_combo.grid(row=0, column=4, sticky=tk.EW, pady=5) # Block mouse wheel on combobox try: if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'): self.translator_gui.ui.disable_spinbox_mousewheel(self.model_combo) except Exception: pass # Attach gentle autofill self._attach_model_autofill(self.model_combo, self.model_var) # Row 1: Cooldown and optional credentials tk.Label(add_frame, text="Cooldown (s):").grid(row=1, column=0, sticky=tk.W, padx=(0, 10), pady=5) self.cooldown_var = tk.IntVar(value=60) cooldown_frame = tk.Frame(add_frame) cooldown_frame.grid(row=1, column=1, sticky=tk.W, pady=5) cooldown_spinbox = tb.Spinbox(cooldown_frame, from_=10, to=3600, textvariable=self.cooldown_var, width=10) cooldown_spinbox.pack(side=tk.LEFT) # Disable mouse wheel for cooldown try: if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'): self.translator_gui.ui.disable_spinbox_mousewheel(cooldown_spinbox) except Exception: pass tk.Label(cooldown_frame, text="(10-3600)", font=('TkDefaultFont', 9), fg='gray').pack(side=tk.LEFT, padx=(10, 0)) # Add button and Copy Current Key button button_frame = tk.Frame(add_frame) button_frame.grid(row=1, column=4, sticky=tk.E, pady=5) tb.Button(button_frame, text="Add Key", command=self._add_key, bootstyle="success").pack(side=tk.LEFT, padx=(0, 5)) tb.Button(button_frame, text="Copy Current Key", command=self._copy_current_settings, bootstyle="info-outline").pack(side=tk.LEFT) # Row 2: Google Credentials (optional, discretely styled) 
tk.Label(add_frame, text="Google Creds:", font=('TkDefaultFont', 9), fg='gray').grid(row=2, column=0, sticky=tk.W, padx=(0, 10), pady=2) self.google_creds_var = tk.StringVar() self.google_creds_entry = tb.Entry(add_frame, textvariable=self.google_creds_var, font=('TkDefaultFont', 8), state='normal') self.google_creds_entry.grid(row=2, column=1, sticky=tk.EW, pady=2) # Google credentials browse button (moved closer) tb.Button(add_frame, text="📁", width=3, command=self._browse_google_credentials, bootstyle="secondary-outline").grid(row=2, column=2, padx=(5, 0), pady=2) # Google region field tk.Label(add_frame, text="Region:", font=('TkDefaultFont', 11), fg='gray').grid(row=2, column=3, sticky=tk.W, padx=(10, 5), pady=2) self.google_region_var = tk.StringVar(value='us-east5') # Default region self.google_region_entry = tb.Entry(add_frame, textvariable=self.google_region_var, font=('TkDefaultFont', 8), state='normal', width=12) self.google_region_entry.grid(row=2, column=4, sticky=tk.W, pady=2) # Row 3: Individual Endpoint Toggle self.use_individual_endpoint_var = tk.BooleanVar(value=False) individual_endpoint_toggle = tb.Checkbutton(add_frame, text="Use Individual Endpoint", variable=self.use_individual_endpoint_var, bootstyle="round-toggle", command=self._toggle_individual_endpoint_fields) individual_endpoint_toggle.grid(row=3, column=0, columnspan=2, sticky=tk.W, padx=(0, 10), pady=5) # Row 4: Individual Endpoint (initially hidden) self.individual_endpoint_label = tk.Label(add_frame, text="Individual Endpoint:", font=('TkDefaultFont', 9), fg='gray') self.individual_endpoint_label.grid(row=4, column=0, sticky=tk.W, padx=(0, 10), pady=2) self.azure_endpoint_var = tk.StringVar() self.azure_endpoint_entry = tb.Entry(add_frame, textvariable=self.azure_endpoint_var, font=('TkDefaultFont', 8), state='disabled') self.azure_endpoint_entry.grid(row=4, column=1, columnspan=2, sticky=tk.EW, pady=2) # Individual Endpoint API Version (small dropdown, initially hidden) 
self.individual_api_version_label = tk.Label(add_frame, text="API Ver:", font=('TkDefaultFont', 11), fg='gray') self.individual_api_version_label.grid(row=4, column=3, sticky=tk.W, padx=(10, 5), pady=2) self.azure_api_version_var = tk.StringVar(value='2025-01-01-preview') azure_versions = [ '2025-01-01-preview', '2024-12-01-preview', '2024-10-01-preview', '2024-08-01-preview', '2024-06-01', '2024-02-01', '2023-12-01-preview' ] self.azure_api_version_combo = ttk.Combobox(add_frame, textvariable=self.azure_api_version_var, values=azure_versions, width=18, state='disabled', font=('TkDefaultFont', 7)) self.azure_api_version_combo.grid(row=4, column=4, sticky=tk.W, pady=2) # Block mouse wheel on version combobox try: if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'): self.translator_gui.ui.disable_spinbox_mousewheel(self.azure_api_version_combo) except Exception: pass # Initially hide the endpoint fields self._toggle_individual_endpoint_fields() # Setup inline editor hooks to use model options as a dropdown too # (Optional enhancement could be added later) # Row 5: (Copy Current Key button moved up next to Add Key) def _toggle_individual_endpoint_fields(self): """Toggle visibility and state of individual endpoint fields""" enabled = self.use_individual_endpoint_var.get() if enabled: # Show and enable endpoint fields state = tk.NORMAL self.individual_endpoint_label.grid() self.azure_endpoint_entry.grid() self.individual_api_version_label.grid() self.azure_api_version_combo.grid() self.azure_endpoint_entry.config(state=state) self.azure_api_version_combo.config(state='readonly') else: # Hide and disable endpoint fields state = tk.DISABLED self.individual_endpoint_label.grid_remove() self.azure_endpoint_entry.grid_remove() self.individual_api_version_label.grid_remove() self.azure_api_version_combo.grid_remove() # Clear the fields when disabled self.azure_endpoint_var.set("") 
self.azure_api_version_var.set('2025-01-01-preview') def _move_key(self, direction): """Move selected key in the specified direction""" selected = self.tree.selection() if not selected or len(selected) != 1: return item = selected[0] index = self.tree.index(item) if index >= len(self.key_pool.keys): return new_index = index if direction == 'up' and index > 0: new_index = index - 1 elif direction == 'down' and index < len(self.key_pool.keys) - 1: new_index = index + 1 elif direction == 'top': new_index = 0 elif direction == 'bottom': new_index = len(self.key_pool.keys) - 1 if new_index != index: # Swap keys in the pool with self.key_pool.lock: self.key_pool.keys[index], self.key_pool.keys[new_index] = \ self.key_pool.keys[new_index], self.key_pool.keys[index] # Refresh display self._refresh_key_list() # Reselect the moved item items = self.tree.get_children() if new_index < len(items): self.tree.selection_set(items[new_index]) self.tree.focus(items[new_index]) self.tree.see(items[new_index]) # Show status self._show_status(f"Moved key to position {new_index + 1}") def _on_selection_change(self, event): """Update position label when selection changes""" selected = self.tree.selection() if selected: index = self.tree.index(selected[0]) total = len(self.key_pool.keys) self.position_label.config(text=f"#{index + 1}/{total}") else: self.position_label.config(text="") def _on_drag_start(self, event): """Start drag operation""" # Check if we clicked on an item item = self.tree.identify_row(event.y) if item: self.drag_start_item = item self.drag_start_y = event.y # Select the item being dragged self.tree.selection_set(item) # Set cursor self.tree.config(cursor="hand2") def _on_drag_motion(self, event): """Handle drag motion""" if not self.drag_start_item: return # Get the item under the cursor target_item = self.tree.identify_row(event.y) if target_item and target_item != self.drag_start_item: # Visual feedback - change cursor self.tree.config(cursor="sb_v_double_arrow") 
def _on_drag_release(self, event):
    """Complete a drag operation by moving the dragged key to the drop row.

    Does nothing when no drag is in progress.  The drag state is always
    cleared afterwards, even if the reorder fails.
    """
    if not self.drag_start_item:
        return

    try:
        # Reset cursor
        self.tree.config(cursor="")

        # Get the target item
        target_item = self.tree.identify_row(event.y)

        if target_item and target_item != self.drag_start_item:
            # Get indices
            source_index = self.tree.index(self.drag_start_item)
            target_index = self.tree.index(target_item)

            # Reorder the keys in the pool
            with self.key_pool.lock:
                keys = self.key_pool.keys
                if source_index >= len(keys):
                    return
                # Remove from source position, insert at target position
                keys.insert(target_index, keys.pop(source_index))

            # Refresh display
            self._refresh_key_list()

            # Reselect the moved item
            items = self.tree.get_children()
            if target_index < len(items):
                self.tree.selection_set(items[target_index])
                self.tree.focus(items[target_index])
                self.tree.see(items[target_index])

            # Show status
            self._show_status(f"Moved key from position {source_index + 1} to {target_index + 1}")
    finally:
        # Reset drag state
        self.drag_start_item = None
        self.drag_start_y = None


# NOTE(review): `_refresh_key_list` is defined again further down in this
# file; if all definitions sit in the same class body the last one replaces
# this one at class-creation time.  Confirm which variant is intended.
def _refresh_key_list(self):
    """Refresh the key list display preserving test results and highlighting key #1"""
    # Clear tree
    for item in self.tree.get_children():
        self.tree.delete(item)

    # Update primary key label if it exists
    if hasattr(self, 'primary_key_label'):
        keys = self.key_pool.get_all_keys()
        if keys:
            first_key = keys[0]
            masked = first_key.api_key[:8] + "..." + first_key.api_key[-4:] if len(first_key.api_key) > 12 else first_key.api_key
            self.primary_key_label.config(text=f"⭐ PRIMARY KEY: {first_key.model} ({masked}) ⭐")

    # Add keys
    keys = self.key_pool.get_all_keys()
    for i, key in enumerate(keys):
        # Mask API key for display
        masked_key = key.api_key[:8] + "..." + key.api_key[-4:] if len(key.api_key) > 12 else key.api_key

        # Position indicator
        position = f"#{i+1}"
        if i == 0:
            position = "⭐ #1"

        # Determine status based on test results and current state
        if key.last_test_result is None and hasattr(key, '_testing'):
            status = "⏳ Testing..."
            tags = ('testing',)
        elif not key.enabled:
            status = "Disabled"
            tags = ('disabled',)
        elif key.last_test_result == 'passed':
            status = "✅ Passed"
            tags = ('passed',)
        elif key.last_test_result == 'failed':
            status = "❌ Failed"
            tags = ('failed',)
        elif key.last_test_result == 'rate_limited':
            status = "⚠️ Rate Limited"
            tags = ('ratelimited',)
        elif key.last_test_result == 'error':
            status = "❌ Error"
            if key.last_test_message:
                status += f": {key.last_test_message[:20]}..."
            tags = ('error',)
        elif key.is_cooling_down and key.last_error_time:
            remaining = int(key.cooldown - (time.time() - key.last_error_time))
            if remaining > 0:
                status = f"Cooling ({remaining}s)"
                tags = ('cooling',)
            else:
                key.is_cooling_down = False
                status = "Active"
                tags = ('active',)
        else:
            status = "Active"
            tags = ('active',)

        # Times used (counter)
        times_used = getattr(key, 'times_used', key.success_count + key.error_count)

        # Insert into tree with position column
        self.tree.insert('', 'end',
                         text=masked_key,
                         values=(position, key.model, f"{key.cooldown}s", status,
                                 key.success_count, key.error_count, times_used),
                         tags=tags)

    # Configure tags (these may or may not work depending on ttkbootstrap theme)
    self.tree.tag_configure('active', foreground='green')
    self.tree.tag_configure('cooling', foreground='orange')
    self.tree.tag_configure('disabled', foreground='gray')
    self.tree.tag_configure('testing', foreground='blue')
    self.tree.tag_configure('passed', foreground='dark green')
    self.tree.tag_configure('failed', foreground='red')
    self.tree.tag_configure('ratelimited', foreground='orange')
    self.tree.tag_configure('error', foreground='dark red')

    # Update stats
    active_count = sum(1 for k in keys if k.enabled and not k.is_cooling_down)
    total_count = len(keys)
    passed_count = sum(1 for k in keys if k.last_test_result == 'passed')
    self.stats_label.config(text=f"Keys: {active_count} active / {total_count} total | {passed_count} passed tests")


def _on_click(self, event):
    """Handle click on tree item for inline editing"""
    # Close any existing edit widget
    if self.edit_widget:
        self.edit_widget.destroy()
        self.edit_widget = None

    # Identify what was clicked
    region = self.tree.identify_region(event.x, event.y)
    if region != "cell":
        return

    item = self.tree.identify_row(event.y)
    column = self.tree.identify_column(event.x)

    if not item:
        return

    # Get column index
    col_index = int(column.replace('#', ''))

    # Get the key index
    index = self.tree.index(item)
    if index >= len(self.key_pool.keys):
        return

    key = self.key_pool.keys[index]

    # Only allow editing Model (column #1) and Cooldown (column #2)
    if col_index == 1:  # Model column
        self._edit_model_inline(item, column, key)
    elif col_index == 2:  # Cooldown column
        self._edit_cooldown_inline(item, column, key)


def _edit_model_inline(self, item, column, key):
    """Create inline editor for model name.

    Places an entry over the clicked cell.  Return or loss of focus saves
    the new model name, Escape cancels.  Returns ``"break"`` once the editor
    is shown, ``None`` if the cell is not visible.
    """
    # Treeview.bbox yields an empty value when the cell is not visible
    # (e.g. scrolled out of view); there is nothing to place an editor on.
    bbox = self.tree.bbox(item, column)
    if not bbox:
        return None
    x, y, width, height = bbox

    # Expand the width to show more text (make it wider than the column)
    expanded_width = max(width + 100, 250)  # At least 250 pixels wide
    expanded_height = height + 8  # Add some padding to the height

    # Create entry widget
    edit_var = tk.StringVar(value=key.model)
    self.edit_widget = tb.Entry(self.tree, textvariable=edit_var, font=('TkDefaultFont', 11))

    def cancel_edit(event=None):
        if self.edit_widget:
            self.edit_widget.destroy()
            self.edit_widget = None

    def save_edit():
        new_value = edit_var.get().strip()
        if new_value and new_value != key.model:
            key.model = new_value
            self._refresh_key_list()
            self._show_status(f"Updated model to: {new_value}")
        cancel_edit()

    # Place and configure the entry with expanded dimensions
    # Adjust y position slightly to center it better
    self.edit_widget.place(x=x, y=y-2, width=expanded_width, height=expanded_height)
    self.edit_widget.focus()
    self.edit_widget.select_range(0, tk.END)

    # Make sure the widget appears on top
    self.edit_widget.lift()

    # Bind events.  Tk rejects an empty event sequence, so each binding
    # names its event explicitly.
    # NOTE(review): sequences inferred from the handlers' roles - confirm.
    self.edit_widget.bind('<Return>', lambda e: save_edit())
    self.edit_widget.bind('<Escape>', cancel_edit)
    self.edit_widget.bind('<FocusOut>', lambda e: save_edit())

    # Prevent the click from selecting the item
    return "break"


def _edit_cooldown_inline(self, item, column, key):
    """Create inline editor for cooldown.

    Places a spinbox (10-3600) over the clicked cell.  Return or loss of
    focus saves the value, Escape cancels.  Text that is not an integer is
    discarded instead of raising inside the Tk callback.  Returns
    ``"break"`` once the editor is shown, ``None`` if the cell is not
    visible.
    """
    # Treeview.bbox yields an empty value when the cell is not visible
    bbox = self.tree.bbox(item, column)
    if not bbox:
        return None
    x, y, width, height = bbox

    # Create spinbox widget
    edit_var = tk.IntVar(value=key.cooldown)
    self.edit_widget = tb.Spinbox(self.tree, from_=10, to=3600, textvariable=edit_var, width=10)

    # Disable mouse wheel changing values on inline editor
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(self.edit_widget)
    except Exception:
        pass

    def cancel_edit(event=None):
        if self.edit_widget:
            self.edit_widget.destroy()
            self.edit_widget = None

    def save_edit():
        try:
            new_value = edit_var.get()
        except (tk.TclError, ValueError):
            # Spinbox text is empty or not a number: drop the edit
            cancel_edit()
            return
        if new_value != key.cooldown:
            key.cooldown = new_value
            self._refresh_key_list()
            self._show_status(f"Updated cooldown to: {new_value}s")
        cancel_edit()

    # Place and configure the spinbox
    self.edit_widget.place(x=x, y=y, width=width, height=height)
    self.edit_widget.focus()

    # Bind events (see note in _edit_model_inline about event sequences)
    self.edit_widget.bind('<Return>', lambda e: save_edit())
    self.edit_widget.bind('<Escape>', cancel_edit)
    self.edit_widget.bind('<FocusOut>', lambda e: save_edit())

    # Prevent the click from selecting the item
    return "break"


def _show_context_menu(self, event):
    """Show context menu with reorder options.

    The menu is only shown when the pointer is over a tree row; a click on
    empty space is ignored because every entry acts on a row.
    """
    # Select item under cursor
    item = self.tree.identify_row(event.y)
    if not item:
        return

    # If the clicked item is not in selection, select only it
    if item not in self.tree.selection():
        self.tree.selection_set(item)

    # Create context menu
    menu = tk.Menu(self.dialog, tearoff=0)

    # Reorder submenu
    reorder_menu = tk.Menu(menu, tearoff=0)
    reorder_menu.add_command(label="Move to Top", command=lambda: self._move_key('top'))
    reorder_menu.add_command(label="Move Up", command=lambda: self._move_key('up'))
    reorder_menu.add_command(label="Move Down", command=lambda: self._move_key('down'))
    reorder_menu.add_command(label="Move to Bottom", command=lambda: self._move_key('bottom'))
    menu.add_cascade(label="Reorder", menu=reorder_menu)

    menu.add_separator()

    # Add change model option
    selected_count = len(self.tree.selection())
    if selected_count > 1:
        menu.add_command(label=f"Change Model ({selected_count} selected)",
                         command=self._change_model_for_selected)
    else:
        menu.add_command(label="Change Model",
                         command=self._change_model_for_selected)

    menu.add_separator()

    # Individual Endpoint options
    index = self.tree.index(item)
    if index < len(self.key_pool.keys):
        key = self.key_pool.keys[index]
        endpoint_enabled = getattr(key, 'use_individual_endpoint', False)
        endpoint_url = getattr(key, 'azure_endpoint', '')

        if endpoint_enabled and endpoint_url:
            menu.add_command(label="✅ Individual Endpoint",
                             command=lambda: self._configure_individual_endpoint(index))
            menu.add_command(label="Disable Individual Endpoint",
                             command=lambda: self._toggle_individual_endpoint(index, False))
        else:
            menu.add_command(label="🔧 Configure Individual Endpoint",
                             command=lambda: self._configure_individual_endpoint(index))

        menu.add_separator()

    menu.add_command(label="Test", command=self._test_selected)
    menu.add_command(label="Enable", command=self._enable_selected)
    menu.add_command(label="Disable", command=self._disable_selected)
    menu.add_separator()
    menu.add_command(label="Remove", command=self._remove_selected)

    # Show menu
    menu.post(event.x_root, event.y_root)


def _change_model_for_selected(self):
    """Change model for all selected entries.

    Opens a small dialog with a model combobox; Return applies the entered
    model to every key that was selected when the dialog opened, Escape
    closes the dialog without changes.
    """
    selected = self.tree.selection()
    if not selected:
        return

    # Resolve rows to pool indices now: tree item ids are recreated on every
    # list refresh, so they may no longer exist when the change is applied.
    selected_indices = [self.tree.index(item) for item in selected]

    # Create simple dialog
    dialog = tk.Toplevel(self.dialog)
    dialog.title(f"Change Model for {len(selected)} Keys")
    dialog.geometry("400x130")
    dialog.transient(self.dialog)

    # Set icon
    self._set_icon(dialog)

    # Center the dialog
    dialog.update_idletasks()
    x = (dialog.winfo_screenwidth() // 2) - (dialog.winfo_width() // 2)
    y = (dialog.winfo_screenheight() // 2) - (dialog.winfo_height() // 2)
    dialog.geometry(f"+{x}+{y}")

    # Main frame
    main_frame = tk.Frame(dialog, padx=20, pady=20)
    main_frame.pack(fill=tk.BOTH, expand=True)

    # Label
    tk.Label(main_frame, text="Enter new model name (press Enter to apply):",
             font=('TkDefaultFont', 10)).pack(pady=(0, 10))

    # Model entry with dropdown
    model_var = tk.StringVar()

    # Full model list (same as main GUI)
    all_models = get_model_options()

    model_combo = ttk.Combobox(main_frame, values=all_models,
                               textvariable=model_var, width=45, height=12)
    model_combo.pack(pady=(0, 10))

    # Block mouse wheel on combobox
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(model_combo)
    except Exception:
        pass

    # Attach gentle autofill
    self._attach_model_autofill(model_combo, model_var)

    # Get current model from first selected item as default
    if selected_indices and selected_indices[0] < len(self.key_pool.keys):
        current_model = self.key_pool.keys[selected_indices[0]].model
        model_var.set(current_model)
        model_combo.select_range(0, tk.END)  # Select all text for easy replacement

    def apply_change(event=None):
        new_model = model_var.get().strip()
        if new_model:
            # Update all selected keys
            for index in selected_indices:
                if index < len(self.key_pool.keys):
                    self.key_pool.keys[index].model = new_model

            # Refresh the display
            self._refresh_key_list()

            # Show status
            self._show_status(f"Changed model to '{new_model}' for {len(selected)} keys")

            dialog.destroy()

    # Focus on the combobox
    model_combo.focus()

    # Bind Enter key to apply, Escape to close.
    # NOTE(review): event sequences inferred from the comments - confirm.
    dialog.bind('<Return>', apply_change)
    model_combo.bind('<Return>', apply_change)
    dialog.bind('<Escape>', lambda e: dialog.destroy())


def _configure_individual_endpoint(self, key_index):
    """Configure individual endpoint for a specific key"""
    if key_index >= len(self.key_pool.keys):
        return

    key = self.key_pool.keys[key_index]

    # Create individual endpoint dialog using the class
    if IndividualEndpointDialog is None:
        messagebox.showerror("Error", "IndividualEndpointDialog is not available.")
        return
    IndividualEndpointDialog(self.dialog, self.translator_gui, key, self._refresh_key_list, self._show_status)


def _toggle_endpoint_fields(self, enable_var, endpoint_entry, version_combo):
    """Toggle endpoint configuration fields based on enable state"""
    if enable_var.get():
        endpoint_entry.config(state='normal')
        version_combo.config(state='readonly')
    else:
        endpoint_entry.config(state='disabled')
        version_combo.config(state='disabled')


def _toggle_individual_endpoint(self, key_index, enabled):
    """Quick toggle individual endpoint on/off"""
    if key_index >= len(self.key_pool.keys):
        return

    key = self.key_pool.keys[key_index]
    key.use_individual_endpoint = enabled

    # Refresh display
    self._refresh_key_list()

    # Show status
    status = "enabled" if enabled else "disabled"
    self._show_status(f"Individual endpoint {status} for {key.model}")


# Additional helper method to swap keys programmatically
def swap_keys(self, index1: int, index2: int):
    """Swap two keys by their indices.

    Returns:
        True if both indices were valid and the keys were swapped,
        False otherwise (the pool is left untouched).
    """
    with self.key_pool.lock:
        keys = self.key_pool.keys
        in_range = 0 <= index1 < len(keys) and 0 <= index2 < len(keys)
        if in_range:
            keys[index1], keys[index2] = keys[index2], keys[index1]

    # Refresh after releasing the lock: the refresh reads the pool again and
    # would block forever if it needed this same lock and the lock is not
    # re-entrant.
    if in_range:
        self._refresh_key_list()
    return in_range


# Method to move a key to a specific position
def move_key_to_position(self, from_index: int, to_index: int):
    """Move a key from one position to another.

    Returns:
        True if both indices were valid and the key was moved,
        False otherwise (the pool is left untouched).
    """
    with self.key_pool.lock:
        keys = self.key_pool.keys
        in_range = 0 <= from_index < len(keys) and 0 <= to_index < len(keys)
        if in_range:
            keys.insert(to_index, keys.pop(from_index))

    # Refresh outside the lock (see swap_keys)
    if in_range:
        self._refresh_key_list()
    return in_range


def _create_button_bar(self, parent):
    """Create the bottom button bar"""
    button_frame = tk.Frame(parent)
    button_frame.pack(fill=tk.X, pady=(20, 0))

    # Save button
    tb.Button(button_frame, text="Save & Close", command=self._save_and_close,
              bootstyle="success").pack(side=tk.RIGHT, padx=(5, 0))

    # Cancel button
    tb.Button(button_frame, text="Cancel", command=self._on_close,
              bootstyle="secondary").pack(side=tk.RIGHT)

    # Import/Export
    tb.Button(button_frame, text="Import", command=self._import_keys,
              bootstyle="info-outline").pack(side=tk.LEFT, padx=(0, 5))

    tb.Button(button_frame, text="Export", command=self._export_keys,
              bootstyle="info-outline").pack(side=tk.LEFT)


def _select_google_credentials(self, dialog_title, target_var, description):
    """Ask for a credentials JSON file, validate it and store its path.

    Args:
        dialog_title: Title of the file-open dialog.
        target_var: Tk variable that receives the chosen file path.
        description: Text used in the status message after "Selected".

    The file is accepted when it parses as a JSON object that has both a
    ``type`` and a ``project_id`` key; otherwise an error box is shown and
    ``target_var`` is left unchanged.
    """
    filename = filedialog.askopenfilename(
        title=dialog_title,
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
    )
    if not filename:
        return

    try:
        # JSON text is UTF-8; do not depend on the platform default encoding
        with open(filename, 'r', encoding='utf-8') as f:
            creds_data = json.load(f)
    except Exception as e:
        messagebox.showerror("Error", f"Failed to load credentials: {str(e)}")
        return

    # Validate it's a valid Google Cloud credentials file
    if isinstance(creds_data, dict) and 'type' in creds_data and 'project_id' in creds_data:
        target_var.set(filename)
        self._show_status(f"Selected {description}: {os.path.basename(filename)}")
    else:
        messagebox.showerror(
            "Error",
            "Invalid Google Cloud credentials file. Please select a valid service account JSON file."
        )


def _browse_google_credentials(self):
    """Browse for Google Cloud credentials JSON file"""
    self._select_google_credentials(
        "Select Google Cloud Credentials JSON",
        self.google_creds_var,
        "Google credentials",
    )


def _browse_fallback_google_credentials(self):
    """Browse for Google Cloud credentials JSON file for fallback keys"""
    self._select_google_credentials(
        "Select Google Cloud Credentials JSON for Fallback",
        self.fallback_google_creds_var,
        "fallback Google credentials",
    )


def _attach_model_autofill(self, combo: ttk.Combobox, var: tk.StringVar, on_change=None):
    """Attach the same gentle autofill/scroll behavior as the main GUI.

    - No filtering; keeps full list intact.
    - Gentle autofill only when appending at end; Backspace/Delete respected.
    - Scroll/highlight match if dropdown is open.

    Args:
        combo: Combobox to enhance; its current values are snapshotted.
        var: Variable backing the combobox (used for the initial text).
        on_change: Optional zero-argument callback invoked when the text
            changes, on Return, on selection and on focus loss.
    """
    import tkinter as _tk
    import logging as _logging

    # Store full values list on the widget
    try:
        combo._all_values = list(combo['values'])
    except Exception:
        combo._all_values = []

    combo._prev_text = var.get() if var else combo.get()

    def _scroll_to_value(_combo: ttk.Combobox, value: str):
        try:
            values = getattr(_combo, '_all_values', []) or list(_combo['values'])
            if value not in values:
                return
            index = values.index(value)
            # Reach into the combobox popdown's listbox through Tcl
            popdown = _combo.tk.eval(f'ttk::combobox::PopdownWindow {_combo._w}')
            listbox = f'{popdown}.f.l'
            tkobj = _combo.tk
            tkobj.call(listbox, 'see', index)
            tkobj.call(listbox, 'selection', 'clear', 0, 'end')
            tkobj.call(listbox, 'selection', 'set', index)
            tkobj.call(listbox, 'activate', index)
        except Exception:
            pass

    def _on_keyrelease(event=None):
        try:
            typed = combo.get()
            prev = getattr(combo, '_prev_text', '')
            keysym = (getattr(event, 'keysym', '') or '').lower()
            if keysym in {'up', 'down', 'left', 'right', 'return', 'escape', 'tab'}:
                return

            source = getattr(combo, '_all_values', []) or list(combo['values'])

            # Prefer prefix matches; fall back to substring matches
            first_match = None
            if typed:
                lowered = typed.lower()
                pref = [v for v in source if v.lower().startswith(lowered)]
                cont = [v for v in source if lowered in v.lower()] if not pref else []
                if pref:
                    first_match = pref[0]
                elif cont:
                    first_match = cont[0]

            grew = len(typed) > len(prev) and typed.startswith(prev)
            is_del = keysym in {'backspace', 'delete'} or len(typed) < len(prev)

            try:
                at_end = combo.index(_tk.INSERT) == len(typed)
            except Exception:
                at_end = True

            try:
                has_sel = combo.selection_present()
            except Exception:
                has_sel = False

            # Gentle autofill
            if first_match and grew and at_end and not has_sel and not is_del:
                if first_match.lower().startswith(typed.lower()) and first_match != typed:
                    combo.set(first_match)
                    try:
                        combo.icursor(len(typed))
                        combo.selection_range(len(typed), len(first_match))
                    except Exception:
                        pass

            # If dropdown is open, scroll/highlight (no auto-open)
            if first_match:
                _scroll_to_value(combo, first_match)

            combo._prev_text = typed
            if on_change and typed != prev:
                on_change()
        except Exception as e:
            try:
                _logging.debug(f"Combobox autocomplete error: {e}")
            except Exception:
                pass

    # NOTE(review): event sequences below are inferred from the handler
    # names (an empty sequence is rejected by Tk) - confirm.
    combo.bind('<KeyRelease>', _on_keyrelease)

    def _on_return(event=None):
        try:
            typed = combo.get()
            source = getattr(combo, '_all_values', []) or list(combo['values'])
            match = None
            if typed:
                lowered = typed.lower()
                pref = [v for v in source if v.lower().startswith(lowered)]
                cont = [v for v in source if lowered in v.lower()] if not pref else []
                match = pref[0] if pref else (cont[0] if cont else None)
            if match and match != typed:
                combo.set(match)
            # Place caret at end and clear selection
            try:
                combo.icursor('end')
                try:
                    combo.selection_clear()
                except Exception:
                    combo.selection_range(0, 0)
            except Exception:
                pass
            combo._prev_text = combo.get()
            if on_change:
                on_change()
        except Exception as e:
            try:
                _logging.debug(f"Combobox enter-commit error: {e}")
            except Exception:
                pass
        # Do not return "break" so outer dialogs bound to <Return> still fire
        return None

    combo.bind('<Return>', _on_return)
    combo.bind('<<ComboboxSelected>>', lambda e: on_change() if on_change else None)
    combo.bind('<FocusOut>', lambda e: on_change() if on_change else None)


def _toggle_key_visibility(self):
    """Toggle API key visibility"""
    if self.api_key_entry.cget('show') == '*':
        self.api_key_entry.config(show='')
        self.show_key_btn.config(text='🔒')
    else:
        self.api_key_entry.config(show='*')
        self.show_key_btn.config(text='👁')


def _toggle_multi_key_mode(self):
    """Toggle multi-key mode.

    Stores the flag in the main GUI config (saved immediately), then
    enables or disables the key entry widgets, the tree's mouse bindings
    and the action buttons accordingly.
    """
    enabled = self.enabled_var.get()

    self.translator_gui.config['use_multi_api_keys'] = enabled

    # Save the config immediately
    self.translator_gui.save_config(show_message=False)

    # Fallback section is always visible now (works in both modes)
    # No need to show/hide fallback section based on multi-key mode

    # Update other UI elements
    for widget in [self.api_key_entry, self.model_combo]:
        if widget:
            widget.config(state=tk.NORMAL if enabled else tk.DISABLED)

    # Handle Treeview separately - it doesn't support state property
    # NOTE(review): event sequences below are inferred from the handler
    # names (an empty sequence is rejected by Tk) - confirm.
    if self.tree:
        if enabled:
            # Re-enable tree interactions by restoring original bindings
            self.tree.bind('<Button-1>', self._on_click)
            self.tree.bind('<Button-3>', self._show_context_menu)
            self.tree.bind('<<TreeviewSelect>>', self._on_selection_change)

            # Re-enable drag and drop
            self.tree.bind('<Button-1>', self._on_drag_start, add='+')
            self.tree.bind('<B1-Motion>', self._on_drag_motion)
            self.tree.bind('<ButtonRelease-1>', self._on_drag_release)
        else:
            # Disable tree interactions
            self.tree.unbind('<Button-1>')
            self.tree.unbind('<Button-3>')
            self.tree.unbind('<<TreeviewSelect>>')
            self.tree.unbind('<B1-Motion>')
            self.tree.unbind('<ButtonRelease-1>')

    # Update action buttons state
    for child in self.dialog.winfo_children():
        if isinstance(child, tk.Frame):
            for subchild in child.winfo_children():
                if isinstance(subchild, tk.Frame):
                    for button in subchild.winfo_children():
                        if isinstance(button, (tb.Button, ttk.Button)) and button.cget('text') in [
                            'Test Selected', 'Test All', 'Enable Selected',
                            'Disable Selected', 'Remove Selected', 'Add Key'
                        ]:
                            button.config(state=tk.NORMAL if enabled else tk.DISABLED)


def _copy_current_settings(self):
    """Copy current API key and model from main GUI"""
    if hasattr(self.translator_gui, 'api_key_var'):
        self.api_key_var.set(self.translator_gui.api_key_var.get())
    if hasattr(self.translator_gui, 'model_var'):
        self.model_var.set(self.translator_gui.model_var.get())


def _add_key(self):
    """Add a new API key with optional Google credentials and individual endpoint"""
    api_key = self.api_key_var.get().strip()
    model = self.model_var.get().strip()
    cooldown = self.cooldown_var.get()
    google_credentials = self.google_creds_var.get().strip() or None
    google_region = self.google_region_var.get().strip() or None

    # Only use individual endpoint if toggle is enabled
    use_individual_endpoint = self.use_individual_endpoint_var.get()
    azure_endpoint = self.azure_endpoint_var.get().strip() if use_individual_endpoint else None
    azure_api_version = self.azure_api_version_var.get().strip() if use_individual_endpoint else None

    if not api_key or not model:
        messagebox.showerror("Error", "Please enter both API key and model name")
        return

    # Add to pool with new fields
    key_entry = APIKeyEntry(api_key, model, cooldown, enabled=True,
                            google_credentials=google_credentials,
                            azure_endpoint=azure_endpoint,
                            google_region=google_region,
                            azure_api_version=azure_api_version,
                            use_individual_endpoint=use_individual_endpoint)
    self.key_pool.add_key(key_entry)

    # Clear inputs
    self.api_key_var.set("")
    self.model_var.set("")
    self.cooldown_var.set(60)
    self.google_creds_var.set("")
    self.azure_endpoint_var.set("")
    self.google_region_var.set("us-east5")
    self.azure_api_version_var.set('2025-01-01-preview')
    self.use_individual_endpoint_var.set(False)

    # Update the UI to hide endpoint fields
    self._toggle_individual_endpoint_fields()

    # Refresh list
    self._refresh_key_list()

    # Show success
    extras = []
    if google_credentials:
        extras.append(f"Google: {os.path.basename(google_credentials)}")
    if azure_endpoint:
        extras.append(f"Azure: {azure_endpoint[:30]}...")

    extra_info = f" ({', '.join(extras)})" if extras else ""
    self._show_status(f"Added key for model: {model}{extra_info}")


# NOTE(review): second of several `_refresh_key_list` definitions in this
# file; a later definition of the same name replaces it if they share a
# class body.  Confirm which variant is intended.
def _refresh_key_list(self):
    """Refresh the key list display"""
    # Clear tree
    for item in self.tree.get_children():
        self.tree.delete(item)

    # Add keys
    keys = self.key_pool.get_all_keys()
    for i, key in enumerate(keys):
        # Mask API key for display
        masked_key = key.api_key[:8] + "..." + key.api_key[-4:] if len(key.api_key) > 12 else key.api_key

        # Status
        if not key.enabled:
            status = "Disabled"
            tags = ('disabled',)
        elif key.is_cooling_down:
            remaining = int(key.cooldown - (time.time() - key.last_error_time))
            status = f"Cooling ({remaining}s)"
            tags = ('cooling',)
        else:
            status = "Active"
            tags = ('active',)

        # Times used (counter)
        times_used = getattr(key, 'times_used', key.success_count + key.error_count)

        # Insert into tree
        self.tree.insert('', 'end',
                         text=masked_key,
                         values=(key.model, f"{key.cooldown}s", status,
                                 key.success_count, key.error_count, times_used),
                         tags=tags)

    # Configure tags
    self.tree.tag_configure('active', foreground='green')
    self.tree.tag_configure('cooling', foreground='orange')
    self.tree.tag_configure('disabled', foreground='gray')

    # Update stats
    active_count = sum(1 for k in keys if k.enabled and not k.is_cooling_down)
    total_count = len(keys)
    self.stats_label.config(text=f"Keys: {active_count} active / {total_count} total")


def _test_selected(self):
    """Test selected API keys with inline progress"""
    selected = self.tree.selection()
    if not selected:
        messagebox.showwarning("Warning", "Please select keys to test")
        return

    # Get selected indices
    indices = [self.tree.index(item) for item in selected]

    # Ensure UnifiedClient uses the same shared pool instance
    try:
        from unified_api_client import UnifiedClient
        UnifiedClient._api_key_pool = self.key_pool
    except Exception:
        pass

    # Start testing in thread
    thread = threading.Thread(target=self._run_inline_tests, args=(indices,))
    thread.daemon = True
    thread.start()


def _test_all(self):
    """Test all API keys with inline progress"""
    if not self.key_pool.keys:
        messagebox.showwarning("Warning", "No keys to test")
        return

    indices = list(range(len(self.key_pool.keys)))

    # Start testing in thread
    thread = threading.Thread(target=self._run_inline_tests, args=(indices,))
    thread.daemon = True
    thread.start()


def _run_inline_tests(self, indices: List[int]):
    """Run API tests with persistent inline results.

    Intended to run on a worker thread: marks the keys as "testing", runs
    the per-key test in a thread pool, stores each result on the key and
    schedules the UI refresh through ``self.dialog.after``.

    Args:
        indices: Positions in ``self.key_pool.keys`` to test; positions
            past the end of the pool are skipped.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    print(f"[DEBUG] Starting tests for {len(indices)} keys")

    # Mark all selected keys as testing
    for index in indices:
        if index < len(self.key_pool.keys):
            key = self.key_pool.keys[index]
            key.last_test_result = None
            key._testing = True
            print(f"[DEBUG] Marked key {index} as testing")

    # Refresh once to show "Testing..." status
    self.dialog.after(0, self._refresh_key_list)

    # Create thread pool for parallel testing.  ThreadPoolExecutor rejects
    # max_workers=0, so keep at least one worker for an empty index list.
    max_workers = max(1, min(10, len(indices)))

    def test_single_key(index):
        """Test a single API key directly"""
        print(f"[DEBUG] Testing key at index {index}")

        if index >= len(self.key_pool.keys):
            return None

        key = self.key_pool.keys[index]

        try:
            # Simple test - just check if we can import the libraries
            # This is a minimal test to see if the function completes
            print(f"[DEBUG] Testing {key.model} with key {key.api_key[:8]}...")

            # Simulate a test
            time.sleep(1)  # Simulate API call

            # For now, just mark as passed to test the flow
            key.mark_success()
            key.set_test_result('passed', 'Test successful')
            print(f"[DEBUG] Key {index} test completed - PASSED")
            time.sleep(0.5)  # Brief pause to improve retry responsiveness
            logger.debug("💤 Pausing briefly to improve retry responsiveness after test completion")
            return (index, True, "Test passed")

        except Exception as e:
            print(f"[DEBUG] Key {index} test failed: {e}")
            key.mark_error()
            key.set_test_result('error', str(e)[:30])
            return (index, False, f"Error: {str(e)[:50]}...")

    # Run tests in parallel
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all test tasks
        future_to_index = {executor.submit(test_single_key, i): i for i in indices}

        # Process results as they complete
        for future in as_completed(future_to_index):
            result = future.result()
            if result:
                results.append(result)
                print(f"[DEBUG] Got result: {result}")

    print(f"[DEBUG] All tests complete. Results: {len(results)}")

    # Calculate summary
    success_count = sum(1 for _, success, _ in results if success)
    total_count = len(results)

    # Clear testing flags
    for index in indices:
        if index < len(self.key_pool.keys):
            key = self.key_pool.keys[index]
            if hasattr(key, '_testing'):
                delattr(key, '_testing')
                print(f"[DEBUG] Cleared testing flag for key {index}")

    # Update UI in main thread
    print(f"[DEBUG] Refreshing UI with results")
    self.dialog.after(0, self._refresh_key_list)
    self.dialog.after(0, lambda: self.stats_label.config(
        text=f"Test complete: {success_count}/{total_count} passed"))


def _update_tree_item(self, index: int):
    """Update a single tree item based on current key state.

    The update is scheduled with ``self.dialog.after`` and is skipped when
    the row or the pool entry at ``index`` no longer exists.
    """
    def update():
        # Find the tree item for this index
        items = self.tree.get_children()
        if index >= len(items) or index >= len(self.key_pool.keys):
            return

        item = items[index]
        key = self.key_pool.keys[index]

        def remaining_cooldown():
            # last_error_time stays None until the key has recorded an
            # error; never show a negative countdown.
            if not key.last_error_time:
                return 0
            return max(0, int(key.cooldown - (time.time() - key.last_error_time)))

        # Determine status and tags
        if key.last_test_result is None:
            # Currently testing
            status = "⏳ Testing..."
            tags = ('testing',)
        elif not key.enabled:
            status = "Disabled"
            tags = ('disabled',)
        elif key.last_test_result == 'passed':
            if key.is_cooling_down:
                status = f"✅ Passed (cooling {remaining_cooldown()}s)"
                tags = ('passed_cooling',)
            else:
                status = "✅ Passed"
                tags = ('passed',)
        elif key.last_test_result == 'failed':
            status = "❌ Failed"
            tags = ('failed',)
        elif key.last_test_result == 'rate_limited':
            status = f"⚠️ Rate Limited ({remaining_cooldown()}s)"
            tags = ('ratelimited',)
        elif key.last_test_result == 'error':
            status = "❌ Error"
            if key.last_test_message:
                status += f": {key.last_test_message[:20]}..."
            tags = ('error',)
        elif key.is_cooling_down:
            status = f"Cooling ({remaining_cooldown()}s)"
            tags = ('cooling',)
        else:
            status = "Active"
            tags = ('active',)

        # Get current values
        values = list(self.tree.item(item, 'values'))

        # Update status column
        values[2] = status

        # Update success/error counts
        values[3] = key.success_count
        values[4] = key.error_count

        # Update times used (counter)
        values[5] = getattr(key, 'times_used', key.success_count + key.error_count)

        # Update the item
        self.tree.item(item, values=values, tags=tags)

    # Run in main thread
    self.dialog.after(0, update)


def _refresh_key_list(self):
    """Refresh the key list display preserving test results"""
    # Clear tree
    for item in self.tree.get_children():
        self.tree.delete(item)

    # Add keys
    keys = self.key_pool.get_all_keys()
    for i, key in enumerate(keys):
        # Mask API key for display
        masked_key = key.api_key[:8] + "..." + key.api_key[-4:] if len(key.api_key) > 12 else key.api_key

        # Determine status based on test results and current state
        if key.last_test_result is None and hasattr(key, '_testing'):
            # Currently testing (temporary flag)
            status = "⏳ Testing..."
            tags = ('testing',)
        elif not key.enabled:
            status = "Disabled"
            tags = ('disabled',)
        elif key.last_test_result == 'passed':
            status = "✅ Passed"
            tags = ('passed',)
        elif key.last_test_result == 'failed':
            status = "❌ Failed"
            tags = ('failed',)
        elif key.last_test_result == 'rate_limited':
            status = "⚠️ Rate Limited"
            tags = ('ratelimited',)
        elif key.last_test_result == 'error':
            status = "❌ Error"
            if key.last_test_message:
                status += f": {key.last_test_message[:20]}..."
            tags = ('error',)
        elif key.is_cooling_down and key.last_error_time:
            remaining = int(key.cooldown - (time.time() - key.last_error_time))
            if remaining > 0:
                status = f"Cooling ({remaining}s)"
                tags = ('cooling',)
            else:
                key.is_cooling_down = False
                status = "Active"
                tags = ('active',)
        else:
            status = "Active"
            tags = ('active',)

        # Times used (counter)
        times_used = getattr(key, 'times_used', key.success_count + key.error_count)

        # Insert into tree
        self.tree.insert('', 'end',
                         text=masked_key,
                         values=(key.model, f"{key.cooldown}s", status,
                                 key.success_count, key.error_count, times_used),
                         tags=tags)

    # Configure tags
    self.tree.tag_configure('active', foreground='green')
    self.tree.tag_configure('cooling', foreground='orange')
    self.tree.tag_configure('disabled', foreground='gray')
    self.tree.tag_configure('testing', foreground='blue', font=('TkDefaultFont', 11))
    self.tree.tag_configure('passed', foreground='dark green', font=('TkDefaultFont', 11))
    self.tree.tag_configure('failed', foreground='red')
    self.tree.tag_configure('ratelimited', foreground='orange')
    self.tree.tag_configure('error', foreground='dark red')

    # Update stats
    active_count = sum(1 for k in keys if k.enabled and not k.is_cooling_down)
    total_count = len(keys)
    passed_count = sum(1 for k in keys if k.last_test_result == 'passed')
    self.stats_label.config(text=f"Keys: {active_count} active / {total_count} total | {passed_count} passed tests")


def _create_progress_dialog(self):
    """Create simple progress dialog at mouse cursor position"""
    self.progress_dialog = tk.Toplevel(self.dialog)
    self.progress_dialog.title("Testing API Keys")

    # Get mouse position
    x = self.progress_dialog.winfo_pointerx()
    y = self.progress_dialog.winfo_pointery()

    # Set geometry at cursor position (offset slightly so cursor is inside window)
    self.progress_dialog.geometry(f"500x400+{x-50}+{y-30}")

    # Add label
    label = tb.Label(self.progress_dialog, text="Testing in progress...",
                     font=('TkDefaultFont', 10, 'bold'))
    label.pack(pady=10)

    # Add text widget
for results self.progress_text = scrolledtext.ScrolledText(self.progress_dialog, wrap=tk.WORD, width=60, height=20) self.progress_text.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10)) # Add close button (initially disabled) self.close_button = tb.Button(self.progress_dialog, text="Close", command=self.progress_dialog.destroy, bootstyle="secondary", state=tk.DISABLED) self.close_button.pack(pady=(0, 10)) self.progress_dialog.transient(self.dialog) def _run_tests(self, indices: List[int]): """Run API tests for specified keys in parallel""" from unified_api_client import UnifiedClient from concurrent.futures import ThreadPoolExecutor, as_completed import os # Get Gemini endpoint settings use_gemini_endpoint = os.getenv("USE_GEMINI_OPENAI_ENDPOINT", "0") == "1" gemini_endpoint = os.getenv("GEMINI_OPENAI_ENDPOINT", "") # Create thread pool for parallel testing max_workers = min(10, len(indices)) # Limit to 10 concurrent tests def test_single_key(index): """Test a single API key""" if index >= len(self.key_pool.keys): return None key = self.key_pool.keys[index] # Create a key identifier key_preview = f"{key.api_key[:8]}...{key.api_key[-4:]}" if len(key.api_key) > 12 else key.api_key test_label = f"{key.model} [{key_preview}]" # Update UI to show test started self.dialog.after(0, lambda label=test_label: self.progress_text.insert(tk.END, f"Testing {label}... 
")) self.dialog.after(0, lambda: self.progress_text.see(tk.END)) try: # Count this usage for times used in testing as well try: key.times_used += 1 except Exception: pass # Check if this is a Gemini model with custom endpoint is_gemini_model = key.model.lower().startswith('gemini') if is_gemini_model and use_gemini_endpoint and gemini_endpoint: # Test Gemini with OpenAI-compatible endpoint import openai endpoint_url = gemini_endpoint if not endpoint_url.endswith('/openai/'): endpoint_url = endpoint_url.rstrip('/') + '/openai/' client = openai.OpenAI( api_key=key.api_key, base_url=endpoint_url, timeout=10.0 ) response = client.chat.completions.create( model=key.model.replace('gemini/', ''), messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Say 'API test successful' and nothing else."} ], max_tokens=100, temperature=0.7 ) content = response.choices[0].message.content if content and "test successful" in content.lower(): self.dialog.after(0, lambda label=test_label: self._update_test_result(label, True)) key.mark_success() return (index, True, "Test passed") else: self.dialog.after(0, lambda label=test_label: self._update_test_result(label, False)) key.mark_error() return (index, False, "Unexpected response") else: # Use UnifiedClient for non-Gemini or regular Gemini client = UnifiedClient( api_key=key.api_key, model=key.model, output_dir=None ) messages = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Say 'API test successful' and nothing else."} ] response = client.send( messages, temperature=0.7, max_tokens=100 ) if response and isinstance(response, tuple): content, finish_reason = response if content and "test successful" in content.lower(): self.dialog.after(0, lambda label=test_label: self._update_test_result(label, True)) key.mark_success() return (index, True, "Test passed") else: self.dialog.after(0, lambda label=test_label: self._update_test_result(label, 
False)) key.mark_error() return (index, False, "Unexpected response") else: self.dialog.after(0, lambda label=test_label: self._update_test_result(label, False)) key.mark_error() return (index, False, "No response") except Exception as e: error_msg = str(e) error_code = None if "429" in error_msg or "rate limit" in error_msg.lower(): error_code = 429 self.dialog.after(0, lambda label=test_label: self._update_test_result(label, False, error=True)) key.mark_error(error_code) return (index, False, f"Error: {error_msg}") # Run tests in parallel with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all test tasks future_to_index = {executor.submit(test_single_key, i): i for i in indices} # Process results as they complete for future in as_completed(future_to_index): result = future.result() if result: self.test_results.put(result) # Show completion and close button self.dialog.after(0, self._show_completion) # Process final results self.dialog.after(0, self._process_test_results) def _update_test_result(self, test_label, success, error=False): """Update the progress text with test result""" # Find the line with this test label content = self.progress_text.get("1.0", tk.END) lines = content.split('\n') for i, line in enumerate(lines): if test_label in line and not any(status in line for status in ["✅", "❌"]): # This is our line, update it if error: result_text = "❌ ERROR" elif success: result_text = "✅ PASSED" else: result_text = "❌ FAILED" # Calculate position line_num = i + 1 line_end = f"{line_num}.end" self.progress_text.insert(line_end, result_text) self.progress_text.insert(line_end, "\n") self.progress_text.see(tk.END) break def _show_completion(self): """Show completion in the same dialog""" self.progress_text.insert(tk.END, "\n--- Testing Complete ---\n") self.progress_text.see(tk.END) def _process_test_results(self): """Process test results and show in the same dialog""" results = [] # Get all results while not self.test_results.empty(): try: 
results.append(self.test_results.get_nowait()) except: break if results: # Build result message success_count = sum(1 for _, success, _ in results if success) total_count = len(results) # Update everything at once after all tests complete def final_update(): # Clear testing flags for index in indices: if index < len(self.key_pool.keys): key = self.key_pool.keys[index] if hasattr(key, '_testing'): delattr(key, '_testing') self._refresh_key_list() self.stats_label.config(text=f"Test complete: {success_count}/{total_count} passed") # Use lambda to capture the variables in scope self.dialog.after(0, lambda: final_update()) # Add summary to the same dialog self.progress_text.insert(tk.END, f"\nSummary: {success_count}/{total_count} passed\n") self.progress_text.insert(tk.END, "-" * 50 + "\n\n") for i, success, msg in results: key = self.key_pool.keys[i] # Show key identifier in results too key_preview = f"{key.api_key[:8]}...{key.api_key[-4:]}" if len(key.api_key) > 12 else key.api_key status = "✅" if success else "❌" self.progress_text.insert(tk.END, f"{status} {key.model} [{key_preview}]: {msg}\n") self.progress_text.see(tk.END) # Enable close button now that testing is complete self.close_button.config(state=tk.NORMAL) # Update the dialog title self.progress_dialog.title(f"API Test Results - {success_count}/{total_count} passed") # Refresh list self._refresh_key_list() def _enable_selected(self): """Enable selected keys""" selected = self.tree.selection() for item in selected: index = self.tree.index(item) if index < len(self.key_pool.keys): self.key_pool.keys[index].enabled = True self._refresh_key_list() self._show_status(f"Enabled {len(selected)} key(s)") def _disable_selected(self): """Disable selected keys""" selected = self.tree.selection() for item in selected: index = self.tree.index(item) if index < len(self.key_pool.keys): self.key_pool.keys[index].enabled = False self._refresh_key_list() self._show_status(f"Disabled {len(selected)} key(s)") def 
_remove_selected(self): """Remove selected keys""" selected = self.tree.selection() if not selected: return if messagebox.askyesno("Confirm", f"Remove {len(selected)} selected key(s)?"): # Get indices in reverse order to avoid index shifting indices = sorted([self.tree.index(item) for item in selected], reverse=True) for index in indices: self.key_pool.remove_key(index) self._refresh_key_list() self._show_status(f"Removed {len(selected)} key(s)") def _edit_cooldown(self): """Edit cooldown for selected key""" selected = self.tree.selection() if not selected or len(selected) != 1: messagebox.showwarning("Warning", "Please select exactly one key") return index = self.tree.index(selected[0]) if index >= len(self.key_pool.keys): return key = self.key_pool.keys[index] # Create simple dialog dialog = tk.Toplevel(self.dialog) dialog.title("Edit Cooldown") dialog.geometry("300x150") tk.Label(dialog, text=f"Cooldown for {key.model}:").pack(pady=10) cooldown_var = tk.IntVar(value=key.cooldown) tb.Spinbox(dialog, from_=10, to=3600, textvariable=cooldown_var, width=10).pack(pady=5) def _import_keys(self): """Import keys from JSON file""" from tkinter import filedialog filename = filedialog.askopenfilename( title="Import API Keys", filetypes=[("JSON files", "*.json"), ("All files", "*.*")] ) if filename: try: with open(filename, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, list): # Load keys imported_count = 0 for key_data in data: if isinstance(key_data, dict) and 'api_key' in key_data and 'model' in key_data: self.key_pool.add_key(APIKeyEntry.from_dict(key_data)) imported_count += 1 self._refresh_key_list() messagebox.showinfo("Success", f"Imported {imported_count} API keys") else: messagebox.showerror("Error", "Invalid file format") except Exception as e: messagebox.showerror("Error", f"Failed to import: {str(e)}") def _export_keys(self): """Export keys to JSON file""" from tkinter import filedialog if not self.key_pool.keys: 
messagebox.showwarning("Warning", "No keys to export") return filename = filedialog.asksaveasfilename( title="Export API Keys", defaultextension=".json", filetypes=[("JSON files", "*.json"), ("All files", "*.*")] ) if filename: try: # Convert keys to list of dicts key_list = [key.to_dict() for key in self.key_pool.get_all_keys()] with open(filename, 'w', encoding='utf-8') as f: json.dump(key_list, f, indent=2, ensure_ascii=False) messagebox.showinfo("Success", f"Exported {len(key_list)} API keys") except Exception as e: messagebox.showerror("Error", f"Failed to export: {str(e)}") def _show_status(self, message: str): """Show status message""" self.stats_label.config(text=message) def _save_and_close(self): """Save configuration and close""" self._save_keys_to_config() messagebox.showinfo("Success", "API key configuration saved") self.dialog.destroy() def _on_close(self): """Handle dialog close""" self.dialog.destroy()