Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%

1701 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-11 15:40 +0000

1from functools import cached_property 

2import os 

3import re 

4import io 

5import time 

6import csv 

7from typing import IO, Self, ClassVar, Any, Literal, get_args, Annotated 

8import datetime 

9import math 

10import bisect 

11from enum import Enum 

12import logging 

13import copy 

14import asyncio 

15import json 

16import hashlib 

17from ast import literal_eval 

18import requests 

19 

20import zipfile 

21import ping3 

22import pytz 

23from bson.objectid import ObjectId 

24from pymongo import ASCENDING, ReturnDocument 

25from pydantic import BaseModel, ConfigDict, HttpUrl, computed_field, Field, create_model, BeforeValidator 

26import numpy as npy 

27import lttb 

28import h5py 

29from openpyxl import Workbook, load_workbook 

30from openpyxl.worksheet.worksheet import Worksheet 

31 

32from twinpad_backend.db import ( 

33 get_collection, 

34 get_async_collection, 

35 get_signal_collection, 

36 get_signal_collections_batch, 

37 systems_database, 

38 systems_async_database, 

39 signals_database, 

40 signals_async_database, 

41 devices_states_database, 

42) 

43from twinpad_backend.responses import ListResponse 

44from twinpad_backend.messages import RabbitMQClient 

45from twinpad_backend.config_manager.utils import is_line_empty, read_boolean_cell 

46from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

47 

48TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

49SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"] 

50DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"] 

51MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"] 

52 

53 

54RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

55MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

56SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

57HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

58DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

59 

60DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

61NUMBER_SAMPLES_DATABASE_UPDATE = 120 

62 

63logger = logging.getLogger("uvicorn.error") 

64 

65 

66CHANNEL_PATTERN = r"Channel\s+(\d+)" 

67XLSX_HEADER = [ 

68 "Ticker", 

69 "Component id", 

70 "Signal description", 

71 "Unit", 

72 "Type", 

73 "Frequency (Hz)", 

74 "Sensor transfer function", 

75 "Precision digits", 

76 "Data type", 

77 "Formula", 

78 "Forcible", 

79 "Commandable", 

80 "Broadcastable", 

81] 

82 

83ReadableFile = str | os.PathLike[str] | IO[bytes] | bytes 

84 

85 

86class DeleteInfo(BaseModel): 

87 is_deleted: bool 

88 detail: str 

89 

90 

91class classproperty: 

92 """ 

93 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

94 Found here: https://stackoverflow.com/a/76301341 

95 """ 

96 

97 def __init__(self, func): 

98 self.fget = func 

99 

100 def __get__(self, _, owner): 

101 return self.fget(owner) 

102 

103 

104def create_update_model(model): 

105 fields = {} 

106 

107 for field_name, field_annotation in model.model_fields.items(): 

108 if field_name != "id": 

109 fields[field_name] = (field_annotation.annotation | None, None) 

110 

111 query_name = model.__name__ + "Update" 

112 return create_model(query_name, **fields) 

113 

114 

115def get_utc_date_from_timestamp(timestamp: float): 

116 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

117 

118 

119def downsample_list(time_vector: list, values: list, max_number_samples: int): 

120 if len(time_vector) < max_number_samples: 

121 return time_vector, values 

122 

123 time_vector_copy = copy.deepcopy(time_vector) 

124 values_copy = copy.deepcopy(values) 

125 

126 none_group_bounds = [] 

127 none_group_index = -1 

128 index = -1 

129 # LTTB doesn't handle None values so remove them 

130 while values_copy.count(None) > 0: 

131 # Store bounds of None value groups so we can insert them back after the downsampling 

132 if (new_index := values_copy.index(None)) != index: 

133 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

134 none_group_index += 1 

135 elif len(none_group_bounds[none_group_index]) < 2: 

136 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

137 else: 

138 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

139 values_copy.pop(new_index) 

140 index = new_index 

141 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

142 

143 try: 

144 values_array = npy.array([time_vector_copy, values_copy]).T 

145 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

146 

147 new_time_vector = interpolated_values[:, 0].tolist() 

148 new_values = interpolated_values[:, 1].tolist() 

149 except ValueError: 

150 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

151 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

152 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

153 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

154 return new_time_vector, new_values_nan_to_none 

155 

156 # insert back None values at the correct timestamps 

157 for none_group in none_group_bounds: 

158 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

159 new_time_vector[start_index:start_index] = none_group 

160 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

161 

162 return new_time_vector, new_values 

163 

164 

165def is_of_type(value, wanted_type): 

166 if wanted_type is float: 

167 return isinstance(value, (int, float)) 

168 return isinstance(value, wanted_type) 

169 

170 

171def is_valid_excel_file(excel_file: ReadableFile) -> bool: 

172 """Checks if file can be opened as an Excel file. 

173 

174 :param excel_file: Path to the file. 

175 :type excel_file: str | os.PathLike[str] | IO[bytes] | bytes 

176 :return: True if it can be opened as an Excel file, False otherwise 

177 :rtype: bool 

178 """ 

179 try: 

180 temp = load_workbook(excel_file, read_only=True) 

181 temp.close() 

182 except zipfile.BadZipFile: 

183 return False 

184 

185 return True 

186 

187 

188# Models 

189class TwinPadModel(BaseModel): 

190 @classmethod 

191 def dict_to_object(cls, dict_): 

192 return cls.model_validate(dict_) 

193 

194 def to_dict(self, exclude=None, by_alias: bool | None = None): 

195 dict_ = self.model_dump(exclude=exclude, mode="json", by_alias=by_alias) 

196 return dict_ 

197 

198 

199def validate_mongo_id(v): 

200 if not ObjectId.is_valid(v): 

201 raise ValueError("Invalid MongoDB id") 

202 return str(v) 

203 

204 

205MongoId = Annotated[str, BeforeValidator(validate_mongo_id)] 

206 

207 

208def validate_12_hex(v: str) -> str: 

209 if not re.fullmatch(r"[0-9a-fA-F]{12}", v): 

210 raise ValueError("ID must be a 12-character hexadecimal string") 

211 return v 

212 

213 

214DeviceId = Annotated[str, BeforeValidator(validate_12_hex)] 

215 

216 

217class GenericMongo(TwinPadModel): 

218 id: MongoId | None = None 

219 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

220 

221 @classmethod 

222 def collection(cls): 

223 return get_collection(systems_database, cls.collection_name, create=True) 

224 

225 @classmethod 

226 def response_from_query(cls, query) -> ListResponse[Self]: 

227 request_filters = query.mongodb_filter() 

228 items = [] 

229 

230 # Allows for multi-sort, Python dicts are ordered so no issue while sorting 

231 sort_dict = {} 

232 for sort in query.sort_by.split(","): 

233 if ":" in sort: 

234 sort_field, sort_order = sort.split(":") 

235 sort_order = int(sort_order) 

236 else: 

237 sort_field = sort 

238 sort_order = 1 

239 sort_dict[sort_field] = sort_order 

240 

241 collection = get_collection(systems_database, cls.collection_name, create=True) 

242 total = collection.count_documents(request_filters) 

243 

244 pipeline = [] 

245 added_properties = [] 

246 if "$and" in request_filters: 

247 for request_filter in request_filters["$and"]: 

248 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

249 if filtered_property in request_filter: 

250 pipeline.extend(pipeline_steps) 

251 added_properties.append(filtered_property) 

252 else: 

253 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

254 if filtered_property in request_filters: 

255 pipeline.extend(pipeline_steps) 

256 added_properties.append(filtered_property) 

257 pipeline.append({"$match": request_filters}) 

258 

259 for sort_field in sort_dict.keys(): 

260 if sort_field in cls.custom_pipeline_steps: 

261 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

262 added_properties.append(sort_field) 

263 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}]) 

264 

265 if (query.limit is not None) and (query.limit != 0): 

266 pipeline.append({"$limit": query.limit}) 

267 

268 for filtered_property, step in cls.custom_pipeline_steps.items(): 

269 if filtered_property not in added_properties: 

270 pipeline.extend(step) 

271 

272 cursor = collection.aggregate(pipeline) 

273 

274 for item_dict in cursor: 

275 items.append(cls.mongo_dict_to_object(item_dict)) 

276 

277 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

278 

279 @classmethod 

280 def get_from_id(cls, item_id) -> Self | None: 

281 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

282 

283 @classmethod 

284 def mongo_dict_to_object(cls, mongo_dict): 

285 mongo_dict["id"] = str(mongo_dict["_id"]) 

286 del mongo_dict["_id"] 

287 return cls.dict_to_object(mongo_dict) 

288 

289 @classmethod 

290 def get_by_attribute(cls, attribute_name: str, attribute_value): 

291 """Returns all items that match the attribute with value.""" 

292 pipeline = [] 

293 if attribute_name in cls.custom_pipeline_steps: 

294 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

295 pipeline.append({"$match": {attribute_name: attribute_value}}) 

296 for key, step in cls.custom_pipeline_steps.items(): 

297 if key != attribute_name: 

298 pipeline.extend(step) 

299 items = cls.collection().aggregate(pipeline) 

300 if items is None: 

301 return None 

302 return [cls.mongo_dict_to_object(d) for d in items] 

303 

304 @classmethod 

305 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

306 pipeline = [] 

307 if attribute_name in cls.custom_pipeline_steps: 

308 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

309 pipeline.append({"$match": {attribute_name: attribute_value}}) 

310 pipeline.append({"$limit": 1}) 

311 for key, step in cls.custom_pipeline_steps.items(): 

312 if key != attribute_name: 

313 pipeline.extend(step) 

314 items = cls.collection().aggregate(pipeline).to_list() 

315 if len(items) == 0: 

316 return None 

317 return cls.mongo_dict_to_object(items[0]) 

318 

319 @classmethod 

320 def get_all(cls, sort_by="_id") -> list[Self]: 

321 items = [] 

322 pipeline = [] 

323 if sort_by in cls.custom_pipeline_steps: 

324 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

325 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

326 for key, step in cls.custom_pipeline_steps.items(): 

327 if key != sort_by: 

328 pipeline.extend(step) 

329 for dict_ in cls.collection().aggregate(pipeline): 

330 items.append(cls.mongo_dict_to_object(dict_)) 

331 return items 

332 

333 @classmethod 

334 def get_number_documents(cls): 

335 collection = get_collection(systems_database, cls.collection_name) 

336 if collection is None: 

337 return 0 

338 return collection.count_documents( 

339 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]} 

340 ) 

341 

342 def insert(self): 

343 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"})) 

344 self.id = str(insert_result.inserted_id) 

345 return self.id 

346 

347 def update(self, update_dict): 

348 for key, value in update_dict.items(): 

349 setattr(self, key, value) 

350 self.collection().find_one_and_update( 

351 {"_id": ObjectId(self.id)}, 

352 {"$set": update_dict}, 

353 return_document=ReturnDocument.AFTER, 

354 ) 

355 

356 return self 

357 

358 def delete(self): 

359 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

360 return result.deleted_count > 0 

361 

362 

363class User(GenericMongo): 

364 collection_name: ClassVar[str] = "users" 

365 

366 firstname: str 

367 lastname: str 

368 email: str 

369 password: str 

370 is_active: bool | None = False 

371 is_admin: bool | None = False 

372 is_connected: bool | None = False 

373 company_id: str | None = None 

374 

375 def to_dict(self, exclude: set = set()): 

376 exclude.add("password") 

377 return GenericMongo.to_dict(self, exclude=exclude) 

378 

379 @classmethod 

380 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

381 users = cls.get_all() 

382 if not users: 

383 is_admin = True 

384 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

385 user_collection = get_collection(systems_database, "users", create=True) 

386 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

387 if new_user is None: 

388 return None 

389 return {"user_id": str(new_user.inserted_id)} 

390 

391 @classmethod 

392 def update_info(cls, user: "UserUpdate", user_id: str): 

393 updated_user = cls.collection().find_one_and_update( 

394 {"_id": ObjectId(user_id)}, 

395 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

396 return_document=ReturnDocument.AFTER, 

397 ) 

398 updated_user["id"] = str(updated_user["_id"]) 

399 del (updated_user["_id"], updated_user["is_connected"]) 

400 return cls(**updated_user) 

401 

402 

403UserUpdate = create_update_model(User) 

404 

405 

406class Mode(TwinPadModel): 

407 mode_id: int 

408 name: str 

409 frequency_multiplier: float 

410 min_frequency: int 

411 

412 @classmethod 

413 def from_excel(cls, modes_sheet: Worksheet) -> list[Self]: 

414 modes = [] 

415 for mode_index, (mode_name, freq_multiplier, min_frequency) in enumerate( 

416 modes_sheet.iter_rows(min_row=2, max_col=3) 

417 ): 

418 modes.append( 

419 Mode( 

420 mode_id=mode_index + 1, 

421 name=mode_name.value, 

422 frequency_multiplier=freq_multiplier.value, 

423 min_frequency=min_frequency.value, 

424 ) 

425 ) 

426 

427 return modes 

428 

429 

430class DeviceUpdate(TwinPadModel): 

431 mode_id: int 

432 

433 

434class Device(GenericMongo): 

435 collection_name: ClassVar[str] = "devices" 

436 

437 device_id: DeviceId 

438 config_id: str | None = None 

439 config_name: str | None = None 

440 name: str 

441 description: str = "" 

442 modes: list[Mode] 

443 current_mode_id: int | None = None 

444 last_ping: float | None = None 

445 petri_network: Any 

446 pid: Any 

447 load: float | None = None 

448 tokens: list[int] = Field(default_factory=list) 

449 status: str 

450 

451 async def change_mode(self, update_dict, current_user: User): 

452 has_error = False 

453 

454 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

455 has_error = True 

456 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

457 elif self.current_mode_id is not None: 

458 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

459 else: 

460 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

