Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 81%

382 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 14:27 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 EventDay, 

32 User, 

33 UserUpdate, 

34 Campaign, 

35 Phase, 

36 CustomView, 

37 Command, 

38 CustomViewCreation, 

39 CustomViewUpdate, 

40 Video, 

41) 

42from twinpad_backend.auth import ( 

43 Token, 

44 authenticate_user, 

45 get_current_active_user, 

46 ACCESS_TOKEN_EXPIRE_MINUTES, 

47 create_access_token, 

48 get_password_hash, 

49) 

50from twinpad_backend.queries import SignalQuery, DeviceStatesQuery, EventQuery, EventRuleQuery, CommandQuery 

51from twinpad_backend.responses import ListResponse 

52 

# Requests that take longer than this many seconds are logged at WARNING
# level by the log_request_time middleware.
REQUEST_TIME_WARNING = 0.5

# Feature flags read from the environment; only the exact string "true"
# enables them.
DEBUG = os.environ.get("DEBUG", "false") == "true"
PROFILING = os.environ.get("PROFILING", "false") == "true"

# Reuse uvicorn's error logger; propagation to ancestor loggers is disabled.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# NOTE(review): CORS is fully open (any origin, method and header) —
# confirm this is intended outside of development.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

67 

if PROFILING:
    # Reports are written as HTML files into a fresh temporary directory and
    # served back through the /profilings routes below.
    profiling_folder = mkdtemp()
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """Profile a request with pyinstrument and save the HTML report.

        Requests whose URL contains "profiling" or ".ico" are passed through
        unprofiled, so browsing the reports does not itself create reports.
        The report file is named after the URL path (slashes replaced by
        underscores, query string dropped), so a later request to the same
        path overwrites the earlier report.
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            # Run the downstream handlers exactly once and keep the response:
            # awaiting call_next a second time would execute the request
            # (and its side effects) twice.
            response = await call_next(request)
        finally:
            profiler.stop()

        # Drop scheme and host (first three "/"-separated parts), flatten the
        # path, then cut the query string. Fall back to "slash" when nothing
        # is left, otherwise open() would target the folder itself.
        flattened = "_".join(url.split("/")[3:])
        filename = flattened.split("?", maxsplit=1)[0] or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())
        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the saved profiling reports."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{profiling_id}")
    async def profiling(profiling_id):
        """Return one saved profiling report as HTML, or respond 404."""
        filename = os.path.join(profiling_folder, profiling_id)
        # isfile (not exists) so that a directory name such as ".." yields a
        # 404 instead of failing inside open().
        if os.path.isfile(filename):
            with open(filename, "r", encoding="utf-8") as report_file:
                return HTMLResponse(report_file.read())
        raise HTTPException(
            status_code=404,
            detail="Profiling not found",
        )

113 

114 

@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log one line per request: client, method, path, status and duration.

    Requests slower than REQUEST_TIME_WARNING seconds are logged at WARNING
    level, all others at INFO. The response is returned unchanged.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    # Prefer the proxy-provided address. request.client may be None, so it
    # is only consulted as a fallback and guarded.
    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        client_ip = request.client.host if request.client else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response

127 

128 

@app.get("/")
async def slash():
    """Return the backend version (unauthenticated)."""
    version_info = {"twinpad_version": __version__}
    return version_info

132 

133 

@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """Return the result of ``ServicesStatus.check()`` under the "services" key."""
    services = ServicesStatus.check()
    return {"services": services}

142 

143 

@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices

147 

148 

@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id):
    """Return the device matching ``device_id``, or respond 404."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")

158 

159 

@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Apply ``device_update`` to a device on behalf of the current user.

    Responds 404 when the device does not exist. When the update result
    reports ``error`` as True, its ``status_code`` and ``message`` (defaulting
    to 500 / "An error has occurred") are turned into an HTTP error.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.update(device_update, current_user)
    failed = outcome.get("error", False) is True
    if not failed:
        return outcome
    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )

177 

178 

@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of one device, filtered by ``query``."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states

182 

183 

@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every device setup."""
    setups = DeviceSetup.get_all()
    return setups

187 

188 

@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and echo it back (201)."""
    created = device_setup
    created.insert()
    return created

193 

194 

@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return one device setup by id, or respond 404."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")

204 

205 

@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """Update a device setup with the non-None fields of the payload.

    Responds 404 when the setup does not exist; otherwise returns the setup
    object that ``update`` was called on.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the payload are not forwarded to update().
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    target.update(changes)
    return target

216 

217 

@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete one device setup and return the result of ``delete()``; 404 if missing."""
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return target.delete()

228 

229 

