Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/auth.py: 89%

57 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 11:44 +0000

1import os 

2from datetime import datetime, timedelta, timezone 

3from typing import Annotated 

4from secrets import token_hex 

5 

6import bcrypt 

7import jwt 

8from jwt.exceptions import InvalidTokenError 

9from fastapi import Depends, HTTPException, status 

10from fastapi.security import OAuth2PasswordBearer 

11from twinpad_backend.models import GenericMongo, User 

12 

13 

# Key used to sign and verify JWTs. When SECRET_KEY is not set in the
# environment a random key is generated at import time, so tokens signed by
# one process cannot be verified by another (or after a restart).
SECRET_KEY = os.environ.get("SECRET_KEY", token_hex(32))

# JWT signing algorithm name passed to jwt.encode / jwt.decode.
ALGORITHM = os.environ.get("ALGORITHM", "HS256")

# Access-token lifetime in minutes (default: 24 hours). Environment values
# are always strings, so coerce to int to keep the constant usable with
# timedelta(minutes=...) regardless of where it came from. A non-numeric
# value fails fast with ValueError at import time.
ACCESS_TOKEN_EXPIRE_MINUTES = int(
    os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 24 * 60)
)

17 

18 

# Model carrying an issued access token together with its type label.
# Documented with comments (not a docstring) so nothing derived from the
# class docstring by the GenericMongo base changes.
class Token(GenericMongo):
    # The encoded token string.
    access_token: str

    # The token type label that accompanies the token.
    token_type: str

22 

23 

# OAuth2 password-bearer scheme used as a dependency by get_current_user;
# "token" is the URL passed as the scheme's tokenUrl.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
)

25 

26 

def verify_password(form_password: str, password: str | bytes):
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        form_password: The plaintext password submitted by the user.
        password: The stored bcrypt hash, as text or as raw bytes.

    Returns:
        True if ``form_password`` matches the hash, False otherwise.
    """
    candidate = form_password.encode("utf-8")
    # get_password_hash returns the bytes produced by bcrypt.hashpw, so the
    # stored hash may reach us as bytes; only encode when it is text.
    if isinstance(password, bytes):
        stored_hash = password
    else:
        stored_hash = password.encode("utf-8")
    return bcrypt.checkpw(password=candidate, hashed_password=stored_hash)

31 

32 

def get_password_hash(form_password):
    """Return the bcrypt hash of ``form_password`` using a fresh 12-round salt.

    The password is UTF-8 encoded before hashing; the value returned is
    whatever ``bcrypt.hashpw`` produces.
    """
    return bcrypt.hashpw(
        password=form_password.encode("utf-8"),
        salt=bcrypt.gensalt(rounds=12),
    )

37 

38 

def authenticate_user(form_mail, form_password):
    """Look up a user by e-mail and validate the submitted password.

    Returns the user object when a user with that e-mail exists and the
    password verifies against ``user.password``; returns False otherwise.
    """
    account = User.get_one_by_attribute("email", form_mail)
    # Short-circuit: the password is only checked when a user was found.
    if account and verify_password(form_password, account.password):
        return account
    return False

46 

47 

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT from ``data`` with an ``exp`` claim added.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token measured from now (UTC).
            When omitted (None), the token expires after 15 minutes.

    Returns:
        The token produced by ``jwt.encode`` using SECRET_KEY and ALGORITHM.
    """
    # Compare against None rather than relying on truthiness: timedelta(0)
    # is falsy, and an explicit zero lifetime must not be replaced by the
    # 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    claims = data.copy()
    claims.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

57 

58 

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the user identified by the ``sub`` claim of a bearer token.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``sub`` claim, or no user
            with that e-mail is found.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise InvalidTokenError, so keep the try
    # body limited to it.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as exc:
        raise unauthorized from exc

    email = claims.get("sub")
    if email is None:
        raise unauthorized

    account = User.get_one_by_attribute("email", email)
    if account is None:
        raise unauthorized
    return account

76 

77 

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the authenticated user, rejecting accounts that are not active.

    Raises:
        HTTPException: 400 "Account blocked" when ``current_user.is_active``
            is falsy.
    """
    # The guard must fire for INACTIVE accounts; the previous condition was
    # inverted and rejected every active user instead.
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Account blocked")
    return current_user