Glossarion / multi_api_key_manager.py
Shirochi's picture
Upload 41 files
457b8fd verified
# multi_api_key_manager.py
"""
Multi API Key Manager for Glossarion
Handles multiple API keys with round-robin load balancing and rate limit management
"""
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog
import ttkbootstrap as tb
import json
import os
import threading
import time
import queue
from typing import Dict, List, Optional, Tuple
import requests
from datetime import datetime, timedelta
import logging
from model_options import get_model_options
# Dialog for configuring per-key endpoint
try:
from individual_endpoint_dialog import IndividualEndpointDialog
except Exception:
IndividualEndpointDialog = None
logger = logging.getLogger(__name__)
class RateLimitCache:
    """Lock-guarded registry of rate-limited key ids.

    Each entry maps a key id to the ``time.time()`` value at which its
    cooldown ends. Every read and write of the map happens under one lock.
    """

    def __init__(self):
        self._cache = {}  # key_id -> expiry timestamp
        self._lock = threading.Lock()

    def add_rate_limit(self, key_id: str, cooldown_seconds: int):
        """Start (or restart) a cooldown of ``cooldown_seconds`` for ``key_id``."""
        with self._lock:
            self._cache[key_id] = time.time() + cooldown_seconds
            logger.info(f"Added {key_id} to rate limit cache for {cooldown_seconds}s")

    def is_rate_limited(self, key_id: str) -> bool:
        """Return True while ``key_id`` is still cooling down.

        An entry whose expiry has passed is dropped as a side effect.
        """
        with self._lock:
            try:
                expires_at = self._cache[key_id]
            except KeyError:
                return False
            if time.time() < expires_at:
                return True
            # Cooldown is over: forget the entry.
            del self._cache[key_id]
            return False

    def clear_expired(self):
        """Drop every entry whose cooldown has already ended."""
        with self._lock:
            now = time.time()
            stale_ids = [kid for kid, expires_at in self._cache.items() if now >= expires_at]
            for kid in stale_ids:
                del self._cache[kid]

    def get_remaining_cooldown(self, key_id: str) -> float:
        """Return the seconds left on ``key_id``'s cooldown (0 if none)."""
        with self._lock:
            try:
                expires_at = self._cache[key_id]
            except KeyError:
                return 0
            return max(0, expires_at - time.time())
class APIKeyEntry:
    """One API key with its settings, usage counters and cooldown state.

    Counter and cooldown updates are made while holding the entry's own lock.
    """

    def __init__(self, api_key: str, model: str, cooldown: int = 60, enabled: bool = True,
                 google_credentials: str = None, azure_endpoint: str = None, google_region: str = None,
                 azure_api_version: str = None, use_individual_endpoint: bool = False):
        # Static configuration
        self.api_key = api_key
        self.model = model
        self.cooldown = cooldown
        self.enabled = enabled
        self.google_credentials = google_credentials  # path to a Google service account JSON
        self.azure_endpoint = azure_endpoint  # only used when use_individual_endpoint is True
        self.google_region = google_region  # e.g. us-east5, us-central1
        self.azure_api_version = azure_api_version or '2025-01-01-preview'
        self.use_individual_endpoint = use_individual_endpoint

        # Runtime counters and cooldown state
        self.last_error_time = None
        self.error_count = 0
        self.success_count = 0
        self.last_used_time = None
        self.times_used = 0  # bumped whenever the key is assigned or used
        self.is_cooling_down = False

        # Guards the mutable state above
        self._lock = threading.Lock()

        # Outcome of the most recent manual test
        self.last_test_result = None
        self.last_test_time = None
        self.last_test_message = None

    def is_available(self) -> bool:
        """Return True when the key is enabled and not inside its cooldown.

        Clears ``is_cooling_down`` once the cooldown period has elapsed.
        """
        with self._lock:
            if not self.enabled:
                return False
            if self.last_error_time and self.is_cooling_down:
                elapsed = time.time() - self.last_error_time
                if elapsed < self.cooldown:
                    return False
                self.is_cooling_down = False
            return True

    def mark_error(self, error_code: int = None):
        """Record a failed call; a 429 additionally starts the cooldown."""
        with self._lock:
            self.error_count += 1
            self.times_used = getattr(self, 'times_used', 0) + 1
            self.last_error_time = time.time()
            if error_code == 429:
                self.is_cooling_down = True

    def mark_success(self):
        """Record a successful call and reset the error counter."""
        with self._lock:
            self.success_count += 1
            self.times_used = getattr(self, 'times_used', 0) + 1
            self.last_used_time = time.time()
            self.error_count = 0

    def set_test_result(self, result: str, message: str = None):
        """Remember the outcome, time and message of the latest key test."""
        with self._lock:
            self.last_test_result = result
            self.last_test_time = time.time()
            self.last_test_message = message

    def to_dict(self):
        """Return the persistable settings (plus ``times_used``) as a dict."""
        persisted_fields = (
            'api_key',
            'model',
            'cooldown',
            'enabled',
            'google_credentials',
            'azure_endpoint',
            'google_region',
            'azure_api_version',
            'use_individual_endpoint',
        )
        data = {name: getattr(self, name) for name in persisted_fields}
        # times_used is saved as well; readers that ignore it are unaffected.
        data['times_used'] = getattr(self, 'times_used', 0)
        return data
class APIKeyPool:
    """Thread-safe API key pool with round-robin rotation.

    A thread keeps the key it was assigned until that key becomes
    unavailable, rotation is forced, or the assignment is purged after
    ``_ASSIGNMENT_TTL`` seconds. Keys that reported HTTP 429 are tracked on
    the entry itself and in a ``RateLimitCache``.

    Locks:
        * ``lock`` - structural edits (load/add/remove) and the
          ``current_index`` property.
        * ``key_selection_lock`` - key selection and assignment release.
        * ``key_locks[i]`` - persistent per-key lock for key ``i``.
        * ``_usage_lock`` - the ``_keys_in_use`` map.

    NOTE(review): structural edits take ``lock`` while selection takes
    ``key_selection_lock``; the two are independent, so a reload can still
    interleave with a selection. Confirm how external callers use these
    locks before unifying them.
    """

    # Seconds the calling thread pauses after receiving a key or recording an
    # error. The pause is taken only after every pool lock has been released,
    # so it never stalls other threads.
    _PAUSE_SECONDS = 0.5
    # Thread assignments older than this many seconds are purged.
    _ASSIGNMENT_TTL = 300

    def __init__(self):
        self.keys: List[APIKeyEntry] = []
        self.lock = threading.Lock()
        self._rotation_index = 0
        self._thread_assignments = {}  # thread_id -> (key_index, assignment_time)
        self._rate_limit_cache = RateLimitCache()
        self.key_locks = {}  # key_index -> threading.Lock
        self.key_selection_lock = threading.Lock()
        self._keys_in_use = {}  # key_index -> set of thread_ids
        self._usage_lock = threading.Lock()

    # ------------------------------------------------------------------ helpers

    def _lock_for(self, key_index: int):
        """Return the persistent lock for ``key_index``, creating it if missing.

        ``dict.get(i, threading.Lock())`` would hand every caller a fresh lock
        when none is registered, which gives no mutual exclusion at all.
        """
        return self.key_locks.setdefault(key_index, threading.Lock())

    def _track_usage(self, key_index: int, thread_id):
        """Record that ``thread_id`` is currently using key ``key_index``."""
        with self._usage_lock:
            self._keys_in_use.setdefault(key_index, set()).add(thread_id)

    def _purge_stale_assignments(self):
        """Drop thread assignments older than ``_ASSIGNMENT_TTL`` seconds."""
        now = time.time()
        stale = [
            tid for tid, (_, assigned_at) in self._thread_assignments.items()
            if now - assigned_at > self._ASSIGNMENT_TTL
        ]
        if not stale:
            return
        for tid in stale:
            del self._thread_assignments[tid]
        with self._usage_lock:
            for k_idx in list(self._keys_in_use):
                self._keys_in_use[k_idx].difference_update(stale)
                if not self._keys_in_use[k_idx]:
                    del self._keys_in_use[k_idx]

    def _select_key(self, thread_id, thread_name: str, force_rotation: bool):
        """Pick a key for ``thread_id``; caller must hold ``key_selection_lock``.

        Returns ``(selection, pause_note)``. ``selection`` is the
        ``(key, key_index, key_id)`` tuple or None. ``pause_note`` is the debug
        message to log after pausing, or None when no pause is due (a reused
        assignment or no key at all).
        """
        if not self.keys:
            return None, None

        # 1) Reuse the thread's current key when it is still usable.
        if thread_id in self._thread_assignments and not force_rotation:
            key_index, _ = self._thread_assignments[thread_id]
            if key_index < len(self.keys):
                key = self.keys[key_index]
                key_id = f"Key#{key_index+1} ({key.model})"
                with self._lock_for(key_index):
                    if key.is_available() and not self._rate_limit_cache.is_rate_limited(key_id):
                        logger.debug(f"[Thread-{thread_name}] Reusing assigned {key_id}")
                        self._track_usage(key_index, thread_id)
                        return (key, key_index, key_id), None
            # The assignment is unusable (key unavailable or index out of range).
            self._thread_assignments.pop(thread_id, None)

        # 2) Round-robin over the pool, at most one full lap.
        for _ in range(len(self.keys)):
            # Modulo guards against an index left stale by a concurrent removal.
            key_index = self._rotation_index % len(self.keys)
            # Advance immediately so the next caller starts at the next key.
            self._rotation_index = (key_index + 1) % len(self.keys)
            key = self.keys[key_index]
            key_id = f"Key#{key_index+1} ({key.model})"
            with self._lock_for(key_index):
                if not key.is_available() or self._rate_limit_cache.is_rate_limited(key_id):
                    continue
                self._thread_assignments[thread_id] = (key_index, time.time())
                # Usage is counted on assignment.
                key.times_used = getattr(key, 'times_used', 0) + 1
                self._track_usage(key_index, thread_id)
                self._purge_stale_assignments()
                logger.info(f"[Thread-{thread_name}] Assigned {key_id}")
                return (key, key_index, key_id), \
                    "💤 Pausing briefly to improve retry responsiveness after key assignment"

        # 3) Everything is cooling down: take the enabled key that frees up first.
        best_key_index = None
        min_cooldown = float('inf')
        for i, key in enumerate(self.keys):
            if not key.enabled:
                continue
            key_id = f"Key#{i+1} ({key.model})"
            remaining = self._rate_limit_cache.get_remaining_cooldown(key_id)
            # The entry's own cooldown may outlast the cache entry.
            if key.is_cooling_down and key.last_error_time:
                key_cooldown = key.cooldown - (time.time() - key.last_error_time)
                remaining = max(remaining, key_cooldown)
            if remaining < min_cooldown:
                min_cooldown = remaining
                best_key_index = i

        if best_key_index is not None:
            key = self.keys[best_key_index]
            key_id = f"Key#{best_key_index+1} ({key.model})"
            logger.warning(f"[Thread-{thread_name}] All keys on cooldown, using {key_id} (cooldown: {min_cooldown:.1f}s)")
            self._thread_assignments[thread_id] = (best_key_index, time.time())
            return (key, best_key_index, key_id), \
                "💤 Pausing briefly to improve retry responsiveness after cooldown key selection"

        logger.error(f"[Thread-{thread_name}] No keys available at all")
        return None, None

    # --------------------------------------------------------------- public API

    def load_from_list(self, key_list: List[dict]):
        """Replace the pool contents with entries built from ``key_list``.

        Each dict is read with ``.get`` for the ``APIKeyEntry`` constructor
        fields. Counters, cooldown state and test results are carried over
        from an existing entry with the same ``(api_key, model)`` pair. The
        rotation index is kept when it is still in range.
        """
        with self.lock:
            # Index the current entries so live counters survive the reload.
            old_map = {
                (getattr(old, 'api_key', ''), getattr(old, 'model', '')): old
                for old in self.keys
            }

            self.keys.clear()
            self.key_locks.clear()

            for i, key_data in enumerate(key_list):
                api_key = key_data.get('api_key', '')
                model = key_data.get('model', '')
                entry = APIKeyEntry(
                    api_key=api_key,
                    model=model,
                    cooldown=key_data.get('cooldown', 60),
                    enabled=key_data.get('enabled', True),
                    google_credentials=key_data.get('google_credentials'),
                    azure_endpoint=key_data.get('azure_endpoint'),
                    google_region=key_data.get('google_region'),
                    azure_api_version=key_data.get('azure_api_version'),
                    use_individual_endpoint=key_data.get('use_individual_endpoint', False)
                )
                old = old_map.get((api_key, model))
                if old is not None:
                    try:
                        entry.success_count = getattr(old, 'success_count', entry.success_count)
                        entry.error_count = getattr(old, 'error_count', entry.error_count)
                        entry.times_used = getattr(
                            old, 'times_used',
                            getattr(old, 'success_count', 0) + getattr(old, 'error_count', 0))
                        entry.last_used_time = getattr(old, 'last_used_time', None)
                        entry.last_error_time = getattr(old, 'last_error_time', None)
                        entry.is_cooling_down = getattr(old, 'is_cooling_down', False)
                        entry.last_test_result = getattr(old, 'last_test_result', None)
                        entry.last_test_time = getattr(old, 'last_test_time', None)
                        entry.last_test_message = getattr(old, 'last_test_message', None)
                    except Exception:
                        # Best effort: a failed restore must not block the reload.
                        logger.debug("Could not restore counters for a reloaded key", exc_info=True)
                self.keys.append(entry)
                self.key_locks[i] = threading.Lock()

            current = getattr(self, '_rotation_index', 0)
            self._rotation_index = current if current < len(self.keys) else 0
            with self._usage_lock:
                self._keys_in_use.clear()
            logger.info(f"Loaded {len(self.keys)} API keys into pool with individual locks (preserved counters where possible)")

    def get_key_for_thread(self, force_rotation: bool = False,
                           rotation_frequency: int = 1) -> Optional[Tuple[APIKeyEntry, int, str]]:
        """Return ``(key, key_index, key_id)`` for the calling thread, or None.

        Selection order: the thread's current assignment (unless
        ``force_rotation``), then the next available key in round-robin
        order, then the enabled key with the shortest remaining cooldown.
        None is returned when the pool is empty or no key is enabled.

        ``rotation_frequency`` is accepted for interface compatibility and is
        not used by this method.

        After a new assignment the calling thread pauses ``_PAUSE_SECONDS``;
        the pause happens after the selection lock is released.
        """
        current = threading.current_thread()
        thread_id = current.ident
        thread_name = current.name

        self._rate_limit_cache.clear_expired()

        with self.key_selection_lock:
            selection, pause_note = self._select_key(thread_id, thread_name, force_rotation)

        if pause_note is not None:
            time.sleep(self._PAUSE_SECONDS)
            logger.debug(pause_note)
        return selection

    def mark_key_error(self, key_index: int, error_code: int = None):
        """Record an error on key ``key_index``; a 429 also rate-limits it.

        Out-of-range indexes are ignored. The calling thread pauses
        ``_PAUSE_SECONDS`` afterwards, outside the per-key lock.
        """
        if not 0 <= key_index < len(self.keys):
            return
        with self._lock_for(key_index):
            key = self.keys[key_index]
            # Built up-front so the message below works for every error code.
            key_id = f"Key#{key_index+1} ({key.model})"
            key.mark_error(error_code)
            if error_code == 429:
                self._rate_limit_cache.add_rate_limit(key_id, key.cooldown)
            print(f"Marked key {key_id} with an error code")
        time.sleep(self._PAUSE_SECONDS)
        logger.debug("💤 Pausing briefly to improve retry responsiveness after marking key error")

    def mark_key_success(self, key_index: int):
        """Record a successful call on key ``key_index`` (ignored if out of range)."""
        if not 0 <= key_index < len(self.keys):
            return
        with self._lock_for(key_index):
            key = self.keys[key_index]
            key.mark_success()
            print(f"Marked key {key_index} ({key.model}) as successful")

    def release_thread_assignment(self, thread_id: int = None):
        """Forget the key assigned to ``thread_id`` (default: calling thread)."""
        if thread_id is None:
            thread_id = threading.current_thread().ident

        with self.key_selection_lock:
            if thread_id not in self._thread_assignments:
                return
            key_index, _ = self._thread_assignments.pop(thread_id)
            with self._usage_lock:
                users = self._keys_in_use.get(key_index)
                if users is not None:
                    users.discard(thread_id)
                    if not users:
                        del self._keys_in_use[key_index]
            print(f"Released key assignment for thread {thread_id}")

    def get_all_keys(self) -> List[APIKeyEntry]:
        """Return a shallow copy of the key list."""
        with self.lock:
            return self.keys.copy()

    @property
    def current_index(self):
        """Current round-robin position."""
        with self.lock:
            return self._rotation_index

    @current_index.setter
    def current_index(self, value: int):
        """Set the round-robin position, wrapped into the valid range."""
        with self.lock:
            self._rotation_index = value % len(self.keys) if self.keys else 0

    def add_key(self, key_entry: APIKeyEntry):
        """Append ``key_entry`` to the pool and register its per-key lock."""
        with self.lock:
            self.keys.append(key_entry)
            self._lock_for(len(self.keys) - 1)
            logger.info(f"Added key for model {key_entry.model} to pool")

    def remove_key(self, index: int):
        """Remove the key at ``index`` and re-index everything keyed by position.

        Thread assignments to the removed key are dropped; assignments, locks
        and usage sets of later keys are shifted down by one so they keep
        pointing at the same entries. Out-of-range indexes are ignored.
        """
        with self.lock:
            if not 0 <= index < len(self.keys):
                return
            removed_key = self.keys.pop(index)

            # Iterate over a snapshot: entries are deleted while walking.
            for thread_id, (key_index, assigned_at) in list(self._thread_assignments.items()):
                if key_index == index:
                    del self._thread_assignments[thread_id]
                elif key_index > index:
                    self._thread_assignments[thread_id] = (key_index - 1, assigned_at)

            # Shift the locks of the following keys down and drop the spare one.
            for i in range(index, len(self.keys)):
                self.key_locks[i] = self._lock_for(i + 1)
            self.key_locks.pop(len(self.keys), None)

            # Same re-indexing for usage tracking; ascending order keeps each
            # target slot free before it is written.
            with self._usage_lock:
                self._keys_in_use.pop(index, None)
                for k_idx in sorted(k for k in self._keys_in_use if k > index):
                    self._keys_in_use[k_idx - 1] = self._keys_in_use.pop(k_idx)

            if self._rotation_index >= len(self.keys) and len(self.keys) > 0:
                self._rotation_index = 0

            logger.info(f"Removed key for model {removed_key.model} from pool")
