Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 89%

1401 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 11:44 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14import re 

15 

16import zipfile 

17import ping3 

18import pytz 

19from bson.objectid import ObjectId 

20from pymongo import ASCENDING, ReturnDocument 

21from pymongo.collation import Collation 

22from pydantic import BaseModel, computed_field, Field, create_model 

23import numpy as npy 

24import lttb 

25import h5py 

26from PIL import Image 

27from openpyxl import Workbook 

28from openpyxl.styles import Border, Side 

29 

30# from scipy import signal as signal_scipy 

31 

32from twinpad_backend.db import ( 

33 get_collection, 

34 get_async_collection, 

35 get_signal_collection, 

36 systems_database, 

37 systems_async_database, 

38 signals_database, 

39 devices_states_database, 

40) 

41from twinpad_backend.responses import ListResponse 

42from twinpad_backend.messages import send_mode_change, send_signal_value 

43from twinpad_backend.xml_parsing import extract_subname_ref, xml_tags 

44 

45TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

46 

47 

48RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

49MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

50SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

51HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

52DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

53 

54DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

55NUMBER_SAMPLES_DATABASE_UPDATE = 120 

56 

57logger = logging.getLogger("uvicorn.error") 

58 

59 

60CHANNEL_PATTERN = r"Channel\s+(\d+)" 

61XLSX_HEADER = [ 

62 "Ticker", 

63 "Component id", 

64 "Signal description", 

65 "Unit", 

66 "Type", 

67 "Frequency (Hz)", 

68 "Sensor transfer function", 

69 "Precision digits", 

70 "Data type", 

71 "Formula", 

72 "Forcible", 

73 "Commandable", 

74 "Broadcastable", 

75] 

76 

77 

78class classproperty: 

79 """ 

80 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

81 Found here: https://stackoverflow.com/a/76301341 

82 """ 

83 

84 def __init__(self, func): 

85 self.fget = func 

86 

87 def __get__(self, _, owner): 

88 return self.fget(owner) 

89 

90 

91def create_update_model(model): 

92 fields = {} 

93 

94 for field_name, field_annotation in model.model_fields.items(): 

95 if field_name != "id": 

96 fields[field_name] = (field_annotation.annotation | None, None) 

97 

98 query_name = model.__name__ + "Update" 

99 return create_model(query_name, **fields) 

100 

101 

102def get_utc_date_from_timestamp(timestamp: float): 

103 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

104 

105 

106def downsample_list(time_vector: list, values: list, max_number_samples: int): 

107 if len(time_vector) < max_number_samples: 

108 return time_vector, values 

109 

110 time_vector_copy = copy.deepcopy(time_vector) 

111 values_copy = copy.deepcopy(values) 

112 

113 none_group_bounds = [] 

114 none_group_index = -1 

115 index = -1 

116 # LTTB doesn't handle None values so remove them 

117 while values_copy.count(None) > 0: 

118 # Store bounds of None value groups so we can insert them back after the downsampling 

119 if (new_index := values_copy.index(None)) != index: 

120 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

121 none_group_index += 1 

122 elif len(none_group_bounds[none_group_index]) < 2: 

123 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

124 else: 

125 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

126 values_copy.pop(new_index) 

127 index = new_index 

128 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

129 

130 try: 

131 values_array = npy.array([time_vector_copy, values_copy]).T 

132 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

133 

134 new_time_vector = interpolated_values[:, 0].tolist() 

135 new_values = interpolated_values[:, 1].tolist() 

136 except ValueError: 

137 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

138 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

139 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

140 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

141 return new_time_vector, new_values_nan_to_none 

142 

143 # insert back None values at the correct timestamps 

144 for none_group in none_group_bounds: 

145 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

146 new_time_vector[start_index:start_index] = none_group 

147 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

148 

149 return new_time_vector, new_values 

150 

151 

152def is_of_type(value, wanted_type): 

153 if wanted_type is float: 

154 return isinstance(value, (int, float)) 

155 return isinstance(value, wanted_type) 

156 

157 

158# Models 

159class TwinPadModel(BaseModel): 

160 @classmethod 

161 def dict_to_object(cls, dict_): 

162 return cls.model_validate(dict_) 

163 

164 def to_dict(self, exclude=None): 

165 dict_ = self.model_dump(exclude=exclude) 

166 return dict_ 

167 

168 

169class GenericMongo(TwinPadModel): 

170 id: str | None = None 

171 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

172 

173 @classmethod 

174 def collection(cls): 

175 return get_collection(systems_database, cls.collection_name, create=True) 

176 

177 @classmethod 

178 def response_from_query(cls, query) -> ListResponse[Self]: 

179 request_filters = query.mongodb_filter() 

180 items = [] 

181 if ":" in query.sort_by: 

182 sort_field, sort_order = query.sort_by.split(":") 

183 sort_order = int(sort_order) 

184 else: 

185 sort_field = query.sort_by 

186 sort_order = 1 

187 collection = get_collection(systems_database, cls.collection_name, create=True) 

188 total = collection.count_documents(request_filters) 

189 

190 pipeline = [] 

191 added_properties = [] 

192 if "$and" in request_filters: 

193 for request_filter in request_filters["$and"]: 

194 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

195 if filtered_property in request_filter: 

196 pipeline.extend(pipeline_steps) 

197 added_properties.append(filtered_property) 

198 else: 

199 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

200 if filtered_property in request_filters: 

201 pipeline.extend(pipeline_steps) 

202 added_properties.append(filtered_property) 

203 pipeline.append({"$match": request_filters}) 

204 if sort_field in cls.custom_pipeline_steps: 

205 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

206 added_properties.append(sort_field) 

207 pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}]) 

208 

209 if (query.limit is not None) and (query.limit != 0): 

210 pipeline.append({"$limit": query.limit}) 

211 

212 for filtered_property, step in cls.custom_pipeline_steps.items(): 

213 if filtered_property not in added_properties: 

214 pipeline.extend(step) 

215 

216 cursor = collection.aggregate(pipeline) 

217 

218 for item_dict in cursor: 

219 items.append(cls.mongo_dict_to_object(item_dict)) 

220 

221 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

222 

223 @classmethod 

224 def get_from_id(cls, item_id) -> Self | None: 

225 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

226 

227 @classmethod 

228 def mongo_dict_to_object(cls, mongo_dict): 

229 mongo_dict["id"] = str(mongo_dict["_id"]) 

230 del mongo_dict["_id"] 

231 return cls.dict_to_object(mongo_dict) 

232 

233 @classmethod 

234 def get_by_attribute(cls, attribute_name: str, attribute_value): 

235 """Returns all items that match the attribute with value.""" 

236 pipeline = [] 

237 if attribute_name in cls.custom_pipeline_steps: 

238 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

239 pipeline.append({"$match": {attribute_name: attribute_value}}) 

240 for key, step in cls.custom_pipeline_steps.items(): 

241 if key != attribute_name: 

242 pipeline.extend(step) 

243 items = cls.collection().aggregate(pipeline) 

244 if items is None: 

245 return None 

246 return [cls.mongo_dict_to_object(d) for d in items] 

247 

248 @classmethod 

249 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

250 pipeline = [] 

251 if attribute_name in cls.custom_pipeline_steps: 

252 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

253 pipeline.append({"$match": {attribute_name: attribute_value}}) 

254 pipeline.append({"$limit": 1}) 

255 for key, step in cls.custom_pipeline_steps.items(): 

256 if key != attribute_name: 

257 pipeline.extend(step) 

258 items = cls.collection().aggregate(pipeline).to_list() 

259 if len(items) == 0: 

260 return None 

261 return cls.mongo_dict_to_object(items[0]) 

262 

263 @classmethod 

264 def get_all(cls, sort_by="_id") -> list[Self]: 

265 items = [] 

266 pipeline = [] 

267 if sort_by in cls.custom_pipeline_steps: 

268 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

269 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

270 for key, step in cls.custom_pipeline_steps.items(): 

271 if key != sort_by: 

272 pipeline.extend(step) 

273 for dict_ in cls.collection().aggregate(pipeline): 

274 items.append(cls.mongo_dict_to_object(dict_)) 

275 return items 

276 

277 @classmethod 

278 def get_number_documents(cls): 

279 collection = get_collection(systems_database, cls.collection_name) 

280 if collection is None: 

281 return 0 

282 return collection.count_documents({}) 

283 

284 def insert(self): 

285 insert_result = self.collection().insert_one(self.to_dict(exclude={id})) 

286 self.id = str(insert_result.inserted_id) 

287 return self.id 

288 

289 def update(self, update_dict): 

290 for key, value in update_dict.items(): 

291 setattr(self, key, value) 

292 self.collection().find_one_and_update( 

293 {"_id": ObjectId(self.id)}, 

294 {"$set": update_dict}, 

295 return_document=ReturnDocument.AFTER, 

296 ) 

297 

298 return self 

299 

300 def delete(self): 

301 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

302 return result.deleted_count > 0 

303 

304 

305class User(GenericMongo): 

306 collection_name: ClassVar[str] = "users" 

307 

308 firstname: str 

309 lastname: str 

310 email: str 

311 password: str 

312 is_active: bool | None = False 

313 is_admin: bool | None = False 

314 is_connected: bool | None = False 

315 company_id: str | None = None 

316 

317 def to_dict(self, exclude=None): 

318 if exclude is None: 

319 exclude = {"password"} 

320 return GenericMongo.to_dict(self, exclude=exclude) 

321 

322 @classmethod 

323 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

324 users = cls.get_all() 

325 if not users: 

326 is_admin = True 

327 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

328 user_collection = get_collection(systems_database, "users", create=True) 

329 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

330 if new_user is None: 

331 return None 

332 return {"user_id": str(new_user.inserted_id)} 

333 

334 @classmethod 

335 def update_info(cls, user: "UserUpdate", user_id: str): 

