Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/api.py: 99%
476 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-01 08:23 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-01 08:23 +0000
1import os
2import logging
3from pathlib import Path
4import time
5from typing import Annotated
6from datetime import timedelta
7from pyinstrument import Profiler
9from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
10from fastapi.middleware.cors import CORSMiddleware
11from fastapi.security import OAuth2PasswordRequestForm
13from twinpad_backend import __version__
14from twinpad_backend.db import signal_datasize
15from twinpad_backend.models import (
16 DeviceId,
17 MongoId,
18 Signal,
19 ForcedSignal,
20 SignalData,
21 SignalSample,
22 ServicesStatus,
23 Device,
24 DeviceUpdate,
25 DeviceSetup,
26 DeviceSetupUpdate,
27 DeviceState,
28 SignalUpdate,
29 SignalsData,
30 Event,
31 EventRule,
32 TwinPadActivity,
33 User,
34 UserUpdate,
35 Campaign,
36 Phase,
37 CustomView,
38 Command,
39 CustomViewCreation,
40 CustomViewUpdate,
41 Video,
42 SignalsPreset,
43 SignalsPresetCreation,
44 SignalsPresetUpdate,
45 PrivateGraphTheme,
46 PublicGraphTheme,
47 GraphThemeCreation,
48 GraphThemeUpdate,
49 SINGLE_POST_PROCESSING_FUNCTION,
50 DOUBLE_POST_PROCESSING_FUNCTION,
51 MULTIPLE_POST_PROCESSING_FUNCTION,
52)
53from twinpad_backend.auth import (
54 Token,
55 authenticate_user,
56 get_current_active_user,
57 ACCESS_TOKEN_EXPIRE_MINUTES,
58 create_access_token,
59 get_password_hash,
60)
61from twinpad_backend.queries import (
62 SignalQuery,
63 ForcedSignalQuery,
64 DeviceStatesQuery,
65 EventQuery,
66 EventRuleQuery,
67 CommandQuery,
68 GraphThemeQuery,
69)
70from twinpad_backend.responses import ListResponse
71from twinpad_backend.routes.deployers import router as deployers_router
# Duration threshold, in seconds, compared against each request's processing time.
REQUEST_TIME_WARNING = 0.5

# Environment switches: a flag is on only when the variable is exactly "true".
DEBUG = "true" == os.environ.get("DEBUG", "false")
PROFILING = "true" == os.environ.get("PROFILING", "false")
DEVICE_DEPLOYERS = "true" == os.environ.get("DEVICE_DEPLOYERS", "true")

# Reuse the "uvicorn.error" logger and keep its records from propagating further.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
if PROFILING:  # pragma: no cover
    profiling_folder = "/tmp/twinpad_profiling"
    Path(profiling_folder).mkdir(parents=True, exist_ok=True)
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """
        Profile the request with pyinstrument and write the HTML report into ``profiling_folder``.

        Requests whose URL contains "profiling" or ".ico" are passed through unprofiled.
        """
        url = str(request.url)
        # Skip the profiling routes themselves (avoid recursion) and icon requests.
        if any(segment in url for segment in ("profiling", ".ico")):
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Stop the profiler even when the downstream handler raises.
            profiler.stop()

        # Report name: non-empty path segments joined by "_" (query string excluded);
        # the root path is stored as "slash".
        filename = "_".join(part for part in request.url.path.split("/") if part) or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the entries found in ``profiling_folder``."""
        return {"profilings": sorted(os.listdir(profiling_folder))}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """
        Return one stored profiling report as an HTML attachment.

        Raises a 404 when ``file_name`` is not a regular file located directly in ``profiling_folder``.
        """
        file_path = os.path.join(profiling_folder, file_name)

        # Reject names carrying path components and anything that is not a regular file
        # (a directory would otherwise make open() fail with a server error).
        if os.path.basename(file_name) != file_name or not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            content = profiling_file.read()

        return Response(
            content=content,
            # "application/html" is not a registered media type; HTML is "text/html".
            media_type="text/html",
            headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
        )
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """
    Log every request with its client address, method, path, status code and duration.

    The line is logged at WARNING level when the request took longer than
    ``REQUEST_TIME_WARNING`` seconds, at INFO level otherwise.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        # request.client can be None, so only read .host when a client is attached.
        client_ip = request.client.host if request.client else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response
