Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%

406 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-16 15:32 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 TwinPadActivity, 

32 User, 

33 UserUpdate, 

34 Campaign, 

35 Phase, 

36 CustomView, 

37 Command, 

38 CustomViewCreation, 

39 CustomViewUpdate, 

40 Video, 

41 SignalsPreset, 

42 SignalsPresetCreation, 

43 SignalsPresetUpdate, 

44 PrivateGraphTheme, 

45 PublicGraphTheme, 

46 GraphThemeCreation, 

47 GraphThemeUpdate, 

48) 

49from twinpad_backend.auth import ( 

50 Token, 

51 authenticate_user, 

52 get_current_active_user, 

53 ACCESS_TOKEN_EXPIRE_MINUTES, 

54 create_access_token, 

55 get_password_hash, 

56) 

57from twinpad_backend.queries import ( 

58 SignalQuery, 

59 DeviceStatesQuery, 

60 EventQuery, 

61 EventRuleQuery, 

62 CommandQuery, 

63 GraphThemeQuery, 

64) 

65from twinpad_backend.responses import ListResponse 

66 

# Requests whose handling time (in seconds) exceeds this value are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5

# Feature flags: enabled only when the variable is exactly the string "true".
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"

# Reuse uvicorn's error logger; records are not forwarded to ancestor loggers.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# NOTE(review): CORS is fully open (any origin, method and header) — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

81 

if PROFILING:  # pragma: no cover
    # Reports are written to a fresh temporary directory created at import time.
    profiling_folder = mkdtemp()
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """
        Profile a request with pyinstrument and save the HTML report.

        Requests whose URL contains "profiling" or ".ico" are passed through
        untouched. Every other request is processed exactly once while the
        profiler runs, and the response of that single call is returned.
        """
        url = str(request.url)
        # Do not profile the profiling endpoints themselves (avoid recursion) nor favicon requests.
        if any(segment in url for segment in ("profiling", ".ico")):
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            # The downstream handler is called only once: calling it a second
            # time would execute the request (and its side effects) twice.
            response = await call_next(request)
        finally:
            profiler.stop()

        # Build a flat file name from the path segments after scheme and host,
        # dropping the query string; fall back to "slash" when nothing is left.
        filename = "_".join(url.split("/")[3:]).split("?", maxsplit=1)[0] or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())
        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the profiling reports saved so far."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{profiling_id}")
    async def profiling(profiling_id):
        """
        Return a saved profiling report as HTML.

        Raises a 404 HTTPException when no regular file with that name exists
        in the profiling folder.
        """
        filename = os.path.join(profiling_folder, profiling_id)
        # isfile (rather than exists) so that a directory name such as ".." yields a 404.
        if os.path.isfile(filename):
            with open(filename, "r", encoding="utf-8") as report_file:
                return HTMLResponse(report_file.read())
        raise HTTPException(
            status_code=404,
            detail="Profiling not found",
        )

    # Alternative kept for reference:
    # app.mount("/profilings", StaticFiles(directory=profiling_folder, html=True), name="profilings")

127 

128 

@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """
    Log every request with its client, method, path, status code and duration.

    The entry is logged at WARNING level when handling took longer than
    REQUEST_TIME_WARNING seconds, and at INFO level otherwise. The downstream
    response is returned unchanged.
    """
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    # request.client can be None (no client address available), so it must not
    # be dereferenced unconditionally when building the fallback value.
    client_host = request.client.host if request.client else "unknown"
    client_ip = request.headers.get("x-forwarded-for", client_host)

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response

141 

142 

@app.get("/")
async def slash():
    """Return the backend version under the "twinpad_version" key."""
    version_info = {"twinpad_version": __version__}
    return version_info

146 

147 

@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Return the services health check, wrapped under the "services" key.
    """
    services = ServicesStatus.check()
    return {"services": services}

156 

157 

@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, requested sorted by "device_id"."""
    devices = Device.get_all(sort_by="device_id")
    return devices

161 

162 

@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id):
    """Return the device matching ``device_id`` or raise a 404 HTTPException."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")

172 

173 

@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Apply a mode change to a device on behalf of the current user.

    Raises a 404 HTTPException when the device does not exist. When the result
    of ``change_mode`` carries ``error`` set to True, an HTTPException is raised
    from its "status_code" (default 500) and "message" entries.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    if outcome.get("error", False) is not True:
        return outcome

    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )

191 

192 

@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of a device selected by ``device_id`` and the query parameters."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states

196 

197 

@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every device setup."""
    setups = DeviceSetup.get_all()
    return setups

