Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%

406 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 15:25 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 TwinPadActivity, 

32 User, 

33 UserUpdate, 

34 Campaign, 

35 Phase, 

36 CustomView, 

37 Command, 

38 CustomViewCreation, 

39 CustomViewUpdate, 

40 Video, 

41 SignalsPreset, 

42 SignalsPresetCreation, 

43 SignalsPresetUpdate, 

44 PrivateGraphTheme, 

45 PublicGraphTheme, 

46 GraphThemeCreation, 

47 GraphThemeUpdate, 

48) 

49from twinpad_backend.auth import ( 

50 Token, 

51 authenticate_user, 

52 get_current_active_user, 

53 ACCESS_TOKEN_EXPIRE_MINUTES, 

54 create_access_token, 

55 get_password_hash, 

56) 

57from twinpad_backend.queries import ( 

58 SignalQuery, 

59 DeviceStatesQuery, 

60 EventQuery, 

61 EventRuleQuery, 

62 CommandQuery, 

63 GraphThemeQuery, 

64) 

65from twinpad_backend.responses import ListResponse 

66 

# Requests taking longer than this many seconds are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5

# Feature flags, read once at import time; only the exact string "true" enables them.
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"

# Log through the "uvicorn.error" logger without propagating to ancestor loggers.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# NOTE(review): CORS is fully open (any origin, method and header) — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

81 

if PROFILING:  # pragma: no cover
    # Profiling reports are written to a temporary directory created at import time.
    profiling_folder = mkdtemp()
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """Profile a request with pyinstrument and store the HTML report.

        Requests whose URL contains "profiling" or ".ico" are passed through
        unprofiled, so that fetching a report does not itself produce one.
        The report file is named after the URL path (segments joined by "_"),
        or "slash" for the root path.
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):  # avoid recursion
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Stop the profiler even when the handler raises, so a failed
            # request does not leave it running.
            profiler.stop()

        # Drop the query string and any trailing slash *before* splitting, so
        # that "/?x=1" maps to "slash" instead of an empty file name.
        path_part = url.split("?", maxsplit=1)[0].rstrip("/")
        filename = "_".join(path_part.split("/")[3:]) or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the profiling reports saved so far."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """Return one saved profiling report as an HTML attachment.

        Raises:
            HTTPException: 404 when no regular file with that name exists in
                the profiling folder.
        """
        file_path = os.path.join(profiling_folder, file_name)

        # isfile rather than exists: a directory name (e.g. "..") must yield a
        # 404 instead of failing inside open().
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            content = profiling_file.read()

        return Response(
            content=content,
            media_type="text/html",  # "application/html" is not a registered media type
            headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
        )

134 

135 

@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log each request with its client address, status code and duration.

    The line is logged at WARNING level when the request took longer than
    ``REQUEST_TIME_WARNING`` seconds, and at INFO level otherwise.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The response produced by ``call_next``, unchanged.
    """
    # perf_counter is monotonic, so the measured duration is not affected by
    # system clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        # request.client can be None; only dereference it when present.
        client_ip = request.client.host if request.client else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response

148 

149 

@app.get("/")
async def slash():
    """Return the backend version under the "twinpad_version" key."""
    payload = {"twinpad_version": __version__}
    return payload

153 

154 

@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """Return the result of ``ServicesStatus.check()`` under the "services" key."""
    services = ServicesStatus.check()
    return {"services": services}

163 

164 

@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return every device, requested sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices

168 

169 

@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id):
    """Return the device whose ``device_id`` matches the path parameter.

    Raises:
        HTTPException: 404 when the lookup yields a falsy result.
    """
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")

179 

180 

@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Apply a mode change to a device on behalf of the current user.

    Raises:
        HTTPException: 404 when the device is not found; otherwise the status
            code and message reported by ``change_mode`` when its result has
            ``"error"`` set to ``True`` (defaults: 500, "An error has occurred").
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    if outcome.get("error", False) is not True:
        return outcome

    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )

198 

199 

@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of one device, selected by the given query."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states

203 

204 

@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every device setup."""
    setups = DeviceSetup.get_all()
    return setups

208 

209 

@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and return it."""
    # insert() is called for its side effect; the request object itself is returned.
    device_setup.insert()
    return device_setup

214 

215 

@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return one device setup by id.

    Raises:
        HTTPException: 404 when no setup has this id.
    """
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")

225 

226 

@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """Update a device setup with the non-None fields of the payload.

    Raises:
        HTTPException: 404 when no setup has this id.
    """
    existing = DeviceSetup.get_from_id(device_setup_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the payload are not forwarded to update().
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value

    existing.update(changes)
    return existing

237 

238 

@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete one device setup and return the result of its ``delete()`` call.

    Raises:
        HTTPException: 404 when no setup has this id.
    """
    existing = DeviceSetup.get_from_id(device_setup_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return existing.delete()

249 

250 

@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return sample-count activity between the two timestamps."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

256 

257 

@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching the query, with the "device" field excluded."""
    # Append a signal_id sort key whenever the requested sort does not mention it.
    needs_signal_id_sort = "signal_id" not in query.sort_by
    if needs_signal_id_sort:
        query.sort_by += ",signal_id:1"

    response = Signal.response_from_query(query)
    return response.to_dict(exclude={"device"})

263 

264 

@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    all_ids = Signal.get_all_ids()
    return all_ids

268 

269 

@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return global signal statistics: stored data size, total number of
    samples, number of signals whose status is "up", and number of signals.
    """
    all_ids = Signal.get_all_ids()

    # Total sample count, summed over the per-signal counts.
    samples_per_signal = await Signal.number_samples_batch(all_ids)
    total_samples = sum(samples_per_signal.values())

    # A signal counts as active when its status is "up".
    devices = Device.get_multiple_from_signal_ids(all_ids)
    statuses = Signal.status_batch(all_ids, devices)
    active_count = 0
    for signal_status in statuses.values():
        if signal_status.status == "up":
            active_count += 1

    signal_count = Signal.get_number_documents()

    return {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": total_samples,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }

292 

293 

@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples

297 

298 

@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_last_values_interest_window(
    signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
) -> list[SignalSample | None]:
    """Return the last sample of each requested signal for the given ``min_timestamp``."""
    last_samples = SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
    return last_samples

304 

305 

@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples

309 

310 

@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flags reported by ``Signal.get_forcibility`` for the requested ids."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility

314 

315 

@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return one signal as a dict.

    Raises:
        HTTPException: 404 when the lookup yields a falsy result.
    """
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")

325 

326 

@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Send a command to a signal on behalf of the current user.

    Args:
        signal_id: Id of the signal to command.
        signal_update: Requested update, forwarded to ``Signal.send_command``.
        current_user: Authenticated user issuing the command.

    Returns:
        The result returned by ``send_command``.

    Raises:
        HTTPException: 404 when the signal is not found; otherwise the status
            code and message reported by ``send_command`` when its result has
            ``"error"`` set to ``True`` (defaults: 500, "An error has occurred").
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            # The looked-up entity is a signal, not a device.
            detail="Signal not found",
        )
    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )
    return result

344 

345 

@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """Return the data of one signal, optionally bounded in time and desampled.

    Args:
        signal_id: Id of the signal to read.
        number_samples_max: When given, the data is uniformly desampled with
            this maximum number of samples.
        min_timestamp: Optional lower bound forwarded to the data lookup.
        max_timestamp: Optional upper bound forwarded to the data lookup.

    Returns:
        The (possibly desampled) signal data, or ``None`` when the lookup
        returned nothing.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type allows None: only desample an actual result.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data

356 

357 

@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of one signal.

    Raises:
        HTTPException: 404 when the signal has no sample.
    """
    last_sample = SignalSample.get_last_from_signal_id(signal_id)
    if last_sample is not None:
        return last_sample
    raise HTTPException(status_code=404, detail="No data")

364 

365 

@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of one signal.

    Raises:
        HTTPException: 404 when the signal has no sample.
    """
    first_sample = SignalSample.get_first_from_signal_id(signal_id)
    if first_sample is not None:
        return first_sample
    raise HTTPException(status_code=404, detail="No data")

372 

373 

@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the number of samples and the sample data size of one signal.

    Returns:
        A dict with the keys "signal_id", "number_samples" and "size".

    Raises:
        HTTPException: 404 when the signal is not found.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            # The looked-up entity is a signal, not a device.
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}

383 

384 

@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """Return the data of several signals, optionally bounded and desampled.

    Args:
        signal_ids: Ids of the signals to read.
        number_samples_max: When given, the data is uniformly desampled with
            this maximum number of samples.
        min_timestamp: Optional lower bound; also used as the window lower bound.
        max_timestamp: Optional upper bound; also used as the window upper bound.
        interpolate_bounds: Forwarded to ``SignalsData.get_from_signal_ids``.

    Raises:
        HTTPException: 400 when both bounds are given and
            ``min_timestamp > max_timestamp``.
    """
    # Compare against None rather than truthiness: 0 is a valid timestamp and
    # must not bypass the bounds validation.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The declared return type allows None: only desample an actual result.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data

408 

409 

@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int = 600,
    outside_max_number_samples: int = 150,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """Return the data of several signals, desampled around an interest window.

    Args:
        window_max_number_samples: Sample budget inside the interest window.
        outside_max_number_samples: Sample budget outside the interest window.
        window_min_timestamp: Optional lower bound of the interest window.
        window_max_timestamp: Optional upper bound of the interest window.
        signal_ids: Ids of the signals to read.
        min_timestamp: Optional lower bound of the fetched data.
        max_timestamp: Optional upper bound of the fetched data.

    Raises:
        HTTPException: 400 when a pair of bounds is given in the wrong order.
    """
    # Compare against None rather than truthiness: 0 is a valid timestamp and
    # must not bypass the bounds validation.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        # Fetch at most ten times the combined sample budget before desampling.
        max_documents=10 * (window_max_number_samples + outside_max_number_samples),
    )

    # The declared return type allows None: nothing to desample in that case.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

