Coverage for / usr / local / lib / python3.11 / site-packages / twinpad_backend / api.py: 96%
492 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-21 08:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-21 08:45 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 ForcedSignal,
20 SignalData,
21 SignalSample,
22 ServicesStatus,
23 Device,
24 DeviceUpdate,
25 DeviceSetup,
26 DeviceSetupUpdate,
27 DeviceState,
28 SignalUpdate,
29 SignalsData,
30 Event,
31 EventRule,
32 TwinPadActivity,
33 User,
34 UserUpdate,
35 Campaign,
36 Phase,
37 CustomView,
38 Command,
39 CustomViewCreation,
40 CustomViewUpdate,
41 Video,
42 SignalsPreset,
43 SignalsPresetCreation,
44 SignalsPresetUpdate,
45 PrivateGraphTheme,
46 PublicGraphTheme,
47 GraphThemeCreation,
48 GraphThemeUpdate,
49 SINGLE_POST_PROCESSING_FUNCTION,
50 DOUBLE_POST_PROCESSING_FUNCTION,
51 MULTIPLE_POST_PROCESSING_FUNCTION,
52)
53from twinpad_backend.auth import (
54 Token,
55 authenticate_user,
56 get_current_active_user,
57 ACCESS_TOKEN_EXPIRE_MINUTES,
58 create_access_token,
59 get_password_hash,
60)
61from twinpad_backend.queries import (
62 SignalQuery,
63 ForcedSignalQuery,
64 DeviceStatesQuery,
65 EventQuery,
66 EventRuleQuery,
67 CommandQuery,
68 GraphThemeQuery,
69)
70from twinpad_backend.responses import ListResponse
72REQUEST_TIME_WARNING = 0.5
74DEBUG = os.environ.get("DEBUG", "false") == "true"
75PROFILING = os.environ.get("PROFILING", "false") == "true"
77logger = logging.getLogger("uvicorn.error")
78logger.propagate = False
79logger.info("Debug mode: %s", DEBUG)
80logger.info("log level: %s", logging.root.level)
83app = FastAPI(title="Twinpad backend", version=__version__)
85app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
87if PROFILING: # pragma: no cover
88 profiling_folder = mkdtemp()
89 logger.info("Profiling enabled")
91 @app.middleware("http")
92 async def profile_request(request: Request, call_next):
93 should_profile = True
94 url = str(request.url)
95 for segment in ("profiling", ".ico"):
96 if segment in url:
97 should_profile = False
98 break
100 if should_profile: # avoid recursion
101 profiler = Profiler()
102 profiler.start()
104 response = await call_next(request)
106 profiler.stop()
107 url = "_".join(url.split("/")[3:]).rstrip("/")
108 if not url:
109 url = "slash"
110 filename = url.split("?", maxsplit=1)[0]
111 logger.info("saving profiling to %s", filename)
112 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
113 profiling_file.write(profiler.output_html())
115 return response
117 return await call_next(request)
119 @app.get("/profilings")
120 async def profilings():
121 return {"profilings": os.listdir(profiling_folder)}
123 @app.get("/profilings/{file_name}")
124 async def profiling(file_name):
125 file_path = os.path.join(profiling_folder, file_name)
127 if not os.path.exists(file_path):
128 raise HTTPException(
129 status_code=404,
130 detail=f"Profiling file '{file_name}' not found",
131 )
133 with open(file_path, "r", encoding="utf-8") as profiling_file:
134 return Response(
135 content=profiling_file.read(),
136 media_type="application/html",
137 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
138 )
141@app.middleware("http")
142async def log_request_time(request: Request, call_next):
143 start_time = time.time() # Record the start time
144 response = await call_next(request) # Process the request
145 duration = time.time() - start_time # Calculate the time taken
146 client_ip = request.headers.get("x-forwarded-for", request.client.host)
147 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
148 if duration > REQUEST_TIME_WARNING:
149 logger.warning(message)
150 else:
151 logger.info(message)
152 return response
155@app.get("/")
156async def slash():
157 return {"twinpad_version": __version__}
160@app.get("/status", dependencies=[Depends(get_current_active_user)])
161async def status():
162 """
163 Return service healthcheck
164 """
165 return {
166 "services": ServicesStatus.check(),
167 }
170@app.get("/devices", dependencies=[Depends(get_current_active_user)])
171async def get_devices() -> list[Device]:
172 return Device.get_all(sort_by="device_id")
175@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
176async def get_device(device_id):
177 device = Device.get_one_by_attribute("device_id", device_id)
178 if not device:
179 raise HTTPException(
180 status_code=404,
181 detail="Device not found",
182 )
183 return device
186@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
187async def update_item(
188 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
189):
190 device = Device.get_one_by_attribute("device_id", device_id)
191 if not device:
192 raise HTTPException(
193 status_code=404,
194 detail="Device not found",
195 )
196 result = await device.change_mode(device_update, current_user)
197 if result.get("error", False) is True:
198 raise HTTPException(
199 status_code=result.get("status_code", 500),
200 detail=result.get("message", "An error has occurred"),
201 )
202 return result
205@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
206async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
207 return DeviceState.get_from_id_and_query(device_id, query)
210@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
211async def get_device_setups() -> list[DeviceSetup]:
212 return DeviceSetup.get_all()
215@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
216async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
217 device_setup.insert()
218 return device_setup
221@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
222async def get_device_setup(device_setup_id: str):
223 device_setup = DeviceSetup.get_from_id(device_setup_id)
224 if device_setup is None:
225 raise HTTPException(
226 status_code=404,
227 detail="Device setup not found",
228 )
229 return device_setup
232@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
233async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
234 device_setup = DeviceSetup.get_from_id(device_setup_id)
235 if device_setup is None:
236 raise HTTPException(
237 status_code=404,
238 detail="Device setup not found",
239 )
240 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
241 return device_setup
244@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
245async def delete_device_setups(device_setup_id: str) -> bool:
246 device_setup = DeviceSetup.get_from_id(device_setup_id)
247 if device_setup is None:
248 raise HTTPException(
249 status_code=404,
250 detail="Device setup not found",
251 )
252 deleted = device_setup.delete()
253 return deleted
256@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
257async def get_number_samples(
258 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
259) -> list[TwinPadActivity]:
260 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
263@app.get("/signals", dependencies=[Depends(get_current_active_user)])
264async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
265 if "signal_id" not in query.sort_by:
266 query.sort_by += ",signal_id:1"
267 return Signal.response_from_query(query).to_dict(exclude={"device"})
270@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
271async def signals_names() -> list[str]:
272 return Signal.get_all_ids()
275@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
276async def signal_stats():
277 """
278 Returns signals stats
279 """
280 signal_statuses = Signal.get_all_statuses()
281 signal_ids = [signal["signal_id"] for signal in signal_statuses]
283 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids)
284 number_samples = sum(number_samples_by_signal_id.values())
286 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up")
288 number_signals = Signal.get_number_documents()
290 return {
291 "signal_data_size": signal_datasize(),
292 "number_signal_samples": number_samples,
293 "number_active_signals": number_active_signals,
294 "number_signals": number_signals,
295 }
298@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
299async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
300 return SignalSample.get_last_from_signal_ids(signal_ids)
303@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
304async def get_last_values_interest_window(
305 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
306) -> list[SignalSample | None]:
307 return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
310@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
311async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
312 return SignalSample.get_first_from_signal_ids(signal_ids)
315@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
316async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
317 return Signal.get_forcibility(signal_ids)
320@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
321async def get_forced_signals(
322 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
323):
324 if not current_user.is_admin:
325 raise HTTPException(401)
326 return ForcedSignal.response_from_query(query)
329@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
330async def get_signal(signal_id):
331 signal = Signal.get_from_signal_id(signal_id)
332 if not signal:
333 raise HTTPException(
334 status_code=404,
335 detail="Signal not found",
336 )
337 return signal.to_dict()
340@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
341async def update_signal(
342 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
343):
344 signal = Signal.get_from_signal_id(signal_id)
345 if not signal:
346 raise HTTPException(
347 status_code=404,
348 detail="Device not found",
349 )
350 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
351 if forced_signal is not None:
352 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
353 raise HTTPException(
354 status_code=403,
355 detail="Cannot override another user's forcing",
356 )
358 result = await signal.send_command(signal_update, current_user)
359 if result.get("error", False) is True:
360 raise HTTPException(
361 status_code=result.get("status_code", 500),
362 detail=result.get("message", "An error has occurred"),
363 )
365 if forced_signal is not None and signal_update.forced_value is None:
366 forced_signal.delete()
367 elif signal_update.forced_value is not None:
368 forced_signal = ForcedSignal(
369 signal_id=signal_id,
370 forcing_user_id=current_user.id,
371 forced_at=time.time(),
372 value=signal_update.forced_value,
373 )
374 forced_signal.insert()
376 return result
379@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
380async def get_signal_forcibility(
381 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
382) -> bool:
383 return ForcedSignal.can_force(signal_id, current_user)
386@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
387async def get_signal_data(
388 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
389) -> SignalData | None:
390 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
392 if number_samples_max is not None:
393 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
395 return signal_data
398@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
399async def get_last_value(signal_id) -> SignalSample:
400 sample = SignalSample.get_last_from_signal_id(signal_id)
401 if sample is None:
402 raise HTTPException(status_code=404, detail="No data")
403 return sample
406@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
407async def get_first_value(signal_id) -> SignalSample:
408 sample = SignalSample.get_first_from_signal_id(signal_id)
409 if sample is None:
410 raise HTTPException(status_code=404, detail="No data")
411 return sample
414@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
415async def get_signal_number_samples(signal_id):
416 signal = Signal.get_from_signal_id(signal_id)
417 if not signal:
418 raise HTTPException(
419 status_code=404,
420 detail="Device not found",
421 )
422 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
425@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
426async def get_signals_data(
427 signal_ids: list[str] = Query(default=[]),
428 number_samples_max: int = None,
429 min_timestamp: float = None,
430 max_timestamp: float = None,
431 interpolate_bounds: bool = True,
432) -> SignalsData | None:
433 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
434 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
436 signals_data = SignalsData.get_from_signal_ids(
437 signal_ids,
438 min_timestamp=min_timestamp,
439 max_timestamp=max_timestamp,
440 window_min_timestamp=min_timestamp,
441 window_max_timestamp=max_timestamp,
442 interpolate_bounds=interpolate_bounds,
443 )
444 if number_samples_max is not None:
445 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
447 return signals_data
450@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
451async def get_signals_data_interest_window(
452 window_max_number_samples: int | None = None,
453 outside_max_number_samples: int | None = None,
454 window_min_timestamp: float = None,
455 window_max_timestamp: float = None,
456 signal_ids: list[str] = Query(default=[]),
457 min_timestamp: float = None,
458 max_timestamp: float = None,
459) -> SignalsData | None:
460 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
461 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
463 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
464 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
466 max_documents = 0
468 if window_max_number_samples is not None:
469 max_documents += 10 * window_max_number_samples
470 if outside_max_number_samples is not None:
471 max_documents += 10 * outside_max_number_samples
473 if max_documents == 0:
474 max_documents = None
476 signals_data = SignalsData.get_from_signal_ids(
477 signal_ids,
478 min_timestamp=min_timestamp,
479 max_timestamp=max_timestamp,
480 window_min_timestamp=window_min_timestamp,
481 window_max_timestamp=window_max_timestamp,
482 max_documents=max_documents,
483 )
485 signals_data = signals_data.interest_window_desampling(
486 window_max_number_samples=window_max_number_samples,
487 outside_max_number_samples=outside_max_number_samples,
488 window_min_timestamp=window_min_timestamp,
489 window_max_timestamp=window_max_timestamp,
490 )
492 return signals_data
495@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
496async def export_signals_zip(
497 file_format: str,
498 signal_ids: list[str] = Query(default=[]),
499 min_timestamp: float = None,
500 max_timestamp: float = None,
501):
502 signals_data = SignalsData.get_from_signal_ids(
503 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
504 )
505 zip_data = signals_data.zip_export(file_format)
506 return Response(
507 content=zip_data,
508 media_type="application/zip",
509 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
510 )
513@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
514async def export_signals_hdf5(
515 signal_ids: list[str] = Query(default=[]),
516 min_timestamp: float = None,
517 max_timestamp: float = None,
518):
519 signals_data = SignalsData.get_from_signal_ids(
520 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
521 )
522 data = signals_data.hdf5_export()
523 return Response(
524 content=data,
525 media_type="application/hdf5",
526 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
527 )
530@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
531async def get_signals_data_post_processing(
532 phase_ids: list[str] = Query(default=[]),
533 phase_sync_times: list[float | None] = Query(default=[]),
534 signal_ids: list[str] = Query(default=[]),
535 window_min_timestamps: list[float | None] = Query(default=[]),
536 window_max_timestamps: list[float | None] = Query(default=[]),
537 number_samples_max: int = None,
538) -> SignalsData | None:
539 if len(phase_sync_times) == 0:
540 phase_sync_times = [None for _ in range(len(phase_ids))]
541 if len(window_min_timestamps) == 0:
542 window_min_timestamps = [None for _ in range(len(phase_ids))]
543 if len(window_max_timestamps) == 0:
544 window_max_timestamps = [None for _ in range(len(phase_ids))]
546 if (
547 len(phase_ids) != len(phase_sync_times)
548 or len(phase_ids) != len(window_min_timestamps)
549 or len(phase_ids) != len(window_max_timestamps)
550 ):
551 raise HTTPException(
552 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
553 )
555 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
557 if None in phases:
558 raise HTTPException(404, "Phase not found")
560 signals_data = SignalsData.get_from_phase_and_signal_ids(
561 phases=phases,
562 phase_sync_times=phase_sync_times,
563 signal_ids=signal_ids,
564 window_min_timestamps=window_min_timestamps,
565 window_max_timestamps=window_max_timestamps,
566 )
568 if number_samples_max is not None:
569 signals_data = signals_data.min_max_downsampling(number_samples_max)
571 return signals_data
574@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
575async def apply_single_post_processing_function(
576 phase_id: str,
577 base_signal_id: str,
578 function: SINGLE_POST_PROCESSING_FUNCTION,
579 phase_sync_time: float = None,
580 window_min_timestamp: float = None,
581 window_max_timestamp: float = None,
582 number_samples_max: int = None,
583) -> SignalsData | None:
584 phase = Phase.get_from_id(phase_id)
586 if phase is None:
587 raise HTTPException(404, "Phase not found")
588 if phase_sync_time is None:
589 phase_sync_time = phase.start_at / 1000
591 signals_data = await SignalsData.apply_single_function(
592 phase,
593 base_signal_id,
594 function,
595 window_min_timestamp=window_min_timestamp,
596 window_max_timestamp=window_max_timestamp,
597 )
599 if signals_data is None:
600 raise HTTPException(500, "There was en error while applying the function")
602 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
603 signals_data = signals_data.min_max_downsampling(number_samples_max)
604 signals_data = signals_data.zero_time_vector(phase_sync_time)
606 return signals_data
609@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
610async def apply_multiple_post_processing_function(
611 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
612 phase_ids: list[str] = Query(default=[]),
613 phase_sync_times: list[float] = Query(default=[]),
614 signal_ids: list[str] = Query(default=[]),
615 window_min_timestamp: float = None,
616 window_max_timestamp: float = None,
617 number_samples_max: int = None,
618) -> SignalsData | None:
619 if len(phase_ids) != len(signal_ids):
620 raise HTTPException(400, "Each selected signal should correspond to a phase")
622 if len(phase_ids) < 2:
623 raise HTTPException(400, "These functions can only be applied to multiple signals")
625 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
626 if None in phases:
627 raise HTTPException(404, "Phase not found")
629 if len(phase_sync_times) == 0:
630 phase_sync_times = [phase.start_at / 1000 for phase in phases]
631 if len(phases) != len(phase_sync_times):
632 raise HTTPException(400, "Number of synchronization times does not match the number of phases")
634 signals_data = await SignalsData.apply_multiple_function(
635 phases,
636 signal_ids,
637 function,
638 window_min_timestamp=window_min_timestamp,
639 window_max_timestamp=window_max_timestamp,
640 )
642 if signals_data is None:
643 raise HTTPException(500, "There was en error while applying the function")
645 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
646 signals_data = signals_data.min_max_downsampling(number_samples_max)
647 if function in {"Align-X", "Using-X"}:
648 signals_data = signals_data.zero_time_vector(phase_sync_times[1])
649 else:
650 signals_data = signals_data.zero_time_vector(phase_sync_times[0])
652 return signals_data
655@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
656async def export_post_processing_zip(
657 file_format: str,
658 phase_ids: list[str] = Query(default=[]),
659 phase_sync_times: list[float | None] = Query(default=[]),
660 signal_ids: list[str] = Query(default=[]),
661 window_min_timestamps: list[float | None] = Query(default=[]),
662 window_max_timestamps: list[float | None] = Query(default=[]),
663):
664 signals_data = await get_signals_data_post_processing(
665 phase_ids,
666 phase_sync_times,
667 signal_ids,
668 window_min_timestamps,
669 window_max_timestamps,
670 )
672 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids)
674 return Response(
675 content=zip_data,
676 media_type="application/zip",
677 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
678 )
681@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
682async def export_post_processing_hdf5(
683 phase_ids: list[str] = Query(default=[]),
684 phase_sync_times: list[float | None] = Query(default=[]),
685 signal_ids: list[str] = Query(default=[]),
686 window_min_timestamps: list[float | None] = Query(default=[]),
687 window_max_timestamps: list[float | None] = Query(default=[]),
688):
689 signals_data = await get_signals_data_post_processing(
690 phase_ids,
691 phase_sync_times,
692 signal_ids,
693 window_min_timestamps,
694 window_max_timestamps,
695 )
697 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids)
699 return Response(
700 content=data,
701 media_type="application/hdf5",
702 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
703 )
706@app.get("/events", dependencies=[Depends(get_current_active_user)])
707async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
708 return Event.response_from_query(query)
711@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
712async def get_event(event_id) -> Event:
713 event = Event.get_from_id(event_id)
714 if event is None:
715 raise HTTPException(status_code=404, detail="No such event")
716 return event
719@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
720async def get_number_events(
721 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
722) -> list[TwinPadActivity]:
723 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
726@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
727async def get_number_commands(
728 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
729) -> list[TwinPadActivity]:
730 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
733@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
734async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
735 return EventRule.response_from_query(query)
738@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
739async def get_event_rule(event_rule_id) -> EventRule:
740 event_rule = EventRule.get_from_id(event_rule_id)
741 if event_rule is None:
742 raise HTTPException(status_code=404, detail="No such event rule")
743 return event_rule
746@app.post("/users", status_code=201)
747async def create_user(user: User):
748 if User.get_one_by_attribute("email", user.email) is not None:
749 raise HTTPException(status_code=400, detail="An error occurred during account creation")
750 hashed_password = get_password_hash(user.password)
751 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
752 if new_user is None:
753 raise HTTPException(status_code=400, detail="An error occurred during account creation")
754 return new_user
757@app.post("/token", status_code=201)
758async def login_for_access_token(
759 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
760) -> Token:
761 user = authenticate_user(form_data.username, form_data.password)
762 if not user:
763 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
764 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
765 if user.is_active:
766 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
767 access_token = create_access_token(
768 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
769 )
770 return Token(access_token=access_token, token_type="bearer")
773@app.get("/users", dependencies=[Depends(get_current_active_user)])
774async def get_users():
775 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
778@app.get("/users/me", response_model=User)
779async def get_current_user(
780 current_user: Annotated[User, Depends(get_current_active_user)],
781):
782 del current_user.password
783 return current_user
786@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
787async def get_user(user_id: str):
788 user = User.get_from_id(user_id)
790 if user is None:
791 raise HTTPException(
792 status_code=404,
793 detail="User not found",
794 )
795 return user.to_dict(exclude={"password", "is_connected"})
798@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
799async def patch_user(user: UserUpdate, user_id):
800 if user.password == "" or user.password is None:
801 del user.password
802 else:
803 user.password = get_password_hash(user.password)
804 return User.update_info(user, user_id)
807@app.get("/commands", dependencies=[Depends(get_current_active_user)])
808async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
809 return Command.response_from_query(query)
812@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
813async def get_campaigns():
814 return Campaign.get_all()
817@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
818async def get_campaign_by_id(campaign_id: str):
819 campaign = Campaign.get_from_id(campaign_id)
820 if campaign is None:
821 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
822 return campaign
825@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
826async def add_campaign(campaign: Campaign):
827 new_campaign = Campaign.create(campaign)
828 if new_campaign is None:
829 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
830 return new_campaign
833@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
834async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
835 campaign = Campaign.get_from_id(campaign_id)
836 if campaign is None:
837 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
838 campaign.name = edit_campaign.name
839 campaign.description = edit_campaign.description
840 return Campaign.update(campaign)
843@app.delete(
844 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
845)
846async def delete_campaign(campaign_id: str):
847 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
848 campaign = Campaign.get_from_id(campaign_id)
849 if campaign is None:
850 raise exception
851 delete_phases = Phase.deleteMany(campaign_id)
852 if not delete_phases.acknowledged:
853 raise exception
854 campaign_deleted = Campaign.delete(campaign_id)
855 return campaign_deleted.acknowledged
858@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
859async def get_campaign_phases(campaign_id: str):
860 return Phase.get_by_attribute("campaign_id", campaign_id)
863@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
864async def get_phase(phase_id: str):
865 phase = Phase.get_from_id(phase_id)
866 if phase is None:
867 raise HTTPException(status_code=404, detail="Phase not found")
868 return phase
871@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
872async def add_phase(phase: Phase):
873 new_phase = Phase.create(phase)
874 if new_phase is None:
875 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
876 return new_phase
879@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
880async def edit_phase(phase_id, edit_phase: Phase):
881 phase = Phase.get_from_id(phase_id)
882 if phase is None:
883 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
884 phase.name = edit_phase.name
885 phase.description = edit_phase.description
886 phase.start_at = edit_phase.start_at
887 phase.end_at = edit_phase.end_at
888 return Phase.update(phase)
891@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
892async def delete_phase(phase_id: str):
893 phase = Phase.get_from_id(phase_id)
894 if phase is None:
895 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
896 phase_deleted = Phase.delete(phase_id)
897 return phase_deleted.acknowledged
900@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
901async def get_custom_views():
902 return CustomView.get_all()
905@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
906async def get_custom_views_from_user_id(user_id: str):
907 return CustomView.get_by_attribute("user_id", user_id)
910@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
911async def get_custom_view(custom_view_id: str):
912 return CustomView.get_from_id(custom_view_id)
915@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
916async def create_custom_view(
917 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
918):
919 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
920 custom_view.insert()
921 return custom_view
924@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
925async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
926 custom_view = CustomView.get_from_id(custom_view_id)
927 return custom_view.update(custom_view_update.model_dump())
930@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
931async def delete_custom_view(custom_view_id: str):
932 custom_view = CustomView.get_from_id(custom_view_id)
933 return custom_view.delete()
936@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
937async def add_video(video: Video):
938 video.insert()
939 if not video:
940 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
941 return video
944@app.get("/videos", dependencies=[Depends(get_current_active_user)])
945async def get_videos():
946 return Video.get_all()
949@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
950def get_stream(video_id):
951 camera_name = Video.get_video(video_id)
952 if camera_name is None:
953 raise HTTPException(status_code=404, detail="Camera not found")
954 return camera_name
957@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
958async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
959 return SignalsPreset.get_by_attribute("user_id", current_user.id)
962@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
963async def create_signals_preset(
964 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
965):
966 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
967 return new_signals_preset
970@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
971async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
972 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
973 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
976@app.delete(
977 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
978)
979async def delete_signals_preset(signals_preset_id: str):
980 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
981 return signals_preset.delete()
984@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
985async def create_graph_theme(
986 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
987):
988 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
989 if styled_signal is None:
990 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
992 graph_theme = PrivateGraphTheme.create(
993 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
994 )
995 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
998@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
999async def get_all_graph_themes(
1000 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
1001) -> ListResponse[PublicGraphTheme]:
1002 return PublicGraphTheme.response_from_query(query, current_user.id)
1005@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
1006async def get_graph_themes_in_library(
1007 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
1008) -> ListResponse[PublicGraphTheme]:
1009 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
1012@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
1013async def update_graph_theme(
1014 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
1015):
1016 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
1017 update_dict = theme_update.model_dump(exclude_unset=True)
1018 if current_user.id != graph_theme.creator_id:
1019 for theme_property in update_dict.keys():
1020 if theme_property not in ["active_for_user", "in_user_library"]:
1021 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
1022 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
1023 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
1026@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
1027async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
1028 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
1029 if current_user.id != graph_theme.creator_id:
1030 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
1031 return graph_theme.delete()
1034@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
1035async def get_signals_appearances(
1036 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
1037) -> dict:
1038 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)