Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 84%

79 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 14:27 +0000

1from typing import get_args, ClassVar, Union 

2import datetime 

3 

4from pydantic import BaseModel, create_model 

5from fastapi import HTTPException 

6 

7from twinpad_backend.models import Signal, Event, EventRule, DeviceState, Command 

8 

9 

10REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")} 

11 

12 

13def api_operators_to_mongo(operator, value): 

14 if operator in REST_API_OPERATORS_TO_MONGO: 

15 return REST_API_OPERATORS_TO_MONGO[operator], value 

16 

17 if operator == "startswith": 

18 return "$regex", f"^{value}" 

19 

20 if operator == "contains": 

21 return "$regex", f".*{value}.*" 

22 

23 raise ValueError("Operator unsupported") 

24 

25 

26def get_main_annotation(annotation): 

27 for arg in get_args(annotation): 

28 if arg is not None: 

29 return arg 

30 return annotation 

31 

32 

33class Query(BaseModel): 

34 limit: int | None = 20 

35 offset: int | None = 0 

36 sort_by: str | None = "id" 

37 # sort_order: int | None = 1 

38 

39 def mongodb_filter(self): 

40 # filter_ = {} 

41 filters_by_field = {} 

42 for field_name in self.model_fields.keys(): 

43 if field_name not in ["limit", "offset", "sort_by"]: 

44 value = getattr(self, field_name) 

45 if value is not None: 

46 filters_by_field.setdefault(field_name, {}) 

47 for segment in value.split("|"): 

48 if ":" in segment: 

49 operator, value = segment.split(":") 

50 else: 

51 value = segment 

52 operator = "eq" 

53 

54 annotation = get_main_annotation(self._base_class.model_fields[field_name].annotation) 

55 if operator in set(["in"]): # operators that accepts list argument 

56 # Parse list 

57 value = [annotation(v) for v in value.lstrip("[").rstrip("]").split(",")] 

58 else: 

59 value = annotation(value) 

60 if field_name == "timestamp": 

61 value = datetime.datetime.fromtimestamp(value) 

62 

63 try: 

64 mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value) 

65 except ValueError as e: 

66 raise HTTPException( 

67 status_code=400, 

68 detail=f"Query Operator unsupported, got {operator}", 

69 ) from e 

70 

71 # when it's a regex add to its content instead of replacing it 

72 if mongo_operator == "$regex": 

73 if filters_by_field[field_name].setdefault(mongo_operator, mongo_value) != mongo_value: 

74 filters_by_field[field_name][mongo_operator] += mongo_value 

75 else: 

76 filters_by_field[field_name][mongo_operator] = mongo_value 

77 

78 filters_list = [] 

79 for field_name, field_filters in filters_by_field.items(): 

80 filters_list.append({field_name: field_filters}) 

81 if not filters_list: 

82 return {} 

83 if len(filters_list) == 1: 

84 return filters_list[0] 

85 return {"$and": filters_list} 

86 

87 

88def create_query_model(model): 

89 fields = {} 

90 

91 for field_name in model.model_fields.keys(): 

92 fields[field_name] = (Union[str, None], None) 

93 

94 query_name = model.__name__ + "Query" 

95 return create_model(query_name, **fields) 

96 

97 

98class SignalQuery(create_query_model(Signal), Query): 

99 _base_class: ClassVar[type] = Signal 

100 sort_by: str | None = "signal_id" 

101 

102 

103class DeviceStatesQuery(create_query_model(DeviceState), Query): 

104 _base_class: ClassVar[type] = DeviceState 

105 sort_by: str | None = "timestamp:-1" 

106 modified_properties: str | None 

107 

108 

109class EventQuery(create_query_model(Event), Query): 

110 _base_class: ClassVar[type] = Event 

111 sort_by: str | None = "timestamp" 

112 

113 

114class EventRuleQuery(create_query_model(Event), Query): 

115 _base_class: ClassVar[type] = EventRule 

116 sort_by: str | None = "name" 

117 

118 

119class CommandQuery(create_query_model(Command), Query): 

120 _base_class: ClassVar[type] = Command 

121 sort_by: str | None = "timestamp:-1"