201 

202 

@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and return it."""
    new_setup = device_setup
    new_setup.insert()
    return new_setup

207 

208 

@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return the device setup with the given id or raise a 404 HTTPException."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")

218 

219 

@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """
    Update a device setup with the non-None fields of the payload.

    Raises a 404 HTTPException when the device setup does not exist.
    """
    current = DeviceSetup.get_from_id(device_setup_id)
    if current is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields dumped as None are left out of the update.
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value

    current.update(changes)
    return current

230 

231 

@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """
    Delete a device setup and return the result of its ``delete`` call.

    Raises a 404 HTTPException when the device setup does not exist.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return target.delete()

242 

243 

@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the sample-count activity entries for the given timeframe."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

249 

250 

@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching the query, serialized without the "device" field."""
    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})

254 

255 

@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    signal_ids = Signal.get_all_ids()
    return signal_ids

259 

260 

@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Returns signals stats

    The result holds the signal data size, the total number of samples, the
    number of signals whose status is "up" and the number of signal documents.
    """
    # Resolve every signal first; ids without a matching signal give None and are skipped.
    resolved = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()]
    known_signals = [candidate for candidate in resolved if candidate is not None]

    sample_total = 0
    active_total = 0
    for known in known_signals:
        sample_total += await known.number_samples()
        if known.status.status == "up":
            active_total += 1

    signal_total = Signal.get_number_documents()

    return {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": sample_total,
        "number_active_signals": active_total,
        "number_signals": signal_total,
    }

294 

295 

@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples

299 

300 

@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_last_values_interest_window(
    signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
) -> list[SignalSample | None]:
    """Return the last sample of each requested signal, given ``min_timestamp`` as interest-window bound."""
    last_samples = SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
    return last_samples

306 

307 

@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples

311 

312 

@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flag of the requested signals."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility

316 

317 

@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return the signal with the given id as a dict or raise a 404 HTTPException."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")

327 

328 

@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Send a command to a signal on behalf of the current user.

    Raises a 404 HTTPException when the signal does not exist. When the result
    of ``send_command`` carries ``error`` set to True, an HTTPException is
    raised from its "status_code" (default 500) and "message" entries.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            # The missing resource is a signal, not a device.
            detail="Signal not found",
        )
    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )
    return result

346 

347 

@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """
    Return the data of one signal, optionally bounded in time and desampled.

    When ``number_samples_max`` is given, the data is uniformly desampled to
    that maximum number of samples.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The return type allows None: only desample when there is data to work on.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data

358 

359 

@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal or raise a 404 HTTPException when there is none."""
    latest = SignalSample.get_last_from_signal_id(signal_id)
    if latest is not None:
        return latest
    raise HTTPException(status_code=404, detail="No data")

366 

367 

@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal or raise a 404 HTTPException when there is none."""
    earliest = SignalSample.get_first_from_signal_id(signal_id)
    if earliest is not None:
        return earliest
    raise HTTPException(status_code=404, detail="No data")

374 

375 

@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """
    Return the number of samples and the sample data size of a signal.

    Raises a 404 HTTPException when the signal does not exist.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            # The missing resource is a signal, not a device.
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}

385 

386 

@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """
    Return the data of several signals, optionally bounded in time and desampled.

    The time bounds are also passed as the window bounds. Raises a 400
    HTTPException when both bounds are given and ``min_timestamp`` is greater
    than ``max_timestamp``.
    """
    # Compare against None explicitly: a timestamp of 0 is a valid bound and
    # must not disable the validation.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The return type allows None: only desample when there is data to work on.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data

420 

421 

@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int = 600,
    outside_max_number_samples: int = 150,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """
    Return the data of several signals, desampled around an interest window.

    The fetch is capped at ten times the sum of the two sample budgets, then
    ``interest_window_desampling`` is applied with the window bounds and both
    budgets. Raises a 400 HTTPException when a minimum bound is greater than
    its maximum counterpart.
    """
    # Compare against None explicitly: a timestamp of 0 is a valid bound and
    # must not disable the validation.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=10 * (window_max_number_samples + outside_max_number_samples),
    )
    # The return type allows None: nothing to desample in that case.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

465 

466 

@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """
    Export the data of the requested signals as a zip attachment.

    The data is fetched without bound interpolation and exported in
    ``file_format``; the response is served as "signals.zip".
    """
    export_source = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = export_source.zip_export(file_format)
    attachment_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=attachment_headers)

483 

484 

