Coverage for  / usr / local / lib / python3.11 / site-packages / twinpad_backend / api.py: 96%

489 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-25 16:48 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 ForcedSignal, 

20 SignalData, 

21 SignalSample, 

22 ServicesStatus, 

23 Device, 

24 DeviceUpdate, 

25 DeviceSetup, 

26 DeviceSetupUpdate, 

27 DeviceState, 

28 SignalUpdate, 

29 SignalsData, 

30 Event, 

31 EventRule, 

32 TwinPadActivity, 

33 User, 

34 UserUpdate, 

35 Campaign, 

36 Phase, 

37 CustomView, 

38 Command, 

39 CustomViewCreation, 

40 CustomViewUpdate, 

41 Video, 

42 SignalsPreset, 

43 SignalsPresetCreation, 

44 SignalsPresetUpdate, 

45 PrivateGraphTheme, 

46 PublicGraphTheme, 

47 GraphThemeCreation, 

48 GraphThemeUpdate, 

49 SINGLE_POST_PROCESSING_FUNCTION, 

50 DOUBLE_POST_PROCESSING_FUNCTION, 

51 MULTIPLE_POST_PROCESSING_FUNCTION, 

52) 

53from twinpad_backend.auth import ( 

54 Token, 

55 authenticate_user, 

56 get_current_active_user, 

57 ACCESS_TOKEN_EXPIRE_MINUTES, 

58 create_access_token, 

59 get_password_hash, 

60) 

61from twinpad_backend.queries import ( 

62 SignalQuery, 

63 ForcedSignalQuery, 

64 DeviceStatesQuery, 

65 EventQuery, 

66 EventRuleQuery, 

67 CommandQuery, 

68 GraphThemeQuery, 

69) 

70from twinpad_backend.responses import ListResponse 

71 

72REQUEST_TIME_WARNING = 0.5 

73 

74DEBUG = os.environ.get("DEBUG", "false") == "true" 

75PROFILING = os.environ.get("PROFILING", "false") == "true" 

76 

77logger = logging.getLogger("uvicorn.error") 

78logger.propagate = False 

79logger.info("Debug mode: %s", DEBUG) 

80logger.info("log level: %s", logging.root.level) 

81 

82 

83app = FastAPI(title="Twinpad backend", version=__version__) 

84 

85app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

86 

87if PROFILING: # pragma: no cover 

88 profiling_folder = mkdtemp() 

89 logger.info("Profiling enabled") 

90 

91 @app.middleware("http") 

92 async def profile_request(request: Request, call_next): 

93 should_profile = True 

94 url = str(request.url) 

95 for segment in ("profiling", ".ico"): 

96 if segment in url: 

97 should_profile = False 

98 break 

99 

100 if should_profile: # avoid recursion 

101 profiler = Profiler() 

102 profiler.start() 

103 

104 response = await call_next(request) 

105 

106 profiler.stop() 

107 url = "_".join(url.split("/")[3:]).rstrip("/") 

108 if not url: 

109 url = "slash" 

110 filename = url.split("?", maxsplit=1)[0] 

111 logger.info("saving profiling to %s", filename) 

112 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

113 profiling_file.write(profiler.output_html()) 

114 

115 return response 

116 

117 return await call_next(request) 

118 

119 @app.get("/profilings") 

120 async def profilings(): 

121 return {"profilings": os.listdir(profiling_folder)} 

122 

123 @app.get("/profilings/{file_name}") 

124 async def profiling(file_name): 

125 file_path = os.path.join(profiling_folder, file_name) 

126 

127 if not os.path.exists(file_path): 

128 raise HTTPException( 

129 status_code=404, 

130 detail=f"Profiling file '{file_name}' not found", 

131 ) 

132 

133 with open(file_path, "r", encoding="utf-8") as profiling_file: 

134 return Response( 

135 content=profiling_file.read(), 

136 media_type="application/html", 

137 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'}, 

138 ) 

139 

140 

141@app.middleware("http") 

142async def log_request_time(request: Request, call_next): 

143 start_time = time.time() # Record the start time 

144 response = await call_next(request) # Process the request 

145 duration = time.time() - start_time # Calculate the time taken 

146 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

147 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

