Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / api.py: 95%

488 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:54 +0000

1import os 

2import logging 

3import time 

4from typing import Annotated 

5from datetime import timedelta 

6from pathlib import Path 

7from pyinstrument import Profiler 

8 

9from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

10from fastapi.middleware.cors import CORSMiddleware 

11from fastapi.responses import HTMLResponse 

12from fastapi.security import OAuth2PasswordRequestForm 

13 

14from twinpad_backend import __version__ 

15from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

16from twinpad_backend.models import ( 

17 DeviceId, 

18 MongoId, 

19 Signal, 

20 ForcedSignal, 

21 SignalData, 

22 SignalSample, 

23 ServicesStatus, 

24 Device, 

25 DeviceUpdate, 

26 DeviceSetup, 

27 DeviceSetupUpdate, 

28 DeviceState, 

29 SignalUpdate, 

30 SignalsData, 

31 Event, 

32 EventRule, 

33 TwinPadActivity, 

34 User, 

35 UserUpdate, 

36 Campaign, 

37 Phase, 

38 CustomView, 

39 Command, 

40 CustomViewCreation, 

41 CustomViewUpdate, 

42 Video, 

43 SignalsPreset, 

44 SignalsPresetCreation, 

45 SignalsPresetUpdate, 

46 PrivateGraphTheme, 

47 PublicGraphTheme, 

48 GraphThemeCreation, 

49 GraphThemeUpdate, 

50 SINGLE_POST_PROCESSING_FUNCTION, 

51 DOUBLE_POST_PROCESSING_FUNCTION, 

52 MULTIPLE_POST_PROCESSING_FUNCTION, 

53) 

54from twinpad_backend.auth import ( 

55 Token, 

56 authenticate_user, 

57 get_current_active_user, 

58 ACCESS_TOKEN_EXPIRE_MINUTES, 

59 create_access_token, 

60 get_password_hash, 

61) 

62from twinpad_backend.queries import ( 

63 SignalQuery, 

64 ForcedSignalQuery, 

65 DeviceStatesQuery, 

66 EventQuery, 

67 EventRuleQuery, 

68 CommandQuery, 

69 GraphThemeQuery, 

70) 

71from twinpad_backend.responses import ListResponse 

72from twinpad_backend.routes.deployers import router as deployers_router 

73 

# Requests taking longer than this many seconds are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5


def _env_flag(name: str, default: str) -> bool:
    """Return True when environment variable *name* (or *default*) equals "true"."""
    return os.environ.get(name, default) == "true"


DEBUG = _env_flag("DEBUG", "false")
PROFILING = _env_flag("PROFILING", "false")
DEVICE_DEPLOYERS = _env_flag("DEVICE_DEPLOYERS", "true")

# Reuse uvicorn's error logger and keep its records from propagating to ancestors.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)

app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

89 

if PROFILING:  # pragma: no cover
    profiling_folder = "/tmp/twinpad_profiling"
    Path(profiling_folder).mkdir(parents=True, exist_ok=True)
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """Profile a request with pyinstrument and store the HTML report.

        Requests whose URL contains ``profiling`` or ``.ico`` are passed
        through without profiling. Reports are written to
        ``profiling_folder`` under a name derived from the URL path.
        """
        url = str(request.url)
        # Never profile the profiling routes themselves (avoids recursion) nor favicon requests.
        if any(segment in url for segment in ("profiling", ".ico")):
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Stop the profiler even when the downstream handler raises.
            profiler.stop()

        # Drop the query string first, then flatten the path after "scheme://host"
        # into a single file name. An empty result (root URL, with or without a
        # query string) falls back to "slash" so we never try to open the folder.
        path_part = url.split("?", maxsplit=1)[0]
        filename = "_".join(path_part.split("/")[3:]) or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the stored profiling reports."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """Return one stored profiling report as an HTML attachment.

        Raises:
            HTTPException: 404 when ``file_name`` is not a regular file in
                ``profiling_folder``.
        """
        file_path = os.path.join(profiling_folder, file_name)

        # isfile (not exists) so that directory names such as ".." yield a 404
        # instead of failing later in open().
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            content = profiling_file.read()

        return Response(
            content=content,
            # "application/html" is not a registered media type; HTML is text/html.
            media_type="text/html",
            headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
        )

143 

144 

@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log client, method, path, status code and duration of every request.

    Requests slower than ``REQUEST_TIME_WARNING`` seconds are logged at
    WARNING level, the others at INFO level.
    """
    # perf_counter is monotonic, so the duration cannot be skewed by clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        # request.client may be None; do not crash the request just for logging.
        client_ip = request.client.host if request.client else "unknown"

    level = logging.WARNING if duration > REQUEST_TIME_WARNING else logging.INFO
    logger.log(
        level,
        "%s %s %s - %s - %sms",
        client_ip,
        request.method,
        request.url.path,
        response.status_code,
        round(1000 * duration),
    )
    return response

