Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 97%

419 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-27 13:40 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 TwinPadActivity, 

32 User, 

33 UserUpdate, 

34 Campaign, 

35 Phase, 

36 CustomView, 

37 Command, 

38 CustomViewCreation, 

39 CustomViewUpdate, 

40 Video, 

41 SignalsPreset, 

42 SignalsPresetCreation, 

43 SignalsPresetUpdate, 

44 PrivateGraphTheme, 

45 PublicGraphTheme, 

46 GraphThemeCreation, 

47 GraphThemeUpdate, 

48 Configuration, 

49) 

50from twinpad_backend.auth import ( 

51 Token, 

52 authenticate_user, 

53 get_current_active_user, 

54 ACCESS_TOKEN_EXPIRE_MINUTES, 

55 create_access_token, 

56 get_password_hash, 

57) 

58from twinpad_backend.queries import ( 

59 SignalQuery, 

60 DeviceStatesQuery, 

61 EventQuery, 

62 EventRuleQuery, 

63 CommandQuery, 

64 GraphThemeQuery, 

65 ConfigurationQuery, 

66) 

67from twinpad_backend.responses import ListResponse 

68 

69REQUEST_TIME_WARNING = 0.5 

70 

71DEBUG = os.environ.get("DEBUG", "false") == "true" 

72PROFILING = os.environ.get("PROFILING", "false") == "true" 

73 

74logger = logging.getLogger("uvicorn.error") 

75logger.propagate = False 

76logger.info("Debug mode: %s", DEBUG) 

77logger.info("log level: %s", logging.root.level) 

78 

79 

80app = FastAPI(title="Twinpad backend", version=__version__) 

81 

82app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

83 

84if PROFILING: # pragma: no cover 

85 profiling_folder = mkdtemp() 

86 logger.info("Profiling enabled") 

87 

88 @app.middleware("http") 

89 async def profile_request(request: Request, call_next): 

90 should_profile = True 

91 url = str(request.url) 

92 for segment in ("profiling", ".ico"): 

93 if segment in url: 

94 should_profile = False 

95 break 

96 

97 if should_profile: # avoid recursion 

98 profiler = Profiler() 

99 profiler.start() 

100 await call_next(request) 

101 profiler.stop() 

102 url = "_".join(url.split("/")[3:]).rstrip("/") 

103 if not url: 

104 url = "slash" 

105 # filename = f"{round(time.time())}_{url}" 

106 filename = url.split("?", maxsplit=1)[0] 

107 logger.info("saving profiling to %s", filename) 

108 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

109 profiling_file.write(profiler.output_html()) 

110 return await call_next(request) 

111 

112 @app.get("/profilings") 

113 async def profilings(): 

114 return {"profilings": os.listdir(profiling_folder)} 

115 

116 @app.get("/profilings/{profiling_id}") 

117 async def profiling(profiling_id): 

118 

119 filename = os.path.join(profiling_folder, profiling_id) 

120 if os.path.exists(filename): 

121 with open(filename, "r", encoding="utf-8") as profiling: 

122 return HTMLResponse(profiling.read()) 

123 raise HTTPException( 

124 status_code=404, 

125 detail="Profiling not found", 

126 ) 

127 

128 # app.mount("/profilings", StaticFiles(directory=profiling_folder, html=True), name="profilings") 

129 

130 

131@app.middleware("http") 

132async def log_request_time(request: Request, call_next): 

133 start_time = time.time() # Record the start time 

134 response = await call_next(request) # Process the request 

135 duration = time.time() - start_time # Calculate the time taken 

136 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

137 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

138 if duration > REQUEST_TIME_WARNING: 

139 logger.warning(message) 

140 else: 

141 logger.info(message) 

142 return response 

143 

144 

145@app.get("/") 

146async def slash(): 

147 return {"twinpad_version": __version__} 

148 

149 

150@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

151async def status(): 

152 """ 

153 Return service healthcheck 

154 """ 

155 return { 

156 "services": ServicesStatus.check(), 

157 } 

158 

159 

160@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

161async def get_devices() -> list[Device]: 

