Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/api.py: 95%

495 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-11 15:40 +0000

1import os 

2import logging 

3import time 

4from typing import Annotated 

5from datetime import timedelta 

6from pathlib import Path 

7from pyinstrument import Profiler 

8 

9from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

10from fastapi.middleware.cors import CORSMiddleware 

11from fastapi.responses import HTMLResponse 

12from fastapi.security import OAuth2PasswordRequestForm 

13 

14from twinpad_backend import __version__ 

15from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

16from twinpad_backend.models import ( 

17 DeviceId, 

18 MongoId, 

19 Signal, 

20 ForcedSignal, 

21 SignalData, 

22 SignalSample, 

23 ServicesStatus, 

24 Device, 

25 DeviceUpdate, 

26 DeviceSetup, 

27 DeviceSetupUpdate, 

28 DeviceState, 

29 SignalUpdate, 

30 SignalsData, 

31 Event, 

32 EventRule, 

33 TwinPadActivity, 

34 User, 

35 UserUpdate, 

36 Campaign, 

37 Phase, 

38 CustomView, 

39 Command, 

40 CustomViewCreation, 

41 CustomViewUpdate, 

42 Video, 

43 SignalsPreset, 

44 SignalsPresetCreation, 

45 SignalsPresetUpdate, 

46 PrivateGraphTheme, 

47 PublicGraphTheme, 

48 GraphThemeCreation, 

49 GraphThemeUpdate, 

50 SINGLE_POST_PROCESSING_FUNCTION, 

51 DOUBLE_POST_PROCESSING_FUNCTION, 

52 MULTIPLE_POST_PROCESSING_FUNCTION, 

53) 

54from twinpad_backend.auth import ( 

55 Token, 

56 authenticate_user, 

57 get_current_active_user, 

58 ACCESS_TOKEN_EXPIRE_MINUTES, 

59 create_access_token, 

60 get_password_hash, 

61) 

62from twinpad_backend.queries import ( 

63 SignalQuery, 

64 ForcedSignalQuery, 

65 DeviceStatesQuery, 

66 EventQuery, 

67 EventRuleQuery, 

68 CommandQuery, 

69 GraphThemeQuery, 

70) 

71from twinpad_backend.responses import ListResponse 

72from twinpad_backend.routes.deployers import router as deployers_router 

73from twinpad_backend.routes.configurator import router as router_configurator 

74 

# Sub-routers keyed by a URL path; presumably mounted under these prefixes
# further down the file (the mounting code is not visible here).
routers = {"/configurator": router_configurator, "/device-deployers": deployers_router}

# Requests taking longer than this many seconds are logged at WARNING level
# by the log_request_time middleware.
REQUEST_TIME_WARNING = 0.5

# Feature flags read from the environment. Only the exact, lowercase string
# "true" enables a flag; any other value disables it.
DEBUG = os.environ.get("DEBUG", "false") == "true"
PROFILING = os.environ.get("PROFILING", "false") == "true"
DEVICE_DEPLOYERS = os.environ.get("DEVICE_DEPLOYERS", "true") == "true"

# Log through uvicorn's "uvicorn.error" logger, without propagating records
# to ancestor loggers.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

92 

if PROFILING:  # pragma: no cover
    # Every profiled request is dumped as an HTML report into this folder.
    profiling_folder = "/tmp/twinpad_profiling"
    Path(profiling_folder).mkdir(parents=True, exist_ok=True)
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """
        Profile the request with pyinstrument and save the HTML report to disk.

        Requests whose URL contains "profiling" or ".ico" are passed through
        untouched, so that browsing the reports does not itself produce reports.
        The report file is named after the URL path, with "/" replaced by "_"
        ("slash" for the root path).
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):  # avoid recursion
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Stop the profiler even when the downstream handler raises, so a
            # running profiler is never left behind.
            profiler.stop()

        # Drop the query string *before* checking for an empty name: a URL such
        # as "/?x=1" would otherwise yield an empty filename and make open()
        # target the folder itself.
        url_without_query = url.split("?", maxsplit=1)[0]
        filename = "_".join(url_without_query.split("/")[3:])
        if not filename:
            filename = "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """
        List the names of the profiling reports currently stored on disk.
        """
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """
        Return one stored profiling report as a downloadable HTML attachment.

        Raises a 404 when no regular file with that name exists in the folder.
        """
        file_path = os.path.join(profiling_folder, file_name)

        # isfile (rather than exists): a name resolving to a directory must be
        # a 404 instead of failing inside open().
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            return Response(
                content=profiling_file.read(),
                # "application/html" is not a registered media type; HTML is "text/html".
                media_type="text/html",
                headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
            )

146 

147 

@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """
    Log every request with its client address, method, path, status and duration.

    Requests slower than REQUEST_TIME_WARNING seconds are logged as warnings,
    the others at info level. The response is returned unchanged.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    # Prefer the proxy-provided address. request.client may be None, so it is
    # only dereferenced when the header is missing and a client is available.
    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        client_ip = request.client.host if request.client else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response

