| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- import redis
- import json
- import requests
- from typing import Optional
# Connection settings for the Redis instance used as the user-info cache.
REDIS_HOST = "r-2zeitjlg0gdypzb4v6.redis.rds.aliyuncs.com"
REDIS_PORT = 6379
REDIS_TTL = 1800  # lifetime of a cached entry in seconds (30 minutes)
# Upper bound, in seconds, for connecting to Redis and for each command.
# Without it a hung or unreachable Redis would stall every lookup instead of
# quickly degrading to the third-party API.
REDIS_SOCKET_TIMEOUT = 2

# NOTE(review): this endpoint uses plain HTTP, so tokens sent to it travel
# unencrypted -- confirm whether an HTTPS endpoint is available.
THIRD_PARTY_API = "http://eco.zhongsou.com/eco/user/user.redis.info.groovy"

# Shared Redis client; left as None when it cannot be constructed so callers
# can skip caching. redis-py opens connections lazily (on the first command),
# so connectivity problems surface at call time rather than here.
try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        decode_responses=True,
        socket_connect_timeout=REDIS_SOCKET_TIMEOUT,
        socket_timeout=REDIS_SOCKET_TIMEOUT,
    )
except Exception:
    redis_client = None
def get_app_user(token: str) -> Optional[dict]:
    """Resolve a third-party app token to user information.

    The Redis cache is consulted first. On a miss, or when Redis is
    unavailable or fails, the third-party API is queried and a successful
    result is cached for ``REDIS_TTL`` seconds. Redis problems never fail the
    lookup; they are printed and the function carries on without the cache.

    Args:
        token: Third-party token passed in by the app client.

    Returns:
        The user-info dict (freshly fetched results contain ``userId``,
        ``userName``, ``nickName``, ``mobile`` and ``imageUrl``), or ``None``
        when the token is empty, the API rejects it, the response lacks a
        required field, or the API call fails.
    """
    # An empty token cannot identify a user: skip the network round trip and
    # avoid reading or writing the shared "app_user:" key.
    if not token:
        return None

    # NOTE(review): the raw token is embedded in the Redis key, so it is
    # visible to anyone who can list keys -- consider hashing it if no other
    # component depends on this key format.
    cache_key = f"app_user:{token}"

    # 1. Try the Redis cache; any failure degrades to the third-party API.
    if redis_client is not None:
        try:
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
        except Exception as e:
            print(f"Redis 读取失败,降级调第三方接口: {e}")

    # 2. Query the third-party API. This is the function's error boundary:
    #    transport errors, non-JSON bodies and unexpected payload shapes
    #    (including a missing required field) all map to None.
    try:
        resp = requests.get(THIRD_PARTY_API, params={"token": token}, timeout=5)
        data = resp.json()
        if data.get("head", {}).get("status") != 200:
            return None
        body = data.get("body")
        if not body or not body.get("userId"):
            return None
        user_info = {
            "userId": body["userId"],
            "userName": body["userName"],
            "nickName": body["nickName"],
            "mobile": body["mobile"],
            "imageUrl": body.get("imageUrl", ""),
        }
    except Exception as e:
        print(f"第三方接口调用失败: {e}")
        return None

    # 3. Best-effort cache write; a failure here must not discard the result.
    if redis_client is not None:
        try:
            redis_client.setex(cache_key, REDIS_TTL, json.dumps(user_info))
        except Exception as e:
            print(f"Redis 写入失败: {e}")

    return user_info
|