Spaces:
Runtime error
Runtime error
File size: 5,951 Bytes
dd67299 7c5c440 c559bc7 03852d5 7fae8fb c559bc7 03852d5 c559bc7 03852d5 7c5c440 22ef1d5 dd67299 8f370f4 c65ef6e c559bc7 c65ef6e dd67299 c559bc7 dd67299 7c5c440 c65ef6e c559bc7 22ef1d5 c65ef6e c559bc7 c65ef6e c559bc7 9e00920 22ef1d5 03852d5 c559bc7 c65ef6e 03852d5 c65ef6e c559bc7 0c4adc5 22ef1d5 7fae8fb 03852d5 dd67299 03852d5 7fae8fb 9e00920 22ef1d5 746bf5b c65ef6e c559bc7 7c5c440 0d5f8a4 c559bc7 c65ef6e c559bc7 8f370f4 22ef1d5 7c5c440 7fae8fb 7c5c440 c559bc7 22ef1d5 c559bc7 a1501eb 22ef1d5 a06f639 c65ef6e c559bc7 dd67299 c559bc7 dd67299 c559bc7 dd67299 c65ef6e c559bc7 03852d5 c559bc7 22ef1d5 c559bc7 c65ef6e c559bc7 03852d5 c559bc7 03852d5 c559bc7 03852d5 c65ef6e c559bc7 22ef1d5 c559bc7 7c5c440 03852d5 dd67299 c559bc7 03852d5 7c5c440 c65ef6e dd67299 c559bc7 dd67299 c559bc7 22ef1d5 c559bc7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
import os
import uuid
import gradio as gr
import qdrant_client
from qdrant_client import models
from sentence_transformers import SentenceTransformer
from PIL import Image
# ===============================
# Setup
# ===============================
# Directory where uploaded item photos are written as PNG files.
UPLOAD_DIR = "uploaded_images"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Name of the Qdrant collection that holds every lost-and-found entry.
COLLECTION = "lost_and_found"

# In-memory Qdrant instance (chosen for Hugging Face hosting).
qclient = qdrant_client.QdrantClient(":memory:")

# Shared CLIP model; encode_data() uses it for both images and text.
encoder = SentenceTransformer("clip-ViT-B-32")

# Create the collection only when it is not there yet.
if not qclient.collection_exists(COLLECTION):
    qclient.create_collection(
        collection_name=COLLECTION,
        vectors_config=models.VectorParams(
            size=512,
            distance=models.Distance.COSINE,
        ),
    )
# ===============================
# Encode Function (Text or Image)
# ===============================
def encode_data(text=None, image=None):
    """Embed an image or a text string with the shared CLIP encoder.

    The image takes precedence: ``text`` is only considered when ``image``
    is neither a PIL image nor a path string.

    Args:
        text: Free-text description, or ``None``/empty.
        image: A ``PIL.Image.Image``, a filesystem path to an image file,
            or ``None``.

    Returns:
        Whatever ``encoder.encode`` produces for the input, or ``None`` when
        neither a usable image nor non-empty text was supplied.
    """
    if isinstance(image, Image.Image):
        return encoder.encode(image.convert("RGB"))
    if isinstance(image, str):
        # Open inside a context manager so the file handle is released
        # deterministically. convert() returns a separate copy of the image,
        # which stays usable after the source file has been closed.
        with Image.open(image) as opened:
            rgb_image = opened.convert("RGB")
        return encoder.encode(rgb_image)
    if text:
        # encode() is given a one-element batch; unwrap the single result.
        return encoder.encode([text])[0]
    return None
# ===============================
# Add Item
# ===============================
def add_item(text, image, uploader_name, uploader_phone):
    """Store a found item (image and/or description) in the collection.

    When a PIL image is supplied it is embedded and saved as a PNG under
    ``UPLOAD_DIR``; otherwise the text description is embedded. The vector is
    upserted together with the description, finder contact details and the
    saved image path.

    Args:
        text: Optional description of the item.
        image: Optional ``PIL.Image.Image`` of the item.
        uploader_name: Finder's name; stored as ``"N/A"`` when empty.
        uploader_phone: Finder's phone; stored as ``"N/A"`` when empty.

    Returns:
        A human-readable status string. Errors are reported in the returned
        string rather than raised.
    """
    img_path = None
    try:
        if isinstance(image, Image.Image):
            # Embed first so no file is written for an image that cannot
            # be encoded.
            vector = encode_data(image=image)
            img_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.png")
            image.save(img_path)
        elif text:
            vector = encode_data(text=text)
        else:
            vector = None

        if vector is None:
            return "❌ Please provide an image or text."

        qclient.upsert(
            collection_name=COLLECTION,
            points=[
                models.PointStruct(
                    id=str(uuid.uuid4()),
                    vector=vector.tolist(),
                    payload={
                        "text": text or "",
                        "uploader_name": uploader_name or "N/A",
                        "uploader_phone": uploader_phone or "N/A",
                        "image_path": img_path,
                    },
                )
            ],
        )
        return "✅ Item added to database!"
    except Exception as e:
        # Best-effort cleanup: do not leave an image on disk that no stored
        # point refers to.
        if img_path is not None and os.path.exists(img_path):
            try:
                os.remove(img_path)
            except OSError:
                pass  # The original error is the one worth reporting.
        return f"❌ Error: {e}"
