Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / routes / deployers.py: 97%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-03 09:52 +0000

1from fastapi import APIRouter 

2 

3from twinpad_backend.api import HTTPException, Depends, get_current_active_user 

4from twinpad_backend.models import ( 

5 DeviceDeployer, 

6 DeviceDeployerUpdate, 

7 DeviceFromDeployer, 

8 DeviceFromDeployerCreation, 

9 DeviceId, 

10 DeviceUpdateFromDeployer, 

11 MongoId, 

12) 

13 

14router = APIRouter() 

15 

16 

17@router.get("/", dependencies=[Depends(get_current_active_user)]) 

18async def get_device_deployers() -> list[DeviceDeployer]: 

19 return DeviceDeployer.get_all() 

20 

21 

22@router.post("/", dependencies=[Depends(get_current_active_user)], status_code=201) 

23async def add_deployer(deployer: DeviceDeployer): 

24 deployer.insert() 

25 return deployer 

26 

27 

28@router.get("/{deployer_id}", dependencies=[Depends(get_current_active_user)]) 

29async def get_device_deployers(deployer_id: MongoId) -> DeviceDeployer: 

30 deployer = DeviceDeployer.get_from_id(deployer_id) 

31 if deployer is None: 

32 raise HTTPException( 

33 status_code=404, 

34 detail=f"Deployer not found", 

35 ) 

36 return deployer 

37 

38 

39@router.patch("/{deployer_id}", dependencies=[Depends(get_current_active_user)]) 

40async def update_device_deployers(deployer_id: MongoId, deployer_update: DeviceDeployerUpdate) -> DeviceDeployer: 

41 deployer = DeviceDeployer.get_from_id(deployer_id) 

42 if deployer is None: 

43 raise HTTPException( 

44 status_code=404, 

45 detail=f"Deployer not found", 

46 ) 

47 return deployer.update(deployer_update.model_dump(exclude_unset=True, mode="json")) 

48 

49 

50@router.delete("/{deployer_id}", dependencies=[Depends(get_current_active_user)]) 

51async def add_deployer(deployer_id: MongoId): 

52 deployer = DeviceDeployer.get_from_id(deployer_id) 

53 if deployer is None: 

54 raise HTTPException( 

55 status_code=404, 

56 detail=f"Deployer not found", 

57 ) 

58 deleted_info = deployer.delete() 

59 return deleted_info 

60 

61 

62@router.post("/{deployer_id}/devices", dependencies=[Depends(get_current_active_user)], status_code=201) 

63async def create_device_from_deployer(deployer_id: MongoId, device: DeviceFromDeployerCreation) -> DeviceFromDeployer: 

64 deployer = DeviceDeployer.get_from_id(deployer_id) 

65 device = deployer.create_device(device) 

66 if device is None: 

67 raise HTTPException(status_code=400, detail="Cannot create device") 

68 return device 

69 

70 

71@router.get("/{deployer_id}/devices", dependencies=[Depends(get_current_active_user)]) 

72async def get_devices_from_deployer(deployer_id: MongoId) -> list[DeviceFromDeployer]: 

73 deployer = DeviceDeployer.get_from_id(deployer_id) 

74 if deployer is None: 

75 raise HTTPException( 

76 status_code=404, 

77 detail=f"Deployer not found", 

78 ) 

79 devices = deployer.devices() 

80 if devices is None: 

81 raise HTTPException( 

82 status_code=500, 

83 detail="Error getting devices", 

84 ) 

85 return devices 

86 

87 

88@router.get("/{deployer_id}/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

89async def update_device_from_deployer(deployer_id: MongoId, device_id: DeviceId) -> DeviceFromDeployer: 

90 deployer = DeviceDeployer.get_from_id(deployer_id) 

91 if deployer is None: 

92 raise HTTPException( 

93 status_code=404, 

94 detail=f"Deployer not found", 

95 ) 

96 device = deployer.get_device(device_id=device_id) 

97 if device is None: 

98 raise HTTPException( 

99 status_code=404, 

100 detail=f"Device not found", 

101 ) 

102 return device 

103 

104 

105@router.patch("/{deployer_id}/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

106async def update_device_from_deployer( 

107 deployer_id: MongoId, 

108 device_id: DeviceId, 

109 device_update: DeviceUpdateFromDeployer, 

110) -> DeviceFromDeployer: 

111 deployer = DeviceDeployer.get_from_id(deployer_id) 

112 device = deployer.update_device(device_id, device_update) 

113 return device 

114 

115 

116@router.delete("/{deployer_id}/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

117async def delete_device_from_deployer(deployer_id: MongoId, device_id: DeviceId) -> bool: 

118 deployer = DeviceDeployer.get_from_id(deployer_id) 

119 if deployer is None: 

120 raise HTTPException( 

121 status_code=404, 

122 detail=f"Deployer not found", 

123 ) 

124 delete_info = deployer.delete_device(device_id=device_id) 

125 if not delete_info.is_deleted: 

126 raise HTTPException( 

127 status_code=500, 

128 detail=f"Cannot delete device: {delete_info.detail}", 

129 ) 

130 return delete_info.is_deleted