Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / api.py: 100%

472 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 11:38 +0000

1import os 

2import logging 

3import time 

4from typing import Annotated 

5from datetime import timedelta 

6from pyinstrument import Profiler 

7 

8from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

9from fastapi.middleware.cors import CORSMiddleware 

10from fastapi.security import OAuth2PasswordRequestForm 

11 

12from twinpad_backend import __version__ 

13from twinpad_backend.db import signal_datasize 

14from twinpad_backend.models import ( 

15 DeviceId, 

16 MongoId, 

17 Signal, 

18 ForcedSignal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 TwinPadActivity, 

32 User, 

33 UserUpdate, 

34 Campaign, 

35 Phase, 

36 CustomView, 

37 Command, 

38 CustomViewCreation, 

39 CustomViewUpdate, 

40 Video, 

41 SignalsPreset, 

42 SignalsPresetCreation, 

43 SignalsPresetUpdate, 

44 PrivateGraphTheme, 

45 PublicGraphTheme, 

46 GraphThemeCreation, 

47 GraphThemeUpdate, 

48 SINGLE_POST_PROCESSING_FUNCTION, 

49 DOUBLE_POST_PROCESSING_FUNCTION, 

50 MULTIPLE_POST_PROCESSING_FUNCTION, 

51) 

52from twinpad_backend.auth import ( 

53 Token, 

54 authenticate_user, 

55 get_current_active_user, 

56 ACCESS_TOKEN_EXPIRE_MINUTES, 

57 create_access_token, 

58 get_password_hash, 

59) 

60from twinpad_backend.queries import ( 

61 SignalQuery, 

62 ForcedSignalQuery, 

63 DeviceStatesQuery, 

64 EventQuery, 

65 EventRuleQuery, 

66 CommandQuery, 

67 GraphThemeQuery, 

68) 

69from twinpad_backend.responses import ListResponse 

70from twinpad_backend.routes.deployers import router as deployers_router 

71 

72REQUEST_TIME_WARNING = 0.5 

73 

74DEBUG = os.environ.get("DEBUG", "false") == "true" 

75PROFILING = os.environ.get("PROFILING", "false") == "true" 

76DEVICE_DEPLOYERS = os.environ.get("DEVICE_DEPLOYERS", "true") == "true" 

77 

78logger = logging.getLogger("uvicorn.error") 

79logger.propagate = False 

80logger.info("Debug mode: %s", DEBUG) 

81logger.info("log level: %s", logging.root.level) 

82 

83 

84app = FastAPI(title="Twinpad backend", version=__version__) 

85 

86app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

87 

88if PROFILING: # pragma: no cover 

89 profiling_folder = "/tmp/twinpad_profiling" 

90 Path(profiling_folder).mkdir(parents=True, exist_ok=True) 

91 logger.info("Profiling enabled") 

92 

93 @app.middleware("http") 

94 async def profile_request(request: Request, call_next): 

95 should_profile = True 

96 url = str(request.url) 

97 for segment in ("profiling", ".ico"): 

98 if segment in url: 

99 should_profile = False 

100 break 

101 

102 if should_profile: # avoid recursion 

103 profiler = Profiler() 

104 profiler.start() 

105 

106 response = await call_next(request) 

107 

108 profiler.stop() 

109 url = "_".join(url.split("/")[3:]).rstrip("/") 

110 if not url: 

111 url = "slash" 

112 filename = url.split("?", maxsplit=1)[0] 

113 logger.info("saving profiling to %s", filename) 

114 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

115 profiling_file.write(profiler.output_html()) 

116 

117 return response 

118 

119 return await call_next(request) 

120 

121 @app.get("/profilings") 

122 async def profilings(): 

123 return {"profilings": os.listdir(profiling_folder)} 

124 

125 @app.get("/profilings/{file_name}") 

126 async def profiling(file_name): 

127 file_path = os.path.join(profiling_folder, file_name) 

128 

129 if not os.path.exists(file_path): 

130 raise HTTPException( 

131 status_code=404, 

132 detail=f"Profiling file '{file_name}' not found", 

133 ) 

134 

135 with open(file_path, "r", encoding="utf-8") as profiling_file: 

136 return Response( 

137 content=profiling_file.read(), 

138 media_type="application/html", 

139 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'}, 

140 ) 

141 

142 

