Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 79%

351 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-16 14:43 +0000

1import os 

2import logging 

3import time 

4from tempfile import mkdtemp 

5from typing import Annotated 

6from datetime import timedelta 

7from pathlib import Path 

8from pyinstrument import Profiler 

9 

10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request 

11from fastapi.middleware.cors import CORSMiddleware 

12from fastapi.responses import HTMLResponse 

13from fastapi.security import OAuth2PasswordRequestForm 

14 

15from twinpad_backend import __version__ 

16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names 

17from twinpad_backend.models import ( 

18 Signal, 

19 SignalData, 

20 SignalSample, 

21 ServicesStatus, 

22 Device, 

23 DeviceUpdate, 

24 DeviceSetup, 

25 DeviceSetupUpdate, 

26 DeviceState, 

27 SignalUpdate, 

28 SignalsData, 

29 Event, 

30 EventRule, 

31 User, 

32 UserUpdate, 

33 Campaign, 

34 Phase, 

35 CustomView, 

36) 

37from twinpad_backend.auth import ( 

38 Token, 

39 authenticate_user, 

40 get_current_active_user, 

41 ACCESS_TOKEN_EXPIRE_MINUTES, 

42 create_access_token, 

43 get_password_hash, 

44) 

45from twinpad_backend.queries import SignalQuery, DeviceStatesQuery, EventQuery, EventRuleQuery 

46from twinpad_backend.responses import ListResponse 

47 

48REQUEST_TIME_WARNING = 0.5 

49 

50DEBUG = os.environ.get("DEBUG", "false") == "true" 

51PROFILING = os.environ.get("PROFILING", "false") == "true" 

52 

53logger = logging.getLogger("uvicorn.error") 

54logger.propagate = False 

55logger.info("Debug mode: %s", DEBUG) 

56logger.info("log level: %s", logging.root.level) 

57 

58 

59app = FastAPI(title="Twinpad backend", version=__version__) 

60 

61app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) 

62 

63if PROFILING: 

64 profiling_folder = mkdtemp() 

65 logger.info(f"Profiling enabled") 

66 from pyinstrument import Profiler 

67 

68 @app.middleware("http") 

69 async def profile_request(request: Request, call_next): 

70 should_profile = True 

71 url = str(request.url) 

72 for segment in ["profiling", ".ico"]: 

73 if segment in url: 

74 should_profile = False 

75 break 

76 

77 if should_profile: # avoid recursion 

78 profiler = Profiler() 

79 profiler.start() 

80 await call_next(request) 

81 profiler.stop() 

82 url = "_".join(url.split("/")[3:]).rstrip("/") 

83 if not url: 

84 url = "slash" 

85 # filename = f"{round(time.time())}_{url}" 

86 filename = url.split("?", maxsplit=1)[0] 

87 logger.info("saving profiling to %s", filename) 

88 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file: 

89 profiling_file.write(profiler.output_html()) 

90 return await call_next(request) 

91 

92 @app.get("/profilings") 

93 async def profilings(): 

94 return {"profilings": os.listdir(profiling_folder)} 

95 

96 @app.get("/profilings/{profiling_id}") 

97 async def profiling(profiling_id): 

98 

99 filename = os.path.join(profiling_folder, profiling_id) 

100 if os.path.exists(filename): 

101 with open(filename, "r", encoding="utf-8") as profiling: 

102 return HTMLResponse(profiling.read()) 

103 raise HTTPException( 

104 status_code=404, 

105 detail="Profiling not found", 

106 ) 

107 

108 # app.mount("/profilings", StaticFiles(directory=profiling_folder, html=True), name="profilings") 

109 

110 

111@app.middleware("http") 

112async def log_request_time(request: Request, call_next): 

113 start_time = time.time() # Record the start time 

114 response = await call_next(request) # Process the request 

115 duration = time.time() - start_time # Calculate the time taken 

116 client_ip = request.headers.get("x-forwarded-for", request.client.host) 

117 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms" 

118 if duration > REQUEST_TIME_WARNING: 

119 logger.warning(message) 

120 else: 

121 logger.info(message) 

122 return response 

123 

124 

125@app.get("/") 

126async def slash(): 

127 return {"twinpad_version": __version__} 

128 

129 

130@app.get("/status", dependencies=[Depends(get_current_active_user)]) 

131async def status(): 

132 """ 

133 Return service healthcheck 

134 """ 

135 return { 

136 "services": ServicesStatus.check(), 

137 } 

138 

139 

140@app.get("/devices", dependencies=[Depends(get_current_active_user)]) 

141async def get_devices() -> list[Device]: 

142 return Device.get_all(sort_by="device_id") 

143 

144 

145@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)]) 

146async def get_device(device_id): 

147 device = Device.get_one_by_attribute("device_id", device_id) 

148 if not device: 

