Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/xlsx_to_json.py: 0%
166 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
1import os
2import datetime
3import json
4from ast import literal_eval
5import time
7# import yaml
9import jsonschema.validators
10from openpyxl import Workbook, load_workbook
12from data_model import (
13 Component,
14 Signal,
15 Address,
16 TransferFunction,
17 Mode,
18 DIGITIZATION_FUNCTIONS_FROM_REFERENCE,
19 Parameter,
20 EventRule,
21 EtherCatLoop,
22 EtherCatModule,
23 EtherCatTopology,
24 XLSX_HEADER,
25)
# Workbooks (looked up in the "xlsx" directory) that are each converted to a
# standalone JSON device configuration.
excels_filename = [
    "clamping.xlsx",
    "loxbox.xlsx",
    "launcher_mockup.xlsx",
    "orchestrator.xlsx",
    "NominalConf09_GOC 2.xlsx",
    "NominalConf09_LOC 2.xlsx",
    "ArchiveMaxConf09_GOC 1.xlsx",
    "ArchiveMaxConf09_LOC 1.xlsx",
]

# (base workbook, extension workbook) pairs: components of the extension
# replace the components of the base configuration that share the same id.
extended_configs = [
    ("loxbox.xlsx", "loxbox_valves-demo.xlsx"),
    ("launcher_mockup.xlsx", "launcher_mockup_valves-demo.xlsx"),
]
def boolean_cell(cell, default_value):
    """Interpret a worksheet cell as a yes/no flag.

    Args:
        cell: Any object exposing a ``value`` attribute (e.g. an openpyxl cell).
        default_value: Value returned when the cell is empty (``value is None``).

    Returns:
        ``default_value`` for an empty cell, ``True`` for "yes", ``False`` for
        "no" (case-insensitive, surrounding whitespace ignored). A cell that
        already holds a Python ``bool`` is returned unchanged.

    Raises:
        ValueError: If the cell holds anything else.
    """
    value = cell.value
    if value is None:
        return default_value
    # Boolean-typed spreadsheet cells are read as Python bools, which have no
    # ``.lower()``; accept them directly instead of crashing.
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        cell_lower = value.strip().lower()
        if cell_lower == "yes":
            return True
        if cell_lower == "no":
            return False
    else:
        cell_lower = value
    raise ValueError(f"Cell value is invalid: {cell_lower}")
def is_empty_row(row):
    """Return True when every cell of *row* has ``value is None``.

    Args:
        row: Iterable of objects exposing a ``value`` attribute.

    Returns:
        ``True`` for an empty iterable or when all values are ``None``;
        ``False`` as soon as one cell holds any value (including ``0`` or "").
    """
    return all(cell.value is None for cell in row)
