Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%
405 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-17 10:20 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-17 10:20 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 TwinPadActivity,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41 SignalsPreset,
42 SignalsPresetCreation,
43 SignalsPresetUpdate,
44 PrivateGraphTheme,
45 PublicGraphTheme,
46 GraphThemeCreation,
47 GraphThemeUpdate,
48)
49from twinpad_backend.auth import (
50 Token,
51 authenticate_user,
52 get_current_active_user,
53 ACCESS_TOKEN_EXPIRE_MINUTES,
54 create_access_token,
55 get_password_hash,
56)
57from twinpad_backend.queries import (
58 SignalQuery,
59 DeviceStatesQuery,
60 EventQuery,
61 EventRuleQuery,
62 CommandQuery,
63 GraphThemeQuery,
64)
65from twinpad_backend.responses import ListResponse
# Requests that take longer than this many seconds are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5

# Feature flags: enabled only when the variable is exactly the string "true".
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"

# Reuse uvicorn's error logger and keep its records from propagating further.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)

app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
if PROFILING:  # pragma: no cover
    # Reports are written to a fresh temporary directory created at import time.
    profiling_folder = mkdtemp()
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """Profile a request with pyinstrument and save the HTML report.

        Requests whose URL contains "profiling" or ".ico" are forwarded
        without profiling, so fetching a report is not itself profiled.
        The report file name is the URL path (query string removed) with
        "/" replaced by "_", or "slash" for the root path.
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):  # avoid recursion
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Stop the profiler even if the downstream handler raises,
            # otherwise it would keep running after the request ended.
            profiler.stop()

        # Drop the query string BEFORE applying the "slash" fallback, so that
        # a root URL with a query ("/?a=1") does not yield an empty file name.
        path_part = url.split("?", maxsplit=1)[0]
        filename = "_".join(path_part.split("/")[3:]) or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())

        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the profiling reports saved so far."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{file_name}")
    async def profiling(file_name):
        """Return one saved profiling report as a downloadable HTML attachment.

        Raises:
            HTTPException: 404 when no regular file with that name exists in
                the profiling folder.
        """
        file_path = os.path.join(profiling_folder, file_name)

        # isfile (not exists) so that names resolving to a directory, such as
        # "..", answer 404 instead of failing in open() with a 500.
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404,
                detail=f"Profiling file '{file_name}' not found",
            )

        with open(file_path, "r", encoding="utf-8") as profiling_file:
            return Response(
                content=profiling_file.read(),
                media_type="application/html",
                headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
            )
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log every request with its client, method, path, status and duration.

    The line is logged at WARNING level when the request took longer than
    REQUEST_TIME_WARNING seconds, and at INFO level otherwise.
    """
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    # request.client can be None (the server may not provide a client
    # address), so the fallback must not dereference it unconditionally.
    fallback_host = request.client.host if request.client is not None else "unknown"
    client_ip = request.headers.get("x-forwarded-for", fallback_host)

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response
@app.get("/")
async def slash():
    """Return the backend version; this route requires no authentication."""
    version_payload = {"twinpad_version": __version__}
    return version_payload
@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """Return the services healthcheck under the "services" key."""
    services_state = ServicesStatus.check()
    return {"services": services_state}
@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices
@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id):
    """Return the device whose ``device_id`` matches, or answer 404."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")
@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Apply a mode change to a device on behalf of the current user.

    Answers 404 when the device does not exist. When the result of
    ``change_mode`` has ``error`` set to True, its ``status_code`` and
    ``message`` (defaulting to 500 / "An error has occurred") are raised
    as an HTTP error; otherwise the result is returned as is.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    failed = outcome.get("error", False) is True
    if failed:
        raise HTTPException(
            status_code=outcome.get("status_code", 500),
            detail=outcome.get("message", "An error has occurred"),
        )
    return outcome
@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of one device, selected by the query parameters."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states
@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every device setup."""
    setups = DeviceSetup.get_all()
    return setups
@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and echo it back (HTTP 201)."""
    device_setup.insert()
    return device_setup
@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return one device setup by id, or answer 404 when it does not exist."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")
@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """Update a device setup with the non-None fields of the payload.

    Answers 404 when the device setup does not exist.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the payload are not forwarded to update().
    changes = {}
    for field_name, field_value in device_setup_update.model_dump().items():
        if field_value is not None:
            changes[field_name] = field_value
    target.update(changes)
    return target
