Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%

407 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-25 07:17 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 TwinPadActivity, 

32 User, 

33 UserUpdate, 

34 Campaign, 

35 Phase, 

36 CustomView, 

37 Command, 

38 CustomViewCreation, 

39 CustomViewUpdate, 

40 Video, 

41 SignalsPreset, 

42 SignalsPresetCreation, 

43 SignalsPresetUpdate, 

44 PrivateGraphTheme, 

45 PublicGraphTheme, 

46 GraphThemeCreation, 

47 GraphThemeUpdate, 

48) 

49from twinpad_backend.auth import ( 

50 Token, 

51 authenticate_user, 

52 get_current_active_user, 

53 ACCESS_TOKEN_EXPIRE_MINUTES, 

54 create_access_token, 

55 get_password_hash, 

56) 

57from twinpad_backend.queries import ( 

58 SignalQuery, 

59 DeviceStatesQuery, 

60 EventQuery, 

61 EventRuleQuery, 

62 CommandQuery, 

63 GraphThemeQuery, 

64) 

65from twinpad_backend.responses import ListResponse 

66 

# Requests that take longer than this many seconds are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5

# Feature flags from the environment; only the exact string "true" enables them.
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"

# Log through uvicorn's error logger, without propagating to the root logger.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: every origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

81 

if PROFILING:  # pragma: no cover
    # Reports are stored in a fresh temporary directory created at import time.
    profiling_folder = mkdtemp()
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """Profile a request with pyinstrument and save the HTML report.

        Requests whose URL contains "profiling" or ".ico" are passed through
        unprofiled (the former avoids profiling the report endpoints
        themselves). For every other request the downstream handler is run
        exactly once under the profiler and its response is returned.
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            # Keep the response: calling call_next a second time would
            # execute the request handler (and its side effects) twice.
            response = await call_next(request)
        finally:
            profiler.stop()

        # Drop the query string, then flatten the path after scheme and host
        # into a single file name ("a/b" -> "a_b"); the root path maps to "slash".
        path_only = url.split("?", maxsplit=1)[0]
        filename = "_".join(path_only.split("/")[3:]).rstrip("/") or "slash"
        logger.info("saving profiling to %s", filename)
        report_path = os.path.join(profiling_folder, filename)
        with open(report_path, "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())
        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the saved profiling reports."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{profiling_id}")
    async def profiling(profiling_id):
        """Return one saved profiling report as HTML, or 404 if it does not exist."""
        # Only plain file names are accepted so the lookup cannot leave the folder.
        if os.path.basename(profiling_id) == profiling_id:
            filename = os.path.join(profiling_folder, profiling_id)
            if os.path.isfile(filename):
                with open(filename, "r", encoding="utf-8") as report_file:
                    return HTMLResponse(report_file.read())
        raise HTTPException(
            status_code=404,
            detail="Profiling not found",
        )

127 

128 

@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log client, method, path, status code and duration of every request.

    The line is logged at WARNING level when the request took longer than
    ``REQUEST_TIME_WARNING`` seconds, and at INFO level otherwise.
    """
    # perf_counter is monotonic, so the duration is immune to wall-clock changes.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    # Prefer the proxy-supplied address; only fall back to the socket peer when
    # the header is absent. request.client may be None, so guard the access.
    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        client_ip = request.client.host if request.client else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response

141 

142 

@app.get("/")
async def slash():
    """Return the backend version; this route requires no authentication."""
    payload = {"twinpad_version": __version__}
    return payload

146 

147 

@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Return the result of ``ServicesStatus.check()`` under the "services" key
    """
    services = ServicesStatus.check()
    return {"services": services}

156 

157 

@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """List all devices, sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices

161 

162 

@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id):
    """Return the device whose ``device_id`` matches, or respond with 404."""
    device = Device.get_one_by_attribute("device_id", device_id)
    if device:
        return device
    raise HTTPException(status_code=404, detail="Device not found")

172 

173 

