auth.py 1.2 KB

123456789101112131415161718192021222324252627282930313233
  1. from fastapi import HTTPException, Query, Depends
  2. from fastapi.security import OAuth2PasswordBearer
  3. from typing import Optional, Annotated
  4. import jwt
  5. from ..db.redis_client import get_app_user
  6. from ..config.config import Config
  7. config = Config()
  8. oauth2_scheme_optional = OAuth2PasswordBearer(tokenUrl="/users/token", auto_error=False)
  9. async def resolve_username(
  10. jwt_token: Annotated[Optional[str], Depends(oauth2_scheme_optional)] = None,
  11. source: Optional[str] = Query(default=None),
  12. token: Optional[str] = Query(default=None),
  13. ) -> str:
  14. if source == "app" and token:
  15. app_user = get_app_user(token)
  16. if not app_user:
  17. raise HTTPException(status_code=401, detail="无效的 App token")
  18. return f"app_{app_user['userId']}"
  19. if not jwt_token:
  20. raise HTTPException(status_code=401, detail="未提供认证令牌")
  21. try:
  22. payload = jwt.decode(jwt_token, config.SECRET_KEY, algorithms=[config.ALGORITHM])
  23. sub = payload.get("sub")
  24. if not sub:
  25. raise HTTPException(status_code=401, detail="无效的令牌")
  26. return sub
  27. except jwt.PyJWTError:
  28. raise HTTPException(status_code=401, detail="无效的令牌")