Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / api.py: 95%

553 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 12:53 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 DeviceDeployerUpdate, 

19 DeviceFromDeployer, 

20 DeviceFromDeployerCreation, 

21 DeviceId, 

22 DeviceUpdateFromDeployer, 

23 MongoId, 

24 Signal, 

25 ForcedSignal, 

26 SignalData, 

27 SignalSample, 

28 ServicesStatus, 

29 Device, 

30 DeviceUpdate, 

31 DeviceSetup, 

32 DeviceSetupUpdate, 

33 DeviceState, 

34 SignalUpdate, 

35 SignalsData, 

36 Event, 

37 EventRule, 

38 TwinPadActivity, 

39 User, 

40 UserUpdate, 

41 Campaign, 

42 Phase, 

43 CustomView, 

44 Command, 

45 CustomViewCreation, 

46 CustomViewUpdate, 

47 Video, 

48 SignalsPreset, 

49 SignalsPresetCreation, 

50 SignalsPresetUpdate, 

51 PrivateGraphTheme, 

52 PublicGraphTheme, 

53 GraphThemeCreation, 

54 GraphThemeUpdate, 

55 DeviceDeployer, 

56 SINGLE_POST_PROCESSING_FUNCTION, 

57 DOUBLE_POST_PROCESSING_FUNCTION, 

58 MULTIPLE_POST_PROCESSING_FUNCTION, 

59) 

60from twinpad_backend.auth import ( 

61 Token, 

62 authenticate_user, 

63 get_current_active_user, 

64 ACCESS_TOKEN_EXPIRE_MINUTES, 

65 create_access_token, 

66 get_password_hash, 

67) 

68from twinpad_backend.queries import ( 

69 SignalQuery, 

70 ForcedSignalQuery, 

71 DeviceStatesQuery, 

72 EventQuery, 

73 EventRuleQuery, 

74 CommandQuery, 

75 GraphThemeQuery, 

76) 

77from twinpad_backend.responses import ListResponse 

78 

79REQUEST_TIME_WARNING = 0.5 

80 

81DEBUG = os.environ.get("DEBUG", "false") == "true" 

82PROFILING = os.environ.get("PROFILING", "false") == "true" 

83 

84logger = logging.getLogger("uvicorn.error") 

85logger.propagate = False 

86logger.info("Debug mode: %s", DEBUG) 

87logger.info("log level: %s", logging.root.level) 

88 

89 

90app = FastAPI(title="Twinpad backend", version=__version__) 

91 

92app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

93 

94if PROFILING: # pragma: no cover 

95 profiling_folder = mkdtemp() 

96 logger.info("Profiling enabled") 

97 

98 @app.middleware("http") 

99 async def profile_request(request: Request, call_next): 

100 should_profile = True 

101 url = str(request.url) 

102 for segment in ("profiling", ".ico"): 

103 if segment in url: 

104 should_profile = False 

105 break 

106 

107 if should_profile: # avoid recursion 

108 profiler = Profiler() 

109 profiler.start() 

110 

111 response = await call_next(request) 

112 

113 profiler.stop() 

114 url = "_".join(url.split("/")[3:]).rstrip("/") 

115 if not url: 

116 url = "slash" 

117 filename = url.split("?", maxsplit=1)[0] 

118 logger.info("saving profiling to %s", filename) 

119 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

120 profiling_file.write(profiler.output_html()) 

121 

122 return response 

123 

124 return await call_next(request) 

125 

126 @app.get("/profilings") 

127 async def profilings(): 

128 return {"profilings": os.listdir(profiling_folder)} 

129 

130 @app.get("/profilings/{file_name}") 

131 async def profiling(file_name): 

132 file_path = os.path.join(profiling_folder, file_name) 

133 

134 if not os.path.exists(file_path): 

135 raise HTTPException( 

136 status_code=404, 

137 detail=f"Profiling file '{file_name}' not found", 

138 ) 

139 

140 with open(file_path, "r", encoding="utf-8") as profiling_file: 

141 return Response( 

142 content=profiling_file.read(), 

143 media_type="application/html", 

144 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'}, 

145 ) 

146 

147 

148@app.middleware("http") 

149async def log_request_time(request: Request, call_next): 

150 start_time = time.time() # Record the start time 

151 response = await call_next(request) # Process the request 

152 duration = time.time() - start_time # Calculate the time taken 

