Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 98%
407 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 14:06 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 14:06 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 TwinPadActivity,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41 SignalsPreset,
42 SignalsPresetCreation,
43 SignalsPresetUpdate,
44 PrivateGraphTheme,
45 PublicGraphTheme,
46 GraphThemeCreation,
47 GraphThemeUpdate,
48)
49from twinpad_backend.auth import (
50 Token,
51 authenticate_user,
52 get_current_active_user,
53 ACCESS_TOKEN_EXPIRE_MINUTES,
54 create_access_token,
55 get_password_hash,
56)
57from twinpad_backend.queries import (
58 SignalQuery,
59 DeviceStatesQuery,
60 EventQuery,
61 EventRuleQuery,
62 CommandQuery,
63 GraphThemeQuery,
64)
65from twinpad_backend.responses import ListResponse
67REQUEST_TIME_WARNING = 0.5
69DEBUG = os.environ.get("DEBUG", "false") == "true"
70PROFILING = os.environ.get("PROFILING", "false") == "true"
72logger = logging.getLogger("uvicorn.error")
73logger.propagate = False
74logger.info("Debug mode: %s", DEBUG)
75logger.info("log level: %s", logging.root.level)
78app = FastAPI(title="Twinpad backend", version=__version__)
80app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
82if PROFILING: # pragma: no cover
83 profiling_folder = mkdtemp()
84 logger.info("Profiling enabled")
86 @app.middleware("http")
87 async def profile_request(request: Request, call_next):
88 should_profile = True
89 url = str(request.url)
90 for segment in ("profiling", ".ico"):
91 if segment in url:
92 should_profile = False
93 break
95 if should_profile: # avoid recursion
96 profiler = Profiler()
97 profiler.start()
98 await call_next(request)
99 profiler.stop()
100 url = "_".join(url.split("/")[3:]).rstrip("/")
101 if not url:
102 url = "slash"
103 # filename = f"{round(time.time())}_{url}"
104 filename = url.split("?", maxsplit=1)[0]
105 logger.info("saving profiling to %s", filename)
106 with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
107 profiling_file.write(profiler.output_html())
108 return await call_next(request)
110 @app.get("/profilings")
111 async def profilings():
112 return {"profilings": os.listdir(profiling_folder)}
114 @app.get("/profilings/{profiling_id}")
115 async def profiling(profiling_id):
117 filename = os.path.join(profiling_folder, profiling_id)
118 if os.path.exists(filename):
119 with open(filename, "r", encoding="utf-8") as profiling:
120 return HTMLResponse(profiling.read())
121 raise HTTPException(
122 status_code=404,
123 detail="Profiling not found",
124 )
126 # app.mount("/profilings", StaticFiles(directory=profiling_folder, html=True), name="profilings")
129@app.middleware("http")
130async def log_request_time(request: Request, call_next):
131 start_time = time.time() # Record the start time
132 response = await call_next(request) # Process the request
133 duration = time.time() - start_time # Calculate the time taken
134 client_ip = request.headers.get("x-forwarded-for", request.client.host)
135 message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
136 if duration > REQUEST_TIME_WARNING:
137 logger.warning(message)
138 else:
139 logger.info(message)
140 return response
143@app.get("/")
144async def slash():
145 return {"twinpad_version": __version__}
148@app.get("/status", dependencies=[Depends(get_current_active_user)])
149async def status():
150 """
151 Return service healthcheck
152 """
153 return {
154 "services": ServicesStatus.check(),
155 }
158@app.get("/devices", dependencies=[Depends(get_current_active_user)])
159async def get_devices() -> list[Device]:
160 return Device.get_all(sort_by="device_id")
163@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
164async def get_device(device_id):
165 device = Device.get_one_by_attribute("device_id", device_id)
166 if not device:
167 raise HTTPException(
168 status_code=404,
169 detail="Device not found",
170 )
171 return device
174@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
175async def update_item(
176 device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
177):
178 device = Device.get_one_by_attribute("device_id", device_id)
179 if not device:
180 raise HTTPException(
181 status_code=404,
182 detail="Device not found",
183 )
184 result = await device.change_mode(device_update, current_user)
185 if result.get("error", False) is True:
186 raise HTTPException(
187 status_code=result.get("status_code", 500),
188 detail=result.get("message", "An error has occurred"),
189 )
190 return result
193@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
194async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
195 return DeviceState.get_from_id_and_query(device_id, query)
198@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
199async def get_device_setups() -> list[DeviceSetup]:
200 return DeviceSetup.get_all()
203@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
204async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
205 device_setup.insert()
206 return device_setup
209@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
210async def get_device_setup(device_setup_id: str):
211 device_setup = DeviceSetup.get_from_id(device_setup_id)
212 if device_setup is None:
213 raise HTTPException(
214 status_code=404,
215 detail="Device setup not found",
216 )
217 return device_setup
220@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
221async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
222 device_setup = DeviceSetup.get_from_id(device_setup_id)
223 if device_setup is None:
224 raise HTTPException(
225 status_code=404,
226 detail="Device setup not found",
227 )
228 device_setup.update({k: v for k, v in device_setup_update.model_dump().items() if v is not None})
229 return device_setup
232@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
233async def delete_device_setups(device_setup_id: str) -> bool:
234 device_setup = DeviceSetup.get_from_id(device_setup_id)
235 if device_setup is None:
236 raise HTTPException(
237 status_code=404,
238 detail="Device setup not found",
239 )
240 deleted = device_setup.delete()
241 return deleted
244@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
245async def get_number_samples(
246 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
247) -> list[TwinPadActivity]:
248 return TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
251@app.get("/signals", dependencies=[Depends(get_current_active_user)])
252async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
253 return Signal.response_from_query(query).to_dict(exclude={"device"})
256@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
257async def signals_names() -> list[str]:
258 return Signal.get_all_ids()
261@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
262async def signal_stats():
263 """
264 Returns signals stats
265 """
266 # profiler = Profiler()
267 # profiler.start()
269 number_samples = 0
270 number_active_signals = 0
271 signals = [Signal.get_from_signal_id(signal_id=sid) for sid in get_signals_ids_from_collection_names()]
273 for s in signals:
274 if s is not None:
275 number_samples += await s.number_samples()
276 if s.status.status == "up":
277 number_active_signals += 1
279 number_signals = Signal.get_number_documents()
281 # profiler.stop()
282 # filename = "signals_stats_profiling.html"
283 # full_file_path = os.path.join(Path.home(), filename)
284 # logger.info("Saving profiling to %s", full_file_path)
285 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
286 # profiling_file.write(profiler.output_html())
288 return {
289 "signal_data_size": signal_datasize(),
290 "number_signal_samples": number_samples,
291 "number_active_signals": number_active_signals,
292 "number_signals": number_signals,
293 }
296@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
297async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
298 return SignalSample.get_last_from_signal_ids(signal_ids)
301@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
302async def get_last_values_interest_window(
303 signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
304) -> list[SignalSample | None]:
305 # profiler = Profiler()
306 # profiler.start()
308 result = SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
310 # profiler.stop()
311 # filename = "fetch_last_point.html"
312 # full_file_path = os.path.join(Path.home(), filename)
313 # logger.info("Saving profiling to %s", full_file_path)
314 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
315 # profiling_file.write(profiler.output_html())
317 return result
320@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
321async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
322 return SignalSample.get_first_from_signal_ids(signal_ids)
325@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
326async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
327 return Signal.get_forcibility(signal_ids)
330@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
331async def get_signal(signal_id):
332 signal = Signal.get_from_signal_id(signal_id)
333 if not signal:
334 raise HTTPException(
335 status_code=404,
336 detail="Signal not found",
337 )
338 return signal.to_dict()
341@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
342async def update_signal(
343 signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
344):
345 signal = Signal.get_from_signal_id(signal_id)
346 if not signal:
347 raise HTTPException(
348 status_code=404,
349 detail="Device not found",
350 )
351 result = await signal.send_command(signal_update, current_user)
352 if result.get("error", False) is True:
353 raise HTTPException(
354 status_code=result.get("status_code", 500),
355 detail=result.get("message", "An error has occurred"),
356 )
357 return result
360@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
361async def get_signal_data(
362 signal_id: str, number_samples_max: int = None, min_timestamp: float = None, max_timestamp: float = None
363) -> SignalData | None:
364 signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
366 if number_samples_max is not None:
367 signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)
369 return signal_data
372@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
373async def get_last_value(signal_id) -> SignalSample:
374 sample = SignalSample.get_last_from_signal_id(signal_id)
375 if sample is None:
376 raise HTTPException(status_code=404, detail="No data")
377 return sample
380@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
381async def get_first_value(signal_id) -> SignalSample:
382 sample = SignalSample.get_first_from_signal_id(signal_id)
383 if sample is None:
384 raise HTTPException(status_code=404, detail="No data")
385 return sample
388@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
389async def get_signal_number_samples(signal_id):
390 signal = Signal.get_from_signal_id(signal_id)
391 if not signal:
392 raise HTTPException(
393 status_code=404,
394 detail="Device not found",
395 )
396 return {"signal_id": signal_id, "number_samples": await signal.number_samples(), "size": signal.sample_datasize()}
399@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
400async def get_signals_data(
401 signal_ids: list[str] = Query(default=[]),
402 number_samples_max: int = None,
403 min_timestamp: float = None,
404 max_timestamp: float = None,
405 interpolate_bounds: bool = True,
406) -> SignalsData | None:
407 # profiler = Profiler()
408 # profiler.start()
410 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
411 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
413 signals_data = SignalsData.get_from_signal_ids(
414 signal_ids,
415 min_timestamp=min_timestamp,
416 max_timestamp=max_timestamp,
417 window_min_timestamp=min_timestamp,
418 window_max_timestamp=max_timestamp,
419 interpolate_bounds=interpolate_bounds,
420 )
421 if number_samples_max is not None:
422 signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)
424 # profiler.stop()
425 # filename = "signals-data.html"
426 # full_file_path = os.path.join(Path.home(), filename)
427 # logger.info(f"Saving profiling to %s", full_file_path)
428 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
429 # profiling_file.write(profiler.output_html())
431 return signals_data
434@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
435async def get_signals_data_interest_window(
436 window_max_number_samples: int = 600,
437 outside_max_number_samples: int = 150,
438 window_min_timestamp: float = None,
439 window_max_timestamp: float = None,
440 signal_ids: list[str] = Query(default=[]),
441 min_timestamp: float = None,
442 max_timestamp: float = None,
443) -> SignalsData | None:
444 # profiler = Profiler()
445 # profiler.start()
447 if window_min_timestamp and window_max_timestamp and window_min_timestamp > window_max_timestamp:
448 raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")
450 if min_timestamp and max_timestamp and min_timestamp > max_timestamp:
451 raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")
453 signals_data = SignalsData.get_from_signal_ids(
454 signal_ids,
455 min_timestamp=min_timestamp,
456 max_timestamp=max_timestamp,
457 window_min_timestamp=window_min_timestamp,
458 window_max_timestamp=window_max_timestamp,
459 max_documents=10 * (window_max_number_samples + outside_max_number_samples),
460 )
462 signals_data = signals_data.interest_window_desampling(
463 window_max_number_samples=window_max_number_samples,
464 outside_max_number_samples=outside_max_number_samples,
465 window_min_timestamp=window_min_timestamp,
466 window_max_timestamp=window_max_timestamp,
467 )
469 # profiler.stop()
470 # filename = "signals-data.html"
471 # full_file_path = os.path.join(Path.home(), filename)
472 # logger.info(f"Saving profiling to %s", full_file_path)
473 # with open(full_file_path, "w", encoding="utf-8") as profiling_file:
474 # profiling_file.write(profiler.output_html())
476 return signals_data
479@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
480async def export_signals_zip(
481 file_format: str,
482 signal_ids: list[str] = Query(default=[]),
483 min_timestamp: float = None,
484 max_timestamp: float = None,
485):
486 signals_data = SignalsData.get_from_signal_ids(
487 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
488 )
489 zip_data = signals_data.zip_export(file_format)
490 return Response(
491 content=zip_data,
492 media_type="application/zip",
493 headers={"Content-Disposition": 'attachment; filename="signals.zip"'},
494 )
497@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
498async def export_signals_hdf5(
499 signal_ids: list[str] = Query(default=[]),
500 min_timestamp: float = None,
501 max_timestamp: float = None,
502):
503 signals_data = SignalsData.get_from_signal_ids(
504 signal_ids, min_timestamp=min_timestamp, max_timestamp=max_timestamp, interpolate_bounds=False
505 )
506 data = signals_data.hdf5_export()
507 return Response(
508 content=data,
509 media_type="application/hdf5",
510 headers={"Content-Disposition": 'attachment; filename="signals.hdf5"'},
511 )
514@app.get("/events", dependencies=[Depends(get_current_active_user)])
515async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
516 return Event.response_from_query(query)
519@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
520async def get_event(event_id) -> Event:
521 event = Event.get_from_id(event_id)
522 if event is None:
523 raise HTTPException(status_code=404, detail="No such event")
524 return event
527@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
528async def get_number_events(
529 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
530) -> list[TwinPadActivity]:
531 return TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
534@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
535async def get_number_commands(
536 min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
537) -> list[TwinPadActivity]:
538 return TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
541@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
542async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
543 return EventRule.response_from_query(query)
546@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
547async def get_event_rule(event_rule_id) -> EventRule:
548 event_rule = EventRule.get_from_id(event_rule_id)
549 if event_rule is None:
550 raise HTTPException(status_code=404, detail="No such event rule")
551 return event_rule
554@app.post("/users", status_code=201)
555async def create_user(user: User):
556 if User.get_one_by_attribute("email", user.email) is not None:
557 raise HTTPException(status_code=400, detail="An error occurred during account creation")
558 hashed_password = get_password_hash(user.password)
559 new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, user.is_admin | False)
560 if new_user is None:
561 raise HTTPException(status_code=400, detail="An error occurred during account creation")
562 return new_user
565@app.post("/token", status_code=201)
566async def login_for_access_token(
567 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
568) -> Token:
569 user = authenticate_user(form_data.username, form_data.password)
570 if not user:
571 raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
572 access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
573 if user.is_active:
574 raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
575 access_token = create_access_token(
576 data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
577 )
578 return Token(access_token=access_token, token_type="bearer")
581@app.get("/users", dependencies=[Depends(get_current_active_user)])
582async def get_users():
583 return [u.to_dict(exclude={"password"}) for u in User.get_all(sort_by="email")]
586@app.get("/users/me", response_model=User)
587async def get_current_user(
588 current_user: Annotated[User, Depends(get_current_active_user)],
589):
590 del current_user.password
591 return current_user
594@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
595async def get_user(user_id: str):
596 user = User.get_from_id(user_id)
598 if user is None:
599 raise HTTPException(
600 status_code=404,
601 detail="User not found",
602 )
603 return user.to_dict(exclude={"password", "is_connected"})
606@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
607async def patch_user(user: UserUpdate, user_id):
608 if user.password == "" or user.password is None:
609 del user.password
610 else:
611 user.password = get_password_hash(user.password)
612 return User.update_info(user, user_id)
615@app.get("/commands", dependencies=[Depends(get_current_active_user)])
616async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
617 return Command.response_from_query(query)
620@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
621async def get_campaigns():
622 return Campaign.get_all()
625@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
626async def get_campaign_by_id(campaign_id: str):
627 campaign = Campaign.get_from_id(campaign_id)
628 if campaign is None:
629 raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
630 return campaign
633@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
634async def add_campaign(campaign: Campaign):
635 new_campaign = Campaign.create(campaign)
636 if new_campaign is None:
637 raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
638 return new_campaign
641@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
642async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
643 campaign = Campaign.get_from_id(campaign_id)
644 if campaign is None:
645 raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
646 campaign.name = edit_campaign.name
647 campaign.description = edit_campaign.description
648 return Campaign.update(campaign)
651@app.delete(
652 "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
653)
654async def delete_campaign(campaign_id: str):
655 exception = HTTPException(status_code=500, detail="An error occurred during campaign deletion")
656 campaign = Campaign.get_from_id(campaign_id)
657 if campaign is None:
658 raise exception
659 delete_phases = Phase.deleteMany(campaign_id)
660 if not delete_phases.acknowledged:
661 raise exception
662 campaign_deleted = Campaign.delete(campaign_id)
663 return campaign_deleted.acknowledged
666@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
667async def get_campaign_phases(campaign_id: str):
668 return Phase.get_by_attribute("campaign_id", campaign_id)
671@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
672async def get_phase(phase_id: str):
673 phase = Phase.get_from_id(phase_id)
674 if phase is None:
675 raise HTTPException(status_code=404, detail="Phase not found")
676 return phase
679@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
680async def add_phase(phase: Phase):
681 new_phase = Phase.create(phase)
682 if new_phase is None:
683 raise HTTPException(status_code=500, detail="An error occurred during phase creation")
684 return new_phase
687@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
688async def edit_phase(phase_id, edit_phase: Phase):
689 phase = Phase.get_from_id(phase_id)
690 if phase is None:
691 raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
692 phase.name = edit_phase.name
693 phase.description = edit_phase.description
694 phase.start_at = edit_phase.start_at
695 phase.end_at = edit_phase.end_at
696 return Phase.update(phase)
699@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
700async def delete_phase(phase_id: str):
701 phase = Phase.get_from_id(phase_id)
702 if phase is None:
703 raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
704 phase_deleted = Phase.delete(phase_id)
705 return phase_deleted.acknowledged
708@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
709async def get_custom_views():
710 return CustomView.get_all()
713@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
714async def get_custom_views_from_user_id(user_id: str):
715 return CustomView.get_by_attribute("user_id", user_id)
718@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
719async def get_custom_view(custom_view_id: str):
720 return CustomView.get_from_id(custom_view_id)
723@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
724async def create_custom_view(
725 custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
726):
727 custom_view = CustomView(**custom_view_creation.to_dict(), user_id=current_user.id)
728 custom_view.insert()
729 return custom_view
732@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
733async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
734 custom_view = CustomView.get_from_id(custom_view_id)
735 return custom_view.update(custom_view_update.model_dump())
738@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
739async def delete_custom_view(custom_view_id: str):
740 custom_view = CustomView.get_from_id(custom_view_id)
741 return custom_view.delete()
744@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
745async def add_video(video: Video):
746 video.insert()
747 if not video:
748 raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
749 return video
752@app.get("/videos", dependencies=[Depends(get_current_active_user)])
753async def get_videos():
754 return Video.get_all()
757@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
758def get_stream(video_id):
759 camera_name = Video.get_video(video_id)
760 if camera_name is None:
761 raise HTTPException(status_code=404, detail="Camera not found")
762 return camera_name
765@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
766async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
767 return SignalsPreset.get_by_attribute("user_id", current_user.id)
770@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
771async def create_signals_preset(
772 signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
773):
774 new_signals_preset = SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
775 return new_signals_preset
778@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
779async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
780 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
781 return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
784@app.delete(
785 "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
786)
787async def delete_signals_preset(signals_preset_id: str):
788 signals_preset = SignalsPreset.get_from_id(signals_preset_id)
789 return signals_preset.delete()
792@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
793async def create_graph_theme(
794 graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
795):
796 styled_signal = Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id)
797 if styled_signal is None:
798 raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")
800 graph_theme = PrivateGraphTheme.create(
801 **graph_theme_creation.to_dict(exclude={"id": True}), creator_id=current_user.id
802 )
803 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
806@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
807async def get_all_graph_themes(
808 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
809) -> ListResponse[PublicGraphTheme]:
810 return PublicGraphTheme.response_from_query(query, current_user.id)
813@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
814async def get_graph_themes_in_library(
815 query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
816) -> ListResponse[PublicGraphTheme]:
817 return PublicGraphTheme.response_from_query_in_user_library(query, current_user.id)
820@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
821async def update_graph_theme(
822 theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
823):
824 graph_theme = PrivateGraphTheme.get_from_id(theme_id)
825 update_dict = theme_update.model_dump(exclude_unset=True)
826 if current_user.id != graph_theme.creator_id:
827 for theme_property in update_dict.keys():
828 if theme_property not in ["active_for_user", "in_user_library"]:
829 raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")
830 graph_theme.update(theme_update.model_dump(exclude_unset=True), current_user.id)
831 return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
834@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
835async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
836 graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
837 if current_user.id != graph_theme.creator_id:
838 raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
839 return graph_theme.delete()
842@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
843async def get_signals_appearances(
844 signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
845) -> dict:
846 return PublicGraphTheme.get_signal_appearances(signal_ids, current_user.id)