Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / messages.py: 86%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-27 14:22 +0000

1import time 

2import os 

3import logging 

4import uuid 

5 

6import json 

7import pika 

8import pika.exceptions 

9 

10from pika.adapters.blocking_connection import BlockingChannel 

11 

12logger = logging.getLogger("uvicorn.error") 

13 

14RABBITMQ_HOST = os.environ.get( 

15 "RABBITMQ_HOST", "cloud-broker" 

16) # Not a parameter because bundled with other backend services 

17RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME") 

18RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD") 

19 

20RESPONSE_TIMEOUT = 5 

21 

22 

23class RabbitMQClient: 

24 connection: pika.BlockingConnection 

25 channel: BlockingChannel 

26 correlation_id: str = None 

27 response: dict = None 

28 

29 def __init__(self): 

30 self.channel = self.get_channel() 

31 

32 def get_channel(self): 

33 parameters = pika.URLParameters(f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F") 

34 rmq_connected = False 

35 while not rmq_connected: 

36 logger.info("Connecting to broker...") 

37 try: 

38 self.connection = pika.BlockingConnection(parameters) 

39 rmq_connected = True 

40 logger.info("Connected to broker") 

41 

42 except pika.exceptions.AMQPConnectionError: 

43 print("Waiting 5 seconds for RabbitMQ to be up...") 

44 time.sleep(5.0) 

45 

46 channel = self.connection.channel() 

47 return channel 

48 

49 async def send_mode_change(self, device_id: str, mode_id: int): 

50 payload = {"device_id": device_id, "mode_id": mode_id} 

51 

52 return await self.send_message_wait_for_response( 

53 payload, f"commands.{device_id}", f"commands_callback.{device_id}" 

54 ) 

55 

56 async def send_signal_value(self, signal_id: str, update) -> dict: 

57 """Send commands or forced value.""" 

58 payload = {"signal_id": signal_id} 

59 if update.timestamp is not None: 

60 payload["timestamp"] = update.timestamp 

61 if update.value is not None: 

62 payload["value"] = update.value 

63 payload["forced_value"] = update.forced_value 

64 if update.forced_value is None: 

65 del payload["forced_value"] 

66 

67 device_id = signal_id.split(".")[0] 

68 

69 return await self.send_message_wait_for_response( 

70 payload, f"commands.{device_id}", f"commands_callback.{device_id}" 

71 ) 

72 

73 async def send_message_wait_for_response(self, payload, queue_name: str, callback_queue_name: str) -> dict: 

74 self.correlation_id = str(uuid.uuid4()) 

75 

76 payload["correlation_id"] = self.correlation_id 

77 payload["reply_to"] = callback_queue_name 

78 

79 self.channel.queue_declare( 

80 queue=callback_queue_name, durable=True, exclusive=False, arguments={"x-message-ttl": 10000} 

81 ) 

82 self.channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name) 

83 

84 self.channel.basic_publish( 

85 "amq.topic", 

86 queue_name, 

87 json.dumps(payload), 

88 pika.BasicProperties( 

89 content_type="text/plain", 

90 delivery_mode=pika.DeliveryMode.Persistent, 

91 reply_to=callback_queue_name, 

92 correlation_id=self.correlation_id, 

93 ), 

94 ) 

95 

96 consumer_tag = self.channel.basic_consume( 

97 callback_queue_name, self.on_callback_message, auto_ack=False, exclusive=False 

98 ) 

99 # blocks processes for a certain time to ensure consumption is done 

100 start_time = time.time() 

101 while self.response is None and time.time() < start_time + RESPONSE_TIMEOUT: 

102 self.connection.process_data_events(time_limit=max(0, time.time() - start_time + RESPONSE_TIMEOUT)) 

103 

104 self.channel.stop_consuming(consumer_tag) 

105 if self.response is None: 

106 self.response = { 

107 "error": True, 

108 "status_code": 504, 

109 "message": f"Device timed out after {RESPONSE_TIMEOUT} seconds", 

110 } 

111 

112 return self.response 

113 

114 def on_callback_message(self, channel: BlockingChannel, method_frame, properties, body): 

115 if not body: 

116 return 

117 

118 response = json.loads(body) 

119 # Made it so the device can put the correlation_id either in the body or in the properties 

120 received_correlation_id = response.get("correlation_id", None) 

121 if received_correlation_id is None and properties.correlation_id is not None: 

122 received_correlation_id = properties.correlation_id 

123 

124 if self.correlation_id == received_correlation_id: 

125 # Clean up response as it is returned afterwards 

126 if "correlation_id" in response: 

127 del response["correlation_id"] 

128 self.response = response 

129 channel.basic_ack(method_frame.delivery_tag) 

130 return 

131 

132 requeue = True 

133 if received_correlation_id is None: 

134 requeue = False 

135 channel.basic_nack(method_frame.delivery_tag, requeue=requeue)