Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 81%
382 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 14:27 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 14:27 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 EventDay,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41)
42from twinpad_backend.auth import (
43 Token,
44 authenticate_user,
45 get_current_active_user,
46 ACCESS_TOKEN_EXPIRE_MINUTES,
47 create_access_token,
48 get_password_hash,
49)
50from twinpad_backend.queries import SignalQuery, DeviceStatesQuery, EventQuery, EventRuleQuery, CommandQuery
51from twinpad_backend.responses import ListResponse
# Requests that take longer than this many seconds are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5

# Feature flags read from the environment; only the exact string "true" enables them.
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"

# Use uvicorn's error logger and stop its records from propagating to parent loggers.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)


app = FastAPI(title="Twinpad backend", version=__version__)

# CORS is configured to accept any origin, method and header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
if PROFILING:
    # Profiling reports are written to a fresh temporary directory created at import time.
    profiling_folder = mkdtemp()
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """
        Profile a request with pyinstrument and save the HTML report to disk.

        URLs containing "profiling" or ".ico" are forwarded unprofiled, so that
        fetching a report does not itself produce a report.
        """
        url = str(request.url)
        if any(segment in url for segment in ("profiling", ".ico")):  # avoid recursion
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            # The downstream handler must run exactly once: its response is kept and
            # returned below instead of calling call_next a second time.
            response = await call_next(request)
        finally:
            profiler.stop()

        # Build the report name from the path only (query string excluded) so that
        # a request such as "/?a=1" still falls back to "slash" instead of "".
        filename = "_".join(request.url.path.split("/")[1:]) or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())
        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the profiling reports saved so far."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{profiling_id}")
    async def profiling(profiling_id):
        """
        Return one saved profiling report as HTML.

        Raises a 404 error when no report file has that name.
        """
        filename = os.path.join(profiling_folder, profiling_id)
        # isfile (rather than exists) keeps directories from reaching open()
        if os.path.isfile(filename):
            with open(filename, "r", encoding="utf-8") as report_file:
                return HTMLResponse(report_file.read())
        raise HTTPException(
            status_code=404,
            detail="Profiling not found",
        )
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """
    Log every request with its client, method, path, status code and duration.

    Requests slower than REQUEST_TIME_WARNING seconds are logged at WARNING
    level, the others at INFO level.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    # request.client can be None, in which case there is no host to fall back on.
    fallback_host = request.client.host if request.client else "unknown"
    client_ip = request.headers.get("x-forwarded-for", fallback_host)
    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response
@app.get("/")
async def slash():
    """Return the backend version."""
    payload = {"twinpad_version": __version__}
    return payload
@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Return service healthcheck
    """
    # The check result is wrapped under the "services" key.
    services = ServicesStatus.check()
    return {"services": services}