@app.get("/")
async def slash():
    """Return the backend version under the ``twinpad_version`` key."""
    payload = {"twinpad_version": __version__}
    return payload
@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Return the result of the services check along with the current timestamp
    """
    services = ServicesStatus.check()
    now = time.time()
    return {"services": services, "timestamp": now}
@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices
@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id) -> Device:
    """Return the device matching ``device_id``; respond 404 when there is none."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")
@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Apply ``device_update`` to a device through ``change_mode`` on behalf of the current user.

    Responds 404 when the device is unknown; when the result dict flags an error,
    responds with its ``status_code`` (default 500) and ``message``.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    failed = outcome.get("error", False) is True
    if failed:
        raise HTTPException(
            status_code=outcome.get("status_code", 500),
            detail=outcome.get("message", "An error has occurred"),
        )
    return outcome
@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of one device, selected by ``query``."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states
@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return all device setups."""
    setups = DeviceSetup.get_all()
    return setups
@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and return it."""
    # The return value of insert() is not used here.
    device_setup.insert()
    return device_setup
@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return the device setup with the given id; respond 404 when it does not exist."""
    setup = DeviceSetup.get_from_id(device_setup_id)
    if setup is not None:
        return setup
    raise HTTPException(status_code=404, detail="Device setup not found")
@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """
    Update a device setup with the non-None fields of ``device_setup_update``.

    Responds 404 when the device setup does not exist.
    """
    setup = DeviceSetup.get_from_id(device_setup_id)
    if setup is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the payload are not forwarded to update().
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    setup.update(changes)
    return setup
@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return the result of ``delete()``; respond 404 when it does not exist."""
    setup = DeviceSetup.get_from_id(device_setup_id)
    if setup is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return setup.delete()
@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the sample-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching ``query``, without their ``device`` field."""
    # Append a signal_id sort criterion when the query does not already mention one.
    if "signal_id" not in query.sort_by:
        query.sort_by += ",signal_id:1"
    response = Signal.response_from_query(query)
    return response.to_dict(exclude={"device"})
@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    signal_ids = Signal.get_all_ids()
    return signal_ids
@app.get("/signals/statuses", dependencies=[Depends(get_current_active_user)])
async def signals_statuses() -> list[dict[str, str]]:
    """Return the statuses of all signals."""
    statuses = Signal.get_all_statuses()
    return statuses
@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return global signal statistics: data size, sample count, active and total signal counts
    """
    statuses = Signal.get_all_statuses()
    # A signal counts as active when its status is "up".
    active_count = len([entry for entry in statuses if entry["status"] == "up"])
    sample_count = Signal.total_number_samples()
    signal_count = Signal.get_number_documents()

    stats = {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": sample_count,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }
    return stats
@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return samples
@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return samples
@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility of the requested signals."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility
@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
async def get_forced_signals(
    current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
):
    """Return the forced signals matching ``query``; only admin users are allowed."""
    if current_user.is_admin:
        return ForcedSignal.response_from_query(query)
    # NOTE(review): the caller is authenticated but lacks the admin role; 403 may be
    # more appropriate than 401 here - confirm with API consumers before changing.
    raise HTTPException(401)
@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return the signal with the given ``signal_id`` as a dict; respond 404 when it is unknown."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")
@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Send ``signal_update`` to a signal as a command and keep its forcing record in sync.

    Responds 404 when the signal is unknown, 403 when the signal is forced by another
    user and the caller is not an admin, and with the command's own ``status_code``
    (default 500) when the command result flags an error.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            # The looked-up resource is a signal, not a device.
            detail="Signal not found",
        )

    forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
    forced_by_other_user = forced_signal is not None and forced_signal.forcing_user_id != current_user.id
    if forced_by_other_user and not current_user.is_admin:
        raise HTTPException(
            status_code=403,
            detail="Cannot override another user's forcing",
        )

    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )

    # The forcing records are only touched once the command has succeeded.
    if forced_signal is not None:
        # The existing record is either released (no forced value) or replaced below;
        # removing it first avoids keeping two forcing records for the same signal.
        forced_signal.delete()
    if signal_update.forced_value is not None:
        ForcedSignal(
            signal_id=signal_id,
            forcing_user_id=current_user.id,
            forced_at=time.time(),
            value=signal_update.forced_value,
        ).insert()

    return result
