Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/xlsx_to_json.py: 0%

166 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 11:44 +0000

1import os 

2import datetime 

3import json 

4from ast import literal_eval 

5import time 

6 

7# import yaml 

8 

9import jsonschema.validators 

10from openpyxl import Workbook, load_workbook 

11 

12from data_model import ( 

13 Component, 

14 Signal, 

15 Address, 

16 TransferFunction, 

17 Mode, 

18 DIGITIZATION_FUNCTIONS_FROM_REFERENCE, 

19 Parameter, 

20 EventRule, 

21 EtherCatLoop, 

22 EtherCatModule, 

23 EtherCatTopology, 

24 XLSX_HEADER, 

25) 

26 

27 

28excels_filename = [ 

29 "clamping.xlsx", 

30 "loxbox.xlsx", 

31 "launcher_mockup.xlsx", 

32 "orchestrator.xlsx", 

33 "NominalConf09_GOC 2.xlsx", 

34 "NominalConf09_LOC 2.xlsx", 

35 "ArchiveMaxConf09_GOC 1.xlsx", 

36 "ArchiveMaxConf09_LOC 1.xlsx", 

37] 

38 

39extended_configs = [ 

40 ("loxbox.xlsx", "loxbox_valves-demo.xlsx"), 

41 ("launcher_mockup.xlsx", "launcher_mockup_valves-demo.xlsx"), 

42] 

43 

44 

45def boolean_cell(cell, default_value): 

46 if cell.value is None: 

47 return default_value 

48 cell_lower = cell.value.lower() 

49 if cell_lower == "yes": 

50 return True 

51 if cell_lower == "no": 

52 return False 

53 raise ValueError(f"Cell value is invalid: {cell_lower}") 

54 

55 

56def is_empty_row(row): 

57 row_values = list(cell.value for cell in row if cell.value is not None) 

58 return len(row_values) == 0 

59 

60 

61def device_config(excel_filename): 

62 # Load petri if exists 

63 excel_name = excel_filename.split(".")[0] 

64 

65 wb = Workbook() 

66 wb = load_workbook(filename=os.path.join("xlsx", excel_filename)) 

67 

68 # acq, hw_list = wb.worksheets 

69 

70 sheets_by_name = {s.title: s for s in wb.worksheets} 

71 signals_sheet = sheets_by_name["signals"] 

72 components_sheet = sheets_by_name["components"] 

73 config_sheet = sheets_by_name["config"] 

74 modes_sheet = sheets_by_name["modes"] 

75 event_rules_sheet = sheets_by_name["event_rules"] 

76 

77 # Finding loops sheets 

78 CARD_INDEX_TO_REF = {} 

79 TICKER_TO_ADRESS = {} 

80 iloop = 0 

81 loops = [] 

82 for sheet in wb.worksheets: 

83 if sheet.title.startswith("loop_"): 

84 iloop += 1 

85 if sheet["B2"].value == "Position": 

86 channel_offset = 3 

87 elif sheet["B3"].value == "Position": 

88 channel_offset = 4 

89 else: 

90 raise ValueError( 

91 f'Invalid excel channel position B2: {sheet["B2"].value}, C2: {sheet["C2"].value}, should be Position' 

92 ) 

93 

94 # Acquisition cards 

95 for irow, row in enumerate(sheet.iter_rows(channel_offset)): 

96 if is_empty_row(row): 

97 break 

98 for icell, cell in enumerate(row[2:]): 

99 if cell.value: 

100 TICKER_TO_ADRESS[cell.value] = Address( 

101 card_number=icell + 1, channel=irow + 1, loop_number=iloop 

102 ) 

103 

104 # cards = [] 

105 modules = [] 

106 for icol, (name, reference) in enumerate(sheet.iter_cols(min_col=3, min_row=1, max_row=2)): 

107 if is_empty_row((name, reference)): 

108 break 

109 CARD_INDEX_TO_REF[iloop, icol + 1] = reference.value 

110 # cards.append(col[0].value) 

111 module = EtherCatModule( 

112 name=name.value, reference=reference.value if reference.value is not None else "" 

113 ) 

114 modules.append(module) 

115 

