Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 99%

96 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 11:44 +0000

import datetime
import logging
import re
from typing import get_args, ClassVar

from fastapi import HTTPException
from pydantic import BaseModel, create_model

from twinpad_backend.models import Signal, Event, EventRule, DeviceState, Command, PublicGraphTheme, Configuration

9 

10 

# Comparison operators accepted by the REST API, mapped to the MongoDB
# operator of the same name (e.g. "gte" -> "$gte").
REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")}

# REST API operators that are translated into a MongoDB "$regex" expression.
REGEX_OPERATORS = ["startswith", "contains"]


def api_operators_to_mongo(operator, value):
    """Translate a REST API filter operator into a MongoDB (operator, value) pair.

    Args:
        operator: REST API operator name: one of the keys of
            ``REST_API_OPERATORS_TO_MONGO``, ``"startswith"`` or ``"contains"``.
        value: Already-parsed filter value. For the regex operators it is
            formatted as text and matched literally.

    Returns:
        A ``(mongo_operator, mongo_value)`` tuple, e.g. ``("$gte", 5)`` or
        ``("$regex", "^abc")``.

    Raises:
        ValueError: If ``operator`` is not supported.
    """
    if operator in REST_API_OPERATORS_TO_MONGO:
        return REST_API_OPERATORS_TO_MONGO[operator], value

    if operator in ("startswith", "contains"):
        # The value comes straight from the request: escape it so that regex
        # metacharacters ('.', '(', '*', ...) are matched literally instead of
        # being interpreted as (possibly invalid or expensive) patterns.
        literal = re.escape(f"{value}")
        if operator == "startswith":
            return "$regex", f"^{literal}"
        # The surrounding ".*" is kept on purpose: callers may concatenate
        # several regex fragments for the same field.
        return "$regex", f".*{literal}.*"

    raise ValueError("Operator unsupported")

27 

28 

29def get_main_annotation(annotation): 

30 for arg in get_args(annotation): 

31 if arg is not None: 

32 if arg is bool: 

33 return strtobool 

34 return arg 

35 if annotation is bool: 

36 return strtobool 

37 return annotation 

38 

39 

def strtobool(value: str):
    """Parse a textual truth value into a ``bool``.

    Surrounding whitespace and letter case are ignored. The spellings
    'y', 'yes', 't', 'true', 'on' and '1' give ``True``; 'n', 'no', 'f',
    'false', 'off' and '0' give ``False``. Any other input raises an
    ``HTTPException`` with status code 400.

    Adapted from the deprecated stdlib module distutils, as found at
    https://stackoverflow.com/a/18472142
    """
    truthy = ("y", "yes", "t", "true", "on", "1")
    falsy = ("n", "no", "f", "false", "off", "0")
    outcomes = {**dict.fromkeys(truthy, True), **dict.fromkeys(falsy, False)}

    normalized = value.strip().lower()
    if normalized not in outcomes:
        raise HTTPException(status_code=400, detail=f"Invalid truth value: {normalized}")
    return outcomes[normalized]

54 

55 

class Query(BaseModel):
    """Base class for list-query parameters: paging, sorting and field filters.

    ``mongodb_filter`` reads ``self._base_class``, which is not defined here;
    concrete query classes are expected to set it to the model whose
    ``model_fields`` describe the filterable fields.
    """

    # Paging and sorting parameters with their defaults.
    limit: int | None = 20
    offset: int | None = 0
    sort_by: str | None = "id"

    def mongodb_filter(self):
        """Build a MongoDB filter document from the field filters set on this query.

        Each filterable field holds a string of ``||``-separated segments.
        A segment is either ``operator:value`` or a bare ``value`` (which
        means ``eq``). The ``in`` operator takes a comma-separated list,
        optionally wrapped in square brackets. Values are parsed with the
        type returned by ``get_main_annotation`` for the field, and values of
        a field named ``timestamp`` are then converted with
        ``datetime.datetime.fromtimestamp``.

        Returns:
            ``{}`` when no filter is set, ``{field: {...}}`` for a single
            filtered field, otherwise ``{"$and": [{field: {...}}, ...]}``.

        Raises:
            HTTPException: With status 400 when an operator is unsupported or
                a value cannot be parsed for its field.
        """
        list_operators = {"in"}  # operators that accept a list argument
        filters_by_field = {}

        for field_name, field_info in self._base_class.model_fields.items():
            raw_filter = getattr(self, field_name)
            if raw_filter is None:
                continue

            convert = get_main_annotation(field_info.annotation)
            field_filters = filters_by_field.setdefault(field_name, {})

            for segment in raw_filter.split("||"):
                if ":" in segment:
                    # Split on the first colon only, so a value that itself
                    # contains ":" is kept in one piece.
                    operator, raw_value = segment.split(":", 1)
                else:
                    operator, raw_value = "eq", segment

                takes_list = operator in list_operators
                if takes_list:
                    raw_items = raw_value.lstrip("[").rstrip("]").split(",")
                else:
                    raw_items = [raw_value]

                try:
                    items = [convert(item) for item in raw_items]
                    if field_name == "timestamp":
                        # Applied per element so that list operators work too.
                        items = [datetime.datetime.fromtimestamp(item) for item in items]
                except (ValueError, OverflowError, OSError) as e:
                    # A value the client sent could not be parsed: report it
                    # as a bad request rather than letting it surface as a
                    # server error.
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid value for {field_name}: {raw_value}",
                    ) from e
                value = items if takes_list else items[0]

                try:
                    mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
                except ValueError as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Query Operator unsupported, got {operator}",
                    ) from e

                # when it's a regex add to its content instead of replacing it
                existing = field_filters.get(mongo_operator)
                if mongo_operator == "$regex" and existing is not None and existing != mongo_value:
                    field_filters[mongo_operator] = existing + mongo_value
                else:
                    field_filters[mongo_operator] = mongo_value

        filters_list = [{name: field_filters} for name, field_filters in filters_by_field.items()]
        if not filters_list:
            return {}
        if len(filters_list) == 1:
            return filters_list[0]
        return {"$and": filters_list}

