Spaces:
Running
Running
| import cv2 | |
| from mtcnn.mtcnn import MTCNN | |
| from utils import * | |
| import requests | |
| from urllib.parse import urlparse | |
# Example galleries for the UI. The helpers used here are not defined in this
# file (presumably they come from `from utils import *` — confirm).
cloth_examples = get_cloth_examples(hr=0)
cloth_hr_examples = get_cloth_examples(hr=1)
pose_examples = get_pose_examples()
# Two values that are later embedded as <img src=...> in the "upload tips" accordion.
tip1, tip2 = get_tips()
# One module-level MTCNN detector, used by onClick to check the uploaded pose photo.
face_detector = MTCNN()
# Description
# HTML snippets rendered through gr.Markdown at the top of the page.
title = r"""
<h1 align="center">Outfit Anyway: Best customer try-on You ever See</h1>
"""
description = r"""
<b>Join discord to know more about </b> <a href='https://discord.com/invite/QgJWCtSG58' target='_blank'><b> heybeauty prebuy vton solution</b></a>.<br>
"""
def is_http_resource_accessible(url):
    """Tell whether *url* answers an HTTP HEAD request with status 200.

    Anything that is not a non-empty string with an ``http``/``https``
    scheme is rejected without touching the network.  Redirects are
    followed and the request is given 5 seconds.  Any error raised while
    parsing or requesting is printed and reported as "not accessible".

    Returns:
        True when the final response status is exactly 200, else False.
    """
    if not (url and isinstance(url, str)):
        return False
    try:
        # Only plain web URLs are probed; other schemes are rejected here.
        if urlparse(url).scheme in ('http', 'https'):
            # HEAD is enough to learn whether the resource exists.
            head_reply = requests.head(url, timeout=5, allow_redirects=True)
            return head_reply.status_code == 200
        return False
    except Exception as e:
        print(f"Error checking resource accessibility: {e}")
        return False
def onPoseChange(prompt_text, source_image, token_input, request: gr.Request):
    """Handle a click on the pose changer "Change" button.

    The access token is checked first, then the source image.  An empty
    prompt is replaced by a built-in two-pose prompt.  A source image given
    as a non-http string is uploaded before the pose change job is
    submitted; the job is then polled once per second, up to 120 times.

    Returns:
        A 4-tuple ``(info message, result 1, result 2, result 3)``.  The
        three image slots are ``None`` unless the job succeeded, in which
        case each holds the matching non-blank ``output1``..``output3`` URL
        with a ``?t=<milliseconds>`` suffix appended.
    """
    def _only_message(text):
        # Every non-success outcome clears all three result images.
        return text, None, None, None

    # Check token first
    if token_input != POSEToken:
        return _only_message("please input the correct token!")
    if source_image is None:
        return _only_message("Please provide source image first!")
    if not prompt_text or not prompt_text.strip():
        prompt_text = "Change the pose: two hands on hips.#Change the pose: arms extended to show outfit."

    try:
        # Prefer the forwarded address when the header is present.
        direct_host = request.client.host
        forwarded_for = dict(request.headers).get('x-forwarded-for')
        client_ip = forwarded_for if forwarded_for else direct_host

        # A string that is not already an http(s) URL is treated as a local
        # file path and uploaded first.
        is_local_path = isinstance(source_image, str) and not source_image.startswith('http')
        if is_local_path:
            time_id = int(str(time.time()).replace(".", "")) + random.randint(1000, 9999)
            image_url = upload_pose_img(client_ip, time_id, source_image)
            if not image_url:
                return _only_message("Image upload failed!")
        else:
            image_url = source_image

        # Initiate pose change request
        job = public_pose_changer(image_url, prompt_text)
        if job is None:
            return _only_message("Pose change request failed!")

        # Poll for results
        poll_interval_s = 1
        for _ in range(120):
            time.sleep(poll_interval_s)
            result = get_pose_changer_res(job['id'])
            if result is None:
                continue
            status = result['status']
            if status == 'SUCCEED':
                # Collect output1..output3, leaving None where a slot is
                # missing, empty or blank.
                images = []
                for slot in ('output1', 'output2', 'output3'):
                    link = result[slot] if slot in result else None
                    if link and link.strip():
                        images.append(link + f"?t={int(time.time() * 1000)}")
                    else:
                        images.append(None)
                return (f"Pose change completed! {result.get('msg', '')}", *images)
            if status == 'FAILED':
                return _only_message(f"Pose change failed: {result.get('msg', '')}")
            # 'PROCESSING' or any other status: keep polling.
        return _only_message("Pose change timeout!")
    except Exception as e:
        print(f"Pose change exception: {e}")
        return _only_message(f"Processing exception: {str(e)}")