157 

158 

@app.get("/")
async def slash():
    """Return the backend version under the ``twinpad_version`` key."""
    version_payload = {"twinpad_version": __version__}
    return version_payload

162 

163 

@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Return the result of ``ServicesStatus.check()`` under the ``services`` key
    """
    services = ServicesStatus.check()
    return {"services": services}

172 

173 

@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return every device, sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices

177 

178 

@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id) -> Device:
    """Return the device whose ``device_id`` matches, or respond 404."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")

188 

189 

@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Apply a mode change to a device on behalf of the current user.

    Raises:
        HTTPException: 404 when the device does not exist, or the status code
            carried by the ``change_mode`` result (500 by default) when that
            result flags an error.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    # Only an explicit ``True`` counts as a failure.
    failed = outcome.get("error", False) is True
    if failed:
        raise HTTPException(
            status_code=outcome.get("status_code", 500),
            detail=outcome.get("message", "An error has occurred"),
        )
    return outcome

207 

208 

@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of a device selected by the given query."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states

212 

213 

@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every stored device setup."""
    setups = DeviceSetup.get_all()
    return setups

217 

218 

@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and return it."""
    device_setup.insert()
    created = device_setup
    return created

223 

224 

@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return the device setup with the given id, or respond 404."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")

234 

235 

@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """Update a device setup with the non-None fields of the payload.

    Raises:
        HTTPException: 404 when the device setup does not exist.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the payload are not forwarded to update().
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    target.update(changes)
    return target

246 

247 

@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return what its ``delete()`` call returns.

    Raises:
        HTTPException: 404 when the device setup does not exist.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return target.delete()

258 

259 

@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the sample-count activity between the two timestamps."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

265 

266 

@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching the query, without their ``device`` field."""
    sorts_on_id = "signal_id" in query.sort_by
    if not sorts_on_id:
        # Append an ascending signal_id criterion when the caller did not sort on it.
        query.sort_by += ",signal_id:1"
    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})

272 

273 

@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    all_ids = Signal.get_all_ids()
    return all_ids

277 

278 