148 if duration > REQUEST_TIME_WARNING: 

149 logger.warning(message) 

150 else: 

151 logger.info(message) 

152 return response 

153 

154 

155@app.get("/") 

156async def slash(): 

157 return {"twinpad_version": __version__} 

158 

159 

160@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

161async def status(): 

162 """ 

163 Return service healthcheck 

164 """ 

165 return { 

166 "services": ServicesStatus.check(), 

167 } 

168 

169 

170@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

171async def get_devices() -> list[Device]: 

172 return Device.get_all(sort_by="device_id") 

173 

174 

175@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

176async def get_device(device_id): 

177 device = Device.get_one_by_attribute("device_id", device_id) 

178 if not device: 

179 raise HTTPException( 

180 status_code=404, 

181 detail="Device not found", 

182 ) 

183 return device 

184 

185 

186@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

187async def update_item( 

188 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

189): 

190 device = Device.get_one_by_attribute("device_id", device_id) 

191 if not device: 

192 raise HTTPException( 

193 status_code=404, 

194 detail="Device not found", 

195 ) 

196 result = await device.change_mode(device_update, current_user) 

197 if result.get("error", False) is True: 

198 raise HTTPException( 

199 status_code=result.get("status_code", 500), 

200 detail=result.get("message", "An error has occurred"), 

201 ) 

202 return result 

203 

204 

205@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

206async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

207 return DeviceState.get_from_id_and_query(device_id, query) 

208 

209 

210@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

211async def get_device_setups() -> list[DeviceSetup]: 

212 return DeviceSetup.get_all() 

213 

214 

215@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

216async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

217 device_setup.insert() 

218 return device_setup 

219 

220 

221@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

222async def get_device_setup(device_setup_id: str): 

223 device_setup = DeviceSetup.get_from_id(device_setup_id) 

224 if device_setup is None: 

225 raise HTTPException( 

226 status_code=404, 

227 detail="Device setup not found", 

228 ) 

229 return device_setup 

230 

231 

232@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

233async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

234 device_setup = DeviceSetup.get_from_id(device_setup_id) 

235 if device_setup is None: 

236 raise HTTPException( 

237 status_code=404, 

238 detail="Device setup not found", 

239 ) 

240 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

241 return device_setup 

242 

243 

244@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

245async def delete_device_setups(device_setup_id: str) -> bool: 

246 device_setup = DeviceSetup.get_from_id(device_setup_id) 

247 if device_setup is None: 

248 raise HTTPException( 

249 status_code=404, 

250 detail="Device setup not found", 

251 ) 

252 deleted = device_setup.delete() 

253 return deleted 

254 

255 

256@app.get("/number-samples", dependencies=[Depends(get_current_active_user)]) 

257async def get_number_samples( 

258 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

259) -> list[TwinPadActivity]: 

260 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount) 

261 

262 

263@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

264async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

265 if "signal_id" not in query.sort_by: 

266 query.sort_by += ",signal_id:1" 

267 return Signal.response_from_query(query).to_dict(exclude={"device"}) 

268 

269 

270@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)]) 

271async def signals_names() -> list[str]: 

272 return Signal.get_all_ids() 

273 

274 

275@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

276async def signal_stats(): 

277 """ 

278 Returns signals stats 

279 """ 

280 signal_statuses = Signal.get_all_statuses() 

281 signal_ids = [signal["signal_id"] for signal in signal_statuses] 

282 

283 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids) 

284 number_samples = sum(number_samples_by_signal_id.values()) 

285 

286 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up") 

287 

288 number_signals = Signal.get_number_documents() 

289 

290 return { 

291 "signal_data_size": signal_datasize(), 

292 "number_signal_samples": number_samples, 

293 "number_active_signals": number_active_signals, 

294 "number_signals": number_signals, 

295 } 

296 

297 

298@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

299async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

300 return SignalSample.get_last_from_signal_ids(signal_ids) 

301 

302 

303@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

304async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

305 return SignalSample.get_first_from_signal_ids(signal_ids) 

306 

307 

308@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)]) 

309async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]: 

310 return Signal.get_forcibility(signal_ids) 

311 

312 

313@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)]) 

314async def get_forced_signals( 

315 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends() 

316): 