336 updated_user = cls.collection().find_one_and_update( 

337 {"_id": ObjectId(user_id)}, 

338 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

339 return_document=ReturnDocument.AFTER, 

340 ) 

341 updated_user["id"] = str(updated_user["_id"]) 

342 del (updated_user["_id"], updated_user["is_connected"]) 

343 return cls(**updated_user) 

344 

345 

346UserUpdate = create_update_model(User) 

347 

348 

349class Mode(TwinPadModel): 

350 mode_id: int 

351 name: str 

352 frequency_multiplier: float 

353 min_frequency: float 

354 

355 

356class DeviceUpdate(TwinPadModel): 

357 mode_id: int 

358 

359 

360class Device(GenericMongo): 

361 collection_name: ClassVar[str] = "devices" 

362 

363 device_id: str 

364 config_id: str | None = None 

365 config_name: str | None = None 

366 name: str 

367 description: str = "" 

368 modes: list[Mode] 

369 current_mode_id: int | None = None 

370 last_ping: float | None = None 

371 petri_network: Any 

372 pid: Any 

373 load: float | None = None 

374 tokens: list[int] = Field(default_factory=list) 

375 status: str 

376 

377 async def change_mode(self, update_dict, current_user: User): 

378 has_error = False 

379 

380 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

381 has_error = True 

382 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

383 elif self.current_mode_id is not None: 

384 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

385 else: 

386 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

387 command = Command( 

388 sent_at=time.time(), 

389 command_type="Mode change", 

390 description=description, 

391 user_id=current_user.id, 

392 ) 

393 

394 if has_error: 

395 command.response_time = 0 

396 command.succeeded = False 

397 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

398 else: 

399 response = await send_mode_change(self.device_id, update_dict.mode_id) 

400 command.receive_response(response) 

401 

402 Command.create(command) 

403 return response 

404 

405 @classmethod 

406 def get_from_device_or_config_id(cls, device_or_config_id: str): 

407 items = ( 

408 cls.collection() 

409 .aggregate( 

410 [ 

411 {"$match": {"$or": [{"device_id": device_or_config_id}, {"config_id": device_or_config_id}]}}, 

412 {"$limit": 1}, 

413 ] 

414 ) 

415 .to_list() 

416 ) 

417 if len(items) == 0: 

418 return None 

419 return cls.mongo_dict_to_object(items[0]) 

420 

421 @classmethod 

422 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

423 devices_by_id = {} 

424 for signal_id in signal_ids: 

425 device_or_config_id = signal_id.split(".")[0] 

426 if device_or_config_id not in devices_by_id: 

427 devices_by_id[device_or_config_id] = cls.get_from_device_or_config_id(device_or_config_id) 

428 return devices_by_id 

429 

430 

431class DeviceSetup(GenericMongo): 

432 collection_name: ClassVar[str] = "device_setups" 

433 

434 device_ids: list[str] 

435 active: bool = False 

436 variable_mapping: dict[str, str] 

437 

438 

439DeviceSetupUpdate = create_update_model(DeviceSetup) 

440 

441 

442class DeviceState(GenericMongo): 

443 collection_name: ClassVar[str] = "devices_states" 

444 

445 timestamp: float 

446 mode: str | None = None 

447 load: float | None = None 

448 tokens: list[int] = Field(default_factory=list) 

449 config_id: str | None = None 

450 modified_properties: list[str] = Field(default_factory=list) 

451 

452 @classmethod 

453 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]: 

454 req_filter = query.mongodb_filter() 

455 items = [] 

456 if ":" in query.sort_by: 

457 sort_field, sort_order = query.sort_by.split(":") 

458 sort_order = int(sort_order) 

459 else: 

460 sort_field = query.sort_by 

461 sort_order = 1 

462 collection = get_collection(devices_states_database, device_id) 

463 if collection is None: 

464 total = 0 

465 cursor = [] 

466 else: 

467 total = collection.count_documents(req_filter) 

468 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

469 if (query.limit is not None) and (query.limit != 0): 

470 cursor = cursor.limit(query.limit) 

471 for item_dict in cursor: 

472 items.append( 

473 cls( 

474 timestamp=item_dict.get("precise_timestamp"), 

475 mode=item_dict.get("mode"), 

476 load=item_dict.get("load"), 

477 tokens=item_dict.get("tokens", Field(default_factory=list)), 

478 config_id=item_dict.get("config_id"), 

479 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

480 ) 

481 ) 

482 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

483 

484 

485class SignalSample(TwinPadModel): 

486 signal_id: str 

487 timestamp: float 

488 value: float | int | str | bool | None 

489 forced_value: float | int | str | bool | None = None 

490 

491 @classmethod 

492 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

493 collection = get_signal_collection(signal_id) 

494 real_signal_id = signal_id 

495 

496 if collection is None: 

497 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

498 if device is not None: 

499 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

500 collection = get_signal_collection(real_signal_id) 

501 

502 if collection is None: 

503 return None 

504 

505 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

506 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

507 bucket = get_signal_collection(f"system.buckets.{real_signal_id}") 

508 first_bucket = None 

509 if bucket is not None: 

510 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

511 if first_bucket is not None: 

512 sample_data = collection.find_one( 

513 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

514 ) 

515 else: 

516 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

517 

518 if sample_data is None: 

519 return None 

520 

521 timestamp = sample_data["precise_timestamp"] 

522 

523 return cls( 

524 signal_id=real_signal_id, 

525 timestamp=timestamp, 

526 value=sample_data.get("value", None), 

527 forced_value=sample_data.get("forced_value", None), 

528 ) 

529 

530 @classmethod 

531 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

532 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

533 

534 @classmethod 

535 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

536 collection = get_signal_collection(signal_id) 

537 real_signal_id = signal_id 

538 

539 if collection is None: 

540 if device is None: 

541 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

542 if device is not None: 

543 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

544 collection = get_signal_collection(real_signal_id) 

545 

546 if collection is None: 

547 return None 

548 

549 # Same workaround as above function, very effective to narrow down big sets of data 

550 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

551 last_bucket = None 

552 if bucket is not None: 

553 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

554 if last_bucket is not None: 

555 sample_data = collection.find_one( 

556 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

557 sort={"precise_timestamp": -1}, 

558 ) 

559 else: 

560 sample_data = collection.find_one({}, sort={"precise_timestamp": -1}) 

561 

562 if sample_data is None: 

563 return None 

564 

565 timestamp = sample_data["precise_timestamp"] 

566 

567 if device is None: 

568 device = Device.get_from_device_or_config_id(real_signal_id.split(".")[0]) 

569 if device is not None and device.last_ping is not None: 

570 if timestamp is None: 

571 timestamp = device.last_ping 

572 else: 

573 timestamp = max(timestamp, device.last_ping) 

574 return cls( 

575 signal_id=real_signal_id, 

576 timestamp=timestamp, 

577 value=sample_data.get("value", None), 

578 forced_value=sample_data.get("forced_value", None), 

579 ) 

580 

581 @classmethod 

582 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None: 

583 collection = get_signal_collection(signal_id) 

584 real_signal_id = signal_id 

585 

586 if collection is None: 

587 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

588 if device is not None: 

589 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

590 collection = get_signal_collection(real_signal_id) 

591 

592 if collection is None: 

593 return None 

594 

595 sample_data = collection.find_one( 

596 {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1} 

597 ) 

598 if sample_data is None: 

599 return None 

600 

601 return cls( 

602 signal_id=real_signal_id, 

603 timestamp=sample_data.get("precise_timestamp"), 

604 value=sample_data.get("value"), 

605 forced_value=sample_data.get("forced_value"), 

606 ) 

607 

608 @classmethod 

609 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

610 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

611 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

612 

613 @classmethod 

614 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None: 

615 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids] 

616 

617 

618class SignalData(TwinPadModel): 

619 signal_id: str 

620 forcible: bool = True 

621 time_vector: list[float] 

622 values: list[float | int | str | None] 

623 forced_values: list[float | int | str | None] 

624 

625 data_start: float | None = None 

626 data_end: float | None = None 

627 

628 number_samples: int = 0 

629 number_samples_db: int = 0 

630 

631 db_query_time: float = 0.0 

632 init_time: float = 0.0 

633 data_processing_time: float = 0.0 

634 

635 @classmethod 

636 def get_from_signal_id( 

637 cls, 

638 signal_id: str, 

639 min_timestamp: float = None, 

640 max_timestamp: float = None, 

641 window_min_timestamp: float = None, 

642 window_max_timestamp: float = None, 

643 interpolate_bounds: bool = True, 

644 max_documents: int = None, 

645 ) -> Self: 

646 

647 now = time.time() 

648 

649 req_signal = {} 

650 if min_timestamp is not None: 

651 req_signal.setdefault("timestamp", {}) 

652 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

653 if max_timestamp is not None: 

654 req_signal.setdefault("timestamp", {}) 

655 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

656 

657 collection = get_signal_collection(signal_id) 

658 

659 real_signal_id = signal_id 

660 

661 if collection is None: 

662 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

663 if device is not None: 

664 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

665 collection = get_signal_collection(real_signal_id) 

666 

667 if collection is None: 

668 return cls( 

669 signal_id=real_signal_id, 

670 time_vector=[], 

671 values=[], 

672 forced_values=[], 

673 number_samples=0, 

674 number_samples_db=0, 

675 ) 

676 

677 db_req_start = time.time() 

678 

679 sort_step = {"$sort": {"precise_timestamp": 1}} 

680 number_results = collection.count_documents(req_signal) 

681 

682 pipeline = [] 

683 if req_signal: 

684 pipeline.append({"$match": req_signal}) # Filter data if needed 

685 

686 pipeline.extend( 

687 [ 

688 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

689 sort_step, 

690 ] 

691 ) 

692 

