| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- from pymongo import MongoClient
- from bson import ObjectId
- from datetime import datetime
- from dotenv import load_dotenv
- import os
# Load environment variables via python-dotenv before the URI is read.
load_dotenv()

# Connection string comes from the ARK_LOGS_MONGO_URI environment variable.
MONGO_URI = os.environ.get("ARK_LOGS_MONGO_URI")
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)

# Database
db = client["arklogs"]

# Conversation logs of the Doubao large model
chat_logs = db["chat_logs"]

# Chat history records
chat_history_col = db["chat_history"]

# Interest-circle prompt collection
circle_prompts = db["circle_prompt"]

# Historical-figures collection
historical_figures = db["historical_figures"]
# Set to True once both indexes have been created successfully in this process.
_indexes_ready = False


def _ensure_index():
    """Create the compound indexes used by the log and history collections.

    Indexes:
        chat_logs:    user_id ascending, asked_at descending
        chat_history: user_id ascending, session_id ascending,
                      timestamp descending

    The writers call this before every insert, so after the first
    successful run the function returns immediately instead of issuing two
    more index commands per write. A failure is printed (not raised) and
    the flag stays unset, so the next call tries again.
    """
    global _indexes_ready
    if _indexes_ready:
        return
    try:
        chat_logs.create_index([("user_id", 1), ("asked_at", -1)])
        chat_history_col.create_index(
            [("user_id", 1), ("session_id", 1), ("timestamp", -1)]
        )
    except Exception as e:
        # Best-effort: a missing index must not block the write that follows,
        # but the failure should be visible rather than silently dropped.
        print(f"MongoDB 索引创建失败: {e}")
        return
    _indexes_ready = True
def save_chat_log(
    user_id: str,
    question: str,
    stream_mode: bool,
    raw_response: str | None = None,
    status: str = "success",
    error: str | None = None,
) -> None:
    """Save one raw chat-response log entry to MongoDB.

    Best-effort: any exception is printed and swallowed, so logging never
    breaks the caller.

    Args:
        user_id: The user who asked.
        question: The question that was asked.
        stream_mode: Answer mode (streaming or non-streaming).
        raw_response: ``repr`` string of the raw API response.
        status: Response status, ``"success"`` or ``"error"``.
        error: Error message when the request failed.
    """
    try:
        _ensure_index()
        chat_logs.insert_one({
            "user_id": user_id,
            "question": question,
            "stream_mode": stream_mode,
            "raw_response": raw_response,
            "status": status,
            "error": error,
            # NOTE(review): datetime.now() is naive local time, and PyMongo
            # stores naive datetimes as if they were UTC. Left unchanged so
            # new rows stay comparable with existing ones; confirm intent.
            "asked_at": datetime.now(),
        })
    except Exception as e:
        print(f"MongoDB 日志写入失败: {e}")
def save_chat_history(
    user_id: str,
    session_id: str,
    role: str,
    content: str,
    timestamp: datetime,
    response_id: str | None = None,
    thinking: str | None = None,
    searching: str | None = None,
) -> None:
    """Append one message to the chat-history collection.

    Best-effort: any exception is printed and swallowed.

    Args:
        user_id: Owner of the conversation.
        session_id: Conversation the message belongs to.
        role: Message author role (other helpers in this file query for
            ``"user"`` and ``"assistant"``).
        content: Message text.
        timestamp: Time of the message, stored as given.
        response_id: Optional response identifier stored with the message.
        thinking: Optional value stored under the ``thinking`` field.
        searching: Optional value stored under the ``searching`` field.
    """
    try:
        _ensure_index()
        chat_history_col.insert_one({
            "user_id": user_id,
            "session_id": session_id,
            "role": role,
            "content": content,
            "thinking": thinking,
            "searching": searching,
            "response_id": response_id,
            "timestamp": timestamp,
        })
    except Exception as e:
        print(f"MongoDB 聊天历史写入失败: {e}")
