Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/queries.py: 99%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-02 07:30 +0000

import datetime
import re
from typing import get_args, ClassVar, Union

from fastapi import HTTPException
from pydantic import BaseModel, create_model

from twinpad_backend.models import Signal, Event, EventRule, DeviceState, Command, PublicGraphTheme

8 

9 

# REST comparison operator name -> MongoDB operator ("gt" -> "$gt", ...).
REST_API_OPERATORS_TO_MONGO = {op: f"${op}" for op in ("gt", "gte", "lt", "lte", "eq", "in")}


def api_operators_to_mongo(operator, value):
    """Translate a REST filter operator and its operand into a MongoDB pair.

    Args:
        operator: A key of ``REST_API_OPERATORS_TO_MONGO``, or ``"startswith"``
            / ``"contains"``.
        value: The operand. For the two text operators it is rendered with
            ``str()`` and matched literally.

    Returns:
        A ``(mongo_operator, mongo_value)`` tuple, e.g. ``("$gt", 5)`` or
        ``("$regex", "^abc")``.

    Raises:
        ValueError: If ``operator`` is not supported.
    """
    mongo_operator = REST_API_OPERATORS_TO_MONGO.get(operator)
    if mongo_operator is not None:
        return mongo_operator, value

    if operator in ("startswith", "contains"):
        # The operand is request text: escape it so regex metacharacters are
        # matched literally instead of being interpreted as a pattern
        # (otherwise "a.b" matches "axb" and "(" yields an invalid regex).
        literal = re.escape(str(value))
        if operator == "startswith":
            return "$regex", f"^{literal}"
        return "$regex", f".*{literal}.*"

    raise ValueError("Operator unsupported")

24 

25 

26def get_main_annotation(annotation): 

27 for arg in get_args(annotation): 

28 if arg is not None: 

29 if arg is bool: 

30 return strtobool 

31 return arg 

32 if annotation is bool: 

33 return strtobool 

34 return annotation 

35 

36 

def strtobool(value: str):
    """Interpret a textual truth value as a bool.

    Surrounding whitespace and letter case are ignored. 'y', 'yes', 't',
    'true', 'on' and '1' map to True; 'n', 'no', 'f', 'false', 'off' and '0'
    map to False.

    Adapted from the deprecated stdlib ``distutils`` helper, see
    https://stackoverflow.com/a/18472142

    Raises:
        HTTPException: With status 400 when the text is none of the above.
    """
    normalized = value.strip().lower()
    if normalized in {"y", "yes", "t", "true", "on", "1"}:
        return True
    if normalized in {"n", "no", "f", "false", "off", "0"}:
        return False
    raise HTTPException(status_code=400, detail=f"Invalid truth value: {normalized}")

52 

53 

