Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / messages.py: 87%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-03 16:35 +0000

1import time 

2import os 

3import logging 

4import uuid 

5import ssl 

6 

7import json 

8import pika 

9import pika.exceptions 

10 

11from pika.credentials import ExternalCredentials 

12from pika.adapters.blocking_connection import BlockingChannel 

13 

14logger = logging.getLogger("uvicorn.error") 

15 

16RABBITMQ_HOST = os.environ.get( 

17 "RABBITMQ_HOST", "cloud-broker" 

18) # Not a parameter because bundled with other backend services 

19AMQP_SSL_PORT = os.environ.get("AMQP_SSL_PORT") 

20RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME") 

21RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD") 

22CERT_DIRECTORY = os.environ.get("CERT_DIRECTORY") 

23CERT_PASSWORD = os.environ.get("CERT_PASSWORD") 

24 

25RESPONSE_TIMEOUT = 5 

26 

27 

28class RabbitMQClient: 

29 connection: pika.BlockingConnection 

30 channel: BlockingChannel 

31 correlation_id: str = None 

32 response: dict = None 

33 

34 def __init__(self): 

35 self.channel = self.get_channel() 

36 

37 def get_channel(self): 

38 # AMQP connection 

39 context = ssl.create_default_context(cafile=os.path.join(CERT_DIRECTORY, "ca_certificate.pem")) 

40 context.verify_mode = ssl.CERT_REQUIRED 

41 context.load_cert_chain( 

42 certfile=os.path.join(CERT_DIRECTORY, "client_certificate.pem"), 

43 keyfile=os.path.join(CERT_DIRECTORY, "client_key.pem"), 

44 password=CERT_PASSWORD, 

45 ) 

46 ssl_options = pika.SSLOptions(context, RABBITMQ_USERNAME) 

47 parameters = pika.ConnectionParameters( 

48 host=RABBITMQ_HOST, 

49 port=AMQP_SSL_PORT, 

50 ssl_options=ssl_options, 

51 credentials=ExternalCredentials(), 

52 ) 

53 rmq_connected = False 

54 while not rmq_connected: 

55 logger.info("Connecting to broker...") 

56 try: 

57 self.connection = pika.BlockingConnection(parameters) 

58 rmq_connected = True 

59 

60 except pika.exceptions.AMQPConnectionError: 

61 print("Waiting 5 seconds for RabbitMQ to be up...") 

62 time.sleep(5.0) 

63 

64 channel = self.connection.channel() 

65 return channel 

66 

67 async def send_mode_change(self, device_id: str, mode_id: int): 

68 payload = {"device_id": device_id, "mode_id": mode_id} 

69 

70 return await self.send_message_wait_for_response( 

71 payload, f"commands.{device_id}", f"commands_callback.{device_id}" 

72 ) 

73 

74 async def send_signal_value(self, signal_id: str, update) -> dict: 

75 """Send commands or forced value.""" 

76 payload = {"signal_id": signal_id} 

77 if update.timestamp is not None: 

78 payload["timestamp"] = update.timestamp 

79 if update.value is not None: 

80 payload["value"] = update.value 

81 payload["forced_value"] = update.forced_value 

82 if update.forced_value is None: 

83 del payload["forced_value"] 

84 

85 device_id = signal_id.split(".")[0] 

86 

87 return await self.send_message_wait_for_response( 

88 payload, f"commands.{device_id}", f"commands_callback.{device_id}" 

89 ) 

90 

91 async def send_message_wait_for_response(self, payload, queue_name: str, callback_queue_name: str) -> dict: 

92 self.correlation_id = str(uuid.uuid4()) 

93 

94 payload["correlation_id"] = self.correlation_id 

95 payload["reply_to"] = callback_queue_name 

96 

97 self.channel.queue_declare( 

98 queue=callback_queue_name, durable=True, exclusive=False, arguments={"x-message-ttl": 10000} 

99 ) 

100 self.channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name) 

101 

102 self.channel.basic_publish( 

103 "amq.topic", 

104 queue_name, 

105 json.dumps(payload), 

106 pika.BasicProperties( 

107 content_type="text/plain", 

108 delivery_mode=pika.DeliveryMode.Persistent, 

109 reply_to=callback_queue_name, 

110 correlation_id=self.correlation_id, 

111 ), 

112 ) 

113 

114 consumer_tag = self.channel.basic_consume( 

115 callback_queue_name, self.on_callback_message, auto_ack=False, exclusive=False 

116 ) 

117 # blocks processes for a certain time to ensure consumption is done 

118 start_time = time.time() 

119 while self.response is None and time.time() < start_time + RESPONSE_TIMEOUT: 

120 self.connection.process_data_events(time_limit=max(0, time.time() - start_time + RESPONSE_TIMEOUT)) 

121 

122 self.channel.stop_consuming(consumer_tag) 

123 if self.response is None: 

124 self.response = { 

125 "error": True, 

126 "status_code": 504, 

127 "message": f"Device timed out after {RESPONSE_TIMEOUT} seconds", 

128 } 

129 

130 return self.response 

131 

132 def on_callback_message(self, channel: BlockingChannel, method_frame, properties, body): 

133 if not body: 

134 return 

135 

136 response = json.loads(body) 

137 # Made it so the device can put the correlation_id either in the body or in the properties 

138 received_correlation_id = response.get("correlation_id", None) 

139 if received_correlation_id is None and properties.correlation_id is not None: 

140 received_correlation_id = properties.correlation_id 

141 

142 if self.correlation_id == received_correlation_id: 

143 # Clean up response as it is returned afterwards 

144 if "correlation_id" in response: 

145 del response["correlation_id"] 

146 self.response = response 

147 channel.basic_ack(method_frame.delivery_tag) 

148 return 

149 

150 requeue = True 

151 if received_correlation_id is None: 

152 requeue = False 

153 channel.basic_nack(method_frame.delivery_tag, requeue=requeue)