@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """
    Export the data of the requested signals as an HDF5 attachment.

    The data is fetched without bound interpolation; the response is served
    as "signals.hdf5".
    """
    export_source = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = export_source.hdf5_export()
    attachment_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=attachment_headers)

500 

501 

@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching the query."""
    listing = Event.response_from_query(query)
    return listing

505 

506 

@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return the event with the given id or raise a 404 HTTPException."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")

513 

514 

@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the event-count activity entries for the given timeframe."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

520 

521 

@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the command-count activity entries for the given timeframe."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

527 

528 

@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching the query."""
    listing = EventRule.response_from_query(query)
    return listing

532 

533 

@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return the event rule with the given id or raise a 404 HTTPException."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")

540 

541 

@app.post("/users", status_code=201)
async def create_user(user: User):
    """
    Create a user account with a hashed password.

    Raises a 400 HTTPException, with the same generic message, when the email
    is already used or when the creation returns None.
    """
    # NOTE(review): this route has no authentication dependency and takes
    # ``is_admin`` from the request body — confirm that this is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # bool() instead of the bitwise ``| False``: a missing (None) flag becomes
    # False rather than raising a TypeError.
    is_admin = bool(user.is_admin)
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user

551 

552 

@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """
    Authenticate a user from the login form and return a bearer access token.

    Raises a 401 HTTPException on bad credentials and a 402 HTTPException when
    the authenticated user is not active. The token carries the user email as
    "sub" and the admin flag, and expires after ACCESS_TOKEN_EXPIRE_MINUTES.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    # Only inactive users are blocked; the check used to be inverted and
    # rejected every active user.
    # NOTE(review): 402 is "Payment Required"; 403 looks more appropriate but
    # is left unchanged in case clients depend on it.
    if not user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

567 

568 

@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users, requested sorted by email, serialized without their password."""
    serialized = []
    for account in User.get_all(sort_by="email"):
        serialized.append(account.to_dict(exclude={"password"}))
    return serialized

572 

573 

@app.get("/users/me", response_model=User)
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user after removing its ``password`` attribute."""
    # The attribute is deleted on the object provided by the dependency itself.
    delattr(current_user, "password")
    return current_user

580 

581 

@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """
    Return a user as a dict without the "password" and "is_connected" fields.

    Raises a 404 HTTPException when the user does not exist.
    """
    account = User.get_from_id(user_id)
    if account is not None:
        return account.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")

592 

593 

@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """
    Update a user with the posted fields.

    An empty or missing password is removed from the payload; any other
    password is replaced by its hash before the update.
    """
    if user.password in ("", None):
        del user.password
    else:
        user.password = get_password_hash(user.password)
    updated = User.update_info(user, user_id)
    return updated

601 

602 

@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching the query."""
    listing = Command.response_from_query(query)
    return listing

606 

607 

@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns

611 

612 

@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return the campaign with the given id or raise a 500 HTTPException when it is not found."""
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    # NOTE(review): a missing campaign is reported as 500 whereas other routes use 404 — confirm.
    raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")

619 

620 

@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Create a campaign and return it; raise a 500 HTTPException when the creation returns None."""
    created = Campaign.create(campaign)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")

627 

628 

