|
|
import json |
|
|
import logging |
|
|
from datetime import datetime |
|
|
from uuid import uuid4 |
|
|
import requests |
|
|
from pathlib import Path |
|
|
from datasets import load_dataset, Dataset |
|
|
import os |
|
|
from huggingface_hub import CommitScheduler, HfApi |
|
|
import random |
|
|
|
|
|
class ChatLogger:
    """Persist chat Q&A interactions as JSONL records inside a
    huggingface_hub CommitScheduler folder, so the scheduler can
    periodically push them to the Hub.

    The scheduler must expose ``folder_path`` (directory synced to the
    Hub) and ``lock`` (mutex guarding writes during a scheduled commit).
    """

    def __init__(self, scheduler):
        """Initialize the chat logger with paths and configurations.

        Args:
            scheduler: a huggingface_hub CommitScheduler (or compatible
                object with ``folder_path`` and ``lock`` attributes).

        Raises:
            ValueError: if no scheduler is supplied.
            Exception: re-raised if the dataset directory cannot be created.
        """
        if not scheduler:
            raise ValueError("Scheduler is required")

        self.scheduler = scheduler
        self.json_dataset_dir = Path(scheduler.folder_path)

        try:
            self.json_dataset_dir.mkdir(parents=True, exist_ok=True)
            logging.info(f"Using dataset directory at: {self.json_dataset_dir}")
        except Exception as e:
            logging.error(f"Error creating dataset directory: {str(e)}")
            raise

        # A uniquely named file per logger instance avoids collisions when
        # several app replicas write into the same scheduler folder.
        self.logs_path = self.json_dataset_dir / f"logs-{uuid4()}.jsonl"
        logging.info(f"Log file will be created at: {self.logs_path}")

    def get_client_ip(self, request=None):
        """Get the client IP address from the request context.

        Honors the first hop of the ``X-Forwarded-For`` header when
        present (real client behind a reverse proxy). Returns None when
        no request is given, and "127.0.0.1" on unexpected errors.
        """
        try:
            if request:
                ip = request.client.host
                # Behind a proxy the direct peer is the proxy itself; the
                # originating client is the first X-Forwarded-For entry.
                forwarded_for = request.headers.get('X-Forwarded-For')
                if forwarded_for:
                    ip = forwarded_for.split(',')[0].strip()
                logging.debug(f"Client IP detected: {ip}")
                return ip
        except Exception as e:
            logging.error(f"Error getting client IP: {e}")
            return "127.0.0.1"
        # Explicit (was an implicit fall-through before): no request, no IP.
        return None

    def get_client_location(self, ip_address):
        """Get geolocation info using ipapi.co.

        Returns a dict with city/region/country/latitude/longitude, or
        None on rate limiting, HTTP errors, bad payloads, or timeouts.
        Coordinates are jittered by up to ~1 km to avoid storing a
        precise user location.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_address}/json/',
                headers=headers,
                timeout=5
            )
            if response.status_code == 200:
                # BUG FIX: response.json() raises ValueError on a non-JSON
                # body, which is not a RequestException — guard it here.
                try:
                    data = response.json()
                except ValueError as e:
                    logging.error(f"Invalid JSON in IP lookup response: {e}")
                    return None

                lat = data.get('latitude')
                lon = data.get('longitude')
                if lat is not None and lon is not None:
                    # Small random offset: keep the city-level signal but
                    # drop house-level precision.
                    lat += random.uniform(-0.01, 0.01)
                    lon += random.uniform(-0.01, 0.01)

                return {
                    'city': data.get('city'),
                    'region': data.get('region'),
                    'country': data.get('country_name'),
                    'latitude': lat,
                    'longitude': lon
                }
            elif response.status_code == 429:
                logging.warning("Rate limit exceeded for IP lookup")
                return None
            else:
                logging.error(f"Error in IP lookup: Status code {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed in IP lookup: {str(e)}")
            return None

    def create_log_entry(self, query, answer, retrieved_content, feedback=None, request=None):
        """Create a structured log entry with all required fields.

        Args:
            query: the user's question.
            answer: the generated answer.
            retrieved_content: a list of retrieved passages; a scalar is
                wrapped into a single-element list.
            feedback: optional user feedback payload.
            request: optional web request used for IP/location lookup;
                when None, ``client_location`` is None and no network
                call is made.

        Returns:
            dict ready for JSONL serialization.
        """
        timestamp = datetime.now().timestamp()

        ip = self.get_client_ip(request) if request else None
        location = self.get_client_location(ip) if ip else None

        log_entry = {
            "record_id": str(uuid4()),
            "session_id": str(uuid4()),
            "time": str(timestamp),
            "client_location": location,
            "question": query,
            "answer": answer,
            "retrieved_content": retrieved_content if isinstance(retrieved_content, list) else [retrieved_content],
            "feedback": feedback
        }

        return log_entry

    def cleanup_local_files(self):
        """Delete stale local JSON files, keeping the active log file.

        BUG FIX: previously this deleted *every* ``*.json*`` file —
        including the log file just written by save_local — before the
        CommitScheduler had a chance to upload it, losing the data. The
        active ``logs_path`` is now excluded.
        """
        try:
            for file in self.json_dataset_dir.glob("*.json*"):
                # Never delete the file this instance is still appending to.
                if file == self.logs_path:
                    continue
                try:
                    file.unlink()
                    logging.info(f"Deleted local file: {file}")
                except Exception as e:
                    logging.error(f"Error deleting file {file}: {e}")

            # Only remove the directory when nothing (including the active
            # log) remains in it.
            if not any(self.json_dataset_dir.iterdir()):
                self.json_dataset_dir.rmdir()
                logging.info("Removed empty json_dataset directory")
        except Exception as e:
            logging.error(f"Error in cleanup: {e}")

    def save_local(self, log_entry):
        """Save log entry to local JSONL file.

        Appends one JSON line under the scheduler lock (so a scheduled
        commit never sees a half-written line), then prunes stale files.

        Returns:
            True on success, False on any error.
        """
        try:
            self.logs_path.parent.mkdir(parents=True, exist_ok=True)

            # Fix the on-disk key order regardless of how the entry dict
            # was assembled.
            field_order = [
                "record_id",
                "session_id",
                "time",
                "client_location",
                "question",
                "answer",
                "retrieved_content",
                "feedback"
            ]
            ordered_logs = {k: log_entry.get(k) for k in field_order if k in log_entry}

            with self.scheduler.lock:
                # Explicit encoding: don't depend on the platform default.
                with open(self.logs_path, 'a', encoding='utf-8') as f:
                    json.dump(ordered_logs, f)
                    f.write('\n')
            logging.info(f"Log entry saved to {self.logs_path}")

            # Safe now: cleanup skips the active log file (see
            # cleanup_local_files), so the fresh entry survives until upload.
            self.cleanup_local_files()
            return True
        except Exception as e:
            logging.error(f"Error saving to local file: {str(e)}")
            return False

    def log(self, query, answer, retrieved_content, feedback=None, request=None):
        """Main logging method that handles both local and HF storage.

        Returns:
            True if the entry was written locally, False otherwise.
        """
        log_entry = self.create_log_entry(
            query=query,
            answer=answer,
            retrieved_content=retrieved_content,
            feedback=feedback,
            request=request
        )
        logging.info("Logging results completed")

        return self.save_local(log_entry)
|
|
|