@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return global signal statistics: data size, total sample count,
    number of signals whose status is "up" and number of signals
    """
    statuses = Signal.get_all_statuses()
    active_count = 0
    for entry in statuses:
        if entry["status"] == "up":
            active_count += 1
    sample_count = Signal.total_number_samples()
    signal_count = Signal.get_number_documents()

    stats = {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": sample_count,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }
    return stats

295 

296 

@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples

300 

301 

@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples

305 

306 

@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flags reported by ``Signal.get_forcibility``."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility

310 

311 

@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
async def get_forced_signals(
    current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
):
    """Return the forced signals matching the query; admins only.

    Raises:
        HTTPException: 401 when the current user is not an admin.
    """
    # NOTE(review): the caller is already authenticated here, so 403 may be the
    # more accurate status; kept at 401 to preserve existing behavior.
    if current_user.is_admin:
        return ForcedSignal.response_from_query(query)
    raise HTTPException(401)

319 

320 

@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return the signal with the given id as a dict, or respond 404."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")

330 

331 

@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Send a command to a signal and keep its forcing record in sync.

    After a successful command, an existing forcing record is deleted when
    the update carries no forced value; otherwise a forcing record owned by
    the current user is inserted when a forced value is provided.

    Raises:
        HTTPException: 404 when the signal does not exist; 403 when the
            signal is forced by another user and the caller is not an admin;
            the status code carried by the ``send_command`` result (500 by
            default) when that result flags an error.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource is a signal, not a device.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )

    forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
    forced_by_other = forced_signal is not None and forced_signal.forcing_user_id != current_user.id
    if forced_by_other and not current_user.is_admin:
        raise HTTPException(
            status_code=403,
            detail="Cannot override another user's forcing",
        )

    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )

    if forced_signal is not None and signal_update.forced_value is None:
        forced_signal.delete()
    elif signal_update.forced_value is not None:
        # NOTE(review): this branch also runs when a forcing record already
        # exists; whether insert() replaces it is not visible here — confirm.
        forced_signal = ForcedSignal(
            signal_id=signal_id,
            forcing_user_id=current_user.id,
            forced_at=time.time(),
            value=signal_update.forced_value,
        )
        forced_signal.insert()

    return result

369 

370 

@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
async def get_signal_forcibility(
    signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
) -> bool:
    """Return the result of ``ForcedSignal.can_force`` for this signal and user."""
    allowed = ForcedSignal.can_force(signal_id, current_user)
    return allowed

376 

377 

@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
) -> SignalData | None:
    """Return the data of one signal, optionally bounded in time and downsampled.

    Args:
        signal_id: Id of the signal to read.
        number_samples_max: When given, the data goes through
            ``uniform_desampling`` with this maximum.
        min_timestamp: Optional lower time bound forwarded to the lookup.
        max_timestamp: Optional upper time bound forwarded to the lookup.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type admits None; do not call a method on a missing result.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data

388 

389 

@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal, or respond 404 when there is none."""
    last_sample = SignalSample.get_last_from_signal_id(signal_id)
    if last_sample is not None:
        return last_sample
    raise HTTPException(status_code=404, detail="No data")

396 

397 

@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal, or respond 404 when there is none."""
    first_sample = SignalSample.get_first_from_signal_id(signal_id)
    if first_sample is not None:
        return first_sample
    raise HTTPException(status_code=404, detail="No data")

404 

405 

@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the number of samples and the sample data size of a signal.

    Raises:
        HTTPException: 404 when the signal does not exist.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource is a signal, not a device.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    return {
        "signal_id": signal_id,
        "number_samples": await signal.number_samples(),
        "size": signal.sample_datasize(),
    }

415 

416 

@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int = None,
    min_timestamp: float = None,
    max_timestamp: float = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """Return the data of several signals over an optional time range.

    The same bounds are used as both the overall range and the window range.
    When ``number_samples_max`` is given, the result goes through
    ``uniform_desampling``.

    Raises:
        HTTPException: 400 when both bounds are given and
            ``min_timestamp`` is greater than ``max_timestamp``.
    """
    # Compare against None rather than truthiness so a timestamp of 0 is still validated.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The declared return type admits None; do not call a method on a missing result.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data

440 

441 

@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int | None = None,
    outside_max_number_samples: int | None = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
) -> SignalsData | None:
    """Return signals data downsampled separately inside and outside a window.

    The number of documents read is capped at ten times the sum of the
    provided sample limits; no cap is applied when that sum is zero.

    Raises:
        HTTPException: 400 when a minimum bound is greater than its matching
            maximum bound (for the window or for the overall range).
    """
    # Compare against None rather than truthiness so a timestamp of 0 is still validated.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    sample_limits = [
        limit for limit in (window_max_number_samples, outside_max_number_samples) if limit is not None
    ]
    # A total of 0 means "no cap", expressed as None for the lookup.
    max_documents = 10 * sum(sample_limits) or None

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=max_documents,
    )
    # The declared return type admits None; do not call a method on a missing result.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

