"""Self-correcting AI file processor (Streamlit front end).

The app asks an OpenRouter-hosted model to write a Python script for the
user's instruction, installs the packages that script appears to need, runs
it in a subprocess, and regenerates the script (up to three times) when
execution fails.
"""

import importlib.util
import io
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import textwrap
import traceback
from contextlib import redirect_stdout
from types import SimpleNamespace
from typing import Dict, List, Optional, Tuple
from urllib.parse import unquote, urlsplit

import openai
import requests  # Added for downloading from URLs
import streamlit as st


def _cleanup_startup_temp_files() -> None:
    """Best-effort removal of stale temp files and the pip cache to save space.

    NOTE(review): this removes *every* entry in the system temp directory whose
    name starts with ``tmp`` or ends with ``.py``, and Streamlit re-executes the
    script on each interaction. With several concurrent sessions this can delete
    files another session is still using - confirm this is acceptable for the
    deployment target.
    """
    try:
        tempdir = tempfile.gettempdir()
        entries = os.listdir(tempdir)
    except OSError:
        entries = []
        tempdir = ""

    for item in entries:
        if not (item.startswith('tmp') or item.endswith('.py')):
            continue
        item_path = os.path.join(tempdir, item)
        try:
            if os.path.isfile(item_path):
                os.unlink(item_path)
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)
        except OSError:
            # Best effort: entries owned by other users/processes are skipped.
            pass

    # Clean pip cache if exists
    pip_cache = os.path.expanduser('~/.cache/pip')
    if os.path.exists(pip_cache):
        try:
            shutil.rmtree(pip_cache)
        except OSError:
            pass


# Clean up any existing temp files on startup to save space
_cleanup_startup_temp_files()

# Use OpenRouter API (OpenAI-compatible)
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
client = openai.OpenAI(
    api_key=OPENROUTER_API_KEY,
    base_url="https://openrouter.ai/api/v1"
)
MODEL_NAME = "x-ai/grok-4-fast:free"

# Import name -> pip distribution name, shared by package detection and
# error analysis so both agree on what to install.
IMPORT_TO_PACKAGE = {
    'cv2': 'opencv-python',
    'sklearn': 'scikit-learn',
    'tf': 'tensorflow',
    'torch': 'torch',
    'numpy': 'numpy',
    'pandas': 'pandas',
    'PIL': 'Pillow',
    'pillow': 'Pillow',
    'matplotlib': 'matplotlib',
    'seaborn': 'seaborn',
    'onnxruntime': 'onnxruntime',
    'rembg': 'rembg',
    'requests': 'requests',
    'openpyxl': 'openpyxl',
    'xlrd': 'xlrd',
    'xlwt': 'xlwt',
    'xlsxwriter': 'xlsxwriter',
    'pyarrow': 'pyarrow',
    'sqlalchemy': 'sqlalchemy',
    'psycopg2': 'psycopg2-binary',
    'pymongo': 'pymongo',
    'redis': 'redis',
    'beautifulsoup4': 'beautifulsoup4',
    'bs4': 'beautifulsoup4',
    'scrapy': 'scrapy',
    'selenium': 'selenium',
    'flask': 'flask',
    'fastapi': 'fastapi',
    'django': 'django',
    'pytest': 'pytest',
    'hypothesis': 'hypothesis',
    'faker': 'faker',
}

# pip distribution name (lower-case) -> importable module name, for the
# distributions whose import name differs from the distribution name.
PACKAGE_TO_IMPORT = {
    'opencv-python': 'cv2',
    'scikit-learn': 'sklearn',
    'pillow': 'PIL',
    'beautifulsoup4': 'bs4',
    'psycopg2-binary': 'psycopg2',
}

# Pre-installed packages from requirements.txt (compared case-insensitively).
PRE_INSTALLED_PACKAGES = frozenset({
    'gradio', 'openai', 'pillow', 'rembg', 'numpy', 'opencv-python',
    'scikit-learn', 'tensorflow', 'torch', 'lxml', 'requests',
    'matplotlib', 'seaborn', 'onnxruntime', 'proglog', 'openpyxl', 'moviepy',
})