149 raise HTTPException( 

150 status_code=404, 

151 detail="Device not found", 

152 ) 

153 return device 

154 

155 

156@app.patch("/devices/{device_id}", response_model=DeviceUpdate, dependencies=[Depends(get_current_active_user)]) 

157async def update_item(device_id: str, device_update: DeviceUpdate): 

158 device = Device.get_one_by_attribute("device_id", device_id) 

159 if not device: 

160 raise HTTPException( 

161 status_code=404, 

162 detail="Device not found", 

163 ) 

164 device.update(device_update) 

165 return device_update 

166 

167 

168@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)]) 

169async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]: 

170 return DeviceState.get_from_id_and_query(device_id, query) 

171 

172 

173@app.get("/device-setups", dependencies=[Depends(get_current_active_user)]) 

174async def get_device_setups() -> list[DeviceSetup]: 

175 return DeviceSetup.get_all() 

176 

177 

178@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201) 

179async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup: 

180 device_setup.insert() 

181 return device_setup 

182 

183 

184@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

185async def get_device_setup(device_setup_id: str): 

186 device_setup = DeviceSetup.get_from_id(device_setup_id) 

187 if device_setup is None: 

188 raise HTTPException( 

189 status_code=404, 

190 detail="Device setup not found", 

191 ) 

192 return device_setup 

193 

194 

195@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)]) 

196async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup: 

197 device_setup = DeviceSetup.get_from_id(device_setup_id) 

198 if device_setup is None: 

199 raise HTTPException( 

200 status_code=404, 

201 detail="Device setup not found", 

202 ) 

203 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None}) 

204 return device_setup 

205 

206 

207@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

208async def delete_device_setups(device_setup_id: str) -> bool: 

209 device_setup = DeviceSetup.get_from_id(device_setup_id) 

210 if device_setup is None: 

211 raise HTTPException( 

212 status_code=404, 

213 detail="Device setup not found", 

214 ) 

215 deleted = device_setup.delete() 

216 return deleted 

217 

218 

219@app.get("/signals", dependencies=[Depends(get_current_active_user)]) 

220async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]: 

221 return Signal.response_from_query(query).to_dict(exclude={"device"}) 

222 

223 

224@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)]) 

225async def signal_stats(): 

226 """ 

227 Returns signals stats 

228 """ 

229 # profiler = Profiler() 

230 # profiler.start() 

231 

232 number_samples = 0 

233 number_active_signals = 0 

234 signals = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()] 

235 

236 for s in signals: 

237 if s is not None: 

238 number_samples += s.number_samples() 

239 if s.status.status == "up": 

240 number_active_signals += 1 

241 

242 # profiler.stop() 

243 # filename = "signals_stats_profiling.html" 

244 # full_file_path = os.path.join(Path.home(), filename) 

245 # logger.info(f"Saving profiling to {full_file_path}") 

246 # with open(full_file_path, "w", encoding="utf-8") as profiling_file: 

247 # profiling_file.write(profiler.output_html()) 

248 

249 return { 

250 "signal_data_size": signal_datasize(), 

251 "number_signal_samples": number_samples, 

252 "number_active_signals": number_active_signals, 

253 "number_signals": len(signals), 

254 } 

255 

256 

257@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)]) 

258async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

259 return SignalSample.get_last_from_signal_ids(signal_ids) 

260 

261 

262@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)]) 

263async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]: 

264 return SignalSample.get_first_from_signal_ids(signal_ids) 

265 

266 

267@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)]) 

268async def get_signal(signal_id): 

269 signal = Signal.get_from_signal_id(signal_id) 

270 if not signal: 

271 raise HTTPException( 

272 status_code=404, 

273 detail="Signal not found", 

274 ) 

275 return signal.to_dict() 

276 

277 

278@app.patch("/signals/{signal_id}", response_model=SignalUpdate, dependencies=[Depends(get_current_active_user)]) 

279async def update_signal(signal_id: str, signal_update: SignalUpdate): 

280 signal = Signal.get_from_signal_id(signal_id) 

281 if not signal: 

282 raise HTTPException( 

283 status_code=404, 

284 detail="Device not found", 

285 ) 

286 signal.update(signal_update) 

287 return signal_update 

288 

289 

290@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)]) 

291async def get_signal_data( 

292 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None 

293) -> SignalData | None: 

294 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

295 

296 if number_samples_max is not None: 

297 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max) 

298 

299 return signal_data 

300 

301 

302@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)]) 

303async def get_last_value(signal_id) -> SignalSample: 

304 sample = SignalSample.get_last_from_signal_id(signal_id) 

305 if sample is None: 

306 raise HTTPException(status_code=404, detail="No data") 

307 return sample 

308 

309 

310@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)]) 

311async def get_first_value(signal_id) -> SignalSample: 