def get_chat_history(user_id: str, session_id: str) -> list:
    """Return all stored messages of one session, oldest first.

    The ``_id`` field is excluded from every returned document. On any
    error the failure is printed and an empty list is returned.
    """
    session_filter = {"user_id": user_id, "session_id": session_id}
    without_id = {"_id": 0}
    try:
        cursor = chat_history_col.find(session_filter, without_id)
        return list(cursor.sort("timestamp", 1))
    except Exception as e:
        print(f"MongoDB 聊天历史读取失败: {e}")
        return []
def get_last_response_id(user_id: str, session_id: str) -> str | None:
    """Return the ``response_id`` of the newest assistant message in a session.

    Only assistant messages whose ``response_id`` is not null are
    considered. Returns ``None`` when there is no such message or when the
    query fails (the failure is printed).
    """
    query = {
        "user_id": user_id,
        "session_id": session_id,
        "role": "assistant",
        "response_id": {"$ne": None},
    }
    only_response_id = {"response_id": 1, "_id": 0}
    try:
        latest = chat_history_col.find_one(
            query,
            only_response_id,
            sort=[("timestamp", -1)],
        )
        if not latest:
            return None
        return latest["response_id"]
    except Exception as e:
        print(f"MongoDB 查询 response_id 失败: {e}")
        return None
def delete_chat_history(user_id: str, session_id: str) -> int:
    """Delete every message of one session and return how many were removed.

    Returns 0 when the delete fails (the failure is printed).
    """
    target = {"user_id": user_id, "session_id": session_id}
    try:
        return chat_history_col.delete_many(target).deleted_count
    except Exception as e:
        print(f"MongoDB 聊天历史删除失败: {e}")
        return 0
def get_sessions(user_id: str) -> list:
    """List a user's sessions, newest first, with a short preview of each.

    Each item has ``sessionId``, ``createdAt`` (timestamp of the session's
    earliest user message) and ``preview`` (the first 20 code points of
    that message). Returns an empty list when the aggregation fails (the
    failure is printed).
    """
    only_user_messages = {"$match": {"user_id": user_id, "role": "user"}}
    oldest_first = {"$sort": {"timestamp": 1}}
    # With the ascending sort above, $first picks each session's earliest message.
    first_message_per_session = {"$group": {
        "_id": "$session_id",
        "createdAt": {"$first": "$timestamp"},
        "preview": {"$first": "$content"},
    }}
    newest_session_first = {"$sort": {"createdAt": -1}}
    shape_output = {"$project": {
        "_id": 0,
        "sessionId": "$_id",
        "createdAt": 1,
        "preview": {"$substrCP": ["$preview", 0, 20]},
    }}
    stages = [
        only_user_messages,
        oldest_first,
        first_message_per_session,
        newest_session_first,
        shape_output,
    ]
    try:
        return list(chat_history_col.aggregate(stages))
    except Exception as e:
        print(f"MongoDB 会话列表查询失败: {e}")
        return []
# Fallback prompt configuration used when a circle has no stored document.
_DEFAULT_PROMPT_CONFIG = {
    "name": "兴趣圈",
    "role": "活跃用户",
    "style": "自然亲切,有活人感",
    "keywords": [],
    "forbidden": [],
}


def _fresh_default_prompt_config() -> dict:
    """Return an independent copy of the defaults, including their lists."""
    return {
        key: list(value) if isinstance(value, list) else value
        for key, value in _DEFAULT_PROMPT_CONFIG.items()
    }


def get_circle_prompt(app_name: str) -> dict:
    """Return the prompt configuration stored for ``app_name``.

    Falls back to a copy of the default configuration when no document
    matches or when the query fails (the failure is printed). The fallback
    is a fresh dict on every call, so a caller that edits the result
    (for example appending to ``keywords``) cannot alter the module-level
    defaults seen by later calls.

    Args:
        app_name: Value matched against the ``appName`` field.

    Returns:
        The stored document as returned by the driver, or the defaults.
    """
    try:
        doc = circle_prompts.find_one({"appName": app_name})
    except Exception as e:
        print(f"MongoDB 兴趣圈配置查询失败: {e}")
        return _fresh_default_prompt_config()
    return doc if doc else _fresh_default_prompt_config()