106 

107 

def create_query_model(model):
    """Create a pydantic model exposing every field of ``model`` as a string filter.

    The generated model is named ``<model name>Query`` and declares, for each
    entry of ``model.model_fields``, a field of type ``str | None`` whose
    default is ``None``.
    """
    filter_fields = {name: (str | None, None) for name in model.model_fields}
    return create_model(model.__name__ + "Query", **filter_fields)

116 

117 

class SignalQuery(create_query_model(Signal), Query):
    """Query parameters for ``Signal``: one string filter per model field plus paging/sorting."""

    # Model whose fields are inspected when building the MongoDB filter.
    _base_class: ClassVar[type] = Signal
    # Overrides the default sort key inherited from Query.
    sort_by: str | None = "signal_id"

121 

122 

class DeviceStatesQuery(create_query_model(DeviceState), Query):
    """Query parameters for ``DeviceState``: one string filter per model field plus paging/sorting."""

    # Model whose fields are inspected when building the MongoDB filter.
    _base_class: ClassVar[type] = DeviceState
    # Overrides the default sort key inherited from Query.
    # NOTE(review): the ":-1" suffix presumably requests descending order - confirm against the sort parser.
    sort_by: str | None = "timestamp:-1"
    # NOTE(review): declared without a default, unlike the generated filter fields
    # (which default to None) - confirm that a required parameter is intended.
    modified_properties: str | None

127 

128 

class EventQuery(create_query_model(Event), Query):
    """Query parameters for ``Event``: one string filter per model field plus paging/sorting."""

    # Model whose fields are inspected when building the MongoDB filter.
    _base_class: ClassVar[type] = Event
    # Overrides the default sort key inherited from Query.
    sort_by: str | None = "timestamp"

132 

133 

class EventRuleQuery(create_query_model(EventRule), Query):
    """Query parameters for ``EventRule``: one string filter per model field plus paging/sorting."""

    # Model whose fields are inspected when building the MongoDB filter.
    _base_class: ClassVar[type] = EventRule
    # Overrides the default sort key inherited from Query.
    sort_by: str | None = "name"

137 

138 

class CommandQuery(create_query_model(Command), Query):
    """Query parameters for ``Command``: one string filter per model field plus paging/sorting."""

    # Model whose fields are inspected when building the MongoDB filter.
    _base_class: ClassVar[type] = Command
    # Overrides the default sort key inherited from Query.
    # NOTE(review): the ":-1" suffix presumably requests descending order - confirm against the sort parser.
    sort_by: str | None = "timestamp:-1"

142 

143 

class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
    """Query parameters for ``PublicGraphTheme``; keeps the sort key inherited from ``Query``."""

    # Model whose fields are inspected when building the MongoDB filter.
    _base_class: ClassVar[type] = PublicGraphTheme

146 

147 

class ConfigurationQuery(create_query_model(Configuration), Query):
    """Query parameters for ``Configuration``: one string filter per model field plus paging/sorting."""

    # Model whose fields are inspected when building the MongoDB filter.
    _base_class: ClassVar[type] = Configuration
    # Overrides the default sort key inherited from Query.
    # NOTE(review): the ":-1" suffix presumably requests descending order - confirm against the sort parser.
    sort_by: str | None = "received_at:-1"