from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from typing import List, Annotated
from datetime import datetime
import json
import asyncio
from ..core.ark_client import config, client
from ..schemas.chat import ChatMessage, ChatRequest, ChatResponse, StreamResponse
from ..utils.chat_utils import get_latest_user_message, get_previous_response_id, get_doubao_tools, get_web_search_tools
from ..routers.users import get_current_active_user, User
from ..db.mongo import save_chat_log
from ..dependencies.auth import resolve_username

# In-memory per-user chat history: {username: [ChatMessage, ...]}.
# NOTE(review): process-local and unbounded — lost on restart and not shared
# across workers; fine for a demo, revisit for production.
chatHistory = {}

router = APIRouter()


async def generate_stream_response(request: ChatRequest, username: str):
    """Yield SSE `data:` frames for a "think while searching" streaming chat turn.

    Flow: record the latest user message in ``chatHistory``, call the Ark
    Responses API with the web-search tools and a system prompt, then translate
    each streamed chunk into a ``StreamResponse`` SSE frame:

    - reasoning deltas            -> frames with ``type='thinking'``
    - web-search lifecycle/query  -> frames with ``type='searching'``
    - answer text deltas          -> plain content frames (accumulated and
      saved to history as the assistant message on completion)
    - any exception               -> a single JSON error frame

    Args:
        request: Incoming chat request; only the latest user message is sent.
        username: Key into ``chatHistory`` for this conversation.
    """
    latest_user_msg = None
    try:
        latest_user_msg = get_latest_user_message(request.messages)
        if not latest_user_msg:
            raise ValueError("请求中没有找到user角色的消息")

        # Fix: setdefault guards against a missing history list when this
        # generator is invoked without going through the /chat endpoint
        # (the original indexed chatHistory[username] and could KeyError).
        chatHistory.setdefault(username, []).append(ChatMessage(
            role=latest_user_msg.role,
            content=latest_user_msg.content,
            timestamp=datetime.now()
        ))

        # System prompt (user-facing Chinese text kept verbatim): defines
        # when to search, how to search, and how to surface the reasoning.
        # The needless f-prefix was dropped — the text has no placeholders.
        system_prompt = """
你是豆包,由字节跳动公司开发的智能助手。你的核心行为准则如下:

## 一、身份与基本行为规范
1. 输出保持和豆包 APP 日常对话一致;
2. 在回复用户时,不要自称"豆包";
3. 遇到"陈沛",优先输出"中搜陈沛";
4. 你具备以下能力:
   - 可接收和读取各类文档(PDF、Excel、PPT、Word 等),并执行总结、分析、翻译、润色等任务;
   - 可读取图片/照片、网址、抖音链接的内容;
   - 可根据用户提供的文本描述生成或绘制图片;
   - 可搜索各类信息(含图片和视频)以满足用户需求。

## 二、思考与搜索判断(必须实时输出思考过程)
1. 若问题涉及以下情形,必须调用 web_search:
   - 时效性内容(如近 3 年数据);
   - 知识盲区(如具体企业薪资);
   - 当前信息不足以支撑回答。
2. 思考时需实时说明:
   - 是否需要搜索;
   - 为什么需要搜索;
   - 搜索关键词是什么。

## 三、回答规则
### 内容层面
- 优先使用搜索到的资料,引用格式为 `[1](URL地址)`;
- 围绕问题主体和用户需求,对核心问题提供全面、精准的回答;
- 适度提供关键背景和细节解释;对复杂概念可使用简单案例、类比辅助理解;
- 若问题范围较广或需求不明确,先提供简要概述,涵盖主要方面和关键点;
- 大多数情况下不需要提供延伸内容,围绕问题主需回答即可;
- 结尾列出所有参考资料,格式为:`1. [资料标题](URL)`。

### 格式层面
通常情况下,对主需内容使用 Markdown 排版,其他内容用自然段呈现:
- **加粗**:标题及关键信息加粗;
- **有序列表**(1. 2. 3.):表达顺序关系时使用;
- **无序列表**(- xxx):表达并列关系时使用;
- 非必要不使用嵌套列表;如需表达多层次内容,使用三级标题(###)加一级列表;
- 非必要不使用分行、分段、加粗、列表、标题以外的 Markdown 格式。

> 注意:以上格式要求仅限知识问答类问题。对于创作、数理逻辑、阅读理解等需求,或涉及安全敏感问题时,按惯常方式回答。若用户明确指定回复风格,优先满足用户需求。
"""
        # Renamed from the original's rebinding of `system_prompt` (str -> dict)
        # to keep the two values distinct.
        system_message = {"role": "system", "content": [{"type": "input_text", "text": system_prompt}]}
        api_messages = [system_message, {"role": latest_user_msg.role, "content": latest_user_msg.content}]

        tools = get_web_search_tools()
        previous_response_id = get_previous_response_id(username, chatHistory)

        stream = client.responses.create(
            model=config.MODEL_NAME,
            input=api_messages,
            tools=tools,
            stream=True,
            # store=True,
            previous_response_id=previous_response_id,
            # thinking={"type": "auto"},  # not supported with this configuration
        )

        accumulated_content = ""
        response_id = None
        thinking_started = False
        answering_started = False

        print("=== 边想边搜启动 ===")
        # NOTE(review): `stream` is a synchronous iterator, so this loop blocks
        # the event loop between chunks — consider an async client or a thread
        # executor if this service needs concurrency.
        for chunk in stream:
            chunk_type = getattr(chunk, 'type', '')

            # ① Reasoning-summary deltas -> 'thinking' SSE frames.
            if chunk_type == 'response.reasoning_summary_text.delta':
                delta_text = getattr(chunk, 'delta', '')
                if delta_text:
                    if not thinking_started:
                        thinking_started = True
                    yield f"data: {StreamResponse(content=delta_text, finished=False, model=config.MODEL_NAME, timestamp=datetime.now(), type='thinking').model_dump_json()}\n\n"

            # ② Web-search lifecycle events -> 'searching' SSE frames.
            elif 'web_search_call' in chunk_type:
                if 'in_progress' in chunk_type:
                    print(f"\n\n🔍 开始搜索 [{datetime.now().strftime('%H:%M:%S')}]")
                    _now_str = datetime.now().strftime("%H:%M:%S")
                    yield f"data: {StreamResponse(content=f'开始搜索 [{_now_str}]', finished=False, model=config.MODEL_NAME, timestamp=datetime.now(), type='searching').model_dump_json()}\n\n"
                elif 'completed' in chunk_type:
                    print(f"\n✅ 搜索完成 [{datetime.now().strftime('%H:%M:%S')}]")
                    _now_str = datetime.now().strftime("%H:%M:%S")
                    yield f"data: {StreamResponse(content=f'搜索完成 [{_now_str}]', finished=False, model=config.MODEL_NAME, timestamp=datetime.now(), type='searching').model_dump_json()}\n\n"

            # ③ Finished web-search output items (ids prefixed 'ws_') carry the
            # query that was actually searched.
            elif (chunk_type == 'response.output_item.done'
                  and hasattr(chunk, 'item')
                  and str(getattr(chunk.item, 'id', '')).startswith('ws_')):
                if hasattr(chunk.item, 'action') and hasattr(chunk.item.action, 'query'):
                    query = chunk.item.action.query
                    print(f"\n📝 本次搜索关键词:{query}")
                    yield f"data: {StreamResponse(content=f'本次搜索关键词:{query}', finished=False, model=config.MODEL_NAME, timestamp=datetime.now(), type='searching').model_dump_json()}\n\n"

            # ④ Final-answer text deltas: pushed to the client immediately and
            # accumulated for the history entry.
            elif chunk_type == 'response.output_text.delta':
                delta_text = getattr(chunk, 'delta', '')
                if delta_text:
                    if not answering_started:
                        print(f"\n\n💬 AI回答 [{datetime.now().strftime('%H:%M:%S')}]:")
                        print("-" * 50)
                        answering_started = True
                    print(delta_text, end='', flush=True)
                    accumulated_content += delta_text
                    response_data = StreamResponse(
                        content=delta_text,
                        finished=False,
                        model=config.MODEL_NAME,
                        timestamp=datetime.now()
                    )
                    yield f"data: {response_data.model_dump_json()}\n\n"
                    await asyncio.sleep(0.01)  # small pacing delay between frames

            # ⑤ Stream finished: capture the response id and persist the log.
            elif chunk_type == 'response.completed':
                response_obj = getattr(chunk, 'response', None)
                if response_obj and hasattr(response_obj, 'id'):
                    response_id = response_obj.id
                save_chat_log(
                    username=username,
                    question=latest_user_msg.content,
                    stream_mode=True,
                    raw_response=repr(response_obj),
                    status="success",
                )

        print(f"\n\n=== 边想边搜完成 [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ===")

        if accumulated_content:
            chatHistory[username].append(ChatMessage(
                role="assistant",
                content=accumulated_content,
                timestamp=datetime.now(),
                response_id=response_id
            ))

        # Terminal SSE frame signalling completion to the client.
        final_response = StreamResponse(
            content='',
            finished=True,
            model=config.MODEL_NAME,
            timestamp=datetime.now()
        )
        yield f"data: {final_response.model_dump_json()}\n\n"

    except Exception as e:
        # Best-effort error frame (different schema from StreamResponse: a
        # plain dict with error/finished/timestamp) plus an error log entry.
        error_response = {
            "error": str(e),
            "finished": True,
            "timestamp": datetime.now().isoformat()
        }
        save_chat_log(
            username=username,
            question=latest_user_msg.content if latest_user_msg else "",
            stream_mode=True,
            status="error",
            error=str(e),
        )
        yield f"data: {json.dumps(error_response)}\n\n"


