Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%

1394 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 14:42 +0000

1from functools import cached_property 

2import os 

3import re 

4import io 

5import time 

6import csv 

7from typing import Self, ClassVar, Any, Literal, get_args, Annotated 

8import datetime 

9import math 

10import bisect 

11from enum import Enum 

12import logging 

13import copy 

14import asyncio 

15import zipfile 

16 

17import requests 

18import ping3 

19import pytz 

20from bson.objectid import ObjectId 

21from pymongo import ASCENDING, ReturnDocument 

22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator 

23import numpy as npy 

24import lttb 

25import h5py 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 devices_states_database, 

36) 

37from twinpad_backend.responses import ListResponse 

38from twinpad_backend.messages import RabbitMQClient 

39from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

40 

# Lookup from a type name to the Python type used for it; "epoch" values are handled as floats.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Names of the supported post-processing functions.
# NOTE(review): SINGLE/DOUBLE/MULTIPLE presumably refer to the number of input signals - confirm.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]


# Host names of the companion services, each overridable through an environment variable.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared in SignalData.get_from_signal_id with the time elapsed (time.time() based, so seconds)
# since the last sample, to decide whether the last value must be repeated at "now".
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Messages are emitted through the "uvicorn.error" logger.
logger = logging.getLogger("uvicorn.error")

57 

58 

class DeleteInfo(BaseModel):
    """Pydantic model pairing a deletion flag with a detail message."""

    is_deleted: bool
    detail: str

62 

63 

class classproperty:
    """
    Read-only descriptor that exposes a function as a property of the class itself.

    Stands in for stacking ``@classmethod`` and ``@property``, which was deprecated in
    python 3.11 and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped getter; it receives the owning class, never an instance.
        self.fget = func

    def __get__(self, instance, owner):
        # The instance (if any) is ignored on purpose: the value only depends on the class.
        getter = self.fget
        return getter(owner)

75 

76 

def create_update_model(model):
    """
    Build the ``<Model>Update`` companion of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened to
    ``<annotation> | None`` and a default of ``None``, so an update payload may carry
    any subset of the fields.

    :param model: pydantic model class to derive the update model from.
    :return: the dynamically created model class, named ``model.__name__ + "Update"``.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(model.__name__ + "Update", **optional_fields)

86 

87 

def get_utc_date_from_timestamp(timestamp: float) -> str:
    """
    Convert a POSIX timestamp to an ISO 8601 string expressed in UTC.

    The conversion is done explicitly in UTC, so the result no longer depends on the
    timezone of the machine running the code.

    :param timestamp: seconds since the epoch.
    :return: naive ISO 8601 string (no UTC offset suffix), e.g. ``"1970-01-01T00:00:00"``.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    # Drop tzinfo so the output keeps its historical format, without a "+00:00" suffix.
    return utc_datetime.replace(tzinfo=None).isoformat()

90 

91 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a (time, value) series with LTTB, keeping the gaps marked by ``None`` values.

    Series shorter than ``max_number_samples`` are returned untouched. Otherwise the ``None``
    values are removed (LTTB doesn't handle them), the remaining points are downsampled, and
    the first and last timestamp of every run of consecutive ``None`` values are inserted back
    with a ``None`` value. If LTTB raises ``ValueError``, the series is instead linearly
    interpolated on ``max_number_samples`` evenly spaced timestamps, NaN results being
    turned back into ``None``.

    :param time_vector: timestamps of the samples.
    :param values: sample values, same length as ``time_vector``; may contain ``None``.
    :param max_number_samples: target number of samples.
    :return: tuple ``(new_time_vector, new_values)``.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass over the data: split the real samples from the None ones and remember,
    # for each run of consecutive None values, its first and (when longer than one) last timestamp.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values):
        if value is not None:
            kept_times.append(timestamp)
            kept_values.append(value)
            in_none_group = False
        elif in_none_group:
            # Appends the end bound the first time, overwrites it afterwards
            none_group_bounds[-1][1:] = [timestamp]
        else:
            none_group_bounds.append([timestamp])
            in_none_group = True
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values

136 

137 

def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When ``wanted_type`` is ``float``, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

142 

143 

144# Models 

class TwinPadModel(BaseModel):
    """Common base of the models of this module, adding dict conversion helpers."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model in JSON mode, leaving out the fields listed in ``exclude``."""
        return self.model_dump(mode="json", exclude=exclude)

153 

154 

def validate_mongo_id(v):
    """
    Check that ``v`` is accepted by ``ObjectId.is_valid`` and return it as a string.

    :raises ValueError: when ``v`` is not a valid MongoDB id.
    """
    if ObjectId.is_valid(v):
        return str(v)
    raise ValueError("Invalid MongoDB id")


# String type whose raw input goes through validate_mongo_id before pydantic's own validation.
MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]

162 

163 

def validate_12_hex(v: str) -> str:
    """
    Check that ``v`` is a string made of exactly 12 hexadecimal characters.

    The function is used as a ``BeforeValidator``, so it receives the raw input, which may
    not be a string at all. Non-string inputs are rejected with the same ``ValueError`` as
    malformed strings, instead of letting ``re.fullmatch`` raise a ``TypeError``.

    :param v: raw value to validate.
    :return: ``v`` unchanged when valid.
    :raises ValueError: when ``v`` is not a 12-character hexadecimal string.
    """
    if not isinstance(v, str) or not re.fullmatch(r"[0-9a-fA-F]{12}", v):
        raise ValueError("ID must be a 12-character hexadecimal string")
    return v

168 

169 

# String type whose raw input goes through validate_12_hex before pydantic's own validation.
DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]

171 

172 

class GenericMongo(TwinPadModel):
    """
    Base class of the models stored in a collection of the systems database.

    Every method reads ``cls.collection_name``, which is not declared here: subclasses are
    expected to define it as a class variable.
    """

    id: MongoId | None = None
    # Aggregation stages keyed by property name. The stages of a property are placed before
    # any $match/$sort using that property, and appended at the end of the pipeline otherwise.
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of this model, created if needed."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated listing.

        :param query: object providing ``mongodb_filter()``, ``sort_by`` (comma separated
            ``field`` or ``field:order`` items), ``offset`` and ``limit`` (``None`` or ``0``
            meaning no limit).
        :return: the matching items together with the pagination info and the total count.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field = sort
                sort_order = 1
            sort_dict[sort_field] = sort_order

        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []

        def add_custom_steps(property_name):
            # Each property's stages are added at most once, even when the property shows up in
            # several "$and" clauses or is both filtered and sorted on.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.append(property_name)

        if "$and" in request_filters:
            filter_clauses = request_filters["$and"]
        else:
            filter_clauses = [request_filters]
        for filter_clause in filter_clauses:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in filter_clause:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only run on the page of results
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``item_id``, or ``None`` when there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document; ``_id`` is moved to ``id`` (in place) as a string."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute with value, or ``None`` when there is none."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if len(items) == 0:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted in ascending order of ``sort_by``."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Count the documents whose ``post_processing`` is ``False`` or missing (0 without collection)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert the object (without its ``id``) and return the id given by the database."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this object and ``$set`` the same values in the database."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the document of this object; return whether a document was actually deleted."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

317 

318 

class User(GenericMongo):
    """User account stored in the ``users`` collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_blocked: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude: set = None):
        """
        Dump the user without its password.

        :param exclude: extra field names to leave out; the given set is not modified.
        """
        # Work on a copy: adding "password" to the caller's set would leak into its later uses
        excluded_fields = set(exclude) if exclude is not None else set()
        excluded_fields.add("password")
        return GenericMongo.to_dict(self, exclude=excluded_fields)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user; the very first user created is always made admin.

        :return: ``{"user_id": <id of the inserted document>}``.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used instead of to_dict so that the password is stored
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):  # type: ignore
        """
        Update the fields of ``user`` that are not ``None`` on the user ``user_id``.

        :return: the updated user, or ``None`` when no user has this id.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        # find_one_and_update returns None when nothing matches the filter
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The connection state is not reported back; tolerate documents stored without it
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


UserUpdate = create_update_model(User)

362 

363 

class Mode(TwinPadModel):
    """One entry of the ``modes`` list of a ``Device``."""

    mode_id: int
    name: str
    # NOTE(review): units of the two frequency fields are not visible here - confirm.
    frequency_multiplier: float
    min_frequency: float

369 

370 

class DeviceUpdate(TwinPadModel):
    """Model holding a single ``mode_id``; presumably the payload given to ``Device.change_mode`` (confirm)."""

    mode_id: int

373 

374 

class Device(GenericMongo):
    """Device stored in the ``devices`` collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: DeviceId
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to ``update_dict.mode_id`` and record the attempt as a Command.

        A mode id outside ``[0, len(self.modes)]`` is not sent to the device: the command is
        stored as failed and an error response (status code 400) is returned. Otherwise the
        change is sent through RabbitMQ and its response is stored and returned.

        :param update_dict: object exposing the requested ``mode_id``.
        :param current_user: user issuing the command.
        :return: the error dict, or the response of the RabbitMQ client.
        """
        # NOTE(review): mode names are looked up with ``mode_id - 1``, so ids look 1-based; a
        # mode_id of 0 passes this check and resolves to the last mode's name - confirm intent.
        has_error = update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is None:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"

        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if not has_error:
            response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)
        else:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning the given signals, querying each device only once.

        The device id is the part of the signal id located before the first dot.

        :return: dict mapping each device id to its Device (``None`` when not found).
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id, _, _ = signal_id.partition(".")
            if device_id in devices_by_id:
                continue
            devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

