Spaces:
Running
Running
File size: 5,622 Bytes
457b8fd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
import json
import os
import time
import tempfile
import shutil
from threading import Lock
from contextlib import contextmanager
class HistoryManager:
"""Thread-safe history management with file locking"""
def __init__(self, payloads_dir):
self.payloads_dir = payloads_dir
self.hist_path = os.path.join(payloads_dir, "translation_history.json")
self.lock = Lock()
self._file_locks = {}
@contextmanager
def _file_lock(self, filepath):
"""Simple file locking mechanism"""
lock_file = filepath + '.lock'
acquired = False
try:
# Try to acquire lock with timeout
start_time = time.time()
while time.time() - start_time < 30: # 30 second timeout
try:
# Create lock file atomically
fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
acquired = True
break
except FileExistsError:
time.sleep(0.1)
if not acquired:
raise TimeoutError(f"Could not acquire lock for {filepath}")
yield
finally:
if acquired and os.path.exists(lock_file):
try:
os.remove(lock_file)
except:
pass
def load_history(self):
"""Load history with retry logic and file locking"""
with self.lock:
for attempt in range(3):
try:
with self._file_lock(self.hist_path):
if os.path.exists(self.hist_path):
with open(self.hist_path, "r", encoding="utf-8") as f:
return json.load(f)
return []
except (json.JSONDecodeError, IOError) as e:
print(f"[WARNING] Failed to load history (attempt {attempt + 1}): {e}")
if attempt < 2:
time.sleep(0.5)
else:
# Return empty history if all attempts fail
return []
return []
def save_history(self, history):
"""Save history atomically with file locking"""
with self.lock:
with self._file_lock(self.hist_path):
# Write to temporary file first
temp_fd, temp_path = tempfile.mkstemp(dir=self.payloads_dir, text=True)
try:
with os.fdopen(temp_fd, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=2)
# Atomically replace the old file
shutil.move(temp_path, self.hist_path)
except Exception as e:
# Clean up temp file on error
if os.path.exists(temp_path):
os.remove(temp_path)
raise e
def append_to_history(self, user_content, assistant_content, hist_limit, reset_on_limit=True, rolling_window=False):
"""
Append to history with automatic reset or rolling window when limit is reached
Args:
user_content: User message content
assistant_content: Assistant message content
hist_limit: Maximum number of exchanges to keep (0 = no history)
reset_on_limit: Whether to reset when limit is reached (old behavior)
rolling_window: Whether to use rolling window mode (new behavior)
"""
# CRITICAL FIX: If hist_limit is 0 or negative, don't maintain any history
if hist_limit <= 0:
# Don't load, save, or maintain any history when contextual is disabled
return []
history = self.load_history()
# Count current exchanges (each exchange = 2 messages: user + assistant)
current_exchanges = len(history) // 2
# Handle limit reached
if current_exchanges >= hist_limit:
if rolling_window:
# Rolling window mode: keep only the most recent (limit-1) exchanges
# We keep limit-1 to make room for the new exchange
messages_to_keep = (hist_limit - 1) * 2
if messages_to_keep > 0:
history = history[-messages_to_keep:]
print(f"🔄 Rolling history window: keeping last {hist_limit-1} exchanges")
else:
history = []
elif reset_on_limit:
# Old behavior: complete reset
history = []
print(f"🔄 Reset history after reaching limit of {hist_limit} exchanges")
# Append new entries
history.append({"role": "user", "content": user_content})
history.append({"role": "assistant", "content": assistant_content})
self.save_history(history)
return history
def will_reset_on_next_append(self, hist_limit, rolling_window=False):
"""Check if the next append will trigger a reset or rolling window"""
if hist_limit <= 0:
return False
history = self.load_history()
current_exchanges = len(history) // 2
return current_exchanges >= hist_limit
|