# Standard-library modules never need installing. ``sys.stdlib_module_names``
# exists on Python 3.10+; the explicit set is the fallback for older versions.
STDLIB_MODULES = frozenset(getattr(sys, 'stdlib_module_names', ())) | frozenset({
    '__future__', 'typing',
    'os', 'sys', 'io', 'json', 're', 'time', 'math', 'random',
    'collections', 'itertools', 'functools', 'operator', 'string',
    'pathlib', 'tempfile', 'subprocess', 'logging', 'argparse',
    'csv', 'xml', 'html', 'base64', 'hashlib', 'urllib', 'http',
    'threading', 'multiprocessing', 'socket', 'asyncio',
    'concurrent', 'abc', 'enum', 'dataclasses', 'zipfile',
    'datetime', 'calendar', 'copy', 'pickle', 'struct', 'binascii',
    'codecs',
})

# Code patterns that imply an extra package even without an import of it.
USAGE_PATTERNS = {
    r'\.to_excel': 'openpyxl',
    r'\.read_excel': 'openpyxl',
    r'\.ExcelWriter': 'openpyxl',
    r'\.to_parquet': 'pyarrow',
    r'\.read_parquet': 'pyarrow',
    r'\.to_sql': 'sqlalchemy',
    r'\.read_sql': 'sqlalchemy',
    r'\.to_feather': 'pyarrow',
    r'\.read_feather': 'pyarrow',
    r'\.to_stata': 'statsmodels',
    r'\.read_stata': 'statsmodels',
    r'\.to_clipboard': 'pyperclip',
    r'xlsxwriter': 'xlsxwriter',
    r'openpyxl': 'openpyxl',
}

# Only "# pip install <args>" comments count; free text containing the word
# "install" must not cause arbitrary words to be installed from PyPI.
_PIP_COMMENT_RE = re.compile(r'#\s*pip\s+install\s+([^\n#]+)', re.IGNORECASE)
# First character that starts a version specifier, marker or extras list.
_SPEC_START_RE = re.compile(r'[=<>!~;\[]')
# Conservative distribution-name check (letters/digits, inner . _ -).
_PACKAGE_NAME_RE = re.compile(r'^[A-Za-z0-9](?:[A-Za-z0-9._-]*[A-Za-z0-9])?$')
_FROM_IMPORT_RE = re.compile(r'^from\s+([A-Za-z_]\w*)(?:\.\w+)*\s+import\b')
_IMPORT_ITEM_RE = re.compile(r'^([A-Za-z_]\w*)(?:\.\w+)*(?:\s+as\s+[A-Za-z_]\w*)?$')

# (error_type, case-sensitive markers, case-insensitive markers, suggestions).
# Order matters: the first matching rule wins.
_ERROR_RULES = (
    ("permission_error", ("Permission denied", "PermissionError"), (),
     ("Use temp directory for file operations", "Avoid system directories")),
    ("memory_error", ("MemoryError",), ("killed",),
     ("Reduce data size or use chunks", "Process data in smaller batches")),
    ("file_error", ("FileNotFoundError", "No such file"), (),
     ("Check file paths", "Create directory if needed")),
    ("syntax_error", ("SyntaxError",), (),
     ("Fix syntax issues", "Check indentation")),
    ("attribute_error", ("AttributeError",), (),
     ("Check method/attribute names", "Verify object types")),
    ("type_error", ("TypeError",), (),
     ("Check data types", "Add type conversions")),
    ("value_error", ("ValueError",), (),
     ("Validate input data", "Add error handling")),
    ("network_error", ("URLError", "ConnectionError"), (),
     ("Check internet connection", "Add retry logic")),
)