# Base query model: pagination (limit/offset), sorting (sort_by) and per-field
# filters. mongodb_filter() reads ``self._base_class``, which this class does
# not define itself; the concrete query classes in this module set it to the
# pydantic model being queried and carry one optional string attribute per
# field of that model.
#
# Filter expression syntax, as parsed by mongodb_filter():
#   "<value>"              equality on the field
#   "<operator>:<value>"   e.g. "gt:5" or "startswith:abc"
#   "in:[a,b,c]"           membership in a comma-separated list
#   "<expr>|<expr>"        several constraints on the same field
class Query(BaseModel):
    limit: int | None = 20
    offset: int | None = 0
    sort_by: str | None = "id"

    @staticmethod
    def _convert_value(field_name, converter, raw_value):
        """Convert one raw filter operand to the type of ``field_name``.

        Values of the ``timestamp`` field are additionally turned into a
        ``datetime`` with ``datetime.datetime.fromtimestamp``.

        Raises:
            HTTPException: With status 400 when the operand cannot be converted.
        """
        try:
            converted = converter(raw_value)
            if field_name == "timestamp":
                converted = datetime.datetime.fromtimestamp(converted)
        except (ValueError, OverflowError, OSError) as e:
            # A malformed operand is a client error, not a server failure.
            raise HTTPException(
                status_code=400,
                detail=f"Invalid value for {field_name}, got {raw_value}",
            ) from e
        return converted

    def mongodb_filter(self):
        """Build the MongoDB filter document for the populated field filters.

        Returns:
            ``{}`` when no field filter is set, ``{field: {...}}`` when exactly
            one field is filtered, otherwise ``{"$and": [...]}`` with one entry
            per filtered field.

        Raises:
            HTTPException: With status 400 when an operator is unsupported or
                an operand cannot be converted to the field's type.
        """
        filters_by_field = {}
        for field_name, field_info in self._base_class.model_fields.items():
            expression = getattr(self, field_name)
            if expression is None:
                continue

            converter = get_main_annotation(field_info.annotation)
            field_filters = filters_by_field.setdefault(field_name, {})
            for segment in expression.split("|"):
                # Split on the first ":" only, so an operand may itself contain
                # colons (e.g. "eq:12:30") without breaking the unpacking.
                operator, separator, raw_value = segment.partition(":")
                if not separator:
                    operator, raw_value = "eq", segment

                if operator == "in":
                    # "in" takes a list operand written as "[a,b,c]".
                    items = raw_value.lstrip("[").rstrip("]").split(",")
                    value = [self._convert_value(field_name, converter, item) for item in items]
                else:
                    value = self._convert_value(field_name, converter, raw_value)

                try:
                    mongo_operator, mongo_value = api_operators_to_mongo(operator=operator, value=value)
                except ValueError as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Query Operator unsupported, got {operator}",
                    ) from e

                if mongo_operator == "$regex":
                    # A field holds a single $regex entry: further regex
                    # segments are appended to the pattern instead of
                    # replacing it.
                    if field_filters.setdefault(mongo_operator, mongo_value) != mongo_value:
                        field_filters[mongo_operator] += mongo_value
                else:
                    field_filters[mongo_operator] = mongo_value

        filters_list = [{name: field_filters} for name, field_filters in filters_by_field.items()]
        if not filters_list:
            return {}
        if len(filters_list) == 1:
            return filters_list[0]
        return {"$and": filters_list}

104 

105 

def create_query_model(model):
    """Derive a "<ModelName>Query" pydantic model from ``model``.

    The generated model has one attribute per field of ``model``; each is an
    optional string defaulting to ``None``.
    """
    optional_string_fields = {name: (Union[str, None], None) for name in model.model_fields}
    return create_model(model.__name__ + "Query", **optional_string_fields)

114 

115 

# Query model for Signal: one optional string filter per Signal field, plus the
# limit/offset/sort_by attributes of Query. sort_by defaults to "signal_id".
class SignalQuery(create_query_model(Signal), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Signal
    sort_by: str | None = "signal_id"

119 

120 

# Query model for DeviceState: one optional string filter per DeviceState
# field, plus the limit/offset/sort_by attributes of Query.
class DeviceStatesQuery(create_query_model(DeviceState), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = DeviceState
    # NOTE(review): ":-1" presumably requests descending order - confirm
    # against the code that consumes sort_by.
    sort_by: str | None = "timestamp:-1"
    # Explicit None default: in pydantic v2 an annotation of ``str | None``
    # without a default declares a *required* field, unlike every other filter
    # attribute of the query models.
    modified_properties: str | None = None

125 

126 

# Query model for Event: one optional string filter per Event field, plus the
# limit/offset/sort_by attributes of Query. sort_by defaults to "timestamp".
class EventQuery(create_query_model(Event), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Event
    sort_by: str | None = "timestamp"

130 

131 

# Query model for EventRule: one optional string filter per EventRule field,
# plus the limit/offset/sort_by attributes of Query. sort_by defaults to "name".
class EventRuleQuery(create_query_model(EventRule), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = EventRule
    sort_by: str | None = "name"

135 

136 

# Query model for Command: one optional string filter per Command field, plus
# the limit/offset/sort_by attributes of Query.
class CommandQuery(create_query_model(Command), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = Command
    # NOTE(review): ":-1" presumably requests descending order - confirm
    # against the code that consumes sort_by.
    sort_by: str | None = "timestamp:-1"

140 

141 

# Query model for PublicGraphTheme: one optional string filter per
# PublicGraphTheme field, plus the limit/offset/sort_by attributes of Query
# (sort_by keeps the Query default).
class GraphThemeQuery(create_query_model(PublicGraphTheme), Query):
    # Model whose fields Query.mongodb_filter() iterates over.
    _base_class: ClassVar[type] = PublicGraphTheme