@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching ``query``, serialized without the "device" field."""
    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})

233 

234 

@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    ids = Signal.get_all_ids()
    return ids

238 

239 

@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return aggregate signal statistics: stored data size, total sample
    count, number of signals whose status is "up", and number of signal
    documents.
    """
    # Resolve every signal first, then count; ids with no matching signal
    # resolve to None and are skipped.
    resolved = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()]

    total_samples = 0
    active_count = 0
    for sig in resolved:
        if sig is None:
            continue
        total_samples += await sig.number_samples()
        if sig.status.status == "up":
            active_count += 1

    signal_count = Signal.get_number_documents()

    return {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": total_samples,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }

273 

274 

@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return samples

278 

279 

@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_last_values_interest_window(
    signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
) -> list[SignalSample | None]:
    """Return the last sample of each requested signal, given a ``min_timestamp`` bound."""
    samples = SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
    return samples

285 

286 

@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return samples

290 

291 

@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flag of each requested signal."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility

295 

296 

@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return one signal as a dict, or respond 404."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")

306 

307 

@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Apply ``signal_update`` to a signal on behalf of the current user.

    Responds 404 when the signal does not exist. When the update result
    reports ``error`` as True, its ``status_code`` and ``message`` (defaulting
    to 500 / "An error has occurred") are turned into an HTTP error.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource here is a signal, not a device; use the same
        # message as GET /signals/{signal_id}.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    result = await signal.update(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )
    return result

325 

326 

@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """Return the data of one signal, optionally bounded and downsampled.

    ``min_timestamp`` / ``max_timestamp`` are forwarded to the data lookup.
    When ``number_samples_max`` is given, the result is passed through
    ``uniform_desampling`` with that limit.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type allows None; do not call a method on it.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data

337 

338 

@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of one signal, or respond 404 when there is none."""
    latest = SignalSample.get_last_from_signal_id(signal_id)
    if latest is not None:
        return latest
    raise HTTPException(status_code=404, detail="No data")

345 

346 

@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of one signal, or respond 404 when there is none."""
    earliest = SignalSample.get_first_from_signal_id(signal_id)
    if earliest is not None:
        return earliest
    raise HTTPException(status_code=404, detail="No data")

353 

354 

@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the sample count and sample data size of one signal; 404 if missing."""
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource here is a signal, not a device; use the same
        # message as GET /signals/{signal_id}.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}

364 

365 

@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """Return the data of several signals, optionally bounded and downsampled.

    The same ``min_timestamp`` / ``max_timestamp`` pair is used both as the
    query bounds and as the window bounds. Responds 400 when both bounds are
    given and ``min_timestamp`` is greater than ``max_timestamp``. When
    ``number_samples_max`` is given, the result is passed through
    ``uniform_desampling`` with that limit.
    """
    # Compare against None rather than truthiness: 0.0 is a valid timestamp
    # and must not disable the validation.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The declared return type allows None; do not call a method on it.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data

399 

400 

@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int = 600,
    outside_max_number_samples: int = 150,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """Return the data of several signals, downsampled around an interest window.

    Responds 400 when either timestamp pair is given in the wrong order. The
    lookup is capped at ten times the sum of the two sample limits, and the
    result is then passed through ``interest_window_desampling``.
    """
    # Compare against None rather than truthiness: 0.0 is a valid timestamp
    # and must not disable the validation.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=10 * (window_max_number_samples + outside_max_number_samples),
    )

    # The declared return type allows None; do not call a method on it.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

444 

445 

@app.get("/signals-data/export", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a zip attachment named ``signals.zip``.

    Data is fetched without bound interpolation and serialized with
    ``zip_export(file_format)``.
    """
    selection = SignalsData.get_from_signal_ids(
        signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
    )
    archive = selection.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)

462 

463 

@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching ``query``."""
    listing = Event.response_from_query(query)
    return listing

467 

468 

@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return one event by id, or respond 404."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")

475 

476 

@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_events: bool = False
) -> list[EventDay]:
    """Return the per-day event counts for the given timeframe."""
    days = EventDay.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_events)
    return days

482 

483 

@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching ``query``."""
    listing = EventRule.response_from_query(query)
    return listing

487 

488 

@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return one event rule by id, or respond 404."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")

495 

496 

@app.post("/users", status_code=201)
async def create_user(user: User):
    """Create a user account with a hashed password and return it (201).

    Responds 400 with a deliberately generic message both when a matching
    user already exists and when creation fails.
    """
    # NOTE(review): this route has no authentication dependency and accepts
    # ``is_admin`` from the request body — confirm that is intended.
    # NOTE(review): the lookup attribute is "user" while the value is an
    # email address — verify the attribute name against the User model.
    if User.get_one_by_attribute("user", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # bool() instead of ``| False``: the bitwise OR raises TypeError when
    # is_admin is None.
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, bool(user.is_admin))
    if new_user is None:
        # The keyword is ``detail``; ``details`` made HTTPException itself
        # raise TypeError, turning this 400 into a 500.
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user

506 

507 

@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Authenticate with username/password form data and return a bearer token.

    Responds 401 when authentication fails. The token carries the user's
    email as "sub" and the admin flag, and expires after
    ACCESS_TOKEN_EXPIRE_MINUTES minutes.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    # NOTE(review): this rejects users whose ``is_active`` is truthy with
    # "User Blocked", which reads as inverted — confirm the meaning of
    # ``is_active`` on the User model before changing it. Also, 402 is
    # "Payment Required"; 403 looks like the intended status — TODO confirm.
    if user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

522 

523 

@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users sorted by email, each serialized without the password."""
    serialized = []
    for account in User.get_all(sort_by="email"):
        serialized.append(account.to_dict(exclude={"password"}))
    return serialized