@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
async def get_signal_forcibility(
    signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
) -> bool:
    """Return whether the current user can force the given signal."""
    allowed = ForcedSignal.can_force(signal_id, current_user)
    return allowed
@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """
    Return the data of one signal, optionally bounded in time and desampled.

    When ``number_samples_max`` is given, the data goes through ``uniform_desampling``.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type allows "no data"; only desample when something was found.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data
@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal; respond 404 when there is none."""
    last_sample = SignalSample.get_last_from_signal_id(signal_id)
    if last_sample is not None:
        return last_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal; respond 404 when there is none."""
    first_sample = SignalSample.get_first_from_signal_id(signal_id)
    if first_sample is not None:
        return first_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the number of samples and the sample data size of a signal; respond 404 when it is unknown."""
    found = Signal.get_from_signal_id(signal_id)
    if not found:
        raise HTTPException(status_code=404, detail="Signal not found")

    sample_count = await found.number_samples()
    return {"signal_id": signal_id, "number_samples": sample_count, "size": found.sample_datasize()}
@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """
    Return the data of several signals, optionally bounded in time and desampled.

    Responds 400 when both timestamps are given and ``min_timestamp`` is greater than
    ``max_timestamp``. The same bounds are used as the window bounds of the query.
    """
    # Compare against None rather than truthiness so that a timestamp of 0 is still validated.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The declared return type allows "no data"; only desample when something was found.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data