693 if max_documents is not None and max_documents < number_results: 

694 unsampling_ratio = math.ceil(number_results / max_documents) 

695 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

696 pipeline.extend( 

697 [ 

698 { 

699 "$setWindowFields": { 

700 "sortBy": {"precise_timestamp": 1}, 

701 "output": {"index": {"$documentNumber": {}}}, 

702 } 

703 }, 

704 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

705 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

706 {"$replaceRoot": {"newRoot": "$doc"}}, 

707 {"$unset": ["index", "group_id"]}, 

708 {"$sort": {"precise_timestamp": 1}}, 

709 ] 

710 ) 

711 

712 # logger.info(f"pipeline: %s", str(pipeline)) 

713 cursor = collection.aggregate(pipeline) 

714 db_req_time = time.time() - db_req_start 

715 

716 init_time = time.time() 

717 

718 results = cursor.to_list() 

719 time_vector = [] 

720 values = [] 

721 forced_values = [] 

722 for s in results: 

723 time_vector.append(s["precise_timestamp"]) 

724 values.append(s.get("value", None)) 

725 forced_values.append(s.get("forced_value", None)) 

726 

727 signal = Signal.get_from_signal_id(real_signal_id) 

728 class_ = signal.signal_data_class 

729 

730 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

731 time_vector, values, forced_values = cls.interpolate_bounds( 

732 class_, 

733 collection, 

734 real_signal_id, 

735 time_vector, 

736 values, 

737 forced_values, 

738 window_min_timestamp, 

739 window_max_timestamp, 

740 ) 

741 

742 if values: 

743 # TODO: check below. a bit strange 

744 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

745 # Adding last value as it should be repeated 

746 time_vector.append(now) 

747 values.append(values[-1]) 

748 forced_values.append(forced_values[-1]) 

749 

750 init_time = time.time() - init_time 

751 

752 # See line 292 for explanation 

753 bucket = get_signal_collection(f"system.buckets.{real_signal_id}") 

754 first_bucket = None 

755 if bucket is not None: 

756 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

757 if first_bucket is not None: 

758 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

759 else: 

760 data_start = None 

761 

762 last_bucket = None 

763 if bucket is not None: 

764 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

765 if last_bucket is not None: 

766 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

767 else: 

768 data_end = None 

769 

770 return class_( 

771 signal_id=real_signal_id, 

772 forcible=signal.forcible, 

773 time_vector=time_vector, 

774 values=values, 

775 forced_values=forced_values, 

776 data_start=data_start, 

777 data_end=data_end, 

778 number_samples=len(values), 

779 number_samples_db=number_results, 

780 db_query_time=db_req_time, 

781 init_time=init_time, 

782 ) 

783 

784 @staticmethod 

785 def interpolate_bounds( 

786 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

787 ): 

788 sample_right = None 

789 # Fetching right side value & interpolation 

790 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

791 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

792 sample_right = collection.find_one( 

793 { 

794 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

795 "value": {"$exists": True}, 

796 }, 

797 sort={"precise_timestamp": -1}, 

798 ) 

799 if sample_right: 

800 if time_vector: 

801 right_sd = class_( 

802 signal_id=signal_id, 

803 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

804 values=[values[-1], sample_right.get("value", None)], 

805 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

806 ) 

807 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

808 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

809 else: 

810 max_ts_value = sample_right.get("value", None) 

811 max_ts_forced_value = sample_right.get("forced_value", None) 

812 time_vector.append(window_max_timestamp) 

813 values.append(max_ts_value) 

814 forced_values.append(max_ts_forced_value) 

815 

816 # Fetching left side value & interpolation 

817 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

818 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

819 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

820 sample_left = sample_right 

821 sample_left = collection.find_one( 

822 { 

823 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

824 "value": {"$exists": True}, 

825 }, 

826 sort={"precise_timestamp": -1}, 

827 ) 

828 

829 if sample_left: 

830 if time_vector: 

831 left_sd = class_( 

832 signal_id=signal_id, 

833 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

834 values=[sample_left["value"], values[0]], 

835 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

836 ) 

837 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

838 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

839 else: 

840 min_ts_value = sample_left.get("value", None) 

841 min_ts_forced_value = sample_left.get("forced_value", None) 

842 time_vector.insert(0, window_min_timestamp) 

843 values.insert(0, min_ts_value) 

844 forced_values.insert(0, min_ts_forced_value) 

845 

846 return time_vector, values, forced_values 

847 

848 def interpolate_values(self, new_time_vector: list[float]): 

849 return self.interpolate(new_time_vector, self.values) 

850 

851 def interpolate_forced_values(self, new_time_vector: list[float]): 

852 return self.interpolate(new_time_vector, self.forced_values) 

853 

854 def uniform_desampling(self, number_samples_max: int) -> Self: 

855 data_processing_time = time.time() 

856 if number_samples_max and self.number_samples > number_samples_max: 

857 new_time_vector = npy.linspace( 

858 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

859 ).tolist() 

860 values = self.interpolate_values(new_time_vector) 

861 forced_values = self.interpolate_forced_values(new_time_vector) 

862 time_vector = new_time_vector 

863 number_samples = len(time_vector) 

864 else: 

865 time_vector = self.time_vector 

866 number_samples = len(self.values) 

867 values = self.values[:] 

868 forced_values = self.forced_values[:] 

869 data_processing_time = time.time() - data_processing_time 

870 

871 return self.__class__( 

872 signal_id=self.signal_id, 

873 time_vector=time_vector, 

874 values=values, 

875 forced_values=forced_values, 

876 number_samples=number_samples, 

877 number_samples_db=self.number_samples, 

878 data_start=self.data_start, 

879 data_end=self.data_end, 

880 db_query_time=self.db_query_time, 

881 init_time=self.init_time, 

882 data_processing_time=self.data_processing_time + data_processing_time, 

883 ) 

884 

885 def interest_window_desampling( 

886 self, 

887 window_max_number_samples: int, 

888 outside_max_number_samples: int, 

889 window_min_timestamp: float | None = None, 

890 window_max_timestamp: float | None = None, 

891 ) -> Self: 

892 """Performs a sampling in a window of interest and outside.""" 

893 

894 if not self.time_vector: 

895 return self 

896 

897 if window_min_timestamp is None: 

898 window_min_timestamp = self.time_vector[0] 

899 if window_max_timestamp is None: 

900 window_max_timestamp = self.time_vector[-1] 

901 

902 data_processing_time = time.time() 

903 

904 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

905 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

906 

907 time_vector_before = self.time_vector[:index_window_start] 

908 time_vector_window = self.time_vector[index_window_start:index_window_end] 

909 time_vector_after = self.time_vector[index_window_end:] 

910 

911 # Resampling window 

912 if time_vector_window: 

913 # Ensurring window bounds 

914 if time_vector_window[0] != window_min_timestamp: 

915 time_vector_window.insert(0, window_min_timestamp) 

916 if time_vector_window[-1] != window_max_timestamp: 

917 time_vector_window.append(window_max_timestamp) 

918 else: 

919 time_vector_window = [window_min_timestamp, window_max_timestamp] 

920 

921 if len(time_vector_window) > window_max_number_samples: 

922 # Resampling 

923 new_window_time_vector = npy.linspace( 

924 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

925 ).tolist() 

926 time_vector_window = new_window_time_vector 

927 

928 # Resampling outside 

929 number_samples_before = len(time_vector_before) 

930 number_samples_after = len(time_vector_after) 

931 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

932 new_number_samples_before = min( 

933 number_samples_before, 

934 math.ceil( 

935 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

936 ), 

937 ) 

938 new_number_samples_after = min( 

939 number_samples_after, 

940 math.ceil( 

941 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

942 ), 

943 ) 

944 # Adjusting numbers as math.ceil can do +1 on sum 

945 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

946 if new_number_samples_before > new_number_samples_after: 

947 new_number_samples_before -= 1 

948 else: 

949 new_number_samples_after -= 1 

950 

951 if new_number_samples_before > 0: 

952 new_time_vector_before = npy.linspace( 

953 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

954 ).tolist() 

955 time_vector_before = new_time_vector_before 

956 

957 if new_number_samples_after > 0: 

958 new_time_vector_after = npy.linspace( 

959 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

960 ).tolist()[::-1] 

961 time_vector_after = new_time_vector_after 

962 

963 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

964 values = self.interpolate_values(new_time_vector) 

965 forced_values = self.interpolate_forced_values(new_time_vector) 

966 number_samples = len(values) 

967 

968 data_processing_time = time.time() - data_processing_time 

969 

970 return self.__class__( 

971 signal_id=self.signal_id, 

972 forcible=self.forcible, 

973 time_vector=new_time_vector, 

974 values=values, 

975 forced_values=forced_values, 

976 number_samples=number_samples, 

977 number_samples_db=self.number_samples, 

978 data_start=self.data_start, 

979 data_end=self.data_end, 

980 db_query_time=self.db_query_time, 

981 init_time=self.init_time, 

982 data_processing_time=self.data_processing_time + data_processing_time, 

983 ) 

984 

985 def csv_export(self): 

986 output = io.StringIO() 

987 writer = csv.writer(output) 

988 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

989 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

990 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

991 return output.getvalue().encode("utf-8") 

992 

993 def prestoplot_export(self): 

994 clean_signal_id = self.signal_id.replace(".", "_") 

995 if clean_signal_id[0].isnumeric(): 

996 clean_signal_id = "_" + clean_signal_id 

997 

998 output = io.StringIO() 

999 output.write("# Encoding:\tUTF-8\n") 

1000 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

1001 output.write("ISO8601\tnone\tnone\n") 

1002 output.write(f"# Description :\t{clean_signal_id}\n") 

1003 

1004 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

1005 output.write( 

1006 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

1007 ) 

1008 return output.getvalue().encode("utf-8") 

1009 

