Spaces:
Runtime error
Runtime error
| from abc import ABC, abstractmethod | |
| from haystack.nodes import BM25Retriever, FARMReader | |
| from haystack.document_stores import ElasticsearchDocumentStore | |
| from haystack.pipelines import ExtractiveQAPipeline, DocumentSearchPipeline | |
| from haystack.document_stores import PineconeDocumentStore | |
| from haystack.nodes import EmbeddingRetriever, OpenAIAnswerGenerator | |
| from json import JSONDecodeError | |
| from pathlib import Path | |
| from typing import List, Optional | |
| import pandas as pd | |
| from haystack import BaseComponent, Document | |
| from haystack.document_stores import PineconeDocumentStore | |
| from haystack.nodes import ( | |
| EmbeddingRetriever, | |
| FARMReader | |
| ) | |
| from haystack.pipelines import ExtractiveQAPipeline, Pipeline, GenerativeQAPipeline | |
| from haystack.pipelines import BaseStandardPipeline | |
| from haystack.nodes.reader import BaseReader | |
| from haystack.nodes.retriever import BaseRetriever | |
| from sentence_transformers import SentenceTransformer | |
| import certifi | |
| import datetime | |
| import requests | |
| from base64 import b64encode | |
# Value returned by certifi.where() (per certifi's docs, the path to its bundled
# CA certificate file). NOTE(review): not referenced by any of the code visible
# in this part of the file -- confirm whether it is still needed.
ca_certs = certifi.where()
class QAPipeline(BaseStandardPipeline):
    """
    Two-node question answering pipeline: a retriever feeding a reader.
    """

    def __init__(self, reader: BaseReader, retriever: BaseRetriever):
        """
        Wire the retriever and the reader into a fresh ``Pipeline``.

        :param reader: Reader instance, registered as the "Reader" node.
        :param retriever: Retriever instance, registered as the "Retriever" node.
        """
        qa_graph = Pipeline()
        # The retriever consumes the raw query; the reader consumes the retriever output.
        qa_graph.add_node(component=retriever, name="Retriever", inputs=["Query"])
        qa_graph.add_node(component=reader, name="Reader", inputs=["Retriever"])
        self.pipeline = qa_graph
        self.metrics_filter = {"Retriever": ["recall_single_hit"]}

    def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None):
        """
        Run the query through the retriever and then the reader.

        :param query: The search query string.
        :param params: Per-node parameters, keyed by node name. For instance,
                       params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 5}}
        :param debug: Forwarded unchanged to the underlying pipeline. When enabled,
                      nodes collect debug information about their execution, which
                      is then available under the "_debug" key of the returned dict.
        :return: Whatever the underlying ``Pipeline.run`` call returns.
        """
        return self.pipeline.run(query=query, params=params, debug=debug)
class DocumentQueries(ABC):
    """
    Interface for classes that answer a free-text query against a document index.
    """

    @abstractmethod
    def search_by_query(self, query: str, retriever_top_k: int, reader_top_k: int,
                        index_name: Optional[str] = None, filters=None):
        """
        Answer ``query`` and return the result.

        Declared abstract so that a subclass which forgets to implement it fails
        at instantiation time instead of silently returning ``None``.

        :param query: The search query string.
        :param retriever_top_k: Number of results requested from the retriever.
        :param reader_top_k: Number of results requested from the reader.
        :param index_name: Optional name of the index to query.
        :param filters: Optional filters to apply to the search.
        """