153 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

154 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

155 if duration > REQUEST_TIME_WARNING: 

156 logger.warning(message) 

157 else: 

158 logger.info(message) 

159 return response 

160 

161 

162@app.get("/") 

163async def slash(): 

164 return {"twinpad_version": __version__} 

165 

166 

167@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

168async def status(): 

169 """ 

170 Return service healthcheck 

171 """ 

172 return { 

173 "services": ServicesStatus.check(), 

174 } 

175 

176 

177@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

178async def get_devices() -> list[Device]: 

179 return Device.get_all(sort_by="device_id") 

180 

181 

182@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

183async def get_device(device_id) -> Device: 

184 device = Device.get_one_by_attribute("device_id", device_id) 

185 if not device: 

186 raise HTTPException( 

187 status_code=404, 

188 detail="Device not found", 

189 ) 

190 return device 

191 

192 

193@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

194async def update_item( 

195 device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

196): 

197 device = Device.get_one_by_attribute("device_id", device_id) 

198 if not device: 

199 raise HTTPException( 

200 status_code=404, 

201 detail="Device not found", 

202 ) 

203 result = await device.change_mode(device_update, current_user) 

204 if result.get("error", False) is True: 

205 raise HTTPException( 

206 status_code=result.get("status_code", 500), 

207 detail=result.get("message", "An error has occurred"), 

208 ) 

209 return result 

210 

211 

212@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

213async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

214 return DeviceState.get_from_id_and_query(device_id, query) 

215 

216 

217@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

218async def get_device_setups() -> list[DeviceSetup]: 

219 return DeviceSetup.get_all() 

220 

221 

222@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

223async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

224 device_setup.insert() 

225 return device_setup 

226 

227 

228@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

229async def get_device_setup(device_setup_id: str): 

230 device_setup = DeviceSetup.get_from_id(device_setup_id) 

231 if device_setup is None: 

232 raise HTTPException( 

233 status_code=404, 

234 detail="Device setup not found", 

235 ) 

236 return device_setup 

237 

238 

239@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

240async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

241 device_setup = DeviceSetup.get_from_id(device_setup_id) 

242 if device_setup is None: 

243 raise HTTPException( 

244 status_code=404, 

245 detail="Device setup not found", 

246 ) 

247 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

248 return device_setup 

249 

250 

251@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

252async def delete_device_setups(device_setup_id: str) -> bool: 

253 device_setup = DeviceSetup.get_from_id(device_setup_id) 

254 if device_setup is None: 

255 raise HTTPException( 

256 status_code=404, 

257 detail="Device setup not found", 

258 ) 

259 deleted = device_setup.delete() 

260 return deleted 

261 

262 

263@app.get("/number-samples", dependencies=[Depends(get_current_active_user)]) 

264async def get_number_samples( 

265 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

266) -> list[TwinPadActivity]: 

267 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount) 

268 

269 

270@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

271async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

272 if "signal_id" not in query.sort_by: 

273 query.sort_by += ",signal_id:1" 

274 return Signal.response_from_query(query).to_dict(exclude={"device"}) 

275 

276 

277@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)]) 

278async def signals_names() -> list[str]: 

279 return Signal.get_all_ids() 

280 

281 

282@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

283async def signal_stats(): 

284 """ 

285 Returns signals stats 

286 """ 

287 signal_statuses = Signal.get_all_statuses() 

288 signal_ids = [signal["signal_id"] for signal in signal_statuses] 

289 

290 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids) 

291 number_samples = sum(number_samples_by_signal_id.values()) 

292 

293 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up") 

294 

295 number_signals = Signal.get_number_documents() 

296 

297 return { 

298 "signal_data_size": signal_datasize(), 

299 "number_signal_samples": number_samples, 

300 "number_active_signals": number_active_signals, 

301 "number_signals": number_signals, 

302 } 

303 

304 

305@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

306async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

307 return SignalSample.get_last_from_signal_ids(signal_ids) 

308 

309 

310@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

311async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

312 return SignalSample.get_first_from_signal_ids(signal_ids) 

313 

314 

315@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)]) 

316async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]: 

317 return Signal.get_forcibility(signal_ids) 

318 

319 

320@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)]) 

321async def get_forced_signals( 

322 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends() 

323): 

324 if not current_user.is_admin: 

325 raise HTTPException(401) 