162 return Device.get_all(sort_by="device_id") 

163 

164 

165@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

166async def get_device(device_id): 

167 device = Device.get_one_by_attribute("device_id", device_id) 

168 if not device: 

169 raise HTTPException( 

170 status_code=404, 

171 detail="Device not found", 

172 ) 

173 return device 

174 

175 

176@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

177async def update_item( 

178 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

179): 

180 device = Device.get_one_by_attribute("device_id", device_id) 

181 if not device: 

182 raise HTTPException( 

183 status_code=404, 

184 detail="Device not found", 

185 ) 

186 result = await device.change_mode(device_update, current_user) 

187 if result.get("error", False) is True: 

188 raise HTTPException( 

189 status_code=result.get("status_code", 500), 

190 detail=result.get("message", "An error has occurred"), 

191 ) 

192 return result 

193 

194 

195@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

196async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

197 return DeviceState.get_from_id_and_query(device_id, query) 

198 

199 

200@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

201async def get_device_setups() -> list[DeviceSetup]: 

202 return DeviceSetup.get_all() 

203 

204 

205@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

206async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

207 device_setup.insert() 

208 return device_setup 

209 

210 

211@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

212async def get_device_setup(device_setup_id: str): 

213 device_setup = DeviceSetup.get_from_id(device_setup_id) 

214 if device_setup is None: 

215 raise HTTPException( 

216 status_code=404, 

217 detail="Device setup not found", 

218 ) 

219 return device_setup 

220 

221 

222@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

223async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

224 device_setup = DeviceSetup.get_from_id(device_setup_id) 

225 if device_setup is None: 

226 raise HTTPException( 

227 status_code=404, 

228 detail="Device setup not found", 

229 ) 

230 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

231 return device_setup 

232 

233 

234@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

235async def delete_device_setups(device_setup_id: str) -> bool: 

236 device_setup = DeviceSetup.get_from_id(device_setup_id) 

237 if device_setup is None: 

238 raise HTTPException( 

239 status_code=404, 

240 detail="Device setup not found", 

241 ) 

242 deleted = device_setup.delete() 

243 return deleted 

244 

245 

246@app.get("/number-samples", dependencies=[Depends(get_current_active_user)]) 

247async def get_number_samples( 

248 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

249) -> list[TwinPadActivity]: 

250 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount) 

251 

252 

253@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

254async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

255 # profiler = Profiler() 

256 # profiler.start() 

257 

258 res = Signal.response_from_query(query).to_dict(exclude={"device"}) 

259 

260 # profiler.stop() 

261 # filename = "get_signals.html" 

262 # full_file_path = os.path.join(Path.home(), filename) 

263 # logger.info("Saving profiling to %s", full_file_path) 

264 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

265 # profiling_file.write(profiler.output_html()) 

266 return res 

267 

268 

269@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)]) 

270async def get_signals_names() -> list[str]: 

271 return Signal.get_all_ids() 

272 

273 

274@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

275async def signal_stats(): 

276 """ 

277 Returns signals stats 

278 """ 

279 # profiler = Profiler() 

280 # profiler.start() 

281 

282 number_samples = 0 

283 number_active_signals = 0 

284 

285 for s in Signal.get_all(): 

286 if s is not None: 

287 number_samples += await s.number_samples() 

288 if s.status.status == "up": 

289 number_active_signals += 1 

290 

291 number_signals = Signal.get_number_documents() 

292 

293 # profiler.stop() 

294 # filename = "signals_stats_profiling.html" 

295 # full_file_path = os.path.join(Path.home(), filename) 

296 # logger.info("Saving profiling to %s", full_file_path) 

297 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

298 # profiling_file.write(profiler.output_html()) 

299 

300 return { 

301 "signal_data_size": signal_datasize(), 

302 "number_signal_samples": number_samples, 

303 "number_active_signals": number_active_signals, 

304 "number_signals": number_signals, 

305 } 

306 

307 

308@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

309async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

310 return SignalSample.get_last_from_signal_ids(signal_ids) 

311 

312 