@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    signal_ids: list[str] = Query(default=[]),
    window_max_number_samples: int | None = None,
    outside_max_number_samples: int | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """
    Return the data of several signals, desampled separately inside and outside an interest window.

    Responds 400 when a minimum bound is greater than its maximum counterpart
    (for the window bounds as well as for the global bounds).
    """
    # Compare against None rather than truthiness so that a timestamp of 0 is still validated.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    # Document budget: 10 documents per requested sample; None when no budget was requested.
    requested_samples = [
        amount for amount in (window_max_number_samples, outside_max_number_samples) if amount is not None
    ]
    max_documents = 10 * sum(requested_samples) or None

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=max_documents,
    )

    # The declared return type allows "no data"; nothing to desample in that case.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the data of the requested signals as a zip attachment named ``signals.zip``."""
    # Bounds are not interpolated for exports.
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = fetched.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the data of the requested signals as an HDF5 attachment named ``signals.hdf5``."""
    # Bounds are not interpolated for exports.
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = fetched.hdf5_export()
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_post_processing(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
    number_samples_max: int = None,
) -> SignalsData | None:
    """
    Return signals data gathered over several phases for post-processing.

    Empty per-phase lists are replaced by one ``None`` per phase. Responds 400 when a
    per-phase list does not have one entry per phase, and 404 when a phase is unknown.
    """
    phase_count = len(phase_ids)

    # An omitted per-phase list means "no value" for every phase.
    if not phase_sync_times:
        phase_sync_times = [None] * phase_count
    if not window_min_timestamps:
        window_min_timestamps = [None] * phase_count
    if not window_max_timestamps:
        window_max_timestamps = [None] * phase_count

    per_phase_lengths = {len(phase_sync_times), len(window_min_timestamps), len(window_max_timestamps)}
    if per_phase_lengths != {phase_count}:
        raise HTTPException(
            400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
        )

    phases = []
    for phase_id in phase_ids:
        phases.append(Phase.get_from_id(phase_id))

    if any(phase is None for phase in phases):
        raise HTTPException(404, "Phase not found")

    signals_data = SignalsData.get_from_phase_and_signal_ids(
        phases=phases,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )

    if number_samples_max is None:
        return signals_data
    return signals_data.min_max_downsampling(number_samples_max)
@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
async def apply_single_post_processing_function(
    phase_id: str,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    phase_sync_time: float | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    number_samples_max: int | None = None,
) -> SignalsData | None:
    """
    Apply a single-signal post-processing function to one signal of a phase.

    ``phase_sync_time`` defaults to ``phase.start_at / 1000``. The result is downsampled
    when it holds more samples than ``number_samples_max``, then its time vector is
    zeroed on the synchronization time. Responds 404 when the phase is unknown and
    500 when the function returns no result.
    """
    phase = Phase.get_from_id(phase_id)

    if phase is None:
        raise HTTPException(404, "Phase not found")
    if phase_sync_time is None:
        phase_sync_time = phase.start_at / 1000

    signals_data = await SignalsData.apply_single_function(
        phase,
        base_signal_id,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

    if signals_data is None:
        raise HTTPException(500, "There was an error while applying the function")

    # Guard against an empty result before reading the first signal's time vector.
    needs_downsampling = (
        number_samples_max is not None
        and bool(signals_data.signals_data)
        and number_samples_max < len(signals_data.signals_data[0].time_vector)
    )
    if needs_downsampling:
        signals_data = signals_data.min_max_downsampling(number_samples_max)
    signals_data = signals_data.zero_time_vector(phase_sync_time)

    return signals_data
@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
async def apply_multiple_post_processing_function(
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    number_samples_max: int | None = None,
) -> SignalsData | None:
    """
    Apply a multi-signal post-processing function to signals taken from several phases.

    Each signal must be paired with a phase and at least two signals are required.
    ``phase_sync_times`` defaults to ``phase.start_at / 1000`` for every phase. Responds
    400 on inconsistent list lengths, 404 when a phase is unknown and 500 when the
    function returns no result.
    """
    if len(phase_ids) != len(signal_ids):
        raise HTTPException(400, "Each selected signal should correspond to a phase")

    if len(signal_ids) < 2:
        raise HTTPException(400, "These functions can only be applied to multiple signals")

    if phase_sync_times and len(phase_ids) != len(phase_sync_times):
        raise HTTPException(400, "Number of synchronization times does not match the number of phases")

    phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
    if None in phases:
        raise HTTPException(404, "Phase not found")

    if not phase_sync_times:
        phase_sync_times = [phase.start_at / 1000 for phase in phases]

    signals_data = await SignalsData.apply_multiple_function(
        phases,
        signal_ids,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )

    if signals_data is None:
        raise HTTPException(500, "There was an error while applying the function")

    # Guard against an empty result before reading the first signal's time vector.
    needs_downsampling = (
        number_samples_max is not None
        and bool(signals_data.signals_data)
        and number_samples_max < len(signals_data.signals_data[0].time_vector)
    )
    if needs_downsampling:
        signals_data = signals_data.min_max_downsampling(number_samples_max)

    # "Align-X" and "Using-X" zero the time vector on the second phase's sync time,
    # every other function on the first one (at least two phases are guaranteed above).
    sync_index = 1 if function in {"Align-X", "Using-X"} else 0
    signals_data = signals_data.zero_time_vector(phase_sync_times[sync_index])

    return signals_data
@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_zip(
    file_format: str,
    phase_ids: list[MongoId] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as a zip attachment named ``signals.zip``."""
    # Reuse the post-processing route function to gather (and validate) the data.
    gathered = await get_signals_data_post_processing(
        phase_ids, phase_sync_times, signal_ids, window_min_timestamps, window_max_timestamps
    )

    archive = gathered.zip_export(file_format, post_processing=True, phase_ids=phase_ids)

    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_hdf5(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as an HDF5 attachment named ``signals.hdf5``."""
    # Reuse the post-processing route function to gather (and validate) the data.
    gathered = await get_signals_data_post_processing(
        phase_ids, phase_sync_times, signal_ids, window_min_timestamps, window_max_timestamps
    )

    payload = gathered.hdf5_export(post_processing=True, phase_ids=phase_ids)

    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching ``query``."""
    events = Event.response_from_query(query)
    return events
@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return the event with the given id; respond 404 when it does not exist."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")
@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the event-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the command-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching ``query``."""
    rules = EventRule.response_from_query(query)
    return rules
@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return the event rule with the given id; respond 404 when it does not exist."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")
@app.post("/users", status_code=201)
async def create_user(user: User):
    """
    Create a user account from the posted data, storing a hash of the password.

    Responds 400 when the email is already used or when the creation returns nothing.
    """
    # NOTE(review): this route declares no authentication dependency and forwards the
    # caller-supplied is_admin flag - confirm that open admin creation is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # bool() replaces the bitwise "user.is_admin | False", which raises TypeError when is_admin is None.
    is_admin = bool(user.is_admin)
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:  # pragma: no cover
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user
@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """
    Authenticate with username and password and return a bearer access token.

    Responds 401 on bad credentials and 402 when the user is blocked.
    """
    bearer_challenge = {"WWW-authenticate": "Bearer"}

    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers=bearer_challenge)

    lifetime = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    if user.is_blocked:
        # NOTE(review): 402 is "Payment Required"; 403 looks more appropriate for a
        # blocked account - confirm with API consumers before changing.
        raise HTTPException(status_code=402, detail="User Blocked", headers=bearer_challenge)

    claims = {"sub": user.email, "admin": user.is_admin}
    access_token = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=access_token, token_type="bearer")
@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users as dicts, sorted by email."""
    serialized = []
    for account in User.get_all(sort_by="email"):
        serialized.append(account.to_dict())
    return serialized
