Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / api.py: 99%

476 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-25 12:17 +0000

1import os 

2import logging 

3from pathlib import Path 

4import time 

5from typing import Annotated 

6from datetime import timedelta 

7from pyinstrument import Profiler 

8 

9from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

10from fastapi.middleware.cors import CORSMiddleware 

11from fastapi.security import OAuth2PasswordRequestForm 

12 

13from twinpad_backend import __version__ 

14from twinpad_backend.db import signal_datasize 

15from twinpad_backend.models import ( 

16 DeviceId, 

17 MongoId, 

18 Signal, 

19 ForcedSignal, 

20 SignalData, 

21 SignalSample, 

22 ServicesStatus, 

23 Device, 

24 DeviceUpdate, 

25 DeviceSetup, 

26 DeviceSetupUpdate, 

27 DeviceState, 

28 SignalUpdate, 

29 SignalsData, 

30 Event, 

31 EventRule, 

32 TwinPadActivity, 

33 User, 

34 UserUpdate, 

35 Campaign, 

36 Phase, 

37 CustomView, 

38 Command, 

39 CustomViewCreation, 

40 CustomViewUpdate, 

41 Video, 

42 SignalsPreset, 

43 SignalsPresetCreation, 

44 SignalsPresetUpdate, 

45 PrivateGraphTheme, 

46 PublicGraphTheme, 

47 GraphThemeCreation, 

48 GraphThemeUpdate, 

49 SINGLE_POST_PROCESSING_FUNCTION, 

50 DOUBLE_POST_PROCESSING_FUNCTION, 

51 MULTIPLE_POST_PROCESSING_FUNCTION, 

52) 

53from twinpad_backend.auth import ( 

54 Token, 

55 authenticate_user, 

56 get_current_active_user, 

57 ACCESS_TOKEN_EXPIRE_MINUTES, 

58 create_access_token, 

59 get_password_hash, 

60) 

61from twinpad_backend.queries import ( 

62 SignalQuery, 

63 ForcedSignalQuery, 

64 DeviceStatesQuery, 

65 EventQuery, 

66 EventRuleQuery, 

67 CommandQuery, 

68 GraphThemeQuery, 

69) 

70from twinpad_backend.responses import ListResponse 

71from twinpad_backend.routes.deployers import router as deployers_router 

72 

73REQUEST_TIME_WARNING = 0.5 

74 

75DEBUG = os.environ.get("DEBUG", "false") == "true" 

76PROFILING = os.environ.get("PROFILING", "false") == "true" 

77DEVICE_DEPLOYERS = os.environ.get("DEVICE_DEPLOYERS", "true") == "true" 

78 

79logger = logging.getLogger("uvicorn.error") 

80logger.propagate = False 

81logger.info("Debug mode: %s", DEBUG) 

82logger.info("log level: %s", logging.root.level) 

83 

84 

85app = FastAPI(title="Twinpad backend", version=__version__) 

86 

87app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

88 

89if PROFILING: # pragma: no cover 

90 profiling_folder = "/tmp/twinpad_profiling" 

91 Path(profiling_folder).mkdir(parents=True, exist_ok=True) 

92 logger.info("Profiling enabled") 

93 

94 @app.middleware("http") 

95 async def profile_request(request: Request, call_next): 

96 should_profile = True 

97 url = str(request.url) 

98 for segment in ("profiling", ".ico"): 

99 if segment in url: 

100 should_profile = False 

101 break 

102 

103 if should_profile: # avoid recursion 

104 profiler = Profiler() 

105 profiler.start() 

106 

107 response = await call_next(request) 

108 

109 profiler.stop() 

110 url = "_".join(url.split("/")[3:]).rstrip("/") 

111 if not url: 

112 url = "slash" 

113 filename = url.split("?", maxsplit=1)[0] 

114 logger.info("saving profiling to %s", filename) 

115 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

116 profiling_file.write(profiler.output_html()) 

117 

118 return response 

119 

120 return await call_next(request) 

121 

122 @app.get("/profilings") 

123 async def profilings(): 

124 return {"profilings": os.listdir(profiling_folder)} 

125 

126 @app.get("/profilings/{file_name}") 

127 async def profiling(file_name): 

128 file_path = os.path.join(profiling_folder, file_name) 

129 

130 if not os.path.exists(file_path): 

131 raise HTTPException( 

132 status_code=404, 

133 detail=f"Profiling file '{file_name}' not found", 

134 ) 

