Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """Test database operations and frontend-backend connectivity""" | |
| import sys | |
| import os | |
| # Add the parent directory to the path so we can import app modules | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) | |
| import requests | |
| import time | |
| from app.database import SessionLocal | |
| from app import crud, models | |
| def test_frontend_backend_connection(): | |
| """Test if frontend and backend are properly connected""" | |
| print("=== Testing Frontend-Backend Connection ===") | |
| # Test 1: Check if backend is accessible from frontend perspective | |
| print("1. Testing backend accessibility...") | |
| try: | |
| response = requests.get('http://localhost:8080/api/images/', timeout=10) | |
| print(f" Backend status: {response.status_code}") | |
| if response.status_code == 200: | |
| print(" β Backend is accessible!") | |
| data = response.json() | |
| print(f" β Found {len(data)} images in database") | |
| else: | |
| print(f" β Backend not accessible: {response.status_code}") | |
| return False | |
| except Exception as e: | |
| print(f" β Cannot connect to backend: {e}") | |
| return False | |
| # Test 2: Check if frontend proxy is working (simulate frontend request) | |
| print("\n2. Testing frontend proxy connection...") | |
| try: | |
| # Simulate frontend request with proper headers | |
| headers = { | |
| 'Content-Type': 'application/json', | |
| 'Accept': 'application/json', | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' | |
| } | |
| response = requests.get('http://localhost:8080/api/images/', headers=headers, timeout=10) | |
| print(f" Frontend proxy status: {response.status_code}") | |
| if response.status_code == 200: | |
| print(" β Frontend proxy is working!") | |
| else: | |
| print(f" β Frontend proxy failed: {response.status_code}") | |
| except Exception as e: | |
| print(f" β Frontend proxy error: {e}") | |
| return True | |
| def test_database_save(): | |
| """Test if data is actually being saved to the database during upload""" | |
| print("\n=== Testing Database Save ===") | |
| # Create test image | |
| test_content = b"fake image data for database test" | |
| # Step 1: Check initial database state | |
| print("1. Checking initial database state...") | |
| db = SessionLocal() | |
| try: | |
| initial_images = db.query(models.Images).count() | |
| initial_captions = db.query(models.Captions).count() | |
| print(f" Initial images: {initial_images}") | |
| print(f" Initial captions: {initial_captions}") | |
| finally: | |
| db.close() | |
| # Step 2: Upload image via API | |
| print("2. Uploading image via API...") | |
| files = {'file': ('db_test.jpg', test_content, 'image/jpeg')} | |
| data = { | |
| 'source': 'OTHER', | |
| 'type': 'OTHER', | |
| 'countries': ['US'], | |
| 'epsg': '4326', | |
| 'image_type': 'crisis_map' | |
| } | |
| try: | |
| upload_response = requests.post('http://localhost:8080/api/images/', files=files, data=data, timeout=30) | |
| print(f" Upload status: {upload_response.status_code}") | |
| if upload_response.status_code == 200: | |
| upload_result = upload_response.json() | |
| image_id = upload_result.get('image_id') | |
| print(f" β Upload successful! Image ID: {image_id}") | |
| # Step 3: Check if image is in database immediately after upload | |
| print("3. Checking if image is saved to database...") | |
| db = SessionLocal() | |
| try: | |
| db_image = db.query(models.Images).filter(models.Images.image_id == image_id).first() | |
| if db_image: | |
| print(f" β Image found in database!") | |
| print(f" β Source: {db_image.source}") | |
| print(f" β Type: {db_image.type}") | |
| print(f" β File key: {db_image.file_key}") | |
| else: | |
| print(f" β ERROR: Image NOT found in database!") | |
| print(f" β This means the upload failed to save to database!") | |
| return False | |
| finally: | |
| db.close() | |
| # Step 4: Create caption via API | |
| print("4. Creating caption via API...") | |
| caption_data = { | |
| 'title': 'Database Test Caption', | |
| 'prompt': 'Describe this test image' | |
| } | |
| caption_response = requests.post( | |
| f'http://localhost:8080/api/images/{image_id}/caption', | |
| data=caption_data | |
| ) | |
| print(f" Caption creation status: {caption_response.status_code}") | |
| if caption_response.status_code == 200: | |
| caption_result = caption_response.json() | |
| caption_id = caption_result.get('cap_id') | |
| print(f" β Caption created! Caption ID: {caption_id}") | |
| # Step 5: Check if caption is in database | |
| print("5. Checking if caption is saved to database...") | |
| db = SessionLocal() | |
| try: | |
| db_caption = db.query(models.Captions).filter(models.Captions.cap_id == caption_id).first() | |
| if db_caption: | |
| print(f" β Caption found in database!") | |
| print(f" β Title: {db_caption.title}") | |
| print(f" β Generated: {db_caption.generated[:50]}...") | |
| else: | |
| print(f" β ERROR: Caption NOT found in database!") | |
| print(f" β This means caption creation failed to save to database!") | |
| return False | |
| finally: | |
| db.close() | |
| # Step 6: Submit caption via API | |
| print("6. Submitting caption via API...") | |
| submit_data = { | |
| 'edited': 'This is a test caption for database save test.', | |
| 'accuracy': 85, | |
| 'context': 90, | |
| 'usability': 80 | |
| } | |
| submit_response = requests.put( | |
| f'http://localhost:8080/api/captions/{caption_id}', | |
| json=submit_data | |
| ) | |
| print(f" Submit status: {submit_response.status_code}") | |
| if submit_response.status_code == 200: | |
| print(" β Caption submitted successfully!") | |
| # Step 7: Check if submitted data is in database | |
| print("7. Checking if submitted data is saved to database...") | |
| db = SessionLocal() | |
| try: | |
| db_caption = db.query(models.Captions).filter(models.Captions.cap_id == caption_id).first() | |
| if db_caption: | |
| print(f" β Submitted caption found in database!") | |
| print(f" β Edited: {db_caption.edited}") | |
| print(f" β Accuracy: {db_caption.accuracy}") | |
| print(f" β Context: {db_caption.context}") | |
| print(f" β Usability: {db_caption.usability}") | |
| else: | |
| print(f" β ERROR: Submitted caption NOT found in database!") | |
| return False | |
| finally: | |
| db.close() | |
| # Clean up | |
| print("8. Cleaning up...") | |
| delete_response = requests.delete(f'http://localhost:8080/api/images/{image_id}', timeout=10) | |
| print(f" Delete status: {delete_response.status_code}") | |
| else: | |
| print(f" β ERROR: Caption submission failed: {submit_response.text}") | |
| else: | |
| print(f" β ERROR: Caption creation failed: {caption_response.text}") | |
| else: | |
| print(f" β ERROR: Upload failed: {upload_response.text}") | |
| except Exception as e: | |
| print(f" β ERROR: Database save test failed: {e}") | |
| def test_submit_flow(): | |
| """Test the complete submit flow like the frontend does""" | |
| print("\n=== Testing Complete Submit Flow ===") | |
| # Step 1: Upload image (like frontend handleGenerate) | |
| print("1. Uploading image (simulating frontend handleGenerate)...") | |
| test_content = b"submit test image data" | |
| files = {'file': ('submit_test.jpg', test_content, 'image/jpeg')} | |
| data = { | |
| 'source': 'OTHER', | |
| 'type': 'OTHER', | |
| 'countries': ['US'], | |
| 'epsg': '4326', | |
| 'image_type': 'crisis_map' | |
| } | |
| try: | |
| upload_response = requests.post('http://localhost:8080/api/images/', files=files, data=data, timeout=30) | |
| print(f" Upload status: {upload_response.status_code}") | |
| if upload_response.status_code == 200: | |
| upload_result = upload_response.json() | |
| image_id = upload_result.get('image_id') | |
| print(f" β Upload successful! Image ID: {image_id}") | |
| # Step 2: Create caption (like frontend handleGenerate) | |
| print("2. Creating caption (simulating frontend handleGenerate)...") | |
| caption_data = { | |
| 'title': 'Submit Test Caption', | |
| 'prompt': 'Describe this test image for submit flow' | |
| } | |
| caption_response = requests.post( | |
| f'http://localhost:8080/api/images/{image_id}/caption', | |
| data=caption_data | |
| ) | |
| print(f" Caption creation status: {caption_response.status_code}") | |
| if caption_response.status_code == 200: | |
| caption_result = caption_response.json() | |
| caption_id = caption_result.get('cap_id') | |
| print(f" β Caption created! Caption ID: {caption_id}") | |
| # Step 3: Submit caption (like frontend handleSubmit) | |
| print("3. Submitting caption (simulating frontend handleSubmit)...") | |
| submit_data = { | |
| 'edited': 'This is a test caption submitted via the submit flow.', | |
| 'accuracy': 85, | |
| 'context': 90, | |
| 'usability': 80 | |
| } | |
| submit_response = requests.put( | |
| f'http://localhost:8080/api/captions/{caption_id}', | |
| json=submit_data | |
| ) | |
| print(f" Submit status: {submit_response.status_code}") | |
| if submit_response.status_code == 200: | |
| print(" β Caption submitted successfully!") | |
| # Step 4: Verify data is saved and accessible | |
| print("4. Verifying data is saved and accessible...") | |
| time.sleep(1) | |
| # Check via API | |
| list_response = requests.get('http://localhost:8080/api/images/', timeout=10) | |
| if list_response.status_code == 200: | |
| images = list_response.json() | |
| submitted_image = next((img for img in images if img.get('image_id') == image_id), None) | |
| if submitted_image: | |
| print(" β SUCCESS: Submitted image found via API!") | |
| caption = submitted_image.get('caption') | |
| if caption: | |
| print(f" β Caption edited: {caption.get('edited', 'N/A')}") | |
| print(f" β Accuracy: {caption.get('accuracy', 'N/A')}") | |
| else: | |
| print(" β ERROR: Caption not found in API response!") | |
| else: | |
| print(" β ERROR: Submitted image NOT found via API!") | |
| # Check database directly | |
| db = SessionLocal() | |
| try: | |
| db_image = db.query(models.Images).filter(models.Images.image_id == image_id).first() | |
| if db_image: | |
| print(" β SUCCESS: Submitted image found in database!") | |
| if db_image.caption: | |
| print(f" β Caption edited: {db_image.caption.edited}") | |
| print(f" β Accuracy: {db_image.caption.accuracy}") | |
| else: | |
| print(" β ERROR: Caption not found in database!") | |
| else: | |
| print(" β ERROR: Submitted image NOT found in database!") | |
| finally: | |
| db.close() | |
| # Clean up | |
| print("5. Cleaning up...") | |
| delete_response = requests.delete(f'http://localhost:8080/api/images/{image_id}', timeout=10) | |
| print(f" Delete status: {delete_response.status_code}") | |
| else: | |
| print(f" β ERROR: Caption submission failed: {submit_response.text}") | |
| else: | |
| print(f" β ERROR: Caption creation failed: {caption_response.text}") | |
| else: | |
| print(f" β ERROR: Upload failed: {upload_response.text}") | |
| except Exception as e: | |
| print(f" β ERROR: Submit flow test failed: {e}") | |
| def check_caption_exists(): | |
| """Check if a specific caption exists in the database""" | |
| print("\n=== Checking Specific Caption ===") | |
| caption_id = "a57497fe-1583-4cec-8b34-a94bb229e3ea" | |
| print(f"Checking if caption {caption_id} exists...") | |
| db = SessionLocal() | |
| try: | |
| caption = db.query(models.Captions).filter(models.Captions.cap_id == caption_id).first() | |
| if caption: | |
| print(f"β Caption found!") | |
| print(f" Image ID: {caption.image_id}") | |
| print(f" Title: {caption.title}") | |
| print(f" Generated: {caption.generated[:50]}...") | |
| else: | |
| print(f"β Caption NOT found!") | |
| # Check if the image exists | |
| all_captions = db.query(models.Captions).all() | |
| print(f"Total captions: {len(all_captions)}") | |
| if all_captions: | |
| print("Recent captions:") | |
| for i, c in enumerate(all_captions[-3:]): # Show last 3 | |
| print(f" {i+1}. {c.cap_id} -> Image: {c.image_id}") | |
| finally: | |
| db.close() | |
| def check_database_consistency(): | |
| """Check database consistency and relationships""" | |
| print("\n=== Checking Database Consistency ===") | |
| db = SessionLocal() | |
| try: | |
| # Check all tables | |
| images = db.query(models.Images).all() | |
| captions = db.query(models.Captions).all() | |
| sources = db.query(models.Source).all() | |
| types = db.query(models.Type).all() | |
| countries = db.query(models.Country).all() | |
| print(f"Images: {len(images)}") | |
| for img in images: | |
| print(f" - {img.image_id}: {img.source}/{img.type} (has caption: {img.caption is not None})") | |
| print(f"Captions: {len(captions)}") | |
| for cap in captions: | |
| print(f" - {cap.cap_id}: {cap.title}") | |
| print(f"Sources: {len(sources)}") | |
| print(f"Types: {len(types)}") | |
| print(f"Countries: {len(countries)}") | |
| # Check for orphaned captions | |
| orphaned_captions = db.query(models.Captions).filter( | |
| ~models.Captions.image_id.in_([img.image_id for img in images]) | |
| ).all() | |
| if orphaned_captions: | |
| print(f"WARNING: Found {len(orphaned_captions)} orphaned captions!") | |
| for cap in orphaned_captions: | |
| print(f" - {cap.cap_id} -> Image: {cap.image_id}") | |
| else: | |
| print("β No orphaned captions found") | |
| finally: | |
| db.close() | |
| if __name__ == "__main__": | |
| test_frontend_backend_connection() | |
| test_database_save() | |
| test_submit_flow() | |
| check_caption_exists() | |
| check_database_consistency() |