@app.get("/users/me")
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user as a dict."""
    profile = current_user.to_dict()
    return profile
@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return a user as a dict without ``password`` and ``is_connected``; respond 404 when unknown."""
    account = User.get_from_id(user_id)

    if account is not None:
        return account.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")
@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """
    Update a user and return the updated user as a dict.

    An empty or missing password is removed from the update; otherwise the password
    is replaced by its hash before the update. Responds 404 when the update
    returns no user.
    """
    if user.password is None or user.password == "":
        del user.password
    else:
        user.password = get_password_hash(user.password)

    updated_user = User.update_info(user, user_id)
    if updated_user is None:
        # Avoid an AttributeError on .to_dict() when nothing comes back from the update.
        raise HTTPException(status_code=404, detail="User not found")
    return updated_user.to_dict()
@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching ``query``."""
    commands = Command.response_from_query(query)
    return commands
@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return all campaigns."""
    campaigns = Campaign.get_all()
    return campaigns
@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return the campaign with the given id; respond 404 when it does not exist."""
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Campaign not found")
@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Insert the posted campaign and return it; respond 500 when the insertion yields no id."""
    inserted_id = campaign.insert()
    if inserted_id is not None:
        return campaign
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")  # pragma: no cover
@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Update a campaign with the fields explicitly set in the payload; respond 404 when it does not exist."""
    stored = Campaign.get_from_id(campaign_id)
    if stored is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Only fields explicitly set in the payload are forwarded, dumped in JSON mode.
    changes = edit_campaign.model_dump(exclude_unset=True, mode="json")
    stored.update(changes)
    return stored
@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """
    Delete a campaign after deleting its phases, and return ``True``.

    Responds 404 when the campaign does not exist and 500 when either deletion fails.
    """
    stored = Campaign.get_from_id(campaign_id)
    if stored is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Phases go first; the campaign is only deleted once that operation is acknowledged.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phases deletion")

    if not stored.delete():  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")
    return True
@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches the given campaign."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases
@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return the phase with the given id; respond 404 when it does not exist."""
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")
@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Insert the posted phase and return it; respond 500 when the insertion yields no id."""
    inserted_id = phase.insert()
    if inserted_id is not None:
        return phase
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")  # pragma: no cover
@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Update a phase with the fields explicitly set in the payload; respond 404 when it does not exist."""
    stored = Phase.get_from_id(phase_id)
    if stored is None:
        raise HTTPException(status_code=404, detail="Phase does not exists")

    # Only fields explicitly set in the payload are forwarded, dumped in JSON mode.
    changes = edit_phase.model_dump(exclude_unset=True, mode="json")
    stored.update(changes)
    return stored