326 return ForcedSignal.response_from_query(query) 

327 

328 

329@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

330async def get_signal(signal_id): 

331 signal = Signal.get_from_signal_id(signal_id) 

332 if not signal: 

333 raise HTTPException( 

334 status_code=404, 

335 detail="Signal not found", 

336 ) 

337 return signal.to_dict() 

338 

339 

340@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

341async def update_signal( 

342 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

343): 

344 signal = Signal.get_from_signal_id(signal_id) 

345 if not signal: 

346 raise HTTPException( 

347 status_code=404, 

348 detail="Device not found", 

349 ) 

350 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id) 

351 if forced_signal is not None: 

352 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

353 raise HTTPException( 

354 status_code=403, 

355 detail="Cannot override another user's forcing", 

356 ) 

357 

358 result = await signal.send_command(signal_update, current_user) 

359 if result.get("error", False) is True: 

360 raise HTTPException( 

361 status_code=result.get("status_code", 500), 

362 detail=result.get("message", "An error has occurred"), 

363 ) 

364 

365 if forced_signal is not None and signal_update.forced_value is None: 

366 forced_signal.delete() 

367 elif signal_update.forced_value is not None: 

368 forced_signal = ForcedSignal( 

369 signal_id=signal_id, 

370 forcing_user_id=current_user.id, 

371 forced_at=time.time(), 

372 value=signal_update.forced_value, 

373 ) 

374 forced_signal.insert() 

375 

376 return result 

377 

378 

379@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)]) 

380async def get_signal_forcibility( 

381 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)] 

382) -> bool: 

383 return ForcedSignal.can_force(signal_id, current_user) 

384 

385 

386@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

387async def get_signal_data( 

388 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

389) -> SignalData | None: 

390 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

391 

392 if number_samples_max is not None: 

393 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

394 

395 return signal_data 

396 

397 

398@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

399async def get_last_value(signal_id) -> SignalSample: 

400 sample = SignalSample.get_last_from_signal_id(signal_id) 

401 if sample is None: 

402 raise HTTPException(status_code=404, detail="No data") 

403 return sample 

404 

405 

406@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

407async def get_first_value(signal_id) -> SignalSample: 

408 sample = SignalSample.get_first_from_signal_id(signal_id) 

409 if sample is None: 

410 raise HTTPException(status_code=404, detail="No data") 

411 return sample 

412 

413 

414@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

415async def get_signal_number_samples(signal_id): 

416 signal = Signal.get_from_signal_id(signal_id) 

417 if not signal: 

418 raise HTTPException( 

419 status_code=404, 

420 detail="Device not found", 

421 ) 

422 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()} 

423 

424 

425@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

426async def get_signals_data( 

427 signal_ids: list[str] = Query(default=[]), 

428 number_samples_max: int = None, 

429 min_timestamp: float = None, 

430 max_timestamp: float = None, 

431 interpolate_bounds: bool = True, 

432) -> SignalsData | None: 

433 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

434 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

435 

436 signals_data = SignalsData.get_from_signal_ids( 

437 signal_ids, 

438 min_timestamp=min_timestamp, 

439 max_timestamp=max_timestamp, 

440 window_min_timestamp=min_timestamp, 

441 window_max_timestamp=max_timestamp, 

442 interpolate_bounds=interpolate_bounds, 

443 ) 

444 if number_samples_max is not None: 

445 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

446 

447 return signals_data 

448 

449 

450@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

451async def get_signals_data_interest_window( 

452 window_max_number_samples: int | None = None, 

453 outside_max_number_samples: int | None = None, 

454 window_min_timestamp: float = None, 

455 window_max_timestamp: float = None, 

456 signal_ids: list[str] = Query(default=[]), 

457 min_timestamp: float = None, 

458 max_timestamp: float = None, 

459) -> SignalsData | None: 

460 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

461 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

462 

463 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

464 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

465 

466 max_documents = 0 

467 

468 if window_max_number_samples is not None: 

469 max_documents += 10 * window_max_number_samples 

470 if outside_max_number_samples is not None: 

471 max_documents += 10 * outside_max_number_samples 

472 

473 if max_documents == 0: 

474 max_documents = None 

475 

476 signals_data = SignalsData.get_from_signal_ids( 

477 signal_ids, 

478 min_timestamp=min_timestamp, 

479 max_timestamp=max_timestamp, 

480 window_min_timestamp=window_min_timestamp, 

481 window_max_timestamp=window_max_timestamp, 

482 max_documents=max_documents, 

483 ) 