116 loops.append(EtherCatLoop(modules)) 

117 topology = EtherCatTopology(loops) 

118 

119 # HW list 

120 # current_equipment = None 

121 # systems = [] 

122 signals_by_component_id = {} 

123 number_signals = 0 

124 tickers = set() 

125 for irow, row in enumerate(signals_sheet.iter_rows()): 

126 if irow == 0: 

127 # Check integrity 

128 header = list(cell.value for cell in row if cell.value is not None) 

129 if header != XLSX_HEADER: 

130 raise ValueError(f"Header of XLSX is not valid. got: {header}, expected: {XLSX_HEADER}") 

131 

132 else: 

133 # Check for empty row 

134 

135 if is_empty_row(row): 

136 break 

137 number_signals += 1 

138 signal_type = row[4].value.lower() 

139 if signal_type not in ["sensor", "command", "external_sensor"]: 

140 raise ValueError(f"Should be either sensor, command or external_sensor, got {row[4].value}") 

141 

142 ticker = row[0].value 

143 if not ticker: 

144 raise RuntimeError("Ticker should not be empty") 

145 tickers.add(ticker) 

146 

147 unit = row[3].value 

148 intervals = literal_eval(row[6].value) if row[6].value is not None else None 

149 sensor_transfer_function = TransferFunction(intervals) if intervals else None 

150 address = TICKER_TO_ADRESS.get(ticker, None) 

151 type = row[8].value 

152 if type: 

153 type = type.replace('"', "'") 

154 formula = row[9].value 

155 if formula is not None: 

156 formula = formula.lstrip(" ") 

157 if formula.startswith("{"): 

158 # Test formula mapping is parsable 

159 literal_eval(formula) 

160 

161 if address: 

162 digitization_function = DIGITIZATION_FUNCTIONS_FROM_REFERENCE[ 

163 CARD_INDEX_TO_REF[address.loop_number, address.card_number] 

164 ] 

165 try: 

166 transfer_function = digitization_function.to_transfer_function().compose(sensor_transfer_function) 

167 except ValueError as exc: 

168 print(f"Problem with formula of {ticker}: {exc}") 

169 raise exc 

170 

171 else: 

172 digitization_function = None 

173 transfer_function = None 

174 

175 if formula is None and address is None and signal_type not in ["external_sensor", "command"]: 

176 raise RuntimeError( 

177 f"{ticker} should either be on I/O cards or a formula or be an interface. signal_type: {signal_type}" 

178 ) 

179 

180 if formula and address and signal_type != "command": 

181 raise RuntimeError( 

182 f"{ticker} is both on I/O cards and has a formula. This is forbidden for type {signal_type}." 

183 ) 

184 

185 # # Changing signal_value interface -> command if check above is ok 

186 # if signal_type == "interface": 

187 # signal_type = "command" 

188 

189 forcible = boolean_cell(row[10], True) 

190 commandable = boolean_cell(row[11], True) 

191 broadcastable = boolean_cell(row[12], True) 

192 

193 signal = Signal( 

194 ticker=ticker, 

195 description=row[2].value if row[2].value else "", 

196 unit=unit, 

197 type=signal_type, 

198 address=address, 

199 frequency=row[5].value, 

200 transfer_function=transfer_function, 

201 precision_digits=row[7].value, 

202 digitization_function=digitization_function, 

203 data_type=type, 

204 formula=formula, 

205 forcible=forcible, 

206 commandable=commandable, 

207 broadcastable=broadcastable, 

208 ) 

209 

210 component_id = row[1].value 

211 signals_by_component_id.setdefault(component_id, []) 

212 signals_by_component_id[component_id].append(signal) 

213 

214 # current_component.signals.append(signal) 

215 

216 if number_signals != len(tickers): 

217 raise RuntimeError("Duplicate Ticker!!!") 

218 

219 col_to_parameters = { 

220 c.column: c.value for c in list(components_sheet.iter_rows(max_row=1, min_col=4))[0] if c.value is not None 

221 } 

222 components = [] 

223 

224 for component_id, name, reference, *extra_parameters in components_sheet.iter_rows(min_row=2): 

225 if component_id.value is None: 

