Spaces:
Runtime error
Runtime error
| import re | |
| import io | |
| import os | |
| from typing import Optional, Tuple | |
| import datetime | |
| import sys | |
| import gradio as gr | |
| import requests | |
| import json | |
| from threading import Lock | |
| from langchain import ConversationChain, LLMChain | |
| from langchain.agents import load_tools, initialize_agent, Tool | |
| from langchain.tools.bing_search.tool import BingSearchRun, BingSearchAPIWrapper | |
| from langchain.chains.conversation.memory import ConversationBufferMemory | |
| from langchain.llms import OpenAI | |
| from langchain.chains import PALChain | |
| from langchain.llms import AzureOpenAI | |
| from langchain.utilities import ImunAPIWrapper, ImunMultiAPIWrapper | |
| from openai.error import AuthenticationError, InvalidRequestError, RateLimitError | |
| import argparse | |
| import logging | |
| from opencensus.ext.azure.log_exporter import AzureLogHandler | |
| import uuid | |
| logger = None | |
| OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") | |
| BUG_FOUND_MSG = "Some Functionalities not supported yet. Please refresh and hit 'Click to wake up MM-REACT'" | |
| AUTH_ERR_MSG = "OpenAI key needed" | |
| REFRESH_MSG = "Please refresh and hit 'Click to wake up MM-REACT'" | |
| MAX_TOKENS = 512 | |
| ############## ARGS ################# | |
| AGRS = None | |
| ##################################### | |
| def get_logger(): | |
| global logger | |
| if logger is None: | |
| logger = logging.getLogger(__name__) | |
| logger.addHandler(AzureLogHandler()) | |
| return logger | |
| # load chain | |
| def load_chain(history, log_state): | |
| global ARGS | |
| if ARGS.openAIModel == 'openAIGPT35': | |
| # openAI GPT 3.5 | |
| llm = OpenAI(temperature=0, max_tokens=MAX_TOKENS) | |
| elif ARGS.openAIModel == 'azureChatGPT': | |
| # for Azure OpenAI ChatGPT | |
| llm = AzureOpenAI(deployment_name="text-chat-davinci-002", model_name="text-chat-davinci-002", temperature=0, max_tokens=MAX_TOKENS) | |
| elif ARGS.openAIModel == 'azureGPT35turbo': | |
| # for Azure OpenAI gpt3.5 turbo | |
| llm = AzureOpenAI(deployment_name="gpt-35-turbo-version-0301", model_name="gpt-35-turbo (version 0301)", temperature=0, max_tokens=MAX_TOKENS) | |
| elif ARGS.openAIModel == 'azureTextDavinci003': | |
| # for Azure OpenAI text davinci | |
| llm = AzureOpenAI(deployment_name="text-davinci-003", model_name="text-davinci-003", temperature=0, max_tokens=MAX_TOKENS) | |
| memory = ConversationBufferMemory(memory_key="chat_history") | |
| ############################# | |
| # loading all tools | |
| imun_dense = ImunAPIWrapper( | |
| imun_url="https://ehazarwestus.cognitiveservices.azure.com/computervision/imageanalysis:analyze", | |
| params="api-version=2023-02-01-preview&model-version=latest&features=denseCaptions", | |
| imun_subscription_key=os.environ.get("IMUN_SUBSCRIPTION_KEY2")) | |
| imun = ImunAPIWrapper() | |
| imun = ImunMultiAPIWrapper(imuns=[imun, imun_dense]) | |
| imun_celeb = ImunAPIWrapper( | |
| imun_url="https://cvfiahmed.cognitiveservices.azure.com/vision/v3.2/models/celebrities/analyze", | |
| params="") | |
| imun_read = ImunAPIWrapper( | |
| imun_url="https://vigehazar.cognitiveservices.azure.com/formrecognizer/documentModels/prebuilt-read:analyze", | |
| params="api-version=2022-08-31", | |
| imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
| imun_receipt = ImunAPIWrapper( | |
| imun_url="https://vigehazar.cognitiveservices.azure.com/formrecognizer/documentModels/prebuilt-receipt:analyze", | |
| params="api-version=2022-08-31", | |
| imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
| imun_businesscard = ImunAPIWrapper( | |
| imun_url="https://vigehazar.cognitiveservices.azure.com/formrecognizer/documentModels/prebuilt-businessCard:analyze", | |
| params="api-version=2022-08-31", | |
| imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
| imun_layout = ImunAPIWrapper( | |
| imun_url="https://vigehazar.cognitiveservices.azure.com/formrecognizer/documentModels/prebuilt-layout:analyze", | |
| params="api-version=2022-08-31", | |
| imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
| bing = BingSearchAPIWrapper(k=2) | |
| def edit_photo(query: str) -> str: | |
| endpoint = os.environ.get("PHOTO_EDIT_ENDPOINT_URL") | |
| query = query.strip() | |
| url_idx = query.rfind(" ") | |
| img_url = query[url_idx + 1:].strip() | |
| if img_url.endswith((".", "?")): | |
| img_url = img_url[:-1] | |
| if not img_url.startswith(("http://", "https://")): | |
| return "Invalid image URL" | |
| img_url = img_url.replace("0.0.0.0", os.environ.get("PHOTO_EDIT_ENDPOINT_URL_SHORT")) | |
| instruction = query[:url_idx] | |
| # This should be some internal IP to wherever the server runs | |
| job = {"image_path": img_url, "instruction": instruction} | |
| response = requests.post(endpoint, json=job) | |
| if response.status_code != 200: | |
| return "Could not finish the task try again later!" | |
| return "Here is the edited image " + endpoint + response.json()["edited_image"] | |
| # these tools should not step on each other's toes | |
| tools = [ | |
| Tool( | |
| name="PAL-MATH", | |
| func=PALChain.from_math_prompt(llm).run, | |
| description=( | |
| "A wrapper around calculator. " | |
| "A language model that is really good at solving complex word math problems." | |
| "Input should be a fully worded hard word math problem." | |
| ) | |
| ), | |
| Tool( | |
| name = "Image Understanding", | |
| func=imun.run, | |
| description=( | |
| "A wrapper around Image Understanding. " | |
| "Useful for when you need to understand what is inside an image (objects, texts, people)." | |
| "Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
| ) | |
| ), | |
| Tool( | |
| name = "OCR Understanding", | |
| func=imun_read.run, | |
| description=( | |
| "A wrapper around OCR Understanding (Optical Character Recognition). " | |
| "Useful after Image Understanding tool has found text or handwriting is present in the image tags." | |
| "This tool can find the actual text, written name, or product name in the image." | |
| "Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
| ) | |
| ), | |
| Tool( | |
| name = "Receipt Understanding", | |
| func=imun_receipt.run, | |
| description=( | |
| "A wrapper receipt understanding. " | |
| "Useful after Image Understanding tool has recognized a receipt in the image tags." | |
| "This tool can find the actual receipt text, prices and detailed items." | |
| "Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
| ) | |
| ), | |
| Tool( | |
| name = "Business Card Understanding", | |
| func=imun_businesscard.run, | |
| description=( | |
| "A wrapper around business card understanding. " | |
| "Useful after Image Understanding tool has recognized businesscard in the image tags." | |
| "This tool can find the actual business card text, name, address, email, website on the card." | |
| "Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
| ) | |
| ), | |
| Tool( | |
| name = "Layout Understanding", | |
| func=imun_layout.run, | |
| description=( | |
| "A wrapper around layout and table understanding. " | |
| "Useful after Image Understanding tool has recognized businesscard in the image tags." | |
| "This tool can find the actual business card text, name, address, email, website on the card." | |
| "Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
| ) | |
| ), | |
| Tool( | |
| name = "Celebrity Understanding", | |
| func=imun_celeb.run, | |
| description=( | |
| "A wrapper around celebrity understanding. " | |
| "Useful after Image Understanding tool has recognized people in the image tags that could be celebrities." | |
| "This tool can find the name of celebrities in the image." | |
| "Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
| ) | |
| ), | |
| BingSearchRun(api_wrapper=bing), | |
| Tool( | |
| name = "Photo Editing", | |
| func=edit_photo, | |
| description=( | |
| "A wrapper around photo editing. " | |
| "Useful to edit an image with a given instruction." | |
| "Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
| ) | |
| ), | |
| ] | |
| chain = initialize_agent(tools, llm, agent="conversational-assistant", verbose=True, memory=memory, return_intermediate_steps=True, max_iterations=4) | |
| log_state = log_state or "" | |
| print ("log_state {}".format(log_state)) | |
| log_state = str(uuid.uuid1()) | |
| print("langchain reloaded") | |
| # eproperties = {'custom_dimensions': {'key_1': 'value_1', 'key_2': 'value_2'}} | |
| properties = {'custom_dimensions': {'session': log_state}} | |
| get_logger().warning("langchain reloaded", extra=properties) | |
| history = [] | |
| history.append(("Show me what you got!", "Hi Human, Please upload an image to get started!")) | |
| return history, history, chain, log_state, \ | |
| gr.Textbox.update(visible=True), \ | |
| gr.Button.update(visible=True), \ | |
| gr.UploadButton.update(visible=True), \ | |
| gr.Row.update(visible=True), \ | |
| gr.HTML.update(visible=True), \ | |
| gr.Button.update(variant="secondary") | |
| # executes input typed by human | |
| def run_chain(chain, inp): | |
| # global chain | |
| output = "" | |
| try: | |
| output = chain.conversation(input=inp, keep_short=ARGS.noIntermediateConv) | |
| # output = chain.run(input=inp) | |
| except AuthenticationError as ae: | |
| output = AUTH_ERR_MSG + str(datetime.datetime.now()) + ". " + str(ae) | |
| print("output", output) | |
| except RateLimitError as rle: | |
| output = "\n\nRateLimitError: " + str(rle) | |
| except ValueError as ve: | |
| output = "\n\nValueError: " + str(ve) | |
| except InvalidRequestError as ire: | |
| output = "\n\nInvalidRequestError: " + str(ire) | |
| except Exception as e: | |
| output = "\n\n" + BUG_FOUND_MSG + ":\n\n" + str(e) | |
| return output | |
| # simple chat function wrapper | |
| class ChatWrapper: | |
| def __init__(self): | |
| self.lock = Lock() | |
| def __call__( | |
| self, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain], log_state | |
| ): | |
| """Execute the chat functionality.""" | |
| self.lock.acquire() | |
| try: | |
| print("\n==== date/time: " + str(datetime.datetime.now()) + " ====") | |
| print("inp: " + inp) | |
| properties = {'custom_dimensions': {'session': log_state}} | |
| get_logger().warning("inp: " + inp, extra=properties) | |
| history = history or [] | |
| # If chain is None, that is because no API key was provided. | |
| output = "Please paste your OpenAI key from openai.com to use this app. " + str(datetime.datetime.now()) | |
| ######################## | |
| # multi line | |
| outputs = run_chain(chain, inp) | |
| outputs = process_chain_output(outputs) | |
| print (" len(outputs) {}".format(len(outputs))) | |
| for i, output in enumerate(outputs): | |
| if i==0: | |
| history.append((inp, output)) | |
| else: | |
| history.append((None, output)) | |
| except Exception as e: | |
| raise e | |
| finally: | |
| self.lock.release() | |
| print (history) | |
| properties = {'custom_dimensions': {'session': log_state}} | |
| if outputs is None: | |
| outputs = "" | |
| get_logger().warning(str(json.dumps(outputs)), extra=properties) | |
| return history, history, "" | |
| def add_image_with_path(state, chain, imagepath, log_state): | |
| global ARGS | |
| state = state or [] | |
| url_input_for_chain = "http://0.0.0.0:{}/file={}".format(ARGS.port, imagepath) | |
| outputs = run_chain(chain, url_input_for_chain) | |
| ######################## | |
| # multi line response handling | |
| outputs = process_chain_output(outputs) | |
| for i, output in enumerate(outputs): | |
| if i==0: | |
| # state.append((f"", output)) | |
| state.append(((imagepath,), output)) | |
| else: | |
| state.append((None, output)) | |
| print (state) | |
| properties = {'custom_dimensions': {'session': log_state}} | |
| get_logger().warning("url_input_for_chain: " + url_input_for_chain, extra=properties) | |
| if outputs is None: | |
| outputs = "" | |
| get_logger().warning(str(json.dumps(outputs)), extra=properties) | |
| return state, state | |
| # upload image | |
| def add_image(state, chain, image, log_state): | |
| global ARGS | |
| state = state or [] | |
| # handling spaces in image path | |
| imagepath = image.name.replace(" ", "%20") | |
| url_input_for_chain = "http://0.0.0.0:{}/file={}".format(ARGS.port, imagepath) | |
| outputs = run_chain(chain, url_input_for_chain) | |
| ######################## | |
| # multi line response handling | |
| outputs = process_chain_output(outputs) | |
| for i, output in enumerate(outputs): | |
| if i==0: | |
| state.append(((imagepath,), output)) | |
| else: | |
| state.append((None, output)) | |
| print (state) | |
| properties = {'custom_dimensions': {'session': log_state}} | |
| get_logger().warning("url_input_for_chain: " + url_input_for_chain, extra=properties) | |
| if outputs is None: | |
| outputs = "" | |
| get_logger().warning(str(json.dumps(outputs)), extra=properties) | |
| return state, state | |
| # extract image url from response and process differently | |
| def replace_with_image_markup(text): | |
| img_url = None | |
| text= text.strip() | |
| url_idx = text.rfind(" ") | |
| img_url = text[url_idx + 1:].strip() | |
| if img_url.endswith((".", "?")): | |
| img_url = img_url[:-1] | |
| # if img_url is not None: | |
| # img_url = f"" | |
| return img_url | |
| # multi line response handling | |
| def process_chain_output(outputs): | |
| global ARGS | |
| EMPTY_AI_REPLY = "AI:" | |
| # print("outputs {}".format(outputs)) | |
| if isinstance(outputs, str): # single line output | |
| if outputs.strip() == EMPTY_AI_REPLY: | |
| outputs = REFRESH_MSG | |
| outputs = [outputs] | |
| elif isinstance(outputs, list): # multi line output | |
| if ARGS.noIntermediateConv: # remove the items with assistant in it. | |
| cleanOutputs = [] | |
| for output in outputs: | |
| if output.strip() == EMPTY_AI_REPLY: | |
| output = REFRESH_MSG | |
| # found an edited image url to embed | |
| img_url = None | |
| # print ("type list: {}".format(output)) | |
| if "assistant: here is the edited image " in output.lower(): | |
| img_url = replace_with_image_markup(output) | |
| cleanOutputs.append("Assistant: Here is the edited image") | |
| if img_url is not None: | |
| cleanOutputs.append((img_url,)) | |
| else: | |
| cleanOutputs.append(output) | |
| # cleanOutputs = cleanOutputs + output+ "." | |
| outputs = cleanOutputs | |
| return outputs | |
| def init_and_kick_off(): | |
| global ARGS | |
| # initalize chatWrapper | |
| chat = ChatWrapper() | |
| exampleTitle = """<h3>Examples to start conversation..</h3>""" | |
| comingSoon = """<center><b><p style="color:Red;">MM-REACT: March 21th version with image understanding capabilities</p></b></center>""" | |
| with gr.Blocks(css="#tryButton {width: 120px;}") as block: | |
| llm_state = gr.State() | |
| history_state = gr.State() | |
| chain_state = gr.State() | |
| log_state = gr.State() | |
| reset_btn = gr.Button(value="!!!CLICK to wake up MM-REACT!!!", variant="primary", elem_id="resetbtn").style(full_width=True) | |
| gr.HTML(comingSoon) | |
| example_image_size = 90 | |
| col_min_width = 80 | |
| button_variant = "primary" | |
| with gr.Row(): | |
| with gr.Column(scale=1.0, min_width=100): | |
| chatbot = gr.Chatbot(elem_id="chatbot", label="MM-REACT Bot").style(height=620) | |
| with gr.Column(scale=0.20, min_width=200, visible=False) as exampleCol: | |
| with gr.Row(): | |
| grExampleTitle = gr.HTML(exampleTitle, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example3Image = gr.Image("images/receipt.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example3ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
| # dummy text field to hold the path | |
| example3ImagePath = gr.Text("images/receipt.png", interactive=False, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example1Image = gr.Image("images/money.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example1ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
| # dummy text field to hold the path | |
| example1ImagePath = gr.Text("images/money.png", interactive=False, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example2Image = gr.Image("images/cartoon.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example2ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
| # dummy text field to hold the path | |
| example2ImagePath = gr.Text("images/cartoon.png", interactive=False, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example4Image = gr.Image("images/product.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example4ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
| # dummy text field to hold the path | |
| example4ImagePath = gr.Text("images/product.png", interactive=False, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example5Image = gr.Image("images/celebrity.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
| with gr.Column(scale=0.50, min_width=col_min_width): | |
| example5ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
| # dummy text field to hold the path | |
| example5ImagePath = gr.Text("images/celebrity.png", interactive=False, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=0.75): | |
| message = gr.Textbox(label="Upload a pic and ask!", | |
| placeholder="Type your question about the uploaded image", | |
| lines=1, visible=False) | |
| with gr.Column(scale=0.15): | |
| submit = gr.Button(value="Send", variant="secondary", visible=False).style(full_width=True) | |
| with gr.Column(scale=0.10, min_width=0): | |
| btn = gr.UploadButton("🖼️", file_types=["image"], visible=False).style(full_width=True) | |
| message.submit(chat, inputs=[message, history_state, chain_state, log_state], outputs=[chatbot, history_state, message]) | |
| submit.click(chat, inputs=[message, history_state, chain_state, log_state], outputs=[chatbot, history_state, message]) | |
| btn.upload(add_image, inputs=[history_state, chain_state, btn, log_state], outputs=[history_state, chatbot]) | |
| # load the chain | |
| reset_btn.click(load_chain, inputs=[history_state, log_state], outputs=[chatbot, history_state, chain_state, log_state, message, submit, btn, exampleCol, grExampleTitle, reset_btn]) | |
| # setup listener click for the examples | |
| example1ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example1ImagePath, log_state], outputs=[history_state, chatbot]) | |
| example2ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example2ImagePath, log_state], outputs=[history_state, chatbot]) | |
| example3ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example3ImagePath, log_state], outputs=[history_state, chatbot]) | |
| example4ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example4ImagePath, log_state], outputs=[history_state, chatbot]) | |
| example5ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example5ImagePath, log_state], outputs=[history_state, chatbot]) | |
| # launch the app | |
| block.launch(server_name="0.0.0.0", server_port = ARGS.port) | |
| if __name__ == '__main__': | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--port', type=int, required=False, default=7860) | |
| parser.add_argument('--openAIModel', type=str, required=False, default='azureChatGPT') | |
| parser.add_argument('--noIntermediateConv', default=False, action='store_true', help='if this flag is turned on no intermediate conversation should be shown') | |
| global ARGS | |
| ARGS = parser.parse_args() | |
| init_and_kick_off() |