Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/messages.py: 88%

75 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-23 07:38 +0000

1import time 

2import os 

3import logging 

4import uuid 

5import asyncio 

6 

7import json 

8import pika 

9import pika.exceptions 

10 

# Module logger: messages are emitted through the logger named "uvicorn.error".
logger = logging.getLogger("uvicorn.error")

# Broker connection settings, read from the environment once at import time.
# The host is not a parameter because it is bundled with other backend services.
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "cloud-broker")
RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME", "twinpad")
RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", "twinpad")

# Names of the callback queues this process has already declared and bound,
# so that the declaration is only issued once per queue name.
declared_queues = []

20 

21 

def get_channel():
    """Open a new blocking connection to the RabbitMQ broker and return a channel on it.

    The connection attempt is retried every 5 seconds, indefinitely, for as
    long as pika raises ``AMQPConnectionError``. The wait uses ``time.sleep``,
    so the calling thread is blocked while the broker is unreachable.

    Every call opens its own connection and this function never closes it;
    the connection remains reachable through the returned channel's
    ``connection`` attribute so the caller can close it when done.

    Returns:
        A pika blocking channel opened on the new connection.
    """
    broker_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:5672/%2F"
    parameters = pika.URLParameters(broker_url)
    # broker_url embeds the password in clear text, so only a redacted form
    # is ever written to the logs.
    loggable_url = f"amqp://{RABBITMQ_USERNAME}:***@{RABBITMQ_HOST}:5672/%2F"

    logger.info("Connecting to broker...")
    while True:
        try:
            connection = pika.BlockingConnection(parameters)
        except pika.exceptions.AMQPConnectionError:
            logger.warning("Waiting for broker to be up: broker URL: %s", loggable_url)
            logger.warning("Waiting 5 seconds for RabbitMQ to be up...")
            time.sleep(5.0)
        else:
            # Opened outside the ``try`` so that channel errors are not
            # mistaken for "broker not up yet" and retried forever.
            return connection.channel()

41 

42 

async def send_mode_change(device_id: str, mode_id: int) -> dict:
    """Ask a device to change mode and wait for its reply.

    The request is published with routing key ``commands.<device_id>`` and
    the reply is awaited on ``commands_callback.<device_id>`` for up to
    5 seconds.

    Args:
        device_id: Identifier of the target device; used in both queue names.
        mode_id: Identifier of the mode to switch to.

    Returns:
        Whatever ``send_message_wait_for_response`` returns: the device's
        reply, or an error dict with ``status_code`` 504.
    """
    payload = {"device_id": device_id, "mode_id": mode_id}
    channel = get_channel()
    try:
        return await send_message_wait_for_response(
            channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
        )
    finally:
        # get_channel() opens a dedicated broker connection for this call and
        # nothing else closes it; release it here so connections do not pile
        # up on the broker with every request.
        if channel.connection.is_open:
            channel.connection.close()

50 

51 

async def send_signal_value(signal_id: str, update) -> dict:
    """Send commands or forced value.

    Builds a payload from ``signal_id`` plus whichever of ``update.timestamp``,
    ``update.value`` and ``update.forced_value`` are not ``None``, publishes
    it with routing key ``commands.<device_id>`` and waits up to 5 seconds
    for the reply on ``commands_callback.<device_id>``.

    Args:
        signal_id: Dotted signal identifier; the part before the first ``.``
            is used as the device id.
        update: Object exposing ``timestamp``, ``value`` and ``forced_value``
            attributes.

    Returns:
        Whatever ``send_message_wait_for_response`` returns: the device's
        reply, or an error dict with ``status_code`` 504.
    """
    payload = {"signal_id": signal_id}
    # Only fields that were actually provided are forwarded to the device.
    for field in ("timestamp", "value", "forced_value"):
        field_value = getattr(update, field)
        if field_value is not None:
            payload[field] = field_value

    device_id = signal_id.partition(".")[0]

    channel = get_channel()
    try:
        return await send_message_wait_for_response(
            channel, payload, f"commands.{device_id}", f"commands_callback.{device_id}", timeout=5
        )
    finally:
        # get_channel() opens a dedicated broker connection for this call and
        # nothing else closes it; release it here so connections do not pile
        # up on the broker with every request.
        if channel.connection.is_open:
            channel.connection.close()