143@app.middleware("http") 

144async def log_request_time(request: Request, call_next): 

145 start_time = time.time() # Record the start time 

146 response = await call_next(request) # Process the request 

147 duration = time.time() - start_time # Calculate the time taken 

148 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

149 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

150 if duration > REQUEST_TIME_WARNING: 

151 logger.warning(message) 

152 else: 

153 logger.info(message) 

154 return response 

155 

156 

157@app.get("/") 

158async def slash(): 

159 return {"twinpad_version": __version__} 

160 

161 

162@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

163async def status(): 

164 """ 

165 Return service healthcheck 

166 """ 

167 return { 

168 "services": ServicesStatus.check(), 

169 } 

170 

171 

172@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

173async def get_devices() -> list[Device]: 

174 return Device.get_all(sort_by="device_id") 

175 

176 

177@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

178async def get_device(device_id) -> Device: 

179 device = Device.get_one_by_attribute("device_id", device_id) 

180 if not device: 

181 raise HTTPException( 

182 status_code=404, 

183 detail="Device not found", 

184 ) 

185 return device 

186 

187 

188@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

189async def update_item( 

190 device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

191): 

192 device = Device.get_one_by_attribute("device_id", device_id) 

193 if not device: 

194 raise HTTPException( 

195 status_code=404, 

196 detail="Device not found", 

197 ) 

198 result = await device.change_mode(device_update, current_user) 

199 if result.get("error", False) is True: 

200 raise HTTPException( 

201 status_code=result.get("status_code", 500), 

202 detail=result.get("message", "An error has occurred"), 

203 ) 

204 return result 

205 

206 

207@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

208async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

209 return DeviceState.get_from_id_and_query(device_id, query) 

210 

211 

212@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

213async def get_device_setups() -> list[DeviceSetup]: 

214 return DeviceSetup.get_all() 

215 

216 

217@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

218async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

219 device_setup.insert() 

220 return device_setup 

221 

222 

223@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

224async def get_device_setup(device_setup_id: str): 

225 device_setup = DeviceSetup.get_from_id(device_setup_id) 

226 if device_setup is None: 

227 raise HTTPException( 

228 status_code=404, 

229 detail="Device setup not found", 

230 ) 

231 return device_setup 

232 

233 

234@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

235async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

236 device_setup = DeviceSetup.get_from_id(device_setup_id) 

237 if device_setup is None: 

238 raise HTTPException( 

239 status_code=404, 

240 detail="Device setup not found", 

241 ) 

242 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

243 return device_setup 

244 

245 

246@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

247async def delete_device_setups(device_setup_id: str) -> bool: 

248 device_setup = DeviceSetup.get_from_id(device_setup_id) 

249 if device_setup is None: 

250 raise HTTPException( 

251 status_code=404, 

252 detail="Device setup not found", 

253 ) 

254 deleted = device_setup.delete() 

255 return deleted 

256 

257 

258@app.get("/number-samples", dependencies=[Depends(get_current_active_user)]) 

259async def get_number_samples( 

260 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

261) -> list[TwinPadActivity]: 

262 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount) 

263 

264 

265@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

266async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

267 if "signal_id" not in query.sort_by: 

268 query.sort_by += ",signal_id:1" 

269 return Signal.response_from_query(query).to_dict(exclude={"device"}) 

270 

271 

272@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)]) 

273async def signals_names() -> list[str]: 

274 return Signal.get_all_ids() 

275 

276 

277@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

278async def signal_stats(): 

279 """ 

280 Returns signals stats 

281 """ 

282 signal_statuses = Signal.get_all_statuses() 

283 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up") 

284 number_samples = Signal.total_number_samples() 

285 number_signals = Signal.get_number_documents() 

286 

287 return { 

288 "signal_data_size": signal_datasize(), 

289 "number_signal_samples": number_samples, 

290 "number_active_signals": number_active_signals, 

291 "number_signals": number_signals, 

292 } 

293 

294 

295@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

296async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

297 return SignalSample.get_last_from_signal_ids(signal_ids) 

298 

299 

300@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

301async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

302 return SignalSample.get_first_from_signal_ids(signal_ids) 

303 

304 

305@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)]) 

306async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]: 

307 return Signal.get_forcibility(signal_ids) 

308 

309 

310@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)]) 