@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: str,
    device_update: DeviceUpdate,
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Forward a device update to ``Device.change_mode`` and return its result.

    Responds with 404 when the device is unknown. When the result carries
    ``error`` set to ``True``, its ``status_code`` (default 500) and
    ``message`` are turned into an HTTP error.
    """
    device = Device.get_one_by_attribute("device_id", device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")

    result = await device.change_mode(device_update, current_user)
    failed = result.get("error", False) is True
    if not failed:
        return result
    raise HTTPException(
        status_code=result.get("status_code", 500),
        detail=result.get("message", "An error has occurred"),
    )

191 

192 

@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of one device, selected by the given query."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states

196 

197 

@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """List all device setups."""
    setups = DeviceSetup.get_all()
    return setups

201 

202 

@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and echo it back with status 201."""
    device_setup.insert()
    return device_setup

207 

208 

@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return one device setup by id, or respond with 404."""
    device_setup = DeviceSetup.get_from_id(device_setup_id)
    if device_setup is not None:
        return device_setup
    raise HTTPException(status_code=404, detail="Device setup not found")

218 

219 

@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """Apply the non-``None`` fields of the update to a device setup.

    Responds with 404 when no setup has the given id.
    """
    device_setup = DeviceSetup.get_from_id(device_setup_id)
    if device_setup is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the payload are not forwarded to the update.
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    device_setup.update(changes)
    return device_setup

230 

231 

@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return the value of its ``delete()`` call.

    Responds with 404 when no setup has the given id.
    """
    device_setup = DeviceSetup.get_from_id(device_setup_id)
    if device_setup is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return device_setup.delete()

242 

243 

@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int,
    max_timestamp: float | int,
    recompute_amount: bool = False,
) -> list[TwinPadActivity]:
    """Return sample-count activity between the two timestamps.

    All three arguments are passed unchanged to
    ``TwinPadActivity.get_number_samples_timeframe``.
    """
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

249 

250 

@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching the query, with the ``device`` field excluded."""
    response = Signal.response_from_query(query)
    return response.to_dict(exclude={"device"})

254 

255 

@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    signal_ids = Signal.get_all_ids()
    return signal_ids

259 

260 

@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return aggregate signal statistics: data size, total number of samples,
    number of signals whose status is "up", and number of signal documents
    """
    # Resolve every signal first, then aggregate over those that exist.
    signals = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()]

    total_samples = 0
    active_count = 0
    for signal in signals:
        if signal is None:
            continue
        total_samples += await signal.number_samples()
        if signal.status.status == "up":
            active_count += 1

    number_signals = Signal.get_number_documents()

    return {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": total_samples,
        "number_active_signals": active_count,
        "number_signals": number_signals,
    }

294 

295 

@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample for each requested signal id."""
    samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return samples

299 

300 

@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_last_values_interest_window(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = 0.0,
) -> list[SignalSample | None]:
    """Return the last sample of each signal, given a minimum timestamp.

    Both arguments are forwarded to
    ``SignalSample.get_last_from_signal_ids_interest_window``.
    """
    return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)

318 

319 

@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample for each requested signal id."""
    samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return samples

323 

324 

@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the result of ``Signal.get_forcibility`` for the requested signal ids."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility

328 

329 

@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return one signal as a dict, or respond with 404 when it is unknown."""
    signal = Signal.get_from_signal_id(signal_id)
    if signal:
        return signal.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")

339 

340 

@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Send a command to a signal via ``Signal.send_command`` and return its result.

    Responds with 404 when the signal is unknown. When the result carries
    ``error`` set to ``True``, its ``status_code`` (default 500) and
    ``message`` are turned into an HTTP error.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The looked-up resource is a signal, so the detail must say so.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )
    return result

358 

359 

@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """Return the data of one signal, optionally bounded in time and desampled.

    When ``number_samples_max`` is given, the data is reduced with
    ``uniform_desampling``.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The return type allows None; presumably that means no data was found,
    # in which case there is nothing to desample.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data

370 

371 

