from typing import Any

from langchain.schema import Document
from langchain_elasticsearch import ElasticsearchStore
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    DataInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data

class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
    """Elasticsearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "Elasticsearch"
    description: str = "Elasticsearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/elasticsearch"
    name = "Elasticsearch"
    icon = "ElasticsearchStore"

    inputs = [
        StrInput(
            name="elasticsearch_url",
            display_name="Elasticsearch URL",
            value="http://localhost:9200",
            info="URL for self-managed Elasticsearch deployments (e.g., http://localhost:9200). "
            "Do not use with Elastic Cloud deployments; use Elastic Cloud ID instead.",
        ),
        SecretStrInput(
            name="cloud_id",
            display_name="Elastic Cloud ID",
            value="",
            info="Use this for Elastic Cloud deployments. Do not use together with 'Elasticsearch URL'.",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in the Elasticsearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info="Enter a search query. Leave empty to retrieve all documents.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="",
            advanced=False,
            info=(
                "Elasticsearch username (e.g., 'elastic'). "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="",
            advanced=False,
            info=(
                "Elasticsearch password for the specified user. "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="Elastic API Key",
            value="",
            advanced=True,
            info="API Key for Elastic Cloud authentication. If used, 'username' and 'password' are not required.",
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> ElasticsearchStore:
        """Builds the Elasticsearch Vector Store object."""
        if self.cloud_id and self.elasticsearch_url:
            msg = (
                "Both 'cloud_id' and 'elasticsearch_url' provided. "
                "Please use only one based on your deployment (Cloud or Local)."
            )
            raise ValueError(msg)

        es_params = {
            "index_name": self.index_name,
            "embedding": self.embedding,
            "es_user": self.username or None,
            "es_password": self.password or None,
        }

        if self.cloud_id:
            es_params["es_cloud_id"] = self.cloud_id
        else:
            es_params["es_url"] = self.elasticsearch_url
        if self.api_key:
            # ElasticsearchStore expects the API key under 'es_api_key'.
            es_params["es_api_key"] = self.api_key

        elasticsearch = ElasticsearchStore(**es_params)

        # If documents are provided, add them to the store
        if self.ingest_data:
            documents = self._prepare_documents()
            if documents:
                elasticsearch.add_documents(documents)

        return elasticsearch

    def _prepare_documents(self) -> list[Document]:
        """Prepares documents from the input data to add to the vector store."""
        documents = []
        for data in self.ingest_data:
            if isinstance(data, Data):
                documents.append(data.to_lc_document())
            else:
                error_message = "Vector Store Inputs must be Data objects."
                logger.error(error_message)
                raise TypeError(error_message)
        return documents

    def _add_documents_to_vector_store(self, vector_store: "ElasticsearchStore") -> None:
        """Adds documents to the Vector Store."""
        documents = self._prepare_documents()
        if documents and self.embedding:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            logger.debug("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search for similar documents in the vector store or retrieve all documents if no query is provided."""
        vector_store = self.build_vector_store()
        search_kwargs = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if query:
            search_type = self.search_type.lower()
            if search_type not in {"similarity", "mmr"}:
                msg = f"Invalid search type: {self.search_type}"
                logger.error(msg)
                raise ValueError(msg)
            try:
                if search_type == "similarity":
                    results = vector_store.similarity_search_with_score(query, **search_kwargs)
                elif search_type == "mmr":
                    # MMR search returns documents without scores and does not take a
                    # score threshold; pair each document with None so the result shape
                    # matches the similarity branch.
                    docs = vector_store.max_marginal_relevance_search(query, k=self.number_of_results)
                    results = [(doc, None) for doc in docs]
            except Exception as e:
                msg = "Error occurred while querying the Elasticsearch VectorStore; the store may contain no data."
                logger.exception(msg)
                raise ValueError(msg) from e
            return [
                {"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results
            ]

        results = self.get_all_documents(vector_store, **search_kwargs)
        return [{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results]

    def get_all_documents(self, vector_store: ElasticsearchStore, **kwargs) -> list[tuple[Document, float]]:
        """Retrieve all documents from the vector store."""
        client = vector_store.client
        index_name = self.index_name

        query = {
            "query": {"match_all": {}},
            "size": kwargs.get("k", self.number_of_results),
        }
        response = client.search(index=index_name, body=query)

        results = []
        for hit in response["hits"]["hits"]:
            doc = Document(
                page_content=hit["_source"].get("text", ""),
                metadata=hit["_source"].get("metadata", {}),
            )
            score = hit["_score"]
            results.append((doc, score))
        return results

    def search_documents(self) -> list[Data]:
        """Search for documents in the vector store based on the search input.

        If no search input is provided, retrieve all documents.
        """
        results = self.search(self.search_input)
        retrieved_data = [
            Data(
                text=result["page_content"],
                file_path=result["metadata"].get("file_path", ""),
            )
            for result in results
        ]
        self.status = retrieved_data
        return retrieved_data

    def get_retriever_kwargs(self):
        """Get the keyword arguments for the retriever."""
        return {
            "search_type": self.search_type.lower(),
            "search_kwargs": {
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            },
        }
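

# Usage sketch (not part of the component): a minimal, hedged example of building and
# querying the equivalent ElasticsearchStore directly with LangChain, mirroring what
# build_vector_store() and search() do above. It assumes an unauthenticated local
# cluster at http://localhost:9200 and uses FakeEmbeddings as a stand-in embedding model.
if __name__ == "__main__":
    from langchain_community.embeddings import FakeEmbeddings

    store = ElasticsearchStore(
        index_name="langflow",
        embedding=FakeEmbeddings(size=384),
        es_url="http://localhost:9200",
    )
    store.add_documents([Document(page_content="hello elasticsearch", metadata={"file_path": "demo.txt"})])
    for doc, score in store.similarity_search_with_score("hello elasticsearch", k=4):
        print(score, doc.page_content)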