426 

427 

class DeviceSetup(GenericMongo):
    """Document of the ``device_setups`` collection: a list of device ids and a variable mapping."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    # NOTE(review): meaning of the keys and values of the mapping is not visible here - confirm.
    variable_mapping: dict[str, str]


# Same fields as DeviceSetup (except id), all optional and defaulting to None.
DeviceSetupUpdate = create_update_model(DeviceSetup)

437 

438 

class DeviceState(GenericMongo):
    """State of a device at a given time, read from the per-device collection of the states database."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
        """
        List the states of a device, filtered, sorted and paginated.

        :param device_id: id of the device, also the name of the collection holding its states.
        :param query: object providing ``mongodb_filter()``, ``sort_by`` (``field`` or
            ``field:order``), ``offset`` and ``limit`` (``None`` or ``0`` meaning no limit).
        :return: the matching states with the pagination info; empty when the device has no collection.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                # Plain empty lists as fallback: a pydantic Field() object is not a valid value
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

478 

479 

class SignalSample(TwinPadModel):
    """A single sample of a signal: its value and forced value at a given timestamp."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or ``None`` when it has no collection or no data."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
        """
        Return the most recent sample of the signal, or ``None`` when none can be found.

        The ``last_values`` collection is looked up first. Without a result there, and only if
        the signal is registered with an "up" status, the signal's own collection is searched.
        The timestamp of the sample is then moved forward to the device's last ping when that
        one is more recent.

        :param signal_id: id of the signal; the part before the first dot is the device id.
        :param device: device owning the signal, fetched from the database when not given.
        """
        last_value_collection = get_signal_collection("last_values", create=False)
        signals_collection = get_collection(systems_database, "signals", create=True)

        sample_data = None
        if last_value_collection is not None:
            sample_data = last_value_collection.find_one({"signal_id": signal_id}, sort={"precise_timestamp": -1})

        # If there is no data, check if the signal's is up, as they don't send duplicate values
        if sample_data is None:
            if signals_collection.count_documents({"status.status": "up", "signal_id": signal_id}) < 1:
                return None

            signal_collection = get_signal_collection(signal_id)
            if signal_collection is None:
                # No collection means no sample at all for this signal
                return None

            # Same workaround as in get_first_from_signal_id, very effective to narrow down big sets of data
            bucket = get_signal_collection(f"system.buckets.{signal_id}")
            last_bucket = None
            if bucket is not None:
                last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
            if last_bucket is not None:
                sample_data = signal_collection.find_one(
                    {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                    sort={"precise_timestamp": -1},
                )
            else:
                sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data.get("precise_timestamp")
        # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down
        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, fetching each owning device only once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

574 

575 

576class SignalData(TwinPadModel): 

577 signal_id: str 

578 forcible: bool = True 

579 time_vector: list[float] 

580 values: list[float | int | str | None] 

581 forced_values: list[float | int | str | None] 

582 

583 data_start: float | None = None 

584 data_end: float | None = None 

585 

586 number_samples: int = 0 

587 number_samples_db: int = 0 

588 

589 db_query_time: float = 0.0 

590 init_time: float = 0.0 

591 data_processing_time: float = 0.0 

592 

593 phase_id: str | None = None 

594 

    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float = None,
        max_timestamp: float = None,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
        interpolate_bounds: bool = True,
        max_documents: int = None,
        collection=None,
    ) -> Self:
        """
        Load the samples of a signal, optionally restricted to a time range.

        :param signal_id: id of the signal to load.
        :param min_timestamp: lower bound (inclusive) of the samples to fetch, no bound when None.
        :param max_timestamp: upper bound (inclusive) of the samples to fetch, no bound when None.
        :param window_min_timestamp: start of the displayed window, used for bounds interpolation.
        :param window_max_timestamp: end of the displayed window, used for bounds interpolation.
        :param interpolate_bounds: whether to add samples at the window bounds; only done when
            both window timestamps are given.
        :param max_documents: when the query matches more documents than this, only the first
            document of every group of ``ceil(matches / max_documents)`` consecutive documents is kept.
        :param collection: collection to read from, looked up from ``signal_id`` when None.
        :return: an instance of the signal's ``signal_data_class``, or an empty ``cls`` instance
            when the collection or the signal cannot be found.
        """

        now = time.time()

        # NOTE(review): fromtimestamp() without tz builds naive local-time datetimes - confirm
        # this matches how the "timestamp" field is stored.
        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

        if collection is None:
            collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        if max_documents is not None and max_documents < number_results:
            # Database-side decimation: number the sorted documents, group them by
            # floor(index / ratio) and keep the first document of each group.
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        # logger.info(f"pipeline: %s", str(pipeline))
        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        # init_time first holds the start time, then the elapsed time (see below)
        init_time = time.time()

        results = cursor.to_list()
        time_vector = []
        values = []
        forced_values = []
        for s in results:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        signal = Signal.get_from_signal_id(signal_id)
        if signal is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )
        # The concrete SignalData class to instantiate is chosen by the signal
        class_ = signal.signal_data_class

        if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
            time_vector, values, forced_values = cls.interpolate_bounds(
                class_,
                collection,
                signal_id,
                time_vector,
                values,
                forced_values,
                window_min_timestamp,
                window_max_timestamp,
            )

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # Same time-series bucket workaround as in SignalSample.get_first_from_signal_id
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["precise_timestamp"]
        else:
            data_start = None

        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            data_end = last_bucket["control"]["max"]["precise_timestamp"]
        else:
            data_end = None

        return class_(
            signal_id=signal_id,
            forcible=signal.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_results,
            db_query_time=db_req_time,
            init_time=init_time,
        )

735 

736 @staticmethod 

737 def interpolate_bounds( 

738 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

739 ): 

740 sample_right = None 

741 # Fetching right side value & interpolation 

742 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

743 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

744 sample_right = collection.find_one( 

745 { 

746 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

747 "value": {"$exists": True}, 

748 }, 

749 sort={"precise_timestamp": -1}, 

750 ) 

751 if sample_right: 

752 if time_vector: 

753 right_sd = class_( 

754 signal_id=signal_id, 

755 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

756 values=[values[-1], sample_right.get("value", None)], 

757 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

758 ) 

759 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

760 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

761 else: 

762 max_ts_value = sample_right.get("value", None) 

763 max_ts_forced_value = sample_right.get("forced_value", None) 

764 time_vector.append(window_max_timestamp) 

765 values.append(max_ts_value) 

766 forced_values.append(max_ts_forced_value) 

767 

768 # Fetching left side value & interpolation 

769 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

770 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

771 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

772 sample_left = sample_right 

773 sample_left = collection.find_one( 

774 { 

775 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

776 "value": {"$exists": True}, 

777 }, 

778 sort={"precise_timestamp": -1}, 

779 ) 

780 

781 if sample_left: 

782 if time_vector: 

783 left_sd = class_( 

784 signal_id=signal_id, 

785 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

786 values=[sample_left["value"], values[0]], 

787 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

788 ) 

789 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

790 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

791 else: 

792 min_ts_value = sample_left.get("value", None) 

793 min_ts_forced_value = sample_left.get("forced_value", None) 

794 time_vector.insert(0, window_min_timestamp) 

795 values.insert(0, min_ts_value) 

796 forced_values.insert(0, min_ts_forced_value) 

797 

798 return time_vector, values, forced_values 

799 

    def interpolate_values(self, new_time_vector: list[float]):
        """Return ``self.values`` interpolated on ``new_time_vector`` by ``self.interpolate``."""
        return self.interpolate(new_time_vector, self.values)