485 

486 

@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a ``signals.zip`` attachment.

    Data is read without bound interpolation and exported in ``file_format``.
    """
    selection = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = selection.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)

503 

504 

@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a ``signals.hdf5`` attachment.

    Data is read without bound interpolation.
    """
    selection = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = selection.hdf5_export()
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)

520 

521 

@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_post_processing(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
    number_samples_max: int = None,
) -> SignalsData | None:
    """Return signals data for several phases, optionally downsampled.

    Each per-phase list (sync times, window minimums, window maximums) may be
    omitted, in which case it is filled with one ``None`` per phase.

    Raises:
        HTTPException: 400 when a per-phase list does not have one entry per
            phase; 404 when one of the phases cannot be found.
    """
    phase_count = len(phase_ids)
    if not phase_sync_times:
        phase_sync_times = [None] * phase_count
    if not window_min_timestamps:
        window_min_timestamps = [None] * phase_count
    if not window_max_timestamps:
        window_max_timestamps = [None] * phase_count

    # All three lists must be exactly as long as phase_ids.
    list_lengths = {len(phase_sync_times), len(window_min_timestamps), len(window_max_timestamps)}
    if list_lengths != {phase_count}:
        raise HTTPException(
            400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
        )

    phases = [Phase.get_from_id(identifier) for identifier in phase_ids]
    if None in phases:
        raise HTTPException(404, "Phase not found")

    signals_data = SignalsData.get_from_phase_and_signal_ids(
        phases=phases,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )

    if number_samples_max is None:
        return signals_data
    return signals_data.min_max_downsampling(number_samples_max)

564 

565 

@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
async def apply_single_post_processing_function(
    phase_id: str,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    phase_sync_time: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """Apply a single-signal post-processing function within a phase.

    When ``phase_sync_time`` is omitted it defaults to ``phase.start_at / 1000``.
    The result is downsampled only when ``number_samples_max`` is smaller than
    the length of the first time vector, then passed to ``zero_time_vector``
    with the sync time.

    Raises:
        HTTPException: 404 when the phase is not found; 500 when the function
            application returns None.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(404, "Phase not found")

    sync_time = phase.start_at / 1000 if phase_sync_time is None else phase_sync_time

    processed = await SignalsData.apply_single_function(
        phase,
        base_signal_id,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if processed is None:
        raise HTTPException(500, "There was en error while applying the function")

    if number_samples_max is not None:
        sample_count = len(processed.signals_data[0].time_vector)
        if number_samples_max < sample_count:
            processed = processed.min_max_downsampling(number_samples_max)

    return processed.zero_time_vector(sync_time)

599 

600 

@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
async def apply_multiple_post_processing_function(
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """Apply a post-processing function to several (phase, signal) pairs.

    Missing sync times default to ``phase.start_at / 1000`` for each phase.
    The time vector is zeroed on the second sync time for "Align-X" and
    "Using-X", and on the first one for every other function.

    Raises:
        HTTPException: 400 when the inputs are inconsistent (one signal per
            phase, at least two phases, one sync time per phase); 404 when a
            phase is not found; 500 when the function application returns None.
    """
    if len(signal_ids) != len(phase_ids):
        raise HTTPException(400, "Each selected signal should correspond to a phase")
    if len(phase_ids) < 2:
        raise HTTPException(400, "These functions can only be applied to multiple signals")

    phases = [Phase.get_from_id(identifier) for identifier in phase_ids]
    if None in phases:
        raise HTTPException(404, "Phase not found")

    if not phase_sync_times:
        phase_sync_times = [item.start_at / 1000 for item in phases]
    if len(phase_sync_times) != len(phases):
        raise HTTPException(400, "Number of synchronization times does not match the number of phases")

    processed = await SignalsData.apply_multiple_function(
        phases,
        signal_ids,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if processed is None:
        raise HTTPException(500, "There was en error while applying the function")

    if number_samples_max is not None:
        sample_count = len(processed.signals_data[0].time_vector)
        if number_samples_max < sample_count:
            processed = processed.min_max_downsampling(number_samples_max)

    # Index 1 is always valid here: at least two phases were required above.
    sync_index = 1 if function in {"Align-X", "Using-X"} else 0
    return processed.zero_time_vector(phase_sync_times[sync_index])

645 

646 

@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_zip(
    file_format: str,
    phase_ids: list[MongoId] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as a ``signals.zip`` attachment.

    The data is obtained from ``get_signals_data_post_processing`` without
    downsampling and exported in ``file_format``.
    """
    selection = await get_signals_data_post_processing(
        phase_ids,
        phase_sync_times,
        signal_ids,
        window_min_timestamps,
        window_max_timestamps,
    )
    archive = selection.zip_export(file_format, post_processing=True, phase_ids=phase_ids)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)

