Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/auth.py: 89%
56 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-16 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-16 15:32 +0000
1import os
2from datetime import datetime, timedelta, timezone
3from typing import Annotated
5import bcrypt
6import jwt
7from jwt.exceptions import InvalidTokenError
8from fastapi import Depends, HTTPException, status
9from fastapi.security import OAuth2PasswordBearer
10from twinpad_backend.models import GenericMongo, User
# Key used to sign and verify JWTs.
# NOTE(review): the fallback value is a secret committed to source control;
# when the SECRET_KEY environment variable is unset, anyone who can read this
# file can forge tokens. Kept for backward compatibility -- deployments should
# always set SECRET_KEY.
SECRET_KEY = os.environ.get("SECRET_KEY", "687e0b37c59ae3e241c09ab729b6221462ac5998760241ac52cb4ed0e026059b")
# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = os.environ.get("ALGORITHM", "HS256")
# Access-token lifetime in minutes (default: one day). Environment values are
# always strings, so coerce to int to get the same type whether or not the
# variable is set; a non-numeric value fails fast at import time.
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 24 * 60))
class Token(GenericMongo):
    """Access token paired with its type label.

    Built on the project's ``GenericMongo`` base from
    ``twinpad_backend.models``.
    """

    # The token string itself.
    access_token: str
    # Label describing the kind of token (presumably "bearer" -- confirm
    # against the endpoint that builds this object).
    token_type: str
# OAuth2 password-flow bearer scheme. It is injected via Depends() in
# get_current_user to obtain the raw token string; "token" is the URL of the
# endpoint clients use to obtain a token.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
)
def verify_password(form_password: str, password: str | bytes) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        form_password: Plaintext password as submitted by the user.
        password: Stored bcrypt hash, either as text or as the raw ``bytes``
            that ``get_password_hash`` returns.

    Returns:
        The result of ``bcrypt.checkpw``: True when the password matches the
        hash, False otherwise.
    """
    candidate = form_password.encode("utf-8")
    # bcrypt operates on bytes; only encode when the hash is still text so a
    # hash passed straight from get_password_hash (bytes) is accepted too.
    stored_hash = password.encode("utf-8") if isinstance(password, str) else password
    return bcrypt.checkpw(password=candidate, hashed_password=stored_hash)
def get_password_hash(form_password):
    """Hash a plaintext password with bcrypt.

    The password is UTF-8 encoded and hashed with a freshly generated salt
    (``rounds=12``). Returns the value produced by ``bcrypt.hashpw``.
    """
    return bcrypt.hashpw(
        password=form_password.encode("utf-8"),
        salt=bcrypt.gensalt(rounds=12),
    )
def authenticate_user(form_mail, form_password):
    """Look up a user by email and validate the supplied password.

    Returns the user object when a user with that email exists and the
    password verifies against ``user.password``; otherwise returns ``False``.
    """
    account = User.get_one_by_attribute("email", form_mail)
    # Short-circuits: the password is only checked when a user was found.
    if account and verify_password(form_password, account.password):
        return account
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode ``data`` as a signed JWT with an expiry claim.

    Args:
        data: Claims to include in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token measured from now (UTC). When
            ``None``, the token expires after 15 minutes.

    Returns:
        The encoded token as returned by ``jwt.encode``, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently replace an explicit zero lifetime with
    # the 15-minute default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    claims = data.copy()
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer token to the user it was issued for.

    Decodes the JWT with ``SECRET_KEY`` / ``ALGORITHM``, reads the email from
    the ``sub`` claim and looks the user up by that email.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no ``sub`` claim, or no user with
            that email is found.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise InvalidTokenError, so keep the try narrow.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as e:
        raise unauthorized from e

    email: str = claims.get("sub")
    if email is None:
        raise unauthorized

    user = User.get_one_by_attribute("email", email)
    if user is None:
        raise unauthorized
    return user
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the authenticated user, rejecting accounts that are not active.

    Args:
        current_user: User resolved from the request's bearer token by
            ``get_current_user``.

    Returns:
        The same user object when ``current_user.is_active`` is truthy.

    Raises:
        HTTPException: 400 "Account blocked" when the user is not active.
    """
    # The guard must fire for *inactive* accounts; testing `is_active` without
    # the negation blocked every active user and let blocked ones through.
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Account blocked")
    return current_user