317 if not current_user.is_admin: 

318 raise HTTPException(401) 

319 return ForcedSignal.response_from_query(query) 

320 

321 

322@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

323async def get_signal(signal_id): 

324 signal = Signal.get_from_signal_id(signal_id) 

325 if not signal: 

326 raise HTTPException( 

327 status_code=404, 

328 detail="Signal not found", 

329 ) 

330 return signal.to_dict() 

331 

332 

333@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

334async def update_signal( 

335 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

336): 

337 signal = Signal.get_from_signal_id(signal_id) 

338 if not signal: 

339 raise HTTPException( 

340 status_code=404, 

341 detail="Device not found", 

342 ) 

343 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id) 

344 if forced_signal is not None: 

345 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

346 raise HTTPException( 

347 status_code=403, 

348 detail="Cannot override another user's forcing", 

349 ) 

350 

351 result = await signal.send_command(signal_update, current_user) 

352 if result.get("error", False) is True: 

353 raise HTTPException( 

354 status_code=result.get("status_code", 500), 

355 detail=result.get("message", "An error has occurred"), 

356 ) 

357 

358 if forced_signal is not None and signal_update.forced_value is None: 

359 forced_signal.delete() 

360 elif signal_update.forced_value is not None: 

361 forced_signal = ForcedSignal( 

362 signal_id=signal_id, 

363 forcing_user_id=current_user.id, 

364 forced_at=time.time(), 

365 value=signal_update.forced_value, 

366 ) 

367 forced_signal.insert() 

368 

369 return result 

370 

371 

372@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)]) 

373async def get_signal_forcibility( 

374 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)] 

375) -> bool: 

376 return ForcedSignal.can_force(signal_id, current_user) 

377 

378 

379@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

380async def get_signal_data( 

381 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

382) -> SignalData | None: 

383 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

384 

385 if number_samples_max is not None: 

386 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

387 

388 return signal_data 

389 

390 

391@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

392async def get_last_value(signal_id) -> SignalSample: 

393 sample = SignalSample.get_last_from_signal_id(signal_id) 

394 if sample is None: 

395 raise HTTPException(status_code=404, detail="No data") 

396 return sample 

397 

398 

399@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

400async def get_first_value(signal_id) -> SignalSample: 

401 sample = SignalSample.get_first_from_signal_id(signal_id) 

402 if sample is None: 

403 raise HTTPException(status_code=404, detail="No data") 

404 return sample 

405 

406 

407@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

408async def get_signal_number_samples(signal_id): 

409 signal = Signal.get_from_signal_id(signal_id) 

410 if not signal: 

411 raise HTTPException( 

412 status_code=404, 

413 detail="Device not found", 

414 ) 

415 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()} 

416 

417 

418@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

419async def get_signals_data( 

420 signal_ids: list[str] = Query(default=[]), 

421 number_samples_max: int = None, 

422 min_timestamp: float = None, 

423 max_timestamp: float = None, 

424 interpolate_bounds: bool = True, 

425) -> SignalsData | None: 

426 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

427 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

428 

429 signals_data = SignalsData.get_from_signal_ids( 

430 signal_ids, 

431 min_timestamp=min_timestamp, 

432 max_timestamp=max_timestamp, 

433 window_min_timestamp=min_timestamp, 

434 window_max_timestamp=max_timestamp, 

435 interpolate_bounds=interpolate_bounds, 

436 ) 

437 if number_samples_max is not None: 

438 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

439 

440 return signals_data 

441 

442 

443@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

444async def get_signals_data_interest_window( 

445 window_max_number_samples: int | None = None, 

446 outside_max_number_samples: int | None = None, 

447 window_min_timestamp: float = None, 

448 window_max_timestamp: float = None, 

449 signal_ids: list[str] = Query(default=[]), 

450 min_timestamp: float = None, 

451 max_timestamp: float = None, 

452) -> SignalsData | None: 

453 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

454 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

455 

456 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

457 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

458 

459 max_documents = 0 

460 

461 if window_max_number_samples is not None: 

462 max_documents += 10 * window_max_number_samples 

463 if outside_max_number_samples is not None: 

464 max_documents += 10 * outside_max_number_samples 

465 

