import redis import json import requests from typing import Optional REDIS_HOST = "r-2zeitjlg0gdypzb4v6.redis.rds.aliyuncs.com" REDIS_PORT = 6379 REDIS_TTL = 1800 # 30分钟 THIRD_PARTY_API = "http://eco.zhongsou.com/eco/user/user.redis.info.groovy" try: redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) except Exception: redis_client = None def get_app_user(token: str) -> Optional[dict]: """ 通过第三方 token 获取用户信息 先查 Redis 缓存,未命中则调第三方接口并写入缓存 Args: token: App 端传入的第三方 token Returns: 用户信息字典,token 无效或接口异常时返回 None """ cache_key = f"app_user:{token}" # 1. 查 Redis 缓存 try: if redis_client: cached = redis_client.get(cache_key) if cached: return json.loads(cached) except Exception as e: print(f"Redis 读取失败,降级调第三方接口: {e}") # 2. 调第三方接口 try: resp = requests.get(THIRD_PARTY_API, params={"token": token}, timeout=5) data = resp.json() if data.get("head", {}).get("status") != 200: return None body = data.get("body") if not body or not body.get("userId"): return None user_info = { "userId": body["userId"], "userName": body["userName"], "nickName": body["nickName"], "mobile": body["mobile"], "imageUrl": body.get("imageUrl", ""), } # 3. 写入 Redis 缓存 try: if redis_client: redis_client.setex(cache_key, REDIS_TTL, json.dumps(user_info)) except Exception as e: print(f"Redis 写入失败: {e}") return user_info except Exception as e: print(f"第三方接口调用失败: {e}") return None