# Github-Transfer / app.py
# Hugging Face Space "openfree" — revision 8360e3e (verified), 29.2 kB
import gradio as gr
from huggingface_hub import HfApi
import uuid
from slugify import slugify
import os
import json
import subprocess
import tempfile
import requests
import shutil
import time
from pathlib import Path
from typing import Optional, Dict, List
def is_lfs_pointer_file(filepath):
    """Return True if *filepath* is a Git LFS pointer file.

    LFS pointer files are tiny text stubs whose first line starts with
    'version https://git-lfs.github.com/spec/v1'.

    Args:
        filepath: Path of the file to inspect.

    Returns:
        True when the file begins with the LFS pointer signature,
        False otherwise (including missing or unreadable files).
    """
    try:
        with open(filepath, 'rb') as f:
            header = f.read(100)  # the signature fits well within 100 bytes
        return header.startswith(b'version https://git-lfs.github.com/spec/v1')
    except OSError:
        # Narrowed from a bare `except`: only swallow I/O errors so genuine
        # bugs (e.g. TypeError from a bad argument) still surface.
        return False
def remove_lfs_files(folder):
    """Delete Git LFS pointer files under *folder* and report what was removed.

    Walks the tree (pruning any `.git` directory) and removes every file
    that is an LFS pointer stub.

    Args:
        folder: Root directory to scan.

    Returns:
        List of removed file paths, relative to *folder*.
    """
    removed_files = []
    for root, dirs, files in os.walk(folder):
        # Prune `.git` directories from the walk. The original substring test
        # (`'.git' in root: continue`) also skipped unrelated paths such as
        # `my.github/` and kept descending into .git anyway.
        dirs[:] = [d for d in dirs if d != '.git']
        for file in files:
            filepath = os.path.join(root, file)
            if is_lfs_pointer_file(filepath):
                os.remove(filepath)
                # relpath is safer than str.replace, which could rewrite a
                # matching substring in the middle of the path.
                removed_files.append(os.path.relpath(filepath, folder))
    return removed_files