class ErrorAnalyzer:
    """Analyze errors and suggest fixes"""

    @staticmethod
    def analyze_error(error_message: str, code: str) -> Dict:
        """Classify an error message and propose a fix strategy.

        Args:
            error_message: Captured stderr / error text (``None`` is treated
                as an empty string).
            code: The code that produced the error (currently unused; kept
                for interface compatibility).

        Returns:
            A dict with keys ``error_type`` (str), ``suggestions`` (list of
            str), ``packages`` (pip names worth installing, de-duplicated)
            and ``original_error`` (the message as a string).
        """
        error_message = error_message or ""
        lowered = error_message.lower()
        error_type = "unknown"
        suggestions = []
        packages_to_install = []

        # Import errors
        if "No module named" in error_message or "ModuleNotFoundError" in error_message:
            error_type = "import_error"
            module_match = re.search(r"No module named ['\"]([^'\"]+)['\"]", error_message)
            if module_match:
                module = module_match.group(1)
                # pip installs distributions, not submodules: use the
                # top-level name and translate it to its pip name.
                top_level = module.split('.')[0]
                packages_to_install.append(IMPORT_TO_PACKAGE.get(top_level, top_level))
                suggestions.append(f"Install missing module: {module}")
        else:
            for rule_type, markers, ci_markers, hints in _ERROR_RULES:
                if (any(marker in error_message for marker in markers)
                        or any(marker in lowered for marker in ci_markers)):
                    error_type = rule_type
                    suggestions.extend(hints)
                    break

        # Package specific errors
        if "openpyxl" in error_message or "xlrd" in error_message:
            packages_to_install.append("openpyxl")
            suggestions.append("Install Excel support: openpyxl")
        if "PIL" in error_message or "Pillow" in error_message:
            packages_to_install.append("Pillow")
            suggestions.append("Install image processing: Pillow")

        return {
            "error_type": error_type,
            "suggestions": suggestions,
            # dict.fromkeys de-duplicates while preserving order.
            "packages": list(dict.fromkeys(packages_to_install)),
            "original_error": error_message,
        }


def indent_code(code, spaces=4):
    """Indent every non-blank line of ``code`` by ``spaces`` spaces.

    Blank (whitespace-only) lines are passed through unchanged.
    """
    prefix = ' ' * spaces
    return '\n'.join(
        prefix + line if line.strip() else line
        for line in code.split('\n')
    )


def _is_pre_installed(package: str) -> bool:
    """Return True if ``package`` is in the pre-installed set (case-insensitive)."""
    return package.lower() in PRE_INSTALLED_PACKAGES


def _packages_from_pip_comments(code: str) -> set:
    """Collect package names from ``# pip install <pkg>`` comments.

    Only the first non-option argument of each comment is used: trailing
    words are often prose ("# pip install openpyxl if missing") and must
    not be installed.
    """
    found = set()
    for args in _PIP_COMMENT_RE.findall(code):
        for token in args.split():
            if token.startswith('-'):
                continue  # pip option such as --upgrade
            name = _SPEC_START_RE.split(token.strip('\'"'), maxsplit=1)[0]
            if _PACKAGE_NAME_RE.match(name):
                found.add(name)
            break
    return found


def _imported_top_level_modules(code: str) -> set:
    """Return the top-level module names imported anywhere in ``code``.

    Works line by line (the code may not be syntactically valid), and only
    accepts lines that really look like import statements so that prose
    such as "from the input file" is ignored.
    """
    modules = set()
    for raw_line in code.split('\n'):
        line = raw_line.strip()
        if line.startswith('from '):
            match = _FROM_IMPORT_RE.match(line)
            if match:
                modules.add(match.group(1))
        elif line.startswith('import '):
            # Drop trailing comments / chained statements, then handle
            # "import a, b.c as d".
            statement = re.split(r'[#;]', line[len('import '):], maxsplit=1)[0]
            for item in statement.split(','):
                match = _IMPORT_ITEM_RE.match(item.strip())
                if match:
                    modules.add(match.group(1))
    return modules


def detect_required_packages(code):
    """Detect pip packages that ``code`` needs beyond the pre-installed set.

    Sources of evidence: ``# pip install`` comments, import statements
    (mapped from import name to pip name) and usage patterns such as
    ``.to_excel``.

    Returns:
        A sorted list of pip package names.
    """
    required_packages = set(_packages_from_pip_comments(code))

    # Check imports with mapping
    for module in _imported_top_level_modules(code):
        if module in IMPORT_TO_PACKAGE:
            required_packages.add(IMPORT_TO_PACKAGE[module])
        elif module not in STDLIB_MODULES:
            required_packages.add(module)

    # Check for usage patterns that need additional packages
    for pattern, package in USAGE_PATTERNS.items():
        if re.search(pattern, code, re.IGNORECASE):
            required_packages.add(package)

    # Remove pre-installed packages
    return sorted(pkg for pkg in required_packages if not _is_pre_installed(pkg))