461 command = Command( 

462 sent_at=time.time(), 

463 command_type="Mode change", 

464 description=description, 

465 user_id=current_user.id, 

466 ) 

467 

468 if has_error: 

469 command.response_time = 0 

470 command.succeeded = False 

471 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

472 else: 

473 response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id) 

474 command.receive_response(response) 

475 

476 Command.create(command) 

477 return response 

478 

479 @classmethod 

480 def get_from_device_or_config_id(cls, device_or_config_id: str): 

481 items = ( 

482 cls.collection() 

483 .aggregate( 

484 [ 

485 {"$match": {"$or": [{"device_id": device_or_config_id}, {"config_id": device_or_config_id}]}}, 

486 {"$limit": 1}, 

487 ] 

488 ) 

489 .to_list() 

490 ) 

491 if len(items) == 0: 

492 return None 

493 return cls.mongo_dict_to_object(items[0]) 

494 

495 @classmethod 

496 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

497 devices_by_id = {} 

498 for signal_id in signal_ids: 

499 device_or_config_id = signal_id.split(".")[0] 

500 if device_or_config_id not in devices_by_id: 

501 devices_by_id[device_or_config_id] = cls.get_from_device_or_config_id(device_or_config_id) 

502 return devices_by_id 

503 

504 

505class DeviceSetup(GenericMongo): 

506 collection_name: ClassVar[str] = "device_setups" 

507 

508 device_ids: list[str] 

509 active: bool = False 

510 variable_mapping: dict[str, str] 

511 

512 

513DeviceSetupUpdate = create_update_model(DeviceSetup) 

514 

515 

516class DeviceState(GenericMongo): 

517 collection_name: ClassVar[str] = "devices_states" 

518 

519 timestamp: float 

520 mode: str | None = None 

521 load: float | None = None 

522 tokens: list[int] = Field(default_factory=list) 

523 config_id: str | None = None 

524 modified_properties: list[str] = Field(default_factory=list) 

525 

526 @classmethod 

527 def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]: 

528 req_filter = query.mongodb_filter() 

529 items = [] 

530 if ":" in query.sort_by: 

531 sort_field, sort_order = query.sort_by.split(":") 

532 sort_order = int(sort_order) 

533 else: 

534 sort_field = query.sort_by 

535 sort_order = 1 

536 collection = get_collection(devices_states_database, device_id) 

537 if collection is None: 

538 total = 0 

539 cursor = [] 

540 else: 

541 total = collection.count_documents(req_filter) 

542 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

543 if (query.limit is not None) and (query.limit != 0): 

544 cursor = cursor.limit(query.limit) 

545 for item_dict in cursor: 

546 items.append( 

547 cls( 

548 timestamp=item_dict.get("precise_timestamp"), 

549 mode=item_dict.get("mode"), 

550 load=item_dict.get("load"), 

551 tokens=item_dict.get("tokens", Field(default_factory=list)), 

552 config_id=item_dict.get("config_id"), 

553 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

554 ) 

555 ) 

556 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

557 

558 

559class SignalSample(TwinPadModel): 

560 signal_id: str 

561 timestamp: float 

562 value: float | int | str | bool | None 

563 forced_value: float | int | str | bool | None = None 

564 

565 @classmethod 

566 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

567 collection = get_signal_collection(signal_id) 

568 real_signal_id = signal_id 

569 

570 if collection is None: 

571 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

572 if device is not None: 

573 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

574 collection = get_signal_collection(real_signal_id) 

575 

576 if collection is None: 

577 return None 

578 

579 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

580 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

581 bucket = get_signal_collection(f"system.buckets.{real_signal_id}") 

582 first_bucket = None 

583 if bucket is not None: 

584 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

585 if first_bucket is not None: 

586 sample_data = collection.find_one( 

587 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

588 ) 

589 else: 

590 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

591 

592 if sample_data is None: 

593 return None 

594 

595 timestamp = sample_data["precise_timestamp"] 

596 

597 return cls( 

598 signal_id=real_signal_id, 

599 timestamp=timestamp, 

600 value=sample_data.get("value", None), 

601 forced_value=sample_data.get("forced_value", None), 

602 ) 

603 

604 @classmethod 

605 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

606 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

607 

608 @classmethod 

609 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

610 last_value_collection = get_signal_collection("last_values", True) 

611 signals_collection = get_collection(systems_database, "signals", create=True) 

612 signal_collection = get_signal_collection(signal_id) 

613 real_signal_id = signal_id 

614 

615 if signal_collection is None: 

616 if device is None: 

617 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

618 if device is not None: 

619 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

620 

621 sample_data = last_value_collection.find_one({"signal_id": real_signal_id}, sort={"precise_timestamp": -1}) 

622 

623 # If there is no data, check if the signal's type is anything other than float, as they don't send duplicate values 

624 if sample_data is None: 

625 if signals_collection.count_documents({"status.status": "up", "signal_id": real_signal_id}) < 1: 

626 return None 

627 

628 # Same workaround as above function, very effective to narrow down big sets of data 

629 bucket = get_signal_collection(f"system.buckets.{real_signal_id}") 

630 last_bucket = None 

631 if bucket is not None: 

632 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

633 signal_collection = get_signal_collection(real_signal_id) 

634 if last_bucket is not None and signal_collection is not None: 

635 sample_data = signal_collection.find_one( 

636 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

637 sort={"precise_timestamp": -1}, 

638 ) 

639 else: 

640 sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1}) 

641 

642 if sample_data is None: 

643 return None 

644 

645 timestamp = sample_data.get("precise_timestamp") 

646 # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down 

647 if device is None: 

648 device = Device.get_from_device_or_config_id(real_signal_id.split(".")[0]) 

649 if device is not None and device.last_ping is not None: 

650 if timestamp is None: 

651 timestamp = device.last_ping 

652 else: 

653 timestamp = max(timestamp, device.last_ping) 

654 else: 

655 timestamp = sample_data.get("precise_timestamp") 

656 

657 return cls( 

658 signal_id=real_signal_id, 

659 timestamp=timestamp, 

660 value=sample_data.get("value", None), 

661 forced_value=sample_data.get("forced_value", None), 

662 ) 

663 

664 @classmethod 

665 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

666 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

667 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

668 

669 

670class SignalData(TwinPadModel): 

671 signal_id: str 

672 forcible: bool = True 

673 time_vector: list[float] 

674 values: list[float | int | str | None] 

675 forced_values: list[float | int | str | None] 

676 

677 data_start: float | None = None 

678 data_end: float | None = None 

679 

680 number_samples: int = 0 

681 number_samples_db: int = 0 

682 

683 db_query_time: float = 0.0 

684 init_time: float = 0.0 

685 data_processing_time: float = 0.0 

686 

687 phase_id: str | None = None 

688 

689 @classmethod 

690 def get_from_signal_id( 

691 cls, 

692 signal_id: str, 

693 min_timestamp: float = None, 

694 max_timestamp: float = None, 

695 window_min_timestamp: float = None, 

696 window_max_timestamp: float = None, 

697 interpolate_bounds: bool = True, 

698 max_documents: int = None, 

699 collection=None, 

700 ) -> Self: 

701 

702 now = time.time() 

703 

704 req_signal = {} 

705 if min_timestamp is not None: 

706 req_signal.setdefault("timestamp", {}) 

707 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

708 if max_timestamp is not None: 

709 req_signal.setdefault("timestamp", {}) 

710 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

711 

712 if collection is None: 

713 collection = get_signal_collection(signal_id) 

714 

715 real_signal_id = signal_id 

716 

717 if collection is None: 

718 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

719 if device is not None: 

720 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

721 collection = get_signal_collection(real_signal_id) 

722 

723 if collection is None: 

724 return cls( 

725 signal_id=real_signal_id, 

726 time_vector=[], 

727 values=[], 

728 forced_values=[], 

729 number_samples=0, 

730 number_samples_db=0, 

731 ) 

732 

733 db_req_start = time.time() 

734 

735 sort_step = {"$sort": {"precise_timestamp": 1}} 

736 number_results = collection.count_documents(req_signal) 

737 

738 pipeline = [] 

739 if req_signal: 

740 pipeline.append({"$match": req_signal}) # Filter data if needed 

741 

742 pipeline.extend( 

743 [ 

744 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

745 sort_step, 

746 ] 

747 ) 

748 

749 if max_documents is not None and max_documents < number_results: 

750 unsampling_ratio = math.ceil(number_results / max_documents) 

751 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

752 pipeline.extend( 

753 [ 

754 { 

755 "$setWindowFields": { 

756 "sortBy": {"precise_timestamp": 1}, 

757 "output": {"index": {"$documentNumber": {}}}, 

758 } 

759 }, 

760 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

761 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

762 {"$replaceRoot": {"newRoot": "$doc"}}, 

763 {"$unset": ["index", "group_id"]}, 

764 {"$sort": {"precise_timestamp": 1}}, 

765 ] 

766 ) 

767 

768 # logger.info(f"pipeline: %s", str(pipeline)) 

769 cursor = collection.aggregate(pipeline) 

770 db_req_time = time.time() - db_req_start 

771 

772 init_time = time.time() 

773 

774 results = cursor.to_list() 

775 time_vector = [] 

776 values = [] 

777 forced_values = [] 

778 for s in results: 

779 time_vector.append(s["precise_timestamp"]) 

780 values.append(s.get("value", None)) 

781 forced_values.append(s.get("forced_value", None)) 

782 

783 signal = Signal.get_from_signal_id(real_signal_id) 

784 if signal is None: 

785 return cls( 

786 signal_id=real_signal_id, 

787 time_vector=[], 

788 values=[], 

789 forced_values=[], 

790 number_samples=0, 

791 number_samples_db=0, 

792 ) 

793 class_ = signal.signal_data_class 

794 

795 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

796 time_vector, values, forced_values = cls.interpolate_bounds( 

797 class_, 

798 collection, 

799 real_signal_id, 

800 time_vector, 

801 values, 

802 forced_values, 

803 window_min_timestamp, 

804 window_max_timestamp, 

805 ) 

806 

807 if values: 

808 # TODO: check below. a bit strange 

809 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

810 # Adding last value as it should be repeated 

811 time_vector.append(now) 

812 values.append(values[-1]) 

813 forced_values.append(forced_values[-1]) 

814 

815 init_time = time.time() - init_time 

816 

817 # See line 292 for explanation 

818 bucket = get_signal_collection(f"system.buckets.{real_signal_id}") 

819 first_bucket = None 

820 if bucket is not None: 

821 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

822 if first_bucket is not None: 

823 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

824 else: 

825 data_start = None 

826 

827 last_bucket = None 

828 if bucket is not None: 

829 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

830 if last_bucket is not None: 

831 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

832 else: 

833 data_end = None 

834 

835 return class_( 

836 signal_id=real_signal_id, 

837 forcible=signal.forcible, 

838 time_vector=time_vector, 

839 values=values, 

840 forced_values=forced_values, 

841 data_start=data_start, 

842 data_end=data_end, 

843 number_samples=len(values), 

844 number_samples_db=number_results, 

845 db_query_time=db_req_time, 

846 init_time=init_time, 

847 ) 

848 

849 @staticmethod 

850 def interpolate_bounds( 

851 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

852 ): 

853 sample_right = None 

854 # Fetching right side value & interpolation 

855 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

856 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

857 sample_right = collection.find_one( 

858 { 

859 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

860 "value": {"$exists": True}, 

861 }, 

862 sort={"precise_timestamp": -1}, 

863 ) 

864 if sample_right: 

865 if time_vector: 

866 right_sd = class_( 

867 signal_id=signal_id, 

868 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

869 values=[values[-1], sample_right.get("value", None)], 

870 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

871 ) 

872 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

873 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

874 else: 

875 max_ts_value = sample_right.get("value", None) 

876 max_ts_forced_value = sample_right.get("forced_value", None) 

877 time_vector.append(window_max_timestamp) 

878 values.append(max_ts_value) 

879 forced_values.append(max_ts_forced_value) 

880 

881 # Fetching left side value & interpolation 

882 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

883 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

884 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

885 sample_left = sample_right 

886 sample_left = collection.find_one( 

887 { 

888 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

889 "value": {"$exists": True}, 

890 }, 

891 sort={"precise_timestamp": -1}, 

892 ) 

893 

894 if sample_left: 

895 if time_vector: 

896 left_sd = class_( 

897 signal_id=signal_id, 

898 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

899 values=[sample_left["value"], values[0]], 

900 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

901 ) 

902 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

903 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

904 else: 

905 min_ts_value = sample_left.get("value", None) 

906 min_ts_forced_value = sample_left.get("forced_value", None) 

907 time_vector.insert(0, window_min_timestamp) 

908 values.insert(0, min_ts_value) 

909 forced_values.insert(0, min_ts_forced_value) 

910 

911 return time_vector, values, forced_values 

912 

913 def interpolate_values(self, new_time_vector: list[float]): 

914 return self.interpolate(new_time_vector, self.values) 

915 

916 def interpolate_forced_values(self, new_time_vector: list[float]): 

917 return self.interpolate(new_time_vector, self.forced_values) 

918 

919 def uniform_desampling(self, number_samples_max: int) -> Self: 

920 data_processing_time = time.time() 

921 if number_samples_max and self.number_samples > number_samples_max: 

922 new_time_vector = npy.linspace( 

923 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

924 ).tolist() 

925 values = self.interpolate_values(new_time_vector) 

926 forced_values = self.interpolate_forced_values(new_time_vector) 

927 time_vector = new_time_vector 

928 number_samples = len(time_vector) 

929 else: 

930 time_vector = self.time_vector 

931 number_samples = len(self.values) 

932 values = self.values[:] 

933 forced_values = self.forced_values[:] 

934 data_processing_time = time.time() - data_processing_time 

935 