def upsert_circle_prompt(data: dict) -> None:
    """Create or update the prompt configuration identified by ``data["appName"]``.

    Every key in ``data`` is written with ``$set``; a new document is
    inserted when none matches. Errors are not caught here: a missing
    ``"appName"`` key raises ``KeyError`` and driver errors propagate.
    """
    selector = {"appName": data["appName"]}
    changes = {"$set": data}
    circle_prompts.update_one(selector, changes, upsert=True)
# ===================== Historical figures =====================


def get_all_figures() -> list:
    """Return every historical-figure document with ``_id`` as a string.

    Returns an empty list when the query fails (the failure is printed).
    """
    try:
        figures = []
        for doc in historical_figures.find({}):
            # Put the stringified id first, then copy the remaining fields.
            row = {"_id": str(doc["_id"])}
            row.update((k, v) for k, v in doc.items() if k != "_id")
            figures.append(row)
        return figures
    except Exception as e:
        print(f"MongoDB 历史人物列表查询失败: {e}")
        return []
def get_figure_by_id(figure_id: str) -> dict | None:
    """Fetch one historical figure by id, with ``_id`` converted to a string.

    Returns whatever the lookup yields when nothing matches, and ``None``
    when the id cannot be converted or the query fails (the failure is
    printed).
    """
    try:
        figure = historical_figures.find_one({"_id": ObjectId(figure_id)})
        if not figure:
            return figure
        figure["_id"] = str(figure["_id"])
        return figure
    except Exception as e:
        print(f"MongoDB 历史人物查询失败: {e}")
        return None
def insert_figure(data: dict) -> str | None:
    """Insert a historical figure and return its new id as a string.

    A shallow copy of ``data`` is inserted: the driver adds an ``_id``
    ObjectId to the dict it is handed, and that must not leak into the
    caller's dict. The new id is available from the return value.

    Args:
        data: Fields of the figure to store.

    Returns:
        The inserted document's id as a string, or ``None`` when the
        insert fails (the failure is printed).
    """
    try:
        result = historical_figures.insert_one(dict(data))
        return str(result.inserted_id)
    except Exception as e:
        print(f"MongoDB 历史人物新增失败: {e}")
        return None
def update_figure(figure_id: str, data: dict) -> int:
    """Update fields of one historical figure and return the matched count.

    ``_id`` is removed from the payload before the update: it is immutable,
    and the read helpers in this file hand it out as a string, so a
    document sent back unchanged would otherwise make ``$set`` fail and be
    reported as "0 matched". When nothing is left to set, no write is
    issued (an empty ``$set`` is rejected by servers older than 5.0) and
    the function only reports whether the document exists.

    Args:
        figure_id: String form of the document's ObjectId.
        data: Fields to set; an ``_id`` key, if present, is ignored.

    Returns:
        1 when a document with that id exists, 0 when it does not or when
        the id is invalid or the operation fails (the failure is printed).
    """
    try:
        selector = {"_id": ObjectId(figure_id)}
        fields = {key: value for key, value in data.items() if key != "_id"}
        if not fields:
            return historical_figures.count_documents(selector, limit=1)
        result = historical_figures.update_one(selector, {"$set": fields})
        return result.matched_count
    except Exception as e:
        print(f"MongoDB 历史人物修改失败: {e}")
        return 0
def delete_figure(figure_id: str) -> int:
    """Delete one historical figure by id and return the deleted count.

    Returns 0 when the id is invalid or the delete fails (the failure is
    printed).
    """
    try:
        target = {"_id": ObjectId(figure_id)}
        return historical_figures.delete_one(target).deleted_count
    except Exception as e:
        print(f"MongoDB 历史人物删除失败: {e}")
        return 0
|