69 

70 

async def send_message_wait_for_response(
    channel, payload, queue_name: str, callback_queue_name: str, timeout: int = 5
) -> dict:
    """Publish ``payload`` on ``amq.topic`` and poll a callback queue for the matching reply.

    A fresh correlation id is generated for the request. ``payload`` is
    modified in place: ``correlation_id`` and ``reply_to`` keys are added
    before it is serialised to JSON. The callback queue is declared and bound
    to ``amq.topic`` the first time its name is seen by this process.

    The callback queue is then polled roughly every 0.1 seconds. A reply
    matches when its correlation id, taken from the JSON body or, failing
    that, from the message properties, equals the one generated here.
    Replies addressed to other requests are never returned: they are held
    unacknowledged while the rest of the queue is inspected and are handed
    back to the broker at the end of each polling pass.

    Args:
        channel: pika blocking channel used for publishing and polling.
        payload: Dict to send; mutated as described above.
        queue_name: Routing key the request is published with.
        callback_queue_name: Queue (and routing key) the reply is expected on.
        timeout: Maximum number of seconds to wait for the reply.

    Returns:
        The decoded reply with its ``correlation_id`` key removed, or
        ``{"error": True, "status_code": 504, "message": ...}`` when polling
        fails or no matching reply arrives in time.
    """
    correlation_id = str(uuid.uuid4())

    payload["correlation_id"] = correlation_id
    payload["reply_to"] = callback_queue_name

    if callback_queue_name not in declared_queues:
        channel.queue_declare(queue=callback_queue_name, exclusive=False)
        channel.queue_bind(exchange="amq.topic", queue=callback_queue_name, routing_key=callback_queue_name)
        declared_queues.append(callback_queue_name)

    channel.basic_publish(
        "amq.topic",
        queue_name,
        json.dumps(payload),
        pika.BasicProperties(
            content_type="text/plain",
            delivery_mode=pika.DeliveryMode.Persistent,
            reply_to=callback_queue_name,
            correlation_id=correlation_id,
        ),
    )

    # Monotonic clock: the timeout must not be affected by wall-clock jumps.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        matched = None
        # Delivery tags of replies meant for other requests. They stay
        # unacknowledged during this pass so that basic_get moves on to the
        # messages queued behind them instead of returning the same one again.
        skipped_tags = []

        # The deadline check also bounds this inner loop should the queue
        # never report itself empty.
        while matched is None and time.monotonic() < deadline:
            try:
                method_frame, properties, body = channel.basic_get(callback_queue_name)
            except Exception as e:
                # Held deliveries are not nacked here: the channel may no
                # longer be usable after a failed basic_get.
                return {"error": True, "status_code": 504, "message": str(e)}

            if method_frame is None:
                break  # Queue is empty for now.

            candidate = json.loads(body) if body else None
            if candidate is None:
                skipped_tags.append(method_frame.delivery_tag)
                continue

            # Made it so the device can put the correlation_id either in the body or in the properties
            received_correlation_id = candidate.get("correlation_id", None)
            if received_correlation_id is None and properties.correlation_id is not None:
                received_correlation_id = properties.correlation_id

            if received_correlation_id == correlation_id:
                channel.basic_ack(method_frame.delivery_tag)
                matched = candidate
            else:
                skipped_tags.append(method_frame.delivery_tag)

        # Requeue everything that was not ours so its rightful requester can
        # still pick it up.
        for delivery_tag in skipped_tags:
            channel.basic_nack(delivery_tag)

        if matched is not None:
            # Clean up response as it is returned afterwards
            matched.pop("correlation_id", None)
            return matched

        await asyncio.sleep(0.1)

    return {"error": True, "status_code": 504, "message": f"Device timed out after {timeout} seconds"}