Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/api.py: 99%
476 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-09 13:43 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-09 13:43 +0000
1import os
2import logging
3from pathlib import Path
4import time
5from typing import Annotated
6from datetime import timedelta
7from pyinstrument import Profiler
9from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
10from fastapi.middleware.cors import CORSMiddleware
11from fastapi.security import OAuth2PasswordRequestForm
13from twinpad_backend import __version__
14from twinpad_backend.db import signal_datasize
15from twinpad_backend.models import (
16 DeviceId,
17 MongoId,
18 Signal,
19 ForcedSignal,
20 SignalData,
21 SignalSample,
22 ServicesStatus,
23 Device,
24 DeviceUpdate,
25 DeviceSetup,
26 DeviceSetupUpdate,
27 DeviceState,
28 SignalUpdate,
29 SignalsData,
30 Event,
31 EventRule,
32 TwinPadActivity,
33 User,
34 UserUpdate,
35 Campaign,
36 Phase,
37 CustomView,
38 Command,
39 CustomViewCreation,
40 CustomViewUpdate,
41 Video,
42 SignalsPreset,
43 SignalsPresetCreation,
44 SignalsPresetUpdate,
45 PrivateGraphTheme,
46 PublicGraphTheme,
47 GraphThemeCreation,
48 GraphThemeUpdate,
49 SINGLE_POST_PROCESSING_FUNCTION,
50 DOUBLE_POST_PROCESSING_FUNCTION,
51 MULTIPLE_POST_PROCESSING_FUNCTION,
52)
53from twinpad_backend.auth import (
54 Token,
55 authenticate_user,
56 get_current_active_user,
57 ACCESS_TOKEN_EXPIRE_MINUTES,
58 create_access_token,
59 get_password_hash,
60)
61from twinpad_backend.queries import (
62 SignalQuery,
63 ForcedSignalQuery,
64 DeviceStatesQuery,
65 EventQuery,
66 EventRuleQuery,
67 CommandQuery,
68 GraphThemeQuery,
69)
70from twinpad_backend.responses import ListResponse
71from twinpad_backend.routes.deployers import router as deployers_router
# Requests that take longer than this many seconds are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5

# Feature flags read from the environment; only the exact string "true" enables a flag.
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"
DEVICE_DEPLOYERS = os.getenv("DEVICE_DEPLOYERS", "true") == "true"

# Reuse uvicorn's error logger and keep its records from propagating to parent loggers.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)

app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
if PROFILING:  # pragma: no cover
    profiling_folder = "/tmp/twinpad_profiling"
    Path(profiling_folder).mkdir(parents=True, exist_ok=True)
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """Profile a request with pyinstrument and write the HTML report to ``profiling_folder``.

        Requests whose URL contains "profiling" or ".ico" are passed through
        unprofiled, so that fetching a report does not itself generate one.
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):  # avoid recursion
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Stop the profiler even when the downstream handler raises.
            profiler.stop()

        # Flatten everything after "scheme://host/" into a single file name and
        # drop the query string.
        flattened = "_".join(url.split("/")[3:]).rstrip("/")
        filename = flattened.split("?", maxsplit=1)[0]
        if not filename:
            # Root URL, with or without a query string: an empty name would make
            # open() target the folder itself.
            filename = "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the profiling reports currently stored on disk."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """Return a stored profiling report as an HTML attachment, or 404 if it is missing."""
        file_path = os.path.join(profiling_folder, file_name)

        # isfile() rather than exists(): names such as "." or ".." resolve to
        # directories, which would pass an existence check and then fail in open().
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            return Response(
                content=profiling_file.read(),
                media_type="text/html",
                headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
            )
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log client, method, path, status code and duration of every request.

    Requests slower than ``REQUEST_TIME_WARNING`` seconds are logged at WARNING
    level, all others at INFO level.
    """
    # perf_counter is monotonic, so the measured duration is not affected by
    # system clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        # request.client may be None, so only dereference it when it is set.
        client_ip = request.client.host if request.client else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response
@app.get("/")
async def slash():
    """Return the backend version."""
    payload = {"twinpad_version": __version__}
    return payload
@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """Return the result of ``ServicesStatus.check()`` under the "services" key."""
    services = ServicesStatus.check()
    return {"services": services}
