Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/api.py: 100%
472 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 16:13 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 16:13 +0000
1import os
2import logging
3import time
4from typing import Annotated
5from datetime import timedelta
6from pyinstrument import Profiler
8from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
9from fastapi.middleware.cors import CORSMiddleware
10from fastapi.security import OAuth2PasswordRequestForm
12from twinpad_backend import __version__
13from twinpad_backend.db import signal_datasize
14from twinpad_backend.models import (
15 DeviceId,
16 MongoId,
17 Signal,
18 ForcedSignal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 TwinPadActivity,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41 SignalsPreset,
42 SignalsPresetCreation,
43 SignalsPresetUpdate,
44 PrivateGraphTheme,
45 PublicGraphTheme,
46 GraphThemeCreation,
47 GraphThemeUpdate,
48 SINGLE_POST_PROCESSING_FUNCTION,
49 DOUBLE_POST_PROCESSING_FUNCTION,
50 MULTIPLE_POST_PROCESSING_FUNCTION,
51)
52from twinpad_backend.auth import (
53 Token,
54 authenticate_user,
55 get_current_active_user,
56 ACCESS_TOKEN_EXPIRE_MINUTES,
57 create_access_token,
58 get_password_hash,
59)
60from twinpad_backend.queries import (
61 SignalQuery,
62 ForcedSignalQuery,
63 DeviceStatesQuery,
64 EventQuery,
65 EventRuleQuery,
66 CommandQuery,
67 GraphThemeQuery,
68)
69from twinpad_backend.responses import ListResponse
70from twinpad_backend.routes.deployers import router as deployers_router
# Requests whose handling time (in seconds) exceeds this value are logged as warnings.
REQUEST_TIME_WARNING = 0.5


def _env_flag(name: str, default: str) -> bool:
    """Return True only when the environment variable equals the exact string "true"."""
    return os.environ.get(name, default) == "true"


DEBUG = _env_flag("DEBUG", "false")
PROFILING = _env_flag("PROFILING", "false")
DEVICE_DEPLOYERS = _env_flag("DEVICE_DEPLOYERS", "true")

# Reuse uvicorn's error logger and keep its records from bubbling up to the root logger.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
if PROFILING:  # pragma: no cover
    profiling_folder = "/tmp/twinpad_profiling"
    # os.makedirs replaces the previous Path(...).mkdir(...) call: `Path` is not
    # imported in this module's import block, so enabling PROFILING raised NameError.
    os.makedirs(profiling_folder, exist_ok=True)
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """Profile a request with pyinstrument and save the HTML report to ``profiling_folder``.

        Requests whose URL contains "profiling" or ".ico" are passed through
        untouched so that fetching a report does not itself get profiled.
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):  # avoid recursion
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Always stop the profiler, even when the downstream handler raises.
            profiler.stop()

        # Drop scheme and host (first three "/"-separated parts) and flatten the path.
        flattened = "_".join(url.split("/")[3:]).rstrip("/")
        # Strip the query string; fall back to "slash" when nothing is left
        # (root URL, or root URL carrying only a query string).
        filename = flattened.split("?", maxsplit=1)[0] or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the saved profiling reports."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """Return one saved profiling report as a downloadable HTML attachment.

        Raises:
            HTTPException: 404 when ``file_name`` is not a regular file in ``profiling_folder``.
        """
        file_path = os.path.join(profiling_folder, file_name)

        # isfile (not exists) so that a directory name such as ".." yields 404
        # instead of failing inside open().
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            content = profiling_file.read()

        return Response(
            content=content,
            media_type="application/html",
            headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
        )
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log every request with its client address, method, path, status and duration.

    Requests slower than ``REQUEST_TIME_WARNING`` seconds are logged at WARNING
    level, the others at INFO level.
    """
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    # Prefer the proxy-provided address. `request.client` may be None, so it is
    # only dereferenced when the header is absent and a client is actually known.
    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        client_ip = request.client.host if request.client is not None else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response
@app.get("/")
async def slash():
    """Expose the backend version at the API root."""
    payload = {"twinpad_version": __version__}
    return payload
@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """Return the result of ``ServicesStatus.check()`` under the "services" key."""
    services = ServicesStatus.check()
    return {"services": services}