802 

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return ``self.forced_values`` interpolated on ``new_time_vector`` by ``self.interpolate``."""
        return self.interpolate(new_time_vector, self.forced_values)

805 

806 def uniform_desampling(self, number_samples_max: int = None) -> Self: 

807 if number_samples_max is None or self.number_samples <= number_samples_max: 

808 return self 

809 

810 data_processing_time = time.time() 

811 

812 new_time_vector = npy.linspace(self.time_vector[0], self.time_vector[-1], number_samples_max).tolist() 

813 values = self.interpolate_values(new_time_vector) 

814 forced_values = self.interpolate_forced_values(new_time_vector) 

815 number_samples = len(new_time_vector) 

816 

817 data_processing_time = time.time() - data_processing_time 

818 

819 return self.__class__( 

820 signal_id=self.signal_id, 

821 time_vector=new_time_vector, 

822 values=values, 

823 forced_values=forced_values, 

824 number_samples=number_samples, 

825 number_samples_db=self.number_samples_db, 

826 data_start=self.data_start, 

827 data_end=self.data_end, 

828 db_query_time=self.db_query_time, 

829 init_time=self.init_time, 

830 data_processing_time=self.data_processing_time + data_processing_time, 

831 phase_id=self.phase_id, 

832 ) 

833 

    def min_max_downsampling(self, number_samples_max: int) -> Self:
        """Downsample to ``number_samples_max`` samples; this implementation delegates to ``uniform_desampling``."""
        return self.uniform_desampling(number_samples_max)

836 

def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
) -> Self:
    """Resample the signal with separate sample budgets inside and outside a window.

    The time vector is split into the part before the window, the window itself
    and the part after it. The window always starts and ends exactly on the
    requested bounds; when it holds more than ``window_max_number_samples``
    timestamps it is replaced by evenly spaced timestamps. The two outside parts
    are reduced by ``SignalData.resample_outside_window``. Values and forced
    values are then re-evaluated on the new time vector with
    ``interpolate_values`` / ``interpolate_forced_values``.

    Args:
        window_max_number_samples: Maximum number of timestamps kept inside the
            window; ``None`` disables window resampling.
        outside_max_number_samples: Sample budget shared by both outside parts.
        window_min_timestamp: Window start; defaults to the first timestamp.
        window_max_timestamp: Window end; defaults to the last timestamp.

    Returns:
        ``self`` when the time vector is empty, otherwise a new instance of the
        same class.
    """
    if not self.time_vector:
        return self

    if window_min_timestamp is None:
        window_min_timestamp = self.time_vector[0]
    if window_max_timestamp is None:
        window_max_timestamp = self.time_vector[-1]

    data_processing_time = time.time()

    index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
    index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

    # Slices are copies, so the insert/append below never touch self.time_vector.
    time_vector_before = self.time_vector[:index_window_start]
    time_vector_window = self.time_vector[index_window_start:index_window_end]
    time_vector_after = self.time_vector[index_window_end:]

    if time_vector_window:
        # Ensuring the window starts and ends exactly on the requested bounds.
        if time_vector_window[0] != window_min_timestamp:
            time_vector_window.insert(0, window_min_timestamp)
        if time_vector_window[-1] != window_max_timestamp:
            time_vector_window.append(window_max_timestamp)
    else:
        time_vector_window = [window_min_timestamp, window_max_timestamp]

    if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
        time_vector_window = npy.linspace(
            time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
        ).tolist()

    time_vector_before, time_vector_after = SignalData.resample_outside_window(
        time_vector_before, time_vector_after, outside_max_number_samples
    )

    new_time_vector = time_vector_before + time_vector_window + time_vector_after
    values = self.interpolate_values(new_time_vector)
    forced_values = self.interpolate_forced_values(new_time_vector)
    number_samples = len(values)

    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=new_time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        # NOTE(review): the other desampling methods forward self.number_samples_db
        # here; kept as in the original, confirm which one is intended.
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
        # Keep the phase association, as the other desampling methods do.
        phase_id=self.phase_id,
    )

906 

def zero_time_vector(self, data_start: float) -> Self:
    """Return a copy whose timestamps are shifted by ``-data_start``.

    Values and forced values are carried over unchanged; ``data_start`` and
    ``data_end`` of the result are the first and last shifted timestamps.

    Args:
        data_start: Timestamp subtracted from every entry of the time vector.

    Returns:
        ``self`` when the time vector is empty, otherwise a new instance of the
        same class.
    """
    if len(self.time_vector) == 0:
        return self

    data_processing_time = time.time()
    # Convert back to a plain list so the new instance receives the same
    # container type as every other constructor call in this class.
    time_vector = (npy.array(self.time_vector) - data_start).tolist()
    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=time_vector,
        values=self.values,
        forced_values=self.forced_values,
        number_samples=self.number_samples,
        number_samples_db=self.number_samples_db,
        data_start=time_vector[0],
        data_end=time_vector[-1],
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
        # Shifting timestamps must not lose the phase association.
        phase_id=self.phase_id,
    )

927 

def csv_export(self):
    """Serialize the samples as UTF-8 encoded CSV bytes.

    The first row is the header ``timestamp,value,forced_value``; each following
    row holds the timestamp converted by ``get_utc_date_from_timestamp``, the
    value and the forced value.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    csv_writer.writerows(
        [get_utc_date_from_timestamp(stamp), sample, forced_sample]
        for stamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")

935 

def prestoplot_export(self):
    """Serialize the samples as UTF-8 encoded, tab-separated text.

    The column name is the signal id with dots replaced by underscores, prefixed
    with an underscore when it starts with a numeric character. Four header
    lines are followed by one line per sample; missing values and forced values
    are written as ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    for stamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        shown_value = "none" if sample is None else sample
        shown_forced_value = "none" if forced_sample is None else forced_sample
        lines.append(f"{get_utc_date_from_timestamp(stamp)}\t{shown_value}\t{shown_forced_value}\n")

    return "".join(lines).encode("utf-8")

952 

953 @staticmethod 

954 def resample_outside_window(time_vector_left, time_vector_right, outside_max_number_samples): 

955 number_samples_left = len(time_vector_left) 

956 number_samples_right = len(time_vector_right) 

957 

958 if ( 

959 outside_max_number_samples is None 

960 or (number_samples_left + number_samples_right) <= outside_max_number_samples 

961 ): 

962 return time_vector_left, time_vector_right 

963 

964 new_time_vector_left = time_vector_left 

965 new_time_vector_right = time_vector_right 

966 

967 new_number_samples_before = min( 

968 number_samples_left, 

969 math.ceil(outside_max_number_samples * number_samples_left / (number_samples_left + number_samples_right)), 

970 ) 

971 new_number_samples_after = min( 

972 number_samples_right, 

973 math.ceil(outside_max_number_samples * number_samples_right / (number_samples_left + number_samples_right)), 

974 ) 

975 # Adjusting numbers as math.ceil can do +1 on sum 

976 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

977 if new_number_samples_before > new_number_samples_after: 

978 new_number_samples_before -= 1 

979 else: 

980 new_number_samples_after -= 1 

981 

982 if new_number_samples_before > 0: 

983 new_time_vector_left = npy.linspace( 

984 time_vector_left[0], time_vector_left[-1], new_number_samples_before 

985 ).tolist() 

986 

987 if new_number_samples_after > 0: 

988 new_time_vector_right = npy.linspace( 

989 time_vector_right[-1], time_vector_right[0], new_number_samples_after 

990 ).tolist()[::-1] 

991 

992 return new_time_vector_left, new_time_vector_right 

993 

994 

class NumericSignalData(SignalData):
    """Signal data with numeric values, interpolated linearly and downsampled on values."""

    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` at ``new_time_vector``.

        ``items`` are taken as sampled on ``self.time_vector``. ``None`` entries
        are treated as NaN for the interpolation and every NaN result is
        returned as ``None``.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int | None = None) -> Self:
        """Downsample values with ``downsample_list`` to ``number_samples_max`` samples.

        Forced values are re-evaluated on the downsampled time vector.

        Returns:
            ``self`` when no maximum is given or the signal already fits,
            otherwise a new instance of the same class.
        """
        if number_samples_max is None or self.number_samples <= number_samples_max:
            return self

        data_processing_time = time.time()

        time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
        forced_values = self.interpolate_forced_values(time_vector)
        number_samples = len(time_vector)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples_db,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
            phase_id=self.phase_id,
        )

    def min_max_downsampling(self, number_samples_max: int) -> Self:
        """Downsample by keeping the minimum and maximum value of consecutive bins.

        The samples are cut into ``number_samples_max // 2`` bins (at least one);
        for every bin the samples holding the smallest and the largest value are
        kept, in chronological order. When the samples do not divide evenly the
        bins are enlarged and the last one is padded with NaN so that no sample
        is left out.

        Returns:
            ``self`` when the signal is empty or has fewer samples than
            ``number_samples_max``, otherwise a new instance of the same class.
        """
        if self.number_samples == 0 or self.number_samples < number_samples_max:
            return self

        data_processing_time = time.time()

        # Guard against number_samples_max < 2, which would give zero bins.
        number_bins = max(number_samples_max // 2, 1)

        # None entries become NaN through the float64 conversion.
        time_vector = npy.array(self.time_vector, dtype=npy.float64)
        values = npy.array(self.values, dtype=npy.float64)
        forced_values = npy.array(self.forced_values, dtype=npy.float64)

        points_per_bin = self.number_samples // number_bins

        # Any remainder (including exactly one sample) makes the reshape below
        # impossible, so enlarge the bins and pad the last one with NaN.
        if self.number_samples % number_bins:
            points_per_bin += 1
            number_bins = (self.number_samples + points_per_bin - 1) // points_per_bin
            nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
            time_vector = npy.concatenate([time_vector, nan_points_to_add])
            values = npy.concatenate([values, nan_points_to_add])
            forced_values = npy.concatenate([forced_values, nan_points_to_add])

        timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
        values_matrix = values.reshape(number_bins, points_per_bin)
        forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)

        indexes_min = npy.zeros(number_bins, dtype="int64")
        indexes_max = npy.zeros(number_bins, dtype="int64")

        # Comparisons with NaN are always False, so NaN entries after the first
        # column never replace the current minimum or maximum.
        for row in range(number_bins):
            min_value = values_matrix[row, 0]
            max_value = values_matrix[row, 0]
            for column in range(points_per_bin):
                if values_matrix[row, column] < min_value:
                    min_value = values_matrix[row, column]
                    indexes_min[row] = column
                elif values_matrix[row, column] > max_value:
                    max_value = values_matrix[row, column]
                    indexes_max[row] = column

        # Two picks per bin, sorted by column so the output stays chronological.
        row_index = npy.repeat(npy.arange(number_bins), 2)
        column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()

        picked_timestamps = timestamps_matrix[row_index, column_index]
        picked_values = values_matrix[row_index, column_index]
        picked_forced_values = forced_values_matrix[row_index, column_index]

        # Drop picks without a timestamp; remaining NaN values go back to None.
        has_timestamp = ~npy.isnan(picked_timestamps)
        new_time_vector = picked_timestamps[has_timestamp].tolist()
        new_values = npy.where(npy.isnan(picked_values), None, picked_values)[has_timestamp].tolist()
        new_forced_values = npy.where(npy.isnan(picked_forced_values), None, picked_forced_values)[
            has_timestamp
        ].tolist()

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=new_values,
            forced_values=new_forced_values,
            # Count what is actually returned rather than assuming two per bin.
            number_samples=len(new_time_vector),
            number_samples_db=self.number_samples_db,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
            phase_id=self.phase_id,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Downsample with separate sample budgets inside and outside a window.

        The window always starts and ends exactly on the requested bounds, using
        interpolated values there. The window and the two outside parts are
        reduced with ``downsample_list``; the outside budget is split between
        both sides proportionally to their sizes, and a side that receives no
        budget is dropped. Forced values are re-evaluated on the new time vector.

        Args:
            window_max_number_samples: Maximum number of samples inside the
                window; ``None`` disables window downsampling.
            outside_max_number_samples: Budget shared by both outside parts;
                ``None`` disables outside downsampling.
            window_min_timestamp: Window start; defaults to the first timestamp.
            window_max_timestamp: Window end; defaults to the last timestamp.

        Returns:
            ``self`` when the time vector is empty, otherwise a new instance of
            the same class.
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are copies, so the insert/append below never touch the instance.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        if time_vector_window:
            # Ensuring the window starts and ends exactly on the requested bounds.
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if outside_max_number_samples is not None and number_samples_outside > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
            )
            # Rounding both shares up can overshoot the budget by one sample.
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # A side without budget is dropped; keeping it untouched would
            # exceed the requested maximum.
            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )
            else:
                time_vector_before, values_before = [], []

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )
            else:
                time_vector_after, values_after = [], []

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            # NOTE(review): the other desampling methods forward
            # self.number_samples_db here; kept as in the original, confirm intent.
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
            phase_id=self.phase_id,
        )

1213 

1214 

class StringSignalData(SignalData):
    """Signal data with string values, interpolated by holding the previous value."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Pick, for each requested timestamp, the item at or just before it.

        For every entry of ``new_time_vector`` the index of the last timestamp
        of ``self.time_vector`` that is smaller than or equal to it is looked
        up; indexes are clamped to the valid range of ``items``, so timestamps
        before the first sample map to the first item.
        """
        last_position = len(items) - 1
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        clamped_positions = npy.clip(positions, 0, last_position)
        return [items[position] for position in clamped_positions]

1226 

1227 

1228class SignalsData(TwinPadModel): 

1229 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1230 data_processing_time: float 

1231 data_start: float | None 

1232 data_end: float | None 

1233 

@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Load the data of several signals and aggregate their time range.

    Every signal is fetched through ``SignalData.get_from_signal_id`` using the
    collections returned by ``get_signal_collections_batch``. The result carries
    the summed processing time, the smallest ``data_start`` and the largest
    ``data_end`` found among the signals (``None`` when no signal provides one).

    ``max_timestamp`` defaults to the current time.
    """
    if max_timestamp is None:
        max_timestamp = time.time()

    loaded_signals = []
    earliest_start = None
    latest_end = None
    total_processing_time = 0.0

    collections = get_signal_collections_batch(signal_ids)

    for current_id, current_collection in zip(signal_ids, collections):
        loaded = SignalData.get_from_signal_id(
            signal_id=current_id,
            min_timestamp=min_timestamp,
            max_timestamp=max_timestamp,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
            interpolate_bounds=interpolate_bounds,
            max_documents=max_documents,
            collection=current_collection,
        )
        total_processing_time += loaded.data_processing_time
        loaded_signals.append(loaded)

        if loaded.data_start is not None:
            earliest_start = loaded.data_start if earliest_start is None else min(loaded.data_start, earliest_start)
        if loaded.data_end is not None:
            latest_end = loaded.data_end if latest_end is None else max(loaded.data_end, latest_end)

    return cls(
        signals_data=loaded_signals,
        data_processing_time=total_processing_time,
        data_start=earliest_start,
        data_end=latest_end,
    )

1284 

@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Load the given signals once per phase, restricted to each phase's time range.

    ``phases``, ``phase_sync_times``, ``window_min_timestamps`` and
    ``window_max_timestamps`` are walked in lockstep. Phase bounds
    (``start_at`` / ``end_at``) are divided by 1000 before use. When both window
    bounds are given, the fetched range is narrowed to the window plus a margin
    of one twentieth of its length on each side that differs from the phase
    bound. Signals without samples are skipped. With ``zero_time_vector`` the
    timestamps are shifted by the sync time, which defaults to the phase start.
    Every kept signal is tagged with the phase id.

    Returns:
        An instance whose ``data_processing_time`` is the elapsed wall time and
        whose ``data_start`` / ``data_end`` are both ``0``.
    """
    gathered: list[SignalData] = []
    started_at = time.time()

    per_phase = zip(phases, phase_sync_times, window_min_timestamps, window_max_timestamps)
    for phase, sync_time, window_start, window_end in per_phase:
        lower_bound = phase.start_at / 1000
        upper_bound = phase.end_at / 1000

        if sync_time is None:
            sync_time = lower_bound

        if window_end is not None and window_start is not None:
            window_length = window_end - window_start

            if window_start != lower_bound:
                lower_bound = max(lower_bound, window_start - window_length / 20)
            if window_end != upper_bound:
                upper_bound = min(upper_bound, window_end + window_length / 20)

        collections = get_signal_collections_batch(signal_ids)

        for current_id, current_collection in zip(signal_ids, collections):
            loaded = SignalData.get_from_signal_id(
                current_id,
                lower_bound,
                upper_bound,
                window_start,
                window_end,
                interpolate_bounds=False,
                max_documents=None,
                collection=current_collection,
            )

            if len(loaded.time_vector) == 0:
                continue

            if zero_time_vector:
                loaded = loaded.zero_time_vector(sync_time)
            loaded.phase_id = phase.id

            gathered.append(loaded)

    return cls(
        signals_data=gathered,
        data_processing_time=time.time() - started_at,
        data_start=0,
        data_end=0,
    )

1344 

def uniform_desampling(self, number_samples_max: int) -> Self:
    """Apply ``uniform_desampling`` to every signal and wrap the results.

    The processing time of the result is the sum of the per-signal processing
    times; the data range is carried over unchanged.
    """
    desampled = []
    for signal in self.signals_data:
        desampled.append(signal.uniform_desampling(number_samples_max=number_samples_max))

    total_processing_time = sum(signal.data_processing_time for signal in desampled)
    return SignalsData(
        signals_data=desampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1353 

def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Apply ``min_max_downsampling`` to every signal and wrap the results.

    The processing time of the result is the sum of the per-signal processing
    times; the data range is carried over unchanged.
    """
    downsampled = []
    for signal in self.signals_data:
        downsampled.append(signal.min_max_downsampling(number_samples_max=number_samples_max))

    total_processing_time = sum(signal.data_processing_time for signal in downsampled)
    return SignalsData(
        signals_data=downsampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1362 

def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
) -> Self:
    """Apply ``interest_window_desampling`` to every signal and wrap the results.

    All arguments are forwarded unchanged to each signal. The processing time of
    the result is the sum of the per-signal processing times; the data range is
    carried over unchanged.
    """
    forwarded_arguments = {
        "window_max_number_samples": window_max_number_samples,
        "outside_max_number_samples": outside_max_number_samples,
        "window_min_timestamp": window_min_timestamp,
        "window_max_timestamp": window_max_timestamp,
    }
    desampled = [signal.interest_window_desampling(**forwarded_arguments) for signal in self.signals_data]

    total_processing_time = sum(signal.data_processing_time for signal in desampled)
    return SignalsData(
        signals_data=desampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1386 

def zero_time_vector(self, data_start: float):
    """Shift the timestamps of every signal by ``-data_start`` and wrap the results.

    The result starts at ``0`` and ends at the largest ``data_end`` among the
    shifted signals. Signals without a ``data_end`` are ignored for that
    maximum; ``data_end`` is ``None`` when no signal provides one (including
    when there are no signals at all).
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
    # Empty signals are returned unchanged by their own zero_time_vector, so
    # filter missing ends instead of letting max() fail on them.
    known_ends = [s.data_end for s in signals_data if s.data_end is not None]
    return SignalsData(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(known_ends, default=None),
    )

1395 

@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute a single-input post-processing function of a signal over a phase.

    The resulting signal id is ``post_processing.<base id>.<function>``. When
    that signal is already registered as computed for the phase, the stored data
    is returned through ``get_existing_function_signal``. Otherwise the base
    signal is loaded for the whole phase, the function is applied to values and
    forced values, a task saving the full result is scheduled on the running
    event loop, and the result restricted to the requested window is returned.

    Returns:
        ``None`` when the base signal has no data for the phase, otherwise the
        signals data holding the computed signal.
    """
    signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"

    already_computed = Signal.get_from_signal_id(signal_id)
    if already_computed is not None and phase.id in already_computed.computed_phases_ids:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )

    if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
        return None

    source = signals_data.signals_data[0]

    new_values = None
    new_forced_values = None
    time_vector = npy.array(source.time_vector)
    values = source.values
    forced_values = source.forced_values

    # NOTE: a "CumulDistrib" function existed here as commented-out code and is
    # not available.
    if function == "Cumul":
        new_values = cumul(values)
        new_forced_values = cumul(forced_values)
    elif function == "Delta":
        new_values = delta(values)
        new_forced_values = delta(forced_values)
    elif function == "DeltaT":
        new_values = delta(time_vector)
        new_forced_values = new_values
    elif function == "Derive":
        new_values = derive(time_vector, values)
        new_forced_values = derive(time_vector, forced_values)
    elif function == "Integ":
        new_values = integ(time_vector, values)
        new_forced_values = integ(time_vector, forced_values)

    # NaN results are exposed as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The full (unwindowed) result is saved in the background.
    # NOTE(review): the task reference is not kept — confirm this is acceptable.
    asyncio.get_running_loop().create_task(
        cls.save_function_signal(phase, signal_id, time_vector, new_values, new_forced_values, source.forcible)
    )

    if window_max_timestamp is not None:
        keep_until_window_end = time_vector <= window_max_timestamp
        time_vector = time_vector[keep_until_window_end]
        new_values = new_values[keep_until_window_end]
        new_forced_values = new_forced_values[keep_until_window_end]
    if window_min_timestamp is not None:
        keep_from_window_start = time_vector >= window_min_timestamp
        time_vector = time_vector[keep_from_window_start]
        new_values = new_values[keep_from_window_start]
        new_forced_values = new_forced_values[keep_from_window_start]

    source.time_vector = time_vector.tolist()
    source.values = new_values.tolist()
    source.forced_values = new_forced_values.tolist()
    source.number_samples = time_vector.size

    source.signal_id = signal_id

    return signals_data

1473 

@classmethod
async def apply_multiple_function(
    cls,
    phases: list,
    signal_ids: list,
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute a post-processing function combining several signals.

    For two-input functions the resulting signal id is
    ``post_processing.<id 0>.<function>.<id 1>``; otherwise it is
    ``post_processing.<all ids joined by dots>.<function>``. The result is
    attached to the first phase, except for ``Align-X`` and ``Using-X`` which
    use the second one. When the signal is already registered as computed for
    that phase, the stored data is returned. Otherwise each signal is loaded for
    its phase, the function is applied, a task saving the full result is
    scheduled on the running event loop, and the result restricted to the
    requested window is returned.

    Returns:
        ``None`` when a signal has no data, when sample counts differ (except
        for ``Align-X``), or when ``Using-X`` receives time vectors of different
        lengths; otherwise the signals data holding the computed signal.
    """

    def _strip_prefix(raw_id):
        return raw_id.removeprefix("post_processing.")

    if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
        function_signal_id = (
            f"post_processing.{_strip_prefix(signal_ids[0])}.{function}.{_strip_prefix(signal_ids[1])}"
        )
    else:
        joined_ids = ".".join([_strip_prefix(raw_id) for raw_id in signal_ids])
        function_signal_id = f"post_processing.{joined_ids}.{function}"

    active_phase = phases[0]
    if function in {"Align-X", "Using-X"}:
        active_phase = phases[1]

    already_computed = Signal.get_from_signal_id(function_signal_id)
    # Reuse the stored signal only if it was computed for the relevant phase.
    if already_computed is not None and active_phase.id in already_computed.computed_phases_ids:
        return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)

    expected_length = None
    time_vector_list = []
    values_list = []
    forced_values_list = []
    forcible = True
    for phase, signal_id in zip(phases, signal_ids):
        loaded = cls.get_from_phase_and_signal_ids(
            [phase], [None], [signal_id], [None], [None], zero_time_vector=False
        )

        if len(loaded.signals_data) == 0:
            return None

        signal_data = loaded.signals_data[0]

        if expected_length is None:
            expected_length = signal_data.number_samples
        length_mismatch = expected_length != signal_data.number_samples and function != "Align-X"
        if length_mismatch or signal_data.number_samples == 0:
            return None

        time_vector_list.append(npy.array(signal_data.time_vector))
        values_list.append(npy.array(signal_data.values, dtype=npy.float64))
        forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
        forcible = forcible and signal_data.forcible

    time_vector = time_vector_list[0]
    new_values = None
    new_forced_values = None

    # NOTE: an "Atan2" implementation existed here as commented-out code and is
    # not available.
    if function == "Align-X":
        time_vector = time_vector_list[1]
        # Both time vectors are expressed relative to their own phase start.
        old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
        new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
        new_values = align_x(old_time_vector, values_list[0], new_time_vector)
        new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
    elif function == "Using-X":
        if len(time_vector_list[0]) != len(time_vector_list[1]):
            return None
        time_vector = time_vector_list[1]
        new_values = values_list[0]
        new_forced_values = forced_values_list[0]
    elif function == "Mean":
        new_values = mean(*values_list)
        new_forced_values = mean(*forced_values_list)
    elif function == "Norm":
        new_values = norm(*values_list)
        new_forced_values = norm(*forced_values_list)

    # NaN results are exposed as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The full (unwindowed) result is saved in the background.
    # NOTE(review): the task reference is not kept — confirm this is acceptable.
    asyncio.get_running_loop().create_task(
        cls.save_function_signal(
            active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
        )
    )

    total_number_samples = time_vector.size

    if window_max_timestamp is not None:
        keep_until_window_end = time_vector <= window_max_timestamp
        time_vector = time_vector[keep_until_window_end]
        new_values = new_values[keep_until_window_end]
        new_forced_values = new_forced_values[keep_until_window_end]
    if window_min_timestamp is not None:
        keep_from_window_start = time_vector >= window_min_timestamp
        time_vector = time_vector[keep_from_window_start]
        new_values = new_values[keep_from_window_start]
        new_forced_values = new_forced_values[keep_from_window_start]

    computed_signal = NumericSignalData(
        signal_id=function_signal_id,
        forcible=forcible,
        time_vector=time_vector.tolist(),
        values=new_values.tolist(),
        forced_values=new_forced_values.tolist(),
        number_samples=time_vector.size,
        number_samples_db=total_number_samples,
    )

    return cls(
        signals_data=[computed_signal],
        data_processing_time=0,
        data_start=0,
        data_end=0,
    )

1593 

@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Read an already computed post-processing signal from its collection.

    Samples are optionally restricted to ``precise_timestamp`` values between
    the window bounds (inclusive); a ``None`` bound is not applied.
    ``number_samples_db`` of the returned signal is the total number of
    documents in the collection, regardless of the window.

    NOTE(review): the aggregation pipeline has no sort stage — confirm the
    collection returns samples in chronological order.
    """
    collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp

    total_number_samples = collection.count_documents({})

    pipeline = []
    if timestamp_bounds:
        pipeline.append({"$match": {"precise_timestamp": timestamp_bounds}})

    fetch_start = time.time()

    samples = collection.aggregate(pipeline).to_list()
    stored_time_vector = [sample["precise_timestamp"] for sample in samples]
    stored_values = [sample["value"] for sample in samples]
    stored_forced_values = [sample["forced_value"] for sample in samples]

    stored_signal = NumericSignalData(
        signal_id=signal_id,
        time_vector=stored_time_vector,
        values=stored_values,
        forced_values=stored_forced_values,
        number_samples=len(stored_time_vector),
        number_samples_db=total_number_samples,
    )

    return cls(
        signals_data=[stored_signal],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )

1638 

@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Store a computed post-processing signal and register it for the phase.

    Existing samples whose ``precise_timestamp`` lies within the phase bounds
    (``start_at`` / ``end_at`` divided by 1000) are deleted and replaced by the
    new ones. The signal configuration document is then upserted and the phase
    id is pushed to its ``computed_phases_ids``.

    NOTE(review): ``datetime.fromtimestamp`` is called without a timezone, so
    the stored ``timestamp`` is a naive local-time value — confirm intended.
    """
    # Data is written before the configuration, so a concurrent request that
    # does not yet see the phase as computed simply computes it again.
    signal_collection = get_signal_collection(function_signal_id, create=True)
    phase_range = {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}
    signal_collection.delete_many({"precise_timestamp": phase_range})

    documents = [
        {
            "timestamp": datetime.datetime.fromtimestamp(stamp),
            "precise_timestamp": stamp,
            "value": new_values[position],
            "forced_value": new_forced_values[position],
        }
        for position, stamp in enumerate(time_vector)
    ]
    signal_collection.insert_many(documents)

    signal_configuration = {
        "description": "",
        "unit": None,
        "type": "sensor",
        "address": None,
        # The mean time step is clamped to at least 1 to avoid a division by 0.
        "frequency": 1 / max(npy.mean(delta(time_vector)), 1),
        "transfer_function": None,
        "precision_digits": None,
        "digitization_function": None,
        "data_type": "float",
        "formula": None,
        "forcible": forcible,
        "commandable": False,
        "broadcastable": True,
        "signal_id": function_signal_id,
        "post_processing": True,
    }

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": signal_configuration,
            "$push": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )

1685 

1686 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = None): 

1687 if phase_ids is None: 

1688 phase_ids = [] 

1689 if post_processing: 

1690 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1691 zip_buffer = io.BytesIO() 

1692 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1693 for signal_data in self.signals_data: 

1694 file_name = signal_data.signal_id 

1695 if post_processing: 

1696 phase = phases_by_id.get( 

1697 signal_data.phase_id, 

1698 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1699 ) 

1700 file_name = f"{signal_data.signal_id} ({phase.name})" 

1701 if file_format == "csv": 

1702 export_io = signal_data.csv_export() 

1703 zip_file.writestr(f"{file_name}.csv", export_io) 

1704 elif file_format == "prestoplot": 

1705 export_io = signal_data.prestoplot_export() 

1706 zip_file.writestr(f"{file_name}.tab", export_io) 

1707 else: 

1708 raise ValueError(f"Format not found. Got: {file_format}") 

1709 zip_bytes = zip_buffer.getvalue() 

1710 return zip_bytes 

1711 

1712 def hdf5_export(self, post_processing: bool = False, phase_ids: list = None): 

1713 if phase_ids is None: 

1714 phase_ids = [] 

1715 if post_processing: 

1716 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1717 hdf5_buffer = io.BytesIO() 

1718 custom_type_float = npy.dtype( 

1719 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1720 ) 

1721 custom_type_string = npy.dtype( 

1722 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1723 ) 

1724 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1725 for signal_data in self.signals_data: 

1726 if post_processing: 

1727 phase = phases_by_id.get( 

1728 signal_data.phase_id, 

1729 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1730 ) 

1731 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})") 

1732 else: 

1733 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1734 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1735 if signal_data.data_type == "str": 

1736 export_data = npy.array( 

1737 list( 

1738 zip( 

1739 date_vector, 

1740 signal_data.time_vector, 

1741 signal_data.values, 

1742 signal_data.forced_values, 

1743 ) 

1744 ), 

1745 dtype=custom_type_string, 

1746 ) 

1747 else: 

1748 export_data = npy.array( 

1749 list( 

1750 zip( 

1751 date_vector, 

1752 signal_data.time_vector, 

1753 signal_data.values, 

1754 signal_data.forced_values, 

1755 ) 

1756 ), 

1757 dtype=custom_type_float, 

1758 ) 

1759 signal_group["data"] = export_data 

1760 return hdf5_buffer.getvalue() 

1761 

1762 

class SignalStatus(TwinPadModel):
    """Status attached to a signal; a default instance reports ``"down"`` with no reason."""

    status: str = "down"
    reason: str = ""
    # NOTE(review): unit of the delay is not visible here - confirm
    delay: float | None = None

1767 

1768 

class DigitizationFunction(TwinPadModel):
    """Digitization parameters attached to a signal.

    NOTE(review): presumably maps the raw range onto the value range - confirm against the consumer.
    """

    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

1775 

1776 

class SignalUpdate(TwinPadModel):
    """Payload of a signal command; every field is optional and defaults to None."""

    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None

1781 

1782 

class SignalType(str, Enum):
    """Kinds of signal accepted by ``Signal.type``; members compare equal to their string value."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1787 

1788 

# Signal-data class used for each ``data_type`` name: only "str" uses
# StringSignalData, every other name uses NumericSignalData.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}

1796 

1797 

class TransferFunction(TwinPadModel):
    """Transfer function of a signal, described as a list of float lists.

    ``Signal.send_command`` reads ``intervals[0][1]`` and ``intervals[1][1]`` as the
    lower and upper bounds of an accepted value.
    """

    intervals: list[list[float]]

1800 

1801 

1802class Signal(GenericMongo): 

1803 collection_name: ClassVar[str] = "signals" 

1804 

1805 signal_id: str 

1806 frequency: float 

1807 unit: str | None 

1808 description: str 

1809 type: SignalType 

1810 data_type: str 

1811 transfer_function: TransferFunction | None = None 

1812 precision_digits: int | None 

1813 forcible: bool 

1814 commandable: bool 

1815 broadcastable: bool 

1816 status: SignalStatus = SignalStatus() 

1817 

1818 post_processing: bool = False 

1819 computed_phases_ids: list[str] = [] 

1820 

1821 digitization_function: DigitizationFunction | None 

1822 

1823 @property 

1824 def device(self) -> Device: 

1825 device_id = self.signal_id.split(".")[0] 

1826 device = Device.get_one_by_attribute("device_id", device_id) 

1827 return device 

1828 

1829 @cached_property 

1830 def signal_data_class(self): 

1831 if self.data_type in SIGNALDATA_TYPES: 

1832 return SIGNALDATA_TYPES[self.data_type] 

1833 if self.data_type.startswith("enum"): 

1834 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1835 raise ValueError(f"Unhandled python type: {self.data_type}") 

1836 

1837 @cached_property 

1838 def python_type(self): 

1839 if self.data_type in TYPES: 

1840 return TYPES[self.data_type] 

1841 if self.data_type.startswith("enum"): 

1842 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1843 return Literal[*choices] 

1844 raise ValueError(f"Unhandled python type: {self.data_type}") 

1845 

1846 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1847 command = Command( 

1848 sent_at=time.time(), 

1849 command_type="Signal command", 

1850 user_id=current_user.id, 

1851 ) 

1852 

1853 has_input_error = False 

1854 error_message = "" 

1855 

1856 if self.data_type.startswith("enum"): 

1857 enum_options = get_args(self.python_type) 

1858 

1859 if update_dict.value is not None and update_dict.value not in enum_options: 

1860 has_input_error = True 

1861 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1862 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1863 has_input_error = True 

1864 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1865 else: 

1866 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1867 has_input_error = True 

1868 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1869 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1870 has_input_error = True 

1871 error_message += ( 

1872 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1873 ) 

1874 if ( 

1875 self.transfer_function is not None 

1876 and update_dict.value is not None 

1877 and ( 

1878 (min_value := self.transfer_function.intervals[0][1]) > update_dict.value 

1879 or update_dict.value > (max_value := self.transfer_function.intervals[1][1]) 

1880 ) 

1881 ): 

1882 has_input_error = True 

1883 error_message += ( 

1884 f"Impossible value: {update_dict.value} is not within the transfer function's bounds" 

1885 f"([{min_value}, {max_value}])\n" 

1886 ) 

1887 

1888 if has_input_error: 

1889 command.response_time = 0 

1890 command.succeeded = False 

1891 command.description = f"Tried to modify signal {self.signal_id}" 

1892 response = {"error": True, "status_code": 400, "message": error_message} 

1893 else: 

1894 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict) 

1895 command.receive_response(response) 

1896 

1897 Command.create(command) 

1898 return response 

1899 

1900 @classmethod 

1901 def get_from_signal_id(cls, signal_id) -> Self: 

1902 """Could be generic from mongo""" 

1903 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1904 if not raw_value: 

1905 return None 

1906 del raw_value["_id"] 

1907 return cls.dict_to_object(raw_value) 

1908 

1909 @classmethod 

1910 def get_all_ids(cls) -> list[str]: 

1911 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1912 

1913 return [signal["signal_id"] for signal in cursor] 

1914 

1915 @classmethod 

1916 def get_all_statuses(cls) -> list[dict]: 

1917 cursor = cls.collection().aggregate( 

1918 [{"$project": {"signal_id": 1, "status": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}] 

1919 ) 

1920 

1921 return [ 

1922 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1923 for signal in cursor 

1924 ] 

1925 

1926 async def number_samples(self): 

1927 collection = get_signal_collection(signal_id=self.signal_id) 

1928 if collection is None: 

1929 return 0 

1930 

1931 number_samples = collection.estimated_document_count() 

1932 

1933 number_samples_async_collection = await get_async_collection( 

1934 systems_async_database, "number_samples", create=True, time_series=True 

1935 ) 

1936 

1937 loop = asyncio.get_running_loop() 

1938 loop.create_task( 

1939 number_samples_async_collection.insert_one( 

1940 { 

1941 "timestamp": datetime.datetime.now(pytz.UTC), 

1942 "signal_id": self.signal_id, 

1943 "number_samples": number_samples, 

1944 } 

1945 ) 

1946 ) 

1947 

1948 return number_samples 

1949 

1950 @classmethod 

1951 def total_number_samples(cls) -> int: 

1952 TwinPadActivity.get_number_samples_timeframe(0, 0, False) 

1953 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1954 

1955 if number_samples_collection is None: 

1956 return 0 

1957 

1958 result = number_samples_collection.aggregate( 

1959 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}] 

1960 ) 

