Coverage for  / usr / local / lib / python3.11 / site-packages / twinpad_backend / api.py: 98%

422 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-20 06:48 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 ForcedSignal, 

20 SignalData, 

21 SignalSample, 

22 ServicesStatus, 

23 Device, 

24 DeviceUpdate, 

25 DeviceSetup, 

26 DeviceSetupUpdate, 

27 DeviceState, 

28 SignalUpdate, 

29 SignalsData, 

30 Event, 

31 EventRule, 

32 TwinPadActivity, 

33 User, 

34 UserUpdate, 

35 Campaign, 

36 Phase, 

37 CustomView, 

38 Command, 

39 CustomViewCreation, 

40 CustomViewUpdate, 

41 Video, 

42 SignalsPreset, 

43 SignalsPresetCreation, 

44 SignalsPresetUpdate, 

45 PrivateGraphTheme, 

46 PublicGraphTheme, 

47 GraphThemeCreation, 

48 GraphThemeUpdate, 

49) 

50from twinpad_backend.auth import ( 

51 Token, 

52 authenticate_user, 

53 get_current_active_user, 

54 ACCESS_TOKEN_EXPIRE_MINUTES, 

55 create_access_token, 

56 get_password_hash, 

57) 

58from twinpad_backend.queries import ( 

59 SignalQuery, 

60 ForcedSignalQuery, 

61 DeviceStatesQuery, 

62 EventQuery, 

63 EventRuleQuery, 

64 CommandQuery, 

65 GraphThemeQuery, 

66) 

67from twinpad_backend.responses import ListResponse 

68 

69REQUEST_TIME_WARNING = 0.5 

70 

71DEBUG = os.environ.get("DEBUG", "false") == "true" 

72PROFILING = os.environ.get("PROFILING", "false") == "true" 

73 

74logger = logging.getLogger("uvicorn.error") 

75logger.propagate = False 

76logger.info("Debug mode: %s", DEBUG) 

77logger.info("log level: %s", logging.root.level) 

78 

79 

80app = FastAPI(title="Twinpad backend", version=__version__) 

81 

82app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

83 

84if PROFILING: # pragma: no cover 

85 profiling_folder = mkdtemp() 

86 logger.info("Profiling enabled") 

87 

88 @app.middleware("http") 

89 async def profile_request(request: Request, call_next): 

90 should_profile = True 

91 url = str(request.url) 

92 for segment in ("profiling", ".ico"): 

93 if segment in url: 

94 should_profile = False 

95 break 

96 

97 if should_profile: # avoid recursion 

98 profiler = Profiler() 

99 profiler.start() 

100 

101 response = await call_next(request) 

102 

103 profiler.stop() 

104 url = "_".join(url.split("/")[3:]).rstrip("/") 

105 if not url: 

106 url = "slash" 

107 filename = url.split("?", maxsplit=1)[0] 

108 logger.info("saving profiling to %s", filename) 

109 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

110 profiling_file.write(profiler.output_html()) 

111 

112 return response 

113 

114 return await call_next(request) 

115 

116 @app.get("/profilings") 

117 async def profilings(): 

118 return {"profilings": os.listdir(profiling_folder)} 

119 

120 @app.get("/profilings/{file_name}") 

121 async def profiling(file_name): 

122 file_path = os.path.join(profiling_folder, file_name) 

123 

124 if not os.path.exists(file_path): 

125 raise HTTPException( 

126 status_code=404, 

127 detail=f"Profiling file '{file_name}' not found", 

128 ) 

129 

130 with open(file_path, "r", encoding="utf-8") as profiling_file: 

131 return Response( 

132 content=profiling_file.read(), 

133 media_type="application/html", 

134 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'}, 

135 ) 

136 

137 

138@app.middleware("http") 

139async def log_request_time(request: Request, call_next): 

140 start_time = time.time() # Record the start time 

141 response = await call_next(request) # Process the request 

142 duration = time.time() - start_time # Calculate the time taken 

143 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

144 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

145 if duration > REQUEST_TIME_WARNING: 

146 logger.warning(message) 

147 else: 

148 logger.info(message) 

149 return response 

150 

151 

152@app.get("/") 

153async def slash(): 

154 return {"twinpad_version": __version__} 

155 

156 

157@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

158async def status(): 

159 """ 

160 Return service healthcheck 

161 """ 

162 return { 

163 "services": ServicesStatus.check(), 

164 } 

165 

166 

167@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

168async def get_devices() -> list[Device]: 

169 return Device.get_all(sort_by="device_id") 

170 

171 

172@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

173async def get_device(device_id): 