1010 

1011class NumericSignalData(SignalData): 

1012 data_type: str = "float" 

1013 values: list[float | int | None] 

1014 forced_values: list[float | int | None] 

1015 

1016 def interpolate(self, new_time_vector: list[float], items): 

1017 items = [npy.nan if s is None else s for s in items] 

1018 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

1019 

1020 def uniform_desampling(self, number_samples_max: int) -> Self: 

1021 data_processing_time = time.time() 

1022 if number_samples_max and self.number_samples > number_samples_max: 

1023 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

1024 forced_values = self.interpolate_forced_values(time_vector) 

1025 number_samples = len(time_vector) 

1026 else: 

1027 time_vector = self.time_vector 

1028 number_samples = len(self.values) 

1029 values = self.values[:] 

1030 forced_values = self.forced_values[:] 

1031 data_processing_time = time.time() - data_processing_time 

1032 

1033 return self.__class__( 

1034 signal_id=self.signal_id, 

1035 time_vector=time_vector, 

1036 values=values, 

1037 forced_values=forced_values, 

1038 number_samples=number_samples, 

1039 number_samples_db=self.number_samples, 

1040 data_start=self.data_start, 

1041 data_end=self.data_end, 

1042 db_query_time=self.db_query_time, 

1043 init_time=self.init_time, 

1044 data_processing_time=self.data_processing_time + data_processing_time, 

1045 ) 

1046 

1047 def interest_window_desampling( 

1048 self, 

1049 window_max_number_samples: int, 

1050 outside_max_number_samples: int, 

1051 window_min_timestamp: float | None = None, 

1052 window_max_timestamp: float | None = None, 

1053 ) -> Self: 

1054 """Performs a sampling in a window of interest and outside.""" 

1055 

1056 if not self.time_vector: 

1057 return self 

1058 

1059 if window_min_timestamp is None: 

1060 window_min_timestamp = self.time_vector[0] 

1061 if window_max_timestamp is None: 

1062 window_max_timestamp = self.time_vector[-1] 

1063 

1064 data_processing_time = time.time() 

1065 

1066 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1067 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1068 

1069 time_vector_before = self.time_vector[:index_window_start] 

1070 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1071 time_vector_after = self.time_vector[index_window_end:] 

1072 

1073 values_before = self.values[:index_window_start] 

1074 values_window = self.values[index_window_start:index_window_end] 

1075 values_after = self.values[index_window_end:] 

1076 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1077 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1078 

1079 # Resampling window 

1080 if time_vector_window: 

1081 # Ensurring window bounds 

1082 if time_vector_window[0] != window_min_timestamp: 

1083 time_vector_window.insert(0, window_min_timestamp) 

1084 values_window.insert(0, window_min_value) 

1085 if time_vector_window[-1] != window_max_timestamp: 

1086 time_vector_window.append(window_max_timestamp) 

1087 values_window.append(window_max_value) 

1088 else: 

1089 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1090 values_window = [window_min_value, window_max_value] 

1091 

1092 if len(time_vector_window) > window_max_number_samples: 

1093 # Resampling 

1094 time_vector_window, values_window = downsample_list( 

1095 time_vector_window, values_window, window_max_number_samples 

1096 ) 

1097 

1098 # Resampling outside 

1099 number_samples_before = len(time_vector_before) 

1100 number_samples_after = len(time_vector_after) 

1101 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

1102 new_number_samples_before = min( 

1103 number_samples_before, 

1104 math.ceil( 

1105 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1106 ), 

1107 ) 

1108 new_number_samples_after = min( 

1109 number_samples_after, 

1110 math.ceil( 

1111 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1112 ), 

1113 ) 

1114 # Adjusting numbers as math.ceil can do +1 on sum 

1115 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1116 if new_number_samples_before > new_number_samples_after: 

1117 new_number_samples_before -= 1 

1118 else: 

1119 new_number_samples_after -= 1 

1120 

1121 if new_number_samples_before > 0: 

1122 time_vector_before, values_before = downsample_list( 

1123 time_vector_before, values_before, new_number_samples_before 

1124 ) 

1125 

1126 if new_number_samples_after > 0: 

1127 time_vector_after, values_after = downsample_list( 

1128 time_vector_after, values_after, new_number_samples_after 

1129 ) 

1130 

1131 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1132 values = values_before + values_window + values_after 

1133 forced_values = self.interpolate_forced_values(new_time_vector) 

1134 number_samples = len(values) 

1135 

1136 data_processing_time = time.time() - data_processing_time 

1137 

1138 return self.__class__( 

1139 signal_id=self.signal_id, 

1140 time_vector=new_time_vector, 

1141 values=values, 

1142 forced_values=forced_values, 

1143 number_samples=number_samples, 

1144 number_samples_db=self.number_samples, 

1145 data_start=self.data_start, 

1146 data_end=self.data_end, 

1147 db_query_time=self.db_query_time, 

1148 init_time=self.init_time, 

1149 data_processing_time=self.data_processing_time + data_processing_time, 

1150 ) 

1151 

1152 

1153class StringSignalData(SignalData): 

1154 data_type: str = "str" 

1155 values: list[str | None] 

1156 forced_values: list[str | None] 

1157 

1158 def interpolate(self, new_time_vector: list[float], items): 

1159 # Find the indices of the values in xp that are just smaller or equal to x 

1160 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1161 indices = npy.clip(indices, 0, len(items) - 1) 

1162 # Return the corresponding left string values from fp 

1163 return [items[i] for i in indices] 

1164 

1165 

1166class SignalsData(TwinPadModel): 

1167 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1168 data_processing_time: float 

1169 data_start: float | None 

1170 data_end: float | None 

1171 

1172 @classmethod 

1173 def get_from_signal_ids( 

1174 cls, 

1175 signal_ids: list[str], 

1176 min_timestamp: float = None, 

1177 max_timestamp: float = None, 

1178 window_min_timestamp: float = None, 

1179 window_max_timestamp: float = None, 

1180 interpolate_bounds: bool = True, 

1181 max_documents: int = None, 

1182 ) -> Self: 

1183 signals_data = [] 

1184 data_start = None 

1185 data_end = None 

1186 if max_timestamp is None: 

1187 max_timestamp = time.time() 

1188 data_processing_time = 0.0 

1189 for signal_id in signal_ids: 

1190 signal_data = SignalData.get_from_signal_id( 

1191 signal_id=signal_id, 

1192 min_timestamp=min_timestamp, 

1193 max_timestamp=max_timestamp, 

1194 window_min_timestamp=window_min_timestamp, 

1195 window_max_timestamp=window_max_timestamp, 

1196 interpolate_bounds=interpolate_bounds, 

1197 max_documents=max_documents, 

1198 ) 

1199 data_processing_time += signal_data.data_processing_time 

1200 signals_data.append(signal_data) 

1201 if signal_data.data_start is not None: 

1202 if data_start is None: 

1203 data_start = signal_data.data_start 

1204 else: 

1205 data_start = min(signal_data.data_start, data_start) 

1206 if signal_data.data_end is not None: 

1207 if data_end is None: 

1208 data_end = signal_data.data_end 

1209 else: 

1210 data_end = max(signal_data.data_end, data_end) 

1211 

1212 return cls( 

1213 signals_data=signals_data, 

1214 data_processing_time=data_processing_time, 

1215 data_start=data_start, 

1216 data_end=data_end, 

1217 ) 

1218 

1219 def uniform_desampling(self, number_samples_max: int) -> Self: 

1220 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1221 return SignalsData( 

1222 signals_data=signals_data, 

1223 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1224 data_start=self.data_start, 

1225 data_end=self.data_end, 

1226 ) 

1227 

1228 def interest_window_desampling( 

1229 self, 

1230 window_max_number_samples: int, 

1231 outside_max_number_samples: int, 

1232 window_min_timestamp: float = None, 

1233 window_max_timestamp: float = None, 

1234 ) -> Self: 

1235 signals_data = [ 

1236 s.interest_window_desampling( 

1237 window_max_number_samples=window_max_number_samples, 

1238 outside_max_number_samples=outside_max_number_samples, 

1239 window_min_timestamp=window_min_timestamp, 

1240 window_max_timestamp=window_max_timestamp, 

1241 ) 

1242 for s in self.signals_data 

1243 ] 

1244 

1245 return SignalsData( 

1246 signals_data=signals_data, 

1247 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1248 data_start=self.data_start, 

1249 data_end=self.data_end, 

1250 ) 

1251 

1252 def zip_export(self, file_format: str = "csv"): 

1253 # return self.signals_data[0].csv_export() 

1254 zip_buffer = io.BytesIO() 

1255 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1256 for signal_data in self.signals_data: 

1257 if file_format == "csv": 

1258 export_io = signal_data.csv_export() 

1259 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io) 

1260 elif file_format == "prestoplot": 

1261 export_io = signal_data.prestoplot_export() 

1262 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io) 

1263 else: 

1264 raise ValueError(f"Format not found. Got: {file_format}") 

1265 zip_bytes = zip_buffer.getvalue() 

1266 # zip_bytes.seek(0) 

1267 return zip_bytes 

1268 

1269 def hdf5_export(self): 

1270 hdf5_buffer = io.BytesIO() 

1271 custom_type_float = npy.dtype( 

1272 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1273 ) 

1274 custom_type_string = npy.dtype( 

1275 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1276 ) 

1277 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1278 for signal_data in self.signals_data: 

1279 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1280 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1281 if signal_data.data_type == "str": 

1282 export_data = npy.array( 

1283 list( 

1284 zip( 

1285 date_vector, 

1286 signal_data.time_vector, 

1287 signal_data.values, 

1288 signal_data.forced_values, 

1289 ) 

1290 ), 

1291 dtype=custom_type_string, 

1292 ) 

1293 else: 