484 

485 signals_data = signals_data.interest_window_desampling( 

486 window_max_number_samples=window_max_number_samples, 

487 outside_max_number_samples=outside_max_number_samples, 

488 window_min_timestamp=window_min_timestamp, 

489 window_max_timestamp=window_max_timestamp, 

490 ) 

491 

492 return signals_data 

493 

494 

495@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)]) 

496async def export_signals_zip( 

497 file_format: str, 

498 signal_ids: list[str] = Query(default=[]), 

499 min_timestamp: float = None, 

500 max_timestamp: float = None, 

501): 

502 signals_data = SignalsData.get_from_signal_ids( 

503 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

504 ) 

505 zip_data = signals_data.zip_export(file_format) 

506 return Response( 

507 content=zip_data, 

508 media_type="application/zip", 

509 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

510 ) 

511 

512 

513@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

514async def export_signals_hdf5( 

515 signal_ids: list[str] = Query(default=[]), 

516 min_timestamp: float = None, 

517 max_timestamp: float = None, 

518): 

519 signals_data = SignalsData.get_from_signal_ids( 

520 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

521 ) 

522 data = signals_data.hdf5_export() 

523 return Response( 

524 content=data, 

525 media_type="application/hdf5", 

526 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

527 ) 

528 

529 

530@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)]) 

531async def get_signals_data_post_processing( 

532 phase_ids: list[str] = Query(default=[]), 

533 phase_sync_times: list[float | None] = Query(default=[]), 

534 signal_ids: list[str] = Query(default=[]), 

535 window_min_timestamps: list[float | None] = Query(default=[]), 

536 window_max_timestamps: list[float | None] = Query(default=[]), 

537 number_samples_max: int = None, 

538) -> SignalsData | None: 

539 if len(phase_sync_times) == 0: 

540 phase_sync_times = [None for _ in range(len(phase_ids))] 

541 if len(window_min_timestamps) == 0: 

542 window_min_timestamps = [None for _ in range(len(phase_ids))] 

543 if len(window_max_timestamps) == 0: 

544 window_max_timestamps = [None for _ in range(len(phase_ids))] 

545 

546 if ( 

547 len(phase_ids) != len(phase_sync_times) 

548 or len(phase_ids) != len(window_min_timestamps) 

549 or len(phase_ids) != len(window_max_timestamps) 

550 ): 

551 raise HTTPException( 

552 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps." 

553 ) 

554 

555 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

556 

557 if None in phases: 

558 raise HTTPException(404, "Phase not found") 

559 

560 signals_data = SignalsData.get_from_phase_and_signal_ids( 

561 phases=phases, 

562 phase_sync_times=phase_sync_times, 

563 signal_ids=signal_ids, 

564 window_min_timestamps=window_min_timestamps, 

565 window_max_timestamps=window_max_timestamps, 

566 ) 

567 

568 if number_samples_max is not None: 

569 signals_data = signals_data.min_max_downsampling(number_samples_max) 

570 

571 return signals_data 

572 

573 

574@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)]) 

575async def apply_single_post_processing_function( 

576 phase_id: str, 

577 base_signal_id: str, 

578 function: SINGLE_POST_PROCESSING_FUNCTION, 

579 phase_sync_time: float = None, 

580 window_min_timestamp: float = None, 

581 window_max_timestamp: float = None, 

582 number_samples_max: int = None, 

583) -> SignalsData | None: 

584 phase = Phase.get_from_id(phase_id) 

585 

586 if phase is None: 

587 raise HTTPException(404, "Phase not found") 

588 if phase_sync_time is None: 

589 phase_sync_time = phase.start_at / 1000 

590 

591 signals_data = await SignalsData.apply_single_function( 

592 phase, 

593 base_signal_id, 

594 function, 

595 window_min_timestamp=window_min_timestamp, 

596 window_max_timestamp=window_max_timestamp, 

597 ) 

598 

599 if signals_data is None: 

600 raise HTTPException(500, "There was en error while applying the function") 

601 

602 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

603 signals_data = signals_data.min_max_downsampling(number_samples_max) 

604 signals_data = signals_data.zero_time_vector(phase_sync_time) 

605 

606 return signals_data 