135 

136 with open(file_path, "r", encoding="utf-8") as profiling_file: 

137 return Response( 

138 content=profiling_file.read(), 

139 media_type="application/html", 

140 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'}, 

141 ) 

142 

143 

144@app.middleware("http") 

145async def log_request_time(request: Request, call_next): 

146 start_time = time.time() # Record the start time 

147 response = await call_next(request) # Process the request 

148 duration = time.time() - start_time # Calculate the time taken 

149 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

150 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

151 if duration > REQUEST_TIME_WARNING: 

152 logger.warning(message) 

153 else: 

154 logger.info(message) 

155 return response 

156 

157 

158@app.get("/") 

159async def slash(): 

160 return {"twinpad_version": __version__} 

161 

162 

163@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

164async def status(): 

165 """ 

166 Return service healthcheck 

167 """ 

168 return { 

169 "services": ServicesStatus.check(), 

170 "timestamp": time.time(), 

171 } 

172 

173 

174@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

175async def get_devices() -> list[Device]: 

176 return Device.get_all(sort_by="device_id") 

177 

178 

179@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

180async def get_device(device_id) -> Device: 

181 device = Device.get_one_by_attribute("device_id", device_id) 

182 if not device: 

183 raise HTTPException( 

184 status_code=404, 

185 detail="Device not found", 

186 ) 

187 return device 

188 

189 

190@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

191async def update_item( 

192 device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

193): 

194 device = Device.get_one_by_attribute("device_id", device_id) 

195 if not device: 

196 raise HTTPException( 

197 status_code=404, 

198 detail="Device not found", 

199 ) 

200 result = await device.change_mode(device_update, current_user) 

201 if result.get("error", False) is True: 

202 raise HTTPException( 

203 status_code=result.get("status_code", 500), 

204 detail=result.get("message", "An error has occurred"), 

205 ) 

206 return result 

207 

208 

209@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

210async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

211 return DeviceState.get_from_id_and_query(device_id, query) 

212 

213 

214@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

215async def get_device_setups() -> list[DeviceSetup]: 

216 return DeviceSetup.get_all() 

217 

218 

219@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

220async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

221 device_setup.insert() 

222 return device_setup 

223 

224 

225@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

226async def get_device_setup(device_setup_id: str): 

227 device_setup = DeviceSetup.get_from_id(device_setup_id) 

228 if device_setup is None: 

229 raise HTTPException( 

230 status_code=404, 

231 detail="Device setup not found", 

232 ) 

233 return device_setup 

234 

235 

236@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

237async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

238 device_setup = DeviceSetup.get_from_id(device_setup_id) 

239 if device_setup is None: 

240 raise HTTPException( 

241 status_code=404, 

242 detail="Device setup not found", 

243 ) 

244 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

245 return device_setup 

246 

247 

248@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

249async def delete_device_setups(device_setup_id: str) -> bool: 

250 device_setup = DeviceSetup.get_from_id(device_setup_id) 

251 if device_setup is None: 

252 raise HTTPException( 

253 status_code=404, 

254 detail="Device setup not found", 

255 ) 

256 deleted = device_setup.delete() 

257 return deleted 

258 

259 

260@app.get("/number-samples", dependencies=[Depends(get_current_active_user)]) 

261async def get_number_samples( 

262 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

263) -> list[TwinPadActivity]: 

264 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount) 

265 

266 

267@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

268async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

269 if "signal_id" not in query.sort_by: 

270 query.sort_by += ",signal_id:1" 

271 return Signal.response_from_query(query).to_dict(exclude={"device"}) 

272 

273 

274@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)]) 

275async def signals_names() -> list[str]: 

276 return Signal.get_all_ids() 

277 

278 

279@app.get("/signals/statuses", dependencies=[Depends(get_current_active_user)]) 

280async def signals_statuses() -> list[dict[str, str]]: 

281 return Signal.get_all_statuses() 

282 

283 

284@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

285async def signal_stats(): 

286 """ 

287 Returns signals stats 

288 """ 

289 signal_statuses = Signal.get_all_statuses() 

290 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up") 

291 number_samples = Signal.total_number_samples() 

292 number_signals = Signal.get_number_documents() 

293 

294 return { 

295 "signal_data_size": signal_datasize(), 

296 "number_signal_samples": number_samples, 

297 "number_active_signals": number_active_signals, 

298 "number_signals": number_signals, 

299 } 