313@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)]) 

314async def get_last_values_interest_window( 

315 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0 

316) -> list[SignalSample | None]: 

317 # profiler = Profiler() 

318 # profiler.start() 

319 

320 result = SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp) 

321 

322 # profiler.stop() 

323 # filename = "fetch_last_point.html" 

324 # full_file_path = os.path.join(Path.home(), filename) 

325 # logger.info("Saving profiling to %s", full_file_path) 

326 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

327 # profiling_file.write(profiler.output_html()) 

328 

329 return result 

330 

331 

332@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

333async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

334 return SignalSample.get_first_from_signal_ids(signal_ids) 

335 

336 

337@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)]) 

338async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]: 

339 return Signal.get_forcibility(signal_ids) 

340 

341 

342@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

343async def get_signal(signal_id): 

344 signal = Signal.get_from_signal_id(signal_id) 

345 if not signal: 

346 raise HTTPException( 

347 status_code=404, 

348 detail="Signal not found", 

349 ) 

350 return signal.to_dict() 

351 

352 

353@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

354async def update_signal( 

355 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

356): 

357 signal = Signal.get_from_signal_id(signal_id) 

358 if not signal: 

359 raise HTTPException( 

360 status_code=404, 

361 detail="Signal not found", 

362 ) 

363 

364 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

365 if device is None: 

366 raise HTTPException( 

367 status_code=400, 

368 detail="Signal doesn't belong to an existing configuration", 

369 ) 

370 

371 result = await signal.send_command(device.device_id, signal_update, current_user) 

372 if result.get("error", False) is True: 

373 raise HTTPException( 

374 status_code=result.get("status_code", 500), 

375 detail=result.get("message", "An error has occurred"), 

376 ) 

377 return result 

378 

379 

380@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

381async def get_signal_data( 

382 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

383) -> SignalData | None: 

384 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

385 

386 if number_samples_max is not None: 

387 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

388 

389 return signal_data 

390 

391 

392@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

393async def get_last_value(signal_id) -> SignalSample: 

394 sample = SignalSample.get_last_from_signal_id(signal_id) 

395 if sample is None: 

396 raise HTTPException(status_code=404, detail="No data") 

397 return sample 

398 

399 

400@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

401async def get_first_value(signal_id) -> SignalSample: 

402 sample = SignalSample.get_first_from_signal_id(signal_id) 

403 if sample is None: 

404 raise HTTPException(status_code=404, detail="No data") 

405 return sample 

406 

407 

408@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

409async def get_signal_number_samples(signal_id): 

410 signal = Signal.get_from_signal_id(signal_id) 

411 if not signal: 

412 raise HTTPException( 

413 status_code=404, 

414 detail="Device not found", 

415 ) 

416 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()} 

417 

418 

419@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

420async def get_signals_data( 

421 signal_ids: list[str] = Query(default=[]), 

422 number_samples_max: int = None, 

423 min_timestamp: float = None, 

424 max_timestamp: float = None, 

425 interpolate_bounds: bool = True, 

426) -> SignalsData | None: 

427 # profiler = Profiler() 

428 # profiler.start() 

429 

430 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

431 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

432 

433 signals_data = SignalsData.get_from_signal_ids( 

434 signal_ids, 

435 min_timestamp=min_timestamp, 

436 max_timestamp=max_timestamp, 

437 window_min_timestamp=min_timestamp, 

438 window_max_timestamp=max_timestamp, 

439 interpolate_bounds=interpolate_bounds, 

440 ) 

441 if number_samples_max is not None: 

442 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

443 

444 # profiler.stop() 

445 # filename = "signals-data.html" 

446 # full_file_path = os.path.join(Path.home(), filename) 

447 # logger.info(f"Saving profiling to %s", full_file_path) 

448 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

449 # profiling_file.write(profiler.output_html()) 

450 

451 return signals_data 

452 

453 

454@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

