Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 99%
91 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-02 07:30 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-02 07:30 +0000
1from typing import get_args, ClassVar, Union
2import datetime
4from pydantic import BaseModel, create_model
5from fastapi import HTTPException
7from twinpad_backend.models import Signal, Event, EventRule, DeviceState, Command, PublicGraphTheme
10REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")}
13def api_operators_to_mongo(operator, value):
14 if operator in REST_API_OPERATORS_TO_MONGO:
15 return REST_API_OPERATORS_TO_MONGO[operator], value
17 if operator == "startswith":
18 return "$regex", f"^{value}"
20 if operator == "contains":
21 return "$regex", f".*{value}.*"
23 raise ValueError("Operator unsupported")
26def get_main_annotation(annotation):
27 for arg in get_args(annotation):
28 if arg is not None:
29 if arg is bool:
30 return strtobool
31 return arg
32 if annotation is bool:
33 return strtobool
34 return annotation
37def strtobool(value: str):
38 """Convert a string representation of truth to True or False.
39 True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
40 are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
41 'val' is anything else.
43 It was found here: https://stackoverflow.com/a/18472142 and copied from distutils, a now deprecated stdlib
44 """
45 value = value.strip().lower()
46 if value in ("y", "yes", "t", "true", "on", "1"):
47 return True
48 elif value in ("n", "no", "f", "false", "off", "0"):
49 return False
50 else:
51 raise HTTPException(status_code=400, detail=f"Invalid truth value: {value}")
54class Query(BaseModel):
55 limit: int | None = 20
56 offset: int | None = 0
57 sort_by: str | None = "id"
59 def mongodb_filter(self):
60 filters_by_field = {}
61 for field_name in self._base_class.model_fields.keys():
62 value = getattr(self, field_name)
63 if value is not None:
64 filters_by_field.setdefault(field_name, {})
65 for segment in value.split("|"):
66 if ":" in segment:
67 operator, value = segment.split(":")
68 else:
69 value = segment
70 operator = "eq"
72 annotation = get_main_annotation(self._base_class.model_fields[field_name].annotation)
73 if operator in set(["in"]): # operators that accepts list argument
74 # Parse list
75 value = [annotation(v) for v in value.lstrip("[").rstrip("]").split(",")]
76 else:
77 value = annotation(value)
78 if field_name == "timestamp":
79 value = datetime.datetime.fromtimestamp(value)
81 try:
82 mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
83 except ValueError as e:
84 raise HTTPException(
85 status_code=400,
86 detail=f"Query Operator unsupported, got {operator}",
87 ) from e
89 # when it's a regex add to its content instead of replacing it
90 if mongo_operator == "$regex":
91 if filters_by_field[field_name].setdefault(mongo_operator, mongo_value) != mongo_value:
92 filters_by_field[field_name][mongo_operator] += mongo_value
93 else:
94 filters_by_field[field_name][mongo_operator] = mongo_value
96 filters_list = []
97 for field_name, field_filters in filters_by_field.items():
98 filters_list.append({field_name: field_filters})
99 if not filters_list:
100 return {}
101 if len(filters_list) == 1:
102 return filters_list[0]
103 return {"$and": filters_list}
106def create_query_model(model):
107 fields = {}
109 for field_name in model.model_fields.keys():
110 fields[field_name] = (Union[str, None], None)
112 query_name = model.__name__ + "Query"
113 return create_model(query_name, **fields)
116class SignalQuery(create_query_model(Signal), Query):
117 _base_class: ClassVar[type] = Signal
118 sort_by: str | None = "signal_id"
121class DeviceStatesQuery(create_query_model(DeviceState), Query):
122 _base_class: ClassVar[type] = DeviceState
123 sort_by: str | None = "timestamp:-1"
124 modified_properties: str | None
127class EventQuery(create_query_model(Event), Query):
128 _base_class: ClassVar[type] = Event
129 sort_by: str | None = "timestamp"
132class EventRuleQuery(create_query_model(EventRule), Query):
133 _base_class: ClassVar[type] = EventRule
134 sort_by: str | None = "name"
137class CommandQuery(create_query_model(Command), Query):
138 _base_class: ClassVar[type] = Command
139 sort_by: str | None = "timestamp:-1"
142class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
143 _base_class: ClassVar[type] = PublicGraphTheme