@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete a phase and return ``True``; respond 404 when it does not exist and 500 when the deletion fails."""
    stored = Phase.get_from_id(phase_id)
    if stored is None:
        raise HTTPException(status_code=404, detail="Phase not found")

    if not stored.delete():  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phase deletion")
    return True
@app.get(
    "/custom-views",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_views():
    """Return the result of ``CustomView.get_all()``."""
    all_views = CustomView.get_all()
    return all_views
@app.get(
    "/users/{user_id}/custom-views",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` attribute matches the given user ID."""
    user_views = CustomView.get_by_attribute("user_id", user_id)
    return user_views
@app.get(
    "/custom-views/{custom_view_id}",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_view(custom_view_id: str):
    """Return the custom view looked up by its ID."""
    # NOTE(review): unlike get_phase, no 404 is raised here; whatever
    # get_from_id returns for an unknown ID is sent back as-is. Confirm that
    # clients rely on this before aligning the behaviour.
    custom_view = CustomView.get_from_id(custom_view_id)
    return custom_view
@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Build a custom view from the request body, owned by the calling user, and insert it.

    Returns:
        The newly built ``CustomView`` instance.
    """
    view_fields = custom_view_creation.to_dict()
    # The owner is always the authenticated caller.
    custom_view = CustomView(**view_fields, user_id=current_user.id)
    custom_view.insert()
    return custom_view
@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update an existing custom view with the values from the request body.

    Args:
        custom_view_id: ID of the custom view to modify (path parameter).
        custom_view_update: Request body; dumped in full and passed to ``update``.

    Returns:
        Whatever ``CustomView.update`` returns.

    Raises:
        HTTPException: 404 when no custom view is found for ``custom_view_id``.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard an unknown ID raised AttributeError and surfaced as a 500.
        raise HTTPException(status_code=404, detail="Custom view not found")
    # NOTE(review): unlike update_signals_preset, this dump does not pass
    # exclude_unset=True, so fields the client omitted are sent to update()
    # too. Confirm that this is intended for a PATCH.
    return custom_view.update(custom_view_update.model_dump())
@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete the custom view with the given ID.

    Returns:
        The result of ``CustomView.delete`` (serialized as a boolean).

    Raises:
        HTTPException: 404 when no custom view is found for ``custom_view_id``.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        # Without this guard an unknown ID raised AttributeError and surfaced as a 500.
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()
@app.post(
    "/videos",
    response_model=Video,
    dependencies=[Depends(get_current_active_user)],
)
async def add_video(video: Video):
    """Insert the submitted video and return it.

    Raises:
        HTTPException: 500 when the ``video`` object evaluates as falsy after
            the insert.
    """
    video.insert()
    # NOTE(review): this tests the request model itself, not the value
    # returned by insert() (compare add_phase, which checks the insert
    # result). Presumably the insert result was meant; verify against
    # Video.insert before changing.
    if not video:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
    return video
@app.get(
    "/videos",
    dependencies=[Depends(get_current_active_user)],
)
async def get_videos():
    """Return the result of ``Video.get_all()``."""
    all_videos = Video.get_all()
    return all_videos
@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id: str):
    """Return what ``Video.get_video`` yields for the given video ID.

    Args:
        video_id: ID of the requested video (path parameter).

    Raises:
        HTTPException: 404 when the lookup returns ``None``.
    """
    # Declared with plain ``def`` (not ``async def``) as in the original; kept unchanged.
    camera_name = Video.get_video(video_id)
    if camera_name is None:
        raise HTTPException(status_code=404, detail="Camera not found")
    return camera_name
@app.get(
    "/signals-presets",
    dependencies=[Depends(get_current_active_user)],
)
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose ``user_id`` attribute matches the calling user."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)
@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Create a signals preset from the request body for the calling user and return it."""
    return SignalsPreset.create(
        signals_preset=signals_preset,
        user_id=current_user.id,
    )
@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Apply the explicitly-set fields of the request body to an existing signals preset.

    Args:
        signals_preset_id: ID of the preset to modify (path parameter).
        signals_preset_update: Request body; only fields the client actually
            sent are forwarded (``exclude_unset=True``).

    Returns:
        Whatever ``SignalsPreset.update`` returns.

    Raises:
        HTTPException: 404 when no preset is found for ``signals_preset_id``.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        # Without this guard an unknown ID raised AttributeError and surfaced as a 500.
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete the signals preset with the given ID.

    Returns:
        The result of ``SignalsPreset.delete`` (serialized as a boolean).

    Raises:
        HTTPException: 404 when no preset is found for ``signals_preset_id``.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        # Without this guard an unknown ID raised AttributeError and surfaced as a 500.
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()
@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Create a graph theme for an existing signal, with the caller as creator.

    Returns:
        The new theme, re-read through ``PublicGraphTheme.get_from_id`` for
        the calling user.

    Raises:
        HTTPException: 400 when no signal matches the body's ``signal_id``.
    """
    target_signal_id = graph_theme_creation.signal_id
    if Signal.get_one_by_attribute("signal_id", target_signal_id) is None:
        raise HTTPException(400, f"Signal ID '{target_signal_id}' doesn't exist")

    # The "id" field of the creation payload is left out of the dump.
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created_theme = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created_theme.id, current_user.id)
@app.get(
    "/graph-themes",
    dependencies=[Depends(get_current_active_user)],
)
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching ``query``, resolved for the calling user."""
    caller_id = current_user.id
    return PublicGraphTheme.response_from_query(query, caller_id)
@app.get(
    "/graph-themes/own",
    dependencies=[Depends(get_current_active_user)],
)
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching ``query`` within the calling user's library."""
    caller_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, caller_id)