311async def get_forced_signals( 

312 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends() 

313): 

314 if not current_user.is_admin: 

315 raise HTTPException(401) 

316 return ForcedSignal.response_from_query(query) 

317 

318 

319@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

320async def get_signal(signal_id): 

321 signal = Signal.get_from_signal_id(signal_id) 

322 if not signal: 

323 raise HTTPException( 

324 status_code=404, 

325 detail="Signal not found", 

326 ) 

327 return signal.to_dict() 

328 

329 

330@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

331async def update_signal( 

332 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

333): 

334 signal = Signal.get_from_signal_id(signal_id) 

335 if not signal: 

336 raise HTTPException( 

337 status_code=404, 

338 detail="Device not found", 

339 ) 

340 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id) 

341 if forced_signal is not None: 

342 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

343 raise HTTPException( 

344 status_code=403, 

345 detail="Cannot override another user's forcing", 

346 ) 

347 

348 result = await signal.send_command(signal_update, current_user) 

349 if result.get("error", False) is True: 

350 raise HTTPException( 

351 status_code=result.get("status_code", 500), 

352 detail=result.get("message", "An error has occurred"), 

353 ) 

354 

355 if forced_signal is not None and signal_update.forced_value is None: 

356 forced_signal.delete() 

357 elif signal_update.forced_value is not None: 

358 forced_signal = ForcedSignal( 

359 signal_id=signal_id, 

360 forcing_user_id=current_user.id, 

361 forced_at=time.time(), 

362 value=signal_update.forced_value, 

363 ) 

364 forced_signal.insert() 

365 

366 return result 

367 

368 

369@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)]) 

370async def get_signal_forcibility( 

371 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)] 

372) -> bool: 

373 return ForcedSignal.can_force(signal_id, current_user) 

374 

375 

376@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

377async def get_signal_data( 

378 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

379) -> SignalData | None: 

380 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

381 

382 if number_samples_max is not None: 

383 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

384 

385 return signal_data 

386 

387 

388@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

389async def get_last_value(signal_id) -> SignalSample: 

390 sample = SignalSample.get_last_from_signal_id(signal_id) 

391 if sample is None: 

392 raise HTTPException(status_code=404, detail="No data") 

393 return sample 

394 

395 

396@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

397async def get_first_value(signal_id) -> SignalSample: 

398 sample = SignalSample.get_first_from_signal_id(signal_id) 

399 if sample is None: 

400 raise HTTPException(status_code=404, detail="No data") 

401 return sample 

402 

403 

404@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

405async def get_signal_number_samples(signal_id): 

406 signal = Signal.get_from_signal_id(signal_id) 

407 if not signal: 

408 raise HTTPException( 

409 status_code=404, 

410 detail="Signal not found", 

411 ) 

412 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()} 

413 

414 

415@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

416async def get_signals_data( 

417 signal_ids: list[str] = Query(default=[]), 

418 number_samples_max: int = None, 

419 min_timestamp: float = None, 

420 max_timestamp: float = None, 

421 interpolate_bounds: bool = True, 

422) -> SignalsData | None: 

423 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

424 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

425 

426 signals_data = SignalsData.get_from_signal_ids( 

427 signal_ids, 

428 min_timestamp=min_timestamp, 

429 max_timestamp=max_timestamp, 

430 window_min_timestamp=min_timestamp, 

431 window_max_timestamp=max_timestamp, 

432 interpolate_bounds=interpolate_bounds, 

433 ) 

434 if number_samples_max is not None: 

435 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

436 

437 return signals_data 

438 

439 

440@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

441async def get_signals_data_interest_window( 

442 signal_ids: list[str] = Query(default=[]), 

443 window_max_number_samples: int = None, 

444 outside_max_number_samples: int = None, 

445 window_min_timestamp: float = None, 

446 window_max_timestamp: float = None, 

447 min_timestamp: float = None, 

448 max_timestamp: float = None, 

449) -> SignalsData | None: 

450 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

451 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

452 

453 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

454 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

455 

456 max_documents = 0 

457 

458 if window_max_number_samples is not None: 

459 max_documents += 10 * window_max_number_samples 

460 if outside_max_number_samples is not None: 

461 max_documents += 10 * outside_max_number_samples 

462 

463 if max_documents == 0: 

464 max_documents = None 

465 