def _import_name_for(package_name: str) -> str:
    """Return the module name to probe for an installed pip package."""
    return PACKAGE_TO_IMPORT.get(package_name.lower(), package_name.replace('-', '_'))


def _is_importable(package_name: str) -> bool:
    """Return True if the module behind ``package_name`` can be located."""
    try:
        return importlib.util.find_spec(_import_name_for(package_name)) is not None
    except (ImportError, ValueError, AttributeError):
        # find_spec raises for dotted names whose parent package is missing;
        # that simply means "not installed".
        return False


def install_package(package_name):
    """Install a package using pip if it's not already installed.

    Returns:
        True if the package is available afterwards, False otherwise.
    """
    try:
        # Names come from model-generated text; refuse anything that is not a
        # plain distribution name so it cannot be read as a pip option.
        if not package_name or not _PACKAGE_NAME_RE.match(package_name):
            print(f"❌ Failed to install {package_name}: invalid package name")
            return False

        if _is_importable(package_name):
            print(f"✅ {package_name} already installed.")
            return True

        print(f"Installing package: {package_name}")
        result = subprocess.run([
            sys.executable, "-m", "pip", "install",
            "--quiet", "--no-cache-dir", package_name
        ], capture_output=True, text=True)
        if result.returncode == 0:
            print(f"✅ {package_name} installed successfully.")
            return True
        print(f"❌ Failed to install {package_name}: {result.stderr}")
        return False
    except Exception as e:
        print(f"❌ Error checking/installing {package_name}: {str(e)}")
        return False


def install_packages_if_needed(packages):
    """Install required packages.

    Returns:
        True when every non-empty package name was installed or already
        present, False if at least one installation failed.
    """
    if not packages:
        print("No additional packages to install.")
        return True

    success_count = 0
    failed_packages = []
    for package in packages:
        if not package:
            continue
        if install_package(package):
            success_count += 1
        else:
            failed_packages.append(package)

    print(f"✅ Installed/checked {success_count}/{len(packages)} packages.")
    if failed_packages:
        print(f"⚠️ Failed to install: {', '.join(failed_packages)}")
    return not failed_packages


def download_file_from_url(url: str, temp_dir: str) -> Optional[str]:
    """Download a file from URL to temp_dir and return local path.

    The file name is taken from the URL path (query string and fragment are
    ignored). Names without an extension get ``.txt``. An existing file of
    the same name is never overwritten; a numeric suffix is added instead.

    Returns:
        The local path, or ``None`` if the download failed.
    """
    try:
        url_path = unquote(urlsplit(url).path).replace('\\', '/')
        filename = os.path.basename(url_path)
        if filename in ('', '.', '..'):
            filename = 'downloaded_file'
        if '.' not in filename:
            filename += '.txt'  # Default extension

        local_path = os.path.join(temp_dir, filename)
        stem, ext = os.path.splitext(filename)
        counter = 1
        while os.path.exists(local_path):
            local_path = os.path.join(temp_dir, f"{stem}_{counter}{ext}")
            counter += 1

        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"Downloaded {url} to {local_path}")
        return local_path
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return None