@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices
@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id) -> Device:
    """Return the device matching ``device_id``; respond 404 when none is found."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")
@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: DeviceId,
    device_update: DeviceUpdate,
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Apply ``device_update`` to a device through ``Device.change_mode``.

    Responds 404 when the device does not exist. When the result dict carries
    ``"error": True``, its status code and message (defaulting to 500 and a
    generic message) are turned into an HTTP error.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    if outcome.get("error", False) is not True:
        return outcome

    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )
@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(
    device_id: DeviceId,
    query: DeviceStatesQuery = Depends(),
) -> ListResponse[DeviceState]:
    """Return the states of a device selected by ``query``."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states
@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every stored device setup."""
    setups = DeviceSetup.get_all()
    return setups
@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and return it."""
    # The return value of insert() is intentionally not inspected here.
    device_setup.insert()
    return device_setup
@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return the device setup with the given id; respond 404 when it does not exist."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")
@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """Update a device setup with the non-None fields of ``device_setup_update``.

    Responds 404 when the device setup does not exist.
    """
    existing = DeviceSetup.get_from_id(device_setup_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the update payload are not sent to update().
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    existing.update(changes)
    return existing
@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return the result of ``delete()``; respond 404 when it is missing."""
    existing = DeviceSetup.get_from_id(device_setup_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return existing.delete()
@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int,
    max_timestamp: float | int,
    recompute_amount: bool = False,
) -> list[TwinPadActivity]:
    """Return sample-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching ``query``, without their ``device`` field.

    A ``signal_id:1`` sort key is appended whenever the requested sort does not
    already mention ``signal_id``.
    """
    already_sorted_on_id = "signal_id" in query.sort_by
    if not already_sorted_on_id:
        query.sort_by += ",signal_id:1"

    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})
@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    signal_ids = Signal.get_all_ids()
    return signal_ids
@app.get("/signals/statuses", dependencies=[Depends(get_current_active_user)])
async def signals_statuses() -> list[dict[str, str]]:
    """Return the status entries of all signals."""
    statuses = Signal.get_all_statuses()
    return statuses
@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return global signal statistics: stored data size, total number of samples,
    number of signals whose status is "up", and total number of signals.
    """
    statuses = Signal.get_all_statuses()
    # Each comparison is a bool, so summing them counts the signals that are "up".
    active_count = sum(entry["status"] == "up" for entry in statuses)
    sample_count = Signal.total_number_samples()
    signal_count = Signal.get_number_documents()

    stats = {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": sample_count,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }
    return stats
@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal (an entry may be None)."""
    latest = SignalSample.get_last_from_signal_ids(signal_ids)
    return latest
@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal (an entry may be None)."""
    earliest = SignalSample.get_first_from_signal_ids(signal_ids)
    return earliest
@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flags reported by ``Signal.get_forcibility`` for the requested signals."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility
@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
async def get_forced_signals(
    current_user: Annotated[User, Depends(get_current_active_user)],
    query: ForcedSignalQuery = Depends(),
):
    """Return the forced signals matching ``query``; only admins are allowed."""
    if current_user.is_admin:
        return ForcedSignal.response_from_query(query)
    # NOTE(review): non-admin users get 401 here; 403 is the usual status for an
    # authenticated user lacking permission - confirm before changing, clients may rely on it.
    raise HTTPException(401)
@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return the signal with the given id as a dict; respond 404 when it does not exist."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")
@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Send a command to a signal and keep the forced-signal record in sync.

    Raises:
        HTTPException 404: the signal does not exist.
        HTTPException 403: the signal is forced by another user and the caller
            is not an admin.
        HTTPException (status from the command result, default 500): the
            command result carries ``"error": True``.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            # This endpoint looks up a signal, so report a missing signal
            # (the message previously said "Device not found").
            detail="Signal not found",
        )

    forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
    if forced_signal is not None:
        # Only the user who forced the signal, or an admin, may change it.
        if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
            raise HTTPException(
                status_code=403,
                detail="Cannot override another user's forcing",
            )

    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )

    # The forced-signal record is only touched once the command has succeeded.
    if forced_signal is not None and signal_update.forced_value is None:
        # Forcing released: drop the record.
        forced_signal.delete()
    elif signal_update.forced_value is not None:
        # NOTE(review): when a record already exists, a new one is inserted without
        # removing the old one - confirm insert() replaces rather than duplicates.
        forced_signal = ForcedSignal(
            signal_id=signal_id,
            forcing_user_id=current_user.id,
            forced_at=time.time(),
            value=signal_update.forced_value,
        )
        forced_signal.insert()

    return result