607 

608 

609@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)]) 

610async def apply_multiple_post_processing_function( 

611 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

612 phase_ids: list[str] = Query(default=[]), 

613 phase_sync_times: list[float] = Query(default=[]), 

614 signal_ids: list[str] = Query(default=[]), 

615 window_min_timestamp: float = None, 

616 window_max_timestamp: float = None, 

617 number_samples_max: int = None, 

618) -> SignalsData | None: 

619 if len(phase_ids) != len(signal_ids): 

620 raise HTTPException(400, "Each selected signal should correspond to a phase") 

621 

622 if len(phase_ids) < 2: 

623 raise HTTPException(400, "These functions can only be applied to multiple signals") 

624 

625 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

626 if None in phases: 

627 raise HTTPException(404, "Phase not found") 

628 

629 if len(phase_sync_times) == 0: 

630 phase_sync_times = [phase.start_at / 1000 for phase in phases] 

631 if len(phases) != len(phase_sync_times): 

632 raise HTTPException(400, "Number of synchronization times does not match the number of phases") 

633 

634 signals_data = await SignalsData.apply_multiple_function( 

635 phases, 

636 signal_ids, 

637 function, 

638 window_min_timestamp=window_min_timestamp, 

639 window_max_timestamp=window_max_timestamp, 

640 ) 

641 

642 if signals_data is None: 

643 raise HTTPException(500, "There was en error while applying the function") 

644 

645 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

646 signals_data = signals_data.min_max_downsampling(number_samples_max) 

647 if function in {"Align-X", "Using-X"}: 

648 signals_data = signals_data.zero_time_vector(phase_sync_times[1]) 

649 else: 

650 signals_data = signals_data.zero_time_vector(phase_sync_times[0]) 

651 

652 return signals_data 

653 

654 

655@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)]) 

656async def export_post_processing_zip( 

657 file_format: str, 

658 phase_ids: list[MongoId] = Query(default=[]), 

659 phase_sync_times: list[float | None] = Query(default=[]), 

660 signal_ids: list[str] = Query(default=[]), 

661 window_min_timestamps: list[float | None] = Query(default=[]), 

662 window_max_timestamps: list[float | None] = Query(default=[]), 

663): 

664 signals_data = await get_signals_data_post_processing( 

665 phase_ids, 

666 phase_sync_times, 

667 signal_ids, 

668 window_min_timestamps, 

669 window_max_timestamps, 

670 ) 

671 

672 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids) 

673 

674 return Response( 

675 content=zip_data, 

676 media_type="application/zip", 

677 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

678 ) 

679 

680 

681@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

682async def export_post_processing_hdf5( 

683 phase_ids: list[str] = Query(default=[]), 

684 phase_sync_times: list[float | None] = Query(default=[]), 

685 signal_ids: list[str] = Query(default=[]), 

686 window_min_timestamps: list[float | None] = Query(default=[]), 

687 window_max_timestamps: list[float | None] = Query(default=[]), 

688): 

689 signals_data = await get_signals_data_post_processing( 

690 phase_ids, 

691 phase_sync_times, 

692 signal_ids, 

693 window_min_timestamps, 

694 window_max_timestamps, 

695 ) 

696 

697 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids) 

698 

699 return Response( 

700 content=data, 

701 media_type="application/hdf5", 

702 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

703 ) 

704 

705 

706@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

707async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

708 return Event.response_from_query(query) 

709 

710 

711@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

712async def get_event(event_id) -> Event: 

713 event = Event.get_from_id(event_id) 

714 if event is None: 

715 raise HTTPException(status_code=404, detail="No such event") 

716 return event 

717 

718 

719@app.get("/number-events", dependencies=[Depends(get_current_active_user)]) 

720async def get_number_events( 

721 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

722) -> list[TwinPadActivity]: 

723 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount) 

724 

725 

726@app.get("/number-commands", dependencies=[Depends(get_current_active_user)]) 

727async def get_number_commands( 

728 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

729) -> list[TwinPadActivity]: 

730 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount) 

731 

732 

733@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

734async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

735 return EventRule.response_from_query(query) 

736 

737 

738@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

739async def get_event_rule(event_rule_id) -> EventRule: 

740 event_rule = EventRule.get_from_id(event_rule_id) 

741 if event_rule is None: 

742 raise HTTPException(status_code=404, detail="No such event rule") 