300 

301 

302@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

303async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

304 return SignalSample.get_last_from_signal_ids(signal_ids) 

305 

306 

307@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

308async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

309 return SignalSample.get_first_from_signal_ids(signal_ids) 

310 

311 

312@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)]) 

313async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]: 

314 return Signal.get_forcibility(signal_ids) 

315 

316 

317@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)]) 

318async def get_forced_signals( 

319 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends() 

320): 

321 if not current_user.is_admin: 

322 raise HTTPException(401) 

323 return ForcedSignal.response_from_query(query) 

324 

325 

326@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

327async def get_signal(signal_id): 

328 signal = Signal.get_from_signal_id(signal_id) 

329 if not signal: 

330 raise HTTPException( 

331 status_code=404, 

332 detail="Signal not found", 

333 ) 

334 return signal.to_dict() 

335 

336 

337@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

338async def update_signal( 

339 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

340): 

341 signal = Signal.get_from_signal_id(signal_id) 

342 if not signal: 

343 raise HTTPException( 

344 status_code=404, 

345 detail="Device not found", 

346 ) 

347 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id) 

348 if forced_signal is not None: 

349 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

350 raise HTTPException( 

351 status_code=403, 

352 detail="Cannot override another user's forcing", 

353 ) 

354 

355 result = await signal.send_command(signal_update, current_user) 

356 if result.get("error", False) is True: 

357 raise HTTPException( 

358 status_code=result.get("status_code", 500), 

359 detail=result.get("message", "An error has occurred"), 

360 ) 

361 

362 if forced_signal is not None and signal_update.forced_value is None: 

363 forced_signal.delete() 

364 elif signal_update.forced_value is not None: 

365 forced_signal = ForcedSignal( 

366 signal_id=signal_id, 

367 forcing_user_id=current_user.id, 

368 forced_at=time.time(), 

369 value=signal_update.forced_value, 

370 ) 

371 forced_signal.insert() 

372 

373 return result 

374 

375 

376@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)]) 

377async def get_signal_forcibility( 

378 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)] 

379) -> bool: 

380 return ForcedSignal.can_force(signal_id, current_user) 

381 

382 

383@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

384async def get_signal_data( 

385 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

386) -> SignalData | None: 

387 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

388 

389 if number_samples_max is not None: 

390 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

391 

392 return signal_data 

393 

394 

395@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

396async def get_last_value(signal_id) -> SignalSample: 

397 sample = SignalSample.get_last_from_signal_id(signal_id) 

398 if sample is None: 

399 raise HTTPException(status_code=404, detail="No data") 

400 return sample 

401 

402 

403@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

404async def get_first_value(signal_id) -> SignalSample: 

405 sample = SignalSample.get_first_from_signal_id(signal_id) 

406 if sample is None: 

407 raise HTTPException(status_code=404, detail="No data") 

408 return sample 

409 

410 

411@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

412async def get_signal_number_samples(signal_id): 

413 signal = Signal.get_from_signal_id(signal_id) 

414 if not signal: 

415 raise HTTPException( 

416 status_code=404, 

417 detail="Signal not found", 

418 ) 

419 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()} 

420 

421 

422@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

423async def get_signals_data( 

424 signal_ids: list[str] = Query(default=[]), 

425 number_samples_max: int = None, 

426 min_timestamp: float = None, 

427 max_timestamp: float = None, 

428 interpolate_bounds: bool = True, 

429) -> SignalsData | None: 

430 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

431 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

432 

433 signals_data = SignalsData.get_from_signal_ids( 

434 signal_ids, 

435 min_timestamp=min_timestamp, 

436 max_timestamp=max_timestamp, 

437 window_min_timestamp=min_timestamp, 

438 window_max_timestamp=max_timestamp, 

439 interpolate_bounds=interpolate_bounds, 

440 ) 

441 if number_samples_max is not None: 

442 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

443 

444 return signals_data 

445 

446 

447@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

448async def get_signals_data_interest_window( 

449 signal_ids: list[str] = Query(default=[]), 

450 window_max_number_samples: int = None, 

451 outside_max_number_samples: int = None, 

452 window_min_timestamp: float = None, 

453 window_max_timestamp: float = None, 

454 min_timestamp: float = None, 

455 max_timestamp: float = None, 

456) -> SignalsData | None: 

457 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

458 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

459 