class MultiAPIKeyDialog:
"""Dialog for managing multiple API keys"""
def __init__(self, parent, translator_gui):
    """Set up state, load the saved keys and open the manager window.

    Args:
        parent: Window passed on as the dialog's parent.
        translator_gui: Main GUI object; its ``config`` and, when present,
            its ``wm`` attribute are used by this dialog.
    """
    self.parent = parent
    self.translator_gui = translator_gui
    self.dialog = None
    self.tree = None
    self._icon_photo_ref = None  # held so the icon image is not garbage collected
    self.test_results = queue.Queue()

    # Start with a private pool, then try to swap in the shared one so the
    # UI reflects live usage.
    self.key_pool = APIKeyPool()
    self._bind_shared_pool()

    self._load_keys_from_config()
    self._create_dialog()

    # Auto-resize to fit content only when a WindowManager and canvas exist.
    can_auto_resize = hasattr(self.translator_gui, 'wm') and hasattr(self, 'canvas')
    if not can_auto_resize:
        return
    self.translator_gui.wm.auto_resize_dialog(
        self.dialog,
        self.canvas,
        max_width_ratio=0.9,
        max_height_ratio=1.55,
    )
def _set_icon(self, window):
"""Set Halgakos.ico as window icon if available."""
try:
base_dir = getattr(self.translator_gui, 'base_dir', os.getcwd())
ico_path = os.path.join(base_dir, 'Halgakos.ico')
if os.path.isfile(ico_path):
try:
window.iconbitmap(ico_path)
except Exception:
pass
# Try iconphoto for better scaling
try:
from PIL import Image, ImageTk
img = Image.open(ico_path)
if img.mode != 'RGBA':
img = img.convert('RGBA')
self._icon_photo_ref = ImageTk.PhotoImage(img)
window.iconphoto(False, self._icon_photo_ref)
except Exception:
pass
except Exception:
pass
def _bind_shared_pool(self):
"""Bind this dialog to the UnifiedClient's shared APIKeyPool if available.
If UnifiedClient has no pool yet, register our pool as the shared pool.
This keeps Times Used and other counters in sync across UI and runtime.
"""
try:
from unified_api_client import UnifiedClient
# If UC already has a pool, use it; otherwise share ours
if getattr(UnifiedClient, '_api_key_pool', None) is not None:
self.key_pool = UnifiedClient._api_key_pool
else:
UnifiedClient._api_key_pool = self.key_pool
except Exception:
# If import fails (early load), continue with local pool
pass
def _load_keys_from_config(self):
"""Load API keys from translator GUI config"""
if hasattr(self.translator_gui, 'config'):
multi_api_keys = self.translator_gui.config.get('multi_api_keys', [])
self.key_pool.load_from_list(multi_api_keys)
def _update_rotation_display(self, *args):
"""Update the rotation description based on settings"""
if self.force_rotation_var.get():
freq = self.rotation_frequency_var.get()
if freq == 1:
desc = "Keys will rotate on every request (maximum distribution)"
else:
desc = f"Keys will rotate every {freq} requests"
else:
desc = "Keys will only rotate on errors or rate limits"
self.rotation_desc_label.config(text=desc)
def _save_keys_to_config(self):
"""Save API keys and rotation settings to translator GUI config"""
if hasattr(self.translator_gui, 'config'):
# Convert keys to list of dicts
key_list = [key.to_dict() for key in self.key_pool.get_all_keys()]
self.translator_gui.config['multi_api_keys'] = key_list
# Save fallback settings
self.translator_gui.config['use_fallback_keys'] = self.use_fallback_var.get()
# Update the parent GUI's variable to stay in sync
if hasattr(self.translator_gui, 'use_fallback_keys_var'):
self.translator_gui.use_fallback_keys_var.set(self.use_fallback_var.get())
# Fallback keys are already saved when added/removed
# Use the current state of the toggle
self.translator_gui.config['use_multi_api_keys'] = self.enabled_var.get()
# Save rotation settings
self.translator_gui.config['force_key_rotation'] = self.force_rotation_var.get()
self.translator_gui.config['rotation_frequency'] = self.rotation_frequency_var.get()
# Save config
self.translator_gui.save_config(show_message=False)
def _create_dialog(self):
    """Build the manager window and all of its sections.

    When the translator GUI exposes ``wm``, its ``setup_scrollable`` call
    supplies the dialog, the frame to populate and the canvas. Otherwise a
    plain ``tk.Toplevel`` is used and ``self.canvas`` is set to None.

    Sections are packed top to bottom in creation order: title row with the
    multi-key toggle, description, rotation settings, add-key section,
    separator, key list, fallback section, button bar.
    """
    # Use WindowManager to create scrollable dialog
    if hasattr(self.translator_gui, 'wm'):
        self.dialog, scrollable_frame, self.canvas = self.translator_gui.wm.setup_scrollable(
            self.parent,
            "Multi API Key Manager",
            width=900,
            height=700,
            max_width_ratio=0.9,
            max_height_ratio=1.45
        )
    else:
        # Fallback to regular dialog
        self.dialog = tk.Toplevel(self.parent)
        self.dialog.title("Multi API Key Manager")
        self.dialog.geometry("900x700")
        scrollable_frame = self.dialog
        self.canvas = None
    # Main container with consistent padding
    main_frame = tk.Frame(scrollable_frame, padx=20, pady=20)
    main_frame.pack(fill=tk.BOTH, expand=True)
    # Store references
    self.main_frame = main_frame
    self.scrollable_frame = scrollable_frame
    # Title and description
    title_frame = tk.Frame(main_frame)
    title_frame.pack(fill=tk.X, pady=(0, 10))
    tk.Label(title_frame, text="Multi API Key Management",
             font=('TkDefaultFont', 16, 'bold')).pack(side=tk.LEFT)
    # Enable/Disable toggle, initialised from the saved config value
    self.enabled_var = tk.BooleanVar(value=self.translator_gui.config.get('use_multi_api_keys', False))
    tb.Checkbutton(title_frame, text="Enable Multi-Key Mode",
                   variable=self.enabled_var,
                   bootstyle="round-toggle",
                   command=self._toggle_multi_key_mode).pack(side=tk.RIGHT, padx=(20, 0))
    tk.Label(main_frame,
             text="Manage multiple API keys with automatic rotation and rate limit handling.\n"
             "Keys can be rotated automatically to distribute load evenly.\n"
             "Rate-limited keys are automatically cooled down and skipped in rotation.",
             font=('TkDefaultFont', 10), fg='gray', justify=tk.LEFT).pack(anchor=tk.W, pady=(0, 15))
    # Rotation settings frame
    rotation_frame = tk.LabelFrame(main_frame, text="Rotation Settings", padx=15, pady=10)
    rotation_frame.pack(fill=tk.X, pady=(0, 15))
    # Force rotation toggle (defaults to on when the config has no value)
    rotation_settings = tk.Frame(rotation_frame)
    rotation_settings.pack(fill=tk.X)
    self.force_rotation_var = tk.BooleanVar(value=self.translator_gui.config.get('force_key_rotation', True))
    tb.Checkbutton(rotation_settings, text="Force Key Rotation",
                   variable=self.force_rotation_var,
                   bootstyle="round-toggle",
                   command=self._update_rotation_display).pack(side=tk.LEFT)
    # Rotation frequency: "Every <n> requests", n limited to 1..100 by the spinbox
    tk.Label(rotation_settings, text="Every").pack(side=tk.LEFT, padx=(20, 5))
    self.rotation_frequency_var = tk.IntVar(value=self.translator_gui.config.get('rotation_frequency', 1))
    frequency_spinbox = tb.Spinbox(rotation_settings, from_=1, to=100,
                                   textvariable=self.rotation_frequency_var,
                                   width=5, command=self._update_rotation_display)
    frequency_spinbox.pack(side=tk.LEFT)
    # Disable mouse wheel changing values (use main GUI helper if available)
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(frequency_spinbox)
    except Exception:
        pass
    tk.Label(rotation_settings, text="requests").pack(side=tk.LEFT, padx=(5, 0))
    # Rotation description; the text is filled in by _update_rotation_display
    self.rotation_desc_label = tk.Label(rotation_frame,
                                        text="",
                                        font=('TkDefaultFont', 9), fg='blue')
    self.rotation_desc_label.pack(anchor=tk.W, pady=(5, 0))
    self._update_rotation_display()
    # Add key section
    self._create_add_key_section(main_frame)
    # Separator
    ttk.Separator(main_frame, orient='horizontal').pack(fill=tk.X, pady=15)
    # Key list section
    self._create_key_list_section(main_frame)
    # Fallback keys section
    self._create_fallback_section(main_frame)
    # Button bar at the bottom
    self._create_button_bar(main_frame)
    # Load existing keys into tree
    self._refresh_key_list()
    # Mark the dialog as a transient window of the parent
    self.dialog.transient(self.parent)
    # Handle window close
    self.dialog.protocol("WM_DELETE_WINDOW", self._on_close)
def _create_fallback_section(self, parent):
    """Create the fallback keys section at the bottom of ``parent``.

    Builds the enable toggle, the add-key form (a three-row grid: key and
    model, Google credentials and region, Azure endpoint and API version)
    and the fallback key list, then calls ``_toggle_fallback_section`` once.
    """
    # Container for the whole section
    self.fallback_container = tk.Frame(parent)
    # Always show fallback section (works in both single and multi-key mode)
    self.fallback_container.pack(fill=tk.X, pady=(10, 0))
    # Separator
    ttk.Separator(self.fallback_container, orient='horizontal').pack(fill=tk.X, pady=(0, 10))
    # Main fallback frame
    fallback_frame = tk.LabelFrame(self.fallback_container,
                                   text="Fallback Keys (For Prohibited Content)",
                                   padx=15, pady=15)
    fallback_frame.pack(fill=tk.X)
    # Description
    tk.Label(fallback_frame,
             text="Configure fallback keys that will be used when content is blocked.\n"
             "These should use different API keys or models that are less restrictive.\n"
             "In Multi-Key Mode: tried when main rotation encounters prohibited content.\n"
             "In Single-Key Mode: tried directly when main key fails, bypassing main key retry.",
             font=('TkDefaultFont', 10), fg='gray', justify=tk.LEFT).pack(anchor=tk.W, pady=(0, 10))
    # Enable fallback checkbox, initialised from the saved config value
    self.use_fallback_var = tk.BooleanVar(value=self.translator_gui.config.get('use_fallback_keys', False))
    tb.Checkbutton(fallback_frame, text="Enable Fallback Keys",
                   variable=self.use_fallback_var,
                   bootstyle="round-toggle",
                   command=self._toggle_fallback_section).pack(anchor=tk.W, pady=(0, 10))
    # Add fallback key section
    add_fallback_frame = tk.Frame(fallback_frame)
    add_fallback_frame.pack(fill=tk.X, pady=(0, 10))
    # Only the two entry columns (1 and 4) stretch with the window
    add_fallback_frame.columnconfigure(1, weight=1)
    add_fallback_frame.columnconfigure(4, weight=1)
    # Don't give weight to column 3 to keep labels close to fields
    # Row 0: Fallback API Key and Model
    tk.Label(add_fallback_frame, text="Fallback API Key:").grid(row=0, column=0, sticky=tk.W, padx=(0, 10), pady=5)
    self.fallback_key_var = tk.StringVar()
    # show='*' masks the key as it is typed
    self.fallback_key_entry = tb.Entry(add_fallback_frame, textvariable=self.fallback_key_var, show='*')
    self.fallback_key_entry.grid(row=0, column=1, sticky=tk.EW, pady=5)
    # Toggle fallback visibility
    self.show_fallback_btn = tb.Button(add_fallback_frame, text="👁", width=3,
                                       command=self._toggle_fallback_visibility)
    self.show_fallback_btn.grid(row=0, column=2, padx=5, pady=5)
    # Fallback Model
    tk.Label(add_fallback_frame, text="Model:").grid(row=0, column=3, sticky=tk.W, padx=(20, 10), pady=5)
    self.fallback_model_var = tk.StringVar()
    fallback_models = get_model_options()
    # state='normal' keeps the combobox editable, so any text can be entered
    self.fallback_model_combo = tb.Combobox(add_fallback_frame, textvariable=self.fallback_model_var,
                                            values=fallback_models, state='normal')
    self.fallback_model_combo.grid(row=0, column=4, sticky=tk.EW, pady=5)
    # Block mouse wheel on combobox to avoid accidental changes
    # (reuses the GUI's spinbox helper when it exists)
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(self.fallback_model_combo)
    except Exception:
        pass
    # Attach gentle autofill
    self._attach_model_autofill(self.fallback_model_combo, self.fallback_model_var)
    # Add fallback button
    tb.Button(add_fallback_frame, text="Add Fallback Key",
              command=self._add_fallback_key,
              bootstyle="info").grid(row=0, column=5, sticky=tk.E, padx=(10, 0), pady=5)
    # Row 1: Google Credentials (optional, discretely styled)
    tk.Label(add_fallback_frame, text="Google Creds:", font=('TkDefaultFont', 8),
             fg='gray').grid(row=1, column=0, sticky=tk.W, padx=(0, 10), pady=2)
    self.fallback_google_creds_var = tk.StringVar()
    self.fallback_google_creds_entry = tb.Entry(add_fallback_frame, textvariable=self.fallback_google_creds_var,
                                                font=('TkDefaultFont', 7), state='normal')
    self.fallback_google_creds_entry.grid(row=1, column=1, sticky=tk.EW, pady=2)
    # Google credentials browse button (moved closer)
    tb.Button(add_fallback_frame, text="📁", width=3,
              command=self._browse_fallback_google_credentials,
              bootstyle="secondary-outline").grid(row=1, column=2, padx=(5, 0), pady=2)
    # Google region field for fallback
    tk.Label(add_fallback_frame, text="Region:", font=('TkDefaultFont', 10),
             fg='gray').grid(row=1, column=3, sticky=tk.W, padx=(10, 5), pady=2)
    self.fallback_google_region_var = tk.StringVar(value='us-east5')  # Default region
    self.fallback_google_region_entry = tb.Entry(add_fallback_frame, textvariable=self.fallback_google_region_var,
                                                 font=('TkDefaultFont', 7), state='normal', width=10)
    self.fallback_google_region_entry.grid(row=1, column=4, sticky=tk.W, pady=2)
    # Row 2: Azure Endpoint (optional, discretely styled)
    tk.Label(add_fallback_frame, text="Azure Endpoint:", font=('TkDefaultFont', 8),
             fg='gray').grid(row=2, column=0, sticky=tk.W, padx=(0, 10), pady=2)
    self.fallback_azure_endpoint_var = tk.StringVar()
    self.fallback_azure_endpoint_entry = tb.Entry(add_fallback_frame, textvariable=self.fallback_azure_endpoint_var,
                                                  font=('TkDefaultFont', 7), state='normal')
    self.fallback_azure_endpoint_entry.grid(row=2, column=1, columnspan=2, sticky=tk.EW, pady=2)
    # Azure API Version for fallback (small dropdown)
    tk.Label(add_fallback_frame, text="API Ver:", font=('TkDefaultFont', 10),
             fg='gray').grid(row=2, column=3, sticky=tk.W, padx=(10, 5), pady=2)
    self.fallback_azure_api_version_var = tk.StringVar(value='2025-01-01-preview')
    # Versions offered in the dropdown; the default above is the first entry
    fallback_azure_versions = [
        '2025-01-01-preview',
        '2024-12-01-preview',
        '2024-10-01-preview',
        '2024-08-01-preview',
        '2024-06-01',
        '2024-02-01',
        '2023-12-01-preview'
    ]
    self.fallback_azure_api_version_combo = ttk.Combobox(add_fallback_frame,
                                                         textvariable=self.fallback_azure_api_version_var,
                                                         values=fallback_azure_versions, width=18,
                                                         state='normal', font=('TkDefaultFont', 7))
    self.fallback_azure_api_version_combo.grid(row=2, column=4, sticky=tk.W, pady=2)
    # Block mouse wheel on version combobox
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(self.fallback_azure_api_version_combo)
    except Exception:
        pass
    # Fallback keys list
    self._create_fallback_list(fallback_frame)
    # Apply the initial toggle state
    # (presumably disables the section's widgets when the checkbox is off;
    # verify against _toggle_fallback_section)
    self._toggle_fallback_section()
