Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / messages.py: 87%
86 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 16:35 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 16:35 +0000
1import time
2import os
3import logging
4import uuid
5import ssl
7import json
8import pika
9import pika.exceptions
11from pika.credentials import ExternalCredentials
12from pika.adapters.blocking_connection import BlockingChannel
14logger = logging.getLogger("uvicorn.error")
16RABBITMQ_HOST = os.environ.get(
17 "RABBITMQ_HOST", "cloud-broker"
18) # Not a parameter because bundled with other backend services
19AMQP_SSL_PORT = os.environ.get("AMQP_SSL_PORT")
20RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME")
21RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD")
22CERT_DIRECTORY = os.environ.get("CERT_DIRECTORY")
23CERT_PASSWORD = os.environ.get("CERT_PASSWORD")
25RESPONSE_TIMEOUT = 5
28class RabbitMQClient:
29 connection: pika.BlockingConnection
30 channel: BlockingChannel
31 correlation_id: str = None
32 response: dict = None
34 def __init__(self):
35 self.channel = self.get_channel()
37 def get_channel(self):
38 # AMQP connection
39 context = ssl.create_default_context(cafile=os.path.join(CERT_DIRECTORY, "ca_certificate.pem"))
40 context.verify_mode = ssl.CERT_REQUIRED
41 context.load_cert_chain(
42 certfile=os.path.join(CERT_DIRECTORY, "client_certificate.pem"),
43 keyfile=os.path.join(CERT_DIRECTORY, "client_key.pem"),
44 password=CERT_PASSWORD,
45 )
46 ssl_options = pika.SSLOptions(context, RABBITMQ_USERNAME)
47 parameters = pika.ConnectionParameters(
48 host=RABBITMQ_HOST,
49 port=AMQP_SSL_PORT,
50 ssl_options=ssl_options,
51 credentials=ExternalCredentials(),
52 )
53 rmq_connected = False
54 while not rmq_connected:
55 logger.info("Connecting to broker...")
56 try:
57 self.connection = pika.BlockingConnection(parameters)
58 rmq_connected = True
60 except pika.exceptions.AMQPConnectionError:
61 print("Waiting 5 seconds for RabbitMQ to be up...")
62 time.sleep(5.0)
64 channel = self.connection.channel()
65 return channel
67 async def send_mode_change(self, device_id: str, mode_id: int):
68 payload = {"device_id": device_id, "mode_id": mode_id}
70 return await self.send_message_wait_for_response(
71 payload, f"commands.{device_id}", f"commands_callback.{device_id}"
72 )
74 async def send_signal_value(self, signal_id: str, update) -> dict:
75 """Send commands or forced value."""
76 payload = {"signal_id": signal_id}
77 if update.timestamp is not None:
78 payload["timestamp"] = update.timestamp
79 if update.value is not None:
80 payload["value"] = update.value
81 payload["forced_value"] = update.forced_value
82 if update.forced_value is None:
83 del payload["forced_value"]
85 device_id = signal_id.split(".")[0]
87 return await self.send_message_wait_for_response(
88 payload, f"commands.{device_id}", f"commands_callback.{device_id}"
89 )
91 async def send_message_wait_for_response(self, payload, queue_name: str, callback_queue_name: str) -> dict:
92 self.correlation_id = str(uuid.uuid4())
94 payload["correlation_id"] = self.correlation_id
95 payload["reply_to"] = callback_queue_name
97 self.channel.queue_declare(
98 queue=callback_queue_name, durable=True, exclusive=False, arguments={"x-message-ttl": 10000}
99 )
100 self.channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
102 self.channel.basic_publish(
103 "amq.topic",
104 queue_name,
105 json.dumps(payload),
106 pika.BasicProperties(
107 content_type="text/plain",
108 delivery_mode=pika.DeliveryMode.Persistent,
109 reply_to=callback_queue_name,
110 correlation_id=self.correlation_id,
111 ),
112 )
114 consumer_tag = self.channel.basic_consume(
115 callback_queue_name, self.on_callback_message, auto_ack=False, exclusive=False
116 )
117 # blocks processes for a certain time to ensure consumption is done
118 start_time = time.time()
119 while self.response is None and time.time() < start_time + RESPONSE_TIMEOUT:
120 self.connection.process_data_events(time_limit=max(0, time.time() - start_time + RESPONSE_TIMEOUT))
122 self.channel.stop_consuming(consumer_tag)
123 if self.response is None:
124 self.response = {
125 "error": True,
126 "status_code": 504,
127 "message": f"Device timed out after {RESPONSE_TIMEOUT} seconds",
128 }
130 return self.response
132 def on_callback_message(self, channel: BlockingChannel, method_frame, properties, body):
133 if not body:
134 return
136 response = json.loads(body)
137 # Made it so the device can put the correlation_id either in the body or in the properties
138 received_correlation_id = response.get("correlation_id", None)
139 if received_correlation_id is None and properties.correlation_id is not None:
140 received_correlation_id = properties.correlation_id
142 if self.correlation_id == received_correlation_id:
143 # Clean up response as it is returned afterwards
144 if "correlation_id" in response:
145 del response["correlation_id"]
146 self.response = response
147 channel.basic_ack(method_frame.delivery_tag)
148 return
150 requeue = True
151 if received_correlation_id is None:
152 requeue = False
153 channel.basic_nack(method_frame.delivery_tag, requeue=requeue)