Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/auth.py: 85%
53 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-04 13:35 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-04 13:35 +0000
1import os
2from datetime import datetime, timedelta, timezone
3from typing import Annotated
5import jwt
6from jwt.exceptions import InvalidTokenError
7from fastapi import Depends, HTTPException, status
8from fastapi.security import OAuth2PasswordBearer
9from passlib.context import CryptContext
10from twinpad_backend.models import GenericMongo, User
# Key used to sign and verify JWTs.
# SECURITY NOTE(review): the fallback value is a secret committed to source.
# Any deployment that does not set SECRET_KEY in the environment signs tokens
# with a publicly known key -- confirm every environment overrides it.
SECRET_KEY = os.environ.get(
    "SECRET_KEY",
    "687e0b37c59ae3e241c09ab729b6221462ac5998760241ac52cb4ed0e026059b",
)

# JWT signing algorithm name passed to jwt.encode / jwt.decode.
ALGORITHM = os.environ.get("ALGORITHM", "HS256")

# Environment values are always strings, while the default is an int. Coerce so
# the constant is an int in both cases (e.g. usable in timedelta(minutes=...)).
# A non-numeric environment value raises ValueError at import time.
ACCESS_TOKEN_EXPIRE_MINUTES = int(
    os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", 24 * 60)
)
# Model describing an issued token: the token string plus its type label.
# Built on the project's GenericMongo base class (defined in twinpad_backend.models).
class Token(GenericMongo):
    # Encoded token string.
    access_token: str
    # Token type label -- presumably "bearer", given the OAuth2 bearer scheme
    # used in this module; verify against the caller that builds Token.
    token_type: str
# passlib context used for hashing and verifying passwords; bcrypt is the only
# configured scheme.
pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])

# FastAPI dependency that extracts the bearer token from incoming requests;
# "token" is the tokenUrl advertised to clients.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(form_password, password):
    """Check a submitted password against a stored password hash.

    Args:
        form_password: The plain-text password supplied by the client.
        password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(form_password, password)
    return is_match
def get_password_hash(form_password):
    """Hash a plain-text password with the module's passlib context.

    Args:
        form_password: The plain-text password to hash.

    Returns:
        The hash produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(form_password)
    return hashed
def authenticate_user(form_mail, form_password):
    """Look up a user by e-mail and validate the supplied password.

    Args:
        form_mail: E-mail address used to find the user.
        form_password: Plain-text password to check against ``user.password``.

    Returns:
        The user object when the lookup succeeds and the password verifies,
        otherwise ``False``.
    """
    account = User.get_one_by_attribute("email", form_mail)
    # Short-circuit: the password is only checked when a user was found.
    if account and verify_password(form_password, account.password):
        return account
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Token lifetime measured from now (UTC). When ``None``,
            a lifetime of 15 minutes is used.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None explicitly: a zero-length timedelta is falsy, and a
    # truthiness test would silently swap it for the 15-minute default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    claims = data.copy()
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the user identified by a bearer token.

    Decodes the JWT with ``SECRET_KEY`` / ``ALGORITHM``, reads the ``sub``
    claim as the user's e-mail, and loads the matching user.

    Args:
        token: Bearer token supplied through the ``oauth2_scheme`` dependency.

    Returns:
        The user returned by ``User.get_one_by_attribute("email", ...)``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or the user lookup returns ``None``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as exc:
        raise unauthorized from exc

    email: str = claims.get("sub")
    if email is None:
        raise unauthorized

    account = User.get_one_by_attribute("email", email)
    if account is None:
        raise unauthorized
    return account
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the authenticated user, rejecting accounts that are not active.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when ``current_user.is_active`` is truthy.

    Raises:
        HTTPException: 400 with detail "Account blocked" when the account is
            not active.
    """
    # Only inactive accounts are blocked; active users must pass through.
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Account blocked")
    return current_user