Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/auth.py: 89%
57 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
1import os
2from datetime import datetime, timedelta, timezone
3from typing import Annotated
4from secrets import token_hex
6import bcrypt
7import jwt
8from jwt.exceptions import InvalidTokenError
9from fastapi import Depends, HTTPException, status
10from fastapi.security import OAuth2PasswordBearer
11from twinpad_backend.models import GenericMongo, User
# Signing key for issued tokens. When SECRET_KEY is not set in the environment
# a random key is generated at import time, so tokens signed by one process
# cannot be verified by another process or after a restart.
SECRET_KEY = os.environ.get("SECRET_KEY", token_hex(32))
# JWT signing algorithm name passed to jwt.encode / jwt.decode.
ALGORITHM = os.environ.get("ALGORITHM", "HS256")
# Token lifetime in minutes (default: one day). Environment values are always
# strings, so coerce to int to keep the type identical whether or not the
# variable is set; a non-numeric value raises ValueError at import time.
ACCESS_TOKEN_EXPIRE_MINUTES = int(
    os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 24 * 60)
)
# Model carrying an issued token together with its type.
# NOTE(review): presumably returned by the login endpoint with
# token_type == "bearer" -- confirm against the caller.
class Token(GenericMongo):
    # Encoded token string handed to the client.
    access_token: str
    # Kind of token; the value is chosen by whoever builds the instance.
    token_type: str
# Dependency that supplies the request's token string to get_current_user;
# tokenUrl names the endpoint clients call to obtain that token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(form_password: str, password: str) -> bool:
    """Check a submitted plain-text password against a stored bcrypt hash.

    Args:
        form_password: Plain-text password supplied by the client.
        password: Stored bcrypt hash. A ``str`` is UTF-8 encoded before the
            check; ``bytes`` (the type ``get_password_hash`` returns) are
            used as-is.

    Returns:
        True when the password matches the hash, False otherwise, including
        when bcrypt rejects the input with ``ValueError``.
    """
    candidate = form_password.encode("utf-8")
    # get_password_hash returns bytes, so tolerate both representations
    # instead of failing with AttributeError on bytes.encode.
    stored_hash = password.encode("utf-8") if isinstance(password, str) else password
    try:
        return bcrypt.checkpw(password=candidate, hashed_password=stored_hash)
    except ValueError:
        # bcrypt raises ValueError for a malformed hash (invalid salt); an
        # unverifiable hash is treated as a failed login, not a server error.
        return False
def get_password_hash(form_password):
    """Hash a plain-text password with bcrypt.

    The password is UTF-8 encoded and hashed with a freshly generated salt
    using 12 rounds. The value returned is whatever ``bcrypt.hashpw``
    produces.
    """
    return bcrypt.hashpw(
        password=form_password.encode("utf-8"),
        salt=bcrypt.gensalt(rounds=12),
    )
def authenticate_user(form_mail, form_password):
    """Look up a user by email and check the submitted password.

    Returns the user object when a user with that email exists and the
    password verifies against ``user.password``; returns ``False`` in every
    other case.
    """
    candidate = User.get_one_by_attribute("email", form_mail)
    # Short-circuit: the password is only checked when a user was found.
    if candidate and verify_password(form_password, candidate.password):
        return candidate
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT from ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token measured from now (UTC). When
            omitted (``None``) the token expires after 15 minutes.

    Returns:
        The encoded token produced by ``jwt.encode`` using ``SECRET_KEY``
        and ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently replace an explicit zero lifetime with
    # the 15-minute default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    claims = data.copy()
    claims.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the user identified by the token's ``sub`` claim.

    The token is decoded with ``SECRET_KEY`` and ``ALGORITHM``; its ``sub``
    claim is used as the email for the user lookup.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``sub`` claim, or no user
            with that email is found.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise InvalidTokenError, so guard just that.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as e:
        raise unauthorized from e

    email = claims.get("sub")
    if email is None:
        raise unauthorized

    found = User.get_one_by_attribute("email", email)
    if found is None:
        raise unauthorized
    return found
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the authenticated user, rejecting accounts that are not active.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user object when ``current_user.is_active`` is truthy.

    Raises:
        HTTPException: 400 "Account blocked" when the user is not active.
    """
    # The check must reject *inactive* accounts; testing `is_active` without
    # the negation blocked every active user and let inactive ones through.
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Account blocked")
    return current_user