@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return the result of its ``delete()`` call.

    Answers 404 when the device setup does not exist.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return target.delete()
@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the sample-count activity entries for the given time frame."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching the query, without their ``device`` field.

    ",signal_id:1" is appended to ``sort_by`` unless it already mentions
    ``signal_id``.
    """
    already_sorted_by_id = "signal_id" in query.sort_by
    if not already_sorted_by_id:
        query.sort_by += ",signal_id:1"
    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})
@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    all_ids = Signal.get_all_ids()
    return all_ids
@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """Return global signal statistics.

    The payload holds the signal data size, the total number of samples
    over all signals, the number of signals whose status is "up", and the
    number of signal documents.
    """
    statuses = Signal.get_all_statuses()
    ids = [entry["signal_id"] for entry in statuses]

    samples_per_signal = await Signal.number_samples_batch(ids)
    total_samples = sum(samples_per_signal.values())

    active_count = len([entry for entry in statuses if entry["status"] == "up"])
    signal_count = Signal.get_number_documents()

    return {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": total_samples,
        "number_active_signals": active_count,
        "number_signals": signal_count,
    }
@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample (or None) for each requested signal id."""
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples
@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_last_values_interest_window(
    signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
) -> list[SignalSample | None]:
    """Return the last sample (or None) per signal id, given ``min_timestamp``."""
    last_samples = SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
    return last_samples
@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample (or None) for each requested signal id."""
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples
@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flags for the requested signal ids."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility
@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return one signal as a dict, or answer 404 when it does not exist."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")
@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Send a command to a signal on behalf of the current user.

    Answers 404 when the signal does not exist. When the result of
    ``send_command`` has ``error`` set to True, its ``status_code`` and
    ``message`` (defaulting to 500 / "An error has occurred") are raised
    as an HTTP error; otherwise the result is returned as is.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource is a signal, matching GET /signals/{signal_id}.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    result = await signal.send_command(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )
    return result
@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """Return the data of one signal, optionally bounded and desampled.

    Args:
        signal_id: Id of the signal to read.
        number_samples_max: When given, the data is passed through
            ``uniform_desampling`` with this limit.
        min_timestamp: Optional lower bound forwarded to the lookup.
        max_timestamp: Optional upper bound forwarded to the lookup.

    Returns:
        The signal data, or None when the lookup returned nothing.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The return type allows None, so only desample when there is data;
    # otherwise the attribute access would fail with a 500.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data
@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal, or answer 404 when there is none."""
    last_sample = SignalSample.get_last_from_signal_id(signal_id)
    if last_sample is not None:
        return last_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal, or answer 404 when there is none."""
    first_sample = SignalSample.get_first_from_signal_id(signal_id)
    if first_sample is not None:
        return first_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """Return the number of samples and the sample data size of one signal.

    Answers 404 when the signal does not exist.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource is a signal, matching GET /signals/{signal_id}.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """Return the data of several signals, optionally bounded and desampled.

    The same timestamp bounds are used for the query range and for the
    window range passed to ``SignalsData.get_from_signal_ids``.

    Raises:
        HTTPException: 400 when both bounds are given and
            ``min_timestamp`` is greater than ``max_timestamp``.
    """
    # Compare against None rather than truthiness: a bound of 0 is a valid
    # timestamp and must not disable the validation.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The return type allows None, so only desample when there is data.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data
@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int = 600,
    outside_max_number_samples: int = 150,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """Return signals data desampled around an interest window.

    The lookup is capped at ten times the sum of the two sample limits,
    then the data goes through ``interest_window_desampling`` with the
    window bounds and both limits.

    Raises:
        HTTPException: 400 when a given pair of bounds (window or global)
            has its minimum greater than its maximum.
    """
    # Compare against None rather than truthiness: a bound of 0 is a valid
    # timestamp and must not disable the validation.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=10 * (window_max_number_samples + outside_max_number_samples),
    )

    # The return type allows None, so only desample when there is data.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a "signals.zip" attachment.

    The data is fetched without bound interpolation and zipped in the
    requested ``file_format``.
    """
    exported = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = exported.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a "signals.hdf5" attachment.

    The data is fetched without bound interpolation.
    """
    exported = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = exported.hdf5_export()
    download_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=download_headers)
@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching the query."""
    listing = Event.response_from_query(query)
    return listing