@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return every device, sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices
@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id) -> Device:
    """Return the device matching ``device_id``, or respond 404 when the lookup is empty."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")
@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Apply ``device_update`` to a device through ``Device.change_mode``.

    Responds 404 when the device is unknown. When the result dict carries
    ``error`` set to True, its ``status_code``/``message`` are turned into an
    HTTP error (defaulting to 500 / "An error has occurred").
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    failed = outcome.get("error", False) is True
    if not failed:
        return outcome

    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )
@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of one device, filtered by ``query``."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states
@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every stored device setup."""
    setups = DeviceSetup.get_all()
    return setups
@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and echo it back."""
    # insert() is called for its side effect only; the posted object is what gets returned.
    device_setup.insert()
    return device_setup
@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return one device setup by id, or respond 404 when it does not exist."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")
@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """Update a device setup with the non-None fields of ``device_setup_update``.

    Responds 404 when the setup does not exist.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Only fields that carry a value are forwarded; None means "leave unchanged".
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value

    target.update(changes)
    return target
@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return the result of ``delete()``; 404 when it does not exist."""
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return target.delete()
@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the sample-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching ``query``, without their ``device`` field.

    ``signal_id:1`` is appended to the sort specification unless ``signal_id``
    already appears in it.
    """
    sorts_on_signal_id = "signal_id" in query.sort_by
    if not sorts_on_signal_id:
        query.sort_by += ",signal_id:1"

    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})
@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    ids = Signal.get_all_ids()
    return ids
@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """Return global signal statistics.

    The payload holds the signal data size, the total number of samples, the
    number of signals whose status is "up" and the number of signal documents.
    """
    statuses = Signal.get_all_statuses()
    active_count = len([entry for entry in statuses if entry["status"] == "up"])
    sample_count = Signal.total_number_samples()
    signal_count = Signal.get_number_documents()
    data_size = signal_datasize()

    return {
        "signal_data_size": data_size,
        "number_signal_samples": sample_count,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }
@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples
@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples
@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility mapping computed by ``Signal.get_forcibility`` for the requested signals."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility
@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
async def get_forced_signals(
    current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
):
    """Return the forced signals matching ``query``; non-admin users get a 401."""
    if current_user.is_admin:
        return ForcedSignal.response_from_query(query)
    raise HTTPException(401)
@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return one signal as a dict, or respond 404 when the lookup is empty."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")
@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Send a command to a signal and keep the forced-signal record in sync.

    Raises:
        HTTPException: 404 when the signal is unknown; 403 when the signal is
            forced by another user and the caller is not an admin; the status
            and message reported by ``send_command`` when its result carries
            ``error`` set to True (defaults: 500 / "An error has occurred").
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The detail previously read "Device not found", which misreported the missing resource.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )

    forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
    # Only the user who forced the signal, or an admin, may change an existing forcing.
    if (
        forced_signal is not None
        and forced_signal.forcing_user_id != current_user.id
        and not current_user.is_admin
    ):
        raise HTTPException(
            status_code=403,
            detail="Cannot override another user's forcing",
        )

    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )

    if forced_signal is not None and signal_update.forced_value is None:
        # The forcing was released: drop its record.
        forced_signal.delete()
    elif signal_update.forced_value is not None:
        # NOTE(review): when a forcing already exists, a new record is inserted
        # without removing the previous one — confirm insert() replaces it.
        forced_signal = ForcedSignal(
            signal_id=signal_id,
            forcing_user_id=current_user.id,
            forced_at=time.time(),
            value=signal_update.forced_value,
        )
        forced_signal.insert()

    return result
@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
async def get_signal_forcibility(
    signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
) -> bool:
    """Return the result of ``ForcedSignal.can_force`` for this signal and the current user."""
    allowed = ForcedSignal.can_force(signal_id, current_user)
    return allowed
@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """Return the data of one signal, optionally bounded in time and desampled.

    When ``number_samples_max`` is given, the data goes through
    ``uniform_desampling`` before being returned.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type allows None; in that case there is nothing to
    # desample, so skip the call instead of failing with AttributeError.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data
@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal, or respond 404 when there is none."""
    latest = SignalSample.get_last_from_signal_id(signal_id)
    if latest is not None:
        return latest
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal, or respond 404 when there is none."""
    earliest = SignalSample.get_first_from_signal_id(signal_id)
    if earliest is not None:
        return earliest
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the sample count and sample data size of one signal; 404 when it is unknown."""
    found = Signal.get_from_signal_id(signal_id)
    if not found:
        raise HTTPException(status_code=404, detail="Signal not found")

    sample_count = await found.number_samples()
    data_size = found.sample_datasize()
    return {"signal_id": signal_id, "number_samples": sample_count, "size": data_size}