1961 

1962 result = result.to_list() 

1963 if len(result) == 0: 

1964 return 0 

1965 return result[0]["amount"] 

1966 

1967 def sample_datasize(self): 

1968 return signals_database.command("collstats", self.signal_id)["size"] 

1969 

1970 @classmethod 

1971 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1972 result = cls.collection().aggregate( 

1973 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1974 ) 

1975 

1976 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1977 

1978 

class ForcedSignal(GenericMongo):
    """Record of who is currently forcing a signal, stored in ``forced_signals``."""

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record by ``signal_id`` and return the id of the stored document.

        The id is also stored on ``self.id``.
        """
        stored_document = self.collection().find_one_and_update(
            {"signal_id": self.signal_id},
            {"$set": self.to_dict(exclude={"id"})},
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Tell whether a user may force a given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal was forced by someone else than the user and the
            user is not an admin, True otherwise
        :rtype: bool
        """
        forced_signal = cls.get_one_by_attribute("signal_id", signal_id)
        if forced_signal is None:
            # Nobody is forcing the signal.
            return True
        forced_by_user = forced_signal.forcing_user_id == current_user.id
        return forced_by_user or bool(current_user.is_admin)

2013 

2014 

class ServicesStatus(TwinPadModel):
    """Status string of the backend and of each service it depends on."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        The backend is always reported as ``"up"``.
        """
        statuses = {
            "cloud_broker": ping(RABBITMQ_HOST),
            "backend": "up",
            "time_series_database": ping(MONGO_HOST),
            "signal_storage": ping(SIGNAL_STORAGE_HOST),
            "heartbeat_storage": ping(HEARTBEAT_STORAGE_HOST),
            "data_analyzer": ping(DATA_ANALYZER_HOST),
        }
        return cls(**statuses)