743 return event_rule 

744 

745 

746@app.post("/users", status_code=201) 

747async def create_user(user: User): 

748 if User.get_one_by_attribute("email", user.email) is not None: 

749 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

750 hashed_password = get_password_hash(user.password) 

751 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

752 if new_user is None: 

753 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

754 return new_user 

755 

756 

757@app.post("/token", status_code=201) 

758async def login_for_access_token( 

759 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

760) -> Token: 

761 user = authenticate_user(form_data.username, form_data.password) 

762 if not user: 

763 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

764 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

765 if user.is_active: 

766 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

767 access_token = create_access_token( 

768 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

769 ) 

770 return Token(access_token=access_token, token_type="bearer") 

771 

772 

773@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

774async def get_users(): 

775 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")] 

776 

777 

778@app.get("/users/me", response_model=User) 

779async def get_current_user( 

780 current_user: Annotated[User, Depends(get_current_active_user)], 

781): 

782 del current_user.password 

783 return current_user 

784 

785 

786@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

787async def get_user(user_id: str): 

788 user = User.get_from_id(user_id) 

789 

790 if user is None: 

791 raise HTTPException( 

792 status_code=404, 

793 detail="User not found", 

794 ) 

795 return user.to_dict(exclude={"password", "is_connected"}) 

796 

797 

798@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)]) 

799async def patch_user(user: UserUpdate, user_id): 

800 if user.password == "" or user.password is None: 

801 del user.password 

802 else: 

803 user.password = get_password_hash(user.password) 

804 return User.update_info(user, user_id) 

805 

806 

807@app.get("/commands", dependencies=[Depends(get_current_active_user)]) 

808async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]: 

809 return Command.response_from_query(query) 

810 

811 

812@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

813async def get_campaigns(): 

814 return Campaign.get_all() 

815 

816 

817@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

818async def get_campaign_by_id(campaign_id: str): 

819 campaign = Campaign.get_from_id(campaign_id) 

820 if campaign is None: 

821 raise HTTPException(status_code=404, detail="Campaign not found") 

822 return campaign 

823 

824 

825@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

826async def add_campaign(campaign: Campaign): 

827 campaign_id = campaign.insert() 

828 if campaign_id is None: 

829 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

830 return campaign 

831 

832 

833@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

834async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

835 campaign = Campaign.get_from_id(campaign_id) 

836 if campaign is None: 

837 raise HTTPException(status_code=404, detail="Campaign not found") 

838 campaign.update(edit_campaign.model_dump(exclude_unset=True, mode="json")) 

839 return campaign 

840 

841 

842@app.delete( 

843 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

844) 

845async def delete_campaign(campaign_id: str): 

846 campaign = Campaign.get_from_id(campaign_id) 

847 if campaign is None: 

848 raise HTTPException(status_code=404, detail="Campaign not found") 

849 delete_phases = Phase.deleteMany(campaign_id) 

850 if not delete_phases.acknowledged: 

851 raise HTTPException(status_code=500, detail="An error occurred during phases deletion") 

852 campaign_deleted = campaign.delete() 

853 if not campaign_deleted: 

854 raise HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

855 return True 

856 

857 

858@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

859async def get_campaign_phases(campaign_id: str): 

860 return Phase.get_by_attribute("campaign_id", campaign_id) 

861 

862 

863@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

864async def get_phase(phase_id: str): 

865 phase = Phase.get_from_id(phase_id) 

866 if phase is None: 

867 raise HTTPException(status_code=404, detail="Phase not found") 

868 return phase 

869 

870 

871@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

872async def add_phase(phase: Phase): 

873 phase_id = phase.insert() 

874 if phase_id is None: 

875 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

876 return phase 

877 

878 

879@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

880async def edit_phase(phase_id, edit_phase: Phase): 

881 phase = Phase.get_from_id(phase_id) 

882 if phase is None: 

883 raise HTTPException(status_code=404, detail="Phase does not exists") 

884 phase.update(edit_phase.model_dump(exclude_unset=True, mode="json")) 

885 return phase 

886 

887 

888@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

889async def delete_phase(phase_id: str): 

890 phase = Phase.get_from_id(phase_id) 

891 if phase is None: 

892 raise HTTPException(status_code=404, detail="Phase not found") 

893 deleted = phase.delete() 

894 if not deleted: 