@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, sorted by device_id."""
    devices = Device.get_all(sort_by="device_id")
    return devices
@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id):
    """
    Return the device whose device_id matches.

    Raises a 404 error when the lookup yields nothing.
    """
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")
@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Apply an update to a device on behalf of the current user.

    Raises a 404 error when the device is not found. When the update result
    has "error" set to True, raises an error built from the result's
    "status_code" (default 500) and "message" entries.
    """
    device = Device.get_one_by_attribute("device_id", device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await device.update(device_update, current_user)
    update_failed = outcome.get("error", False) is True
    if update_failed:
        error_status = outcome.get("status_code", 500)
        error_detail = outcome.get("message", "An error has occurred")
        raise HTTPException(status_code=error_status, detail=error_detail)
    return outcome
@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of a device selected by the given query."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states
@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return all device setups."""
    device_setups = DeviceSetup.get_all()
    return device_setups
@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the submitted device setup and return it."""
    device_setup.insert()

    return device_setup
@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """
    Return one device setup by id.

    Raises a 404 error when the lookup returns None.
    """
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")
@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """
    Update a device setup with the non-None fields of the payload.

    Raises a 404 error when the device setup is not found.
    """
    device_setup = DeviceSetup.get_from_id(device_setup_id)
    if device_setup is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields left at None in the payload are not forwarded to the update.
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    device_setup.update(changes)
    return device_setup
@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """
    Delete a device setup and return the result of the deletion.

    Raises a 404 error when the device setup is not found.
    """
    target = DeviceSetup.get_from_id(device_setup_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return target.delete()
@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching the query, serialised without the "device" field."""
    response = Signal.response_from_query(query)
    return response.to_dict(exclude={"device"})
@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def signals_names() -> list[str]:
    """Return the ids of all signals."""
    signal_ids = Signal.get_all_ids()
    return signal_ids
@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Returns signals stats
    """
    # Resolve every signal id derived from the collection names before counting.
    signals = []
    for sid in get_signals_ids_from_collection_names():
        signals.append(Signal.get_from_signal_id(signal_id=sid))

    number_samples = 0
    number_active_signals = 0
    for signal in signals:
        if signal is None:
            # ids with no matching signal are ignored
            continue
        number_samples += await signal.number_samples()
        if signal.status.status == "up":
            number_active_signals += 1

    number_signals = Signal.get_number_documents()

    stats = {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": number_samples,
        "number_active_signals": number_active_signals,
        "number_signals": number_signals,
    }
    return stats
@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample of each requested signal."""
    last_samples = SignalSample.get_last_from_signal_ids(signal_ids)
    return last_samples
@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_last_values_interest_window(
    signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
) -> list[SignalSample | None]:
    """Return the last sample of each requested signal, given a minimum timestamp."""
    last_samples = SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
    return last_samples
@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample of each requested signal."""
    first_samples = SignalSample.get_first_from_signal_ids(signal_ids)
    return first_samples
@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flag of the requested signals."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility
@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """
    Return one signal as a dict.

    Raises a 404 error when the signal is not found.
    """
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")
@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Apply an update to a signal on behalf of the current user.

    Raises a 404 error when the signal is not found. When the update result
    has "error" set to True, raises an error built from the result's
    "status_code" (default 500) and "message" entries.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource here is a signal, not a device.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    result = await signal.update(signal_update, current_user)
    if result.get("error", False) is True:
        raise HTTPException(
            status_code=result.get("status_code", 500),
            detail=result.get("message", "An error has occurred"),
        )
    return result
@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """
    Return the data of one signal, optionally bounded in time and desampled.

    When number_samples_max is given, the data goes through uniform
    desampling with that maximum.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    # The declared return type allows None, so only desample when data exists.
    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data
@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """
    Return the last sample of a signal.

    Raises a 404 error when there is no sample.
    """
    last_sample = SignalSample.get_last_from_signal_id(signal_id)
    if last_sample is not None:
        return last_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """
    Return the first sample of a signal.

    Raises a 404 error when there is no sample.
    """
    first_sample = SignalSample.get_first_from_signal_id(signal_id)
    if first_sample is not None:
        return first_sample
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """
    Return the number of samples and the sample data size of a signal.

    Raises a 404 error when the signal is not found.
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        # The missing resource here is a signal, not a device.
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """
    Return the data of several signals, optionally bounded in time and desampled.

    The same bounds are used for the query range and for the window range.
    Raises a 400 error when min_timestamp is greater than max_timestamp.
    """
    # Compare against None rather than truthiness so that a bound of 0 is still validated.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The declared return type allows None, so only desample when data exists.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data
@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int = 600,
    outside_max_number_samples: int = 150,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """
    Return the data of several signals, desampled around an interest window.

    The fetch is capped at ten times the sum of the two sample budgets, then
    the result goes through interest-window desampling. Raises a 400 error
    when either pair of bounds is given in the wrong order.
    """
    # Compare against None rather than truthiness so that a bound of 0 is still validated.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=10 * (window_max_number_samples + outside_max_number_samples),
    )

    # The declared return type allows None, so only desample when data exists.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
@app.get("/signals-data/export", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """
    Export the data of several signals as a zip attachment named signals.zip.

    The data is fetched without bound interpolation and exported in the
    requested file format.
    """
    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = signals_data.zip_export(file_format)
    download_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=download_headers)
@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching the query."""
    events = Event.response_from_query(query)
    return events
@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """
    Return one event by id.

    Raises a 404 error when the lookup returns None.
    """
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")
@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_events: bool = False
) -> list[EventDay]:
    """Return the event counts for the given timeframe."""
    event_days = EventDay.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_events)
    return event_days
@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching the query."""
    event_rules = EventRule.response_from_query(query)
    return event_rules
@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """
    Return one event rule by id.

    Raises a 404 error when the lookup returns None.
    """
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")
@app.post("/users", status_code=201)
async def create_user(user: User):
    """
    Create a user account, storing a hash of the submitted password.

    Raises a 400 error when the lookup for an existing account matches or
    when User.create returns None. Both cases share the same message.
    """
    # NOTE(review): the lookup attribute is "user" while the value is an email;
    # confirm whether it should be "email".
    if User.get_one_by_attribute("user", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # `or False` also covers a None flag, where the bitwise `| False` raised TypeError.
    is_admin = user.is_admin or False
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        # HTTPException takes `detail`; the misspelt `details` keyword raised TypeError.
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user
@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """
    Authenticate a user from form credentials and return a bearer token.

    Raises a 401 error on bad credentials and a 402 error when the account is
    not active. The token carries the user's email and admin flag.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    # Only inactive accounts are blocked; the check used to be inverted and
    # rejected active users instead.
    if not user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")
@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users sorted by email, serialised without their password."""
    serialised = []
    for account in User.get_all(sort_by="email"):
        serialised.append(account.to_dict(exclude={"password"}))
    return serialised
@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str, current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    Return a user by id; the id "me" resolves to the current user.

    Raises a 404 error when no user is found.
    """
    # NOTE(review): unlike the user list, the object is returned as-is here;
    # confirm the password field is not exposed.
    user = current_user if user_id == "me" else User.get_from_id(user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """
    Update a user; the password is hashed when one is supplied.

    An empty or None password is removed from the payload before the update.
    """
    password_supplied = not (user.password == "" or user.password is None)
    if password_supplied:
        user.password = get_password_hash(user.password)
    else:
        del user.password
    return User.update(user, user_id)
@app.post("/users/me", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the current user after removing password, is_active and is_connected."""
    del current_user.password
    del current_user.is_active
    del current_user.is_connected
    return current_user
@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching the query."""
    commands = Command.response_from_query(query)
    return commands
@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return all campaigns."""
    campaigns = Campaign.get_all()
    return campaigns
@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """
    Return one campaign by id.

    Raises a 500 error when the lookup returns None.
    """
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """
    Create a campaign and return it.

    Raises a 500 error when the creation returns None.
    """
    created = Campaign.create(campaign)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """
    Overwrite the name and description of a campaign and save it.

    Raises a 500 error when the campaign is not found.
    """
    stored = Campaign.get_from_id(campaign_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during campaign edition")

    # Only these two fields are copied from the payload.
    stored.name = edit_campaign.name
    stored.description = edit_campaign.description
    return Campaign.update(stored)
@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """
    Delete a campaign after deleting its phases.

    Raises a 500 error when the campaign is not found or when the phase
    deletion is not acknowledged. Returns the acknowledgement of the
    campaign deletion.
    """
    deletion_error = HTTPException(status_code=500, detail="An error occurred during campaign deletion")

    if Campaign.get_from_id(campaign_id) is None:
        raise deletion_error

    # Phases go first; the campaign is only deleted once that is acknowledged.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise deletion_error

    campaign_result = Campaign.delete(campaign_id)
    return campaign_result.acknowledged
@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose campaign_id matches."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases
@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """
    Return one phase by id.

    Raises a 404 error when the lookup returns None, since None cannot be
    serialised with the Phase response model.
    """
    phase = Phase.get_from_id(phase_id)
    if phase is None:
        raise HTTPException(status_code=404, detail="Phase not found")
    return phase
@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """
    Create a phase and return it.

    Raises a 500 error when the creation returns None.
    """
    created = Phase.create(phase)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")
@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """
    Overwrite the name, description, start and end of a phase and save it.

    Raises a 500 error when the phase is not found.
    """
    stored = Phase.get_from_id(phase_id)
    if stored is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase edition")

    # Only these four fields are copied from the payload.
    stored.name = edit_phase.name
    stored.description = edit_phase.description
    stored.start_at = edit_phase.start_at
    stored.end_at = edit_phase.end_at
    return Phase.update(stored)
@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """
    Delete a phase and return the acknowledgement of the deletion.

    Raises a 500 error when the phase is not found.
    """
    if Phase.get_from_id(phase_id) is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
    deletion_result = Phase.delete(phase_id)
    return deletion_result.acknowledged
@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return all custom views."""
    custom_views = CustomView.get_all()
    return custom_views
@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose user_id matches."""
    custom_views = CustomView.get_by_attribute("user_id", user_id)
    return custom_views
@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return one custom view by id, as given by the lookup."""
    custom_view = CustomView.get_from_id(custom_view_id)
    return custom_view
@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a custom view owned by the current user, insert it and return it."""
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view
@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """
    Update a custom view with the dumped payload and return the update result.

    Raises a 404 error when the custom view is not found.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Without this guard an unknown id failed with AttributeError on None.
    if custom_view is None:
        raise HTTPException(
            status_code=404,
            detail="Custom view not found",
        )
    return custom_view.update(custom_view_update.model_dump())
@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """
    Delete a custom view and return the result of the deletion.

    Raises a 404 error when the custom view is not found.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    # Without this guard an unknown id failed with AttributeError on None.
    if custom_view is None:
        raise HTTPException(
            status_code=404,
            detail="Custom view not found",
        )
    return custom_view.delete()
@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """
    Insert the submitted video and return it.

    Raises a 500 error when the video object is falsy after insertion.
    """
    video.insert()
    # NOTE(review): this tests the payload object itself, not the result of
    # insert(); confirm this is the intended failure check.
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return all videos."""
    videos = Video.get_all()
    return videos
@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """
    Return the result of the video lookup for the given id.

    Raises a 404 error when the lookup result is falsy.
    """
    video_entry = Video.get_video(video_id)
    if video_entry:
        return video_entry
    raise HTTPException(status_code=404, detail="Camera not found")