def onClick(cloth_image, pose_image, high_resolution, request: gr.Request):
    """Run a try-on job for the Run button and stream progress to the UI.

    This is a generator event handler: every ``yield`` is a 4-tuple matching
    the outputs bound to the Run button, in order: result image, runtime
    information text, markdown text, pose changer source image.

    Args:
        cloth_image: File path of the chosen clothing image.  The clothing
            id is parsed from the file name stem, so the stem must be an
            integer.
        pose_image: File path of the person photo.
        high_resolution: Truthy to request the high-resolution variant.
        request: Gradio request, used to obtain the client address.

    Yields:
        ``(image, info, "", image)`` progress tuples.  ``image`` is ``None``
        on errors, the intermediate preview URL while the job is running
        (only when that URL is reachable), and the final result URL on
        success.

    Raises:
        ValueError: If the clothing file name stem is not an integer.
        Exception: Anything raised while validating, uploading or polling
            is printed and re-raised.
    """
    if pose_image is None:
        yield None, "no pose image found !", "", None
        return
    if cloth_image is None:
        yield None, "no cloth image found !", "", None
        return

    cloth_id = int(os.path.basename(cloth_image).split(".")[0])

    try:
        # Prefer the forwarded address when the header is present.
        client_ip = request.client.host
        x_forwarded_for = dict(request.headers).get('x-forwarded-for')
        if x_forwarded_for:
            client_ip = x_forwarded_for

        pose_np = cv2.imread(pose_image)
        if pose_np is None:
            # cv2.imread signals an unreadable/unsupported file with None
            # instead of raising; report it rather than crash on slicing.
            print(client_ip, 'pose image can not be read! ', flush=True)
            yield None, "Fatal Error !!! Can not read the uploaded photo, please upload a valid image!!!", "", None
            return

        # OpenCV loads BGR; reverse the channel axis before face detection.
        faces = face_detector.detect_faces(pose_np[:, :, ::-1])
        if not faces:
            print(client_ip, 'faces num is 0! ', flush=True)
            yield None, "Fatal Error !!! No face detected !!! You must upload a human photo!!! Not clothing photo!!!", "", None
            return

        # Reject head shots: the first detected face may not span more than
        # 1/3.3 of the image width or height.
        _, _, w, h = faces[0]["box"]
        H, W = pose_np.shape[:2]
        max_face_ratio = 1 / 3.3
        if w / W > max_face_ratio or h / H > max_face_ratio:
            yield None, "Fatal Error !!! Headshot is not allowed !!! You must upload a full-body or half-body photo!!!", "", None
            return

        if not check_region_warp(client_ip):
            yield None, "Failed !!! Our server is under maintenance, please try again later", "", None
            return

        yield None, "begin to upload ", "", None
        time_id = int(str(time.time()).replace(".", "")) + random.randint(1000, 9999)
        upload_url = upload_pose_img(client_ip, time_id, pose_image)
        # `not` covers both an empty and a None upload result.
        if not upload_url:
            yield None, "fail to upload", "", None
            return

        yield None, "begin to public task ", "", None
        public_res = publicClothSwap(upload_url, cloth_id, is_hr=1 if high_resolution else 0)
        if public_res is None:
            yield None, "fail to public you task", "", None
            return
        print(client_ip, public_res['mid_result'])

        def _preview_url(stamp=None):
            # The intermediate preview is only shown when it is reachable;
            # the optional timestamp suffix makes each poll's URL distinct.
            url = public_res['mid_result']
            if not is_http_resource_accessible(url):
                return None
            return url if stamp is None else url + f"?t={stamp}"

        mid_result = _preview_url()
        yield mid_result, f"task is processing, task id: {public_res['id']}, {public_res['msg']}", "", mid_result

        max_try = 120 * 3
        wait_s = 0.5
        for i in range(max_try):
            time.sleep(wait_s)
            state = getInfRes(public_res['id'])
            timestamp = int(time.time() * 1000)
            status = None if state is None else state['status']

            if status == 'SUCCEED':
                result_image = state['output1'] + f"?t={timestamp}"
                yield result_image, f"task finished, {state['msg']}", "", result_image
                return
            if status == 'FAILED':
                yield None, f"task failed, {state['msg']}", "", None
                return

            # Still running (or the query failed): refresh the preview.
            if state is None:
                info = "task query failed,"
            elif status == 'PROCESSING':
                info = f"task is processing, query {i}"
            else:
                info = f"task is on processing, query {i}"
            result_url = _preview_url(timestamp)
            yield result_url, info, "", result_url

        # A generator cannot deliver a value to the UI through `return`, so
        # the timeout notice has to be yielded to be displayed.
        yield None, "no machine...", "", None
    except Exception as e:
        print(e)
        raise