443 

444 

@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a zip attachment named "signals.zip".

    The data is fetched without bound interpolation and zipped in the
    requested ``file_format``.
    """
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = fetched.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)

461 

462 

@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as an HDF5 attachment named "signals.hdf5".

    The data is fetched without bound interpolation.
    """
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = fetched.hdf5_export()
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)

478 

479 

@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching the query."""
    events = Event.response_from_query(query)
    return events

483 

484 

@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return one event by id.

    Raises:
        HTTPException: 404 when no event has this id.
    """
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")

491 

492 

@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return event-count activity between the two timestamps."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

498 

499 

@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return command-count activity between the two timestamps."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity

505 

506 

@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching the query."""
    rules = EventRule.response_from_query(query)
    return rules

510 

511 

@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return one event rule by id.

    Raises:
        HTTPException: 404 when no event rule has this id.
    """
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")

518 

519 

@app.post("/users", status_code=201)
async def create_user(user: User):
    """Create a user account with a hashed password.

    Args:
        user: Account details; ``user.password`` is hashed before storage.

    Returns:
        The user returned by ``User.create``.

    Raises:
        HTTPException: 400 when the e-mail is already registered or when
            creation fails. Both cases share the same message.
    """
    # NOTE(review): this route has no authentication dependency and takes
    # is_admin from the payload — confirm that this is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # Logical "or" rather than bitwise "|": a missing (None) is_admin must
    # fall back to False instead of raising TypeError.
    is_admin = user.is_admin or False
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user