312 sample = SignalSample.get_first_from_signal_id(signal_id) 

313 if sample is None: 

314 raise HTTPException(status_code=404, detail="No data") 

315 return sample 

316 

317 

318@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)]) 

319async def get_signal_number_samples(signal_id): 

320 signal = Signal.get_from_signal_id(signal_id) 

321 if not signal: 

322 raise HTTPException( 

323 status_code=404, 

324 detail="Device not found", 

325 ) 

326 return {"signal_id": signal_id, "number_samples": signal.number_samples(), "size": signal.sample_datasize()} 

327 

328 

329@app.get("/signals-data", dependencies=[Depends(get_current_active_user)]) 

330async def get_signals_data( 

331 signal_ids: list[str] = Query(default=[]), 

332 number_samples_max: int = None, 

333 min_timestamp: float = None, 

334 max_timestamp: float = None, 

335) -> SignalsData | None: 

336 

337 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

338 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

339 

340 signals_data = SignalsData.get_from_signal_ids(signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

341 if number_samples_max is not None: 

342 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max) 

343 return signals_data 

344 

345 

346@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)]) 

347async def get_signals_data_interest_window( 

348 window_max_number_samples: int = 600, 

349 outside_max_number_samples: int = 150, 

350 window_min_timestamp: float = None, 

351 window_max_timestamp: float = None, 

352 signal_ids: list[str] = Query(default=[]), 

353 min_timestamp: float = None, 

354 max_timestamp: float = None, 

355) -> SignalsData | None: 

356 

357 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp: 

358 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp") 

359 

360 if min_timestamp and max_timestamp and min_timestamp > max_timestamp: 

361 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp") 

362 

363 signals_data = SignalsData.get_from_signal_ids( 

364 signal_ids, 

365 min_timestamp=min_timestamp, 

366 max_timestamp=max_timestamp, 

367 max_documents=10 * (window_max_number_samples + outside_max_number_samples), 

368 ) 

369 

370 signals_data = signals_data.interest_window_desampling( 

371 window_max_number_samples=window_max_number_samples, 

372 outside_max_number_samples=outside_max_number_samples, 

373 window_min_timestamp=window_min_timestamp, 

374 window_max_timestamp=window_max_timestamp, 

375 ) 

376 

377 return signals_data 

378 

379 

380@app.get("/signals-data/export", dependencies=[Depends(get_current_active_user)]) 

381async def export_signals_zip( 

382 format: str, signal_ids: list[str] = Query(default=[]), min_timestamp: float = None, max_timestamp: float = None 

383): 

384 signals_data = SignalsData.get_from_signal_ids(signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp) 

385 zip_data = signals_data.zip_export(format) 

386 return Response( 

387 content=zip_data, 

388 media_type="application/zip", 

389 headers={"Content-Disposition": 'attachment; filename="signals.zip"'}, 

390 ) 

391 

392 

393@app.get("/events", dependencies=[Depends(get_current_active_user)]) 

394async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]: 

395 return Event.response_from_query(query) 

396 

397 

398@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)]) 

399async def get_event(event_id) -> Event: 

400 event = Event.get_from_id(event_id) 

401 if event is None: 

402 raise HTTPException(status_code=404, detail="No such event") 

403 return event 

404 

405 

406@app.get("/event-rules", dependencies=[Depends(get_current_active_user)]) 

407async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]: 

408 return EventRule.response_from_query(query) 

409 

410 

411@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)]) 

412async def get_event_rule(event_rule_id) -> EventRule: 

413 event_rule = EventRule.get_from_id(event_rule_id) 

414 if event_rule is None: 

415 raise HTTPException(status_code=404, detail="No such event rule") 

416 return event_rule 

417 

418 

419@app.post("/users", status_code=201) 

420async def create_user(user: User): 

421 if User.get_one_by_attribute("user", user.email) is not None: 

422 raise HTTPException(status_code=400, detail="An error occurred during account creation") 

423 hashed_password = get_password_hash(user.password) 

424 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False) 

425 if new_user is None: 

426 raise HTTPException(status_code=400, details="An error occurred during account creation") 

427 return new_user 

428 

429 

430@app.post("/token", status_code=201) 

431async def login_for_access_token( 

432 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

433) -> Token: 

434 user = authenticate_user(form_data.username, form_data.password) 

435 if not user: 

436 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"}) 

437 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES)) 

438 if user.is_active: 

439 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"}) 

440 access_token = create_access_token( 

441 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires 

442 ) 

443 return Token(access_token=access_token, token_type="bearer") 

444 

445 

446@app.get("/users", dependencies=[Depends(get_current_active_user)]) 

447async def get_users(): 

448 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")] 

449 

450 

451@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)]) 

452async def get_user(user_id): 

453 user = None 

454 if user_id is not None: 

455 user = User.get_from_id(user_id) 