671 

672 

@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_hdf5(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as a ``signals.hdf5`` attachment.

    The data is obtained from ``get_signals_data_post_processing`` without
    downsampling.
    """
    selection = await get_signals_data_post_processing(
        phase_ids,
        phase_sync_times,
        signal_ids,
        window_min_timestamps,
        window_max_timestamps,
    )
    payload = selection.hdf5_export(post_processing=True, phase_ids=phase_ids)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)

696 

697 

@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching the query."""
    listing = Event.response_from_query(query)
    return listing

701 

702 

@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return the event with the given id, or respond 404."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")

709 

710 

@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the event-count activity between the two timestamps."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

716 

717 

@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the command-count activity between the two timestamps."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

723 

724 

@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching the query."""
    listing = EventRule.response_from_query(query)
    return listing

728 

729 

@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return the event rule with the given id, or respond 404."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")

736 

737 

@app.post("/users", status_code=201)
async def create_user(user: User):
    """Create a user account with a hashed password.

    The same generic error is returned whether the email is already taken or
    the creation itself fails.

    Raises:
        HTTPException: 400 when the email is already used or when
            ``User.create`` returns None.
    """
    # NOTE(review): this route has no authentication dependency and forwards
    # the caller-supplied is_admin flag — confirm this is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")

    hashed_password = get_password_hash(user.password)
    # bool() instead of the bitwise `| False`, which raises TypeError when is_admin is None.
    is_admin = bool(user.is_admin)
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user

747 

748 

@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Authenticate a user and return a bearer access token.

    The token carries the user's email as ``sub`` and the admin flag, and
    expires after ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Raises:
        HTTPException: 401 when authentication fails; 402 when the user is
            not active.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    # Block inactive accounts; the original condition was inverted and rejected active users.
    if not user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

763 

764 

@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return every user as a dict, sorted by email."""
    accounts = User.get_all(sort_by="email")
    return [account.to_dict() for account in accounts]

768 

769 

@app.get("/users/me")
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user as a dict."""
    profile = current_user.to_dict()
    return profile

775 

776 

@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return a user as a dict without ``password`` and ``is_connected``.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    found = User.get_from_id(user_id)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    hidden_fields = {"password", "is_connected"}
    return found.to_dict(exclude=hidden_fields)

787 

788 

@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """Update a user and return the updated user as a dict.

    An empty or missing password is removed from the payload; any other
    password is replaced by its hash before the update.
    """
    # NOTE(review): any authenticated user can patch any user_id here — confirm
    # that no ownership/admin check is expected.
    password_missing = user.password is None or user.password == ""
    if password_missing:
        del user.password
    else:
        user.password = get_password_hash(user.password)
    updated = User.update_info(user, user_id)
    return updated.to_dict()

796 

797 

@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching the query."""
    listing = Command.response_from_query(query)
    return listing

801 

802 

@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns

806 

807 