460 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

461 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

462 

463 max_documents = 0 

464 

465 if window_max_number_samples is not None: 

466 max_documents += 10 * window_max_number_samples 

467 if outside_max_number_samples is not None: 

468 max_documents += 10 * outside_max_number_samples 

469 

470 if max_documents == 0: 

471 max_documents = None 

472 

473 signals_data = SignalsData.get_from_signal_ids( 

474 signal_ids, 

475 min_timestamp=min_timestamp, 

476 max_timestamp=max_timestamp, 

477 window_min_timestamp=window_min_timestamp, 

478 window_max_timestamp=window_max_timestamp, 

479 max_documents=max_documents, 

480 ) 

481 

482 signals_data = signals_data.interest_window_desampling( 

483 window_max_number_samples=window_max_number_samples, 

484 outside_max_number_samples=outside_max_number_samples, 

485 window_min_timestamp=window_min_timestamp, 

486 window_max_timestamp=window_max_timestamp, 

487 ) 

488 

489 return signals_data 

490 

491 

492@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)]) 

493async def export_signals_zip( 

494 file_format: str, 

495 signal_ids: list[str] = Query(default=[]), 

496 min_timestamp: float = None, 

497 max_timestamp: float = None, 

498): 

499 signals_data = SignalsData.get_from_signal_ids( 

500 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

501 ) 

502 zip_data = signals_data.zip_export(file_format) 

503 return Response( 

504 content=zip_data, 

505 media_type="application/zip", 

506 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

507 ) 

508 

509 

510@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

511async def export_signals_hdf5( 

512 signal_ids: list[str] = Query(default=[]), 

513 min_timestamp: float = None, 

514 max_timestamp: float = None, 

515): 

516 signals_data = SignalsData.get_from_signal_ids( 

517 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

518 ) 

519 data = signals_data.hdf5_export() 

520 return Response( 

521 content=data, 

522 media_type="application/hdf5", 

523 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

524 ) 

525 

526 

527@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)]) 

528async def get_signals_data_post_processing( 

529 phase_ids: list[str] = Query(default=[]), 

530 phase_sync_times: list[float | None] = Query(default=[]), 

531 signal_ids: list[str] = Query(default=[]), 

532 window_min_timestamps: list[float | None] = Query(default=[]), 

533 window_max_timestamps: list[float | None] = Query(default=[]), 

534 number_samples_max: int = None, 

535) -> SignalsData | None: 

536 if len(phase_sync_times) == 0: 

537 phase_sync_times = [None for _ in range(len(phase_ids))] 

538 if len(window_min_timestamps) == 0: 

539 window_min_timestamps = [None for _ in range(len(phase_ids))] 

540 if len(window_max_timestamps) == 0: 

541 window_max_timestamps = [None for _ in range(len(phase_ids))] 

542 

543 if ( 

544 len(phase_ids) != len(phase_sync_times) 

545 or len(phase_ids) != len(window_min_timestamps) 

546 or len(phase_ids) != len(window_max_timestamps) 

547 ): 

548 raise HTTPException( 

549 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps." 

550 ) 

551 

552 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

553 

554 if None in phases: 

555 raise HTTPException(404, "Phase not found") 

556 

557 signals_data = SignalsData.get_from_phase_and_signal_ids( 

558 phases=phases, 

559 phase_sync_times=phase_sync_times, 

560 signal_ids=signal_ids, 

561 window_min_timestamps=window_min_timestamps, 

562 window_max_timestamps=window_max_timestamps, 

563 ) 

564 

565 if number_samples_max is not None: 

566 signals_data = signals_data.min_max_downsampling(number_samples_max) 

567 

568 return signals_data 

569 

570 

571@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)]) 

572async def apply_single_post_processing_function( 

573 phase_id: str, 

574 base_signal_id: str, 

575 function: SINGLE_POST_PROCESSING_FUNCTION, 

576 phase_sync_time: float = None, 

577 window_min_timestamp: float = None, 

578 window_max_timestamp: float = None, 

579 number_samples_max: int = None, 

580) -> SignalsData | None: 

581 phase = Phase.get_from_id(phase_id) 

582 

583 if phase is None: 

584 raise HTTPException(404, "Phase not found") 

585 if phase_sync_time is None: 

586 phase_sync_time = phase.start_at / 1000 

587 