456 if user is None: 

457 raise HTTPException( 

458 status_code=404, 

459 detail="User not found", 

460 ) 

461 return user 

462 

463 

464@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)]) 

465async def patch_user(user: UserUpdate, user_id): 

466 if user.password == "" or user.password is None: 

467 del user.password 

468 else: 

469 user.password = get_password_hash(user.password) 

470 return User.update(user, user_id) 

471 

472 

473@app.post("/users/me", response_model=User) 

474async def read_users_me( 

475 current_user: Annotated[User, Depends(get_current_active_user)], 

476): 

477 del (current_user.password, current_user.is_active, current_user.is_connected) 

478 return current_user 

479 

480 

481@app.post("/authenticated") 

482async def authenticated(current_user: Annotated[User, Depends(get_current_active_user)]): 

483 return isinstance(current_user, User) 

484 

485 

486@app.get("/is_admin", response_model=bool) 

487async def is_admin(current_user: Annotated[User, Depends(get_current_active_user)]): 

488 if not current_user.is_admin: 

489 return False 

490 return True 

491 

492 

493@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)]) 

494async def get_campaigns(): 

495 return Campaign.get_all() 

496 

497 

498@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

499async def get_campaign_by_id(campaign_id: str): 

500 campaign = Campaign.get_from_id(campaign_id) 

501 if campaign is None: 

502 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign") 

503 return campaign 

504 

505 

506@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201) 

507async def add_campaign(campaign: Campaign): 

508 new_campaign = Campaign.create(campaign) 

509 if new_campaign is None: 

510 raise HTTPException(status_code=500, detail="An error occurred during campaign creation") 

511 return new_campaign 

512 

513 

514@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)]) 

515async def edit_campaign(campaign_id: str, edit_campaign: Campaign): 

516 campaign = Campaign.get_from_id(campaign_id) 

517 if campaign is None: 

518 raise HTTPException(status_code=500, detail="An error occurred during campaign edition") 

519 campaign.name = edit_campaign.name 

520 campaign.description = edit_campaign.description 

521 return Campaign.update(campaign) 

522 

523 

524@app.delete( 

525 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200 

526) 

527async def delete_campaign(campaign_id: str): 

528 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion") 

529 campaign = Campaign.get_from_id(campaign_id) 

530 if campaign is None: 

531 raise exception 

532 delete_phases = Phase.deleteMany(campaign_id) 

533 if not delete_phases.acknowledged: 

534 raise exception 

535 campaign_deleted = Campaign.delete(campaign_id) 

536 return campaign_deleted.acknowledged 

537 

538 

539@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)]) 

540async def get_campaign_phases(campaign_id: str): 

541 return Phase.get_all(campaign_id) 

542 

543 

544@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

545async def get_phase(phase_id: str): 

546 return Phase.get_from_id(phase_id) 

547 

548 

549@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201) 

550async def add_phase(phase: Phase): 

551 new_phase = Phase.create(phase) 

552 if new_phase is None: 

553 raise HTTPException(status_code=500, detail="An error occurred during phase creation") 

554 return new_phase 

555 

556 

557@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)]) 

558async def edit_phase(phase_id, edit_phase: Phase): 

559 phase = Phase.get_from_id(phase_id) 

560 if phase is None: 

561 raise HTTPException(status_code=500, detail="An error occurred during Phase edition") 

562 phase.name = edit_phase.name 

563 phase.description = edit_phase.description 

564 phase.start_at = edit_phase.start_at 

565 phase.end_at = edit_phase.end_at 

566 return Phase.update(phase) 

567 

568 

569@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200) 

570async def delete_phase(phase_id: str): 

571 phase = Phase.get_from_id(phase_id) 

572 if phase is None: 

573 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion") 

574 phase_deleted = Phase.delete(phase_id) 

575 return phase_deleted.acknowledged 

576 

577 

578@app.get("/custom-views", dependencies=[Depends(get_current_active_user)]) 

579async def get_custom_views(): 

580 return CustomView.get_all() 

581 

582 

583@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)]) 

584async def get_custom_views_from_user_id(user_id: str): 

585 return CustomView.get_by_attribute("user_id", user_id) 

586 

587 

588@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

589async def get_custom_view(custom_view_id: str): 

590 return CustomView.get_from_id(custom_view_id) 

591 

592 

593@app.post("/custom-views", dependencies=[Depends(get_current_active_user)]) 

594async def create_custom_view(custom_view: CustomView): 

595 return CustomView.create(custom_view) 

596 

597 

598@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)]) 

599async def update_custom_views(custom_view: CustomView, custom_view_id: str): 

600 return CustomView.update(custom_view, custom_view_id) 

601 

602 

603@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]) 

604async def delete_custom_view(custom_view_id: str): 

605 return CustomView.delete(custom_view_id)