@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
async def get_signal_forcibility(
    signal_id: str,
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> bool:
    """Return whether the current user may force the given signal, per ``ForcedSignal.can_force``."""
    allowed = ForcedSignal.can_force(signal_id, current_user)
    return allowed
@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """Return the data of one signal, optionally bounded in time and downsampled.

    When ``number_samples_max`` is given, the data goes through
    ``uniform_desampling`` with that limit.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type allows None; guard so a missing result is returned
    # as-is instead of failing on the attribute access.
    # NOTE(review): assumes get_from_signal_id can return None - confirm.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data
@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal; respond 404 when there is none."""
    latest = SignalSample.get_last_from_signal_id(signal_id)
    if latest is not None:
        return latest
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal; respond 404 when there is none."""
    earliest = SignalSample.get_first_from_signal_id(signal_id)
    if earliest is not None:
        return earliest
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the number of samples and the sample data size of a signal; 404 when it is missing."""
    found = Signal.get_from_signal_id(signal_id)
    if not found:
        raise HTTPException(status_code=404, detail="Signal not found")

    sample_count = await found.number_samples()
    return {
        "signal_id": signal_id,
        "number_samples": sample_count,
        "size": found.sample_datasize(),
    }
@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """Return the data of several signals, optionally bounded in time and downsampled.

    The same bounds are used both as query bounds and as window bounds.

    Raises:
        HTTPException 400: both bounds are given and ``min_timestamp`` is
            greater than ``max_timestamp``.
    """
    # Compare against None explicitly: a bound of 0 is falsy and would otherwise
    # skip the validation.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # NOTE(review): the declared return type allows None, so a None result is
    # passed through untouched - confirm get_from_signal_ids can return None.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data
@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    signal_ids: list[str] = Query(default=[]),
    window_max_number_samples: int | None = None,
    outside_max_number_samples: int | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """Return signals data downsampled separately inside and outside an interest window.

    Raises:
        HTTPException 400: a minimum bound is greater than its matching maximum
            bound (window bounds or global bounds).
    """
    # Compare against None explicitly: a bound of 0 is falsy and would otherwise
    # skip the validation.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    # Fetch at most ten times the requested number of samples; no limit at all
    # when neither sample budget is given (or both are 0).
    max_documents = 0
    if window_max_number_samples is not None:
        max_documents += 10 * window_max_number_samples
    if outside_max_number_samples is not None:
        max_documents += 10 * outside_max_number_samples
    if max_documents == 0:
        max_documents = None

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=max_documents,
    )
    # NOTE(review): the declared return type allows None, so a None result is
    # passed through untouched - confirm get_from_signal_ids can return None.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a zip attachment named ``signals.zip``.

    Data is fetched with ``interpolate_bounds=False`` and exported via
    ``zip_export(file_format)``.
    """
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = fetched.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as an HDF5 attachment named ``signals.hdf5``.

    Data is fetched with ``interpolate_bounds=False`` and exported via ``hdf5_export()``.
    """
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = fetched.hdf5_export()
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_post_processing(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
    number_samples_max: int = None,
) -> SignalsData | None:
    """Return signals data for a set of phases, optionally min/max downsampled.

    An omitted per-phase list (sync times, window minimums, window maximums) is
    replaced by a list of None, one entry per phase.

    Raises:
        HTTPException 400: a per-phase list does not have one entry per phase.
        HTTPException 404: one of the phases does not exist.
    """
    phase_count = len(phase_ids)

    # An empty list stands for "not provided": use one None per phase.
    if not phase_sync_times:
        phase_sync_times = [None] * phase_count
    if not window_min_timestamps:
        window_min_timestamps = [None] * phase_count
    if not window_max_timestamps:
        window_max_timestamps = [None] * phase_count

    per_phase_lists = (phase_sync_times, window_min_timestamps, window_max_timestamps)
    if any(len(values) != phase_count for values in per_phase_lists):
        raise HTTPException(
            400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
        )

    phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
    if None in phases:
        raise HTTPException(404, "Phase not found")

    result = SignalsData.get_from_phase_and_signal_ids(
        phases=phases,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )

    if number_samples_max is None:
        return result
    return result.min_max_downsampling(number_samples_max)
