Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / api.py: 96%
491 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 13:11 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 13:11 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 DeviceId,
19 MongoId,
20 Signal,
21 ForcedSignal,
22 SignalData,
23 SignalSample,
24 ServicesStatus,
25 Device,
26 DeviceUpdate,
27 DeviceSetup,
28 DeviceSetupUpdate,
29 DeviceState,
30 SignalUpdate,
31 SignalsData,
32 Event,
33 EventRule,
34 TwinPadActivity,
35 User,
36 UserUpdate,
37 Campaign,
38 Phase,
39 CustomView,
40 Command,
41 CustomViewCreation,
42 CustomViewUpdate,
43 Video,
44 SignalsPreset,
45 SignalsPresetCreation,
46 SignalsPresetUpdate,
47 PrivateGraphTheme,
48 PublicGraphTheme,
49 GraphThemeCreation,
50 GraphThemeUpdate,
51 SINGLE_POST_PROCESSING_FUNCTION,
52 DOUBLE_POST_PROCESSING_FUNCTION,
53 MULTIPLE_POST_PROCESSING_FUNCTION,
54)
55from twinpad_backend.auth import (
56 Token,
57 authenticate_user,
58 get_current_active_user,
59 ACCESS_TOKEN_EXPIRE_MINUTES,
60 create_access_token,
61 get_password_hash,
62)
63from twinpad_backend.queries import (
64 SignalQuery,
65 ForcedSignalQuery,
66 DeviceStatesQuery,
67 EventQuery,
68 EventRuleQuery,
69 CommandQuery,
70 GraphThemeQuery,
71)
72from twinpad_backend.responses import ListResponse
73from twinpad_backend.routes.deployers import router as deployers_router
75REQUEST_TIME_WARNING = 0.5
77DEBUG = os.environ.get("DEBUG", "false") == "true"
78PROFILING = os.environ.get("PROFILING", "false") == "true"
79DEVICE_DEPLOYERS = os.environ.get("DEVICE_DEPLOYERS", "true") == "true"
81logger = logging.getLogger("uvicorn.error")
82logger.propagate = False
83logger.info("Debug mode: %s", DEBUG)
84logger.info("log level: %s", logging.root.level)
87app = FastAPI(title="Twinpad backend", version=__version__)
89app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
91if PROFILING: # pragma: no cover
92 profiling_folder = mkdtemp()
93 logger.info("Profiling enabled")
95 @app.middleware("http")
96 async def profile_request(request: Request, call_next):
97 should_profile = True
98 url = str(request.url)
99 for segment in ("profiling", ".ico"):
100 if segment in url:
101 should_profile = False
102 break
104 if should_profile: # avoid recursion
105 profiler = Profiler()
106 profiler.start()
108 response = await call_next(request)
110 profiler.stop()
111 url = "_".join(url.split("/")[3:]).rstrip("/")
112 if not url:
113 url = "slash"
114 filename = url.split("?", maxsplit=1)[0]
115 logger.info("saving profiling to %s", filename)
116 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
117 profiling_file.write(profiler.output_html())
119 return response
121 return await call_next(request)
123 @app.get("/profilings")
124 async def profilings():
125 return {"profilings": os.listdir(profiling_folder)}
127 @app.get("/profilings/{file_name}")
128 async def profiling(file_name):
129 file_path = os.path.join(profiling_folder, file_name)
131 if not os.path.exists(file_path):
132 raise HTTPException(
133 status_code=404,
134 detail=f"Profiling file '{file_name}' not found",
135 )
137 with open(file_path, "r", encoding="utf-8") as profiling_file:
138 return Response(
139 content=profiling_file.read(),
140 media_type="application/html",
141 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
142 )
145@app.middleware("http")
146async def log_request_time(request: Request, call_next):
147 start_time = time.time() # Record the start time
148 response = await call_next(request) # Process the request
149 duration = time.time() - start_time # Calculate the time taken
150 client_ip = request.headers.get("x-forwarded-for", request.client.host)
151 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
152 if duration > REQUEST_TIME_WARNING:
153 logger.warning(message)
154 else:
155 logger.info(message)
156 return response
159@app.get("/")
160async def slash():
161 return {"twinpad_version": __version__}
164@app.get("/status", dependencies=[Depends(get_current_active_user)])
165async def status():
166 """
167 Return service healthcheck
168 """
169 return {
170 "services": ServicesStatus.check(),
171 }
174@app.get("/devices", dependencies=[Depends(get_current_active_user)])
175async def get_devices() -> list[Device]:
176 return Device.get_all(sort_by="device_id")
179@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
180async def get_device(device_id) -> Device:
181 device = Device.get_one_by_attribute("device_id", device_id)
182 if not device:
183 raise HTTPException(
184 status_code=404,
185 detail="Device not found",
186 )
187 return device
190@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
191async def update_item(
192 device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
193):
194 device = Device.get_one_by_attribute("device_id", device_id)
195 if not device:
196 raise HTTPException(
197 status_code=404,
198 detail="Device not found",
199 )
200 result = await device.change_mode(device_update, current_user)
201 if result.get("error", False) is True:
202 raise HTTPException(
203 status_code=result.get("status_code", 500),
204 detail=result.get("message", "An error has occurred"),
205 )
206 return result
209@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
210async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
211 return DeviceState.get_from_id_and_query(device_id, query)
214@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
215async def get_device_setups() -> list[DeviceSetup]:
216 return DeviceSetup.get_all()
219@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
220async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
221 device_setup.insert()
222 return device_setup
225@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
226async def get_device_setup(device_setup_id: str):
227 device_setup = DeviceSetup.get_from_id(device_setup_id)
228 if device_setup is None:
229 raise HTTPException(
230 status_code=404,
231 detail="Device setup not found",
232 )
233 return device_setup
236@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
237async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
238 device_setup = DeviceSetup.get_from_id(device_setup_id)
239 if device_setup is None:
240 raise HTTPException(
241 status_code=404,
242 detail="Device setup not found",
243 )
244 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
245 return device_setup
248@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
249async def delete_device_setups(device_setup_id: str) -> bool:
250 device_setup = DeviceSetup.get_from_id(device_setup_id)
251 if device_setup is None:
252 raise HTTPException(
253 status_code=404,
254 detail="Device setup not found",
255 )
256 deleted = device_setup.delete()
257 return deleted
260@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
261async def get_number_samples(
262 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
263) -> list[TwinPadActivity]:
264 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
267@app.get("/signals", dependencies=[Depends(get_current_active_user)])
268async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
269 if "signal_id" not in query.sort_by:
270 query.sort_by += ",signal_id:1"
271 return Signal.response_from_query(query).to_dict(exclude={"device"})
274@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
275async def signals_names() -> list[str]:
276 return Signal.get_all_ids()
279@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
280async def signal_stats():
281 """
282 Returns signals stats
283 """
284 signal_statuses = Signal.get_all_statuses()
285 signal_ids = [signal["signal_id"] for signal in signal_statuses]
287 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids)
288 number_samples = sum(number_samples_by_signal_id.values())
290 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up")
292 number_signals = Signal.get_number_documents()
294 return {
295 "signal_data_size": signal_datasize(),
296 "number_signal_samples": number_samples,
297 "number_active_signals": number_active_signals,
298 "number_signals": number_signals,
299 }
302@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
303async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
304 return SignalSample.get_last_from_signal_ids(signal_ids)
307@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
308async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
309 return SignalSample.get_first_from_signal_ids(signal_ids)
312@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
313async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
314 return Signal.get_forcibility(signal_ids)
317@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
318async def get_forced_signals(
319 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
320):
321 if not current_user.is_admin:
322 raise HTTPException(401)
323 return ForcedSignal.response_from_query(query)
326@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
327async def get_signal(signal_id):
328 signal = Signal.get_from_signal_id(signal_id)
329 if not signal:
330 raise HTTPException(
331 status_code=404,
332 detail="Signal not found",
333 )
334 return signal.to_dict()
337@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
338async def update_signal(
339 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
340):
341 signal = Signal.get_from_signal_id(signal_id)
342 if not signal:
343 raise HTTPException(
344 status_code=404,
345 detail="Device not found",
346 )
347 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
348 if forced_signal is not None:
349 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
350 raise HTTPException(
351 status_code=403,
352 detail="Cannot override another user's forcing",
353 )
355 result = await signal.send_command(signal_update, current_user)
356 if result.get("error", False) is True:
357 raise HTTPException(
358 status_code=result.get("status_code", 500),
359 detail=result.get("message", "An error has occurred"),
360 )
362 if forced_signal is not None and signal_update.forced_value is None:
363 forced_signal.delete()
364 elif signal_update.forced_value is not None:
365 forced_signal = ForcedSignal(
366 signal_id=signal_id,
367 forcing_user_id=current_user.id,
368 forced_at=time.time(),
369 value=signal_update.forced_value,
370 )
371 forced_signal.insert()
373 return result
376@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
377async def get_signal_forcibility(
378 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
379) -> bool:
380 return ForcedSignal.can_force(signal_id, current_user)
383@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
384async def get_signal_data(
385 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
386) -> SignalData | None:
387 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
389 if number_samples_max is not None:
390 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
392 return signal_data
395@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
396async def get_last_value(signal_id) -> SignalSample:
397 sample = SignalSample.get_last_from_signal_id(signal_id)
398 if sample is None:
399 raise HTTPException(status_code=404, detail="No data")
400 return sample
403@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
404async def get_first_value(signal_id) -> SignalSample:
405 sample = SignalSample.get_first_from_signal_id(signal_id)
406 if sample is None:
407 raise HTTPException(status_code=404, detail="No data")
408 return sample
411@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
412async def get_signal_number_samples(signal_id):
413 signal = Signal.get_from_signal_id(signal_id)
414 if not signal:
415 raise HTTPException(
416 status_code=404,
417 detail="Device not found",
418 )
419 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
422@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
423async def get_signals_data(
424 signal_ids: list[str] = Query(default=[]),
425 number_samples_max: int = None,
426 min_timestamp: float = None,
427 max_timestamp: float = None,
428 interpolate_bounds: bool = True,
429) -> SignalsData | None:
430 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
431 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
433 signals_data = SignalsData.get_from_signal_ids(
434 signal_ids,
435 min_timestamp=min_timestamp,
436 max_timestamp=max_timestamp,
437 window_min_timestamp=min_timestamp,
438 window_max_timestamp=max_timestamp,
439 interpolate_bounds=interpolate_bounds,
440 )
441 if number_samples_max is not None:
442 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
444 return signals_data
447@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
448async def get_signals_data_interest_window(
449 window_max_number_samples: int | None = None,
450 outside_max_number_samples: int | None = None,
451 window_min_timestamp: float = None,
452 window_max_timestamp: float = None,
453 signal_ids: list[str] = Query(default=[]),
454 min_timestamp: float = None,
455 max_timestamp: float = None,
456) -> SignalsData | None:
457 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
458 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
460 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
461 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
463 max_documents = 0
465 if window_max_number_samples is not None:
466 max_documents += 10 * window_max_number_samples
467 if outside_max_number_samples is not None:
468 max_documents += 10 * outside_max_number_samples
470 if max_documents == 0:
471 max_documents = None
473 signals_data = SignalsData.get_from_signal_ids(
474 signal_ids,
475 min_timestamp=min_timestamp,
476 max_timestamp=max_timestamp,
477 window_min_timestamp=window_min_timestamp,
478 window_max_timestamp=window_max_timestamp,
479 max_documents=max_documents,
480 )
482 signals_data = signals_data.interest_window_desampling(
483 window_max_number_samples=window_max_number_samples,
484 outside_max_number_samples=outside_max_number_samples,
485 window_min_timestamp=window_min_timestamp,
486 window_max_timestamp=window_max_timestamp,
487 )
489 return signals_data
492@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
493async def export_signals_zip(
494 file_format: str,
495 signal_ids: list[str] = Query(default=[]),
496 min_timestamp: float = None,
497 max_timestamp: float = None,
498):
499 signals_data = SignalsData.get_from_signal_ids(
500 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
501 )
502 zip_data = signals_data.zip_export(file_format)
503 return Response(
504 content=zip_data,
505 media_type="application/zip",
506 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
507 )
510@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
511async def export_signals_hdf5(
512 signal_ids: list[str] = Query(default=[]),
513 min_timestamp: float = None,
514 max_timestamp: float = None,
515):
516 signals_data = SignalsData.get_from_signal_ids(
517 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
518 )
519 data = signals_data.hdf5_export()
520 return Response(
521 content=data,
522 media_type="application/hdf5",
523 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
524 )
527@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
528async def get_signals_data_post_processing(
529 phase_ids: list[str] = Query(default=[]),
530 phase_sync_times: list[float | None] = Query(default=[]),
531 signal_ids: list[str] = Query(default=[]),
532 window_min_timestamps: list[float | None] = Query(default=[]),
533 window_max_timestamps: list[float | None] = Query(default=[]),
534 number_samples_max: int = None,
535) -> SignalsData | None:
536 if len(phase_sync_times) == 0:
537 phase_sync_times = [None for _ in range(len(phase_ids))]
538 if len(window_min_timestamps) == 0:
539 window_min_timestamps = [None for _ in range(len(phase_ids))]
540 if len(window_max_timestamps) == 0:
541 window_max_timestamps = [None for _ in range(len(phase_ids))]
543 if (
544 len(phase_ids) != len(phase_sync_times)
545 or len(phase_ids) != len(window_min_timestamps)
546 or len(phase_ids) != len(window_max_timestamps)
547 ):
548 raise HTTPException(
549 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
550 )
552 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
554 if None in phases:
555 raise HTTPException(404, "Phase not found")
557 signals_data = SignalsData.get_from_phase_and_signal_ids(
558 phases=phases,
559 phase_sync_times=phase_sync_times,
560 signal_ids=signal_ids,
561 window_min_timestamps=window_min_timestamps,
562 window_max_timestamps=window_max_timestamps,
563 )
565 if number_samples_max is not None:
566 signals_data = signals_data.min_max_downsampling(number_samples_max)
568 return signals_data
571@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
572async def apply_single_post_processing_function(
573 phase_id: str,
574 base_signal_id: str,
575 function: SINGLE_POST_PROCESSING_FUNCTION,
576 phase_sync_time: float = None,
577 window_min_timestamp: float = None,
578 window_max_timestamp: float = None,
579 number_samples_max: int = None,
580) -> SignalsData | None:
581 phase = Phase.get_from_id(phase_id)
583 if phase is None:
584 raise HTTPException(404, "Phase not found")
585 if phase_sync_time is None:
586 phase_sync_time = phase.start_at / 1000
588 signals_data = await SignalsData.apply_single_function(
589 phase,
590 base_signal_id,
591 function,
592 window_min_timestamp=window_min_timestamp,
593 window_max_timestamp=window_max_timestamp,
594 )
596 if signals_data is None:
597 raise HTTPException(500, "There was en error while applying the function")
599 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
600 signals_data = signals_data.min_max_downsampling(number_samples_max)
601 signals_data = signals_data.zero_time_vector(phase_sync_time)
603 return signals_data
606@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
607async def apply_multiple_post_processing_function(
608 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
609 phase_ids: list[str] = Query(default=[]),
610 phase_sync_times: list[float] = Query(default=[]),
611 signal_ids: list[str] = Query(default=[]),
612 window_min_timestamp: float = None,
613 window_max_timestamp: float = None,
614 number_samples_max: int = None,
615) -> SignalsData | None:
616 if len(phase_ids) != len(signal_ids):
617 raise HTTPException(400, "Each selected signal should correspond to a phase")
619 if len(phase_ids) < 2:
620 raise HTTPException(400, "These functions can only be applied to multiple signals")
622 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
623 if None in phases:
624 raise HTTPException(404, "Phase not found")
626 if len(phase_sync_times) == 0:
627 phase_sync_times = [phase.start_at / 1000 for phase in phases]
628 if len(phases) != len(phase_sync_times):
629 raise HTTPException(400, "Number of synchronization times does not match the number of phases")
631 signals_data = await SignalsData.apply_multiple_function(
632 phases,
633 signal_ids,
634 function,
635 window_min_timestamp=window_min_timestamp,
636 window_max_timestamp=window_max_timestamp,
637 )
639 if signals_data is None:
640 raise HTTPException(500, "There was en error while applying the function")
642 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
643 signals_data = signals_data.min_max_downsampling(number_samples_max)
644 if function in {"Align-X", "Using-X"}:
645 signals_data = signals_data.zero_time_vector(phase_sync_times[1])
646 else:
647 signals_data = signals_data.zero_time_vector(phase_sync_times[0])
649 return signals_data
652@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
653async def export_post_processing_zip(
654 file_format: str,
655 phase_ids: list[MongoId] = Query(default=[]),
656 phase_sync_times: list[float | None] = Query(default=[]),
657 signal_ids: list[str] = Query(default=[]),
658 window_min_timestamps: list[float | None] = Query(default=[]),
659 window_max_timestamps: list[float | None] = Query(default=[]),
660):
661 signals_data = await get_signals_data_post_processing(
662 phase_ids,
663 phase_sync_times,
664 signal_ids,
665 window_min_timestamps,
666 window_max_timestamps,
667 )
669 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids)
671 return Response(
672 content=zip_data,
673 media_type="application/zip",
674 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
675 )
678@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
679async def export_post_processing_hdf5(
680 phase_ids: list[str] = Query(default=[]),
681 phase_sync_times: list[float | None] = Query(default=[]),
682 signal_ids: list[str] = Query(default=[]),
683 window_min_timestamps: list[float | None] = Query(default=[]),
684 window_max_timestamps: list[float | None] = Query(default=[]),
685):
686 signals_data = await get_signals_data_post_processing(
687 phase_ids,
688 phase_sync_times,
689 signal_ids,
690 window_min_timestamps,
691 window_max_timestamps,
692 )
694 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids)
696 return Response(
697 content=data,
698 media_type="application/hdf5",
699 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
700 )
703@app.get("/events", dependencies=[Depends(get_current_active_user)])
704async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
705 return Event.response_from_query(query)
708@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
709async def get_event(event_id) -> Event:
710 event = Event.get_from_id(event_id)
711 if event is None:
712 raise HTTPException(status_code=404, detail="No such event")
713 return event
716@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
717async def get_number_events(
718 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
719) -> list[TwinPadActivity]:
720 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
723@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
724async def get_number_commands(
725 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
726) -> list[TwinPadActivity]:
727 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
730@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
731async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
732 return EventRule.response_from_query(query)
735@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
736async def get_event_rule(event_rule_id) -> EventRule:
737 event_rule = EventRule.get_from_id(event_rule_id)
738 if event_rule is None:
739 raise HTTPException(status_code=404, detail="No such event rule")
740 return event_rule
743@app.post("/users", status_code=201)
744async def create_user(user: User):
745 if User.get_one_by_attribute("email", user.email) is not None:
746 raise HTTPException(status_code=400, detail="An error occurred during account creation")
747 hashed_password = get_password_hash(user.password)
748 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
749 if new_user is None:
750 raise HTTPException(status_code=400, detail="An error occurred during account creation")
751 return new_user
754@app.post("/token", status_code=201)
755async def login_for_access_token(
756 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
757) -> Token:
758 user = authenticate_user(form_data.username, form_data.password)
759 if not user:
760 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
761 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
762 if user.is_active:
763 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
764 access_token = create_access_token(
765 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
766 )
767 return Token(access_token=access_token, token_type="bearer")
770@app.get("/users", dependencies=[Depends(get_current_active_user)])
771async def get_users():
772 return [u.to_dict() for u in User.get_all(sort_by="email")]
775@app.get("/users/me")
776async def get_current_user(
777 current_user: Annotated[User, Depends(get_current_active_user)],
778):
779 return current_user.to_dict()
782@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
783async def get_user(user_id: str):
784 user = User.get_from_id(user_id)
786 if user is None:
787 raise HTTPException(
788 status_code=404,
789 detail="User not found",
790 )
791 return user.to_dict(exclude={"password", "is_connected"})
794@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
795async def patch_user(user: UserUpdate, user_id):
796 if user.password == "" or user.password is None:
797 del user.password
798 else:
799 user.password = get_password_hash(user.password)
800 return User.update_info(user, user_id).to_dict()
803@app.get("/commands", dependencies=[Depends(get_current_active_user)])
804async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
805 return Command.response_from_query(query)
808@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
809async def get_campaigns():
810 return Campaign.get_all()
813@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
814async def get_campaign_by_id(campaign_id: str):
815 campaign = Campaign.get_from_id(campaign_id)
816 if campaign is None:
817 raise HTTPException(status_code=404, detail="Campaign not found")
818 return campaign
821@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
822async def add_campaign(campaign: Campaign):
823 campaign_id = campaign.insert()
824 if campaign_id is None:
825 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
826 return campaign
829@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
830async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
831 campaign = Campaign.get_from_id(campaign_id)
832 if campaign is None:
833 raise HTTPException(status_code=404, detail="Campaign not found")
834 campaign.update(edit_campaign.model_dump(exclude_unset=True, mode="json"))
835 return campaign
838@app.delete(
839 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
840)
841async def delete_campaign(campaign_id: str):
842 campaign = Campaign.get_from_id(campaign_id)
843 if campaign is None:
844 raise HTTPException(status_code=404, detail="Campaign not found")
845 delete_phases = Phase.deleteMany(campaign_id)
846 if not delete_phases.acknowledged:
847 raise HTTPException(status_code=500, detail="An error occurred during phases deletion")
848 campaign_deleted = campaign.delete()
849 if not campaign_deleted:
850 raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")
851 return True
854@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
855async def get_campaign_phases(campaign_id: str):
856 return Phase.get_by_attribute("campaign_id", campaign_id)
859@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
860async def get_phase(phase_id: str):
861 phase = Phase.get_from_id(phase_id)
862 if phase is None:
863 raise HTTPException(status_code=404, detail="Phase not found")
864 return phase
867@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
868async def add_phase(phase: Phase):
869 phase_id = phase.insert()
870 if phase_id is None:
871 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
872 return phase
875@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
876async def edit_phase(phase_id, edit_phase: Phase):
877 phase = Phase.get_from_id(phase_id)
878 if phase is None:
879 raise HTTPException(status_code=404, detail="Phase does not exists")
880 phase.update(edit_phase.model_dump(exclude_unset=True, mode="json"))
881 return phase
884@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
885async def delete_phase(phase_id: str):
886 phase = Phase.get_from_id(phase_id)
887 if phase is None:
888 raise HTTPException(status_code=404, detail="Phase not found")
889 deleted = phase.delete()
890 if not deleted:
891 raise HTTPException(status_code=500, detail="An error occurred during phase deletion")
892 return True
895@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
896async def get_custom_views():
897 return CustomView.get_all()
900@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
901async def get_custom_views_from_user_id(user_id: str):
902 return CustomView.get_by_attribute("user_id", user_id)
905@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
906async def get_custom_view(custom_view_id: str):
907 return CustomView.get_from_id(custom_view_id)
910@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
911async def create_custom_view(
912 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
913):
914 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
915 custom_view.insert()
916 return custom_view
919@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
920async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
921 custom_view = CustomView.get_from_id(custom_view_id)
922 return custom_view.update(custom_view_update.model_dump())
925@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
926async def delete_custom_view(custom_view_id: str):
927 custom_view = CustomView.get_from_id(custom_view_id)
928 return custom_view.delete()
931@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
932async def add_video(video: Video):
933 video.insert()
934 if not video:
935 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
936 return video
939@app.get("/videos", dependencies=[Depends(get_current_active_user)])
940async def get_videos():
941 return Video.get_all()
944@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
945def get_stream(video_id):
946 camera_name = Video.get_video(video_id)
947 if camera_name is None:
948 raise HTTPException(status_code=404, detail="Camera not found")
949 return camera_name
952@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
953async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
954 return SignalsPreset.get_by_attribute("user_id", current_user.id)
957@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
958async def create_signals_preset(
959 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
960):
961 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
962 return new_signals_preset
965@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
966async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
967 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
968 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
971@app.delete(
972 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
973)
974async def delete_signals_preset(signals_preset_id: str):
975 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
976 return signals_preset.delete()
979@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
980async def create_graph_theme(
981 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
982):
983 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
984 if styled_signal is None:
985 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
987 graph_theme = PrivateGraphTheme.create(
988 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
989 )
990 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
993@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
994async def get_all_graph_themes(
995 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
996) -> ListResponse[PublicGraphTheme]:
997 return PublicGraphTheme.response_from_query(query, current_user.id)
1000@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
1001async def get_graph_themes_in_library(
1002 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
1003) -> ListResponse[PublicGraphTheme]:
1004 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
1007@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
1008async def update_graph_theme(
1009 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
1010):
1011 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
1012 update_dict = theme_update.model_dump(exclude_unset=True)
1013 if current_user.id != graph_theme.creator_id:
1014 for theme_property in update_dict.keys():
1015 if theme_property not in ["active_for_user", "in_user_library"]:
1016 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
1017 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
1018 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
1021@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
1022async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
1023 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
1024 if current_user.id != graph_theme.creator_id:
1025 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
1026 return graph_theme.delete()
1029@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
1030async def get_signals_appearances(
1031 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
1032) -> dict:
1033 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)
1036if DEVICE_DEPLOYERS:
1037 app.include_router(deployers_router, prefix="/device-deployers")