588 signals_data = await SignalsData.apply_single_function( 

589 phase, 

590 base_signal_id, 

591 function, 

592 window_min_timestamp=window_min_timestamp, 

593 window_max_timestamp=window_max_timestamp, 

594 ) 

595 

596 if signals_data is None: 

597 raise HTTPException(500, "There was en error while applying the function") 

598 

599 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

600 signals_data = signals_data.min_max_downsampling(number_samples_max) 

601 signals_data = signals_data.zero_time_vector(phase_sync_time) 

602 

603 return signals_data 

604 

605 

606@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)]) 

607async def apply_multiple_post_processing_function( 

608 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

609 phase_ids: list[str] = Query(default=[]), 

610 phase_sync_times: list[float] = Query(default=[]), 

611 signal_ids: list[str] = Query(default=[]), 

612 window_min_timestamp: float = None, 

613 window_max_timestamp: float = None, 

614 number_samples_max: int = None, 

615) -> SignalsData | None: 

616 if len(phase_ids) != len(signal_ids): 

617 raise HTTPException(400, "Each selected signal should correspond to a phase") 

618 

619 if len(signal_ids) < 2: 

620 raise HTTPException(400, "These functions can only be applied to multiple signals") 

621 

622 if len(phase_ids) != len(phase_sync_times) and len(phase_sync_times) != 0: 

623 raise HTTPException(400, "Number of synchronization times does not match the number of phases") 

624 

625 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids] 

626 if None in phases: 

627 raise HTTPException(404, "Phase not found") 

628 

629 if len(phase_sync_times) == 0: 

630 phase_sync_times = [phase.start_at / 1000 for phase in phases] 

631 

632 signals_data = await SignalsData.apply_multiple_function( 

633 phases, 

634 signal_ids, 

635 function, 

636 window_min_timestamp=window_min_timestamp, 

637 window_max_timestamp=window_max_timestamp, 

638 ) 

639 

640 if signals_data is None: 

641 raise HTTPException(500, "There was en error while applying the function") 

642 

643 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector): 

644 signals_data = signals_data.min_max_downsampling(number_samples_max) 

645 if function in {"Align-X", "Using-X"}: 

646 signals_data = signals_data.zero_time_vector(phase_sync_times[1]) 

647 else: 

648 signals_data = signals_data.zero_time_vector(phase_sync_times[0]) 

649 

650 return signals_data 

651 

652 

653@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)]) 

654async def export_post_processing_zip( 

655 file_format: str, 

656 phase_ids: list[MongoId] = Query(default=[]), 

657 phase_sync_times: list[float | None] = Query(default=[]), 

658 signal_ids: list[str] = Query(default=[]), 

659 window_min_timestamps: list[float | None] = Query(default=[]), 

660 window_max_timestamps: list[float | None] = Query(default=[]), 

661): 

662 signals_data = await get_signals_data_post_processing( 

663 phase_ids, 

664 phase_sync_times, 

665 signal_ids, 

666 window_min_timestamps, 

667 window_max_timestamps, 

668 ) 

669 

670 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids) 

671 

672 return Response( 

673 content=zip_data, 

674 media_type="application/zip", 

675 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

676 ) 

677 

678 

679@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

680async def export_post_processing_hdf5( 

681 phase_ids: list[str] = Query(default=[]), 

682 phase_sync_times: list[float | None] = Query(default=[]), 

683 signal_ids: list[str] = Query(default=[]), 

684 window_min_timestamps: list[float | None] = Query(default=[]), 

685 window_max_timestamps: list[float | None] = Query(default=[]), 

686): 

687 signals_data = await get_signals_data_post_processing( 

688 phase_ids, 

689 phase_sync_times, 

690 signal_ids, 

691 window_min_timestamps, 

692 window_max_timestamps, 

693 ) 

694 

695 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids) 

696 

697 return Response( 

698 content=data, 

699 media_type="application/hdf5", 

700 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

701 ) 

702 

703 

704@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

705async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

706 return Event.response_from_query(query) 

707 

708 

709@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

710async def get_event(event_id) -> Event: 

711 event = Event.get_from_id(event_id) 

712 if event is None: 

713 raise HTTPException(status_code=404, detail="No such event") 

714 return event 

715 

716 

717@app.get("/number-events", dependencies=[Depends(get_current_active_user)]) 

718async def get_number_events( 

719 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

720) -> list[TwinPadActivity]: 

721 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount) 

722 

723 

