Spaces:
Running
Running
| """FAISS-based semantic search engine using ID-mapped index""" | |
| import faiss | |
| import numpy as np | |
| from typing import List, Tuple | |
| import os | |
| import random | |
| class SearchEngine: | |
| """FAISS-based search engine for image embeddings""" | |
| def __init__(self, dim: int = 1024, index_path: str = "faiss_index.bin"): | |
| self.dim = dim | |
| self.index_path = index_path | |
| # Load existing index or create a new one | |
| if os.path.exists(index_path): | |
| self.index = faiss.read_index(index_path) | |
| else: | |
| base_index = faiss.IndexFlatL2(dim) | |
| self.index = faiss.IndexIDMap(base_index) | |
| def create_albums(self, top_k: int = 5, distance_threshold: float = 0.3, album_size: int = 5) -> List[List[int]]: | |
| """ | |
| Group similar images into albums (clusters). | |
| Returns exactly top_k albums, each containing up to album_size similar photos. | |
| Photos are marked as visited to avoid duplicate albums. | |
| Only includes photos within the distance threshold. | |
| Args: | |
| top_k: Number of albums to return | |
| distance_threshold: Maximum distance to consider photos as similar (default 0.3) | |
| album_size: How many similar photos to search for per album (default 5) | |
| Returns: | |
| List of top_k albums, each album is a list of photo_ids (randomized order each call) | |
| """ | |
| from cloudzy.database import SessionLocal | |
| from cloudzy.models import Photo | |
| from sqlmodel import select | |
| self.load() | |
| if self.index.ntotal == 0: | |
| return [] | |
| # Get all photo IDs from FAISS index | |
| id_map = self.index.id_map | |
| all_ids = [id_map.at(i) for i in range(id_map.size())] | |
| # Shuffle for randomization - different albums each call | |
| random.shuffle(all_ids) | |
| visited = set() | |
| albums = [] | |
| for photo_id in all_ids: | |
| # Stop if we have enough albums | |
| if len(albums) >= top_k: | |
| break | |
| # Skip if already in an album | |
| if photo_id in visited: | |
| continue | |
| # Get embedding from database | |
| session = SessionLocal() | |
| try: | |
| photo = session.exec(select(Photo).where(Photo.id == photo_id)).first() | |
| if not photo: | |
| continue | |
| embedding = photo.get_embedding() | |
| if not embedding: | |
| continue | |
| # Search for similar images | |
| query_embedding = np.array(embedding).reshape(1, -1).astype(np.float32) | |
| distances, ids = self.index.search(query_embedding, album_size) | |
| # Build album: collect similar photos that haven't been visited and are within threshold | |
| album = [] | |
| for pid, distance in zip(ids[0], distances[0]): | |
| if pid != -1 and pid not in visited and distance <= distance_threshold: | |
| album.append(int(pid)) | |
| visited.add(pid) | |
| # Add album if it has at least 1 photo | |
| if album: | |
| albums.append(album) | |
| finally: | |
| session.close() | |
| return albums | |
| def add_embedding(self, photo_id: int, embedding: np.ndarray) -> None: | |
| """ | |
| Add an embedding to the index. | |
| Args: | |
| photo_id: Unique photo identifier | |
| embedding: 1D numpy array of shape (dim,) | |
| """ | |
| # Ensure embedding is float32 and correct shape | |
| embedding = embedding.astype(np.float32).reshape(1, -1) | |
| # Add embedding with its ID | |
| self.index.add_with_ids(embedding, np.array([photo_id], dtype=np.int64)) | |
| # Save index to disk | |
| self.save() | |
| def search(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[int, float]]: | |
| """ | |
| Search for similar embeddings. | |
| Args: | |
| query_embedding: 1D numpy array of shape (dim,) | |
| top_k: Number of results to return | |
| Returns: | |
| List of (photo_id, distance) tuples with distance <= 0.5 | |
| """ | |
| self.load() | |
| if self.index.ntotal == 0: | |
| return [] | |
| # Ensure query is float32 and correct shape | |
| query_embedding = query_embedding.astype(np.float32).reshape(1, -1) | |
| # Search in FAISS index | |
| distances, ids = self.index.search(query_embedding, top_k) | |
| # Filter invalid and distant results | |
| results = [ | |
| (int(photo_id), float(distance)) | |
| for photo_id, distance in zip(ids[0], distances[0]) | |
| if photo_id != -1 and distance <= 0.5 | |
| ] | |
| return results | |
| def save(self) -> None: | |
| """Save FAISS index to disk""" | |
| faiss.write_index(self.index, self.index_path) | |
| def load(self) -> None: | |
| """Load FAISS index from disk""" | |
| if os.path.exists(self.index_path): | |
| self.index = faiss.read_index(self.index_path) | |
| else: | |
| # Recreate empty ID-mapped index if missing | |
| base_index = faiss.IndexFlatL2(self.dim) | |
| self.index = faiss.IndexIDMap(base_index) | |
| def get_stats(self) -> dict: | |
| """Get index statistics""" | |
| return { | |
| "total_embeddings": self.index.ntotal, | |
| "dimension": self.dim, | |
| "index_type": type(self.index).__name__, | |
| } | |