1294 export_data = npy.array( 

1295 list( 

1296 zip( 

1297 date_vector, 

1298 signal_data.time_vector, 

1299 signal_data.values, 

1300 signal_data.forced_values, 

1301 ) 

1302 ), 

1303 dtype=custom_type_float, 

1304 ) 

1305 signal_group["data"] = export_data 

1306 return hdf5_buffer.getvalue() 

1307 

1308 

1309class SignalStatus(TwinPadModel): 

1310 status: str 

1311 reason: str 

1312 delay: float | None 

1313 

1314 

1315class DigitizationFunction(TwinPadModel): 

1316 bits: int | None = None 

1317 min_value: float 

1318 max_value: float 

1319 min_raw_value: float 

1320 max_raw_value: float 

1321 

1322 @classmethod 

1323 def from_bits(cls, bits: int, min_value: float, max_value: float): 

1324 return cls(bits=bits, min_raw_value=0, max_raw_value=2**bits - 1, min_value=min_value, max_value=max_value) 

1325 

1326 @classmethod 

1327 def from_values(cls, min_raw_value: float, max_raw_value: float, min_value: float, max_value: float): 

1328 return cls( 

1329 bits=None, 

1330 min_raw_value=min_raw_value, 

1331 max_raw_value=max_raw_value, 

1332 min_value=min_value, 

1333 max_value=max_value, 

1334 ) 

1335 

1336 def to_transfer_function(self): 

1337 return TransferFunction(intervals=[(self.min_raw_value, self.min_value), (self.max_raw_value, self.max_value)]) 

1338 

1339 

1340class SignalUpdate(TwinPadModel): 

1341 value: float | str | bool | int | None = None 

1342 forced_value: float | str | bool | int | None = None 

1343 timestamp: int | None = None 

1344 

1345 

1346class SignalType(str, Enum): 

1347 command = "command" 

1348 sensor = "sensor" 

1349 external_sensor = "external_sensor" 

1350 interface = "interface" 

1351 

1352 

1353SIGNALDATA_TYPES = { 

1354 "int": NumericSignalData, 

1355 "float": NumericSignalData, 

1356 "str": StringSignalData, 

1357 "bool": NumericSignalData, 

1358 "epoch": NumericSignalData, 

1359} 

1360 

1361 

1362class LoopAddress(TwinPadModel): 

1363 card_number: int 

1364 channel: int 

1365 

1366 

1367class Address(LoopAddress): 

1368 loop_number: int 

1369 

1370 

1371class TransferFunction(TwinPadModel): 

1372 """ 

1373 A piecewise monotone linear function. 

1374 """ 

1375 

1376 intervals: list[tuple[float, float]] 

1377 

1378 def evaluate(self, x): 

1379 for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]): 

1380 if x1 <= x and x <= x2: 

1381 return (y2 - y1) / (x2 - x1) * (x - x1) + y1 

1382 raise ValueError("Out of bounds") 

1383 

1384 def reverse(self, y): 

1385 for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]): 

1386 if min(y1, y2) <= y <= max(y1, y2): 

1387 return (x2 - x1) / (y2 - y1) * (y - y1) + x1 

1388 raise ValueError(f"Out of bounds: {y} is not in {self.intervals[0][1]}, {self.intervals[0][1]}") 

1389 

1390 def compose(self, other_function): 

1391 if other_function is None: 

1392 return self 

1393 # for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]): 

1394 # other_function.reverse() 

1395 # Reversing other function x 

1396 new_x = {self.reverse(x) for (x, _) in other_function.intervals} 

1397 new_x.union(x for (x, _) in self.intervals) 

1398 new_intervals = [(x, other_function.evaluate(self.evaluate(x))) for x in sorted(new_x)] 

1399 return TransferFunction(new_intervals) 

1400 

1401 def inverse(self): 

1402 """ 

1403 Calculate the inverse function of this transfer function. 

1404 

1405 The inverse of a piecewise monotone linear function can be calculated by inverting each interval. 

1406 This means swapping x and y, then solving for y. 

1407 

1408 Returns: 

1409 TransferFunction: The inverse function of this transfer function. 

1410 """ 

1411 

1412 return TransferFunction([(x, y) for (y, x) in self.intervals]) 

1413 

1414 

1415class Signal(GenericMongo): 

1416 collection_name: ClassVar[str] = "signals" 

1417 

1418 signal_id: str 

1419 frequency: float 

1420 unit: str | None 

1421 description: str 

1422 type: SignalType 

1423 address: Address | LoopAddress | None = None 

1424 data_type: str 

1425 transfer_function: TransferFunction | None = None 

1426 precision_digits: int | None 

1427 digitization_function: DigitizationFunction | None = None 

1428 forcible: bool 

1429 

1430 @property 

1431 def device(self) -> Device: 

1432 device_or_config_id = self.signal_id.split(".")[0] 

1433 return Device.get_from_device_or_config_id(device_or_config_id) 

1434 

1435 @cached_property 

1436 def ticker(self) -> str: 

1437 return self.signal_id.split(".")[-1] 

1438 

1439 @cached_property 

1440 def signal_data_class(self): 

1441 if self.data_type in SIGNALDATA_TYPES: 

1442 return SIGNALDATA_TYPES[self.data_type] 

1443 if self.data_type.startswith("enum"): 

1444 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1445 raise ValueError(f"Unhandled python type: {self.data_type}") 

1446 

1447 @cached_property 

1448 def python_type(self): 

1449 if self.data_type in TYPES: 

1450 return TYPES[self.data_type] 

1451 if self.data_type.startswith("enum"): 

1452 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1453 return Literal[*choices] 

1454 raise ValueError(f"Unhandled python type: {self.data_type}") 

1455 

1456 @computed_field 

1457 @property 

1458 def status(self) -> SignalStatus: 

1459 now = time.time() 

1460 status = "up" 

1461 reason = "" 

1462 

1463 # See line 285 for explanation 

1464 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1465 last_bucket = None 

1466 if bucket is not None: 

1467 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1468 if last_bucket is None: 

1469 status = "no data" 

1470 reason = "signal does not exist" 

1471 return SignalStatus(status=status, reason=reason, delay=None) 

1472 

1473 try: 

1474 last_date = last_bucket["control"]["max"]["timestamp"] 

1475 last_date = last_date.replace(tzinfo=pytz.UTC) 

1476 last_value_ts = last_date.timestamp() 

1477 except IndexError: 

1478 last_value_ts = None 

1479 

1480 if last_value_ts is None: 

1481 delay = None 

1482 reason = "No data from signal" 

1483 else: 

1484 # Since device is a computed property, only compute it once 

1485 device = self.device 

1486 if device is not None and device.last_ping is not None: 

1487 last_value_ts = max(last_value_ts, device.last_ping) 

1488 delay = now - last_value_ts 

1489 if delay > DEVICE_TIMEOUT: 

1490 status = "down" 

1491 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1492 return SignalStatus(status=status, reason=reason, delay=delay) 

1493 

1494 async def send_command(self, device_id: str, update_dict: SignalUpdate, current_user: User) -> dict: 

1495 command = Command( 

1496 sent_at=time.time(), 

1497 command_type="Signal command", 

1498 user_id=current_user.id, 

1499 ) 

1500 

1501 has_input_error = False 

1502 error_message = "" 

1503 

1504 if self.data_type.startswith("enum"): 

1505 enum_options = get_args(self.python_type) 

1506 

1507 if update_dict.value is not None and update_dict.value not in enum_options: 

1508 has_input_error = True 

1509 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1510 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1511 has_input_error = True 

1512 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1513 else: 

1514 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1515 has_input_error = True 

1516 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1517 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1518 has_input_error = True 

1519 error_message += ( 

1520 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1521 ) 

1522 

1523 if has_input_error: 

1524 command.response_time = 0 

1525 command.succeeded = False 

1526 command.description = f"Tried to modify signal {self.signal_id}" 

1527 response = {"error": True, "status_code": 400, "message": error_message} 

1528 else: 

1529 response = await send_signal_value(device_id, self.signal_id, update_dict) 

1530 command.receive_response(response) 

1531 

1532 Command.create(command) 

1533 return response 

1534 

1535 @classmethod 

1536 def get_from_signal_id(cls, signal_id: str) -> Self: 

1537 """Could be generic from mongo""" 

1538 signal = Signal.get_one_by_attribute("signal_id", signal_id) 

1539 if signal is None: 

1540 split_signal_id = signal_id.split(".") 

1541 device_or_config_id = split_signal_id[0] 

1542 ticker = split_signal_id[-1] 

1543 possible_device = Device.get_from_device_or_config_id(device_or_config_id) 

1544 if possible_device is not None: 

1545 signal = Signal.get_one_by_attribute( 

1546 "signal_id", f"{possible_device.device_id}.{possible_device.config_id}.{ticker}" 

1547 ) 

1548 if not signal: 

1549 return None 

1550 return cls.dict_to_object(signal) 

1551 

1552 @classmethod 

1553 def get_all_ids(cls) -> list[str]: 

1554 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1555 

1556 return [signal["signal_id"] for signal in cursor] 

1557 

1558 async def number_samples(self): 

1559 collection = get_signal_collection(signal_id=self.signal_id) 

1560 if collection is None: 

1561 return 0 

1562 

1563 number_samples = collection.estimated_document_count() 

1564 

1565 number_samples_async_collection = await get_async_collection( 

1566 systems_async_database, "number_samples", create=True, time_series=True 

1567 ) 

1568 

1569 loop = asyncio.get_running_loop() 

1570 loop.create_task( 

1571 number_samples_async_collection.insert_one( 

1572 { 

1573 "timestamp": datetime.datetime.now(pytz.UTC), 

1574 "signal_id": self.signal_id, 

1575 "number_samples": number_samples, 

1576 } 

1577 ) 

1578 ) 

1579 

1580 return number_samples 

