Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 81%
72 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-16 14:43 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-16 14:43 +0000
1from typing import Optional, get_args, ClassVar
2import datetime
4from pydantic import BaseModel, create_model
5from fastapi import HTTPException
7from twinpad_backend.models import Signal, Event, EventRule, DeviceState
10REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")}
13def api_operators_to_mongo(operator, value):
14 if operator in REST_API_OPERATORS_TO_MONGO:
15 return REST_API_OPERATORS_TO_MONGO[operator], value
17 if operator == "startswith":
18 return "$regex", f"^{value}"
20 if operator == "contains":
21 return "$regex", f".*{value}.*"
23 raise ValueError("Operator unsupported")
26def get_main_annotation(annotation):
27 for arg in get_args(annotation):
28 if arg is not None:
29 return arg
30 return annotation
33class Query(BaseModel):
34 limit: int | None = 20
35 offset: int | None = 0
36 sort_by: str | None = "id"
37 # sort_order: int | None = 1
39 def mongodb_filter(self):
40 # filter_ = {}
41 filters_by_field = {}
42 for field_name in self.model_fields.keys():
43 if field_name not in ["limit", "offset", "sort_by"]:
44 value = getattr(self, field_name)
45 if value is not None:
46 filters_by_field.setdefault(field_name, {})
47 for segment in value.split("|"):
48 if ":" in segment:
49 operator, value = segment.split(":")
50 else:
51 value = segment
52 operator = "eq"
54 annotation = get_main_annotation(self._base_class.model_fields[field_name].annotation)
55 if operator in ["in"]: # operators that accepts list argument
56 # Parse list
57 value = [annotation(v) for v in value.lstrip("[").rstrip("]").split(",")]
58 else:
59 value = annotation(value)
60 if field_name == "timestamp":
61 value = datetime.datetime.fromtimestamp(value)
63 try:
64 mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
65 except ValueError:
66 raise HTTPException(
67 status_code=400,
68 detail=f"Query Operator unsupported, got {operator}",
69 )
71 filters_by_field[field_name][mongo_operator] = mongo_value
73 filters_list = []
74 for field_name, field_filters in filters_by_field.items():
75 filters_list.append({field_name: field_filters})
76 if not filters_list:
77 return {}
78 if len(filters_list) == 1:
79 return filters_list[0]
80 return {"$and": filters_list}
83def create_query_model(model):
84 fields = {}
86 for field_name in model.model_fields.keys():
87 fields[field_name] = (Optional[str], None)
89 query_name = model.__name__ + "Query"
90 return create_model(query_name, **fields)
93class SignalQuery(create_query_model(Signal), Query):
94 _base_class: ClassVar[type] = Signal
95 sort_by: str | None = "signal_id"
98class DeviceStatesQuery(create_query_model(DeviceState), Query):
99 _base_class: ClassVar[type] = DeviceState
100 sort_by: str | None = "timestamp:-1"
103class EventQuery(create_query_model(Event), Query):
104 _base_class: ClassVar[type] = Event
105 sort_by: str | None = "timestamp"
108class EventRuleQuery(create_query_model(Event), Query):
109 _base_class: ClassVar[type] = EventRule
110 sort_by: str | None = "name"