2033 

2034 

def ping(host):
    """Ping ``host`` once with a 0.8 s timeout and return ``"up"`` or ``"down"``.

    A ``PermissionError`` raised by the ping is reported as ``"down"``.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # ping3 reports a timeout as None and an error as False; any measured delay is a
    # reply, including 0.0, which a plain truthiness test would report as "down".
    if delay is None or delay is False:
        return "down"
    return "up"

2042 

2043 

class Event(GenericMongo):
    """Event stored in the ``events`` collection and linked to the rule that produced it."""

    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Rule referenced by ``event_rule_id``, loaded on first access."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Replace the datetime ``timestamp`` of a stored document by its POSIX timestamp, then build the object.

        The given dict is modified in place.
        """
        # NOTE(review): a naive datetime is interpreted in local time by timestamp() - confirm
        dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
        return super().dict_to_object(dict_)

2061 

2062 

class TwinPadActivity(GenericMongo):
    """Amount of activity (events, received samples or commands) for one day."""

    timestamp: float
    amount: int

    @staticmethod
    def _day_start(moment) -> datetime.datetime:
        """Return midnight of the date of ``moment``, labelled as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @classmethod
    def _daily_amounts_timeframe(
        cls,
        amounts_collection_name: str,
        amounts_collection,
        source_collection,
        count_matching,
        min_timestamp,
        max_timestamp,
        recompute_amount,
    ) -> list[Self]:
        """Refresh a per-day amounts collection and return its entries within a time frame.

        :param amounts_collection_name: Name of the per-day amounts collection
        :param amounts_collection: That collection as already fetched, or None when missing
        :param source_collection: Collection whose oldest document starts the recomputation
        :param count_matching: Callable taking a filter on ``timestamp`` and returning
            the amount found in the source for that filter
        :param min_timestamp: Start of the requested time frame, as a POSIX timestamp
        :param max_timestamp: End of the requested time frame, as a POSIX timestamp
        :param recompute_amount: When true, every past day is recomputed from scratch
        :return: One item per stored day inside the time frame
        """
        one_day = datetime.timedelta(days=1)
        items = []
        # NOTE(review): today() is the local date while the result is labelled UTC - confirm
        today = cls._day_start(datetime.datetime.today())

        if amounts_collection is None or recompute_amount:
            amounts_collection = get_collection(
                systems_database, amounts_collection_name, create=True, time_series=True
            )
            amounts_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return items
            # Recompute every full day from the day of the oldest source document.
            day = cls._day_start(first_document["timestamp"])
            while day < today:
                amount = count_matching({"$gte": day, "$lt": day + one_day})
                if amount > 0:
                    amounts_collection.insert_one({"timestamp": day, "amount": amount})
                day += one_day

        # The current day is always refreshed because it is still changing.
        amount_today = count_matching({"$gte": today})
        if amount_today > 0:
            amounts_collection.delete_many({"timestamp": today})
            amounts_collection.insert_one({"timestamp": today, "amount": amount_today})

        # NOTE(review): fromtimestamp() converts to local time before the UTC label is applied - confirm
        data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
        for day_document in amounts_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            day_document["timestamp"] = day_document["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day_document))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the daily number of events between two POSIX timestamps."""
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)

        def count_events(timestamp_filter):
            return events_collection.count_documents({"timestamp": timestamp_filter})

        return cls._daily_amounts_timeframe(
            "number_events",
            number_events_collection,
            events_collection,
            count_events,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of received samples between two POSIX timestamps.

        Daily amounts are the sum of ``value`` over the documents of
        ``signal_storage._NUMBER_SAMPLES`` for that day.
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )

        def sum_samples(timestamp_filter):
            groups = signals_number_samples_collection.aggregate(
                [
                    {"$match": {"timestamp": timestamp_filter}},
                    {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
                ]
            ).to_list()
            if not groups:
                return 0
            return groups[0].get("number_samples", 0)

        return cls._daily_amounts_timeframe(
            "number_received_samples",
            number_samples_collection,
            signals_number_samples_collection,
            sum_samples,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of commands between two POSIX timestamps."""
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)

        def count_commands(timestamp_filter):
            return commands_collection.count_documents({"timestamp": timestamp_filter})

        return cls._daily_amounts_timeframe(
            "number_commands",
            number_commands_collection,
            commands_collection,
            count_commands,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

2198 

2199 

class EventRule(GenericMongo):
    """Rule stored in the ``event_rules`` collection, made of a formula and its variable names."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of documents of the ``events`` collection referencing this rule, counted on first access."""
        return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})