@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
async def apply_single_post_processing_function(
    phase_id: str,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    phase_sync_time: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """Apply a single-signal post-processing function to one signal of a phase.

    The result is min/max downsampled when it holds more samples than
    ``number_samples_max``, then passed through ``zero_time_vector`` with the
    synchronization time (``phase.start_at / 1000`` when none is given).

    Raises:
        HTTPException 404: the phase does not exist.
        HTTPException 500: the function returned no result.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(404, "Phase not found")

    sync_time = phase.start_at / 1000 if phase_sync_time is None else phase_sync_time

    result = await SignalsData.apply_single_function(
        phase,
        base_signal_id,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if result is None:
        raise HTTPException(500, "There was en error while applying the function")

    if number_samples_max is not None:
        # The first signal's time vector gives the current number of samples.
        available = len(result.signals_data[0].time_vector)
        if number_samples_max < available:
            result = result.min_max_downsampling(number_samples_max)

    return result.zero_time_vector(sync_time)
@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
async def apply_multiple_post_processing_function(
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """Apply a post-processing function to several (phase, signal) pairs.

    The result is min/max downsampled when it holds more samples than
    ``number_samples_max``. Its time vector is then zeroed on the second
    synchronization time for "Align-X" and "Using-X", and on the first one for
    every other function.

    Raises:
        HTTPException 400: list lengths are inconsistent, or fewer than two
            signals were given.
        HTTPException 404: one of the phases does not exist.
        HTTPException 500: the function returned no result.
    """
    phase_count = len(phase_ids)

    if phase_count != len(signal_ids):
        raise HTTPException(400, "Each selected signal should correspond to a phase")

    if len(signal_ids) < 2:
        raise HTTPException(400, "These functions can only be applied to multiple signals")

    sync_time_count = len(phase_sync_times)
    if sync_time_count != 0 and phase_count != sync_time_count:
        raise HTTPException(400, "Number of synchronization times does not match the number of phases")

    phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
    if None in phases:
        raise HTTPException(404, "Phase not found")

    # No synchronization times given: fall back to each phase's own start.
    if sync_time_count == 0:
        phase_sync_times = [phase.start_at / 1000 for phase in phases]

    result = await SignalsData.apply_multiple_function(
        phases,
        signal_ids,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if result is None:
        raise HTTPException(500, "There was en error while applying the function")

    if number_samples_max is not None:
        available = len(result.signals_data[0].time_vector)
        if number_samples_max < available:
            result = result.min_max_downsampling(number_samples_max)

    sync_index = 1 if function in {"Align-X", "Using-X"} else 0
    return result.zero_time_vector(phase_sync_times[sync_index])
@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_zip(
    file_format: str,
    phase_ids: list[MongoId] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as a zip attachment named ``signals.zip``.

    The data comes from ``get_signals_data_post_processing`` (called without
    downsampling), so its validation errors apply here as well.
    """
    fetched = await get_signals_data_post_processing(
        phase_ids=phase_ids,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )
    archive = fetched.zip_export(file_format, post_processing=True, phase_ids=phase_ids)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_hdf5(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as an HDF5 attachment named ``signals.hdf5``.

    The data comes from ``get_signals_data_post_processing`` (called without
    downsampling), so its validation errors apply here as well.
    """
    fetched = await get_signals_data_post_processing(
        phase_ids=phase_ids,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )
    payload = fetched.hdf5_export(post_processing=True, phase_ids=phase_ids)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching ``query``."""
    listing = Event.response_from_query(query)
    return listing
@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return the event with the given id; respond 404 when it does not exist."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")
@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int,
    max_timestamp: float | int,
    recompute_amount: bool = False,
) -> list[TwinPadActivity]:
    """Return event-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int,
    max_timestamp: float | int,
    recompute_amount: bool = False,
) -> list[TwinPadActivity]:
    """Return command-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching ``query``."""
    listing = EventRule.response_from_query(query)
    return listing
@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return the event rule with the given id; respond 404 when it does not exist."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")
@app.post("/users", status_code=201)
async def create_user(user: User):
    """Create a user account with a hashed password.

    Raises:
        HTTPException 400: the email is already registered, or the user could
            not be created. Both cases share one message, so the response does
            not reveal which one occurred.
    """
    # NOTE(review): this route declares no authentication dependency and takes
    # ``is_admin`` from the request body - confirm that this is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # bool() instead of the bitwise ``| False``: same result for True/False, and a
    # missing (None) flag becomes False instead of raising TypeError.
    is_admin = bool(user.is_admin)
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:  # pragma: no cover
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user
@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Authenticate with username and password and return a bearer access token.

    Raises:
        HTTPException 401: the credentials are rejected.
        HTTPException 402: the user is blocked.
    """
    bearer_challenge = {"WWW-authenticate": "Bearer"}

    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers=bearer_challenge)

    lifetime = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    # NOTE(review): 402 is "Payment Required"; 403 looks more appropriate for a
    # blocked user - confirm before changing, clients may rely on it.
    if user.is_blocked:
        raise HTTPException(status_code=402, detail="User Blocked", headers=bearer_challenge)

    claims = {"sub": user.email, "admin": user.is_admin}
    encoded = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=encoded, token_type="bearer")