@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return one event by id, or answer 404 when it does not exist."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")
@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the event-count activity entries for the given time frame."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return the command-count activity entries for the given time frame."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching the query."""
    listing = EventRule.response_from_query(query)
    return listing
@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return one event rule by id, or answer 404 when it does not exist."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")
@app.post("/users", status_code=201)
async def create_user(user: User):
    """Create a user account with a hashed password (HTTP 201).

    The same 400 error is raised whether the email is already taken or
    the creation itself returns nothing.

    NOTE(review): this route declares no authentication dependency and
    takes ``is_admin`` from the request body — confirm this is intended.
    """
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # Logical "or", not bitwise "|": a missing (None) is_admin must fall back
    # to False instead of raising TypeError.
    is_admin = user.is_admin or False
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user
@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """Authenticate with form credentials and return a bearer access token.

    Answers 401 "Bad Credentials" when authentication fails. The token
    carries the user's email as ``sub`` and the ``admin`` flag, and
    expires after ACCESS_TOKEN_EXPIRE_MINUTES.

    NOTE(review): the 402 "User Blocked" error is raised when
    ``user.is_active`` is truthy, which reads as inverted — confirm the
    meaning of ``is_active`` before changing it.
    """
    bearer_headers = {"WWW-authenticate": "Bearer"}

    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers=bearer_headers)

    lifetime = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    if user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers=bearer_headers)

    claims = {"sub": user.email, "admin": user.is_admin}
    token_value = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=token_value, token_type="bearer")
@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users sorted by email, without their ``password`` field."""
    users_sorted = User.get_all(sort_by="email")
    public_users = []
    for account in users_sorted:
        public_users.append(account.to_dict(exclude={"password"}))
    return public_users
@app.get("/users/me", response_model=User)
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user after removing its ``password`` attribute."""
    # The attribute is deleted on the object provided by the dependency itself.
    del current_user.password
    return current_user
@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return one user by id without ``password`` and ``is_connected``.

    Answers 404 when the user does not exist.
    """
    found = User.get_from_id(user_id)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")

    hidden_fields = {"password", "is_connected"}
    return found.to_dict(exclude=hidden_fields)
@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """Update a user's information.

    A non-empty password is hashed before the update; an empty or missing
    password is removed from the payload.

    NOTE(review): nothing here checks that the caller may edit
    ``user_id`` — confirm whether ``User.update_info`` enforces it.
    """
    has_new_password = user.password is not None and user.password != ""
    if has_new_password:
        user.password = get_password_hash(user.password)
    else:
        del user.password
    return User.update_info(user, user_id)
@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching the query."""
    listing = Command.response_from_query(query)
    return listing
@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns
@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return one campaign by id.

    NOTE(review): an unknown id answers 500 rather than 404 — confirm
    whether clients rely on this.
    """
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Create a campaign (HTTP 201); answer 500 when creation returns nothing."""
    created = Campaign.create(campaign)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """Overwrite the name and description of an existing campaign.

    Answers 500 when the campaign does not exist.
    """
    stored = Campaign.get_from_id(campaign_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during campaign edition")

    # Only these two fields are copied from the payload.
    stored.name = edit_campaign.name
    stored.description = edit_campaign.description
    return Campaign.update(stored)
@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """Delete a campaign after deleting its phases.

    Answers 500 when the campaign does not exist or when the phase
    deletion is not acknowledged. Returns the ``acknowledged`` flag of
    the campaign deletion.
    """
    failure = HTTPException(status_code=500, detail="An error occurred during campaign deletion")

    if Campaign.get_from_id(campaign_id) is None:
        raise failure

    # Phases go first; the campaign is only removed once that is acknowledged.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise failure

    return Campaign.delete(campaign_id).acknowledged
@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases
@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return one phase by id, or answer 404 when it does not exist."""
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")
@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Create a phase (HTTP 201); answer 500 when creation returns nothing."""
    created = Phase.create(phase)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")
@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """Overwrite name, description, start and end of an existing phase.

    Answers 500 when the phase does not exist.
    """
    stored = Phase.get_from_id(phase_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase edition")

    # Only these four fields are copied from the payload.
    for field_name in ("name", "description", "start_at", "end_at"):
        setattr(stored, field_name, getattr(edit_phase, field_name))
    return Phase.update(stored)
@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete a phase and return the ``acknowledged`` flag of the deletion.

    Answers 500 when the phase does not exist.
    """
    if Phase.get_from_id(phase_id) is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
    deletion = Phase.delete(phase_id)
    return deletion.acknowledged