@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return the campaign with the given id, or respond 404."""
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Campaign not found")

814 

815 

@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Insert the posted campaign and return it.

    Raises:
        HTTPException: 500 when ``insert()`` returns None.
    """
    inserted_id = campaign.insert()
    if inserted_id is not None:
        return campaign
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")

822 

823 

@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Update a campaign with the fields explicitly set in the payload.

    Raises:
        HTTPException: 404 when the campaign does not exist.
    """
    target = Campaign.get_from_id(campaign_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Campaign not found")
    # exclude_unset: only fields present in the request body are forwarded to update().
    changes = edit_campaign.model_dump(exclude_unset=True, mode="json")
    target.update(changes)
    return target

831 

832 

@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """Delete a campaign after deleting its phases; return True on success.

    Raises:
        HTTPException: 404 when the campaign does not exist; 500 when the
            phases deletion is not acknowledged or the campaign deletion
            reports failure.
    """
    target = Campaign.get_from_id(campaign_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Phases are removed first; the campaign is deleted only once that is acknowledged.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise HTTPException(status_code=500, detail="An error occurred during phases deletion")

    if not target.delete():
        raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")
    return True

847 

848 

@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` attribute equals the given id."""
    campaign_phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return campaign_phases

852 

853 

@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return the phase with the given id.

    Raises:
        HTTPException: 404 when no phase matches ``phase_id``.
    """
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")

860 

861 

@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Insert the submitted phase and return it.

    Raises:
        HTTPException: 500 when ``phase.insert()`` yields ``None``.
    """
    inserted_id = phase.insert()
    if inserted_id is not None:
        return phase
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")

868 

869 

@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Apply the explicitly set fields of the request body to an existing phase.

    Raises:
        HTTPException: 404 when no phase matches ``phase_id``.
    """
    existing = Phase.get_from_id(phase_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Phase does not exists")
    # ``exclude_unset`` keeps fields the client did not send out of the update.
    changes = edit_phase.model_dump(exclude_unset=True, mode="json")
    existing.update(changes)
    return existing

877 

878 

@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete the phase with the given id and return ``True``.

    Raises:
        HTTPException: 404 when the phase is unknown, 500 when
            ``phase.delete()`` returns a falsy value.
    """
    target = Phase.get_from_id(phase_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Phase not found")
    if not target.delete():
        raise HTTPException(status_code=500, detail="An error occurred during phase deletion")
    return True

888 

889 

@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return every custom view provided by ``CustomView.get_all``."""
    all_views = CustomView.get_all()
    return all_views

893 

894 

@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` attribute equals the given id."""
    user_views = CustomView.get_by_attribute("user_id", user_id)
    return user_views

898 

899 

@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return whatever ``CustomView.get_from_id`` yields for the given id."""
    # NOTE(review): unlike the campaign/phase getters, a missing view is not
    # turned into a 404 here - confirm whether clients rely on that.
    view = CustomView.get_from_id(custom_view_id)
    return view

903 

904 

@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a custom view owned by the requesting user and return it."""
    view_fields = custom_view_creation.to_dict()
    # The owner is taken from the authenticated user, not from the request body.
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view

912 

913 

@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update an existing custom view and return the result of ``update``.

    Raises:
        HTTPException: 404 when no custom view matches ``custom_view_id``.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard an unknown id surfaced as an AttributeError (HTTP 500).
        raise HTTPException(status_code=404, detail="Custom view not found")
    # NOTE(review): the full model is dumped (no ``exclude_unset``), so fields the
    # client omitted are sent with their default values - confirm this is intended.
    return custom_view.update(custom_view_update.model_dump())

918 

919 

@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete a custom view and return the result of its ``delete`` call.

    Raises:
        HTTPException: 404 when no custom view matches ``custom_view_id``.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard an unknown id surfaced as an AttributeError (HTTP 500).
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()

924 

925 

@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the submitted video and return it.

    Raises:
        HTTPException: 500 when the ``video`` object evaluates as falsy after
            the insert.
    """
    video.insert()
    # NOTE(review): this tests the request model itself, not the result of
    # ``insert()``; unless ``Video`` defines its own truthiness the error
    # branch looks unreachable - confirm and check the insert result instead.
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")

932 

933 

@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return every video provided by ``Video.get_all``."""
    all_videos = Video.get_all()
    return all_videos

937 

938 

@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return the value ``Video.get_video`` yields for the given video id.

    Raises:
        HTTPException: 404 when ``Video.get_video`` returns ``None``.
    """
    stream = Video.get_video(video_id)
    if stream is not None:
        return stream
    raise HTTPException(status_code=404, detail="Camera not found")

945 

946 

@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose ``user_id`` is the requesting user's id."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)

950 

951 

@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a signals preset for the requesting user and return it."""
    # The owner is taken from the authenticated user, not from the request body.
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)