466 if max_documents == 0: 

467 max_documents = None 

468 

469 signals_data = SignalsData.get_from_signal_ids( 

470 signal_ids, 

471 min_timestamp=min_timestamp, 

472 max_timestamp=max_timestamp, 

473 window_min_timestamp=window_min_timestamp, 

474 window_max_timestamp=window_max_timestamp, 

475 max_documents=max_documents, 

476 ) 

477 

478 signals_data = signals_data.interest_window_desampling( 

479 window_max_number_samples=window_max_number_samples, 

480 outside_max_number_samples=outside_max_number_samples, 

481 window_min_timestamp=window_min_timestamp, 

482 window_max_timestamp=window_max_timestamp, 

483 ) 

484 

485 return signals_data 

486 

487 

488@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)]) 

489async def export_signals_zip( 

490 file_format: str, 

491 signal_ids: list[str] = Query(default=[]), 

492 min_timestamp: float = None, 

493 max_timestamp: float = None, 

494): 

495 signals_data = SignalsData.get_from_signal_ids( 

496 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

497 ) 

498 zip_data = signals_data.zip_export(file_format) 

499 return Response( 

500 content=zip_data, 

501 media_type="application/zip", 

502 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

503 ) 

504 

505 

506@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

507async def export_signals_hdf5( 

508 signal_ids: list[str] = Query(default=[]), 

509 min_timestamp: float = None, 

510 max_timestamp: float = None, 

511): 

512 signals_data = SignalsData.get_from_signal_ids( 

513 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

514 ) 

515 data = signals_data.hdf5_export() 

516 return Response( 

517 content=data, 

518 media_type="application/hdf5", 

519 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

520 ) 

521 

522 

523@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)]) 

524async def get_signals_data_post_processing( 

525 phase_ids: list[str] = Query(default=[]), 

526 phase_sync_times: list[float | None] = Query(default=[]), 

527 signal_ids: list[str] = Query(default=[]), 

528 window_min_timestamps: list[float | None] = Query(default=[]), 

529 window_max_timestamps: list[float | None] = Query(default=[]), 

530 number_samples_max: int = None, 

531) -> SignalsData | None: 

532 if len(phase_sync_times) == 0: 

533 phase_sync_times = [None for _ in range(len(phase_ids))] 

534 if len(window_min_timestamps) == 0: 

535 window_min_timestamps = [None for _ in range(len(phase_ids))] 

536 if len(window_max_timestamps) == 0: 

537 window_max_timestamps = [None for _ in range(len(phase_ids))] 

538 

539 if ( 

540 len(phase_ids) != len(phase_sync_times) 

541 or len(phase_ids) != len(window_min_timestamps) 

542 or len(phase_ids) != len(window_max_timestamps) 

543 ): 

544 raise HTTPException( 

545 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps." 

546 ) 

547 

548 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

549 

550 if None in phases: 

551 raise HTTPException(404, "Phase not found") 

552 

553 signals_data = SignalsData.get_from_phase_and_signal_ids( 

554 phases=phases, 

555 phase_sync_times=phase_sync_times, 

556 signal_ids=signal_ids, 

557 window_min_timestamps=window_min_timestamps, 

558 window_max_timestamps=window_max_timestamps, 

559 ) 

560 

561 if number_samples_max is not None: 

562 signals_data = signals_data.min_max_downsampling(number_samples_max) 

563 

564 return signals_data 

565 

566 

567@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)]) 

568async def apply_single_post_processing_function( 

569 phase_id: str, 

570 base_signal_id: str, 

571 function: SINGLE_POST_PROCESSING_FUNCTION, 

572 phase_sync_time: float = None, 

573 window_min_timestamp: float = None, 

574 window_max_timestamp: float = None, 

575 number_samples_max: int = None, 

576) -> SignalsData | None: 

577 phase = Phase.get_from_id(phase_id) 

578 

579 if phase is None: 

580 raise HTTPException(404, "Phase not found") 

581 if phase_sync_time is None: 

582 phase_sync_time = phase.start_at / 1000 

583 

584 signals_data = await SignalsData.apply_single_function( 

585 phase, 

586 base_signal_id, 

587 function, 

588 window_min_timestamp=window_min_timestamp, 

589 window_max_timestamp=window_max_timestamp, 

590 ) 

