|
|
|
|
|
""" |
|
|
SFTP Model Downloader Agent |
|
|
Handles downloading model files from SFTP server |
|
|
""" |
|
|
|
|
|
import glob
import os
import posixpath
import re
from typing import List, Dict

import pysftp
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import AgentExecutor, create_openai_tools_agent
|
|
|
|
|
|
|
|
@tool
def scan_transcription_files(transcriptions_dir: str = "transcriptions") -> List[Dict[str, str]]:
    """Scan the transcriptions directory and extract model identifiers from filenames.

    Only ``*.json`` files directly inside ``transcriptions_dir`` are examined,
    and only those whose name contains ``transcriptions_<model_id>.rtf_`` are
    reported. Files are visited in sorted order so the result is deterministic.

    Args:
        transcriptions_dir: Directory holding the transcription JSON files.

    Returns:
        One dict per matching file with the keys ``model_id``, ``filename``,
        ``file_path``, ``model_filename`` (``<model_id>.rtf``) and
        ``local_filename`` (``<model_id>.doc``).

    Raises:
        FileNotFoundError: If ``transcriptions_dir`` is not an existing
            directory.
    """
    # isdir rather than exists: a regular file at this path would otherwise
    # pass the check and silently produce an empty result.
    if not os.path.isdir(transcriptions_dir):
        raise FileNotFoundError(
            f"Transcriptions directory not found: {transcriptions_dir}")

    # Compiled once instead of being looked up on every loop iteration.
    model_id_pattern = re.compile(r'transcriptions_(.+)\.rtf_')

    # glob.glob returns entries in arbitrary, OS-dependent order; sorting
    # makes repeated scans of the same directory report the same sequence.
    transcription_files = sorted(
        glob.glob(os.path.join(transcriptions_dir, "*.json")))

    model_identifiers = []
    for file_path in transcription_files:
        filename = os.path.basename(file_path)

        match = model_id_pattern.search(filename)
        if match is None:
            # File name does not follow the transcription naming scheme.
            continue

        model_id = match.group(1)
        model_identifiers.append({
            'model_id': model_id,
            'filename': filename,
            'file_path': file_path,
            'model_filename': f"{model_id}.rtf",
            'local_filename': f"{model_id}.doc",
        })

    return model_identifiers
|
|
|
|
|
|
|
|
@tool
def download_model_from_sftp(model_filename: str, local_download_dir: str = "models", force_download: bool = False) -> str:
    """Download a specific model file from SFTP server and convert extension from .rtf to .doc. If force_download is True, always re-download.

    Connection settings come from ``sftp_config.get_sftp_config()`` when that
    module can be imported; otherwise they are read from the ``SFTP_HOST``,
    ``SFTP_PORT``, ``SFTP_USERNAME``, ``SFTP_PASSWORD`` and
    ``SFTP_REMOTE_PATH`` environment variables.

    Args:
        model_filename: Name of the file on the server, joined onto the
            configured remote path.
        local_download_dir: Local directory for the downloaded file; created
            if it does not exist.
        force_download: Re-download even if the local file already exists.

    Returns:
        Path of the local file. A trailing ``.rtf`` in ``model_filename`` is
        replaced by ``.doc``; any other name is kept unchanged.

    Raises:
        FileNotFoundError: If the file does not exist on the server.
        Exception: On authentication, connection or any other download
            failure; the original exception is chained as the cause.
    """
    try:
        from sftp_config import get_sftp_config
        sftp_config = get_sftp_config()
    except ImportError:
        # No project-level config module: fall back to environment variables.
        sftp_config = {
            'host': os.getenv('SFTP_HOST', 'localhost'),
            'port': int(os.getenv('SFTP_PORT', '22')),
            'username': os.getenv('SFTP_USERNAME', 'user'),
            'password': os.getenv('SFTP_PASSWORD', 'password'),
            'remote_path': os.getenv('SFTP_REMOTE_PATH', '/models/'),
        }

    os.makedirs(local_download_dir, exist_ok=True)

    # Swap only a trailing ".rtf"; str.replace would also rewrite ".rtf"
    # occurring in the middle of the name.
    if model_filename.endswith('.rtf'):
        doc_filename = model_filename[:-len('.rtf')] + '.doc'
    else:
        doc_filename = model_filename
    local_file_path = os.path.join(local_download_dir, doc_filename)

    if not force_download and os.path.exists(local_file_path):
        print(f"ℹ️ Model already exists locally: {local_file_path}")
        return local_file_path

    # Download into a side file and move it into place only once complete.
    # This keeps an interrupted transfer from being mistaken for a finished
    # model, and it also works when the name has no ".rtf" suffix (where the
    # download target and the final path would otherwise be the same file).
    partial_path = local_file_path + '.part'

    try:
        # SECURITY NOTE(review): hostkeys = None disables SSH host key
        # verification, so the server's identity is not checked. Kept as-is
        # to preserve behavior; consider loading a known_hosts file instead.
        cnopts = pysftp.CnOpts()
        cnopts.hostkeys = None

        print(
            f"🔗 Connecting to SFTP server: {sftp_config['host']}:{sftp_config['port']}")

        with pysftp.Connection(
            host=sftp_config['host'],
            port=sftp_config['port'],
            username=sftp_config['username'],
            password=sftp_config['password'],
            cnopts=cnopts
        ) as sftp:
            # Remote paths always use "/" regardless of the local OS, so
            # join with posixpath rather than os.path.
            remote_file_path = posixpath.join(
                sftp_config['remote_path'], model_filename)

            if not sftp.exists(remote_file_path):
                raise FileNotFoundError(
                    f"Model file not found on SFTP server: {remote_file_path}")

            file_size = sftp.stat(remote_file_path).st_size
            print(
                f"📄 Found file on server: {remote_file_path} ({file_size} bytes)")

            sftp.get(remote_file_path, partial_path)
            print(f"📥 Downloaded model: {model_filename}")

            # os.replace overwrites an existing destination in one step, so
            # no separate remove-then-rename is needed.
            os.replace(partial_path, local_file_path)
            print(
                f"✅ Converted extension: {model_filename} -> {doc_filename}")

            return local_file_path

    except pysftp.AuthenticationException as e:
        error_msg = f"Authentication failed for SFTP server {sftp_config['host']}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg) from e
    except pysftp.ConnectionException as e:
        error_msg = f"Connection failed to SFTP server {sftp_config['host']}: {str(e)}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg) from e
    except FileNotFoundError as e:
        error_msg = str(e)
        print(f"❌ {error_msg}")
        raise
    except Exception as e:
        error_msg = f"Error downloading model {model_filename}: {str(e)}"
        print(f"❌ {error_msg}")
        raise Exception(error_msg) from e
    finally:
        # Best-effort cleanup of a half-written download. After a successful
        # os.replace the side file no longer exists and this is a no-op.
        if os.path.exists(partial_path):
            try:
                os.remove(partial_path)
            except OSError:
                # Cleanup must not mask the original error (or the result).
                pass
