Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/queries.py: 100%
99 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-09 13:43 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-09 13:43 +0000
1from typing import get_args, ClassVar
2import datetime
3import json
4import re
6from pydantic import BaseModel, create_model
7from fastapi import HTTPException
9from twinpad_backend.models import Signal, ForcedSignal, Event, EventRule, DeviceState, Command, PublicGraphTheme
11REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in", "not")}
14def api_operators_to_mongo(operator, value):
15 if operator in REST_API_OPERATORS_TO_MONGO:
16 return REST_API_OPERATORS_TO_MONGO[operator], value
18 if operator == "startswith":
19 return "$regex", f"^{value}"
21 if operator == "contains":
22 return "$regex", f".*{value}.*"
24 raise ValueError("Operator unsupported")
27def get_main_annotation(annotation):
28 for arg in get_args(annotation):
29 if arg is not None:
30 if arg is bool:
31 return strtobool
32 return arg
33 if annotation is bool:
34 return strtobool
35 return annotation
38def strtobool(value: str):
39 """Convert a string representation of truth to True or False.
40 True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
41 are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
42 'val' is anything else.
44 It was found here: https://stackoverflow.com/a/18472142 and copied from distutils, a now deprecated stdlib
45 """
46 value = value.strip().lower()
47 if value in {"y", "yes", "t", "true", "on", "1"}:
48 return True
49 if value in {"n", "no", "f", "false", "off", "0"}:
50 return False
51 raise HTTPException(status_code=400, detail=f"Invalid truth value: {value}")
54class Query(BaseModel):
55 limit: int | None = 20
56 offset: int | None = 0
57 sort_by: str | None = "id"
59 def mongodb_filter(self):
60 filters_by_field = {}
61 for field_name in self._base_class.model_fields.keys():
62 value = getattr(self, field_name)
63 if value is not None:
64 filters_by_field.setdefault(field_name, {})
65 for segment in value.split("|"):
66 if ":" in segment:
67 operator, value = segment.split(":", maxsplit=1)
68 else:
69 value = segment
70 operator = "eq"
72 annotation = get_main_annotation(self._base_class.model_fields[field_name].annotation)
73 if operator in set(["in"]): # operators that accepts list argument
74 # Parse list
75 value = [annotation(v) for v in value.lstrip("[").rstrip("]").split(",")]
76 elif re.match(r"\{.*\}", value):
77 value = json.loads(value)
78 else:
79 value = annotation(value)
80 if field_name == "timestamp":
81 value = datetime.datetime.fromtimestamp(value)
83 try:
84 mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
85 except ValueError as e:
86 raise HTTPException(
87 status_code=400,
88 detail=f"Query Operator unsupported, got {operator}",
89 ) from e
91 # when it's a regex add to its content instead of replacing it
92 if mongo_operator == "$regex":
93 if filters_by_field[field_name].setdefault(mongo_operator, mongo_value) != mongo_value:
94 filters_by_field[field_name][mongo_operator] += mongo_value
95 else:
96 filters_by_field[field_name][mongo_operator] = mongo_value
98 filters_list = []
99 for field_name, field_filters in filters_by_field.items():
100 filters_list.append({field_name: field_filters})
101 if not filters_list:
102 return {}
103 if len(filters_list) == 1:
104 return filters_list[0]
105 return {"$and": filters_list}
108def create_query_model(model):
109 fields = {}
111 for field_name in model.model_fields.keys():
112 fields[field_name] = (str | None, None)
114 query_name = model.__name__ + "Query"
115 return create_model(query_name, **fields)
118class SignalQuery(create_query_model(Signal), Query):
119 _base_class: ClassVar[type] = Signal
120 sort_by: str | None = "signal_id"
121 post_processing: bool | str | None = None
122 broadcastable: str | None = "true"
125class ForcedSignalQuery(create_query_model(ForcedSignal), Query):
126 _base_class: ClassVar[type] = ForcedSignal
127 sort_by: str | None = "signal_id"
130class DeviceStatesQuery(create_query_model(DeviceState), Query):
131 _base_class: ClassVar[type] = DeviceState
132 sort_by: str | None = "timestamp:-1"
133 modified_properties: str | None
136class EventQuery(create_query_model(Event), Query):
137 _base_class: ClassVar[type] = Event
138 sort_by: str | None = "timestamp"
141class EventRuleQuery(create_query_model(EventRule), Query):
142 _base_class: ClassVar[type] = EventRule
143 sort_by: str | None = "name"
146class CommandQuery(create_query_model(Command), Query):
147 _base_class: ClassVar[type] = Command
148 sort_by: str | None = "timestamp:-1"
151class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
152 _base_class: ClassVar[type] = PublicGraphTheme