591 

592 if signals_data is None: 

593 raise HTTPException(500, "There was en error while applying the function") 

594 

595 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

596 signals_data = signals_data.min_max_downsampling(number_samples_max) 

597 signals_data = signals_data.zero_time_vector(phase_sync_time) 

598 

599 return signals_data 

600 

601 

602@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)]) 

603async def apply_multiple_post_processing_function( 

604 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

605 phase_ids: list[str] = Query(default=[]), 

606 phase_sync_times: list[float] = Query(default=[]), 

607 signal_ids: list[str] = Query(default=[]), 

608 window_min_timestamp: float = None, 

609 window_max_timestamp: float = None, 

610 number_samples_max: int = None, 

611) -> SignalsData | None: 

612 if len(phase_ids) != len(signal_ids): 

613 raise HTTPException(400, "Each selected signal should correspond to a phase") 

614 

615 if len(phase_ids) < 2: 

616 raise HTTPException(400, "These functions can only be applied to multiple signals") 

617 

618 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

619 if None in phases: 

620 raise HTTPException(404, "Phase not found") 

621 

622 if len(phase_sync_times) == 0: 

623 phase_sync_times = [phase.start_at / 1000 for phase in phases] 

624 if len(phases) != len(phase_sync_times): 

625 raise HTTPException(400, "Number of synchronization times does not match the number of phases") 

626 

627 signals_data = await SignalsData.apply_multiple_function( 

628 phases, 

629 signal_ids, 

630 function, 

631 window_min_timestamp=window_min_timestamp, 

632 window_max_timestamp=window_max_timestamp, 

633 ) 

634 

635 if signals_data is None: 

636 raise HTTPException(500, "There was en error while applying the function") 

637 

638 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

639 signals_data = signals_data.min_max_downsampling(number_samples_max) 

640 if function in {"Align-X", "Using-X"}: 

641 signals_data = signals_data.zero_time_vector(phase_sync_times[1]) 

642 else: 

643 signals_data = signals_data.zero_time_vector(phase_sync_times[0]) 

644 

645 return signals_data 

646 

647 

648@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)]) 

649async def export_post_processing_zip( 

650 file_format: str, 

651 phase_ids: list[str] = Query(default=[]), 

652 phase_sync_times: list[float | None] = Query(default=[]), 

653 signal_ids: list[str] = Query(default=[]), 

654 window_min_timestamps: list[float | None] = Query(default=[]), 

655 window_max_timestamps: list[float | None] = Query(default=[]), 

656): 

657 signals_data = await get_signals_data_post_processing( 

658 phase_ids, 

659 phase_sync_times, 

660 signal_ids, 

661 window_min_timestamps, 

662 window_max_timestamps, 

663 ) 

664 

665 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids) 

666 

667 return Response( 

668 content=zip_data, 

669 media_type="application/zip", 

670 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

671 ) 

672 

673 

674@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

675async def export_post_processing_hdf5( 

676 phase_ids: list[str] = Query(default=[]), 

677 phase_sync_times: list[float | None] = Query(default=[]), 

678 signal_ids: list[str] = Query(default=[]), 

679 window_min_timestamps: list[float | None] = Query(default=[]), 

680 window_max_timestamps: list[float | None] = Query(default=[]), 

681): 

682 signals_data = await get_signals_data_post_processing( 

683 phase_ids, 

684 phase_sync_times, 

685 signal_ids, 

686 window_min_timestamps, 

687 window_max_timestamps, 

688 ) 

689 

690 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids) 

691 

692 return Response( 

693 content=data, 

694 media_type="application/hdf5", 

695 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

696 ) 

697 

698 

699@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

700async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

701 return Event.response_from_query(query) 

702 

703 

704@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

705async def get_event(event_id) -> Event: 

706 event = Event.get_from_id(event_id) 

707 if event is None: 

708 raise HTTPException(status_code=404, detail="No such event") 

709 return event 

710 

711 

712@app.get("/number-events", dependencies=[Depends(get_current_active_user)]) 

713async def get_number_events( 

714 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

715) -> list[TwinPadActivity]: 

716 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount) 

717 