2211 

2212 

class Company(GenericMongo):
    """Company stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"
    name: str

2216 

2217 

class Campaign(GenericMongo):
    """Campaign stored in the ``campaigns`` collection; phases reference it through ``campaign_id``."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

2225 

2226 

class Phase(GenericMongo):
    """Named time span of a campaign, stored in the ``phases`` collection."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    # NOTE(review): bounds are divided by 1000 where they are compared with sample
    # timestamps, so they are presumably milliseconds - confirm
    start_at: float
    end_at: float

    # FK
    campaign_id: MongoId

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of a campaign and return the result of the delete operation."""
        return cls.collection().delete_many({"campaign_id": campaign_id})

2244 

2245 

class CustomViewCreation(GenericMongo):
    """Fields needed to create a custom view, stored in the ``custom_views`` collection."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    # NOTE(review): the shape of the list items is not visible here
    configuration: list

2251 

2252 

class CustomView(CustomViewCreation):
    """Custom view with its identifier and the id of its user."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

2259 

2260 

# Model built from CustomView by create_update_model.
# NOTE(review): presumably every field becomes optional - confirm in create_update_model
CustomViewUpdate = create_update_model(CustomView)

2262 

2263 

class Video(GenericMongo):
    """Camera entry stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every camera sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are requested from the database, so the
        credentials keep their default value on the returned objects.
        """
        listing_projection = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, listing_projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with the given id, or None when it does not exist."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name