# Prompt sent to the model. Filled with str.format, so literal braces that
# must reach the model are doubled.
_PROMPT_TEMPLATE = textwrap.dedent("""
    You are a Python expert. Instruction: "{instruction}"
    Input files: {file_paths_str} (use file_paths[0] for first file, iterate for multiple; if empty, generate based on instruction alone).
    {error_context}
    {alternative_approaches}

    Write a complete Python script that:
    1. Import all necessary libraries at the top.
    2. Add "# pip install package_name" comments after imports for all needed libraries.
    3. Define file_paths = {file_paths_list}
    4. Add comprehensive error handling with try-except blocks
    5. Create output directory if needed using os.makedirs(exist_ok=True)
    6. Save output to a temp directory using tempfile.mkdtemp()
    7. Print "OUTPUT_FILE_PATH: /full/path/to/output" at the end using os.path.abspath()
    8. If file operations fail, try alternative approaches

    Important rules:
    - For pandas Excel operations, always add: # pip install openpyxl
    - Always use absolute paths with os.path.abspath()
    - Create directories before saving files
    - Handle common errors (FileNotFoundError, PermissionError, etc.)
    - If a library fails, try alternatives (e.g., csv instead of pandas)
    - No __name__ == '__main__', no functions, just direct code
    - Add detailed error messages

    Example with error handling:
    import os
    import tempfile
    try:
        import pandas as pd
        # pip install pandas openpyxl
    except ImportError:
        print("Pandas not available, using csv module")
        import csv

    file_paths = {file_paths_list}

    try:
        temp_dir = tempfile.mkdtemp()
        os.makedirs(temp_dir, exist_ok=True)
        output_path = os.path.join(temp_dir, 'output.xlsx')

        # Your main logic here with error handling

        print(f"OUTPUT_FILE_PATH: {{os.path.abspath(output_path)}}")
    except Exception as e:
        print(f"ERROR: {{e}}")
        # Fallback solution
        try:
            # Alternative approach
            pass
        except:
            print("All approaches failed")

    Output ONLY Python code, no markdown.
    """)


def _strip_code_fences(text: str) -> str:
    """Return the Python code from a model reply, without markdown fences."""
    fenced = re.search(r'```(?:python|py)?[ \t]*\n(.*?)```', text,
                       flags=re.DOTALL | re.IGNORECASE)
    if fenced:
        # A complete fenced block: ignore any prose around it.
        return fenced.group(1).strip()
    # No complete block (e.g. reply truncated): strip stray fence markers.
    return re.sub(r'^```python\s*|\s*```$', '', text, flags=re.MULTILINE).strip()


def generate_code_with_openrouter(instruction, file_paths, previous_errors=None, attempt=1):
    """Generate Python code using OpenRouter API with error awareness.

    Args:
        instruction: The user's natural-language request.
        file_paths: Local paths of input files (may be empty).
        previous_errors: Error-analysis dicts from earlier attempts, included
            in the prompt so the model can avoid repeating them.
        attempt: 1-based attempt number; later attempts ask for simpler
            approaches and use a higher sampling temperature.

    Returns:
        Source code as a string. On API failure or an empty reply a small
        fallback script that only prints an ``OUTPUT_TEXT:`` line is returned.
    """
    error_context = ""
    if previous_errors:
        error_context = "\n\nPREVIOUS ERRORS TO AVOID:\n"
        for err in previous_errors:
            error_context += f"- Error type: {err.get('error_type', 'unknown')}\n"
            error_context += f"  Details: {err.get('original_error', '')[:200]}\n"
            error_context += f"  Suggestions: {', '.join(err.get('suggestions', []))}\n"

    alternative_approaches = ""
    if attempt > 1:
        alternative_approaches = f"\n\nThis is attempt {attempt}. Please use a different approach:"
        if attempt == 2:
            alternative_approaches += "\n- Use simpler libraries if possible"
            alternative_approaches += "\n- Add more error handling"
            alternative_approaches += "\n- Check file paths carefully"
        elif attempt >= 3:
            alternative_approaches += "\n- Use only standard library if possible"
            alternative_approaches += "\n- Implement fallback solutions"
            alternative_approaches += "\n- Generate mock data if files are problematic"

    file_paths_list = str(file_paths)
    if file_paths:
        file_paths_str = ', '.join(
            os.path.basename(p) if p else 'None' for p in file_paths
        )
    else:
        file_paths_str = 'None (generate from scratch)'

    prompt = _PROMPT_TEMPLATE.format(
        instruction=instruction or "No instruction provided",
        file_paths_str=file_paths_str,
        file_paths_list=file_paths_list,
        error_context=error_context,
        alternative_approaches=alternative_approaches
    )

    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": "Output only clean executable Python code with comprehensive error handling."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=4000,
            temperature=0.1 if attempt == 1 else 0.3  # Increase creativity on retries
        )
        # ``content`` can be None (e.g. refusal / empty completion).
        raw_reply = (response.choices[0].message.content or "").strip()
        generated_code = _strip_code_fences(raw_reply)
        return generated_code if generated_code else "import sys\nprint('OUTPUT_TEXT: No code generated')\nsys.exit(0)"
    except Exception as api_error:
        error_msg = f"API Error: {api_error}"
        print(error_msg)
        # Return simple fallback
        return """import sys
print("OUTPUT_TEXT: Code generation failed due to API error")
sys.exit(0)"""