466 signals_data = SignalsData.get_from_signal_ids( 

467 signal_ids, 

468 min_timestamp=min_timestamp, 

469 max_timestamp=max_timestamp, 

470 window_min_timestamp=window_min_timestamp, 

471 window_max_timestamp=window_max_timestamp, 

472 max_documents=max_documents, 

473 ) 

474 

475 signals_data = signals_data.interest_window_desampling( 

476 window_max_number_samples=window_max_number_samples, 

477 outside_max_number_samples=outside_max_number_samples, 

478 window_min_timestamp=window_min_timestamp, 

479 window_max_timestamp=window_max_timestamp, 

480 ) 

481 

482 return signals_data 

483 

484 

485@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)]) 

486async def export_signals_zip( 

487 file_format: str, 

488 signal_ids: list[str] = Query(default=[]), 

489 min_timestamp: float = None, 

490 max_timestamp: float = None, 

491): 

492 signals_data = SignalsData.get_from_signal_ids( 

493 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

494 ) 

495 zip_data = signals_data.zip_export(file_format) 

496 return Response( 

497 content=zip_data, 

498 media_type="application/zip", 

499 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

500 ) 

501 

502 

503@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

504async def export_signals_hdf5( 

505 signal_ids: list[str] = Query(default=[]), 

506 min_timestamp: float = None, 

507 max_timestamp: float = None, 

508): 

509 signals_data = SignalsData.get_from_signal_ids( 

510 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

511 ) 

512 data = signals_data.hdf5_export() 

513 return Response( 

514 content=data, 

515 media_type="application/hdf5", 

516 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

517 ) 

518 

519 

520@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)]) 

521async def get_signals_data_post_processing( 

522 phase_ids: list[str] = Query(default=[]), 

523 phase_sync_times: list[float | None] = Query(default=[]), 

524 signal_ids: list[str] = Query(default=[]), 

525 window_min_timestamps: list[float | None] = Query(default=[]), 

526 window_max_timestamps: list[float | None] = Query(default=[]), 

527 number_samples_max: int = None, 

528) -> SignalsData | None: 

529 if len(phase_sync_times) == 0: 

530 phase_sync_times = [None for _ in range(len(phase_ids))] 

531 if len(window_min_timestamps) == 0: 

532 window_min_timestamps = [None for _ in range(len(phase_ids))] 

533 if len(window_max_timestamps) == 0: 

534 window_max_timestamps = [None for _ in range(len(phase_ids))] 

535 

536 if ( 

537 len(phase_ids) != len(phase_sync_times) 

538 or len(phase_ids) != len(window_min_timestamps) 

539 or len(phase_ids) != len(window_max_timestamps) 

540 ): 

541 raise HTTPException( 

542 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps." 

543 ) 

544 

545 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

546 

547 if None in phases: 

548 raise HTTPException(404, "Phase not found") 

549 

550 signals_data = SignalsData.get_from_phase_and_signal_ids( 

551 phases=phases, 

552 phase_sync_times=phase_sync_times, 

553 signal_ids=signal_ids, 

554 window_min_timestamps=window_min_timestamps, 

555 window_max_timestamps=window_max_timestamps, 

556 ) 

557 

558 if number_samples_max is not None: 

559 signals_data = signals_data.min_max_downsampling(number_samples_max) 

560 

561 return signals_data 

562 

563 

564@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)]) 

565async def apply_single_post_processing_function( 

566 phase_id: str, 

567 base_signal_id: str, 

568 function: SINGLE_POST_PROCESSING_FUNCTION, 

569 phase_sync_time: float = None, 

570 window_min_timestamp: float = None, 

571 window_max_timestamp: float = None, 

572 number_samples_max: int = None, 

573) -> SignalsData | None: 

574 phase = Phase.get_from_id(phase_id) 

575 

576 if phase is None: 

577 raise HTTPException(404, "Phase not found") 

578 if phase_sync_time is None: 

579 phase_sync_time = phase.start_at / 1000 

580 

581 signals_data = await SignalsData.apply_single_function( 

582 phase, 

583 base_signal_id, 

584 function, 

585 window_min_timestamp=window_min_timestamp, 

586 window_max_timestamp=window_max_timestamp, 

587 ) 

588 

589 if signals_data is None: 

590 raise HTTPException(500, "There was en error while applying the function") 

