|
|
import gradio as gr |
|
|
from huggingface_hub import HfApi, snapshot_download |
|
|
from huggingface_hub.utils import HfHubHTTPError, RepositoryNotFoundError |
|
|
import os |
|
|
import uuid |
|
|
import shutil |
|
|
import tempfile |
|
|
|
|
|
|
|
|
|
|
|
def get_hf_api(token):
    """Build an ``HfApi`` client for the given token.

    Any falsy token (``None`` or an empty string) is normalised to ``None`` so
    the client is created without credentials; per the original author this
    still allows read-only operations.
    """
    if not token:
        token = None
    return HfApi(token=token)
|
|
|
|
|
|
|
|
|
|
|
def handle_token_change(token):
    """React to a change of the token textbox.

    Validates the token through ``HfApi.whoami`` and returns exactly one value
    per wired output, always in this order: token state, author state,
    "Manage Files" button, "Delete This Repo" button, "Commit Changes" button,
    author textbox, and the "Authenticated User" JSON panel.

    Args:
        token: The raw text typed into the token field (may be empty).

    Returns:
        A 7-tuple matching the ``outputs=`` list of ``hf_token.change``.
    """

    def build_outputs(stored_token, username, can_write, author_update, whoami_update):
        # Every branch goes through this helper so the tuple length and order
        # can never drift away from the wired outputs again.
        return (
            stored_token,                       # hf_token_state
            username,                           # author_state
            gr.update(interactive=can_write),   # manage_files_btn
            gr.update(interactive=can_write),   # delete_repo_btn
            gr.update(interactive=can_write),   # commit_btn
            author_update,                      # author_input
            whoami_update,                      # whoami_output
        )

    hidden_whoami = gr.update(value=None, visible=False)

    if not token:
        # Token cleared: lock write actions and reset the author field.
        return build_outputs(None, "", False, gr.update(value=""), hidden_whoami)

    try:
        api = get_hf_api(token)
        user_info = api.whoami()
    except HfHubHTTPError as e:
        gr.Warning(f"Invalid Token: {e}. You can only perform read-only actions.")
        # The original branch omitted the author textbox update and therefore
        # returned 6 values for 7 outputs. ``gr.update()`` leaves whatever the
        # user typed in the author field untouched.
        return build_outputs(token, "", False, gr.update(), hidden_whoami)

    username = user_info.get('name')
    return build_outputs(
        token,
        username,
        True,
        gr.update(value=username),
        gr.update(value=user_info, visible=True),
    )
|
|
|
|
|
|
|
|
def list_repos(token, author, repo_type):
    """List the repositories of ``author`` for one repository type.

    Args:
        token: Hub token forwarded to ``get_hf_api`` (may be empty/None).
        author: Username or organization whose repositories are listed.
        repo_type: Used to pick the client method ``list_<repo_type>s``
            (the UI passes "space", "model" or "dataset").

    Returns:
        A 3-tuple of updates: the repository selector (new choices, selection
        cleared) followed by two ``visible=False`` updates for the panels the
        event is wired to.
    """

    def as_outputs(choices):
        # Same shape on every path: refreshed selector + both panels hidden.
        return (
            gr.update(choices=choices, value=None),
            gr.update(visible=False),
            gr.update(visible=False),
        )

    if not author:
        gr.Info("Please enter an author (username or organization) to list repositories.")
        return as_outputs([])

    try:
        api = get_hf_api(token)
        list_fn = getattr(api, f"list_{repo_type}s")
        # Consume the listing inside the try block so errors raised while
        # iterating are handled as well.
        repo_ids = [repo.id for repo in list_fn(author=author)]
    except HfHubHTTPError as e:
        # ``gr.Error`` is an exception class: instantiating it without ``raise``
        # shows nothing. ``gr.Warning`` displays the message while still
        # letting us return the cleared selector.
        gr.Warning(f"Could not list repositories: {e}")
        return as_outputs([])

    return as_outputs(repo_ids)
|
|
|
|
|
def handle_repo_selection(repo_id):
    """Compute panel visibility after a repository is (de)selected.

    Returns two updates: the first is visible only when ``repo_id`` is truthy,
    the second is always hidden. The caller wires them to the action panel and
    the editor panel respectively.
    """
    first_panel = gr.update(visible=bool(repo_id))
    second_panel = gr.update(visible=False)
    return first_panel, second_panel
