Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/api.py: 95%
495 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 15:40 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 15:40 +0000
1import os
2import logging
3import time
4from typing import Annotated
5from datetime import timedelta
6from pathlib import Path
7from pyinstrument import Profiler
9from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
10from fastapi.middleware.cors import CORSMiddleware
11from fastapi.responses import HTMLResponse
12from fastapi.security import OAuth2PasswordRequestForm
14from twinpad_backend import __version__
15from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
16from twinpad_backend.models import (
17 DeviceId,
18 MongoId,
19 Signal,
20 ForcedSignal,
21 SignalData,
22 SignalSample,
23 ServicesStatus,
24 Device,
25 DeviceUpdate,
26 DeviceSetup,
27 DeviceSetupUpdate,
28 DeviceState,
29 SignalUpdate,
30 SignalsData,
31 Event,
32 EventRule,
33 TwinPadActivity,
34 User,
35 UserUpdate,
36 Campaign,
37 Phase,
38 CustomView,
39 Command,
40 CustomViewCreation,
41 CustomViewUpdate,
42 Video,
43 SignalsPreset,
44 SignalsPresetCreation,
45 SignalsPresetUpdate,
46 PrivateGraphTheme,
47 PublicGraphTheme,
48 GraphThemeCreation,
49 GraphThemeUpdate,
50 SINGLE_POST_PROCESSING_FUNCTION,
51 DOUBLE_POST_PROCESSING_FUNCTION,
52 MULTIPLE_POST_PROCESSING_FUNCTION,
53)
54from twinpad_backend.auth import (
55 Token,
56 authenticate_user,
57 get_current_active_user,
58 ACCESS_TOKEN_EXPIRE_MINUTES,
59 create_access_token,
60 get_password_hash,
61)
62from twinpad_backend.queries import (
63 SignalQuery,
64 ForcedSignalQuery,
65 DeviceStatesQuery,
66 EventQuery,
67 EventRuleQuery,
68 CommandQuery,
69 GraphThemeQuery,
70)
71from twinpad_backend.responses import ListResponse
72from twinpad_backend.routes.deployers import router as deployers_router
73from twinpad_backend.routes.configurator import router as router_configurator
# Routers keyed by the URL prefix they are associated with.
routers = {
    "/configurator": router_configurator,
    "/device-deployers": deployers_router,
}

# Duration threshold, in seconds, used by the request-time logging middleware.
REQUEST_TIME_WARNING = 0.5

# Environment flags, read once at import time; only the exact string "true" enables a flag.
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"
DEVICE_DEPLOYERS = os.getenv("DEVICE_DEPLOYERS", "true") == "true"

# Reuse uvicorn's error logger and keep its records from propagating to parent loggers.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.getLogger().level)

app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
if PROFILING:  # pragma: no cover
    profiling_folder = "/tmp/twinpad_profiling"
    Path(profiling_folder).mkdir(parents=True, exist_ok=True)
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """
        Profile a request with pyinstrument and save the HTML report in the profiling folder.

        Requests whose URL contains "profiling" or ".ico" are passed through unprofiled.
        The report file name is the URL path (query string removed) with "/" replaced
        by "_", or "slash" when the path is empty.
        """
        url = str(request.url)
        # avoid recursion: never profile the profiling endpoints themselves (nor favicon requests)
        if any(segment in url for segment in ("profiling", ".ico")):
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Always stop the profiler, even when the downstream handler raises,
            # so a failed request does not leave a profiler running.
            profiler.stop()

        # Drop the query string first so that a root URL carrying a query
        # ("http://host/?a=1") maps to "slash" instead of an empty file name.
        path_part = url.split("?", maxsplit=1)[0]
        filename = "_".join(path_part.split("/")[3:]) or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the files stored in the profiling folder."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """
        Return one stored profiling report as a downloadable attachment.

        Raises a 404 HTTPException when ``file_name`` is not a regular file of the profiling folder.
        """
        file_path = os.path.join(profiling_folder, file_name)

        # isfile (rather than exists) so that directory names such as "." or ".."
        # yield a 404 instead of failing inside open().
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            return Response(
                content=profiling_file.read(),
                media_type="application/html",
                headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
            )
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """
    Log client, method, path, status code and duration (in ms) of every request.

    The line is logged at WARNING level when the request took longer than
    REQUEST_TIME_WARNING seconds, and at INFO level otherwise.
    """
    # perf_counter is monotonic: the measured duration is immune to system clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    # request.client can be None; resolve the fallback defensively instead of
    # dereferencing it unconditionally.
    fallback_host = request.client.host if request.client else "unknown"
    client_ip = request.headers.get("x-forwarded-for", fallback_host)

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response
@app.get("/")
async def slash():
    """Expose the backend version at the API root."""
    version_info = {"twinpad_version": __version__}
    return version_info
@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Report the result of the services health check
    """
    services = ServicesStatus.check()
    return {"services": services}
