Coverage for / usr / local / lib / python3.11 / site-packages / twinpad_backend / queries.py: 98%
99 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 11:18 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 11:18 +0000
1from typing import get_args, ClassVar
2import datetime
3import json
4import re
6from pydantic import BaseModel, create_model
7from fastapi import HTTPException
9from twinpad_backend.models import Signal, ForcedSignal, Event, EventRule, DeviceState, Command, PublicGraphTheme
# REST operators that map one-to-one onto a MongoDB operator of the same name
# prefixed with "$" (e.g. "gte" -> "$gte").
REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in", "not")}


def api_operators_to_mongo(operator, value):
    """Translate a REST filter operator and its value into a MongoDB pair.

    Args:
        operator: REST operator name. Either a key of
            ``REST_API_OPERATORS_TO_MONGO`` or one of the text operators
            ``"startswith"`` / ``"contains"``.
        value: Operand of the filter. Passed through untouched for the direct
            operators; rendered with ``str()`` and regex-escaped for the text
            operators.

    Returns:
        A ``(mongo_operator, mongo_value)`` tuple.

    Raises:
        ValueError: If ``operator`` is not supported.
    """
    if operator in REST_API_OPERATORS_TO_MONGO:
        return REST_API_OPERATORS_TO_MONGO[operator], value

    if operator in ("startswith", "contains"):
        # The value is a literal text fragment, not a pattern: escape it so
        # that characters such as "." or "(" match themselves and cannot
        # produce an invalid or unexpectedly broad regular expression.
        literal = re.escape(str(value))
        if operator == "startswith":
            return "$regex", f"^{literal}"
        return "$regex", f".*{literal}.*"

    raise ValueError("Operator unsupported")
28def get_main_annotation(annotation):
29 for arg in get_args(annotation):
30 if arg is not None:
31 if arg is bool:
32 return strtobool
33 return arg
34 if annotation is bool:
35 return strtobool
36 return annotation
def strtobool(value: str):
    """Parse a textual boolean, ignoring case and surrounding whitespace.

    Recognised true spellings are 'y', 'yes', 't', 'true', 'on' and '1';
    recognised false spellings are 'n', 'no', 'f', 'false', 'off' and '0'.
    Any other input raises an HTTPException with status code 400.

    Adapted from the deprecated stdlib distutils, as found at
    https://stackoverflow.com/a/18472142
    """
    value = value.strip().lower()
    spellings_by_outcome = (
        (True, ("y", "yes", "t", "true", "on", "1")),
        (False, ("n", "no", "f", "false", "off", "0")),
    )
    for outcome, spellings in spellings_by_outcome:
        if value in spellings:
            return outcome
    raise HTTPException(status_code=400, detail=f"Invalid truth value: {value}")