def _create_fallback_list(self, parent):
    """Create the fallback keys list inside ``parent``.

    Lays out, left to right, the up/down ordering buttons and a four-column
    treeview (API Key, Model, Status, Times Used) with a vertical scrollbar,
    followed by a row of action buttons underneath. Ends by calling
    ``_load_fallback_keys``.
    """
    list_frame = tk.Frame(parent)
    list_frame.pack(fill=tk.BOTH, expand=True)
    # Label
    tk.Label(list_frame, text="Fallback Keys (tried in order):",
             font=('TkDefaultFont', 10, 'bold')).pack(anchor=tk.W, pady=(10, 5))
    # Container for tree and buttons
    container = tk.Frame(list_frame)
    container.pack(fill=tk.BOTH, expand=True)
    # Left side: Move buttons
    move_frame = tk.Frame(container)
    move_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5))
    tk.Label(move_frame, text="Order", font=('TkDefaultFont', 9, 'bold')).pack(pady=(0, 5))
    tb.Button(move_frame, text="↑", width=3,
              command=lambda: self._move_fallback_key('up'),
              bootstyle="secondary-outline").pack(pady=2)
    tb.Button(move_frame, text="↓", width=3,
              command=lambda: self._move_fallback_key('down'),
              bootstyle="secondary-outline").pack(pady=2)
    # Right side: Treeview
    tree_container = tk.Frame(container)
    tree_container.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    # Scrollbar (packed before the tree so it keeps the right edge)
    scrollbar = ttk.Scrollbar(tree_container, orient=tk.VERTICAL)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
    # Treeview for fallback keys; the '#0' tree column carries the API key
    columns = ('Model', 'Status', 'Times Used')
    self.fallback_tree = ttk.Treeview(tree_container, columns=columns, show='tree headings',
                                      yscrollcommand=scrollbar.set, height=5)
    self.fallback_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.config(command=self.fallback_tree.yview)
    # <Button-3> opens the context menu
    self.fallback_tree.bind('<Button-3>', self._show_fallback_context_menu)
    # Configure columns
    self.fallback_tree.heading('#0', text='API Key', anchor='w')
    self.fallback_tree.column('#0', width=220, minwidth=160, anchor='w')
    self.fallback_tree.heading('Model', text='Model', anchor='w')
    self.fallback_tree.column('Model', width=220, minwidth=140, anchor='w')
    self.fallback_tree.heading('Status', text='Status', anchor='center')
    self.fallback_tree.column('Status', width=120, minwidth=80, anchor='center')
    self.fallback_tree.heading('Times Used', text='Times Used', anchor='center')
    self.fallback_tree.column('Times Used', width=100, minwidth=70, anchor='center')
    # Action buttons - store reference for toggling
    self.fallback_action_frame = tk.Frame(list_frame)
    self.fallback_action_frame.pack(fill=tk.X, pady=(10, 0))
    tb.Button(self.fallback_action_frame, text="Test Selected",
              command=self._test_selected_fallback,
              bootstyle="warning").pack(side=tk.LEFT, padx=(0, 5))
    tb.Button(self.fallback_action_frame, text="Test All",
              command=self._test_all_fallbacks,
              bootstyle="warning").pack(side=tk.LEFT, padx=5)
    tb.Button(self.fallback_action_frame, text="Remove Selected",
              command=self._remove_selected_fallback,
              bootstyle="danger").pack(side=tk.LEFT, padx=5)
    tb.Button(self.fallback_action_frame, text="Clear All",
              command=self._clear_all_fallbacks,
              bootstyle="danger-outline").pack(side=tk.LEFT, padx=5)
    # Load existing fallback keys
    # (presumably fills the tree from config['fallback_keys']; verify against
    # _load_fallback_keys)
    self._load_fallback_keys()
def _test_all_fallbacks(self):
"""Test all fallback keys"""
fallback_keys = self.translator_gui.config.get('fallback_keys', [])
if not fallback_keys:
messagebox.showwarning("Warning", "No fallback keys to test")
return
# Update UI to show testing status for all keys
items = self.fallback_tree.get_children()
for item in items:
values = list(self.fallback_tree.item(item, 'values'))
values[1] = "⏳ Testing..."
self.fallback_tree.item(item, values=values)
# Run tests in thread for all fallback keys
# Ensure UnifiedClient uses the same shared pool instance
try:
from unified_api_client import UnifiedClient
UnifiedClient._api_key_pool = self.key_pool
except Exception:
pass
thread = threading.Thread(target=self._test_all_fallback_keys_batch)
thread.daemon = True
thread.start()
def _test_all_fallback_keys_batch(self):
    """Test all configured fallback keys in parallel and report the outcome.

    Each key gets its own UnifiedClient and is asked to echo a fixed phrase;
    the key passes when the reply contains "test successful". Per-key results
    are handed to ``_update_fallback_test_result`` and a summary is sent to
    ``_show_status``, both scheduled through ``self.dialog.after`` so the
    widget updates are queued on the Tk event loop rather than performed
    directly from this method.

    Returns without doing anything when no fallback keys are configured.
    """
    from unified_api_client import UnifiedClient
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Snapshot the list so the total reported at the end matches what was tested
    fallback_keys = list(self.translator_gui.config.get('fallback_keys', []))
    if not fallback_keys:
        # ThreadPoolExecutor raises ValueError for max_workers <= 0
        return

    def test_single_key(index, key_data):
        """Return (index, passed, message) for one fallback key; never raises."""
        try:
            # Field access lives inside the try so a malformed entry is
            # reported as a failure instead of killing the whole batch
            client = UnifiedClient(
                api_key=key_data.get('api_key', ''),
                model=key_data.get('model', ''),
                output_dir=None
            )
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Say 'API test successful' and nothing else."}
            ]
            response = client.send(
                messages,
                temperature=0.7,
                max_tokens=100
            )
            if response and isinstance(response, tuple):
                content, _ = response
                if content and "test successful" in content.lower():
                    return (index, True, "Passed")
        except Exception as e:
            print(f"Fallback key test failed: {e}")
            return (index, False, str(e)[:30])
        return (index, False, "Failed")

    successful = 0
    with ThreadPoolExecutor(max_workers=min(5, len(fallback_keys))) as executor:
        futures = [
            executor.submit(test_single_key, i, key_data)
            for i, key_data in enumerate(fallback_keys)
        ]
        # Report each key as soon as its test finishes
        for future in as_completed(futures):
            index, success, _message = future.result()
            if success:
                successful += 1
            # Bind the loop values as defaults so each callback keeps its own
            self.dialog.after(0, lambda idx=index, s=success:
                              self._update_fallback_test_result(idx, s))

    total = len(fallback_keys)
    self.dialog.after(0, lambda: self._show_status(
        f"Fallback test complete: {successful}/{total} passed"))
def _show_fallback_context_menu(self, event):
    """Pop up the right-click menu for the fallback key row under the cursor.

    Does nothing when the click is not on a row. A row that is not part of
    the current selection becomes the sole selection first. The menu offers
    reordering (only with more than one key), model change, test, remove and
    (only with more than one key) clear-all.
    """
    tree = self.fallback_tree
    row_id = tree.identify_row(event.y)
    if not row_id:
        return

    # Right-clicking outside the current selection replaces it
    if row_id not in tree.selection():
        tree.selection_set(row_id)

    menu = tk.Menu(self.dialog, tearoff=0)

    row_index = tree.index(row_id)
    key_count = len(self.translator_gui.config.get('fallback_keys', []))
    several_keys = key_count > 1

    if several_keys:
        # Offer only the directions that are possible from this row
        reorder_menu = tk.Menu(menu, tearoff=0)
        if row_index > 0:
            reorder_menu.add_command(
                label="Move Up",
                command=lambda: self._move_fallback_key('up'))
        if row_index < key_count - 1:
            reorder_menu.add_command(
                label="Move Down",
                command=lambda: self._move_fallback_key('down'))
        menu.add_cascade(label="Reorder", menu=reorder_menu)
        menu.add_separator()

    selected_count = len(tree.selection())
    if selected_count > 1:
        change_label = f"Change Model ({selected_count} selected)"
    else:
        change_label = "Change Model"
    menu.add_command(label=change_label,
                     command=self._change_fallback_model_for_selected)
    menu.add_separator()

    menu.add_command(label="Test", command=self._test_selected_fallback)
    menu.add_separator()
    menu.add_command(label="Remove", command=self._remove_selected_fallback)
    if several_keys:
        menu.add_command(label="Clear All", command=self._clear_all_fallbacks)

    menu.post(event.x_root, event.y_root)