936 return self.__class__( 

937 signal_id=self.signal_id, 

938 time_vector=time_vector, 

939 values=values, 

940 forced_values=forced_values, 

941 number_samples=number_samples, 

942 number_samples_db=self.number_samples, 

943 data_start=self.data_start, 

944 data_end=self.data_end, 

945 db_query_time=self.db_query_time, 

946 init_time=self.init_time, 

947 data_processing_time=self.data_processing_time + data_processing_time, 

948 phase_id=self.phase_id, 

949 ) 

950 

951 def min_max_downsampling(self, number_samples_max: int) -> Self: 

952 return self.uniform_desampling(number_samples_max) 

953 

954 def interest_window_desampling( 

955 self, 

956 window_max_number_samples: int, 

957 outside_max_number_samples: int, 

958 window_min_timestamp: float | None = None, 

959 window_max_timestamp: float | None = None, 

960 ) -> Self: 

961 """Performs a sampling in a window of interest and outside.""" 

962 

963 if not self.time_vector: 

964 return self 

965 

966 if window_min_timestamp is None: 

967 window_min_timestamp = self.time_vector[0] 

968 if window_max_timestamp is None: 

969 window_max_timestamp = self.time_vector[-1] 

970 

971 data_processing_time = time.time() 

972 

973 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

974 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

975 

976 time_vector_before = self.time_vector[:index_window_start] 

977 time_vector_window = self.time_vector[index_window_start:index_window_end] 

978 time_vector_after = self.time_vector[index_window_end:] 

979 

980 # Resampling window 

981 if time_vector_window: 

982 # Ensurring window bounds 

983 if time_vector_window[0] != window_min_timestamp: 

984 time_vector_window.insert(0, window_min_timestamp) 

985 if time_vector_window[-1] != window_max_timestamp: 

986 time_vector_window.append(window_max_timestamp) 

987 else: 

988 time_vector_window = [window_min_timestamp, window_max_timestamp] 

989 

990 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

991 # Resampling 

992 new_window_time_vector = npy.linspace( 

993 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

994 ).tolist() 

995 time_vector_window = new_window_time_vector 

996 

997 # Resampling outside 

998 number_samples_before = len(time_vector_before) 

999 number_samples_after = len(time_vector_after) 

1000 if ( 

1001 outside_max_number_samples is not None 

1002 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1003 ): 

1004 new_number_samples_before = min( 

1005 number_samples_before, 

1006 math.ceil( 

1007 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1008 ), 

1009 ) 

1010 new_number_samples_after = min( 

1011 number_samples_after, 

1012 math.ceil( 

1013 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1014 ), 

1015 ) 

1016 # Adjusting numbers as math.ceil can do +1 on sum 

1017 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1018 if new_number_samples_before > new_number_samples_after: 

1019 new_number_samples_before -= 1 

1020 else: 

1021 new_number_samples_after -= 1 

1022 

1023 if new_number_samples_before > 0: 

1024 new_time_vector_before = npy.linspace( 

1025 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

1026 ).tolist() 

1027 time_vector_before = new_time_vector_before 

1028 

1029 if new_number_samples_after > 0: 

1030 new_time_vector_after = npy.linspace( 

1031 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

1032 ).tolist()[::-1] 

1033 time_vector_after = new_time_vector_after 

1034 

1035 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1036 values = self.interpolate_values(new_time_vector) 

1037 forced_values = self.interpolate_forced_values(new_time_vector) 

1038 number_samples = len(values) 

1039 

1040 data_processing_time = time.time() - data_processing_time 

1041 

1042 return self.__class__( 

1043 signal_id=self.signal_id, 

1044 forcible=self.forcible, 

1045 time_vector=new_time_vector, 

1046 values=values, 

1047 forced_values=forced_values, 

1048 number_samples=number_samples, 

1049 number_samples_db=self.number_samples, 

1050 data_start=self.data_start, 

1051 data_end=self.data_end, 

1052 db_query_time=self.db_query_time, 

1053 init_time=self.init_time, 

1054 data_processing_time=self.data_processing_time + data_processing_time, 

1055 ) 

1056 

1057 def zero_time_vector(self, data_start: float): 

1058 data_processing_time = time.time() 

1059 if len(self.time_vector) == 0: 

1060 return self 

1061 time_vector = npy.array(self.time_vector) - data_start 

1062 data_processing_time = time.time() - data_processing_time 

1063 

1064 return self.__class__( 

1065 signal_id=self.signal_id, 

1066 time_vector=time_vector, 

1067 values=self.values, 

1068 forced_values=self.forced_values, 

1069 number_samples=self.number_samples, 

1070 number_samples_db=self.number_samples_db, 

1071 data_start=time_vector[0], 

1072 data_end=time_vector[-1], 

1073 db_query_time=self.db_query_time, 

1074 init_time=self.init_time, 

1075 data_processing_time=self.data_processing_time + data_processing_time, 

1076 ) 

1077 

1078 def csv_export(self): 

1079 output = io.StringIO() 

1080 writer = csv.writer(output) 

1081 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

1082 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

1083 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

1084 return output.getvalue().encode("utf-8") 

1085 

1086 def prestoplot_export(self): 

1087 clean_signal_id = self.signal_id.replace(".", "_") 

1088 if clean_signal_id[0].isnumeric(): 

1089 clean_signal_id = "_" + clean_signal_id 

1090 

1091 output = io.StringIO() 

1092 output.write("# Encoding:\tUTF-8\n") 

1093 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

1094 output.write("ISO8601\tnone\tnone\n") 

1095 output.write(f"# Description :\t{clean_signal_id}\n") 

1096 

1097 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

1098 output.write( 

1099 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

1100 ) 

1101 return output.getvalue().encode("utf-8") 

1102 

1103 

1104class NumericSignalData(SignalData): 

1105 data_type: str = "float" 

1106 values: list[float | int | None] 

1107 forced_values: list[float | int | None] 

1108 

1109 def interpolate(self, new_time_vector: list[float], items): 

1110 items = [npy.nan if s is None else s for s in items] 

1111 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

1112 

1113 def uniform_desampling(self, number_samples_max: int) -> Self: 

1114 data_processing_time = time.time() 

1115 if number_samples_max and self.number_samples > number_samples_max: 

1116 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

1117 forced_values = self.interpolate_forced_values(time_vector) 

1118 number_samples = len(time_vector) 

1119 else: 

1120 time_vector = self.time_vector 

1121 number_samples = len(self.values) 

1122 values = self.values[:] 

1123 forced_values = self.forced_values[:] 

1124 data_processing_time = time.time() - data_processing_time 

1125 

1126 return self.__class__( 

1127 signal_id=self.signal_id, 

1128 time_vector=time_vector, 

1129 values=values, 

1130 forced_values=forced_values, 

1131 number_samples=number_samples, 

1132 number_samples_db=self.number_samples, 

1133 data_start=self.data_start, 

1134 data_end=self.data_end, 

1135 db_query_time=self.db_query_time, 

1136 init_time=self.init_time, 

1137 data_processing_time=self.data_processing_time + data_processing_time, 

1138 ) 

1139 

1140 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1141 if self.number_samples < number_samples_max: 

1142 return self 

1143 

1144 data_processing_time = time.time() 

1145 

1146 number_bins = number_samples_max // 2 

1147 

1148 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

1149 values = npy.array(self.values, dtype=npy.float64) 

1150 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1151 

1152 points_per_bin = self.number_samples // number_bins 

1153 

1154 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1155 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1156 if self.number_samples - number_bins * points_per_bin > 1: 

1157 points_per_bin += 1 

1158 number_bins = self.number_samples // points_per_bin + 1 

1159 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1160 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1161 values = npy.concatenate([values, nan_points_to_add]) 

1162 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1163 

1164 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1165 values_matrix = values.reshape(number_bins, points_per_bin) 

1166 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1167 

1168 indexes_min = npy.zeros(number_bins, dtype="int64") 

1169 indexes_max = npy.zeros(number_bins, dtype="int64") 

1170 

1171 for row in range(number_bins): 

1172 min_value = values_matrix[row, 0] 

1173 max_value = values_matrix[row, 0] 

1174 for column in range(points_per_bin): 

1175 if values_matrix[row, column] < min_value: 

1176 min_value = values_matrix[row, column] 

1177 indexes_min[row] = column 

1178 elif values_matrix[row, column] > max_value: 

1179 max_value = values_matrix[row, column] 

1180 indexes_max[row] = column 

1181 

1182 row_index = npy.repeat(npy.arange(number_bins), 2) 

1183 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1184 

1185 data_processing_time = time.time() - data_processing_time 

1186 

1187 new_time_vector = timestamps_matrix[row_index, column_index] 

1188 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1189 new_values = values_matrix[row_index, column_index] 

1190 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1191 new_forced_values = forced_values_matrix[row_index, column_index] 

1192 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1193 

1194 # Make sure there are no None values for the time vector 

1195 time_vector_filter = new_time_vector != None 

1196 new_time_vector = new_time_vector[time_vector_filter] 

1197 new_values = new_values[time_vector_filter] 

1198 new_forced_values = new_forced_values[time_vector_filter] 

1199 

1200 return self.__class__( 

1201 signal_id=self.signal_id, 

1202 time_vector=new_time_vector, 

1203 values=new_values, 

1204 forced_values=new_forced_values, 

1205 number_samples=number_bins * 2, 

1206 number_samples_db=self.number_samples_db, 

1207 data_start=self.data_start, 

1208 data_end=self.data_end, 

1209 db_query_time=self.db_query_time, 

1210 init_time=self.init_time, 

1211 data_processing_time=self.data_processing_time + data_processing_time, 

1212 phase_id=self.phase_id, 

1213 ) 

1214 

1215 def interest_window_desampling( 

1216 self, 

1217 window_max_number_samples: int, 

1218 outside_max_number_samples: int, 

1219 window_min_timestamp: float | None = None, 

1220 window_max_timestamp: float | None = None, 

1221 ) -> Self: 

1222 """Performs a sampling in a window of interest and outside.""" 

1223 

1224 if not self.time_vector: 

1225 return self 

1226 

1227 if window_min_timestamp is None: 

1228 window_min_timestamp = self.time_vector[0] 

1229 if window_max_timestamp is None: 

1230 window_max_timestamp = self.time_vector[-1] 

1231 

1232 data_processing_time = time.time() 

1233 

1234 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1235 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1236 

1237 time_vector_before = self.time_vector[:index_window_start] 

1238 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1239 time_vector_after = self.time_vector[index_window_end:] 

1240 

1241 values_before = self.values[:index_window_start] 

1242 values_window = self.values[index_window_start:index_window_end] 

1243 values_after = self.values[index_window_end:] 

1244 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1245 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1246 

1247 # Resampling window 

1248 if time_vector_window: 

1249 # Ensurring window bounds 

1250 if time_vector_window[0] != window_min_timestamp: 

1251 time_vector_window.insert(0, window_min_timestamp) 

1252 values_window.insert(0, window_min_value) 

1253 if time_vector_window[-1] != window_max_timestamp: 

1254 time_vector_window.append(window_max_timestamp) 

1255 values_window.append(window_max_value) 

1256 else: 

1257 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1258 values_window = [window_min_value, window_max_value] 

1259 

1260 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1261 # Resampling 

1262 time_vector_window, values_window = downsample_list( 

1263 time_vector_window, values_window, window_max_number_samples 

1264 ) 

1265 

1266 # Resampling outside 

1267 number_samples_before = len(time_vector_before) 

1268 number_samples_after = len(time_vector_after) 

1269 if ( 

1270 outside_max_number_samples is not None 

1271 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1272 ): 

1273 new_number_samples_before = min( 

1274 number_samples_before, 

1275 math.ceil( 

1276 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1277 ), 

1278 ) 

1279 new_number_samples_after = min( 

1280 number_samples_after, 

1281 math.ceil( 

1282 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1283 ), 

1284 ) 

1285 # Adjusting numbers as math.ceil can do +1 on sum 

1286 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1287 if new_number_samples_before > new_number_samples_after: 

1288 new_number_samples_before -= 1 

1289 else: 

1290 new_number_samples_after -= 1 

1291 

1292 if new_number_samples_before > 0: 

1293 time_vector_before, values_before = downsample_list( 

1294 time_vector_before, values_before, new_number_samples_before 

1295 ) 

1296 

1297 if new_number_samples_after > 0: 

1298 time_vector_after, values_after = downsample_list( 

1299 time_vector_after, values_after, new_number_samples_after 

1300 ) 

1301 

1302 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1303 values = values_before + values_window + values_after 

1304 forced_values = self.interpolate_forced_values(new_time_vector) 

1305 number_samples = len(values) 

1306 

1307 data_processing_time = time.time() - data_processing_time 

1308 

1309 return self.__class__( 

1310 signal_id=self.signal_id, 

1311 time_vector=new_time_vector, 

1312 values=values, 

1313 forced_values=forced_values, 

1314 number_samples=number_samples, 

1315 number_samples_db=self.number_samples, 

1316 data_start=self.data_start, 

1317 data_end=self.data_end, 

1318 db_query_time=self.db_query_time, 

1319 init_time=self.init_time, 

1320 data_processing_time=self.data_processing_time + data_processing_time, 

1321 ) 

1322 

1323 

1324class StringSignalData(SignalData): 

1325 data_type: str = "str" 

1326 values: list[str | None] 

1327 forced_values: list[str | None] 

1328 

1329 def interpolate(self, new_time_vector: list[float], items): 

1330 # Find the indices of the values in xp that are just smaller or equal to x 

1331 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1332 indices = npy.clip(indices, 0, len(items) - 1) 

1333 # Return the corresponding left string values from fp 

1334 return [items[i] for i in indices] 

1335 

1336 

1337class SignalsData(TwinPadModel): 

1338 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1339 data_processing_time: float 

1340 data_start: float | None 

1341 data_end: float | None 

1342 

1343 @classmethod 