724@app.get("/number-commands", dependencies=[Depends(get_current_active_user)]) 

725async def get_number_commands( 

726 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

727) -> list[TwinPadActivity]: 

728 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount) 

729 

730 

731@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

732async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

733 return EventRule.response_from_query(query) 

734 

735 

736@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

737async def get_event_rule(event_rule_id) -> EventRule: 

738 event_rule = EventRule.get_from_id(event_rule_id) 

739 if event_rule is None: 

740 raise HTTPException(status_code=404, detail="No such event rule") 

741 return event_rule 

742 

743 

744@app.post("/users", status_code=201) 

745async def create_user(user: User): 

746 if User.get_one_by_attribute("email", user.email) is not None: 

747 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

748 hashed_password = get_password_hash(user.password) 

749 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

750 if new_user is None: # pragma: no cover 

751 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

752 return new_user 

753 

754 

755@app.post("/token", status_code=201) 

756async def login_for_access_token( 

757 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

758) -> Token: 

759 user = authenticate_user(form_data.username, form_data.password) 

760 if not user: 

761 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

762 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

763 if user.is_blocked: 

764 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

765 access_token = create_access_token( 

766 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

767 ) 

768 return Token(access_token=access_token, token_type="bearer") 

769 

770 

771@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

772async def get_users(): 

773 return [u.to_dict() for u in User.get_all(sort_by="email")] 

774 

775 

776@app.get("/users/me") 

777async def get_current_user( 

778 current_user: Annotated[User, Depends(get_current_active_user)], 

779): 

780 return current_user.to_dict() 

781 

782 

783@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

784async def get_user(user_id: str): 

785 user = User.get_from_id(user_id) 

786 

787 if user is None: 

788 raise HTTPException( 

789 status_code=404, 

790 detail="User not found", 

791 ) 

792 return user.to_dict(exclude={"password", "is_connected"}) 

793 

794 

795@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

796async def patch_user(user: UserUpdate, user_id): 

797 if user.password == "" or user.password is None: 

798 del user.password 

799 else: 

800 user.password = get_password_hash(user.password) 

801 return User.update_info(user, user_id).to_dict() 

802 

803 

804@app.get("/commands", dependencies=[Depends(get_current_active_user)]) 

805async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]: 

806 return Command.response_from_query(query) 

807 

808 

809@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

810async def get_campaigns(): 

811 return Campaign.get_all() 

812 

813 

814@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

815async def get_campaign_by_id(campaign_id: str): 

816 campaign = Campaign.get_from_id(campaign_id) 

817 if campaign is None: 

818 raise HTTPException(status_code=404, detail="Campaign not found") 

819 return campaign 

820 

821 

822@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

823async def add_campaign(campaign: Campaign): 

824 campaign_id = campaign.insert() 

825 if campaign_id is None: # pragma: no cover 

826 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

827 return campaign 

828 

829 

830@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

831async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

832 campaign = Campaign.get_from_id(campaign_id) 

833 if campaign is None: 

834 raise HTTPException(status_code=404, detail="Campaign not found") 

835 campaign.update(edit_campaign.model_dump(exclude_unset=True, mode="json")) 

836 return campaign 

837 

838 

839@app.delete( 

840 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

841) 

842async def delete_campaign(campaign_id: str): 

843 campaign = Campaign.get_from_id(campaign_id) 

844 if campaign is None: 

845 raise HTTPException(status_code=404, detail="Campaign not found") 

846 delete_phases = Phase.deleteMany(campaign_id) 

847 if not delete_phases.acknowledged: # pragma: no cover 

848 raise HTTPException(status_code=500, detail="An error occurred during phases deletion") 

849 campaign_deleted = campaign.delete() 

850 if not campaign_deleted: # pragma: no cover 

851 raise HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

852 return True 

853 

854 

855@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

856async def get_campaign_phases(campaign_id: str): 

857 return Phase.get_by_attribute("campaign_id", campaign_id) 

858 

859 

860@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

861async def get_phase(phase_id: str): 

862 phase = Phase.get_from_id(phase_id) 

863 if phase is None: 

864 raise HTTPException(status_code=404, detail="Phase not found") 

865 return phase 

866 

867 

868@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

869async def add_phase(phase: Phase): 

870 phase_id = phase.insert() 

871 if phase_id is None: # pragma: no cover 

872 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

873 return phase 

874 

875 

876@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

877async def edit_phase(phase_id, edit_phase: Phase): 

