File size: 7,509 Bytes
f92da22 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
#!/usr/bin/env python3
"""
SFTP Model Downloader Agent
Handles downloading model files from SFTP server
"""
import os
import re
import glob
import pysftp
from typing import List, Dict
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import AgentExecutor, create_openai_tools_agent
@tool
def scan_transcription_files(transcriptions_dir: str = "transcriptions") -> List[Dict[str, str]]:
    """Scan the transcriptions directory and extract model identifiers from filenames."""
    # The docstring above is kept verbatim: langchain's @tool publishes it as
    # the tool description, so it is effectively a runtime string.
    #
    # Returns one dict per matching *.json file with the keys model_id,
    # filename, file_path, model_filename (.rtf name requested from SFTP) and
    # local_filename (.doc name used locally).
    # Raises FileNotFoundError when transcriptions_dir does not exist.
    if not os.path.exists(transcriptions_dir):
        raise FileNotFoundError(
            f"Transcriptions directory not found: {transcriptions_dir}")

    json_glob = os.path.join(transcriptions_dir, "*.json")
    found = []
    for path in glob.glob(json_glob):
        name = os.path.basename(path)
        # Expected filename shape: transcriptions_default.99.019111585.rtf_...
        hit = re.search(r'transcriptions_(.+)\.rtf_', name)
        if hit is None:
            # Files that do not follow the naming scheme are ignored.
            continue
        ident = hit.group(1)
        found.append(dict(
            model_id=ident,
            filename=name,
            file_path=path,
            # The server stores the model under its .rtf name ...
            model_filename=f"{ident}.rtf",
            # ... while the local copy is saved with a .doc extension.
            local_filename=f"{ident}.doc",
        ))
    return found
@tool
def download_model_from_sftp(model_filename: str, local_download_dir: str = "models", force_download: bool = False) -> str:
    """Download a specific model file from SFTP server and convert extension from .rtf to .doc. If force_download is True, always re-download."""
    # The docstring above is kept verbatim: langchain's @tool publishes it as
    # the tool description.
    #
    # Returns the local path of the downloaded (or already present) .doc file.
    # Raises FileNotFoundError when the file is missing on the server, and a
    # plain Exception (chained to the underlying error) for authentication,
    # connection and any other failure.
    #
    # NOTE(review): the emoji prefixes of the log messages were mis-encoded in
    # the reviewed copy (one even split a string literal across two lines).
    # They are restored here on a best-effort basis; confirm against the
    # canonical file.
    import posixpath  # SFTP paths always use "/", whatever the local OS is

    # Import configuration
    try:
        from sftp_config import get_sftp_config
        sftp_config = get_sftp_config()
    except ImportError:
        # Fallback to environment variables if config file not available.
        # NOTE(review): the placeholder credentials below are only suitable
        # for local development.
        sftp_config = {
            'host': os.getenv('SFTP_HOST', 'localhost'),
            'port': int(os.getenv('SFTP_PORT', '22')),
            'username': os.getenv('SFTP_USERNAME', 'user'),
            'password': os.getenv('SFTP_PASSWORD', 'password'),
            'remote_path': os.getenv('SFTP_REMOTE_PATH', '/models/')
        }

    # Create local directory if it doesn't exist
    os.makedirs(local_download_dir, exist_ok=True)

    # Swap only a trailing ".rtf" for ".doc"; str.replace() would also rewrite
    # a ".rtf" that happens to appear in the middle of the name.
    if model_filename.endswith('.rtf'):
        doc_filename = model_filename[:-len('.rtf')] + '.doc'
    else:
        doc_filename = model_filename
    local_file_path = os.path.join(local_download_dir, doc_filename)

    # If force_download is False and file exists, skip download
    if not force_download and os.path.exists(local_file_path):
        print(f"ℹ️ Model already exists locally: {local_file_path}")
        return local_file_path

    # Download into a scratch file first so that a failed transfer can never
    # clobber an existing model, and so the scratch name can never collide
    # with the final name (which happened when the name had no ".rtf" suffix).
    partial_path = local_file_path + '.part'

    try:
        # Connect to SFTP server
        cnopts = pysftp.CnOpts()
        # SECURITY: host key checking is disabled (development setting); this
        # leaves the connection open to man-in-the-middle attacks.
        cnopts.hostkeys = None
        print(
            f"🔌 Connecting to SFTP server: {sftp_config['host']}:{sftp_config['port']}")
        with pysftp.Connection(
            host=sftp_config['host'],
            port=sftp_config['port'],
            username=sftp_config['username'],
            password=sftp_config['password'],
            cnopts=cnopts
        ) as sftp:
            remote_file_path = posixpath.join(
                sftp_config['remote_path'], model_filename)

            # Check if file exists on server
            if not sftp.exists(remote_file_path):
                raise FileNotFoundError(
                    f"Model file not found on SFTP server: {remote_file_path}")

            # Report the remote size before starting the transfer
            file_size = sftp.stat(remote_file_path).st_size
            print(
                f"📁 Found file on server: {remote_file_path} ({file_size} bytes)")

            sftp.get(remote_file_path, partial_path)
            print(f"📥 Downloaded model: {model_filename}")

            # os.replace overwrites an existing target in a single step
            os.replace(partial_path, local_file_path)
            print(
                f"✅ Converted extension: {model_filename} -> {doc_filename}")
            return local_file_path
    except pysftp.AuthenticationException as e:
        error_msg = f"Authentication failed for SFTP server {sftp_config['host']}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg) from e
    except pysftp.ConnectionException as e:
        error_msg = f"Connection failed to SFTP server {sftp_config['host']}: {str(e)}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg) from e
    except FileNotFoundError as e:
        error_msg = str(e)
        print(f"❌ {error_msg}")
        raise
    except Exception as e:
        error_msg = f"Error downloading model {model_filename}: {str(e)}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg) from e
    finally:
        # Best-effort cleanup of a half-written download; after a successful
        # os.replace the scratch file no longer exists and this is a no-op.
        if os.path.exists(partial_path):
            try:
                os.remove(partial_path)
            except OSError:
                pass