1344 def get_from_signal_ids( 

1345 cls, 

1346 signal_ids: list[str], 

1347 min_timestamp: float = None, 

1348 max_timestamp: float = None, 

1349 window_min_timestamp: float = None, 

1350 window_max_timestamp: float = None, 

1351 interpolate_bounds: bool = True, 

1352 max_documents: int = None, 

1353 ) -> Self: 

1354 signals_data = [] 

1355 data_start = None 

1356 data_end = None 

1357 if max_timestamp is None: 

1358 max_timestamp = time.time() 

1359 data_processing_time = 0.0 

1360 

1361 signal_collections = get_signal_collections_batch(signal_ids) 

1362 

1363 for signal_id, collection in zip(signal_ids, signal_collections): 

1364 signal_data = SignalData.get_from_signal_id( 

1365 signal_id=signal_id, 

1366 min_timestamp=min_timestamp, 

1367 max_timestamp=max_timestamp, 

1368 window_min_timestamp=window_min_timestamp, 

1369 window_max_timestamp=window_max_timestamp, 

1370 interpolate_bounds=interpolate_bounds, 

1371 max_documents=max_documents, 

1372 collection=collection, 

1373 ) 

1374 data_processing_time += signal_data.data_processing_time 

1375 signals_data.append(signal_data) 

1376 if signal_data.data_start is not None: 

1377 if data_start is None: 

1378 data_start = signal_data.data_start 

1379 else: 

1380 data_start = min(signal_data.data_start, data_start) 

1381 if signal_data.data_end is not None: 

1382 if data_end is None: 

1383 data_end = signal_data.data_end 

1384 else: 

1385 data_end = max(signal_data.data_end, data_end) 

1386 

1387 return cls( 

1388 signals_data=signals_data, 

1389 data_processing_time=data_processing_time, 

1390 data_start=data_start, 

1391 data_end=data_end, 

1392 ) 

1393 

1394 @classmethod 

1395 def get_from_phase_and_signal_ids( 

1396 cls, 

1397 phases: list, 

1398 phase_sync_times: list[float | None], 

1399 signal_ids: list[str], 

1400 window_min_timestamps: list[float | None], 

1401 window_max_timestamps: list[float | None], 

1402 zero_time_vector: bool = True, 

1403 ): 

1404 signals_data: list[SignalData] = [] 

1405 computation_start = time.time() 

1406 

1407 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip( 

1408 phases, phase_sync_times, window_min_timestamps, window_max_timestamps 

1409 ): 

1410 min_timestamp = phase.start_at / 1000 

1411 max_timestamp = phase.end_at / 1000 

1412 

1413 if sync_time is None: 

1414 sync_time = min_timestamp 

1415 

1416 if window_max_timestamp is not None and window_min_timestamp is not None: 

1417 window_length = window_max_timestamp - window_min_timestamp 

1418 

1419 if window_min_timestamp != min_timestamp: 

1420 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20) 

1421 if window_max_timestamp != max_timestamp: 

1422 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20) 

1423 

1424 signal_collections = get_signal_collections_batch(signal_ids) 

1425 

1426 for signal_id, collection in zip(signal_ids, signal_collections): 

1427 signal_data = SignalData.get_from_signal_id( 

1428 signal_id, 

1429 min_timestamp, 

1430 max_timestamp, 

1431 window_min_timestamp, 

1432 window_max_timestamp, 

1433 interpolate_bounds=False, 

1434 max_documents=None, 

1435 collection=collection, 

1436 ) 

1437 

1438 if len(signal_data.time_vector) == 0: 

1439 continue 

1440 

1441 if zero_time_vector: 

1442 signal_data = signal_data.zero_time_vector(sync_time) 

1443 signal_data.phase_id = phase.id 

1444 

1445 signals_data.append(signal_data) 

1446 

1447 return cls( 

1448 signals_data=signals_data, 

1449 data_processing_time=time.time() - computation_start, 

1450 data_start=0, 

1451 data_end=0, 

1452 ) 

1453 

1454 def uniform_desampling(self, number_samples_max: int) -> Self: 

1455 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1456 return SignalsData( 

1457 signals_data=signals_data, 

1458 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1459 data_start=self.data_start, 

1460 data_end=self.data_end, 

1461 ) 

1462 

1463 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1464 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1465 return SignalsData( 

1466 signals_data=signals_data, 

1467 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1468 data_start=self.data_start, 

1469 data_end=self.data_end, 

1470 ) 

1471 

1472 def interest_window_desampling( 

1473 self, 

1474 window_max_number_samples: int, 

1475 outside_max_number_samples: int, 

1476 window_min_timestamp: float = None, 

1477 window_max_timestamp: float = None, 

1478 ) -> Self: 

1479 signals_data = [ 

1480 s.interest_window_desampling( 

1481 window_max_number_samples=window_max_number_samples, 

1482 outside_max_number_samples=outside_max_number_samples, 

1483 window_min_timestamp=window_min_timestamp, 

1484 window_max_timestamp=window_max_timestamp, 

1485 ) 

1486 for s in self.signals_data 

1487 ] 

1488 

1489 return SignalsData( 

1490 signals_data=signals_data, 

1491 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1492 data_start=self.data_start, 

1493 data_end=self.data_end, 

1494 ) 

1495 

1496 def zero_time_vector(self, data_start: float): 

1497 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data] 

1498 return SignalsData( 

1499 signals_data=signals_data, 

1500 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1501 data_start=0, 

1502 data_end=max([s.data_end for s in signals_data]), 

1503 ) 

1504 

1505 @classmethod 

1506 async def apply_single_function( 

1507 cls, 

1508 phase, 

1509 base_signal_id: str, 

1510 function: SINGLE_POST_PROCESSING_FUNCTION, 

1511 window_min_timestamp: float = None, 

1512 window_max_timestamp: float = None, 

1513 ): 

1514 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}" 

1515 

1516 processed_result_signal = Signal.get_from_signal_id(signal_id) 

1517 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids: 

1518 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp) 

1519 

1520 signals_data = cls.get_from_phase_and_signal_ids( 

1521 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False 

1522 ) 

1523 

1524 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0: 

1525 return None 

1526 

1527 new_values = None 

1528 new_forced_values = None 

1529 time_vector = npy.array(signals_data.signals_data[0].time_vector) 

1530 values = signals_data.signals_data[0].values 

1531 forced_values = signals_data.signals_data[0].forced_values 

1532 

1533 match (function): 

1534 case "Cumul": 

1535 new_values = cumul(values) 

1536 new_forced_values = cumul(forced_values) 

1537 # case "CumulDistrib": 

1538 # new_values = cumul_distrib(values) 

1539 # new_forced_values = cumul_distrib(forced_values) 

1540 case "Delta": 

1541 new_values = delta(values) 

1542 new_forced_values = delta(forced_values) 

1543 case "DeltaT": 

1544 new_values = delta(time_vector) 

1545 new_forced_values = new_values 

1546 case "Derive": 

1547 new_values = derive(time_vector, values) 

1548 new_forced_values = derive(time_vector, forced_values) 

1549 case "Integ": 

1550 new_values = integ(time_vector, values) 

1551 new_forced_values = integ(time_vector, forced_values) 

1552 

1553 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1554 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1555 

1556 loop = asyncio.get_running_loop() 

1557 loop.create_task( 

1558 cls.save_function_signal( 

1559 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible 

1560 ) 

1561 ) 

1562 

1563 if window_max_timestamp is not None: 

1564 max_timestamp_mask = time_vector <= window_max_timestamp 

1565 time_vector = time_vector[max_timestamp_mask] 

1566 new_values = new_values[max_timestamp_mask] 

1567 new_forced_values = new_forced_values[max_timestamp_mask] 

1568 if window_min_timestamp is not None: 

1569 min_timestamp_mask = time_vector >= window_min_timestamp 

1570 time_vector = time_vector[min_timestamp_mask] 

1571 new_values = new_values[min_timestamp_mask] 

1572 new_forced_values = new_forced_values[min_timestamp_mask] 

1573 

1574 signals_data.signals_data[0].time_vector = time_vector.tolist() 

1575 signals_data.signals_data[0].values = new_values.tolist() 

1576 signals_data.signals_data[0].forced_values = new_forced_values.tolist() 

1577 signals_data.signals_data[0].number_samples = time_vector.size 

1578 

1579 signals_data.signals_data[0].signal_id = signal_id 

1580 

1581 return signals_data 

1582 

1583 @classmethod 

1584 async def apply_multiple_function( 

1585 cls, 

1586 phases: list, 

1587 signal_ids: list, 

1588 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

1589 window_min_timestamp: float = None, 

1590 window_max_timestamp: float = None, 

1591 ): 

1592 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION): 

1593 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}" 

1594 else: 

1595 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}" 

1596 

1597 active_phase = phases[0] 

1598 if function in {"Align-X", "Using-X"}: 

1599 active_phase = phases[1] 

1600 

1601 processed_result_signal = Signal.get_from_signal_id(function_signal_id) 

1602 if processed_result_signal is not None and ( 

1603 active_phase.id in processed_result_signal.computed_phases_ids 

1604 ): # If signal has been computed for the correct phase 

1605 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp) 

1606 

1607 array_length = None 

1608 time_vector_list = [] 

1609 values_list = [] 

1610 forced_values_list = [] 

1611 forcible = True 

1612 for phase, signal_id in zip(phases, signal_ids): 

1613 signals_data = cls.get_from_phase_and_signal_ids( 

1614 [phase], [None], [signal_id], [None], [None], zero_time_vector=False 

1615 ) 

1616 

1617 if len(signals_data.signals_data) == 0: 

1618 return None 

1619 

1620 signal_data = signals_data.signals_data[0] 

1621 

1622 if array_length is None: 

1623 array_length = signal_data.number_samples 

1624 if ( 

1625 array_length != signal_data.number_samples and function != "Align-X" 

1626 ) or signal_data.number_samples == 0: 

1627 return None 

1628 

1629 time_vector_list.append(npy.array(signal_data.time_vector)) 

1630 values_list.append(npy.array(signal_data.values, dtype=npy.float64)) 

1631 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64)) 

1632 forcible = forcible and signal_data.forcible 

1633 

1634 time_vector = time_vector_list[0] 

1635 new_values = None 

1636 new_forced_values = None 

1637 

1638 match (function): 

1639 case "Align-X": 

1640 time_vector = time_vector_list[1] 

1641 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000 

1642 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000 

1643 new_values = align_x(old_time_vector, values_list[0], new_time_vector) 

1644 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector) 

1645 # case "Atan2": 

1646 # new_values = atan2(values_list[0], values_list[1]) 

1647 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1]) 

1648 case "Using-X": 

1649 if len(time_vector_list[0]) != len(time_vector_list[1]): 

1650 return None 

1651 time_vector = time_vector_list[1] 

1652 new_values = values_list[0] 

1653 new_forced_values = forced_values_list[0] 

1654 case "Mean": 

1655 new_values = mean(*values_list) 

1656 new_forced_values = mean(*forced_values_list) 

1657 case "Norm": 

1658 new_values = norm(*values_list) 

1659 new_forced_values = norm(*forced_values_list) 

1660 

1661 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1662 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1663 

1664 loop = asyncio.get_running_loop() 

1665 loop.create_task( 

1666 cls.save_function_signal( 

1667 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible 

1668 ) 

1669 ) 

1670 

1671 total_number_samples = time_vector.size 

1672 

1673 if window_max_timestamp is not None: 

1674 max_timestamp_mask = time_vector <= window_max_timestamp 

1675 time_vector = time_vector[max_timestamp_mask] 

1676 new_values = new_values[max_timestamp_mask] 

1677 new_forced_values = new_forced_values[max_timestamp_mask] 

1678 if window_min_timestamp is not None: 

1679 min_timestamp_mask = time_vector >= window_min_timestamp 

1680 time_vector = time_vector[min_timestamp_mask] 

1681 new_values = new_values[min_timestamp_mask] 

1682 new_forced_values = new_forced_values[min_timestamp_mask] 

1683 

1684 signals_data = cls( 

1685 signals_data=[ 

1686 NumericSignalData( 

1687 signal_id=function_signal_id, 

1688 forcible=forcible, 

1689 time_vector=time_vector.tolist(), 

1690 values=new_values.tolist(), 

1691 forced_values=new_forced_values.tolist(), 

1692 number_samples=time_vector.size, 

1693 number_samples_db=total_number_samples, 

1694 ) 

1695 ], 

1696 data_processing_time=0, 

1697 data_start=0, 

1698 data_end=0, 

1699 ) 

1700 

1701 return signals_data 

1702 

1703 @classmethod 

1704 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float): 

1705 signal_data_collection = get_signal_collection(signal_id, create=True) 

1706 pipeline = [] 

1707 match_filter = {} 

1708 if window_min_timestamp is not None or window_max_timestamp is not None: 

1709 match_filter["$match"] = {} 

1710 match_filter["$match"]["precise_timestamp"] = {} 

1711 if window_max_timestamp is not None: 

1712 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp 

1713 if window_min_timestamp is not None: 

1714 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp 

1715 

1716 total_number_samples = signal_data_collection.count_documents({}) 

1717 

1718 if match_filter: 

1719 pipeline.append(match_filter) 

1720 

1721 fetch_start = time.time() 

1722 

1723 samples = signal_data_collection.aggregate(pipeline).to_list() 

1724 new_time_vector = [] 

1725 new_values = [] 

1726 new_forced_values = [] 

1727 for sample in samples: 

1728 new_time_vector.append(sample["precise_timestamp"]) 

1729 new_values.append(sample["value"]) 

1730 new_forced_values.append(sample["forced_value"]) 

1731 

1732 return cls( 

1733 signals_data=[ 

1734 NumericSignalData( 

1735 signal_id=signal_id, 

1736 time_vector=new_time_vector, 

1737 values=new_values, 

1738 forced_values=new_forced_values, 

1739 number_samples=len(new_time_vector), 

1740 number_samples_db=total_number_samples, 

1741 ) 

1742 ], 

1743 data_processing_time=time.time() - fetch_start, 

1744 data_start=0, 

1745 data_end=0, 

1746 ) 