|
|
|
|
|
def delete_repo(token, repo_id, repo_type):
    """Delete the selected repository.

    Args:
        token: Hub token; required, since deletion is a write operation.
        repo_id: Identifier of the repository to delete.
        repo_type: Repository type forwarded to ``HfApi.delete_repo``.

    Returns:
        A 3-tuple for the selected-repo state, the action panel and the editor
        panel. On success the selection is cleared and both panels are hidden;
        on any failure the selection is kept and the action panel stays visible.
    """
    # Result used by every non-success path: keep the selection, keep actions.
    unchanged = (repo_id, gr.update(visible=True), gr.update(visible=False))

    if not token:
        # ``gr.Error`` only shows up when raised; the original merely
        # instantiated it, so the user never saw this message. ``gr.Warning``
        # displays it without discarding the return values.
        gr.Warning("A write-enabled Hugging Face token is required to delete a repository.")
        return unchanged

    if not repo_id:
        gr.Warning("No repository selected to delete.")
        return unchanged

    try:
        api = get_hf_api(token)
        api.delete_repo(repo_id=repo_id, repo_type=repo_type)
    except HfHubHTTPError as e:
        gr.Warning(f"Failed to delete repository: {e}")
        return unchanged

    gr.Info(f"Successfully deleted '{repo_id}'.")
    return None, gr.update(visible=False), gr.update(visible=False)
|
|
|
|
|
|
|
|
|
|
|
def show_file_manager(token, repo_id, repo_type):
    """Open the file editor for the selected repository.

    Args:
        token: Hub token forwarded to ``get_hf_api`` (may be empty/None).
        repo_id: Identifier of the repository whose files are listed.
        repo_type: Repository type forwarded to ``HfApi.list_repo_files``.

    Returns:
        A 4-tuple for the editor panel, the file dropdown, the code editor and
        the commit-message box. On success the panel is shown, the dropdown is
        filled with the repository files (paths starting with "." are left
        out), the editor shows a placeholder and the commit message is cleared.
        On failure the panel is hidden and the other components are untouched.
    """
    # Result used when nothing can be shown: hide the panel, change nothing else.
    closed = (gr.update(visible=False), gr.update(), gr.update(), gr.update())

    if not repo_id:
        gr.Warning("No repository selected.")
        return closed

    try:
        api = get_hf_api(token)
        repo_files = api.list_repo_files(repo_id=repo_id, repo_type=repo_type)
    except Exception as e:
        # ``gr.Error`` must be raised to be displayed; the original only
        # instantiated it. ``gr.Warning`` shows the message and still lets us
        # hide the editor panel.
        gr.Warning(f"Could not list files: {e}")
        return closed

    filtered_files = [f for f in repo_files if not f.startswith('.')]
    return (
        gr.update(visible=True),
        gr.update(choices=filtered_files, value=None),
        gr.update(value="## Select a file to view or edit.", language='markdown'),
        "",
    )
|
|
|
|
|
# File extension (lower-case, without the dot) -> ``gr.Code`` language name.
_CODE_LANGUAGE_BY_EXTENSION = {
    'py': 'python',
    'js': 'javascript',
    'ts': 'typescript',
    'md': 'markdown',
    'json': 'json',
    'html': 'html',
    'css': 'css',
    'yaml': 'yaml',
    'yml': 'yaml',
    'sh': 'shell',
    'sql': 'sql',
}