529 

530 

@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Authenticate a user and issue a bearer access token.

    Args:
        form_data: OAuth2 password form carrying the username and password.

    Returns:
        A ``Token`` whose payload carries the user's e-mail ("sub") and admin
        flag, expiring after ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Raises:
        HTTPException: 401 on bad credentials; 402 when the user is not active.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    # Only inactive users are blocked; the check was previously inverted and
    # rejected active users instead.
    if not user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

545 

546 

@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users as dicts without their "password" field, requested sorted by e-mail."""
    serialized = []
    for account in User.get_all(sort_by="email"):
        serialized.append(account.to_dict(exclude={"password"}))
    return serialized

550 

551 

@app.get("/users/me", response_model=User)
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user after removing its ``password`` attribute."""
    # The password attribute is deleted from the object before it is returned.
    del current_user.password
    return current_user

558 

559 

@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return one user as a dict without the "password" and "is_connected" fields.

    Raises:
        HTTPException: 404 when no user has this id.
    """
    found = User.get_from_id(user_id)
    if found is not None:
        return found.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")

570 

571 

@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """Update a user's information, hashing the new password when one is given.

    An empty or missing password is removed from the payload before the
    update is applied.
    """
    password_provided = not (user.password == "" or user.password is None)
    if password_provided:
        user.password = get_password_hash(user.password)
    else:
        del user.password
    return User.update_info(user, user_id)

579 

580 

@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching the query."""
    commands = Command.response_from_query(query)
    return commands

584 

585 

@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns

589 

590 

@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return one campaign by id.

    Raises:
        HTTPException: 500 when no campaign has this id.
    """
    # NOTE(review): a missing campaign is reported as 500 here, whereas other
    # lookups in this module use 404 — confirm which is expected by clients.
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")

597 

598 

@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Create a campaign and return it.

    Raises:
        HTTPException: 500 when ``Campaign.create`` returns ``None``.
    """
    created = Campaign.create(campaign)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")

605 

606 

@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Overwrite the name and description of a campaign and persist it.

    Raises:
        HTTPException: 500 when no campaign has this id.
    """
    stored = Campaign.get_from_id(campaign_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during campaign edition")

    # Only these two fields are copied from the payload.
    stored.name = edit_campaign.name
    stored.description = edit_campaign.description
    return Campaign.update(stored)

615 

616 

@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """Delete a campaign after deleting its phases.

    Returns:
        The ``acknowledged`` flag of the campaign deletion result.

    Raises:
        HTTPException: 500 when the campaign does not exist or when the phase
            deletion is not acknowledged.
    """
    failure_detail = "An error occurred during campaign deletion"

    if Campaign.get_from_id(campaign_id) is None:
        raise HTTPException(status_code=500, detail=failure_detail)

    # Phases are removed first; the campaign is only deleted once that is acknowledged.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise HTTPException(status_code=500, detail=failure_detail)

    campaign_result = Campaign.delete(campaign_id)
    return campaign_result.acknowledged

630 

631 

@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches the path parameter."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases

635 

636 

@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return one phase by id.

    Raises:
        HTTPException: 404 when no phase has this id.
    """
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")

643 

644 