1747 

1748 @classmethod 

1749 async def save_function_signal( 

1750 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool 

1751 ): 

1752 # Insert data first so if it is requested by another user, it will be computed again 

1753 signal_collection = get_signal_collection(function_signal_id, create=True) 

1754 signal_collection.delete_many( 

1755 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}} 

1756 ) 

1757 signal_collection.insert_many( 

1758 [ 

1759 { 

1760 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]), 

1761 "precise_timestamp": time_vector[i], 

1762 "value": new_values[i], 

1763 "forced_value": new_forced_values[i], 

1764 } 

1765 for i in range(len(time_vector)) 

1766 ] 

1767 ) 

1768 

1769 signals_config_collection = get_collection(systems_database, "signals", create=True) 

1770 signals_config_collection.find_one_and_update( 

1771 {"signal_id": function_signal_id}, 

1772 { 

1773 "$set": { 

1774 "description": "", 

1775 "unit": None, 

1776 "type": "sensor", 

1777 "address": None, 

1778 "frequency": round(1 / max(npy.mean(delta(time_vector)), 1)), # avoid division by 0 

1779 "transfer_function": None, 

1780 "precision_digits": None, 

1781 "digitization_function": None, 

1782 "data_type": "float", 

1783 "formula": None, 

1784 "forcible": forcible, 

1785 "commandable": False, 

1786 "broadcastable": True, 

1787 "signal_id": function_signal_id, 

1788 "post_processing": True, 

1789 }, 

1790 "$push": {"computed_phases_ids": phase.id}, 

1791 }, 

1792 upsert=True, 

1793 ) 

1794 

1795 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1796 if post_processing: 

1797 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1798 zip_buffer = io.BytesIO() 

1799 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1800 for signal_data in self.signals_data: 

1801 file_name = signal_data.signal_id 

1802 if post_processing: 

1803 phase = phases_by_id.get( 

1804 signal_data.phase_id, 

1805 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1806 ) 

1807 file_name = f"{signal_data.signal_id} ({phase.name})" 

1808 if file_format == "csv": 

1809 export_io = signal_data.csv_export() 

1810 zip_file.writestr(f"{file_name}.csv", export_io) 

1811 elif file_format == "prestoplot": 

1812 export_io = signal_data.prestoplot_export() 

1813 zip_file.writestr(f"{file_name}.tab", export_io) 

1814 else: 

1815 raise ValueError(f"Format not found. Got: {file_format}") 

1816 zip_bytes = zip_buffer.getvalue() 

1817 return zip_bytes 

1818 

1819 def hdf5_export(self, post_processing: bool = False, phase_ids: list = []): 

1820 if post_processing: 

1821 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1822 hdf5_buffer = io.BytesIO() 

1823 custom_type_float = npy.dtype( 

1824 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1825 ) 

1826 custom_type_string = npy.dtype( 

1827 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1828 ) 

1829 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1830 for signal_data in self.signals_data: 

1831 if post_processing: 

1832 phase = phases_by_id.get( 

1833 signal_data.phase_id, 

1834 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1835 ) 

1836 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})") 

1837 else: 

1838 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1839 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1840 if signal_data.data_type == "str": 

1841 export_data = npy.array( 

1842 list( 

1843 zip( 

1844 date_vector, 

1845 signal_data.time_vector, 

1846 signal_data.values, 

1847 signal_data.forced_values, 

1848 ) 

1849 ), 

1850 dtype=custom_type_string, 

1851 ) 

1852 else: 

1853 export_data = npy.array( 

1854 list( 

1855 zip( 

1856 date_vector, 

1857 signal_data.time_vector, 

1858 signal_data.values, 

1859 signal_data.forced_values, 

1860 ) 

1861 ), 

1862 dtype=custom_type_float, 

1863 ) 

1864 signal_group["data"] = export_data 

1865 return hdf5_buffer.getvalue() 

1866 

1867 

1868class SignalStatus(TwinPadModel): 

1869 status: str = "down" 

1870 reason: str = "" 

1871 delay: float | None = None 

1872 

1873 

1874class DigitizationFunction(TwinPadModel): 

1875 bits: int | None = None 

1876 min_value: float 

1877 max_value: float 

1878 min_raw_value: float 

1879 max_raw_value: float 

1880 

1881 @classmethod 

1882 def from_bits(cls, bits: int, min_value: float, max_value: float): 

1883 return cls(bits=bits, min_raw_value=0, max_raw_value=2**bits - 1, min_value=min_value, max_value=max_value) 

1884 

1885 @classmethod 

1886 def from_values(cls, min_raw_value: float, max_raw_value: float, min_value: float, max_value: float): 

1887 return cls( 

1888 bits=None, 

1889 min_raw_value=min_raw_value, 

1890 max_raw_value=max_raw_value, 

1891 min_value=min_value, 

1892 max_value=max_value, 

1893 ) 

1894 

1895 def to_transfer_function(self): 

1896 return TransferFunction(intervals=[(self.min_raw_value, self.min_value), (self.max_raw_value, self.max_value)]) 

1897 

1898 

1899class SignalUpdate(TwinPadModel): 

1900 value: float | str | bool | int | None = None 

1901 forced_value: float | str | bool | int | None = None 

1902 timestamp: int | None = None 

1903 

1904 

1905class SignalType(str, Enum): 

1906 command = "command" 

1907 sensor = "sensor" 

1908 external_sensor = "external_sensor" 

1909 interface = "interface" 

1910 

1911 

1912SIGNALDATA_TYPES = { 

1913 "int": NumericSignalData, 

1914 "float": NumericSignalData, 

1915 "str": StringSignalData, 

1916 "bool": NumericSignalData, 

1917 "epoch": NumericSignalData, 

1918} 

1919 

1920 

1921class LoopAddress(TwinPadModel): 

1922 card_number: int 

1923 channel: int 

1924 

1925 

1926class Address(LoopAddress): 

1927 loop_number: int 

1928 

1929 

1930class TransferFunction(TwinPadModel): 

1931 """ 

1932 A piecewise monotone linear function. 

1933 """ 

1934 

1935 intervals: list[tuple[float, float]] 

1936 

1937 def evaluate(self, x): 

1938 for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]): 

1939 if x1 <= x and x <= x2: 

1940 return (y2 - y1) / (x2 - x1) * (x - x1) + y1 

1941 raise ValueError("Out of bounds") 

1942 

1943 def reverse(self, y): 

1944 for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]): 

1945 if min(y1, y2) <= y <= max(y1, y2): 

1946 return (x2 - x1) / (y2 - y1) * (y - y1) + x1 

1947 raise ValueError(f"Out of bounds: {y} is not in {self.intervals[0][1]}, {self.intervals[0][1]}") 

1948 

1949 def compose(self, other_function): 

1950 if other_function is None: 

1951 return self 

1952 # for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]): 

1953 # other_function.reverse() 

1954 # Reversing other function x 

1955 new_x = {self.reverse(x) for (x, _) in other_function.intervals} 

1956 new_x.union(x for (x, _) in self.intervals) 

1957 new_intervals = [(x, other_function.evaluate(self.evaluate(x))) for x in sorted(new_x)] 

1958 return TransferFunction(intervals=new_intervals) 

1959 

1960 def inverse(self): 

1961 """ 

1962 Calculate the inverse function of this transfer function. 

1963 

1964 The inverse of a piecewise monotone linear function can be calculated by inverting each interval. 

1965 This means swapping x and y, then solving for y. 

1966 

1967 Returns: 

1968 TransferFunction: The inverse function of this transfer function. 

1969 """ 

1970 

1971 return TransferFunction([(x, y) for (y, x) in self.intervals]) 

1972 

1973 

1974class Signal(GenericMongo): 

1975 collection_name: ClassVar[str] = "signals" 

1976 

1977 signal_id: str | None = None 

1978 ticker: str 

1979 frequency: int 

1980 unit: str | None 

1981 description: str 

1982 type: SignalType 

1983 address: Address | LoopAddress | None = None 

1984 data_type: str 

1985 formula: str | None = None 

1986 transfer_function: TransferFunction | None = None 

1987 precision_digits: int | None 

1988 digitization_function: DigitizationFunction | None = None 

1989 forcible: bool 

1990 commandable: bool 

1991 broadcastable: bool 

1992 status: SignalStatus = SignalStatus() 

1993 

1994 post_processing: bool = False 

1995 computed_phases_ids: list[str] = [] 

1996 

1997 custom_pipeline_steps: ClassVar[dict] = { 

1998 "ticker": [ 

1999 { 

2000 "$addFields": { 

2001 "ticker": {"$ifNull": ["$ticker", {"$arrayElemAt": [{"$split": ["$signal_id", "."]}, -1]}]} 

2002 } 

2003 } 

2004 ] 

2005 } 

2006 

2007 @property 

2008 def device(self) -> Device: 

2009 device_or_config_id = self.signal_id.split(".")[0] 

2010 return Device.get_from_device_or_config_id(device_or_config_id) 

2011 

2012 @cached_property 

2013 def signal_data_class(self): 

2014 if self.data_type in SIGNALDATA_TYPES: 

2015 return SIGNALDATA_TYPES[self.data_type] 

2016 if self.data_type.startswith("enum"): 

2017 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

2018 raise ValueError(f"Unhandled python type: {self.data_type}") 

2019 

2020 @cached_property 

2021 def python_type(self): 

2022 if self.data_type in TYPES: 

2023 return TYPES[self.data_type] 

2024 if self.data_type.startswith("enum"): 

2025 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

2026 return Literal[*choices] 

2027 raise ValueError(f"Unhandled python type: {self.data_type}") 

2028 

2029 def to_config_dict(self): 

2030 return self.to_dict(exclude={"id", "signal_id", "status", "post_processing", "computed_phases_ids"}) 

2031 

2032 async def send_command(self, device_id: str, update_dict: SignalUpdate, current_user: User) -> dict: 

2033 command = Command( 

2034 sent_at=time.time(), 

2035 command_type="Signal command", 

2036 user_id=current_user.id, 

2037 ) 

2038 

2039 has_input_error = False 

2040 error_message = "" 

2041 

2042 if self.data_type.startswith("enum"): 

2043 enum_options = get_args(self.python_type) 

2044 

2045 if update_dict.value is not None and update_dict.value not in enum_options: 

2046 has_input_error = True 

2047 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

2048 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

2049 has_input_error = True 

2050 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

2051 else: 

2052 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

2053 has_input_error = True 

2054 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

2055 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

2056 has_input_error = True 

2057 error_message += ( 

2058 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

2059 ) 

2060 

2061 if has_input_error: 

2062 command.response_time = 0 

2063 command.succeeded = False 

2064 command.description = f"Tried to modify signal {self.signal_id}" 

2065 response = {"error": True, "status_code": 400, "message": error_message} 

2066 else: 

2067 response = await RabbitMQClient().send_signal_value(device_id, self.signal_id, update_dict) 

2068 command.receive_response(response) 

2069 

2070 Command.create(command) 

2071 return response 

2072 

2073 @classmethod 

2074 def from_excel( 

2075 cls, signals_sheet: Worksheet, card_indexes_to_refs: dict[str, str], tickers_to_addresses: dict[str, Address] 

2076 ) -> dict[int, Self]: 

2077 first_row = next(signals_sheet.iter_rows(max_row=1)) 

2078 header = list(cell.value for cell in first_row if cell.value is not None) 

2079 if header != XLSX_HEADER: 

2080 raise ValueError(f"Header of XLSX is not valid. got: {header}, expected: {XLSX_HEADER}") 

2081 

2082 tickers = [] 

2083 signals_by_component_id: dict[int, list] = {} 

2084 

2085 for row in signals_sheet.iter_rows(min_row=2): 

2086 if is_line_empty(row): 

2087 break 

2088 

2089 ticker = row[0].value 

2090 if not ticker: 

2091 raise ValueError("Ticker should not be empty.") 

2092 tickers.append(ticker) 

2093 

2094 signal_type = row[4].value.lower() 

2095 if signal_type not in ["sensor", "command", "external_sensor"]: 

2096 raise ValueError(f"Should be either sensor, command or external_sensor, got {signal_type}") 

2097 

2098 address = tickers_to_addresses.get(ticker, None) 

2099 intervals = literal_eval(row[6].value) if row[6].value is not None else None 

2100 sensor_transfer_function = TransferFunction(intervals=intervals) if intervals else None 

2101 if address is not None: 

2102 card_reference = card_indexes_to_refs.get((address.loop_number, address.card_number)) 

2103 digitization_function = DIGITIZATION_FUNCTIONS_FROM_REFERENCE.get(card_reference) 

2104 try: 

2105 transfer_function = digitization_function.to_transfer_function().compose(sensor_transfer_function) 

2106 except ValueError as error: 

2107 raise error 

2108 else: 

2109 digitization_function = None 

2110 transfer_function = None 

2111 

2112 type = row[8].value 

2113 if type: 

2114 type = type.replace('"', "'") 

2115 

2116 formula = row[9].value 

2117 if formula is not None: 

2118 formula = formula.lstrip(" ") 

2119 if formula.startswith("{"): 

2120 # Test formula mapping is parsable 

2121 literal_eval(formula) 

2122 

2123 if formula is None and address is None and signal_type not in ["external_sensor", "command"]: 

2124 raise ValueError( 

2125 f"{ticker} should either be on an I/O card or have a formula or be an interface. Current signal type: {signal_type}" 

2126 ) 

2127 

2128 if formula and address and signal_type != "command": 

2129 raise ValueError( 

2130 f"{ticker} is both on an I/O card and has a formula. This is forbidden for signal type {signal_type}." 

2131 ) 

2132 