2287 

2288 

class Command(GenericMongo):
    """Record of a command sent by a user, stored in the ``commands`` time-series collection."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, creating it as a time series when missing."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of ``command`` whose ``timestamp`` is ``sent_at`` as a UTC datetime.

        :return: ``{"command_id": <id of the inserted document>}``, or None when the
            insertion yields no result
        """
        record = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insertion = cls.collection().insert_one(record.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"command_id": str(insertion.inserted_id)}

    def receive_response(self, response: dict):
        """Update the command from the response it received.

        The response time is measured from ``sent_at``. The command succeeded only when
        the response carries ``"error": False``. An empty description is filled with the
        response message, stripped of trailing whitespace.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            self.description = response.get("message", "").rstrip()

2328 

2329 

class SignalsPresetCreation(GenericMongo):
    """Fields needed to create a signals preset: a name and a list of signal ids."""

    name: str
    signal_ids: list[str]

2333 

2334 

class SignalsPreset(SignalsPresetCreation):
    """Signals preset owned by a user, stored in the ``signals_presets`` collection."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store a preset for ``user_id`` and return the id of the inserted document as a string."""
        preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        insertion = cls.collection().insert_one(preset.model_dump(exclude={"id"}))
        return str(insertion.inserted_id)

2351 

2352 

# Model built from SignalsPreset by create_update_model.
# NOTE(review): presumably every field becomes optional - confirm in create_update_model
SignalsPresetUpdate = create_update_model(SignalsPreset)

2354 

2355 

