import base64
import cv2
import glob
import json
import math
import os
import pytz
import random
import re
import requests
import streamlit as st
import streamlit.components.v1 as components
import textract
import time
import zipfile
import uuid
import platform
import extra_streamlit_components as stx
from audio_recorder_streamlit import audio_recorder
from bs4 import BeautifulSoup
from collections import deque
from datetime import datetime
from dotenv import load_dotenv
from gradio_client import Client
from huggingface_hub import InferenceClient
from io import BytesIO
from moviepy.editor import VideoFileClip
from PIL import Image
from PyPDF2 import PdfReader
from templates import bot_template, css, user_template
from urllib.parse import quote
from xml.etree import ElementTree as ET
import openai
from openai import OpenAI
# Load environment variables
load_dotenv()

# Configuration
Site_Name = 'Scholarly-Article-Document-Search-With-Memory'
title = "🔬🧠ScienceBrain.AI"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1'
icons = '🔬'

st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)
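# Note: st.set_page_config must be the first Streamlit call in the script,
# so keep it ahead of every other st.* usage below.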
# Initialize cookie manager
cookie_manager = stx.CookieManager()

# File to store chat history and user data
CHAT_FILE = "chat_history.txt"

# API configurations
API_URL = 'https://qe55p8afio98s0u3.us-east-1.aws.endpoints.huggingface.cloud'
API_KEY = st.secrets['API_KEY']
MODEL1 = "meta-llama/Llama-2-7b-chat-hf"
MODEL1URL = "https://huggingface.co/meta-llama/Llama-2-7b-chat-hf"
HF_KEY = st.secrets['HF_KEY']
headers = {
    "Authorization": f"Bearer {HF_KEY}",
    "Content-Type": "application/json"
}

# OpenAI client setup
client = OpenAI(api_key=st.secrets['OPENAI_API_KEY'], organization=st.secrets['OPENAI_ORG_ID'])
MODEL = "gpt-4-1106-preview"
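# gpt-4-1106-preview is a GPT-4 Turbo preview snapshot. The Hugging Face endpoint
# settings above (API_URL, headers, MODEL1) are defined but not referenced in this
# section of the app.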
# Session state initialization
if "openai_model" not in st.session_state:
    st.session_state["openai_model"] = MODEL
if "messages" not in st.session_state:
    st.session_state.messages = []
if "users" not in st.session_state:
    st.session_state.users = []
# st.session_state.current_user is initialized further down, after
# get_or_create_user() and its helper functions have been defined.

# Sidebar configurations
should_save = st.sidebar.checkbox("💾 Save", value=True, help="Save your session data.")
if st.sidebar.button("Clear Session"):
    st.session_state.messages = []
# Function to save chat history and user data to file
def save_data():
    with open(CHAT_FILE, 'w') as f:
        json.dump({
            'messages': st.session_state.messages,
            'users': st.session_state.users
        }, f)

# Function to load chat history and user data from file
def load_data():
    try:
        with open(CHAT_FILE, 'r') as f:
            data = json.load(f)
            st.session_state.messages = data['messages']
            st.session_state.users = data['users']
    except FileNotFoundError:
        st.session_state.messages = []
        st.session_state.users = []

# Load data at the start
if 'data_loaded' not in st.session_state:
    load_data()
    st.session_state.data_loaded = True
# Function to get or create user
def get_or_create_user():
    user_id = cookie_manager.get(cookie='user_id')
    if not user_id:
        user_id = str(uuid.uuid4())
        cookie_manager.set('user_id', user_id)
    user = next((u for u in st.session_state.users if u['id'] == user_id), None)
    if not user:
        user = {
            'id': user_id,
            'name': random.choice(['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Henry']),
            'browser': f"{platform.system()} - {st.session_state.get('browser_info', 'Unknown')}"
        }
        st.session_state.users.append(user)
        save_data()
    return user
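
# Initialize the current user here, once get_or_create_user(), save_data(), and
# load_data() are all defined; calling it earlier in the session-state block above
# would raise a NameError.
if "current_user" not in st.session_state:
    st.session_state.current_user = get_or_create_user()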
# HTML5 based Speech Synthesis (Text to Speech in Browser)
def SpeechSynthesis(result):
    documentHTML5 = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Read It Aloud</title>
        <script type="text/javascript">
            function readAloud() {{
                const text = document.getElementById("textArea").value;
                const speech = new SpeechSynthesisUtterance(text);
                window.speechSynthesis.speak(speech);
            }}
        </script>
    </head>
    <body>
        <h1>🔊 Read It Aloud</h1>
        <textarea id="textArea" rows="10" cols="80">{result}</textarea>
        <br>
        <button onclick="readAloud()">🔊 Read Aloud</button>
    </body>
    </html>
    """
    components.html(documentHTML5, width=1280, height=300)
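# SpeechSynthesis renders an HTML widget that uses the browser's Web Speech API
# (window.speechSynthesis), so playback happens entirely client-side, e.g.:
#   SpeechSynthesis("Hello from ScienceBrain")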
# Function to generate filename
def generate_filename(prompt, file_type):
    central = pytz.timezone('US/Central')
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
    replaced_prompt = prompt.replace(" ", "_").replace("\n", "_")
    safe_prompt = "".join(x for x in replaced_prompt if x.isalnum() or x == "_")[:240]
    return f"{safe_date_time}_{safe_prompt}.{file_type}"
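# For example, generate_filename("What is entropy?", "md") run at 2:30 PM Central on
# June 12 would return something like "0612_1430_What_is_entropy.md".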
# Function to process text
def process_text(text_input):
    if text_input:
        st.session_state.messages.append({"role": "user", "content": text_input})
        with st.chat_message("user"):
            st.markdown(text_input)
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""
            for response in client.chat.completions.create(
                model=st.session_state["openai_model"],
                messages=[
                    {"role": m["role"], "content": m["content"]}
                    for m in st.session_state.messages
                ],
                stream=True,
            ):
                full_response += (response.choices[0].delta.content or "")
                message_placeholder.markdown(full_response + "▌")
            message_placeholder.markdown(full_response)
        st.session_state.messages.append({"role": "assistant", "content": full_response})
        filename = generate_filename(text_input, "md")
        create_file(filename, text_input, full_response, should_save)
        return full_response
# Function to create file
def create_file(filename, prompt, response, should_save=True):
    if should_save:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(prompt + "\n\n" + response)
# ArXiv search function
def search_arxiv(query):
    base_url = "http://export.arxiv.org/api/query?"
    search_query = f"search_query=all:{quote(query)}&start=0&max_results=5"
    response = requests.get(base_url + search_query)
    if response.status_code == 200:
        results = []
        for entry in response.text.split('<entry>')[1:]:
            title = entry.split('<title>')[1].split('</title>')[0]
            summary = entry.split('<summary>')[1].split('</summary>')[0]
            link = entry.split('<id>')[1].split('</id>')[0]
            results.append(f"Title: {title}\nSummary: {summary}\nLink: {link}\n")
        return "\n".join(results)
    else:
        return "Error fetching results from ArXiv."
# Sidebar for user information and settings
with st.sidebar:
    st.title("User Info")
    st.write(f"Current User: {st.session_state.current_user['name']}")
    st.write(f"Browser: {st.session_state.current_user['browser']}")

    new_name = st.text_input("Change your name:")
    if st.button("Update Name"):
        if new_name:
            for user in st.session_state.users:
                if user['id'] == st.session_state.current_user['id']:
                    user['name'] = new_name
            st.session_state.current_user['name'] = new_name
            save_data()
            st.success(f"Name updated to {new_name}")
            st.rerun()

    st.title("Active Users")
    for user in st.session_state.users:
        st.write(f"{user['name']} ({user['browser']})")
# Main chat area
st.title("Personalized Real-Time Chat with ArXiv Search and AI")

# Display chat messages
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Chat input
if prompt := st.chat_input("What would you like to know?"):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    # Check if it's an ArXiv search query
    if prompt.lower().startswith("arxiv:"):
        query = prompt[6:].strip()
        with st.chat_message("assistant"):
            with st.spinner("Searching ArXiv..."):
                search_results = search_arxiv(query)
            st.markdown(f"Search results for '{query}':\n\n{search_results}")
        # Get AI commentary on the search results (process_text renders its own chat messages)
        ai_commentary = process_text(f"Provide a brief analysis of these ArXiv search results: {search_results}")
        st.markdown(f"\nAI Analysis:\n{ai_commentary}")
        st.session_state.messages.append({"role": "assistant", "content": f"Search results for '{query}':\n\n{search_results}\n\nAI Analysis:\n{ai_commentary}"})
    else:
        # Regular chat processing
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""
            for response in client.chat.completions.create(
                model=st.session_state["openai_model"],
                messages=[
                    {"role": m["role"], "content": m["content"]}
                    for m in st.session_state.messages
                ],
                stream=True,
            ):
                full_response += (response.choices[0].delta.content or "")
                message_placeholder.markdown(full_response + "▌")
            message_placeholder.markdown(full_response)
            st.session_state.messages.append({"role": "assistant", "content": full_response})

    # Persist the updated history and rerun to redraw the chat
    save_data()
    st.rerun()
# Polling for updates
if st.button("Refresh Chat"):
    st.rerun()

# Auto-refresh
if 'last_refresh' not in st.session_state:
    st.session_state.last_refresh = time.time()
if time.time() - st.session_state.last_refresh > 5:  # Refresh every 5 seconds
    st.session_state.last_refresh = time.time()
    st.rerun()
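# Note: Streamlit only re-executes this script on an interaction or st.rerun(), so the
# block above refreshes at most once per run after 5 seconds have elapsed; it is not a
# background timer.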
# Main function to handle different input types
def main():
    st.markdown("##### GPT-4 Multimodal AI Assistant: Text, Audio, Image, & Video")
    option = st.selectbox("Select an option", ("Text", "Image", "Audio", "Video"))

    if option == "Text":
        text_input = st.text_input("Enter your text:")
        if text_input:
            process_text(text_input)
| elif option == "Image": | |
| text = "Help me understand what is in this picture and list ten facts as markdown outline with appropriate emojis that describes what you see." | |
| text_input = st.text_input(label="Enter text prompt to use with Image context.", value=text) | |
| image_input = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg"]) | |
| if image_input: | |
| image = Image.open(image_input) | |
| st.image(image, caption="Uploaded Image", use_column_width=True) | |
| if st.button("Analyze Image"): | |
| with st.spinner("Analyzing..."): | |
| image_byte_arr = BytesIO() | |
| image.save(image_byte_arr, format='PNG') | |
| image_byte_arr = image_byte_arr.getvalue() | |
| response = client.chat.completions.create( | |
| model="gpt-4-vision-preview", | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": text_input}, | |
| { | |
| "type": "image_url", | |
| "image_url": { | |
| "url": f"data:image/jpeg;base64,{base64.b64encode(image_byte_arr).decode()}" | |
| } | |
| }, | |
| ], | |
| } | |
| ], | |
| max_tokens=300, | |
| ) | |
| st.write(response.choices[0].message.content) | |
| elif option == "Audio": | |
| text = "You are generating a transcript summary. Create a summary of the provided transcription. Respond in Markdown." | |
| text_input = st.text_input(label="Enter text prompt to use with Audio context.", value=text) | |
| audio_file = st.file_uploader("Upload an audio file", type=["mp3", "wav"]) | |
| if audio_file: | |
| if st.button("Transcribe Audio"): | |
| with st.spinner("Transcribing..."): | |
| transcription = client.audio.transcriptions.create( | |
| model="whisper-1", | |
| file=audio_file | |
| ) | |
| st.write(transcription.text) | |
| st.session_state.messages.append({"role": "user", "content": f"Transcription: {transcription.text}"}) | |
| process_text(f"{text}\n\nTranscription: {transcription.text}") | |