591 

592 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

593 signals_data = signals_data.min_max_downsampling(number_samples_max) 

594 signals_data = signals_data.zero_time_vector(phase_sync_time) 

595 

596 return signals_data 

597 

598 

599@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)]) 

600async def apply_multiple_post_processing_function( 

601 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

602 phase_ids: list[str] = Query(default=[]), 

603 phase_sync_times: list[float] = Query(default=[]), 

604 signal_ids: list[str] = Query(default=[]), 

605 window_min_timestamp: float = None, 

606 window_max_timestamp: float = None, 

607 number_samples_max: int = None, 

608) -> SignalsData | None: 

609 if len(phase_ids) != len(signal_ids): 

610 raise HTTPException(400, "Each selected signal should correspond to a phase") 

611 

612 if len(signal_ids) < 2: 

613 raise HTTPException(400, "These functions can only be applied to multiple signals") 

614 

615 if len(phase_ids) != len(phase_sync_times) and len(phase_sync_times) != 0: 

616 raise HTTPException(400, "Number of synchronization times does not match the number of phases") 

617 

618 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

619 if None in phases: 

620 raise HTTPException(404, "Phase not found") 

621 

622 if len(phase_sync_times) == 0: 

623 phase_sync_times = [phase.start_at / 1000 for phase in phases] 

624 

625 signals_data = await SignalsData.apply_multiple_function( 

626 phases, 

627 signal_ids, 

628 function, 

629 window_min_timestamp=window_min_timestamp, 

630 window_max_timestamp=window_max_timestamp, 

631 ) 

632 

633 if signals_data is None: 

634 raise HTTPException(500, "There was en error while applying the function") 

635 

636 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

637 signals_data = signals_data.min_max_downsampling(number_samples_max) 

638 if function in {"Align-X", "Using-X"}: 

639 signals_data = signals_data.zero_time_vector(phase_sync_times[1]) 

640 else: 

641 signals_data = signals_data.zero_time_vector(phase_sync_times[0]) 

642 

643 return signals_data 

644 

645 

646@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)]) 

647async def export_post_processing_zip( 

648 file_format: str, 

649 phase_ids: list[MongoId] = Query(default=[]), 

650 phase_sync_times: list[float | None] = Query(default=[]), 

651 signal_ids: list[str] = Query(default=[]), 

652 window_min_timestamps: list[float | None] = Query(default=[]), 

653 window_max_timestamps: list[float | None] = Query(default=[]), 

654): 

655 signals_data = await get_signals_data_post_processing( 

656 phase_ids, 

657 phase_sync_times, 

658 signal_ids, 

659 window_min_timestamps, 

660 window_max_timestamps, 

661 ) 

662 

663 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids) 

664 

665 return Response( 

666 content=zip_data, 

667 media_type="application/zip", 

668 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

669 ) 

670 

671 

672@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

673async def export_post_processing_hdf5( 

674 phase_ids: list[str] = Query(default=[]), 

675 phase_sync_times: list[float | None] = Query(default=[]), 

676 signal_ids: list[str] = Query(default=[]), 

677 window_min_timestamps: list[float | None] = Query(default=[]), 

678 window_max_timestamps: list[float | None] = Query(default=[]), 

679): 

680 signals_data = await get_signals_data_post_processing( 

681 phase_ids, 

682 phase_sync_times, 

683 signal_ids, 

684 window_min_timestamps, 

685 window_max_timestamps, 

686 ) 

687 

688 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids) 

689 

690 return Response( 

691 content=data, 

692 media_type="application/hdf5", 

693 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

694 ) 

695 

696 

697@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

698async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

699 return Event.response_from_query(query) 

700 

701 

702@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

703async def get_event(event_id) -> Event: 

704 event = Event.get_from_id(event_id) 

705 if event is None: 

706 raise HTTPException(status_code=404, detail="No such event") 

707 return event 

708 

709 

710@app.get("/number-events", dependencies=[Depends(get_current_active_user)]) 

711async def get_number_events( 

712 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

713) -> list[TwinPadActivity]: 

714 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount) 

715 

716 

717@app.get("/number-commands", dependencies=[Depends(get_current_active_user)]) 

718async def get_number_commands( 

719 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

720) -> list[TwinPadActivity]: 

721 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount) 

