Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/messages.py: 88%

42 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-03 07:30 +0000

1import time 

2import os 

3import logging 

4 

5import json 

6import pika 

7 

8logger = logging.getLogger("uvicorn.error") 

9 

10RABBITMQ_HOST = os.environ.get( 

11 "RABBITMQ_HOST", "cloud-broker" 

12) # Not a parameter because bundled with other backend services 

13RABBITMQ_USERNAME = os.environ.get("RABBITMQ_USERNAME", "twinpad") 

14RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "twinpad") 

15 

16 

17def get_channel(): 

18 broker_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F" 

19 parameters = pika.URLParameters(broker_url) 

20 rmq_connected = False 

21 while not rmq_connected: 

22 try: 

23 connection = pika.BlockingConnection(parameters) 

24 rmq_connected = True 

25 

26 except pika.exceptions.AMQPConnectionError: 

27 print("Waiting 5 seconds for RabbitMQ to be up...") 

28 time.sleep(5.0) 

29 

30 logger.info("Connecting to broker...") 

31 logger.info("Waiting for broker to be up: broker URL: %s", broker_url) 

32 

33 # print("Connected to RabbitMQ") 

34 channel = connection.channel() 

35 return channel 

36 

37 

38def send_mode_change(device_id: str, mode_id: int, timestamp: int | None = None): 

39 channel = get_channel() 

40 payload = {"device_id": device_id, "mode_id": mode_id} 

41 if timestamp: 

42 payload["timestamp"] = timestamp 

43 channel.basic_publish( 

44 "amq.topic", 

45 f"commands.{device_id}", 

46 json.dumps(payload), 

47 pika.BasicProperties(content_type="text/plain", delivery_mode=pika.DeliveryMode.Persistent), 

48 ) 

49 

50 

51def send_signal_value(signal_id: str, update): 

52 """Send commands or forced value.""" 

53 channel = get_channel() 

54 payload = {"signal_id": signal_id} 

55 if update.timestamp is not None: 

56 payload["timestamp"] = update.timestamp 

57 if update.value is not None: 

58 payload["value"] = update.value 

59 payload["forced_value"] = update.forced_value 

60 if update.forced_value is None: 

61 del payload["forced_value"] 

62 

63 device_id = signal_id.split(".")[0] 

64 

65 channel.basic_publish( 

66 "amq.topic", 

67 f"commands.{device_id}", 

68 json.dumps(payload), 

69 pika.BasicProperties(content_type="text/plain", delivery_mode=pika.DeliveryMode.Persistent), 

70 )