Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| import os | |
| import warnings | |
| import traceback | |
| import gradio as gr | |
| import subprocess | |
| from huggingface_hub import Repository | |
| from git import Repo | |
| import requests | |
# Silence every warning category for the whole process.
warnings.filterwarnings("ignore")

# Indexer scripts; each one is run as a child Python process.
DOC_INDEXER = "indexer_multi.py"
SPEC_INDEXER = "spec_indexer_multi.py"
SPEC_DOC_INDEXER = "spec_doc_indexer_multi.py"
BM25_INDEXER = "bm25_maker.py"

# Index artifacts that are read for statistics and pushed to the remotes
# (presumably written by the scripts above -- verify against those scripts).
DOC_INDEX_FILE = "indexed_docs.json"
SPEC_INDEX_FILE = "indexed_specifications.json"
SPEC_DOC_INDEX_FILE = "indexed_docs_content.zip"
BM25_INDEX_FILE = "bm25s.zip"

# Hugging Face Space that receives a copy of the artifacts.
HF_SEARCH_REPO = "OrganizedProgrammers/3GPPDocFinder"

# Absolute path of the directory containing this file; used as the git
# working tree and as the parent of the local Space clone.
REPO_DIR = os.path.dirname(os.path.abspath(__file__))
def _count_index_entries(index_file, key) -> int:
    """Return how many entries a JSON index file stores under *key*.

    The file is opened directly instead of being probed with
    ``os.path.exists`` first, so a file that disappears between the check
    and the read cannot raise.

    Args:
        index_file: Path of the JSON file to read (UTF-8).
        key: Top-level key whose value is measured with ``len``.

    Returns:
        ``len(data[key])``, or 0 when the file is missing, unreadable, not
        valid JSON, or has no sized value under *key*.
    """
    import json  # kept local, as in the rest of this file

    try:
        with open(index_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        return len(data[key])
    except FileNotFoundError:
        # No index has been produced yet: nothing to count.
        return 0
    except (OSError, ValueError, KeyError, TypeError):
        # These counters only feed read-only text boxes; a damaged index
        # must not prevent the UI from being built. Keep a trace for the
        # operator instead of failing.
        traceback.print_exc()
        return 0


def get_docs_stats(index_file=None) -> int:
    """Return the number of entries under ``"docs"`` in the document index.

    Args:
        index_file: Optional path overriding ``DOC_INDEX_FILE``.
    """
    path = DOC_INDEX_FILE if index_file is None else index_file
    return _count_index_entries(path, "docs")


def get_specs_stats(index_file=None) -> int:
    """Return the number of entries under ``"specs"`` in the specification index.

    Args:
        index_file: Optional path overriding ``SPEC_INDEX_FILE``.
    """
    path = SPEC_INDEX_FILE if index_file is None else index_file
    return _count_index_entries(path, "specs")


def get_scopes_stats(index_file=None) -> int:
    """Return the number of entries under ``"scopes"`` in the specification index.

    Args:
        index_file: Optional path overriding ``SPEC_INDEX_FILE``.
    """
    path = SPEC_INDEX_FILE if index_file is None else index_file
    return _count_index_entries(path, "scopes")
def check_permissions(user: str, token: str):
    """Tell whether *user*, authenticated by *token*, may use the indexers.

    The token is sent to the Hugging Face ``whoami-v2`` endpoint. Access is
    granted when the account name in the answer equals *user* and either

    * the account's ``orgs`` list contains the expected organisation id, or
    * the token role is ``fineGrained`` and one of its scopes targets that
      organisation with both ``repo.write`` and ``repo.content.read``.

    Args:
        user: Account name typed by the operator.
        token: Hugging Face access token typed by the operator.

    Returns:
        True when access is granted, False otherwise. Any failure (non-200
        status, unexpected payload, network error, timeout) is printed with
        its traceback and reported as False; nothing is raised.
    """
    org_id = "645cfa1b5ebf379fd6d8a339"
    required_permissions = ("repo.write", "repo.content.read")
    try:
        # The request carries the bearer token, so certificate verification
        # stays at the library default (enabled). The timeout keeps a stalled
        # connection from blocking the login handler indefinitely.
        response = requests.get(
            "https://huggingface.co/api/whoami-v2",
            headers={"Accept": "application/json", "Authorization": f"Bearer {token}"},
            timeout=30,
        )
        if response.status_code != 200:
            return False
        profile: dict = response.json()

        if not profile.get("name") or profile["name"] != user:
            return False

        orgs = profile.get("orgs")
        if not orgs:
            return False
        # NOTE(review): the original indentation was lost; this keeps the
        # statement order as read, where membership of the organisation is
        # sufficient on its own and the token-scope check below is only
        # reached by non-members -- confirm this is the intended policy.
        if any(org["id"] == org_id for org in orgs):
            return True

        auth = profile.get("auth")
        if not auth:
            return False
        access_token = auth["accessToken"]
        if access_token["role"] != "fineGrained":
            return False
        for scope in access_token["fineGrained"]["scoped"]:
            entity = scope["entity"]
            if (
                entity["type"] == "org"
                and entity["_id"] == org_id
                and all(perm in scope["permissions"] for perm in required_permissions)
            ):
                return True
        return False
    except Exception as e:
        # Missing keys in the payload and transport errors all end up here.
        traceback.print_exception(e)
        return False
def update_logged(user: str, token: str):
    """Build the visibility updates that follow a login attempt.

    Returns four ``gr.update`` objects: the first is visible only when the
    credentials are rejected, the other three only when they are accepted.
    """
    authorized = check_permissions(user, token)
    return (
        gr.update(visible=not authorized),
        gr.update(visible=authorized),
        gr.update(visible=authorized),
        gr.update(visible=authorized),
    )
def commit_and_push_3gppindexers(user, token, files, message, current_log=""):
    """Commit *files* in the local repository and push them to the 3GPPIndexers Space.

    This is a generator: it yields the growing log text after the pull,
    after a successful push (twice, the second time with a restart notice),
    or once with the failure reason when the push is rejected.

    Args:
        user: Account name placed in the push URL.
        token: Access token placed in the push URL.
        files: Paths handed to ``git add``.
        message: Commit message.
        current_log: Log text accumulated so far; a newline is appended to it.

    Raises:
        Whatever GitPython raises while opening the repository, pulling,
        staging or committing. Only push failures are caught and logged.
    """
    from urllib.parse import quote

    log = current_log + "\n"
    repo = Repo(REPO_DIR)
    origin = repo.remotes.origin
    repo.config_writer().set_value("user", "name", "3GPP Indexer Automatic Git Tool").release()
    repo.config_writer().set_value("user", "email", "example@mail.org").release()
    origin.pull()
    log += "Git pull succeed !\n"
    yield log

    repo.git.add(files)
    repo.index.commit(message)

    # Percent-encode the credentials so a reserved character cannot corrupt
    # the URL; values made of letters, digits, "-", "_" and "." are unchanged.
    safe_user = quote(user, safe="")
    safe_token = quote(token, safe="")
    push_url = (
        f"https://{safe_user}:{safe_token}"
        "@huggingface.co/spaces/OrganizedProgrammers/3GPPIndexers"
    )
    try:
        repo.git.push(push_url)
    except Exception as e:
        # The error text can echo the command line, and with it the URL that
        # embeds the token. Mask it before it reaches the on-screen log.
        reason = str(e)
        for secret in (token, safe_token):
            if secret:
                reason = reason.replace(secret, "***")
        log += f"Git push failed: {reason}\n"
        yield log
        return

    log += "Git push succeed !\n"
    yield log
    log += "Wait for Huggingface to restart the Space\n"
    yield log
def commit_and_push_3gppdocfinder(token, files, message, current_log=""):
    """Copy *files* into a local clone of the search Space, then commit and push.

    Generator yielding the log text exactly once: either the notice that the
    push was skipped because *token* is empty, or the confirmation that the
    push went through.

    Args:
        token: Hugging Face token used for the clone and the push.
        files: Artifact paths, each copied under the same relative name.
        message: Commit message.
        current_log: Log text accumulated so far; a newline is appended to it.
    """
    log = current_log + "\n"
    if not token:
        yield log + "No token provided. Skipping HuggingFace push.\n"
        return

    hf_repo_dir = os.path.join(REPO_DIR, "hf_spaces")
    # Options common to the first clone and to later re-opens.
    repo_options = dict(
        local_dir=hf_repo_dir,
        repo_type="space",
        git_user="3GPP Indexer Automatic Git Tool",
        git_email="example@mail.org",
        token=token,
        skip_lfs_files=True,
    )
    if os.path.exists(hf_repo_dir):
        # NOTE(review): indentation was lost in the source; the pull is
        # assumed to belong to this "already cloned" branch.
        repo = Repository(**repo_options)
        repo.git_pull()
    else:
        repo = Repository(clone_from=HF_SEARCH_REPO, **repo_options)

    # Copy artifact files to the Hugging Face Space working tree.
    import shutil
    for artifact in files:
        shutil.copy2(artifact, os.path.join(hf_repo_dir, artifact))

    repo.git_add(auto_lfs_track=True)
    repo.git_commit(message)
    repo.git_push()
    yield log + "Pushed to HuggingFace.\n"
def refresh_stats():
    """Return the document, specification and scope counts as three strings."""
    counters = (get_docs_stats, get_specs_stats, get_scopes_stats)
    return tuple(str(counter()) for counter in counters)
def stream_script_output(script_path, current_log=""):
    """Run a Python script and stream its combined stdout/stderr.

    Generator: after every output line it yields *current_log* followed by
    everything the script has printed so far, then yields the final text once
    more when the script has ended. When the script ends with a non-zero exit
    code, a line reporting that code is appended to the final text.

    Args:
        script_path: Path of the script to execute.
        current_log: Text placed in front of the script output.
    """
    import sys

    accumulated_output = current_log
    process = subprocess.Popen(
        # Run with the interpreter executing this application, so the script
        # sees the same environment and packages; fall back to PATH lookup
        # only if the interpreter path is unknown.
        [sys.executable or "python", script_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=1,
        universal_newlines=True,
    )
    try:
        for line in process.stdout:
            accumulated_output += line
            yield accumulated_output
        return_code = process.wait()
    finally:
        # Also reached when the consumer abandons the generator or reading
        # fails: release the pipe and do not leave the child running.
        process.stdout.close()
        if process.poll() is None:
            process.kill()
            process.wait()

    if return_code != 0:
        if accumulated_output and not accumulated_output.endswith("\n"):
            accumulated_output += "\n"
        accumulated_output += f"❌ {script_path} exited with code {return_code}\n"
    yield accumulated_output
def _button_states(interactive):
    """Return three identical ``gr.update`` objects toggling interactivity."""
    return tuple(gr.update(interactive=interactive) for _ in range(3))


def _run_indexing(user, token, scripts, artifacts, commit_subject):
    """Run indexer scripts, then publish their artifacts, streaming UI updates.

    Generator yielding 4-tuples: three button updates followed by the log
    text. The buttons are disabled while work is in progress and are always
    re-enabled by the last yielded tuple, whether the run succeeded, the
    credentials were rejected, or a step raised.

    Args:
        user: Account name checked and used for the git push.
        token: Access token checked and used for both pushes.
        scripts: Script paths executed one after the other.
        artifacts: Files committed to both remotes.
        commit_subject: Commit message prefix; a timestamp is appended.
    """
    log_output = "⏳ Indexation en cours...\n"
    # Disable every button while the job runs.
    yield (*_button_states(False), log_output)

    if not check_permissions(user, token):
        log_output += "❌ Identifiants invalides\n"
        yield (*_button_states(True), log_output)
        return

    try:
        # Run the indexing scripts.
        for script in scripts:
            for log in stream_script_output(script, log_output):
                yield (*_button_states(False), log)
                log_output = log

        # One timestamp shared by both commits.
        d = datetime.today().strftime("%d/%m/%Y-%H:%M:%S")
        message = f"{commit_subject}: {d}"
        for log in commit_and_push_3gppdocfinder(token, artifacts, message, log_output):
            yield (*_button_states(False), log)
            log_output = log
        for log in commit_and_push_3gppindexers(user, token, artifacts, message, log_output):
            yield (*_button_states(False), log)
            log_output = log
    except Exception as e:
        # Without this handler an error in any step would end the generator
        # while the buttons are still disabled, with nothing shown in the log.
        traceback.print_exception(e)
        reason = str(e)
        if token:
            # Never echo the token in the on-screen log.
            reason = reason.replace(token, "***")
        log_output += f"❌ Échec : {reason}\n"
    else:
        log_output += "✅ Terminé.\n"

    # Re-enable the buttons at the end.
    yield (*_button_states(True), log_output)


def index_documents(user, token):
    """Re-index the documents and publish ``DOC_INDEX_FILE``.

    Generator for a Gradio event: yields three button updates and the log
    text at every step. See the yielded log for success or failure.
    """
    yield from _run_indexing(
        user,
        token,
        [DOC_INDEXER],
        [DOC_INDEX_FILE],
        "Update documents indexer via Indexer",
    )


def index_specifications(user, token):
    """Re-index the specifications and publish the three specification artifacts.

    Runs the specification indexer, the specification-content indexer and the
    BM25 builder in that order. Generator for a Gradio event: yields three
    button updates and the log text at every step.
    """
    yield from _run_indexing(
        user,
        token,
        [SPEC_INDEXER, SPEC_DOC_INDEXER, BM25_INDEXER],
        [SPEC_DOC_INDEX_FILE, BM25_INDEX_FILE, SPEC_INDEX_FILE],
        "Update specifications indexer via Indexer",
    )
# NOTE(review): nesting below is reconstructed from the flattened source --
# the log box and the refresh button are placed at Blocks level because their
# visibility is toggled separately from the rows.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 📄 3GPP Indexers")

    # Login row: shown first, hidden once the credentials are accepted.
    with gr.Row() as r1:
        with gr.Column():
            git_user = gr.Textbox(label="Git user (for push/pull indexes)")
            git_pass = gr.Textbox(label="Git Token", type="password")
            btn_login = gr.Button("Login", variant="primary")

    # Indexing row: counters and re-index buttons, hidden until login.
    with gr.Row(visible=False) as r2:
        with gr.Column():
            doc_count = gr.Textbox(
                label="Docs Indexed", value=str(get_docs_stats()), interactive=False
            )
            btn_docs = gr.Button("Re-index Documents", variant="primary")
        with gr.Column():
            spec_count = gr.Textbox(
                label="Specs Indexed", value=str(get_specs_stats()), interactive=False
            )
            btn_specs = gr.Button("Re-index Specifications", variant="primary")
        with gr.Column():
            scope_count = gr.Textbox(
                label="Scopes Indexed", value=str(get_scopes_stats()), interactive=False
            )

    out = gr.Textbox(label="Output/Log", lines=13, autoscroll=True, visible=False)
    refresh = gr.Button(value="🔄 Refresh Stats", visible=False)

    # Both indexing handlers take the credentials and drive the same widgets.
    credentials = [git_user, git_pass]
    job_outputs = [btn_docs, btn_specs, refresh, out]

    btn_login.click(update_logged, inputs=credentials, outputs=[r1, r2, out, refresh])
    btn_docs.click(index_documents, inputs=credentials, outputs=job_outputs)
    btn_specs.click(index_specifications, inputs=credentials, outputs=job_outputs)
    refresh.click(refresh_stats, outputs=[doc_count, spec_count, scope_count])

demo.launch()