Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/api.py: 97%
419 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-27 13:40 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-27 13:40 +0000
1import os
2import logging
3import time
4from tempfile import mkdtemp
5from typing import Annotated
6from datetime import timedelta
7from pathlib import Path
8from pyinstrument import Profiler
10from fastapi import FastAPI, HTTPException, Depends, Query, Response, Request
11from fastapi.middleware.cors import CORSMiddleware
12from fastapi.responses import HTMLResponse
13from fastapi.security import OAuth2PasswordRequestForm
15from twinpad_backend import __version__
16from twinpad_backend.db import signal_datasize, get_signals_ids_from_collection_names
17from twinpad_backend.models import (
18 Signal,
19 SignalData,
20 SignalSample,
21 ServicesStatus,
22 Device,
23 DeviceUpdate,
24 DeviceSetup,
25 DeviceSetupUpdate,
26 DeviceState,
27 SignalUpdate,
28 SignalsData,
29 Event,
30 EventRule,
31 TwinPadActivity,
32 User,
33 UserUpdate,
34 Campaign,
35 Phase,
36 CustomView,
37 Command,
38 CustomViewCreation,
39 CustomViewUpdate,
40 Video,
41 SignalsPreset,
42 SignalsPresetCreation,
43 SignalsPresetUpdate,
44 PrivateGraphTheme,
45 PublicGraphTheme,
46 GraphThemeCreation,
47 GraphThemeUpdate,
48 Configuration,
49)
50from twinpad_backend.auth import (
51 Token,
52 authenticate_user,
53 get_current_active_user,
54 ACCESS_TOKEN_EXPIRE_MINUTES,
55 create_access_token,
56 get_password_hash,
57)
58from twinpad_backend.queries import (
59 SignalQuery,
60 DeviceStatesQuery,
61 EventQuery,
62 EventRuleQuery,
63 CommandQuery,
64 GraphThemeQuery,
65 ConfigurationQuery,
66)
67from twinpad_backend.responses import ListResponse
# Requests slower than this many seconds are logged at WARNING level.
REQUEST_TIME_WARNING = 0.5

# Feature flags: enabled only when the variable is exactly the string "true".
DEBUG = os.getenv("DEBUG", "false") == "true"
PROFILING = os.getenv("PROFILING", "false") == "true"

# Reuse uvicorn's error logger and keep its records from propagating upwards.
logger = logging.getLogger("uvicorn.error")
logger.propagate = False
logger.info("Debug mode: %s", DEBUG)
logger.info("log level: %s", logging.root.level)
# ASGI application object; every route below is registered on it.
app = FastAPI(
    title="Twinpad backend",
    version=__version__,
)

# NOTE(review): CORS is fully open (any origin, method and header) - confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
if PROFILING:  # pragma: no cover
    profiling_folder = mkdtemp()
    logger.info("Profiling enabled")

    @app.middleware("http")
    async def profile_request(request: Request, call_next):
        """
        Profile a request with pyinstrument and save the HTML report in ``profiling_folder``.

        The downstream handler is awaited exactly once and its response is returned.
        Requests whose URL contains "profiling" or ".ico" are passed through unprofiled.
        """
        url = str(request.url)
        # Skip the profiling endpoints themselves (avoid recursion) and icon requests.
        if any(segment in url for segment in ("profiling", ".ico")):
            return await call_next(request)

        profiler = Profiler()
        profiler.start()
        try:
            response = await call_next(request)
        finally:
            # Always stop the profiler, even if the handler raised.
            profiler.stop()

        # Build the report name from the path only, so a query string can never
        # produce an empty file name.
        filename = "_".join(part for part in request.url.path.split("/") if part) or "slash"
        logger.info("saving profiling to %s", filename)
        with open(os.path.join(profiling_folder, filename), "w", encoding="utf-8") as profiling_file:
            profiling_file.write(profiler.output_html())
        return response

    @app.get("/profilings")
    async def profilings():
        """List the names of the saved profiling reports."""
        return {"profilings": os.listdir(profiling_folder)}

    @app.get("/profilings/{profiling_id}")
    async def profiling(profiling_id):
        """Return one saved profiling report as HTML, or respond 404 if it does not exist."""
        filename = os.path.join(profiling_folder, profiling_id)
        # isfile (not exists): a directory such as ".." must not reach open().
        if os.path.isfile(filename):
            with open(filename, "r", encoding="utf-8") as profiling_file:
                return HTMLResponse(profiling_file.read())
        raise HTTPException(
            status_code=404,
            detail="Profiling not found",
        )
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """
    Log client, method, path, status code and duration of every request.

    Requests taking longer than ``REQUEST_TIME_WARNING`` seconds are logged at
    WARNING level, all others at INFO level. The response is returned unchanged.
    """
    # perf_counter is monotonic, so the duration cannot be skewed by clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time

    # Prefer the proxy-provided address; request.client may be None, so it is
    # only dereferenced when it is actually needed and present.
    client_ip = request.headers.get("x-forwarded-for")
    if client_ip is None:
        client_ip = request.client.host if request.client is not None else "unknown"

    message = f"{client_ip} {request.method} {request.url.path} - {response.status_code} - {round(1000*duration)}ms"
    if duration > REQUEST_TIME_WARNING:
        logger.warning(message)
    else:
        logger.info(message)
    return response