226 break 

227 reference = reference.value if reference.value is not None else "" 

228 

229 parameters = [ 

230 Parameter(name=col_to_parameters[c.column], value=c.value) for c in extra_parameters if c.value is not None 

231 ] 

232 components.append( 

233 Component( 

234 id=component_id.value, 

235 name=name.value, 

236 signals=signals_by_component_id[component_id.value], 

237 reference=reference, 

238 parameters=parameters, 

239 ) 

240 ) 

241 

242 modes = [] 

243 for imode, (mode_name, freq_multiplier, min_frequency) in enumerate(modes_sheet.iter_rows(2, max_col=3)): 

244 modes.append( 

245 Mode( 

246 mode_id=imode + 1, 

247 name=mode_name.value, 

248 frequency_multiplier=freq_multiplier.value, 

249 min_frequency=min_frequency.value, 

250 ) 

251 ) 

252 config = {"description": ""} # Downwards compatibility 

253 for row in config_sheet.iter_rows(): 

254 config[row[0].value] = row[1].value 

255 

256 event_rules = [] 

257 for row in event_rules_sheet.iter_rows(min_row=2): 

258 if row[1].value is None or row[2].value is None: 

259 break 

260 event_rules.append(EventRule(row[1].value, row[2].value)) 

261 

262 insert_classname = False 

263 hwconfig = { 

264 "$schema": "https://gitea.spacedreams.com/SpaceDreams/ConfigManager/src/branch/master/schemas/device-config_v1.json", 

265 # "generated at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 

266 "generated_at": round(time.time()), 

267 "config": config, 

268 "components": [c.to_dict(insert_classname=insert_classname) for c in components], 

269 "modes": [m.to_dict(insert_classname=insert_classname) for m in modes], 

270 "hardware_topology": topology.to_dict(insert_classname=insert_classname), 

271 "event_rules": [e.to_dict(insert_classname=insert_classname) for e in event_rules], 

272 } 

273 

274 # Adding petri 

275 petri_filepath = os.path.join("petri", f"{excel_name}.json") 

276 if os.path.isfile(petri_filepath): 

277 with open(petri_filepath, "r") as petri_file: 

278 petri = json.load(petri_file) 

279 hwconfig["petri_network"] = petri 

280 

281 # Adding petri 

282 pid_filepath = os.path.join("pid", f"{excel_name}.json") 

283 if os.path.isfile(pid_filepath): 

284 with open(pid_filepath, "r") as pid_file: 

285 pid = json.load(pid_file) 

286 hwconfig["pid"] = pid 

287 

288 # Validate against schema 

289 with open("schemas/device-config_v1.json", "r", encoding="utf-8") as schema_file: 

290 CONFIG_SCHEMA = json.load(schema_file) 

291 

292 jsonschema.validators.validate(hwconfig, CONFIG_SCHEMA) 

293 

294 return hwconfig 

295 

296 # output_yaml = excel_filename.replace(".xlsx", "") + ".yaml" 

297 # with open(output_yaml, "w") as file: 

298 # yaml.dump(hwconfig, file) 

299 

300 

301for filename in excels_filename: 

302 print(f"Generating file {filename}") 

303 config = device_config(filename) 

304 

305 output_json = os.path.join("json", filename.replace(".xlsx", "") + ".json") 

306 with open(output_json, "w") as file: 

307 json.dump(config, file, indent=2) 

308 

309for base_config_filename, extension_filename in extended_configs: 

310 print(f"Generating extended config {base_config_filename} + {extension_filename}") 

311 config = device_config(base_config_filename) 

312 extended_config = device_config(extension_filename) 

313 

314 new_components = config["components"].copy() 

315 # print('new_components', new_components) 

316 for component in extended_config["components"]: 

317 # Find component index in base config 

318 for icomp, component2 in enumerate(config["components"]): 

319 if component2["id"] == component["id"]: 

320 new_components[icomp] = component 

321 break 

322 config["components"] = new_components 

323 

324 output_json = os.path.join("json", extension_filename.replace(".xlsx", "") + ".json") 

325 with open(output_json, "w") as file: 

326 json.dump(config, file, indent=2)