def analyze_repository(src_path: Path) -> Dict:
    """Statically inspect a cloned repository to drive Gradio app generation.

    Collects cleaned dependencies (exact pins relaxed to `>=`, except core
    ML libraries), a one-line description from the README, likely entry-point
    scripts, ML-related key files, model weight files, and top-level configs.

    Args:
        src_path: Root directory of the cloned repository.

    Returns:
        Dict with keys: has_requirements, has_readme, main_language,
        key_files, dependencies, description, entry_points, model_files,
        config_files (plus readme_content when a README was readable).
    """
    analysis = {
        "has_requirements": False,
        "has_readme": False,
        "main_language": "python",
        "key_files": [],
        "dependencies": [],
        "description": "",
        "entry_points": [],
        "model_files": [],
        "config_files": []
    }
    req_file = src_path / "requirements.txt"
    if req_file.exists():
        analysis["has_requirements"] = True
        try:
            reqs = req_file.read_text(encoding="utf-8").strip().split("\n")
            cleaned_deps = []
            for r in reqs:
                r = r.strip()
                if r and not r.startswith("#"):
                    # Known-problematic opencv pins: relax to a working floor.
                    if "opencv-python==4.10.0" in r or "opencv-python==4.10" in r:
                        r = "opencv-python>=4.10.0.82"
                    if "==" in r and not r.startswith("git+"):
                        pkg_name = r.split("==")[0]
                        if pkg_name.lower() in ["torch", "tensorflow", "transformers", "numpy"]:
                            # Core ML libs keep their exact pin.
                            cleaned_deps.append(r)
                        else:
                            version = r.split("==")[1]
                            if version.count('.') == 1:
                                version = version + ".0"  # normalize "1.2" -> "1.2.0"
                            cleaned_deps.append(f"{pkg_name}>={version}")
                    else:
                        cleaned_deps.append(r)
            analysis["dependencies"] = cleaned_deps
        except (OSError, UnicodeDecodeError):
            # Narrowed from a bare except: unreadable file -> no dependencies.
            analysis["dependencies"] = []
    for readme_name in ["README.md", "readme.md", "README.rst", "README.txt"]:
        readme_file = src_path / readme_name
        if readme_file.exists():
            analysis["has_readme"] = True
            try:
                readme_content = readme_file.read_text(encoding="utf-8")
                analysis["readme_content"] = readme_content[:5000]
                # First non-heading, non-badge line becomes the description.
                for line in readme_content.split("\n")[:10]:
                    if line.strip() and not line.startswith("#") and not line.startswith("!"):
                        analysis["description"] = line.strip()
                        break
            except (OSError, UnicodeDecodeError):
                pass
            # Fix: stop at the first README found; previously lower-priority
            # files (README.txt) overwrote the content of README.md.
            break
    seen_entries = set()
    py_files = list(src_path.glob("**/*.py"))
    for py_file in py_files[:20]:
        if "__pycache__" in str(py_file) or ".git" in str(py_file):
            continue
        relative_path = str(py_file.relative_to(src_path))
        if any(name in py_file.name for name in ["main.py", "app.py", "demo.py", "run.py", "server.py", "streamlit_app.py"]):
            if relative_path not in seen_entries:
                seen_entries.add(relative_path)
                analysis["entry_points"].append(relative_path)
        try:
            content = py_file.read_text(encoding="utf-8")[:1000]
            # Fix: dedupe — a file matching both the name heuristic and the
            # `if __name__` heuristic was previously appended twice.
            if "if __name__" in content and "main" in content and relative_path not in seen_entries:
                seen_entries.add(relative_path)
                analysis["entry_points"].append(relative_path)
            if any(lib in content for lib in ["torch", "tensorflow", "transformers", "numpy", "pandas", "cv2", "PIL"]):
                analysis["key_files"].append({
                    "path": relative_path,
                    "preview": content[:500]
                })
        except (OSError, UnicodeDecodeError):
            pass
    model_extensions = [".pth", ".pt", ".ckpt", ".h5", ".pb", ".onnx", ".safetensors"]
    for ext in model_extensions:
        for mf in list(src_path.glob(f"**/*{ext}"))[:5]:
            if ".git" not in str(mf):
                analysis["model_files"].append(str(mf.relative_to(src_path)))
    # Fix: dedupe — `config.json` matches both the literal pattern and
    # `*.json`, so it was previously listed twice.
    seen_configs = set()
    config_patterns = ["config.json", "config.yaml", "config.yml", "*.json", "*.yaml"]
    for pattern in config_patterns:
        for cf in list(src_path.glob(pattern))[:5]:
            rel = str(cf.relative_to(src_path))
            if ".git" not in str(cf) and rel not in seen_configs:
                seen_configs.add(rel)
                analysis["config_files"].append(rel)
    return analysis
