Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import openai | |
| import os | |
| import tempfile | |
| import sys | |
| import io | |
| import subprocess | |
| import importlib.util | |
| import re | |
| from contextlib import redirect_stdout | |
| import textwrap | |
| import shutil | |
| import traceback | |
| import json | |
| from typing import List, Tuple, Optional, Dict | |
| import requests # Added for downloading from URLs | |
| # Clean up any existing temp files on startup to save space | |
| try: | |
| tempdir = tempfile.gettempdir() | |
| for item in os.listdir(tempdir): | |
| item_path = os.path.join(tempdir, item) | |
| if item.startswith('tmp') or item.endswith('.py'): | |
| try: | |
| if os.path.isfile(item_path): | |
| os.unlink(item_path) | |
| elif os.path.isdir(item_path): | |
| shutil.rmtree(item_path) | |
| except: | |
| pass | |
| # Clean pip cache if exists | |
| pip_cache = os.path.expanduser('~/.cache/pip') | |
| if os.path.exists(pip_cache): | |
| try: | |
| shutil.rmtree(pip_cache) | |
| except: | |
| pass | |
| except: | |
| pass | |
| # Use OpenRouter API (OpenAI-compatible) | |
| OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") | |
| client = openai.OpenAI( | |
| api_key=OPENROUTER_API_KEY, | |
| base_url="https://openrouter.ai/api/v1" | |
| ) | |
| MODEL_NAME = "x-ai/grok-4-fast:free" | |
| class ErrorAnalyzer: | |
| """Analyze errors and suggest fixes""" | |
| def analyze_error(error_message: str, code: str) -> Dict: | |
| """Analyze error and return fix strategy""" | |
| error_type = "unknown" | |
| suggestions = [] | |
| packages_to_install = [] | |
| # Import errors | |
| if "No module named" in error_message or "ModuleNotFoundError" in error_message: | |
| error_type = "import_error" | |
| module_match = re.search(r"No module named ['\"]([^'\"]+)['\"]", error_message) | |
| if module_match: | |
| module = module_match.group(1) | |
| packages_to_install.append(module) | |
| suggestions.append(f"Install missing module: {module}") | |
| # Permission errors | |
| elif "Permission denied" in error_message or "PermissionError" in error_message: | |
| error_type = "permission_error" | |
| suggestions.append("Use temp directory for file operations") | |
| suggestions.append("Avoid system directories") | |
| # Memory errors | |
| elif "MemoryError" in error_message or "killed" in error_message.lower(): | |
| error_type = "memory_error" | |
| suggestions.append("Reduce data size or use chunks") | |
| suggestions.append("Process data in smaller batches") | |
| # File not found | |
| elif "FileNotFoundError" in error_message or "No such file" in error_message: | |
| error_type = "file_error" | |
| suggestions.append("Check file paths") | |
| suggestions.append("Create directory if needed") | |
| # Syntax errors | |
| elif "SyntaxError" in error_message: | |
| error_type = "syntax_error" | |
| suggestions.append("Fix syntax issues") | |
| suggestions.append("Check indentation") | |
| # Attribute errors | |
| elif "AttributeError" in error_message: | |
| error_type = "attribute_error" | |
| suggestions.append("Check method/attribute names") | |
| suggestions.append("Verify object types") | |
| # Type errors | |
| elif "TypeError" in error_message: | |
| error_type = "type_error" | |
| suggestions.append("Check data types") | |
| suggestions.append("Add type conversions") | |
| # Value errors | |
| elif "ValueError" in error_message: | |
| error_type = "value_error" | |
| suggestions.append("Validate input data") | |
| suggestions.append("Add error handling") | |
| # Network errors | |
| elif "URLError" in error_message or "ConnectionError" in error_message: | |
| error_type = "network_error" | |
| suggestions.append("Check internet connection") | |
| suggestions.append("Add retry logic") | |
| # Package specific errors | |
| if "openpyxl" in error_message or "xlrd" in error_message: | |
| packages_to_install.append("openpyxl") | |
| suggestions.append("Install Excel support: openpyxl") | |
| if "PIL" in error_message or "Pillow" in error_message: | |
| packages_to_install.append("Pillow") | |
| suggestions.append("Install image processing: Pillow") | |
| return { | |
| "error_type": error_type, | |
| "suggestions": suggestions, | |
| "packages": packages_to_install, | |
| "original_error": error_message | |
| } | |
| def indent_code(code, spaces=4): | |
| """Indent the code by the specified number of spaces.""" | |
| indented_lines = [] | |
| for line in code.split('\n'): | |
| if line.strip(): # Only indent non-empty lines | |
| indented_lines.append(' ' * spaces + line) | |
| else: | |
| indented_lines.append(line) | |
| return '\n'.join(indented_lines) | |
| def detect_required_packages(code): | |
| """Detect required packages from Python code (optimized for accuracy).""" | |
| required_packages = set() | |
| # Pre-installed packages from requirements.txt | |
| pre_installed = { | |
| 'gradio', 'openai', 'pillow', 'rembg', 'numpy', 'opencv-python', 'scikit-learn', | |
| 'tensorflow', 'torch', 'lxml', 'requests', 'matplotlib', 'seaborn', 'onnxruntime', | |
| 'proglog', 'openpyxl', 'moviepy' | |
| } | |
| # Import patterns | |
| import_patterns = [ | |
| r'^(?:import|from)\s+(\w+)(?:\.\w+)*', | |
| ] | |
| # Patterns for pip install in code/comments | |
| pip_patterns = [ | |
| r'#?\s*pip\s+install\s+([^\s#]+)', | |
| r'#?\s*install\s+([^\s#]+)' | |
| ] | |
| # Usage patterns that require additional packages | |
| usage_patterns = { | |
| r'\.to_excel': 'openpyxl', | |
| r'\.read_excel': 'openpyxl', | |
| r'\.ExcelWriter': 'openpyxl', | |
| r'\.to_parquet': 'pyarrow', | |
| r'\.read_parquet': 'pyarrow', | |
| r'\.to_sql': 'sqlalchemy', | |
| r'\.read_sql': 'sqlalchemy', | |
| r'\.to_feather': 'pyarrow', | |
| r'\.read_feather': 'pyarrow', | |
| r'\.to_stata': 'statsmodels', | |
| r'\.read_stata': 'statsmodels', | |
| r'\.to_clipboard': 'pyperclip', | |
| r'xlsxwriter': 'xlsxwriter', | |
| r'openpyxl': 'openpyxl', | |
| } | |
| # Check for pip install comments | |
| for pattern in pip_patterns: | |
| matches = re.findall(pattern, code, re.IGNORECASE | re.MULTILINE) | |
| for match in matches: | |
| if match and not match.startswith(('"', "'")): | |
| pkg = match.split('==')[0].split('>')[0].split('<')[0].strip() | |
| if pkg and pkg not in pre_installed: | |
| required_packages.add(pkg) | |
| # Check imports with mapping | |
| for line in code.split('\n'): | |
| line = line.strip() | |
| if line.startswith(('import ', 'from ')): | |
| match = re.search(import_patterns[0], line) | |
| if match: | |
| module = match.group(1) | |
| if module and module not in {'__future__', 'typing'}: | |
| module_map = { | |
| 'cv2': 'opencv-python', | |
| 'sklearn': 'scikit-learn', | |
| 'tf': 'tensorflow', | |
| 'torch': 'torch', | |
| 'numpy': 'numpy', | |
| 'pandas': 'pandas', | |
| 'PIL': 'Pillow', | |
| 'pillow': 'Pillow', | |
| 'matplotlib': 'matplotlib', | |
| 'seaborn': 'seaborn', | |
| 'onnxruntime': 'onnxruntime', | |
| 'rembg': 'rembg', | |
| 'requests': 'requests', | |
| 'openpyxl': 'openpyxl', | |
| 'xlrd': 'xlrd', | |
| 'xlwt': 'xlwt', | |
| 'xlsxwriter': 'xlsxwriter', | |
| 'pyarrow': 'pyarrow', | |
| 'sqlalchemy': 'sqlalchemy', | |
| 'psycopg2': 'psycopg2-binary', | |
| 'pymongo': 'pymongo', | |
| 'redis': 'redis', | |
| 'beautifulsoup4': 'beautifulsoup4', | |
| 'bs4': 'beautifulsoup4', | |
| 'scrapy': 'scrapy', | |
| 'selenium': 'selenium', | |
| 'flask': 'flask', | |
| 'fastapi': 'fastapi', | |
| 'django': 'django', | |
| 'pytest': 'pytest', | |
| 'hypothesis': 'hypothesis', | |
| 'faker': 'faker', | |
| } | |
| if module in module_map: | |
| pkg = module_map[module] | |
| if pkg not in pre_installed: | |
| required_packages.add(pkg) | |
| elif module not in { | |
| 'os', 'sys', 'io', 'json', 're', 'time', 'math', 'random', 'collections', | |
| 'itertools', 'functools', 'operator', 'string', 'pathlib', 'tempfile', | |
| 'subprocess', 'logging', 'argparse', 'csv', 'xml', 'html', 'base64', | |
| 'hashlib', 'urllib', 'http', 'threading', 'multiprocessing', 'socket', | |
| 'asyncio', 'concurrent', 'abc', 'enum', 'dataclasses', 'zipfile', | |
| 'datetime', 'calendar', 'copy', 'pickle', 'struct', 'binascii', 'codecs' | |
| }: | |
| if module not in pre_installed: | |
| required_packages.add(module) | |
| # Check for usage patterns that need additional packages | |
| for pattern, package in usage_patterns.items(): | |
| if re.search(pattern, code, re.IGNORECASE) and package not in pre_installed: | |
| required_packages.add(package) | |
| # Special case: if pandas is imported and Excel operations detected | |
| if 'pandas' in code and any(pattern in code for pattern in ['.to_excel', '.read_excel', 'ExcelWriter']): | |
| if 'openpyxl' not in pre_installed: | |
| required_packages.add('openpyxl') | |
| # Remove pre-installed packages | |
| required_packages = required_packages - pre_installed | |
| return list(required_packages) | |
| def install_package(package_name): | |
| """Install a package using pip if it's not already installed.""" | |
| try: | |
| # Special handling for some packages | |
| import_name = package_name | |
| if package_name == 'opencv-python': | |
| import_name = 'cv2' | |
| elif package_name == 'scikit-learn': | |
| import_name = 'sklearn' | |
| elif package_name == 'pillow' or package_name == 'Pillow': | |
| import_name = 'PIL' | |
| elif package_name == 'beautifulsoup4': | |
| import_name = 'bs4' | |
| elif package_name == 'psycopg2-binary': | |
| import_name = 'psycopg2' | |
| else: | |
| import_name = package_name.replace('-', '_') | |
| spec = importlib.util.find_spec(import_name) | |
| if spec is None: | |
| print(f"Installing package: {package_name}") | |
| result = subprocess.run([ | |
| sys.executable, "-m", "pip", "install", "--quiet", "--no-cache-dir", package_name | |
| ], capture_output=True, text=True) | |
| if result.returncode == 0: | |
| print(f"✅ {package_name} installed successfully.") | |
| return True | |
| else: | |
| print(f"❌ Failed to install {package_name}: {result.stderr}") | |
| return False | |
| else: | |
| print(f"✅ {package_name} already installed.") | |
| return True | |
| except Exception as e: | |
| print(f"❌ Error checking/installing {package_name}: {str(e)}") | |
| return False | |
| def install_packages_if_needed(packages): | |
| """Install required packages.""" | |
| if not packages: | |
| print("No additional packages to install.") | |
| return True | |
| success_count = 0 | |
| failed_packages = [] | |
| for package in packages: | |
| if package: | |
| if install_package(package): | |
| success_count += 1 | |
| else: | |
| failed_packages.append(package) | |
| print(f"✅ Installed/checked {success_count}/{len(packages)} packages.") | |
| if failed_packages: | |
| print(f"⚠️ Failed to install: {', '.join(failed_packages)}") | |
| return len(failed_packages) == 0 | |
| def download_file_from_url(url: str, temp_dir: str) -> Optional[str]: | |
| """Download a file from URL to temp_dir and return local path.""" | |
| try: | |
| filename = url.split('/')[-1].split('?')[0] or 'downloaded_file' | |
| if '.' not in filename: | |
| filename += '.txt' # Default extension | |
| local_path = os.path.join(temp_dir, filename) | |
| response = requests.get(url, stream=True, timeout=30) | |
| response.raise_for_status() | |
| with open(local_path, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| print(f"Downloaded {url} to {local_path}") | |
| return local_path | |
| except Exception as e: | |
| print(f"Failed to download {url}: {e}") | |
| return None | |
| def generate_code_with_openrouter(instruction, file_paths, previous_errors=None, attempt=1): | |
| """Generate Python code using OpenRouter API with error awareness.""" | |
| error_context = "" | |
| if previous_errors: | |
| error_context = "\n\nPREVIOUS ERRORS TO AVOID:\n" | |
| for err in previous_errors: | |
| error_context += f"- Error type: {err.get('error_type', 'unknown')}\n" | |
| error_context += f" Details: {err.get('original_error', '')[:200]}\n" | |
| error_context += f" Suggestions: {', '.join(err.get('suggestions', []))}\n" | |
| alternative_approaches = "" | |
| if attempt > 1: | |
| alternative_approaches = f"\n\nThis is attempt {attempt}. Please use a different approach:" | |
| if attempt == 2: | |
| alternative_approaches += "\n- Use simpler libraries if possible" | |
| alternative_approaches += "\n- Add more error handling" | |
| alternative_approaches += "\n- Check file paths carefully" | |
| elif attempt >= 3: | |
| alternative_approaches += "\n- Use only standard library if possible" | |
| alternative_approaches += "\n- Implement fallback solutions" | |
| alternative_approaches += "\n- Generate mock data if files are problematic" | |
| prompt_template = textwrap.dedent(""" | |
| You are a Python expert. Instruction: "{instruction}" | |
| Input files: {file_paths_str} (use file_paths[0] for first file, iterate for multiple; if empty, generate based on instruction alone). | |
| {error_context} | |
| {alternative_approaches} | |
| Write a complete Python script that: | |
| 1. Import all necessary libraries at the top. | |
| 2. Add "# pip install package_name" comments after imports for all needed libraries. | |
| 3. Define file_paths = {file_paths_list} | |
| 4. Add comprehensive error handling with try-except blocks | |
| 5. Create output directory if needed using os.makedirs(exist_ok=True) | |
| 6. Save output to a temp directory using tempfile.mkdtemp() | |
| 7. Print "OUTPUT_FILE_PATH: /full/path/to/output" at the end using os.path.abspath() | |
| 8. If file operations fail, try alternative approaches | |
| Important rules: | |
| - For pandas Excel operations, always add: # pip install openpyxl | |
| - Always use absolute paths with os.path.abspath() | |
| - Create directories before saving files | |
| - Handle common errors (FileNotFoundError, PermissionError, etc.) | |
| - If a library fails, try alternatives (e.g., csv instead of pandas) | |
| - No __name__ == '__main__', no functions, just direct code | |
| - Add detailed error messages | |
| Example with error handling: | |
| import os | |
| import tempfile | |
| try: | |
| import pandas as pd | |
| # pip install pandas openpyxl | |
| except ImportError: | |
| print("Pandas not available, using csv module") | |
| import csv | |
| file_paths = {file_paths_list} | |
| try: | |
| temp_dir = tempfile.mkdtemp() | |
| os.makedirs(temp_dir, exist_ok=True) | |
| output_path = os.path.join(temp_dir, 'output.xlsx') | |
| # Your main logic here with error handling | |
| print(f"OUTPUT_FILE_PATH: {{os.path.abspath(output_path)}}") | |
| except Exception as e: | |
| print(f"ERROR: {{e}}") | |
| # Fallback solution | |
| try: | |
| # Alternative approach | |
| pass | |
| except: | |
| print("All approaches failed") | |
| Output ONLY Python code, no markdown. | |
| """) | |
| file_paths_list = str(file_paths) | |
| file_paths_str = ', '.join([os.path.basename(p) if p else 'None' for p in file_paths]) if file_paths else 'None (generate from scratch)' | |
| prompt = prompt_template.format( | |
| instruction=instruction or "No instruction provided", | |
| file_paths_str=file_paths_str, | |
| file_paths_list=file_paths_list, | |
| error_context=error_context, | |
| alternative_approaches=alternative_approaches | |
| ) | |
| try: | |
| response = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": "Output only clean executable Python code with comprehensive error handling."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| max_tokens=4000, | |
| temperature=0.1 if attempt == 1 else 0.3 # Increase creativity on retries | |
| ) | |
| generated_code = response.choices[0].message.content.strip() | |
| # Clean code blocks | |
| generated_code = re.sub(r'^```python\s*|\s*```$', '', generated_code, flags=re.MULTILINE).strip() | |
| return generated_code if generated_code else "import sys\nprint('OUTPUT_TEXT: No code generated')\nsys.exit(0)" | |
| except Exception as api_error: | |
| error_msg = f"API Error: {api_error}" | |
| print(error_msg) | |
| # Return simple fallback | |
| return """import sys | |
| print("OUTPUT_TEXT: Code generation failed due to API error") | |
| sys.exit(0)""" | |
| def execute_code_with_retry(code: str, max_attempts: int = 3) -> Tuple[bool, str, Optional[str]]: | |
| """Execute code with retry logic and error recovery""" | |
| tf_path = None | |
| attempt = 0 | |
| while attempt < max_attempts: | |
| attempt += 1 | |
| print(f"\n=== Execution attempt {attempt}/{max_attempts} ===") | |
| try: | |
| # Step 1: Detect and install packages | |
| print("Detecting packages...") | |
| required_packages = detect_required_packages(code) | |
| print("Detected packages:", required_packages) | |
| install_packages_if_needed(required_packages) | |
| # Step 2: Wrap code | |
| indented = indent_code(code) | |
| wrapped_code = f"try:\n{indented}\nexcept Exception as e:\n print(f'ERROR: {{e}}')\n import traceback; traceback.print_exc()\n import sys; sys.exit(1)" | |
| # Step 3: Compile check | |
| print("Compiling code...") | |
| try: | |
| compile(wrapped_code, '<string>', 'exec') | |
| print("Compile OK.") | |
| except SyntaxError as se: | |
| error_msg = f"Syntax Error: {se}" | |
| if attempt < max_attempts: | |
| print(f"Syntax error on attempt {attempt}, will regenerate code") | |
| return False, error_msg, None | |
| else: | |
| return False, error_msg, None | |
| # Step 4: Create temp file | |
| print("Creating temp file...") | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tf: | |
| tf.write(wrapped_code) | |
| tf_path = tf.name | |
| print(f"Temp file: {tf_path}") | |
| # Step 5: Execute | |
| print(f"Executing...") | |
| result = subprocess.run( | |
| [sys.executable, tf_path], | |
| capture_output=True, | |
| text=True, | |
| timeout=60 | |
| ) | |
| stdout, stderr = result.stdout, result.stderr | |
| rc = result.returncode | |
| # Clean up temp file | |
| if tf_path and os.path.exists(tf_path): | |
| try: | |
| os.unlink(tf_path) | |
| except: | |
| pass | |
| if rc != 0: | |
| error_msg = f"Execution failed (RC {rc}):\nStderr: {stderr}" | |
| print(error_msg) | |
| # Check if we should retry | |
| if attempt < max_attempts: | |
| # Analyze error for next attempt | |
| error_analysis = ErrorAnalyzer.analyze_error(stderr, code) | |
| # Try to fix by installing missing packages | |
| if error_analysis['packages']: | |
| print(f"Attempting to install missing packages: {error_analysis['packages']}") | |
| for pkg in error_analysis['packages']: | |
| install_package(pkg) | |
| return False, stderr, None | |
| else: | |
| return False, error_msg, None | |
| # Extract output | |
| output_path_match = re.search(r'OUTPUT_FILE_PATH:\s*(.+)', stdout, re.I) | |
| output_text_match = re.search(r'OUTPUT_TEXT:\s*(.+)', stdout, re.I | re.DOTALL) | |
| if output_path_match: | |
| output_path = output_path_match.group(1).strip() | |
| if os.path.exists(output_path): | |
| return True, stdout, output_path | |
| else: | |
| error_msg = f"Output path not found: {output_path}" | |
| if attempt < max_attempts: | |
| print(error_msg) | |
| return False, error_msg, None | |
| else: | |
| return False, error_msg, None | |
| elif output_text_match: | |
| return True, output_text_match.group(1).strip(), None | |
| else: | |
| if stdout.strip(): | |
| return True, stdout, None | |
| else: | |
| return False, "No output generated", None | |
| except subprocess.TimeoutExpired: | |
| error_msg = "Timeout: Code execution took too long" | |
| if attempt < max_attempts: | |
| print(error_msg) | |
| return False, error_msg, None | |
| else: | |
| return False, error_msg, None | |
| except Exception as e: | |
| error_msg = f"Execution error: {str(e)}" | |
| if attempt < max_attempts: | |
| print(error_msg) | |
| return False, error_msg, None | |
| else: | |
| return False, error_msg, None | |
| return False, "Max attempts reached", None | |
| def process_request(instruction, files, urls_input): | |
| """Main processing function with self-correction, supporting URLs.""" | |
| try: | |
| if not instruction.strip(): | |
| return "لطفاً دستور را وارد کنید. (فایلها و لینکها اختیاری هستند)", None | |
| file_paths = [] | |
| # Handle uploaded files | |
| if files: | |
| file_paths = [f.name for f in files] | |
| # Handle URLs: download to temp dir | |
| if urls_input and urls_input.strip(): | |
| temp_dir_for_downloads = tempfile.mkdtemp(prefix='url_downloads_') | |
| urls = [url.strip() for url in urls_input.split(',') if url.strip()] | |
| downloaded_paths = [] | |
| for url in urls: | |
| local_path = download_file_from_url(url, temp_dir_for_downloads) | |
| if local_path: | |
| downloaded_paths.append(local_path) | |
| file_paths.extend(downloaded_paths) | |
| print(f"Downloaded {len(downloaded_paths)} files from URLs") | |
| # Clean up note: Temp dirs will be cleaned on shutdown or manually if needed | |
| # Track errors for learning | |
| previous_errors = [] | |
| generated_codes = [] | |
| # Main retry loop | |
| for attempt in range(1, 4): # 3 attempts | |
| print(f"\n{'='*50}") | |
| print(f"MAIN ATTEMPT {attempt}/3") | |
| print(f"{'='*50}") | |
| # Generate code | |
| print("Generating code...") | |
| generated_code = generate_code_with_openrouter( | |
| instruction, | |
| file_paths, | |
| previous_errors=previous_errors if attempt > 1 else None, | |
| attempt=attempt | |
| ) | |
| if len(generated_code) < 20: | |
| return f"کد ضعیف تولید شد: {generated_code}", None | |
| generated_codes.append(generated_code) | |
| print(f"Generated code preview: {generated_code[:200]}...") | |
| # Try to execute | |
| success, output, file_path = execute_code_with_retry(generated_code, max_attempts=2) | |
| if success: | |
| # Success! | |
| result_text = f"✅ Success on attempt {attempt}!\n\n" | |
| result_text += f"Generated Code:\n```python\n{generated_code}\n```\n\n" | |
| result_text += f"Output:\n{output}" | |
| return result_text, file_path | |
| else: | |
| # Analyze error | |
| print(f"\n❌ Attempt {attempt} failed") | |
| error_analysis = ErrorAnalyzer.analyze_error(output, generated_code) | |
| previous_errors.append(error_analysis) | |
| print(f"Error type: {error_analysis['error_type']}") | |
| print(f"Suggestions: {', '.join(error_analysis['suggestions'])}") | |
| # If this was the last attempt | |
| if attempt == 3: | |
| error_report = f"❌ Failed after {attempt} attempts.\n\n" | |
| error_report += "Error History:\n" | |
| for i, err in enumerate(previous_errors, 1): | |
| error_report += f"\nAttempt {i}:\n" | |
| error_report += f"- Error type: {err['error_type']}\n" | |
| error_report += f"- Details: {err['original_error'][:200]}...\n" | |
| error_report += f"\n\nLast generated code:\n```python\n{generated_code}\n```" | |
| return error_report, None | |
| return "Unexpected end of retry loop", None | |
| except Exception as e: | |
| error_msg = f"General error: {type(e).__name__}: {e}\nFull traceback: {traceback.format_exc()}" | |
| print(error_msg) | |
| return error_msg, None | |
| # Streamlit Interface | |
| def main(): | |
| st.set_page_config(page_title="AI File Processor - Self Correcting", page_icon="🤖", layout="wide") | |
| st.title("🤖 AI File Processor - Self Correcting Edition") | |
| st.markdown(""" | |
| این سیستم میتواند: | |
| - کد Python تولید کند | |
| - خطاها را تشخیص و تحلیل کند | |
| - به طور خودکار مشکلات را برطرف کند | |
| - تا 3 بار با رویکردهای مختلف تلاش کند | |
| **مثال دستورات:** | |
| - "یک فایل اکسل با 1000 نام و شماره تلفن ایرانی بساز" | |
| - "پسزمینه این تصویر را حذف کن" | |
| - "این فایل CSV را به JSON تبدیل کن" | |
| **نکته:** میتوانید فایلها را آپلود کنید یا لینکهای فایل را (جدا شده با کاما) وارد کنید. | |
| """) | |
| # Inputs | |
| instruction = st.text_area( | |
| "دستور", | |
| height=100, | |
| placeholder="مثال: یک نمودار دایرهای از دادههای فروش بکش" | |
| ) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| uploaded_files = st.file_uploader( | |
| "فایلهای آپلود شده (اختیاری)", | |
| accept_multiple_files=True | |
| ) | |
| with col2: | |
| urls_input = st.text_input( | |
| "لینک فایلها (جدا با کاما، اختیاری)", | |
| placeholder="https://example.com/file1.csv, https://example.com/file2.jpg", | |
| help="لینکها دانلود خواهند شد و حجم فایل چک نمیشود." | |
| ) | |
| # Process button | |
| if st.button("🚀 اجرا", type="primary"): | |
| if not instruction.strip(): | |
| st.error("لطفاً دستور را وارد کنید.") | |
| else: | |
| with st.spinner("در حال پردازش..."): | |
| # Convert uploaded files to file-like objects | |
| files = [] | |
| if uploaded_files: | |
| for uploaded_file in uploaded_files: | |
| # Save uploaded file to temp location | |
| temp_dir = tempfile.mkdtemp() | |
| file_path = os.path.join(temp_dir, uploaded_file.name) | |
| with open(file_path, "wb") as f: | |
| f.write(uploaded_file.getbuffer()) | |
| files.append(type('FileObj', (), {'name': file_path})()) | |
| # Process request | |
| result_text, output_file = process_request(instruction, files, urls_input) | |
| # Display results | |
| st.subheader("نتایج") | |
| st.text_area("خروجی", result_text, height=300) | |
| # Download button for output file | |
| if output_file and os.path.exists(output_file): | |
| with open(output_file, "rb") as f: | |
| st.download_button( | |
| label="📥 دانلود فایل خروجی", | |
| data=f, | |
| file_name=os.path.basename(output_file), | |
| mime="application/octet-stream" | |
| ) | |
| # Examples | |
| st.subheader("مثالها") | |
| examples = [ | |
| ["یک فایل اکسل با 100 محصول فروشگاهی شامل نام، قیمت و موجودی بساز", None, None], | |
| ["یک نمودار میلهای از دادههای تصادفی رسم کن", None, None], | |
| ["یک تصویر 500x500 پیکسل با رنگهای تصادفی بساز", None, None], | |
| ["این فایل را به فرمت JSON تبدیل کن", None, "https://example.com/sample.csv"], | |
| ] | |
| for i, example in enumerate(examples): | |
| if st.button(f"مثال {i+1}: {example[0][:30]}..."): | |
| st.experimental_set_query_params( | |
| instruction=example[0], | |
| urls=example[2] if example[2] else "" | |
| ) | |
| st.experimental_rerun() | |
| if __name__ == "__main__": | |
| main() |