Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/messages.py: 100%
78 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-01 08:23 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-01 08:23 +0000
1import logging
2import os
3import socket
4import time
5import uuid
7import json
8import pika
9import pika.exceptions
11from pika.adapters.blocking_connection import BlockingChannel
13logger = logging.getLogger("uvicorn.error")
15RABBITMQ_HOST = os.environ.get(
16 "RABBITMQ_HOST", "cloud-broker"
17) # Not a parameter because bundled with other backend services
18RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME")
19RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD")
21RESPONSE_TIMEOUT = 5
24class RabbitMQClient:
25 connection: pika.BlockingConnection
26 channel: BlockingChannel
27 correlation_id: str = None
28 response: dict = None
30 def __init__(self, broker_host: str = RABBITMQ_HOST):
31 self.channel = self.get_channel(broker_host)
33 def get_channel(self, broker_host: str = RABBITMQ_HOST):
34 parameters = pika.URLParameters(f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{broker_host}:5672/%2F")
35 logger.info("Connecting to broker...")
36 try:
37 self.connection = pika.BlockingConnection(parameters)
38 logger.info("Connected to broker")
39 except (pika.exceptions.AMQPConnectionError, socket.gaierror):
40 logger.info("Failed to connect to broker")
41 return None
43 channel = self.connection.channel()
44 return channel
46 async def send_mode_change(self, device_id: str, mode_id: int):
47 payload = {"device_id": device_id, "mode_id": mode_id}
49 return await self.send_message_wait_for_response(
50 payload, f"commands.{device_id}", f"commands_callback.{device_id}"
51 )
53 async def send_signal_value(self, signal_id: str, update) -> dict:
54 """Send commands or forced value."""
55 payload = {"signal_id": signal_id}
56 if update.timestamp is not None:
57 payload["timestamp"] = update.timestamp
58 if update.value is not None:
59 payload["value"] = update.value
60 payload["forced_value"] = update.forced_value
61 if update.forced_value is None:
62 del payload["forced_value"]
64 device_id = signal_id.split(".")[0]
66 return await self.send_message_wait_for_response(
67 payload, f"commands.{device_id}", f"commands_callback.{device_id}"
68 )
70 async def send_message_wait_for_response(self, payload, queue_name: str, callback_queue_name: str) -> dict:
71 if self.channel is None:
72 return {"error": True, "status_code": 502, "message": "Could not connect to the message broker"}
73 self.correlation_id = str(uuid.uuid4())
75 payload["correlation_id"] = self.correlation_id
76 payload["reply_to"] = callback_queue_name
78 self.channel.queue_declare(
79 queue=callback_queue_name, durable=True, exclusive=False, arguments={"x-message-ttl": 10000}
80 )
81 self.channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
83 self.channel.basic_publish(
84 "amq.topic",
85 queue_name,
86 json.dumps(payload),
87 pika.BasicProperties(
88 content_type="text/plain",
89 delivery_mode=pika.DeliveryMode.Persistent,
90 reply_to=callback_queue_name,
91 correlation_id=self.correlation_id,
92 ),
93 )
95 consumer_tag = self.channel.basic_consume(
96 callback_queue_name, self.on_callback_message, auto_ack=False, exclusive=False
97 )
98 # blocks processes for a certain time to ensure consumption is done
99 start_time = time.time()
100 while self.response is None and time.time() < start_time + RESPONSE_TIMEOUT:
101 self.connection.process_data_events(time_limit=max(0, time.time() - start_time + RESPONSE_TIMEOUT))
103 self.channel.stop_consuming(consumer_tag)
104 if self.response is None:
105 self.response = {
106 "error": True,
107 "status_code": 504,
108 "message": f"Device timed out after {RESPONSE_TIMEOUT} seconds",
109 }
111 return self.response
113 def on_callback_message(self, channel: BlockingChannel, method_frame, properties, body):
114 if not body:
115 return
117 response = json.loads(body)
118 # Made it so the device can put the correlation_id either in the body or in the properties
119 received_correlation_id = response.get("correlation_id", None)
120 if received_correlation_id is None and properties.correlation_id is not None:
121 received_correlation_id = properties.correlation_id
123 if self.correlation_id == received_correlation_id:
124 # Clean up response as it is returned afterwards
125 if "correlation_id" in response:
126 del response["correlation_id"]
127 self.response = response
128 channel.basic_ack(method_frame.delivery_tag)
129 return
131 requeue = True
132 if received_correlation_id is None:
133 requeue = False
134 channel.basic_nack(method_frame.delivery_tag, requeue=requeue)