Glossarion / history_manager.py
Shirochi's picture
Upload 41 files
457b8fd verified
import json
import os
import time
import tempfile
import shutil
from threading import Lock
from contextlib import contextmanager
class HistoryManager:
    """Manage the translation history JSON file.

    Access is serialized within this process by a ``threading.Lock`` and
    across cooperating processes by an exclusive ``<file>.lock`` marker file.
    A marker file left behind by a crashed process is not detected; callers
    will hit the acquisition timeout until it is removed.
    """

    # Tunables (seconds / counts). Defaults match the original hard-coded values.
    LOCK_TIMEOUT = 30
    LOCK_POLL_INTERVAL = 0.1
    LOAD_ATTEMPTS = 3
    LOAD_RETRY_DELAY = 0.5

    def __init__(self, payloads_dir):
        """Bind the manager to ``payloads_dir``.

        Args:
            payloads_dir: Directory holding ``translation_history.json``.
        """
        self.payloads_dir = payloads_dir
        self.hist_path = os.path.join(payloads_dir, "translation_history.json")
        self.lock = Lock()
        # Not used by this class; kept so existing attribute access keeps working.
        self._file_locks = {}

    @contextmanager
    def _file_lock(self, filepath):
        """Hold an exclusive ``filepath + '.lock'`` marker file for the body.

        Args:
            filepath: Path of the file being protected.

        Raises:
            TimeoutError: If the marker cannot be created within
                ``LOCK_TIMEOUT`` seconds.
        """
        lock_file = filepath + '.lock'
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        deadline = time.monotonic() + self.LOCK_TIMEOUT
        while True:
            try:
                # O_CREAT | O_EXCL makes creation fail if the marker already exists.
                fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            except FileExistsError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"Could not acquire lock for {filepath}") from None
                time.sleep(self.LOCK_POLL_INTERVAL)
            else:
                os.close(fd)
                break
        try:
            yield
        finally:
            # Best-effort release: never mask an exception raised by the body.
            try:
                os.remove(lock_file)
            except FileNotFoundError:
                pass
            except OSError as e:
                print(f"[WARNING] Could not remove lock file {lock_file}: {e}")

    def _read_history_unlocked(self):
        """Read and validate the history file; the caller must hold the locks.

        Returns:
            The stored list of messages, or ``[]`` if the file does not exist.

        Raises:
            ValueError: If the file is not valid UTF-8 JSON or its top-level
                value is not a list.
            OSError: If the file cannot be read.
        """
        try:
            with open(self.hist_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return []
        if not isinstance(data, list):
            raise ValueError(f"expected a JSON list, got {type(data).__name__}")
        return data

    def _write_history_unlocked(self, history):
        """Write ``history`` via a temp file; the caller must hold the locks."""
        # The temp file lives in the target directory so the final rename
        # stays on one filesystem.
        temp_fd, temp_path = tempfile.mkstemp(dir=self.payloads_dir, text=True)
        try:
            with os.fdopen(temp_fd, 'w', encoding='utf-8') as f:
                json.dump(history, f, ensure_ascii=False, indent=2)
            # os.replace overwrites an existing destination on every platform;
            # shutil.move falls back to copy + delete on Windows.
            os.replace(temp_path, self.hist_path)
        except BaseException:
            # Do not leave a stray temp file behind, then re-raise unchanged.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise

    def load_history(self):
        """Load the history, retrying on read or parse failures.

        Returns:
            The stored list of messages. ``[]`` if the file is missing or
            every attempt fails (each failure prints a warning).
        """
        with self.lock:
            for attempt in range(self.LOAD_ATTEMPTS):
                try:
                    with self._file_lock(self.hist_path):
                        return self._read_history_unlocked()
                except (ValueError, OSError) as e:
                    # ValueError covers JSONDecodeError, UnicodeDecodeError and
                    # a non-list payload; OSError covers I/O and lock timeouts.
                    print(f"[WARNING] Failed to load history (attempt {attempt + 1}): {e}")
                    if attempt < self.LOAD_ATTEMPTS - 1:
                        time.sleep(self.LOAD_RETRY_DELAY)
            # Fall back to an empty history once all attempts have failed.
            return []

    def save_history(self, history):
        """Replace the history file with ``history``.

        Args:
            history: JSON-serializable list of messages.

        Raises:
            TimeoutError: If the file lock cannot be acquired.
            Exception: Any error from serializing or writing is re-raised
                after the temporary file has been removed.
        """
        with self.lock, self._file_lock(self.hist_path):
            self._write_history_unlocked(history)

    def append_to_history(self, user_content, assistant_content, hist_limit, reset_on_limit=True, rolling_window=False):
        """Append one user/assistant exchange, applying the history limit.

        The load, trim and save steps run under a single lock acquisition so
        concurrent appends cannot overwrite each other's entries.

        Args:
            user_content: User message content.
            assistant_content: Assistant message content.
            hist_limit: Maximum number of exchanges to keep. ``<= 0`` disables
                history: nothing is read or written and ``[]`` is returned.
            reset_on_limit: Clear the history when the limit is reached
                (only consulted when ``rolling_window`` is false).
            rolling_window: Keep the most recent ``hist_limit - 1`` exchanges
                when the limit is reached instead of clearing.

        Returns:
            The history list as written to disk, or ``[]`` when disabled.

        Raises:
            TimeoutError: If the file lock cannot be acquired.
        """
        # Contextual history disabled: do not touch the file at all.
        if hist_limit <= 0:
            return []

        with self.lock, self._file_lock(self.hist_path):
            try:
                history = self._read_history_unlocked()
            except (ValueError, OSError) as e:
                # Unreadable history is treated as empty, as load_history does.
                print(f"[WARNING] Failed to load history: {e}")
                history = []

            # Each exchange is two messages: user + assistant.
            current_exchanges = len(history) // 2

            if current_exchanges >= hist_limit:
                if rolling_window:
                    # Keep limit-1 exchanges to make room for the new one.
                    messages_to_keep = (hist_limit - 1) * 2
                    if messages_to_keep > 0:
                        history = history[-messages_to_keep:]
                        print(f"🔄 Rolling history window: keeping last {hist_limit-1} exchanges")
                    else:
                        history = []
                elif reset_on_limit:
                    history = []
                    print(f"🔄 Reset history after reaching limit of {hist_limit} exchanges")

            history.append({"role": "user", "content": user_content})
            history.append({"role": "assistant", "content": assistant_content})

            self._write_history_unlocked(history)
        return history

    def will_reset_on_next_append(self, hist_limit, rolling_window=False):
        """Report whether the stored history has reached ``hist_limit``.

        Args:
            hist_limit: Maximum number of exchanges; ``<= 0`` always yields False.
            rolling_window: Accepted for call compatibility; not used.

        Returns:
            True if the number of stored exchanges is at or above the limit.
        """
        if hist_limit <= 0:
            return False
        history = self.load_history()
        current_exchanges = len(history) // 2
        return current_exchanges >= hist_limit