@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users as dicts, sorted by email."""
    accounts = User.get_all(sort_by="email")
    return [account.to_dict() for account in accounts]
@app.get("/users/me")
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user as a dict."""
    profile = current_user.to_dict()
    return profile
@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return a user as a dict without ``password`` and ``is_connected``; 404 when missing."""
    found = User.get_from_id(user_id)
    if found is not None:
        return found.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")
@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """Update a user and return the updated user as a dict.

    A non-empty password is hashed before the update; an empty or missing
    password is removed from the payload.
    """
    has_new_password = user.password is not None and user.password != ""
    if has_new_password:
        user.password = get_password_hash(user.password)
    else:
        del user.password

    updated = User.update_info(user, user_id)
    return updated.to_dict()
@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching ``query``."""
    listing = Command.response_from_query(query)
    return listing
@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every stored campaign."""
    campaigns = Campaign.get_all()
    return campaigns
@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return the campaign with the given id; respond 404 when it does not exist."""
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Campaign not found")
@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Insert the posted campaign and return it; respond 500 when ``insert()`` yields None."""
    if campaign.insert() is None:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
    return campaign
@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Update a campaign with the fields explicitly set in the request body; 404 when missing."""
    existing = Campaign.get_from_id(campaign_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # exclude_unset keeps only the fields the client actually sent.
    changes = edit_campaign.model_dump(exclude_unset=True, mode="json")
    existing.update(changes)
    return existing
@app.delete(
    "/campaigns/{campaign_id}",
    response_model=bool,
    dependencies=[Depends(get_current_active_user)],
    status_code=200,
)
async def delete_campaign(campaign_id: str):
    """Delete a campaign after deleting its phases, and return True.

    Raises:
        HTTPException 404: the campaign does not exist.
        HTTPException 500: the phase deletion was not acknowledged, or the
            campaign deletion failed.
    """
    existing = Campaign.get_from_id(campaign_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Phases are removed first, then the campaign itself.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phases deletion")

    if not existing.delete():  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")
    return True
@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches the path parameter."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases
@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return the phase with the given id; respond 404 when it does not exist."""
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")
@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Insert the posted phase and return it; respond 500 when ``insert()`` yields None."""
    if phase.insert() is None:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phase creation")
    return phase
@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Update a phase with the fields explicitly set in the request body; 404 when missing."""
    existing = Phase.get_from_id(phase_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Phase does not exists")

    # exclude_unset keeps only the fields the client actually sent.
    changes = edit_phase.model_dump(exclude_unset=True, mode="json")
    existing.update(changes)
    return existing
@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete a phase and return True; respond 404 when missing, 500 when deletion fails."""
    existing = Phase.get_from_id(phase_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Phase not found")

    if not existing.delete():  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phase deletion")
    return True
@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return whatever ``CustomView.get_all()`` yields."""
    all_views = CustomView.get_all()
    return all_views
@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` attribute matches the path parameter."""
    user_views = CustomView.get_by_attribute("user_id", user_id)
    return user_views
@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return the result of looking up a custom view by its id.

    No not-found handling is performed here: the lookup result is returned as is.
    """
    looked_up_view = CustomView.get_from_id(custom_view_id)
    return looked_up_view
@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Build a custom view owned by the current user, insert it and return it."""
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view
@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update an existing custom view and return the result of ``update()``.

    Raises:
        HTTPException: 404 when no custom view is found for ``custom_view_id``.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Without this guard an unknown id would fail on ``None.update`` and
    # surface as an unhandled 500 instead of a clean 404.
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    # NOTE(review): unlike the other PATCH handlers in this file, the dump is
    # done without ``exclude_unset=True`` - confirm this is intentional.
    return custom_view.update(custom_view_update.model_dump())
@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete a custom view and return the result of ``delete()``.

    Raises:
        HTTPException: 404 when no custom view is found for ``custom_view_id``.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Guard against a missing document so the client gets a 404 rather than
    # an unhandled AttributeError (500).
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()
@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the submitted video and return it.

    Raises:
        HTTPException: 500 when ``insert()`` returns ``None``.
    """
    # Check the insert result rather than the request model itself: the
    # original ``if not video`` tested the model object, so an insert failure
    # would not be reported.
    # NOTE(review): assumes ``Video.insert()`` returns ``None`` on failure, as
    # ``Phase.insert()`` is treated in ``add_phase`` - confirm.
    video_id = video.insert()
    if video_id is None:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
    return video
@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return whatever ``Video.get_all()`` yields."""
    all_videos = Video.get_all()
    return all_videos
@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return the result of ``Video.get_video`` for the given id.

    Raises:
        HTTPException: 404 when the lookup yields ``None``.
    """
    if (camera := Video.get_video(video_id)) is not None:
        return camera
    raise HTTPException(status_code=404, detail="Camera not found")
@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose ``user_id`` is the current user's id."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)
@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a signals preset for the current user and return it."""
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Apply the explicitly-set fields of the body to a signals preset.

    Returns the result of ``update()``.

    Raises:
        HTTPException: 404 when no preset is found for ``signals_preset_id``.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # An unknown id previously crashed on ``None.update`` (500); report 404.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete a signals preset and return the result of ``delete()``.

    Raises:
        HTTPException: 404 when no preset is found for ``signals_preset_id``.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # An unknown id previously crashed on ``None.delete`` (500); report 404.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()
@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a graph theme for an existing signal, owned by the current user.

    Returns the public representation of the new theme as seen by the creator.

    Raises:
        HTTPException: 400 when no signal matches the requested ``signal_id``.
    """
    requested_signal_id = graph_theme_creation.signal_id
    if Signal.get_one_by_attribute("signal_id", requested_signal_id) is None:
        raise HTTPException(400, f"Signal ID '{requested_signal_id}' doesn't exist")

    # The "id" field of the creation payload is excluded from the created theme.
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created_theme = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created_theme.id, current_user.id)
@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching ``query``, resolved for the current user."""
    requester_id = current_user.id
    return PublicGraphTheme.response_from_query(query, requester_id)
@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching ``query`` within the current user's library."""
    requester_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, requester_id)
@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update a graph theme and return its public representation for the caller.

    Users other than the theme's creator may only change the
    ``active_for_user`` and ``in_user_library`` properties.

    Raises:
        HTTPException: 404 when no theme is found for ``theme_id``; 401 when a
            non-creator tries to change any other property.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    # Without this guard an unknown id would fail on ``None.creator_id`` and
    # surface as an unhandled 500.
    if graph_theme is None:
        raise HTTPException(404, "Graph theme not found")

    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        user_scoped_properties = {"active_for_user", "in_user_library"}
        # Iterating the dict yields its keys; reject any key outside the allowed set.
        if not user_scoped_properties.issuperset(update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    # Reuse the dump computed above instead of serialising the body a second time.
    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete a graph theme created by the current user.

    Returns the result of ``delete()``.

    Raises:
        HTTPException: 404 when no theme is found for ``graph_theme_id``; 401
            when the current user is not the theme's creator.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    # Without this guard an unknown id would fail on ``None.creator_id`` and
    # surface as an unhandled 500.
    if graph_theme is None:
        raise HTTPException(404, "Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()
@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
) -> dict:
    """Return the appearances of the requested signals as resolved for the current user."""
    requester_id = current_user.id
    appearances = PublicGraphTheme.get_signal_appearances(signal_ids, requester_id)
    return appearances
# The deployers routes are mounted only when DEVICE_DEPLOYERS is truthy.
if DEVICE_DEPLOYERS:
    app.include_router(router=deployers_router, prefix="/device-deployers")