Spaces:
Runtime error
Runtime error
| import configparser | |
| import logging | |
| import sqlite3 | |
| from typing import List, Dict, Any | |
| import chromadb | |
| import requests | |
| from chromadb import Settings | |
| from App_Function_Libraries.Chunk_Lib import improved_chunking_process | |
| from App_Function_Libraries.DB.DB_Manager import add_media_chunk, update_fts_for_media | |
| from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings | |
| ####################################################################################################################### | |
| # | |
| # Functions for ChromaDB | |
| # Get ChromaDB settings | |
| # Load configuration | |
| config = configparser.ConfigParser() | |
| config.read('config.txt') | |
| chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db') | |
| chroma_client = chromadb.PersistentClient(path=chroma_db_path, settings=Settings(anonymized_telemetry=False)) | |
| # Get embedding settings | |
| embedding_provider = config.get('Embeddings', 'provider', fallback='openai') | |
| embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small') | |
| embedding_api_key = config.get('Embeddings', 'api_key', fallback='') | |
| embedding_api_url = config.get('Embeddings', 'api_url', fallback='') | |
| # Get chunking options | |
| chunk_options = { | |
| 'method': config.get('Chunking', 'method', fallback='words'), | |
| 'max_size': config.getint('Chunking', 'max_size', fallback=400), | |
| 'overlap': config.getint('Chunking', 'overlap', fallback=200), | |
| 'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False), | |
| 'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False), | |
| 'language': config.get('Chunking', 'language', fallback='english') | |
| } | |
| def auto_update_chroma_embeddings(media_id: int, content: str): | |
| """ | |
| Automatically update ChromaDB embeddings when a new item is ingested into the SQLite database. | |
| :param media_id: The ID of the newly ingested media item | |
| :param content: The content of the newly ingested media item | |
| """ | |
| collection_name = f"media_{media_id}" | |
| # Initialize or get the ChromaDB collection | |
| collection = chroma_client.get_or_create_collection(name=collection_name) | |
| # Check if embeddings already exist for this media_id | |
| existing_embeddings = collection.get(ids=[f"{media_id}_chunk_{i}" for i in range(len(content))]) | |
| if existing_embeddings and len(existing_embeddings) > 0: | |
| logging.info(f"Embeddings already exist for media ID {media_id}, skipping...") | |
| else: | |
| # Process and store content if embeddings do not already exist | |
| process_and_store_content(content, collection_name, media_id) | |
| logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}") | |
| # Function to process content, create chunks, embeddings, and store in ChromaDB and SQLite | |
| def process_and_store_content(content: str, collection_name: str, media_id: int): | |
| # Process the content into chunks | |
| chunks = improved_chunking_process(content, chunk_options) | |
| texts = [chunk['text'] for chunk in chunks] | |
| # Generate embeddings for each chunk | |
| embeddings = [create_embedding(text) for text in texts] | |
| # Create unique IDs for each chunk using the media_id and chunk index | |
| ids = [f"{media_id}_chunk_{i}" for i in range(len(texts))] | |
| # Store the texts, embeddings, and IDs in ChromaDB | |
| store_in_chroma(collection_name, texts, embeddings, ids) | |
| # Store the chunk metadata in SQLite | |
| for i, chunk in enumerate(chunks): | |
| add_media_chunk(media_id, chunk['text'], chunk['start'], chunk['end'], ids[i]) | |
| # Update the FTS table | |
| update_fts_for_media(media_id) | |
| # Function to store documents and their embeddings in ChromaDB | |
| def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]): | |
| collection = chroma_client.get_or_create_collection(name=collection_name) | |
| collection.add( | |
| documents=texts, | |
| embeddings=embeddings, | |
| ids=ids | |
| ) | |
| # Function to perform vector search using ChromaDB | |
| def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]: | |
| query_embedding = create_embedding(query) | |
| collection = chroma_client.get_collection(name=collection_name) | |
| results = collection.query( | |
| query_embeddings=[query_embedding], | |
| n_results=k | |
| ) | |
| return results['documents'][0] | |
| def create_embedding(text: str) -> List[float]: | |
| global embedding_provider, embedding_model, embedding_api_url, embedding_api_key | |
| if embedding_provider == 'openai': | |
| return get_openai_embeddings(text, embedding_model) | |
| elif embedding_provider == 'local': | |
| response = requests.post( | |
| embedding_api_url, | |
| json={"text": text, "model": embedding_model}, | |
| headers={"Authorization": f"Bearer {embedding_api_key}"} | |
| ) | |
| return response.json()['embedding'] | |
| elif embedding_provider == 'huggingface': | |
| from transformers import AutoTokenizer, AutoModel | |
| import torch | |
| tokenizer = AutoTokenizer.from_pretrained(embedding_model) | |
| model = AutoModel.from_pretrained(embedding_model) | |
| inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512) | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| # Use the mean of the last hidden state as the sentence embedding | |
| embeddings = outputs.last_hidden_state.mean(dim=1) | |
| return embeddings[0].tolist() # Convert to list for consistency | |
| else: | |
| raise ValueError(f"Unsupported embedding provider: {embedding_provider}") | |
| def create_all_embeddings(api_choice: str, model_or_url: str) -> str: | |
| try: | |
| all_content = get_all_content_from_database() | |
| if not all_content: | |
| return "No content found in the database." | |
| texts_to_embed = [] | |
| embeddings_to_store = [] | |
| ids_to_store = [] | |
| collection_name = "all_content_embeddings" | |
| # Initialize or get the ChromaDB collection | |
| collection = chroma_client.get_or_create_collection(name=collection_name) | |
| for content_item in all_content: | |
| media_id = content_item['id'] | |
| text = content_item['content'] | |
| # Check if the embedding already exists in ChromaDB | |
| embedding_exists = collection.get(ids=[f"doc_{media_id}"]) | |
| if embedding_exists: | |
| logging.info(f"Embedding already exists for media ID {media_id}, skipping...") | |
| continue # Skip if embedding already exists | |
| # Create the embedding | |
| if api_choice == "openai": | |
| embedding = create_openai_embedding(text, model_or_url) | |
| else: # Llama.cpp | |
| embedding = create_llamacpp_embedding(text, model_or_url) | |
| # Collect the text, embedding, and ID for batch storage | |
| texts_to_embed.append(text) | |
| embeddings_to_store.append(embedding) | |
| ids_to_store.append(f"doc_{media_id}") | |
| # Store all new embeddings in ChromaDB | |
| if texts_to_embed and embeddings_to_store: | |
| store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store) | |
| return "Embeddings created and stored successfully for all new content." | |
| except Exception as e: | |
| logging.error(f"Error during embedding creation: {str(e)}") | |
| return f"Error: {str(e)}" | |
| def create_openai_embedding(text: str, model: str) -> List[float]: | |
| openai_api_key = config['API']['openai_api_key'] | |
| embedding = get_openai_embeddings(text, model) | |
| return embedding | |
| def create_llamacpp_embedding(text: str, api_url: str) -> List[float]: | |
| response = requests.post( | |
| api_url, | |
| json={"input": text} | |
| ) | |
| if response.status_code == 200: | |
| return response.json()['embedding'] | |
| else: | |
| raise Exception(f"Error from Llama.cpp API: {response.text}") | |
| def get_all_content_from_database() -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve all media content from the database that requires embedding. | |
| Returns: | |
| List[Dict[str, Any]]: A list of dictionaries, each containing the media ID, content, title, and other relevant fields. | |
| """ | |
| try: | |
| from App_Function_Libraries.DB.DB_Manager import db | |
| with db.get_connection() as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| SELECT id, content, title, author, type | |
| FROM Media | |
| WHERE is_trash = 0 -- Exclude items marked as trash | |
| """) | |
| media_items = cursor.fetchall() | |
| # Convert the results into a list of dictionaries | |
| all_content = [ | |
| { | |
| 'id': item[0], | |
| 'content': item[1], | |
| 'title': item[2], | |
| 'author': item[3], | |
| 'type': item[4] | |
| } | |
| for item in media_items | |
| ] | |
| return all_content | |
| except sqlite3.Error as e: | |
| logging.error(f"Error retrieving all content from database: {e}") | |
| from App_Function_Libraries.DB.SQLite_DB import DatabaseError | |
| raise DatabaseError(f"Error retrieving all content from database: {e}") | |
| def store_in_chroma_with_citation(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str], sources: List[str]): | |
| collection = chroma_client.get_or_create_collection(name=collection_name) | |
| collection.add( | |
| documents=texts, | |
| embeddings=embeddings, | |
| ids=ids, | |
| metadatas=[{'source': source} for source in sources] | |
| ) | |
| def check_embedding_status(selected_item): | |
| if not selected_item: | |
| return "Please select an item", "" | |
| item_id = selected_item.split('(')[0].strip() | |
| collection = chroma_client.get_or_create_collection(name="all_content_embeddings") | |
| result = collection.get(ids=[f"doc_{item_id}"]) | |
| if result['ids']: | |
| embedding = result['embeddings'][0] | |
| embedding_preview = str(embedding[:50]) # Convert first 50 elements to string | |
| return f"Embedding exists for item: {item_id}", f"Embedding preview: {embedding_preview}..." | |
| else: | |
| return f"No embedding found for item: {item_id}", "" | |
| def create_new_embedding(selected_item, api_choice, openai_model, llamacpp_url): | |
| if not selected_item: | |
| return "Please select an item" | |
| item_id = selected_item.split('(')[0].strip() | |
| items = get_all_content_from_database() | |
| item = next((item for item in items if item['title'] == item_id), None) | |
| if not item: | |
| return f"Item not found: {item_id}" | |
| try: | |
| if api_choice == "OpenAI": | |
| embedding = create_embedding(item['content']) | |
| else: # Llama.cpp | |
| embedding = create_embedding(item['content']) | |
| collection_name = "all_content_embeddings" | |
| store_in_chroma(collection_name, [item['content']], [embedding], [f"doc_{item['id']}"]) | |
| return f"New embedding created and stored for item: {item_id}" | |
| except Exception as e: | |
| return f"Error creating embedding: {str(e)}" | |
| # | |
| # End of Functions for ChromaDB | |
| ####################################################################################################################### |