Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 81%

372 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-10 15:38 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 EventDay, 

32 User, 

33 UserUpdate, 

34 Campaign, 

35 Phase, 

36 CustomView, 

37 CustomViewCreation, 

38 CustomViewUpdate, 

39 Video, 

40) 

41from twinpad_backend.auth import ( 

42 Token, 

43 authenticate_user, 

44 get_current_active_user, 

45 ACCESS_TOKEN_EXPIRE_MINUTES, 

46 create_access_token, 

47 get_password_hash, 

48) 

49from twinpad_backend.queries import SignalQuery, DeviceStatesQuery, EventQuery, EventRuleQuery 

50from twinpad_backend.responses import ListResponse 

51 

52REQUEST_TIME_WARNING = 0.5 

53 

54DEBUG = os.environ.get("DEBUG", "false") == "true" 

55PROFILING = os.environ.get("PROFILING", "false") == "true" 

56 

57logger = logging.getLogger("uvicorn.error") 

58logger.propagate = False 

59logger.info("Debug mode: %s", DEBUG) 

60logger.info("log level: %s", logging.root.level) 

61 

62 

63app = FastAPI(title="Twinpad backend", version=__version__) 

64 

65app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

66 

67if PROFILING: 

68 profiling_folder = mkdtemp() 

69 logger.info("Profiling enabled") 

70 

71 @app.middleware("http") 

72 async def profile_request(request: Request, call_next): 

73 should_profile = True 

74 url = str(request.url) 

75 for segment in ["profiling", ".ico"]: 

76 if segment in url: 

77 should_profile = False 

78 break 

79 

80 if should_profile: # avoid recursion 

81 profiler = Profiler() 

82 profiler.start() 

83 await call_next(request) 

84 profiler.stop() 

85 url = "_".join(url.split("/")[3:]).rstrip("/") 

86 if not url: 

87 url = "slash" 

88 # filename = f"{round(time.time())}_{url}" 

89 filename = url.split("?", maxsplit=1)[0] 

90 logger.info("saving profiling to %s", filename) 

91 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

92 profiling_file.write(profiler.output_html()) 

93 return await call_next(request) 

94 

95 @app.get("/profilings") 

96 async def profilings(): 

97 return {"profilings": os.listdir(profiling_folder)} 

98 

99 @app.get("/profilings/{profiling_id}") 

100 async def profiling(profiling_id): 

101 

102 filename = os.path.join(profiling_folder, profiling_id) 

103 if os.path.exists(filename): 

104 with open(filename, "r", encoding="utf-8") as profiling: 

105 return HTMLResponse(profiling.read()) 

106 raise HTTPException( 

107 status_code=404, 

108 detail="Profiling not found", 

109 ) 

110 

111 # app.mount("/profilings", StaticFiles(directory=profiling_folder, html=True), name="profilings") 

112 

113 

114@app.middleware("http") 

115async def log_request_time(request: Request, call_next): 

116 start_time = time.time() # Record the start time 

117 response = await call_next(request) # Process the request 

118 duration = time.time() - start_time # Calculate the time taken 

119 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

120 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

121 if duration > REQUEST_TIME_WARNING: 

122 logger.warning(message) 

123 else: 

124 logger.info(message) 

125 return response 

126 

127 

128@app.get("/") 

129async def slash(): 

130 return {"twinpad_version": __version__} 

131 

132 

133@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

134async def status(): 

135 """ 

136 Return service healthcheck 

137 """ 

138 return { 

139 "services": ServicesStatus.check(), 

140 } 

141 

142 

143@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

144async def get_devices() -> list[Device]: 

145 return Device.get_all(sort_by="device_id") 

146 

147 

148@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

149async def get_device(device_id): 

150 device = Device.get_one_by_attribute("device_id", device_id) 

151 if not device: 

152 raise HTTPException( 

153 status_code=404, 

154 detail="Device not found", 

155 ) 

156 return device 

157 

158 

159@app.patch("/devices/{device_id}", response_model=DeviceUpdate, dependencies=[Depends(get_current_active_user)]) 

160async def update_item(device_id: str, device_update: DeviceUpdate): 

161 device = Device.get_one_by_attribute("device_id", device_id) 

162 if not device: 

163 raise HTTPException( 

164 status_code=404, 

165 detail="Device not found", 

166 ) 

167 device.update(device_update) 

168 return device_update 

169 

170 

171@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

172async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

173 return DeviceState.get_from_id_and_query(device_id, query) 

174 

175 

176@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

177async def get_device_setups() -> list[DeviceSetup]: 

178 return DeviceSetup.get_all() 

179 

180 

181@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

182async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

183 device_setup.insert() 

184 return device_setup 

185 

186 