class PineconeProposalQueries(DocumentQueries):
    """
    Question answering over documents stored in a Pinecone index.

    Always builds an extractive pipeline (EmbeddingRetriever + FARMReader).
    A generative pipeline backed by OpenAI is built when an OpenAI key is
    supplied, either at construction time or on a later call to
    ``genenerate_answer_OpenAI``.
    """

    def __init__(self, index_name: str, api_key, reader_name_or_path: str, use_gpu = True,
                 embedding_dim = 384, environment = "us-east1-gcp", OPENAI_key = None) -> None:
        """
        :param index_name: Name of the Pinecone index to query.
        :param api_key: API key passed to the Pinecone document store.
        :param reader_name_or_path: Model name or path handed to ``FARMReader``.
        :param use_gpu: Whether the reader should use a GPU.
        :param embedding_dim: Embedding dimension passed to the document store.
        :param environment: Pinecone environment passed to the document store.
        :param OPENAI_key: Optional OpenAI key; when non-empty a generative
                           pipeline is built right away.
        """
        # Give these attributes a value up front so later reads never raise
        # AttributeError. The OpenAI key is deliberately left unrecorded here
        # (as before), so the first call that passes a key explicitly rebuilds
        # the generator with that call's model settings.
        self.OpenAI_api_key = None
        self.generative_HF_pipe = None
        reader = FARMReader(model_name_or_path=reader_name_or_path,
                            use_gpu=use_gpu, num_processes=1,
                            context_window_size=200)
        self._initialize_pipeline(index_name, api_key, reader=reader,
                                  embedding_dim=embedding_dim,
                                  environment=environment, OPENAI_key=OPENAI_key)

    def _initialize_pipeline(self, index_name, api_key, similarity = "cosine",
                             embedding_dim = 384, reader = None,
                             environment = "us-east1-gcp",
                             metadata_config = None,
                             OPENAI_key = None):
        """
        Create the document store, the retriever and the pipelines.

        :param index_name: Name of the Pinecone index.
        :param api_key: API key for the Pinecone document store.
        :param similarity: Similarity setting passed to the document store.
        :param embedding_dim: Embedding dimension passed to the document store.
        :param reader: Reader to use; when ``None`` the reader already stored
                       on the instance is reused.
        :param environment: Pinecone environment passed to the document store.
        :param metadata_config: Metadata configuration for the document store.
                                Defaults to ``{"indexed": ["title", "source_title"]}``.
        :param OPENAI_key: Optional OpenAI key; when non-empty the generative
                           OpenAI pipeline is built as well.
        """
        if metadata_config is None:
            # A fresh dict per call avoids sharing one mutable default between calls.
            metadata_config = {"indexed": ["title", "source_title"]}
        if reader is not None:
            self.reader = reader
        self.document_store = PineconeDocumentStore(
            api_key=api_key,
            environment=environment,
            index=index_name,
            similarity=similarity,
            embedding_dim=embedding_dim,
            metadata_config=metadata_config,
        )
        self.retriever = EmbeddingRetriever(
            document_store=self.document_store,
            embedding_model="sentence-transformers/multi-qa-MiniLM-L6-cos-v1",
            model_format="sentence_transformers",
        )
        self.extractive_pipe = ExtractiveQAPipeline(reader=self.reader,
                                                    retriever=self.retriever)
        self.generative_OPENAI_pipe = None
        if OPENAI_key:
            OPENAI_generator = OpenAIAnswerGenerator(api_key=OPENAI_key,
                                                     model="text-davinci-003",
                                                     temperature=.5, max_tokens=60)
            self.generative_OPENAI_pipe = GenerativeQAPipeline(generator=OPENAI_generator,
                                                               retriever=self.retriever)

    def search_by_query(self, query : str, retriever_top_k: int, reader_top_k: int,
                        index_name: str = None, filters = None):
        """
        Run the extractive pipeline and return its answers.

        :param query: The search query string.
        :param retriever_top_k: ``top_k`` passed to the retriever node.
        :param reader_top_k: ``top_k`` passed to the reader node.
        :param index_name: Accepted for interface compatibility; not used here.
        :param filters: Filters passed to the retriever node.
        :return: The ``"answers"`` entry of the pipeline output.
        """
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Reader": {"top_k": reader_top_k}}
        # NOTE(review): debug output is requested but only "answers" is returned.
        prediction = self.extractive_pipe.run(query=query, params=params, debug=True)
        return prediction["answers"]

    def __initialize_openAIGEnerator(self, OPENAI_key, openai_model_name= "text-davinci-003",
                                     temperature = .5, max_tokens = 30):
        """
        (Re)build the OpenAI generative pipeline when the key has changed.

        :param OPENAI_key: OpenAI API key for the generator.
        :param openai_model_name: Model name passed to the generator.
        :param temperature: Sampling temperature passed to the generator.
        :param max_tokens: Token limit passed to the generator.
        """
        if OPENAI_key != self.OpenAI_api_key:
            OPENAI_generator = OpenAIAnswerGenerator(api_key=OPENAI_key,
                                                     model=openai_model_name,
                                                     temperature=temperature,
                                                     max_tokens=max_tokens)
            self.generative_OPENAI_pipe = GenerativeQAPipeline(generator=OPENAI_generator,
                                                               retriever=self.retriever)
            self.OpenAI_api_key = OPENAI_key

    def genenerate_answer_OpenAI(self, query : str, retriever_top_k: int, generator_top_k: int,
                                 filters = None, OPENAI_key = None,
                                 openai_model_name= "text-davinci-003", temperature = .5,
                                 max_tokens = 30):
        """
        Run the generative OpenAI pipeline and return its answers.

        The generator is rebuilt only when a non-empty key different from the
        cached one is supplied. Repeated calls with the same key reuse the
        existing pipeline (previously they returned ``None``).

        :param query: The search query string.
        :param retriever_top_k: ``top_k`` passed to the retriever node.
        :param generator_top_k: ``top_k`` passed to the generator node.
        :param filters: Filters passed to the retriever node.
        :param OPENAI_key: OpenAI API key; when omitted or empty, a pipeline
                           built earlier is reused if there is one.
        :param openai_model_name: Model name used when the generator is rebuilt.
        :param temperature: Temperature used when the generator is rebuilt.
        :param max_tokens: Token limit used when the generator is rebuilt.
        :return: The ``"answers"`` entry of the pipeline output, or ``None``
                 when no OpenAI pipeline is available.
        """
        if OPENAI_key and OPENAI_key != self.OpenAI_api_key:
            self.__initialize_openAIGEnerator(OPENAI_key, openai_model_name,
                                              temperature, max_tokens)
        if self.generative_OPENAI_pipe is None:
            return None
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Generator": {"top_k": generator_top_k}}
        prediction = self.generative_OPENAI_pipe.run(query=query, params=params)
        return prediction["answers"]

    def genenerate_answer_HF(self, query : str, retriever_top_k: int, reader_top_k: int,
                             es_index: str = None, filters = None) :
        """
        Run the Hugging Face generative pipeline, if one has been attached.

        Nothing in this class builds ``generative_HF_pipe``; it has to be
        assigned from outside. Without it this method returns ``None``, the
        same way the OpenAI variant does when it has no pipeline.

        :param query: The search query string.
        :param retriever_top_k: ``top_k`` passed to the retriever node.
        :param reader_top_k: ``top_k`` passed to the generator node.
        :param es_index: Accepted for interface compatibility; not used here.
        :param filters: Filters passed to the retriever node.
        :return: The ``"answers"`` entry of the pipeline output, or ``None``
                 when no HF pipeline is available.
        """
        hf_pipe = getattr(self, "generative_HF_pipe", None)
        if hf_pipe is None:
            return None
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Generator": {"top_k": reader_top_k}}
        prediction = hf_pipe.run(query=query, params=params)
        return prediction["answers"]
