| import gradio as gr | |
| import hmac | |
| import hashlib | |
| import time | |
| import os | |
| import requests | |
| from io import BytesIO | |
| from PIL import Image | |
| import uuid | |
| example_path = os.path.join(os.path.dirname(__file__), 'assets') | |
| human_list = os.listdir(os.path.join(example_path, "human")) | |
| human_list_path = [os.path.join(example_path, "human", human) for human in human_list] | |
| base_url = os.getenv('base_url') | |
| upload_image_url = os.getenv('upload_image_url') | |
| create_save_task_url = os.getenv('create_save_task_url') | |
| execute_task_url = os.getenv('execute_task_url') | |
| query_task_url = os.getenv('query_task_url') | |
| secret_key = os.getenv('secret_key') | |
| app_id = os.getenv('app_id') | |
| agent_version = os.getenv('agent_version') | |
| agent_name = os.getenv('agent_name') | |
| login_status_key = os.getenv('login_status_key') | |
| login_info_key = os.getenv('login_info_key') | |
| login_status_key2 = os.getenv('login_status_key2') | |
| login_hash_key = os.getenv('login_hash_key') | |
| def parse_response(response): | |
| data = {} | |
| msg = '' | |
| if response.status_code == 200: | |
| try: | |
| datas = response.json() | |
| if datas: | |
| data = datas.get("data") | |
| if not data: | |
| msg = datas.get("msg") | |
| if not msg: | |
| msg = "Field error." | |
| else: | |
| msg = "The parsing result is empty." | |
| except Exception as e: | |
| msg = f"parse error: {repr(e)}." | |
| else: | |
| msg = f'request error.' | |
| return data, msg | |
| def generate_signature(key, did, timestamp): | |
| data = f"{did}:{timestamp}{app_id}" | |
| h = hmac.new(key.encode(), data.encode(), hashlib.sha256) | |
| return h.hexdigest() | |
| def url_to_image(url, ip, user_id, user_name): | |
| headers = { | |
| "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", | |
| 'X-Forwarded-For': ip, | |
| 'X-HfUid': user_id, | |
| 'X-HfUserName': user_name, | |
| } | |
| try: | |
| response = requests.get(url, headers=headers, timeout=30) | |
| except: | |
| return None | |
| if response.status_code == 200: | |
| img = Image.open(BytesIO(response.content)) | |
| return img | |
| return None | |
| def start_task(task_id, did, ip, user_id, user_name): | |
| timestamp = str(int(time.time())) | |
| signature = generate_signature( | |
| key=secret_key, | |
| did=did, | |
| timestamp=timestamp, | |
| ) | |
| headers = { | |
| 'Did': did, | |
| 'X-Timestamp': timestamp, | |
| 'X-Signature': signature, | |
| 'X-Forwarded-For': ip, | |
| 'X-AppId': app_id, | |
| 'X-HfUid': user_id, | |
| 'X-HfUserName': user_name, | |
| } | |
| data = { | |
| "agentVersion": agent_version, | |
| "agentName": agent_name, | |
| "taskId": task_id, | |
| "runFreeAsFallback": False | |
| } | |
| response = requests.post(base_url + execute_task_url, json=data, headers=headers) | |
| data, msg = parse_response(response) | |
| return data, msg | |
| def create_task(image_url, did, ip, user_id, user_name): | |
| timestamp = str(int(time.time())) | |
| signature = generate_signature( | |
| key=secret_key, | |
| did=did, | |
| timestamp=timestamp, | |
| ) | |
| headers = { | |
| 'Did': did, | |
| 'X-Timestamp': timestamp, | |
| 'X-Signature': signature, | |
| 'X-Forwarded-For': ip, | |
| 'X-AppId': app_id, | |
| 'X-HfUid': user_id, | |
| 'X-HfUserName': user_name, | |
| } | |
| data = { | |
| "agentVersion": agent_version, | |
| "agentName": agent_name, | |
| "image": image_url | |
| } | |
| response = requests.post(base_url + create_save_task_url, json=data, headers=headers) | |
| data, msg = parse_response(response) | |
| return data, msg | |
| def save_task(image_url, show_image, task_id, text_description, did, ip, user_id, user_name): | |
| timestamp = str(int(time.time())) | |
| signature = generate_signature( | |
| key=secret_key, | |
| did=did, | |
| timestamp=timestamp, | |
| ) | |
| headers = { | |
| 'Did': did, | |
| 'X-Timestamp': timestamp, | |
| 'X-Signature': signature, | |
| 'X-Forwarded-For': ip, | |
| 'X-AppId': app_id, | |
| 'X-HfUid': user_id, | |
| 'X-HfUserName': user_name, | |
| } | |
| data = { | |
| "agentVersion": agent_version, | |
| "agentName": agent_name, | |
| "image": image_url, | |
| "showImage": show_image, | |
| "taskId": task_id, | |
| "textDescription": text_description, | |
| } | |
| response = requests.post(base_url + create_save_task_url, json=data, headers=headers) | |
| data, msg = parse_response(response) | |
| return data, msg | |
| def query_task(task_id, execution_id, did, ip, user_id, user_name): | |
| timestamp = str(int(time.time())) | |
| signature = generate_signature( | |
| key=secret_key, | |
| did=did, | |
| timestamp=timestamp, | |
| ) | |
| headers = { | |
| 'Did': did, | |
| 'X-Timestamp': timestamp, | |
| 'X-Signature': signature, | |
| 'X-Forwarded-For': ip, | |
| 'X-AppId': app_id, | |
| 'X-HfUid': user_id, | |
| 'X-HfUserName': user_name, | |
| } | |
| data = { | |
| "agentVersion": agent_version, | |
| "agentName": agent_name, | |
| "taskId": task_id, | |
| "executionId": execution_id, | |
| } | |
| response = requests.post(base_url + query_task_url, json=data, headers=headers) | |
| data, msg = parse_response(response) | |
| return data, msg | |
| def upload_image(image, did, ip, user_id, user_name): | |
| if image is None: | |
| return None | |
| timestamp = str(int(time.time())) | |
| signature = generate_signature( | |
| key=secret_key, | |
| did=did, | |
| timestamp=timestamp, | |
| ) | |
| upload_url = upload_image_url | |
| image_format = image.format if image.format else "PNG" | |
| mime_type = f"image/{image_format.lower()}" | |
| with BytesIO() as m_img: | |
| image.save(m_img, format=image_format) | |
| m_img.seek(0) | |
| files = {'image': (f"main_image.{image_format.lower()}", m_img, mime_type)} | |
| headers = { | |
| 'Did': did, | |
| 'X-Timestamp': timestamp, | |
| 'X-Signature': signature, | |
| 'X-Forwarded-For': ip, | |
| 'X-AppId': app_id, | |
| 'X-HfUid': user_id, | |
| 'X-HfUserName': user_name, | |
| } | |
| response = requests.post(base_url + upload_url, files=files, headers=headers) | |
| data, msg = parse_response(response) | |
| return data, msg | |
| def load_description(file_path): | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| content = file.read() | |
| return content | |
| def generate_image(main_image, text_description, did, request: gr.Request): | |
| if not did: | |
| did = str(uuid.uuid4()) | |
| user_id = request.session_hash | |
| if not user_id: | |
| user_id = f"{login_hash_key}{user_id}" | |
| user_name = request.session_hash | |
| if not user_id or not user_name: | |
| m = "We're sorry, but there seems to be an issue with your request. Please check your network connection or refresh the page and try again." | |
| return gr.Warning(m), did | |
| if main_image is None: | |
| m = "Please upload both the main image and the background reference image before generating." | |
| return gr.Warning(m), did | |
| client_ip = request.client.host | |
| x_forwarded_for = request.headers.get('x-forwarded-for') | |
| if x_forwarded_for: | |
| client_ip = x_forwarded_for | |
| upload_image_data, upload_image_msg = upload_image( | |
| image=main_image, | |
| did=did, | |
| ip=client_ip, | |
| user_id=user_id, | |
| user_name=user_name | |
| ) | |
| if not upload_image_data: | |
| return gr.Warning(upload_image_msg), did | |
| image_url = upload_image_data.get("image") | |
| if not image_url: | |
| m = 'Upload image failed.' | |
| return gr.Warning(m), did | |
| create_task_data, create_task_msg = create_task( | |
| image_url=image_url, | |
| did=did, | |
| ip=client_ip, | |
| user_id=user_id, | |
| user_name=user_name | |
| ) | |
| if not create_task_data: | |
| return gr.Warning(create_task_msg), did | |
| task_id = create_task_data.get("taskId") | |
| show_image = create_task_data.get("showImage") | |
| if not task_id or not show_image: | |
| m = 'Create task failed.' | |
| return gr.Warning(m), did | |
| save_task_data, save_task_msg = save_task( | |
| image_url=image_url, | |
| show_image=show_image, | |
| task_id=task_id, | |
| text_description=text_description, | |
| did=did, | |
| ip=client_ip, | |
| user_id=user_id, | |
| user_name=user_name | |
| ) | |
| if not save_task_data: | |
| return gr.Warning(save_task_msg), did | |
| save_task_id = save_task_data.get("taskId") | |
| save_show_image = save_task_data.get("showImage") | |
| if not save_task_id or not save_show_image: | |
| return gr.Warning('Save task failed'), did | |
| start_task_data, start_task_msg = start_task( | |
| task_id=save_task_id, | |
| did=did, | |
| ip=client_ip, | |
| user_id=user_id, | |
| user_name=user_name | |
| ) | |
| if not start_task_data: | |
| return gr.Warning(start_task_msg), did | |
| execution_id = start_task_data.get("executionId") | |
| if not execution_id: | |
| m = "The task failed to start." | |
| return gr.Warning(m), did | |
| start_time = int(time.time()) | |
| while True: | |
| m = 'Error occurred during AI generation. Please adjust the description or re-upload the image.' | |
| query_task_data, query_task_msg = query_task( | |
| task_id=save_task_id, | |
| execution_id=execution_id, | |
| did=did, | |
| ip=client_ip, | |
| user_id=user_id, | |
| user_name=user_name | |
| ) | |
| if not query_task_data: | |
| return gr.Warning(query_task_msg), did | |
| executions = query_task_data.get("executions") | |
| if not executions: | |
| return gr.Warning(m), did | |
| results = executions[0].get("result") | |
| if not results: | |
| return gr.Warning(m), did | |
| status = results[0].get("status") | |
| if status == "Failed": | |
| return gr.Warning(m), did | |
| elif status == "Success" or status == "Blocked": | |
| img = results[0].get("image") | |
| if img and str(img).strip() != "": | |
| return url_to_image(img, ip=client_ip, user_id=user_id, user_name=user_name), did | |
| end_time = int(time.time()) | |
| if end_time - start_time > 3600: | |
| m = 'Query task timeout.' | |
| return gr.Warning(m), did | |
| time.sleep(4) | |
| def preprocess_image(main_image): | |
| return main_image | |
| css = """ | |
| .text-box { | |
| min-height: 260px !important; | |
| } | |
| .image-container { | |
| min-height: 260px !important; | |
| } | |
| .image-container img { | |
| max-height: 500px; | |
| width: auto; | |
| } | |
| .hide-buttons .source-selection { | |
| display: none; | |
| } | |
| #example-images .gallery { | |
| display: flex; | |
| flex-wrap: wrap; | |
| } | |
| #example-images .gallery-item .container{ | |
| width: 100%; | |
| max-width: 100%; | |
| } | |
| #example-images .gallery-item { | |
| flex: 0 0 30%; | |
| max-width: 30%; | |
| box-sizing: border-box; | |
| display: flex; | |
| text-align: center; | |
| justify-content: center; | |
| } | |
| @media (max-width: 767px) { | |
| #example-res-images th { | |
| font-size: 12px; | |
| word-wrap: break-word; | |
| word-break: break-word; | |
| white-space: normal; | |
| overflow-wrap: break-word; | |
| } | |
| } | |
| """ | |
| with gr.Blocks(css=css) as WeShop: | |
| current_did = gr.State(value='') | |
| gr.HTML(load_description("assets/title.html")) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("#### Step 1: Upload a person image for Pose Change") | |
| main_image_input = gr.Image( | |
| type="pil", | |
| label="Main Image", | |
| elem_classes=["image-container", "hide-buttons"] | |
| ) | |
| clothing_example = gr.Examples( | |
| inputs=main_image_input, | |
| examples_per_page=12, | |
| examples=human_list_path, | |
| elem_id="example-images", | |
| outputs=main_image_input | |
| ) | |
| with gr.Column(): | |
| gr.Markdown("#### Step 2: Input pose description for your desired Pose Change") | |
| text_description_input = gr.Textbox( | |
| label="Text Description", | |
| placeholder="Enter pose description here...", | |
| lines=9, | |
| elem_classes=["text-box"] | |
| ) | |
| text_description_example = gr.Examples( | |
| inputs=text_description_input, | |
| examples_per_page=12, | |
| examples=[ | |
| "Keep the background unchanged, only change the person's pose", | |
| "Following camera angle", | |
| "Change to back view", | |
| "Change pose", | |
| "Scene remains unchanged, clothing shown from multiple angles", | |
| "Walking angle", | |
| "Model simulating walking posture", | |
| "Low angle upward shot", | |
| ], | |
| outputs=text_description_input | |
| ) | |
| with gr.Column(): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("#### Step 3: Press 'Generate' to get your Pose Change result") | |
| output = gr.Image( | |
| label="Pose Change Result", | |
| elem_classes=["image-container", "hide-buttons"], | |
| interactive=False | |
| ) | |
| with gr.Row(): | |
| submit_button = gr.Button("Generate") | |
| submit_button.click( | |
| fn=generate_image, | |
| inputs=[main_image_input, text_description_input, current_did], | |
| outputs=[output, current_did], | |
| concurrency_limit=None | |
| ) | |
| with gr.Column(): | |
| show_case = gr.Examples( | |
| examples=[ | |
| [ | |
| "assets/human/07.png", | |
| "Walking angle", | |
| "assets/examples/result_07_01.jpeg", | |
| ], | |
| [ | |
| "assets/human/01.png", | |
| "Following camera angle", | |
| "assets/examples/result_01_01.jpeg" | |
| ], | |
| [ | |
| "assets/human/06.png", | |
| "Change pose", | |
| "assets/examples/result_06_01.jpeg" | |
| ], | |
| [ | |
| "assets/human/05.png", | |
| "Keep the background unchanged, only change the person's pose", | |
| "assets/examples/result_04_01.jpeg" | |
| ], | |
| ], | |
| inputs=[main_image_input, text_description_input, output], | |
| elem_id="example-res-images" | |
| ) | |
| main_image_input.upload( | |
| fn=preprocess_image, | |
| inputs=[main_image_input], | |
| outputs=main_image_input | |
| ) | |
| WeShop.queue(api_open=False).launch(show_api=False) | |