broadfield-dev's picture
Update app.py
6fb9d53 verified
import gradio as gr
from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.utils import HfHubHTTPError
import os
import uuid
import shutil
import tempfile
# --- State Management and API Client ---
def get_hf_api(token):
"""Initializes the HfApi client. Allows read-only operations if no token is provided."""
return HfApi(token=token if token else None)
# --- Core Logic Functions ---
def handle_token_change(token):
"""
Called when the token is entered. Fetches user info, updates UI interactivity,
and auto-fills the author fields in both tabs.
"""
if not token:
updates = {
manage_files_btn: gr.update(interactive=False), delete_repo_btn: gr.update(interactive=False),
commit_btn: gr.update(interactive=False), author_input: gr.update(value=""),
download_author_input: gr.update(value=""), whoami_output: gr.update(value=None, visible=False)
}
return (None, *updates.values())
try:
api = get_hf_api(token)
user_info = api.whoami()
username = user_info.get('name')
updates = {
manage_files_btn: gr.update(interactive=True), delete_repo_btn: gr.update(interactive=True),
commit_btn: gr.update(interactive=True), author_input: gr.update(value=username),
download_author_input: gr.update(value=username), whoami_output: gr.update(value=user_info, visible=True)
}
return (token, *updates.values())
except HfHubHTTPError as e:
gr.Warning(f"Invalid Token: {e}. You can only perform read-only actions.")
updates = {
manage_files_btn: gr.update(interactive=False), delete_repo_btn: gr.update(interactive=False),
commit_btn: gr.update(interactive=False), whoami_output: gr.update(value=None, visible=False)
}
return (token, *updates.values())
def list_repos_backend(token, author, repo_type):
"""Backend function to fetch repository IDs."""
if not author:
gr.Info("Please enter an author (username or organization).")
return []
try:
api = get_hf_api(token)
list_fn = getattr(api, f"list_{repo_type}s")
repos = list_fn(author=author)
repo_ids = [repo.id for repo in repos]
gr.Info(f"Found {len(repo_ids)} {repo_type}s for '{author}'.")
return repo_ids
except HfHubHTTPError as e:
gr.Error(f"Could not list repositories: {e}")
return []
def list_repos_for_management(token, author, repo_type):
"""Gradio wrapper to update the management dropdown and reset the UI."""
repo_ids = list_repos_backend(token, author, repo_type)
return (
repo_type,
gr.update(choices=repo_ids, value=None),
gr.update(visible=False),
gr.update(visible=False)
)
def list_repos_for_download(token, author, repo_type):
"""Gradio wrapper to update the download dropdown."""
repo_ids = list_repos_backend(token, author, repo_type)
return repo_type, gr.update(choices=repo_ids, value=None)
def on_manage_repo_select(repo_id):
"""Shows action buttons when a repo is selected in the Manage tab."""
return gr.update(visible=bool(repo_id))
def delete_repo(token, repo_id, repo_type):
"""Deletes the selected repository."""
if not token:
gr.Error("A write-enabled Hugging Face token is required to delete a repository.")
return repo_id, gr.update(visible=True), gr.update(visible=False)
try:
api = get_hf_api(token)
api.delete_repo(repo_id=repo_id, repo_type=repo_type)
gr.Info(f"Successfully deleted '{repo_id}'.")
return None, gr.update(visible=False), gr.update(visible=False)
except HfHubHTTPError as e:
gr.Error(f"Failed to delete repository: {e}")
return repo_id, gr.update(visible=True), gr.update(visible=False)
# --- File Management Functions ---
def show_files_and_load_first(token, repo_id, repo_type):
"""Lists files in a dropdown and pre-loads the first file's content."""
if not repo_id:
gr.Warning("No repository selected.")
return gr.update(visible=False), gr.update(choices=[], value=None), gr.update(value="")
try:
api = get_hf_api(token)
repo_files = api.list_repo_files(repo_id=repo_id, repo_type=repo_type)
filtered_files = [f for f in repo_files if not f.startswith('.')]
if not filtered_files:
return (gr.update(visible=True), gr.update(choices=[], value=None),
gr.update(value="## This repository is empty.", language='markdown'))
first_file_path = filtered_files[0]
content, language = load_file_content_backend(token, repo_id, repo_type, first_file_path)
return (gr.update(visible=True), gr.update(choices=filtered_files, value=first_file_path),
gr.update(value=content, language=language))
except Exception as e:
gr.Error(f"Could not manage files: {e}")
return gr.update(visible=False), gr.update(choices=[], value=None), gr.update(value="")
def load_file_content_backend(token, repo_id, repo_type, filepath):
"""Backend logic to fetch and format file content."""
if not filepath: return "## Select a file to view.", 'markdown'
try:
api = get_hf_api(token)
local_path = api.hf_hub_download(repo_id=repo_id, repo_type=repo_type, filename=filepath, token=token)
with open(local_path, 'r', encoding='utf-8') as f: content = f.read()
ext = os.path.splitext(filepath)[1].lstrip('.').lower()
lang_map = {'py': 'python', 'js': 'javascript', 'md': 'markdown'}
language = lang_map.get(ext, 'python')
return content, language
except Exception as e:
return f"Error loading file: {e}", 'python'
def load_file_content_for_editor(token, repo_id, repo_type, filepath):
"""Gradio wrapper to update the code editor when a file is selected."""
content, language = load_file_content_backend(token, repo_id, repo_type, filepath)
return gr.update(value=content, language=language)
def commit_file(token, repo_id, repo_type, filepath, content, commit_message):
"""Commits a file to the repository."""
if not token: gr.Error("A write-enabled token is required."); return
if not filepath: gr.Warning("No file selected."); return
if not commit_message: gr.Warning("Commit message cannot be empty."); return
try:
api = get_hf_api(token)
api.upload_file(bytes(content, 'utf-8'), path_in_repo=filepath,
repo_id=repo_id, repo_type=repo_type, commit_message=commit_message)
gr.Info(f"Successfully committed '{filepath}' to '{repo_id}'!")
except Exception as e: gr.Error(f"Failed to commit file: {e}")
# --- Download Tab Functions ---
def download_repos_as_zip(token, selected_repo_ids, repo_type, progress=gr.Progress()):
"""Downloads selected repos and zips them."""
if not selected_repo_ids:
gr.Warning("No repositories selected for download."); return gr.update(value=None, visible=False)
if not repo_type:
gr.Warning("Please list a repository type (Spaces, etc.) before downloading."); return gr.update(value=None, visible=False)
download_root_dir = tempfile.mkdtemp()
try:
total_repos = len(selected_repo_ids)
for i, repo_id in enumerate(selected_repo_ids):
progress((i) / total_repos, desc=f"Downloading {repo_id} ({i+1}/{total_repos})")
try:
folder_name = repo_id.replace("/", "__")
snapshot_download(repo_id=repo_id, repo_type=repo_type, local_dir=os.path.join(download_root_dir, folder_name),
token=token, local_dir_use_symlinks=False, resume_download=True)
except Exception as e: gr.Error(f"Failed to download {repo_id}: {e}")
progress(0.95, desc="All items downloaded. Creating ZIP file...")
zip_base_name = os.path.join(tempfile.gettempdir(), f"hf_{repo_type}s_archive_{uuid.uuid4().hex}")
zip_path = shutil.make_archive(zip_base_name, 'zip', download_root_dir)
progress(1, desc="Download ready!")
gr.Info("ZIP file created successfully!")
return gr.update(value=zip_path, visible=True)
finally:
shutil.rmtree(download_root_dir, ignore_errors=True)
# --- Gradio UI Layout ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Hugging Face Hub Toolkit") as demo:
# State management
hf_token_state = gr.State(None)
manage_repo_type_state = gr.State(None)
download_repo_type_state = gr.State(None)
gr.Markdown("# Hugging Face Hub Toolkit")
with gr.Sidebar():
hf_token = gr.Textbox(label="Hugging Face API Token", type="password", placeholder="hf_...")
whoami_output = gr.JSON(label="Authenticated User", visible=False)
with gr.Tabs():
with gr.TabItem("Manage Repositories"):
# Define all components first
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 1. Select a Repository")
author_input = gr.Textbox(label="Author (Username or Org)")
with gr.Row():
manage_buttons = [gr.Button(f"List {label}") for label in ["Spaces", "Models", "Datasets"]]
manage_repo_dropdown = gr.Dropdown(label="Select a Repository", interactive=True)
with gr.Column(scale=2):
with gr.Column(visible=False) as action_panel:
gr.Markdown("### 2. Choose an Action")
manage_files_btn = gr.Button("Manage Files", interactive=False)
delete_repo_btn = gr.Button("Delete This Repo", variant="stop", interactive=False)
with gr.Column(visible=False) as editor_panel:
gr.Markdown("### 3. Edit Files")
file_selector = gr.Dropdown(label="Select File", interactive=True)
code_editor = gr.Code(label="File Content", interactive=True)
commit_message_input = gr.Textbox(label="Commit Message", placeholder="e.g., Update README.md")
commit_btn = gr.Button("Commit Changes", variant="primary", interactive=False)
# Now, attach event handlers
repo_types = ["space", "model", "dataset"]
for i, btn in enumerate(manage_buttons):
btn.click(fn=list_repos_for_management,
inputs=[hf_token_state, author_input, gr.State(repo_types[i])],
outputs=[manage_repo_type_state, manage_repo_dropdown, action_panel, editor_panel])
with gr.TabItem("Bulk Download (ZIP)"):
# Define all components first
gr.Markdown("## Download Multiple Repositories as a ZIP")
download_author_input = gr.Textbox(label="Author (Username or Org)")
with gr.Row():
download_buttons = [gr.Button(f"List {label}") for label in ["Spaces", "Models", "Datasets"]]
download_repo_dropdown = gr.Dropdown(label="Select Repositories", multiselect=True, interactive=True)
download_btn = gr.Button("Download Selected as ZIP", variant="primary")
download_output_file = gr.File(label="Your Downloaded ZIP File", visible=False)
# Now, attach event handlers
for i, btn in enumerate(download_buttons):
btn.click(fn=list_repos_for_download,
inputs=[hf_token_state, download_author_input, gr.State(repo_types[i])],
outputs=[download_repo_type_state, download_repo_dropdown])
# --- Global and Cross-Tab Event Handlers ---
hf_token.change(fn=handle_token_change, inputs=hf_token,
outputs=[hf_token_state, manage_files_btn, delete_repo_btn, commit_btn, author_input, download_author_input, whoami_output])
manage_repo_dropdown.select(fn=on_manage_repo_select, inputs=manage_repo_dropdown, outputs=action_panel)
manage_files_btn.click(fn=show_files_and_load_first,
inputs=[hf_token_state, manage_repo_dropdown, manage_repo_type_state],
outputs=[editor_panel, file_selector, code_editor])
delete_repo_btn.click(fn=delete_repo, inputs=[hf_token_state, manage_repo_dropdown, manage_repo_type_state],
outputs=[manage_repo_dropdown, action_panel, editor_panel],
js="() => confirm('Are you sure you want to permanently delete this repository?')")
file_selector.change(fn=load_file_content_for_editor,
inputs=[hf_token_state, manage_repo_dropdown, manage_repo_type_state, file_selector],
outputs=code_editor)
commit_btn.click(fn=commit_file,
inputs=[hf_token_state, manage_repo_dropdown, manage_repo_type_state, file_selector, code_editor, commit_message_input])
download_btn.click(fn=download_repos_as_zip,
inputs=[hf_token_state, download_repo_dropdown, download_repo_type_state],
outputs=[download_output_file])
if __name__ == "__main__":
demo.launch(debug=True)