@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of one signal, or respond with 404 when there is none."""
    sample = SignalSample.get_last_from_signal_id(signal_id)
    if sample is not None:
        return sample
    raise HTTPException(status_code=404, detail="No data")

378 

379 

@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of one signal, or respond with 404 when there is none."""
    sample = SignalSample.get_first_from_signal_id(signal_id)
    if sample is not None:
        return sample
    raise HTTPException(status_code=404, detail="No data")

386 

387 

@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the number of samples and the sample data size of one signal.

    Responds with 404 when the signal is unknown.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The looked-up resource is a signal, so the detail must say so.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}

397 

398 

@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """Return the data of several signals, optionally bounded and desampled.

    The time bounds are also used as the window bounds of the query.
    Responds with 400 when both bounds are given and ``min_timestamp`` is
    greater than ``max_timestamp``. When ``number_samples_max`` is given, the
    data is reduced with ``uniform_desampling``.
    """
    # Compare against None rather than truthiness so that a bound of 0 is
    # still validated.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The return type allows None, so guard before desampling.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data

432 

433 

@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int = 600,
    outside_max_number_samples: int = 150,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """Return signal data desampled with separate budgets inside and outside a window.

    Responds with 400 when either pair of bounds is given in the wrong order.
    The query is capped at ten times the combined sample budget, and the
    result is then reduced with ``interest_window_desampling``.
    """
    # Compare against None rather than truthiness so that a bound of 0 is
    # still validated.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=10 * (window_max_number_samples + outside_max_number_samples),
    )

    # The return type allows None, so guard before desampling.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

477 

478 

@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a zip attachment named ``signals.zip``.

    The data is fetched without bound interpolation and packed by
    ``zip_export`` using ``file_format``.
    """
    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    attachment_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(
        content=signals_data.zip_export(file_format),
        media_type="application/zip",
        headers=attachment_headers,
    )

495 

496 

@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as an HDF5 attachment named ``signals.hdf5``.

    The data is fetched without bound interpolation and serialized by
    ``hdf5_export``.
    """
    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    attachment_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(
        content=signals_data.hdf5_export(),
        media_type="application/hdf5",
        headers=attachment_headers,
    )

512 

513 

@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching the query."""
    events = Event.response_from_query(query)
    return events

517 

518 

@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return one event by id, or respond with 404."""
    event = Event.get_from_id(event_id)
    if event is not None:
        return event
    raise HTTPException(status_code=404, detail="No such event")

525 

526 

@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int,
    max_timestamp: float | int,
    recompute_amount: bool = False,
) -> list[TwinPadActivity]:
    """Return event-count activity between the two timestamps.

    All three arguments are passed unchanged to
    ``TwinPadActivity.get_number_events_timeframe``.
    """
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

532 

533 

@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int,
    max_timestamp: float | int,
    recompute_amount: bool = False,
) -> list[TwinPadActivity]:
    """Return command-count activity between the two timestamps.

    All three arguments are passed unchanged to
    ``TwinPadActivity.get_number_commands_timeframe``.
    """
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

539 

540 

@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching the query."""
    rules = EventRule.response_from_query(query)
    return rules

544 

545 

@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return one event rule by id, or respond with 404."""
    event_rule = EventRule.get_from_id(event_rule_id)
    if event_rule is not None:
        return event_rule
    raise HTTPException(status_code=404, detail="No such event rule")

552 

553 

@app.post("/users", status_code=201)
async def create_user(user: User):
    """Create a user account with a hashed password.

    Responds with 400 when the e-mail is already registered or when
    ``User.create`` returns ``None``; both cases share one generic message.
    """
    # NOTE(review): this route has no authentication dependency and accepts
    # `is_admin` from the request body - confirm that this is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # bool() keeps True/False as-is and maps a missing (None) flag to False;
    # the former bitwise `| False` raised TypeError on None.
    is_admin = bool(user.is_admin)
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user

563 

564 

@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Authenticate with username and password and issue a bearer token.

    Responds with 401 on bad credentials and with 402 when the account is not
    active. The token carries the user's e-mail as ``sub`` and the ``admin``
    flag, and expires after ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    # Only inactive accounts are blocked; the condition used to be inverted
    # and rejected active users instead.
    # NOTE(review): 402 is "Payment Required"; kept so existing clients see the
    # same status, but 403 looks like the intended code - confirm.
    if not user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

