Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/messages.py: 88%
42 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 07:30 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 07:30 +0000
1import time
2import os
3import logging
5import json
6import pika
8logger = logging.getLogger("uvicorn.error")
10RABBITMQ_HOST = os.environ.get(
11 "RABBITMQ_HOST", "cloud-broker"
12) # Not a parameter because bundled with other backend services
13RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME", "twinpad")
14RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "twinpad")
17def get_channel():
18 broker_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F"
19 parameters = pika.URLParameters(broker_url)
20 rmq_connected = False
21 while not rmq_connected:
22 try:
23 connection = pika.BlockingConnection(parameters)
24 rmq_connected = True
26 except pika.exceptions.AMQPConnectionError:
27 print("Waiting 5 seconds for RabbitMQ to be up...")
28 time.sleep(5.0)
30 logger.info("Connecting to broker...")
31 logger.info("Waiting for broker to be up: broker URL: %s", broker_url)
33 # print("Connected to RabbitMQ")
34 channel = connection.channel()
35 return channel
38def send_mode_change(device_id: str, mode_id: int, timestamp: int | None = None):
39 channel = get_channel()
40 payload = {"device_id": device_id, "mode_id": mode_id}
41 if timestamp:
42 payload["timestamp"] = timestamp
43 channel.basic_publish(
44 "amq.topic",
45 f"commands.{device_id}",
46 json.dumps(payload),
47 pika.BasicProperties(content_type="text/plain", delivery_mode=pika.DeliveryMode.Persistent),
48 )
51def send_signal_value(signal_id: str, update):
52 """Send commands or forced value."""
53 channel = get_channel()
54 payload = {"signal_id": signal_id}
55 if update.timestamp is not None:
56 payload["timestamp"] = update.timestamp
57 if update.value is not None:
58 payload["value"] = update.value
59 payload["forced_value"] = update.forced_value
60 if update.forced_value is None:
61 del payload["forced_value"]
63 device_id = signal_id.split(".")[0]
65 channel.basic_publish(
66 "amq.topic",
67 f"commands.{device_id}",
68 json.dumps(payload),
69 pika.BasicProperties(content_type="text/plain", delivery_mode=pika.DeliveryMode.Persistent),
70 )