@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """
    Overwrite the name and description of a campaign and persist it.

    Raises a 500 HTTPException when the campaign is not found.
    """
    stored = Campaign.get_from_id(campaign_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during campaign edition")

    # Only these two attributes are copied from the payload.
    stored.name = edit_campaign.name
    stored.description = edit_campaign.description
    return Campaign.update(stored)

637 

638 

@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """
    Delete a campaign after deleting its phases.

    Raises a 500 HTTPException when the campaign is not found or when the
    phase deletion is not acknowledged. Returns the ``acknowledged`` flag of
    the campaign deletion.
    """
    failure = HTTPException(status_code=500, detail="An error occurred during campaign deletion")

    if Campaign.get_from_id(campaign_id) is None:
        raise failure

    # Phases go first; the campaign is only removed once that step is acknowledged.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise failure

    return Campaign.delete(campaign_id).acknowledged

652 

653 

@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose "campaign_id" matches the given campaign."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases

657 

658 

@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return the phase with the given id or raise a 404 HTTPException."""
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")

665 

666 

@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Create a phase and return it; raise a 500 HTTPException when the creation returns None."""
    created = Phase.create(phase)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")

673 

674 

@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """
    Overwrite the name, description and start/end dates of a phase and persist it.

    Raises a 500 HTTPException when the phase is not found.
    """
    stored = Phase.get_from_id(phase_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase edition")

    # Copy the editable attributes from the payload, in the original order.
    for attribute in ("name", "description", "start_at", "end_at"):
        setattr(stored, attribute, getattr(edit_phase, attribute))
    return Phase.update(stored)

685 

686 

@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """
    Delete a phase and return the ``acknowledged`` flag of the deletion.

    Raises a 500 HTTPException when the phase is not found.
    """
    if Phase.get_from_id(phase_id) is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
    deletion = Phase.delete(phase_id)
    return deletion.acknowledged

694 

695 

@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return every custom view."""
    views = CustomView.get_all()
    return views

699 

700 

@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose "user_id" matches the given user."""
    views = CustomView.get_by_attribute("user_id", user_id)
    return views

704 

705 

@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return the result of looking up the custom view with the given id."""
    # NOTE(review): the lookup result is returned as is, with no not-found handling — confirm.
    view = CustomView.get_from_id(custom_view_id)
    return view

709 

710 

@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Build a custom view from the payload, owned by the current user, insert it and return it."""
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view

718 

719 

@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """
    Update a custom view with the dumped payload and return the update result.

    Raises a 404 HTTPException when the custom view does not exist.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Without this guard an unknown id fails with an AttributeError (HTTP 500).
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    # NOTE(review): the whole payload is dumped, including unset fields — confirm this is intended.
    return custom_view.update(custom_view_update.model_dump())

724 

725 

@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """
    Delete a custom view and return the result of its ``delete`` call.

    Raises a 404 HTTPException when the custom view does not exist.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Without this guard an unknown id fails with an AttributeError (HTTP 500).
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()

730 

731 

@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the posted video and return it."""
    video.insert()
    # NOTE(review): this tests the payload object itself, not the result of
    # ``insert`` — presumably it never triggers; confirm the intended check.
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")

738 

739 

@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return every video."""
    videos = Video.get_all()
    return videos

743 

744 

@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return the result of ``Video.get_video`` for the given id or raise a 404 HTTPException when it is None."""
    stream = Video.get_video(video_id)
    if stream is not None:
        return stream
    raise HTTPException(status_code=404, detail="Camera not found")

751 

752 

@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose "user_id" is the current user's id."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)

756 

757 

@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a signals preset for the current user and return it."""
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)

764 

765 

@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """
    Update a signals preset with the fields explicitly set in the payload.

    Raises a 404 HTTPException when the signals preset does not exist.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # Without this guard an unknown id fails with an AttributeError (HTTP 500).
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))

770 

771 

@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """
    Delete a signals preset and return the result of its ``delete`` call.

    Raises a 404 HTTPException when the signals preset does not exist.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # Without this guard an unknown id fails with an AttributeError (HTTP 500).
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()

778 

779 

@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
):
    """
    Create a graph theme for an existing signal, with the current user as creator.

    Raises a 400 HTTPException when the referenced signal does not exist.
    Returns the public view of the new theme for the current user.
    """
    target_signal_id = graph_theme_creation.signal_id
    if Signal.get_one_by_attribute("signal_id", target_signal_id) is None:
        raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")

    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created.id, current_user.id)

792 

793 

@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query, as seen by the current user."""
    viewer_id = current_user.id
    return PublicGraphTheme.response_from_query(query, viewer_id)

799 

800 

@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes of the current user's library that match the query."""
    viewer_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, viewer_id)

806 

807 

@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """
    Update a graph theme with the fields explicitly set in the payload.

    A user who is not the creator may only change "active_for_user" and
    "in_user_library"; any other field raises a 401 HTTPException. Raises a
    404 HTTPException when the theme does not exist. Returns the public view
    of the theme for the current user.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    # Without this guard an unknown id fails with an AttributeError (HTTP 500).
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")

    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        # Per-user properties are the only ones a non-creator may touch.
        user_scoped_properties = ("active_for_user", "in_user_library")
        if any(theme_property not in user_scoped_properties for theme_property in update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)

820 

821 

@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """
    Delete a graph theme created by the current user.

    Raises a 404 HTTPException when the theme does not exist and a 401
    HTTPException when the current user is not its creator. Returns the result
    of the theme's ``delete`` call.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    # Without this guard an unknown id fails with an AttributeError (HTTP 500).
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()

828 

829 

@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
) -> dict:
    """Return the appearances of the requested signals for the current user."""
    viewer_id = current_user.id
    return PublicGraphTheme.get_signal_appearances(signal_ids, viewer_id)