160 

161 

@app.get("/")
async def slash():
    """
    Return the version of the running backend.
    """
    payload = {"twinpad_version": __version__}
    return payload

165 

166 

@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Return the services healthcheck, as computed by ``ServicesStatus.check()``.
    """
    services = ServicesStatus.check()
    return {"services": services}

175 

176 

@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """
    Return every device, sorted by ``device_id``.
    """
    devices = Device.get_all(sort_by="device_id")
    return devices

180 

181 

@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id) -> Device:
    """
    Return the device whose ``device_id`` matches, or raise a 404.
    """
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")

191 

192 

@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Change the mode of a device on behalf of the authenticated user.

    Raises a 404 when the device is unknown. When ``change_mode`` reports an
    error, raises an HTTP error built from the ``status_code`` (default 500)
    and ``message`` of its result; otherwise returns that result.
    """
    device = Device.get_one_by_attribute("device_id", device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await device.change_mode(device_update, current_user)
    failed = outcome.get("error", False) is True
    if failed:
        raise HTTPException(
            status_code=outcome.get("status_code", 500),
            detail=outcome.get("message", "An error has occurred"),
        )
    return outcome

210 

211 

@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """
    Return the states of one device, selected by the query parameters.
    """
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states

215 

216 

@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """
    Return every stored device setup.
    """
    setups = DeviceSetup.get_all()
    return setups

220 

221 

@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """
    Insert the posted device setup and return it.
    """
    # The result of insert() is deliberately ignored: the posted object itself is returned.
    device_setup.insert()
    return device_setup

226 

227 

@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """
    Return the device setup with the given id, or raise a 404.
    """
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")

237 

238 

@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """
    Update a device setup with the non-null fields of the payload.

    Fields whose value is ``None`` in the update are left untouched.
    Raises a 404 when the device setup does not exist.
    """
    device_setup = DeviceSetup.get_from_id(device_setup_id)
    if device_setup is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    device_setup.update(changes)
    return device_setup

249 

250 

@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """
    Delete a device setup and return the result of the deletion.

    Raises a 404 when the device setup does not exist.
    """
    device_setup = DeviceSetup.get_from_id(device_setup_id)
    if device_setup is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return device_setup.delete()

261 

262 

@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """
    Return the sample-count activity between two timestamps.

    All three parameters are forwarded unchanged to
    ``TwinPadActivity.get_number_samples_timeframe``.
    """
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

268 

269 

@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """
    Return the signals matching the query, without their ``device`` field.

    A ``signal_id:1`` sort key is appended whenever the requested sort does
    not already mention ``signal_id``.
    """
    sorts_on_signal_id = "signal_id" in query.sort_by
    if not sorts_on_signal_id:
        query.sort_by += ",signal_id:1"
    response = Signal.response_from_query(query)
    return response.to_dict(exclude={"device"})

275 

276 

@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """
    Return the ids of all signals.
    """
    signal_ids = Signal.get_all_ids()
    return signal_ids

280 

281 

@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return global signal statistics.

    The payload holds the signal data size, the total number of samples, the
    number of signals whose status is "up" and the total number of signals.
    """
    statuses = Signal.get_all_statuses()
    active_signals = [entry for entry in statuses if entry["status"] == "up"]
    total_samples = Signal.total_number_samples()
    total_signals = Signal.get_number_documents()

    stats = {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": total_samples,
        "number_active_signals": len(active_signals),
        "number_signals": total_signals,
    }
    return stats

298 

299 

@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """
    Return the last sample of each requested signal.
    """
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples

303 

304 

@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """
    Return the first sample of each requested signal.
    """
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples

308 

309 

@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """
    Return the forcibility of the requested signals, as given by ``Signal.get_forcibility``.
    """
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility

313 

314 

@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
async def get_forced_signals(
    current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
):
    """
    Return the forced signals matching the query; restricted to administrators.

    Non-admin users receive a 401.
    """
    # NOTE(review): the caller is authenticated at this point, so 403 may be the
    # more conventional status here - confirm before changing, clients may rely on 401.
    if current_user.is_admin:
        return ForcedSignal.response_from_query(query)
    raise HTTPException(401)

322 

323 

@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """
    Return one signal as a dict, or raise a 404 when it does not exist.
    """
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")

333 

334 

@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Send a command for a signal and keep the forced-signal records in sync.

    Steps, in order:
      1. 404 when the signal does not exist.
      2. 400 when no device matches the first dot-separated part of the signal id.
      3. 403 when the signal is already forced by another user and the caller is not an admin.
      4. Send the command; a reported error becomes an HTTP error.
      5. Insert a forced-signal record when a forced value was sent, otherwise
         delete the existing record if there is one.

    Returns the result of ``send_command``.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(status_code=404, detail="Signal not found")

    # The part of the signal id before the first "." identifies the device or configuration.
    owner_id = signal_id.split(".")[0]
    device = Device.get_from_device_or_config_id(owner_id)
    if device is None:
        raise HTTPException(status_code=400, detail="Signal doesn't belong to an existing configuration")

    existing_forcing = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
    forced_by_someone_else = (
        existing_forcing is not None and existing_forcing.forcing_user_id != current_user.id
    )
    if forced_by_someone_else and not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Cannot override another user's forcing")

    result = await signal.send_command(device.device_id, signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )

    if signal_update.forced_value is not None:
        # NOTE(review): an existing record is not deleted before the new one is
        # inserted - confirm that insert() replaces it rather than duplicating it.
        ForcedSignal(
            signal_id=signal_id,
            forcing_user_id=current_user.id,
            forced_at=time.time(),
            value=signal_update.forced_value,
        ).insert()
    elif existing_forcing is not None:
        existing_forcing.delete()

    return result

380 

381 

@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
async def get_signal_forcibility(
    signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
) -> bool:
    """
    Tell whether the authenticated user can force the given signal.
    """
    allowed = ForcedSignal.can_force(signal_id, current_user)
    return allowed

387 

388 

@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """
    Return the data of one signal, optionally bounded in time and downsampled.

    Parameters:
        signal_id: id of the signal to read.
        number_samples_max: when given, the data is uniformly desampled to at
            most this number of samples.
        min_timestamp / max_timestamp: optional bounds forwarded to the lookup.

    Returns the signal data, or ``None`` when the lookup yields nothing.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The return type allows None: only desample when there is data, instead of
    # failing with an AttributeError on None.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data

399 

400 

@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """
    Return the last sample of a signal, or raise a 404 when it has no data.
    """
    last_sample = SignalSample.get_last_from_signal_id(signal_id)
    if last_sample is not None:
        return last_sample
    raise HTTPException(status_code=404, detail="No data")

407 

408 

@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """
    Return the first sample of a signal, or raise a 404 when it has no data.
    """
    first_sample = SignalSample.get_first_from_signal_id(signal_id)
    if first_sample is not None:
        return first_sample
    raise HTTPException(status_code=404, detail="No data")

415 

416 

@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """
    Return the number of samples and the sample data size of one signal.

    Raises a 404 when the signal does not exist.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            # The missing resource is a signal, not a device; the message now
            # matches the other signal routes.
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}

426 

427 

@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """
    Return the data of several signals, optionally bounded in time and downsampled.

    The time bounds are used both as data bounds and as window bounds.
    When ``number_samples_max`` is given, the data is uniformly desampled.

    Raises a 400 when both bounds are given and ``min_timestamp`` is greater
    than ``max_timestamp``.
    """
    # Compare against None explicitly: a timestamp of 0 is a valid bound and
    # must not bypass the validation.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The return type allows None: only desample when there is data.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data

451 

452 

@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int | None = None,
    outside_max_number_samples: int | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """
    Return signals data desampled separately inside and outside an interest window.

    The number of documents fetched is capped at ten times the sum of the two
    requested sample counts; there is no cap when that sum is zero.

    Raises a 400 when a minimum bound is greater than its maximum bound, for
    either the window bounds or the global bounds.
    """
    # Compare against None explicitly: a timestamp of 0 is a valid bound and
    # must not bypass the validation.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    max_documents = 0

    if window_max_number_samples is not None:
        max_documents += 10 * window_max_number_samples
    if outside_max_number_samples is not None:
        max_documents += 10 * outside_max_number_samples

    # A total of zero means "no limit".
    if max_documents == 0:
        max_documents = None

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=max_documents,
    )

    # The return type allows None: nothing to desample in that case.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

496 

497 

@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """
    Export the data of the requested signals as a zip attachment named "signals.zip".

    The data is read without bound interpolation and zipped in ``file_format``.
    """
    archive = SignalsData.get_from_signal_ids(
        signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
    ).zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)

514 

515 

@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """
    Export the data of the requested signals as an HDF5 attachment named "signals.hdf5".

    The data is read without bound interpolation.
    """
    hdf5_content = SignalsData.get_from_signal_ids(
        signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
    ).hdf5_export()
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=hdf5_content, media_type="application/hdf5", headers=download_headers)

531 

532 

@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_post_processing(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
    number_samples_max: int = None,
) -> SignalsData | None:
    """
    Return the data of signals over several phases, for post-processing.

    Each of the three per-phase lists (synchronization times, window minimum
    and maximum timestamps) may be omitted, in which case it is replaced by one
    ``None`` per phase. Otherwise it must hold exactly one entry per phase.

    Raises a 400 on a length mismatch and a 404 when a phase does not exist.
    When ``number_samples_max`` is given, min/max downsampling is applied.
    """
    number_phases = len(phase_ids)

    # An omitted list means "no value for any phase".
    if not len(phase_sync_times):
        phase_sync_times = [None] * number_phases
    if not len(window_min_timestamps):
        window_min_timestamps = [None] * number_phases
    if not len(window_max_timestamps):
        window_max_timestamps = [None] * number_phases

    per_phase_lists = (phase_sync_times, window_min_timestamps, window_max_timestamps)
    if any(len(values) != number_phases for values in per_phase_lists):
        raise HTTPException(
            400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
        )

    phases = []
    for phase_id in phase_ids:
        phases.append(Phase.get_from_id(phase_id))

    if None in phases:
        raise HTTPException(404, "Phase not found")

    signals_data = SignalsData.get_from_phase_and_signal_ids(
        phases=phases,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )

    if number_samples_max is None:
        return signals_data
    return signals_data.min_max_downsampling(number_samples_max)

575 

576 

@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
async def apply_single_post_processing_function(
    phase_id: str,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    phase_sync_time: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """
    Apply a single-signal post-processing function to one signal of a phase.

    The result is downsampled when ``number_samples_max`` is lower than the
    length of the first time vector, then its time vector is zeroed on
    ``phase_sync_time`` (``phase.start_at / 1000`` when not provided).

    Raises a 404 when the phase does not exist and a 500 when the function
    yields no result.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(404, "Phase not found")

    sync_time = phase.start_at / 1000 if phase_sync_time is None else phase_sync_time

    processed = await SignalsData.apply_single_function(
        phase,
        base_signal_id,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if processed is None:
        raise HTTPException(500, "There was en error while applying the function")

    # Only downsample when the result holds more samples than requested.
    needs_downsampling = number_samples_max is not None and number_samples_max < len(
        processed.signals_data[0].time_vector
    )
    if needs_downsampling:
        processed = processed.min_max_downsampling(number_samples_max)

    return processed.zero_time_vector(sync_time)

610 

611 

@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
async def apply_multiple_post_processing_function(
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """
    Apply a post-processing function combining several signals.

    Signals and phases are paired by position, so both lists must have the
    same length, with at least two entries. Synchronization times default to
    ``phase.start_at / 1000`` of each phase.

    The result is zeroed on the second synchronization time for the "Align-X"
    and "Using-X" functions, and on the first one otherwise.

    Raises a 400 on inconsistent list lengths, a 404 when a phase does not
    exist and a 500 when the function yields no result.
    """
    if len(phase_ids) != len(signal_ids):
        raise HTTPException(400, "Each selected signal should correspond to a phase")

    if len(phase_ids) < 2:
        raise HTTPException(400, "These functions can only be applied to multiple signals")

    phases = []
    for phase_id in phase_ids:
        phases.append(Phase.get_from_id(phase_id))
    if None in phases:
        raise HTTPException(404, "Phase not found")

    if not len(phase_sync_times):
        phase_sync_times = [phase.start_at / 1000 for phase in phases]
    if len(phases) != len(phase_sync_times):
        raise HTTPException(400, "Number of synchronization times does not match the number of phases")

    processed = await SignalsData.apply_multiple_function(
        phases,
        signal_ids,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if processed is None:
        raise HTTPException(500, "There was en error while applying the function")

    # Only downsample when the result holds more samples than requested.
    if number_samples_max is not None and number_samples_max < len(processed.signals_data[0].time_vector):
        processed = processed.min_max_downsampling(number_samples_max)

    reference_index = 1 if function in {"Align-X", "Using-X"} else 0
    return processed.zero_time_vector(phase_sync_times[reference_index])

656 

657 

@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_zip(
    file_format: str,
    phase_ids: list[MongoId] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """
    Export post-processing signals data as a zip attachment named "signals.zip".

    The data is obtained by calling ``get_signals_data_post_processing``
    directly, so its validation errors apply here too.
    """
    post_processing_data = await get_signals_data_post_processing(
        phase_ids,
        phase_sync_times,
        signal_ids,
        window_min_timestamps,
        window_max_timestamps,
    )
    archive = post_processing_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids)

    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)

682 

683 

@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_hdf5(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """
    Export post-processing signals data as an HDF5 attachment named "signals.hdf5".

    The data is obtained by calling ``get_signals_data_post_processing``
    directly, so its validation errors apply here too.
    """
    post_processing_data = await get_signals_data_post_processing(
        phase_ids,
        phase_sync_times,
        signal_ids,
        window_min_timestamps,
        window_max_timestamps,
    )
    hdf5_content = post_processing_data.hdf5_export(post_processing=True, phase_ids=phase_ids)

    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=hdf5_content, media_type="application/hdf5", headers=download_headers)

707 

708 

@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """
    Return the events matching the query.
    """
    events = Event.response_from_query(query)
    return events

712 

713 

@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """
    Return the event with the given id, or raise a 404.
    """
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")

720 

721 

@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """
    Return the event-count activity between two timestamps.

    All three parameters are forwarded unchanged to
    ``TwinPadActivity.get_number_events_timeframe``.
    """
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

727 

728 

@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """
    Return the command-count activity between two timestamps.

    All three parameters are forwarded unchanged to
    ``TwinPadActivity.get_number_commands_timeframe``.
    """
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

734 

735 

@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """
    Return the event rules matching the query.
    """
    event_rules = EventRule.response_from_query(query)
    return event_rules

739 

740 

@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """
    Return the event rule with the given id, or raise a 404.
    """
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")

747 

748 

@app.post("/users", status_code=201)
async def create_user(user: User):
    """
    Create a user account from the posted data and return the new user.

    The password is hashed before being stored. A 400 with the same generic
    message is raised both when the email is already used and when the
    creation itself fails.
    """
    # NOTE(review): this route declares no authentication dependency and takes
    # is_admin from the request body - confirm that this is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # "or" (not the bitwise "|") so that an unset is_admin (None) falls back to
    # False instead of raising a TypeError.
    is_admin = user.is_admin or False
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user

758 

759 

@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """
    Authenticate a user from the OAuth2 password form and return a bearer token.

    The token carries the user email ("sub") and admin flag ("admin"), and
    expires after ACCESS_TOKEN_EXPIRE_MINUTES minutes.

    Raises a 401 on bad credentials and a 402 when the account is not active.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    # Blocked users are the ones that are NOT active. The check was inverted,
    # which rejected every active user and issued tokens to inactive ones.
    if not user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

774 

775 

@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """
    Return every user as a dict, sorted by email.
    """
    # NOTE(review): unlike get_user, no field is explicitly excluded here -
    # confirm that to_dict() does not expose the password by default.
    users = User.get_all(sort_by="email")
    return [user.to_dict() for user in users]

779 

780 

@app.get("/users/me")
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """
    Return the authenticated user as a dict.
    """
    profile = current_user.to_dict()
    return profile

786 

787 

@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """
    Return one user as a dict, without the "password" and "is_connected" fields.

    Raises a 404 when the user does not exist.
    """
    found = User.get_from_id(user_id)
    if found is not None:
        return found.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")

798 

799 

@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """
    Update a user and return the updated user as a dict.

    An empty or missing password is removed from the update; any other
    password is replaced by its hash before the update is applied.
    """
    # NOTE(review): no ownership or admin check is visible here - presumably
    # any active user can patch any account; confirm.
    password_missing = user.password is None or user.password == ""
    if password_missing:
        del user.password
    else:
        user.password = get_password_hash(user.password)
    updated_user = User.update_info(user, user_id)
    return updated_user.to_dict()

807 

808 

@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """
    Return the commands matching the query.
    """
    commands = Command.response_from_query(query)
    return commands

812 

813 

@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """
    Return every campaign.
    """
    campaigns = Campaign.get_all()
    return campaigns

817 

818 

@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """
    Return the campaign with the given id, or raise a 404.
    """
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Campaign not found")

825 

826 

@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """
    Insert the posted campaign and return it.

    Raises a 500 when the insertion returns no id.
    """
    inserted_id = campaign.insert()
    if inserted_id is not None:
        return campaign
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")

833 

834 

@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """
    Update a campaign with the fields explicitly set in the payload.

    Raises a 404 when the campaign does not exist.
    """
    campaign = Campaign.get_from_id(campaign_id)
    if campaign is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Only the fields explicitly set in the payload are applied.
    changes = edit_campaign.model_dump(exclude_unset=True, mode="json")
    campaign.update(changes)
    return campaign

842 

843 

@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """Delete the campaign identified by ``campaign_id`` together with its phases.

    Responds with HTTP 404 when the campaign does not exist, and HTTP 500 when
    either the phase deletion is not acknowledged or the campaign deletion
    reports failure. Returns ``True`` on success.
    """
    target = Campaign.get_from_id(campaign_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Phases are removed before the campaign itself.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise HTTPException(status_code=500, detail="An error occurred during phases deletion")

    if target.delete():
        return True

    raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")

858 

859 

@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` attribute equals ``campaign_id``."""
    campaign_phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return campaign_phases

863 

864 

@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return the phase identified by ``phase_id``.

    Responds with HTTP 404 when no phase matches.
    """
    found_phase = Phase.get_from_id(phase_id)
    if found_phase is not None:
        return found_phase

    raise HTTPException(status_code=404, detail="Phase not found")

871 

872 

@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Insert ``phase`` and return it with HTTP 201.

    Responds with HTTP 500 when ``phase.insert()`` returns ``None``.
    """
    if phase.insert() is None:
        raise HTTPException(status_code=500, detail="An error occurred during phase creation")

    return phase

879 

880 

@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id: str, edit_phase: Phase):
    """Update the phase identified by ``phase_id`` and return it.

    Only the fields explicitly set in the request body are passed to
    ``update`` (JSON-mode dump with ``exclude_unset=True``). Responds with
    HTTP 404 when no phase matches.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(status_code=404, detail="Phase does not exists")

    phase.update(edit_phase.model_dump(exclude_unset=True, mode="json"))
    return phase

888 

889 

@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete the phase identified by ``phase_id`` and return ``True``.

    Responds with HTTP 404 when no phase matches and HTTP 500 when
    ``phase.delete()`` reports failure.
    """
    target = Phase.get_from_id(phase_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Phase not found")

    if target.delete():
        return True

    raise HTTPException(status_code=500, detail="An error occurred during phase deletion")

899 

900 

@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return every custom view provided by ``CustomView.get_all``."""
    all_views = CustomView.get_all()
    return all_views

904 

905 

@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` attribute equals ``user_id``."""
    user_views = CustomView.get_by_attribute("user_id", user_id)
    return user_views

909 

910 

@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return the result of ``CustomView.get_from_id`` for ``custom_view_id``.

    The lookup result is returned as-is; no not-found check is performed here.
    """
    view = CustomView.get_from_id(custom_view_id)
    return view

914 

915 

@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a custom view owned by the current user and return it.

    The view is built from the creation payload plus ``user_id`` set to the
    authenticated user's ID, then inserted.
    """
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view

923 

924 

@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update the custom view identified by ``custom_view_id``.

    Returns whatever ``CustomView.update`` returns. Responds with HTTP 404
    when no custom view matches, instead of failing on a ``None`` lookup.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")

    # NOTE(review): the full model is dumped here (no ``exclude_unset=True``),
    # unlike the signals-preset update route — confirm this is intentional.
    return custom_view.update(custom_view_update.model_dump())

929 

930 

@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete the custom view identified by ``custom_view_id``.

    Returns the result of ``CustomView.delete``. Responds with HTTP 404 when
    no custom view matches, instead of failing on a ``None`` lookup.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")

    return custom_view.delete()

935 

936 

@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert ``video`` and return it.

    Responds with HTTP 500 when the insertion does not yield an identifier.
    """
    # Check the outcome of the insert rather than the truthiness of the request
    # model, mirroring the campaign and phase creation routes.
    # NOTE(review): assumes ``Video.insert`` returns ``None`` on failure like
    # ``Campaign.insert`` / ``Phase.insert`` — confirm against the model.
    video_id = video.insert()
    if video_id is None:
        raise HTTPException(status_code=500, detail="An error occurred during cctv creation")

    return video

943 

944 

@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return every video provided by ``Video.get_all``."""
    all_videos = Video.get_all()
    return all_videos

948 

949 

@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id: str):
    """Return the result of ``Video.get_video`` for ``video_id``.

    Responds with HTTP 404 when that lookup returns ``None``.
    """
    camera_name = Video.get_video(video_id)
    if camera_name is None:
        raise HTTPException(status_code=404, detail="Camera not found")

    return camera_name

956 

957 

@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose ``user_id`` is the current user's ID."""
    own_presets = SignalsPreset.get_by_attribute("user_id", current_user.id)
    return own_presets

961 

962 

@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a signals preset for the current user and return it."""
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)

969 

970 

@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Update the signals preset identified by ``signals_preset_id``.

    Only the fields explicitly set in the request body are passed to
    ``update``; its return value is returned to the caller. Responds with
    HTTP 404 when no preset matches, instead of failing on a ``None`` lookup.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")

    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))

975 

976 

@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete the signals preset identified by ``signals_preset_id``.

    Returns the result of ``SignalsPreset.delete``. Responds with HTTP 404
    when no preset matches, instead of failing on a ``None`` lookup.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")

    return signals_preset.delete()

983 

984 

@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a graph theme for an existing signal and return its public form.

    Responds with HTTP 400 when no signal has the ``signal_id`` given in the
    payload. The theme is created with the current user as ``creator_id`` and
    then re-read through ``PublicGraphTheme`` for that user.
    """
    if Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) is None:
        raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")

    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created_theme = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created_theme.id, current_user.id)