@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, requested sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices
@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id) -> Device:
    """Return the device whose ``device_id`` matches, or answer 404 when there is none."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")
@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: DeviceId, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Apply a mode change to a device on behalf of the current user.

    Answers 404 when the device is unknown. When the change result carries
    ``error is True``, its ``status_code`` / ``message`` are turned into an HTTPException
    (defaults: 500 / "An error has occurred"). Otherwise the result is returned as is.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    if outcome.get("error", False) is not True:
        return outcome

    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )
@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: DeviceId, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of device ``device_id`` selected by ``query``."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states
@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every stored device setup."""
    setups = DeviceSetup.get_all()
    return setups
@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the submitted device setup and echo it back."""
    new_setup = device_setup
    new_setup.insert()
    return new_setup
@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return the device setup with the given id, or answer 404 when it does not exist."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")
@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """
    Update a device setup with the fields of the payload that are not None.

    Answers 404 when the device setup does not exist.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left to None in the payload are treated as "unchanged".
    changes = {}
    for field_name, field_value in device_setup_update.model_dump().items():
        if field_value is not None:
            changes[field_name] = field_value

    target.update(changes)
    return target
@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return the result of the deletion; 404 when it does not exist."""
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return target.delete()
@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the sample-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """
    Return the signals matching ``query``, without their ``device`` field.

    ``signal_id:1`` is appended to the sort specification when ``signal_id`` is not already part of it.
    """
    already_sorted_on_id = "signal_id" in query.sort_by
    if not already_sorted_on_id:
        query.sort_by += ",signal_id:1"

    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})
@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    all_ids = Signal.get_all_ids()
    return all_ids
@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Return global statistics about signals: data size, number of samples,
    number of signals whose status is "up" and total number of signals
    """
    statuses = Signal.get_all_statuses()

    active_count = 0
    for entry in statuses:
        if entry["status"] == "up":
            active_count += 1

    sample_count = Signal.total_number_samples()
    signal_count = Signal.get_number_documents()
    data_size = signal_datasize()

    return {
        "signal_data_size": data_size,
        "number_signal_samples": sample_count,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }
@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples
@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples
@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility mapping computed by ``Signal.get_forcibility`` for the requested signals."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility
@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
async def get_forced_signals(
    current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
):
    """Return the forced signals matching ``query``; non-admin users are rejected with a 401."""
    if current_user.is_admin:
        return ForcedSignal.response_from_query(query)
    raise HTTPException(401)
@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return the signal ``signal_id`` as a dict, or answer 404 when it does not exist."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")
@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Send a command for a signal and keep the forced-signal records in sync.

    Errors:
    - 404 when the signal does not exist;
    - 400 when no device matches the part of ``signal_id`` before the first ".";
    - 403 when the signal is forced by another user and the caller is not admin;
    - the status/message of the command result when it reports ``error is True``.

    After a successful command, a ForcedSignal is inserted when ``forced_value`` is set;
    otherwise an existing ForcedSignal for this signal is deleted.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(status_code=404, detail="Signal not found")

    # The text before the first "." of the signal id is used to look up the device.
    owner_key = signal_id.split(".")[0]
    device = Device.get_from_device_or_config_id(owner_key)
    if device is None:
        raise HTTPException(status_code=400, detail="Signal doesn't belong to an existing configuration")

    forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
    if (
        forced_signal is not None
        and forced_signal.forcing_user_id != current_user.id
        and not current_user.is_admin
    ):
        raise HTTPException(status_code=403, detail="Cannot override another user's forcing")

    result = await signal.send_command(device.device_id, signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )

    requested_value = signal_update.forced_value
    if requested_value is not None:
        ForcedSignal(
            signal_id=signal_id,
            forcing_user_id=current_user.id,
            forced_at=time.time(),
            value=requested_value,
        ).insert()
    elif forced_signal is not None:
        forced_signal.delete()

    return result