187@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

188async def get_device_setup(device_setup_id: str): 

189 device_setup = DeviceSetup.get_from_id(device_setup_id) 

190 if device_setup is None: 

191 raise HTTPException( 

192 status_code=404, 

193 detail="Device setup not found", 

194 ) 

195 return device_setup 

196 

197 

198@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

199async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

200 device_setup = DeviceSetup.get_from_id(device_setup_id) 

201 if device_setup is None: 

202 raise HTTPException( 

203 status_code=404, 

204 detail="Device setup not found", 

205 ) 

206 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

207 return device_setup 

208 

209 

210@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

211async def delete_device_setups(device_setup_id: str) -> bool: 

212 device_setup = DeviceSetup.get_from_id(device_setup_id) 

213 if device_setup is None: 

214 raise HTTPException( 

215 status_code=404, 

216 detail="Device setup not found", 

217 ) 

218 deleted = device_setup.delete() 

219 return deleted 

220 

221 

222@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

223async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

224 return Signal.response_from_query(query).to_dict(exclude={"device"}) 

225 

226 

227@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)]) 

228async def signals_names() -> list[str]: 

229 return Signal.get_all_ids() 

230 

231 

232@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

233async def signal_stats(): 

234 """ 

235 Returns signals stats 

236 """ 

237 # profiler = Profiler() 

238 # profiler.start() 

239 

240 number_samples = 0 

241 number_active_signals = 0 

242 signals = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()] 

243 

244 for s in signals: 

245 if s is not None: 

246 number_samples += await s.number_samples() 

247 if s.status.status == "up": 

248 number_active_signals += 1 

249 

250 number_signals = Signal.get_number_documents() 

251 

252 # profiler.stop() 

253 # filename = "signals_stats_profiling.html" 

254 # full_file_path = os.path.join(Path.home(), filename) 

255 # logger.info("Saving profiling to %s", full_file_path) 

256 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

257 # profiling_file.write(profiler.output_html()) 

258 

259 return { 

260 "signal_data_size": signal_datasize(), 

261 "number_signal_samples": number_samples, 

262 "number_active_signals": number_active_signals, 

263 "number_signals": number_signals, 

264 } 

265 

266 

267@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

268async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

269 return SignalSample.get_last_from_signal_ids(signal_ids) 

270 

271 

272@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

273async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

274 return SignalSample.get_first_from_signal_ids(signal_ids) 

275 

276 

277@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)]) 

278async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]: 

279 return Signal.get_forcibility(signal_ids) 

280 

281 

282@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

283async def get_signal(signal_id): 

284 signal = Signal.get_from_signal_id(signal_id) 

285 if not signal: 

286 raise HTTPException( 

287 status_code=404, 

288 detail="Signal not found", 

289 ) 

290 return signal.to_dict() 

291 

292 

293@app.patch("/signals/{signal_id}", response_model=SignalUpdate, dependencies=[Depends(get_current_active_user)]) 

294async def update_signal(signal_id: str, signal_update: SignalUpdate): 

295 signal = Signal.get_from_signal_id(signal_id) 

296 if not signal: 

297 raise HTTPException( 

298 status_code=404, 

299 detail="Device not found", 

300 ) 

301 signal.update(signal_update) 

302 return signal_update 

303 

304 

305@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

306async def get_signal_data( 

307 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

308) -> SignalData | None: 

309 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

310 

311 if number_samples_max is not None: 

312 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

313 

314 return signal_data 

315 

316 

317@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

318async def get_last_value(signal_id) -> SignalSample: 

319 sample = SignalSample.get_last_from_signal_id(signal_id) 

320 if sample is None: 

321 raise HTTPException(status_code=404, detail="No data") 

322 return sample 

323 

324 

325@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

326async def get_first_value(signal_id) -> SignalSample: 

327 sample = SignalSample.get_first_from_signal_id(signal_id) 

328 if sample is None: 

329 raise HTTPException(status_code=404, detail="No data") 

330 return sample 

331 

332 

333@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

334async def get_signal_number_samples(signal_id): 

335 signal = Signal.get_from_signal_id(signal_id) 

336 if not signal: 

337 raise HTTPException( 

338 status_code=404, 

339 detail="Device not found", 

340 ) 

341 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()} 

342 

343 

344@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

345async def get_signals_data( 

346 signal_ids: list[str] = Query(default=[]), 

347 number_samples_max: int = None, 

348 min_timestamp: float = None, 

349 max_timestamp: float = None, 

350 interpolate_bounds: bool = True, 

351) -> SignalsData | None: 

352 # profiler = Profiler() 

353 # profiler.start() 

354 

355 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

356 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

357 