@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Create a phase and return it.

    Raises:
        HTTPException: 500 when ``Phase.create`` returns ``None``.
    """
    created = Phase.create(phase)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")

651 

652 

@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Overwrite the name, description and time bounds of a phase and persist it.

    Raises:
        HTTPException: 500 when no phase has this id.
    """
    stored = Phase.get_from_id(phase_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase edition")

    # Only these four fields are copied from the payload.
    stored.name = edit_phase.name
    stored.description = edit_phase.description
    stored.start_at = edit_phase.start_at
    stored.end_at = edit_phase.end_at
    return Phase.update(stored)

663 

664 

@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete one phase and return the ``acknowledged`` flag of the deletion result.

    Raises:
        HTTPException: 500 when no phase has this id.
    """
    if Phase.get_from_id(phase_id) is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
    deletion = Phase.delete(phase_id)
    return deletion.acknowledged

672 

673 

@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return every custom view."""
    views = CustomView.get_all()
    return views

677 

678 

@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` matches the path parameter."""
    views = CustomView.get_by_attribute("user_id", user_id)
    return views

682 

683 

@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return the result of looking up one custom view by id."""
    # NOTE(review): unlike most lookups in this module, a missing view is not
    # turned into a 404 here — the lookup result is returned as is.
    view = CustomView.get_from_id(custom_view_id)
    return view

687 

688 

@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a custom view owned by the current user and return it."""
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view

696 

697 

@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update one custom view with the dumped payload.

    Returns:
        The result of the view's ``update()`` call.

    Raises:
        HTTPException: 404 when no custom view has this id.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Guard against an unknown id instead of failing on a None lookup result.
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.update(custom_view_update.model_dump())

702 

703 

@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete one custom view and return the result of its ``delete()`` call.

    Raises:
        HTTPException: 404 when no custom view has this id.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Guard against an unknown id instead of failing on a None lookup result.
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()

708 

709 

@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the posted video and return it.

    Raises:
        HTTPException: 500 when the ``video`` object is falsy after insertion.
    """
    video.insert()
    # NOTE(review): this tests the request object itself, not the outcome of
    # insert() — confirm whether the insert result was meant to be checked.
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")

716 

717 

@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return every video."""
    videos = Video.get_all()
    return videos

721 

722 

@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return the result of ``Video.get_video`` for the given id.

    Raises:
        HTTPException: 404 when the lookup returns ``None``.
    """
    stream = Video.get_video(video_id)
    if stream is not None:
        return stream
    raise HTTPException(status_code=404, detail="Camera not found")

729 

730 

@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets belonging to the current user."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)

734 

735 

@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a signals preset owned by the current user and return it."""
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)

742 

743 

@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Update one signals preset with the explicitly set fields of the payload.

    Returns:
        The result of the preset's ``update()`` call.

    Raises:
        HTTPException: 404 when no signals preset has this id.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # Guard against an unknown id instead of failing on a None lookup result.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))

748 

749 

@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete one signals preset and return the result of its ``delete()`` call.

    Raises:
        HTTPException: 404 when no signals preset has this id.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # Guard against an unknown id instead of failing on a None lookup result.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()

756 

757 

@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a graph theme for an existing signal and return its public form.

    Raises:
        HTTPException: 400 when the referenced signal id does not exist.
    """
    # The theme must target a known signal.
    if Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) is None:
        raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")

    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created.id, current_user.id)

770 

771 

@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query, resolved for the current user."""
    themes = PublicGraphTheme.response_from_query(query, current_user.id)
    return themes

777 

778 

@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes of the current user's library that match the query."""
    themes = PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
    return themes

784 

785 

@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update a graph theme and return its public form for the current user.

    Users other than the theme's creator may only set the "active_for_user"
    and "in_user_library" properties.

    Raises:
        HTTPException: 404 when no graph theme has this id; 401 when a
            non-creator tries to change any other property.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    # Guard against an unknown id instead of failing on a None lookup result.
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")

    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        user_scoped_properties = ("active_for_user", "in_user_library")
        if any(theme_property not in user_scoped_properties for theme_property in update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    # Reuse the dict computed above rather than dumping the model a second time.
    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)

798 

799 

@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete a graph theme created by the current user.

    Returns:
        The result of the theme's ``delete()`` call.

    Raises:
        HTTPException: 404 when no graph theme has this id; 401 when the
            current user is not the theme's creator.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    # Guard against an unknown id instead of failing on a None lookup result.
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()

806 

807 

@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
) -> dict:
    """Return the appearances of the requested signals, resolved for the current user."""
    appearances = PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)
    return appearances