@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return every custom view."""
    views = CustomView.get_all()
    return views
@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` matches."""
    views = CustomView.get_by_attribute("user_id", user_id)
    return views
@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return the result of looking up one custom view by id.

    No not-found handling is done here: whatever the lookup returns is
    sent back as is.
    """
    view = CustomView.get_from_id(custom_view_id)
    return view
@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Create and insert a custom view owned by the current user."""
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view
@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """Update a custom view with the full dumped payload.

    Raises:
        HTTPException: 404 when the custom view does not exist.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # An unknown id would otherwise fail on the attribute access with a 500.
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.update(custom_view_update.model_dump())
@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """Delete a custom view and return the result of its ``delete()`` call.

    Raises:
        HTTPException: 404 when the custom view does not exist.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # An unknown id would otherwise fail on the attribute access with a 500.
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()
@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the posted video and echo it back.

    NOTE(review): the falsiness check runs on the posted object itself,
    not on the result of ``insert()`` — confirm it can ever trigger.
    """
    video.insert()
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return every video."""
    videos = Video.get_all()
    return videos
@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return the result of ``Video.get_video``, or answer 404 when it is None.

    Unlike the other routes in this file, this handler is a plain ``def``.
    """
    video_lookup = Video.get_video(video_id)
    if video_lookup is not None:
        return video_lookup
    raise HTTPException(status_code=404, detail="Camera not found")
@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose ``user_id`` is the current user's id."""
    own_presets = SignalsPreset.get_by_attribute("user_id", current_user.id)
    return own_presets
@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a signals preset for the current user and return it."""
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """Update a signals preset with the fields explicitly set in the payload.

    Raises:
        HTTPException: 404 when the signals preset does not exist.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # An unknown id would otherwise fail on the attribute access with a 500.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """Delete a signals preset and return the result of its ``delete()`` call.

    Raises:
        HTTPException: 404 when the signals preset does not exist.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    # An unknown id would otherwise fail on the attribute access with a 500.
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()
@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a graph theme for a signal, with the current user as creator.

    Answers 400 when the referenced signal id does not exist. The new
    theme is returned in its public form for the current user.
    """
    target_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
    if target_signal is None:
        raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")

    # The payload's own "id" is left out of the creation arguments.
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created_theme = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created_theme.id, current_user.id)
@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query, for the current user."""
    listing = PublicGraphTheme.response_from_query(query, current_user.id)
    return listing
@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes of the current user's library matching the query."""
    listing = PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
    return listing
@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """Update a graph theme with the fields explicitly set in the payload.

    A user who is not the creator may only change ``active_for_user``
    and ``in_user_library``. The updated theme is returned in its public
    form for the current user.

    Raises:
        HTTPException: 404 when the theme does not exist; 401 when a
            non-creator tries to change any other field.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    # An unknown id would otherwise fail on the attribute access with a 500.
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")

    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        # Per-user flags are the only fields a non-creator may touch.
        user_editable = ("active_for_user", "in_user_library")
        if any(theme_property not in user_editable for theme_property in update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    # Reuse the payload dumped above instead of dumping it a second time.
    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """Delete a graph theme; only its creator may do so.

    Raises:
        HTTPException: 404 when the theme does not exist; 401 when the
            current user is not the creator.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    # An unknown id would otherwise fail on the attribute access with a 500.
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()
@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
) -> dict:
    """Return the appearances of the requested signals for the current user."""
    appearances = PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)
    return appearances