def _change_fallback_model_for_selected(self):
    """Open a small dialog that sets one model name on all selected fallback keys.

    Returns immediately when nothing is selected. The combobox is pre-filled
    with the model of the first selected key. Pressing Enter with a non-empty
    name updates the config, saves it, reloads the list and closes the
    dialog; Escape closes it without changes.
    """
    tree = self.fallback_tree
    selected = tree.selection()
    if not selected:
        return

    fallback_keys = self.translator_gui.config.get('fallback_keys', [])

    # Small transient window, same look as the main-tree model dialog
    dialog = tk.Toplevel(self.dialog)
    dialog.title(f"Change Model for {len(selected)} Fallback Keys")
    dialog.geometry("400x130")
    dialog.transient(self.dialog)
    self._set_icon(dialog)

    # Centre on screen once the geometry has been computed
    dialog.update_idletasks()
    left = (dialog.winfo_screenwidth() // 2) - (dialog.winfo_width() // 2)
    top = (dialog.winfo_screenheight() // 2) - (dialog.winfo_height() // 2)
    dialog.geometry(f"+{left}+{top}")

    body = tk.Frame(dialog, padx=20, pady=20)
    body.pack(fill=tk.BOTH, expand=True)

    tk.Label(body, text="Enter new model name (press Enter to apply):",
             font=('TkDefaultFont', 10)).pack(pady=(0, 10))

    # Free-text combobox backed by the full model list
    model_var = tk.StringVar()
    model_combo = ttk.Combobox(body, values=get_model_options(),
                               textvariable=model_var, width=45, height=12)
    model_combo.pack(pady=(0, 10))

    # Keep the mouse wheel from changing the value, when the host GUI offers that helper
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(model_combo)
    except Exception:
        pass

    self._attach_model_autofill(model_combo, model_var)

    # Default to the first selected key's model, fully highlighted for quick overwrite
    first_index = tree.index(selected[0])
    if first_index < len(fallback_keys):
        model_var.set(fallback_keys[first_index].get('model', ''))
        model_combo.select_range(0, tk.END)

    def apply_change(event=None):
        new_model = model_var.get().strip()
        if not new_model:
            return
        for row_id in selected:
            position = tree.index(row_id)
            if position < len(fallback_keys):
                fallback_keys[position]['model'] = new_model

        self.translator_gui.config['fallback_keys'] = fallback_keys
        self.translator_gui.save_config(show_message=False)

        self._load_fallback_keys()
        self._show_status(f"Changed model to '{new_model}' for {len(selected)} fallback keys")
        dialog.destroy()

    model_combo.focus()

    dialog.bind('<Return>', apply_change)
    model_combo.bind('<Return>', apply_change)
    dialog.bind('<Escape>', lambda e: dialog.destroy())
def _load_fallback_keys(self):
    """Rebuild the fallback tree from the ``fallback_keys`` list in the config.

    Every row starts as "Not tested" with the 'untested' tag. API keys longer
    than 12 characters are shown as first 8 + "..." + last 4 characters;
    shorter keys are shown unchanged.
    """
    tree = self.fallback_tree
    stored_keys = self.translator_gui.config.get('fallback_keys', [])

    # Start from an empty tree
    for row_id in tree.get_children():
        tree.delete(row_id)

    for entry in stored_keys:
        secret = entry.get('api_key', '')
        model_name = entry.get('model', '')
        use_count = int(entry.get('times_used', 0))

        if len(secret) > 12:
            shown_key = secret[:8] + "..." + secret[-4:]
        else:
            shown_key = secret

        tree.insert('', 'end',
                    text=shown_key,
                    values=(model_name, "Not tested", use_count),
                    tags=('untested',))

    # Row colours per test state
    tree.tag_configure('untested', foreground='gray')
    tree.tag_configure('testing', foreground='blue', font=('TkDefaultFont', 10, 'bold'))
    tree.tag_configure('passed', foreground='green')
    tree.tag_configure('failed', foreground='red')
def _add_fallback_key(self):
    """Append a fallback key built from the input fields and persist it.

    API key and model are required; an error box is shown and nothing is
    saved when either is blank. Google credentials, Google region, Azure
    endpoint and Azure API version are optional and stored as None when
    blank. After saving, the inputs are reset and the list is reloaded.
    """
    def optional(var):
        # Blank optional fields are stored as None
        return var.get().strip() or None

    api_key = self.fallback_key_var.get().strip()
    model = self.fallback_model_var.get().strip()
    google_credentials = optional(self.fallback_google_creds_var)
    azure_endpoint = optional(self.fallback_azure_endpoint_var)
    google_region = optional(self.fallback_google_region_var)
    azure_api_version = optional(self.fallback_azure_api_version_var)

    if not (api_key and model):
        messagebox.showerror("Error", "Please enter both API key and model name")
        return

    new_entry = {
        'api_key': api_key,
        'model': model,
        'google_credentials': google_credentials,
        'azure_endpoint': azure_endpoint,
        'google_region': google_region,
        'azure_api_version': azure_api_version,
        'times_used': 0
    }

    stored_keys = self.translator_gui.config.get('fallback_keys', [])
    stored_keys.append(new_entry)
    self.translator_gui.config['fallback_keys'] = stored_keys
    self.translator_gui.save_config(show_message=False)

    # Reset the form to its defaults
    self.fallback_key_var.set("")
    self.fallback_model_var.set("")
    self.fallback_google_creds_var.set("")
    self.fallback_azure_endpoint_var.set("")
    self.fallback_google_region_var.set("us-east5")
    self.fallback_azure_api_version_var.set('2025-01-01-preview')

    self._load_fallback_keys()

    # Mention the optional extras in the status line
    details = []
    if google_credentials:
        details.append(f"Google: {os.path.basename(google_credentials)}")
    if azure_endpoint:
        details.append(f"Azure: {azure_endpoint[:30]}...")
    suffix = f" ({', '.join(details)})" if details else ""
    self._show_status(f"Added fallback key for model: {model}{suffix}")
def _move_fallback_key(self, direction):
    """Swap the first selected fallback key with its neighbour.

    ``direction`` is 'up' or 'down'. Nothing happens when there is no
    selection, when the row has no matching config entry, or when the key is
    already at that end of the list. After a move the config is saved, the
    tree is reloaded and the moved row is selected again.
    """
    tree = self.fallback_tree
    chosen = tree.selection()
    if not chosen:
        return

    source = tree.index(chosen[0])
    stored_keys = self.translator_gui.config.get('fallback_keys', [])
    if source >= len(stored_keys):
        return

    if direction == 'up' and source > 0:
        target = source - 1
    elif direction == 'down' and source < len(stored_keys) - 1:
        target = source + 1
    else:
        # Unknown direction or already at the edge
        return

    stored_keys[source], stored_keys[target] = stored_keys[target], stored_keys[source]

    self.translator_gui.config['fallback_keys'] = stored_keys
    self.translator_gui.save_config(show_message=False)

    self._load_fallback_keys()

    # Reloading recreates the rows, so restore selection by position
    rows = tree.get_children()
    if target < len(rows):
        tree.selection_set(rows[target])
        tree.focus(rows[target])
def _test_selected_fallback(self):
    """Test the first selected fallback key on a background thread.

    Warns when nothing is selected and returns silently when the selected
    row has no matching config entry. The row's status cell is switched to
    the testing marker before the daemon worker is started.
    """
    tree = self.fallback_tree
    chosen = tree.selection()
    if not chosen:
        messagebox.showwarning("Warning", "Please select a fallback key to test")
        return

    position = tree.index(chosen[0])
    stored_keys = self.translator_gui.config.get('fallback_keys', [])
    if position >= len(stored_keys):
        return

    # Show the in-progress marker right away
    rows = tree.get_children()
    if position < len(rows):
        row_id = rows[position]
        cells = list(tree.item(row_id, 'values'))
        cells[1] = "⏳ Testing..."
        tree.item(row_id, values=cells)

    key_data = stored_keys[position]

    # Best effort: let UnifiedClient share this dialog's key pool instance
    try:
        from unified_api_client import UnifiedClient
        UnifiedClient._api_key_pool = self.key_pool
    except Exception:
        pass

    worker = threading.Thread(target=self._test_single_fallback_key,
                              args=(key_data, position))
    worker.daemon = True
    worker.start()
def _update_fallback_test_result(self, index, success):
    """Record a finished fallback key test in the config and in the tree row.

    Args:
        index: Position of the key in ``config['fallback_keys']`` and of its
            row in the fallback tree.
        success: True when the test passed, False otherwise.

    The key's ``times_used`` counter is incremented and the config saved.
    The row's status cell, "Times Used" cell and colour tag ('passed' or
    'failed') are then updated. Either step is skipped when ``index`` is out
    of range for the list it addresses.
    """
    # Bump and persist the usage counter; best effort, but leave a trace on failure
    fallback_keys = self.translator_gui.config.get('fallback_keys', [])
    if index < len(fallback_keys):
        try:
            fallback_keys[index]['times_used'] = int(fallback_keys[index].get('times_used', 0)) + 1
            self.translator_gui.config['fallback_keys'] = fallback_keys
            self.translator_gui.save_config(show_message=False)
        except Exception:
            logger.exception("Could not persist times_used for fallback key #%s", index)

    items = self.fallback_tree.get_children()
    if index < len(items):
        item = items[index]
        values = list(self.fallback_tree.item(item, 'values'))
        # Pad short rows so the status and counter cells exist
        if len(values) < 3:
            values = values + [0] * (3 - len(values))
        values[1] = "✅ Passed" if success else "❌ Failed"
        try:
            values[2] = int(values[2]) + 1
        except (TypeError, ValueError):
            # Cell held something non-numeric; restart the visible counter
            values[2] = 1
        # Swap the row tag too, otherwise the row keeps its previous colour
        result_tag = 'passed' if success else 'failed'
        self.fallback_tree.item(item, values=values, tags=(result_tag,))
def _test_single_fallback_key(self, key_data, index):
    """Send a probe request with one fallback key and report pass or fail.

    Args:
        key_data: Fallback key entry; its 'api_key' and 'model' fields are used.
        index: Row/config position passed on to ``_update_fallback_test_result``.

    The key passes when the reply is a tuple whose first element contains
    "test successful" (case-insensitive). Any exception is printed and
    counted as a failure. The result is scheduled via ``self.dialog.after``.
    """
    from unified_api_client import UnifiedClient

    secret = key_data.get('api_key', '')
    model_name = key_data.get('model', '')

    probe = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Say 'API test successful' and nothing else."}
    ]

    try:
        client = UnifiedClient(api_key=secret, model=model_name, output_dir=None)
        reply = client.send(probe, temperature=0.7, max_tokens=100)

        if reply and isinstance(reply, tuple):
            text, _ = reply
            if text and "test successful" in text.lower():
                self.dialog.after(0, lambda: self._update_fallback_test_result(index, True))
                return
    except Exception as e:
        print(f"Fallback key test failed: {e}")

    # Reached on an exception or an unexpected reply
    self.dialog.after(0, lambda: self._update_fallback_test_result(index, False))
def _remove_selected_fallback(self):
    """Remove every selected fallback key after a confirmation prompt.

    The tree allows several rows to be selected at once, so all of them are
    removed, not just the first. With a single selection the prompt and
    status texts are the same as before. Nothing is saved when the user
    declines or when no selected row maps to a config entry.
    """
    selected = self.fallback_tree.selection()
    if not selected:
        return

    count = len(selected)
    if count == 1:
        prompt = "Remove selected fallback key?"
    else:
        prompt = f"Remove {count} selected fallback keys?"
    if not messagebox.askyesno("Confirm", prompt):
        return

    fallback_keys = self.translator_gui.config.get('fallback_keys', [])

    # Delete from the highest index down so earlier deletions do not shift
    # the positions of rows still waiting to be removed
    positions = sorted({self.fallback_tree.index(item) for item in selected},
                       reverse=True)
    removed = 0
    for position in positions:
        if position < len(fallback_keys):
            del fallback_keys[position]
            removed += 1

    if not removed:
        return

    self.translator_gui.config['fallback_keys'] = fallback_keys
    self.translator_gui.save_config(show_message=False)

    self._load_fallback_keys()
    if removed == 1:
        self._show_status("Removed fallback key")
    else:
        self._show_status(f"Removed {removed} fallback keys")
def _clear_all_fallbacks(self):
    """Delete every fallback key after the user confirms.

    Returns without prompting when the tree is already empty.
    """
    if not self.fallback_tree.get_children():
        return
    if not messagebox.askyesno("Confirm", "Remove ALL fallback keys?"):
        return

    gui = self.translator_gui
    gui.config['fallback_keys'] = []
    gui.save_config(show_message=False)

    self._load_fallback_keys()
    self._show_status("Cleared all fallback keys")
def _toggle_fallback_section(self):
    """Enable or disable the fallback-key area to match the checkbox.

    When enabled, the input widgets and the "Add Fallback" button become
    active and the tree container and action-button frame are packed. When
    disabled, the same widgets are disabled, both containers are hidden and
    the tree selection is cleared.
    """
    enabled = self.use_fallback_var.get()
    state = tk.NORMAL if enabled else tk.DISABLED

    input_widgets = (
        self.fallback_key_entry,
        self.fallback_model_combo,
        self.fallback_google_creds_entry,
        self.fallback_google_region_entry,
        self.fallback_azure_endpoint_entry,
        self.show_fallback_btn,
    )
    for widget in input_widgets:
        widget.config(state=state)

    # The add button is not stored on self; locate it by its caption
    for sibling in self.fallback_key_entry.master.winfo_children():
        if isinstance(sibling, tb.Button) and "Add Fallback" in str(sibling.cget('text')):
            sibling.config(state=state)

    tree_container = self.fallback_tree.master.master
    has_action_frame = hasattr(self, 'fallback_action_frame')

    if enabled:
        tree_container.pack(fill=tk.BOTH, expand=True)
        if has_action_frame:
            self.fallback_action_frame.pack(fill=tk.X, pady=(10, 0))
    else:
        tree_container.pack_forget()
        if has_action_frame:
            self.fallback_action_frame.pack_forget()
        # Drop any selection so nothing stays highlighted while hidden
        self.fallback_tree.selection_remove(*self.fallback_tree.selection())
def _toggle_fallback_visibility(self):
    """Switch the fallback key entry between masked and plain text.

    The toggle button shows 🔒 while the key is readable and 👁 while it is
    masked.
    """
    currently_masked = self.fallback_key_entry.cget('show') == '*'
    if currently_masked:
        mask_char, icon = '', '🔒'
    else:
        mask_char, icon = '*', '👁'
    self.fallback_key_entry.config(show=mask_char)
    self.show_fallback_btn.config(text=icon)
def _create_button_bar(self, parent):
    """Build the bottom bar: Import/Export on the left, Cancel and Save & Close on the right."""
    self.button_frame = tk.Frame(parent)
    self.button_frame.pack(fill=tk.X, pady=(20, 0))

    # (caption, callback, bootstyle, pack options); order matters because
    # pack places widgets on the same side in creation order
    button_specs = (
        ("Save & Close", self._save_and_close, "success",
         {'side': tk.RIGHT, 'padx': (5, 0)}),
        ("Cancel", self._on_close, "secondary",
         {'side': tk.RIGHT}),
        ("Import", self._import_keys, "info-outline",
         {'side': tk.LEFT, 'padx': (0, 5)}),
        ("Export", self._export_keys, "info-outline",
         {'side': tk.LEFT}),
    )
    for caption, callback, look, pack_options in button_specs:
        tb.Button(self.button_frame, text=caption, command=callback,
                  bootstyle=look).pack(**pack_options)
def _create_key_list_section(self, parent):
    """Build the "API Keys" panel.

    The panel holds the primary-key banner, the reorder buttons, the key
    treeview with its scrollbar, and the action row with the stats label.
    The click, context-menu, selection and drag handlers are bound to the
    tree, and the inline-edit and drag tracking attributes are initialised.
    """
    list_frame = tk.LabelFrame(parent, text="API Keys", padx=15, pady=15)
    list_frame.pack(fill=tk.BOTH, expand=True)

    # Banner naming the key in position #1
    banner_color = '#FF8C00'
    primary_frame = tk.Frame(list_frame, bg=banner_color, relief=tk.RAISED, bd=2)
    primary_frame.pack(fill=tk.X, pady=(0, 10))
    self.primary_key_label = tk.Label(
        primary_frame,
        text="⭐ PRIMARY KEY: Position #1 will be used first in rotation ⭐",
        bg=banner_color, fg='white',
        font=('TkDefaultFont', 11, 'bold'),
        pady=5)
    self.primary_key_label.pack(fill=tk.X)

    main_container = tk.Frame(list_frame)
    main_container.pack(fill=tk.BOTH, expand=True)

    # Left column: reorder controls
    move_frame = tk.Frame(main_container)
    move_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5))
    tk.Label(move_frame, text="Reorder", font=('TkDefaultFont', 9, 'bold')).pack(pady=(0, 5))

    for glyph, direction in (("⬆⬆", 'top'), ("⬆", 'up'), ("⬇", 'down'), ("⬇⬇", 'bottom')):
        # Default argument pins this iteration's direction to the callback
        tb.Button(move_frame, text=glyph, width=3,
                  command=lambda d=direction: self._move_key(d),
                  bootstyle="secondary-outline").pack(pady=2)

    tk.Frame(move_frame).pack(pady=10)  # vertical spacer

    self.position_label = tk.Label(move_frame, text="", font=('TkDefaultFont', 9), fg='gray')
    self.position_label.pack()

    # Right side: treeview plus vertical scrollbar
    tree_frame = tk.Frame(main_container)
    tree_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

    scrollbar = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    # (column id, heading text, width, minwidth, anchor); '#0' is the tree column
    column_specs = (
        ('#0', 'API Key', 180, 150, 'w'),
        ('Model', 'Model', 260, 160, 'w'),
        ('Cooldown', 'Cooldown', 80, 60, 'center'),
        ('Status', 'Status', 160, 100, 'center'),
        ('Success', '✓', 40, 30, 'center'),
        ('Errors', '✗', 40, 30, 'center'),
        ('Times Used', 'Times Used', 90, 60, 'center'),
    )
    data_columns = tuple(spec[0] for spec in column_specs[1:])

    self.tree = ttk.Treeview(tree_frame, columns=data_columns, show='tree headings',
                             yscrollcommand=scrollbar.set, height=10)
    self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.config(command=self.tree.yview)

    for column_id, title, width, min_width, anchor in column_specs:
        self.tree.heading(column_id, text=title, anchor=anchor)
        self.tree.column(column_id, width=width, minwidth=min_width, anchor=anchor)

    ttk.Style().configure("Treeview.Heading", font=('TkDefaultFont', 11, 'bold'))

    # Inline editing, context menu and selection tracking
    self.tree.bind('<Button-1>', self._on_click)
    self.tree.bind('<Button-3>', self._show_context_menu)
    self.tree.bind('<<TreeviewSelect>>', self._on_selection_change)

    # Drag and drop; add='+' keeps the click handler bound above
    self.tree.bind('<Button-1>', self._on_drag_start, add='+')
    self.tree.bind('<B1-Motion>', self._on_drag_motion)
    self.tree.bind('<ButtonRelease-1>', self._on_drag_release)

    self.edit_widget = None      # active inline editor, if any
    self.drag_start_item = None  # row being dragged
    self.drag_start_y = None     # y coordinate where the drag began

    # Action row under the tree
    action_frame = tk.Frame(list_frame)
    action_frame.pack(fill=tk.X, pady=(10, 0))

    action_specs = (
        ("Test Selected", self._test_selected, "warning", (0, 5)),
        ("Test All", self._test_all, "warning", 5),
        ("Enable Selected", self._enable_selected, "success", 5),
        ("Disable Selected", self._disable_selected, "danger", 5),
        ("Remove Selected", self._remove_selected, "danger", 5),
    )
    for caption, callback, look, pad_x in action_specs:
        tb.Button(action_frame, text=caption, command=callback,
                  bootstyle=look).pack(side=tk.LEFT, padx=pad_x)

    self.stats_label = tk.Label(action_frame, text="", font=('TkDefaultFont', 11), fg='gray')
    self.stats_label.pack(side=tk.RIGHT)
def _create_add_key_section(self, parent):
    """Build the "Add New API Key" form.

    Grid rows: 0 = API key and model, 1 = cooldown plus the Add / Copy
    buttons, 2 = Google credentials and region, 3 = individual-endpoint
    toggle, 4 = individual endpoint and API version (hidden until the toggle
    is switched on).
    """
    def block_mousewheel(widget):
        # Use the host GUI's helper when it exists; ignore any failure
        try:
            if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
                self.translator_gui.ui.disable_spinbox_mousewheel(widget)
        except Exception:
            pass

    form = tk.LabelFrame(parent, text="Add New API Key", padx=15, pady=15)
    form.pack(fill=tk.X, pady=(0, 10))

    # Only the two input columns stretch; column 3 stays tight so labels sit
    # next to their fields
    form.columnconfigure(1, weight=1)
    form.columnconfigure(4, weight=1)

    # ---- Row 0: API key + model ----
    tk.Label(form, text="API Key:").grid(row=0, column=0, sticky=tk.W, padx=(0, 10), pady=5)
    self.api_key_var = tk.StringVar()
    self.api_key_entry = tb.Entry(form, textvariable=self.api_key_var, show='*')
    self.api_key_entry.grid(row=0, column=1, sticky=tk.EW, pady=5)

    self.show_key_btn = tb.Button(form, text="👁", width=3,
                                  command=self._toggle_key_visibility)
    self.show_key_btn.grid(row=0, column=2, padx=5, pady=5)

    tk.Label(form, text="Model:").grid(row=0, column=3, sticky=tk.W, padx=(20, 10), pady=5)
    self.model_var = tk.StringVar()
    model_choices = get_model_options()
    self.model_combo = tb.Combobox(form, textvariable=self.model_var,
                                   values=model_choices, state='normal')
    self.model_combo.grid(row=0, column=4, sticky=tk.EW, pady=5)
    block_mousewheel(self.model_combo)
    self._attach_model_autofill(self.model_combo, self.model_var)

    # ---- Row 1: cooldown + add/copy buttons ----
    tk.Label(form, text="Cooldown (s):").grid(row=1, column=0, sticky=tk.W, padx=(0, 10), pady=5)
    self.cooldown_var = tk.IntVar(value=60)
    cooldown_frame = tk.Frame(form)
    cooldown_frame.grid(row=1, column=1, sticky=tk.W, pady=5)
    cooldown_spinbox = tb.Spinbox(cooldown_frame, from_=10, to=3600,
                                  textvariable=self.cooldown_var, width=10)
    cooldown_spinbox.pack(side=tk.LEFT)
    block_mousewheel(cooldown_spinbox)
    tk.Label(cooldown_frame, text="(10-3600)", font=('TkDefaultFont', 9),
             fg='gray').pack(side=tk.LEFT, padx=(10, 0))

    button_frame = tk.Frame(form)
    button_frame.grid(row=1, column=4, sticky=tk.E, pady=5)
    tb.Button(button_frame, text="Add Key", command=self._add_key,
              bootstyle="success").pack(side=tk.LEFT, padx=(0, 5))
    tb.Button(button_frame, text="Copy Current Key",
              command=self._copy_current_settings,
              bootstyle="info-outline").pack(side=tk.LEFT)

    # ---- Row 2: optional Google credentials + region ----
    tk.Label(form, text="Google Creds:", font=('TkDefaultFont', 9),
             fg='gray').grid(row=2, column=0, sticky=tk.W, padx=(0, 10), pady=2)
    self.google_creds_var = tk.StringVar()
    self.google_creds_entry = tb.Entry(form, textvariable=self.google_creds_var,
                                       font=('TkDefaultFont', 8), state='normal')
    self.google_creds_entry.grid(row=2, column=1, sticky=tk.EW, pady=2)

    tb.Button(form, text="📁", width=3,
              command=self._browse_google_credentials,
              bootstyle="secondary-outline").grid(row=2, column=2, padx=(5, 0), pady=2)

    tk.Label(form, text="Region:", font=('TkDefaultFont', 11),
             fg='gray').grid(row=2, column=3, sticky=tk.W, padx=(10, 5), pady=2)
    self.google_region_var = tk.StringVar(value='us-east5')  # default region
    self.google_region_entry = tb.Entry(form, textvariable=self.google_region_var,
                                        font=('TkDefaultFont', 8), state='normal', width=12)
    self.google_region_entry.grid(row=2, column=4, sticky=tk.W, pady=2)

    # ---- Row 3: individual endpoint toggle ----
    self.use_individual_endpoint_var = tk.BooleanVar(value=False)
    endpoint_toggle = tb.Checkbutton(form, text="Use Individual Endpoint",
                                     variable=self.use_individual_endpoint_var,
                                     bootstyle="round-toggle",
                                     command=self._toggle_individual_endpoint_fields)
    endpoint_toggle.grid(row=3, column=0, columnspan=2, sticky=tk.W, padx=(0, 10), pady=5)

    # ---- Row 4: individual endpoint + API version ----
    self.individual_endpoint_label = tk.Label(form, text="Individual Endpoint:",
                                              font=('TkDefaultFont', 9), fg='gray')
    self.individual_endpoint_label.grid(row=4, column=0, sticky=tk.W, padx=(0, 10), pady=2)
    self.azure_endpoint_var = tk.StringVar()
    self.azure_endpoint_entry = tb.Entry(form, textvariable=self.azure_endpoint_var,
                                         font=('TkDefaultFont', 8), state='disabled')
    self.azure_endpoint_entry.grid(row=4, column=1, columnspan=2, sticky=tk.EW, pady=2)

    self.individual_api_version_label = tk.Label(form, text="API Ver:",
                                                 font=('TkDefaultFont', 11), fg='gray')
    self.individual_api_version_label.grid(row=4, column=3, sticky=tk.W, padx=(10, 5), pady=2)
    self.azure_api_version_var = tk.StringVar(value='2025-01-01-preview')
    version_choices = [
        '2025-01-01-preview',
        '2024-12-01-preview',
        '2024-10-01-preview',
        '2024-08-01-preview',
        '2024-06-01',
        '2024-02-01',
        '2023-12-01-preview'
    ]
    self.azure_api_version_combo = ttk.Combobox(form, textvariable=self.azure_api_version_var,
                                                values=version_choices, width=18, state='disabled',
                                                font=('TkDefaultFont', 7))
    self.azure_api_version_combo.grid(row=4, column=4, sticky=tk.W, pady=2)
    block_mousewheel(self.azure_api_version_combo)

    # The toggle starts off, so this call hides the row-4 widgets
    self._toggle_individual_endpoint_fields()