718 

719@app.get("/number-commands", dependencies=[Depends(get_current_active_user)]) 

720async def get_number_commands( 

721 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

722) -> list[TwinPadActivity]: 

723 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount) 

724 

725 

726@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

727async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

728 return EventRule.response_from_query(query) 

729 

730 

731@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

732async def get_event_rule(event_rule_id) -> EventRule: 

733 event_rule = EventRule.get_from_id(event_rule_id) 

734 if event_rule is None: 

735 raise HTTPException(status_code=404, detail="No such event rule") 

736 return event_rule 

737 

738 

739@app.post("/users", status_code=201) 

740async def create_user(user: User): 

741 if User.get_one_by_attribute("email", user.email) is not None: 

742 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

743 hashed_password = get_password_hash(user.password) 

744 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

745 if new_user is None: 

746 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

747 return new_user 

748 

749 

750@app.post("/token", status_code=201) 

751async def login_for_access_token( 

752 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

753) -> Token: 

754 user = authenticate_user(form_data.username, form_data.password) 

755 if not user: 

756 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

757 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

758 if user.is_active: 

759 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

760 access_token = create_access_token( 

761 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

762 ) 

763 return Token(access_token=access_token, token_type="bearer") 

764 

765 

766@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

767async def get_users(): 

768 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")] 

769 

770 

771@app.get("/users/me", response_model=User) 

772async def get_current_user( 

773 current_user: Annotated[User, Depends(get_current_active_user)], 

774): 

775 del current_user.password 

776 return current_user 

777 

778 

779@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

780async def get_user(user_id: str): 

781 user = User.get_from_id(user_id) 

782 

783 if user is None: 

784 raise HTTPException( 

785 status_code=404, 

786 detail="User not found", 

787 ) 

788 return user.to_dict(exclude={"password", "is_connected"}) 

789 

790 

791@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)]) 

792async def patch_user(user: UserUpdate, user_id): 

793 if user.password == "" or user.password is None: 

794 del user.password 

795 else: 

796 user.password = get_password_hash(user.password) 

797 return User.update_info(user, user_id) 

798 

799 

800@app.get("/commands", dependencies=[Depends(get_current_active_user)]) 

801async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]: 

802 return Command.response_from_query(query) 

803 

804 

805@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

806async def get_campaigns(): 

807 return Campaign.get_all() 

808 

809 

810@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

811async def get_campaign_by_id(campaign_id: str): 

812 campaign = Campaign.get_from_id(campaign_id) 

813 if campaign is None: 

814 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign") 

815 return campaign 

816 

817 

818@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

819async def add_campaign(campaign: Campaign): 

820 new_campaign = Campaign.create(campaign) 

821 if new_campaign is None: 

822 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

823 return new_campaign 

824 

825 

826@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

827async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

828 campaign = Campaign.get_from_id(campaign_id) 

829 if campaign is None: 

830 raise HTTPException(status_code=500, detail="An error occurred during campaign edition") 

831 campaign.name = edit_campaign.name 

832 campaign.description = edit_campaign.description 

833 return Campaign.update(campaign) 

834 

835 

836@app.delete( 

837 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

838) 

839async def delete_campaign(campaign_id: str): 

840 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

841 campaign = Campaign.get_from_id(campaign_id) 

842 if campaign is None: 

843 raise exception 

844 delete_phases = Phase.deleteMany(campaign_id) 

845 if not delete_phases.acknowledged: 

846 raise exception 

847 campaign_deleted = Campaign.delete(campaign_id) 

848 return campaign_deleted.acknowledged 

849 

850 

851@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

852async def get_campaign_phases(campaign_id: str): 

853 return Phase.get_by_attribute("campaign_id", campaign_id) 

854 

855 

856@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

857async def get_phase(phase_id: str): 

858 phase = Phase.get_from_id(phase_id) 

859 if phase is None: 

860 raise HTTPException(status_code=404, detail="Phase not found") 

861 return phase 

862 

863 

864@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

865async def add_phase(phase: Phase): 

866 new_phase = Phase.create(phase) 

867 if new_phase is None: 

868 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

869 return new_phase 

870 

871 

872@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

873async def edit_phase(phase_id, edit_phase: Phase): 

