Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%
406 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-23 08:30 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-23 08:30 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 TwinPadActivity,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41 SignalsPreset,
42 SignalsPresetCreation,
43 SignalsPresetUpdate,
44 PrivateGraphTheme,
45 PublicGraphTheme,
46 GraphThemeCreation,
47 GraphThemeUpdate,
48)
49from twinpad_backend.auth import (
50 Token,
51 authenticate_user,
52 get_current_active_user,
53 ACCESS_TOKEN_EXPIRE_MINUTES,
54 create_access_token,
55 get_password_hash,
56)
57from twinpad_backend.queries import (
58 SignalQuery,
59 DeviceStatesQuery,
60 EventQuery,
61 EventRuleQuery,
62 CommandQuery,
63 GraphThemeQuery,
64)
65from twinpad_backend.responses import ListResponse
67REQUEST_TIME_WARNING = 0.5
69DEBUG = os.environ.get("DEBUG", "false") == "true"
70PROFILING = os.environ.get("PROFILING", "false") == "true"
72logger = logging.getLogger("uvicorn.error")
73logger.propagate = False
74logger.info("Debug mode: %s", DEBUG)
75logger.info("log level: %s", logging.root.level)
78app = FastAPI(title="Twinpad backend", version=__version__)
80app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
82if PROFILING: # pragma: no cover
83 profiling_folder = mkdtemp()
84 logger.info("Profiling enabled")
86 @app.middleware("http")
87 async def profile_request(request: Request, call_next):
88 should_profile = True
89 url = str(request.url)
90 for segment in ("profiling", ".ico"):
91 if segment in url:
92 should_profile = False
93 break
95 if should_profile: # avoid recursion
96 profiler = Profiler()
97 profiler.start()
98 await call_next(request)
99 profiler.stop()
100 url = "_".join(url.split("/")[3:]).rstrip("/")
101 if not url:
102 url = "slash"
103 # filename = f"{round(time.time())}_{url}"
104 filename = url.split("?", maxsplit=1)[0]
105 logger.info("saving profiling to %s", filename)
106 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
107 profiling_file.write(profiler.output_html())
108 return await call_next(request)
110 @app.get("/profilings")
111 async def profilings():
112 return {"profilings": os.listdir(profiling_folder)}
114 @app.get("/profilings/{profiling_id}")
115 async def profiling(profiling_id):
117 filename = os.path.join(profiling_folder, profiling_id)
118 if os.path.exists(filename):
119 with open(filename, "r", encoding="utf-8") as profiling:
120 return HTMLResponse(profiling.read())
121 raise HTTPException(
122 status_code=404,
123 detail="Profiling not found",
124 )
126 # app.mount("/profilings", StaticFiles(directory=profiling_folder, html=True), name="profilings")
129@app.middleware("http")
130async def log_request_time(request: Request, call_next):
131 start_time = time.time() # Record the start time
132 response = await call_next(request) # Process the request
133 duration = time.time() - start_time # Calculate the time taken
134 client_ip = request.headers.get("x-forwarded-for", request.client.host)
135 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
136 if duration > REQUEST_TIME_WARNING:
137 logger.warning(message)
138 else:
139 logger.info(message)
140 return response
143@app.get("/")
144async def slash():
145 return {"twinpad_version": __version__}
148@app.get("/status", dependencies=[Depends(get_current_active_user)])
149async def status():
150 """
151 Return service healthcheck
152 """
153 return {
154 "services": ServicesStatus.check(),
155 }
158@app.get("/devices", dependencies=[Depends(get_current_active_user)])
159async def get_devices() -> list[Device]:
160 return Device.get_all(sort_by="device_id")
163@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
164async def get_device(device_id):
165 device = Device.get_one_by_attribute("device_id", device_id)
166 if not device:
167 raise HTTPException(
168 status_code=404,
169 detail="Device not found",
170 )
171 return device
174@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
175async def update_item(
176 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
177):
178 device = Device.get_one_by_attribute("device_id", device_id)
179 if not device:
180 raise HTTPException(
181 status_code=404,
182 detail="Device not found",
183 )
184 result = await device.change_mode(device_update, current_user)
185 if result.get("error", False) is True:
186 raise HTTPException(
187 status_code=result.get("status_code", 500),
188 detail=result.get("message", "An error has occurred"),
189 )
190 return result
193@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
194async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
195 return DeviceState.get_from_id_and_query(device_id, query)
198@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
199async def get_device_setups() -> list[DeviceSetup]:
200 return DeviceSetup.get_all()
203@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
204async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
205 device_setup.insert()
206 return device_setup
209@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
210async def get_device_setup(device_setup_id: str):
211 device_setup = DeviceSetup.get_from_id(device_setup_id)
212 if device_setup is None:
213 raise HTTPException(
214 status_code=404,
215 detail="Device setup not found",
216 )
217 return device_setup
220@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
221async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
222 device_setup = DeviceSetup.get_from_id(device_setup_id)
223 if device_setup is None:
224 raise HTTPException(
225 status_code=404,
226 detail="Device setup not found",
227 )
228 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
229 return device_setup
232@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
233async def delete_device_setups(device_setup_id: str) -> bool:
234 device_setup = DeviceSetup.get_from_id(device_setup_id)
235 if device_setup is None:
236 raise HTTPException(
237 status_code=404,
238 detail="Device setup not found",
239 )
240 deleted = device_setup.delete()
241 return deleted
244@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
245async def get_number_samples(
246 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
247) -> list[TwinPadActivity]:
248 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
251@app.get("/signals", dependencies=[Depends(get_current_active_user)])
252async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
253 return Signal.response_from_query(query).to_dict(exclude={"device"})
256@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
257async def signals_names() -> list[str]:
258 return Signal.get_all_ids()
261@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
262async def signal_stats():
263 """
264 Returns signals stats
265 """
266 # profiler = Profiler()
267 # profiler.start()
269 number_samples = 0
270 number_active_signals = 0
271 signals = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()]
273 for s in signals:
274 if s is not None:
275 number_samples += await s.number_samples()
276 if s.status.status == "up":
277 number_active_signals += 1
279 number_signals = Signal.get_number_documents()
281 # profiler.stop()
282 # filename = "signals_stats_profiling.html"
283 # full_file_path = os.path.join(Path.home(), filename)
284 # logger.info("Saving profiling to %s", full_file_path)
285 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
286 # profiling_file.write(profiler.output_html())
288 return {
289 "signal_data_size": signal_datasize(),
290 "number_signal_samples": number_samples,
291 "number_active_signals": number_active_signals,
292 "number_signals": number_signals,
293 }
296@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
297async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
298 return SignalSample.get_last_from_signal_ids(signal_ids)
301@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
302async def get_last_values_interest_window(
303 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
304) -> list[SignalSample | None]:
305 return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
308@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
309async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
310 return SignalSample.get_first_from_signal_ids(signal_ids)
313@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
314async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
315 return Signal.get_forcibility(signal_ids)
318@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
319async def get_signal(signal_id):
320 signal = Signal.get_from_signal_id(signal_id)
321 if not signal:
322 raise HTTPException(
323 status_code=404,
324 detail="Signal not found",
325 )
326 return signal.to_dict()
329@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
330async def update_signal(
331 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
332):
333 signal = Signal.get_from_signal_id(signal_id)
334 if not signal:
335 raise HTTPException(
336 status_code=404,
337 detail="Device not found",
338 )
339 result = await signal.send_command(signal_update, current_user)
340 if result.get("error", False) is True:
341 raise HTTPException(
342 status_code=result.get("status_code", 500),
343 detail=result.get("message", "An error has occurred"),
344 )
345 return result
348@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
349async def get_signal_data(
350 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
351) -> SignalData | None:
352 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
354 if number_samples_max is not None:
355 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
357 return signal_data
360@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
361async def get_last_value(signal_id) -> SignalSample:
362 sample = SignalSample.get_last_from_signal_id(signal_id)
363 if sample is None:
364 raise HTTPException(status_code=404, detail="No data")
365 return sample
368@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
369async def get_first_value(signal_id) -> SignalSample:
370 sample = SignalSample.get_first_from_signal_id(signal_id)
371 if sample is None:
372 raise HTTPException(status_code=404, detail="No data")
373 return sample
376@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
377async def get_signal_number_samples(signal_id):
378 signal = Signal.get_from_signal_id(signal_id)
379 if not signal:
380 raise HTTPException(
381 status_code=404,
382 detail="Device not found",
383 )
384 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
387@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
388async def get_signals_data(
389 signal_ids: list[str] = Query(default=[]),
390 number_samples_max: int = None,
391 min_timestamp: float = None,
392 max_timestamp: float = None,
393 interpolate_bounds: bool = True,
394) -> SignalsData | None:
395 # profiler = Profiler()
396 # profiler.start()
398 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
399 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
401 signals_data = SignalsData.get_from_signal_ids(
402 signal_ids,
403 min_timestamp=min_timestamp,
404 max_timestamp=max_timestamp,
405 window_min_timestamp=min_timestamp,
406 window_max_timestamp=max_timestamp,
407 interpolate_bounds=interpolate_bounds,
408 )
409 if number_samples_max is not None:
410 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
412 # profiler.stop()
413 # filename = "signals-data.html"
414 # full_file_path = os.path.join(Path.home(), filename)
415 # logger.info(f"Saving profiling to %s", full_file_path)
416 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
417 # profiling_file.write(profiler.output_html())
419 return signals_data
422@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
423async def get_signals_data_interest_window(
424 window_max_number_samples: int = 600,
425 outside_max_number_samples: int = 150,
426 window_min_timestamp: float = None,
427 window_max_timestamp: float = None,
428 signal_ids: list[str] = Query(default=[]),
429 min_timestamp: float = None,
430 max_timestamp: float = None,
431) -> SignalsData | None:
432 # profiler = Profiler()
433 # profiler.start()
435 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
436 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
438 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
439 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
441 signals_data = SignalsData.get_from_signal_ids(
442 signal_ids,
443 min_timestamp=min_timestamp,
444 max_timestamp=max_timestamp,
445 window_min_timestamp=window_min_timestamp,
446 window_max_timestamp=window_max_timestamp,
447 max_documents=10 * (window_max_number_samples + outside_max_number_samples),
448 )
450 signals_data = signals_data.interest_window_desampling(
451 window_max_number_samples=window_max_number_samples,
452 outside_max_number_samples=outside_max_number_samples,
453 window_min_timestamp=window_min_timestamp,
454 window_max_timestamp=window_max_timestamp,
455 )
457 # profiler.stop()
458 # filename = "signals-data.html"
459 # full_file_path = os.path.join(Path.home(), filename)
460 # logger.info(f"Saving profiling to %s", full_file_path)
461 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
462 # profiling_file.write(profiler.output_html())
464 return signals_data
467@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
468async def export_signals_zip(
469 file_format: str,
470 signal_ids: list[str] = Query(default=[]),
471 min_timestamp: float = None,
472 max_timestamp: float = None,
473):
474 signals_data = SignalsData.get_from_signal_ids(
475 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
476 )
477 zip_data = signals_data.zip_export(file_format)
478 return Response(
479 content=zip_data,
480 media_type="application/zip",
481 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
482 )
485@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
486async def export_signals_hdf5(
487 signal_ids: list[str] = Query(default=[]),
488 min_timestamp: float = None,
489 max_timestamp: float = None,
490):
491 signals_data = SignalsData.get_from_signal_ids(
492 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
493 )
494 data = signals_data.hdf5_export()
495 return Response(
496 content=data,
497 media_type="application/hdf5",
498 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
499 )
502@app.get("/events", dependencies=[Depends(get_current_active_user)])
503async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
504 return Event.response_from_query(query)
507@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
508async def get_event(event_id) -> Event:
509 event = Event.get_from_id(event_id)
510 if event is None:
511 raise HTTPException(status_code=404, detail="No such event")
512 return event
515@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
516async def get_number_events(
517 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
518) -> list[TwinPadActivity]:
519 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
522@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
523async def get_number_commands(
524 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
525) -> list[TwinPadActivity]:
526 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
529@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
530async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
531 return EventRule.response_from_query(query)
534@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
535async def get_event_rule(event_rule_id) -> EventRule:
536 event_rule = EventRule.get_from_id(event_rule_id)
537 if event_rule is None:
538 raise HTTPException(status_code=404, detail="No such event rule")
539 return event_rule
542@app.post("/users", status_code=201)
543async def create_user(user: User):
544 if User.get_one_by_attribute("email", user.email) is not None:
545 raise HTTPException(status_code=400, detail="An error occurred during account creation")
546 hashed_password = get_password_hash(user.password)
547 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
548 if new_user is None:
549 raise HTTPException(status_code=400, detail="An error occurred during account creation")
550 return new_user
553@app.post("/token", status_code=201)
554async def login_for_access_token(
555 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
556) -> Token:
557 user = authenticate_user(form_data.username, form_data.password)
558 if not user:
559 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
560 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
561 if user.is_active:
562 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
563 access_token = create_access_token(
564 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
565 )
566 return Token(access_token=access_token, token_type="bearer")
569@app.get("/users", dependencies=[Depends(get_current_active_user)])
570async def get_users():
571 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
574@app.get("/users/me", response_model=User)
575async def get_current_user(
576 current_user: Annotated[User, Depends(get_current_active_user)],
577):
578 del current_user.password
579 return current_user
582@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
583async def get_user(user_id: str):
584 user = User.get_from_id(user_id)
586 if user is None:
587 raise HTTPException(
588 status_code=404,
589 detail="User not found",
590 )
591 return user.to_dict(exclude={"password", "is_connected"})
594@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
595async def patch_user(user: UserUpdate, user_id):
596 if user.password == "" or user.password is None:
597 del user.password
598 else:
599 user.password = get_password_hash(user.password)
600 return User.update_info(user, user_id)
603@app.get("/commands", dependencies=[Depends(get_current_active_user)])
604async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
605 return Command.response_from_query(query)
608@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
609async def get_campaigns():
610 return Campaign.get_all()
613@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
614async def get_campaign_by_id(campaign_id: str):
615 campaign = Campaign.get_from_id(campaign_id)
616 if campaign is None:
617 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
618 return campaign
621@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
622async def add_campaign(campaign: Campaign):
623 new_campaign = Campaign.create(campaign)
624 if new_campaign is None:
625 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
626 return new_campaign
629@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
630async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
631 campaign = Campaign.get_from_id(campaign_id)
632 if campaign is None:
633 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
634 campaign.name = edit_campaign.name
635 campaign.description = edit_campaign.description
636 return Campaign.update(campaign)
639@app.delete(
640 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
641)
642async def delete_campaign(campaign_id: str):
643 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
644 campaign = Campaign.get_from_id(campaign_id)
645 if campaign is None:
646 raise exception
647 delete_phases = Phase.deleteMany(campaign_id)
648 if not delete_phases.acknowledged:
649 raise exception
650 campaign_deleted = Campaign.delete(campaign_id)
651 return campaign_deleted.acknowledged
654@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
655async def get_campaign_phases(campaign_id: str):
656 return Phase.get_by_attribute("campaign_id", campaign_id)
659@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
660async def get_phase(phase_id: str):
661 phase = Phase.get_from_id(phase_id)
662 if phase is None:
663 raise HTTPException(status_code=404, detail="Phase not found")
664 return phase
667@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
668async def add_phase(phase: Phase):
669 new_phase = Phase.create(phase)
670 if new_phase is None:
671 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
672 return new_phase
675@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
676async def edit_phase(phase_id, edit_phase: Phase):
677 phase = Phase.get_from_id(phase_id)
678 if phase is None:
679 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
680 phase.name = edit_phase.name
681 phase.description = edit_phase.description
682 phase.start_at = edit_phase.start_at
683 phase.end_at = edit_phase.end_at
684 return Phase.update(phase)
687@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
688async def delete_phase(phase_id: str):
689 phase = Phase.get_from_id(phase_id)
690 if phase is None:
691 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
692 phase_deleted = Phase.delete(phase_id)
693 return phase_deleted.acknowledged
696@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
697async def get_custom_views():
698 return CustomView.get_all()
701@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
702async def get_custom_views_from_user_id(user_id: str):
703 return CustomView.get_by_attribute("user_id", user_id)
706@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
707async def get_custom_view(custom_view_id: str):
708 return CustomView.get_from_id(custom_view_id)
711@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
712async def create_custom_view(
713 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
714):
715 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
716 custom_view.insert()
717 return custom_view
720@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
721async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
722 custom_view = CustomView.get_from_id(custom_view_id)
723 return custom_view.update(custom_view_update.model_dump())
726@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
727async def delete_custom_view(custom_view_id: str):
728 custom_view = CustomView.get_from_id(custom_view_id)
729 return custom_view.delete()
732@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
733async def add_video(video: Video):
734 video.insert()
735 if not video:
736 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
737 return video
740@app.get("/videos", dependencies=[Depends(get_current_active_user)])
741async def get_videos():
742 return Video.get_all()
745@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
746def get_stream(video_id):
747 camera_name = Video.get_video(video_id)
748 if camera_name is None:
749 raise HTTPException(status_code=404, detail="Camera not found")
750 return camera_name
753@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
754async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
755 return SignalsPreset.get_by_attribute("user_id", current_user.id)
758@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
759async def create_signals_preset(
760 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
761):
762 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
763 return new_signals_preset
766@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
767async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
768 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
769 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
772@app.delete(
773 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
774)
775async def delete_signals_preset(signals_preset_id: str):
776 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
777 return signals_preset.delete()
780@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
781async def create_graph_theme(
782 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
783):
784 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
785 if styled_signal is None:
786 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
788 graph_theme = PrivateGraphTheme.create(
789 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
790 )
791 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
794@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
795async def get_all_graph_themes(
796 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
797) -> ListResponse[PublicGraphTheme]:
798 return PublicGraphTheme.response_from_query(query, current_user.id)
801@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
802async def get_graph_themes_in_library(
803 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
804) -> ListResponse[PublicGraphTheme]:
805 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
808@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
809async def update_graph_theme(
810 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
811):
812 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
813 update_dict = theme_update.model_dump(exclude_unset=True)
814 if current_user.id != graph_theme.creator_id:
815 for theme_property in update_dict.keys():
816 if theme_property not in ["active_for_user", "in_user_library"]:
817 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
818 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
819 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
822@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
823async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
824 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
825 if current_user.id != graph_theme.creator_id:
826 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
827 return graph_theme.delete()
830@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
831async def get_signals_appearances(
832 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
833) -> dict:
834 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)