@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update a graph theme on behalf of the calling user.

    The theme's creator may change any submitted property. Any other user may
    only change ``active_for_user`` and ``in_user_library``.

    Args:
        theme_id: ID of the theme to modify (path parameter).
        theme_update: Request body; only fields the client actually sent are
            applied (``exclude_unset=True``).
        current_user: The authenticated caller.

    Returns:
        The theme re-read through ``PublicGraphTheme.get_from_id`` for the caller.

    Raises:
        HTTPException: 404 when no theme is found for ``theme_id``; 401 when a
            non-creator submits a property outside the two allowed ones.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    if graph_theme is None:
        # Without this guard an unknown ID raised AttributeError and surfaced as a 500.
        raise HTTPException(status_code=404, detail="Graph theme not found")

    # Dumped once and reused for both the permission check and the update.
    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        allowed_for_others = ("active_for_user", "in_user_library")
        if any(theme_property not in allowed_for_others for theme_property in update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete a graph theme; only its creator is allowed to do so.

    Args:
        graph_theme_id: ID of the theme to delete (path parameter).
        current_user: The authenticated caller.

    Returns:
        The result of ``PrivateGraphTheme.delete`` (serialized as a boolean).

    Raises:
        HTTPException: 404 when no theme is found for ``graph_theme_id``; 401
            when the caller is not the theme's creator.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    if graph_theme is None:
        # Without this guard an unknown ID raised AttributeError and surfaced as a 500.
        raise HTTPException(status_code=404, detail="Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()
@app.get(
    "/signals-appearances",
    dependencies=[Depends(get_current_active_user)],
)
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]),
    current_user: User = Depends(get_current_active_user),
) -> dict:
    """Return the signal appearances for the requested signal IDs, resolved for the calling user.

    ``signal_ids`` is read from the query string and defaults to an empty list.
    """
    caller_id = current_user.id
    return PublicGraphTheme.get_signal_appearances(signal_ids, caller_id)
# Mount the device-deployers routes under /device-deployers only when the
# DEVICE_DEPLOYERS flag is truthy; otherwise those endpoints are not registered.
if DEVICE_DEPLOYERS:
    app.include_router(deployers_router, prefix="/device-deployers")