def generate_gradio_app(repo_url: str, analysis: Dict) -> Dict:
    """Generate a Gradio app via Fireworks AI, falling back to a template.

    Builds a textual context from the repository analysis, asks the
    Fireworks chat-completions API for a JSON payload describing the app,
    and validates the response.

    Args:
        repo_url: Original GitHub repository URL (embedded in prompts/UI).
        analysis: Output of analyze_repository().

    Returns:
        Dict with keys "app_py", "requirements_txt", "summary".
        Fix: never returns None — any API/HTTP/parsing failure now falls
        back to create_smart_template() (previously a JSON parse error
        returned None and the caller skipped app generation entirely).
    """
    context = f"""Repository URL: {repo_url}
Repository Analysis:
- Description: {analysis.get('description', 'N/A')}
- Main Dependencies: {', '.join(analysis['dependencies'][:10])}
- Entry Points: {', '.join(analysis['entry_points'][:5])}
- Model Files: {', '.join(analysis['model_files'][:3])}
- Config Files: {', '.join(analysis['config_files'][:3])}
Key Files Found:
"""
    for kf in analysis.get('key_files', [])[:3]:
        context += f"\n--- {kf['path']} ---\n{kf['preview']}\n"
    if analysis.get('readme_content'):
        context += f"\n--- README.md (excerpt) ---\n{analysis['readme_content'][:2000]}\n"
    system_prompt = """You are an expert at creating Gradio apps from GitHub repositories.
Your task is to generate a complete, working Gradio interface that demonstrates the main functionality of the repository.
CRITICAL REQUIREMENTS:
1. The app.py must be FULLY FUNCTIONAL and runnable
2. DO NOT use 'from agent import' or any repository-specific imports that won't exist
3. Handle errors gracefully with clear user feedback
4. Include API key inputs when external services are required
5. Create intuitive UI components for the main features
6. Always use gradio>=5.35.0
Return ONLY valid JSON with these exact keys:
- app_py: Complete Gradio app code
- requirements_txt: All necessary dependencies including gradio>=5.35.0
- summary: Brief description of what the app does"""
    # Try the Fireworks AI API first, when a key is configured.
    fireworks_key = os.getenv("FIREWORKS_API_KEY")
    if fireworks_key:
        try:
            url = "https://api.fireworks.ai/inference/v1/chat/completions"
            payload = {
                "model": "accounts/fireworks/models/qwen3-coder-480b-a35b-instruct",
                "max_tokens": 4096,
                "top_p": 1,
                "top_k": 40,
                "presence_penalty": 0,
                "frequency_penalty": 0,
                "temperature": 0.6,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Create a fully functional Gradio app for this repository:\n\n{context[:8000]}"}
                ]
            }
            headers = {
                "Accept": "application/json",
                "Content-Type": "application/json",
                "Authorization": f"Bearer {fireworks_key.strip()}"
            }
            # `json=` serializes the payload and sets the content type itself.
            r = requests.post(url, headers=headers, json=payload, timeout=30)
            if r.status_code == 200:
                response_text = r.json()["choices"][0]["message"]["content"]
                print("✅ Fireworks AI로 앱 생성 성공")
                try:
                    # Strip an optional markdown code fence around the JSON.
                    if "```json" in response_text:
                        start = response_text.find("```json") + 7
                        end = response_text.find("```", start)
                        response_text = response_text[start:end].strip()
                    elif "```" in response_text:
                        start = response_text.find("```") + 3
                        end = response_text.find("```", start)
                        response_text = response_text[start:end].strip()
                    result = json.loads(response_text)
                    if not all(key in result for key in ["app_py", "requirements_txt", "summary"]):
                        raise ValueError("Missing required keys in response")
                    # Guarantee gradio is pinned in the generated requirements.
                    if "gradio" not in result.get("requirements_txt", "").lower():
                        result["requirements_txt"] = "gradio>=5.35.0\n" + result.get("requirements_txt", "")
                    return result
                except (json.JSONDecodeError, ValueError) as e:
                    # Fall through to the template instead of returning None.
                    print(f"⚠️ JSON 파싱 오류: {e}")
            else:
                # Previously a non-200 response fell through silently.
                print(f"⚠️ Fireworks AI HTTP {r.status_code}")
        except Exception as e:
            print(f"⚠️ Fireworks AI API 오류: {e}")
    print("ℹ️ AI API가 없어 기본 템플릿을 생성합니다.")
    return create_smart_template(repo_url, analysis)