579 

580 

@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """List all users sorted by e-mail, with the ``password`` field excluded."""
    users = User.get_all(sort_by="email")
    return [user.to_dict(exclude={"password"}) for user in users]

584 

585 

@app.get("/users/me", response_model=User)
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user after removing its ``password`` attribute."""
    # The attribute is deleted from the user object itself before it is returned.
    delattr(current_user, "password")
    return current_user

592 

593 

@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return one user by id without ``password`` and ``is_connected``, or 404."""
    user = User.get_from_id(user_id)
    if user is not None:
        return user.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")

604 

605 

@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """Update a user's information through ``User.update_info``.

    An empty or missing password is removed from the payload; any other
    password is replaced by its hash before the update.
    """
    # NOTE(review): any authenticated user can patch any user id here - confirm
    # that no ownership or admin check is expected.
    password_provided = not (user.password == "" or user.password is None)
    if password_provided:
        user.password = get_password_hash(user.password)
    else:
        del user.password
    return User.update_info(user, user_id)

613 

614 

@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching the query."""
    commands = Command.response_from_query(query)
    return commands

618 

619 

@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """List all campaigns."""
    campaigns = Campaign.get_all()
    return campaigns

623 

624 

@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return one campaign by id; an unknown id yields a 500 response."""
    campaign = Campaign.get_from_id(campaign_id)
    if campaign is not None:
        return campaign
    # NOTE(review): a missing campaign is reported as 500 rather than 404 -
    # confirm whether clients rely on this status.
    raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")

631 

632 

@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Create a campaign; responds with 500 when ``Campaign.create`` returns ``None``."""
    new_campaign = Campaign.create(campaign)
    if new_campaign is not None:
        return new_campaign
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")

639 

640 

@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Overwrite the name and description of a campaign.

    An unknown id yields a 500 response.
    """
    campaign = Campaign.get_from_id(campaign_id)
    if campaign is None:
        raise HTTPException(status_code=500, detail="An error occurred during campaign edition")

    # Only these two fields are copied from the payload.
    for field_name in ("name", "description"):
        setattr(campaign, field_name, getattr(edit_campaign, field_name))
    return Campaign.update(campaign)

649 

650 

@app.delete(
    "/campaigns/{campaign_id}",
    response_model=bool,
    dependencies=[Depends(get_current_active_user)],
    status_code=200,
)
async def delete_campaign(campaign_id: str):
    """Delete a campaign after deleting its phases.

    Responds with 500 when the campaign is unknown or when the phase deletion
    is not acknowledged. Returns the ``acknowledged`` flag of the campaign
    deletion.
    """
    failure_detail = "An error occurred during campaign deletion"

    if Campaign.get_from_id(campaign_id) is None:
        raise HTTPException(status_code=500, detail=failure_detail)

    # Phases are removed first; the campaign is only deleted once that succeeded.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise HTTPException(status_code=500, detail=failure_detail)

    return Campaign.delete(campaign_id).acknowledged

664 

665 

@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """List the phases whose ``campaign_id`` matches."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases

669 

670 

@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return one phase by id, or respond with 404."""
    phase = Phase.get_from_id(phase_id)
    if phase is not None:
        return phase
    raise HTTPException(status_code=404, detail="Phase not found")

677 

678 

@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Create a phase; responds with 500 when ``Phase.create`` returns ``None``."""
    new_phase = Phase.create(phase)
    if new_phase is not None:
        return new_phase
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")

685 

686 

@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Overwrite the name, description and start/end times of a phase.

    An unknown id yields a 500 response.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase edition")

    # Only these four fields are copied from the payload.
    for field_name in ("name", "description", "start_at", "end_at"):
        setattr(phase, field_name, getattr(edit_phase, field_name))
    return Phase.update(phase)

697 

698 

@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete a phase and return the ``acknowledged`` flag of the deletion.

    An unknown id yields a 500 response.
    """
    if Phase.get_from_id(phase_id) is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
    deletion = Phase.delete(phase_id)
    return deletion.acknowledged