|
|
|
|
|
|
|
|
@tool
def batch_download_models(model_identifiers: List[Dict[str, str]], local_download_dir: str = "models") -> List[dict]:
    """Download multiple model files from SFTP server in batch.

    Each entry is processed independently: a failure for one model is
    recorded in its result and does not stop the remaining downloads.

    Args:
        model_identifiers: Dicts with at least ``model_filename``; ``model_id``
            and ``local_filename`` are copied into the result when present.
        local_download_dir: Local directory passed on to
            ``download_model_from_sftp``.

    Returns:
        One dict per input entry with the keys ``model_id``, ``local_path``,
        ``local_filename`` and ``status`` (``'success'`` or ``'error'``).
        Failed entries have ``local_path`` set to ``None`` and an additional
        ``error`` message.
    """
    downloaded_files = []

    for model_info in model_identifiers:
        # .get instead of indexing: a malformed entry becomes an error
        # record rather than a KeyError that aborts the whole batch.
        model_id = model_info.get('model_id')
        model_filename = model_info.get('model_filename')

        if not model_filename:
            downloaded_files.append({
                'model_id': model_id,
                'local_path': None,
                'local_filename': model_info.get('local_filename'),
                'status': 'error',
                'error': "Missing 'model_filename' in model identifier"
            })
            continue

        local_filename = model_info.get('local_filename', model_filename.replace(
            '.rtf', '.doc'))

        try:
            # download_model_from_sftp is wrapped by @tool, so it is a tool
            # object rather than a plain function. Calling it positionally
            # would pass local_download_dir as the tool's `callbacks`
            # argument; invoke() with a dict binds arguments by name.
            local_path = download_model_from_sftp.invoke({
                'model_filename': model_filename,
                'local_download_dir': local_download_dir,
            })
            downloaded_files.append({
                'model_id': model_id,
                'local_path': local_path,
                'local_filename': local_filename,
                'status': 'success'
            })
        except Exception as e:
            # Per-item boundary: record the failure and keep going.
            downloaded_files.append({
                'model_id': model_id,
                'local_path': None,
                'local_filename': local_filename,
                'status': 'error',
                'error': str(e)
            })

    return downloaded_files
|
|
|
|
|
|
|
|
def create_sftp_downloader_agent(llm):
    """Build the agent executor that scans transcriptions and downloads models.

    The agent is given the three tools ``scan_transcription_files``,
    ``download_model_from_sftp`` and ``batch_download_models``. Its prompt
    takes a single input variable, ``transcriptions_dir``.

    Args:
        llm: Language model passed to ``create_openai_tools_agent``.

    Returns:
        A verbose ``AgentExecutor`` wrapping the agent and its tools.
    """
    # Declared once; each consumer below receives its own list copy.
    toolset = (
        scan_transcription_files,
        download_model_from_sftp,
        batch_download_models,
    )

    system_message = ("system", """You are an SFTP model downloader agent. Your task is to:
1. Scan the transcriptions directory to identify which models are needed
2. Download the corresponding model files from the SFTP server
3. Return the list of successfully downloaded models

You should handle errors gracefully and provide detailed feedback about the download process.""")
    human_message = (
        "human",
        "Analyze the transcriptions in {transcriptions_dir} and download the corresponding models from SFTP.",
    )

    prompt = ChatPromptTemplate.from_messages([
        system_message,
        human_message,
        MessagesPlaceholder("agent_scratchpad"),
    ])

    agent = create_openai_tools_agent(
        llm=llm,
        tools=list(toolset),
        prompt=prompt,
    )

    return AgentExecutor(
        agent=agent,
        tools=list(toolset),
        verbose=True,
    )
|
|
|