527 

528 

@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str, current_user: Annotated[User, Depends(get_current_active_user)]):
    """Return one user by id, or the current user when ``user_id`` is "me".

    Responds 404 when no such user exists. The password field is excluded
    from the response, as in the GET /users listing.
    """
    user = current_user if user_id == "me" else User.get_from_id(user_id)

    if user is None:
        raise HTTPException(
            status_code=404,
            detail="User not found",
        )
    # Never send the stored password back to clients.
    return user.to_dict(exclude={"password"})

544 

545 

@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """Update a user; a non-empty password is hashed, an empty or missing one is removed.

    NOTE(review): no check that the caller may modify ``user_id`` is visible
    here — confirm authorization is enforced elsewhere.
    """
    password_provided = not (user.password == "" or user.password is None)
    if password_provided:
        user.password = get_password_hash(user.password)
    else:
        del user.password
    return User.update(user, user_id)

553 

554 

@app.post("/users/me", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the current user after removing password, is_active and is_connected."""
    del current_user.password
    del current_user.is_active
    del current_user.is_connected
    return current_user

561 

562 

@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching ``query``."""
    listing = Command.response_from_query(query)
    return listing

566 

567 

@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns

571 

572 

@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return one campaign by id; responds 500 when it cannot be found.

    NOTE(review): other routes in this file answer 404 for a missing
    resource — consider aligning once clients are checked.
    """
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")

579 

580 

@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Create a campaign and return it (201); responds 500 when creation yields None."""
    created = Campaign.create(campaign)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")

587 

588 

@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Overwrite a campaign's name and description and persist it.

    Responds 500 when the campaign cannot be found.
    """
    stored = Campaign.get_from_id(campaign_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
    # Only these two fields are copied from the payload.
    for field_name in ("name", "description"):
        setattr(stored, field_name, getattr(edit_campaign, field_name))
    return Campaign.update(stored)

597 

598 

@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """Delete a campaign together with its phases.

    Responds 500 when the campaign cannot be found or when the phase deletion
    is not acknowledged; otherwise returns the ``acknowledged`` flag of the
    campaign deletion.
    """
    failure = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
    if Campaign.get_from_id(campaign_id) is None:
        raise failure
    # Phases are removed first; the campaign is only deleted afterwards.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise failure
    return Campaign.delete(campaign_id).acknowledged

612 

613 

@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases

617 

618 

@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return the result of looking up one phase by id."""
    phase = Phase.get_from_id(phase_id)
    return phase

622 

623 

@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Create a phase and return it (201); responds 500 when creation yields None."""
    created = Phase.create(phase)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")

630 

631 

@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Overwrite a phase's name, description, start and end, then persist it.

    Responds 500 when the phase cannot be found.
    """
    stored = Phase.get_from_id(phase_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
    # Only these four fields are copied from the payload.
    for field_name in ("name", "description", "start_at", "end_at"):
        setattr(stored, field_name, getattr(edit_phase, field_name))
    return Phase.update(stored)

642 

643 

@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete one phase and return the deletion's ``acknowledged`` flag.

    Responds 500 when the phase cannot be found.
    """
    if Phase.get_from_id(phase_id) is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
    deletion = Phase.delete(phase_id)
    return deletion.acknowledged

651 

652 

@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return every custom view."""
    views = CustomView.get_all()
    return views

656 

657 

@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` matches."""
    views = CustomView.get_by_attribute("user_id", user_id)
    return views

661 

662 

@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return the result of looking up one custom view by id."""
    view = CustomView.get_from_id(custom_view_id)
    return view

666 

667 

@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a custom view owned by the current user, insert it and return it."""
    fields = custom_view_creation.to_dict()
    view = CustomView(**fields, user_id=current_user.id)
    view.insert()
    return view

675 

676 

@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update one custom view with the dumped payload and return the result.

    Responds 404 when the custom view does not exist.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard a missing view fails with AttributeError on
        # None and surfaces as a 500.
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.update(custom_view_update.model_dump())

681 

682 

@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete one custom view and return the result of ``delete()``.

    Responds 404 when the custom view does not exist.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard a missing view fails with AttributeError on
        # None and surfaces as a 500.
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()

687 

688 

@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the posted video and return it; responds 500 if it is falsy afterwards.

    NOTE(review): the truthiness test runs on the request model itself, not
    on the result of ``insert()`` — confirm it can ever fail.
    """
    video.insert()
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")

695 

696 

@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return every video."""
    videos = Video.get_all()
    return videos

700 

701 

@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return the result of ``Video.get_video(video_id)``, or respond 404 when it is falsy."""
    stream = Video.get_video(video_id)
    if stream:
        return stream
    raise HTTPException(status_code=404, detail="Camera not found")