Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%
404 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 10:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 10:53 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 TwinPadActivity,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41 SignalsPreset,
42 SignalsPresetCreation,
43 SignalsPresetUpdate,
44 PrivateGraphTheme,
45 PublicGraphTheme,
46 GraphThemeCreation,
47 GraphThemeUpdate,
48)
49from twinpad_backend.auth import (
50 Token,
51 authenticate_user,
52 get_current_active_user,
53 ACCESS_TOKEN_EXPIRE_MINUTES,
54 create_access_token,
55 get_password_hash,
56)
57from twinpad_backend.queries import (
58 SignalQuery,
59 DeviceStatesQuery,
60 EventQuery,
61 EventRuleQuery,
62 CommandQuery,
63 GraphThemeQuery,
64)
65from twinpad_backend.responses import ListResponse
67REQUEST_TIME_WARNING = 0.5
69DEBUG = os.environ.get("DEBUG", "false") == "true"
70PROFILING = os.environ.get("PROFILING", "false") == "true"
72logger = logging.getLogger("uvicorn.error")
73logger.propagate = False
74logger.info("Debug mode: %s", DEBUG)
75logger.info("log level: %s", logging.root.level)
78app = FastAPI(title="Twinpad backend", version=__version__)
80app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
82if PROFILING: # pragma: no cover
83 profiling_folder = mkdtemp()
84 logger.info("Profiling enabled")
86 @app.middleware("http")
87 async def profile_request(request: Request, call_next):
88 should_profile = True
89 url = str(request.url)
90 for segment in ("profiling", ".ico"):
91 if segment in url:
92 should_profile = False
93 break
95 if should_profile: # avoid recursion
96 profiler = Profiler()
97 profiler.start()
99 response = await call_next(request)
101 profiler.stop()
102 url = "_".join(url.split("/")[3:]).rstrip("/")
103 if not url:
104 url = "slash"
105 filename = url.split("?", maxsplit=1)[0]
106 logger.info("saving profiling to %s", filename)
107 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
108 profiling_file.write(profiler.output_html())
110 return response
112 return await call_next(request)
114 @app.get("/profilings")
115 async def profilings():
116 return {"profilings": os.listdir(profiling_folder)}
118 @app.get("/profilings/{file_name}")
119 async def profiling(file_name):
120 file_path = os.path.join(profiling_folder, file_name)
122 if not os.path.exists(file_path):
123 raise HTTPException(
124 status_code=404,
125 detail=f"Profiling file '{file_name}' not found",
126 )
128 with open(file_path, "r", encoding="utf-8") as profiling_file:
129 return Response(
130 content=profiling_file.read(),
131 media_type="application/html",
132 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
133 )
136@app.middleware("http")
137async def log_request_time(request: Request, call_next):
138 start_time = time.time() # Record the start time
139 response = await call_next(request) # Process the request
140 duration = time.time() - start_time # Calculate the time taken
141 client_ip = request.headers.get("x-forwarded-for", request.client.host)
142 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
143 if duration > REQUEST_TIME_WARNING:
144 logger.warning(message)
145 else:
146 logger.info(message)
147 return response
150@app.get("/")
151async def slash():
152 return {"twinpad_version": __version__}
155@app.get("/status", dependencies=[Depends(get_current_active_user)])
156async def status():
157 """
158 Return service healthcheck
159 """
160 return {
161 "services": ServicesStatus.check(),
162 }
165@app.get("/devices", dependencies=[Depends(get_current_active_user)])
166async def get_devices() -> list[Device]:
167 return Device.get_all(sort_by="device_id")
170@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
171async def get_device(device_id):
172 device = Device.get_one_by_attribute("device_id", device_id)
173 if not device:
174 raise HTTPException(
175 status_code=404,
176 detail="Device not found",
177 )
178 return device
181@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
182async def update_item(
183 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
184):
185 device = Device.get_one_by_attribute("device_id", device_id)
186 if not device:
187 raise HTTPException(
188 status_code=404,
189 detail="Device not found",
190 )
191 result = await device.change_mode(device_update, current_user)
192 if result.get("error", False) is True:
193 raise HTTPException(
194 status_code=result.get("status_code", 500),
195 detail=result.get("message", "An error has occurred"),
196 )
197 return result
200@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
201async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
202 return DeviceState.get_from_id_and_query(device_id, query)
205@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
206async def get_device_setups() -> list[DeviceSetup]:
207 return DeviceSetup.get_all()
210@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
211async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
212 device_setup.insert()
213 return device_setup
216@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
217async def get_device_setup(device_setup_id: str):
218 device_setup = DeviceSetup.get_from_id(device_setup_id)
219 if device_setup is None:
220 raise HTTPException(
221 status_code=404,
222 detail="Device setup not found",
223 )
224 return device_setup
227@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
228async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
229 device_setup = DeviceSetup.get_from_id(device_setup_id)
230 if device_setup is None:
231 raise HTTPException(
232 status_code=404,
233 detail="Device setup not found",
234 )
235 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
236 return device_setup
239@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
240async def delete_device_setups(device_setup_id: str) -> bool:
241 device_setup = DeviceSetup.get_from_id(device_setup_id)
242 if device_setup is None:
243 raise HTTPException(
244 status_code=404,
245 detail="Device setup not found",
246 )
247 deleted = device_setup.delete()
248 return deleted
251@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
252async def get_number_samples(
253 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
254) -> list[TwinPadActivity]:
255 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
258@app.get("/signals", dependencies=[Depends(get_current_active_user)])
259async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
260 return Signal.response_from_query(query).to_dict(exclude={"device"})
263@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
264async def signals_names() -> list[str]:
265 return Signal.get_all_ids()
268@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
269async def signal_stats():
270 """
271 Returns signals stats
272 """
273 signal_ids = Signal.get_all_ids()
275 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids)
276 number_samples = sum(number_samples_by_signal_id.values())
278 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
279 statuses_by_signal_ids = Signal.status_batch(signal_ids, devices_by_ids)
280 number_active_signals = sum(1 for _, status in statuses_by_signal_ids.items() if status.status == "up")
282 number_signals = Signal.get_number_documents()
284 return {
285 "signal_data_size": signal_datasize(),
286 "number_signal_samples": number_samples,
287 "number_active_signals": number_active_signals,
288 "number_signals": number_signals,
289 }
292@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
293async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
294 return SignalSample.get_last_from_signal_ids(signal_ids)
297@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
298async def get_last_values_interest_window(
299 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
300) -> list[SignalSample | None]:
301 return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
304@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
305async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
306 return SignalSample.get_first_from_signal_ids(signal_ids)
309@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
310async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
311 return Signal.get_forcibility(signal_ids)
314@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
315async def get_signal(signal_id):
316 signal = Signal.get_from_signal_id(signal_id)
317 if not signal:
318 raise HTTPException(
319 status_code=404,
320 detail="Signal not found",
321 )
322 return signal.to_dict()
325@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
326async def update_signal(
327 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
328):
329 signal = Signal.get_from_signal_id(signal_id)
330 if not signal:
331 raise HTTPException(
332 status_code=404,
333 detail="Device not found",
334 )
335 result = await signal.send_command(signal_update, current_user)
336 if result.get("error", False) is True:
337 raise HTTPException(
338 status_code=result.get("status_code", 500),
339 detail=result.get("message", "An error has occurred"),
340 )
341 return result
344@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
345async def get_signal_data(
346 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
347) -> SignalData | None:
348 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
350 if number_samples_max is not None:
351 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
353 return signal_data
356@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
357async def get_last_value(signal_id) -> SignalSample:
358 sample = SignalSample.get_last_from_signal_id(signal_id)
359 if sample is None:
360 raise HTTPException(status_code=404, detail="No data")
361 return sample
364@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
365async def get_first_value(signal_id) -> SignalSample:
366 sample = SignalSample.get_first_from_signal_id(signal_id)
367 if sample is None:
368 raise HTTPException(status_code=404, detail="No data")
369 return sample
372@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
373async def get_signal_number_samples(signal_id):
374 signal = Signal.get_from_signal_id(signal_id)
375 if not signal:
376 raise HTTPException(
377 status_code=404,
378 detail="Device not found",
379 )
380 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
383@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
384async def get_signals_data(
385 signal_ids: list[str] = Query(default=[]),
386 number_samples_max: int = None,
387 min_timestamp: float = None,
388 max_timestamp: float = None,
389 interpolate_bounds: bool = True,
390) -> SignalsData | None:
391 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
392 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
394 signals_data = SignalsData.get_from_signal_ids(
395 signal_ids,
396 min_timestamp=min_timestamp,
397 max_timestamp=max_timestamp,
398 window_min_timestamp=min_timestamp,
399 window_max_timestamp=max_timestamp,
400 interpolate_bounds=interpolate_bounds,
401 )
402 if number_samples_max is not None:
403 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
405 return signals_data
408@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
409async def get_signals_data_interest_window(
410 window_max_number_samples: int = 600,
411 outside_max_number_samples: int = 150,
412 window_min_timestamp: float = None,
413 window_max_timestamp: float = None,
414 signal_ids: list[str] = Query(default=[]),
415 min_timestamp: float = None,
416 max_timestamp: float = None,
417) -> SignalsData | None:
418 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
419 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
421 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
422 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
424 signals_data = SignalsData.get_from_signal_ids(
425 signal_ids,
426 min_timestamp=min_timestamp,
427 max_timestamp=max_timestamp,
428 window_min_timestamp=window_min_timestamp,
429 window_max_timestamp=window_max_timestamp,
430 max_documents=10 * (window_max_number_samples + outside_max_number_samples),
431 )
433 signals_data = signals_data.interest_window_desampling(
434 window_max_number_samples=window_max_number_samples,
435 outside_max_number_samples=outside_max_number_samples,
436 window_min_timestamp=window_min_timestamp,
437 window_max_timestamp=window_max_timestamp,
438 )
440 return signals_data
443@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
444async def export_signals_zip(
445 file_format: str,
446 signal_ids: list[str] = Query(default=[]),
447 min_timestamp: float = None,
448 max_timestamp: float = None,
449):
450 signals_data = SignalsData.get_from_signal_ids(
451 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
452 )
453 zip_data = signals_data.zip_export(file_format)
454 return Response(
455 content=zip_data,
456 media_type="application/zip",
457 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
458 )
461@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
462async def export_signals_hdf5(
463 signal_ids: list[str] = Query(default=[]),
464 min_timestamp: float = None,
465 max_timestamp: float = None,
466):
467 signals_data = SignalsData.get_from_signal_ids(
468 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
469 )
470 data = signals_data.hdf5_export()
471 return Response(
472 content=data,
473 media_type="application/hdf5",
474 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
475 )
478@app.get("/events", dependencies=[Depends(get_current_active_user)])
479async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
480 return Event.response_from_query(query)
483@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
484async def get_event(event_id) -> Event:
485 event = Event.get_from_id(event_id)
486 if event is None:
487 raise HTTPException(status_code=404, detail="No such event")
488 return event
491@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
492async def get_number_events(
493 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
494) -> list[TwinPadActivity]:
495 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
498@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
499async def get_number_commands(
500 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
501) -> list[TwinPadActivity]:
502 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
505@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
506async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
507 return EventRule.response_from_query(query)
510@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
511async def get_event_rule(event_rule_id) -> EventRule:
512 event_rule = EventRule.get_from_id(event_rule_id)
513 if event_rule is None:
514 raise HTTPException(status_code=404, detail="No such event rule")
515 return event_rule
518@app.post("/users", status_code=201)
519async def create_user(user: User):
520 if User.get_one_by_attribute("email", user.email) is not None:
521 raise HTTPException(status_code=400, detail="An error occurred during account creation")
522 hashed_password = get_password_hash(user.password)
523 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
524 if new_user is None:
525 raise HTTPException(status_code=400, detail="An error occurred during account creation")
526 return new_user
529@app.post("/token", status_code=201)
530async def login_for_access_token(
531 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
532) -> Token:
533 user = authenticate_user(form_data.username, form_data.password)
534 if not user:
535 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
536 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
537 if user.is_active:
538 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
539 access_token = create_access_token(
540 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
541 )
542 return Token(access_token=access_token, token_type="bearer")
545@app.get("/users", dependencies=[Depends(get_current_active_user)])
546async def get_users():
547 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
550@app.get("/users/me", response_model=User)
551async def get_current_user(
552 current_user: Annotated[User, Depends(get_current_active_user)],
553):
554 del current_user.password
555 return current_user
558@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
559async def get_user(user_id: str):
560 user = User.get_from_id(user_id)
562 if user is None:
563 raise HTTPException(
564 status_code=404,
565 detail="User not found",
566 )
567 return user.to_dict(exclude={"password", "is_connected"})
570@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
571async def patch_user(user: UserUpdate, user_id):
572 if user.password == "" or user.password is None:
573 del user.password
574 else:
575 user.password = get_password_hash(user.password)
576 return User.update_info(user, user_id)
579@app.get("/commands", dependencies=[Depends(get_current_active_user)])
580async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
581 return Command.response_from_query(query)
584@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
585async def get_campaigns():
586 return Campaign.get_all()
589@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
590async def get_campaign_by_id(campaign_id: str):
591 campaign = Campaign.get_from_id(campaign_id)
592 if campaign is None:
593 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
594 return campaign
597@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
598async def add_campaign(campaign: Campaign):
599 new_campaign = Campaign.create(campaign)
600 if new_campaign is None:
601 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
602 return new_campaign
605@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
606async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
607 campaign = Campaign.get_from_id(campaign_id)
608 if campaign is None:
609 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
610 campaign.name = edit_campaign.name
611 campaign.description = edit_campaign.description
612 return Campaign.update(campaign)
615@app.delete(
616 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
617)
618async def delete_campaign(campaign_id: str):
619 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
620 campaign = Campaign.get_from_id(campaign_id)
621 if campaign is None:
622 raise exception
623 delete_phases = Phase.deleteMany(campaign_id)
624 if not delete_phases.acknowledged:
625 raise exception
626 campaign_deleted = Campaign.delete(campaign_id)
627 return campaign_deleted.acknowledged
630@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
631async def get_campaign_phases(campaign_id: str):
632 return Phase.get_by_attribute("campaign_id", campaign_id)
635@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
636async def get_phase(phase_id: str):
637 phase = Phase.get_from_id(phase_id)
638 if phase is None:
639 raise HTTPException(status_code=404, detail="Phase not found")
640 return phase
643@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
644async def add_phase(phase: Phase):
645 new_phase = Phase.create(phase)
646 if new_phase is None:
647 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
648 return new_phase
651@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
652async def edit_phase(phase_id, edit_phase: Phase):
653 phase = Phase.get_from_id(phase_id)
654 if phase is None:
655 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
656 phase.name = edit_phase.name
657 phase.description = edit_phase.description
658 phase.start_at = edit_phase.start_at
659 phase.end_at = edit_phase.end_at
660 return Phase.update(phase)
663@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
664async def delete_phase(phase_id: str):
665 phase = Phase.get_from_id(phase_id)
666 if phase is None:
667 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
668 phase_deleted = Phase.delete(phase_id)
669 return phase_deleted.acknowledged
672@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
673async def get_custom_views():
674 return CustomView.get_all()
677@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
678async def get_custom_views_from_user_id(user_id: str):
679 return CustomView.get_by_attribute("user_id", user_id)
682@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
683async def get_custom_view(custom_view_id: str):
684 return CustomView.get_from_id(custom_view_id)
687@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
688async def create_custom_view(
689 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
690):
691 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
692 custom_view.insert()
693 return custom_view
696@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
697async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
698 custom_view = CustomView.get_from_id(custom_view_id)
699 return custom_view.update(custom_view_update.model_dump())
702@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
703async def delete_custom_view(custom_view_id: str):
704 custom_view = CustomView.get_from_id(custom_view_id)
705 return custom_view.delete()
708@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
709async def add_video(video: Video):
710 video.insert()
711 if not video:
712 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
713 return video
716@app.get("/videos", dependencies=[Depends(get_current_active_user)])
717async def get_videos():
718 return Video.get_all()
721@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
722def get_stream(video_id):
723 camera_name = Video.get_video(video_id)
724 if camera_name is None:
725 raise HTTPException(status_code=404, detail="Camera not found")
726 return camera_name
729@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
730async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
731 return SignalsPreset.get_by_attribute("user_id", current_user.id)
734@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
735async def create_signals_preset(
736 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
737):
738 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
739 return new_signals_preset
742@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
743async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
744 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
745 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
748@app.delete(
749 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
750)
751async def delete_signals_preset(signals_preset_id: str):
752 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
753 return signals_preset.delete()
756@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
757async def create_graph_theme(
758 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
759):
760 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
761 if styled_signal is None:
762 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
764 graph_theme = PrivateGraphTheme.create(
765 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
766 )
767 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
770@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
771async def get_all_graph_themes(
772 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
773) -> ListResponse[PublicGraphTheme]:
774 return PublicGraphTheme.response_from_query(query, current_user.id)
777@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
778async def get_graph_themes_in_library(
779 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
780) -> ListResponse[PublicGraphTheme]:
781 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
784@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
785async def update_graph_theme(
786 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
787):
788 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
789 update_dict = theme_update.model_dump(exclude_unset=True)
790 if current_user.id != graph_theme.creator_id:
791 for theme_property in update_dict.keys():
792 if theme_property not in ["active_for_user", "in_user_library"]:
793 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
794 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
795 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
798@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
799async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
800 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
801 if current_user.id != graph_theme.creator_id:
802 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
803 return graph_theme.delete()
806@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
807async def get_signals_appearances(
808 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
809) -> dict:
810 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)