878 phase = Phase.get_from_id(phase_id) 

879 if phase is None: 

880 raise HTTPException(status_code=404, detail="Phase does not exists") 

881 phase.update(edit_phase.model_dump(exclude_unset=True, mode="json")) 

882 return phase 

883 

884 

885@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

886async def delete_phase(phase_id: str): 

887 phase = Phase.get_from_id(phase_id) 

888 if phase is None: 

889 raise HTTPException(status_code=404, detail="Phase not found") 

890 deleted = phase.delete() 

891 if not deleted: # pragma: no cover 

892 raise HTTPException(status_code=500, detail="An error occurred during phase deletion") 

893 return True 

894 

895 

896@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

897async def get_custom_views(): 

898 return CustomView.get_all() 

899 

900 

901@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

902async def get_custom_views_from_user_id(user_id: str): 

903 return CustomView.get_by_attribute("user_id", user_id) 

904 

905 

906@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

907async def get_custom_view(custom_view_id: str): 

908 return CustomView.get_from_id(custom_view_id) 

909 

910 

911@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

912async def create_custom_view( 

913 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user) 

914): 

915 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id) 

916 custom_view.insert() 

917 return custom_view 

918 

919 

920@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

921async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate): 

922 custom_view = CustomView.get_from_id(custom_view_id) 

923 return custom_view.update(custom_view_update.model_dump()) 

924 

925 

926@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

927async def delete_custom_view(custom_view_id: str): 

928 custom_view = CustomView.get_from_id(custom_view_id) 

929 return custom_view.delete() 

930 

931 

932@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)]) 

933async def add_video(video: Video): 

934 video.insert() 

935 if not video: # pragma: no cover 

936 raise HTTPException(status_code=500, detail="An error occurred during cctv creation") 

937 return video 

938 

939 

940@app.get("/videos", dependencies=[Depends(get_current_active_user)]) 

941async def get_videos(): 

942 return Video.get_all() 

943 

944 

945@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)]) 

946def get_stream(video_id): 

947 camera_name = Video.get_video(video_id) 

948 if camera_name is None: 

949 raise HTTPException(status_code=404, detail="Camera not found") 

950 return camera_name 

951 

952 

953@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

954async def get_signals_preset(current_user: User = Depends(get_current_active_user)): 

955 return SignalsPreset.get_by_attribute("user_id", current_user.id) 

956 

957 

958@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

959async def create_signals_preset( 

960 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user) 

961): 

962 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id) 

963 return new_signals_preset 

964 

965 

966@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)]) 

967async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate): 

968 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

969 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True)) 

970 

971 

972@app.delete( 

973 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)] 

974) 

975async def delete_signals_preset(signals_preset_id: str): 

976 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

977 return signals_preset.delete() 

978 

979 

980@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

981async def create_graph_theme( 

982 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user) 

983): 

984 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) 

985 if styled_signal is None: 

986 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist") 

987 

988 graph_theme = PrivateGraphTheme.create( 

989 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id 

990 ) 

991 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

992 

993 

994@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

995async def get_all_graph_themes( 

996 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

997) -> ListResponse[PublicGraphTheme]: 

998 return PublicGraphTheme.response_from_query(query, current_user.id) 

999 

1000 

1001@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)]) 

1002async def get_graph_themes_in_library( 

1003 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

1004) -> ListResponse[PublicGraphTheme]: 

1005 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id) 

1006 

1007 

1008@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)]) 

1009async def update_graph_theme( 

1010 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user) 

1011): 

1012 graph_theme = PrivateGraphTheme.get_from_id(theme_id) 

1013 update_dict = theme_update.model_dump(exclude_unset=True) 

1014 if current_user.id != graph_theme.creator_id: 

1015 for theme_property in update_dict.keys(): 

1016 if theme_property not in ["active_for_user", "in_user_library"]: 

1017 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him") 

1018 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id) 

1019 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

1020 

1021 

1022@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

1023async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)): 

1024 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id) 

1025 if current_user.id != graph_theme.creator_id: 

1026 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him") 

1027 return graph_theme.delete() 

1028 

1029 

1030@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)]) 

1031async def get_signals_appearances( 

1032 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user) 

1033) -> dict: 

1034 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id) 

1035 

1036 

1037if DEVICE_DEPLOYERS: 

1038 app.include_router(deployers_router, prefix="/device-deployers")