Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 99%
96 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
1from typing import get_args, ClassVar
2import datetime
3import logging
5from pydantic import BaseModel, create_model
6from fastapi import HTTPException
8from twinpad_backend.models import Signal, Event, EventRule, DeviceState, Command, PublicGraphTheme, Configuration
# REST operators that map one-to-one onto a MongoDB operator of the same name.
REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")}

# REST operators that are translated into a MongoDB ``$regex`` pattern.
REGEX_OPERATORS = ["startswith", "contains"]


def api_operators_to_mongo(operator, value):
    """Translate a REST API filter operator and operand into MongoDB terms.

    Args:
        operator: REST operator name (``gt``, ``gte``, ``lt``, ``lte``,
            ``eq``, ``in``, ``startswith`` or ``contains``).
        value: Operand for the operator.

    Returns:
        A ``(mongo_operator, mongo_value)`` tuple. Comparison operators pass
        ``value`` through unchanged; ``startswith`` and ``contains`` return
        ``"$regex"`` together with a pattern built from ``value``.

    Raises:
        ValueError: If ``operator`` is not supported.
    """
    # Local import so the block does not depend on a file-level ``import re``.
    import re

    if operator in REST_API_OPERATORS_TO_MONGO:
        return REST_API_OPERATORS_TO_MONGO[operator], value

    if operator == "startswith":
        # Escape the operand so it is matched literally: without this a value
        # such as "a.b" would also match "axb", and callers could inject
        # arbitrary (potentially very expensive) patterns.
        return "$regex", f"^{re.escape(str(value))}"

    if operator == "contains":
        return "$regex", f".*{re.escape(str(value))}.*"

    raise ValueError("Operator unsupported")
def get_main_annotation(annotation):
    """Return the callable used to convert a raw string for ``annotation``.

    For an annotation with type arguments (e.g. ``int | None``) the first
    argument that is not ``NoneType`` is used; otherwise the annotation itself
    is used. ``bool`` is replaced by ``strtobool`` in both cases.

    Args:
        annotation: A type annotation, typically taken from a model field.

    Returns:
        ``strtobool`` when the selected type is ``bool``, else the selected
        type itself.
    """
    none_type = type(None)
    for arg in get_args(annotation):
        # get_args() yields NoneType (the class), not None, for optional
        # annotations, so comparing against None alone never skips anything.
        if arg is None or arg is none_type:
            continue
        return strtobool if arg is bool else arg
    return strtobool if annotation is bool else annotation
def strtobool(value: str):
    """Convert a string representation of truth to ``True`` or ``False``.

    The input is stripped and lower-cased before matching. ``'y'``, ``'yes'``,
    ``'t'``, ``'true'``, ``'on'`` and ``'1'`` give ``True``; ``'n'``, ``'no'``,
    ``'f'``, ``'false'``, ``'off'`` and ``'0'`` give ``False``.

    Adapted from distutils (a now deprecated stdlib module), found via
    https://stackoverflow.com/a/18472142

    Raises:
        HTTPException: With status 400 when the string is none of the above.
    """
    normalized = value.strip().lower()
    truthy = ("y", "yes", "t", "true", "on", "1")
    falsy = ("n", "no", "f", "false", "off", "0")

    if normalized in truthy:
        return True
    if normalized in falsy:
        return False
    # The message reports the normalised form, not the raw input.
    raise HTTPException(status_code=400, detail=f"Invalid truth value: {normalized}")