895 raise HTTPException(status_code=500, detail="An error occurred during phase deletion") 

896 return True 

897 

898 

899@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

900async def get_custom_views(): 

901 return CustomView.get_all() 

902 

903 

904@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

905async def get_custom_views_from_user_id(user_id: str): 

906 return CustomView.get_by_attribute("user_id", user_id) 

907 

908 

909@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

910async def get_custom_view(custom_view_id: str): 

911 return CustomView.get_from_id(custom_view_id) 

912 

913 

914@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

915async def create_custom_view( 

916 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user) 

917): 

918 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id) 

919 custom_view.insert() 

920 return custom_view 

921 

922 

923@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

924async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate): 

925 custom_view = CustomView.get_from_id(custom_view_id) 

926 return custom_view.update(custom_view_update.model_dump()) 

927 

928 

929@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

930async def delete_custom_view(custom_view_id: str): 

931 custom_view = CustomView.get_from_id(custom_view_id) 

932 return custom_view.delete() 

933 

934 

935@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)]) 

936async def add_video(video: Video): 

937 video.insert() 

938 if not video: 

939 raise HTTPException(status_code=500, detail="An error occurred during cctv creation") 

940 return video 

941 

942 

943@app.get("/videos", dependencies=[Depends(get_current_active_user)]) 

944async def get_videos(): 

945 return Video.get_all() 

946 

947 

948@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)]) 

949def get_stream(video_id): 

950 camera_name = Video.get_video(video_id) 

951 if camera_name is None: 

952 raise HTTPException(status_code=404, detail="Camera not found") 

953 return camera_name 

954 

955 

956@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

957async def get_signals_preset(current_user: User = Depends(get_current_active_user)): 

958 return SignalsPreset.get_by_attribute("user_id", current_user.id) 

959 

960 

961@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

962async def create_signals_preset( 

963 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user) 

964): 

965 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id) 

966 return new_signals_preset 

967 

968 

969@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)]) 

970async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate): 

971 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

972 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True)) 

973 

974 

975@app.delete( 

976 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)] 

977) 

978async def delete_signals_preset(signals_preset_id: str): 

979 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

980 return signals_preset.delete() 

981 

982 

983@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

984async def create_graph_theme( 

985 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user) 

986): 

987 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) 

988 if styled_signal is None: 

989 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist") 

990 

991 graph_theme = PrivateGraphTheme.create( 

992 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id 

993 ) 

994 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

995 

996 

997@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

998async def get_all_graph_themes( 

999 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

1000) -> ListResponse[PublicGraphTheme]: 

1001 return PublicGraphTheme.response_from_query(query, current_user.id) 

1002 

1003 

1004@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)]) 

1005async def get_graph_themes_in_library( 

1006 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

1007) -> ListResponse[PublicGraphTheme]: 

1008 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id) 

1009 

1010 

1011@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)]) 

1012async def update_graph_theme( 

1013 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user) 

1014): 

1015 graph_theme = PrivateGraphTheme.get_from_id(theme_id) 

1016 update_dict = theme_update.model_dump(exclude_unset=True) 

1017 if current_user.id != graph_theme.creator_id: 

1018 for theme_property in update_dict.keys(): 

1019 if theme_property not in ["active_for_user", "in_user_library"]: 

1020 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him") 

1021 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id) 

1022 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

1023 

1024 

1025@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

1026async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)): 

1027 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id) 

1028 if current_user.id != graph_theme.creator_id: 

1029 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him") 

1030 return graph_theme.delete() 

1031 

1032 

1033@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)]) 

1034async def get_signals_appearances( 

1035 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user) 

1036) -> dict: 

1037 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id) 

1038 

1039 

1040@app.get("/device-deployers", dependencies=[Depends(get_current_active_user)]) 

1041async def get_device_deployers() -> list[DeviceDeployer]: 

1042 return DeviceDeployer.get_all() 

1043 

1044 

1045@app.get("/device-deployers/{deployer_id}", dependencies=[Depends(get_current_active_user)]) 

1046async def get_device_deployers(deployer_id: MongoId) -> DeviceDeployer: 

1047 deployer = DeviceDeployer.get_from_id(deployer_id) 

1048 if deployer is None: 

1049 raise HTTPException( 

1050 status_code=404, 

1051 detail=f"Deployer not found", 

1052 ) 

1053 return deployer 

1054 