@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
async def get_signal_forcibility(
    signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
) -> bool:
    """Return the result of ``ForcedSignal.can_force`` for this signal and the current user."""
    allowed = ForcedSignal.can_force(signal_id, current_user)
    return allowed
@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
) -> SignalData | None:
    """
    Return the data of one signal, optionally bounded in time and downsampled.

    - ``min_timestamp`` / ``max_timestamp``: forwarded to the data lookup.
    - ``number_samples_max``: when given, the data goes through ``uniform_desampling``.

    Returns None when the lookup yields no data.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type allows None: only downsample when there is data,
    # instead of failing on a None result.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data
@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal, or answer 404 when there is none."""
    last_sample = SignalSample.get_last_from_signal_id(signal_id)
    if last_sample is not None:
        return last_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal, or answer 404 when there is none."""
    first_sample = SignalSample.get_first_from_signal_id(signal_id)
    if first_sample is not None:
        return first_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """
    Return the number of samples and the sample data size of one signal.

    Answers 404 when the signal does not exist.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource is a signal, so the message must say so
        # (it used to read "Device not found").
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int = None,
    min_timestamp: float = None,
    max_timestamp: float = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """
    Return the data of several signals, optionally bounded in time and downsampled.

    The time range is also passed as the window range of the lookup.
    Answers 400 when both bounds are given and ``min_timestamp`` is greater than ``max_timestamp``.
    When ``number_samples_max`` is given, the data goes through ``uniform_desampling``.
    """
    # Compare against None rather than relying on truthiness: 0 is a valid
    # timestamp and must not disable the range check.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )

    # The declared return type allows None: only downsample when there is data.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data