1581 

1582 def sample_datasize(self): 

1583 return signals_database.command("collstats", self.signal_id)["size"] 

1584 

1585 @classmethod 

1586 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1587 result = cls.collection().aggregate( 

1588 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1589 ) 

1590 

1591 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1592 

1593 

1594class Parameter(TwinPadModel): 

1595 name: str 

1596 value: str | float | bool | int 

1597 

1598 

1599class Component(GenericMongo): 

1600 collection_name: ClassVar[str] = "components" 

1601 

1602 id: int 

1603 name: str 

1604 signals: list[Signal] 

1605 parameters: list[Parameter] = [] 

1606 reference: str = None 

1607 

1608 

1609class ServicesStatus(TwinPadModel): 

1610 backend: str 

1611 cloud_broker: str 

1612 time_series_database: str 

1613 signal_storage: str 

1614 heartbeat_storage: str 

1615 data_analyzer: str 

1616 

1617 @classmethod 

1618 def check(cls) -> Self: 

1619 return cls( 

1620 cloud_broker=ping(RABBITMQ_HOST), 

1621 backend="up", 

1622 time_series_database=ping(MONGO_HOST), 

1623 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1624 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1625 data_analyzer=ping(DATA_ANALYZER_HOST), 

1626 ) 

1627 

1628 

1629def ping(host): 

1630 try: 

1631 if ping3.ping(host, timeout=0.8): 

1632 return "up" 

1633 except PermissionError: 

1634 pass 

1635 return "down" 

1636 

1637 

1638class Event(GenericMongo): 

1639 collection_name: ClassVar[str] = "events" 

1640 

1641 name: str 

1642 timestamp: float 

1643 event_rule_id: str 

1644 

1645 @computed_field 

1646 @cached_property 

1647 def event_rule(self) -> "EventRule": 

1648 return EventRule.get_from_id(self.event_rule_id) 

1649 

1650 @classmethod 

1651 def dict_to_object(cls, dict_): 

1652 """Refine to convert timestamp to datetime for mongodb.""" 

1653 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

1654 return super().dict_to_object(dict_) 

1655 

1656 

1657class TwinPadActivity(GenericMongo): 

1658 timestamp: float 

1659 amount: int 

1660 

1661 @classmethod 

1662 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

1663 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1664 number_events_collection = get_collection(systems_database, "number_events") 

1665 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

1666 items = [] 

1667 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1668 if number_events_collection is None or recompute_amount: 

1669 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

1670 number_events_collection.delete_many({}) 

1671 first_event = events_collection.find_one(sort={"timestamp": 1}) 

1672 if first_event is None: 

1673 return items 

1674 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

1675 tzinfo=pytz.UTC 

1676 ) 

1677 while last_computed_day < TODAY: 

1678 day_nb_events = events_collection.count_documents( 

1679 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1680 ) 

1681 if day_nb_events > 0: 

1682 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

1683 last_computed_day += ONE_DAY_OFFSET 

1684 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1685 if number_events_today > 0: 

1686 number_events_collection.delete_many({"timestamp": TODAY}) 

1687 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

1688 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1689 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1690 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1691 for day in number_events: 

1692 day["timestamp"] = day["timestamp"].timestamp() 

1693 items.append(cls.mongo_dict_to_object(day)) 

1694 return items 

1695 

1696 @classmethod 

1697 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1698 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1699 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1700 signals_number_samples_collection = get_collection( 

1701 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

1702 ) 

1703 items = [] 

1704 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1705 if number_samples_collection is None or recompute_amount: 

1706 number_samples_collection = get_collection( 

1707 systems_database, "number_received_samples", create=True, time_series=True 

1708 ) 

1709 number_samples_collection.delete_many({}) 

1710 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

1711 if first_sample is None: 

1712 return items 

1713 # compute from day of first found event 

1714 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

1715 tzinfo=pytz.UTC 

1716 ) 

1717 while last_computed_day < TODAY: 

1718 number_samples_request = signals_number_samples_collection.aggregate( 

1719 [ 

1720 { 

1721 "$match": { 

1722 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

1723 } 

1724 }, 

1725 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1726 ] 

1727 ).to_list() 

1728 if len(number_samples_request) == 0: 

1729 number_samples = 0 

1730 else: 

1731 number_samples = number_samples_request[0].get("number_samples", 0) 

1732 if number_samples > 0: 

1733 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

1734 last_computed_day += ONE_DAY_OFFSET 

1735 number_samples_request = signals_number_samples_collection.aggregate( 

1736 [ 

1737 {"$match": {"timestamp": {"$gte": TODAY}}}, 

1738 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1739 ] 

1740 ).to_list() 

1741 if len(number_samples_request) == 0: 

1742 number_samples_today = 0 

1743 else: 

1744 number_samples_today = number_samples_request[0].get("number_samples", 0) 

1745 if number_samples_today > 0: 

1746 number_samples_collection.delete_many({"timestamp": TODAY}) 

1747 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

1748 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1749 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1750 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1751 for day in number_events: 

1752 day["timestamp"] = day["timestamp"].timestamp() 

1753 items.append(cls.mongo_dict_to_object(day)) 

1754 return items 

1755 

1756 @classmethod 

1757 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1758 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1759 number_commands_collection = get_collection(systems_database, "number_commands") 

1760 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

1761 items = [] 

1762 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1763 if number_commands_collection is None or recompute_amount: 

1764 number_commands_collection = get_collection( 

1765 systems_database, "number_commands", create=True, time_series=True 

1766 ) 

1767 number_commands_collection.delete_many({}) 

1768 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

1769 if first_command is None: 

1770 return items 

1771 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

1772 tzinfo=pytz.UTC 

1773 ) 

1774 while last_computed_day < TODAY: 

1775 day_nb_commands = commands_collection.count_documents( 

1776 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1777 ) 

1778 if day_nb_commands > 0: 

1779 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

1780 last_computed_day += ONE_DAY_OFFSET 

1781 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1782 if number_commands_today > 0: 

1783 number_commands_collection.delete_many({"timestamp": TODAY}) 

1784 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

1785 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1786 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1787 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1788 for day in number_commands: 

1789 day["timestamp"] = day["timestamp"].timestamp() 

1790 items.append(cls.mongo_dict_to_object(day)) 

1791 return items 

1792 

1793 

1794class EventRule(GenericMongo): 

1795 collection_name: ClassVar[str] = "event_rules" 

1796 

1797 name: str 

1798 formula: str 

1799 variables: list[str] 

1800 

1801 @computed_field 

1802 @cached_property 

1803 def number_events(self) -> int: 

1804 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

1805 

1806 

1807class Company(GenericMongo): 

1808 collection_name: ClassVar[str] = "companies" 

1809 name: str 

1810 

1811 

1812class Campaign(GenericMongo): 

1813 collection_name: ClassVar[str] = "campaigns" 

1814 

1815 # Properties 

1816 id: str | None = None 

1817 name: str 

1818 description: str | None = None 

1819 

1820 @classmethod 

1821 def create(cls, campaign: Self): 

1822 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"})) 

1823 if new_campaign is None: 

1824 return None 

1825 return {"campaign_id": str(new_campaign.inserted_id)} 

1826 

1827 @classmethod 

1828 def update(cls, campaign: Self): 

1829 updated_campaign = cls.collection().find_one_and_update( 

1830 {"_id": ObjectId(campaign.id)}, 

1831 {"$set": {"name": campaign.name, "description": campaign.description}}, 

1832 return_document=ReturnDocument.AFTER, 

1833 ) 

1834 return updated_campaign 

1835 

1836 @classmethod 

1837 def delete(cls, campaign_id): 

1838 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)}) 

1839 return deleted_user 

1840 

1841 

1842class Phase(GenericMongo): 

1843 collection_name: ClassVar[str] = "phases" 

1844 

1845 # Properties 

1846 id: str | None = None 

1847 name: str 

1848 description: str | None = None 

1849 start_at: float 

1850 end_at: float 

1851 

1852 # FK 

1853 campaign_id: str 

1854 

1855 # @classmethod 

1856 # def get_by_date(cls, datetime: float): 

1857 # phases = [] 

1858 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING): 

1859 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1860 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING): 

1861 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1862 # if phases is None: 

1863 # return None 

1864 # return phases 

1865 

1866 @classmethod 

1867 def create(cls, phase: Self): 

1868 phase = Phase( 

1869 name=phase.name, 

1870 description=phase.description, 

1871 start_at=phase.start_at, 

1872 end_at=phase.end_at, 

1873 campaign_id=phase.campaign_id, 

1874 ) 

1875 phase_collection = get_collection(systems_database, "phases", create=True) 

1876 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"})) 

1877 if new_phase is None: 

1878 return None 

1879 return {"phase_id": str(new_phase.inserted_id)} 

1880 

1881 @classmethod 

1882 def update(cls, phase: Self): 

1883 updated_phase = cls.collection().find_one_and_update( 

1884 {"_id": ObjectId(phase.id)}, 

1885 { 

1886 "$set": { 

1887 "name": phase.name, 

1888 "description": phase.description, 

1889 "start_at": phase.start_at, 

1890 "end_at": phase.end_at, 

1891 } 

1892 }, 

1893 return_document=ReturnDocument.AFTER, 

1894 ) 

1895 return updated_phase 

1896 

1897 @classmethod 

1898 def delete(cls, phase_id): 

1899 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)}) 

1900 return delete_phase 

1901 

1902 @classmethod 

1903 def deleteMany(cls, campaign_id): 

1904 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

1905 return delete_phases 

1906 

1907 

1908class CustomViewCreation(GenericMongo): 

1909 collection_name: ClassVar[str] = "custom_views" 

1910 

1911 name: str 

1912 configuration: list 

1913 

1914 

1915class CustomView(CustomViewCreation): 

1916 # Properties 