def _toggle_individual_endpoint_fields(self):
    """Show or hide the individual-endpoint widgets and set their state.

    When the toggle is on, the endpoint label/entry and API-version
    label/combobox are put back on the grid; the entry becomes editable and
    the combobox read-only. When it is off, the widgets are removed from the
    grid, their variables are reset to defaults, and both inputs are
    disabled, which is the state they are created in.
    """
    enabled = self.use_individual_endpoint_var.get()
    endpoint_widgets = (
        self.individual_endpoint_label,
        self.azure_endpoint_entry,
        self.individual_api_version_label,
        self.azure_api_version_combo,
    )

    if enabled:
        # grid() with no arguments restores the options saved by grid_remove()
        for widget in endpoint_widgets:
            widget.grid()
        self.azure_endpoint_entry.config(state=tk.NORMAL)
        self.azure_api_version_combo.config(state='readonly')
    else:
        for widget in endpoint_widgets:
            widget.grid_remove()
        # Reset values so a hidden endpoint is never submitted by accident
        self.azure_endpoint_var.set("")
        self.azure_api_version_var.set('2025-01-01-preview')
        # Previously DISABLED was computed but never applied, leaving the
        # hidden inputs in their enabled state
        self.azure_endpoint_entry.config(state=tk.DISABLED)
        self.azure_api_version_combo.config(state=tk.DISABLED)
def _move_key(self, direction):
    """Move the single selected key within the pool.

    Args:
        direction: 'up', 'down', 'top' or 'bottom'. Any other value, or a
            move that would not change the position, is ignored.

    The key is removed and re-inserted at its target position, so the other
    keys keep their relative order. 'top' and 'bottom' used to swap with the
    first/last key, which sent an unrelated key to the far end of the list.
    Nothing happens unless exactly one row is selected and it maps to a key
    in the pool. After a move the list is refreshed and the moved row is
    selected again.
    """
    selected = self.tree.selection()
    if len(selected) != 1:
        return

    index = self.tree.index(selected[0])

    with self.key_pool.lock:
        keys = self.key_pool.keys
        if index >= len(keys):
            return

        last = len(keys) - 1
        targets = {
            'up': max(index - 1, 0),
            'down': min(index + 1, last),
            'top': 0,
            'bottom': last,
        }
        new_index = targets.get(direction, index)
        if new_index == index:
            return

        # pop + insert shifts the keys in between instead of swapping two ends
        keys.insert(new_index, keys.pop(index))

    # Refresh outside the lock; the refresh reads the pool itself
    self._refresh_key_list()

    # Rows are recreated by the refresh, so reselect by position
    items = self.tree.get_children()
    if new_index < len(items):
        self.tree.selection_set(items[new_index])
        self.tree.focus(items[new_index])
        self.tree.see(items[new_index])

    self._show_status(f"Moved key to position {new_index + 1}")
def _on_selection_change(self, event):
    """Show "#position/total" for the first selected row, or clear the label."""
    chosen = self.tree.selection()
    caption = ""
    if chosen:
        position = self.tree.index(chosen[0])
        key_total = len(self.key_pool.keys)
        caption = f"#{position + 1}/{key_total}"
    self.position_label.config(text=caption)
def _on_drag_start(self, event):
    """Begin a drag when the mouse button goes down on a row.

    Remembers the row and the starting y coordinate, makes the row the sole
    selection and switches to the hand cursor. Presses outside any row are
    ignored.
    """
    pressed_row = self.tree.identify_row(event.y)
    if not pressed_row:
        return

    self.drag_start_item = pressed_row
    self.drag_start_y = event.y
    self.tree.selection_set(pressed_row)
    self.tree.config(cursor="hand2")
def _on_drag_motion(self, event):
    """While dragging, show the vertical-arrow cursor over any other row."""
    origin = self.drag_start_item
    if not origin:
        return

    hovered = self.tree.identify_row(event.y)
    if hovered and hovered != origin:
        # Cursor change is the only visual feedback for a possible drop
        self.tree.config(cursor="sb_v_double_arrow")
def _on_drag_release(self, event):
    """Finish a drag: move the dragged key to the row where it was dropped.

    The cursor is reset first. If the release is over a different row, the
    key is removed from its old position and inserted at the target
    position, the list is refreshed, the moved row is reselected and a
    status message is shown. The drag state is cleared either way.
    """
    dragged = self.drag_start_item
    if not dragged:
        return

    self.tree.config(cursor="")

    drop_row = self.tree.identify_row(event.y)
    if drop_row and drop_row != dragged:
        source_index = self.tree.index(dragged)
        target_index = self.tree.index(drop_row)

        # Remove from the old slot, then insert at the drop slot
        with self.key_pool.lock:
            pool_keys = self.key_pool.keys
            moved_key = pool_keys.pop(source_index)
            pool_keys.insert(target_index, moved_key)

        self._refresh_key_list()

        # Rows were rebuilt; select whatever now sits at the drop position
        rows = self.tree.get_children()
        if target_index < len(rows):
            landed = rows[target_index]
            self.tree.selection_set(landed)
            self.tree.focus(landed)
            self.tree.see(landed)

        self._show_status(f"Moved key from position {source_index + 1} to {target_index + 1}")

    self.drag_start_item = None
    self.drag_start_y = None
def _refresh_key_list(self):
    """Rebuild the key tree, the primary-key banner and the stats label.

    Each row carries exactly the six values the tree's data columns expect:
    Model, Cooldown, Status, Success, Errors, Times Used. The earlier version
    put a position marker in front of them, which shifted every cell one
    column to the right and dropped "Times Used"; it also disagreed with the
    column numbers used by the click handler for inline editing.

    Status precedence: testing in progress, disabled, last test result
    (passed / failed / rate limited / error), cooling down, otherwise active.
    """
    def mask(api_key):
        # Show only the first 8 and last 4 characters of longer keys
        return api_key[:8] + "..." + api_key[-4:] if len(api_key) > 12 else api_key

    # Clear tree
    for item in self.tree.get_children():
        self.tree.delete(item)

    keys = self.key_pool.get_all_keys()

    # Banner describes the key in position #1, or the generic hint when empty
    if hasattr(self, 'primary_key_label'):
        if keys:
            first_key = keys[0]
            self.primary_key_label.config(
                text=f"⭐ PRIMARY KEY: {first_key.model} ({mask(first_key.api_key)}) ⭐")
        else:
            # Do not keep advertising a key that is no longer in the pool
            self.primary_key_label.config(
                text="⭐ PRIMARY KEY: Position #1 will be used first in rotation ⭐")

    for key in keys:
        # Determine status based on test results and current state
        if key.last_test_result is None and hasattr(key, '_testing'):
            status = "⏳ Testing..."
            tags = ('testing',)
        elif not key.enabled:
            status = "Disabled"
            tags = ('disabled',)
        elif key.last_test_result == 'passed':
            status = "✅ Passed"
            tags = ('passed',)
        elif key.last_test_result == 'failed':
            status = "❌ Failed"
            tags = ('failed',)
        elif key.last_test_result == 'rate_limited':
            status = "⚠️ Rate Limited"
            tags = ('ratelimited',)
        elif key.last_test_result == 'error':
            status = "❌ Error"
            if key.last_test_message:
                status += f": {key.last_test_message[:20]}..."
            tags = ('error',)
        elif key.is_cooling_down and key.last_error_time:
            remaining = int(key.cooldown - (time.time() - key.last_error_time))
            if remaining > 0:
                status = f"Cooling ({remaining}s)"
                tags = ('cooling',)
            else:
                # Cooldown has elapsed; clear the flag on the key itself
                key.is_cooling_down = False
                status = "Active"
                tags = ('active',)
        else:
            status = "Active"
            tags = ('active',)

        # Fall back to success + error counts when the key has no times_used
        times_used = getattr(key, 'times_used', key.success_count + key.error_count)

        # Values line up one-to-one with the tree's data columns
        self.tree.insert('', 'end',
                         text=mask(key.api_key),
                         values=(key.model, f"{key.cooldown}s", status,
                                 key.success_count, key.error_count, times_used),
                         tags=tags)

    # Configure tags (these may or may not work depending on ttkbootstrap theme)
    self.tree.tag_configure('active', foreground='green')
    self.tree.tag_configure('cooling', foreground='orange')
    self.tree.tag_configure('disabled', foreground='gray')
    self.tree.tag_configure('testing', foreground='blue')
    self.tree.tag_configure('passed', foreground='dark green')
    self.tree.tag_configure('failed', foreground='red')
    self.tree.tag_configure('ratelimited', foreground='orange')
    self.tree.tag_configure('error', foreground='dark red')

    # Update stats
    active_count = sum(1 for k in keys if k.enabled and not k.is_cooling_down)
    total_count = len(keys)
    passed_count = sum(1 for k in keys if k.last_test_result == 'passed')
    self.stats_label.config(text=f"Keys: {active_count} active / {total_count} total | {passed_count} passed tests")
def _on_click(self, event):
    """Open the matching inline editor when a Model or Cooldown cell is clicked.

    An editor that is still open is destroyed first. Clicks that do not land
    on a cell of a row backed by a key in the pool are ignored.

    Args:
        event: Tk mouse event; its ``x``/``y`` locate the clicked cell.
    """
    # Only one inline editor may exist at a time.
    stale_editor = self.edit_widget
    if stale_editor:
        stale_editor.destroy()
        self.edit_widget = None

    if self.tree.identify_region(event.x, event.y) != "cell":
        return

    row_id = self.tree.identify_row(event.y)
    column_id = self.tree.identify_column(event.x)
    if not row_id:
        return

    # The column identifier looks like '#<n>'; drop the marker to get n.
    column_number = int(column_id.replace('#', ''))

    # The row's position in the tree doubles as the index into the key pool.
    row_position = self.tree.index(row_id)
    pool_keys = self.key_pool.keys
    if row_position >= len(pool_keys):
        return
    entry = pool_keys[row_position]

    # Column #1 (Model) and column #2 (Cooldown) are the only editable ones.
    editors = {
        1: self._edit_model_inline,
        2: self._edit_cooldown_inline,
    }
    open_editor = editors.get(column_number)
    if open_editor is not None:
        open_editor(row_id, column_id, entry)
def _edit_model_inline(self, item, column, key):
    """Create an inline entry editor over a Model cell.

    Args:
        item: Treeview item id of the row being edited.
        column: Treeview column identifier of the cell (e.g. ``'#1'``).
        key: Key entry whose ``model`` attribute is edited.

    Returns:
        ``"break"`` once the editor has been placed, or ``None`` when the
        cell has no bounding box and therefore no editor is opened.
    """
    # Treeview.bbox returns an empty string when the cell is not visible
    # (e.g. scrolled out of view); unpacking that would raise ValueError.
    bbox = self.tree.bbox(item, column)
    if not bbox:
        return None
    x, y, width, height = bbox

    # Make the editor wider and slightly taller than the cell so long model
    # names stay readable while typing.
    expanded_width = max(width + 100, 250)  # At least 250 pixels wide
    expanded_height = height + 8

    edit_var = tk.StringVar(value=key.model)
    self.edit_widget = tb.Entry(self.tree, textvariable=edit_var, font=('TkDefaultFont', 11))

    def close_editor():
        """Destroy the editor widget if it is still around."""
        if self.edit_widget:
            self.edit_widget.destroy()
            self.edit_widget = None

    def save_edit():
        """Store a changed, non-empty model name, then close the editor."""
        new_value = edit_var.get().strip()
        if new_value and new_value != key.model:
            key.model = new_value
            self._refresh_key_list()
            self._show_status(f"Updated model to: {new_value}")
        close_editor()

    def cancel_edit(event=None):
        """Close the editor without touching the key."""
        close_editor()

    # Nudge the widget up by two pixels so the taller entry stays centred
    # on the row.
    self.edit_widget.place(x=x, y=y-2, width=expanded_width, height=expanded_height)
    self.edit_widget.focus()
    self.edit_widget.select_range(0, tk.END)

    # Make sure the widget appears on top
    self.edit_widget.lift()

    # Return and losing focus both commit; Escape discards.
    self.edit_widget.bind('<Return>', lambda e: save_edit())
    self.edit_widget.bind('<Escape>', cancel_edit)
    self.edit_widget.bind('<FocusOut>', lambda e: save_edit())

    return "break"
def _edit_cooldown_inline(self, item, column, key):
    """Create an inline spinbox editor over a Cooldown cell.

    Args:
        item: Treeview item id of the row being edited.
        column: Treeview column identifier of the cell (e.g. ``'#2'``).
        key: Key entry whose ``cooldown`` attribute is edited.

    Returns:
        ``"break"`` once the editor has been placed, or ``None`` when the
        cell has no bounding box and therefore no editor is opened.
    """
    # Treeview.bbox returns an empty string when the cell is not visible
    # (e.g. scrolled out of view); unpacking that would raise ValueError.
    bbox = self.tree.bbox(item, column)
    if not bbox:
        return None
    x, y, width, height = bbox

    edit_var = tk.IntVar(value=key.cooldown)
    self.edit_widget = tb.Spinbox(self.tree, from_=10, to=3600,
                                  textvariable=edit_var, width=10)

    # Best effort: stop the mouse wheel from changing the value when the
    # main GUI offers the helper for it.
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(self.edit_widget)
    except Exception:
        pass

    def close_editor():
        """Destroy the editor widget if it is still around."""
        if self.edit_widget:
            self.edit_widget.destroy()
            self.edit_widget = None

    def save_edit():
        """Store a changed cooldown, then close the editor."""
        try:
            new_value = edit_var.get()
        except (tk.TclError, ValueError):
            # The box holds text that is not an integer (or is empty).
            # Keep the current cooldown so the editor can still close
            # instead of failing inside the event callback.
            new_value = key.cooldown
        if new_value != key.cooldown:
            key.cooldown = new_value
            self._refresh_key_list()
            self._show_status(f"Updated cooldown to: {new_value}s")
        close_editor()

    def cancel_edit(event=None):
        """Close the editor without touching the key."""
        close_editor()

    self.edit_widget.place(x=x, y=y, width=width, height=height)
    self.edit_widget.focus()

    # Return and losing focus both commit; Escape discards.
    self.edit_widget.bind('<Return>', lambda e: save_edit())
    self.edit_widget.bind('<Escape>', cancel_edit)
    self.edit_widget.bind('<FocusOut>', lambda e: save_edit())

    return "break"