@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int | None = None,
    outside_max_number_samples: int | None = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
) -> SignalsData | None:
    """
    Return signals data desampled with ``interest_window_desampling``.

    ``window_max_number_samples`` and ``outside_max_number_samples`` are forwarded to the
    desampling; the lookup is capped at 10 documents per requested sample (no cap when
    the sum is 0, i.e. when neither is given or both are 0).

    Answers 400 when a (window_)min timestamp is greater than its (window_)max counterpart.
    """
    # Compare against None rather than relying on truthiness: 0 is a valid
    # timestamp and must not disable the range checks.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    max_documents = 0
    if window_max_number_samples is not None:
        max_documents += 10 * window_max_number_samples
    if outside_max_number_samples is not None:
        max_documents += 10 * outside_max_number_samples
    if max_documents == 0:
        max_documents = None

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=max_documents,
    )

    # The declared return type allows None: nothing to desample in that case.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals data (bounds not interpolated) as a ``signals.zip`` attachment."""
    archive = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    ).zip_export(file_format)

    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals data (bounds not interpolated) as a ``signals.hdf5`` attachment."""
    payload = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    ).hdf5_export()

    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_post_processing(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
    number_samples_max: int = None,
) -> SignalsData | None:
    """
    Return the signals data of several phases for post-processing.

    ``phase_sync_times``, ``window_min_timestamps`` and ``window_max_timestamps`` are
    per-phase lists; an empty list is replaced by one None per phase. Answers 400 when
    one of them does not have exactly one entry per phase, and 404 when a phase id is
    unknown. When ``number_samples_max`` is given the result goes through ``min_max_downsampling``.
    """
    phase_count = len(phase_ids)

    # An omitted per-phase list means "no value" for every phase.
    if not len(phase_sync_times):
        phase_sync_times = [None] * phase_count
    if not len(window_min_timestamps):
        window_min_timestamps = [None] * phase_count
    if not len(window_max_timestamps):
        window_max_timestamps = [None] * phase_count

    per_phase_lists = (phase_sync_times, window_min_timestamps, window_max_timestamps)
    if any(phase_count != len(values) for values in per_phase_lists):
        raise HTTPException(
            400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
        )

    phases = []
    for phase_id in phase_ids:
        phases.append(Phase.get_from_id(phase_id))
    if None in phases:
        raise HTTPException(404, "Phase not found")

    signals_data = SignalsData.get_from_phase_and_signal_ids(
        phases=phases,
        phase_sync_times=phase_sync_times,
        signal_ids=signal_ids,
        window_min_timestamps=window_min_timestamps,
        window_max_timestamps=window_max_timestamps,
    )

    if number_samples_max is None:
        return signals_data
    return signals_data.min_max_downsampling(number_samples_max)
@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
async def apply_single_post_processing_function(
    phase_id: str,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    phase_sync_time: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """
    Apply a single-signal post-processing function on one signal of a phase.

    ``phase_sync_time`` defaults to ``phase.start_at / 1000`` and is used to zero the
    time vector of the result. The result is downsampled with ``min_max_downsampling``
    only when ``number_samples_max`` is lower than the length of its first time vector.

    Answers 404 when the phase is unknown and 500 when the function yields no result.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(404, "Phase not found")

    sync_time = phase.start_at / 1000 if phase_sync_time is None else phase_sync_time

    computed = await SignalsData.apply_single_function(
        phase,
        base_signal_id,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if computed is None:
        raise HTTPException(500, "There was en error while applying the function")

    if number_samples_max is not None and number_samples_max < len(computed.signals_data[0].time_vector):
        computed = computed.min_max_downsampling(number_samples_max)

    return computed.zero_time_vector(sync_time)
@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
async def apply_multiple_post_processing_function(
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    number_samples_max: int = None,
) -> SignalsData | None:
    """
    Apply a post-processing function combining several (phase, signal) pairs.

    ``phase_sync_times`` defaults to ``phase.start_at / 1000`` for every phase. The time
    vector of the result is zeroed on the second sync time for "Align-X" / "Using-X" and
    on the first one for any other function.

    Answers 400 when the lists are inconsistent or fewer than two phases are given,
    404 when a phase is unknown and 500 when the function yields no result.
    """
    if len(phase_ids) != len(signal_ids):
        raise HTTPException(400, "Each selected signal should correspond to a phase")
    if len(phase_ids) < 2:
        raise HTTPException(400, "These functions can only be applied to multiple signals")

    phases = []
    for phase_id in phase_ids:
        phases.append(Phase.get_from_id(phase_id))
    if None in phases:
        raise HTTPException(404, "Phase not found")

    if len(phase_sync_times) == 0:
        phase_sync_times = [phase.start_at / 1000 for phase in phases]
    if len(phases) != len(phase_sync_times):
        raise HTTPException(400, "Number of synchronization times does not match the number of phases")

    computed = await SignalsData.apply_multiple_function(
        phases,
        signal_ids,
        function,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
    if computed is None:
        raise HTTPException(500, "There was en error while applying the function")

    if number_samples_max is not None and number_samples_max < len(computed.signals_data[0].time_vector):
        computed = computed.min_max_downsampling(number_samples_max)

    # "Align-X" and "Using-X" are zeroed on the second sync time, everything else on the first.
    reference_index = 1 if function in {"Align-X", "Using-X"} else 0
    return computed.zero_time_vector(phase_sync_times[reference_index])
@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_zip(
    file_format: str,
    phase_ids: list[MongoId] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """
    Export post-processing signals data as a ``signals.zip`` attachment.

    The data is gathered by ``get_signals_data_post_processing`` (no downsampling requested).
    """
    signals_data = await get_signals_data_post_processing(
        phase_ids, phase_sync_times, signal_ids, window_min_timestamps, window_max_timestamps
    )
    archive = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids)

    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_post_processing_hdf5(
    phase_ids: list[str] = Query(default=[]),
    phase_sync_times: list[float | None] = Query(default=[]),
    signal_ids: list[str] = Query(default=[]),
    window_min_timestamps: list[float | None] = Query(default=[]),
    window_max_timestamps: list[float | None] = Query(default=[]),
):
    """
    Export post-processing signals data as a ``signals.hdf5`` attachment.

    The data is gathered by ``get_signals_data_post_processing`` (no downsampling requested).
    """
    signals_data = await get_signals_data_post_processing(
        phase_ids, phase_sync_times, signal_ids, window_min_timestamps, window_max_timestamps
    )
    payload = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids)

    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching ``query``."""
    listing = Event.response_from_query(query)
    return listing
@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return the event with the given id, or answer 404 when it does not exist."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")
@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the event-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the command-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching ``query``."""
    listing = EventRule.response_from_query(query)
    return listing
@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return the event rule with the given id, or answer 404 when it does not exist."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")
@app.post("/users", status_code=201)
async def create_user(user: User):
    """
    Create a user account with a hashed password.

    Answers 400 with the same generic message both when the e-mail is already used and
    when the creation itself fails.
    """
    # NOTE(review): this route declares no authentication dependency and forwards the
    # caller-supplied ``is_admin`` flag — confirm that open creation of admin accounts is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")

    hashed_password = get_password_hash(user.password)
    # bool() instead of ``| False``: the bitwise OR raises TypeError when is_admin is None,
    # whereas a missing flag should simply mean "not admin".
    is_admin = bool(user.is_admin)
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user
@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """
    Authenticate a user from the OAuth2 password form and issue a bearer access token.

    The token carries the user's e-mail (``sub``) and admin flag, and expires after
    ACCESS_TOKEN_EXPIRE_MINUTES minutes. Answers 401 on bad credentials and 402 when
    ``user.is_active`` is truthy.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})

    lifetime = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))

    # NOTE(review): rejecting users whose ``is_active`` is truthy with "User Blocked" looks
    # inverted — confirm the meaning of ``is_active`` against the User model.
    if user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})

    claims = {"sub": user.email, "admin": user.is_admin}
    token_value = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=token_value, token_type="bearer")
@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return every user as a dict, requested sorted by e-mail."""
    users = User.get_all(sort_by="email")
    serialized = []
    for user in users:
        serialized.append(user.to_dict())
    return serialized
