Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/messages.py: 88%
68 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 10:53 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 10:53 +0000
1import time
2import os
3import logging
4import uuid
5import asyncio
7import json
8import pika
9import pika.exceptions
11logger = logging.getLogger("uvicorn.error")
13RABBITMQ_HOST = os.environ.get(
14 "RABBITMQ_HOST", "cloud-broker"
15) # Not a parameter because bundled with other backend services
16RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME", "twinpad")
17RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "twinpad")
19declared_queues = []
22def get_channel():
23 broker_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F"
24 parameters = pika.URLParameters(broker_url)
25 rmq_connected = False
26 while not rmq_connected:
27 try:
28 connection = pika.BlockingConnection(parameters)
29 rmq_connected = True
31 except pika.exceptions.AMQPConnectionError:
32 print("Waiting 5 seconds for RabbitMQ to be up...")
33 time.sleep(5.0)
35 logger.info("Connecting to broker...")
36 logger.info("Waiting for broker to be up: broker URL: %s", broker_url)
38 # print("Connected to RabbitMQ")
39 channel = connection.channel()
40 return channel
43async def send_mode_change(device_id: str, mode_id: int):
44 channel = get_channel()
45 payload = {"device_id": device_id, "mode_id": mode_id}
47 return await send_message_wait_for_response(
48 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
49 )
52async def send_signal_value(signal_id: str, update) -> dict:
53 """Send commands or forced value."""
54 channel = get_channel()
55 payload = {"signal_id": signal_id}
56 if update.timestamp is not None:
57 payload["timestamp"] = update.timestamp
58 if update.value is not None:
59 payload["value"] = update.value
60 payload["forced_value"] = update.forced_value
61 if update.forced_value is None:
62 del payload["forced_value"]
64 device_id = signal_id.split(".")[0]
66 return await send_message_wait_for_response(
67 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
68 )
71async def send_message_wait_for_response(
72 channel, payload, queue_name: str, callback_queue_name: str, timeout: int = 5
73) -> dict:
74 correlation_id = str(uuid.uuid4())
76 if callback_queue_name not in declared_queues:
77 channel.queue_declare(queue=callback_queue_name, exclusive=False)
78 channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
79 declared_queues.append(callback_queue_name)
81 channel.basic_publish(
82 "amq.topic",
83 queue_name,
84 json.dumps(payload),
85 pika.BasicProperties(
86 content_type="text/plain",
87 delivery_mode=pika.DeliveryMode.Persistent,
88 reply_to=callback_queue_name,
89 correlation_id=correlation_id,
90 ),
91 )
93 response = None
94 start = time.time()
95 while response is None and time.time() - start < timeout:
96 try:
97 method_frame, properties, body = channel.basic_get(callback_queue_name)
98 except Exception as e:
99 return {"error": True, "status_code": 504, "message": str(e)}
100 if body:
101 if correlation_id == properties.correlation_id:
102 channel.basic_ack(method_frame.delivery_tag)
103 response = json.loads(body)
104 continue
105 channel.basic_nack(method_frame.delivery_tag)
106 await asyncio.sleep(0.1)
108 if response is None:
109 response = {"error": True, "status_code": 504, "message": f"Device timed out after {timeout} seconds"}
111 return response