import json
import logging
import re
from datetime import datetime
from typing import List, Dict, Optional, Union

import requests

from core.providers.base import LLMProvider
from services.weather import weather_service
from utils.config import config

logger = logging.getLogger(__name__)


class OllamaProvider(LLMProvider):
    """Ollama LLM provider implementation."""

    def __init__(self, model_name: str, timeout: int = 60, max_retries: int = 3):
        # Default timeout increased from 30s to 60s.
        super().__init__(model_name, timeout, max_retries)
        self.host = self._sanitize_host(config.ollama_host or "http://localhost:11434")
        # Headers that skip the ngrok browser-warning interstitial when the
        # host is reached through an ngrok tunnel.
        self.headers = {
            "ngrok-skip-browser-warning": "true",
            "User-Agent": "CosmicCat-AI-Assistant",
        }

    def _sanitize_host(self, host: str) -> str:
        """Sanitize the host URL by stripping whitespace and control characters."""
        if not host:
            return "http://localhost:11434"
        # Strip leading/trailing whitespace, then remove any embedded newlines,
        # tabs, or NUL characters.
        host = host.strip()
        host = re.sub(r'[\r\n\t\0]+', '', host)
        # Ensure the URL has a scheme.
        if not host.startswith(('http://', 'https://')):
            host = 'http://' + host
        return host

    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate a response synchronously."""
        try:
            return self._retry_with_backoff(self._generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama generation failed: {e}")
            return None

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate a response with streaming support."""
        try:
            return self._retry_with_backoff(self._stream_generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama stream generation failed: {e}")
            return None

    def validate_model(self) -> bool:
        """Check whether the configured model is available on the Ollama host."""
        try:
            response = requests.get(
                f"{self.host}/api/tags",
                headers=self.headers,
                timeout=self.timeout
            )
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [model.get("name") for model in models]
                return self.model_name in model_names
            elif response.status_code == 404:
                # /api/tags is unavailable; fall back to checking that the host responds at all.
                response2 = requests.get(
                    f"{self.host}",
                    headers=self.headers,
                    timeout=self.timeout
                )
                return response2.status_code == 200
            return False
        except Exception as e:
            logger.error(f"Model validation failed: {e}")
            return False

    def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Implementation of synchronous generation with enhanced debugging."""
        url = f"{self.host}/api/chat"
        # Normalize the history into the {"role": ..., "content": ...} shape the
        # Ollama chat API expects, dropping malformed entries.
        messages = []
        for msg in conversation_history:
            if isinstance(msg, dict) and "role" in msg and "content" in msg:
                messages.append({
                    "role": msg["role"],
                    "content": str(msg["content"])
                })
        # Append the current prompt if it is not already the last message.
        if not messages or messages[-1].get("content") != prompt:
            messages.append({"role": "user", "content": prompt})
        payload = {
            "model": self.model_name,
            "messages": messages,
            "stream": False
        }
        logger.info(f"Ollama request URL: {url}")
        logger.info(f"Ollama request payload: {payload}")
        logger.info(f"Ollama headers: {self.headers}")
        try:
            response = requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout
            )
            logger.info(f"Ollama response status: {response.status_code}")
            logger.info(f"Ollama response headers: {dict(response.headers)}")
            response.raise_for_status()
            result = response.json()
            logger.info(f"Ollama response body: {result}")
            # /api/chat puts the text under message.content; /api/generate-style
            # responses use a top-level "response" key. Fall back to the raw body.
            if "message" in result and "content" in result["message"]:
                content = result["message"]["content"]
                logger.info(f"Extracted content: {content[:100] if content else 'None'}")
                return content
            elif "response" in result:
                content = result["response"]
                logger.info(f"Extracted response: {content[:100] if content else 'None'}")
                return content
            else:
                content = str(result)
                logger.info(f"Raw result as string: {content[:100]}")
                return content
        except requests.exceptions.RequestException as e:
            logger.error(f"Ollama API request error: {e}")
            raise Exception(f"Ollama API error: {e}")
        except Exception as e:
            logger.error(f"Failed to parse Ollama response: {e}")
            raise Exception(f"Failed to parse Ollama response: {e}")

    def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming generation."""
        url = f"{self.host}/api/chat"
        messages = conversation_history.copy()
        # Append the current prompt if it is not already the last message.
        if not messages or messages[-1].get("content") != prompt:
            messages.append({"role": "user", "content": prompt})
        payload = {
            "model": self.model_name,
            "messages": messages,
            "stream": True
        }
        response = requests.post(
            url,
            json=payload,
            headers=self.headers,
            timeout=self.timeout,
            stream=True
        )
        response.raise_for_status()
        chunks = []
        # Ollama streams one JSON object per line; parse each line with
        # json.loads (eval is unsafe and cannot handle JSON literals such as
        # true/false/null) and collect the message content.
        for line in response.iter_lines():
            if not line:
                continue
            try:
                data = json.loads(line.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                continue
            content = data.get("message", {}).get("content", "")
            if content:
                chunks.append(content)
        return chunks
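

# A minimal usage sketch, assuming the LLMProvider base class needs only the
# constructor arguments shown above and that a model named "llama3" has already
# been pulled on the local Ollama instance (adjust to whatever `ollama list`
# reports). This is illustrative, not part of the provider itself.
if __name__ == "__main__":
    provider = OllamaProvider(model_name="llama3")
    if provider.validate_model():
        reply = provider.generate("Hello!", conversation_history=[])
        print(reply)
    else:
        print(f"Model {provider.model_name!r} is not available at {provider.host}")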