def device_config(excel_filename):
    """Build and validate the device configuration described by one workbook.

    The workbook is read from ``xlsx/<excel_filename>`` (relative to the
    current working directory). It must contain the sheets ``signals``,
    ``components``, ``config``, ``modes`` and ``event_rules``; every sheet
    whose title starts with ``loop_`` is turned into one ``EtherCatLoop``.

    Columns read from the ``signals`` sheet, by index: 0 ticker, 1 component
    id, 2 description, 3 unit, 4 signal type, 5 frequency, 6 intervals
    (Python literal), 7 precision digits, 8 data type, 9 formula,
    10 forcible, 11 commandable, 12 broadcastable.

    If ``petri/<name>.json`` or ``pid/<name>.json`` exist (``<name>`` being
    the workbook file name without its extension) their content is embedded
    under the ``petri_network`` / ``pid`` keys.

    Args:
        excel_filename: File name of the workbook inside the ``xlsx`` directory.

    Returns:
        A dict with the keys ``$schema``, ``generated_at``, ``config``,
        ``components``, ``modes``, ``hardware_topology``, ``event_rules`` and,
        optionally, ``petri_network`` and ``pid``. The dict has been validated
        against ``schemas/device-config_v1.json``.

    Raises:
        KeyError: If a mandatory sheet is missing, if a component has no
            signal, or if a card reference has no digitization function.
        ValueError: If a loop sheet layout, the signals header, a signal type
            or a transfer function composition is invalid.
        RuntimeError: If a ticker is empty or duplicated, or if the
            address/formula combination of a signal is not allowed.
        jsonschema.exceptions.ValidationError: If the result does not match
            the schema.
    """
    # Base name (extension stripped) used to find the optional petri/pid files.
    excel_name = os.path.splitext(excel_filename)[0]

    wb = load_workbook(filename=os.path.join("xlsx", excel_filename))

    sheets_by_name = {s.title: s for s in wb.worksheets}
    signals_sheet = sheets_by_name["signals"]
    components_sheet = sheets_by_name["components"]
    config_sheet = sheets_by_name["config"]
    modes_sheet = sheets_by_name["modes"]
    event_rules_sheet = sheets_by_name["event_rules"]

    # --- Hardware topology: one "loop_*" sheet per loop ---
    card_index_to_ref = {}  # (loop number, card number) -> module reference
    ticker_to_address = {}  # ticker -> Address on the I/O cards
    iloop = 0
    loops = []
    for sheet in wb.worksheets:
        if not sheet.title.startswith("loop_"):
            continue
        iloop += 1

        # Channel rows start on the row right below the "Position" label.
        if sheet["B2"].value == "Position":
            channel_offset = 3
        elif sheet["B3"].value == "Position":
            channel_offset = 4
        else:
            raise ValueError(
                f'Invalid excel channel position B2: {sheet["B2"].value}, B3: {sheet["B3"].value}, should be Position'
            )

        # Acquisition cards: from the third column on, one column per card and
        # one row per channel; reading stops at the first empty row.
        for irow, row in enumerate(sheet.iter_rows(min_row=channel_offset)):
            if is_empty_row(row):
                break
            for icell, cell in enumerate(row[2:]):
                if cell.value:
                    ticker_to_address[cell.value] = Address(
                        card_number=icell + 1, channel=irow + 1, loop_number=iloop
                    )

        # Modules: rows 1 and 2 hold the name and the reference of each card.
        modules = []
        for icol, (name, reference) in enumerate(sheet.iter_cols(min_col=3, min_row=1, max_row=2)):
            if is_empty_row((name, reference)):
                break
            card_index_to_ref[iloop, icol + 1] = reference.value
            modules.append(
                EtherCatModule(
                    name=name.value,
                    reference=reference.value if reference.value is not None else "",
                )
            )

        loops.append(EtherCatLoop(modules))
    topology = EtherCatTopology(loops)

    # --- Signals ---
    signals_by_component_id = {}
    number_signals = 0
    tickers = set()
    for irow, row in enumerate(signals_sheet.iter_rows()):
        if irow == 0:
            # Check integrity of the header row.
            header = [cell.value for cell in row if cell.value is not None]
            if header != XLSX_HEADER:
                raise ValueError(f"Header of XLSX is not valid. got: {header}, expected: {XLSX_HEADER}")
            continue

        # The first empty row marks the end of the signal list.
        if is_empty_row(row):
            break
        number_signals += 1

        signal_type = row[4].value.lower()
        if signal_type not in ["sensor", "command", "external_sensor"]:
            raise ValueError(f"Should be either sensor, command or external_sensor, got {row[4].value}")

        ticker = row[0].value
        if not ticker:
            raise RuntimeError("Ticker should not be empty")
        tickers.add(ticker)

        unit = row[3].value
        intervals = literal_eval(row[6].value) if row[6].value is not None else None
        sensor_transfer_function = TransferFunction(intervals) if intervals else None
        address = ticker_to_address.get(ticker, None)

        data_type = row[8].value
        if data_type:
            data_type = data_type.replace('"', "'")

        formula = row[9].value
        if formula is not None:
            formula = formula.lstrip(" ")
            if formula.startswith("{"):
                # Test formula mapping is parsable (result intentionally unused).
                literal_eval(formula)

        if address:
            digitization_function = DIGITIZATION_FUNCTIONS_FROM_REFERENCE[
                card_index_to_ref[address.loop_number, address.card_number]
            ]
            try:
                transfer_function = digitization_function.to_transfer_function().compose(sensor_transfer_function)
            except ValueError as exc:
                print(f"Problem with formula of {ticker}: {exc}")
                raise
        else:
            digitization_function = None
            transfer_function = None

        if formula is None and address is None and signal_type not in ["external_sensor", "command"]:
            raise RuntimeError(
                f"{ticker} should either be on I/O cards or a formula or be an interface. signal_type: {signal_type}"
            )

        if formula and address and signal_type != "command":
            raise RuntimeError(
                f"{ticker} is both on I/O cards and has a formula. This is forbidden for type {signal_type}."
            )

        # Empty flag cells default to True.
        forcible = boolean_cell(row[10], True)
        commandable = boolean_cell(row[11], True)
        broadcastable = boolean_cell(row[12], True)

        signal = Signal(
            ticker=ticker,
            description=row[2].value if row[2].value else "",
            unit=unit,
            type=signal_type,
            address=address,
            frequency=row[5].value,
            transfer_function=transfer_function,
            precision_digits=row[7].value,
            digitization_function=digitization_function,
            data_type=data_type,
            formula=formula,
            forcible=forcible,
            commandable=commandable,
            broadcastable=broadcastable,
        )

        component_id = row[1].value
        signals_by_component_id.setdefault(component_id, []).append(signal)

    # Every counted signal row must have contributed a distinct ticker.
    if number_signals != len(tickers):
        raise RuntimeError("Duplicate Ticker!!!")

    # --- Components ---
    # Header cells from the fourth column on name the optional parameters.
    parameter_header = next(iter(components_sheet.iter_rows(max_row=1, min_col=4)))
    col_to_parameters = {c.column: c.value for c in parameter_header if c.value is not None}

    components = []
    for component_id, name, reference, *extra_parameters in components_sheet.iter_rows(min_row=2):
        if component_id.value is None:
            break
        reference = reference.value if reference.value is not None else ""

        parameters = [
            Parameter(name=col_to_parameters[c.column], value=c.value) for c in extra_parameters if c.value is not None
        ]
        components.append(
            Component(
                id=component_id.value,
                name=name.value,
                # A component without any signal raises KeyError here.
                signals=signals_by_component_id[component_id.value],
                reference=reference,
                parameters=parameters,
            )
        )

    # --- Modes (ids are 1-based, in sheet order) ---
    modes = [
        Mode(
            mode_id=imode + 1,
            name=mode_name.value,
            frequency_multiplier=freq_multiplier.value,
            min_frequency=min_frequency.value,
        )
        for imode, (mode_name, freq_multiplier, min_frequency) in enumerate(
            modes_sheet.iter_rows(min_row=2, max_col=3)
        )
    ]

    # --- Free-form key/value configuration ---
    config = {"description": ""}  # Downwards compatibility
    for row in config_sheet.iter_rows():
        config[row[0].value] = row[1].value

    # --- Event rules (second and third columns), up to the first incomplete row ---
    event_rules = []
    for row in event_rules_sheet.iter_rows(min_row=2):
        if row[1].value is None or row[2].value is None:
            break
        event_rules.append(EventRule(row[1].value, row[2].value))

    insert_classname = False
    hwconfig = {
        "$schema": "https://gitea.spacedreams.com/SpaceDreams/ConfigManager/src/branch/master/schemas/device-config_v1.json",
        "generated_at": round(time.time()),
        "config": config,
        "components": [c.to_dict(insert_classname=insert_classname) for c in components],
        "modes": [m.to_dict(insert_classname=insert_classname) for m in modes],
        "hardware_topology": topology.to_dict(insert_classname=insert_classname),
        "event_rules": [e.to_dict(insert_classname=insert_classname) for e in event_rules],
    }

    # Adding petri network, when a matching file exists
    petri_filepath = os.path.join("petri", f"{excel_name}.json")
    if os.path.isfile(petri_filepath):
        with open(petri_filepath, "r", encoding="utf-8") as petri_file:
            hwconfig["petri_network"] = json.load(petri_file)

    # Adding pid, when a matching file exists
    pid_filepath = os.path.join("pid", f"{excel_name}.json")
    if os.path.isfile(pid_filepath):
        with open(pid_filepath, "r", encoding="utf-8") as pid_file:
            hwconfig["pid"] = json.load(pid_file)

    # Validate against schema
    with open("schemas/device-config_v1.json", "r", encoding="utf-8") as schema_file:
        config_schema = json.load(schema_file)

    jsonschema.validators.validate(hwconfig, config_schema)

    return hwconfig