def _show_context_menu(self, event):
    """Pop up the right-click menu for the row under the pointer.

    The menu offers reordering, model change, per-key endpoint settings,
    test/enable/disable and removal. Nothing is shown when the pointer is
    not over a row.

    Args:
        event: Tk mouse event; ``y`` locates the row and
            ``x_root``/``y_root`` position the menu on screen.
    """
    row_id = self.tree.identify_row(event.y)
    if not row_id:
        return

    # Right-clicking outside the current selection replaces the selection
    # with the clicked row; clicking inside it keeps the multi-selection.
    if row_id not in self.tree.selection():
        self.tree.selection_set(row_id)

    menu = tk.Menu(self.dialog, tearoff=0)

    # Reorder submenu
    reorder_menu = tk.Menu(menu, tearoff=0)
    moves = (
        ("Move to Top", 'top'),
        ("Move Up", 'up'),
        ("Move Down", 'down'),
        ("Move to Bottom", 'bottom'),
    )
    for label, direction in moves:
        # Bind the direction as a default so each entry keeps its own value.
        reorder_menu.add_command(label=label,
                                 command=lambda d=direction: self._move_key(d))
    menu.add_cascade(label="Reorder", menu=reorder_menu)
    menu.add_separator()

    # Change-model entry; the label mentions the count for multi-selections.
    selected_count = len(self.tree.selection())
    if selected_count > 1:
        change_label = f"Change Model ({selected_count} selected)"
    else:
        change_label = "Change Model"
    menu.add_command(label=change_label, command=self._change_model_for_selected)
    menu.add_separator()

    # Individual endpoint entries for the clicked row's key.
    index = self.tree.index(row_id)
    if index < len(self.key_pool.keys):
        key = self.key_pool.keys[index]
        endpoint_enabled = getattr(key, 'use_individual_endpoint', False)
        endpoint_url = getattr(key, 'azure_endpoint', '')

        def configure():
            self._configure_individual_endpoint(index)

        if endpoint_enabled and endpoint_url:
            menu.add_command(label="✅ Individual Endpoint", command=configure)
            menu.add_command(label="Disable Individual Endpoint",
                             command=lambda: self._toggle_individual_endpoint(index, False))
        else:
            menu.add_command(label="🔧 Configure Individual Endpoint", command=configure)
        menu.add_separator()

    for label, action in (("Test", self._test_selected),
                          ("Enable", self._enable_selected),
                          ("Disable", self._disable_selected)):
        menu.add_command(label=label, command=action)
    menu.add_separator()
    menu.add_command(label="Remove", command=self._remove_selected)

    # Show menu
    menu.post(event.x_root, event.y_root)
