Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / api.py: 96%
489 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-28 09:12 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-28 09:12 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 DeviceId,
19 MongoId,
20 Signal,
21 ForcedSignal,
22 SignalData,
23 SignalSample,
24 ServicesStatus,
25 Device,
26 DeviceUpdate,
27 DeviceSetup,
28 DeviceSetupUpdate,
29 DeviceState,
30 SignalUpdate,
31 SignalsData,
32 Event,
33 EventRule,
34 TwinPadActivity,
35 User,
36 UserUpdate,
37 Campaign,
38 Phase,
39 CustomView,
40 Command,
41 CustomViewCreation,
42 CustomViewUpdate,
43 Video,
44 SignalsPreset,
45 SignalsPresetCreation,
46 SignalsPresetUpdate,
47 PrivateGraphTheme,
48 PublicGraphTheme,
49 GraphThemeCreation,
50 GraphThemeUpdate,
51 SINGLE_POST_PROCESSING_FUNCTION,
52 DOUBLE_POST_PROCESSING_FUNCTION,
53 MULTIPLE_POST_PROCESSING_FUNCTION,
54)
55from twinpad_backend.auth import (
56 Token,
57 authenticate_user,
58 get_current_active_user,
59 ACCESS_TOKEN_EXPIRE_MINUTES,
60 create_access_token,
61 get_password_hash,
62)
63from twinpad_backend.queries import (
64 SignalQuery,
65 ForcedSignalQuery,
66 DeviceStatesQuery,
67 EventQuery,
68 EventRuleQuery,
69 CommandQuery,
70 GraphThemeQuery,
71)
72from twinpad_backend.responses import ListResponse
73from twinpad_backend.routes.deployers import router as deployers_router
75REQUEST_TIME_WARNING = 0.5
77DEBUG = os.environ.get("DEBUG", "false") == "true"
78PROFILING = os.environ.get("PROFILING", "false") == "true"
79DEVICE_DEPLOYERS = os.environ.get("DEVICE_DEPLOYERS", "true") == "true"
81logger = logging.getLogger("uvicorn.error")
82logger.propagate = False
83logger.info("Debug mode: %s", DEBUG)
84logger.info("log level: %s", logging.root.level)
87app = FastAPI(title="Twinpad backend", version=__version__)
89app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
91if PROFILING: # pragma: no cover
92 profiling_folder = mkdtemp()
93 logger.info("Profiling enabled")
95 @app.middleware("http")
96 async def profile_request(request: Request, call_next):
97 should_profile = True
98 url = str(request.url)
99 for segment in ("profiling", ".ico"):
100 if segment in url:
101 should_profile = False
102 break
104 if should_profile: # avoid recursion
105 profiler = Profiler()
106 profiler.start()
108 response = await call_next(request)
110 profiler.stop()
111 url = "_".join(url.split("/")[3:]).rstrip("/")
112 if not url:
113 url = "slash"
114 filename = url.split("?", maxsplit=1)[0]
115 logger.info("saving profiling to %s", filename)
116 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
117 profiling_file.write(profiler.output_html())
119 return response
121 return await call_next(request)
123 @app.get("/profilings")
124 async def profilings():
125 return {"profilings": os.listdir(profiling_folder)}
127 @app.get("/profilings/{file_name}")
128 async def profiling(file_name):
129 file_path = os.path.join(profiling_folder, file_name)
131 if not os.path.exists(file_path):
132 raise HTTPException(
133 status_code=404,
134 detail=f"Profiling file '{file_name}' not found",
135 )
137 with open(file_path, "r", encoding="utf-8") as profiling_file:
138 return Response(
139 content=profiling_file.read(),
140 media_type="application/html",
141 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
142 )
145@app.middleware("http")
146async def log_request_time(request: Request, call_next):
147 start_time = time.time() # Record the start time
148 response = await call_next(request) # Process the request
149 duration = time.time() - start_time # Calculate the time taken
150 client_ip = request.headers.get("x-forwarded-for", request.client.host)
151 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
152 if duration > REQUEST_TIME_WARNING:
153 logger.warning(message)
154 else:
155 logger.info(message)
156 return response
159@app.get("/")
160async def slash():
161 return {"twinpad_version": __version__}
164@app.get("/status", dependencies=[Depends(get_current_active_user)])
165async def status():
166 """
167 Return service healthcheck
168 """
169 return {
170 "services": ServicesStatus.check(),
171 }
174@app.get("/devices", dependencies=[Depends(get_current_active_user)])
175async def get_devices() -> list[Device]:
176 return Device.get_all(sort_by="device_id")
179@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
180async def get_device(device_id) -> Device:
181 device = Device.get_one_by_attribute("device_id", device_id)
182 if not device:
183 raise HTTPException(
184 status_code=404,
185 detail="Device not found",
186 )
187 return device
190@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
191async def update_item(
192 device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
193):
194 device = Device.get_one_by_attribute("device_id", device_id)
195 if not device:
196 raise HTTPException(
197 status_code=404,
198 detail="Device not found",
199 )
200 result = await device.change_mode(device_update, current_user)
201 if result.get("error", False) is True:
202 raise HTTPException(
203 status_code=result.get("status_code", 500),
204 detail=result.get("message", "An error has occurred"),
205 )
206 return result
209@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
210async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
211 return DeviceState.get_from_id_and_query(device_id, query)
214@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
215async def get_device_setups() -> list[DeviceSetup]:
216 return DeviceSetup.get_all()
219@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
220async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
221 device_setup.insert()
222 return device_setup
225@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
226async def get_device_setup(device_setup_id: str):
227 device_setup = DeviceSetup.get_from_id(device_setup_id)
228 if device_setup is None:
229 raise HTTPException(
230 status_code=404,
231 detail="Device setup not found",
232 )
233 return device_setup
236@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
237async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
238 device_setup = DeviceSetup.get_from_id(device_setup_id)
239 if device_setup is None:
240 raise HTTPException(
241 status_code=404,
242 detail="Device setup not found",
243 )
244 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
245 return device_setup
248@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
249async def delete_device_setups(device_setup_id: str) -> bool:
250 device_setup = DeviceSetup.get_from_id(device_setup_id)
251 if device_setup is None:
252 raise HTTPException(
253 status_code=404,
254 detail="Device setup not found",
255 )
256 deleted = device_setup.delete()
257 return deleted
260@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
261async def get_number_samples(
262 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
263) -> list[TwinPadActivity]:
264 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
267@app.get("/signals", dependencies=[Depends(get_current_active_user)])
268async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
269 if "signal_id" not in query.sort_by:
270 query.sort_by += ",signal_id:1"
271 return Signal.response_from_query(query).to_dict(exclude={"device"})
274@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
275async def signals_names() -> list[str]:
276 return Signal.get_all_ids()
279@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
280async def signal_stats():
281 """
282 Returns signals stats
283 """
284 signal_statuses = Signal.get_all_statuses()
285 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up")
286 number_samples = Signal.total_number_samples()
287 number_signals = Signal.get_number_documents()
289 return {
290 "signal_data_size": signal_datasize(),
291 "number_signal_samples": number_samples,
292 "number_active_signals": number_active_signals,
293 "number_signals": number_signals,
294 }
297@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
298async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
299 return SignalSample.get_last_from_signal_ids(signal_ids)
302@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
303async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
304 return SignalSample.get_first_from_signal_ids(signal_ids)
307@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
308async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
309 return Signal.get_forcibility(signal_ids)
312@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
313async def get_forced_signals(
314 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
315):
316 if not current_user.is_admin:
317 raise HTTPException(401)
318 return ForcedSignal.response_from_query(query)
321@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
322async def get_signal(signal_id):
323 signal = Signal.get_from_signal_id(signal_id)
324 if not signal:
325 raise HTTPException(
326 status_code=404,
327 detail="Signal not found",
328 )
329 return signal.to_dict()
332@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
333async def update_signal(
334 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
335):
336 signal = Signal.get_from_signal_id(signal_id)
337 if not signal:
338 raise HTTPException(
339 status_code=404,
340 detail="Device not found",
341 )
342 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
343 if forced_signal is not None:
344 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
345 raise HTTPException(
346 status_code=403,
347 detail="Cannot override another user's forcing",
348 )
350 result = await signal.send_command(signal_update, current_user)
351 if result.get("error", False) is True:
352 raise HTTPException(
353 status_code=result.get("status_code", 500),
354 detail=result.get("message", "An error has occurred"),
355 )
357 if forced_signal is not None and signal_update.forced_value is None:
358 forced_signal.delete()
359 elif signal_update.forced_value is not None:
360 forced_signal = ForcedSignal(
361 signal_id=signal_id,
362 forcing_user_id=current_user.id,
363 forced_at=time.time(),
364 value=signal_update.forced_value,
365 )
366 forced_signal.insert()
368 return result
371@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
372async def get_signal_forcibility(
373 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
374) -> bool:
375 return ForcedSignal.can_force(signal_id, current_user)
378@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
379async def get_signal_data(
380 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
381) -> SignalData | None:
382 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
384 if number_samples_max is not None:
385 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
387 return signal_data
390@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
391async def get_last_value(signal_id) -> SignalSample:
392 sample = SignalSample.get_last_from_signal_id(signal_id)
393 if sample is None:
394 raise HTTPException(status_code=404, detail="No data")
395 return sample
398@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
399async def get_first_value(signal_id) -> SignalSample:
400 sample = SignalSample.get_first_from_signal_id(signal_id)
401 if sample is None:
402 raise HTTPException(status_code=404, detail="No data")
403 return sample
406@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
407async def get_signal_number_samples(signal_id):
408 signal = Signal.get_from_signal_id(signal_id)
409 if not signal:
410 raise HTTPException(
411 status_code=404,
412 detail="Device not found",
413 )
414 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
417@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
418async def get_signals_data(
419 signal_ids: list[str] = Query(default=[]),
420 number_samples_max: int = None,
421 min_timestamp: float = None,
422 max_timestamp: float = None,
423 interpolate_bounds: bool = True,
424) -> SignalsData | None:
425 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
426 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
428 signals_data = SignalsData.get_from_signal_ids(
429 signal_ids,
430 min_timestamp=min_timestamp,
431 max_timestamp=max_timestamp,
432 window_min_timestamp=min_timestamp,
433 window_max_timestamp=max_timestamp,
434 interpolate_bounds=interpolate_bounds,
435 )
436 if number_samples_max is not None:
437 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
439 return signals_data
442@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
443async def get_signals_data_interest_window(
444 window_max_number_samples: int | None = None,
445 outside_max_number_samples: int | None = None,
446 window_min_timestamp: float = None,
447 window_max_timestamp: float = None,
448 signal_ids: list[str] = Query(default=[]),
449 min_timestamp: float = None,
450 max_timestamp: float = None,
451) -> SignalsData | None:
452 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
453 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
455 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
456 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
458 max_documents = 0
460 if window_max_number_samples is not None:
461 max_documents += 10 * window_max_number_samples
462 if outside_max_number_samples is not None:
463 max_documents += 10 * outside_max_number_samples
465 if max_documents == 0:
466 max_documents = None
468 signals_data = SignalsData.get_from_signal_ids(
469 signal_ids,
470 min_timestamp=min_timestamp,
471 max_timestamp=max_timestamp,
472 window_min_timestamp=window_min_timestamp,
473 window_max_timestamp=window_max_timestamp,
474 max_documents=max_documents,
475 )
477 signals_data = signals_data.interest_window_desampling(
478 window_max_number_samples=window_max_number_samples,
479 outside_max_number_samples=outside_max_number_samples,
480 window_min_timestamp=window_min_timestamp,
481 window_max_timestamp=window_max_timestamp,
482 )
484 return signals_data
487@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
488async def export_signals_zip(
489 file_format: str,
490 signal_ids: list[str] = Query(default=[]),
491 min_timestamp: float = None,
492 max_timestamp: float = None,
493):
494 signals_data = SignalsData.get_from_signal_ids(
495 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
496 )
497 zip_data = signals_data.zip_export(file_format)
498 return Response(
499 content=zip_data,
500 media_type="application/zip",
501 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
502 )
505@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
506async def export_signals_hdf5(
507 signal_ids: list[str] = Query(default=[]),
508 min_timestamp: float = None,
509 max_timestamp: float = None,
510):
511 signals_data = SignalsData.get_from_signal_ids(
512 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
513 )
514 data = signals_data.hdf5_export()
515 return Response(
516 content=data,
517 media_type="application/hdf5",
518 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
519 )
522@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
523async def get_signals_data_post_processing(
524 phase_ids: list[str] = Query(default=[]),
525 phase_sync_times: list[float | None] = Query(default=[]),
526 signal_ids: list[str] = Query(default=[]),
527 window_min_timestamps: list[float | None] = Query(default=[]),
528 window_max_timestamps: list[float | None] = Query(default=[]),
529 number_samples_max: int = None,
530) -> SignalsData | None:
531 if len(phase_sync_times) == 0:
532 phase_sync_times = [None for _ in range(len(phase_ids))]
533 if len(window_min_timestamps) == 0:
534 window_min_timestamps = [None for _ in range(len(phase_ids))]
535 if len(window_max_timestamps) == 0:
536 window_max_timestamps = [None for _ in range(len(phase_ids))]
538 if (
539 len(phase_ids) != len(phase_sync_times)
540 or len(phase_ids) != len(window_min_timestamps)
541 or len(phase_ids) != len(window_max_timestamps)
542 ):
543 raise HTTPException(
544 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
545 )
547 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
549 if None in phases:
550 raise HTTPException(404, "Phase not found")
552 signals_data = SignalsData.get_from_phase_and_signal_ids(
553 phases=phases,
554 phase_sync_times=phase_sync_times,
555 signal_ids=signal_ids,
556 window_min_timestamps=window_min_timestamps,
557 window_max_timestamps=window_max_timestamps,
558 )
560 if number_samples_max is not None:
561 signals_data = signals_data.min_max_downsampling(number_samples_max)
563 return signals_data
566@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
567async def apply_single_post_processing_function(
568 phase_id: str,
569 base_signal_id: str,
570 function: SINGLE_POST_PROCESSING_FUNCTION,
571 phase_sync_time: float = None,
572 window_min_timestamp: float = None,
573 window_max_timestamp: float = None,
574 number_samples_max: int = None,
575) -> SignalsData | None:
576 phase = Phase.get_from_id(phase_id)
578 if phase is None:
579 raise HTTPException(404, "Phase not found")
580 if phase_sync_time is None:
581 phase_sync_time = phase.start_at / 1000
583 signals_data = await SignalsData.apply_single_function(
584 phase,
585 base_signal_id,
586 function,
587 window_min_timestamp=window_min_timestamp,
588 window_max_timestamp=window_max_timestamp,
589 )
591 if signals_data is None:
592 raise HTTPException(500, "There was en error while applying the function")
594 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
595 signals_data = signals_data.min_max_downsampling(number_samples_max)
596 signals_data = signals_data.zero_time_vector(phase_sync_time)
598 return signals_data
601@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
602async def apply_multiple_post_processing_function(
603 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
604 phase_ids: list[str] = Query(default=[]),
605 phase_sync_times: list[float] = Query(default=[]),
606 signal_ids: list[str] = Query(default=[]),
607 window_min_timestamp: float = None,
608 window_max_timestamp: float = None,
609 number_samples_max: int = None,
610) -> SignalsData | None:
611 if len(phase_ids) != len(signal_ids):
612 raise HTTPException(400, "Each selected signal should correspond to a phase")
614 if len(phase_ids) < 2:
615 raise HTTPException(400, "These functions can only be applied to multiple signals")
617 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
618 if None in phases:
619 raise HTTPException(404, "Phase not found")
621 if len(phase_sync_times) == 0:
622 phase_sync_times = [phase.start_at / 1000 for phase in phases]
623 if len(phases) != len(phase_sync_times):
624 raise HTTPException(400, "Number of synchronization times does not match the number of phases")
626 signals_data = await SignalsData.apply_multiple_function(
627 phases,
628 signal_ids,
629 function,
630 window_min_timestamp=window_min_timestamp,
631 window_max_timestamp=window_max_timestamp,
632 )
634 if signals_data is None:
635 raise HTTPException(500, "There was en error while applying the function")
637 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
638 signals_data = signals_data.min_max_downsampling(number_samples_max)
639 if function in {"Align-X", "Using-X"}:
640 signals_data = signals_data.zero_time_vector(phase_sync_times[1])
641 else:
642 signals_data = signals_data.zero_time_vector(phase_sync_times[0])
644 return signals_data
647@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
648async def export_post_processing_zip(
649 file_format: str,
650 phase_ids: list[MongoId] = Query(default=[]),
651 phase_sync_times: list[float | None] = Query(default=[]),
652 signal_ids: list[str] = Query(default=[]),
653 window_min_timestamps: list[float | None] = Query(default=[]),
654 window_max_timestamps: list[float | None] = Query(default=[]),
655):
656 signals_data = await get_signals_data_post_processing(
657 phase_ids,
658 phase_sync_times,
659 signal_ids,
660 window_min_timestamps,
661 window_max_timestamps,
662 )
664 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids)
666 return Response(
667 content=zip_data,
668 media_type="application/zip",
669 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
670 )
673@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
674async def export_post_processing_hdf5(
675 phase_ids: list[str] = Query(default=[]),
676 phase_sync_times: list[float | None] = Query(default=[]),
677 signal_ids: list[str] = Query(default=[]),
678 window_min_timestamps: list[float | None] = Query(default=[]),
679 window_max_timestamps: list[float | None] = Query(default=[]),
680):
681 signals_data = await get_signals_data_post_processing(
682 phase_ids,
683 phase_sync_times,
684 signal_ids,
685 window_min_timestamps,
686 window_max_timestamps,
687 )
689 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids)
691 return Response(
692 content=data,
693 media_type="application/hdf5",
694 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
695 )
698@app.get("/events", dependencies=[Depends(get_current_active_user)])
699async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
700 return Event.response_from_query(query)
703@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
704async def get_event(event_id) -> Event:
705 event = Event.get_from_id(event_id)
706 if event is None:
707 raise HTTPException(status_code=404, detail="No such event")
708 return event
711@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
712async def get_number_events(
713 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
714) -> list[TwinPadActivity]:
715 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
718@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
719async def get_number_commands(
720 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
721) -> list[TwinPadActivity]:
722 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
725@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
726async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
727 return EventRule.response_from_query(query)
730@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
731async def get_event_rule(event_rule_id) -> EventRule:
732 event_rule = EventRule.get_from_id(event_rule_id)
733 if event_rule is None:
734 raise HTTPException(status_code=404, detail="No such event rule")
735 return event_rule
738@app.post("/users", status_code=201)
739async def create_user(user: User):
740 if User.get_one_by_attribute("email", user.email) is not None:
741 raise HTTPException(status_code=400, detail="An error occurred during account creation")
742 hashed_password = get_password_hash(user.password)
743 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
744 if new_user is None:
745 raise HTTPException(status_code=400, detail="An error occurred during account creation")
746 return new_user
749@app.post("/token", status_code=201)
750async def login_for_access_token(
751 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
752) -> Token:
753 user = authenticate_user(form_data.username, form_data.password)
754 if not user:
755 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
756 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
757 if user.is_active:
758 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
759 access_token = create_access_token(
760 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
761 )
762 return Token(access_token=access_token, token_type="bearer")
765@app.get("/users", dependencies=[Depends(get_current_active_user)])
766async def get_users():
767 return [u.to_dict() for u in User.get_all(sort_by="email")]
770@app.get("/users/me")
771async def get_current_user(
772 current_user: Annotated[User, Depends(get_current_active_user)],
773):
774 return current_user.to_dict()
777@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
778async def get_user(user_id: str):
779 user = User.get_from_id(user_id)
781 if user is None:
782 raise HTTPException(
783 status_code=404,
784 detail="User not found",
785 )
786 return user.to_dict(exclude={"password", "is_connected"})
789@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
790async def patch_user(user: UserUpdate, user_id):
791 if user.password == "" or user.password is None:
792 del user.password
793 else:
794 user.password = get_password_hash(user.password)
795 return User.update_info(user, user_id).to_dict()
798@app.get("/commands", dependencies=[Depends(get_current_active_user)])
799async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
800 return Command.response_from_query(query)
803@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
804async def get_campaigns():
805 return Campaign.get_all()
808@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
809async def get_campaign_by_id(campaign_id: str):
810 campaign = Campaign.get_from_id(campaign_id)
811 if campaign is None:
812 raise HTTPException(status_code=404, detail="Campaign not found")
813 return campaign
816@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
817async def add_campaign(campaign: Campaign):
818 campaign_id = campaign.insert()
819 if campaign_id is None:
820 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
821 return campaign
824@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
825async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
826 campaign = Campaign.get_from_id(campaign_id)
827 if campaign is None:
828 raise HTTPException(status_code=404, detail="Campaign not found")
829 campaign.update(edit_campaign.model_dump(exclude_unset=True, mode="json"))
830 return campaign
833@app.delete(
834 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
835)
836async def delete_campaign(campaign_id: str):
837 campaign = Campaign.get_from_id(campaign_id)
838 if campaign is None:
839 raise HTTPException(status_code=404, detail="Campaign not found")
840 delete_phases = Phase.deleteMany(campaign_id)
841 if not delete_phases.acknowledged:
842 raise HTTPException(status_code=500, detail="An error occurred during phases deletion")
843 campaign_deleted = campaign.delete()
844 if not campaign_deleted:
845 raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")
846 return True
849@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
850async def get_campaign_phases(campaign_id: str):
851 return Phase.get_by_attribute("campaign_id", campaign_id)
854@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
855async def get_phase(phase_id: str):
856 phase = Phase.get_from_id(phase_id)
857 if phase is None:
858 raise HTTPException(status_code=404, detail="Phase not found")
859 return phase
862@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
863async def add_phase(phase: Phase):
864 phase_id = phase.insert()
865 if phase_id is None:
866 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
867 return phase
870@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
871async def edit_phase(phase_id, edit_phase: Phase):
872 phase = Phase.get_from_id(phase_id)
873 if phase is None:
874 raise HTTPException(status_code=404, detail="Phase does not exists")
875 phase.update(edit_phase.model_dump(exclude_unset=True, mode="json"))
876 return phase
879@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
880async def delete_phase(phase_id: str):
881 phase = Phase.get_from_id(phase_id)
882 if phase is None:
883 raise HTTPException(status_code=404, detail="Phase not found")
884 deleted = phase.delete()
885 if not deleted:
886 raise HTTPException(status_code=500, detail="An error occurred during phase deletion")
887 return True
890@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
891async def get_custom_views():
892 return CustomView.get_all()
895@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
896async def get_custom_views_from_user_id(user_id: str):
897 return CustomView.get_by_attribute("user_id", user_id)
900@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
901async def get_custom_view(custom_view_id: str):
902 return CustomView.get_from_id(custom_view_id)
905@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
906async def create_custom_view(
907 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
908):
909 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
910 custom_view.insert()
911 return custom_view
914@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
915async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
916 custom_view = CustomView.get_from_id(custom_view_id)
917 return custom_view.update(custom_view_update.model_dump())
920@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
921async def delete_custom_view(custom_view_id: str):
922 custom_view = CustomView.get_from_id(custom_view_id)
923 return custom_view.delete()
926@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
927async def add_video(video: Video):
928 video.insert()
929 if not video:
930 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
931 return video
934@app.get("/videos", dependencies=[Depends(get_current_active_user)])
935async def get_videos():
936 return Video.get_all()
939@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
940def get_stream(video_id):
941 camera_name = Video.get_video(video_id)
942 if camera_name is None:
943 raise HTTPException(status_code=404, detail="Camera not found")
944 return camera_name
947@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
948async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
949 return SignalsPreset.get_by_attribute("user_id", current_user.id)
952@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
953async def create_signals_preset(
954 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
955):
956 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
957 return new_signals_preset
960@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
961async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
962 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
963 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
966@app.delete(
967 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
968)
969async def delete_signals_preset(signals_preset_id: str):
970 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
971 return signals_preset.delete()
974@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
975async def create_graph_theme(
976 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
977):
978 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
979 if styled_signal is None:
980 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
982 graph_theme = PrivateGraphTheme.create(
983 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
984 )
985 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
988@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
989async def get_all_graph_themes(
990 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
991) -> ListResponse[PublicGraphTheme]:
992 return PublicGraphTheme.response_from_query(query, current_user.id)
995@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
996async def get_graph_themes_in_library(
997 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
998) -> ListResponse[PublicGraphTheme]:
999 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
1002@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
1003async def update_graph_theme(
1004 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
1005):
1006 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
1007 update_dict = theme_update.model_dump(exclude_unset=True)
1008 if current_user.id != graph_theme.creator_id:
1009 for theme_property in update_dict.keys():
1010 if theme_property not in ["active_for_user", "in_user_library"]:
1011 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
1012 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
1013 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
1016@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
1017async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
1018 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
1019 if current_user.id != graph_theme.creator_id:
1020 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
1021 return graph_theme.delete()
1024@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
1025async def get_signals_appearances(
1026 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
1027) -> dict:
1028 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)
1031if DEVICE_DEPLOYERS:
1032 app.include_router(deployers_router, prefix="/device-deployers")