Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 99%
91 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 09:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 09:06 +0000
1from typing import get_args, ClassVar
2import datetime
4from pydantic import BaseModel, create_model
5from fastapi import HTTPException
7from twinpad_backend.models import Signal, Event, EventRule, DeviceState, Command, PublicGraphTheme
# REST operators that map one-to-one onto a MongoDB comparison operator.
REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")}


def api_operators_to_mongo(operator, value):
    """Translate a REST API filter operator and its operand into MongoDB form.

    Args:
        operator: REST operator name. One of the keys of
            ``REST_API_OPERATORS_TO_MONGO``, ``"startswith"`` or ``"contains"``.
        value: Operand of the operator.

    Returns:
        A ``(mongo_operator, mongo_value)`` tuple. For ``startswith`` and
        ``contains`` the operator is ``"$regex"`` and the value is a pattern
        matching the operand literally.

    Raises:
        ValueError: If ``operator`` is not supported.
    """
    import re  # local import so the block does not depend on file-level imports

    if operator in REST_API_OPERATORS_TO_MONGO:
        return REST_API_OPERATORS_TO_MONGO[operator], value

    # The operand comes from the request, so it is escaped before being
    # embedded in a pattern: "a.b" must match a literal dot, and characters
    # such as "(" or "*" must not produce an invalid or expensive regex.
    if operator == "startswith":
        return "$regex", f"^{re.escape(str(value))}"

    if operator == "contains":
        return "$regex", f".*{re.escape(str(value))}.*"

    raise ValueError("Operator unsupported")
26def get_main_annotation(annotation):
27 for arg in get_args(annotation):
28 if arg is not None:
29 if arg is bool:
30 return strtobool
31 return arg
32 if annotation is bool:
33 return strtobool
34 return annotation
def strtobool(value: str):
    """Interpret a textual truth value as a boolean.

    The input is stripped of surrounding whitespace and lower-cased first.
    'y', 'yes', 't', 'true', 'on' and '1' give True; 'n', 'no', 'f', 'false',
    'off' and '0' give False.

    Adapted from the now deprecated stdlib ``distutils`` helper, as found at
    https://stackoverflow.com/a/18472142

    Raises:
        HTTPException: With status code 400 when the normalised input is not
            one of the recognised words.
    """
    truth_table = {
        **dict.fromkeys(("y", "yes", "t", "true", "on", "1"), True),
        **dict.fromkeys(("n", "no", "f", "false", "off", "0"), False),
    }
    value = value.strip().lower()
    verdict = truth_table.get(value)
    if verdict is None:
        raise HTTPException(status_code=400, detail=f"Invalid truth value: {value}")
    return verdict
class Query(BaseModel):
    """Common query parameters and filter parsing shared by resource queries.

    ``mongodb_filter`` reads ``self._base_class``, which this class does not
    define; subclasses are expected to provide it together with one optional
    string attribute per field of that model.

    A field filter is a ``|``-separated list of segments. Each segment is
    either ``<operator>:<operand>`` or a bare operand, which means ``eq``.
    """

    limit: int | None = 20
    offset: int | None = 0
    sort_by: str | None = "id"

    def mongodb_filter(self):
        """Build a MongoDB filter document from the populated field filters.

        Returns:
            ``{}`` when no field filter is set, the single
            ``{field: conditions}`` document when exactly one field is
            filtered, otherwise ``{"$and": [<one document per field>]}``.

        Raises:
            HTTPException: With status code 400 when an operator is not
                supported or an operand cannot be converted to the field type.
        """
        list_operators = {"in"}  # operators that accept a list argument
        filters_by_field = {}

        for field_name, field_info in self._base_class.model_fields.items():
            raw_filter = getattr(self, field_name)
            if raw_filter is None:
                continue

            convert = get_main_annotation(field_info.annotation)
            field_filters = filters_by_field.setdefault(field_name, {})

            for segment in raw_filter.split("|"):
                # Split on the first colon only, so an operand may itself
                # contain ":" without breaking the unpacking.
                operator, separator, operand = segment.partition(":")
                if not separator:
                    operator, operand = "eq", segment

                # NOTE(review): the original nesting of the timestamp
                # conversion could not be confirmed; it is applied to scalars
                # and to every list item here. Verify this matches intent.
                try:
                    if operator in list_operators:
                        # Parse list
                        value = [convert(item) for item in operand.lstrip("[").rstrip("]").split(",")]
                        if field_name == "timestamp":
                            value = [datetime.datetime.fromtimestamp(item) for item in value]
                    else:
                        value = convert(operand)
                        if field_name == "timestamp":
                            value = datetime.datetime.fromtimestamp(value)
                except (TypeError, ValueError, OverflowError, OSError) as e:
                    # A malformed operand is a client error, not a server error.
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid value for {field_name}, got {operand}",
                    ) from e

                try:
                    mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
                except ValueError as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Query Operator unsupported, got {operator}",
                    ) from e

                # when it's a regex add to its content instead of replacing it
                if mongo_operator == "$regex":
                    if field_filters.setdefault(mongo_operator, mongo_value) != mongo_value:
                        field_filters[mongo_operator] += mongo_value
                else:
                    field_filters[mongo_operator] = mongo_value

        filters_list = [{name: conditions} for name, conditions in filters_by_field.items()]
        if not filters_list:
            return {}
        if len(filters_list) == 1:
            return filters_list[0]
        return {"$and": filters_list}
def create_query_model(model):
    """Derive a model exposing every field of ``model`` as an optional string.

    Args:
        model: A class providing ``model_fields`` and ``__name__``.

    Returns:
        The class produced by ``create_model``, named ``<model name>Query``,
        where each field of ``model`` is declared as ``str | None`` with a
        default of ``None``.
    """
    optional_string_fields = {name: (str | None, None) for name in model.model_fields}
    return create_model(f"{model.__name__}Query", **optional_string_fields)
class SignalQuery(create_query_model(Signal), Query):
    """Query model for ``Signal``; ``sort_by`` defaults to ``"signal_id"``."""

    # Model whose fields mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Signal
    sort_by: str | None = "signal_id"
class DeviceStatesQuery(create_query_model(DeviceState), Query):
    """Query model for ``DeviceState``; ``sort_by`` defaults to ``"timestamp:-1"``."""

    # Model whose fields mongodb_filter() iterates over.
    _base_class: ClassVar[type] = DeviceState
    sort_by: str | None = "timestamp:-1"
    # NOTE(review): declared without a default, unlike the generated fields,
    # which default to None. Under pydantic v2 this presumably makes the
    # attribute required; confirm that is intended.
    modified_properties: str | None
class EventQuery(create_query_model(Event), Query):
    """Query model for ``Event``; ``sort_by`` defaults to ``"timestamp"``."""

    # Model whose fields mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Event
    sort_by: str | None = "timestamp"
class EventRuleQuery(create_query_model(EventRule), Query):
    """Query model for ``EventRule``; ``sort_by`` defaults to ``"name"``."""

    # Model whose fields mongodb_filter() iterates over.
    _base_class: ClassVar[type] = EventRule
    sort_by: str | None = "name"
class CommandQuery(create_query_model(Command), Query):
    """Query model for ``Command``; ``sort_by`` defaults to ``"timestamp:-1"``."""

    # Model whose fields mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Command
    sort_by: str | None = "timestamp:-1"
class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
    """Query model for ``PublicGraphTheme``; keeps the ``sort_by`` default of ``Query``."""

    # Model whose fields mongodb_filter() iterates over.
    _base_class: ClassVar[type] = PublicGraphTheme