2133 signal = cls( 

2134 ticker=ticker, 

2135 description=row[2].value if row[2].value else "", 

2136 unit=row[3].value, 

2137 type=signal_type, 

2138 address=address, 

2139 frequency=row[5].value, 

2140 transfer_function=transfer_function, 

2141 precision_digits=row[7].value, 

2142 digitization_function=digitization_function, 

2143 data_type=type, 

2144 formula=formula, 

2145 forcible=read_boolean_cell(row[10], True), 

2146 commandable=read_boolean_cell(row[11], True), 

2147 broadcastable=read_boolean_cell(row[12], True), 

2148 ) 

2149 

2150 component_id = row[1].value 

2151 signals_by_component_id.setdefault(component_id, []) 

2152 signals_by_component_id[component_id].append(signal) 

2153 

2154 distinct_tickers = set() 

2155 duplicated_tickers = { 

2156 ticker for ticker in tickers if ticker in distinct_tickers or distinct_tickers.add(ticker) 

2157 } 

2158 if len(duplicated_tickers) > 0: 

2159 raise ValueError(f"Duplicated ticker(s): {", ".join(duplicated_tickers)}") 

2160 

2161 return signals_by_component_id 

2162 

2163 @classmethod 

2164 def get_from_signal_id(cls, signal_id: str) -> Self: 

2165 """Could be generic from mongo""" 

2166 signal = Signal.get_one_by_attribute("signal_id", signal_id) 

2167 if signal is None: 

2168 split_signal_id = signal_id.split(".") 

2169 device_or_config_id = split_signal_id[0] 

2170 ticker = split_signal_id[-1] 

2171 possible_device = Device.get_from_device_or_config_id(device_or_config_id) 

2172 if possible_device is not None: 

2173 signal = Signal.get_one_by_attribute( 

2174 "signal_id", f"{possible_device.device_id}.{possible_device.config_id}.{ticker}" 

2175 ) 

2176 if not signal: 

2177 return None 

2178 return cls.dict_to_object(signal) 

2179 

2180 @classmethod 

2181 def get_all_ids(cls) -> list[str]: 

2182 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

2183 

2184 return [signal["signal_id"] for signal in cursor] 

2185 

2186 @classmethod 

2187 def get_all_statuses(cls) -> list[dict]: 

2188 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

2189 

2190 return [ 

2191 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

2192 for signal in cursor 

2193 ] 

2194 

2195 async def number_samples(self): 

2196 collection = get_signal_collection(signal_id=self.signal_id) 

2197 if collection is None: 

2198 return 0 

2199 

2200 number_samples = collection.estimated_document_count() 

2201 

2202 number_samples_async_collection = await get_async_collection( 

2203 systems_async_database, "number_samples", create=True, time_series=True 

2204 ) 

2205 

2206 loop = asyncio.get_running_loop() 

2207 loop.create_task( 

2208 number_samples_async_collection.insert_one( 

2209 { 

2210 "timestamp": datetime.datetime.now(pytz.UTC), 

2211 "signal_id": self.signal_id, 

2212 "number_samples": number_samples, 

2213 } 

2214 ) 

2215 ) 

2216 

2217 return number_samples 

2218 

2219 @classmethod 

2220 def total_number_samples(cls) -> int: 

2221 TwinPadActivity.get_number_samples_timeframe(0, 0, False) 

2222 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

2223 

2224 if number_samples_collection is None: 

2225 return 0 

2226 

2227 result = number_samples_collection.aggregate( 

2228 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}] 

2229 ) 

2230 

2231 result = result.to_list() 

2232 if len(result) == 0: 

2233 return 0 

2234 return result[0]["amount"] 

2235 

2236 def sample_datasize(self): 

2237 return signals_database.command("collstats", self.signal_id)["size"] 

2238 

2239 @classmethod 

2240 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

2241 result = cls.collection().aggregate( 

2242 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

2243 ) 

2244 

2245 return {signal["signal_id"]: signal["forcible"] for signal in result} 

2246 

2247 

2248class ForcedSignal(GenericMongo): 

2249 collection_name: ClassVar[str] = "forced_signals" 

2250 

2251 signal_id: str 

2252 forcing_user_id: str 

2253 forced_at: float 

2254 value: str | float 

2255 

2256 def insert(self): 

2257 insert_result = self.collection().find_one_and_update( 

2258 {"signal_id": self.signal_id}, 

2259 {"$set": self.to_dict(exclude={"id"})}, 

2260 upsert=True, 

2261 return_document=ReturnDocument.AFTER, 

2262 ) 

2263 self.id = str(insert_result["_id"]) 

2264 return self.id 

2265 

2266 @classmethod 

2267 def can_force(cls, signal_id: str, current_user: User) -> bool: 

2268 """Checks whether user can force a given signal. 

2269 

2270 :param signal_id: Signal ID of the signal to force 

2271 :type signal_id: str 

2272 :param current_user: Current user 

2273 :type current_user: User 

2274 :return: False if the signal was forced by someone else than the user, True otherwise 

2275 :rtype: bool 

2276 """ 

2277 forced_signal = cls.get_one_by_attribute("signal_id", signal_id) 

2278 if forced_signal is not None: 

2279 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

2280 return False 

2281 return True 

2282 

2283 

2284class Parameter(TwinPadModel): 

2285 name: str 

2286 value: str | float | bool 

2287 

2288 

2289class ConfigurationComponent(GenericMongo): 

2290 id: int 

2291 name: str 

2292 signals: list[Signal] 

2293 parameters: list[Parameter] = [] 

2294 reference: str = None 

2295 

2296 @classmethod 

2297 def from_excel(cls, components_sheet: Worksheet) -> list[Self]: 

2298 first_row = next(components_sheet.iter_rows(max_row=1, min_col=4)) 

2299 col_to_parameter_names = {header.column: header.value for header in first_row if header.value is not None} 

2300 

2301 components: list[ConfigurationComponent] = [] 

2302 for component_id, name, reference, *extra_parameters in components_sheet.iter_rows(min_row=2): 

2303 if component_id.value is None: 

2304 break 

2305 reference = reference.value if reference.value is not None else "" 

2306 

2307 parameters = [ 

2308 Parameter(name=col_to_parameter_names[parameter.column], value=parameter.value) 

2309 for parameter in extra_parameters 

2310 if parameter.value is not None 

2311 ] 

2312 components.append( 

2313 cls( 

2314 id=component_id.value, 

2315 name=name.value, 

2316 reference=reference, 

2317 parameters=parameters, 

2318 signals=[], 

2319 ) 

2320 ) 

2321 

2322 return components 

2323 

2324 def to_config_dict(self): 

2325 component_dict = self.to_dict(exclude={}) 

2326 component_dict["signals"] = [signal.to_config_dict() for signal in self.signals] 

2327 return component_dict 

2328 

2329 

2330class ServicesStatus(TwinPadModel): 

2331 backend: str 

2332 cloud_broker: str 

2333 time_series_database: str 

2334 signal_storage: str 

2335 heartbeat_storage: str 

2336 data_analyzer: str 

2337 

2338 @classmethod 

2339 def check(cls) -> Self: 

2340 return cls( 

2341 cloud_broker=ping(RABBITMQ_HOST), 

2342 backend="up", 

2343 time_series_database=ping(MONGO_HOST), 

2344 signal_storage=ping(SIGNAL_STORAGE_HOST), 

2345 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

2346 data_analyzer=ping(DATA_ANALYZER_HOST), 

2347 ) 

2348 

2349 

2350def ping(host): 

2351 try: 

2352 if ping3.ping(host, timeout=0.8): 

2353 return "up" 

2354 except PermissionError: 

2355 pass 

2356 return "down" 

2357 

2358 

2359class Event(GenericMongo): 

2360 collection_name: ClassVar[str] = "events" 

2361 

2362 name: str 

2363 timestamp: float 

2364 event_rule_id: str 

2365 

2366 @computed_field 

2367 @cached_property 

2368 def event_rule(self) -> "EventRule": 

2369 return EventRule.get_from_id(self.event_rule_id) 

2370 

2371 @classmethod 

2372 def dict_to_object(cls, dict_): 

2373 """Refine to convert timestamp to datetime for mongodb.""" 

2374 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

2375 return super().dict_to_object(dict_) 

2376 

2377 

2378class TwinPadActivity(GenericMongo): 

2379 timestamp: float 

2380 amount: int 

2381 

2382 @classmethod 

2383 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

2384 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2385 number_events_collection = get_collection(systems_database, "number_events") 

2386 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

2387 items = [] 

2388 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2389 if number_events_collection is None or recompute_amount: 

2390 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

2391 number_events_collection.delete_many({}) 

2392 first_event = events_collection.find_one(sort={"timestamp": 1}) 

2393 if first_event is None: 

2394 return items 

2395 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

2396 tzinfo=pytz.UTC 

2397 ) 

2398 while last_computed_day < TODAY: 

2399 day_nb_events = events_collection.count_documents( 

2400 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2401 ) 

2402 if day_nb_events > 0: 

2403 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

2404 last_computed_day += ONE_DAY_OFFSET 

2405 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2406 if number_events_today > 0: 

2407 number_events_collection.delete_many({"timestamp": TODAY}) 

2408 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

2409 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2410 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2411 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2412 for day in number_events: 

2413 day["timestamp"] = day["timestamp"].timestamp() 

2414 items.append(cls.mongo_dict_to_object(day)) 

2415 return items 

2416 

2417 @classmethod 

2418 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2419 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2420 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

2421 signals_number_samples_collection = get_collection( 

2422 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

2423 ) 

2424 items = [] 

2425 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2426 if number_samples_collection is None or recompute_amount: 

2427 number_samples_collection = get_collection( 

2428 systems_database, "number_received_samples", create=True, time_series=True 

2429 ) 

2430 number_samples_collection.delete_many({}) 

2431 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

2432 if first_sample is None: 

2433 return items 

2434 # compute from day of first found event 

2435 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

2436 tzinfo=pytz.UTC 

2437 ) 

2438 while last_computed_day < TODAY: 

2439 number_samples_request = signals_number_samples_collection.aggregate( 

2440 [ 

2441 { 

2442 "$match": { 

2443 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

2444 } 

2445 }, 

2446 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2447 ] 

2448 ).to_list() 

2449 if len(number_samples_request) == 0: 

2450 number_samples = 0 

2451 else: 

2452 number_samples = number_samples_request[0].get("number_samples", 0) 

2453 if number_samples > 0: 

2454 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

2455 last_computed_day += ONE_DAY_OFFSET 

2456 number_samples_request = signals_number_samples_collection.aggregate( 

2457 [ 

2458 {"$match": {"timestamp": {"$gte": TODAY}}}, 

2459 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2460 ] 

2461 ).to_list() 

2462 if len(number_samples_request) == 0: 

2463 number_samples_today = 0 

2464 else: 

2465 number_samples_today = number_samples_request[0].get("number_samples", 0) 

2466 if number_samples_today > 0: 

2467 number_samples_collection.delete_many({"timestamp": TODAY}) 

2468 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

2469 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2470 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2471 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2472 for day in number_events: 

2473 day["timestamp"] = day["timestamp"].timestamp() 

2474 items.append(cls.mongo_dict_to_object(day)) 

2475 return items 

2476 

2477 @classmethod 

2478 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2479 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2480 number_commands_collection = get_collection(systems_database, "number_commands") 

2481 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

2482 items = [] 

2483 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2484 if number_commands_collection is None or recompute_amount: 

2485 number_commands_collection = get_collection( 

2486 systems_database, "number_commands", create=True, time_series=True 

2487 ) 

2488 number_commands_collection.delete_many({}) 

2489 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

2490 if first_command is None: 

2491 return items 

2492 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

2493 tzinfo=pytz.UTC 

2494 ) 

2495 while last_computed_day < TODAY: 

2496 day_nb_commands = commands_collection.count_documents( 

2497 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2498 ) 

2499 if day_nb_commands > 0: 

2500 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

2501 last_computed_day += ONE_DAY_OFFSET 

2502 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2503 if number_commands_today > 0: 

2504 number_commands_collection.delete_many({"timestamp": TODAY}) 

2505 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

2506 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2507 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2508 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2509 for day in number_commands: 

2510 day["timestamp"] = day["timestamp"].timestamp() 

2511 items.append(cls.mongo_dict_to_object(day)) 

2512 return items 

2513 

2514 

2515class EventRule(GenericMongo): 

2516 collection_name: ClassVar[str] = "event_rules" 

2517 

2518 name: str 

2519 formula: str 

2520 variables: list[str] = [] 

2521 

2522 @computed_field 

2523 @cached_property 

2524 def number_events(self) -> int: 

2525 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

2526 

2527 @classmethod 

2528 def from_excel(cls, event_rules_sheet: Worksheet) -> list[Self]: 

2529 event_rules = [] 

2530 for event_name, formula in event_rules_sheet.iter_rows(min_row=2, min_col=2, max_col=3): 

2531 if event_name.value is None or formula.value is None: 

2532 break 

2533 event_rules.append(EventRule(name=event_name.value, formula=formula.value)) 

2534 

2535 return event_rules 

2536 

2537 def to_config_dict(self): 

2538 return self.to_dict(exclude={"id", "variables", "number_events"}) 

2539 

2540 

2541class Company(GenericMongo): 

2542 collection_name: ClassVar[str] = "companies" 

2543 name: str 

2544 

2545 

2546class Campaign(GenericMongo): 

2547 collection_name: ClassVar[str] = "campaigns" 

2548 

2549 # Properties 

2550 id: str | None = None 

2551 name: str 

2552 description: str | None = None 

2553 

2554 

2555class Phase(GenericMongo): 

2556 collection_name: ClassVar[str] = "phases" 

2557 

2558 # Properties 

2559 id: str | None = None 

2560 name: str 

2561 description: str | None = None 

2562 start_at: float 

2563 end_at: float 

2564 

2565 # FK 

2566 campaign_id: MongoId 

2567 