455async def get_signals_data_interest_window( 

456 window_max_number_samples: int = 600, 

457 outside_max_number_samples: int = 150, 

458 window_min_timestamp: float = None, 

459 window_max_timestamp: float = None, 

460 signal_ids: list[str] = Query(default=[]), 

461 min_timestamp: float = None, 

462 max_timestamp: float = None, 

463) -> SignalsData | None: 

464 # profiler = Profiler() 

465 # profiler.start() 

466 

467 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

468 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

469 

470 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

471 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

472 

473 signals_data = SignalsData.get_from_signal_ids( 

474 signal_ids, 

475 min_timestamp=min_timestamp, 

476 max_timestamp=max_timestamp, 

477 window_min_timestamp=window_min_timestamp, 

478 window_max_timestamp=window_max_timestamp, 

479 max_documents=10 * (window_max_number_samples + outside_max_number_samples), 

480 ) 

481 

482 signals_data = signals_data.interest_window_desampling( 

483 window_max_number_samples=window_max_number_samples, 

484 outside_max_number_samples=outside_max_number_samples, 

485 window_min_timestamp=window_min_timestamp, 

486 window_max_timestamp=window_max_timestamp, 

487 ) 

488 

489 # profiler.stop() 

490 # filename = "signals-data.html" 

491 # full_file_path = os.path.join(Path.home(), filename) 

492 # logger.info(f"Saving profiling to %s", full_file_path) 

493 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

494 # profiling_file.write(profiler.output_html()) 

495 

496 return signals_data 

497 

498 

499@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)]) 

500async def export_signals_zip( 

501 file_format: str, 

502 signal_ids: list[str] = Query(default=[]), 

503 min_timestamp: float = None, 

504 max_timestamp: float = None, 

505): 

506 signals_data = SignalsData.get_from_signal_ids( 

507 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

508 ) 

509 zip_data = signals_data.zip_export(file_format) 

510 return Response( 

511 content=zip_data, 

512 media_type="application/zip", 

513 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

514 ) 

515 

516 

517@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

518async def export_signals_hdf5( 

519 signal_ids: list[str] = Query(default=[]), 

520 min_timestamp: float = None, 

521 max_timestamp: float = None, 

522): 

523 signals_data = SignalsData.get_from_signal_ids( 

524 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

525 ) 

526 data = signals_data.hdf5_export() 

527 return Response( 

528 content=data, 

529 media_type="application/hdf5", 

530 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

531 ) 

532 

533 

534@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

535async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

536 return Event.response_from_query(query) 

537 

538 

539@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

540async def get_event(event_id) -> Event: 

541 event = Event.get_from_id(event_id) 

542 if event is None: 

543 raise HTTPException(status_code=404, detail="No such event") 

544 return event 

545 

546 

547@app.get("/number-events", dependencies=[Depends(get_current_active_user)]) 

548async def get_number_events( 

549 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

550) -> list[TwinPadActivity]: 

551 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount) 

552 

553 

554@app.get("/number-commands", dependencies=[Depends(get_current_active_user)]) 

555async def get_number_commands( 

556 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

557) -> list[TwinPadActivity]: 

558 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount) 

559 

560 

561@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

562async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

563 return EventRule.response_from_query(query) 

564 

565 

566@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

567async def get_event_rule(event_rule_id) -> EventRule: 

568 event_rule = EventRule.get_from_id(event_rule_id) 

569 if event_rule is None: 

570 raise HTTPException(status_code=404, detail="No such event rule") 

571 return event_rule 

572 

573 

574@app.post("/users", status_code=201) 

575async def create_user(user: User): 

576 if User.get_one_by_attribute("email", user.email) is not None: 

577 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

578 hashed_password = get_password_hash(user.password) 

579 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

580 if new_user is None: 

581 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

582 return new_user 

583 

584 

585@app.post("/token", status_code=201) 

586async def login_for_access_token( 

587 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

588) -> Token: 

589 user = authenticate_user(form_data.username, form_data.password) 

590 if not user: 

591 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

592 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

593 if user.is_active: 

594 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

595 access_token = create_access_token( 

596 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

597 ) 

598 return Token(access_token=access_token, token_type="bearer") 

599 

600 

601@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

602async def get_users(): 