class Query(BaseModel):
    """Common query parameters (pagination, sorting) plus MongoDB filter building.

    ``mongodb_filter`` reads ``self._base_class``, which this class does not
    define itself; the concrete query classes in this module set it.
    """

    limit: int | None = 20
    offset: int | None = 0
    sort_by: str | None = "id"

    def mongodb_filter(self):
        """Build a MongoDB filter document from the per-field filter strings.

        For every field of ``self._base_class`` whose attribute on this query
        is not ``None``, the string is split on ``"||"`` into segments. A
        segment is either ``operator:value`` (split on the first ``":"``) or a
        bare ``value``, which is treated as ``eq``. The value is converted with
        the callable returned by ``get_main_annotation`` for that field. For
        the ``in`` operator, leading ``[`` and trailing ``]`` are stripped, the
        rest is split on ``","`` and each item is converted. Values of a field
        named ``timestamp`` are then passed through
        ``datetime.datetime.fromtimestamp``.

        Returns:
            ``{}`` when no field is filtered, ``{field: {...}}`` when exactly
            one field is filtered, otherwise ``{"$and": [{field: {...}}, ...]}``.

        Raises:
            HTTPException: With status 400 when a segment uses an operator
                that ``api_operators_to_mongo`` rejects.
        """
        list_operators = ("in",)  # operators that accept a list argument
        filters_by_field = {}

        for field_name, field_info in self._base_class.model_fields.items():
            raw_filter = getattr(self, field_name)
            if raw_filter is None:
                continue

            convert = get_main_annotation(field_info.annotation)
            field_filters = filters_by_field.setdefault(field_name, {})

            for segment in raw_filter.split("||"):
                # Split on the first ":" only, so values that themselves
                # contain ":" no longer fail tuple unpacking.
                operator, separator, raw_value = segment.partition(":")
                if not separator:
                    operator, raw_value = "eq", segment

                if operator in list_operators:
                    # Parse list
                    items = raw_value.lstrip("[").rstrip("]").split(",")
                    value = [convert(item) for item in items]
                else:
                    value = convert(raw_value)

                if field_name == "timestamp":
                    # NOTE(review): fromtimestamp() without a tz argument gives
                    # a naive local-time datetime - confirm this matches how
                    # timestamps are stored.
                    if isinstance(value, list):
                        # "in" produces a list; convert each item separately.
                        value = [datetime.datetime.fromtimestamp(item) for item in value]
                    else:
                        value = datetime.datetime.fromtimestamp(value)

                try:
                    mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
                except ValueError as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Query Operator unsupported, got {operator}",
                    ) from e

                # when it's a regex add to its content instead of replacing it
                # NOTE(review): patterns are concatenated in segment order, so
                # a "startswith" segment placed after a "contains" segment
                # puts "^" mid-pattern - confirm this is intended.
                if mongo_operator == "$regex":
                    if field_filters.setdefault(mongo_operator, mongo_value) != mongo_value:
                        field_filters[mongo_operator] += mongo_value
                else:
                    field_filters[mongo_operator] = mongo_value

        filters_list = [{name: conditions} for name, conditions in filters_by_field.items()]
        if not filters_list:
            return {}
        if len(filters_list) == 1:
            return filters_list[0]
        return {"$and": filters_list}
def create_query_model(model):
    """Create a pydantic model with one optional string field per field of ``model``.

    Every field is declared as ``str | None`` with default ``None``. The new
    model is named after ``model`` with a ``Query`` suffix.
    """
    optional_string = (str | None, None)
    field_definitions = {name: optional_string for name in model.model_fields}
    return create_model(f"{model.__name__}Query", **field_definitions)
class SignalQuery(create_query_model(Signal), Query):
    """Query parameters for ``Signal``: the generated per-field filters plus ``Query``."""

    # Model whose fields ``mongodb_filter`` iterates over.
    _base_class: ClassVar[type] = Signal
    # Replaces the "id" default inherited from ``Query``.
    sort_by: str | None = "signal_id"
class DeviceStatesQuery(create_query_model(DeviceState), Query):
    """Query parameters for ``DeviceState``: the generated per-field filters plus ``Query``."""

    # Model whose fields ``mongodb_filter`` iterates over.
    _base_class: ClassVar[type] = DeviceState
    # Replaces the "id" default inherited from ``Query``.
    # Presumably "<field>:<direction>" with -1 meaning descending; the parsing
    # is not in this module - confirm.
    sort_by: str | None = "timestamp:-1"
    # NOTE(review): declared without a default, unlike the filter fields
    # generated by ``create_query_model`` (which default to None) - confirm
    # this field is meant to be required.
    modified_properties: str | None
class EventQuery(create_query_model(Event), Query):
    """Query parameters for ``Event``: the generated per-field filters plus ``Query``."""

    # Model whose fields ``mongodb_filter`` iterates over.
    _base_class: ClassVar[type] = Event
    # Replaces the "id" default inherited from ``Query``.
    sort_by: str | None = "timestamp"
class EventRuleQuery(create_query_model(EventRule), Query):
    """Query parameters for ``EventRule``: the generated per-field filters plus ``Query``."""

    # Model whose fields ``mongodb_filter`` iterates over.
    _base_class: ClassVar[type] = EventRule
    # Replaces the "id" default inherited from ``Query``.
    sort_by: str | None = "name"
class CommandQuery(create_query_model(Command), Query):
    """Query parameters for ``Command``: the generated per-field filters plus ``Query``."""

    # Model whose fields ``mongodb_filter`` iterates over.
    _base_class: ClassVar[type] = Command
    # Replaces the "id" default inherited from ``Query``.
    # Presumably "<field>:<direction>" with -1 meaning descending; the parsing
    # is not in this module - confirm.
    sort_by: str | None = "timestamp:-1"
class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
    """Query parameters for ``PublicGraphTheme``: the generated per-field filters plus ``Query``.

    ``sort_by`` is not overridden, so the ``Query`` default ("id") applies.
    """

    # Model whose fields ``mongodb_filter`` iterates over.
    _base_class: ClassVar[type] = PublicGraphTheme
class ConfigurationQuery(create_query_model(Configuration), Query):
    """Query parameters for ``Configuration``: the generated per-field filters plus ``Query``."""

    # Model whose fields ``mongodb_filter`` iterates over.
    _base_class: ClassVar[type] = Configuration
    # Replaces the "id" default inherited from ``Query``.
    # Presumably "<field>:<direction>" with -1 meaning descending; the parsing
    # is not in this module - confirm.
    sort_by: str | None = "received_at:-1"