1055 

1056@app.patch("/device-deployers/{deployer_id}", dependencies=[Depends(get_current_active_user)]) 

1057async def update_device_deployers(deployer_id: MongoId, deployer_update: DeviceDeployerUpdate) -> DeviceDeployer: 

1058 deployer = DeviceDeployer.get_from_id(deployer_id) 

1059 if deployer is None: 

1060 raise HTTPException( 

1061 status_code=404, 

1062 detail=f"Deployer not found", 

1063 ) 

1064 return deployer.update(deployer_update.model_dump(exclude_unset=True, mode="json")) 

1065 

1066 

1067@app.post("/device-deployers/{deployer_id}/devices", dependencies=[Depends(get_current_active_user)], status_code=201) 

1068async def create_device_from_deployer(deployer_id: MongoId, device: DeviceFromDeployerCreation) -> DeviceFromDeployer: 

1069 deployer = DeviceDeployer.get_from_id(deployer_id) 

1070 device = deployer.create_device(device) 

1071 if device is None: 

1072 raise HTTPException(status_code=400, detail="Cannot create device") 

1073 return device 

1074 

1075 

1076@app.post("/device-deployers", dependencies=[Depends(get_current_active_user)], status_code=201) 

1077async def add_deployer(deployer: DeviceDeployer): 

1078 deployer.insert() 

1079 return deployer 

1080 

1081 

1082@app.delete("/device-deployers/{deployer_id}", dependencies=[Depends(get_current_active_user)]) 

1083async def add_deployer(deployer_id: MongoId): 

1084 deployer = DeviceDeployer.get_from_id(deployer_id) 

1085 if deployer is None: 

1086 raise HTTPException( 

1087 status_code=404, 

1088 detail=f"Deployer not found", 

1089 ) 

1090 deleted_info = deployer.delete() 

1091 return deleted_info 

1092 

1093 

1094@app.get("/device-deployers/{deployer_id}/devices", dependencies=[Depends(get_current_active_user)]) 

1095async def get_devices_from_deployer(deployer_id: MongoId) -> list[DeviceFromDeployer]: 

1096 deployer = DeviceDeployer.get_from_id(deployer_id) 

1097 if deployer is None: 

1098 raise HTTPException( 

1099 status_code=404, 

1100 detail=f"Deployer not found", 

1101 ) 

1102 devices = deployer.devices() 

1103 if devices is None: 

1104 raise HTTPException( 

1105 status_code=500, 

1106 detail="Error getting devices", 

1107 ) 

1108 return devices 

1109 

1110 

1111@app.get("/device-deployers/{deployer_id}/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

1112async def update_device_from_deployer(deployer_id: MongoId, device_id: DeviceId) -> DeviceFromDeployer: 

1113 deployer = DeviceDeployer.get_from_id(deployer_id) 

1114 if deployer is None: 

1115 raise HTTPException( 

1116 status_code=404, 

1117 detail=f"Deployer not found", 

1118 ) 

1119 device = deployer.get_device(device_id=device_id) 

1120 if device is None: 

1121 raise HTTPException( 

1122 status_code=404, 

1123 detail=f"Device not found", 

1124 ) 

1125 return device 

1126 

1127 

1128@app.patch("/device-deployers/{deployer_id}/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

1129async def update_device_from_deployer( 

1130 deployer_id: MongoId, 

1131 device_id: DeviceId, 

1132 device_update: DeviceUpdateFromDeployer, 

1133) -> DeviceFromDeployer: 

1134 deployer = DeviceDeployer.get_from_id(deployer_id) 

1135 device = deployer.update_device(device_id, device_update) 

1136 return device 

1137 

1138 

1139@app.delete("/device-deployers/{deployer_id}/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

1140async def delete_device_from_deployer(deployer_id: MongoId, device_id: DeviceId) -> bool: 

1141 deployer = DeviceDeployer.get_from_id(deployer_id) 

1142 if deployer is None: 

1143 raise HTTPException( 

1144 status_code=404, 

1145 detail=f"Deployer not found", 

1146 ) 

1147 delete_info = deployer.delete_device(device_id=device_id) 

1148 if not delete_info.is_deleted: 

1149 raise HTTPException( 

1150 status_code=500, 

1151 detail=f"Cannot delete device: {delete_info.detail}", 

1152 ) 

1153 return delete_info.is_deleted