603 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")] 

604 

605 

606@app.get("/users/me", response_model=User) 

607async def get_current_user( 

608 current_user: Annotated[User, Depends(get_current_active_user)], 

609): 

610 del current_user.password 

611 return current_user 

612 

613 

614@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

615async def get_user(user_id: str): 

616 user = User.get_from_id(user_id) 

617 

618 if user is None: 

619 raise HTTPException( 

620 status_code=404, 

621 detail="User not found", 

622 ) 

623 return user.to_dict(exclude={"password", "is_connected"}) 

624 

625 

626@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)]) 

627async def patch_user(user: UserUpdate, user_id): 

628 if user.password == "" or user.password is None: 

629 del user.password 

630 else: 

631 user.password = get_password_hash(user.password) 

632 return User.update_info(user, user_id) 

633 

634 

635@app.get("/commands", dependencies=[Depends(get_current_active_user)]) 

636async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]: 

637 return Command.response_from_query(query) 

638 

639 

640@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

641async def get_campaigns(): 

642 return Campaign.get_all() 

643 

644 

645@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

646async def get_campaign_by_id(campaign_id: str): 

647 campaign = Campaign.get_from_id(campaign_id) 

648 if campaign is None: 

649 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign") 

650 return campaign 

651 

652 

653@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

654async def add_campaign(campaign: Campaign): 

655 new_campaign = Campaign.create(campaign) 

656 if new_campaign is None: 

657 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

658 return new_campaign 

659 

660 

661@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

662async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

663 campaign = Campaign.get_from_id(campaign_id) 

664 if campaign is None: 

665 raise HTTPException(status_code=500, detail="An error occurred during campaign edition") 

666 campaign.name = edit_campaign.name 

667 campaign.description = edit_campaign.description 

668 return Campaign.update(campaign) 

669 

670 

671@app.delete( 

672 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

673) 

674async def delete_campaign(campaign_id: str): 

675 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

676 campaign = Campaign.get_from_id(campaign_id) 

677 if campaign is None: 

678 raise exception 

679 delete_phases = Phase.deleteMany(campaign_id) 

680 if not delete_phases.acknowledged: 

681 raise exception 

682 campaign_deleted = Campaign.delete(campaign_id) 

683 return campaign_deleted.acknowledged 

684 

685 

686@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

687async def get_campaign_phases(campaign_id: str): 

688 return Phase.get_by_attribute("campaign_id", campaign_id) 

689 

690 

691@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

692async def get_phase(phase_id: str): 

693 phase = Phase.get_from_id(phase_id) 

694 if phase is None: 

695 raise HTTPException(status_code=404, detail="Phase not found") 

696 return phase 

697 

698 

699@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

700async def add_phase(phase: Phase): 

701 new_phase = Phase.create(phase) 

702 if new_phase is None: 

703 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

704 return new_phase 

705 

706 

707@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

708async def edit_phase(phase_id, edit_phase: Phase): 

709 phase = Phase.get_from_id(phase_id) 

710 if phase is None: 

711 raise HTTPException(status_code=500, detail="An error occurred during Phase edition") 

712 phase.name = edit_phase.name 

713 phase.description = edit_phase.description 

714 phase.start_at = edit_phase.start_at 

715 phase.end_at = edit_phase.end_at 

716 return Phase.update(phase) 

717 

718 

719@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

720async def delete_phase(phase_id: str): 

721 phase = Phase.get_from_id(phase_id) 

722 if phase is None: 

723 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion") 

724 phase_deleted = Phase.delete(phase_id) 

725 return phase_deleted.acknowledged 

726 

727 

728@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

729async def get_custom_views(): 

730 return CustomView.get_all() 

731 

732 

733@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

734async def get_custom_views_from_user_id(user_id: str): 

735 return CustomView.get_by_attribute("user_id", user_id) 

736 

737 

738@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

739async def get_custom_view(custom_view_id: str): 

740 return CustomView.get_from_id(custom_view_id) 

741 

742 

743@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

744async def create_custom_view( 

745 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user) 

746): 

