mongo.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. from pymongo import MongoClient
  2. from datetime import datetime
  3. from dotenv import load_dotenv
  4. import os
  5. load_dotenv()
  6. MONGO_URI = os.getenv("ARK_LOGS_MONGO_URI")
  7. client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
  8. # 数据库
  9. db = client["arklogs"]
  10. # 豆包大模型的对话日志
  11. chat_logs = db["chat_logs"]
  12. # 聊天历史记录
  13. chat_history_col = db["chat_history"]
  14. # 兴趣圈集合
  15. circle_prompts = db["circle_prompt"]
  16. def _ensure_index():
  17. try:
  18. chat_logs.create_index([("user_id", 1), ("asked_at", -1)])
  19. chat_history_col.create_index([("user_id", 1), ("session_id", 1), ("timestamp", -1)])
  20. except Exception:
  21. pass
  22. def save_chat_log(
  23. user_id: str,
  24. question: str,
  25. stream_mode: bool,
  26. raw_response: str = None,
  27. status: str = "success",
  28. error: str = None,
  29. ):
  30. """
  31. 保存聊天原始响应日志到 MongoDB
  32. Args:
  33. user_id: 提问人
  34. question: 提问的问题
  35. stream_mode: 回答方式(流式或非流式)
  36. raw_response: API 原始响应的 repr 字符串
  37. status: 响应状态 success | error
  38. error: 异常时的错误信息
  39. """
  40. try:
  41. _ensure_index()
  42. chat_logs.insert_one({
  43. "user_id": user_id,
  44. "question": question,
  45. "stream_mode": stream_mode,
  46. "raw_response": raw_response,
  47. "status": status,
  48. "error": error,
  49. "asked_at": datetime.now(),
  50. })
  51. except Exception as e:
  52. print(f"MongoDB 日志写入失败: {e}")
  53. def save_chat_history(
  54. user_id: str,
  55. session_id: str,
  56. role: str,
  57. content: str,
  58. timestamp: datetime,
  59. response_id: str = None,
  60. thinking: str = None,
  61. searching: str = None,
  62. ):
  63. try:
  64. _ensure_index()
  65. chat_history_col.insert_one({
  66. "user_id": user_id,
  67. "session_id": session_id,
  68. "role": role,
  69. "content": content,
  70. "thinking": thinking,
  71. "searching": searching,
  72. "response_id": response_id,
  73. "timestamp": timestamp,
  74. })
  75. except Exception as e:
  76. print(f"MongoDB 聊天历史写入失败: {e}")
  77. def get_chat_history(user_id: str, session_id: str) -> list:
  78. try:
  79. docs = chat_history_col.find(
  80. {"user_id": user_id, "session_id": session_id},
  81. {"_id": 0}
  82. ).sort("timestamp", 1)
  83. return list(docs)
  84. except Exception as e:
  85. print(f"MongoDB 聊天历史读取失败: {e}")
  86. return []
  87. def get_last_response_id(user_id: str, session_id: str) -> str | None:
  88. try:
  89. doc = chat_history_col.find_one(
  90. {"user_id": user_id, "session_id": session_id, "role": "assistant", "response_id": {"$ne": None}},
  91. {"response_id": 1, "_id": 0},
  92. sort=[("timestamp", -1)]
  93. )
  94. return doc["response_id"] if doc else None
  95. except Exception as e:
  96. print(f"MongoDB 查询 response_id 失败: {e}")
  97. return None
  98. def delete_chat_history(user_id: str, session_id: str) -> int:
  99. try:
  100. result = chat_history_col.delete_many({"user_id": user_id, "session_id": session_id})
  101. return result.deleted_count
  102. except Exception as e:
  103. print(f"MongoDB 聊天历史删除失败: {e}")
  104. return 0
  105. def get_sessions(user_id: str) -> list:
  106. try:
  107. pipeline = [
  108. {"$match": {"user_id": user_id, "role": "user"}},
  109. {"$sort": {"timestamp": 1}},
  110. {"$group": {
  111. "_id": "$session_id",
  112. "createdAt": {"$first": "$timestamp"},
  113. "preview": {"$first": "$content"},
  114. }},
  115. {"$sort": {"createdAt": -1}},
  116. {"$project": {
  117. "_id": 0,
  118. "sessionId": "$_id",
  119. "createdAt": 1,
  120. "preview": {"$substrCP": ["$preview", 0, 20]},
  121. }},
  122. ]
  123. return list(chat_history_col.aggregate(pipeline))
  124. except Exception as e:
  125. print(f"MongoDB 会话列表查询失败: {e}")
  126. return []
  127. _DEFAULT_PROMPT_CONFIG = {
  128. "name": "兴趣圈",
  129. "role": "活跃用户",
  130. "style": "自然亲切,有活人感",
  131. "keywords": [],
  132. "forbidden": [],
  133. }
  134. def get_circle_prompt(app_name: str) -> dict:
  135. try:
  136. doc = circle_prompts.find_one({"appName": app_name})
  137. return doc if doc else _DEFAULT_PROMPT_CONFIG
  138. except Exception:
  139. return _DEFAULT_PROMPT_CONFIG
  140. def upsert_circle_prompt(data: dict) -> None:
  141. circle_prompts.update_one(
  142. {"appName": data["appName"]},
  143. {"$set": data},
  144. upsert=True,
  145. )