174 device = Device.get_one_by_attribute("device_id", device_id) 

175 if not device: 

176 raise HTTPException( 

177 status_code=404, 

178 detail="Device not found", 

179 ) 

180 return device 

181 

182 

183@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

184async def update_item( 

185 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

186): 

187 device = Device.get_one_by_attribute("device_id", device_id) 

188 if not device: 

189 raise HTTPException( 

190 status_code=404, 

191 detail="Device not found", 

192 ) 

193 result = await device.change_mode(device_update, current_user) 

194 if result.get("error", False) is True: 

195 raise HTTPException( 

196 status_code=result.get("status_code", 500), 

197 detail=result.get("message", "An error has occurred"), 

198 ) 

199 return result 

200 

201 

202@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

203async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

204 return DeviceState.get_from_id_and_query(device_id, query) 

205 

206 

207@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

208async def get_device_setups() -> list[DeviceSetup]: 

209 return DeviceSetup.get_all() 

210 

211 

212@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

213async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

214 device_setup.insert() 

215 return device_setup 

216 

217 

218@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

219async def get_device_setup(device_setup_id: str): 

220 device_setup = DeviceSetup.get_from_id(device_setup_id) 

221 if device_setup is None: 

222 raise HTTPException( 

223 status_code=404, 

224 detail="Device setup not found", 

225 ) 

226 return device_setup 

227 

228 

229@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

230async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

231 device_setup = DeviceSetup.get_from_id(device_setup_id) 

232 if device_setup is None: 

233 raise HTTPException( 

234 status_code=404, 

235 detail="Device setup not found", 

236 ) 

237 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

238 return device_setup 

239 

240 

241@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

242async def delete_device_setups(device_setup_id: str) -> bool: 

243 device_setup = DeviceSetup.get_from_id(device_setup_id) 

244 if device_setup is None: 

245 raise HTTPException( 

246 status_code=404, 

247 detail="Device setup not found", 

248 ) 

249 deleted = device_setup.delete() 

250 return deleted 

251 

252 

253@app.get("/number-samples", dependencies=[Depends(get_current_active_user)]) 

254async def get_number_samples( 

255 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

256) -> list[TwinPadActivity]: 

257 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount) 

258 

259 

260@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

261async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

262 if "signal_id" not in query.sort_by: 

263 query.sort_by += ",signal_id:1" 

264 return Signal.response_from_query(query).to_dict(exclude={"device"}) 

265 

266 

267@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)]) 

268async def signals_names() -> list[str]: 

269 return Signal.get_all_ids() 

270 

271 

272@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

273async def signal_stats(): 

274 """ 

275 Returns signals stats 

276 """ 

277 signal_statuses = Signal.get_all_statuses() 

278 signal_ids = [signal["signal_id"] for signal in signal_statuses] 

279 

280 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids) 

281 number_samples = sum(number_samples_by_signal_id.values()) 

282 

283 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up") 

284 

285 number_signals = Signal.get_number_documents() 

286 

287 return { 

288 "signal_data_size": signal_datasize(), 

289 "number_signal_samples": number_samples, 

290 "number_active_signals": number_active_signals, 

291 "number_signals": number_signals, 

292 } 

293 

294 

295@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

296async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

297 return SignalSample.get_last_from_signal_ids(signal_ids) 

298 

299 

300@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)]) 

301async def get_last_values_interest_window( 

302 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0 

303) -> list[SignalSample | None]: 

304 return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp) 

305 

306 

307@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

308async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

309 return SignalSample.get_first_from_signal_ids(signal_ids) 

310 

311 

312@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)]) 

313async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]: 

314 return Signal.get_forcibility(signal_ids) 

315 

316 

317@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)]) 

318async def get_forced_signals( 

319 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends() 

320): 

321 if not current_user.is_admin: 

322 raise HTTPException(401) 

323 return ForcedSignal.response_from_query(query) 

324 

325 

326@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

327async def get_signal(signal_id): 

328 signal = Signal.get_from_signal_id(signal_id) 

329 if not signal: 

330 raise HTTPException( 

331 status_code=404, 

332 detail="Signal not found", 

333 ) 

334 return signal.to_dict() 

335 

336 

337@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

338async def update_signal( 

339 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)] 

340): 

341 signal = Signal.get_from_signal_id(signal_id) 

342 if not signal: 

343 raise HTTPException( 

344 status_code=404, 

345 detail="Device not found", 

346 ) 

347 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id) 

348 if forced_signal is not None: 

349 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

350 raise HTTPException( 

351 status_code=403, 

352 detail="Cannot override another user's forcing", 

353 ) 

