Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/messages.py: 88%
74 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
1import time
2import os
3import logging
4import uuid
5import asyncio
7import json
8import pika
9import pika.exceptions
11logger = logging.getLogger("uvicorn.error")
13RABBITMQ_HOST = os.environ.get(
14 "RABBITMQ_HOST", "cloud-broker"
15) # Not a parameter because bundled with other backend services
16RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME", "twinpad")
17RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "twinpad")
19declared_queues = []
22def get_channel():
23 broker_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F"
24 parameters = pika.URLParameters(broker_url)
25 rmq_connected = False
26 while not rmq_connected:
27 try:
28 connection = pika.BlockingConnection(parameters)
29 rmq_connected = True
31 except pika.exceptions.AMQPConnectionError:
32 print("Waiting 5 seconds for RabbitMQ to be up...")
33 time.sleep(5.0)
35 logger.info("Connecting to broker...")
36 logger.info("Waiting for broker to be up: broker URL: %s", broker_url)
38 # print("Connected to RabbitMQ")
39 channel = connection.channel()
40 return channel
43async def send_mode_change(device_id: str, mode_id: int):
44 channel = get_channel()
45 payload = {"device_id": device_id, "mode_id": mode_id}
47 return await send_message_wait_for_response(
48 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
49 )
52async def send_signal_value(device_id: str, signal_id: str, update) -> dict:
53 """Send commands or forced value."""
54 channel = get_channel()
55 payload = {"signal_id": signal_id}
56 if update.timestamp is not None:
57 payload["timestamp"] = update.timestamp
58 if update.value is not None:
59 payload["value"] = update.value
60 payload["forced_value"] = update.forced_value
61 if update.forced_value is None:
62 del payload["forced_value"]
64 return await send_message_wait_for_response(
65 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
66 )
69async def send_message_wait_for_response(
70 channel, payload, queue_name: str, callback_queue_name: str, timeout: int = 5
71) -> dict:
72 correlation_id = str(uuid.uuid4())
74 payload["correlation_id"] = correlation_id
75 payload["reply_to"] = callback_queue_name
77 if callback_queue_name not in declared_queues:
78 channel.queue_declare(queue=callback_queue_name, exclusive=False)
79 channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
80 declared_queues.append(callback_queue_name)
82 channel.basic_publish(
83 "amq.topic",
84 queue_name,
85 json.dumps(payload),
86 pika.BasicProperties(
87 content_type="text/plain",
88 delivery_mode=pika.DeliveryMode.Persistent,
89 reply_to=callback_queue_name,
90 correlation_id=correlation_id,
91 ),
92 )
94 response = None
95 start = time.time()
96 while response is None and time.time() - start < timeout:
97 try:
98 method_frame, properties, body = channel.basic_get(callback_queue_name)
99 except Exception as e:
100 return {"error": True, "status_code": 504, "message": str(e)}
101 if body:
102 response = json.loads(body)
104 # Made it so the device can put the correlation_id either in the body or in the properties
105 received_correlation_id = response.get("correlation_id", None)
106 if received_correlation_id is None and properties.correlation_id is not None:
107 received_correlation_id = properties.correlation_id
109 if correlation_id == received_correlation_id:
110 channel.basic_ack(method_frame.delivery_tag)
111 # Clean up response as it is returned afterwards
112 if "correlation_id" in response:
113 del response["correlation_id"]
114 continue
115 channel.basic_nack(method_frame.delivery_tag)
116 await asyncio.sleep(0.1)
118 if response is None:
119 response = {"error": True, "status_code": 504, "message": f"Device timed out after {timeout} seconds"}
121 return response