# Base query-parameter model: carries ``limit``, ``offset`` and ``sort_by`` and
# converts the per-field filter strings of a subclass into a MongoDB filter.
# Subclasses must define ``_base_class`` (a model exposing ``model_fields``);
# ``mongodb_filter`` reads it to know which attributes are filterable.
class Query(BaseModel):
    limit: int | None = 20
    offset: int | None = 0
    sort_by: str | None = "id"

    def mongodb_filter(self):
        """Build a MongoDB filter document from the populated filter fields.

        Every field of ``self._base_class`` whose value on this query is not
        ``None`` is split on ``|`` into segments. A segment is either
        ``operator:value`` or a bare ``value`` (implicit ``eq``). The value is
        converted with the callable returned by ``get_main_annotation`` for
        the field, except that a value starting with a ``{...}`` object is
        parsed as JSON and the ``in`` operator takes a comma-separated list
        (optionally wrapped in ``[ ]``). Values of a field named
        ``timestamp`` are turned into ``datetime`` objects.

        Returns:
            ``{}`` when no filter is set, ``{field: conditions}`` when exactly
            one field is filtered, otherwise ``{"$and": [...]}`` with one
            ``{field: conditions}`` entry per filtered field.

        Raises:
            HTTPException: With status 400 when a value cannot be converted or
                when the operator is not supported.
        """
        filters_by_field = {}
        for field_name, field_info in self._base_class.model_fields.items():
            raw_filter = getattr(self, field_name)
            if raw_filter is None:
                continue

            field_filters = filters_by_field.setdefault(field_name, {})
            # The converter only depends on the field, not on the segment.
            convert = get_main_annotation(field_info.annotation)

            for segment in raw_filter.split("|"):
                if ":" in segment:
                    operator, raw_value = segment.split(":", maxsplit=1)
                else:
                    operator, raw_value = "eq", segment

                try:
                    if operator == "in":  # the only operator taking a list argument
                        value = [convert(item) for item in raw_value.lstrip("[").rstrip("]").split(",")]
                    elif re.match(r"\{.*\}", raw_value):
                        value = json.loads(raw_value)
                    else:
                        value = convert(raw_value)

                    if field_name == "timestamp":
                        # NOTE(review): fromtimestamp() without tz yields a naive
                        # local-time datetime - confirm this matches the stored data.
                        if isinstance(value, list):
                            # "in" produces a list: convert every element instead
                            # of passing the list itself to fromtimestamp().
                            value = [datetime.datetime.fromtimestamp(item) for item in value]
                        else:
                            value = datetime.datetime.fromtimestamp(value)
                except (ValueError, TypeError, OverflowError, OSError) as e:
                    # A malformed value is a client error, like a bad operator.
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid value for query field {field_name}: {raw_value}",
                    ) from e

                try:
                    mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
                except ValueError as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Query Operator unsupported, got {operator}",
                    ) from e

                if mongo_operator == "$regex":
                    # Several regex segments on one field are concatenated into
                    # a single pattern instead of overwriting each other; an
                    # identical repeated pattern is not appended twice.
                    if field_filters.setdefault(mongo_operator, mongo_value) != mongo_value:
                        field_filters[mongo_operator] += mongo_value
                else:
                    field_filters[mongo_operator] = mongo_value

        filters_list = [{name: conditions} for name, conditions in filters_by_field.items()]
        if not filters_list:
            return {}
        if len(filters_list) == 1:
            return filters_list[0]
        return {"$and": filters_list}
def create_query_model(model):
    """Create a ``<ModelName>Query`` model mirroring the fields of ``model``.

    Each name in ``model.model_fields`` becomes a field typed ``str | None``
    with a default of ``None`` in the generated model.
    """
    optional_string_fields = {name: (str | None, None) for name in model.model_fields.keys()}
    return create_model(model.__name__ + "Query", **optional_string_fields)
# Query parameters for Signal: one optional string filter per Signal field
# (generated by create_query_model) combined with the shared Query fields.
class SignalQuery(create_query_model(Signal), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Signal
    # Replaces the Query default of "id".
    sort_by: str | None = "signal_id"
    # NOTE(review): mongodb_filter() calls .split() on filter values, so a real
    # bool here would fail if Signal declares this field - confirm usage.
    post_processing: bool | str | None = None
# Query parameters for ForcedSignal: one optional string filter per
# ForcedSignal field (generated by create_query_model) plus the Query fields.
class ForcedSignalQuery(create_query_model(ForcedSignal), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = ForcedSignal
    # Replaces the Query default of "id".
    sort_by: str | None = "signal_id"
# Query parameters for DeviceState: one optional string filter per DeviceState
# field (generated by create_query_model) plus the shared Query fields.
class DeviceStatesQuery(create_query_model(DeviceState), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = DeviceState
    # Replaces the Query default of "id".
    sort_by: str | None = "timestamp:-1"
    # Declared without a default, unlike the generated filter fields (which
    # default to None). NOTE(review): confirm this is meant to be required.
    modified_properties: str | None
# Query parameters for Event: one optional string filter per Event field
# (generated by create_query_model) plus the shared Query fields.
class EventQuery(create_query_model(Event), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Event
    # Replaces the Query default of "id".
    sort_by: str | None = "timestamp"
# Query parameters for EventRule: one optional string filter per EventRule
# field (generated by create_query_model) plus the shared Query fields.
class EventRuleQuery(create_query_model(EventRule), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = EventRule
    # Replaces the Query default of "id".
    sort_by: str | None = "name"
# Query parameters for Command: one optional string filter per Command field
# (generated by create_query_model) plus the shared Query fields.
class CommandQuery(create_query_model(Command), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Command
    # Replaces the Query default of "id".
    sort_by: str | None = "timestamp:-1"
# Query parameters for PublicGraphTheme: one optional string filter per
# PublicGraphTheme field (generated by create_query_model) plus the shared
# Query fields.
class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = PublicGraphTheme