def load_file_content(token, repo_id, repo_type, filepath):
    """Download one repository file and return it as a code-editor update.

    Args:
        token: Hub token used for the download (may be empty/None).
        repo_id: Identifier of the repository containing the file.
        repo_type: Repository type forwarded to ``hf_hub_download``.
        filepath: Path of the file inside the repository.

    Returns:
        A ``gr.update`` carrying the file text and a highlighting language
        derived from the file extension. If no file is selected a placeholder
        is returned; if the download or UTF-8 decoding fails, the error text
        is returned instead.
    """
    if not filepath:
        return gr.update(value="## Select a file to view its content.", language='markdown')

    try:
        api = get_hf_api(token)
        local_path = api.hf_hub_download(
            repo_id=repo_id, repo_type=repo_type, filename=filepath, token=token
        )
        with open(local_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except Exception as e:
        return gr.update(value=f"Error loading file: {e}", language='plaintext')

    extension = os.path.splitext(filepath)[1].lstrip('.').lower()
    # The original used three independent ``if`` statements with an ``else``
    # bound only to the last one, which reset every non-markdown file
    # (including .py and .js) to 'python'. A lookup table keeps each mapping
    # and retains 'python' as the author's fallback for unknown extensions.
    language = _CODE_LANGUAGE_BY_EXTENSION.get(extension, 'python')
    return gr.update(value=content, language=language)
|
|
|
|
|
def commit_file(token, repo_id, repo_type, filepath, content, commit_message):
    """Commit the editor content to ``filepath`` in the selected repository.

    Args:
        token: Hub token; required, since uploading is a write operation.
        repo_id: Identifier of the target repository.
        repo_type: Repository type forwarded to ``HfApi.upload_file``.
        filepath: Destination path inside the repository.
        content: Text from the code editor; ``None`` is treated as empty text.
        commit_message: Commit message; must not be empty.

    Returns:
        None. The event has no outputs; feedback is given through toasts.

    Raises:
        gr.Error: If no token is available or the upload fails.
    """
    if not token:
        # ``gr.Error`` is an exception and is only shown when raised; the
        # original instantiated it and returned, so nothing was displayed.
        raise gr.Error("A write-enabled token is required.")
    if not filepath:
        gr.Warning("No file selected.")
        return
    if not commit_message:
        gr.Warning("Commit message cannot be empty.")
        return

    try:
        api = get_hf_api(token)
        api.upload_file(
            # An empty editor may yield None, which bytes(None, 'utf-8') rejects.
            path_or_fileobj=(content or "").encode('utf-8'),
            path_in_repo=filepath,
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message=commit_message,
        )
    except Exception as e:
        raise gr.Error(f"Failed to commit file: {e}") from e

    gr.Info(f"Successfully committed '{filepath}' to '{repo_id}'!")
|
|
|
|
|
|
|
|
|
|
|
def list_spaces_for_download(token, author):
    """List the Spaces of ``author`` to populate the download dropdown.

    Args:
        token: Hub token forwarded to ``get_hf_api`` (may be empty/None).
        author: Username or organization whose Spaces are listed.

    Returns:
        A ``gr.update`` for the dropdown with the Space ids as choices and the
        selection cleared. The choices are empty when no author is given or
        the listing fails.
    """
    no_choices = gr.update(choices=[], value=None)

    if not author:
        gr.Info("Please enter an author (username or organization) to list spaces.")
        return no_choices

    try:
        api = get_hf_api(token)
        repo_ids = [space.id for space in api.list_spaces(author=author)]
    except RepositoryNotFoundError:
        # Must stay ahead of the broader HfHubHTTPError handler below.
        gr.Warning(f"Author '{author}' not found or has no public spaces.")
        return no_choices
    except HfHubHTTPError as e:
        # ``gr.Error`` is only displayed when raised; the original merely
        # instantiated it. ``gr.Warning`` shows the message and still lets us
        # clear the dropdown.
        gr.Warning(f"Could not list spaces: {e}")
        return no_choices

    if not repo_ids:
        gr.Warning(f"No Spaces found for author '{author}'.")
    return gr.update(choices=repo_ids, value=None)
|
|
|
|
|
def download_spaces_as_zip(token, selected_space_ids, progress=gr.Progress()):
    """Download the selected Spaces and bundle them into one ZIP archive.

    Each Space is downloaded into its own folder (``/`` in the id replaced by
    ``__``) below a temporary directory, which is then zipped. The temporary
    download directory is always removed afterwards; the ZIP itself is written
    to the system temp directory and is not removed by this function.

    Args:
        token: Hub token forwarded to ``snapshot_download`` (may be empty/None).
        selected_space_ids: Space ids chosen in the dropdown.
        progress: Gradio progress tracker.

    Returns:
        A ``gr.update`` for the file component: the ZIP path (visible) on
        success, or a hidden, empty update when nothing was selected or no
        Space could be downloaded.
    """
    if not selected_space_ids:
        gr.Warning("No spaces selected for download.")
        return gr.update(visible=False, value=None)

    download_root_dir = tempfile.mkdtemp()
    try:
        total_spaces = len(selected_space_ids)
        progress(0, desc="Starting download...")

        downloaded_count = 0
        for i, repo_id in enumerate(selected_space_ids):
            progress(i / total_spaces, desc=f"Downloading {repo_id} ({i+1}/{total_spaces})")

            folder_name = repo_id.replace("/", "__")
            target_path = os.path.join(download_root_dir, folder_name)

            try:
                # NOTE(review): `local_dir_use_symlinks` and `resume_download`
                # may be deprecated in recent huggingface_hub releases —
                # confirm against the version this app pins.
                snapshot_download(
                    repo_id=repo_id,
                    repo_type="space",
                    local_dir=target_path,
                    token=token,
                    local_dir_use_symlinks=False,
                    resume_download=True,
                )
            except Exception as e:
                # ``gr.Error`` must be raised to be shown, which would abort
                # the whole batch. ``gr.Warning`` reports the failure and lets
                # the remaining Spaces download.
                gr.Warning(f"Failed to download {repo_id}: {e}")
                continue
            downloaded_count += 1

        if downloaded_count == 0:
            # Avoid handing out an empty archive labelled as a success.
            gr.Warning("None of the selected spaces could be downloaded.")
            return gr.update(visible=False, value=None)

        progress(0.95, desc="All spaces downloaded. Creating ZIP file...")

        # Random suffix keeps concurrent requests from overwriting each other.
        zip_base_name = os.path.join(tempfile.gettempdir(), f"hf_spaces_archive_{uuid.uuid4().hex}")
        zip_path = shutil.make_archive(
            base_name=zip_base_name,
            format='zip',
            root_dir=download_root_dir,
        )

        progress(1, desc="Download ready!")
        gr.Info("ZIP file created successfully!")
        return gr.update(value=zip_path, visible=True)
    finally:
        # The archive lives outside this directory, so it survives the cleanup.
        shutil.rmtree(download_root_dir, ignore_errors=True)
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# UI definition: layout first, then event wiring.
# ---------------------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Hugging Face Hub Toolkit") as demo:
    # Session state shared between the event handlers.
    hf_token_state = gr.State(None)
    author_state = gr.State("")
    selected_repo_id = gr.State(None)
    selected_repo_type = gr.State("space")

    gr.Markdown("# Hugging Face Hub Toolkit")
    gr.Markdown("An intuitive interface to manage your Hugging Face repositories. **Enter a write-token for full access.**")

    with gr.Sidebar():
        hf_token = gr.Textbox(label="Hugging Face API Token", type="password", placeholder="hf_...", scale=3)
        whoami_output = gr.JSON(label="Authenticated User", visible=False, scale=1)

    # (button, repo_type) pairs; wired further down, once the panels they
    # need to hide have been created.
    list_buttons = []

    with gr.Tabs():
        with gr.TabItem("Manage Repositories"):
            with gr.Row(equal_height=False):
                with gr.Column(scale=1):
                    gr.Markdown("### 1. Select a Repository")
                    author_input = gr.Textbox(label="Author (Username or Org)", interactive=True)
                    repo_selector = gr.Radio(label="Select a Repository", interactive=True, value=None)
                    with gr.Tabs() as repo_type_tabs:
                        for repo_type, label in [("space", "Spaces"), ("model", "Models"), ("dataset", "Datasets")]:
                            with gr.Tab(label, id=repo_type):
                                list_buttons.append((gr.Button(f"List {label}"), repo_type))

                with gr.Column(scale=3):
                    with gr.Column(visible=False) as action_panel:
                        gr.Markdown("### 2. Choose an Action")
                        with gr.Row():
                            manage_files_btn = gr.Button("Manage Files", interactive=False, scale=1)
                            delete_repo_btn = gr.Button("Delete This Repo", variant="stop", interactive=False, scale=1)

                    with gr.Column(visible=False) as editor_panel:
                        gr.Markdown("### 3. Edit Files")
                        # Was ``interactive=_True``: an undefined name that
                        # raised NameError while the UI was being built.
                        file_selector = gr.Dropdown(label="Select File", interactive=True)
                        code_editor = gr.Code(label="File Content", language="markdown", interactive=True)
                        commit_message_input = gr.Textbox(label="Commit Message", placeholder="e.g., Update README.md", interactive=True)
                        commit_btn = gr.Button("Commit Changes", variant="primary", interactive=False)

        with gr.TabItem("Download Spaces (ZIP)"):
            gr.Markdown("## Bulk Download Spaces as a ZIP Archive")
            gr.Markdown("Select one or more Spaces from an author to download them as a single ZIP file. Each Space will be in its own folder inside the archive.")

            with gr.Row():
                download_author_input = gr.Textbox(
                    label="Author (Username or Org)",
                    interactive=True,
                    placeholder="e.g., huggingface-projects or osanseviero",
                )
                list_spaces_btn = gr.Button("List Spaces", variant="secondary")

            spaces_dropdown = gr.Dropdown(label="Available Spaces", info="Select the spaces you want to download.", multiselect=True, interactive=True)
            download_btn = gr.Button("Download Selected Spaces as ZIP", variant="primary")
            download_output_file = gr.File(label="Your Downloaded ZIP File", visible=False)

            list_spaces_btn.click(
                fn=list_spaces_for_download,
                inputs=[hf_token_state, download_author_input],
                outputs=[spaces_dropdown],
            )

            download_btn.click(
                fn=download_spaces_as_zip,
                inputs=[hf_token_state, spaces_dropdown],
                outputs=[download_output_file],
            )

    # --- Event wiring for the "Manage Repositories" tab ---------------------

    # The original wired these buttons to freshly created ``gr.Column()``
    # placeholders, which added empty columns to the layout and never hid the
    # real panels. Target the real panels, as the delete handler below does.
    for list_btn, listed_type in list_buttons:
        list_btn.click(
            fn=list_repos,
            inputs=[hf_token_state, author_input, gr.State(listed_type)],
            outputs=[repo_selector, action_panel, editor_panel],
        ).then(fn=lambda author: author, inputs=author_input, outputs=author_state)

    hf_token.change(
        fn=handle_token_change, inputs=hf_token,
        outputs=[hf_token_state, author_state, manage_files_btn, delete_repo_btn, commit_btn, author_input, whoami_output],
    )

    # Switching the repo-type tab records the new type and resets the selection.
    repo_type_tabs.select(
        fn=lambda rt: (rt, None, gr.update(choices=[], value=None), gr.update(visible=False), gr.update(visible=False)),
        inputs=repo_type_tabs,
        outputs=[selected_repo_type, selected_repo_id, repo_selector, action_panel, editor_panel],
    )

    repo_selector.select(
        fn=lambda repo_id: (repo_id, *handle_repo_selection(repo_id)),
        inputs=repo_selector, outputs=[selected_repo_id, action_panel, editor_panel],
    )

    manage_files_btn.click(
        fn=show_file_manager, inputs=[hf_token_state, selected_repo_id, selected_repo_type],
        outputs=[editor_panel, file_selector, code_editor, commit_message_input],
    )

    # NOTE(review): Gradio's docs describe the return value of `js` as being
    # passed on as the inputs of `fn`; confirm that returning confirm()'s
    # boolean really cancels the deletion when the user declines.
    delete_repo_btn.click(
        fn=delete_repo, inputs=[hf_token_state, selected_repo_id, selected_repo_type],
        outputs=[selected_repo_id, action_panel, editor_panel],
        js="() => confirm('Are you sure you want to permanently delete this repository? This action cannot be undone.')",
    ).then(
        # Refresh the repository list after the delete attempt.
        fn=list_repos,
        inputs=[hf_token_state, author_state, selected_repo_type],
        outputs=[repo_selector, action_panel, editor_panel],
    )

    file_selector.change(
        fn=load_file_content, inputs=[hf_token_state, selected_repo_id, selected_repo_type, file_selector],
        outputs=code_editor,
    )

    commit_btn.click(
        fn=commit_file,
        inputs=[hf_token_state, selected_repo_id, selected_repo_type, file_selector, code_editor, commit_message_input],
    )


if __name__ == "__main__":
    demo.launch(debug=True)