958 

959 

@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Apply the explicitly set fields of the request body to a signals preset.

    Returns:
        The result of the preset's ``update`` call.

    Raises:
        HTTPException: 404 when no preset matches ``signals_preset_id``.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        # Without this guard an unknown id surfaced as an AttributeError (HTTP 500).
        raise HTTPException(status_code=404, detail="Signals preset not found")
    # ``exclude_unset`` keeps fields the client did not send out of the update.
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))

964 

965 

@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete a signals preset and return the result of its ``delete`` call.

    Raises:
        HTTPException: 404 when no preset matches ``signals_preset_id``.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        # Without this guard an unknown id surfaced as an AttributeError (HTTP 500).
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()

972 

973 

@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a graph theme for an existing signal and return its public form.

    Raises:
        HTTPException: 400 when no signal has the requested ``signal_id``.
    """
    signal_id = graph_theme_creation.signal_id
    if Signal.get_one_by_attribute("signal_id", signal_id) is None:
        raise HTTPException(400, f"Signal ID '{signal_id}' doesn't exist")

    # Any client-supplied ``id`` is dropped; the requester becomes the creator.
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created_theme = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created_theme.id, current_user.id)

986 

987 

@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query, resolved for the requesting user."""
    requester_id = current_user.id
    return PublicGraphTheme.response_from_query(query, requester_id)

993 

994 

@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes from the requesting user's library that match the query."""
    requester_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, requester_id)

1000 

1001 

@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update a graph theme and return its public form for the requesting user.

    Users who did not create the theme may only change ``active_for_user``
    and ``in_user_library``.

    Raises:
        HTTPException: 404 when no theme matches ``theme_id``; 401 when a
            non-creator tries to change any other property.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    if graph_theme is None:
        # Without this guard an unknown id failed on ``.creator_id`` (HTTP 500).
        raise HTTPException(404, "Graph theme not found")

    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        user_scoped_properties = ("active_for_user", "in_user_library")
        if any(theme_property not in user_scoped_properties for theme_property in update_dict):
            # NOTE(review): 403 would describe this case better than 401, but
            # the status code is kept because clients may depend on it.
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    # Reuse the dump computed above instead of serialising the model a second time.
    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)

1014 

1015 

@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete a graph theme created by the requesting user.

    Returns:
        The result of the theme's ``delete`` call.

    Raises:
        HTTPException: 404 when no theme matches ``graph_theme_id``; 401 when
            the requester is not the theme's creator.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    if graph_theme is None:
        # Without this guard an unknown id failed on ``.creator_id`` (HTTP 500).
        raise HTTPException(404, "Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()

1022 

1023 

@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
) -> dict:
    """Return the appearances of the given signals as resolved for the requesting user."""
    requester_id = current_user.id
    return PublicGraphTheme.get_signal_appearances(signal_ids, requester_id)

1029 

1030 

# The device-deployers routes are mounted only when DEVICE_DEPLOYERS is truthy.
if DEVICE_DEPLOYERS:
    app.include_router(router=deployers_router, prefix="/device-deployers")