# ===============================
# Search Function
# ===============================
def search_items(text, image, max_results, min_score):
    """Find stored items similar to the given image or text.

    A PIL image takes precedence over the text query.

    Args:
        text: Optional text query.
        image: Optional ``PIL.Image.Image`` query.
        max_results: Maximum number of hits to return.
        min_score: Score threshold passed to the vector search.

    Returns:
        A ``(summary, image_paths)`` tuple: one summary line per hit joined
        with newlines, and the paths of hit images that still exist on disk.
        On missing input, no hits, or an error, the first element is a
        status message and the list is empty.
    """
    try:
        if isinstance(image, Image.Image):
            vector = encode_data(image=image)
        elif text:
            vector = encode_data(text=text)
        else:
            vector = None

        if vector is None:
            return "❌ Please provide an image or text.", []

        results = qclient.search(
            collection_name=COLLECTION,
            query_vector=vector.tolist(),
            # Gradio documents slider values as floats, while ``limit`` is a
            # count, so coerce it explicitly.
            limit=int(max_results),
            score_threshold=min_score,
        )
        if not results:
            return "No matches found.", []

        result_texts, result_imgs = [], []
        for r in results:
            # Guard against a hit that carries no payload.
            payload = r.payload or {}
            result_texts.append(
                f"id:{r.id} | score:{r.score:.3f} | "
                f"text:{payload.get('text','')} | "
                f"finder:{payload.get('uploader_name','N/A')} "
                f"({payload.get('uploader_phone','N/A')})"
            )
            # Only show images whose file is still present on disk.
            image_path = payload.get("image_path")
            if image_path and os.path.exists(image_path):
                result_imgs.append(image_path)
        return "\n".join(result_texts), result_imgs
    except Exception as e:
        return f"❌ Error: {e}", []
# ===============================
# Delete All
# ===============================
def clear_database():
    """Drop the collection, recreate it empty, and return a status message."""
    qclient.delete_collection(COLLECTION)
    # Recreate with the same vector configuration used at start-up.
    vector_params = models.VectorParams(
        size=512,
        distance=models.Distance.COSINE,
    )
    qclient.create_collection(
        collection_name=COLLECTION,
        vectors_config=vector_params,
    )
    return "ποΈ Database cleared!"
# ===============================
# Gradio UI
# ===============================
with gr.Blocks() as demo:
    gr.Markdown("## ποΈ Lost & Found - Database")

    # --- Add Item Tab: register a found item with finder contact details ---
    with gr.Tab("β Add Item"):
        with gr.Row():
            text_input = gr.Textbox(label="Description (optional)")
            img_input = gr.Image(type="pil", label="Upload Image")
        with gr.Row():
            uploader_name = gr.Textbox(label="Finder Name")
            uploader_phone = gr.Textbox(label="Finder Phone")
        add_btn = gr.Button("Add to Database")
        add_output = gr.Textbox(label="Status")
        add_btn.click(
            fn=add_item,
            inputs=[text_input, img_input, uploader_name, uploader_phone],
            outputs=add_output,
        )

    # --- Search Tab: query by text or image ---
    with gr.Tab("π Search"):
        with gr.Row():
            search_text = gr.Textbox(label="Search by text (optional)")
            search_img = gr.Image(type="pil", label="Search by image (optional)")
        with gr.Row():
            max_results = gr.Slider(
                minimum=1, maximum=10, value=5, step=1, label="Max results"
            )
            min_score = gr.Slider(
                minimum=0.5,
                maximum=1.0,
                value=0.8,
                step=0.01,
                label="Min similarity threshold",
            )
        search_btn = gr.Button("Search")
        search_text_out = gr.Textbox(label="Search results (text)")
        search_gallery = gr.Gallery(label="Search Results", columns=2, height="auto")
        search_btn.click(
            fn=search_items,
            inputs=[search_text, search_img, max_results, min_score],
            outputs=[search_text_out, search_gallery],
        )

    # --- Admin Tab: wipe the whole collection ---
    with gr.Tab("ποΈ Admin"):
        clear_btn = gr.Button("Clear Database")
        clear_out = gr.Textbox(label="Status")
        clear_btn.click(fn=clear_database, outputs=clear_out)

demo.launch()
|