Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/messages.py: 89%
70 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 14:27 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 14:27 +0000
1import time
2import os
3import logging
4import uuid
5import asyncio
7import json
8import pika
9import pika.exceptions
11logger = logging.getLogger("uvicorn.error")
13RABBITMQ_HOST = os.environ.get(
14 "RABBITMQ_HOST", "cloud-broker"
15) # Not a parameter because bundled with other backend services
16RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME", "twinpad")
17RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "twinpad")
19declared_queues = []
22def get_channel():
23 broker_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F"
24 parameters = pika.URLParameters(broker_url)
25 rmq_connected = False
26 while not rmq_connected:
27 try:
28 connection = pika.BlockingConnection(parameters)
29 rmq_connected = True
31 except pika.exceptions.AMQPConnectionError:
32 print("Waiting 5 seconds for RabbitMQ to be up...")
33 time.sleep(5.0)
35 logger.info("Connecting to broker...")
36 logger.info("Waiting for broker to be up: broker URL: %s", broker_url)
38 # print("Connected to RabbitMQ")
39 channel = connection.channel()
40 return channel
43async def send_mode_change(device_id: str, mode_id: int, timestamp: int | None = None):
44 channel = get_channel()
45 payload = {"device_id": device_id, "mode_id": mode_id}
46 if timestamp:
47 payload["timestamp"] = timestamp
49 return await send_message_wait_for_response(
50 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
51 )
54async def send_signal_value(signal_id: str, update) -> dict:
55 """Send commands or forced value."""
56 channel = get_channel()
57 payload = {"signal_id": signal_id}
58 if update.timestamp is not None:
59 payload["timestamp"] = update.timestamp
60 if update.value is not None:
61 payload["value"] = update.value
62 payload["forced_value"] = update.forced_value
63 if update.forced_value is None:
64 del payload["forced_value"]
66 device_id = signal_id.split(".")[0]
68 return await send_message_wait_for_response(
69 channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
70 )
73async def send_message_wait_for_response(
74 channel, payload, queue_name: str, callback_queue_name: str, timeout: int = 5
75) -> dict:
76 correlation_id = str(uuid.uuid4())
78 if callback_queue_name not in declared_queues:
79 channel.queue_declare(queue=callback_queue_name, exclusive=False)
80 channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
81 declared_queues.append(callback_queue_name)
83 channel.basic_publish(
84 "amq.topic",
85 queue_name,
86 json.dumps(payload),
87 pika.BasicProperties(
88 content_type="text/plain",
89 delivery_mode=pika.DeliveryMode.Persistent,
90 reply_to=callback_queue_name,
91 correlation_id=correlation_id,
92 ),
93 )
95 response = None
96 start = time.time()
97 while response is None and time.time() - start < timeout:
98 try:
99 method_frame, properties, body = channel.basic_get(callback_queue_name)
100 except Exception as e:
101 return {"error": True, "status_code": 504, "message": str(e)}
102 if body:
103 if correlation_id == properties.correlation_id:
104 channel.basic_ack(method_frame.delivery_tag)
105 response = json.loads(body)
106 continue
107 else:
108 channel.basic_nack(method_frame.delivery_tag)
109 await asyncio.sleep(0.1)
111 if response is None:
112 response = {"error": True, "status_code": 504, "message": f"Device timed out after {timeout} seconds"}
114 return response