def _wrap_for_execution(code: str) -> str:
    """Wrap ``code`` in a try/except that reports the error and exits with 1."""
    return (
        "try:\n"
        f"{indent_code(code)}\n"
        "except Exception as e:\n"
        "    print(f'ERROR: {e}')\n"
        "    import traceback; traceback.print_exc()\n"
        "    import sys; sys.exit(1)"
    )


def _run_script(source: str, timeout: int = 60) -> "subprocess.CompletedProcess":
    """Write ``source`` to a temp file, run it, and always delete the file.

    Raises:
        subprocess.TimeoutExpired: if the script runs longer than ``timeout``.
    """
    # UTF-8 matches Python's default source encoding; the locale default
    # could fail on non-ASCII string literals.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False,
                                     encoding='utf-8') as tf:
        tf.write(source)
        tf_path = tf.name
    print(f"Temp file: {tf_path}")
    try:
        print("Executing...")
        return subprocess.run(
            [sys.executable, tf_path],
            capture_output=True,
            text=True,
            timeout=timeout
        )
    finally:
        try:
            os.unlink(tf_path)
        except OSError:
            pass


def execute_code_with_retry(code: str, max_attempts: int = 3) -> Tuple[bool, str, Optional[str]]:
    """Execute code with retry logic and error recovery.

    The code is wrapped, compile-checked and run in a subprocess. If it
    fails because of a missing package that can be installed, the package is
    installed and the same code is run again (up to ``max_attempts`` runs).
    Any other failure is returned immediately so the caller can regenerate
    the code.

    Returns:
        ``(success, text, output_path)``. On success ``text`` is the script's
        ``OUTPUT_TEXT`` payload or its stdout, and ``output_path`` is the
        existing file announced via ``OUTPUT_FILE_PATH`` (or ``None``). On
        failure ``text`` describes the error and ``output_path`` is ``None``.
    """
    for attempt in range(1, max_attempts + 1):
        print(f"\n=== Execution attempt {attempt}/{max_attempts} ===")
        try:
            # Step 1: Detect and install packages
            print("Detecting packages...")
            required_packages = detect_required_packages(code)
            print("Detected packages:", required_packages)
            install_packages_if_needed(required_packages)

            # Step 2: Wrap code
            wrapped_code = _wrap_for_execution(code)

            # Step 3: Compile check
            print("Compiling code...")
            try:
                compile(wrapped_code, '', 'exec')
                print("Compile OK.")
            except SyntaxError as se:
                error_msg = f"Syntax Error: {se}"
                if attempt < max_attempts:
                    print(f"Syntax error on attempt {attempt}, will regenerate code")
                return False, error_msg, None

            # Steps 4-5: Create temp file and execute
            print("Creating temp file...")
            result = _run_script(wrapped_code, timeout=60)
        except subprocess.TimeoutExpired:
            error_msg = "Timeout: Code execution took too long"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None
        except Exception as e:
            error_msg = f"Execution error: {str(e)}"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None

        stdout, stderr = result.stdout, result.stderr
        rc = result.returncode

        if rc != 0:
            error_msg = f"Execution failed (RC {rc}):\nStderr: {stderr}"
            print(error_msg)
            if attempt >= max_attempts:
                return False, error_msg, None

            # Try to fix by installing missing packages
            error_analysis = ErrorAnalyzer.analyze_error(stderr, code)
            packages = error_analysis['packages']
            if packages:
                print(f"Attempting to install missing packages: {packages}")
                # Re-running only makes sense if something that was
                # missing is now actually available.
                missing = [pkg for pkg in packages if not _is_importable(pkg)]
                newly_installed = [pkg for pkg in packages
                                   if install_package(pkg) and pkg in missing]
                if newly_installed:
                    continue
            return False, stderr, None

        # Extract output
        output_path_match = re.search(r'OUTPUT_FILE_PATH:\s*(.+)', stdout, re.I)
        output_text_match = re.search(r'OUTPUT_TEXT:\s*(.+)', stdout, re.I | re.DOTALL)

        if output_path_match:
            output_path = output_path_match.group(1).strip()
            if os.path.exists(output_path):
                return True, stdout, output_path
            error_msg = f"Output path not found: {output_path}"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None
        if output_text_match:
            return True, output_text_match.group(1).strip(), None
        if stdout.strip():
            return True, stdout, None
        return False, "No output generated", None

    return False, "Max attempts reached", None


