Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / messages.py: 86%
77 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-16 15:19 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-16 15:19 +0000
1import time
2import os
3import logging
4import uuid
6import json
7import pika
8import pika.exceptions
10from pika.adapters.blocking_connection import BlockingChannel
12logger = logging.getLogger("uvicorn.error")
14RABBITMQ_HOST = os.environ.get(
15 "RABBITMQ_HOST", "cloud-broker"
16) # Not a parameter because bundled with other backend services
17RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME")
18RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD")
20RESPONSE_TIMEOUT = 5
23class RabbitMQClient:
24 connection: pika.BlockingConnection
25 channel: BlockingChannel
26 correlation_id: str = None
27 response: dict = None
29 def __init__(self):
30 self.channel = self.get_channel()
32 def get_channel(self):
33 parameters = pika.URLParameters(f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F")
34 rmq_connected = False
35 while not rmq_connected:
36 logger.info("Connecting to broker...")
37 try:
38 self.connection = pika.BlockingConnection(parameters)
39 rmq_connected = True
40 logger.info("Connected to broker")
42 except pika.exceptions.AMQPConnectionError:
43 print("Waiting 5 seconds for RabbitMQ to be up...")
44 time.sleep(5.0)
46 channel = self.connection.channel()
47 return channel
49 async def send_mode_change(self, device_id: str, mode_id: int):
50 payload = {"device_id": device_id, "mode_id": mode_id}
52 return await self.send_message_wait_for_response(
53 payload, f"commands.{device_id}", f"commands_callback.{device_id}"
54 )
56 async def send_signal_value(self, signal_id: str, update) -> dict:
57 """Send commands or forced value."""
58 payload = {"signal_id": signal_id}
59 if update.timestamp is not None:
60 payload["timestamp"] = update.timestamp
61 if update.value is not None:
62 payload["value"] = update.value
63 payload["forced_value"] = update.forced_value
64 if update.forced_value is None:
65 del payload["forced_value"]
67 device_id = signal_id.split(".")[0]
69 return await self.send_message_wait_for_response(
70 payload, f"commands.{device_id}", f"commands_callback.{device_id}"
71 )
73 async def send_message_wait_for_response(self, payload, queue_name: str, callback_queue_name: str) -> dict:
74 self.correlation_id = str(uuid.uuid4())
76 payload["correlation_id"] = self.correlation_id
77 payload["reply_to"] = callback_queue_name
79 self.channel.queue_declare(queue=callback_queue_name, durable=True, exclusive=False)
80 self.channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
82 self.channel.basic_publish(
83 "amq.topic",
84 queue_name,
85 json.dumps(payload),
86 pika.BasicProperties(
87 content_type="text/plain",
88 delivery_mode=pika.DeliveryMode.Persistent,
89 reply_to=callback_queue_name,
90 correlation_id=self.correlation_id,
91 ),
92 )
94 self.channel.basic_consume(callback_queue_name, self.on_callback_message, auto_ack=False, exclusive=False)
95 # blocks processes for a certain time to ensure consumption is done
96 while self.response is None:
97 self.connection.process_data_events(time_limit=RESPONSE_TIMEOUT)
99 if self.response is None:
100 self.response = {
101 "error": True,
102 "status_code": 504,
103 "message": f"Device timed out after {RESPONSE_TIMEOUT} seconds",
104 }
106 return self.response
108 def on_callback_message(self, channel: BlockingChannel, method_frame, properties, body):
109 if not body:
110 return
112 response = json.loads(body)
113 # Made it so the device can put the correlation_id either in the body or in the properties
114 received_correlation_id = response.get("correlation_id", None)
115 if received_correlation_id is None and properties.correlation_id is not None:
116 received_correlation_id = properties.correlation_id
118 if self.correlation_id == received_correlation_id:
119 channel.basic_ack(method_frame.delivery_tag)
120 # Clean up response as it is returned afterwards
121 if "correlation_id" in response:
122 del response["correlation_id"]
123 self.response = response
124 channel.basic_ack(method_frame.delivery_tag)
125 return
127 requeue = True
128 if received_correlation_id is None:
129 requeue = False
130 channel.basic_nack(method_frame.delivery_tag, requeue=requeue)