354 

355 result = await signal.send_command(signal_update, current_user) 

356 if result.get("error", False) is True: 

357 raise HTTPException( 

358 status_code=result.get("status_code", 500), 

359 detail=result.get("message", "An error has occurred"), 

360 ) 

361 

362 if forced_signal is not None and signal_update.forced_value is None: 

363 forced_signal.delete() 

364 elif signal_update.forced_value is not None: 

365 forced_signal = ForcedSignal( 

366 signal_id=signal_id, 

367 forcing_user_id=current_user.id, 

368 forced_at=time.time(), 

369 value=signal_update.forced_value, 

370 ) 

371 forced_signal.insert() 

372 

373 return result 

374 

375 

376@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)]) 

377async def get_signal_forcibility( 

378 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)] 

379) -> bool: 

380 return ForcedSignal.can_force(signal_id, current_user) 

381 

382 

383@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

384async def get_signal_data( 

385 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

386) -> SignalData | None: 

387 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

388 

389 if number_samples_max is not None: 

390 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

391 

392 return signal_data 

393 

394 

395@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

396async def get_last_value(signal_id) -> SignalSample: 

397 sample = SignalSample.get_last_from_signal_id(signal_id) 

398 if sample is None: 

399 raise HTTPException(status_code=404, detail="No data") 

400 return sample 

401 

402 

403@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

404async def get_first_value(signal_id) -> SignalSample: 

405 sample = SignalSample.get_first_from_signal_id(signal_id) 

406 if sample is None: 

407 raise HTTPException(status_code=404, detail="No data") 

408 return sample 

409 

410 

411@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

412async def get_signal_number_samples(signal_id): 

413 signal = Signal.get_from_signal_id(signal_id) 

414 if not signal: 

415 raise HTTPException( 

416 status_code=404, 

417 detail="Device not found", 

418 ) 

419 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()} 

420 

421 

422@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

423async def get_signals_data( 

424 signal_ids: list[str] = Query(default=[]), 

425 number_samples_max: int = None, 

426 min_timestamp: float = None, 

427 max_timestamp: float = None, 

428 interpolate_bounds: bool = True, 

429) -> SignalsData | None: 

430 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

431 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

432 

433 signals_data = SignalsData.get_from_signal_ids( 

434 signal_ids, 

435 min_timestamp=min_timestamp, 

436 max_timestamp=max_timestamp, 

437 window_min_timestamp=min_timestamp, 

438 window_max_timestamp=max_timestamp, 

439 interpolate_bounds=interpolate_bounds, 

440 ) 

441 if number_samples_max is not None: 

442 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

443 

444 return signals_data 

445 

446 

447@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

448async def get_signals_data_interest_window( 

449 window_max_number_samples: int = 600, 

450 outside_max_number_samples: int = 150, 

451 window_min_timestamp: float = None, 

452 window_max_timestamp: float = None, 

453 signal_ids: list[str] = Query(default=[]), 

454 min_timestamp: float = None, 

455 max_timestamp: float = None, 

456) -> SignalsData | None: 

457 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

458 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

459 

460 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

461 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

462 

463 signals_data = SignalsData.get_from_signal_ids( 

464 signal_ids, 

465 min_timestamp=min_timestamp, 

466 max_timestamp=max_timestamp, 

467 window_min_timestamp=window_min_timestamp, 

468 window_max_timestamp=window_max_timestamp, 

469 max_documents=10 * (window_max_number_samples + outside_max_number_samples), 

470 ) 

471 

472 signals_data = signals_data.interest_window_desampling( 

473 window_max_number_samples=window_max_number_samples, 

474 outside_max_number_samples=outside_max_number_samples, 

475 window_min_timestamp=window_min_timestamp, 

476 window_max_timestamp=window_max_timestamp, 

477 ) 

478 

479 return signals_data 

480 

481 

482@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)]) 

483async def export_signals_zip( 

484 file_format: str, 

485 signal_ids: list[str] = Query(default=[]), 

486 min_timestamp: float = None, 

487 max_timestamp: float = None, 

488): 

489 signals_data = SignalsData.get_from_signal_ids( 

490 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

491 ) 

492 zip_data = signals_data.zip_export(file_format) 

493 return Response( 

494 content=zip_data, 

495 media_type="application/zip", 

496 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

497 ) 

498 

499 

500@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)]) 

501async def export_signals_hdf5( 

502 signal_ids: list[str] = Query(default=[]), 

503 min_timestamp: float = None, 

504 max_timestamp: float = None, 

505): 

