Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/queries.py: 98%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-11 15:40 +0000

1from typing import get_args, ClassVar 

2import datetime 

3import json 

4import re 

5 

6from pydantic import BaseModel, create_model 

7from fastapi import HTTPException 

8 

9from twinpad_backend.models import ( 

10 EtherCatTopology, 

11 Signal, 

12 ForcedSignal, 

13 Event, 

14 EventRule, 

15 DeviceState, 

16 Command, 

17 PublicGraphTheme, 

18 Configuration, 

19) 

20 

21 

# REST API operators that map one-to-one onto a MongoDB operator ("gt" -> "$gt", ...).
REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in", "not")}


def api_operators_to_mongo(operator, value):
    """Translate a REST API filter operator and its operand to MongoDB.

    Args:
        operator: Operator name as received from the API (e.g. ``"gt"``,
            ``"in"``, ``"startswith"``, ``"contains"``).
        value: The already-parsed operand.

    Returns:
        A ``(mongo_operator, mongo_value)`` tuple. Operators listed in
        ``REST_API_OPERATORS_TO_MONGO`` pass ``value`` through unchanged;
        ``startswith`` and ``contains`` produce a ``"$regex"`` pattern.

    Raises:
        ValueError: If ``operator`` is not supported.
    """
    if operator in REST_API_OPERATORS_TO_MONGO:
        return REST_API_OPERATORS_TO_MONGO[operator], value

    if operator in ("startswith", "contains"):
        # The operand comes from the request, so it must be matched literally:
        # without escaping, characters such as "." or "(" would be interpreted
        # as regex syntax (wrong matches or an invalid pattern).
        literal = re.escape(str(value))
        if operator == "startswith":
            return "$regex", f"^{literal}"
        return "$regex", f".*{literal}.*"

    raise ValueError("Operator unsupported")

36 

37 

def get_main_annotation(annotation):
    """Return the callable used to convert a raw string for *annotation*.

    For a parameterized annotation (e.g. ``int | None``) the first type
    argument that is not ``None``/``NoneType`` is used; otherwise the
    annotation itself is used. ``bool`` is replaced by ``strtobool`` so that
    textual truth values are parsed.

    Args:
        annotation: A type annotation, typically taken from a model field.

    Returns:
        The selected type (or ``strtobool`` in place of ``bool``).
    """
    none_type = type(None)
    for arg in get_args(annotation):
        # get_args() reports the None member of a union as NoneType, not None,
        # so both spellings are skipped to reach the real value type.
        if arg is None or arg is none_type:
            continue
        return strtobool if arg is bool else arg
    if annotation is bool:
        return strtobool
    return annotation

47 

48 

def strtobool(value: str):
    """Parse a textual truth value into a ``bool``.

    Surrounding whitespace and letter case are ignored. Accepted true values
    are 'y', 'yes', 't', 'true', 'on' and '1'; accepted false values are
    'n', 'no', 'f', 'false', 'off' and '0'.

    Adapted from distutils (a now deprecated stdlib module), as found at
    https://stackoverflow.com/a/18472142

    Raises:
        HTTPException: With status 400 when the text is none of the above.
    """
    value = value.strip().lower()
    truthy = ("y", "yes", "t", "true", "on", "1")
    falsy = ("n", "no", "f", "false", "off", "0")
    if value not in truthy and value not in falsy:
        raise HTTPException(status_code=400, detail=f"Invalid truth value: {value}")
    return value in truthy

63 

64 

# Base class of the query models: pagination/sorting fields plus the
# translation of per-field filter strings into a MongoDB filter.
# Subclasses provide ``_base_class``, the model whose fields can be filtered on.
class Query(BaseModel):
    limit: int | None = 20
    offset: int | None = 0
    sort_by: str | None = "id"

    def mongodb_filter(self):
        """Build a MongoDB filter from the field filters set on this query.

        Every field of ``_base_class`` whose value on this query is not
        ``None`` is read as one or more ``||``-separated segments. A segment
        is either ``operator:value`` or a bare ``value`` (meaning ``eq``).
        Values are converted with the field's main annotation, except that
        the ``in`` operator takes a comma-separated list (optionally wrapped
        in ``[]``) and a value matching ``{...}`` is parsed as JSON. A scalar
        ``timestamp`` value is converted with ``datetime.fromtimestamp``.

        Returns:
            ``{}`` when nothing is filtered, ``{field: {...}}`` when a single
            field is filtered, otherwise ``{"$and": [{field: {...}}, ...]}``.

        Raises:
            HTTPException: With status 400 when an operator is unsupported or
                a value cannot be parsed.
        """
        list_operators = {"in"}  # operators that accept a list argument
        filters_by_field = {}
        for field_name, field_info in self._base_class.model_fields.items():
            raw_filter = getattr(self, field_name)
            if raw_filter is None:
                continue
            if not isinstance(raw_filter, str):
                # Subclasses may declare non-string field types (e.g. bool);
                # go through the same textual parsing instead of failing on .split.
                raw_filter = str(raw_filter)

            field_filters = filters_by_field.setdefault(field_name, {})
            annotation = get_main_annotation(field_info.annotation)

            for segment in raw_filter.split("||"):
                if ":" in segment:
                    operator, raw_operand = segment.split(":", maxsplit=1)
                else:
                    operator, raw_operand = "eq", segment

                try:
                    if operator in list_operators:
                        # Parse list
                        operand = [annotation(item) for item in raw_operand.lstrip("[").rstrip("]").split(",")]
                    elif re.match(r"\{.*\}", raw_operand):
                        operand = json.loads(raw_operand)
                    else:
                        operand = annotation(raw_operand)
                        if field_name == "timestamp":
                            operand = datetime.datetime.fromtimestamp(operand)
                except (ValueError, OverflowError) as e:
                    # A malformed value is a client error, not a server failure.
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid value for {field_name}, got {raw_operand}",
                    ) from e

                try:
                    mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=operand)
                except ValueError as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Query Operator unsupported, got {operator}",
                    ) from e

                # when it's a regex add to its content instead of replacing it
                if mongo_operator == "$regex":
                    if field_filters.setdefault(mongo_operator, mongo_value) != mongo_value:
                        field_filters[mongo_operator] += mongo_value
                else:
                    field_filters[mongo_operator] = mongo_value

        filters_list = [{name: filters} for name, filters in filters_by_field.items()]
        if not filters_list:
            return {}
        if len(filters_list) == 1:
            return filters_list[0]
        return {"$and": filters_list}