# Build the Gradio page: banner, upload tips, the try-on panel and the
# pose changer panel, then bind the two buttons to their handlers.
# NOTE(review): the listing this block was recovered from had lost its
# indentation, so the nesting of the layout containers below is a
# reconstruction — confirm it against the running app before relying on it.
with gr.Blocks() as demo:
    gr.Markdown(title)
    gr.Markdown(description)
    with gr.Accordion('upload tips', open=False):
        with gr.Row():
            gr.HTML(f"<img src=\"{tip1}\" >")
            gr.HTML(f"<img src=\"{tip2}\" >")
    with gr.Row():
        with gr.Column():
            # Not interactive: the clothing can only be picked from the example galleries.
            cloth_image = gr.Image(value=None, interactive=False, type="filepath", label="choose a clothing")
            example = gr.Examples(inputs=cloth_image,examples_per_page=20,examples=cloth_examples, label="clothing")
            hr_example = gr.Examples(inputs=cloth_image,examples_per_page=9,examples=cloth_hr_examples, label="invalid clothing")
        with gr.Column():
            pose_image = gr.Image(value=None, type="filepath", label="choose/upload a photo")
            example_pose = gr.Examples(inputs=pose_image,
                                       examples_per_page=20,
                                       examples=pose_examples)
        with gr.Column():
            with gr.Column():
                # size_slider = gr.Slider(-3, 3, value=1, interactive=True, label="clothes size")
                high_resolution = gr.Checkbox(value=False, label="high resolution", interactive=True)
                run_button = gr.Button(value="Run")
                info_text = gr.Textbox(value="", interactive=False,
                                       label='runtime information')
                res_image = gr.Image(label="result image", value=None, type="filepath")
                # Third output of onClick; onClick always sends it an empty string.
                MK01 = gr.Markdown()
    # Add pose changer module
    with gr.Accordion('pose changer', open=False):
        # Top: token input
        with gr.Row():
            token_input = gr.Textbox(
                value="",
                label="Access Token",
                placeholder="请输入token...",
                type="password",
                scale=1
            )
        # Middle: text box and button
        with gr.Row():
            pose_prompt = gr.Textbox(
                value="Change the pose: hands on hips.#Change the pose: arms extended.",
                label="Pose Change Prompt",
                placeholder="Enter pose change description...",
                lines=2,
                scale=4
            )
            change_button = gr.Button(value="Change", scale=1)
        # Bottom: source image on left, result images on right
        with gr.Row():
            with gr.Column(scale=1):
                pose_changer_image = gr.Image(value=None, type="filepath", label="Source Image", interactive=True)
                pose_change_info = gr.Textbox(value="", interactive=False, label="Processing Info")
            with gr.Column(scale=2):
                with gr.Row():
                    pose_result_1 = gr.Image(label="Result 1", value=None, type="filepath")
                    pose_result_2 = gr.Image(label="Result 2", value=None, type="filepath")
                    pose_result_3 = gr.Image(label="Result 3", value=None, type="filepath")
    # The try-on result is also written into the pose changer's source image
    # (fourth output), so it can be fed to the pose changer directly.
    run_button.click(fn=onClick, inputs=[cloth_image, pose_image, high_resolution],
                     outputs=[res_image, info_text, MK01, pose_changer_image])
    # Bind pose changer change button
    change_button.click(
        fn=onPoseChange,
        inputs=[pose_prompt, pose_changer_image, token_input],
        outputs=[pose_change_info, pose_result_1, pose_result_2, pose_result_3]
    )
def main():
    """Turn on request queuing (queue size capped at 50) and start the app."""
    demo.queue(max_size=50)
    # Alternatives that were tried earlier and left disabled:
    #   demo.queue(concurrency_count=60)
    #   demo.launch(server_name='0.0.0.0', server_port=225)
    #   demo.launch(server_name='0.0.0.0')
    demo.launch()


if __name__ == "__main__":
    main()