506 signals_data = SignalsData.get_from_signal_ids( 

507 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

508 ) 

509 data = signals_data.hdf5_export() 

510 return Response( 

511 content=data, 

512 media_type="application/hdf5", 

513 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'}, 

514 ) 

515 

516 

517@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

518async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

519 return Event.response_from_query(query) 

520 

521 

522@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

523async def get_event(event_id) -> Event: 

524 event = Event.get_from_id(event_id) 

525 if event is None: 

526 raise HTTPException(status_code=404, detail="No such event") 

527 return event 

528 

529 

530@app.get("/number-events", dependencies=[Depends(get_current_active_user)]) 

531async def get_number_events( 

532 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

533) -> list[TwinPadActivity]: 

534 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount) 

535 

536 

537@app.get("/number-commands", dependencies=[Depends(get_current_active_user)]) 

538async def get_number_commands( 

539 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False 

540) -> list[TwinPadActivity]: 

541 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount) 

542 

543 

544@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

545async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

546 return EventRule.response_from_query(query) 

547 

548 

549@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

550async def get_event_rule(event_rule_id) -> EventRule: 

551 event_rule = EventRule.get_from_id(event_rule_id) 

552 if event_rule is None: 

553 raise HTTPException(status_code=404, detail="No such event rule") 

554 return event_rule 

555 

556 

557@app.post("/users", status_code=201) 

558async def create_user(user: User): 

559 if User.get_one_by_attribute("email", user.email) is not None: 

560 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

561 hashed_password = get_password_hash(user.password) 

562 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

563 if new_user is None: 

564 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

565 return new_user 

566 

567 

568@app.post("/token", status_code=201) 

569async def login_for_access_token( 

570 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

571) -> Token: 

572 user = authenticate_user(form_data.username, form_data.password) 

573 if not user: 

574 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

575 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

576 if user.is_active: 

577 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

578 access_token = create_access_token( 

579 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

580 ) 

581 return Token(access_token=access_token, token_type="bearer") 

582 

583 

584@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

585async def get_users(): 

586 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")] 

587 

588 

589@app.get("/users/me", response_model=User) 

590async def get_current_user( 

591 current_user: Annotated[User, Depends(get_current_active_user)], 

592): 

593 del current_user.password 

594 return current_user 

595 

596 

597@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

598async def get_user(user_id: str): 

599 user = User.get_from_id(user_id) 

600 

601 if user is None: 

602 raise HTTPException( 

603 status_code=404, 

604 detail="User not found", 

605 ) 

606 return user.to_dict(exclude={"password", "is_connected"}) 

607 

608 

609@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)]) 

610async def patch_user(user: UserUpdate, user_id): 

611 if user.password == "" or user.password is None: 

612 del user.password 

613 else: 

614 user.password = get_password_hash(user.password) 

615 return User.update_info(user, user_id) 

616 

617 

618@app.get("/commands", dependencies=[Depends(get_current_active_user)]) 

619async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]: 

620 return Command.response_from_query(query) 

621 

622 

623@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

624async def get_campaigns(): 

625 return Campaign.get_all() 

626 

627 

628@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

629async def get_campaign_by_id(campaign_id: str): 

630 campaign = Campaign.get_from_id(campaign_id) 

631 if campaign is None: 

632 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign") 

633 return campaign 

634 

635 

636@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

637async def add_campaign(campaign: Campaign): 

638 new_campaign = Campaign.create(campaign) 

639 if new_campaign is None: 

640 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

641 return new_campaign 

642 

643 

644@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

645async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

646 campaign = Campaign.get_from_id(campaign_id) 

647 if campaign is None: 

648 raise HTTPException(status_code=500, detail="An error occurred during campaign edition") 

649 campaign.name = edit_campaign.name 

650 campaign.description = edit_campaign.description 

651 return Campaign.update(campaign) 

652 

653 

654@app.delete( 

655 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

656) 

657async def delete_campaign(campaign_id: str): 

658 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

659 campaign = Campaign.get_from_id(campaign_id) 

660 if campaign is None: 

661 raise exception 

662 delete_phases = Phase.deleteMany(campaign_id) 

663 if not delete_phases.acknowledged: 

664 raise exception 

665 campaign_deleted = Campaign.delete(campaign_id) 

666 return campaign_deleted.acknowledged 

667 

668 

669@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

670async def get_campaign_phases(campaign_id: str): 

671 return Phase.get_by_attribute("campaign_id", campaign_id) 

672 

673 

674@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

675async def get_phase(phase_id: str): 

676 phase = Phase.get_from_id(phase_id) 