def create_smart_template(repo_url: str, analysis: Dict) -> Dict:
    """Build a fallback Gradio app template when AI generation is unavailable.

    Picks one of four hard-coded app templates (3D, computer vision, NLP,
    generic) based on the dependency names found by analyze_repository, and
    assembles a matching requirements list.

    NOTE(review): the templates are outer f-strings, so `{repo_url}`,
    `{repo_name...}` and `{description}` are interpolated at generation time
    while `{{...}}` survives as `{...}` for the generated app's own
    f-strings. A description containing `{`, `}` or triple quotes would
    produce a broken generated app — confirm upstream sanitization.

    Args:
        repo_url: Original GitHub repository URL (embedded in the UI text).
        analysis: Output of analyze_repository(); may be None or empty.

    Returns:
        Dict with keys "app_py", "requirements_txt", "summary".
    """
    repo_name = Path(repo_url.rstrip("/")).name
    description = analysis.get("description", "A project deployed from GitHub") if analysis else "A project deployed from GitHub"
    deps = " ".join(analysis.get("dependencies", [])) if analysis else ""
    # Heuristic domain detection from substrings of the dependency list.
    has_cv = any(lib in deps for lib in ["cv2", "PIL", "pillow", "opencv"])
    has_nlp = any(lib in deps for lib in ["transformers", "nltk", "spacy"])
    has_3d = any(lib in deps for lib in ["gaussian", "rasterizer", "plyfile", "trimesh"])
    requirements = ["gradio>=5.35.0"]
    if analysis and analysis.get("dependencies"):
        filtered_deps = []
        for dep in analysis["dependencies"][:15]:
            # Drop VCS/editable/local installs that won't resolve on Spaces.
            if not dep.startswith("git+") and not dep.startswith("-e") and not dep.startswith("file:"):
                # Relax exact pins (except core ML libs) to avoid resolver conflicts.
                if "==" in dep and dep.split("==")[0].lower() not in ["torch", "tensorflow", "numpy"]:
                    pkg_name = dep.split("==")[0]
                    version = dep.split("==")[1]
                    filtered_deps.append(f"{pkg_name}>={version}")
                else:
                    filtered_deps.append(dep)
        requirements.extend(filtered_deps)
    if has_3d or "gaussian" in repo_name.lower():
        # 3D projects usually need CUDA builds we cannot do on a Space,
        # so the template only explains the build requirements.
        app_code = f'''import gradio as gr
import os
def process_3d(input_file):
    if input_file is None:
        return "Please upload a 3D file or image"
    info = """
## ⚠️ Build Requirements Notice
This project requires:
1. CUDA-enabled GPU
2. Custom C++/CUDA extensions compilation
Original repository: {repo_url}
"""
    return info
with gr.Blocks(title="{repo_name}") as demo:
    gr.Markdown(f"""
# {repo_name.replace("-", " ").title()}
{description}
This space was created from: [{repo_url}]({repo_url})
""")
    with gr.Row():
        with gr.Column():
            input_file = gr.File(label="Upload 3D File or Image")
            process_btn = gr.Button("Process", variant="primary")
        with gr.Column():
            output_info = gr.Markdown()
    process_btn.click(
        fn=process_3d,
        inputs=input_file,
        outputs=output_info
    )
if __name__ == "__main__":
    demo.launch()
'''
    elif has_cv:
        # Computer-vision template: identity image pass-through plus shape info.
        app_code = f'''import gradio as gr
from PIL import Image
import numpy as np
def process_image(image):
    if image is None:
        return None, "Please upload an image"
    img_array = np.array(image)
    processed = Image.fromarray(img_array)
    info = f"Image shape: {{img_array.shape}}"
    return processed, info
with gr.Blocks(title="{repo_name}") as demo:
    gr.Markdown(f"""
# {repo_name.replace("-", " ").title()}
{description}
This space was created from: [{repo_url}]({repo_url})
""")
    with gr.Row():
        with gr.Column():
            input_image = gr.Image(label="Input Image", type="pil")
            process_btn = gr.Button("Process Image", variant="primary")
        with gr.Column():
            output_image = gr.Image(label="Output Image")
            output_info = gr.Textbox(label="Information")
    process_btn.click(
        fn=process_image,
        inputs=input_image,
        outputs=[output_image, output_info]
    )
if __name__ == "__main__":
    demo.launch()
'''
    elif has_nlp:
        # NLP template: simple text statistics stand-in for the real model.
        app_code = f'''import gradio as gr
def process_text(text, max_length=100):
    if not text:
        return "Please enter some text"
    word_count = len(text.split())
    char_count = len(text)
    result = f"""
**Analysis Results:**
- Word count: {{word_count}}
- Character count: {{char_count}}
- Average word length: {{char_count/max(word_count, 1):.1f}}
"""
    return result
with gr.Blocks(title="{repo_name}") as demo:
    gr.Markdown(f"""
# {repo_name.replace("-", " ").title()}
{description}
This space was created from: [{repo_url}]({repo_url})
""")
    with gr.Row():
        with gr.Column():
            input_text = gr.Textbox(
                label="Input Text",
                placeholder="Enter your text here...",
                lines=5
            )
            max_length = gr.Slider(
                minimum=10,
                maximum=500,
                value=100,
                label="Max Length"
            )
            process_btn = gr.Button("Process Text", variant="primary")
        with gr.Column():
            output_text = gr.Markdown(label="Results")
    process_btn.click(
        fn=process_text,
        inputs=[input_text, max_length],
        outputs=output_text
    )
if __name__ == "__main__":
    demo.launch()
'''
    else:
        # Generic template: echo the input back so the Space at least runs.
        app_code = f'''import gradio as gr
def main_function(input_data):
    if not input_data:
        return "Please provide input"
    result = f"Processed successfully! Input received: {{input_data}}"
    return result
with gr.Blocks(title="{repo_name}") as demo:
    gr.Markdown(f"""
# {repo_name.replace("-", " ").title()}
{description}
This space was created from: [{repo_url}]({repo_url})
""")
    with gr.Row():
        with gr.Column():
            input_data = gr.Textbox(
                label="Input",
                placeholder="Enter your input here...",
                lines=3
            )
            process_btn = gr.Button("Process", variant="primary")
        with gr.Column():
            output_data = gr.Textbox(label="Output")
    process_btn.click(
        fn=main_function,
        inputs=input_data,
        outputs=output_data
    )
if __name__ == "__main__":
    demo.launch()
'''
    return {
        "app_py": app_code,
        "requirements_txt": "\n".join(requirements),
        "summary": f"Smart template created for {repo_name}"
    }