@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """Return the data of several signals over an optional time range.

    The requested range is also used as the interest window. When
    ``number_samples_max`` is given, the result goes through
    ``uniform_desampling``.

    Raises:
        HTTPException: 400 when both bounds are given and ``min_timestamp``
            is greater than ``max_timestamp``.
    """
    # Compare against None explicitly: a bound of 0 is a valid timestamp and
    # must not switch the validation off.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The declared return type allows None; only desample an actual result.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data
@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    signal_ids: list[str] = Query(default=[]),
    window_max_number_samples: int | None = None,
    outside_max_number_samples: int | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """Return signals data desampled separately inside and outside an interest window.

    Raises:
        HTTPException: 400 when a minimum bound is greater than its matching
            maximum bound (window bounds or global bounds).
    """
    # Explicit None checks: a bound of 0 is a valid timestamp and must still be validated.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    # Fetch at most ten documents per requested sample; no limit when the total is zero.
    max_documents = 0
    if window_max_number_samples is not None:
        max_documents += 10 * window_max_number_samples
    if outside_max_number_samples is not None:
        max_documents += 10 * outside_max_number_samples
    if max_documents == 0:
        max_documents = None

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=max_documents,
    )

    # The declared return type allows None; there is nothing to desample then.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals (no bound interpolation) as a ``signals.zip`` attachment."""
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = fetched.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals (no bound interpolation) as a ``signals.hdf5`` attachment."""
    fetched = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = fetched.hdf5_export()
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_post_processing(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
    number_samples_max: int = None,
) -> SignalsData | None:
    """Return signals data for a set of phases, optionally downsampled.

    Empty per-phase lists are replaced by one None per phase. Responds 400
    when a per-phase list does not have one entry per phase, and 404 when a
    phase id cannot be resolved.
    """
    phase_count = len(phase_ids)

    # An omitted per-phase list means "no value" for every phase.
    if not phase_sync_times:
        phase_sync_times = [None] * phase_count
    if not window_min_timestamps:
        window_min_timestamps = [None] * phase_count
    if not window_max_timestamps:
        window_max_timestamps = [None] * phase_count

    per_phase_lengths = {len(phase_sync_times), len(window_min_timestamps), len(window_max_timestamps)}
    if per_phase_lengths != {phase_count}:
        raise HTTPException(
            400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
        )

    phases = []
    for phase_id in phase_ids:
        phases.append(Phase.get_from_id(phase_id))

    if None in phases:
        raise HTTPException(404, "Phase not found")

    signals_data = SignalsData.get_from_phase_and_signal_ids(
        phases=phases,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )

    if number_samples_max is None:
        return signals_data
    return signals_data.min_max_downsampling(number_samples_max)
@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
async def apply_single_post_processing_function(
    phase_id: str,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    phase_sync_time: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """Apply a single-signal post-processing function within a phase.

    Responds 404 when the phase is unknown and 500 when the function yields
    no result. The time vector is zeroed on ``phase_sync_time``, which
    defaults to ``phase.start_at / 1000``.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(404, "Phase not found")

    sync_time = phase.start_at / 1000 if phase_sync_time is None else phase_sync_time

    computed = await SignalsData.apply_single_function(
        phase,
        base_signal_id,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if computed is None:
        raise HTTPException(500, "There was en error while applying the function")

    # Downsample only when the first signal holds more samples than requested.
    if number_samples_max is not None:
        if number_samples_max < len(computed.signals_data[0].time_vector):
            computed = computed.min_max_downsampling(number_samples_max)

    return computed.zero_time_vector(sync_time)
@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
async def apply_multiple_post_processing_function(
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """Apply a post-processing function that combines several signals.

    Responds 400 on inconsistent list lengths or fewer than two signals, 404
    when a phase is unknown, and 500 when the function yields no result.
    Missing synchronization times default to each phase's ``start_at / 1000``.
    """
    phase_count = len(phase_ids)

    if phase_count != len(signal_ids):
        raise HTTPException(400, "Each selected signal should correspond to a phase")

    if len(signal_ids) < 2:
        raise HTTPException(400, "These functions can only be applied to multiple signals")

    sync_count = len(phase_sync_times)
    if sync_count != 0 and phase_count != sync_count:
        raise HTTPException(400, "Number of synchronization times does not match the number of phases")

    phases = []
    for phase_id in phase_ids:
        phases.append(Phase.get_from_id(phase_id))
    if None in phases:
        raise HTTPException(404, "Phase not found")

    if sync_count == 0:
        phase_sync_times = [phase.start_at / 1000 for phase in phases]

    computed = await SignalsData.apply_multiple_function(
        phases,
        signal_ids,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if computed is None:
        raise HTTPException(500, "There was en error while applying the function")

    # Downsample only when the first signal holds more samples than requested.
    if number_samples_max is not None:
        if number_samples_max < len(computed.signals_data[0].time_vector):
            computed = computed.min_max_downsampling(number_samples_max)

    # "Align-X" and "Using-X" zero the time vector on the second phase's sync time, the others on the first.
    reference_index = 1 if function in {"Align-X", "Using-X"} else 0
    return computed.zero_time_vector(phase_sync_times[reference_index])
@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_zip(
    file_format: str,
    phase_ids: list[MongoId] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as a ``signals.zip`` attachment.

    The data is obtained by calling ``get_signals_data_post_processing``
    directly with the same arguments.
    """
    fetched = await get_signals_data_post_processing(
        phase_ids,
        phase_sync_times,
        signal_ids,
        window_min_timestamps,
        window_max_timestamps,
    )
    archive = fetched.zip_export(file_format, post_processing=True, phase_ids=phase_ids)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_hdf5(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """Export post-processing signals data as a ``signals.hdf5`` attachment.

    The data is obtained by calling ``get_signals_data_post_processing``
    directly with the same arguments.
    """
    fetched = await get_signals_data_post_processing(
        phase_ids,
        phase_sync_times,
        signal_ids,
        window_min_timestamps,
        window_max_timestamps,
    )
    payload = fetched.hdf5_export(post_processing=True, phase_ids=phase_ids)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching ``query``."""
    listing = Event.response_from_query(query)
    return listing
@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return one event by id, or respond 404 when it does not exist."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")
@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the event-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the command-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching ``query``."""
    listing = EventRule.response_from_query(query)
    return listing
@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return one event rule by id, or respond 404 when it does not exist."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")
@app.post("/users", status_code=201)
async def create_user(user: User):
    """Create a user account with a hashed password.

    Raises:
        HTTPException: 400 when the email is already registered or the
            creation returns nothing. The same generic message is used in
            both cases.
    """
    # NOTE(review): this route declares no authentication dependency and honours
    # the posted `is_admin` value — confirm that open admin creation is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")

    hashed_password = get_password_hash(user.password)
    # `or` instead of the bitwise `|`: same result for booleans, but a missing
    # (None) flag now falls back to False instead of raising TypeError.
    is_admin = user.is_admin or False
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:  # pragma: no cover
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user
@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Authenticate with username/password and return a bearer access token.

    Responds 401 on bad credentials and 402 when ``user.is_active`` is truthy.
    """
    bearer_header = {"WWW-authenticate": "Bearer"}

    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers=bearer_header)

    lifetime = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))

    # NOTE(review): the login is rejected when `is_active` is truthy, which reads
    # as inverted given the "User Blocked" message — confirm the field's meaning.
    if user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers=bearer_header)

    claims = {"sub": user.email, "admin": user.is_admin}
    access_token = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=access_token, token_type="bearer")