358 signals_data = SignalsData.get_from_signal_ids( 

359 signal_ids, 

360 min_timestamp=min_timestamp, 

361 max_timestamp=max_timestamp, 

362 window_min_timestamp=min_timestamp, 

363 window_max_timestamp=max_timestamp, 

364 interpolate_bounds=interpolate_bounds, 

365 ) 

366 if number_samples_max is not None: 

367 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

368 

369 # profiler.stop() 

370 # filename = "signals-data.html" 

371 # full_file_path = os.path.join(Path.home(), filename) 

372 # logger.info(f"Saving profiling to %s", full_file_path) 

373 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

374 # profiling_file.write(profiler.output_html()) 

375 

376 return signals_data 

377 

378 

379@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

380async def get_signals_data_interest_window( 

381 window_max_number_samples: int = 600, 

382 outside_max_number_samples: int = 150, 

383 window_min_timestamp: float = None, 

384 window_max_timestamp: float = None, 

385 signal_ids: list[str] = Query(default=[]), 

386 min_timestamp: float = None, 

387 max_timestamp: float = None, 

388) -> SignalsData | None: 

389 # profiler = Profiler() 

390 # profiler.start() 

391 

392 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

393 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

394 

395 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

396 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

397 

398 signals_data = SignalsData.get_from_signal_ids( 

399 signal_ids, 

400 min_timestamp=min_timestamp, 

401 max_timestamp=max_timestamp, 

402 window_min_timestamp=window_min_timestamp, 

403 window_max_timestamp=window_max_timestamp, 

404 max_documents=10 * (window_max_number_samples + outside_max_number_samples), 

405 ) 

406 

407 signals_data = signals_data.interest_window_desampling( 

408 window_max_number_samples=window_max_number_samples, 

409 outside_max_number_samples=outside_max_number_samples, 

410 window_min_timestamp=window_min_timestamp, 

411 window_max_timestamp=window_max_timestamp, 

412 ) 

413 

414 # profiler.stop() 

415 # filename = "signals-data.html" 

416 # full_file_path = os.path.join(Path.home(), filename) 

417 # logger.info(f"Saving profiling to %s", full_file_path) 

418 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

419 # profiling_file.write(profiler.output_html()) 

420 

421 return signals_data 

422 

423 

424@app.get("/signals-data/export", dependencies=[Depends(get_current_active_user)]) 

425async def export_signals_zip( 

426 file_format: str, 

427 signal_ids: list[str] = Query(default=[]), 

428 min_timestamp: float = None, 

429 max_timestamp: float = None, 

430): 

431 signals_data = SignalsData.get_from_signal_ids( 

432 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False 

433 ) 

434 zip_data = signals_data.zip_export(file_format) 

435 return Response( 

436 content=zip_data, 

437 media_type="application/zip", 

438 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

439 ) 

440 

441 

442@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

443async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

444 return Event.response_from_query(query) 

445 

446 

447@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

448async def get_event(event_id) -> Event: 

449 event = Event.get_from_id(event_id) 

450 if event is None: 

451 raise HTTPException(status_code=404, detail="No such event") 

452 return event 

453 

454 

455@app.get("/number-events", dependencies=[Depends(get_current_active_user)]) 

456async def get_number_events( 

457 min_timestamp: float | int, max_timestamp: float | int, recompute_events: bool = False 

458) -> list[EventDay]: 

459 return EventDay.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_events) 

460 

461 

462@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

463async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

464 return EventRule.response_from_query(query) 

465 

466 

467@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

468async def get_event_rule(event_rule_id) -> EventRule: 

469 event_rule = EventRule.get_from_id(event_rule_id) 

470 if event_rule is None: 

471 raise HTTPException(status_code=404, detail="No such event rule") 

472 return event_rule 

473 

474 

475@app.post("/users", status_code=201) 

476async def create_user(user: User): 

477 if User.get_one_by_attribute("user", user.email) is not None: 

478 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

479 hashed_password = get_password_hash(user.password) 

480 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

481 if new_user is None: 

482 raise HTTPException(status_code=400, details="An error occurred during account creation") 

483 return new_user 

484 

485 

486@app.post("/token", status_code=201) 

487async def login_for_access_token( 

488 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

489) -> Token: 

490 user = authenticate_user(form_data.username, form_data.password) 

491 if not user: 

492 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

493 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

494 if user.is_active: 

495 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

496 access_token = create_access_token( 

497 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

498 ) 

499 return Token(access_token=access_token, token_type="bearer") 

500 

501 

502@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

503async def get_users(): 

504 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")] 

505 

506 

507@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

508async def get_user(user_id: str, current_user: Annotated[User, Depends(get_current_active_user)]): 

509 user = None 

510 

511 if user_id == "me": 

512 user = current_user 