747 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id) 

748 custom_view.insert() 

749 return custom_view 

750 

751 

752@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

753async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate): 

754 custom_view = CustomView.get_from_id(custom_view_id) 

755 return custom_view.update(custom_view_update.model_dump()) 

756 

757 

758@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

759async def delete_custom_view(custom_view_id: str): 

760 custom_view = CustomView.get_from_id(custom_view_id) 

761 return custom_view.delete() 

762 

763 

764@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)]) 

765async def add_video(video: Video): 

766 video.insert() 

767 if not video: 

768 raise HTTPException(status_code=500, detail="An error occurred during cctv creation") 

769 return video 

770 

771 

772@app.get("/videos", dependencies=[Depends(get_current_active_user)]) 

773async def get_videos(): 

774 return Video.get_all() 

775 

776 

777@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)]) 

778def get_stream(video_id): 

779 camera_name = Video.get_video(video_id) 

780 if camera_name is None: 

781 raise HTTPException(status_code=404, detail="Camera not found") 

782 return camera_name 

783 

784 

785@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

786async def get_signals_preset(current_user: User = Depends(get_current_active_user)): 

787 return SignalsPreset.get_by_attribute("user_id", current_user.id) 

788 

789 

790@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

791async def create_signals_preset( 

792 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user) 

793): 

794 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id) 

795 return new_signals_preset 

796 

797 

798@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)]) 

799async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate): 

800 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

801 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True)) 

802 

803 

804@app.delete( 

805 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)] 

806) 

807async def delete_signals_preset(signals_preset_id: str): 

808 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

809 return signals_preset.delete() 

810 

811 

812@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

813async def create_graph_theme( 

814 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user) 

815): 

816 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) 

817 if styled_signal is None: 

818 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist") 

819 

820 graph_theme = PrivateGraphTheme.create( 

821 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id 

822 ) 

823 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

824 

825 

826@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

827async def get_all_graph_themes( 

828 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

829) -> ListResponse[PublicGraphTheme]: 

830 return PublicGraphTheme.response_from_query(query, current_user.id) 

831 

832 

833@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)]) 

834async def get_graph_themes_in_library( 

835 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

836) -> ListResponse[PublicGraphTheme]: 

837 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id) 

838 

839 

840@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)]) 

841async def update_graph_theme( 

842 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user) 

843): 

844 graph_theme = PrivateGraphTheme.get_from_id(theme_id) 

845 update_dict = theme_update.model_dump(exclude_unset=True) 

846 if current_user.id != graph_theme.creator_id: 

847 for theme_property in update_dict.keys(): 

848 if theme_property not in ["active_for_user", "in_user_library"]: 

849 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him") 

850 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id) 

851 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

852 

853 

854@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

855async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)): 

856 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id) 

857 if current_user.id != graph_theme.creator_id: 

858 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him") 

859 return graph_theme.delete() 

860 

861 

862@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)]) 

863async def get_signals_appearances( 

864 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user) 

865) -> dict: 

866 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id) 

867 

868 

869@app.get("/configs", dependencies=[Depends(get_current_active_user)]) 

870async def get_configs(query: ConfigurationQuery = Depends()) -> ListResponse[Configuration]: 

871 # profiler = Profiler() 

872 # profiler.start() 

873 

874 # profiler.stop() 

875 # filename = "configs.html" 

876 # full_file_path = os.path.join(Path.home(), filename) 

877 # logger.info("Saving profiling to %s", full_file_path) 

878 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

879 # profiling_file.write(profiler.output_html()) 

880 return Configuration.response_from_query(query) 

881 

882 

883@app.get("/configs/ids", dependencies=[Depends(get_current_active_user)]) 

884async def get_config_ids(): 

885 return Configuration.get_all_ids() 

886 

887 

888@app.get("/configs/{config_id}", response_model=Configuration, dependencies=[Depends(get_current_active_user)]) 

889async def get_config(config_id: str): 

890 return Configuration.get_from_config_id(config_id).to_dict(exclude={"petri_network", "pid", "event_rules", "modes"})