@tool
def batch_download_models(model_identifiers: List[Dict[str, str]], local_download_dir: str = "models") -> List[Dict[str, object]]:
    """Download multiple model files from SFTP server in batch."""
    # The docstring above is kept verbatim: langchain's @tool publishes it as
    # the tool description.
    #
    # model_identifiers: dicts carrying 'model_id' and 'model_filename'
    #   (the .rtf name on the server) and optionally 'local_filename'.
    # Returns one result dict per input, in input order, with the keys
    #   model_id, local_path (None on failure), local_filename, status
    #   ('success' or 'error') and, on failure, error (the message).
    # A failed download is recorded in its result dict; it does not abort the
    # remaining downloads.
    downloaded_files = []
    for model_info in model_identifiers:
        model_filename = model_info['model_filename']  # .rtf file for SFTP
        local_filename = model_info.get('local_filename', model_filename.replace(
            '.rtf', '.doc'))  # .doc file for local
        try:
            # download_model_from_sftp is wrapped by @tool, so it is a tool
            # object rather than a plain function. Calling it positionally
            # would hand local_download_dir to the tool's `callbacks`
            # parameter; invoke() with a dict passes the arguments by name.
            local_path = download_model_from_sftp.invoke({
                'model_filename': model_filename,
                'local_download_dir': local_download_dir,
            })
        except Exception as e:
            downloaded_files.append({
                'model_id': model_info['model_id'],
                'local_path': None,
                'local_filename': local_filename,
                'status': 'error',
                'error': str(e)
            })
        else:
            downloaded_files.append({
                'model_id': model_info['model_id'],
                'local_path': local_path,
                'local_filename': local_filename,
                'status': 'success'
            })
    return downloaded_files
def create_sftp_downloader_agent(llm):
    """Build the executor for the SFTP model downloader agent.

    The agent is given three tools (scan_transcription_files,
    download_model_from_sftp and batch_download_models) and a prompt whose
    human message takes a ``transcriptions_dir`` input variable.

    Args:
        llm: Language model handed to create_openai_tools_agent.

    Returns:
        A verbose AgentExecutor wrapping the tools agent.
    """
    system_message = """You are an SFTP model downloader agent. Your task is to:
1. Scan the transcriptions directory to identify which models are needed
2. Download the corresponding model files from the SFTP server
3. Return the list of successfully downloaded models
You should handle errors gracefully and provide detailed feedback about the download process."""
    human_message = "Analyze the transcriptions in {transcriptions_dir} and download the corresponding models from SFTP."

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("human", human_message),
        MessagesPlaceholder("agent_scratchpad"),
    ])

    # One definition of the tool set; the agent and the executor each receive
    # their own list built from it.
    toolbox = (
        scan_transcription_files,
        download_model_from_sftp,
        batch_download_models,
    )

    agent = create_openai_tools_agent(
        llm=llm,
        tools=list(toolbox),
        prompt=prompt,
    )
    return AgentExecutor(
        agent=agent,
        tools=list(toolbox),
        verbose=True,
    )
|