"""MongoDB persistence helpers for chat logs, chat history and circle prompts."""

import copy
import os
from datetime import datetime

from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()

MONGO_URI = os.getenv("ARK_LOGS_MONGO_URI")
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)

# Database
db = client["arklogs"]
# Conversation logs of the Doubao large language model
chat_logs = db["chat_logs"]
# Chat history records
chat_history_col = db["chat_history"]
# Interest-circle prompt collection
circle_prompts = db["circle_prompt"]

# Flipped to True after both indexes were created successfully, so the
# create_index calls are not repeated on every single insert.
_indexes_ready = False


# A leading underscore marks the function as private to this module.
def _ensure_index() -> None:
    """Create the (username, time) compound indexes once per process.

    Failures are reported on stdout and never raised, so a write is still
    attempted by the caller. After a failure the flag stays unset and the
    next call tries again.
    """
    global _indexes_ready
    if _indexes_ready:
        return
    try:
        chat_logs.create_index([("username", 1), ("asked_at", -1)])
        chat_history_col.create_index([("username", 1), ("timestamp", -1)])
    except Exception as e:
        print(f"MongoDB 索引创建失败: {e}")
    else:
        _indexes_ready = True


def save_chat_log(
    username: str,
    question: str,
    stream_mode: bool,
    raw_response: str | None = None,
    status: str = "success",
    error: str | None = None,
):
    """Save the raw response log of one chat request to MongoDB.

    Any exception is caught and printed; nothing is raised to the caller.

    Args:
        username: Who asked the question.
        question: The question that was asked.
        stream_mode: Answer mode (streaming or non-streaming).
        raw_response: repr string of the raw API response.
        status: Response status, "success" or "error".
        error: Error message when the request failed.
    """
    try:
        _ensure_index()
        chat_logs.insert_one({
            "username": username,
            "question": question,
            "stream_mode": stream_mode,
            "raw_response": raw_response,
            "status": status,
            "error": error,
            # NOTE(review): naive local time; presumably stored as if it were
            # UTC by the driver. Left unchanged to stay consistent with
            # records already written - confirm before switching to UTC.
            "asked_at": datetime.now(),
        })
    except Exception as e:
        print(f"MongoDB 日志写入失败: {e}")


def save_chat_history(
    username: str,
    role: str,
    content: str,
    timestamp: datetime,
    response_id: str | None = None,
    thinking: str | None = None,
    searching: str | None = None,
):
    """Append one message to the chat history collection.

    Any exception is caught and printed; nothing is raised to the caller.

    Args:
        username: Owner of the conversation.
        role: Role stored with the message; get_last_response_id() looks
            for the value "assistant".
        content: Message text.
        timestamp: Message time; used as the sort key when reading history.
        response_id: Optional response identifier stored with the message.
        thinking: Optional value stored under the "thinking" field.
        searching: Optional value stored under the "searching" field.
    """
    try:
        _ensure_index()
        chat_history_col.insert_one({
            "username": username,
            "role": role,
            "content": content,
            "thinking": thinking,
            "searching": searching,
            "response_id": response_id,
            "timestamp": timestamp,
        })
    except Exception as e:
        print(f"MongoDB 聊天历史写入失败: {e}")


def get_chat_history(username: str) -> list:
    """Return all history documents of a user, oldest first, without "_id".

    Returns an empty list when the read fails (the error is printed).
    """
    try:
        docs = chat_history_col.find(
            {"username": username},
            {"_id": 0},
        ).sort("timestamp", 1)
        return list(docs)
    except Exception as e:
        print(f"MongoDB 聊天历史读取失败: {e}")
        return []


def get_last_response_id(username: str) -> str | None:
    """Return the response_id of the user's newest assistant message.

    Only documents whose response_id is not None are considered. Returns
    None when no such document exists or when the query fails (the error
    is printed).
    """
    try:
        doc = chat_history_col.find_one(
            {"username": username, "role": "assistant", "response_id": {"$ne": None}},
            {"response_id": 1, "_id": 0},
            sort=[("timestamp", -1)],
        )
        return doc["response_id"] if doc else None
    except Exception as e:
        print(f"MongoDB 查询 response_id 失败: {e}")
        return None


def delete_chat_history(username: str) -> int:
    """Delete all history documents of a user.

    Returns:
        The number of deleted documents, or 0 when the delete fails (the
        error is printed).
    """
    try:
        result = chat_history_col.delete_many({"username": username})
        return result.deleted_count
    except Exception as e:
        print(f"MongoDB 聊天历史删除失败: {e}")
        return 0


# Fallback configuration used when no circle prompt is stored for an app.
_DEFAULT_PROMPT_CONFIG = {
    "name": "兴趣圈",
    "role": "活跃用户",
    "style": "自然亲切,有活人感",
    "keywords": [],
    "forbidden": [],
}


def get_circle_prompt(app_name: str) -> dict:
    """Return the prompt configuration stored for ``app_name``.

    Falls back to a fresh copy of the default configuration when no
    document matches or when the lookup fails (the error is printed).
    A copy is returned so that callers mutating the result (e.g. appending
    to "keywords") cannot alter the module-level default.
    """
    try:
        doc = circle_prompts.find_one({"appName": app_name})
    except Exception as e:
        print(f"MongoDB 兴趣圈配置读取失败: {e}")
        doc = None
    return doc if doc else copy.deepcopy(_DEFAULT_PROMPT_CONFIG)


def upsert_circle_prompt(data: dict) -> None:
    """Insert or update a circle prompt document keyed by ``data["appName"]``.

    All fields of ``data`` are written with ``$set``. Unlike the other
    helpers this function does not catch exceptions.

    Raises:
        KeyError: If ``data`` has no "appName" key.
    """
    circle_prompts.update_one(
        {"appName": data["appName"]},
        {"$set": data},
        upsert=True,
    )