Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%
406 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 15:25 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 15:25 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 TwinPadActivity,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41 SignalsPreset,
42 SignalsPresetCreation,
43 SignalsPresetUpdate,
44 PrivateGraphTheme,
45 PublicGraphTheme,
46 GraphThemeCreation,
47 GraphThemeUpdate,
48)
49from twinpad_backend.auth import (
50 Token,
51 authenticate_user,
52 get_current_active_user,
53 ACCESS_TOKEN_EXPIRE_MINUTES,
54 create_access_token,
55 get_password_hash,
56)
57from twinpad_backend.queries import (
58 SignalQuery,
59 DeviceStatesQuery,
60 EventQuery,
61 EventRuleQuery,
62 CommandQuery,
63 GraphThemeQuery,
64)
65from twinpad_backend.responses import ListResponse
67REQUEST_TIME_WARNING = 0.5
69DEBUG = os.environ.get("DEBUG", "false") == "true"
70PROFILING = os.environ.get("PROFILING", "false") == "true"
72logger = logging.getLogger("uvicorn.error")
73logger.propagate = False
74logger.info("Debug mode: %s", DEBUG)
75logger.info("log level: %s", logging.root.level)
78app = FastAPI(title="Twinpad backend", version=__version__)
80app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
82if PROFILING: # pragma: no cover
83 profiling_folder = mkdtemp()
84 logger.info("Profiling enabled")
86 @app.middleware("http")
87 async def profile_request(request: Request, call_next):
88 should_profile = True
89 url = str(request.url)
90 for segment in ("profiling", ".ico"):
91 if segment in url:
92 should_profile = False
93 break
95 if should_profile: # avoid recursion
96 profiler = Profiler()
97 profiler.start()
99 response = await call_next(request)
101 profiler.stop()
102 url = "_".join(url.split("/")[3:]).rstrip("/")
103 if not url:
104 url = "slash"
105 filename = url.split("?", maxsplit=1)[0]
106 logger.info("saving profiling to %s", filename)
107 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
108 profiling_file.write(profiler.output_html())
110 return response
112 return await call_next(request)
114 @app.get("/profilings")
115 async def profilings():
116 return {"profilings": os.listdir(profiling_folder)}
118 @app.get("/profilings/{file_name}")
119 async def profiling(file_name):
120 file_path = os.path.join(profiling_folder, file_name)
122 if not os.path.exists(file_path):
123 raise HTTPException(
124 status_code=404,
125 detail=f"Profiling file '{file_name}' not found",
126 )
128 with open(file_path, "r", encoding="utf-8") as profiling_file:
129 return Response(
130 content=profiling_file.read(),
131 media_type="application/html",
132 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
133 )
136@app.middleware("http")
137async def log_request_time(request: Request, call_next):
138 start_time = time.time() # Record the start time
139 response = await call_next(request) # Process the request
140 duration = time.time() - start_time # Calculate the time taken
141 client_ip = request.headers.get("x-forwarded-for", request.client.host)
142 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
143 if duration > REQUEST_TIME_WARNING:
144 logger.warning(message)
145 else:
146 logger.info(message)
147 return response
150@app.get("/")
151async def slash():
152 return {"twinpad_version": __version__}
155@app.get("/status", dependencies=[Depends(get_current_active_user)])
156async def status():
157 """
158 Return service healthcheck
159 """
160 return {
161 "services": ServicesStatus.check(),
162 }
165@app.get("/devices", dependencies=[Depends(get_current_active_user)])
166async def get_devices() -> list[Device]:
167 return Device.get_all(sort_by="device_id")
170@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
171async def get_device(device_id):
172 device = Device.get_one_by_attribute("device_id", device_id)
173 if not device:
174 raise HTTPException(
175 status_code=404,
176 detail="Device not found",
177 )
178 return device
181@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
182async def update_item(
183 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
184):
185 device = Device.get_one_by_attribute("device_id", device_id)
186 if not device:
187 raise HTTPException(
188 status_code=404,
189 detail="Device not found",
190 )
191 result = await device.change_mode(device_update, current_user)
192 if result.get("error", False) is True:
193 raise HTTPException(
194 status_code=result.get("status_code", 500),
195 detail=result.get("message", "An error has occurred"),
196 )
197 return result
200@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
201async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
202 return DeviceState.get_from_id_and_query(device_id, query)
205@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
206async def get_device_setups() -> list[DeviceSetup]:
207 return DeviceSetup.get_all()
210@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
211async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
212 device_setup.insert()
213 return device_setup
216@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
217async def get_device_setup(device_setup_id: str):
218 device_setup = DeviceSetup.get_from_id(device_setup_id)
219 if device_setup is None:
220 raise HTTPException(
221 status_code=404,
222 detail="Device setup not found",
223 )
224 return device_setup
227@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
228async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
229 device_setup = DeviceSetup.get_from_id(device_setup_id)
230 if device_setup is None:
231 raise HTTPException(
232 status_code=404,
233 detail="Device setup not found",
234 )
235 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
236 return device_setup
239@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
240async def delete_device_setups(device_setup_id: str) -> bool:
241 device_setup = DeviceSetup.get_from_id(device_setup_id)
242 if device_setup is None:
243 raise HTTPException(
244 status_code=404,
245 detail="Device setup not found",
246 )
247 deleted = device_setup.delete()
248 return deleted
251@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
252async def get_number_samples(
253 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
254) -> list[TwinPadActivity]:
255 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
258@app.get("/signals", dependencies=[Depends(get_current_active_user)])
259async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
260 if "signal_id" not in query.sort_by:
261 query.sort_by += ",signal_id:1"
262 return Signal.response_from_query(query).to_dict(exclude={"device"})
265@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
266async def signals_names() -> list[str]:
267 return Signal.get_all_ids()
270@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
271async def signal_stats():
272 """
273 Returns signals stats
274 """
275 signal_ids = Signal.get_all_ids()
277 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids)
278 number_samples = sum(number_samples_by_signal_id.values())
280 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
281 statuses_by_signal_ids = Signal.status_batch(signal_ids, devices_by_ids)
282 number_active_signals = sum(1 for _, status in statuses_by_signal_ids.items() if status.status == "up")
284 number_signals = Signal.get_number_documents()
286 return {
287 "signal_data_size": signal_datasize(),
288 "number_signal_samples": number_samples,
289 "number_active_signals": number_active_signals,
290 "number_signals": number_signals,
291 }
294@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
295async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
296 return SignalSample.get_last_from_signal_ids(signal_ids)
299@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
300async def get_last_values_interest_window(
301 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
302) -> list[SignalSample | None]:
303 return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
306@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
307async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
308 return SignalSample.get_first_from_signal_ids(signal_ids)
311@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
312async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
313 return Signal.get_forcibility(signal_ids)
316@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
317async def get_signal(signal_id):
318 signal = Signal.get_from_signal_id(signal_id)
319 if not signal:
320 raise HTTPException(
321 status_code=404,
322 detail="Signal not found",
323 )
324 return signal.to_dict()
327@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
328async def update_signal(
329 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
330):
331 signal = Signal.get_from_signal_id(signal_id)
332 if not signal:
333 raise HTTPException(
334 status_code=404,
335 detail="Device not found",
336 )
337 result = await signal.send_command(signal_update, current_user)
338 if result.get("error", False) is True:
339 raise HTTPException(
340 status_code=result.get("status_code", 500),
341 detail=result.get("message", "An error has occurred"),
342 )
343 return result
346@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
347async def get_signal_data(
348 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
349) -> SignalData | None:
350 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
352 if number_samples_max is not None:
353 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
355 return signal_data
358@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
359async def get_last_value(signal_id) -> SignalSample:
360 sample = SignalSample.get_last_from_signal_id(signal_id)
361 if sample is None:
362 raise HTTPException(status_code=404, detail="No data")
363 return sample
366@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
367async def get_first_value(signal_id) -> SignalSample:
368 sample = SignalSample.get_first_from_signal_id(signal_id)
369 if sample is None:
370 raise HTTPException(status_code=404, detail="No data")
371 return sample
374@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
375async def get_signal_number_samples(signal_id):
376 signal = Signal.get_from_signal_id(signal_id)
377 if not signal:
378 raise HTTPException(
379 status_code=404,
380 detail="Device not found",
381 )
382 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
385@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
386async def get_signals_data(
387 signal_ids: list[str] = Query(default=[]),
388 number_samples_max: int = None,
389 min_timestamp: float = None,
390 max_timestamp: float = None,
391 interpolate_bounds: bool = True,
392) -> SignalsData | None:
393 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
394 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
396 signals_data = SignalsData.get_from_signal_ids(
397 signal_ids,
398 min_timestamp=min_timestamp,
399 max_timestamp=max_timestamp,
400 window_min_timestamp=min_timestamp,
401 window_max_timestamp=max_timestamp,
402 interpolate_bounds=interpolate_bounds,
403 )
404 if number_samples_max is not None:
405 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
407 return signals_data
410@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
411async def get_signals_data_interest_window(
412 window_max_number_samples: int = 600,
413 outside_max_number_samples: int = 150,
414 window_min_timestamp: float = None,
415 window_max_timestamp: float = None,
416 signal_ids: list[str] = Query(default=[]),
417 min_timestamp: float = None,
418 max_timestamp: float = None,
419) -> SignalsData | None:
420 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
421 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
423 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
424 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
426 signals_data = SignalsData.get_from_signal_ids(
427 signal_ids,
428 min_timestamp=min_timestamp,
429 max_timestamp=max_timestamp,
430 window_min_timestamp=window_min_timestamp,
431 window_max_timestamp=window_max_timestamp,
432 max_documents=10 * (window_max_number_samples + outside_max_number_samples),
433 )
435 signals_data = signals_data.interest_window_desampling(
436 window_max_number_samples=window_max_number_samples,
437 outside_max_number_samples=outside_max_number_samples,
438 window_min_timestamp=window_min_timestamp,
439 window_max_timestamp=window_max_timestamp,
440 )
442 return signals_data
445@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
446async def export_signals_zip(
447 file_format: str,
448 signal_ids: list[str] = Query(default=[]),
449 min_timestamp: float = None,
450 max_timestamp: float = None,
451):
452 signals_data = SignalsData.get_from_signal_ids(
453 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
454 )
455 zip_data = signals_data.zip_export(file_format)
456 return Response(
457 content=zip_data,
458 media_type="application/zip",
459 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
460 )
463@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
464async def export_signals_hdf5(
465 signal_ids: list[str] = Query(default=[]),
466 min_timestamp: float = None,
467 max_timestamp: float = None,
468):
469 signals_data = SignalsData.get_from_signal_ids(
470 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
471 )
472 data = signals_data.hdf5_export()
473 return Response(
474 content=data,
475 media_type="application/hdf5",
476 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
477 )
480@app.get("/events", dependencies=[Depends(get_current_active_user)])
481async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
482 return Event.response_from_query(query)
485@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
486async def get_event(event_id) -> Event:
487 event = Event.get_from_id(event_id)
488 if event is None:
489 raise HTTPException(status_code=404, detail="No such event")
490 return event
493@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
494async def get_number_events(
495 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
496) -> list[TwinPadActivity]:
497 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
500@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
501async def get_number_commands(
502 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
503) -> list[TwinPadActivity]:
504 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
507@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
508async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
509 return EventRule.response_from_query(query)
512@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
513async def get_event_rule(event_rule_id) -> EventRule:
514 event_rule = EventRule.get_from_id(event_rule_id)
515 if event_rule is None:
516 raise HTTPException(status_code=404, detail="No such event rule")
517 return event_rule
520@app.post("/users", status_code=201)
521async def create_user(user: User):
522 if User.get_one_by_attribute("email", user.email) is not None:
523 raise HTTPException(status_code=400, detail="An error occurred during account creation")
524 hashed_password = get_password_hash(user.password)
525 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
526 if new_user is None:
527 raise HTTPException(status_code=400, detail="An error occurred during account creation")
528 return new_user
531@app.post("/token", status_code=201)
532async def login_for_access_token(
533 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
534) -> Token:
535 user = authenticate_user(form_data.username, form_data.password)
536 if not user:
537 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
538 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
539 if user.is_active:
540 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
541 access_token = create_access_token(
542 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
543 )
544 return Token(access_token=access_token, token_type="bearer")
547@app.get("/users", dependencies=[Depends(get_current_active_user)])
548async def get_users():
549 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
552@app.get("/users/me", response_model=User)
553async def get_current_user(
554 current_user: Annotated[User, Depends(get_current_active_user)],
555):
556 del current_user.password
557 return current_user
560@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
561async def get_user(user_id: str):
562 user = User.get_from_id(user_id)
564 if user is None:
565 raise HTTPException(
566 status_code=404,
567 detail="User not found",
568 )
569 return user.to_dict(exclude={"password", "is_connected"})
572@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
573async def patch_user(user: UserUpdate, user_id):
574 if user.password == "" or user.password is None:
575 del user.password
576 else:
577 user.password = get_password_hash(user.password)
578 return User.update_info(user, user_id)
581@app.get("/commands", dependencies=[Depends(get_current_active_user)])
582async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
583 return Command.response_from_query(query)
586@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
587async def get_campaigns():
588 return Campaign.get_all()
591@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
592async def get_campaign_by_id(campaign_id: str):
593 campaign = Campaign.get_from_id(campaign_id)
594 if campaign is None:
595 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
596 return campaign
599@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
600async def add_campaign(campaign: Campaign):
601 new_campaign = Campaign.create(campaign)
602 if new_campaign is None:
603 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
604 return new_campaign
607@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
608async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
609 campaign = Campaign.get_from_id(campaign_id)
610 if campaign is None:
611 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
612 campaign.name = edit_campaign.name
613 campaign.description = edit_campaign.description
614 return Campaign.update(campaign)
617@app.delete(
618 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
619)
620async def delete_campaign(campaign_id: str):
621 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
622 campaign = Campaign.get_from_id(campaign_id)
623 if campaign is None:
624 raise exception
625 delete_phases = Phase.deleteMany(campaign_id)
626 if not delete_phases.acknowledged:
627 raise exception
628 campaign_deleted = Campaign.delete(campaign_id)
629 return campaign_deleted.acknowledged
632@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
633async def get_campaign_phases(campaign_id: str):
634 return Phase.get_by_attribute("campaign_id", campaign_id)
637@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
638async def get_phase(phase_id: str):
639 phase = Phase.get_from_id(phase_id)
640 if phase is None:
641 raise HTTPException(status_code=404, detail="Phase not found")
642 return phase
645@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
646async def add_phase(phase: Phase):
647 new_phase = Phase.create(phase)
648 if new_phase is None:
649 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
650 return new_phase
653@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
654async def edit_phase(phase_id, edit_phase: Phase):
655 phase = Phase.get_from_id(phase_id)
656 if phase is None:
657 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
658 phase.name = edit_phase.name
659 phase.description = edit_phase.description
660 phase.start_at = edit_phase.start_at
661 phase.end_at = edit_phase.end_at
662 return Phase.update(phase)
665@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
666async def delete_phase(phase_id: str):
667 phase = Phase.get_from_id(phase_id)
668 if phase is None:
669 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
670 phase_deleted = Phase.delete(phase_id)
671 return phase_deleted.acknowledged
674@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
675async def get_custom_views():
676 return CustomView.get_all()
679@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
680async def get_custom_views_from_user_id(user_id: str):
681 return CustomView.get_by_attribute("user_id", user_id)
684@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
685async def get_custom_view(custom_view_id: str):
686 return CustomView.get_from_id(custom_view_id)
689@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
690async def create_custom_view(
691 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
692):
693 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
694 custom_view.insert()
695 return custom_view
698@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
699async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
700 custom_view = CustomView.get_from_id(custom_view_id)
701 return custom_view.update(custom_view_update.model_dump())
704@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
705async def delete_custom_view(custom_view_id: str):
706 custom_view = CustomView.get_from_id(custom_view_id)
707 return custom_view.delete()
710@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
711async def add_video(video: Video):
712 video.insert()
713 if not video:
714 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
715 return video
718@app.get("/videos", dependencies=[Depends(get_current_active_user)])
719async def get_videos():
720 return Video.get_all()
723@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
724def get_stream(video_id):
725 camera_name = Video.get_video(video_id)
726 if camera_name is None:
727 raise HTTPException(status_code=404, detail="Camera not found")
728 return camera_name
731@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
732async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
733 return SignalsPreset.get_by_attribute("user_id", current_user.id)
736@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
737async def create_signals_preset(
738 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
739):
740 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
741 return new_signals_preset
744@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
745async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
746 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
747 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
750@app.delete(
751 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
752)
753async def delete_signals_preset(signals_preset_id: str):
754 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
755 return signals_preset.delete()
758@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
759async def create_graph_theme(
760 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
761):
762 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
763 if styled_signal is None:
764 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
766 graph_theme = PrivateGraphTheme.create(
767 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
768 )
769 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
772@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
773async def get_all_graph_themes(
774 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
775) -> ListResponse[PublicGraphTheme]:
776 return PublicGraphTheme.response_from_query(query, current_user.id)
779@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
780async def get_graph_themes_in_library(
781 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
782) -> ListResponse[PublicGraphTheme]:
783 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
786@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
787async def update_graph_theme(
788 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
789):
790 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
791 update_dict = theme_update.model_dump(exclude_unset=True)
792 if current_user.id != graph_theme.creator_id:
793 for theme_property in update_dict.keys():
794 if theme_property not in ["active_for_user", "in_user_library"]:
795 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
796 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
797 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
800@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
801async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
802 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
803 if current_user.id != graph_theme.creator_id:
804 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
805 return graph_theme.delete()
808@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
809async def get_signals_appearances(
810 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
811) -> dict:
812 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)