1917 id: str | None = None 

1918 

1919 # Foreign Key 

1920 user_id: str 

1921 

1922 # # Methods 

1923 # @classmethod 

1924 # def create(cls, form_custom_view: Self, user_id) -> list: 

1925 # custom_view = CustomView( 

1926 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1927 # ) 

1928 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1929 # return new_custom_view 

1930 

1931 # @classmethod 

1932 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1933 # updated_custom_view = cls.collection().find_one_and_update( 

1934 # {"_id": ObjectId(custom_view_id)}, 

1935 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1936 # return_document=ReturnDocument.AFTER, 

1937 # ) 

1938 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1939 # del updated_custom_view["_id"] 

1940 # return cls(**updated_custom_view) 

1941 

1942 # @classmethod 

1943 # def delete(cls, custom_view_id) -> bool: 

1944 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1945 # return deleted_custom_view.acknowledged 

1946 

1947 

1948CustomViewUpdate = create_update_model(CustomView) 

1949 

1950 

1951class Video(GenericMongo): 

1952 collection_name: ClassVar[str] = "videos" 

1953 

1954 # Properties 

1955 name: str 

1956 ip_addr: str 

1957 username: str | None = None 

1958 password: str | None = None 

1959 

1960 # Methods 

1961 @classmethod 

1962 def get_all(cls, sort_by="_id") -> list[Self]: 

1963 items = [] 

1964 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

1965 items.append(cls.mongo_dict_to_object(dict_)) 

1966 return items 

1967 

1968 @classmethod 

1969 def get_video(cls, camera_id: ObjectId): 

1970 camera = cls.get_from_id(camera_id) 

1971 if camera is not None: 

1972 return camera.name 

1973 return None 

1974 

1975 

1976class Command(GenericMongo): 

1977 collection_name: ClassVar[str] = "commands" 

1978 

1979 # Properties 

1980 timestamp: datetime.datetime = None 

1981 sent_at: float 

1982 response_time: float = 0.0 

1983 command_type: str 

1984 description: str = "" 

1985 succeeded: bool = False 

1986 

1987 # Foreign key 

1988 user_id: str 

1989 

1990 @classmethod 

1991 def collection(cls): 

1992 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

1993 

1994 @classmethod 

1995 def create(cls, command: Self): 

1996 command = cls( 

1997 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

1998 sent_at=command.sent_at, 

1999 response_time=command.response_time, 

2000 command_type=command.command_type, 

2001 description=command.description, 

2002 succeeded=command.succeeded, 

2003 user_id=command.user_id, 

2004 ) 

2005 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

2006 if new_command is None: 

2007 return None 

2008 return {"command_id": str(new_command.inserted_id)} 

2009 

2010 def receive_response(self, response: dict): 

2011 self.response_time = time.time() - self.sent_at 

2012 self.succeeded = response.get("error", True) is False 

2013 if self.description == "": 

2014 self.description += response.get("message", "").rstrip() 

2015 

2016 

2017class SignalsPresetCreation(GenericMongo): 

2018 name: str 

2019 signal_ids: list[str] 

2020 

2021 

2022class SignalsPreset(SignalsPresetCreation): 

2023 collection_name: ClassVar[str] = "signals_presets" 

2024 

2025 user_id: str 

2026 

2027 @classmethod 

2028 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

2029 signals_preset = cls( 

2030 user_id=user_id, 

2031 name=signals_preset.name, 

2032 signal_ids=signals_preset.signal_ids, 

2033 ) 

2034 

2035 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

2036 

2037 return str(new_signal_preset.inserted_id) 

2038 

2039 

2040SignalsPresetUpdate = create_update_model(SignalsPreset) 

2041 

2042 

2043class LineStyle(str, Enum): 

2044 solid = "solid" 

2045 dotted = "dotted" 

2046 dashed = "dashed" 

2047 

2048 

2049class SignalAppearance: 

2050 value_color: str 

2051 forced_value_color: str 

2052 

2053 

2054class GraphThemeCreation(GenericMongo): 

2055 collection_name: ClassVar[str] = "graph_themes" 

2056 

2057 name: str 

2058 signal_id: str 

2059 value_color: str = "" 

2060 forced_value_color: str = "" 

2061 value_line_style: LineStyle = LineStyle.solid 

2062 forced_value_line_style: LineStyle = LineStyle.solid 

2063 private: bool = True 

2064 

2065 

2066class PublicGraphTheme(GraphThemeCreation): 

2067 created_by_user: bool 

2068 in_user_library: bool 

2069 active_for_user: bool 

2070 

2071 _current_user_id: str = "" 

2072 

2073 @classproperty 

2074 def custom_pipeline_steps(cls) -> dict[str, list]: 

2075 return { 

2076 "created_by_user": [ 

2077 { 

2078 "$addFields": { 

2079 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

2080 } 

2081 } 

2082 ], 

2083 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

2084 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

2085 ], 

2086 "in_user_library": [ 

2087 { 

2088 "$addFields": { 

2089 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

2090 } 

2091 } 

2092 ], 

2093 "active_for_user": [ 

2094 { 

2095 "$addFields": { 

2096 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

2097 } 

2098 } 

2099 ], 

2100 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

2101 "active": [ 

2102 { 

2103 "$addFields": { 

2104 "active": "$$REMOVE", 

2105 } 

2106 } 

2107 ], 

2108 "creator_id": [ 

2109 { 

2110 "$addFields": { 

2111 "creator_id": "$$REMOVE", 

2112 } 

2113 } 

2114 ], 

2115 } 

2116 

2117 @classmethod 

2118 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

2119 cls._current_user_id = user_id 

2120 return super().response_from_query(query) 

2121 

2122 @classmethod 

2123 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

2124 query.in_user_library = "true" 

2125 return cls.response_from_query(query, user_id) 

2126 

2127 @classmethod 

2128 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

2129 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

2130 

2131 @classmethod 

2132 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2133 cls._current_user_id = user_id 

2134 return super().get_by_attribute(attribute_name, attribute_value) 

2135 

2136 @classmethod 

2137 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2138 cls._current_user_id = user_id 

2139 return super().get_one_by_attribute(attribute_name, attribute_value) 

2140 

2141 @classmethod 

2142 def get_all(cls, sort_by: str, user_id: str): 

2143 cls._current_user_id = user_id 

2144 return super().get_all(sort_by) 

2145 

2146 @classmethod 

2147 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

2148 pipeline = [ 

2149 { 

2150 "$match": { 

2151 "active": {"$eq": user_id}, 

2152 "signal_id": {"$in": signal_ids}, 

2153 } 

2154 }, 

2155 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

2156 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

2157 { 

2158 "$project": { 

2159 "_id": 0, 

2160 "signal_id": 1, 

2161 "value_color": 1, 

2162 "forced_value_color": 1, 

2163 "value_line_style": 1, 

2164 "forced_value_line_style": 1, 

2165 } 

2166 }, 

2167 ] 

2168 

2169 result = {} 

2170 

2171 cursor = cls.collection().aggregate(pipeline) 

2172 for document in cursor: 

2173 signal_id = document["signal_id"] 

2174 del document["signal_id"] 

2175 result[signal_id] = document 

2176 

2177 return result 

2178 

2179 

2180GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2181 

2182 

2183class PrivateGraphTheme(GraphThemeCreation): 

2184 # private 

2185 creator_id: str 

2186 in_library: list[str] 

2187 active: list[str] 

2188 

2189 @classmethod 

2190 def create( 

2191 cls, 

2192 creator_id: str, 

2193 name: str, 

2194 signal_id: str, 

2195 value_color: str, 

2196 forced_value_color: str, 

2197 value_line_style: LineStyle, 

2198 forced_value_line_style: LineStyle, 

2199 private: bool, 

2200 ): 

2201 color_setting = cls( 

2202 creator_id=creator_id, 

2203 name=name, 

2204 signal_id=signal_id, 

2205 value_color=value_color, 

2206 forced_value_color=forced_value_color, 

2207 value_line_style=value_line_style, 

2208 forced_value_line_style=forced_value_line_style, 

2209 private=private, 

2210 in_library=[creator_id], 

2211 active=[creator_id], 

2212 ) 

2213 

2214 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2215 color_setting.id = str(new_color_setting.inserted_id) 

2216 return color_setting 

2217 

2218 def update(self, update_dict: dict, user_id: str): 

2219 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2220 if in_user_lib and user_id not in self.in_library: 

2221 self.in_library.append(user_id) 

2222 elif not in_user_lib and user_id in self.in_library: 

2223 self.in_library.remove(user_id) 

2224 update_dict["in_library"] = self.in_library 

2225 del update_dict["in_user_library"] 

2226 

2227 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2228 if active_for_user and user_id not in self.active: 

2229 self.active.append(user_id) 

2230 elif not active_for_user and user_id in self.active: 

2231 self.active.remove(user_id) 

2232 update_dict["active"] = self.active 

2233 del update_dict["active_for_user"] 

2234 

2235 if update_dict.get("created_by_user") is not None: 

2236 del update_dict["created_by_user"] 

2237 

2238 self.collection().find_one_and_update( 

2239 {"_id": ObjectId(self.id)}, 

2240 {"$set": update_dict}, 

2241 ) 

2242 

2243 return {} 

2244 

2245 

2246class Configuration(GenericMongo): 

2247 collection_name: ClassVar[str] = "configs" 

2248 

2249 # Properties 

2250 config_id: str | None = None 

2251 generated_at: float 

2252 config: dict 

2253 components: list 

2254 hardware_topology: dict 

2255 received_at: float 

2256 in_use_by_devices: list[str] = [] 

2257 is_in_use: bool = False 

2258 