677 if phase is None: 

678 raise HTTPException(status_code=404, detail="Phase not found") 

679 return phase 

680 

681 

682@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

683async def add_phase(phase: Phase): 

684 new_phase = Phase.create(phase) 

685 if new_phase is None: 

686 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

687 return new_phase 

688 

689 

690@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

691async def edit_phase(phase_id, edit_phase: Phase): 

692 phase = Phase.get_from_id(phase_id) 

693 if phase is None: 

694 raise HTTPException(status_code=500, detail="An error occurred during Phase edition") 

695 phase.name = edit_phase.name 

696 phase.description = edit_phase.description 

697 phase.start_at = edit_phase.start_at 

698 phase.end_at = edit_phase.end_at 

699 return Phase.update(phase) 

700 

701 

702@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

703async def delete_phase(phase_id: str): 

704 phase = Phase.get_from_id(phase_id) 

705 if phase is None: 

706 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion") 

707 phase_deleted = Phase.delete(phase_id) 

708 return phase_deleted.acknowledged 

709 

710 

711@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

712async def get_custom_views(): 

713 return CustomView.get_all() 

714 

715 

716@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

717async def get_custom_views_from_user_id(user_id: str): 

718 return CustomView.get_by_attribute("user_id", user_id) 

719 

720 

721@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

722async def get_custom_view(custom_view_id: str): 

723 return CustomView.get_from_id(custom_view_id) 

724 

725 

726@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

727async def create_custom_view( 

728 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user) 

729): 

730 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id) 

731 custom_view.insert() 

732 return custom_view 

733 

734 

735@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

736async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate): 

737 custom_view = CustomView.get_from_id(custom_view_id) 

738 return custom_view.update(custom_view_update.model_dump()) 

739 

740 

741@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

742async def delete_custom_view(custom_view_id: str): 

743 custom_view = CustomView.get_from_id(custom_view_id) 

744 return custom_view.delete() 

745 

746 

747@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)]) 

748async def add_video(video: Video): 

749 video.insert() 

750 if not video: 

751 raise HTTPException(status_code=500, detail="An error occurred during cctv creation") 

752 return video 

753 

754 

755@app.get("/videos", dependencies=[Depends(get_current_active_user)]) 

756async def get_videos(): 

757 return Video.get_all() 

758 

759 

760@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)]) 

761def get_stream(video_id): 

762 camera_name = Video.get_video(video_id) 

763 if camera_name is None: 

764 raise HTTPException(status_code=404, detail="Camera not found") 

765 return camera_name 

766 

767 

768@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

769async def get_signals_preset(current_user: User = Depends(get_current_active_user)): 

770 return SignalsPreset.get_by_attribute("user_id", current_user.id) 

771 

772 

773@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)]) 

774async def create_signals_preset( 

775 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user) 

776): 

777 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id) 

778 return new_signals_preset 

779 

780 

781@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)]) 

782async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate): 

783 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

784 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True)) 

785 

786 

787@app.delete( 

788 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)] 

789) 

790async def delete_signals_preset(signals_preset_id: str): 

791 signals_preset = SignalsPreset.get_from_id(signals_preset_id) 

792 return signals_preset.delete() 

793 

794 

795@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

796async def create_graph_theme( 

797 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user) 

798): 

799 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) 

800 if styled_signal is None: 

801 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist") 

802 

803 graph_theme = PrivateGraphTheme.create( 

804 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id 

805 ) 

806 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

807 

808 

809@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)]) 

810async def get_all_graph_themes( 

811 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

812) -> ListResponse[PublicGraphTheme]: 

813 return PublicGraphTheme.response_from_query(query, current_user.id) 

814 

815 

816@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)]) 

817async def get_graph_themes_in_library( 

818 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user) 

819) -> ListResponse[PublicGraphTheme]: 

820 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id) 

821 

822 

823@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)]) 

824async def update_graph_theme( 

825 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user) 

826): 

827 graph_theme = PrivateGraphTheme.get_from_id(theme_id) 

828 update_dict = theme_update.model_dump(exclude_unset=True) 

829 if current_user.id != graph_theme.creator_id: 

830 for theme_property in update_dict.keys(): 

831 if theme_property not in ["active_for_user", "in_user_library"]: 

832 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him") 

833 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id) 

834 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id) 

835 

836 

837@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

838async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)): 

839 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id) 

840 if current_user.id != graph_theme.creator_id: 

841 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him") 

842 return graph_theme.delete() 

843 

844 

845@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)]) 

846async def get_signals_appearances( 

847 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user) 

848) -> dict: 

849 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)