513 else: 

514 user = User.get_from_id(user_id) 

515 

516 if user is None: 

517 raise HTTPException( 

518 status_code=404, 

519 detail="User not found", 

520 ) 

521 return user 

522 

523 

524@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)]) 

525async def patch_user(user: UserUpdate, user_id): 

526 if user.password == "" or user.password is None: 

527 del user.password 

528 else: 

529 user.password = get_password_hash(user.password) 

530 return User.update(user, user_id) 

531 

532 

533@app.post("/users/me", response_model=User) 

534async def read_users_me( 

535 current_user: Annotated[User, Depends(get_current_active_user)], 

536): 

537 del (current_user.password, current_user.is_active, current_user.is_connected) 

538 return current_user 

539 

540 

541@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

542async def get_campaigns(): 

543 return Campaign.get_all() 

544 

545 

546@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

547async def get_campaign_by_id(campaign_id: str): 

548 campaign = Campaign.get_from_id(campaign_id) 

549 if campaign is None: 

550 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign") 

551 return campaign 

552 

553 

554@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

555async def add_campaign(campaign: Campaign): 

556 new_campaign = Campaign.create(campaign) 

557 if new_campaign is None: 

558 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

559 return new_campaign 

560 

561 

562@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

563async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

564 campaign = Campaign.get_from_id(campaign_id) 

565 if campaign is None: 

566 raise HTTPException(status_code=500, detail="An error occurred during campaign edition") 

567 campaign.name = edit_campaign.name 

568 campaign.description = edit_campaign.description 

569 return Campaign.update(campaign) 

570 

571 

572@app.delete( 

573 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

574) 

575async def delete_campaign(campaign_id: str): 

576 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

577 campaign = Campaign.get_from_id(campaign_id) 

578 if campaign is None: 

579 raise exception 

580 delete_phases = Phase.deleteMany(campaign_id) 

581 if not delete_phases.acknowledged: 

582 raise exception 

583 campaign_deleted = Campaign.delete(campaign_id) 

584 return campaign_deleted.acknowledged 

585 

586 

587@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

588async def get_campaign_phases(campaign_id: str): 

589 return Phase.get_all(campaign_id) 

590 

591 

592@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

593async def get_phase(phase_id: str): 

594 return Phase.get_from_id(phase_id) 

595 

596 

597@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

598async def add_phase(phase: Phase): 

599 new_phase = Phase.create(phase) 

600 if new_phase is None: 

601 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

602 return new_phase 

603 

604 

605@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

606async def edit_phase(phase_id, edit_phase: Phase): 

607 phase = Phase.get_from_id(phase_id) 

608 if phase is None: 

609 raise HTTPException(status_code=500, detail="An error occurred during Phase edition") 

610 phase.name = edit_phase.name 

611 phase.description = edit_phase.description 

612 phase.start_at = edit_phase.start_at 

613 phase.end_at = edit_phase.end_at 

614 return Phase.update(phase) 

615 

616 

617@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

618async def delete_phase(phase_id: str): 

619 phase = Phase.get_from_id(phase_id) 

620 if phase is None: 

621 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion") 

622 phase_deleted = Phase.delete(phase_id) 

623 return phase_deleted.acknowledged 

624 

625 

626@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

627async def get_custom_views(): 

628 return CustomView.get_all() 

629 

630 

631@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

632async def get_custom_views_from_user_id(user_id: str): 

633 return CustomView.get_by_attribute("user_id", user_id) 

634 

635 

636@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

637async def get_custom_view(custom_view_id: str): 

638 return CustomView.get_from_id(custom_view_id) 

639 

640 

641@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

642async def create_custom_view( 

643 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user) 

644): 

645 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id) 

646 custom_view.insert() 

647 return custom_view 

648 

649 

650@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

651async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate): 

652 custom_view = CustomView.get_from_id(custom_view_id) 

653 return custom_view.update(custom_view_update.model_dump()) 

654 

655 

656@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

657async def delete_custom_view(custom_view_id: str): 

658 custom_view = CustomView.get_from_id(custom_view_id) 

659 return custom_view.delete() 

660 

661 

662@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)]) 

663async def add_video(video: Video): 

664 video.insert() 

665 if not video: 

666 raise HTTPException(status_code=500, detail="An error occurred during cctv creation") 

667 return video 

668 

669 

670@app.get("/videos", dependencies=[Depends(get_current_active_user)]) 

671async def get_videos(): 

672 return Video.get_all() 

673 

674 

675@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)]) 

676def get_stream(video_id): 

677 camera_name = Video.get_video(video_id) 

678 if not camera_name: 

679 raise HTTPException(status_code=404, detail="Camera not found") 

680 return camera_name