Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 83%
76 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-10 15:38 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-10 15:38 +0000
1from typing import get_args, ClassVar, Union
2import datetime
4from pydantic import BaseModel, create_model
5from fastapi import HTTPException
7from twinpad_backend.models import Signal, Event, EventRule, DeviceState
# Comparison operators accepted by the REST API, mapped to the MongoDB
# operator of the same name prefixed with "$" (e.g. "gte" -> "$gte").
REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")}


def api_operators_to_mongo(operator, value):
    """Translate a REST API filter operator and operand into MongoDB form.

    Args:
        operator: Operator name taken from the query string. Either a key of
            ``REST_API_OPERATORS_TO_MONGO``, ``"startswith"`` or ``"contains"``.
        value: Operand for the operator. It is returned unchanged for the
            comparison operators and embedded in a pattern for the two text
            operators.

    Returns:
        A ``(mongo_operator, mongo_value)`` tuple, e.g. ``("$gt", 5)`` or
        ``("$regex", "^abc")``.

    Raises:
        ValueError: If ``operator`` is not one of the supported names.
    """
    import re  # local import so this block does not depend on file-level imports

    if operator in REST_API_OPERATORS_TO_MONGO:
        return REST_API_OPERATORS_TO_MONGO[operator], value

    if operator in ("startswith", "contains"):
        # The operand comes from the client: escape it so that it is matched
        # literally and cannot inject regex syntax (or a pathological pattern)
        # into the MongoDB query.
        literal = re.escape(str(value))
        if operator == "startswith":
            return "$regex", f"^{literal}"
        # The surrounding ".*" is kept so that a "contains" pattern can be
        # appended to a preceding "startswith" pattern by the caller.
        return "$regex", f".*{literal}.*"

    raise ValueError("Operator unsupported")
def get_main_annotation(annotation):
    """Return the first non-``None`` type argument of ``annotation``.

    For an optional annotation such as ``int | None`` or ``Union[None, int]``
    this yields ``int``. An annotation without type arguments (e.g. plain
    ``int``) is returned unchanged.

    Args:
        annotation: A type annotation, typically taken from a pydantic field.

    Returns:
        The first argument reported by ``typing.get_args`` that is not
        ``NoneType``, or ``annotation`` itself when there is none.
    """
    # get_args() reports the "None" member of a union as NoneType, never as
    # the None singleton, so the comparison has to be made against type(None).
    none_type = type(None)
    return next(
        (arg for arg in get_args(annotation) if arg is not none_type),
        annotation,
    )
