Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/messages.py: 88%
75 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-24 11:51 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-24 11:51 +0000
1import time
2import os
3import logging
4import uuid
5import asyncio
7import json
8import pika
9import pika.exceptions
11logger = logging.getLogger("uvicorn.error")
13RABBITMQ_HOST = os.environ.get(
14 "RABBITMQ_HOST", "cloud-broker"
15) # Not a parameter because bundled with other backend services
16RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME", "twinpad")
17RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "twinpad")
19declared_queues = []
22def get_channel():
23 broker_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F"
24 parameters = pika.URLParameters(broker_url)
25 rmq_connected = False
26 while not rmq_connected:
27 try:
28 connection = pika.BlockingConnection(parameters)
29 rmq_connected = True
31 except pika.exceptions.AMQPConnectionError:
32 print("Waiting 5 seconds for RabbitMQ to be up...")
33 time.sleep(5.0)
35 logger.info("Connecting to broker...")
36 logger.info("Waiting for broker to be up: broker URL: %s", broker_url)
38 # print("Connected to RabbitMQ")
39 channel = connection.channel()
40 return channel
43async def send_mode_change(device_id: str, mode_id: int):
44 channel = get_channel()
45 payload = {"device_id": device_id, "mode_id": mode_id}
47 return await send_message_wait_for_response(
48 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
49 )
52async def send_signal_value(signal_id: str, update) -> dict:
53 """Send commands or forced value."""
54 channel = get_channel()
55 payload = {"signal_id": signal_id}
56 if update.timestamp is not None:
57 payload["timestamp"] = update.timestamp
58 if update.value is not None:
59 payload["value"] = update.value
60 payload["forced_value"] = update.forced_value
61 if update.forced_value is None:
62 del payload["forced_value"]
64 device_id = signal_id.split(".")[0]
66 return await send_message_wait_for_response(
67 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
68 )
71async def send_message_wait_for_response(
72 channel, payload, queue_name: str, callback_queue_name: str, timeout: int = 5
73) -> dict:
74 correlation_id = str(uuid.uuid4())
76 payload["correlation_id"] = correlation_id
77 payload["reply_to"] = callback_queue_name
79 if callback_queue_name not in declared_queues:
80 channel.queue_declare(queue=callback_queue_name, exclusive=False)
81 channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
82 declared_queues.append(callback_queue_name)
84 channel.basic_publish(
85 "amq.topic",
86 queue_name,
87 json.dumps(payload),
88 pika.BasicProperties(
89 content_type="text/plain",
90 delivery_mode=pika.DeliveryMode.Persistent,
91 reply_to=callback_queue_name,
92 correlation_id=correlation_id,
93 ),
94 )
96 response = None
97 start = time.time()
98 while response is None and time.time() - start < timeout:
99 try:
100 method_frame, properties, body = channel.basic_get(callback_queue_name)
101 except Exception as e:
102 return {"error": True, "status_code": 504, "message": str(e)}
103 if body:
104 response = json.loads(body)
106 # Made it so the device can put the correlation_id either in the body or in the properties
107 received_correlation_id = response.get("correlation_id", None)
108 if received_correlation_id is None and properties.correlation_id is not None:
109 received_correlation_id = properties.correlation_id
111 if correlation_id == received_correlation_id:
112 channel.basic_ack(method_frame.delivery_tag)
113 # Clean up response as it is returned afterwards
114 if "correlation_id" in response:
115 del response["correlation_id"]
116 continue
117 channel.basic_nack(method_frame.delivery_tag)
118 await asyncio.sleep(0.1)
120 if response is None:
121 response = {"error": True, "status_code": 504, "message": f"Device timed out after {timeout} seconds"}
123 return response