@app.get("/users/me")
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user as a dict."""
    profile = current_user.to_dict()
    return profile
@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """
    Return a user as a dict without its ``password`` and ``is_connected`` fields.

    Answers 404 when the user does not exist.
    """
    found = User.get_from_id(user_id)
    if found is not None:
        return found.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")
@app.patch("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """
    Update a user and return the updated user as a dict.

    A non-empty password is replaced by its hash before the update; an empty or missing
    password is removed from the payload.
    """
    password_provided = not (user.password == "" or user.password is None)
    if password_provided:
        user.password = get_password_hash(user.password)
    else:
        del user.password

    updated = User.update_info(user, user_id)
    return updated.to_dict()
@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching ``query``."""
    listing = Command.response_from_query(query)
    return listing
@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns
@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return the campaign with the given id, or answer 404 when it does not exist."""
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Campaign not found")
@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Insert a campaign and echo it back; answers 500 when the insertion returns no id."""
    inserted_id = campaign.insert()
    if inserted_id is not None:
        return campaign
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """
    Update a campaign with the fields explicitly set in the payload (dumped in JSON mode).

    Answers 404 when the campaign does not exist.
    """
    target = Campaign.get_from_id(campaign_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    changes = edit_campaign.model_dump(exclude_unset=True, mode="json")
    target.update(changes)
    return target
@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """
    Delete a campaign after deleting its phases.

    Answers 404 when the campaign does not exist and 500 when either the phases deletion
    is not acknowledged or the campaign deletion fails. Returns True on success.
    """
    target = Campaign.get_from_id(campaign_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Phases go first; the campaign is only removed once their deletion is acknowledged.
    phases_deletion = Phase.deleteMany(campaign_id)
    if not phases_deletion.acknowledged:
        raise HTTPException(status_code=500, detail="An error occurred during phases deletion")

    if not target.delete():
        raise HTTPException(status_code=500, detail="An error occurred during campaign deletion")
    return True
@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases
@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return the phase with the given id, or answer 404 when it does not exist."""
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")
@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Insert a phase and echo it back; answers 500 when the insertion returns no id."""
    inserted_id = phase.insert()
    if inserted_id is not None:
        return phase
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")
@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """
    Update a phase with the fields explicitly set in the payload (dumped in JSON mode).

    Answers 404 when the phase does not exist.
    """
    target = Phase.get_from_id(phase_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Phase does not exists")

    changes = edit_phase.model_dump(exclude_unset=True, mode="json")
    target.update(changes)
    return target
@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete a phase and return True; answers 404 when it is unknown and 500 when the deletion fails."""
    target = Phase.get_from_id(phase_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Phase not found")

    if target.delete():
        return True
    raise HTTPException(status_code=500, detail="An error occurred during phase deletion")
# Return every stored custom view (not restricted to the calling user).
@app.get(
    "/custom-views",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_views():
    all_views = CustomView.get_all()
    return all_views
# Return the custom views whose "user_id" attribute equals the given user ID.
@app.get(
    "/users/{user_id}/custom-views",
    dependencies=[Depends(get_current_active_user)],
)
async def get_custom_views_from_user_id(user_id: str):
    user_views = CustomView.get_by_attribute("user_id", user_id)
    return user_views
# Fetch a single custom view by ID.
#
# Responds 404 when the lookup yields None, matching the behaviour of the
# phase and campaign endpoints instead of returning a 200 with a null body.
# NOTE(review): assumes CustomView.get_from_id returns None for an unknown ID,
# like Phase.get_from_id does -- confirm in models.
@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view
# Create a custom view owned by the authenticated user and return it.
# The result of insert() is not inspected here.
@app.post(
    "/custom-views",
    dependencies=[Depends(get_current_active_user)],
)
async def create_custom_view(
    custom_view_creation: CustomViewCreation,
    current_user: User = Depends(get_current_active_user),
):
    view_fields = custom_view_creation.to_dict()
    # The owner is always taken from the session, never from the request body.
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view
# Update an existing custom view and return whatever CustomView.update returns.
#
# Responds 404 when the lookup yields None; previously that case dereferenced
# None and surfaced as an unhandled AttributeError (HTTP 500).
# NOTE(review): assumes CustomView.get_from_id returns None for an unknown ID,
# like Phase.get_from_id does -- confirm in models.
@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    # NOTE(review): unlike the other PATCH handlers this dumps *all* fields, not
    # only the ones the client set (no exclude_unset=True). Left unchanged because
    # it may be intentional for CustomViewUpdate -- confirm.
    return custom_view.update(custom_view_update.model_dump())
# Delete a custom view by ID and return the result of CustomView.delete().
#
# Responds 404 when the lookup yields None; previously that case dereferenced
# None and surfaced as an unhandled AttributeError (HTTP 500).
# NOTE(review): assumes CustomView.get_from_id returns None for an unknown ID,
# like Phase.get_from_id does -- confirm in models.
@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()
# Persist a new video entry and return it.
#
# Responds 500 when insert() returns None. The previous check tested the
# request model itself (`if not video`) and ignored the insert() result, so an
# insertion failure was never reported.
# NOTE(review): assumes Video.insert() returns None on failure, the same
# contract the phase creation endpoint relies on for Phase.insert() -- confirm.
@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    inserted_id = video.insert()
    if inserted_id is None:
        raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
    return video
# Return every stored video entry.
@app.get(
    "/videos",
    dependencies=[Depends(get_current_active_user)],
)
async def get_videos():
    all_videos = Video.get_all()
    return all_videos
# Look up a video by ID through Video.get_video and return the result.
# Responds 404 when the lookup yields None.
@app.get(
    "/videos/{video_id}",
    dependencies=[Depends(get_current_active_user)],
)
def get_stream(video_id):
    found = Video.get_video(video_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Camera not found")
# Return the signals presets whose "user_id" attribute is the authenticated user's ID.
@app.get(
    "/signals-presets",
    dependencies=[Depends(get_current_active_user)],
)
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)
# Create a signals preset for the authenticated user and return the value
# produced by SignalsPreset.create.
@app.post(
    "/signals-presets",
    dependencies=[Depends(get_current_active_user)],
)
async def create_signals_preset(
    signals_preset: SignalsPresetCreation,
    current_user: User = Depends(get_current_active_user),
):
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
# Apply a partial update to a signals preset and return whatever
# SignalsPreset.update returns.
#
# Responds 404 when the lookup yields None; previously that case dereferenced
# None and surfaced as an unhandled AttributeError (HTTP 500).
# NOTE(review): assumes SignalsPreset.get_from_id returns None for an unknown
# ID, like Phase.get_from_id does -- confirm in models.
@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    # Only the fields the client actually sent are forwarded.
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
# Delete a signals preset by ID and return the result of SignalsPreset.delete().
#
# Responds 404 when the lookup yields None; previously that case dereferenced
# None and surfaced as an unhandled AttributeError (HTTP 500).
# NOTE(review): assumes SignalsPreset.get_from_id returns None for an unknown
# ID, like Phase.get_from_id does -- confirm in models.
@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()
# Create a graph theme for the authenticated user and return its public
# representation as seen by that user.
# Responds 400 when no signal matches the "signal_id" given in the payload.
@app.post(
    "/graph-themes",
    dependencies=[Depends(get_current_active_user)],
)
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation,
    current_user: User = Depends(get_current_active_user),
):
    target_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
    if target_signal is None:
        raise HTTPException(
            status_code=400,
            detail=f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist",
        )

    # Any "id" in the payload is dropped before the theme is created.
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created.id, current_user.id)
# Return the graph themes matching the query, built by
# PublicGraphTheme.response_from_query for the authenticated user.
@app.get(
    "/graph-themes",
    dependencies=[Depends(get_current_active_user)],
)
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    viewer_id = current_user.id
    return PublicGraphTheme.response_from_query(query, viewer_id)
# Return the graph themes matching the query, built by
# PublicGraphTheme.response_from_query_in_user_library for the authenticated user.
@app.get(
    "/graph-themes/own",
    dependencies=[Depends(get_current_active_user)],
)
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(),
    current_user: User = Depends(get_current_active_user),
) -> ListResponse[PublicGraphTheme]:
    viewer_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, viewer_id)
# Apply a partial update to a graph theme and return its public representation
# as seen by the authenticated user.
#
# Responds 404 when the lookup yields None; previously that case dereferenced
# None and surfaced as an unhandled AttributeError (HTTP 500).
# Responds 401 when a user who is not the theme's creator tries to change any
# field other than "active_for_user" / "in_user_library".
# NOTE(review): assumes PrivateGraphTheme.get_from_id returns None for an
# unknown ID, like Phase.get_from_id does -- confirm in models.
@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")

    # Only the fields the client actually sent; dumped once and reused below.
    update_dict = theme_update.model_dump(exclude_unset=True)

    if current_user.id != graph_theme.creator_id:
        # Non-creators may only touch these two fields.
        user_scoped_fields = {"active_for_user", "in_user_library"}
        if any(theme_property not in user_scoped_fields for theme_property in update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
# Delete a graph theme and return the result of PrivateGraphTheme.delete().
#
# Responds 404 when the lookup yields None; previously that case dereferenced
# None and surfaced as an unhandled AttributeError (HTTP 500).
# Responds 401 when the authenticated user is not the theme's creator.
# NOTE(review): assumes PrivateGraphTheme.get_from_id returns None for an
# unknown ID, like Phase.get_from_id does -- confirm in models.
@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()
# Return the value of PublicGraphTheme.get_signal_appearances for the requested
# signal IDs and the authenticated user. `signal_ids` is a repeatable query
# parameter that defaults to an empty list.
@app.get(
    "/signals-appearances",
    dependencies=[Depends(get_current_active_user)],
)
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]),
    current_user: User = Depends(get_current_active_user),
) -> dict:
    viewer_id = current_user.id
    return PublicGraphTheme.get_signal_appearances(signal_ids, viewer_id)
# Mount every sub-router under its prefix. The "/device-deployers" router is
# only mounted when DEVICE_DEPLOYERS is truthy.
for prefix, router in routers.items():
    if prefix != "/device-deployers" or DEVICE_DEPLOYERS:
        app.include_router(router, prefix=prefix)