Spaces:
Running
Running
| import gradio as gr | |
| import time | |
| import base64 | |
| from openai import OpenAI | |
| import os | |
| from io import BytesIO | |
| from PIL import Image | |
# Configuration
BASE_URL = "https://api.stepfun.com/v1"

# API key is read from the environment; empty string when it is not set
STEP_API_KEY = os.getenv("STEP_API_KEY", "")

# Models offered in the UI dropdown
MODELS = ["step-3", "step-r1-v-mini"]
def image_to_base64(image):
    """Encode a PIL image as a base64 string of its PNG bytes.

    Returns None when ``image`` is None or is not a ``PIL.Image.Image``.
    """
    # The None test comes first so the isinstance check is skipped for it.
    if image is None or not isinstance(image, Image.Image):
        return None

    png_buffer = BytesIO()
    image.save(png_buffer, format="PNG")
    encoded = base64.b64encode(png_buffer.getvalue())
    return encoded.decode('utf-8')
def call_step_api_stream(message, history, model, temperature, max_tokens, image=None):
    """Stream a chat completion from the Step API into the chat history.

    This is a generator: every yielded value is the full ``history`` list
    (``[user_text, assistant_text]`` pairs), with the last pair's assistant
    text growing as streamed chunks arrive.

    Args:
        message: Text typed by the user; may be empty when only an image is sent.
        history: Existing list of ``[user_text, assistant_text]`` pairs.
            ``None`` is treated as an empty history.
        model: Model name forwarded to the API.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Maximum output length forwarded to the API.
        image: Optional PIL image to attach to the current user turn.

    Yields:
        The updated history. Errors are reported as the assistant text of
        the last pair (prefixed with "❌") rather than raised.
    """
    # Guard against a missing history so the appends below cannot fail.
    if history is None:
        history = []

    # Nothing to send: hand the history back unchanged.
    if not message and not image:
        yield history
        return

    if not STEP_API_KEY:
        history.append([message or "图片", "❌ API密钥未配置。请在 Settings 中添加 STEP_API_KEY。"])
        yield history
        return

    # Replay earlier turns as alternating user / assistant messages.
    messages = []
    for turn in history:
        if turn[0]:  # user message
            messages.append({"role": "user", "content": turn[0]})
        if turn[1]:  # assistant reply
            messages.append({"role": "assistant", "content": turn[1]})

    # Build the current user turn.
    if image is not None:
        try:
            base64_image = image_to_base64(image)
        except Exception as e:
            history.append([message or "图片", f"❌ 图片处理错误: {str(e)}"])
            yield history
            return
        if base64_image is None:
            history.append([message or "图片", "❌ 图片处理失败"])
            yield history
            return
        # image_to_base64 encodes as PNG, so the data URL must declare
        # image/png (the previous "image/jpg" is not a registered MIME type
        # and did not match the payload).
        current_content = [
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}", "detail": "high"}}
        ]
        if message:
            current_content.append({"type": "text", "text": message})
        messages.append({"role": "user", "content": current_content})
        display_message = f"[图片] {message}" if message else "[图片]"
    else:
        # Text-only turn.
        messages.append({"role": "user", "content": message})
        display_message = message

    # Show the user's turn immediately instead of waiting for the first
    # streamed token (or forever, if the stream carries no content).
    history.append([display_message, ""])
    yield history

    # Create the API client.
    try:
        client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
    except Exception as e:
        history[-1][1] = f"❌ 客户端初始化失败: {str(e)}"
        yield history
        return

    # Call the API and relay the streamed response. The broad except is
    # deliberate: this is the UI boundary and failures are shown in the chat.
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True
        )
        full_response = ""
        for chunk in response:
            if not chunk.choices:
                continue
            piece = getattr(chunk.choices[0].delta, "content", None)
            if piece:
                full_response += piece
                history[-1][1] = full_response
                yield history
    except Exception as e:
        history[-1][1] = f"❌ API请求失败: {str(e)}"
        yield history