# Generate one JSON file per workbook, written to the "json" directory.
for filename in excels_filename:
    print(f"Generating file {filename}")
    config = device_config(filename)

    output_json = os.path.join("json", filename.replace(".xlsx", "") + ".json")
    with open(output_json, "w", encoding="utf-8") as file:
        json.dump(config, file, indent=2)

# Generate the extended configurations: the base configuration in which every
# component also present (same id) in the extension workbook is replaced by
# the extension's version. Extension components whose id does not exist in
# the base configuration are left out.
for base_config_filename, extension_filename in extended_configs:
    print(f"Generating extended config {base_config_filename} + {extension_filename}")
    config = device_config(base_config_filename)
    extended_config = device_config(extension_filename)

    new_components = config["components"].copy()

    # Index of the first base component carrying each id.
    base_index_by_id = {}
    for icomp, base_component in enumerate(config["components"]):
        base_index_by_id.setdefault(base_component["id"], icomp)

    for component in extended_config["components"]:
        icomp = base_index_by_id.get(component["id"])
        if icomp is not None:
            new_components[icomp] = component
    config["components"] = new_components

    output_json = os.path.join("json", extension_filename.replace(".xlsx", "") + ".json")
    with open(output_json, "w", encoding="utf-8") as file:
        json.dump(config, file, indent=2)