Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/auth.py: 89%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 09:52 +0000

1import os 

2from datetime import datetime, timedelta, timezone 

3from typing import Annotated 

4 

5import bcrypt 

6import jwt 

7from jwt.exceptions import InvalidTokenError 

8from fastapi import Depends, HTTPException, status 

9from fastapi.security import OAuth2PasswordBearer 

10from twinpad_backend.models import GenericMongo, User 

11 

# Key used to sign and verify JWTs in this module.
# SECURITY NOTE: the fallback value is a key committed to source control. Any
# deployment that does not set SECRET_KEY signs tokens with a publicly known
# secret, so always provide SECRET_KEY through the environment.
SECRET_KEY = os.environ.get("SECRET_KEY", "687e0b37c59ae3e241c09ab729b6221462ac5998760241ac52cb4ed0e026059b")

# JWT algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = os.environ.get("ALGORITHM", "HS256")

# Token lifetime in minutes (default: 24 hours). Environment values are always
# strings, so convert explicitly; otherwise the constant would be a str when
# configured and an int when defaulted, and timedelta(minutes=...) would fail
# on the str. A non-numeric value raises ValueError at import time.
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 24 * 60))

15 

16 

class Token(GenericMongo):
    # Shape of an issued access token. All behaviour is inherited from
    # GenericMongo (imported from twinpad_backend.models); this class only
    # declares the two fields below.

    # The encoded token string.
    access_token: str

    # Label describing the kind of token (presumably "bearer" -- confirm
    # against the endpoint that builds this object).
    token_type: str

20 

21 

# Dependency that pulls the bearer token out of the request for
# get_current_user; "token" is the URL of the token-issuing endpoint.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
)

23 

24 

def verify_password(form_password: str, password: str | bytes) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        form_password: The plaintext password submitted by the user.
        password: The stored bcrypt hash, as text or as raw bytes.

    Returns:
        True when the plaintext matches the hash, False otherwise.
    """
    candidate = form_password.encode("utf-8")
    # get_password_hash returns the hash as bytes, while a persisted value may
    # come back as str. Accept both instead of failing on bytes with
    # AttributeError ('bytes' object has no attribute 'encode').
    stored_hash = password.encode("utf-8") if isinstance(password, str) else password
    return bcrypt.checkpw(password=candidate, hashed_password=stored_hash)

29 

30 

def get_password_hash(form_password):
    """Hash a plaintext password with bcrypt.

    The password is UTF-8 encoded and hashed with a freshly generated salt
    using a work factor of 12 rounds. The value returned is whatever
    bcrypt.hashpw produces.
    """
    return bcrypt.hashpw(
        password=form_password.encode("utf-8"),
        salt=bcrypt.gensalt(rounds=12),
    )

35 

36 

def authenticate_user(form_mail, form_password):
    """Look up a user by e-mail and check the submitted password.

    Returns the user object when a user with that e-mail exists and the
    password verifies; returns False in every other case.
    """
    account = User.get_one_by_attribute("email", form_mail)
    # Short-circuit keeps verify_password from running when no user was found.
    if account and verify_password(form_password, account.password):
        return account
    return False

44 

45 

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token, counted from the current UTC
            time. When omitted (None), the token expires after 15 minutes.

    Returns:
        The encoded token as returned by ``jwt.encode``, signed with
        SECRET_KEY using ALGORITHM.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a truthiness
    # test would silently swap an explicit zero lifetime for the 15-minute
    # default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    claims = data.copy()
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

55 

56 

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer token of the current request to a user.

    The token is decoded with SECRET_KEY / ALGORITHM, its ``sub`` claim is
    read as the user's e-mail, and the matching user is loaded.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, carries no ``sub`` claim, or no
            user is found for that e-mail.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as exc:
        raise unauthorized from exc

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    account = User.get_one_by_attribute("email", subject)
    if account is None:
        raise unauthorized
    return account

74 

75 

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the authenticated user, rejecting accounts that are not active.

    Args:
        current_user: The user resolved by ``get_current_user``.

    Returns:
        The same user object when ``is_active`` is truthy.

    Raises:
        HTTPException: 400 "Account blocked" when ``is_active`` is falsy.
    """
    # Only inactive accounts are blocked. Testing ``is_active`` without the
    # negation would reject every active user and admit the blocked ones.
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Account blocked")
    return current_user