Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / auth.py: 89%
54 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 08:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 08:17 +0000
1import os
2from datetime import datetime, timedelta, timezone
3from typing import Annotated
5import bcrypt
6import jwt
7from jwt.exceptions import InvalidTokenError
8from fastapi import Depends, HTTPException, status
9from fastapi.security import OAuth2PasswordBearer
10from twinpad_backend.models import GenericMongo, User
12SECRET_KEY = os.environ.get("SECRET_KEY", "687e0b37c59ae3e241c09ab729b6221462ac5998760241ac52cb4ed0e026059b")
13ALGORITHM = os.environ.get("ALGORITHM", "HS256")
14ACCESS_TOKEN_EXPIRE_MINUTES = os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 24 * 60)
17class Token(GenericMongo):
18 access_token: str
19 token_type: str
22oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
25def verify_password(form_password: str, password: str):
26 password_byte_enc = form_password.encode("utf-8")
27 hashed_password = password.encode("utf-8")
28 return bcrypt.checkpw(password=password_byte_enc, hashed_password=hashed_password)
31def get_password_hash(form_password):
32 pwd_bytes = form_password.encode("utf-8")
33 salt = bcrypt.gensalt(rounds=12)
34 return bcrypt.hashpw(password=pwd_bytes, salt=salt)
37def authenticate_user(form_mail, form_password):
38 user = User.get_one_by_attribute("email", form_mail)
39 if not user:
40 return False
41 if not verify_password(form_password, user.password):
42 return False
43 return user
46def create_access_token(data: dict, expires_delta: timedelta | None = None):
47 to_encode = data.copy()
48 if expires_delta:
49 expire = datetime.now(timezone.utc) + expires_delta
50 else:
51 expire = datetime.now(timezone.utc) + timedelta(minutes=15)
52 to_encode.update({"exp": expire})
53 encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
54 return encode_jwt
57async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
58 credentials_exception = HTTPException(
59 status_code=status.HTTP_401_UNAUTHORIZED,
60 detail="Could not validate credentials",
61 headers={"WWW-Authenticate": "Bearer"},
62 )
63 try:
64 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
65 email: str = payload.get("sub")
66 if email is None:
67 raise credentials_exception
68 except InvalidTokenError as e:
69 raise credentials_exception from e
70 user = User.get_one_by_attribute("email", email)
71 if user is None:
72 raise credentials_exception
73 return user
76async def get_current_active_user(
77 current_user: Annotated[User, Depends(get_current_user)],
78):
79 if current_user.is_active:
80 raise HTTPException(status_code=400, detail="Account blocked")
81 return current_user