class Log():
    """
    Minimal logger that POSTs JSON documents to an Elasticsearch index endpoint.
    """

    def __init__(self, es_host: str, es_index: str, es_user, es_password) -> None:
        """
        :param es_host: Host name of the Elasticsearch server (reached on port 443).
        :param es_index: Name of the index that receives the log documents.
        :param es_user: User name for HTTP Basic authentication.
        :param es_password: Password for HTTP Basic authentication.
        """
        self.elastic_endpoint = f"https://{es_host}:443/{es_index}/_doc"
        # Build the Basic auth token from the supplied credentials instead of
        # a literal committed in the source file.
        raw_credentials = f"{es_user}:{es_password}".encode("utf-8")
        self.credentials = b64encode(raw_credentials).decode("ascii")
        self.auth_header = {'Authorization': 'Basic %s' % self.credentials}

    def write_log(self, message: str, source: str, timeout: Optional[float] = None) -> None:
        """
        Post one log document and print the server's response body.

        :param message: Log message text.
        :param source: Identifier of the component that produced the message.
        :param timeout: Optional timeout in seconds for the HTTP request.
                        ``None`` (the default) waits indefinitely, as before.
        """
        # The format string ends in "Z", so the timestamp must really be UTC.
        created_date = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        post_data = {
            "message": message,
            "createdDate": {
                "date": created_date
            },
            "source": source
        }
        r = requests.post(self.elastic_endpoint, json=post_data,
                          headers=self.auth_header, timeout=timeout)
        print(r.text)