Coverage for / usr / local / lib / python3.11 / site-packages / twinpad_backend / api.py: 98%
422 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-20 06:48 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-20 06:48 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 ForcedSignal,
20 SignalData,
21 SignalSample,
22 ServicesStatus,
23 Device,
24 DeviceUpdate,
25 DeviceSetup,
26 DeviceSetupUpdate,
27 DeviceState,
28 SignalUpdate,
29 SignalsData,
30 Event,
31 EventRule,
32 TwinPadActivity,
33 User,
34 UserUpdate,
35 Campaign,
36 Phase,
37 CustomView,
38 Command,
39 CustomViewCreation,
40 CustomViewUpdate,
41 Video,
42 SignalsPreset,
43 SignalsPresetCreation,
44 SignalsPresetUpdate,
45 PrivateGraphTheme,
46 PublicGraphTheme,
47 GraphThemeCreation,
48 GraphThemeUpdate,
49)
50from twinpad_backend.auth import (
51 Token,
52 authenticate_user,
53 get_current_active_user,
54 ACCESS_TOKEN_EXPIRE_MINUTES,
55 create_access_token,
56 get_password_hash,
57)
58from twinpad_backend.queries import (
59 SignalQuery,
60 ForcedSignalQuery,
61 DeviceStatesQuery,
62 EventQuery,
63 EventRuleQuery,
64 CommandQuery,
65 GraphThemeQuery,
66)
67from twinpad_backend.responses import ListResponse
69REQUEST_TIME_WARNING = 0.5
71DEBUG = os.environ.get("DEBUG", "false") == "true"
72PROFILING = os.environ.get("PROFILING", "false") == "true"
74logger = logging.getLogger("uvicorn.error")
75logger.propagate = False
76logger.info("Debug mode: %s", DEBUG)
77logger.info("log level: %s", logging.root.level)
80app = FastAPI(title="Twinpad backend", version=__version__)
82app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
84if PROFILING: # pragma: no cover
85 profiling_folder = mkdtemp()
86 logger.info("Profiling enabled")
88 @app.middleware("http")
89 async def profile_request(request: Request, call_next):
90 should_profile = True
91 url = str(request.url)
92 for segment in ("profiling", ".ico"):
93 if segment in url:
94 should_profile = False
95 break
97 if should_profile: # avoid recursion
98 profiler = Profiler()
99 profiler.start()
101 response = await call_next(request)
103 profiler.stop()
104 url = "_".join(url.split("/")[3:]).rstrip("/")
105 if not url:
106 url = "slash"
107 filename = url.split("?", maxsplit=1)[0]
108 logger.info("saving profiling to %s", filename)
109 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
110 profiling_file.write(profiler.output_html())
112 return response
114 return await call_next(request)
116 @app.get("/profilings")
117 async def profilings():
118 return {"profilings": os.listdir(profiling_folder)}
120 @app.get("/profilings/{file_name}")
121 async def profiling(file_name):
122 file_path = os.path.join(profiling_folder, file_name)
124 if not os.path.exists(file_path):
125 raise HTTPException(
126 status_code=404,
127 detail=f"Profiling file '{file_name}' not found",
128 )
130 with open(file_path, "r", encoding="utf-8") as profiling_file:
131 return Response(
132 content=profiling_file.read(),
133 media_type="application/html",
134 headers={"Content-Disposition": f'attachment; filename="{file_name}_profiling.html"'},
135 )
138@app.middleware("http")
139async def log_request_time(request: Request, call_next):
140 start_time = time.time() # Record the start time
141 response = await call_next(request) # Process the request
142 duration = time.time() - start_time # Calculate the time taken
143 client_ip = request.headers.get("x-forwarded-for", request.client.host)
144 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
145 if duration > REQUEST_TIME_WARNING:
146 logger.warning(message)
147 else:
148 logger.info(message)
149 return response
152@app.get("/")
153async def slash():
154 return {"twinpad_version": __version__}
157@app.get("/status", dependencies=[Depends(get_current_active_user)])
158async def status():
159 """
160 Return service healthcheck
161 """
162 return {
163 "services": ServicesStatus.check(),
164 }
167@app.get("/devices", dependencies=[Depends(get_current_active_user)])
168async def get_devices() -> list[Device]:
169 return Device.get_all(sort_by="device_id")
172@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
173async def get_device(device_id):
174 device = Device.get_one_by_attribute("device_id", device_id)
175 if not device:
176 raise HTTPException(
177 status_code=404,
178 detail="Device not found",
179 )
180 return device
183@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
184async def update_item(
185 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
186):
187 device = Device.get_one_by_attribute("device_id", device_id)
188 if not device:
189 raise HTTPException(
190 status_code=404,
191 detail="Device not found",
192 )
193 result = await device.change_mode(device_update, current_user)
194 if result.get("error", False) is True:
195 raise HTTPException(
196 status_code=result.get("status_code", 500),
197 detail=result.get("message", "An error has occurred"),
198 )
199 return result
202@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
203async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
204 return DeviceState.get_from_id_and_query(device_id, query)
207@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
208async def get_device_setups() -> list[DeviceSetup]:
209 return DeviceSetup.get_all()
212@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
213async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
214 device_setup.insert()
215 return device_setup
218@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
219async def get_device_setup(device_setup_id: str):
220 device_setup = DeviceSetup.get_from_id(device_setup_id)
221 if device_setup is None:
222 raise HTTPException(
223 status_code=404,
224 detail="Device setup not found",
225 )
226 return device_setup
229@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
230async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
231 device_setup = DeviceSetup.get_from_id(device_setup_id)
232 if device_setup is None:
233 raise HTTPException(
234 status_code=404,
235 detail="Device setup not found",
236 )
237 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
238 return device_setup
241@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
242async def delete_device_setups(device_setup_id: str) -> bool:
243 device_setup = DeviceSetup.get_from_id(device_setup_id)
244 if device_setup is None:
245 raise HTTPException(
246 status_code=404,
247 detail="Device setup not found",
248 )
249 deleted = device_setup.delete()
250 return deleted
253@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
254async def get_number_samples(
255 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
256) -> list[TwinPadActivity]:
257 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
260@app.get("/signals", dependencies=[Depends(get_current_active_user)])
261async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
262 if "signal_id" not in query.sort_by:
263 query.sort_by += ",signal_id:1"
264 return Signal.response_from_query(query).to_dict(exclude={"device"})
267@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
268async def signals_names() -> list[str]:
269 return Signal.get_all_ids()
272@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
273async def signal_stats():
274 """
275 Returns signals stats
276 """
277 signal_statuses = Signal.get_all_statuses()
278 signal_ids = [signal["signal_id"] for signal in signal_statuses]
280 number_samples_by_signal_id = await Signal.number_samples_batch(signal_ids)
281 number_samples = sum(number_samples_by_signal_id.values())
283 number_active_signals = sum(1 for signal in signal_statuses if signal["status"] == "up")
285 number_signals = Signal.get_number_documents()
287 return {
288 "signal_data_size": signal_datasize(),
289 "number_signal_samples": number_samples,
290 "number_active_signals": number_active_signals,
291 "number_signals": number_signals,
292 }
295@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
296async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
297 return SignalSample.get_last_from_signal_ids(signal_ids)
300@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
301async def get_last_values_interest_window(
302 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
303) -> list[SignalSample | None]:
304 return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
307@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
308async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
309 return SignalSample.get_first_from_signal_ids(signal_ids)
312@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
313async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
314 return Signal.get_forcibility(signal_ids)
317@app.get("/signals/forced", response_model=ListResponse[ForcedSignal], dependencies=[Depends(get_current_active_user)])
318async def get_forced_signals(
319 current_user: Annotated[User, Depends(get_current_active_user)], query: ForcedSignalQuery = Depends()
320):
321 if not current_user.is_admin:
322 raise HTTPException(401)
323 return ForcedSignal.response_from_query(query)
326@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
327async def get_signal(signal_id):
328 signal = Signal.get_from_signal_id(signal_id)
329 if not signal:
330 raise HTTPException(
331 status_code=404,
332 detail="Signal not found",
333 )
334 return signal.to_dict()
337@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
338async def update_signal(
339 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
340):
341 signal = Signal.get_from_signal_id(signal_id)
342 if not signal:
343 raise HTTPException(
344 status_code=404,
345 detail="Device not found",
346 )
347 forced_signal = ForcedSignal.get_one_by_attribute("signal_id", signal_id)
348 if forced_signal is not None:
349 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
350 raise HTTPException(
351 status_code=403,
352 detail="Cannot override another user's forcing",
353 )
355 result = await signal.send_command(signal_update, current_user)
356 if result.get("error", False) is True:
357 raise HTTPException(
358 status_code=result.get("status_code", 500),
359 detail=result.get("message", "An error has occurred"),
360 )
362 if forced_signal is not None and signal_update.forced_value is None:
363 forced_signal.delete()
364 elif signal_update.forced_value is not None:
365 forced_signal = ForcedSignal(
366 signal_id=signal_id,
367 forcing_user_id=current_user.id,
368 forced_at=time.time(),
369 value=signal_update.forced_value,
370 )
371 forced_signal.insert()
373 return result
376@app.get("/signals/{signal_id}/can-force", dependencies=[Depends(get_current_active_user)])
377async def get_signal_forcibility(
378 signal_id: str, current_user: Annotated[User, Depends(get_current_active_user)]
379) -> bool:
380 return ForcedSignal.can_force(signal_id, current_user)
383@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
384async def get_signal_data(
385 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
386) -> SignalData | None:
387 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
389 if number_samples_max is not None:
390 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
392 return signal_data
395@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
396async def get_last_value(signal_id) -> SignalSample:
397 sample = SignalSample.get_last_from_signal_id(signal_id)
398 if sample is None:
399 raise HTTPException(status_code=404, detail="No data")
400 return sample
403@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
404async def get_first_value(signal_id) -> SignalSample:
405 sample = SignalSample.get_first_from_signal_id(signal_id)
406 if sample is None:
407 raise HTTPException(status_code=404, detail="No data")
408 return sample
411@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
412async def get_signal_number_samples(signal_id):
413 signal = Signal.get_from_signal_id(signal_id)
414 if not signal:
415 raise HTTPException(
416 status_code=404,
417 detail="Device not found",
418 )
419 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
422@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
423async def get_signals_data(
424 signal_ids: list[str] = Query(default=[]),
425 number_samples_max: int = None,
426 min_timestamp: float = None,
427 max_timestamp: float = None,
428 interpolate_bounds: bool = True,
429) -> SignalsData | None:
430 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
431 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
433 signals_data = SignalsData.get_from_signal_ids(
434 signal_ids,
435 min_timestamp=min_timestamp,
436 max_timestamp=max_timestamp,
437 window_min_timestamp=min_timestamp,
438 window_max_timestamp=max_timestamp,
439 interpolate_bounds=interpolate_bounds,
440 )
441 if number_samples_max is not None:
442 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
444 return signals_data
447@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
448async def get_signals_data_interest_window(
449 window_max_number_samples: int = 600,
450 outside_max_number_samples: int = 150,
451 window_min_timestamp: float = None,
452 window_max_timestamp: float = None,
453 signal_ids: list[str] = Query(default=[]),
454 min_timestamp: float = None,
455 max_timestamp: float = None,
456) -> SignalsData | None:
457 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
458 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
460 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
461 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
463 signals_data = SignalsData.get_from_signal_ids(
464 signal_ids,
465 min_timestamp=min_timestamp,
466 max_timestamp=max_timestamp,
467 window_min_timestamp=window_min_timestamp,
468 window_max_timestamp=window_max_timestamp,
469 max_documents=10 * (window_max_number_samples + outside_max_number_samples),
470 )
472 signals_data = signals_data.interest_window_desampling(
473 window_max_number_samples=window_max_number_samples,
474 outside_max_number_samples=outside_max_number_samples,
475 window_min_timestamp=window_min_timestamp,
476 window_max_timestamp=window_max_timestamp,
477 )
479 return signals_data
482@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
483async def export_signals_zip(
484 file_format: str,
485 signal_ids: list[str] = Query(default=[]),
486 min_timestamp: float = None,
487 max_timestamp: float = None,
488):
489 signals_data = SignalsData.get_from_signal_ids(
490 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
491 )
492 zip_data = signals_data.zip_export(file_format)
493 return Response(
494 content=zip_data,
495 media_type="application/zip",
496 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
497 )
500@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
501async def export_signals_hdf5(
502 signal_ids: list[str] = Query(default=[]),
503 min_timestamp: float = None,
504 max_timestamp: float = None,
505):
506 signals_data = SignalsData.get_from_signal_ids(
507 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
508 )
509 data = signals_data.hdf5_export()
510 return Response(
511 content=data,
512 media_type="application/hdf5",
513 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
514 )
517@app.get("/events", dependencies=[Depends(get_current_active_user)])
518async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
519 return Event.response_from_query(query)
522@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
523async def get_event(event_id) -> Event:
524 event = Event.get_from_id(event_id)
525 if event is None:
526 raise HTTPException(status_code=404, detail="No such event")
527 return event
530@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
531async def get_number_events(
532 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
533) -> list[TwinPadActivity]:
534 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
537@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
538async def get_number_commands(
539 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
540) -> list[TwinPadActivity]:
541 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
544@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
545async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
546 return EventRule.response_from_query(query)
549@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
550async def get_event_rule(event_rule_id) -> EventRule:
551 event_rule = EventRule.get_from_id(event_rule_id)
552 if event_rule is None:
553 raise HTTPException(status_code=404, detail="No such event rule")
554 return event_rule
557@app.post("/users", status_code=201)
558async def create_user(user: User):
559 if User.get_one_by_attribute("email", user.email) is not None:
560 raise HTTPException(status_code=400, detail="An error occurred during account creation")
561 hashed_password = get_password_hash(user.password)
562 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
563 if new_user is None:
564 raise HTTPException(status_code=400, detail="An error occurred during account creation")
565 return new_user
568@app.post("/token", status_code=201)
569async def login_for_access_token(
570 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
571) -> Token:
572 user = authenticate_user(form_data.username, form_data.password)
573 if not user:
574 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
575 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
576 if user.is_active:
577 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
578 access_token = create_access_token(
579 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
580 )
581 return Token(access_token=access_token, token_type="bearer")
584@app.get("/users", dependencies=[Depends(get_current_active_user)])
585async def get_users():
586 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
589@app.get("/users/me", response_model=User)
590async def get_current_user(
591 current_user: Annotated[User, Depends(get_current_active_user)],
592):
593 del current_user.password
594 return current_user
597@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
598async def get_user(user_id: str):
599 user = User.get_from_id(user_id)
601 if user is None:
602 raise HTTPException(
603 status_code=404,
604 detail="User not found",
605 )
606 return user.to_dict(exclude={"password", "is_connected"})
609@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
610async def patch_user(user: UserUpdate, user_id):
611 if user.password == "" or user.password is None:
612 del user.password
613 else:
614 user.password = get_password_hash(user.password)
615 return User.update_info(user, user_id)
618@app.get("/commands", dependencies=[Depends(get_current_active_user)])
619async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
620 return Command.response_from_query(query)
623@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
624async def get_campaigns():
625 return Campaign.get_all()
628@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
629async def get_campaign_by_id(campaign_id: str):
630 campaign = Campaign.get_from_id(campaign_id)
631 if campaign is None:
632 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
633 return campaign
636@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
637async def add_campaign(campaign: Campaign):
638 new_campaign = Campaign.create(campaign)
639 if new_campaign is None:
640 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
641 return new_campaign
644@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
645async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
646 campaign = Campaign.get_from_id(campaign_id)
647 if campaign is None:
648 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
649 campaign.name = edit_campaign.name
650 campaign.description = edit_campaign.description
651 return Campaign.update(campaign)
654@app.delete(
655 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
656)
657async def delete_campaign(campaign_id: str):
658 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
659 campaign = Campaign.get_from_id(campaign_id)
660 if campaign is None:
661 raise exception
662 delete_phases = Phase.deleteMany(campaign_id)
663 if not delete_phases.acknowledged:
664 raise exception
665 campaign_deleted = Campaign.delete(campaign_id)
666 return campaign_deleted.acknowledged
669@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
670async def get_campaign_phases(campaign_id: str):
671 return Phase.get_by_attribute("campaign_id", campaign_id)
674@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
675async def get_phase(phase_id: str):
676 phase = Phase.get_from_id(phase_id)
677 if phase is None:
678 raise HTTPException(status_code=404, detail="Phase not found")
679 return phase
682@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
683async def add_phase(phase: Phase):
684 new_phase = Phase.create(phase)
685 if new_phase is None:
686 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
687 return new_phase
690@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
691async def edit_phase(phase_id, edit_phase: Phase):
692 phase = Phase.get_from_id(phase_id)
693 if phase is None:
694 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
695 phase.name = edit_phase.name
696 phase.description = edit_phase.description
697 phase.start_at = edit_phase.start_at
698 phase.end_at = edit_phase.end_at
699 return Phase.update(phase)
702@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
703async def delete_phase(phase_id: str):
704 phase = Phase.get_from_id(phase_id)
705 if phase is None:
706 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
707 phase_deleted = Phase.delete(phase_id)
708 return phase_deleted.acknowledged
711@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
712async def get_custom_views():
713 return CustomView.get_all()
716@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
717async def get_custom_views_from_user_id(user_id: str):
718 return CustomView.get_by_attribute("user_id", user_id)
721@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
722async def get_custom_view(custom_view_id: str):
723 return CustomView.get_from_id(custom_view_id)
726@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
727async def create_custom_view(
728 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
729):
730 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
731 custom_view.insert()
732 return custom_view
735@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
736async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
737 custom_view = CustomView.get_from_id(custom_view_id)
738 return custom_view.update(custom_view_update.model_dump())
741@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
742async def delete_custom_view(custom_view_id: str):
743 custom_view = CustomView.get_from_id(custom_view_id)
744 return custom_view.delete()
747@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
748async def add_video(video: Video):
749 video.insert()
750 if not video:
751 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
752 return video
755@app.get("/videos", dependencies=[Depends(get_current_active_user)])
756async def get_videos():
757 return Video.get_all()
760@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
761def get_stream(video_id):
762 camera_name = Video.get_video(video_id)
763 if camera_name is None:
764 raise HTTPException(status_code=404, detail="Camera not found")
765 return camera_name
768@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
769async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
770 return SignalsPreset.get_by_attribute("user_id", current_user.id)
773@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
774async def create_signals_preset(
775 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
776):
777 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
778 return new_signals_preset
781@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
782async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
783 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
784 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
787@app.delete(
788 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
789)
790async def delete_signals_preset(signals_preset_id: str):
791 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
792 return signals_preset.delete()
795@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
796async def create_graph_theme(
797 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
798):
799 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
800 if styled_signal is None:
801 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
803 graph_theme = PrivateGraphTheme.create(
804 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
805 )
806 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
809@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
810async def get_all_graph_themes(
811 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
812) -> ListResponse[PublicGraphTheme]:
813 return PublicGraphTheme.response_from_query(query, current_user.id)
816@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
817async def get_graph_themes_in_library(
818 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
819) -> ListResponse[PublicGraphTheme]:
820 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
823@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
824async def update_graph_theme(
825 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
826):
827 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
828 update_dict = theme_update.model_dump(exclude_unset=True)
829 if current_user.id != graph_theme.creator_id:
830 for theme_property in update_dict.keys():
831 if theme_property not in ["active_for_user", "in_user_library"]:
832 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
833 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
834 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
837@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
838async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
839 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
840 if current_user.id != graph_theme.creator_id:
841 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
842 return graph_theme.delete()
845@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
846async def get_signals_appearances(
847 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
848) -> dict:
849 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)