117 

118 

def create_query_model(model):
    """Create a model named ``<Model>Query`` mirroring the fields of *model*.

    Every field of *model* is re-declared as ``str | None`` with a default of
    ``None``, so each one can carry an optional textual filter.
    """
    optional_text_fields = {name: (str | None, None) for name in model.model_fields}
    return create_model(f"{model.__name__}Query", **optional_text_fields)

127 

128 

# Query model for Signal: one optional string filter per Signal field
# (from create_query_model) plus the pagination/sorting fields of Query.
class SignalQuery(create_query_model(Signal), Query):
    _base_class: ClassVar[type] = Signal  # model whose fields mongodb_filter() iterates over
    sort_by: str | None = "signal_id"  # replaces the Query default of "id"
    post_processing: bool | str | None = None
    broadcastable: str | None = "true"  # defaults to "true" rather than None

134 

135 

# Query model for ForcedSignal: one optional string filter per ForcedSignal
# field plus the pagination/sorting fields of Query.
class ForcedSignalQuery(create_query_model(ForcedSignal), Query):
    _base_class: ClassVar[type] = ForcedSignal  # model whose fields mongodb_filter() iterates over
    sort_by: str | None = "signal_id"  # replaces the Query default of "id"

139 

140 

# Query model for DeviceState: one optional string filter per DeviceState
# field plus the pagination/sorting fields of Query.
class DeviceStatesQuery(create_query_model(DeviceState), Query):
    _base_class: ClassVar[type] = DeviceState  # model whose fields mongodb_filter() iterates over
    sort_by: str | None = "timestamp:-1"  # replaces the Query default of "id"
    # NOTE(review): declared without a default, unlike the other query fields;
    # pydantic v2 treats such a field as required — confirm this is intended.
    modified_properties: str | None

145 

146 

# Query model for Event: one optional string filter per Event field plus the
# pagination/sorting fields of Query.
class EventQuery(create_query_model(Event), Query):
    _base_class: ClassVar[type] = Event  # model whose fields mongodb_filter() iterates over
    sort_by: str | None = "timestamp"  # replaces the Query default of "id"

150 

151 

# Query model for EventRule: one optional string filter per EventRule field
# plus the pagination/sorting fields of Query.
class EventRuleQuery(create_query_model(EventRule), Query):
    _base_class: ClassVar[type] = EventRule  # model whose fields mongodb_filter() iterates over
    sort_by: str | None = "name"  # replaces the Query default of "id"

155 

156 

# Query model for Command: one optional string filter per Command field plus
# the pagination/sorting fields of Query.
class CommandQuery(create_query_model(Command), Query):
    _base_class: ClassVar[type] = Command  # model whose fields mongodb_filter() iterates over
    sort_by: str | None = "timestamp:-1"  # replaces the Query default of "id"

160 

161 

# Query model for PublicGraphTheme: one optional string filter per
# PublicGraphTheme field plus the pagination/sorting fields of Query
# (sort_by keeps the Query default).
class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
    _base_class: ClassVar[type] = PublicGraphTheme  # model whose fields mongodb_filter() iterates over

164 

165 

# Query model for Configuration: one optional string filter per Configuration
# field plus the pagination/sorting fields of Query.
class ConfigurationQuery(create_query_model(Configuration), Query):
    _base_class: ClassVar[type] = Configuration  # model whose fields mongodb_filter() iterates over
    sort_by: str | None = "received_at:-1"  # replaces the Query default of "id"

169 

170 

# Query model for EtherCatTopology: one optional string filter per
# EtherCatTopology field plus the pagination/sorting fields of Query
# (sort_by keeps the Query default).
class EtherCatTopologyQuery(create_query_model(EtherCatTopology), Query):
    _base_class: ClassVar[type] = EtherCatTopology  # model whose fields mongodb_filter() iterates over