2568 @classmethod 

2569 def deleteMany(cls, campaign_id): 

2570 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

2571 return delete_phases 

2572 

2573 

2574class CustomViewCreation(GenericMongo): 

2575 collection_name: ClassVar[str] = "custom_views" 

2576 

2577 name: str 

2578 configuration: list 

2579 

2580 

2581class CustomView(CustomViewCreation): 

2582 # Properties 

2583 id: str | None = None 

2584 

2585 # Foreign Key 

2586 user_id: str 

2587 

2588 

2589CustomViewUpdate = create_update_model(CustomView) 

2590 

2591 

2592class Video(GenericMongo): 

2593 collection_name: ClassVar[str] = "videos" 

2594 

2595 # Properties 

2596 name: str 

2597 ip_addr: str 

2598 username: str | None = None 

2599 password: str | None = None 

2600 

2601 # Methods 

2602 @classmethod 

2603 def get_all(cls, sort_by="_id") -> list[Self]: 

2604 items = [] 

2605 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

2606 items.append(cls.mongo_dict_to_object(dict_)) 

2607 return items 

2608 

2609 @classmethod 

2610 def get_video(cls, camera_id: ObjectId): 

2611 camera = cls.get_from_id(camera_id) 

2612 if camera is not None: 

2613 return camera.name 

2614 return None 

2615 

2616 

2617class Command(GenericMongo): 

2618 collection_name: ClassVar[str] = "commands" 

2619 

2620 # Properties 

2621 timestamp: datetime.datetime = None 

2622 sent_at: float 

2623 response_time: float = 0.0 

2624 command_type: str 

2625 description: str = "" 

2626 succeeded: bool = False 

2627 

2628 # Foreign key 

2629 user_id: str 

2630 

2631 @classmethod 

2632 def collection(cls): 

2633 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

2634 

2635 @classmethod 

2636 def create(cls, command: Self): 

2637 command = cls( 

2638 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

2639 sent_at=command.sent_at, 

2640 response_time=command.response_time, 

2641 command_type=command.command_type, 

2642 description=command.description, 

2643 succeeded=command.succeeded, 

2644 user_id=command.user_id, 

2645 ) 

2646 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

2647 if new_command is None: 

2648 return None 

2649 return {"command_id": str(new_command.inserted_id)} 

2650 

2651 def receive_response(self, response: dict): 

2652 self.response_time = time.time() - self.sent_at 

2653 self.succeeded = response.get("error", True) is False 

2654 if self.description == "": 

2655 self.description += response.get("message", "").rstrip() 

2656 

2657 

2658class SignalsPresetCreation(GenericMongo): 

2659 name: str 

2660 signal_ids: list[str] 

2661 

2662 

2663class SignalsPreset(SignalsPresetCreation): 

2664 collection_name: ClassVar[str] = "signals_presets" 

2665 

2666 user_id: str 

2667 

2668 @classmethod 

2669 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

2670 signals_preset = cls( 

2671 user_id=user_id, 

2672 name=signals_preset.name, 

2673 signal_ids=signals_preset.signal_ids, 

2674 ) 

2675 

2676 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

2677 

2678 return str(new_signal_preset.inserted_id) 

2679 

2680 

2681SignalsPresetUpdate = create_update_model(SignalsPreset) 

2682 

2683 

2684class LineStyle(str, Enum): 

2685 solid = "solid" 

2686 dotted = "dotted" 

2687 dashed = "dashed" 

2688 

2689 

2690class SignalAppearance: 

2691 value_color: str 

2692 forced_value_color: str 

2693 

2694 

2695class GraphThemeCreation(GenericMongo): 

2696 collection_name: ClassVar[str] = "graph_themes" 

2697 

2698 name: str 

2699 signal_id: str 

2700 value_color: str = "" 

2701 forced_value_color: str = "" 

2702 value_line_style: LineStyle = LineStyle.solid 

2703 forced_value_line_style: LineStyle = LineStyle.solid 

2704 private: bool = True 

2705 

2706 

2707class PublicGraphTheme(GraphThemeCreation): 

2708 created_by_user: bool 

2709 in_user_library: bool 

2710 active_for_user: bool 

2711 

2712 _current_user_id: str = "" 

2713 

2714 @classproperty 

2715 def custom_pipeline_steps(cls) -> dict[str, list]: 

2716 return { 

2717 "created_by_user": [ 

2718 { 

2719 "$addFields": { 

2720 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

2721 } 

2722 } 

2723 ], 

2724 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

2725 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

2726 ], 

2727 "in_user_library": [ 

2728 { 

2729 "$addFields": { 

2730 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

2731 } 

2732 } 

2733 ], 

2734 "active_for_user": [ 

2735 { 

2736 "$addFields": { 

2737 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

2738 } 

2739 } 

2740 ], 

2741 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

2742 "active": [ 

2743 { 

2744 "$addFields": { 

2745 "active": "$$REMOVE", 

2746 } 

2747 } 

2748 ], 

2749 "creator_id": [ 

2750 { 

2751 "$addFields": { 

2752 "creator_id": "$$REMOVE", 

2753 } 

2754 } 

2755 ], 

2756 } 

2757 

2758 @classmethod 

2759 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

2760 cls._current_user_id = user_id 

2761 return super().response_from_query(query) 

2762 

2763 @classmethod 

2764 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

2765 query.in_user_library = "true" 

2766 return cls.response_from_query(query, user_id) 

2767 

2768 @classmethod 

2769 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

2770 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

2771 

2772 @classmethod 

2773 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2774 cls._current_user_id = user_id 

2775 return super().get_by_attribute(attribute_name, attribute_value) 

2776 

2777 @classmethod 

2778 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2779 cls._current_user_id = user_id 

2780 return super().get_one_by_attribute(attribute_name, attribute_value) 

2781 

2782 @classmethod 

2783 def get_all(cls, sort_by: str, user_id: str): 

2784 cls._current_user_id = user_id 

2785 return super().get_all(sort_by) 

2786 

2787 @classmethod 

2788 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

2789 pipeline = [ 

2790 { 

2791 "$match": { 

2792 "active": {"$eq": user_id}, 

2793 "signal_id": {"$in": signal_ids}, 

2794 } 

2795 }, 

2796 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

2797 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

2798 { 

2799 "$project": { 

2800 "_id": 0, 

2801 "signal_id": 1, 

2802 "value_color": 1, 

2803 "forced_value_color": 1, 

2804 "value_line_style": 1, 

2805 "forced_value_line_style": 1, 

2806 } 

2807 }, 

2808 ] 

2809 

2810 result = {} 

2811 

2812 cursor = cls.collection().aggregate(pipeline) 

2813 for document in cursor: 

2814 signal_id = document["signal_id"] 

2815 del document["signal_id"] 

2816 result[signal_id] = document 

2817 

2818 return result 

2819 

2820 

2821GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2822 

2823 

2824class PrivateGraphTheme(GraphThemeCreation): 

2825 # private 

2826 creator_id: str 

2827 in_library: list[str] 

2828 active: list[str] 

2829 

2830 @classmethod 

2831 def create( 

2832 cls, 

2833 creator_id: str, 

2834 name: str, 

2835 signal_id: str, 

2836 value_color: str, 

2837 forced_value_color: str, 

2838 value_line_style: LineStyle, 

2839 forced_value_line_style: LineStyle, 

2840 private: bool, 

2841 ): 

2842 color_setting = cls( 

2843 creator_id=creator_id, 

2844 name=name, 

2845 signal_id=signal_id, 

2846 value_color=value_color, 

2847 forced_value_color=forced_value_color, 

2848 value_line_style=value_line_style, 

2849 forced_value_line_style=forced_value_line_style, 

2850 private=private, 

2851 in_library=[creator_id], 

2852 active=[creator_id], 

2853 ) 

2854 

2855 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2856 color_setting.id = str(new_color_setting.inserted_id) 

2857 return color_setting 

2858 

2859 def update(self, update_dict: dict, user_id: str): 

2860 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2861 if in_user_lib and user_id not in self.in_library: 

2862 self.in_library.append(user_id) 

2863 elif not in_user_lib and user_id in self.in_library: 

2864 self.in_library.remove(user_id) 

2865 update_dict["in_library"] = self.in_library 

2866 del update_dict["in_user_library"] 

2867 

2868 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2869 if active_for_user and user_id not in self.active: 

2870 self.active.append(user_id) 

2871 elif not active_for_user and user_id in self.active: 

2872 self.active.remove(user_id) 

2873 update_dict["active"] = self.active 

2874 del update_dict["active_for_user"] 

2875 

2876 if update_dict.get("created_by_user") is not None: 

2877 del update_dict["created_by_user"] 

2878 

2879 self.collection().find_one_and_update( 

2880 {"_id": ObjectId(self.id)}, 

2881 {"$set": update_dict}, 

2882 ) 

2883 

2884 return {} 

2885 

2886 

2887class Configuration(GenericMongo): 

2888 model_config = ConfigDict(serialize_by_alias=True) 

2889 

2890 collection_name: ClassVar[str] = "configs" 

2891 

2892 # Properties 

2893 json_schema: str | None = Field(default=None, serialization_alias="$schema") 

2894 config_name: str | None = None 

2895 config_id: str | None = None 

2896 generated_at: float 

2897 config: dict 

2898 modes: list[Mode] 

2899 components: list[ConfigurationComponent] 

2900 hardware_topology: EtherCatTopology 

2901 petri_network: dict | None = None 

2902 pid: dict | None = None 

2903 event_rules: list[EventRule] 

2904 

2905 received_at: float | None = None 

2906 in_use_by_devices: list[str] = [] 

2907 is_in_use: bool = False 

2908 

2909 custom_pipeline_steps = { 

2910 "is_in_use": [ 

2911 { 

2912 "$addFields": { 

2913 "is_in_use": { 

2914 "$cond": [ 

2915 {"$gt": [{"$size": {"$ifNull": ["$in_use_by_devices", []]}}, 0]}, 

2916 True, 

2917 False, 

2918 ] 

2919 }, 

2920 } 

2921 } 

2922 ], 

2923 } 

2924 

2925 @classmethod 

2926 def get_all_names_devices(cls) -> list[dict[str, str]]: 

2927 cursor = cls.collection().aggregate( 

2928 [ 

2929 {"$replaceRoot": {"newRoot": {"$mergeObjects": ["$config", "$$ROOT"]}}}, 

2930 {"$project": {"config_id": 1, "config_name": 1, "target_device_id": 1, "device_name": 1, "_id": 0}}, 

2931 {"$sort": {"signal_id": 1}}, 

2932 ] 

2933 ) 

2934 

2935 return cursor.to_list() 

2936 

2937 def to_config_dict(self, exclude=set()): 

2938 for property in ("id", "received_at", "in_use_by_devices", "is_in_use"): 

2939 exclude.add(property) 

2940 config = self.to_dict(exclude=exclude) 

2941 config["components"] = [component.to_config_dict() for component in self.components] 

2942 config["event_rules"] = [event_rule.to_config_dict() for event_rule in self.event_rules] 

2943 return config 

2944 

2945 def to_json(self): 

2946 return json.dumps(self.to_config_dict(), indent=2) 

2947 

2948 @classmethod 

2949 def get_from_config_id(cls, config_id: str, exclude_sensitive_info: bool = True) -> Self: 

2950 pipeline = [] 

2951 if len(config_id) == 24: 

2952 pipeline.append({"$match": {"_id": ObjectId(config_id)}}) 

2953 else: 

2954 pipeline.append({"$match": {"config_id": config_id}}) 

2955 pipeline.append({"$limit": 1}) 

2956 for _, step in cls.custom_pipeline_steps.items(): 

2957 pipeline.extend(step) 

2958 items = cls.collection().aggregate(pipeline).to_list() 

2959 if len(items) == 0: 

2960 return None 

2961 dict_ = items[0] 

2962 # There is some protected information in the config dict, so keep only specific keys 

2963 if exclude_sensitive_info: 

2964 allowed_config_keys = ["description", "broker_host", "target_device_id", "device_name"] 

2965 config_dict = dict_.get("config") 

2966 dict_["config"] = {k: config_dict[k] for k in allowed_config_keys} 

2967 return cls.mongo_dict_to_object(dict_) 

2968 

2969 @classmethod 

2970 def get_from_excel( 

2971 cls, 

2972 config_name: str, 

2973 excel_file: ReadableFile, 

2974 pid_json: ReadableFile | None, 

2975 petri_json: ReadableFile | None, 

2976 ) -> Self: 

2977 """ 

2978 Converts a device's configuration from an excel file to a proper JSON configuration. 

2979 

2980 :param config_name: The configuration's name. 

2981 :type config_name: str 

2982 :param excel_file: The path to open or a file-like object of the excel file. 

2983 :type excel_file: str | os.PathLike[str] | IO[bytes] | bytes 

2984 :param pid_json: The path to open or a file-like object of the PID's JSON file. 

2985 :type pid_json: str | os.PathLike[str] | IO[bytes] | bytes 

2986 :param petri_json: The path to open or a file-like object of the Petri Network's JSON file. 

2987 :type petri_json: str | os.PathLike[str] | IO[bytes] | bytes 

2988 

2989 :raises ValueError: When excel file is invalid in any way. 

2990 :raises FileNotFoundError: When the schema file used to verify the configuration was not found locally. 

2991 

2992 :return: Dictionary representing the loaded configuration. 

2993 :rtype: Configuration 

2994 """ 

2995 workbook: Workbook = load_workbook(filename=excel_file) 

2996 

2997 sheets_by_name = {s.title: s for s in workbook.worksheets} 

2998 # These can generate errors if the sheets don't exist, they are meant to be caught 

2999 signals_sheet = sheets_by_name["signals"] 

3000 components_sheet = sheets_by_name["components"] 

3001 config_sheet = sheets_by_name["config"] 

3002 modes_sheet = sheets_by_name["modes"] 