2259 custom_pipeline_steps = { 

2260 "is_in_use": [ 

2261 { 

2262 "$addFields": { 

2263 "is_in_use": { 

2264 "$cond": [ 

2265 {"$gt": [{"$size": {"$ifNull": ["$in_use_by_devices", []]}}, 0]}, 

2266 True, 

2267 False, 

2268 ] 

2269 }, 

2270 } 

2271 } 

2272 ], 

2273 } 

2274 

2275 @classmethod 

2276 def get_from_config_id(cls, config_id: str) -> Self: 

2277 items = ( 

2278 cls.collection() 

2279 .aggregate( 

2280 [ 

2281 {"$match": {"config_id": config_id}}, 

2282 {"$limit": 1}, 

2283 ] 

2284 ) 

2285 .to_list() 

2286 ) 

2287 if len(items) == 0: 

2288 return None 

2289 dict_ = items[0] 

2290 # There is some protected information in the config dict, so keep only specific keys 

2291 allowed_config_keys = ["description", "broker_host", "target_device_id", "device_name"] 

2292 config_dict = dict_.get("config") 

2293 dict_["config"] = {k: config_dict[k] for k in allowed_config_keys} 

2294 return cls.mongo_dict_to_object(dict_) 

2295 

2296 

2297DIGITIZATION_FUNCTIONS_FROM_REFERENCE = { 

2298 "EL1809": DigitizationFunction.from_bits(1, 0, 1), 

2299 "EL1819": DigitizationFunction.from_bits(1, 0, 1), 

2300 "EL3124": DigitizationFunction.from_bits(15, 0.004, 0.020), 

2301 "EL3062-0030": DigitizationFunction.from_bits(15, 0.0, 30.0), 

2302 "EL3356-0020": DigitizationFunction.from_bits(24, -12.0, 12.0), 

2303 "EL2004": DigitizationFunction.from_bits(1, 0, 1), 

2304 "EL2042": DigitizationFunction.from_bits(1, 0, 1), 

2305 "EL1004": DigitizationFunction.from_bits(1, 0, 1), 

2306 "EL3054": DigitizationFunction.from_bits(15, 0.004, 0.020), 

2307 "EL3202": DigitizationFunction.from_values(-2000, 8500, -200, 850), 

2308 "EL4022": DigitizationFunction.from_bits(15, 0.004, 0.020), 

2309 "EL3064": DigitizationFunction.from_bits(15, 0.0, 10.0), 

2310 "EL2022": DigitizationFunction.from_bits(1, 0, 1), 

2311 "ELX2008": DigitizationFunction.from_bits(1, 0, 1), 

2312 "EXP3158": DigitizationFunction.from_bits(15, 0.004, 0.020), 

2313 "EPX1058": DigitizationFunction.from_bits(1, 0, 1), 

2314 "ELX4154": DigitizationFunction.from_bits(15, 0.004, 0.020), 

2315 "ELM3704": DigitizationFunction.from_bits(31, 0.004, 0.020), 

2316 "EL3062": DigitizationFunction.from_bits(15, 0, 10), 

2317 "EL3351": DigitizationFunction.from_bits(15, -0.020, 0.020), 

2318 "EL9410": DigitizationFunction.from_bits(1, 0, 1), 

2319 "EL2911": DigitizationFunction.from_bits(1, 0, 1), 

2320} 

2321 

2322 

2323class EtherCatModule: 

2324 name: str 

2325 reference: str 

2326 # ports: list[Port] 

2327 signals: list[Signal] = [] 

2328 

2329 

2330class EtherCatLoop: 

2331 terminals: list[EtherCatModule] 

2332 

2333 @cached_property 

2334 def signals(self): 

2335 signals = [] 

2336 for terminal in self.terminals: 

2337 signals.extend(terminal.signals) 

2338 return signals 

2339 

2340 def blueprint(self, filename: str): 

2341 width = 0 

2342 height = 0 

2343 

2344 images = [] 

2345 missing_images = set() 

2346 for terminal in self.terminals: 

2347 # Load terminal image 

2348 try: 

2349 terminal_image = Image.open(f"beckhoff/{terminal.reference}.png") 

2350 except FileNotFoundError: 

2351 terminal_image = Image.open("beckhoff/undefined.png") 

2352 missing_images.add(terminal.reference) 

2353 images.append(terminal_image) 

2354 

2355 height = max(height, terminal_image.height) 

2356 width += terminal_image.width 

2357 if missing_images: 

2358 print(f"Missing images: {missing_images}") 

2359 im = Image.new("RGBA", (width, height)) 

2360 x = 0 

2361 for image in images: 

2362 

2363 # box =(x, 0, x, image.width) 

2364 im.paste(image, (x, 0)) 

2365 x += image.width 

2366 im.save(filename) 

2367 return im 

2368 

2369 @classmethod 

2370 def from_eni(cls, filename: str): 

2371 

2372 tags = xml_tags(filename) 

2373 

2374 modules = [] 

2375 for slave in tags["Slave"]: 

2376 # for child in slave: 

2377 info = slave.find("Info") 

2378 name = info.find("Name").text 

2379 

2380 subname, ref = extract_subname_ref(name) 

2381 

2382 modules.append(EtherCatModule(subname, ref)) 

2383 # print(attr) 

2384 return cls(modules) 

2385 

2386 @classmethod 

2387 def from_syvar_xml(cls, filename): 

2388 tags = xml_tags(filename) 

2389 

2390 modules = [] 

2391 card_number = 0 

2392 for slave in tags["Interface"]: 

2393 cards_signals = [] 

2394 

2395 # for child in slave: 

2396 name = slave.find("name").text 

2397 extracted_subname_ref = extract_subname_ref(name) 

2398 if extracted_subname_ref is not None: 

2399 card_number += 1 

2400 subname, ref = extracted_subname_ref 

2401 

2402 for variable in slave.find("Variables"): 

2403 ticker = variable.find("mnemonic").text 

2404 description = variable.find("information").text 

2405 # Unit 

2406 unit_tag = variable.find("datatype").find("unit") 

2407 if unit_tag is not None: 

2408 unit = unit_tag.text 

2409 else: 

2410 unit = None 

2411 if unit == "NoUnit": 

2412 unit = None 

2413 

2414 address_tag = variable.find("behavior").find("address") 

2415 if address_tag is not None: 

2416 address = address_tag.text 

2417 match = re.search(CHANNEL_PATTERN, address) 

2418 if match: 

2419 

2420 channel_number = int(match.group(1)) 

2421 

2422 digitization_function = DIGITIZATION_FUNCTIONS_FROM_REFERENCE[ref] 

2423 transfer_function = None 

2424 

2425 signal = Signal( 

2426 signal_id=ticker, 

2427 description=description, 

2428 unit=unit, 

2429 type="sensor", 

2430 address=LoopAddress(card_number=card_number, channel=channel_number), 

2431 frequency=None, 

2432 transfer_function=transfer_function, 

2433 precision_digits=None, 

2434 data_type="float", 

2435 digitization_function=digitization_function, 

2436 ) 

2437 cards_signals.append(signal) 

2438 

2439 modules.append(EtherCatModule(subname, ref, signals=cards_signals)) 

2440 

2441 return cls(modules) 

2442 

2443 

2444class EtherCatTopology: 

2445 loops: list[EtherCatLoop] 

2446 

2447 @cached_property 

2448 def signals(self) -> list[Signal]: 

2449 signals = [] 

2450 for loop in self.loops: 

2451 signals.extend(loop.signals) 

2452 return signals 

2453 

2454 def wiring_xlsx(self, filename: str): 

2455 wb = Workbook() 

2456 signals_sheet = wb.active 

2457 signals_sheet.title = "signals" 

2458 

2459 for iloop, loop in enumerate(self.loops): 

2460 loop_sheet = wb.create_sheet(f"loop_{iloop+1}") 

2461 # Creating card header 

2462 loop_sheet["B1"] = "Card name" 

2463 loop_sheet["B2"] = "Card reference" 

2464 loop_sheet["B3"] = "Position" 

2465 loop_sheet["A4"] = "Channel" 

2466 for i in range(16): 

2467 loop_sheet.cell(row=4 + i, column=2, value=i + 1) 

2468 

2469 for icard, card in enumerate(loop.terminals): 

2470 loop_sheet.cell(row=1, column=3 + icard, value=card.name) 

2471 loop_sheet.cell(row=2, column=3 + icard, value=card.reference) 

2472 loop_sheet.cell(row=3, column=3 + icard, value=icard + 1) 

2473 

2474 for signal in card.signals: 

2475 loop_sheet.cell(row=3 + signal.address.channel, column=3 + icard, value=signal.ticker) 

2476 

2477 thin = Side(border_style="thin", color="000000") 

2478 for i in range(19): 

2479 for j in range(len(loop.terminals) + 2): 

2480 cell = loop_sheet.cell(row=i + 1, column=j + 1) 

2481 cell.border = Border(top=thin, left=thin, right=thin, bottom=thin) 

2482 loop_sheet.merge_cells("A4:A19") 

2483 

2484 # signals_sheet = wb.create_sheet('signals') 

2485 for i, header in enumerate(XLSX_HEADER): 

2486 signals_sheet.cell(row=1, column=1 + i, value=header) 

2487 

2488 for isignal, signal in enumerate(self.signals): 

2489 signals_sheet.cell(row=isignal + 2, column=1, value=signal.ticker) 

2490 signals_sheet.cell(row=isignal + 2, column=3, value=signal.description) 

2491 signals_sheet.cell(row=isignal + 2, column=4, value=signal.unit) 

2492 signals_sheet.cell(row=isignal + 2, column=5, value=signal.type) 

2493 signals_sheet.cell(row=isignal + 2, column=6, value=signal.frequency) 

2494 signals_sheet.cell(row=isignal + 2, column=7, value=signal.transfer_function) 

2495 signals_sheet.cell(row=isignal + 2, column=8, value=signal.precision_digits) 

2496 

2497 wb.save(filename)