Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 79%
351 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 07:30 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 07:30 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 User,
32 UserUpdate,
33 Campaign,
34 Phase,
35 CustomView,
36)
37from twinpad_backend.auth import (
38 Token,
39 authenticate_user,
40 get_current_active_user,
41 ACCESS_TOKEN_EXPIRE_MINUTES,
42 create_access_token,
43 get_password_hash,
44)
45from twinpad_backend.queries import SignalQuery, DeviceStatesQuery, EventQuery, EventRuleQuery
46from twinpad_backend.responses import ListResponse
48REQUEST_TIME_WARNING = 0.5
50DEBUG = os.environ.get("DEBUG", "false") == "true"
51PROFILING = os.environ.get("PROFILING", "false") == "true"
53logger = logging.getLogger("uvicorn.error")
54logger.propagate = False
55logger.info("Debug mode: %s", DEBUG)
56logger.info("log level: %s", logging.root.level)
59app = FastAPI(title="Twinpad backend", version=__version__)
61app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
63if PROFILING:
64 profiling_folder = mkdtemp()
65 logger.info(f"Profiling enabled")
66 from pyinstrument import Profiler
68 @app.middleware("http")
69 async def profile_request(request: Request, call_next):
70 should_profile = True
71 url = str(request.url)
72 for segment in ["profiling", ".ico"]:
73 if segment in url:
74 should_profile = False
75 break
77 if should_profile: # avoid recursion
78 profiler = Profiler()
79 profiler.start()
80 await call_next(request)
81 profiler.stop()
82 url = "_".join(url.split("/")[3:]).rstrip("/")
83 if not url:
84 url = "slash"
85 # filename = f"{round(time.time())}_{url}"
86 filename = url.split("?", maxsplit=1)[0]
87 logger.info("saving profiling to %s", filename)
88 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
89 profiling_file.write(profiler.output_html())
90 return await call_next(request)
92 @app.get("/profilings")
93 async def profilings():
94 return {"profilings": os.listdir(profiling_folder)}
96 @app.get("/profilings/{profiling_id}")
97 async def profiling(profiling_id):
99 filename = os.path.join(profiling_folder, profiling_id)
100 if os.path.exists(filename):
101 with open(filename, "r", encoding="utf-8") as profiling:
102 return HTMLResponse(profiling.read())
103 raise HTTPException(
104 status_code=404,
105 detail="Profiling not found",
106 )
108 # app.mount("/profilings", StaticFiles(directory=profiling_folder, html=True), name="profilings")
111@app.middleware("http")
112async def log_request_time(request: Request, call_next):
113 start_time = time.time() # Record the start time
114 response = await call_next(request) # Process the request
115 duration = time.time() - start_time # Calculate the time taken
116 client_ip = request.headers.get("x-forwarded-for", request.client.host)
117 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
118 if duration > REQUEST_TIME_WARNING:
119 logger.warning(message)
120 else:
121 logger.info(message)
122 return response
125@app.get("/")
126async def slash():
127 return {"twinpad_version": __version__}
130@app.get("/status", dependencies=[Depends(get_current_active_user)])
131async def status():
132 """
133 Return service healthcheck
134 """
135 return {
136 "services": ServicesStatus.check(),
137 }
140@app.get("/devices", dependencies=[Depends(get_current_active_user)])
141async def get_devices() -> list[Device]:
142 return Device.get_all(sort_by="device_id")
145@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
146async def get_device(device_id):
147 device = Device.get_one_by_attribute("device_id", device_id)
148 if not device:
149 raise HTTPException(
150 status_code=404,
151 detail="Device not found",
152 )
153 return device
156@app.patch("/devices/{device_id}", response_model=DeviceUpdate, dependencies=[Depends(get_current_active_user)])
157async def update_item(device_id: str, device_update: DeviceUpdate):
158 device = Device.get_one_by_attribute("device_id", device_id)
159 if not device:
160 raise HTTPException(
161 status_code=404,
162 detail="Device not found",
163 )
164 device.update(device_update)
165 return device_update
168@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
169async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
170 return DeviceState.get_from_id_and_query(device_id, query)
173@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
174async def get_device_setups() -> list[DeviceSetup]:
175 return DeviceSetup.get_all()
178@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
179async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
180 device_setup.insert()
181 return device_setup
184@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
185async def get_device_setup(device_setup_id: str):
186 device_setup = DeviceSetup.get_from_id(device_setup_id)
187 if device_setup is None:
188 raise HTTPException(
189 status_code=404,
190 detail="Device setup not found",
191 )
192 return device_setup
195@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
196async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
197 device_setup = DeviceSetup.get_from_id(device_setup_id)
198 if device_setup is None:
199 raise HTTPException(
200 status_code=404,
201 detail="Device setup not found",
202 )
203 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
204 return device_setup
207@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
208async def delete_device_setups(device_setup_id: str) -> bool:
209 device_setup = DeviceSetup.get_from_id(device_setup_id)
210 if device_setup is None:
211 raise HTTPException(
212 status_code=404,
213 detail="Device setup not found",
214 )
215 deleted = device_setup.delete()
216 return deleted
219@app.get("/signals", dependencies=[Depends(get_current_active_user)])
220async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
221 return Signal.response_from_query(query).to_dict(exclude={"device"})
224@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
225async def signal_stats():
226 """
227 Returns signals stats
228 """
229 # profiler = Profiler()
230 # profiler.start()
232 number_samples = 0
233 number_active_signals = 0
234 signals = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()]
236 for s in signals:
237 if s is not None:
238 number_samples += s.number_samples()
239 if s.status.status == "up":
240 number_active_signals += 1
242 # profiler.stop()
243 # filename = "signals_stats_profiling.html"
244 # full_file_path = os.path.join(Path.home(), filename)
245 # logger.info(f"Saving profiling to {full_file_path}")
246 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
247 # profiling_file.write(profiler.output_html())
249 return {
250 "signal_data_size": signal_datasize(),
251 "number_signal_samples": number_samples,
252 "number_active_signals": number_active_signals,
253 "number_signals": len(signals),
254 }
257@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
258async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
259 return SignalSample.get_last_from_signal_ids(signal_ids)
262@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
263async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
264 return SignalSample.get_first_from_signal_ids(signal_ids)
267@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
268async def get_signal(signal_id):
269 signal = Signal.get_from_signal_id(signal_id)
270 if not signal:
271 raise HTTPException(
272 status_code=404,
273 detail="Signal not found",
274 )
275 return signal.to_dict()
278@app.patch("/signals/{signal_id}", response_model=SignalUpdate, dependencies=[Depends(get_current_active_user)])
279async def update_signal(signal_id: str, signal_update: SignalUpdate):
280 signal = Signal.get_from_signal_id(signal_id)
281 if not signal:
282 raise HTTPException(
283 status_code=404,
284 detail="Device not found",
285 )
286 signal.update(signal_update)
287 return signal_update
290@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
291async def get_signal_data(
292 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
293) -> SignalData | None:
294 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
296 if number_samples_max is not None:
297 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
299 return signal_data
302@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
303async def get_last_value(signal_id) -> SignalSample:
304 sample = SignalSample.get_last_from_signal_id(signal_id)
305 if sample is None:
306 raise HTTPException(status_code=404, detail="No data")
307 return sample
310@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
311async def get_first_value(signal_id) -> SignalSample:
312 sample = SignalSample.get_first_from_signal_id(signal_id)
313 if sample is None:
314 raise HTTPException(status_code=404, detail="No data")
315 return sample
318@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
319async def get_signal_number_samples(signal_id):
320 signal = Signal.get_from_signal_id(signal_id)
321 if not signal:
322 raise HTTPException(
323 status_code=404,
324 detail="Device not found",
325 )
326 return {"signal_id": signal_id, "number_samples": signal.number_samples(), "size": signal.sample_datasize()}
329@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
330async def get_signals_data(
331 signal_ids: list[str] = Query(default=[]),
332 number_samples_max: int = None,
333 min_timestamp: float = None,
334 max_timestamp: float = None,
335) -> SignalsData | None:
337 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
338 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
340 signals_data = SignalsData.get_from_signal_ids(signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
341 if number_samples_max is not None:
342 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
343 return signals_data
346@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
347async def get_signals_data_interest_window(
348 window_max_number_samples: int = 600,
349 outside_max_number_samples: int = 150,
350 window_min_timestamp: float = None,
351 window_max_timestamp: float = None,
352 signal_ids: list[str] = Query(default=[]),
353 min_timestamp: float = None,
354 max_timestamp: float = None,
355) -> SignalsData | None:
357 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
358 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
360 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
361 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
363 signals_data = SignalsData.get_from_signal_ids(
364 signal_ids,
365 min_timestamp=min_timestamp,
366 max_timestamp=max_timestamp,
367 max_documents=10 * (window_max_number_samples + outside_max_number_samples),
368 )
370 signals_data = signals_data.interest_window_desampling(
371 window_max_number_samples=window_max_number_samples,
372 outside_max_number_samples=outside_max_number_samples,
373 window_min_timestamp=window_min_timestamp,
374 window_max_timestamp=window_max_timestamp,
375 )
377 return signals_data
380@app.get("/signals-data/export", dependencies=[Depends(get_current_active_user)])
381async def export_signals_zip(
382 format: str, signal_ids: list[str] = Query(default=[]), min_timestamp: float = None, max_timestamp: float = None
383):
384 signals_data = SignalsData.get_from_signal_ids(signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
385 zip_data = signals_data.zip_export(format)
386 return Response(
387 content=zip_data,
388 media_type="application/zip",
389 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
390 )
393@app.get("/events", dependencies=[Depends(get_current_active_user)])
394async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
395 return Event.response_from_query(query)
398@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
399async def get_event(event_id) -> Event:
400 event = Event.get_from_id(event_id)
401 if event is None:
402 raise HTTPException(status_code=404, detail="No such event")
403 return event
406@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
407async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
408 return EventRule.response_from_query(query)
411@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
412async def get_event_rule(event_rule_id) -> EventRule:
413 event_rule = EventRule.get_from_id(event_rule_id)
414 if event_rule is None:
415 raise HTTPException(status_code=404, detail="No such event rule")
416 return event_rule
419@app.post("/users", status_code=201)
420async def create_user(user: User):
421 if User.get_one_by_attribute("user", user.email) is not None:
422 raise HTTPException(status_code=400, detail="An error occurred during account creation")
423 hashed_password = get_password_hash(user.password)
424 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
425 if new_user is None:
426 raise HTTPException(status_code=400, details="An error occurred during account creation")
427 return new_user
430@app.post("/token", status_code=201)
431async def login_for_access_token(
432 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
433) -> Token:
434 user = authenticate_user(form_data.username, form_data.password)
435 if not user:
436 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
437 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
438 if user.is_active:
439 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
440 access_token = create_access_token(
441 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
442 )
443 return Token(access_token=access_token, token_type="bearer")
446@app.get("/users", dependencies=[Depends(get_current_active_user)])
447async def get_users():
448 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
451@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
452async def get_user(user_id):
453 user = None
454 if user_id is not None:
455 user = User.get_from_id(user_id)
456 if user is None:
457 raise HTTPException(
458 status_code=404,
459 detail="User not found",
460 )
461 return user
464@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
465async def patch_user(user: UserUpdate, user_id):
466 if user.password == "" or user.password is None:
467 del user.password
468 else:
469 user.password = get_password_hash(user.password)
470 return User.update(user, user_id)
473@app.post("/users/me", response_model=User)
474async def read_users_me(
475 current_user: Annotated[User, Depends(get_current_active_user)],
476):
477 del (current_user.password, current_user.is_active, current_user.is_connected)
478 return current_user
481@app.post("/authenticated")
482async def authenticated(current_user: Annotated[User, Depends(get_current_active_user)]):
483 return isinstance(current_user, User)
486@app.get("/is_admin", response_model=bool)
487async def is_admin(current_user: Annotated[User, Depends(get_current_active_user)]):
488 if not current_user.is_admin:
489 return False
490 return True
493@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
494async def get_campaigns():
495 return Campaign.get_all()
498@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
499async def get_campaign_by_id(campaign_id: str):
500 campaign = Campaign.get_from_id(campaign_id)
501 if campaign is None:
502 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
503 return campaign
506@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
507async def add_campaign(campaign: Campaign):
508 new_campaign = Campaign.create(campaign)
509 if new_campaign is None:
510 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
511 return new_campaign
514@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
515async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
516 campaign = Campaign.get_from_id(campaign_id)
517 if campaign is None:
518 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
519 campaign.name = edit_campaign.name
520 campaign.description = edit_campaign.description
521 return Campaign.update(campaign)
524@app.delete(
525 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
526)
527async def delete_campaign(campaign_id: str):
528 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
529 campaign = Campaign.get_from_id(campaign_id)
530 if campaign is None:
531 raise exception
532 delete_phases = Phase.deleteMany(campaign_id)
533 if not delete_phases.acknowledged:
534 raise exception
535 campaign_deleted = Campaign.delete(campaign_id)
536 return campaign_deleted.acknowledged
539@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
540async def get_campaign_phases(campaign_id: str):
541 return Phase.get_all(campaign_id)
544@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
545async def get_phase(phase_id: str):
546 return Phase.get_from_id(phase_id)
549@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
550async def add_phase(phase: Phase):
551 new_phase = Phase.create(phase)
552 if new_phase is None:
553 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
554 return new_phase
557@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
558async def edit_phase(phase_id, edit_phase: Phase):
559 phase = Phase.get_from_id(phase_id)
560 if phase is None:
561 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
562 phase.name = edit_phase.name
563 phase.description = edit_phase.description
564 phase.start_at = edit_phase.start_at
565 phase.end_at = edit_phase.end_at
566 return Phase.update(phase)
569@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
570async def delete_phase(phase_id: str):
571 phase = Phase.get_from_id(phase_id)
572 if phase is None:
573 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
574 phase_deleted = Phase.delete(phase_id)
575 return phase_deleted.acknowledged
578@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
579async def get_custom_views():
580 return CustomView.get_all()
583@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
584async def get_custom_views_from_user_id(user_id: str):
585 return CustomView.get_by_attribute("user_id", user_id)
588@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
589async def get_custom_view(custom_view_id: str):
590 return CustomView.get_from_id(custom_view_id)
593@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
594async def create_custom_view(custom_view: CustomView):
595 return CustomView.create(custom_view)
598@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
599async def update_custom_views(custom_view: CustomView, custom_view_id: str):
600 return CustomView.update(custom_view, custom_view_id)
603@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
604async def delete_custom_view(custom_view_id: str):
605 return CustomView.delete(custom_view_id)