3003 event_rules_sheet = sheets_by_name["event_rules"] 

3004 

3005 topology, card_indexes_to_refs, tickers_to_addresses = EtherCatTopology.from_excel(workbook) 

3006 

3007 workbook.close() 

3008 

3009 components = ConfigurationComponent.from_excel(components_sheet) 

3010 components_by_id = {component.id: component for component in components} 

3011 

3012 signals_by_component_id = Signal.from_excel(signals_sheet, card_indexes_to_refs, tickers_to_addresses) 

3013 

3014 for component_id, signals in signals_by_component_id.items(): 

3015 component = components_by_id.get(component_id) 

3016 if component is None: 

3017 raise ValueError( 

3018 f"Component with id #{component_id} does not exist. Mnemonic(s) {[signal.ticker for signal in signals]} is/are orphaned." 

3019 ) 

3020 component.signals = signals 

3021 

3022 device_config = {"description": ""} # Backwards compatibility 

3023 for key, value in config_sheet.iter_rows(max_col=2): 

3024 device_config[key.value] = value.value 

3025 

3026 if device_config.get("target_device_id") is None: 

3027 if (target_device_id := device_config.get("device_id", None)) is not None: 

3028 device_config["target_device_id"] = target_device_id 

3029 del device_config["device_id"] 

3030 else: 

3031 raise ValueError("Missing target device id.") 

3032 

3033 modes = Mode.from_excel(modes_sheet) 

3034 event_rules = EventRule.from_excel(event_rules_sheet) 

3035 

3036 hwconfig = { 

3037 "config_name": config_name, 

3038 "components": [c.to_config_dict() for c in components], 

3039 "modes": [m.to_dict() for m in modes], 

3040 "hardware_topology": topology.to_dict(), 

3041 "event_rules": [e.to_config_dict() for e in event_rules], 

3042 } 

3043 

3044 # Adding petri 

3045 if petri_json is not None: 

3046 petri = json.load(petri_json) 

3047 hwconfig["petri_network"] = petri 

3048 else: 

3049 hwconfig["petri_network"] = None 

3050 

3051 # Adding PID 

3052 if pid_json is not None: 

3053 pid = json.load(pid_json) 

3054 hwconfig["pid"] = pid 

3055 else: 

3056 hwconfig["pid"] = None 

3057 

3058 json_config = json.dumps(hwconfig, sort_keys=True, indent=2).encode() 

3059 config_hash = hashlib.md5(json_config).hexdigest()[:12] 

3060 

3061 schema = ( 

3062 "https://gitea.spacedreams.com/SpaceDreams/Twinpad/src/branch/master/data-storage/device-config_v2.json" 

3063 ) 

3064 

3065 hwconfig["json_schema"] = schema 

3066 hwconfig["generated_at"] = round(time.time()) 

3067 hwconfig["config_id"] = config_hash 

3068 hwconfig["config"] = device_config 

3069 # Since some properties in signals are ignored, add back full signal objects after hash computing 

3070 hwconfig["components"] = [c.to_dict() for c in components] 

3071 

3072 insert_result = cls.collection().find_one_and_update( 

3073 {"config_id": config_hash, "config": device_config}, 

3074 {"$set": hwconfig}, 

3075 upsert=True, 

3076 return_document=ReturnDocument.AFTER, 

3077 ) 

3078 

3079 return str(insert_result["_id"]) 

3080 

3081 def compute_config_id(self): 

3082 config_dict = self.to_config_dict( 

3083 exclude={ 

3084 "id", 

3085 "json_schema", 

3086 "generated_at", 

3087 "config_id", 

3088 "config", 

3089 "received_at", 

3090 "in_use_by_devices", 

3091 "is_in_use", 

3092 } 

3093 ) 

3094 

3095 json_config = json.dumps(config_dict, sort_keys=True, indent=2).encode() 

3096 return hashlib.md5(json_config).hexdigest()[:12] 

3097 

3098 def update( 

3099 self, config_name: str, petri_file: ReadableFile | None, pid_file: ReadableFile | None, save_as_new: bool 

3100 ): 

3101 if config_name is not None: 

3102 self.config_name = config_name 

3103 if petri_file is not None: 

3104 self.petri_network = json.load(petri_file) 

3105 if pid_file is not None: 

3106 self.pid = json.load(pid_file) 

3107 

3108 self.config_id = self.compute_config_id() 

3109 self.generated_at = round(time.time()) 

3110 

3111 insert_dict = self.to_dict(exclude={"id", "received_at", "in_use_by_devices"}, by_alias=False) 

3112 if save_as_new: 

3113 self.collection().find_one_and_update( 

3114 {"config_id": self.config_id, "config": self.config}, 

3115 {"$set": insert_dict}, 

3116 upsert=True, 

3117 ) 

3118 else: 

3119 self.collection().find_one_and_update({"_id": ObjectId(self.id)}, {"$set": insert_dict}) 

3120 

3121 return self.config_id 

3122 

3123 

3124DIGITIZATION_FUNCTIONS_FROM_REFERENCE = { 

3125 "EL1809": DigitizationFunction.from_bits(1, 0, 1), 

3126 "EL1819": DigitizationFunction.from_bits(1, 0, 1), 

3127 "EL1918": DigitizationFunction.from_bits(1, 0, 1), 

3128 "EL3124": DigitizationFunction.from_bits(15, 0.004, 0.020), 

3129 "EL3062-0030": DigitizationFunction.from_bits(15, 0.0, 30.0), 

3130 "EL3356-0020": DigitizationFunction.from_bits(24, -12.0, 12.0), 

3131 "EL2004": DigitizationFunction.from_bits(1, 0, 1), 

3132 "EL2042": DigitizationFunction.from_bits(1, 0, 1), 

3133 "EL1004": DigitizationFunction.from_bits(1, 0, 1), 

3134 "EL3054": DigitizationFunction.from_bits(15, 0.004, 0.020), 

3135 "EL3202": DigitizationFunction.from_values(-2000, 8500, -200, 850), 

3136 "EL4022": DigitizationFunction.from_bits(15, 0.004, 0.020), 

3137 "EL3064": DigitizationFunction.from_bits(15, 0.0, 10.0), 

3138 "EL2022": DigitizationFunction.from_bits(1, 0, 1), 

3139 "ELX2008": DigitizationFunction.from_bits(1, 0, 1), 

3140 "EPX3158": DigitizationFunction.from_bits(15, 0.004, 0.020), 

3141 "EPX1058": DigitizationFunction.from_bits(1, 0, 1), 

3142 "ELX4154": DigitizationFunction.from_bits(15, 0.004, 0.020), 

3143 "ELM3704": DigitizationFunction.from_bits(31, 0.004, 0.020), 

3144 "EL3062": DigitizationFunction.from_bits(15, 0, 10), 

3145 "EL3351": DigitizationFunction.from_bits(15, -0.020, 0.020), 

3146 "EL9410": DigitizationFunction.from_bits(1, 0, 1), 

3147 "EL2904": DigitizationFunction.from_bits(1, 0, 1), 

3148 "EL2911": DigitizationFunction.from_bits(1, 0, 1), 

3149} 

3150 

3151REFERENCED_CARDS = list(DIGITIZATION_FUNCTIONS_FROM_REFERENCE.keys()) + ["EK1100", "EL9011"] 

3152 

3153 

3154class EtherCatModule(TwinPadModel): 

3155 name: str 

3156 reference: str 

3157 signals: list[str] = [] 

3158 

3159 

3160class EtherCatLoop(TwinPadModel): 

3161 terminals: list[EtherCatModule] 

3162 

3163 @cached_property 

3164 def signals(self): 

3165 signals = [] 

3166 for terminal in self.terminals: 

3167 signals.extend(terminal.signals) 

3168 return signals 

3169 

3170 

3171class EtherCatTopology(TwinPadModel): 

3172 loops: list[EtherCatLoop] 

3173 

3174 @cached_property 

3175 def signals(self) -> list[str]: 

3176 signals = [] 

3177 for loop in self.loops: 

3178 signals.extend(loop.signals) 

3179 return signals 

3180 

3181 @classmethod 

3182 def from_excel(cls, excel_workbook: Workbook): 

3183 loop_number = 0 

3184 loops = [] 

3185 tickers_to_addresses = {} 

3186 card_indexes_to_refs = {} 

3187 for sheet in excel_workbook.worksheets: 

3188 if sheet.title.startswith("loop_"): 

3189 if sheet["B3"].value != "Position": 

3190 raise ValueError( 

3191 f'Invalid channel position.\nCell B3: (currently "{sheet["B3"].value}") should be "Position"' 

3192 ) 

3193 loop_number += 1 

3194 

3195 # Acquisition cards 

3196 for row_index, row in enumerate(sheet.iter_rows(min_row=4, min_col=3)): 

3197 if is_line_empty(row): 

3198 break 

3199 for cell_index, cell in enumerate(row): 

3200 if cell.value: 

3201 tickers_to_addresses[cell.value] = Address( 

3202 card_number=cell_index + 1, channel=row_index + 1, loop_number=loop_number 

3203 ) 

3204 

3205 modules = [] 

3206 for column_index, (name, reference, _, *tickers) in enumerate(sheet.iter_cols(min_col=3, min_row=1)): 

3207 if is_line_empty((name, reference, *tickers)): 

3208 break 

3209 if reference.value not in REFERENCED_CARDS: 

3210 raise NotImplementedError( 

3211 f"Card '{reference.value}' hasn't been referenced yet. Report this to system administrators." 

3212 ) 

3213 card_indexes_to_refs[loop_number, column_index + 1] = reference.value 

3214 module = EtherCatModule( 

3215 name=name.value, 

3216 reference=reference.value if reference.value is not None else "", 

3217 signals=[str(ticker_cell.value) for ticker_cell in tickers if ticker_cell.value is not None], 

3218 ) 

3219 modules.append(module) 

3220 

3221 loops.append(EtherCatLoop(terminals=modules)) 

3222 

3223 return cls(loops=loops), card_indexes_to_refs, tickers_to_addresses 

3224 

3225 

3226class DeviceStatus(str, Enum): 

3227 started = "started" 

3228 running = "running" 

3229 created = "created" 

3230 exited = "exited" 

3231 restarting = "restarting" 

3232 

3233 

3234class DeviceUpdateFromDeployer(BaseModel): 

3235 status: DeviceStatus 

3236 

3237 

3238class DeviceFromDeployerCreation(BaseModel): 

3239 name: str 

3240 description: str 

3241 

3242 

3243class DeviceFromDeployer(DeviceFromDeployerCreation): 

3244 status: DeviceStatus 

3245 device_id: DeviceId 

3246 logs: str = "" 

3247 

3248 

3249class DeviceDeployer(GenericMongo): 

3250 collection_name: ClassVar[str] = "device_deployers" 

3251 url: HttpUrl 

3252 

3253 def endpoint_url(self, endpoint): 

3254 return f"{str(self.url).rstrip('/')}/{endpoint}" 

3255 

3256 def devices(self) -> list[DeviceFromDeployer]: 

3257 devices = [] 

3258 try: 

3259 response = requests.get(self.endpoint_url("devices")) 

3260 except requests.exceptions.ConnectionError: 

3261 logger.info("connection error") 

3262 return None 

3263 if response.status_code != 200: 

3264 return None 

3265 for device_dict in response.json()["devices"]: 

3266 devices.append( 

3267 DeviceFromDeployer( 

3268 device_id=device_dict["device_id"], 

3269 name=device_dict["container_name"], 

3270 description="desc", 

3271 status=device_dict["status"], 

3272 logs=device_dict["logs"], 

3273 ) 

3274 ) 

3275 return devices 

3276 

3277 def get_device(self, device_id: DeviceId): 

3278 try: 

3279 response = requests.get(self.endpoint_url(f"devices/{device_id}")) 

3280 except requests.exceptions.ConnectionError: 

3281 return None 

3282 if response.status_code != 200: 

3283 return None 

3284 device_dict = response.json() 

3285 return DeviceFromDeployer( 

3286 device_id=device_dict["device_id"], 

3287 name=device_dict["container_name"], 

3288 description="desc", 

3289 status=device_dict["status"], 

3290 logs=device_dict["logs"], 

3291 ) 

3292 

3293 def create_device(self, device: DeviceFromDeployer) -> Device | None: 

3294 try: 

3295 response = requests.post(self.endpoint_url("devices"), json={"name": device.name}) 

3296 except requests.exceptions.ConnectionError: 

3297 return None 

3298 

3299 if response.status_code != 201: 

3300 return None 

3301 

3302 device_dict = response.json() 

3303 return DeviceFromDeployer( 

3304 device_id=device_dict["device_id"], 

3305 name="", 

3306 description="desc", 

3307 status=device_dict["status"], 

3308 ) 

3309 

3310 def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None: 

3311 try: 

3312 response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump()) 

3313 except requests.exceptions.ConnectionError: 

3314 return None 

3315 

3316 if response.status_code != 200: 

3317 return None 

3318 

3319 device_dict = response.json() 

3320 return Device( 

3321 device_id=device_dict["device_id"], 

3322 name="", 

3323 description="desc", 

3324 pid={}, 

3325 petri_network={}, 

3326 modes=[], 

3327 status=device_dict["status"], 

3328 ) 

3329 

3330 def delete_device(self, device_id: DeviceId) -> DeleteInfo: 

3331 try: 

3332 response = requests.delete(self.endpoint_url(f"devices/{device_id}")) 

3333 except requests.exceptions.ConnectionError: 

3334 return DeleteInfo(is_deleted=False, detail="Connection to deployer error") 

3335 if response.status_code not in [200, 202, 204]: 

3336 return DeleteInfo(is_deleted=False, detail=response.text) 

3337 

3338 return DeleteInfo(is_deleted=True, detail="") 

3339 

3340 

3341DeviceDeployerUpdate = create_update_model(DeviceDeployer)