Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/messages.py: 100%

78 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-09 13:43 +0000

1import logging 

2import os 

3import socket 

4import time 

5import uuid 

6 

7import json 

8import pika 

9import pika.exceptions 

10 

11from pika.adapters.blocking_connection import BlockingChannel 

12 

# All log records from this module go through the logger named "uvicorn.error".
logger = logging.getLogger("uvicorn.error")

# Broker host name. Not a parameter because bundled with other backend services.
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "cloud-broker")

# Broker credentials; each one is None when its environment variable is unset.
RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME")
RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD")

# Seconds to wait for a device reply before reporting a timeout.
RESPONSE_TIMEOUT = 5

22 

23 

class RabbitMQClient:
    """Request/response client for device commands sent through RabbitMQ.

    A request is published as JSON on the ``amq.topic`` exchange and the
    reply is awaited on a callback queue. Replies are paired with requests
    through a correlation id generated per request.

    NOTE(review): the ``async`` methods drive a pika ``BlockingConnection``
    and never await, so they block the calling event loop for up to
    ``RESPONSE_TIMEOUT`` seconds while waiting for a reply.
    """

    connection: pika.BlockingConnection
    # None when the broker could not be reached at construction time.
    channel: BlockingChannel | None
    # Correlation id of the request currently (or most recently) in flight.
    correlation_id: str | None = None
    # Reply (or error dict) of the most recent request.
    response: dict | None = None

    def __init__(self, broker_host: str = RABBITMQ_HOST):
        """Connect to ``broker_host`` and keep the resulting channel (or None)."""
        self.channel = self.get_channel(broker_host)

    def get_channel(self, broker_host: str = RABBITMQ_HOST) -> BlockingChannel | None:
        """Open a blocking connection to the broker and return a channel on it.

        Returns:
            A new channel, or None when the connection attempt fails with an
            AMQP connection error or a host-name resolution error.
        """
        # NOTE(review): the credentials are interpolated verbatim; values
        # containing URL-reserved characters presumably need percent-encoding.
        parameters = pika.URLParameters(
            f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{broker_host}:5672/%2F"
        )
        logger.info("Connecting to broker...")
        try:
            self.connection = pika.BlockingConnection(parameters)
        except (pika.exceptions.AMQPConnectionError, socket.gaierror):
            logger.info("Failed to connect to broker")
            return None

        logger.info("Connected to broker")
        return self.connection.channel()

    async def send_mode_change(self, device_id: str, mode_id: int):
        """Ask device ``device_id`` to switch to mode ``mode_id`` and return its reply."""
        payload = {"device_id": device_id, "mode_id": mode_id}

        return await self.send_message_wait_for_response(
            payload, f"commands.{device_id}", f"commands_callback.{device_id}"
        )

    async def send_signal_value(self, signal_id: str, update) -> dict:
        """Send commands or forced value.

        ``update`` must expose ``timestamp``, ``value`` and ``forced_value``
        attributes; only those that are not None are included in the message.
        The target device id is the part of ``signal_id`` before the first dot.
        """
        payload = {"signal_id": signal_id}
        for field in ("timestamp", "value", "forced_value"):
            field_value = getattr(update, field)
            if field_value is not None:
                payload[field] = field_value

        device_id = signal_id.split(".")[0]

        return await self.send_message_wait_for_response(
            payload, f"commands.{device_id}", f"commands_callback.{device_id}"
        )

    async def send_message_wait_for_response(self, payload, queue_name: str, callback_queue_name: str) -> dict:
        """Publish ``payload`` and wait up to ``RESPONSE_TIMEOUT`` seconds for the reply.

        Args:
            payload: JSON-serialisable mapping; it is copied, not modified.
            queue_name: Routing key the request is published with.
            callback_queue_name: Queue (and routing key) the reply is expected on.

        Returns:
            The reply body without its ``correlation_id`` key, or an error
            dict with ``error``, ``status_code`` and ``message`` keys: 502
            when there is no broker channel, 504 when no reply arrived in time.
        """
        if self.channel is None:
            return {"error": True, "status_code": 502, "message": "Could not connect to the message broker"}

        # Start every request from a clean slate; otherwise the reply (or the
        # timeout error) of a previous request would be returned immediately.
        self.correlation_id = str(uuid.uuid4())
        self.response = None

        # Work on a copy so the caller's dict is left untouched.
        message = {**payload, "correlation_id": self.correlation_id, "reply_to": callback_queue_name}

        self.channel.queue_declare(
            queue=callback_queue_name, durable=True, exclusive=False, arguments={"x-message-ttl": 10000}
        )
        self.channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)

        self.channel.basic_publish(
            "amq.topic",
            queue_name,
            json.dumps(message),
            pika.BasicProperties(
                content_type="text/plain",
                delivery_mode=pika.DeliveryMode.Persistent,
                reply_to=callback_queue_name,
                correlation_id=self.correlation_id,
            ),
        )

        consumer_tag = self.channel.basic_consume(
            callback_queue_name, self.on_callback_message, auto_ack=False, exclusive=False
        )
        # Pump broker events until the callback stores a reply or the deadline
        # passes. A monotonic clock keeps wall-clock jumps from affecting the wait.
        deadline = time.monotonic() + RESPONSE_TIMEOUT
        try:
            while self.response is None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                self.connection.process_data_events(time_limit=remaining)
        finally:
            # Cancel the consumer even if event processing raised, but do not
            # mask that error by cancelling on an already closed channel.
            if self.channel.is_open:
                self.channel.stop_consuming(consumer_tag)

        if self.response is None:
            self.response = {
                "error": True,
                "status_code": 504,
                "message": f"Device timed out after {RESPONSE_TIMEOUT} seconds",
            }

        return self.response

    def on_callback_message(self, channel: BlockingChannel, method_frame, properties, body):
        """Handle one delivery from the callback queue.

        A reply whose correlation id matches the pending request is stored in
        ``self.response`` and acked. A reply carrying a different correlation
        id is nacked with requeue. Deliveries that cannot be identified (empty
        body, invalid JSON, JSON that is not an object, no correlation id) are
        nacked without requeue.
        """
        response = None
        if body:
            try:
                response = json.loads(body)
            except ValueError:
                # Covers JSONDecodeError and undecodable bytes.
                response = None

        if not isinstance(response, dict):
            channel.basic_nack(method_frame.delivery_tag, requeue=False)
            return

        # Made it so the device can put the correlation_id either in the body or in the properties
        received_correlation_id = response.get("correlation_id")
        if received_correlation_id is None:
            received_correlation_id = properties.correlation_id

        if self.correlation_id == received_correlation_id:
            # Clean up response as it is returned afterwards
            response.pop("correlation_id", None)
            self.response = response
            channel.basic_ack(method_frame.delivery_tag)
            return

        # Requeue replies that carry some other correlation id; drop the ones
        # that carry none.
        channel.basic_nack(method_frame.delivery_tag, requeue=received_correlation_id is not None)

132 if received_correlation_id is None: 

133 requeue = False 

134 channel.basic_nack(method_frame.delivery_tag, requeue=requeue)