def clone(repo_git, repo_hf, sdk_type, skip_lfs, enable_smart_generation):
    """Clone a GitHub repository and deploy it as a Hugging Face Space.

    Generator used as a Gradio event handler: each ``yield`` streams one
    progress line into the UI Textbox.

    Args:
        repo_git: GitHub repository URL to clone.
        repo_hf: Desired Space name (slugified under the token owner's account).
        sdk_type: Space SDK ("gradio", "streamlit", "docker", "static").
        skip_lfs: When True, LFS blobs are never fetched and pointer files
            are deleted before upload. Also forced to True if an LFS pull fails.
        enable_smart_generation: When True, regenerate app.py, requirements.txt
            and README.md from the repository analysis before uploading.

    Yields:
        Progress/status strings (the final message contains the Space URL).
    """
    folder = str(uuid.uuid4())  # unique scratch directory for this run
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        yield "❌ Error: HF_TOKEN not found in environment variables."
        return
    try:
        yield "🔄 Starting clone process..."
        api = HfApi(token=hf_token)
        # Verify the token and learn the account name used for the Space id.
        try:
            user_info = api.whoami()
            username = user_info["name"]
            yield f"✅ Authenticated as: {username}"
        except Exception as e:
            yield f"❌ Authentication failed: {str(e)}"
            return
        yield f"📥 Cloning repository from {repo_git}..."
        env = os.environ.copy()
        # Clone fast: leave LFS blobs as pointer files; fetch them below if wanted.
        env['GIT_LFS_SKIP_SMUDGE'] = '1'
        clone_cmd = ['git', 'clone', '--recurse-submodules', repo_git, folder]
        subprocess.run(clone_cmd, check=True, env=env)
        if not skip_lfs:
            yield "📦 Attempting to download LFS files..."
            try:
                subprocess.run(['git', 'lfs', 'install'], cwd=folder, check=True)
                lfs_result = subprocess.run(['git', 'lfs', 'pull'], cwd=folder, capture_output=True, text=True)
                if lfs_result.returncode != 0:
                    yield f"⚠️ Warning: LFS download failed"
                    skip_lfs = True  # fall back to stripping pointer files
                else:
                    yield "✅ LFS files downloaded successfully"
            except Exception as e:
                yield f"⚠️ LFS error: {str(e)}"
                skip_lfs = True
        if skip_lfs:
            # Pointer files would make the Space upload fail; remove them.
            yield "🧹 Removing LFS pointer files..."
            removed_files = remove_lfs_files(folder)
            if removed_files:
                yield f"📝 Removed {len(removed_files)} LFS pointer files"
        if enable_smart_generation:
            yield "🔍 Analyzing repository structure..."
            folder_path = Path(folder)
            analysis = analyze_repository(folder_path)
            yield "🤖 Generating smart Gradio app..."
            generated = generate_gradio_app(repo_git, analysis)
            if generated and isinstance(generated, dict) and "app_py" in generated:
                # Overwrite app.py with the generated interface.
                app_path = folder_path / "app.py"
                app_path.write_text(generated["app_py"], encoding="utf-8")
                yield "✅ Smart app.py generated"
                # Merge existing and generated requirements into grouped,
                # deduplicated sections: torch/cuda first, then regular, then git+.
                req_path = folder_path / "requirements.txt"
                existing_reqs = []
                if req_path.exists():
                    try:
                        existing_reqs = req_path.read_text(encoding="utf-8").strip().split("\n")
                    except:
                        existing_reqs = []
                new_reqs = generated["requirements_txt"].strip().split("\n") if generated["requirements_txt"] else []
                all_reqs = set()  # NOTE(review): unused — kept as-is
                git_reqs = []
                torch_reqs = []
                regular_reqs = []
                for req in existing_reqs + new_reqs:
                    req = req.strip()
                    if not req or req.startswith("#"):
                        continue
                    if req.startswith("git+"):
                        git_reqs.append(req)
                    elif "torch" in req.lower() or "cuda" in req.lower():
                        torch_reqs.append(req)
                    else:
                        regular_reqs.append(req)
                has_gradio = any("gradio" in req for req in regular_reqs)
                if not has_gradio:
                    regular_reqs.append("gradio>=5.35.0")
                final_reqs = []
                if torch_reqs:
                    final_reqs.extend(sorted(set(torch_reqs)))
                    final_reqs.append("")  # blank separator line
                final_reqs.extend(sorted(set(regular_reqs)))
                if git_reqs:
                    final_reqs.append("")
                    final_reqs.extend(sorted(set(git_reqs)))
                req_content = "\n".join(final_reqs)
                req_path.write_text(req_content, encoding="utf-8")
                yield "✅ Requirements.txt updated"
                # Write the Space README with the YAML front matter HF expects.
                readme_path = folder_path / "README.md"
                readme_content = f"""---
title: {repo_hf.replace("-", " ").title()}
emoji: 🚀
colorFrom: blue
colorTo: green
sdk: {sdk_type}
sdk_version: "5.35.0"
app_file: app.py
pinned: false
---
# {repo_hf.replace("-", " ").title()}
{analysis.get('description', 'Deployed from GitHub repository')}
Deployed from: {repo_git}
"""
                readme_path.write_text(readme_content, encoding="utf-8")
                yield "✅ README.md created/updated"
        # Strip git metadata so the Space repo starts clean.
        git_dir = os.path.join(folder, '.git')
        if os.path.exists(git_dir):
            shutil.rmtree(git_dir)
            yield "🧹 Removed .git directory"
        # Remove LFS filter rules so the Hub does not expect LFS objects.
        gitattributes_path = os.path.join(folder, '.gitattributes')
        if os.path.exists(gitattributes_path):
            with open(gitattributes_path, 'r') as f:
                lines = f.readlines()
            new_lines = []
            for line in lines:
                if 'filter=lfs' not in line:
                    new_lines.append(line)
            if new_lines:
                with open(gitattributes_path, 'w') as f:
                    f.writelines(new_lines)
            else:
                os.remove(gitattributes_path)
        yield "🏗️ Creating Hugging Face Space..."
        repo_id = f"{username}/{slugify(repo_hf)}"
        # Create (or reuse) the Space, with up to 3 attempts.
        space_created = False
        for attempt in range(3):
            try:
                yield f" Creating Space: {repo_id} (attempt {attempt + 1}/3)"
                try:
                    existing_space = api.space_info(repo_id=repo_id, token=hf_token)
                    yield f" ℹ️ Space already exists: {existing_space.id}"
                    space_created = True
                    break
                except:
                    pass  # space_info raises when the Space doesn't exist yet
                create_result = api.create_repo(
                    repo_id=repo_id,
                    repo_type="space",
                    space_sdk=sdk_type,
                    exist_ok=True,
                    private=False,
                    token=hf_token
                )
                time.sleep(3)  # give the Hub a moment before confirming
                space_info = api.space_info(repo_id=repo_id, token=hf_token)
                yield f" ✅ Space created successfully: {space_info.id}"
                space_created = True
                break
            except Exception as e:
                error_msg = str(e)
                if "429" in error_msg or "Too Many Requests" in error_msg:
                    # Rate limiting won't clear within the retry window; abort.
                    yield f"❌ Rate Limit Error - Try again in 17-24 hours"
                    raise Exception(f"Rate limit reached.")
                yield f" ⚠️ Attempt {attempt + 1} failed: {error_msg[:100]}..."
                if attempt < 2:
                    yield " Retrying in 5 seconds..."
                    time.sleep(5)
                else:
                    yield f" ❌ Failed to create space after 3 attempts"
                    raise Exception(f"Could not create space: {error_msg}")
        if not space_created:
            raise Exception("Failed to create space")
        # Size in MB decides which upload API to use below.
        folder_size = sum(os.path.getsize(os.path.join(dirpath, filename))
                          for dirpath, dirnames, filenames in os.walk(folder)
                          for filename in filenames) / (1024 * 1024)
        yield f"📊 Folder size: {folder_size:.2f} MB"
        file_count = sum(len(files) for _, _, files in os.walk(folder))
        yield f"📁 Total files to upload: {file_count}"
        # Upload with up to 3 attempts, recreating the Space on 404.
        upload_success = False
        max_retries = 3
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    yield f"📤 Upload attempt {attempt + 1}/{max_retries}..."
                    time.sleep(5)
                if folder_size > 500:
                    yield "📤 Uploading large folder to Hugging Face..."
                    api.upload_large_folder(
                        folder_path=folder,
                        repo_id=repo_id,
                        repo_type="space",
                        token=hf_token,
                        commit_message="Deploy from GitHub repository",
                        ignore_patterns=["*.pyc", "__pycache__", ".git*", ".DS_Store", "*.egg-info"]
                    )
                else:
                    yield "📤 Uploading to Hugging Face..."
                    api.upload_folder(
                        folder_path=folder,
                        repo_id=repo_id,
                        repo_type="space",
                        token=hf_token,
                        commit_message="Deploy from GitHub repository",
                        ignore_patterns=["*.pyc", "__pycache__", ".git*", ".DS_Store", "*.egg-info"]
                    )
                upload_success = True
                yield "✅ Upload completed successfully"
                break
            except Exception as upload_error:
                error_msg = str(upload_error)
                if "404" in error_msg and attempt < max_retries - 1:
                    # The Space may not have propagated yet — confirm or recreate.
                    yield f" ⚠️ Upload failed (404). Retrying..."
                    time.sleep(10)
                    try:
                        space_info = api.space_info(repo_id=repo_id, token=hf_token)
                        yield f" ✅ Space confirmed to exist"
                    except:
                        yield " 🔄 Attempting to recreate space..."
                        try:
                            api.create_repo(
                                repo_id=repo_id,
                                repo_type="space",
                                space_sdk=sdk_type,
                                exist_ok=True,
                                private=False,
                                token=hf_token
                            )
                            yield " ✅ Space recreated"
                        except Exception as recreate_error:
                            yield f" ❌ Could not recreate space: {str(recreate_error)}"
                elif "LFS pointer" in error_msg:
                    # Unrecoverable: pointer files survived the cleanup above.
                    yield "❌ Upload failed due to remaining LFS pointer files"
                    raise upload_error
                elif attempt == max_retries - 1:
                    yield f"❌ Upload failed after {max_retries} attempts"
                    raise upload_error
                else:
                    yield f" ⚠️ Upload failed: {error_msg[:100]}..."
        if not upload_success:
            raise Exception("Upload failed after all retries")
        shutil.rmtree(folder)  # scratch dir no longer needed
        space_url = f"https://huggingface.co/spaces/{repo_id}"
        yield f"""
✅ **Successfully created Space!**
🔗 **Your Space URL**: {space_url}
📋 **Summary:**
- Space ID: `{repo_id}`
- Source: {repo_git}
- SDK: {sdk_type}
- Smart Generation: {'Enabled' if enable_smart_generation else 'Disabled'}
- LFS Files: {'Skipped' if skip_lfs else 'Included'}
"""
        if skip_lfs:
            yield "\n⚠️ LFS files were removed."
        if enable_smart_generation:
            yield "\n🤖 AI-generated Gradio interface was created"
    except subprocess.CalledProcessError as e:
        # git clone failed; clean up the partial checkout.
        if os.path.exists(folder):
            shutil.rmtree(folder)
        yield f"❌ Git error: {str(e)}"
    except Exception as e:
        if os.path.exists(folder):
            shutil.rmtree(folder)
        yield f"❌ Error: {str(e)}"