class LineStyle(str, Enum):
    """Line styles accepted by graph themes; members compare equal to their string value."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

2360 

2361 

class SignalAppearance:
    """Plain holder declaring the two colors of a signal; it only carries annotations, no values."""

    value_color: str
    forced_value_color: str

2365 

2366 

class GraphThemeCreation(GenericMongo):
    """Fields needed to create a graph theme for one signal, stored in ``graph_themes``.

    Colors default to an empty string, line styles to solid, and a theme is private
    unless stated otherwise.
    """

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    private: bool = True

2377 

2378 

class PublicGraphTheme(GraphThemeCreation):
    """Graph theme as exposed to one user.

    The stored ``creator_id``, ``in_library`` and ``active`` fields are replaced by
    booleans computed for the user whose id is held in ``_current_user_id``. Every
    reader sets that id on the class before querying.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # NOTE(review): set on the class by each reader, hence shared by all callers - confirm
    # this is safe when requests for different users are handled concurrently.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:  # pylint: disable=no-self-argument
        """Aggregation steps, by name, that compute the per-user fields and drop the stored ones."""
        user_id = cls._current_user_id

        def add_fields(fields: dict) -> list:
            return [{"$addFields": fields}]

        return {
            "created_by_user": add_fields({"created_by_user": {"$eq": ["$creator_id", user_id]}}),
            # Never return a theme the user is not allowed to see (i.e. neither his own
            # nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_fields({"in_user_library": {"$in": [user_id, "$in_library"]}}),
            "active_for_user": add_fields({"active_for_user": {"$in": [user_id, "$active"]}}),
            "in_library": add_fields({"in_library": "$$REMOVE"}),
            "active": add_fields({"active": "$$REMOVE"}),
            "creator_id": add_fields({"creator_id": "$$REMOVE"}),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str = None) -> ListResponse[Self]:
        """Run ``query`` on behalf of ``user_id``; without a user, return an empty response."""
        if user_id is not None:
            cls._current_user_id = user_id
            return super().response_from_query(query)
        return ListResponse(items=[], limit=0, offset=0, sort_by="", total=0)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str = None) -> ListResponse[Self]:
        """Same as ``response_from_query`` with ``query.in_user_library`` set to ``"true"``."""
        if user_id is not None:
            query.in_user_library = "true"
            return cls.response_from_query(query, user_id)
        return ListResponse(items=[], limit=0, offset=0, sort_by="", total=0)

    @classmethod
    def get_from_id(cls, item_id, user_id: str = None) -> Self | None:
        """Return the theme with the given id as seen by ``user_id``, or None without a user."""
        if user_id is not None:
            return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)
        return None

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str = None):
        """Return the themes matching an attribute as seen by ``user_id``, or None without a user."""
        if user_id is not None:
            cls._current_user_id = user_id
            return super().get_by_attribute(attribute_name, attribute_value)
        return None

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str = None):
        """Return one theme matching an attribute as seen by ``user_id``, or None without a user."""
        if user_id is not None:
            cls._current_user_id = user_id
            return super().get_one_by_attribute(attribute_name, attribute_value)
        return None

    @classmethod
    def get_all(cls, sort_by: str, user_id: str = None):
        """Return every theme as seen by ``user_id``, or an empty list without a user."""
        if user_id is not None:
            cls._current_user_id = user_id
            return super().get_all(sort_by)
        return []

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return the colors and line styles of the active theme of each listed signal.

        :param signal_ids: Signals to look up
        :param user_id: User for whom the themes must be active
        :return: Appearance fields keyed by signal id; when several themes match a
            signal, the first one of its group is used
        """
        appearance_fields = {
            "_id": 0,
            "signal_id": 1,
            "value_color": 1,
            "forced_value_color": 1,
            "value_line_style": 1,
            "forced_value_line_style": 1,
        }
        pipeline = [
            {"$match": {"active": {"$eq": user_id}, "signal_id": {"$in": signal_ids}}},
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
            {"$project": appearance_fields},
        ]

        appearances = {}
        for document in cls.collection().aggregate(pipeline):
            signal_id = document.pop("signal_id")
            appearances[signal_id] = document
        return appearances

2503 

2504 

# Model built from PublicGraphTheme by create_update_model.
# NOTE(review): presumably every field becomes optional - confirm in create_update_model
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2506 

2507 

class PrivateGraphTheme(GraphThemeCreation):
    """Graph theme as stored, with its creator and the users who saved or activated it."""

    # private
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Store a new theme that is in the library of, and active for, its creator.

        :return: The created theme, with ``id`` set to the id of the inserted document
        """
        theme = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        insertion = cls.collection().insert_one(theme.model_dump(exclude={"id": True}))
        theme.id = str(insertion.inserted_id)
        return theme

    def update(self, update_dict: dict, user_id: str = None):
        """Apply an update expressed with the per-user fields of the public theme.

        ``in_user_library`` and ``active_for_user`` are translated into adding or
        removing ``user_id`` in the stored ``in_library`` and ``active`` lists, and
        ``created_by_user`` is dropped. ``update_dict`` is modified in place and nothing
        is done without a user.

        :return: Always an empty dict
        """
        if user_id is None:
            return {}

        def set_membership(members: list, wanted) -> list:
            # Add or remove the user so that membership matches ``wanted``.
            if wanted:
                if user_id not in members:
                    members.append(user_id)
            elif user_id in members:
                members.remove(user_id)
            return members

        in_user_library = update_dict.get("in_user_library")
        if in_user_library is not None:
            update_dict["in_library"] = set_membership(self.in_library, in_user_library)
            del update_dict["in_user_library"]

        active_for_user = update_dict.get("active_for_user")
        if active_for_user is not None:
            update_dict["active"] = set_membership(self.active, active_for_user)
            del update_dict["active_for_user"]

        if update_dict.get("created_by_user") is not None:
            del update_dict["created_by_user"]

        self.collection().find_one_and_update({"_id": ObjectId(self.id)}, {"$set": update_dict})

        return {}

2571 

2572 

class DeviceStatus(str, Enum):
    """Statuses a device deployer may report for a device; members compare equal to their string value."""

    started = "started"
    running = "running"
    created = "created"
    exited = "exited"
    restarting = "restarting"

2579 

2580 

class DeviceUpdateFromDeployer(BaseModel):
    """Update of a deployed device, limited to its status."""

    status: DeviceStatus

2583 

2584 

class DeviceFromDeployerCreation(BaseModel):
    """Fields needed to create a device through a deployer."""

    name: str
    description: str

2588 

2589 

class DeviceFromDeployer(DeviceFromDeployerCreation):
    # A device as described by a deployer: the inherited name/description plus
    # its status, its identifier and its logs. Instances are built by
    # DeviceDeployer.devices, get_device and create_device from the
    # deployer's JSON responses.
    status: DeviceStatus
    device_id: DeviceId
    # Defaults to an empty string when no logs are supplied.
    logs: str = ""

2594 

2595 

class DeviceDeployer(GenericMongo):
    # HTTP client for a remote "deployer" service that manages devices.
    # Every call goes through `_request`, which applies a timeout and turns
    # connection failures into `None` instead of raising.
    collection_name: ClassVar[str] = "device_deployers"
    # (connect, read) timeouts in seconds passed to every `requests` call so
    # that an unresponsive deployer cannot block the caller forever.
    # Declared as ClassVar so it is not a stored model field.
    request_timeout: ClassVar[tuple[float, float]] = (5.0, 60.0)
    url: HttpUrl

    def endpoint_url(self, endpoint):
        """Return the absolute URL of ``endpoint`` below the deployer base URL."""
        return f"{str(self.url).rstrip('/')}/{endpoint}"

    def _request(self, verb: str, endpoint: str, **kwargs):
        """Send an HTTP request to the deployer.

        Args:
            verb: Name of the ``requests`` function to call (``"get"``,
                ``"post"``, ``"patch"`` or ``"delete"``).
            endpoint: Path relative to the deployer base URL.
            **kwargs: Extra keyword arguments forwarded to ``requests``.

        Returns:
            The ``requests.Response``, or ``None`` when the deployer could
            not be reached or did not answer within ``request_timeout``.
        """
        try:
            return getattr(requests, verb)(self.endpoint_url(endpoint), timeout=self.request_timeout, **kwargs)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # A read timeout is not a ConnectionError subclass, hence both.
            logger.info("connection error")
            return None

    @staticmethod
    def _device_from_payload(payload: dict) -> DeviceFromDeployer:
        """Build a ``DeviceFromDeployer`` from one device dict sent by the deployer."""
        return DeviceFromDeployer(
            device_id=payload["device_id"],
            name=payload["container_name"],
            description="desc",
            status=payload["status"],
            # The model defaults logs to "", so tolerate a payload without it.
            logs=payload.get("logs", ""),
        )

    def devices(self) -> list[DeviceFromDeployer] | None:
        """List the devices known to the deployer.

        Returns:
            The devices, or ``None`` when the deployer is unreachable or
            answers with a status other than 200.
        """
        response = self._request("get", "devices")
        if response is None or response.status_code != 200:
            return None
        return [self._device_from_payload(device_dict) for device_dict in response.json()["devices"]]

    def get_device(self, device_id: DeviceId) -> DeviceFromDeployer | None:
        """Fetch one device from the deployer.

        Returns:
            The device, or ``None`` when the deployer is unreachable or
            answers with a status other than 200.
        """
        response = self._request("get", f"devices/{device_id}")
        if response is None or response.status_code != 200:
            return None
        return self._device_from_payload(response.json())

    def create_device(self, device: DeviceFromDeployer) -> DeviceFromDeployer | None:
        """Ask the deployer to create a device; only ``device.name`` is sent.

        Returns:
            A ``DeviceFromDeployer`` carrying the identifier and status
            returned by the deployer (name is left empty), or ``None`` when
            the deployer is unreachable or does not answer 201.
        """
        response = self._request("post", "devices", json={"name": device.name})
        if response is None or response.status_code != 201:
            return None

        device_dict = response.json()
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            status=device_dict["status"],
        )

    def update_device(self, device_id: DeviceId, device_update: DeviceUpdateFromDeployer) -> Device | None:
        """Send ``device_update`` to the deployer as a PATCH on the device.

        Returns:
            A ``Device`` filled with the identifier and status returned by
            the deployer and placeholder values for the other fields, or
            ``None`` when the deployer is unreachable or does not answer 200.
        """
        response = self._request("patch", f"devices/{device_id}", json=device_update.model_dump())
        if response is None or response.status_code != 200:
            return None

        device_dict = response.json()
        return Device(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            pid={},
            petri_network={},
            modes=[],
            status=device_dict["status"],
        )

    def delete_device(self, device_id: DeviceId) -> DeleteInfo:
        """Ask the deployer to delete a device.

        Returns:
            A ``DeleteInfo`` whose ``is_deleted`` is true when the deployer
            answered 200, 202 or 204; otherwise false with the failure
            reason in ``detail``.
        """
        response = self._request("delete", f"devices/{device_id}")
        if response is None:
            return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
        if response.status_code not in (200, 202, 204):
            return DeleteInfo(is_deleted=False, detail=response.text)

        return DeleteInfo(is_deleted=True, detail="")

2686 

2687 

# Update model generated from DeviceDeployer by create_update_model (defined
# outside this section).
# NOTE(review): presumably a variant with optional fields for partial
# updates -- confirm against create_update_model.
DeviceDeployerUpdate = create_update_model(DeviceDeployer)