def _change_model_for_selected(self):
    """Prompt for a model name and apply it to every selected key.

    Opens a small dialog with a model combobox pre-filled with the model of
    the first selected key. Pressing Enter applies the entered name to all
    keys that were selected when the dialog was opened; Escape closes the
    dialog without changes. Does nothing when no row is selected.
    """
    selected = self.tree.selection()
    if not selected:
        return

    # Resolve the selected rows to key objects right away. The dialog is not
    # modal and every _refresh_key_list() call deletes and re-inserts all
    # rows, so the row ids captured here may no longer exist by the time the
    # user presses Enter.
    pool_keys = self.key_pool.keys
    selected_indices = [self.tree.index(item) for item in selected]
    target_keys = [pool_keys[i] for i in selected_indices if i < len(pool_keys)]

    # Create simple dialog
    dialog = tk.Toplevel(self.dialog)
    dialog.title(f"Change Model for {len(selected)} Keys")
    dialog.geometry("400x130")
    dialog.transient(self.dialog)

    # Set icon
    self._set_icon(dialog)

    # Center the dialog on the screen
    dialog.update_idletasks()
    x = (dialog.winfo_screenwidth() // 2) - (dialog.winfo_width() // 2)
    y = (dialog.winfo_screenheight() // 2) - (dialog.winfo_height() // 2)
    dialog.geometry(f"+{x}+{y}")

    # Main frame
    main_frame = tk.Frame(dialog, padx=20, pady=20)
    main_frame.pack(fill=tk.BOTH, expand=True)

    tk.Label(main_frame, text="Enter new model name (press Enter to apply):",
             font=('TkDefaultFont', 10)).pack(pady=(0, 10))

    # Model entry with dropdown offering the full model list
    model_var = tk.StringVar()
    all_models = get_model_options()
    model_combo = ttk.Combobox(main_frame, values=all_models,
                               textvariable=model_var, width=45, height=12)
    model_combo.pack(pady=(0, 10))

    # Best effort: block the mouse wheel on the combobox when the main GUI
    # offers the helper for it.
    try:
        if hasattr(self.translator_gui, 'ui') and hasattr(self.translator_gui.ui, 'disable_spinbox_mousewheel'):
            self.translator_gui.ui.disable_spinbox_mousewheel(model_combo)
    except Exception:
        pass

    # Attach gentle autofill
    self._attach_model_autofill(model_combo, model_var)

    # Use the first selected key's model as the default and select the text
    # so typing replaces it.
    if selected_indices and selected_indices[0] < len(pool_keys):
        current_model = pool_keys[selected_indices[0]].model
        model_var.set(current_model)
        model_combo.select_range(0, tk.END)

    def apply_change(event=None):
        """Apply the entered model to the captured keys and close."""
        new_model = model_var.get().strip()
        if new_model:
            for target in target_keys:
                target.model = new_model

            # Refresh the display
            self._refresh_key_list()

            # Show status
            self._show_status(f"Changed model to '{new_model}' for {len(selected)} keys")

            dialog.destroy()

    # Focus on the combobox
    model_combo.focus()

    # Enter applies (from the dialog or the combobox), Escape closes.
    dialog.bind('<Return>', apply_change)
    model_combo.bind('<Return>', apply_change)
    dialog.bind('<Escape>', lambda e: dialog.destroy())
def _configure_individual_endpoint(self, key_index):
    """Open the per-key endpoint dialog for the key at ``key_index``.

    Silently returns when ``key_index`` is past the end of the key pool, and
    shows an error box when the dialog class could not be imported.
    """
    pool_keys = self.key_pool.keys
    if key_index >= len(pool_keys):
        return
    entry = pool_keys[key_index]

    # The dialog class is None when its import failed at module load.
    if IndividualEndpointDialog is not None:
        IndividualEndpointDialog(self.dialog, self.translator_gui, entry,
                                 self._refresh_key_list, self._show_status)
        return

    messagebox.showerror("Error", "IndividualEndpointDialog is not available.")
def _toggle_endpoint_fields(self, enable_var, endpoint_entry, version_combo):
    """Enable or disable the endpoint inputs to match ``enable_var``.

    When enabled, the endpoint entry becomes editable and the version
    combobox becomes read-only; otherwise both are disabled.
    """
    if enable_var.get():
        entry_state, combo_state = 'normal', 'readonly'
    else:
        entry_state, combo_state = 'disabled', 'disabled'

    endpoint_entry.config(state=entry_state)
    version_combo.config(state=combo_state)
def _toggle_individual_endpoint(self, key_index, enabled):
    """Switch the individual endpoint of one key on or off.

    Args:
        key_index: Position of the key in the pool; positions past the end
            are ignored.
        enabled: New value for the key's ``use_individual_endpoint`` flag.
    """
    pool_keys = self.key_pool.keys
    if key_index >= len(pool_keys):
        return

    entry = pool_keys[key_index]
    entry.use_individual_endpoint = enabled

    # Redraw the list, then report what happened in the status area.
    self._refresh_key_list()
    state_word = "disabled" if not enabled else "enabled"
    self._show_status(f"Individual endpoint {state_word} for {entry.model}")
# Additional helper method to swap keys programmatically
def swap_keys(self, index1: int, index2: int):
    """Swap two keys by their indices.

    Args:
        index1: Position of the first key in the pool.
        index2: Position of the second key in the pool.

    Returns:
        ``True`` when both indices were valid and the keys were swapped,
        ``False`` otherwise (the pool is left untouched).
    """
    with self.key_pool.lock:
        keys = self.key_pool.keys
        count = len(keys)
        swapped = 0 <= index1 < count and 0 <= index2 < count
        if swapped:
            keys[index1], keys[index2] = keys[index2], keys[index1]

    # Redraw only after the lock is released: the refresh reads the pool
    # again and must not run while this thread still holds the pool lock.
    if swapped:
        self._refresh_key_list()
    return swapped
# Method to move a key to a specific position
def move_key_to_position(self, from_index: int, to_index: int):
    """Move a key from one position to another.

    Args:
        from_index: Current position of the key in the pool.
        to_index: Position the key should occupy afterwards.

    Returns:
        ``True`` when both indices were valid and the key was moved,
        ``False`` otherwise (the pool is left untouched).
    """
    with self.key_pool.lock:
        keys = self.key_pool.keys
        count = len(keys)
        moved = 0 <= from_index < count and 0 <= to_index < count
        if moved:
            keys.insert(to_index, keys.pop(from_index))

    # Redraw only after the lock is released: the refresh reads the pool
    # again and must not run while this thread still holds the pool lock.
    if moved:
        self._refresh_key_list()
    return moved
def _create_button_bar(self, parent):
    """Build the bottom button row inside ``parent``.

    Save & Close and Cancel are packed against the right edge, Import and
    Export against the left edge.
    """
    bar = tk.Frame(parent)
    bar.pack(fill=tk.X, pady=(20, 0))

    # Right side: the widget packed first ends up furthest to the right.
    save_button = tb.Button(bar, text="Save & Close",
                            command=self._save_and_close, bootstyle="success")
    save_button.pack(side=tk.RIGHT, padx=(5, 0))

    cancel_button = tb.Button(bar, text="Cancel",
                              command=self._on_close, bootstyle="secondary")
    cancel_button.pack(side=tk.RIGHT)

    # Left side: import / export of the key list.
    import_button = tb.Button(bar, text="Import",
                              command=self._import_keys, bootstyle="info-outline")
    import_button.pack(side=tk.LEFT, padx=(0, 5))

    export_button = tb.Button(bar, text="Export",
                              command=self._export_keys, bootstyle="info-outline")
    export_button.pack(side=tk.LEFT)
def _browse_google_credentials(self):
    """Browse for a Google Cloud credentials JSON file.

    The chosen file is accepted only when it parses as a JSON object that
    contains both a ``type`` and a ``project_id`` entry; its path is then
    stored in ``self.google_creds_var``. Any other outcome is reported in an
    error box and leaves the variable unchanged. Cancelling the file dialog
    does nothing.
    """
    filename = filedialog.askopenfilename(
        title="Select Google Cloud Credentials JSON",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
    )
    if not filename:
        return

    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default text encoding.
        with open(filename, 'r', encoding='utf-8') as f:
            creds_data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        messagebox.showerror("Error", f"Failed to load credentials: {str(e)}")
        return

    # A JSON document can also be a list, number, string, ...; only an
    # object can carry the two required entries.
    if isinstance(creds_data, dict) and 'type' in creds_data and 'project_id' in creds_data:
        self.google_creds_var.set(filename)
        self._show_status(f"Selected Google credentials: {os.path.basename(filename)}")
    else:
        messagebox.showerror(
            "Error",
            "Invalid Google Cloud credentials file. Please select a valid service account JSON file."
        )
def _browse_fallback_google_credentials(self):
    """Browse for a Google Cloud credentials JSON file for fallback keys.

    The chosen file is accepted only when it parses as a JSON object that
    contains both a ``type`` and a ``project_id`` entry; its path is then
    stored in ``self.fallback_google_creds_var``. Any other outcome is
    reported in an error box and leaves the variable unchanged. Cancelling
    the file dialog does nothing.
    """
    filename = filedialog.askopenfilename(
        title="Select Google Cloud Credentials JSON for Fallback",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
    )
    if not filename:
        return

    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default text encoding.
        with open(filename, 'r', encoding='utf-8') as f:
            creds_data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        messagebox.showerror("Error", f"Failed to load credentials: {str(e)}")
        return

    # A JSON document can also be a list, number, string, ...; only an
    # object can carry the two required entries.
    if isinstance(creds_data, dict) and 'type' in creds_data and 'project_id' in creds_data:
        self.fallback_google_creds_var.set(filename)
        self._show_status(f"Selected fallback Google credentials: {os.path.basename(filename)}")
    else:
        messagebox.showerror(
            "Error",
            "Invalid Google Cloud credentials file. Please select a valid service account JSON file."
        )
def _attach_model_autofill(self, combo: ttk.Combobox, var: tk.StringVar, on_change=None):
    """Wire gentle autofill and match highlighting onto a model combobox.

    - The value list is never filtered; a copy of it is kept on the widget.
    - Text is completed only while the user appends at the end of the entry;
      Backspace/Delete and edits in the middle are left alone.
    - The best match is scrolled to and highlighted in the dropdown list
      (the dropdown is never opened from here).
    - Return commits the best match, and ``on_change`` (if given) is called
      on text changes, Return, list selection and focus loss.

    Args:
        combo: Combobox to attach the behaviour to.
        var: Variable backing the combobox; supplies the initial text when
            truthy, otherwise the widget's own text is used.
        on_change: Optional zero-argument callback.
    """
    import tkinter as _tk
    import logging as _logging

    # Keep the complete option list on the widget itself.
    try:
        combo._all_values = list(combo['values'])
    except Exception:
        combo._all_values = []
    combo._prev_text = var.get() if var else combo.get()

    navigation_keys = {'up', 'down', 'left', 'right', 'return', 'escape', 'tab'}
    deletion_keys = {'backspace', 'delete'}

    def _best_match(text):
        """Return the first prefix match, else the first substring match.

        Matching ignores case; ``None`` is returned for empty text or when
        nothing matches.
        """
        options = getattr(combo, '_all_values', []) or list(combo['values'])
        if not text:
            return None
        needle = text.lower()
        prefix_hits = [option for option in options if option.lower().startswith(needle)]
        if prefix_hits:
            return prefix_hits[0]
        substring_hits = [option for option in options if needle in option.lower()]
        return substring_hits[0] if substring_hits else None

    def _scroll_to_value(_combo: ttk.Combobox, value: str):
        """Scroll the popdown listbox to ``value`` and highlight it."""
        try:
            options = getattr(_combo, '_all_values', []) or list(_combo['values'])
            if value not in options:
                return
            position = options.index(value)
            # Reach into the combobox's popdown window to address its listbox.
            popdown = _combo.tk.eval(f'ttk::combobox::PopdownWindow {_combo._w}')
            listbox = f'{popdown}.f.l'
            tcl = _combo.tk
            tcl.call(listbox, 'see', position)
            tcl.call(listbox, 'selection', 'clear', 0, 'end')
            tcl.call(listbox, 'selection', 'set', position)
            tcl.call(listbox, 'activate', position)
        except Exception:
            pass

    def _on_keyrelease(event=None):
        try:
            typed = combo.get()
            previous = getattr(combo, '_prev_text', '')
            key_name = (getattr(event, 'keysym', '') or '').lower()
            if key_name in navigation_keys:
                return

            suggestion = _best_match(typed)

            appended = typed.startswith(previous) and len(typed) > len(previous)
            deleting = key_name in deletion_keys or len(typed) < len(previous)
            try:
                caret_at_end = combo.index(_tk.INSERT) == len(typed)
            except Exception:
                caret_at_end = True
            try:
                selection_active = combo.selection_present()
            except Exception:
                selection_active = False

            # Gentle autofill: complete the text and select the added tail so
            # the next keystroke overwrites it.
            may_autofill = (suggestion and appended and caret_at_end
                            and not selection_active and not deleting)
            if may_autofill and suggestion.lower().startswith(typed.lower()) and suggestion != typed:
                combo.set(suggestion)
                try:
                    combo.icursor(len(typed))
                    combo.selection_range(len(typed), len(suggestion))
                except Exception:
                    pass

            # Scroll/highlight the match in the dropdown list (no auto-open)
            if suggestion:
                _scroll_to_value(combo, suggestion)

            combo._prev_text = typed
            if on_change and typed != previous:
                on_change()
        except Exception as e:
            try:
                _logging.debug(f"Combobox autocomplete error: {e}")
            except Exception:
                pass

    combo.bind('<KeyRelease>', _on_keyrelease)

    def _on_return(event=None):
        try:
            typed = combo.get()
            committed = _best_match(typed)
            if committed and committed != typed:
                combo.set(committed)
            # Place caret at end and clear selection
            try:
                combo.icursor('end')
                try:
                    combo.selection_clear()
                except Exception:
                    combo.selection_range(0, 0)
            except Exception:
                pass
            combo._prev_text = combo.get()
            if on_change:
                on_change()
        except Exception as e:
            try:
                _logging.debug(f"Combobox enter-commit error: {e}")
            except Exception:
                pass
        # Do not return "break" so outer dialogs bound to <Return> still fire
        return None

    def _notify(event):
        """Forward list selection and focus loss to ``on_change``."""
        return on_change() if on_change else None

    combo.bind('<Return>', _on_return)
    combo.bind('<<ComboboxSelected>>', _notify)
    combo.bind('<FocusOut>', _notify)
def _toggle_key_visibility(self):
    """Flip the API key entry between masked and plain text.

    The button icon is updated along with the entry's ``show`` option.
    """
    currently_masked = self.api_key_entry.cget('show') == '*'
    if currently_masked:
        mask_char, icon = '', '🔒'
    else:
        mask_char, icon = '*', '👁'

    self.api_key_entry.config(show=mask_char)
    self.show_key_btn.config(text=icon)
def _toggle_multi_key_mode(self):
    """Apply the multi-key checkbox: persist it and enable/disable the UI.

    The flag is written to the main GUI's config and saved at once. The key
    entry, the model combobox, the tree's mouse bindings and the key action
    buttons are then switched on or off to match. The fallback section is not
    touched here because it is used in both modes.
    """
    enabled = self.enabled_var.get()
    gui = self.translator_gui
    gui.config['use_multi_api_keys'] = enabled
    gui.save_config(show_message=False)

    # Input widgets for adding keys
    for widget in (self.api_key_entry, self.model_combo):
        if widget:
            widget.config(state=tk.DISABLED if not enabled else tk.NORMAL)

    # A Treeview has no 'state' option, so interaction is switched by adding
    # or removing its event bindings.
    tree = self.tree
    if tree:
        if not enabled:
            for sequence in ('<Button-1>', '<Button-3>', '<<TreeviewSelect>>',
                             '<B1-Motion>', '<ButtonRelease-1>'):
                tree.unbind(sequence)
        else:
            click_bindings = (
                ('<Button-1>', self._on_click),
                ('<Button-3>', self._show_context_menu),
                ('<<TreeviewSelect>>', self._on_selection_change),
            )
            for sequence, handler in click_bindings:
                tree.bind(sequence, handler)
            # Drag and drop; the drag start is added next to the click
            # handler instead of replacing it.
            tree.bind('<Button-1>', self._on_drag_start, add='+')
            tree.bind('<B1-Motion>', self._on_drag_motion)
            tree.bind('<ButtonRelease-1>', self._on_drag_release)

    # Action buttons are looked up by caption two frame levels below the
    # dialog.
    managed_captions = [
        'Test Selected', 'Test All', 'Enable Selected',
        'Disable Selected', 'Remove Selected', 'Add Key'
    ]
    for section in self.dialog.winfo_children():
        if not isinstance(section, tk.Frame):
            continue
        for row in section.winfo_children():
            if not isinstance(row, tk.Frame):
                continue
            for candidate in row.winfo_children():
                if isinstance(candidate, (tb.Button, ttk.Button)) and candidate.cget('text') in managed_captions:
                    candidate.config(state=tk.DISABLED if not enabled else tk.NORMAL)
def _copy_current_settings(self):
    """Copy the API key and model currently set in the main GUI.

    Each value is copied only when the main GUI exposes the corresponding
    variable (``api_key_var`` / ``model_var``).
    """
    gui = self.translator_gui
    missing = object()
    for name in ('api_key_var', 'model_var'):
        source = getattr(gui, name, missing)
        if source is not missing:
            # The dialog's variable carries the same attribute name.
            getattr(self, name).set(source.get())
def _add_key(self):
    """Add a key to the pool from the values in the input form.

    Reads the API key, model, cooldown, optional Google credentials/region
    and, when the individual-endpoint toggle is on, the endpoint URL and API
    version. An error box is shown and nothing is added when the API key or
    the model is empty. After a successful add the form is reset to its
    defaults, the list is refreshed and a status message is shown.
    """
    api_key = self.api_key_var.get().strip()
    model = self.model_var.get().strip()
    cooldown = self.cooldown_var.get()
    google_credentials = self.google_creds_var.get().strip() or None
    google_region = self.google_region_var.get().strip() or None

    # Endpoint settings only count when the toggle is enabled.
    use_individual_endpoint = self.use_individual_endpoint_var.get()
    if use_individual_endpoint:
        azure_endpoint = self.azure_endpoint_var.get().strip()
        azure_api_version = self.azure_api_version_var.get().strip()
    else:
        azure_endpoint = None
        azure_api_version = None

    if not (api_key and model):
        messagebox.showerror("Error", "Please enter both API key and model name")
        return

    new_entry = APIKeyEntry(
        api_key, model, cooldown, enabled=True,
        google_credentials=google_credentials,
        azure_endpoint=azure_endpoint,
        google_region=google_region,
        azure_api_version=azure_api_version,
        use_individual_endpoint=use_individual_endpoint,
    )
    self.key_pool.add_key(new_entry)

    # Reset the form to its defaults.
    form_defaults = (
        (self.api_key_var, ""),
        (self.model_var, ""),
        (self.cooldown_var, 60),
        (self.google_creds_var, ""),
        (self.azure_endpoint_var, ""),
        (self.google_region_var, "us-east5"),
        (self.azure_api_version_var, '2025-01-01-preview'),
        (self.use_individual_endpoint_var, False),
    )
    for variable, default in form_defaults:
        variable.set(default)

    # Update the UI to hide endpoint fields
    self._toggle_individual_endpoint_fields()

    self._refresh_key_list()

    # Status message, listing the optional extras that were attached.
    extras = []
    if google_credentials:
        extras.append(f"Google: {os.path.basename(google_credentials)}")
    if azure_endpoint:
        extras.append(f"Azure: {azure_endpoint[:30]}...")
    extra_info = ""
    if extras:
        extra_info = f" ({', '.join(extras)})"
    self._show_status(f"Added key for model: {model}{extra_info}")
def _refresh_key_list(self):
    """Refresh the key list display (status only, no test results).

    Rebuilds every tree row from the key pool and updates the stats label.

    NOTE(review): another ``_refresh_key_list`` is defined further down in
    this file; if both live in the same class the later definition replaces
    this one - confirm whether this variant is still needed.
    """
    # Clear tree
    for item in self.tree.get_children():
        self.tree.delete(item)

    keys = self.key_pool.get_all_keys()
    for key in keys:
        # Mask API key for display: first 8 and last 4 characters only.
        masked_key = key.api_key[:8] + "..." + key.api_key[-4:] if len(key.api_key) > 12 else key.api_key

        status = "Active"
        tags = ('active',)
        if not key.enabled:
            status = "Disabled"
            tags = ('disabled',)
        elif key.is_cooling_down and key.last_error_time:
            # Without a recorded error time there is nothing to count down
            # from, so such a key is simply shown as active.
            remaining = int(key.cooldown - (time.time() - key.last_error_time))
            if remaining > 0:
                status = f"Cooling ({remaining}s)"
                tags = ('cooling',)
            else:
                # The cooldown has run out; never display a negative timer.
                key.is_cooling_down = False

        # Times used (counter); fall back to successes + errors.
        times_used = getattr(key, 'times_used', key.success_count + key.error_count)

        self.tree.insert('', 'end',
                         text=masked_key,
                         values=(key.model, f"{key.cooldown}s", status,
                                 key.success_count, key.error_count, times_used),
                         tags=tags)

    # Configure tags
    self.tree.tag_configure('active', foreground='green')
    self.tree.tag_configure('cooling', foreground='orange')
    self.tree.tag_configure('disabled', foreground='gray')

    # Update stats
    active_count = sum(1 for k in keys if k.enabled and not k.is_cooling_down)
    total_count = len(keys)
    self.stats_label.config(text=f"Keys: {active_count} active / {total_count} total")
def _test_selected(self):
    """Start an inline test run for the selected keys.

    Shows a warning and returns when nothing is selected. Otherwise the
    selected rows are translated to pool indices and the tests are started
    on a daemon thread.
    """
    chosen_rows = self.tree.selection()
    if not chosen_rows:
        messagebox.showwarning("Warning", "Please select keys to test")
        return

    indices = list(map(self.tree.index, chosen_rows))

    # Best effort: make UnifiedClient share this dialog's pool instance.
    try:
        from unified_api_client import UnifiedClient
        UnifiedClient._api_key_pool = self.key_pool
    except Exception:
        pass

    # Run the tests off the UI thread.
    worker = threading.Thread(target=self._run_inline_tests, args=(indices,), daemon=True)
    worker.start()
def _test_all(self):
    """Start an inline test run for every key in the pool.

    Shows a warning and returns when the pool is empty; otherwise the tests
    are started on a daemon thread.
    """
    pool_keys = self.key_pool.keys
    if not pool_keys:
        messagebox.showwarning("Warning", "No keys to test")
        return

    every_index = list(range(len(pool_keys)))

    # Run the tests off the UI thread.
    worker = threading.Thread(target=self._run_inline_tests, args=(every_index,), daemon=True)
    worker.start()
def _run_inline_tests(self, indices: List[int]):
    """Run the key tests for ``indices`` in parallel and show results inline.

    Each listed key is flagged as "testing", the list is refreshed, the keys
    are checked on a thread pool of at most 10 workers, and finally the flags
    are cleared, the list is refreshed again and the stats label shows a
    summary.

    NOTE: the per-key check is still a placeholder - it sleeps instead of
    calling any API and then records the key as passed.

    Args:
        indices: Positions in ``self.key_pool.keys`` of the keys to test.
            Positions past the end of the pool are skipped.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    print(f"[DEBUG] Starting tests for {len(indices)} keys")

    # Mark all selected keys as testing
    for index in indices:
        if index < len(self.key_pool.keys):
            key = self.key_pool.keys[index]
            key.last_test_result = None
            key._testing = True
            print(f"[DEBUG] Marked key {index} as testing")

    # Refresh once to show "Testing..." status
    self.dialog.after(0, self._refresh_key_list)

    # ThreadPoolExecutor rejects max_workers <= 0, so keep at least one
    # worker even when ``indices`` is empty.
    max_workers = max(1, min(10, len(indices)))

    def test_single_key(index):
        """Test a single API key directly (placeholder implementation)."""
        print(f"[DEBUG] Testing key at index {index}")

        if index >= len(self.key_pool.keys):
            return None

        key = self.key_pool.keys[index]

        try:
            # Minimal stand-in for a real request, used to exercise the flow.
            print(f"[DEBUG] Testing {key.model} with key {key.api_key[:8]}...")

            # Simulate a test
            import time
            time.sleep(1)  # Simulate API call

            # For now, just mark as passed to test the flow
            key.mark_success()
            key.set_test_result('passed', 'Test successful')
            print(f"[DEBUG] Key {index} test completed - PASSED")
            time.sleep(0.5)  # Brief pause to improve retry responsiveness
            logger.debug("💤 Pausing briefly to improve retry responsiveness after test completion")
            return (index, True, "Test passed")

        except Exception as e:
            print(f"[DEBUG] Key {index} test failed: {e}")
            key.mark_error()
            key.set_test_result('error', str(e)[:30])
            return (index, False, f"Error: {str(e)[:50]}...")

    # Run tests in parallel
    results = []
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all test tasks
            future_to_index = {executor.submit(test_single_key, i): i for i in indices}

            # Process results as they complete
            for future in as_completed(future_to_index):
                result = future.result()
                if result:
                    results.append(result)
                    print(f"[DEBUG] Got result: {result}")
    finally:
        # Always drop the temporary flags, even if the pool raised, so no
        # key is left showing "Testing..." forever.
        for index in indices:
            if index < len(self.key_pool.keys):
                key = self.key_pool.keys[index]
                if hasattr(key, '_testing'):
                    delattr(key, '_testing')
                    print(f"[DEBUG] Cleared testing flag for key {index}")

    print(f"[DEBUG] All tests complete. Results: {len(results)}")

    # Calculate summary
    success_count = sum(1 for _, success, _ in results if success)
    total_count = len(results)

    # Hand the UI updates over to the Tk event loop
    print(f"[DEBUG] Refreshing UI with results")
    self.dialog.after(0, self._refresh_key_list)
    self.dialog.after(0, lambda: self.stats_label.config(
        text=f"Test complete: {success_count}/{total_count} passed"))
def _update_tree_item(self, index: int):
    """Update a single tree row from the current state of its key.

    The update itself is scheduled on the Tk event loop via
    ``self.dialog.after``. It rewrites the status, success count, error count
    and times-used columns of the row and replaces its tags. Nothing happens
    when ``index`` has no matching row or no matching key.

    Args:
        index: Position of the row in the tree and of the key in the pool.
    """
    def remaining_cooldown(key):
        """Whole seconds of cooldown left, never negative.

        A key without a recorded error time has nothing to count down from.
        """
        if not key.last_error_time:
            return 0
        return max(0, int(key.cooldown - (time.time() - key.last_error_time)))

    def update():
        items = self.tree.get_children()
        keys = self.key_pool.keys
        # Both lists may have changed since the update was scheduled.
        if index >= len(items) or index >= len(keys):
            return
        item = items[index]
        key = keys[index]

        # Determine status and tags. "Testing" needs the temporary _testing
        # flag as well, matching _refresh_key_list: a key that has simply
        # never been tested also has last_test_result None.
        if key.last_test_result is None and hasattr(key, '_testing'):
            status = "⏳ Testing..."
            tags = ('testing',)
        elif not key.enabled:
            status = "Disabled"
            tags = ('disabled',)
        elif key.last_test_result == 'passed':
            if key.is_cooling_down:
                status = f"✅ Passed (cooling {remaining_cooldown(key)}s)"
                tags = ('passed_cooling',)
            else:
                status = "✅ Passed"
                tags = ('passed',)
        elif key.last_test_result == 'failed':
            status = "❌ Failed"
            tags = ('failed',)
        elif key.last_test_result == 'rate_limited':
            status = f"⚠️ Rate Limited ({remaining_cooldown(key)}s)"
            tags = ('ratelimited',)
        elif key.last_test_result == 'error':
            status = "❌ Error"
            if key.last_test_message:
                status += f": {key.last_test_message[:20]}..."
            tags = ('error',)
        elif key.is_cooling_down:
            status = f"Cooling ({remaining_cooldown(key)}s)"
            tags = ('cooling',)
        else:
            status = "Active"
            tags = ('active',)

        # Get current values and overwrite the columns that can change
        values = list(self.tree.item(item, 'values'))
        values[2] = status
        values[3] = key.success_count
        values[4] = key.error_count
        # Times used (counter); fall back to successes + errors.
        values[5] = getattr(key, 'times_used', key.success_count + key.error_count)

        # Update the item
        self.tree.item(item, values=values, tags=tags)

    # Run in main thread
    self.dialog.after(0, update)
def _refresh_key_list(self):
    """Rebuild the key table, keeping stored test results visible.

    Every row is dropped and re-created from the key pool. A row's status
    comes from, in order: the temporary testing flag, the enabled flag, the
    last test result, and the cooldown state. The stats label is updated
    afterwards.
    """
    tree = self.tree
    for row in tree.get_children():
        tree.delete(row)

    # Test outcomes whose label does not depend on anything else.
    fixed_outcomes = {
        'passed': ("✅ Passed", ('passed',)),
        'failed': ("❌ Failed", ('failed',)),
        'rate_limited': ("⚠️ Rate Limited", ('ratelimited',)),
    }

    keys = self.key_pool.get_all_keys()
    for key in keys:
        # Show only the first 8 and last 4 characters of longer keys.
        secret = key.api_key
        masked_key = secret if len(secret) <= 12 else secret[:8] + "..." + secret[-4:]

        outcome = key.last_test_result
        if outcome is None and hasattr(key, '_testing'):
            # A test is running right now (temporary flag).
            status, tags = "⏳ Testing...", ('testing',)
        elif not key.enabled:
            status, tags = "Disabled", ('disabled',)
        elif outcome in fixed_outcomes:
            status, tags = fixed_outcomes[outcome]
        elif outcome == 'error':
            status = "❌ Error"
            if key.last_test_message:
                status += f": {key.last_test_message[:20]}..."
            tags = ('error',)
        elif key.is_cooling_down and key.last_error_time:
            remaining = int(key.cooldown - (time.time() - key.last_error_time))
            if remaining > 0:
                status, tags = f"Cooling ({remaining}s)", ('cooling',)
            else:
                # Cooldown has run out: clear the flag and show as active.
                key.is_cooling_down = False
                status, tags = "Active", ('active',)
        else:
            status, tags = "Active", ('active',)

        # Times used (counter); fall back to successes + errors.
        times_used = getattr(key, 'times_used', key.success_count + key.error_count)

        row_values = (key.model, f"{key.cooldown}s", status,
                      key.success_count, key.error_count, times_used)
        tree.insert('', 'end', text=masked_key, values=row_values, tags=tags)

    # Row colours per status tag
    tree.tag_configure('active', foreground='green')
    tree.tag_configure('cooling', foreground='orange')
    tree.tag_configure('disabled', foreground='gray')
    tree.tag_configure('testing', foreground='blue', font=('TkDefaultFont', 11))
    tree.tag_configure('passed', foreground='dark green', font=('TkDefaultFont', 11))
    tree.tag_configure('failed', foreground='red')
    tree.tag_configure('ratelimited', foreground='orange')
    tree.tag_configure('error', foreground='dark red')

    # Summary line
    total_count = len(keys)
    active_count = sum(1 for k in keys if k.enabled and not k.is_cooling_down)
    passed_count = sum(1 for k in keys if k.last_test_result == 'passed')
    self.stats_label.config(text=f"Keys: {active_count} active / {total_count} total | {passed_count} passed tests")
def _create_progress_dialog(self):
    """Open the test progress window next to the mouse pointer.

    Creates ``self.progress_dialog`` with a heading, a scrolling text area
    (``self.progress_text``) for results and an initially disabled Close
    button (``self.close_button``).
    """
    window = tk.Toplevel(self.dialog)
    self.progress_dialog = window
    window.title("Testing API Keys")

    # Place the window so the pointer ends up slightly inside it.
    pointer_x = window.winfo_pointerx()
    pointer_y = window.winfo_pointery()
    window.geometry(f"500x400+{pointer_x-50}+{pointer_y-30}")

    heading = tb.Label(window, text="Testing in progress...",
                       font=('TkDefaultFont', 10, 'bold'))
    heading.pack(pady=10)

    # Scrolling text area for the per-key results
    self.progress_text = scrolledtext.ScrolledText(window, wrap=tk.WORD,
                                                   width=60, height=20)
    self.progress_text.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10))

    # Close button (initially disabled)
    self.close_button = tb.Button(window, text="Close",
                                  command=window.destroy,
                                  bootstyle="secondary", state=tk.DISABLED)
    self.close_button.pack(pady=(0, 10))

    window.transient(self.dialog)
def _run_tests(self, indices: List[int]):
    """Run API tests for the specified keys in parallel.

    Each key is sent a tiny chat request on a worker thread (at most 10
    workers).  Progress is pushed to the progress dialog through
    ``self.dialog.after`` and every outcome is put on ``self.test_results``
    as an ``(index, success, message)`` tuple.

    ``_show_completion`` and ``_process_test_results`` are always scheduled
    on the dialog before this method returns or raises - also when
    ``indices`` is empty or a worker crashes - so the progress dialog is
    never left waiting forever.

    Args:
        indices: Positions in ``self.key_pool.keys`` of the keys to test.
            Positions past the end of the pool are ignored.
    """
    import os
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Optional OpenAI-compatible endpoint for Gemini models (from the environment)
    use_gemini_endpoint = os.getenv("USE_GEMINI_OPENAI_ENDPOINT", "0") == "1"
    gemini_endpoint = os.getenv("GEMINI_OPENAI_ENDPOINT", "")

    def build_messages():
        """Return a fresh copy of the probe conversation for one request."""
        return [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Say 'API test successful' and nothing else."}
        ]

    def announce(label):
        """Write the 'Testing ...' marker for one key into the progress log."""
        self.progress_text.insert(tk.END, f"Testing {label}... ")
        self.progress_text.see(tk.END)

    def report(label, success, error=False):
        """Schedule the result update for one key on the dialog."""
        self.dialog.after(0, lambda: self._update_test_result(label, success, error=error))

    def request_reply(key):
        """Send the probe with ``key``; return ``(content, failure_message)``.

        ``failure_message`` is the text to report when ``content`` does not
        contain the expected phrase.
        """
        is_gemini_model = key.model.lower().startswith('gemini')
        if is_gemini_model and use_gemini_endpoint and gemini_endpoint:
            # Test Gemini with OpenAI-compatible endpoint
            import openai
            endpoint_url = gemini_endpoint
            if not endpoint_url.endswith('/openai/'):
                endpoint_url = endpoint_url.rstrip('/') + '/openai/'
            client = openai.OpenAI(
                api_key=key.api_key,
                base_url=endpoint_url,
                timeout=10.0
            )
            response = client.chat.completions.create(
                model=key.model.replace('gemini/', ''),
                messages=build_messages(),
                max_tokens=100,
                temperature=0.7
            )
            return response.choices[0].message.content, "Unexpected response"

        # Use UnifiedClient for non-Gemini or regular Gemini.  Imported here
        # (like ``openai`` above) so an import failure is reported as a
        # per-key error instead of aborting the whole run.
        from unified_api_client import UnifiedClient
        client = UnifiedClient(
            api_key=key.api_key,
            model=key.model,
            output_dir=None
        )
        response = client.send(
            build_messages(),
            temperature=0.7,
            max_tokens=100
        )
        if response and isinstance(response, tuple):
            content, _finish_reason = response
            return content, "Unexpected response"
        return None, "No response"

    def test_single_key(index):
        """Test a single API key; return its result tuple or None if skipped."""
        if index >= len(self.key_pool.keys):
            return None
        key = self.key_pool.keys[index]
        # Create a key identifier that does not expose the whole secret
        key_preview = f"{key.api_key[:8]}...{key.api_key[-4:]}" if len(key.api_key) > 12 else key.api_key
        test_label = f"{key.model} [{key_preview}]"
        # Update UI to show test started
        self.dialog.after(0, lambda: announce(test_label))
        try:
            # Count this usage for times used in testing as well.  This is
            # deliberately best-effort: a failure here must not fail the test.
            try:
                key.times_used += 1
            except Exception:
                pass
            content, failure_message = request_reply(key)
            if content and "test successful" in content.lower():
                report(test_label, True)
                key.mark_success()
                return (index, True, "Test passed")
            report(test_label, False)
            key.mark_error()
            return (index, False, failure_message)
        except Exception as e:
            error_msg = str(e)
            error_code = None
            if "429" in error_msg or "rate limit" in error_msg.lower():
                error_code = 429
            report(test_label, False, error=True)
            key.mark_error(error_code)
            return (index, False, f"Error: {error_msg}")

    try:
        # ThreadPoolExecutor rejects max_workers=0, so skip the pool entirely
        # when there is nothing to test.
        if indices:
            max_workers = min(10, len(indices))  # Limit to 10 concurrent tests
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all test tasks
                future_to_index = {executor.submit(test_single_key, i): i for i in indices}
                # Process results as they complete
                for future in as_completed(future_to_index):
                    try:
                        result = future.result()
                    except Exception as exc:
                        # The worker failed outside its own error handling;
                        # record it instead of abandoning the remaining keys.
                        index = future_to_index[future]
                        if index < len(self.key_pool.keys):
                            result = (index, False, f"Error: {exc}")
                        else:
                            result = None
                    if result:
                        self.test_results.put(result)
    finally:
        # Show completion and close button
        self.dialog.after(0, self._show_completion)
        # Process final results
        self.dialog.after(0, self._process_test_results)
def _update_test_result(self, test_label, success, error=False):
    """Write the outcome of one key test next to its entry in the progress log.

    Tests run in parallel, so several "Testing <label>... " markers can sit
    on the same line before any result arrives.  The result is therefore
    inserted directly after the first still-unresolved occurrence of
    ``test_label`` (followed by a newline) rather than at the end of the
    line; this keeps every result next to its own label.

    Args:
        test_label: Label text that was written to the log for the key.
        success: True when the test passed.
        error: True when the test raised; takes precedence over ``success``.
    """
    if error:
        result_text = "❌ ERROR"
    elif success:
        result_text = "✅ PASSED"
    else:
        result_text = "❌ FAILED"

    status_marks = ("✅", "❌")
    content = self.progress_text.get("1.0", tk.END)
    search_from = 0
    while True:
        label_start = content.find(test_label, search_from)
        if label_start == -1:
            return  # no unresolved entry for this label
        # Skip the "... " that trails the label so the result lands after it
        insert_at = label_start + len(test_label)
        while insert_at < len(content) and content[insert_at] in ". ":
            insert_at += 1
        line_start = content.rfind("\n", 0, label_start) + 1
        text_before_label = content[line_start:label_start]
        resolved = (
            content.startswith(status_marks, insert_at)
            or any(mark in text_before_label for mark in status_marks)
        )
        if not resolved:
            break
        # max(1, ...) guarantees progress even for an empty label
        search_from = label_start + max(1, len(test_label))

    # Translate the string offset into a Tk "line.column" index
    line_num = content.count("\n", 0, insert_at) + 1
    column = insert_at - (content.rfind("\n", 0, insert_at) + 1)
    self.progress_text.insert(f"{line_num}.{column}", result_text + "\n")
    self.progress_text.see(tk.END)
def _show_completion(self) -> None:
    """Append the end-of-run banner to the progress log and scroll to it."""
    log_widget = self.progress_text
    log_widget.insert(tk.END, "\n--- Testing Complete ---\n")
    # Keep the newest output visible
    log_widget.see(tk.END)
def _process_test_results(self):
    """Drain ``self.test_results`` and show the summary in the progress dialog.

    Each queued item is an ``(index, success, message)`` tuple.  When there
    are results, a summary and one line per result are appended to the
    progress log, the dialog title is updated, and a follow-up callback
    clears the ``_testing`` marker from the pool's keys and updates the
    stats label.  The Close button is enabled and the key list refreshed
    in every case, so the dialog can always be dismissed.
    """
    results = []
    # Get all results
    while True:
        try:
            results.append(self.test_results.get_nowait())
        except queue.Empty:
            break

    if results:
        # Build result message
        success_count = sum(1 for _, success, _ in results if success)
        total_count = len(results)

        # Update everything at once after all tests complete
        def final_update():
            # Clear testing flags.  The list of tested indices is not
            # available in this method, so every key in the pool is checked.
            for key in self.key_pool.keys:
                if hasattr(key, '_testing'):
                    delattr(key, '_testing')
            self._refresh_key_list()
            self.stats_label.config(text=f"Test complete: {success_count}/{total_count} passed")

        self.dialog.after(0, final_update)

        # Add summary to the same dialog
        self.progress_text.insert(tk.END, f"\nSummary: {success_count}/{total_count} passed\n")
        self.progress_text.insert(tk.END, "-" * 50 + "\n\n")
        keys = self.key_pool.keys
        for i, success, msg in results:
            status = "✅" if success else "❌"
            if i >= len(keys):
                # The pool shrank while the tests were running
                self.progress_text.insert(tk.END, f"{status} key #{i + 1} (no longer in pool): {msg}\n")
                continue
            key = keys[i]
            # Show key identifier in results too
            key_preview = f"{key.api_key[:8]}...{key.api_key[-4:]}" if len(key.api_key) > 12 else key.api_key
            self.progress_text.insert(tk.END, f"{status} {key.model} [{key_preview}]: {msg}\n")
        self.progress_text.see(tk.END)
        # Update the dialog title
        self.progress_dialog.title(f"API Test Results - {success_count}/{total_count} passed")

    # Enable close button now that testing is complete (even with no results)
    self.close_button.config(state=tk.NORMAL)
    # Refresh list
    self._refresh_key_list()
def _enable_selected(self) -> None:
    """Switch on every key whose row is currently selected in the tree."""
    chosen_rows = self.tree.selection()
    for row in chosen_rows:
        position = self.tree.index(row)
        if position >= len(self.key_pool.keys):
            continue  # row has no matching entry in the pool
        self.key_pool.keys[position].enabled = True
    # Redraw the list, then report how many rows were selected
    self._refresh_key_list()
    self._show_status(f"Enabled {len(chosen_rows)} key(s)")
def _disable_selected(self) -> None:
    """Switch off every key whose row is currently selected in the tree."""
    chosen_rows = self.tree.selection()
    for row in chosen_rows:
        position = self.tree.index(row)
        if position >= len(self.key_pool.keys):
            continue  # row has no matching entry in the pool
        self.key_pool.keys[position].enabled = False
    # Redraw the list, then report how many rows were selected
    self._refresh_key_list()
    self._show_status(f"Disabled {len(chosen_rows)} key(s)")
def _remove_selected(self) -> None:
    """Remove the selected keys from the pool after the user confirms.

    Does nothing when no row is selected or the confirmation is declined.
    """
    chosen_rows = self.tree.selection()
    if not chosen_rows:
        return
    confirmed = messagebox.askyesno("Confirm", f"Remove {len(chosen_rows)} selected key(s)?")
    if not confirmed:
        return
    # Delete from the highest position down so earlier removals do not
    # shift the positions that are still to be removed.
    positions = [self.tree.index(row) for row in chosen_rows]
    positions.sort(reverse=True)
    for position in positions:
        self.key_pool.remove_key(position)
    self._refresh_key_list()
    self._show_status(f"Removed {len(chosen_rows)} key(s)")
def _edit_cooldown(self):
    """Open a small dialog to edit the cooldown of the single selected key.

    Exactly one row must be selected; otherwise a warning is shown.  The
    dialog offers a spinbox (10-3600) plus Save and Cancel buttons.  Save
    writes the value to ``key.cooldown`` (clamped to the spinbox range),
    closes the dialog and refreshes the key list; Cancel closes the dialog
    without changing anything.
    """
    selected = self.tree.selection()
    if not selected or len(selected) != 1:
        messagebox.showwarning("Warning", "Please select exactly one key")
        return
    index = self.tree.index(selected[0])
    if index >= len(self.key_pool.keys):
        return
    key = self.key_pool.keys[index]

    min_cooldown, max_cooldown = 10, 3600

    # Create simple dialog
    dialog = tk.Toplevel(self.dialog)
    dialog.title("Edit Cooldown")
    dialog.geometry("300x150")
    tk.Label(dialog, text=f"Cooldown for {key.model}:").pack(pady=10)
    cooldown_var = tk.IntVar(value=key.cooldown)
    tb.Spinbox(dialog, from_=min_cooldown, to=max_cooldown, textvariable=cooldown_var,
               width=10).pack(pady=5)

    def save_cooldown():
        """Validate the entered value and store it on the key."""
        try:
            value = int(cooldown_var.get())
        except (tk.TclError, ValueError):
            # The spinbox accepts free text, so the content may not be a number
            messagebox.showerror("Error", "Cooldown must be a whole number of seconds",
                                 parent=dialog)
            return
        # Typed values can fall outside the spinbox range; clamp them
        key.cooldown = max(min_cooldown, min(max_cooldown, value))
        dialog.destroy()
        self._refresh_key_list()
        self._show_status(f"Cooldown for {key.model} set to {key.cooldown}s")

    # Without these buttons the edited value would never be written back
    button_row = tk.Frame(dialog)
    button_row.pack(pady=10)
    tb.Button(button_row, text="Save", command=save_cooldown,
              bootstyle="success").pack(side=tk.LEFT, padx=5)
    tb.Button(button_row, text="Cancel", command=dialog.destroy,
              bootstyle="secondary").pack(side=tk.LEFT, padx=5)
def _import_keys(self):
    """Import API keys from a user-selected JSON file into the key pool.

    The file must contain a JSON list.  Every element that is a dict with
    both ``api_key`` and ``model`` is converted with
    ``APIKeyEntry.from_dict`` and added to the pool.  Elements that are
    malformed or fail to convert are skipped (and counted) instead of
    aborting the whole import, and the key list is refreshed whenever the
    file was readable so the view always matches the pool.
    """
    from tkinter import filedialog
    filename = filedialog.askopenfilename(
        title="Import API Keys",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
    )
    if not filename:
        return  # dialog cancelled

    try:
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable text
        messagebox.showerror("Error", f"Failed to import: {str(e)}")
        return

    if not isinstance(data, list):
        messagebox.showerror("Error", "Invalid file format")
        return

    # Load keys
    imported_count = 0
    skipped_count = 0
    for key_data in data:
        if not (isinstance(key_data, dict) and 'api_key' in key_data and 'model' in key_data):
            skipped_count += 1
            continue
        try:
            self.key_pool.add_key(APIKeyEntry.from_dict(key_data))
        except Exception as e:
            # One bad entry must not discard the rest of the file
            logger.warning(f"Skipping API key entry that could not be imported: {e}")
            skipped_count += 1
            continue
        imported_count += 1

    self._refresh_key_list()
    message = f"Imported {imported_count} API keys"
    if skipped_count:
        message += f" (skipped {skipped_count} invalid entries)"
    messagebox.showinfo("Success", message)
def _export_keys(self) -> None:
    """Write every key in the pool to a user-chosen JSON file.

    Warns and returns when the pool is empty; returns silently when the
    save dialog is cancelled.  Any failure while serializing or writing is
    reported in an error box.
    """
    from tkinter import filedialog
    if not self.key_pool.keys:
        messagebox.showwarning("Warning", "No keys to export")
        return
    target_path = filedialog.asksaveasfilename(
        title="Export API Keys",
        defaultextension=".json",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
    )
    if not target_path:
        return
    try:
        # Convert keys to list of dicts
        serialized = []
        for entry in self.key_pool.get_all_keys():
            serialized.append(entry.to_dict())
        with open(target_path, 'w', encoding='utf-8') as handle:
            json.dump(serialized, handle, indent=2, ensure_ascii=False)
        messagebox.showinfo("Success", f"Exported {len(serialized)} API keys")
    except Exception as exc:
        messagebox.showerror("Error", f"Failed to export: {str(exc)}")
def _show_status(self, message: str) -> None:
    """Display ``message`` in the stats label."""
    status_widget = self.stats_label
    status_widget.config(text=message)
def _save_and_close(self) -> None:
    """Persist the key configuration, confirm it to the user, then close."""
    # Save first so the confirmation is only shown after the save call returns
    self._save_keys_to_config()
    messagebox.showinfo("Success", "API key configuration saved")
    window = self.dialog
    window.destroy()
def _on_close(self) -> None:
    """Close the dialog window without any further action."""
    window = self.dialog
    window.destroy()