706 

707 

@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """List all custom views."""
    views = CustomView.get_all()
    return views

711 

712 

@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """List the custom views whose ``user_id`` matches."""
    views = CustomView.get_by_attribute("user_id", user_id)
    return views

716 

717 

@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return whatever ``CustomView.get_from_id`` yields for the given id."""
    # NOTE(review): no not-found handling here - presumably an unknown id is
    # returned as a null body with status 200; confirm.
    custom_view = CustomView.get_from_id(custom_view_id)
    return custom_view

721 

722 

@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Create a custom view owned by the authenticated user and return it."""
    view_fields = custom_view_creation.to_dict()
    custom_view = CustomView(**view_fields, user_id=current_user.id)
    custom_view.insert()
    return custom_view

730 

731 

@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update a custom view with the dumped payload and return the update result.

    Responds with 404 when no custom view has the given id.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard an unknown id raised AttributeError (500).
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.update(custom_view_update.model_dump())

736 

737 

@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete a custom view and return the value of its ``delete()`` call.

    Responds with 404 when no custom view has the given id.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard an unknown id raised AttributeError (500).
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()

742 

743 

@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the posted video and return it."""
    video.insert()
    # NOTE(review): this tests the truthiness of the request model itself, not
    # the outcome of insert() - confirm what failure it is meant to catch.
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")

750 

751 

@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """List all videos."""
    videos = Video.get_all()
    return videos

755 

756 

@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return the result of ``Video.get_video`` for the id, or respond with 404."""
    camera_name = Video.get_video(video_id)
    if camera_name is not None:
        return camera_name
    raise HTTPException(status_code=404, detail="Camera not found")

763 

764 

@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """List the signals presets that belong to the authenticated user."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)

768 

769 

@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Create a signals preset owned by the authenticated user and return it."""
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)

776 

777 

@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Update a signals preset with the fields explicitly set in the payload.

    Responds with 404 when no preset has the given id.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        # Without this guard an unknown id raised AttributeError (500).
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))

782 

783 

@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete a signals preset and return the value of its ``delete()`` call.

    Responds with 404 when no preset has the given id.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        # Without this guard an unknown id raised AttributeError (500).
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()

790 

791 

@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Create a graph theme for an existing signal.

    Responds with 400 when the referenced signal does not exist. The theme is
    created with the authenticated user as creator and returned in its public
    form.
    """
    signal_id = graph_theme_creation.signal_id
    if Signal.get_one_by_attribute("signal_id", signal_id) is None:
        raise HTTPException(400, f"Signal ID '{signal_id}' doesn't exist")

    # The payload's "id" field is excluded from the fields passed to create().
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    graph_theme = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)

804 

805 

@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query, for the authenticated user."""
    user_id = current_user.id
    return PublicGraphTheme.response_from_query(query, user_id)

811 

812 

@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query within the user's library."""
    user_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, user_id)

818 

819 

@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update a graph theme and return its public form.

    Users other than the creator may only change ``active_for_user`` and
    ``in_user_library``; any other field yields a 401 response. Responds with
    404 when no theme has the given id.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    if graph_theme is None:
        # Without this guard an unknown id raised AttributeError (500).
        raise HTTPException(status_code=404, detail="Graph theme not found")

    # Only the fields explicitly set in the payload are considered.
    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        user_editable = {"active_for_user", "in_user_library"}
        if any(theme_property not in user_editable for theme_property in update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)

832 

833 

@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete a graph theme created by the authenticated user.

    Responds with 404 when no theme has the given id and with 401 when the
    caller is not the theme's creator.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    if graph_theme is None:
        # Without this guard an unknown id raised AttributeError (500).
        raise HTTPException(status_code=404, detail="Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()

840 

841 

@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]),
    current_user: User = Depends(get_current_active_user),
) -> dict:
    """Return the appearances of the requested signals for the authenticated user."""
    user_id = current_user.id
    return PublicGraphTheme.get_signal_appearances(signal_ids, user_id)