722 

723 

724@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

725async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

726 return EventRule.response_from_query(query) 

727 

728 

729@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

730async def get_event_rule(event_rule_id) -> EventRule: 

731 event_rule = EventRule.get_from_id(event_rule_id) 

732 if event_rule is None: 

733 raise HTTPException(status_code=404, detail="No such event rule") 

734 return event_rule 

735 

736 

737@app.post("/users", status_code=201) 

738async def create_user(user: User): 

739 if User.get_one_by_attribute("email", user.email) is not None: 

740 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

741 hashed_password = get_password_hash(user.password) 

742 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

743 if new_user is None: # pragma: no cover 

744 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

745 return new_user 

746 

747 

748@app.post("/token", status_code=201) 

749async def login_for_access_token( 

750 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

751) -> Token: 

752 user = authenticate_user(form_data.username, form_data.password) 

753 if not user: 

754 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

755 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

756 if user.is_blocked: 

757 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

758 access_token = create_access_token( 

759 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

760 ) 

761 return Token(access_token=access_token, token_type="bearer") 

762 

763 

764@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

765async def get_users(): 

766 return [u.to_dict() for u in User.get_all(sort_by="email")] 

767 

768 

769@app.get("/users/me") 

770async def get_current_user( 

771 current_user: Annotated[User, Depends(get_current_active_user)], 

772): 

773 return current_user.to_dict() 

774 

775 

776@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

777async def get_user(user_id: str): 

778 user = User.get_from_id(user_id) 

779 

780 if user is None: 

781 raise HTTPException( 

782 status_code=404, 

783 detail="User not found", 

784 ) 

785 return user.to_dict(exclude={"password", "is_connected"}) 

786 

787 

788@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

789async def patch_user(user: UserUpdate, user_id): 

790 if user.password == "" or user.password is None: 

791 del user.password 

792 else: 

793 user.password = get_password_hash(user.password) 

794 return User.update_info(user, user_id).to_dict() 

795 

796 

797@app.get("/commands", dependencies=[Depends(get_current_active_user)]) 

798async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]: 

799 return Command.response_from_query(query) 

800 

801 

802@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

803async def get_campaigns(): 

804 return Campaign.get_all() 

805 

806 

807@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

808async def get_campaign_by_id(campaign_id: str): 

809 campaign = Campaign.get_from_id(campaign_id) 

810 if campaign is None: 

811 raise HTTPException(status_code=404, detail="Campaign not found") 

812 return campaign 

813 

814 

815@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

816async def add_campaign(campaign: Campaign): 

817 campaign_id = campaign.insert() 

818 if campaign_id is None: # pragma: no cover 

819 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

820 return campaign 

821 

822 

823@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

824async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

825 campaign = Campaign.get_from_id(campaign_id) 

826 if campaign is None: 

827 raise HTTPException(status_code=404, detail="Campaign not found") 

828 campaign.update(edit_campaign.model_dump(exclude_unset=True, mode="json")) 

829 return campaign 

830 

831 

832@app.delete( 

833 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

834) 

835async def delete_campaign(campaign_id: str): 

836 campaign = Campaign.get_from_id(campaign_id) 

837 if campaign is None: 

838 raise HTTPException(status_code=404, detail="Campaign not found") 

839 delete_phases = Phase.deleteMany(campaign_id) 

840 if not delete_phases.acknowledged: # pragma: no cover 

841 raise HTTPException(status_code=500, detail="An error occurred during phases deletion") 

842 campaign_deleted = campaign.delete() 

843 if not campaign_deleted: # pragma: no cover 

844 raise HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

845 return True 

846 

847 

848@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

849async def get_campaign_phases(campaign_id: str): 

850 return Phase.get_by_attribute("campaign_id", campaign_id) 

851 

852 

853@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

854async def get_phase(phase_id: str): 

855 phase = Phase.get_from_id(phase_id) 

856 if phase is None: 

857 raise HTTPException(status_code=404, detail="Phase not found") 

858 return phase 

859 

860 

861@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

862async def add_phase(phase: Phase): 

863 phase_id = phase.insert() 

864 if phase_id is None: # pragma: no cover 

865 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

866 return phase 

867 

868 

869@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

870async def edit_phase(phase_id, edit_phase: Phase): 

871 phase = Phase.get_from_id(phase_id) 