874 phase = Phase.get_from_id(phase_id) 

875 if phase is None: 

876 raise HTTPException(status_code=500, detail="An error occurred during Phase edition") 

877 phase.name = edit_phase.name 

878 phase.description = edit_phase.description 

879 phase.start_at = edit_phase.start_at 

880 phase.end_at = edit_phase.end_at 

881 return Phase.update(phase) 

882 

883 

884@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

885async def delete_phase(phase_id: str): 

886 phase = Phase.get_from_id(phase_id) 

887 if phase is None: 

888 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion") 

889 phase_deleted = Phase.delete(phase_id) 

890 return phase_deleted.acknowledged 

891 

892 

893@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

894async def get_custom_views(): 

895 return CustomView.get_all() 

896 

897 

898@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

899async def get_custom_views_from_user_id(user_id: str): 

900 return CustomView.get_by_attribute("user_id", user_id) 

901 

902 

903@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

904async def get_custom_view(custom_view_id: str): 

905 return CustomView.get_from_id(custom_view_id) 

906 

907 

908@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

909async def create_custom_view( 

910 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user) 

911): 

912 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id) 

913 custom_view.insert() 

914 return custom_view 

915 

916 

917@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

918async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate): 

919 custom_view = CustomView.get_from_id(custom_view_id) 

920 return custom_view.update(custom_view_update.model_dump()) 

921 

922 

923@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

924async def delete_custom_view(custom_view_id: str): 

925 custom_view = CustomView.get_from_id(custom_view_id) 

926 return custom_view.delete() 

927 

928 

929@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)]) 

930async def add_video(video: Video): 

931 video.insert() 

932 if not video: 

933 raise HTTPException(status_code=500, detail="An error occurred during cctv creation") 

934 return video 

935 

936 

937@app.get("/videos", dependencies=[Depends(get_current_active_user)]) 

938async def get_videos(): 

939 return Video.get_all() 

940 

941 

942@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)]) 

943def get_stream(video_id): 

944 camera_name = Video.get_video(video_id) 

945 if camera_name is None: 

946 raise HTTPException(status_code=404, detail="Camera not found") 

947 return camera_name 

948 

949 

950@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

951async def get_signals_preset(current_user: User = Depends(get_current_active_user)): 

952 return SignalsPreset.get_by_attribute("user_id", current_user.id) 

953 

954 

955@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

956async def create_signals_preset( 

957 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user) 

958): 

959 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id) 

960 return new_signals_preset 

961 

962 

963@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)]) 

964async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate): 

965 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

966 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True)) 

967 

968 

969@app.delete( 

970 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)] 

971) 

972async def delete_signals_preset(signals_preset_id: str): 

973 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

974 return signals_preset.delete() 

975 

976 

977@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

978async def create_graph_theme( 

979 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user) 

980): 

981 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) 

982 if styled_signal is None: 

983 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist") 

984 

985 graph_theme = PrivateGraphTheme.create( 

986 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id 

987 ) 

988 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

989 

990 

991@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

992async def get_all_graph_themes( 

993 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

994) -> ListResponse[PublicGraphTheme]: 

995 return PublicGraphTheme.response_from_query(query, current_user.id) 

996 

997 

998@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)]) 

999async def get_graph_themes_in_library( 

1000 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

1001) -> ListResponse[PublicGraphTheme]: 

1002 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id) 

1003 

1004 

1005@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)]) 

1006async def update_graph_theme( 

1007 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user) 

1008): 

1009 graph_theme = PrivateGraphTheme.get_from_id(theme_id) 

1010 update_dict = theme_update.model_dump(exclude_unset=True) 

1011 if current_user.id != graph_theme.creator_id: 

1012 for theme_property in update_dict.keys(): 

1013 if theme_property not in ["active_for_user", "in_user_library"]: 

1014 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him") 

1015 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id) 

1016 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

1017 

1018 

1019@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

1020async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)): 

1021 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id) 

1022 if current_user.id != graph_theme.creator_id: 

1023 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him") 

1024 return graph_theme.delete() 

1025 

1026 

1027@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)]) 

1028async def get_signals_appearances( 

1029 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user) 

1030) -> dict: 

1031 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)