Coverage for / usr / local / lib / python3.11 / site-packages / twinpad_backend / api.py: 96%
489 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-01 14:04 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-01 14:04 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 ForcedSignal,
20 SignalData,
21 SignalSample,
22 ServicesStatus,
23 Device,
24 DeviceUpdate,
25 DeviceSetup,
26 DeviceSetupUpdate,
27 DeviceState,
28 SignalUpdate,
29 SignalsData,
30 Event,
31 EventRule,
32 TwinPadActivity,
33 User,
34 UserUpdate,
35 Campaign,
36 Phase,
37 CustomView,
38 Command,
39 CustomViewCreation,
40 CustomViewUpdate,
41 Video,
42 SignalsPreset,
43 SignalsPresetCreation,
44 SignalsPresetUpdate,
45 PrivateGraphTheme,
46 PublicGraphTheme,
47 GraphThemeCreation,
48 GraphThemeUpdate,
49 SINGLE_POST_PROCESSING_FUNCTION,
50 DOUBLE_POST_PROCESSING_FUNCTION,
51 MULTIPLE_POST_PROCESSING_FUNCTION,
52)
53from twinpad_backend.auth import (
54 Token,
55 authenticate_user,
56 get_current_active_user,
57 ACCESS_TOKEN_EXPIRE_MINUTES,
58 create_access_token,
59 get_password_hash,
60)
61from twinpad_backend.queries import (
62 SignalQuery,
63 ForcedSignalQuery,
64 DeviceStatesQuery,
65 EventQuery,
66 EventRuleQuery,
67 CommandQuery,
68 GraphThemeQuery,
69)
70from twinpad_backend.responses import ListResponse
72REQUEST_TIME_WARNING = 0.5
74DEBUG = os.environ.get("DEBUG", "false") == "true"
75PROFILING = os.environ.get("PROFILING", "false") == "true"
77logger = logging.getLogger("uvicorn.error")
78logger.propagate = False
79logger.info("Debug mode: %s", DEBUG)
80logger.info("log level: %s", logging.root.level)
83app = FastAPI(title="Twinpad backend", version=__version__)
85app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
87if PROFILING: # pragma: no cover
88 profiling_folder = mkdtemp()
89 logger.info("Profiling enabled")
91 @app.middleware("http")
92 async def profile_request(request: Request, call_next):
93 should_profile = True
94 url = str(request.url)
95 for segment in ("profiling", ".ico"):
96 if segment in url:
97 should_profile = False
98 break
100 if should_profile: # avoid recursion
101 profiler = Profiler()
102 profiler.start()
104 response = await call_next(request)
106 profiler.stop()
107 url = "_".join(url.split("/")[3:]).rstrip("/")
108 if not url:
109 url = "slash"
110 filename = url.split("?", maxsplit=1)[0]
111 logger.info("saving profiling to %s", filename)
112 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
113 profiling_file.write(profiler.output_html())
115 return response
117 return await call_next(request)
119 @app.get("/profilings")
120 async def profilings():
121 return {"profilings": os.listdir(profiling_folder)}
123 @app.get("/profilings/{file_name}")
124 async def profiling(file_name):
125 file_path = os.path.join(profiling_folder, file_name)
127 if not os.path.exists(file_path):
128 raise HTTPException(
129 status_code=404,
130 detail=f"Profiling file '{file_name}' not found",
131 )
133 with open(file_path, "r", encoding="utf-8") as profiling_file:
134 return Response(
135 content=profiling_file.read(),
136 media_type="application/html",
137 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
138 )
141@app.middleware("http")
142async def log_request_time(request: Request, call_next):
143 start_time = time.time() # Record the start time
144 response = await call_next(request) # Process the request
145 duration = time.time() - start_time # Calculate the time taken
146 client_ip = request.headers.get("x-forwarded-for", request.client.host)
147 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
148 if duration > REQUEST_TIME_WARNING:
149 logger.warning(message)
150 else:
151 logger.info(message)
152 return response
155@app.get("/")
156async def slash():
157 return {"twinpad_version": __version__}
160@app.get("/status", dependencies=[Depends(get_current_active_user)])
161async def status():
162 """
163 Return service healthcheck
164 """
165 return {
166 "services": ServicesStatus.check(),
167 }
170@app.get("/devices", dependencies=[Depends(get_current_active_user)])
171async def get_devices() -> list[Device]:
172 return Device.get_all(sort_by="device_id")
175@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
176async def get_device(device_id):
177 device = Device.get_one_by_attribute("device_id", device_id)
178 if not device:
179 raise HTTPException(
180 status_code=404,
181 detail="Device not found",
182 )
183 return device
186@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
187async def update_item(
188 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
189):
190 device = Device.get_one_by_attribute("device_id", device_id)
191 if not device:
192 raise HTTPException(
193 status_code=404,
194 detail="Device not found",
195 )
196 result = await device.change_mode(device_update, current_user)
197 if result.get("error", False) is True:
198 raise HTTPException(
199 status_code=result.get("status_code", 500),
200 detail=result.get("message", "An error has occurred"),
201 )
202 return result
205@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
206async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
207 return DeviceState.get_from_id_and_query(device_id, query)
210@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
211async def get_device_setups() -> list[DeviceSetup]:
212 return DeviceSetup.get_all()
215@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
216async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
217 device_setup.insert()
218 return device_setup
221@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
222async def get_device_setup(device_setup_id: str):
223 device_setup = DeviceSetup.get_from_id(device_setup_id)
224 if device_setup is None:
225 raise HTTPException(
226 status_code=404,
227 detail="Device setup not found",
228 )
229 return device_setup
232@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
233async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
234 device_setup = DeviceSetup.get_from_id(device_setup_id)
235 if device_setup is None:
236 raise HTTPException(
237 status_code=404,
238 detail="Device setup not found",
239 )
240 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
241 return device_setup
244@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
245async def delete_device_setups(device_setup_id: str) -> bool:
246 device_setup = DeviceSetup.get_from_id(device_setup_id)
247 if device_setup is None:
248 raise HTTPException(
249 status_code=404,
250 detail="Device setup not found",
251 )
252 deleted = device_setup.delete()
253 return deleted
256@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
257async def get_number_samples(
258 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
259) -> list[TwinPadActivity]:
260 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
263@app.get("/signals", dependencies=[Depends(get_current_active_user)])
264async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
265 if "signal_id" not in query.sort_by:
266 query.sort_by += ",signal_id:1"
267 return Signal.response_from_query(query).to_dict(exclude={"device"})
270@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
271async def signals_names() -> list[str]:
272 return Signal.get_all_ids()
275@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
276async def signal_stats():
277 """
278 Returns signals stats
279 """
280 signal_statuses = Signal.get_all_statuses()
281 signal_ids = [signal["signal_id"] for signal in signal_statuses]
283 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids)
284 number_samples = sum(number_samples_by_signal_id.values())
286 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up")
288 number_signals = Signal.get_number_documents()
290 return {
291 "signal_data_size": signal_datasize(),
292 "number_signal_samples": number_samples,
293 "number_active_signals": number_active_signals,
294 "number_signals": number_signals,
295 }
298@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
299async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
300 return SignalSample.get_last_from_signal_ids(signal_ids)
303@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
304async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
305 return SignalSample.get_first_from_signal_ids(signal_ids)
308@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
309async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
310 return Signal.get_forcibility(signal_ids)
313@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
314async def get_forced_signals(
315 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
316):
317 if not current_user.is_admin:
318 raise HTTPException(401)
319 return ForcedSignal.response_from_query(query)
322@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
323async def get_signal(signal_id):
324 signal = Signal.get_from_signal_id(signal_id)
325 if not signal:
326 raise HTTPException(
327 status_code=404,
328 detail="Signal not found",
329 )
330 return signal.to_dict()
333@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
334async def update_signal(
335 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
336):
337 signal = Signal.get_from_signal_id(signal_id)
338 if not signal:
339 raise HTTPException(
340 status_code=404,
341 detail="Device not found",
342 )
343 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
344 if forced_signal is not None:
345 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
346 raise HTTPException(
347 status_code=403,
348 detail="Cannot override another user's forcing",
349 )
351 result = await signal.send_command(signal_update, current_user)
352 if result.get("error", False) is True:
353 raise HTTPException(
354 status_code=result.get("status_code", 500),
355 detail=result.get("message", "An error has occurred"),
356 )
358 if forced_signal is not None and signal_update.forced_value is None:
359 forced_signal.delete()
360 elif signal_update.forced_value is not None:
361 forced_signal = ForcedSignal(
362 signal_id=signal_id,
363 forcing_user_id=current_user.id,
364 forced_at=time.time(),
365 value=signal_update.forced_value,
366 )
367 forced_signal.insert()
369 return result
372@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
373async def get_signal_forcibility(
374 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
375) -> bool:
376 return ForcedSignal.can_force(signal_id, current_user)
379@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
380async def get_signal_data(
381 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
382) -> SignalData | None:
383 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
385 if number_samples_max is not None:
386 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
388 return signal_data
391@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
392async def get_last_value(signal_id) -> SignalSample:
393 sample = SignalSample.get_last_from_signal_id(signal_id)
394 if sample is None:
395 raise HTTPException(status_code=404, detail="No data")
396 return sample
399@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
400async def get_first_value(signal_id) -> SignalSample:
401 sample = SignalSample.get_first_from_signal_id(signal_id)
402 if sample is None:
403 raise HTTPException(status_code=404, detail="No data")
404 return sample
407@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
408async def get_signal_number_samples(signal_id):
409 signal = Signal.get_from_signal_id(signal_id)
410 if not signal:
411 raise HTTPException(
412 status_code=404,
413 detail="Device not found",
414 )
415 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
418@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
419async def get_signals_data(
420 signal_ids: list[str] = Query(default=[]),
421 number_samples_max: int = None,
422 min_timestamp: float = None,
423 max_timestamp: float = None,
424 interpolate_bounds: bool = True,
425) -> SignalsData | None:
426 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
427 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
429 signals_data = SignalsData.get_from_signal_ids(
430 signal_ids,
431 min_timestamp=min_timestamp,
432 max_timestamp=max_timestamp,
433 window_min_timestamp=min_timestamp,
434 window_max_timestamp=max_timestamp,
435 interpolate_bounds=interpolate_bounds,
436 )
437 if number_samples_max is not None:
438 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
440 return signals_data
443@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
444async def get_signals_data_interest_window(
445 window_max_number_samples: int | None = None,
446 outside_max_number_samples: int | None = None,
447 window_min_timestamp: float = None,
448 window_max_timestamp: float = None,
449 signal_ids: list[str] = Query(default=[]),
450 min_timestamp: float = None,
451 max_timestamp: float = None,
452) -> SignalsData | None:
453 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
454 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
456 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
457 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
459 max_documents = 0
461 if window_max_number_samples is not None:
462 max_documents += 10 * window_max_number_samples
463 if outside_max_number_samples is not None:
464 max_documents += 10 * outside_max_number_samples
466 if max_documents == 0:
467 max_documents = None
469 signals_data = SignalsData.get_from_signal_ids(
470 signal_ids,
471 min_timestamp=min_timestamp,
472 max_timestamp=max_timestamp,
473 window_min_timestamp=window_min_timestamp,
474 window_max_timestamp=window_max_timestamp,
475 max_documents=max_documents,
476 )
478 signals_data = signals_data.interest_window_desampling(
479 window_max_number_samples=window_max_number_samples,
480 outside_max_number_samples=outside_max_number_samples,
481 window_min_timestamp=window_min_timestamp,
482 window_max_timestamp=window_max_timestamp,
483 )
485 return signals_data
488@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
489async def export_signals_zip(
490 file_format: str,
491 signal_ids: list[str] = Query(default=[]),
492 min_timestamp: float = None,
493 max_timestamp: float = None,
494):
495 signals_data = SignalsData.get_from_signal_ids(
496 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
497 )
498 zip_data = signals_data.zip_export(file_format)
499 return Response(
500 content=zip_data,
501 media_type="application/zip",
502 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
503 )
506@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
507async def export_signals_hdf5(
508 signal_ids: list[str] = Query(default=[]),
509 min_timestamp: float = None,
510 max_timestamp: float = None,
511):
512 signals_data = SignalsData.get_from_signal_ids(
513 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
514 )
515 data = signals_data.hdf5_export()
516 return Response(
517 content=data,
518 media_type="application/hdf5",
519 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
520 )
523@app.get("/post-processing/signals-data", dependencies=[Depends(get_current_active_user)])
524async def get_signals_data_post_processing(
525 phase_ids: list[str] = Query(default=[]),
526 phase_sync_times: list[float | None] = Query(default=[]),
527 signal_ids: list[str] = Query(default=[]),
528 window_min_timestamps: list[float | None] = Query(default=[]),
529 window_max_timestamps: list[float | None] = Query(default=[]),
530 number_samples_max: int = None,
531) -> SignalsData | None:
532 if len(phase_sync_times) == 0:
533 phase_sync_times = [None for _ in range(len(phase_ids))]
534 if len(window_min_timestamps) == 0:
535 window_min_timestamps = [None for _ in range(len(phase_ids))]
536 if len(window_max_timestamps) == 0:
537 window_max_timestamps = [None for _ in range(len(phase_ids))]
539 if (
540 len(phase_ids) != len(phase_sync_times)
541 or len(phase_ids) != len(window_min_timestamps)
542 or len(phase_ids) != len(window_max_timestamps)
543 ):
544 raise HTTPException(
545 400, "Each phase should have corresponding synchronization time, minimum and maximum timestamps."
546 )
548 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
550 if None in phases:
551 raise HTTPException(404, "Phase not found")
553 signals_data = SignalsData.get_from_phase_and_signal_ids(
554 phases=phases,
555 phase_sync_times=phase_sync_times,
556 signal_ids=signal_ids,
557 window_min_timestamps=window_min_timestamps,
558 window_max_timestamps=window_max_timestamps,
559 )
561 if number_samples_max is not None:
562 signals_data = signals_data.min_max_downsampling(number_samples_max)
564 return signals_data
567@app.get("/post-processing/functions/single", dependencies=[Depends(get_current_active_user)])
568async def apply_single_post_processing_function(
569 phase_id: str,
570 base_signal_id: str,
571 function: SINGLE_POST_PROCESSING_FUNCTION,
572 phase_sync_time: float = None,
573 window_min_timestamp: float = None,
574 window_max_timestamp: float = None,
575 number_samples_max: int = None,
576) -> SignalsData | None:
577 phase = Phase.get_from_id(phase_id)
579 if phase is None:
580 raise HTTPException(404, "Phase not found")
581 if phase_sync_time is None:
582 phase_sync_time = phase.start_at / 1000
584 signals_data = await SignalsData.apply_single_function(
585 phase,
586 base_signal_id,
587 function,
588 window_min_timestamp=window_min_timestamp,
589 window_max_timestamp=window_max_timestamp,
590 )
592 if signals_data is None:
593 raise HTTPException(500, "There was en error while applying the function")
595 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
596 signals_data = signals_data.min_max_downsampling(number_samples_max)
597 signals_data = signals_data.zero_time_vector(phase_sync_time)
599 return signals_data
602@app.get("/post-processing/functions/multiple", dependencies=[Depends(get_current_active_user)])
603async def apply_multiple_post_processing_function(
604 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
605 phase_ids: list[str] = Query(default=[]),
606 phase_sync_times: list[float] = Query(default=[]),
607 signal_ids: list[str] = Query(default=[]),
608 window_min_timestamp: float = None,
609 window_max_timestamp: float = None,
610 number_samples_max: int = None,
611) -> SignalsData | None:
612 if len(phase_ids) != len(signal_ids):
613 raise HTTPException(400, "Each selected signal should correspond to a phase")
615 if len(phase_ids) < 2:
616 raise HTTPException(400, "These functions can only be applied to multiple signals")
618 phases = [Phase.get_from_id(phase_id) for phase_id in phase_ids]
619 if None in phases:
620 raise HTTPException(404, "Phase not found")
622 if len(phase_sync_times) == 0:
623 phase_sync_times = [phase.start_at / 1000 for phase in phases]
624 if len(phases) != len(phase_sync_times):
625 raise HTTPException(400, "Number of synchronization times does not match the number of phases")
627 signals_data = await SignalsData.apply_multiple_function(
628 phases,
629 signal_ids,
630 function,
631 window_min_timestamp=window_min_timestamp,
632 window_max_timestamp=window_max_timestamp,
633 )
635 if signals_data is None:
636 raise HTTPException(500, "There was en error while applying the function")
638 if number_samples_max is not None and number_samples_max < len(signals_data.signals_data[0].time_vector):
639 signals_data = signals_data.min_max_downsampling(number_samples_max)
640 if function in {"Align-X", "Using-X"}:
641 signals_data = signals_data.zero_time_vector(phase_sync_times[1])
642 else:
643 signals_data = signals_data.zero_time_vector(phase_sync_times[0])
645 return signals_data
648@app.get("/post-processing/export_zip", dependencies=[Depends(get_current_active_user)])
649async def export_post_processing_zip(
650 file_format: str,
651 phase_ids: list[str] = Query(default=[]),
652 phase_sync_times: list[float | None] = Query(default=[]),
653 signal_ids: list[str] = Query(default=[]),
654 window_min_timestamps: list[float | None] = Query(default=[]),
655 window_max_timestamps: list[float | None] = Query(default=[]),
656):
657 signals_data = await get_signals_data_post_processing(
658 phase_ids,
659 phase_sync_times,
660 signal_ids,
661 window_min_timestamps,
662 window_max_timestamps,
663 )
665 zip_data = signals_data.zip_export(file_format, post_processing=True, phase_ids=phase_ids)
667 return Response(
668 content=zip_data,
669 media_type="application/zip",
670 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
671 )
674@app.get("/post-processing/export_hdf5", dependencies=[Depends(get_current_active_user)])
675async def export_post_processing_hdf5(
676 phase_ids: list[str] = Query(default=[]),
677 phase_sync_times: list[float | None] = Query(default=[]),
678 signal_ids: list[str] = Query(default=[]),
679 window_min_timestamps: list[float | None] = Query(default=[]),
680 window_max_timestamps: list[float | None] = Query(default=[]),
681):
682 signals_data = await get_signals_data_post_processing(
683 phase_ids,
684 phase_sync_times,
685 signal_ids,
686 window_min_timestamps,
687 window_max_timestamps,
688 )
690 data = signals_data.hdf5_export(post_processing=True, phase_ids=phase_ids)
692 return Response(
693 content=data,
694 media_type="application/hdf5",
695 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
696 )
699@app.get("/events", dependencies=[Depends(get_current_active_user)])
700async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
701 return Event.response_from_query(query)
704@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
705async def get_event(event_id) -> Event:
706 event = Event.get_from_id(event_id)
707 if event is None:
708 raise HTTPException(status_code=404, detail="No such event")
709 return event
712@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
713async def get_number_events(
714 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
715) -> list[TwinPadActivity]:
716 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
719@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
720async def get_number_commands(
721 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
722) -> list[TwinPadActivity]:
723 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
726@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
727async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
728 return EventRule.response_from_query(query)
731@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
732async def get_event_rule(event_rule_id) -> EventRule:
733 event_rule = EventRule.get_from_id(event_rule_id)
734 if event_rule is None:
735 raise HTTPException(status_code=404, detail="No such event rule")
736 return event_rule
739@app.post("/users", status_code=201)
740async def create_user(user: User):
741 if User.get_one_by_attribute("email", user.email) is not None:
742 raise HTTPException(status_code=400, detail="An error occurred during account creation")
743 hashed_password = get_password_hash(user.password)
744 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
745 if new_user is None:
746 raise HTTPException(status_code=400, detail="An error occurred during account creation")
747 return new_user
750@app.post("/token", status_code=201)
751async def login_for_access_token(
752 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
753) -> Token:
754 user = authenticate_user(form_data.username, form_data.password)
755 if not user:
756 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
757 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
758 if user.is_active:
759 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
760 access_token = create_access_token(
761 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
762 )
763 return Token(access_token=access_token, token_type="bearer")
766@app.get("/users", dependencies=[Depends(get_current_active_user)])
767async def get_users():
768 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
771@app.get("/users/me", response_model=User)
772async def get_current_user(
773 current_user: Annotated[User, Depends(get_current_active_user)],
774):
775 del current_user.password
776 return current_user
779@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
780async def get_user(user_id: str):
781 user = User.get_from_id(user_id)
783 if user is None:
784 raise HTTPException(
785 status_code=404,
786 detail="User not found",
787 )
788 return user.to_dict(exclude={"password", "is_connected"})
791@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
792async def patch_user(user: UserUpdate, user_id):
793 if user.password == "" or user.password is None:
794 del user.password
795 else:
796 user.password = get_password_hash(user.password)
797 return User.update_info(user, user_id)
800@app.get("/commands", dependencies=[Depends(get_current_active_user)])
801async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
802 return Command.response_from_query(query)
805@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
806async def get_campaigns():
807 return Campaign.get_all()
810@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
811async def get_campaign_by_id(campaign_id: str):
812 campaign = Campaign.get_from_id(campaign_id)
813 if campaign is None:
814 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
815 return campaign
818@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
819async def add_campaign(campaign: Campaign):
820 new_campaign = Campaign.create(campaign)
821 if new_campaign is None:
822 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
823 return new_campaign
826@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
827async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
828 campaign = Campaign.get_from_id(campaign_id)
829 if campaign is None:
830 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
831 campaign.name = edit_campaign.name
832 campaign.description = edit_campaign.description
833 return Campaign.update(campaign)
836@app.delete(
837 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
838)
839async def delete_campaign(campaign_id: str):
840 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
841 campaign = Campaign.get_from_id(campaign_id)
842 if campaign is None:
843 raise exception
844 delete_phases = Phase.deleteMany(campaign_id)
845 if not delete_phases.acknowledged:
846 raise exception
847 campaign_deleted = Campaign.delete(campaign_id)
848 return campaign_deleted.acknowledged
851@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
852async def get_campaign_phases(campaign_id: str):
853 return Phase.get_by_attribute("campaign_id", campaign_id)
856@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
857async def get_phase(phase_id: str):
858 phase = Phase.get_from_id(phase_id)
859 if phase is None:
860 raise HTTPException(status_code=404, detail="Phase not found")
861 return phase
864@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
865async def add_phase(phase: Phase):
866 new_phase = Phase.create(phase)
867 if new_phase is None:
868 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
869 return new_phase
872@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
873async def edit_phase(phase_id, edit_phase: Phase):
874 phase = Phase.get_from_id(phase_id)
875 if phase is None:
876 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
877 phase.name = edit_phase.name
878 phase.description = edit_phase.description
879 phase.start_at = edit_phase.start_at
880 phase.end_at = edit_phase.end_at
881 return Phase.update(phase)
884@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
885async def delete_phase(phase_id: str):
886 phase = Phase.get_from_id(phase_id)
887 if phase is None:
888 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
889 phase_deleted = Phase.delete(phase_id)
890 return phase_deleted.acknowledged
893@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
894async def get_custom_views():
895 return CustomView.get_all()
898@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
899async def get_custom_views_from_user_id(user_id: str):
900 return CustomView.get_by_attribute("user_id", user_id)
903@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
904async def get_custom_view(custom_view_id: str):
905 return CustomView.get_from_id(custom_view_id)
908@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
909async def create_custom_view(
910 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
911):
912 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
913 custom_view.insert()
914 return custom_view
917@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
918async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
919 custom_view = CustomView.get_from_id(custom_view_id)
920 return custom_view.update(custom_view_update.model_dump())
923@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
924async def delete_custom_view(custom_view_id: str):
925 custom_view = CustomView.get_from_id(custom_view_id)
926 return custom_view.delete()
929@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
930async def add_video(video: Video):
931 video.insert()
932 if not video:
933 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
934 return video
937@app.get("/videos", dependencies=[Depends(get_current_active_user)])
938async def get_videos():
939 return Video.get_all()
942@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
943def get_stream(video_id):
944 camera_name = Video.get_video(video_id)
945 if camera_name is None:
946 raise HTTPException(status_code=404, detail="Camera not found")
947 return camera_name
950@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
951async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
952 return SignalsPreset.get_by_attribute("user_id", current_user.id)
955@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
956async def create_signals_preset(
957 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
958):
959 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
960 return new_signals_preset
963@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
964async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
965 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
966 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
969@app.delete(
970 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
971)
972async def delete_signals_preset(signals_preset_id: str):
973 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
974 return signals_preset.delete()
977@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
978async def create_graph_theme(
979 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
980):
981 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
982 if styled_signal is None:
983 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
985 graph_theme = PrivateGraphTheme.create(
986 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
987 )
988 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
991@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
992async def get_all_graph_themes(
993 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
994) -> ListResponse[PublicGraphTheme]:
995 return PublicGraphTheme.response_from_query(query, current_user.id)
998@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
999async def get_graph_themes_in_library(
1000 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
1001) -> ListResponse[PublicGraphTheme]:
1002 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
1005@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
1006async def update_graph_theme(
1007 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
1008):
1009 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
1010 update_dict = theme_update.model_dump(exclude_unset=True)
1011 if current_user.id != graph_theme.creator_id:
1012 for theme_property in update_dict.keys():
1013 if theme_property not in ["active_for_user", "in_user_library"]:
1014 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
1015 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
1016 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
1019@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
1020async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
1021 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
1022 if current_user.id != graph_theme.creator_id:
1023 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
1024 return graph_theme.delete()
1027@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
1028async def get_signals_appearances(
1029 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
1030) -> dict:
1031 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)