def process_request(instruction, files, urls_input):
    """Main processing function with self-correction, supporting URLs.

    Args:
        instruction: The user's request text.
        files: Iterable of objects with a ``name`` attribute holding a local
            file path (may be empty or ``None``).
        urls_input: Comma-separated URLs to download as additional inputs.

    Returns:
        ``(result_text, output_file_path_or_None)``.
    """
    max_generation_attempts = 3
    try:
        if not instruction or not instruction.strip():
            return "لطفاً دستور را وارد کنید. (فایل‌ها و لینک‌ها اختیاری هستند)", None

        file_paths = []

        # Handle uploaded files
        if files:
            file_paths = [f.name for f in files]

        # Handle URLs: download to temp dir
        if urls_input and urls_input.strip():
            temp_dir_for_downloads = tempfile.mkdtemp(prefix='url_downloads_')
            urls = [url.strip() for url in urls_input.split(',') if url.strip()]
            downloaded_paths = []
            for url in urls:
                local_path = download_file_from_url(url, temp_dir_for_downloads)
                if local_path:
                    downloaded_paths.append(local_path)
            file_paths.extend(downloaded_paths)
            print(f"Downloaded {len(downloaded_paths)} files from URLs")
            # Clean up note: Temp dirs will be cleaned on shutdown or manually if needed

        # Track errors for learning
        previous_errors = []

        # Main retry loop
        for attempt in range(1, max_generation_attempts + 1):
            print(f"\n{'='*50}")
            print(f"MAIN ATTEMPT {attempt}/{max_generation_attempts}")
            print(f"{'='*50}")

            # Generate code
            print("Generating code...")
            generated_code = generate_code_with_openrouter(
                instruction,
                file_paths,
                previous_errors=previous_errors if attempt > 1 else None,
                attempt=attempt
            )

            if len(generated_code) < 20:
                return f"کد ضعیف تولید شد: {generated_code}", None

            print(f"Generated code preview: {generated_code[:200]}...")

            # Try to execute
            success, output, file_path = execute_code_with_retry(generated_code, max_attempts=2)

            if success:
                result_text = f"✅ Success on attempt {attempt}!\n\n"
                result_text += f"Generated Code:\n```python\n{generated_code}\n```\n\n"
                result_text += f"Output:\n{output}"
                return result_text, file_path

            # Analyze error
            print(f"\n❌ Attempt {attempt} failed")
            error_analysis = ErrorAnalyzer.analyze_error(output, generated_code)
            previous_errors.append(error_analysis)

            print(f"Error type: {error_analysis['error_type']}")
            print(f"Suggestions: {', '.join(error_analysis['suggestions'])}")

            # If this was the last attempt
            if attempt == max_generation_attempts:
                error_report = f"❌ Failed after {attempt} attempts.\n\n"
                error_report += "Error History:\n"
                for i, err in enumerate(previous_errors, 1):
                    error_report += f"\nAttempt {i}:\n"
                    error_report += f"- Error type: {err['error_type']}\n"
                    error_report += f"- Details: {err['original_error'][:200]}...\n"
                error_report += f"\n\nLast generated code:\n```python\n{generated_code}\n```"
                return error_report, None

        return "Unexpected end of retry loop", None

    except Exception as e:
        error_msg = f"General error: {type(e).__name__}: {e}\nFull traceback: {traceback.format_exc()}"
        print(error_msg)
        return error_msg, None


# Session-state keys bound to the two text inputs, so example buttons can
# pre-fill them.
_INSTRUCTION_KEY = "instruction_input"
_URLS_KEY = "urls_input"