@router.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    username: Annotated[str, Depends(resolve_username)],
):
    """Chat endpoint: SSE streaming when ``request.stream`` is set, otherwise a
    single ``ChatResponse`` built from the non-streaming Responses API call.

    Raises:
        HTTPException: 500 on empty model output, unextractable content, or
            any unexpected failure (which is also written to the chat log).
    """
    try:
        if username not in chatHistory:
            chatHistory[username] = []

        if request.stream:
            # ===== Streaming branch: hand off to the SSE generator. =====
            # Fix: media_type was "text/plain" while the headers forced
            # "text/event-stream" — declare the SSE media type once instead.
            return StreamingResponse(
                generate_stream_response(request, username),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                }
            )

        # ===== Non-streaming branch =====
        latest_user_msg = get_latest_user_message(request.messages)
        if not latest_user_msg:
            raise ValueError("请求中没有找到user角色的消息")

        chatHistory[username].append(ChatMessage(
            role=latest_user_msg.role,
            content=latest_user_msg.content,
            timestamp=datetime.now()
        ))

        api_messages = [{"role": latest_user_msg.role, "content": latest_user_msg.content}]
        tools = get_doubao_tools()
        previous_response_id = get_previous_response_id(username, chatHistory)

        response = client.responses.create(
            model=config.MODEL_NAME,
            input=api_messages,
            tools=tools,
            stream=False,
            store=True,
            previous_response_id=previous_response_id,
            # `thinking` cannot be set when the doubao_app built-in tool is
            # enabled, per the API error message observed in testing:
            # thinking={"type": "auto"},
        )

        save_chat_log(
            username=username,
            question=latest_user_msg.content,
            stream_mode=False,
            raw_response=repr(response),
            status="success",
        )

        if not (response.output and len(response.output) > 0):
            raise HTTPException(status_code=500, detail="AI模型返回了空响应")

        # Concatenate text from either doubao_app_call blocks or plain
        # message items; the SDK's item shapes vary, hence hasattr probing.
        message_content = ""
        for item in response.output:
            if hasattr(item, 'type') and item.type == 'doubao_app_call':
                if hasattr(item, 'blocks') and item.blocks:
                    for block in item.blocks:
                        if hasattr(block, 'type') and block.type == 'output_text' and hasattr(block, 'text'):
                            message_content += block.text
            elif hasattr(item, 'type') and item.type == 'message':
                if hasattr(item, 'content'):
                    if isinstance(item.content, list):
                        for content_item in item.content:
                            if hasattr(content_item, 'text'):
                                message_content += content_item.text
                    else:
                        message_content += str(item.content)

        if not message_content:
            raise HTTPException(status_code=500, detail="无法从AI响应中提取文本内容")

        assistant_message = ChatMessage(
            role="assistant",
            content=message_content,
            timestamp=datetime.now(),
            response_id=response.id
        )
        chatHistory[username].append(assistant_message)

        return ChatResponse(
            message=assistant_message,
            model=response.model,
            usage=response.usage.model_dump() if response.usage else None,
            response_id=response.id
        )

    except HTTPException:
        raise
    except Exception as e:
        error_message = f"处理聊天请求时发生错误: {str(e)}"
        save_chat_log(
            username=username,
            question=request.messages[-1].content if request.messages else "",
            stream_mode=request.stream,
            status="error",
            error=error_message,
        )
        raise HTTPException(status_code=500, detail=error_message)