def user_input(message, history, image):
    """Clear the text and image inputs once something has been submitted.

    Returns ``(message, history, image)``. When either a message or an image
    is present, the message becomes ``""`` and the image becomes ``None``;
    otherwise all three values are returned as received. ``history`` is
    always passed through untouched.
    """
    nothing_submitted = not (message or image)
    if nothing_submitted:
        return message, history, image
    return "", history, None
def clear_history():
    """Return reset values for the chatbot, the image input and the textbox."""
    empty_chat = []
    no_image = None
    blank_text = ""
    return empty_chat, no_image, blank_text
# Build the Gradio interface
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Step-3
    Hello, I am Step-3!
    """)

    # Per-session holders for the submitted text and image. The input
    # widgets are cleared as soon as the user submits, so the streaming
    # handler must read the submission from here, not from the widgets.
    pending_msg = gr.State("")
    pending_image = gr.State(None)

    with gr.Row():
        with gr.Column(scale=3):
            # Conversation view
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False
            )
            with gr.Row():
                with gr.Column(scale=8):
                    msg = gr.Textbox(
                        label="输入消息",
                        placeholder="输入你的问题...",
                        lines=1,
                        max_lines=5,
                        show_label=False,
                        elem_id="msg",
                        container=False
                    )
                with gr.Column(scale=1, min_width=100):
                    submit_btn = gr.Button("发送", variant="primary")
                with gr.Column(scale=1, min_width=100):
                    clear_btn = gr.Button("清空对话")
            # Image upload
            with gr.Row():
                image_input = gr.Image(
                    label="上传图片(可选)",
                    type="pil",
                    height=150,
                    scale=1
                )
        with gr.Column(scale=1):
            # Settings panel
            gr.Markdown("### ⚙️ 设置")
            model_select = gr.Dropdown(
                choices=MODELS,
                value=MODELS[0],
                label="模型选择",
                interactive=True
            )
            temperature_slider = gr.Slider(
                minimum=0,
                maximum=1,
                value=0.7,
                step=0.1,
                label="Temperature",
                interactive=True
            )
            max_tokens_slider = gr.Slider(
                minimum=100,
                maximum=4000,
                value=2000,
                step=100,
                label="最大输出长度",
                interactive=True
            )
            gr.Markdown("""
            ### 📝 使用说明
            - 支持多轮对话
            - 可上传图片进行分析
            - 支持纯文本对话
            - 历史记录会保留上下文
            """)

    # Event handling
    def _stash_inputs(message, history, image):
        """Clear the input widgets while keeping the submission for the next step.

        Returns the values from ``user_input`` for the textbox, chatbot and
        image widget, followed by the original message and image for the
        two state holders.
        """
        cleared_msg, history, cleared_image = user_input(message, history, image)
        return cleared_msg, history, cleared_image, message, image

    # Pressing Enter in the textbox and clicking the send button behave the
    # same. The chained step reads the stashed state: reading `msg` and
    # `image_input` there would see the already-cleared widgets, so the
    # API would never be called.
    for trigger in (msg.submit, submit_btn.click):
        trigger(
            _stash_inputs,
            [msg, chatbot, image_input],
            [msg, chatbot, image_input, pending_msg, pending_image],
            queue=False
        ).then(
            call_step_api_stream,
            [pending_msg, chatbot, model_select, temperature_slider, max_tokens_slider, pending_image],
            chatbot
        )

    clear_btn.click(
        clear_history,
        None,
        [chatbot, image_input, msg],
        queue=False
    )

    # Footer
    gr.Markdown("""
    ---
    <div style="text-align: center;">
    <img src="https://huggingface.co/stepfun-ai/step3/resolve/main/figures/stepfun-logo.png" alt="StepFun Logo" style="height: 40px; margin: 10px;">
    <br>
    Powered by <a href="https://www.stepfun.com/" target="_blank">StepFun</a>
    </div>
    """)
# Launch the app when run as a script
if __name__ == "__main__":
    # Enable the request queue, then start the server.
    demo.queue().launch()