def _apply_example(instruction: str, urls: Optional[str]) -> None:
    """Button callback: copy an example into the input widgets' state.

    Runs before the script is re-executed, which is the point at which
    Streamlit allows widget-bound session-state keys to be assigned.
    """
    st.session_state[_INSTRUCTION_KEY] = instruction
    st.session_state[_URLS_KEY] = urls or ""


# Streamlit Interface
def main():
    """Render the Streamlit UI and run a request when the button is pressed."""
    st.set_page_config(page_title="AI File Processor - Self Correcting", page_icon="🤖", layout="wide")

    st.title("🤖 AI File Processor - Self Correcting Edition")
    st.markdown(textwrap.dedent("""
        این سیستم می‌تواند:
        - کد Python تولید کند
        - خطاها را تشخیص و تحلیل کند
        - به طور خودکار مشکلات را برطرف کند
        - تا 3 بار با رویکردهای مختلف تلاش کند

        **مثال دستورات:**
        - "یک فایل اکسل با 1000 نام و شماره تلفن ایرانی بساز"
        - "پس‌زمینه این تصویر را حذف کن"
        - "این فایل CSV را به JSON تبدیل کن"

        **نکته:** می‌توانید فایل‌ها را آپلود کنید یا لینک‌های فایل را (جدا شده با کاما) وارد کنید.
        """))

    # Inputs
    instruction = st.text_area(
        "دستور",
        height=100,
        placeholder="مثال: یک نمودار دایره‌ای از داده‌های فروش بکش",
        key=_INSTRUCTION_KEY
    )

    col1, col2 = st.columns(2)
    with col1:
        uploaded_files = st.file_uploader(
            "فایل‌های آپلود شده (اختیاری)",
            accept_multiple_files=True
        )
    with col2:
        urls_input = st.text_input(
            "لینک فایل‌ها (جدا با کاما، اختیاری)",
            placeholder="https://example.com/file1.csv, https://example.com/file2.jpg",
            help="لینک‌ها دانلود خواهند شد و حجم فایل چک نمی‌شود.",
            key=_URLS_KEY
        )

    # Process button
    if st.button("🚀 اجرا", type="primary"):
        if not instruction.strip():
            st.error("لطفاً دستور را وارد کنید.")
        else:
            with st.spinner("در حال پردازش..."):
                # process_request expects objects exposing a ``name`` path,
                # so each upload is saved to its own temp directory first.
                files = []
                for uploaded_file in uploaded_files or []:
                    temp_dir = tempfile.mkdtemp()
                    safe_name = os.path.basename(uploaded_file.name) or "uploaded_file"
                    file_path = os.path.join(temp_dir, safe_name)
                    with open(file_path, "wb") as f:
                        f.write(uploaded_file.getbuffer())
                    files.append(SimpleNamespace(name=file_path))

                # Process request
                result_text, output_file = process_request(instruction, files, urls_input)

                # Display results
                st.subheader("نتایج")
                st.text_area("خروجی", result_text, height=300)

                # Download button for output file
                if output_file and os.path.exists(output_file):
                    with open(output_file, "rb") as f:
                        st.download_button(
                            label="📥 دانلود فایل خروجی",
                            data=f,
                            file_name=os.path.basename(output_file),
                            mime="application/octet-stream"
                        )

    # Examples
    st.subheader("مثال‌ها")
    examples = [
        ["یک فایل اکسل با 100 محصول فروشگاهی شامل نام، قیمت و موجودی بساز", None, None],
        ["یک نمودار میله‌ای از داده‌های تصادفی رسم کن", None, None],
        ["یک تصویر 500x500 پیکسل با رنگ‌های تصادفی بساز", None, None],
        ["این فایل را به فرمت JSON تبدیل کن", None, "https://example.com/sample.csv"],
    ]

    for i, example in enumerate(examples):
        # The callback fills the instruction/URL inputs; Streamlit reruns the
        # script automatically after a button click.
        st.button(
            f"مثال {i+1}: {example[0][:30]}...",
            key=f"example_{i}",
            on_click=_apply_example,
            args=(example[0], example[2])
        )


if __name__ == "__main__":
    main()