css = """
.container {
max-width: 900px;
margin: auto;
padding: 20px;
}
.output-box {
min-height: 100px;
max-height: 400px;
overflow-y: auto;
font-family: monospace;
font-size: 14px;
line-height: 1.5;
}
"""
# Build the UI: inputs on the left, streaming progress log on the right.
with gr.Blocks(css=css) as demo:
    gr.Markdown("# 🚀 GitHub to Hugging Face Space Cloner")
    # Quick visual check that the deploy token is configured.
    if not os.getenv("HF_TOKEN"):
        gr.Markdown("❌ HF_TOKEN Required - Set it in Space settings")
    else:
        gr.Markdown("✅ HF_TOKEN Found")
    with gr.Row():
        with gr.Column():
            # Source repository and target Space settings.
            repo_git = gr.Textbox(
                label="GitHub Repository URL",
                placeholder="https://github.com/username/repository"
            )
            repo_hf = gr.Textbox(
                label="Hugging Face Space Name",
                placeholder="my-awesome-space"
            )
            sdk_choices = gr.Radio(
                ["gradio", "streamlit", "docker", "static"],
                label="Space SDK",
                value="gradio"
            )
            skip_lfs = gr.Checkbox(
                label="Skip Git LFS files",
                value=True
            )
            enable_smart_generation = gr.Checkbox(
                label="🤖 Enable Smart app.py Generation",
                value=False,
                info="Requires FIREWORKS_API_KEY in environment variables"
            )
            btn = gr.Button("🎯 Clone Repository", variant="primary")
        with gr.Column():
            # clone() is a generator, so this box streams progress lines.
            output = gr.Textbox(
                label="Progress",
                lines=15,
                elem_classes=["output-box"],
                interactive=False,
                show_copy_button=True
            )
    btn.click(
        fn=clone,
        inputs=[repo_git, repo_hf, sdk_choices, skip_lfs, enable_smart_generation],
        outputs=output
    )

if __name__ == "__main__":
    demo.launch()