@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return every user as a dict, sorted by email."""
    serialized = []
    for account in User.get_all(sort_by="email"):
        serialized.append(account.to_dict())
    return serialized
@app.get("/users/me")
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the user resolved by ``get_current_active_user`` as a dict."""
    profile = current_user.to_dict()
    return profile
@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return one user as a dict without ``password`` and ``is_connected``; 404 when unknown."""
    found = User.get_from_id(user_id)
    if found is not None:
        return found.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")
@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """Update a user's information and return the updated user as a dict.

    An empty or missing password is removed from the update; any other
    password is replaced by its hash before the update is applied.
    """
    password_provided = not (user.password == "" or user.password is None)
    if password_provided:
        user.password = get_password_hash(user.password)
    else:
        del user.password

    updated = User.update_info(user, user_id)
    return updated.to_dict()
@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching ``query``."""
    listing = Command.response_from_query(query)
    return listing
@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns
@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return one campaign by id, or respond 404 when it does not exist."""
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Campaign not found")
@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Insert the posted campaign and echo it back; 500 when the insert returns no id."""
    inserted_id = campaign.insert()
    if inserted_id is None:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
    return campaign
@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Update a campaign with the fields explicitly set in the request body; 404 when unknown."""
    target = Campaign.get_from_id(campaign_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Only fields present in the request are forwarded (exclude_unset), dumped in JSON mode.
    changes = edit_campaign.model_dump(exclude_unset=True, mode="json")
    target.update(changes)
    return target
@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """Delete a campaign together with its phases.

    Responds 404 when the campaign is unknown and 500 when either deletion
    step reports a failure. Phases are deleted before the campaign itself.
    """
    target = Campaign.get_from_id(campaign_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phases deletion")

    if not target.delete():  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")

    return True
@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches the given campaign."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases
@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return one phase by id, or respond 404 when it does not exist."""
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")
@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Insert the posted phase and echo it back; 500 when the insert returns no id."""
    inserted_id = phase.insert()
    if inserted_id is None:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phase creation")
    return phase
@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Update a phase with the fields explicitly set in the request body; 404 when unknown."""
    target = Phase.get_from_id(phase_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Phase does not exists")

    # Only fields present in the request are forwarded (exclude_unset), dumped in JSON mode.
    changes = edit_phase.model_dump(exclude_unset=True, mode="json")
    target.update(changes)
    return target
@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete a phase; 404 when it is unknown, 500 when the deletion reports a failure."""
    target = Phase.get_from_id(phase_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Phase not found")

    if not target.delete():  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during phase deletion")
    return True
@app.get(
    "/custom-views",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_views():
    """Return the result of ``CustomView.get_all()``."""
    all_views = CustomView.get_all()
    return all_views
@app.get(
    "/users/{user_id}/custom-views",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` attribute equals ``user_id``."""
    user_views = CustomView.get_by_attribute("user_id", user_id)
    return user_views
@app.get(
    "/custom-views/{custom_view_id}",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_view(custom_view_id: str):
    """Return whatever ``CustomView.get_from_id`` yields for ``custom_view_id``.

    NOTE(review): the lookup result is returned unchecked, so a missing view is
    presumably answered with a 200/null body rather than a 404 - confirm whether
    clients rely on that.
    """
    view = CustomView.get_from_id(custom_view_id)
    return view
@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Build a custom view owned by the authenticated user, insert it and return it.

    The view is constructed from ``custom_view_creation.to_dict()`` plus the
    current user's ID as ``user_id``.
    """
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view
@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update an existing custom view with the posted payload.

    Args:
        custom_view_id: ID of the custom view to update (path parameter).
        custom_view_update: Request body; its full ``model_dump()`` is passed
            to ``custom_view.update``.

    Returns:
        Whatever ``custom_view.update`` returns.

    Raises:
        HTTPException: 404 when no custom view is found for the ID.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Guard against an unknown ID: calling ``.update`` on ``None`` would surface
    # as an unhandled AttributeError (HTTP 500) instead of a clean 404.
    # Assumes ``get_from_id`` returns ``None`` for a missing document - TODO confirm.
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.update(custom_view_update.model_dump())
@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete the custom view identified by ``custom_view_id``.

    Returns:
        The result of ``custom_view.delete()`` (serialized as a bool).

    Raises:
        HTTPException: 404 when no custom view is found for the ID.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Without this guard an unknown ID fails on ``None.delete()`` and yields a 500.
    # Assumes ``get_from_id`` returns ``None`` for a missing document - TODO confirm.
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()
@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the posted video and return it.

    Args:
        video: Request body describing the video to store.

    Returns:
        The posted ``Video`` object.

    Raises:
        HTTPException: 500 when ``video.insert()`` reports a failure.
    """
    # The failure check must look at the result of the insert, not at the
    # request model itself (which is the same object before and after).
    # NOTE(review): assumes ``Video.insert()`` returns ``None`` on failure, like
    # ``Phase.insert()`` is treated elsewhere in this module - confirm.
    video_id = video.insert()
    if video_id is None:  # pragma: no cover
        raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
    return video
@app.get(
    "/videos",
    dependencies=[Depends(get_current_active_user)],
)
async def get_videos():
    """Return the result of ``Video.get_all()``."""
    all_videos = Video.get_all()
    return all_videos
@app.get(
    "/videos/{video_id}",
    dependencies=[Depends(get_current_active_user)],
)
def get_stream(video_id):
    """Return what ``Video.get_video`` yields for ``video_id``.

    Raises:
        HTTPException: 404 when ``Video.get_video`` returns ``None``.
    """
    stream = Video.get_video(video_id)
    if stream is not None:
        return stream
    raise HTTPException(status_code=404, detail="Camera not found")
@app.get(
    "/signals-presets",
    dependencies=[Depends(get_current_active_user)],
)
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose ``user_id`` is the authenticated user's ID."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)
@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Create a signals preset for the authenticated user and return it.

    Delegates to ``SignalsPreset.create`` with the posted payload and the
    current user's ID.
    """
    return SignalsPreset.create(
        signals_preset=signals_preset,
        user_id=current_user.id,
    )
@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Update an existing signals preset with the fields set in the payload.

    Args:
        signals_preset_id: ID of the preset to update (path parameter).
        signals_preset_update: Request body; only explicitly set fields are
            forwarded (``exclude_unset=True``).

    Returns:
        Whatever ``signals_preset.update`` returns.

    Raises:
        HTTPException: 404 when no preset is found for the ID.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # Guard against an unknown ID: ``None.update(...)`` would end up as a 500.
    # Assumes ``get_from_id`` returns ``None`` for a missing document - TODO confirm.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete the signals preset identified by ``signals_preset_id``.

    Returns:
        The result of ``signals_preset.delete()`` (serialized as a bool).

    Raises:
        HTTPException: 404 when no preset is found for the ID.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # Guard against an unknown ID: ``None.delete()`` would end up as a 500.
    # Assumes ``get_from_id`` returns ``None`` for a missing document - TODO confirm.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()
@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation,
    current_user: User = Depends(get_current_active_user),
):
    """Create a graph theme for an existing signal and return its public form.

    Raises:
        HTTPException: 400 when no signal matches ``graph_theme_creation.signal_id``.
    """
    # Reject themes that target a signal ID with no matching Signal.
    if Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) is None:
        raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")

    # The "id" field of the payload is excluded from the creation arguments.
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created_theme = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created_theme.id, current_user.id)
@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching ``query``, evaluated for the authenticated user."""
    requester_id = current_user.id
    return PublicGraphTheme.response_from_query(query, requester_id)
@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    """Return the result of the user-library theme query for the authenticated user."""
    requester_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, requester_id)
@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update a graph theme on behalf of the authenticated user.

    The theme's creator may change any field. Any other user may only change
    ``active_for_user`` and ``in_user_library``.

    Args:
        theme_id: ID of the theme to update (path parameter).
        theme_update: Request body; only explicitly set fields are applied.
        current_user: The authenticated user performing the update.

    Returns:
        The public representation of the theme for the current user.

    Raises:
        HTTPException: 404 when no theme is found for the ID; 401 when a
            non-creator tries to change a creator-only field.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    # Guard against an unknown ID: reading ``creator_id`` on ``None`` would be a 500.
    # Assumes ``get_from_id`` returns ``None`` for a missing document - TODO confirm.
    if graph_theme is None:
        raise HTTPException(404, "Graph theme not found")

    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        user_scoped_fields = ("active_for_user", "in_user_library")
        if any(theme_property not in user_scoped_fields for theme_property in update_dict):
            # NOTE(review): 403 is the conventional status for an authenticated but
            # unauthorized caller; 401 is kept so existing clients are unaffected.
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    # Reuse the payload dumped above instead of serializing the model a second time.
    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete a graph theme; only its creator is allowed to do so.

    Args:
        graph_theme_id: ID of the theme to delete (path parameter).
        current_user: The authenticated user requesting the deletion.

    Returns:
        The result of ``graph_theme.delete()`` (serialized as a bool).

    Raises:
        HTTPException: 404 when no theme is found for the ID; 401 when the
            current user is not the theme's creator.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    # Guard against an unknown ID: reading ``creator_id`` on ``None`` would be a 500.
    # Assumes ``get_from_id`` returns ``None`` for a missing document - TODO confirm.
    if graph_theme is None:
        raise HTTPException(404, "Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()
@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]),
    current_user: User = Depends(get_current_active_user),
) -> dict:
    """Return ``PublicGraphTheme.get_signal_appearances`` for the given signal IDs.

    ``signal_ids`` is a repeatable query parameter that defaults to an empty
    list; the lookup is performed with the authenticated user's ID.
    """
    requester_id = current_user.id
    return PublicGraphTheme.get_signal_appearances(signal_ids, requester_id)
# The device-deployer routes are optional: they are mounted under
# "/device-deployers" only when DEVICE_DEPLOYERS is truthy.
if DEVICE_DEPLOYERS:
    app.include_router(
        deployers_router,
        prefix="/device-deployers",
    )