Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/auth.py: 85%

53 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 14:27 +0000

1import os 

2from datetime import datetime, timedelta, timezone 

3from typing import Annotated 

4 

5import jwt 

6from jwt.exceptions import InvalidTokenError 

7from fastapi import Depends, HTTPException, status 

8from fastapi.security import OAuth2PasswordBearer 

9from passlib.context import CryptContext 

10from twinpad_backend.models import GenericMongo, User 

11 

12 

# Key used to sign and verify JWTs, read from the SECRET_KEY environment
# variable.
# SECURITY(review): the fallback below is a hard-coded key that lives in the
# source tree, so tokens can be forged by anyone with access to the code when
# SECRET_KEY is unset. It is kept only for backward compatibility; always set
# SECRET_KEY in real deployments.
SECRET_KEY = os.environ.get(
    "SECRET_KEY",
    "687e0b37c59ae3e241c09ab729b6221462ac5998760241ac52cb4ed0e026059b",
)

# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = os.environ.get("ALGORITHM", "HS256")

# Access-token lifetime in minutes (default: one day). Environment values are
# always strings, so convert explicitly; otherwise the constant would be an
# int by default but a str when configured, which breaks
# timedelta(minutes=...). A non-numeric value raises ValueError at import time.
ACCESS_TOKEN_EXPIRE_MINUTES = int(
    os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 24 * 60)
)

16 

17 

class Token(GenericMongo):
    """Pair of an access token string and the label describing its type."""

    # The token value itself — presumably the JWT produced by
    # create_access_token; confirm at the route that builds this model.
    access_token: str
    # Token type label — presumably "bearer", given the OAuth2 bearer scheme
    # configured in this module; verify against the caller.
    token_type: str

21 

22 

# Shared passlib context for hashing and verifying passwords; bcrypt is the
# only configured scheme.
pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])

# FastAPI security dependency used by get_current_user to obtain the bearer
# token; "token" is the URL declared as the token endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

25 

26 

def verify_password(form_password, password):
    """Check a submitted password against a stored hash.

    Args:
        form_password: Password as submitted by the client.
        password: Stored value to verify against (passed to
            ``pwd_context.verify`` as the hash).

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(form_password, password)
    return is_match

29 

30 

def get_password_hash(form_password):
    """Hash a submitted password with the module's password context.

    Args:
        form_password: Password as submitted by the client.

    Returns:
        The hash produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(form_password)
    return hashed

33 

34 

def authenticate_user(form_mail, form_password):
    """Look up a user by e-mail and check the submitted password.

    Args:
        form_mail: E-mail address used to find the user.
        form_password: Password to verify against ``user.password``.

    Returns:
        The user object when the lookup succeeds and the password verifies;
        ``False`` when no user is found or the password does not match.
    """
    candidate = User.get_one_by_attribute("email", form_mail)
    # Short-circuit: the password is only checked when a user was found.
    if candidate and verify_password(form_password, candidate.password):
        return candidate
    return False

42 

43 

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT from ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token; the dict is copied, not mutated.
        expires_delta: Lifetime of the token. When omitted (or falsy, e.g. a
            zero ``timedelta``) a 15-minute lifetime is used.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Falsy deltas deliberately fall back to the 15-minute default.
    lifetime = expires_delta or timedelta(minutes=15)
    expires_at = datetime.now(timezone.utc) + lifetime

    claims = data.copy()
    claims["exp"] = expires_at
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

53 

54 

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the user identified by a bearer token.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user whose e-mail matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no ``sub`` claim, or no user with
            that e-mail exists.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as decode_error:
        raise unauthorized from decode_error

    # The "sub" claim carries the e-mail used for the user lookup.
    email: str = claims.get("sub")
    if email is None:
        raise unauthorized

    account = User.get_one_by_attribute("email", email)
    if account is None:
        raise unauthorized
    return account

72 

73 

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the authenticated user, rejecting accounts that are not active.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` when its ``is_active`` flag is truthy.

    Raises:
        HTTPException: 400 with detail "Account blocked" when the user's
            ``is_active`` flag is falsy.
    """
    # The previous check was inverted: it rejected active accounts and let
    # inactive ones through. Block only when the account is NOT active.
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Account blocked")
    return current_user