997 

998 

@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes selected by ``query``, as seen by the current user."""
    themes_response = PublicGraphTheme.response_from_query(query, current_user.id)
    return themes_response

1004 

1005 

@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the result of the user-library graph theme query for the current user."""
    library_response = PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
    return library_response

1011 

1012 

@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update the graph theme identified by ``theme_id`` and return its public form.

    Users who did not create the theme may only set ``active_for_user`` and
    ``in_user_library``; any other field in the payload is rejected with
    HTTP 401. Responds with HTTP 404 when no theme matches, instead of failing
    on a ``None`` lookup.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")

    # Dump once and reuse it for both the permission check and the update.
    update_dict = theme_update.model_dump(exclude_unset=True)

    if current_user.id != graph_theme.creator_id:
        # Any key outside the per-user fields is a creator-only change.
        creator_only_fields = update_dict.keys() - {"active_for_user", "in_user_library"}
        if creator_only_fields:
            # NOTE(review): 403 would describe this refusal more accurately;
            # 401 is kept so existing clients see the same status code.
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)

1025 

1026 

@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete the graph theme identified by ``graph_theme_id``.

    Only the theme's creator may delete it; other users get HTTP 401.
    Responds with HTTP 404 when no theme matches, instead of failing on a
    ``None`` lookup. Returns the result of ``PrivateGraphTheme.delete``.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")

    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")

    return graph_theme.delete()

1033 

1034 

@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
) -> dict:
    """Return ``PublicGraphTheme.get_signal_appearances`` for ``signal_ids`` and the current user."""
    appearances = PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)
    return appearances

1040 

1041 

# Mount every registered router under its prefix; the device-deployers router
# is left out when DEVICE_DEPLOYERS is falsy.
for prefix, router in routers.items():
    deployers_disabled = prefix == "/device-deployers" and not DEVICE_DEPLOYERS
    if not deployers_disabled:
        app.include_router(router, prefix=prefix)