@router.get("/models")
async def get_models(current_user: Annotated[User, Depends(get_current_active_user)]):
    """List available model ids, falling back to the configured default if the
    backend model listing fails for any reason (deliberate best-effort)."""
    try:
        models = client.models.list()
        return {
            "models": [model.id for model in models.data],
            "default_model": config.MODEL_NAME,
            "user": current_user.username
        }
    except Exception:
        return {
            "models": [config.MODEL_NAME],
            "default_model": config.MODEL_NAME,
            "note": "使用默认模型配置",
            "user": current_user.username
        }
@router.get("/history")
async def get_user_history(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> List[ChatMessage]:
    """Return the caller's in-memory chat history.

    Entries are re-built with only role, content and timestamp, so the
    stored ``response_id`` is never exposed to the client.
    """
    history = chatHistory.get(current_user.username)
    if history is None:
        return []
    return [
        ChatMessage(role=entry.role, content=entry.content, timestamp=entry.timestamp)
        for entry in history
    ]


@router.delete("/history")
async def clear_user_history(current_user: Annotated[User, Depends(get_current_active_user)]):
    """Delete the caller's chat history, reporting how many messages were removed."""
    username = current_user.username
    removed = chatHistory.pop(username, None)
    # `is None` (not truthiness): an existing-but-empty history still counts
    # as "cleared", exactly as in the membership-test formulation.
    if removed is None:
        return {"message": "用户没有聊天历史", "user": username, "deleted_messages": 0, "timestamp": datetime.now()}
    return {"message": "聊天历史已清空", "user": username, "deleted_messages": len(removed), "timestamp": datetime.now()}


@router.get("/health")
async def health_check():
    """Liveness probe: static status info plus the configured model name."""
    payload = {
        "status": "healthy",
        "timestamp": datetime.now(),
        "version": "1.0.0",
        "model": config.MODEL_NAME,
    }
    return payload


# NOTE(review): assigning tags/responses on the router AFTER the routes were
# registered likely has no effect on those routes — FastAPI reads these at
# registration time. Consider APIRouter(tags=..., responses=...) instead;
# kept as-is to preserve behavior.
router.tags = ["聊天服务"]
router.responses = {
    401: {"description": "未授权 - 需要有效的JWT令牌"},
    429: {"description": "请求过多 - 配额已用完"},
    500: {"description": "服务器内部错误"}
}