class Query(BaseModel):
    """Base query model: paging/sorting fields plus per-field filter strings.

    Subclasses add one optional string field per filterable attribute and set
    ``_base_class`` to the model whose field annotations are used to convert
    the filter operands. ``mongodb_filter`` reads ``_base_class`` as soon as a
    filter field is set; this base class does not define it.

    Filter string syntax, as parsed by ``mongodb_filter``:

    * segments are separated by ``|`` and all apply to the same field;
    * a segment is either ``operator:value`` or a bare ``value`` (meaning
      ``eq``);
    * the ``in`` operator takes a comma separated list, optionally wrapped in
      square brackets, e.g. ``in:[1,2,3]``.
    """

    limit: int | None = 20
    offset: int | None = 0
    sort_by: str | None = "id"
    # sort_order: int | None = 1

    def mongodb_filter(self):
        """Build a MongoDB filter document from the filter fields that are set.

        Returns:
            ``{}`` when no filter field is set, ``{field: {...}}`` when exactly
            one field is filtered, otherwise ``{"$and": [{field: {...}}, ...]}``.

        Raises:
            HTTPException: With status 400 when a segment uses an unsupported
                operator or when an operand cannot be converted to the type
                of the field.
        """
        # Fields that are not filters and must not end up in the Mongo query.
        non_filter_fields = ("limit", "offset", "sort_by")

        filters_by_field = {}
        # model_fields is read from the class: instance access is deprecated
        # in recent pydantic releases.
        for field_name in type(self).model_fields:
            if field_name in non_filter_fields:
                continue
            raw_filter = getattr(self, field_name)
            if raw_filter is None:
                continue

            field_filters = filters_by_field.setdefault(field_name, {})
            annotation = get_main_annotation(self._base_class.model_fields[field_name].annotation)

            for segment in raw_filter.split("|"):
                # Split on the first ":" only, so that operands which contain
                # ":" themselves (e.g. "eq:12:30") do not break the parsing.
                operator, separator, raw_value = segment.partition(":")
                if not separator:
                    operator, raw_value = "eq", segment

                try:
                    if operator == "in":  # the only operator taking a list operand
                        items = raw_value.lstrip("[").rstrip("]").split(",")
                        value = [annotation(item) for item in items]
                        if field_name == "timestamp":
                            # NOTE(review): fromtimestamp() without tz yields a
                            # naive local datetime - confirm this matches how
                            # timestamps are stored.
                            value = [datetime.datetime.fromtimestamp(item) for item in value]
                    else:
                        value = annotation(raw_value)
                        if field_name == "timestamp":
                            value = datetime.datetime.fromtimestamp(value)
                except (ValueError, OverflowError, OSError) as e:
                    # A malformed operand is a client error, not a server one.
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid value for field {field_name}, got {raw_value}",
                    ) from e

                try:
                    mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
                except ValueError as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Query Operator unsupported, got {operator}",
                    ) from e

                # when it's a regex add to its content instead of replacing it
                if mongo_operator == "$regex":
                    if field_filters.setdefault(mongo_operator, mongo_value) != mongo_value:
                        field_filters[mongo_operator] += mongo_value
                else:
                    field_filters[mongo_operator] = mongo_value

        filters_list = [
            {field_name: field_filters}
            for field_name, field_filters in filters_by_field.items()
        ]
        if not filters_list:
            return {}
        if len(filters_list) == 1:
            return filters_list[0]
        return {"$and": filters_list}
def create_query_model(model):
    """Create a pydantic model with one optional string field per field of ``model``.

    Every field of ``model`` is mirrored under the same name, typed
    ``Union[str, None]`` and defaulting to ``None``. The generated class is
    named after ``model`` with a ``"Query"`` suffix.

    Args:
        model: A class exposing ``model_fields`` and ``__name__``.

    Returns:
        The model class produced by ``create_model``.
    """
    optional_text = (Union[str, None], None)
    query_fields = dict.fromkeys(model.model_fields, optional_text)
    return create_model(f"{model.__name__}Query", **query_fields)
class SignalQuery(create_query_model(Signal), Query):
    """Query model for ``Signal``: one optional filter string per ``Signal`` field."""

    # Default value of the sort field for this query type.
    sort_by: str | None = "signal_id"
    # Model whose field annotations are used to convert filter operands.
    _base_class: ClassVar[type] = Signal
class DeviceStatesQuery(create_query_model(DeviceState), Query):
    """Query model for ``DeviceState``: one optional filter string per field."""

    # Model whose field annotations are used to convert filter operands.
    _base_class: ClassVar[type] = DeviceState
    # NOTE(review): the ":-1" suffix presumably requests descending order -
    # confirm against the code that consumes sort_by.
    sort_by: str | None = "timestamp:-1"
    # Explicit default: with pydantic v2 a "str | None" field without a default
    # is required, unlike every other filter field of the query models.
    modified_properties: str | None = None
class EventQuery(create_query_model(Event), Query):
    """Query model for ``Event``: one optional filter string per ``Event`` field."""

    # Default value of the sort field for this query type.
    sort_by: str | None = "timestamp"
    # Model whose field annotations are used to convert filter operands.
    _base_class: ClassVar[type] = Event
class EventRuleQuery(create_query_model(EventRule), Query):
    """Query model for ``EventRule``: one optional filter string per field.

    The filter fields are generated from ``EventRule`` so that they match
    ``_base_class``, whose field annotations ``mongodb_filter`` looks up by
    field name.
    """

    # Model whose field annotations are used to convert filter operands.
    _base_class: ClassVar[type] = EventRule
    # Default value of the sort field for this query type.
    sort_by: str | None = "name"