@app.get("/")
async def slash():
    """Return the backend version; this route has no authentication dependency."""
    payload = {"twinpad_version": __version__}
    return payload
@app.get("/status", dependencies=[Depends(get_current_active_user)])
async def status():
    """
    Return service healthcheck
    """
    services = ServicesStatus.check()
    return {"services": services}
@app.get("/devices", dependencies=[Depends(get_current_active_user)])
async def get_devices() -> list[Device]:
    """Return all devices, requested sorted by ``device_id``."""
    devices = Device.get_all(sort_by="device_id")
    return devices
@app.get("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def get_device(device_id):
    """Return the device matching ``device_id``, or respond 404 when there is none."""
    found = Device.get_one_by_attribute("device_id", device_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Device not found")
@app.patch("/devices/{device_id}", dependencies=[Depends(get_current_active_user)])
async def update_item(
    device_id: str, device_update: DeviceUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Apply a mode change to a device on behalf of the current user.

    Responds 404 when the device is unknown. When the result dict has
    ``error`` set to True, its ``status_code`` and ``message`` are turned into an HTTP error.
    """
    target = Device.get_one_by_attribute("device_id", device_id)
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    outcome = await target.change_mode(device_update, current_user)
    if outcome.get("error", False) is not True:
        return outcome
    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )
@app.get("/devices/{device_id}/states", dependencies=[Depends(get_current_active_user)])
async def get_device_states(device_id: str, query: DeviceStatesQuery = Depends()) -> ListResponse[DeviceState]:
    """Return the states of one device, selected by the query parameters."""
    states = DeviceState.get_from_id_and_query(device_id, query)
    return states
@app.get("/device-setups", dependencies=[Depends(get_current_active_user)])
async def get_device_setups() -> list[DeviceSetup]:
    """Return every device setup."""
    setups = DeviceSetup.get_all()
    return setups
@app.post("/device-setups", dependencies=[Depends(get_current_active_user)], status_code=201)
async def create_device_setups(device_setup: DeviceSetup) -> DeviceSetup:
    """Insert the posted device setup and return it."""
    # The return value of insert() is not used; the posted object is returned.
    device_setup.insert()
    return device_setup
@app.get("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def get_device_setup(device_setup_id: str):
    """Return one device setup by id, or respond 404 when it does not exist."""
    found = DeviceSetup.get_from_id(device_setup_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Device setup not found")
@app.patch("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)])
async def edit_device_setups(device_setup_id: str, device_setup_update: DeviceSetupUpdate) -> DeviceSetup:
    """
    Update a device setup with the non-None fields of the payload.

    Responds 404 when the device setup does not exist.
    """
    existing = DeviceSetup.get_from_id(device_setup_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Device setup not found")

    # Fields whose value is None are dropped, so they are never written.
    changes = {}
    for field_name, value in device_setup_update.model_dump().items():
        if value is not None:
            changes[field_name] = value
    existing.update(changes)
    return existing
@app.delete("/device-setups/{device_setup_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_device_setups(device_setup_id: str) -> bool:
    """Delete a device setup and return the result of the deletion; 404 when it does not exist."""
    existing = DeviceSetup.get_from_id(device_setup_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Device setup not found")
    return existing.delete()
@app.get("/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_number_samples(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return sample-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_samples_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/signals", dependencies=[Depends(get_current_active_user)])
async def route_get_signals(query: SignalQuery = Depends()) -> ListResponse[Signal]:
    """Return the signals matching the query, serialised without their ``device`` field."""
    listing = Signal.response_from_query(query)
    return listing.to_dict(exclude={"device"})
@app.get("/signals/ids", dependencies=[Depends(get_current_active_user)])
async def get_signals_names() -> list[str]:
    """Return the ids of all signals."""
    signal_ids = Signal.get_all_ids()
    return signal_ids
@app.get("/signals/stats", dependencies=[Depends(get_current_active_user)])
async def signal_stats():
    """
    Returns signals stats
    """
    sample_total = 0
    active_total = 0

    for signal in Signal.get_all():
        if signal is None:
            continue
        sample_total += await signal.number_samples()
        # A signal counts as active when its status is "up".
        if signal.status.status == "up":
            active_total += 1

    signal_total = Signal.get_number_documents()

    return {
        "signal_data_size": signal_datasize(),
        "number_signal_samples": sample_total,
        "number_active_signals": active_total,
        "number_signals": signal_total,
    }
@app.get("/signals/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the last sample for each requested signal id."""
    latest = SignalSample.get_last_from_signal_ids(signal_ids)
    return latest
@app.get("/signals/last-value/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_last_values_interest_window(
    signal_ids: list[str] = Query(default=[]), min_timestamp: float = 0.0
) -> list[SignalSample | None]:
    """Return the last sample of each requested signal, passing ``min_timestamp`` to the lookup."""
    return SignalSample.get_last_from_signal_ids_interest_window(signal_ids, min_timestamp)
@app.get("/signals/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_values(signal_ids: list[str] = Query(default=[])) -> list[SignalSample | None]:
    """Return the first sample for each requested signal id."""
    earliest = SignalSample.get_first_from_signal_ids(signal_ids)
    return earliest
@app.get("/signals/forcibility", dependencies=[Depends(get_current_active_user)])
async def get_signals_forcibility(signal_ids: list[str] = Query(default=[])) -> dict[str, bool]:
    """Return the forcibility flag reported for each requested signal id."""
    forcibility = Signal.get_forcibility(signal_ids)
    return forcibility
@app.get("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def get_signal(signal_id):
    """Return one signal as a dict, or respond 404 when it does not exist."""
    found = Signal.get_from_signal_id(signal_id)
    if found:
        return found.to_dict()
    raise HTTPException(status_code=404, detail="Signal not found")
@app.patch("/signals/{signal_id}", dependencies=[Depends(get_current_active_user)])
async def update_signal(
    signal_id: str, signal_update: SignalUpdate, current_user: Annotated[User, Depends(get_current_active_user)]
):
    """
    Send a command for a signal on behalf of the current user.

    Responds 404 for an unknown signal and 400 when no device matches the part of
    the signal id before the first dot. When the result dict has ``error`` set to
    True, its ``status_code`` and ``message`` are turned into an HTTP error.
    """
    target_signal = Signal.get_from_signal_id(signal_id)
    if not target_signal:
        raise HTTPException(status_code=404, detail="Signal not found")

    # The text before the first "." of the signal id is used to look up the device.
    owner_key = signal_id.split(".")[0]
    owner = Device.get_from_device_or_config_id(owner_key)
    if owner is None:
        raise HTTPException(
            status_code=400,
            detail="Signal doesn't belong to an existing configuration",
        )

    outcome = await target_signal.send_command(owner.device_id, signal_update, current_user)
    if outcome.get("error", False) is not True:
        return outcome
    raise HTTPException(
        status_code=outcome.get("status_code", 500),
        detail=outcome.get("message", "An error has occurred"),
    )
@app.get("/signals/{signal_id}/data", dependencies=[Depends(get_current_active_user)])
async def get_signal_data(
    signal_id: str,
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalData | None:
    """
    Return the data of one signal, optionally bounded in time and desampled.

    ``number_samples_max`` triggers a uniform desampling when provided. The
    lookup result may be None (see the return annotation); in that case None is
    returned as is instead of failing on the desampling call.
    """
    signal_data = SignalData.get_from_signal_id(signal_id, min_timestamp=min_timestamp, max_timestamp=max_timestamp)

    if signal_data is not None and number_samples_max is not None:
        signal_data = signal_data.uniform_desampling(number_samples_max=number_samples_max)

    return signal_data
@app.get("/signals/{signal_id}/last-value", dependencies=[Depends(get_current_active_user)])
async def get_last_value(signal_id) -> SignalSample:
    """Return the last sample of a signal, or respond 404 when there is none."""
    latest = SignalSample.get_last_from_signal_id(signal_id)
    if latest is not None:
        return latest
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/first-value", dependencies=[Depends(get_current_active_user)])
async def get_first_value(signal_id) -> SignalSample:
    """Return the first sample of a signal, or respond 404 when there is none."""
    earliest = SignalSample.get_first_from_signal_id(signal_id)
    if earliest is not None:
        return earliest
    raise HTTPException(status_code=404, detail="No data")
@app.get("/signals/{signal_id}/number-samples", dependencies=[Depends(get_current_active_user)])
async def get_signal_number_samples(signal_id):
    """
    Return the number of samples and the sample data size of one signal.

    Responds 404 with "Signal not found" when the signal does not exist
    (the message previously said "Device not found").
    """
    signal = Signal.get_from_signal_id(signal_id)
    if not signal:
        raise HTTPException(
            status_code=404,
            detail="Signal not found",
        )
    return {
        "signal_id": signal_id,
        "number_samples": await signal.number_samples(),
        "size": signal.sample_datasize(),
    }
@app.get("/signals-data", dependencies=[Depends(get_current_active_user)])
async def get_signals_data(
    signal_ids: list[str] = Query(default=[]),
    number_samples_max: int | None = None,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
) -> SignalsData | None:
    """
    Return the data of several signals, optionally bounded in time and desampled.

    Responds 400 when both bounds are given and ``min_timestamp`` is greater than
    ``max_timestamp``. ``number_samples_max`` triggers a uniform desampling when provided.
    """
    # Compare against None explicitly: a bound of 0 is a valid timestamp and must
    # not switch the validation off.
    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=min_timestamp,
        window_max_timestamp=max_timestamp,
        interpolate_bounds=interpolate_bounds,
    )
    # The return annotation allows None, so only desample an actual result.
    if signals_data is not None and number_samples_max is not None:
        signals_data = signals_data.uniform_desampling(number_samples_max=number_samples_max)

    return signals_data
@app.get("/signals-data/interest-window", dependencies=[Depends(get_current_active_user)])
async def get_signals_data_interest_window(
    window_max_number_samples: int = 600,
    outside_max_number_samples: int = 150,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
) -> SignalsData | None:
    """
    Return the data of several signals, desampled separately inside and outside a window.

    Responds 400 when a pair of bounds is given in the wrong order. At most
    ``10 * (window_max_number_samples + outside_max_number_samples)`` documents
    are requested before the interest-window desampling is applied.
    """
    # Compare against None explicitly: a bound of 0 is a valid timestamp and must
    # not switch the validation off.
    if (
        window_min_timestamp is not None
        and window_max_timestamp is not None
        and window_min_timestamp > window_max_timestamp
    ):
        raise HTTPException(status_code=400, detail="window_min_timestamp should be less than window_max_timestamp")

    if min_timestamp is not None and max_timestamp is not None and min_timestamp > max_timestamp:
        raise HTTPException(status_code=400, detail="min_timestamp should be less than max_timestamp")

    signals_data = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
        max_documents=10 * (window_max_number_samples + outside_max_number_samples),
    )
    # The return annotation allows None, so there may be nothing to desample.
    if signals_data is None:
        return None

    return signals_data.interest_window_desampling(
        window_max_number_samples=window_max_number_samples,
        outside_max_number_samples=outside_max_number_samples,
        window_min_timestamp=window_min_timestamp,
        window_max_timestamp=window_max_timestamp,
    )
@app.get("/signals-data/export_zip", dependencies=[Depends(get_current_active_user)])
async def export_signals_zip(
    file_format: str,
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals, in ``file_format``, as a "signals.zip" attachment."""
    # Bounds are not interpolated for exports.
    selection = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    archive = selection.zip_export(file_format)
    attachment_headers = {"Content-Disposition": 'attachment; filename="signals.zip"'}
    return Response(content=archive, media_type="application/zip", headers=attachment_headers)
@app.get("/signals-data/export_hdf5", dependencies=[Depends(get_current_active_user)])
async def export_signals_hdf5(
    signal_ids: list[str] = Query(default=[]),
    min_timestamp: float = None,
    max_timestamp: float = None,
):
    """Export the requested signals as a "signals.hdf5" attachment."""
    # Bounds are not interpolated for exports.
    selection = SignalsData.get_from_signal_ids(
        signal_ids,
        min_timestamp=min_timestamp,
        max_timestamp=max_timestamp,
        interpolate_bounds=False,
    )
    payload = selection.hdf5_export()
    attachment_headers = {"Content-Disposition": 'attachment; filename="signals.hdf5"'}
    return Response(content=payload, media_type="application/hdf5", headers=attachment_headers)
@app.get("/events", dependencies=[Depends(get_current_active_user)])
async def get_events(query: EventQuery = Depends()) -> ListResponse[Event]:
    """Return the events matching the query."""
    listing = Event.response_from_query(query)
    return listing
@app.get("/events/{event_id}", dependencies=[Depends(get_current_active_user)])
async def get_event(event_id) -> Event:
    """Return one event by id, or respond 404 when it does not exist."""
    found = Event.get_from_id(event_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event")
@app.get("/number-events", dependencies=[Depends(get_current_active_user)])
async def get_number_events(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return event-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_events_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/number-commands", dependencies=[Depends(get_current_active_user)])
async def get_number_commands(
    min_timestamp: float | int, max_timestamp: float | int, recompute_amount: bool = False
) -> list[TwinPadActivity]:
    """Return command-count activity between ``min_timestamp`` and ``max_timestamp``."""
    activity = TwinPadActivity.get_number_commands_timeframe(min_timestamp, max_timestamp, recompute_amount)
    return activity
@app.get("/event-rules", dependencies=[Depends(get_current_active_user)])
async def get_event_rules(query: EventRuleQuery = Depends()) -> ListResponse[EventRule]:
    """Return the event rules matching the query."""
    listing = EventRule.response_from_query(query)
    return listing
@app.get("/event-rules/{event_rule_id}", dependencies=[Depends(get_current_active_user)])
async def get_event_rule(event_rule_id) -> EventRule:
    """Return one event rule by id, or respond 404 when it does not exist."""
    found = EventRule.get_from_id(event_rule_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="No such event rule")
@app.post("/users", status_code=201)
async def create_user(user: User):
    """
    Create a user account with a hashed password.

    Responds 400 with a generic message when the email is already used or when
    the creation returns None.
    """
    # NOTE(review): this route has no authentication dependency and accepts
    # ``is_admin`` from the payload - confirm that this is intended.
    if User.get_one_by_attribute("email", user.email) is not None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    hashed_password = get_password_hash(user.password)
    # bool() instead of ``| False``: the bitwise OR raises TypeError when is_admin is None.
    is_admin = bool(user.is_admin)
    new_user = User.create(user.firstname, user.lastname, user.email, hashed_password, is_admin)
    if new_user is None:
        raise HTTPException(status_code=400, detail="An error occurred during account creation")
    return new_user
@app.post("/token", status_code=201)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """
    Authenticate a user from form credentials and return a bearer access token.

    Responds 401 on bad credentials and 402 when the account is not active.
    The token carries the user's email (``sub``) and admin flag.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad Credentials", headers={"WWW-authenticate": "Bearer"})
    # Only inactive accounts are blocked; the check was inverted and rejected active users.
    if not user.is_active:
        raise HTTPException(status_code=402, detail="User Blocked", headers={"WWW-authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES))
    access_token = create_access_token(
        data={"sub": user.email, "admin": user.is_admin}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")
@app.get("/users", dependencies=[Depends(get_current_active_user)])
async def get_users():
    """Return all users, requested sorted by email, without their ``password`` field."""
    serialised = []
    for account in User.get_all(sort_by="email"):
        serialised.append(account.to_dict(exclude={"password"}))
    return serialised
@app.get("/users/me", response_model=User)
async def get_current_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the authenticated user after removing its ``password`` attribute."""
    delattr(current_user, "password")
    return current_user
@app.get("/users/{user_id}", dependencies=[Depends(get_current_active_user)])
async def get_user(user_id: str):
    """Return one user without ``password`` and ``is_connected``, or respond 404."""
    account = User.get_from_id(user_id)
    if account is not None:
        return account.to_dict(exclude={"password", "is_connected"})
    raise HTTPException(status_code=404, detail="User not found")
@app.patch("/users/{user_id}", response_model=UserUpdate, dependencies=[Depends(get_current_active_user)])
async def patch_user(user: UserUpdate, user_id):
    """
    Update a user's information.

    An empty or missing password is removed from the payload; any other
    password is replaced by its hash before the update.
    """
    password_provided = not (user.password == "" or user.password is None)
    if password_provided:
        user.password = get_password_hash(user.password)
    else:
        delattr(user, "password")
    return User.update_info(user, user_id)
@app.get("/commands", dependencies=[Depends(get_current_active_user)])
async def get_commands(query: CommandQuery = Depends()) -> ListResponse[Command]:
    """Return the commands matching the query."""
    listing = Command.response_from_query(query)
    return listing
@app.get("/campaigns", response_model=list[Campaign], dependencies=[Depends(get_current_active_user)])
async def get_campaigns():
    """Return every campaign."""
    campaigns = Campaign.get_all()
    return campaigns
@app.get("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def get_campaign_by_id(campaign_id: str):
    """Return one campaign by id; responds 500 when the lookup returns None."""
    # NOTE(review): a missing campaign yields 500 here, while other lookups in
    # this file answer 404 - confirm whether clients rely on the 500.
    found = Campaign.get_from_id(campaign_id)
    if found is not None:
        return found
    raise HTTPException(status_code=500, detail="An error occurred retrieving campaign")
@app.post("/campaigns", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_campaign(campaign: Campaign):
    """Create a campaign and return it; responds 500 when the creation returns None."""
    created = Campaign.create(campaign)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during campaign creation")
@app.patch("/campaigns/{campaign_id}", response_model=Campaign, dependencies=[Depends(get_current_active_user)])
async def edit_campaign(campaign_id: str, edit_campaign: Campaign):
    """
    Overwrite the name and description of a campaign and persist it.

    Responds 500 when the campaign lookup returns None.
    """
    existing = Campaign.get_from_id(campaign_id)
    if existing is None:
        raise HTTPException(status_code=500, detail="An error occurred during campaign edition")
    # Only these two fields are copied from the payload.
    existing.name, existing.description = edit_campaign.name, edit_campaign.description
    return Campaign.update(existing)
@app.delete(
    "/campaigns/{campaign_id}", response_model=bool, dependencies=[Depends(get_current_active_user)], status_code=200
)
async def delete_campaign(campaign_id: str):
    """
    Delete a campaign after deleting its phases.

    Responds 500 when the campaign lookup returns None or when the phase
    deletion is not acknowledged. Returns the ``acknowledged`` flag of the
    campaign deletion.
    """
    failure = HTTPException(status_code=500, detail="An error occurred during campaign deletion")

    if Campaign.get_from_id(campaign_id) is None:
        raise failure

    # Phases are removed first; the campaign is only deleted once that is acknowledged.
    phases_result = Phase.deleteMany(campaign_id)
    if not phases_result.acknowledged:
        raise failure

    return Campaign.delete(campaign_id).acknowledged
@app.get("/campaigns/{campaign_id}/phases", response_model=list[Phase], dependencies=[Depends(get_current_active_user)])
async def get_campaign_phases(campaign_id: str):
    """Return the phases whose ``campaign_id`` matches."""
    phases = Phase.get_by_attribute("campaign_id", campaign_id)
    return phases
@app.get("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def get_phase(phase_id: str):
    """Return one phase by id, or respond 404 when it does not exist."""
    found = Phase.get_from_id(phase_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Phase not found")
@app.post("/phases", dependencies=[Depends(get_current_active_user)], status_code=201)
async def add_phase(phase: Phase):
    """Create a phase and return it; responds 500 when the creation returns None."""
    created = Phase.create(phase)
    if created is not None:
        return created
    raise HTTPException(status_code=500, detail="An error occurred during phase creation")
@app.patch("/phases/{phase_id}", response_model=Phase, dependencies=[Depends(get_current_active_user)])
async def edit_phase(phase_id, edit_phase: Phase):
    """
    Overwrite name, description and start/end of a phase and persist it.

    Responds 500 when the phase lookup returns None.
    """
    existing = Phase.get_from_id(phase_id)
    if existing is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase edition")
    # Copy the editable fields from the payload, in the same order as before.
    for field_name in ("name", "description", "start_at", "end_at"):
        setattr(existing, field_name, getattr(edit_phase, field_name))
    return Phase.update(existing)
@app.delete("/phases/{phase_id}", dependencies=[Depends(get_current_active_user)], status_code=200)
async def delete_phase(phase_id: str):
    """Delete a phase and return the ``acknowledged`` flag; 500 when the lookup returns None."""
    if Phase.get_from_id(phase_id) is None:
        raise HTTPException(status_code=500, detail="An error occurred during Phase deletion")
    deletion = Phase.delete(phase_id)
    return deletion.acknowledged
@app.get("/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views():
    """Return every custom view."""
    views = CustomView.get_all()
    return views
@app.get("/users/{user_id}/custom-views", dependencies=[Depends(get_current_active_user)])
async def get_custom_views_from_user_id(user_id: str):
    """Return the custom views whose ``user_id`` matches."""
    views = CustomView.get_by_attribute("user_id", user_id)
    return views
@app.get("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def get_custom_view(custom_view_id: str):
    """Return the result of the custom view lookup by id, unchanged."""
    # NOTE(review): no 404 handling here, unlike other lookups in this file - confirm intended.
    view = CustomView.get_from_id(custom_view_id)
    return view
@app.post("/custom-views", dependencies=[Depends(get_current_active_user)])
async def create_custom_view(
    custom_view_creation: CustomViewCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a custom view owned by the current user, insert it and return it."""
    view_fields = custom_view_creation.to_dict()
    new_view = CustomView(**view_fields, user_id=current_user.id)
    new_view.insert()
    return new_view
@app.patch("/custom-views/{custom_view_id}", dependencies=[Depends(get_current_active_user)])
async def update_custom_views(custom_view_id: str, custom_view_update: CustomViewUpdate):
    """
    Update a custom view with the full dump of the payload.

    Responds 404 when the custom view does not exist, instead of failing with
    an unhandled AttributeError on a None lookup result.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.update(custom_view_update.model_dump())
@app.delete("/custom-views/{custom_view_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_custom_view(custom_view_id: str):
    """
    Delete a custom view and return the result of the deletion.

    Responds 404 when the custom view does not exist, instead of failing with
    an unhandled AttributeError on a None lookup result.
    """
    custom_view = CustomView.get_from_id(custom_view_id)
    if custom_view is None:
        raise HTTPException(status_code=404, detail="Custom view not found")
    return custom_view.delete()
@app.post("/videos", response_model=Video, dependencies=[Depends(get_current_active_user)])
async def add_video(video: Video):
    """Insert the posted video and return it; responds 500 when the video object is falsy."""
    # NOTE(review): the check below tests the posted object, not the result of
    # insert() - confirm whether the insert outcome was meant to be checked.
    video.insert()
    if video:
        return video
    raise HTTPException(status_code=500, detail="An error occurred during cctv creation")
@app.get("/videos", dependencies=[Depends(get_current_active_user)])
async def get_videos():
    """Return every video."""
    videos = Video.get_all()
    return videos
@app.get("/videos/{video_id}", dependencies=[Depends(get_current_active_user)])
def get_stream(video_id):
    """Return what ``Video.get_video`` yields for the id, or respond 404 when it is None."""
    stream = Video.get_video(video_id)
    if stream is not None:
        return stream
    raise HTTPException(status_code=404, detail="Camera not found")
@app.get("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def get_signals_preset(current_user: User = Depends(get_current_active_user)):
    """Return the signals presets whose ``user_id`` is the current user's id."""
    owner_id = current_user.id
    return SignalsPreset.get_by_attribute("user_id", owner_id)
@app.post("/signals-presets", dependencies=[Depends(get_current_active_user)])
async def create_signals_preset(
    signals_preset: SignalsPresetCreation, current_user: User = Depends(get_current_active_user)
):
    """Create a signals preset for the current user and return the creation result."""
    return SignalsPreset.create(signals_preset=signals_preset, user_id=current_user.id)
@app.patch("/signals-presets/{signals_preset_id}", dependencies=[Depends(get_current_active_user)])
async def update_signals_preset(signals_preset_id: str, signals_preset_update: SignalsPresetUpdate):
    """
    Update a signals preset with the fields explicitly set in the payload.

    Responds 404 when the preset does not exist, instead of failing with an
    unhandled AttributeError on a None lookup result.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.update(signals_preset_update.model_dump(exclude_unset=True))
@app.delete(
    "/signals-presets/{signals_preset_id}", response_model=bool, dependencies=[Depends(get_current_active_user)]
)
async def delete_signals_preset(signals_preset_id: str):
    """
    Delete a signals preset and return the result of the deletion.

    Responds 404 when the preset does not exist, instead of failing with an
    unhandled AttributeError on a None lookup result.
    """
    signals_preset = SignalsPreset.get_from_id(signals_preset_id)
    if signals_preset is None:
        raise HTTPException(status_code=404, detail="Signals preset not found")
    return signals_preset.delete()
@app.post("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def create_graph_theme(
    graph_theme_creation: GraphThemeCreation, current_user: User = Depends(get_current_active_user)
):
    """
    Create a graph theme for an existing signal, with the current user as creator.

    Responds 400 when the referenced signal id does not exist. Returns the
    public representation of the created theme for the current user.
    """
    if Signal.get_one_by_attribute("signal_id", graph_theme_creation.signal_id) is None:
        raise HTTPException(400, f"Signal ID '{graph_theme_creation.signal_id}' doesn't exist")

    # The payload's own "id" is excluded from the creation arguments.
    theme_fields = graph_theme_creation.to_dict(exclude={"id": True})
    created = PrivateGraphTheme.create(**theme_fields, creator_id=current_user.id)
    return PublicGraphTheme.get_from_id(created.id, current_user.id)
@app.get("/graph-themes", dependencies=[Depends(get_current_active_user)])
async def get_all_graph_themes(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query, from the current user's point of view."""
    viewer_id = current_user.id
    return PublicGraphTheme.response_from_query(query, viewer_id)
@app.get("/graph-themes/own", dependencies=[Depends(get_current_active_user)])
async def get_graph_themes_in_library(
    query: GraphThemeQuery = Depends(), current_user: User = Depends(get_current_active_user)
) -> ListResponse[PublicGraphTheme]:
    """Return the graph themes matching the query within the current user's library."""
    viewer_id = current_user.id
    return PublicGraphTheme.response_from_query_in_user_library(query, viewer_id)
@app.patch("/graph-themes/{theme_id}", dependencies=[Depends(get_current_active_user)])
async def update_graph_theme(
    theme_id: str, theme_update: GraphThemeUpdate, current_user: User = Depends(get_current_active_user)
):
    """
    Update a graph theme with the fields explicitly set in the payload.

    Users other than the creator may only change ``active_for_user`` and
    ``in_user_library``; any other field yields 401. Responds 404 when the
    theme does not exist, instead of failing with an unhandled AttributeError.
    Returns the public representation of the theme for the current user.
    """
    graph_theme = PrivateGraphTheme.get_from_id(theme_id)
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")

    update_dict = theme_update.model_dump(exclude_unset=True)
    if current_user.id != graph_theme.creator_id:
        # Per-user properties that non-creators are allowed to change.
        user_level_properties = ("active_for_user", "in_user_library")
        if any(theme_property not in user_level_properties for theme_property in update_dict):
            raise HTTPException(401, "User is not allowed to edit a theme which wasn't created by him")

    graph_theme.update(update_dict, current_user.id)
    return PublicGraphTheme.get_from_id(graph_theme.id, current_user.id)
@app.delete("/graph-themes/{graph_theme_id}", response_model=bool, dependencies=[Depends(get_current_active_user)])
async def delete_graph_theme(graph_theme_id: str, current_user: User = Depends(get_current_active_user)):
    """
    Delete a graph theme; only its creator may do so (401 otherwise).

    Responds 404 when the theme does not exist, instead of failing with an
    unhandled AttributeError on a None lookup result.
    """
    graph_theme = PrivateGraphTheme.get_from_id(graph_theme_id)
    if graph_theme is None:
        raise HTTPException(status_code=404, detail="Graph theme not found")
    if current_user.id != graph_theme.creator_id:
        raise HTTPException(401, "User is not allowed to delete a theme which wasn't created by him")
    return graph_theme.delete()
@app.get("/signals-appearances", dependencies=[Depends(get_current_active_user)])
async def get_signals_appearances(
    signal_ids: list[str] = Query(default=[]), current_user: User = Depends(get_current_active_user)
) -> dict:
    """Return the appearances of the requested signals for the current user."""
    viewer_id = current_user.id
    return PublicGraphTheme.get_signal_appearances(signal_ids, viewer_id)
@app.get("/configs", dependencies=[Depends(get_current_active_user)])
async def get_configs(query: ConfigurationQuery = Depends()) -> ListResponse[Configuration]:
    """Return the configurations matching the query."""
    listing = Configuration.response_from_query(query)
    return listing
@app.get("/configs/ids", dependencies=[Depends(get_current_active_user)])
async def get_config_ids():
    """Return the ids of all configurations."""
    config_ids = Configuration.get_all_ids()
    return config_ids
@app.get("/configs/{config_id}", response_model=Configuration, dependencies=[Depends(get_current_active_user)])
async def get_config(config_id: str):
    """
    Return one configuration without its ``petri_network``, ``pid``, ``event_rules`` and ``modes`` fields.

    Responds 404 when the lookup yields None.
    """
    # NOTE(review): assumes get_from_config_id returns None for an unknown id,
    # like the other lookups in this file - confirm against the model.
    configuration = Configuration.get_from_config_id(config_id)
    if configuration is None:
        raise HTTPException(status_code=404, detail="Configuration not found")
    return configuration.to_dict(exclude={"petri_network", "pid", "event_rules", "modes"})