872 if phase is None: 

873 raise HTTPException(status_code=404, detail="Phase does not exists") 

874 phase.update(edit_phase.model_dump(exclude_unset=True, mode="json")) 

875 return phase 

876 

877 

878@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

879async def delete_phase(phase_id: str): 

880 phase = Phase.get_from_id(phase_id) 

881 if phase is None: 

882 raise HTTPException(status_code=404, detail="Phase not found") 

883 deleted = phase.delete() 

884 if not deleted: # pragma: no cover 

885 raise HTTPException(status_code=500, detail="An error occurred during phase deletion") 

886 return True 

887 

888 

889@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

890async def get_custom_views(): 

891 return CustomView.get_all() 

892 

893 

894@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

895async def get_custom_views_from_user_id(user_id: str): 

896 return CustomView.get_by_attribute("user_id", user_id) 

897 

898 

899@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

900async def get_custom_view(custom_view_id: str): 

901 return CustomView.get_from_id(custom_view_id) 

902 

903 

904@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

905async def create_custom_view( 

906 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user) 

907): 

908 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id) 

909 custom_view.insert() 

910 return custom_view 

911 

912 

913@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

914async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate): 

915 custom_view = CustomView.get_from_id(custom_view_id) 

916 return custom_view.update(custom_view_update.model_dump()) 

917 

918 

919@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

920async def delete_custom_view(custom_view_id: str): 

921 custom_view = CustomView.get_from_id(custom_view_id) 

922 return custom_view.delete() 

923 

924 

925@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)]) 

926async def add_video(video: Video): 

927 video.insert() 

928 if not video: # pragma: no cover 

929 raise HTTPException(status_code=500, detail="An error occurred during cctv creation") 

930 return video 

931 

932 

933@app.get("/videos", dependencies=[Depends(get_current_active_user)]) 

934async def get_videos(): 

935 return Video.get_all() 

936 

937 

938@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)]) 

939def get_stream(video_id): 

940 camera_name = Video.get_video(video_id) 

941 if camera_name is None: 

942 raise HTTPException(status_code=404, detail="Camera not found") 

943 return camera_name 

944 

945 

946@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

947async def get_signals_preset(current_user: User = Depends(get_current_active_user)): 

948 return SignalsPreset.get_by_attribute("user_id", current_user.id) 

949 

950 

951@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

952async def create_signals_preset( 

953 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user) 

954): 

955 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id) 

956 return new_signals_preset 

957 

958 

959@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)]) 

960async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate): 

961 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

962 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True)) 

963 

964 

965@app.delete( 

966 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)] 

967) 

968async def delete_signals_preset(signals_preset_id: str): 

969 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

970 return signals_preset.delete() 

971 

972 

973@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

974async def create_graph_theme( 

975 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user) 

976): 

977 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) 

978 if styled_signal is None: 

979 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist") 

980 

981 graph_theme = PrivateGraphTheme.create( 

982 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id 

983 ) 

984 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

985 

986 

987@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

988async def get_all_graph_themes( 

989 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

990) -> ListResponse[PublicGraphTheme]: 

991 return PublicGraphTheme.response_from_query(query, current_user.id) 

992 

993 

994@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)]) 

995async def get_graph_themes_in_library( 

996 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

997) -> ListResponse[PublicGraphTheme]: 

998 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id) 

999 

1000 

1001@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)]) 

1002async def update_graph_theme( 

1003 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user) 

1004): 

1005 graph_theme = PrivateGraphTheme.get_from_id(theme_id) 

1006 update_dict = theme_update.model_dump(exclude_unset=True) 

1007 if current_user.id != graph_theme.creator_id: 

1008 for theme_property in update_dict.keys(): 

1009 if theme_property not in ["active_for_user", "in_user_library"]: 

1010 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him") 

1011 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id) 

1012 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

1013 

1014 

1015@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

1016async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)): 

1017 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id) 

1018 if current_user.id != graph_theme.creator_id: 

1019 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him") 

1020 return graph_theme.delete() 

1021 

1022 

1023@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)]) 

1024async def get_signals_appearances( 

1025 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user) 

1026) -> dict: 

1027 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id) 

1028 

1029 

1030if DEVICE_DEPLOYERS: 

1031 app.include_router(deployers_router, prefix="/device-deployers")