Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%

1370 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-03 16:35 +0000

1from functools import cached_property 

2import os 

3import re 

4import io 

5import time 

6import csv 

7from typing import Self, ClassVar, Any, Literal, get_args, Annotated 

8import datetime 

9import math 

10import bisect 

11from enum import Enum 

12import logging 

13import copy 

14import asyncio 

15import requests 

16 

17import zipfile 

18import ping3 

19import pytz 

20from bson.objectid import ObjectId 

21from pymongo import ASCENDING, ReturnDocument 

22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator 

23import numpy as npy 

24import lttb 

25import h5py 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 signals_async_database, 

36 devices_states_database, 

37) 

38from twinpad_backend.responses import ListResponse 

39from twinpad_backend.messages import RabbitMQClient 

40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

41 

42TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

43SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"] 

44DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"] 

45MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"] 

46 

47 

48RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

49MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

50SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

51HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

52DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

53 

54DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

55NUMBER_SAMPLES_DATABASE_UPDATE = 120 

56 

57logger = logging.getLogger("uvicorn.error") 

58 

59 

60class DeleteInfo(BaseModel): 

61 is_deleted: bool 

62 detail: str 

63 

64 

65class classproperty: 

66 """ 

67 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

68 Found here: https://stackoverflow.com/a/76301341 

69 """ 

70 

71 def __init__(self, func): 

72 self.fget = func 

73 

74 def __get__(self, _, owner): 

75 return self.fget(owner) 

76 

77 

78def create_update_model(model): 

79 fields = {} 

80 

81 for field_name, field_annotation in model.model_fields.items(): 

82 if field_name != "id": 

83 fields[field_name] = (field_annotation.annotation | None, None) 

84 

85 query_name = model.__name__ + "Update" 

86 return create_model(query_name, **fields) 

87 

88 

89def get_utc_date_from_timestamp(timestamp: float): 

90 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

91 

92 

93def downsample_list(time_vector: list, values: list, max_number_samples: int): 

94 if len(time_vector) < max_number_samples: 

95 return time_vector, values 

96 

97 time_vector_copy = copy.deepcopy(time_vector) 

98 values_copy = copy.deepcopy(values) 

99 

100 none_group_bounds = [] 

101 none_group_index = -1 

102 index = -1 

103 # LTTB doesn't handle None values so remove them 

104 while values_copy.count(None) > 0: 

105 # Store bounds of None value groups so we can insert them back after the downsampling 

106 if (new_index := values_copy.index(None)) != index: 

107 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

108 none_group_index += 1 

109 elif len(none_group_bounds[none_group_index]) < 2: 

110 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

111 else: 

112 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

113 values_copy.pop(new_index) 

114 index = new_index 

115 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

116 

117 try: 

118 values_array = npy.array([time_vector_copy, values_copy]).T 

119 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

120 

121 new_time_vector = interpolated_values[:, 0].tolist() 

122 new_values = interpolated_values[:, 1].tolist() 

123 except ValueError: 

124 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

125 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

126 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

127 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

128 return new_time_vector, new_values_nan_to_none 

129 

130 # insert back None values at the correct timestamps 

131 for none_group in none_group_bounds: 

132 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

133 new_time_vector[start_index:start_index] = none_group 

134 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

135 

136 return new_time_vector, new_values 

137 

138 

139def is_of_type(value, wanted_type): 

140 if wanted_type is float: 

141 return isinstance(value, (int, float)) 

142 return isinstance(value, wanted_type) 

143 

144 

145# Models 

146class TwinPadModel(BaseModel): 

147 @classmethod 

148 def dict_to_object(cls, dict_): 

149 return cls.model_validate(dict_) 

150 

151 def to_dict(self, exclude=None): 

152 dict_ = self.model_dump(exclude=exclude, mode="json") 

153 return dict_ 

154 

155 

156def validate_mongo_id(v): 

157 if not ObjectId.is_valid(v): 

158 raise ValueError("Invalid MongoDB id") 

159 return str(v) 

160 

161 

162MongoId = Annotated[str, BeforeValidator(validate_mongo_id)] 

163 

164 

165def validate_12_hex(v: str) -> str: 

166 if not re.fullmatch(r"[0-9a-fA-F]{12}", v): 

167 raise ValueError("ID must be a 12-character hexadecimal string") 

168 return v 

169 

170 

171DeviceId = Annotated[str, BeforeValidator(validate_12_hex)] 

172 

173 

174class GenericMongo(TwinPadModel): 

175 id: MongoId | None = None 

176 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

177 

178 @classmethod 

179 def collection(cls): 

180 return get_collection(systems_database, cls.collection_name, create=True) 

181 

182 @classmethod 

183 def response_from_query(cls, query) -> ListResponse[Self]: 

184 request_filters = query.mongodb_filter() 

185 items = [] 

186 

187 # Allows for multi-sort, Python dicts are ordered so no issue while sorting 

188 sort_dict = {} 

189 for sort in query.sort_by.split(","): 

190 if ":" in sort: 

191 sort_field, sort_order = sort.split(":") 

192 sort_order = int(sort_order) 

193 else: 

194 sort_field = sort 

195 sort_order = 1 

196 sort_dict[sort_field] = sort_order 

197 

198 collection = get_collection(systems_database, cls.collection_name, create=True) 

199 total = collection.count_documents(request_filters) 

200 

201 pipeline = [] 

202 added_properties = [] 

203 if "$and" in request_filters: 

204 for request_filter in request_filters["$and"]: 

205 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

206 if filtered_property in request_filter: 

207 pipeline.extend(pipeline_steps) 

208 added_properties.append(filtered_property) 

209 else: 

210 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

211 if filtered_property in request_filters: 

212 pipeline.extend(pipeline_steps) 

213 added_properties.append(filtered_property) 

214 pipeline.append({"$match": request_filters}) 

215 

216 for sort_field in sort_dict.keys(): 

217 if sort_field in cls.custom_pipeline_steps: 

218 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

219 added_properties.append(sort_field) 

220 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}]) 

221 

222 if (query.limit is not None) and (query.limit != 0): 

223 pipeline.append({"$limit": query.limit}) 

224 

225 for filtered_property, step in cls.custom_pipeline_steps.items(): 

226 if filtered_property not in added_properties: 

227 pipeline.extend(step) 

228 

229 cursor = collection.aggregate(pipeline) 

230 

231 for item_dict in cursor: 

232 items.append(cls.mongo_dict_to_object(item_dict)) 

233 

234 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

235 

236 @classmethod 

237 def get_from_id(cls, item_id) -> Self | None: 

238 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

239 

240 @classmethod 

241 def mongo_dict_to_object(cls, mongo_dict): 

242 mongo_dict["id"] = str(mongo_dict["_id"]) 

243 del mongo_dict["_id"] 

244 return cls.dict_to_object(mongo_dict) 

245 

246 @classmethod 

247 def get_by_attribute(cls, attribute_name: str, attribute_value): 

248 """Returns all items that match the attribute with value.""" 

249 pipeline = [] 

250 if attribute_name in cls.custom_pipeline_steps: 

251 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

252 pipeline.append({"$match": {attribute_name: attribute_value}}) 

253 for key, step in cls.custom_pipeline_steps.items(): 

254 if key != attribute_name: 

255 pipeline.extend(step) 

256 items = cls.collection().aggregate(pipeline) 

257 if items is None: 

258 return None 

259 return [cls.mongo_dict_to_object(d) for d in items] 

260 

261 @classmethod 

262 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

263 pipeline = [] 

264 if attribute_name in cls.custom_pipeline_steps: 

265 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

266 pipeline.append({"$match": {attribute_name: attribute_value}}) 

267 pipeline.append({"$limit": 1}) 

268 for key, step in cls.custom_pipeline_steps.items(): 

269 if key != attribute_name: 

270 pipeline.extend(step) 

271 items = cls.collection().aggregate(pipeline).to_list() 

272 if len(items) == 0: 

273 return None 

274 return cls.mongo_dict_to_object(items[0]) 

275 

276 @classmethod 

277 def get_all(cls, sort_by="_id") -> list[Self]: 

278 items = [] 

279 pipeline = [] 

280 if sort_by in cls.custom_pipeline_steps: 

281 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

282 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

283 for key, step in cls.custom_pipeline_steps.items(): 

284 if key != sort_by: 

285 pipeline.extend(step) 

286 for dict_ in cls.collection().aggregate(pipeline): 

287 items.append(cls.mongo_dict_to_object(dict_)) 

288 return items 

289 

290 @classmethod 

291 def get_number_documents(cls): 

292 collection = get_collection(systems_database, cls.collection_name) 

293 if collection is None: 

294 return 0 

295 return collection.count_documents( 

296 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]} 

297 ) 

298 

299 def insert(self): 

300 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"})) 

301 self.id = str(insert_result.inserted_id) 

302 return self.id 

303 

304 def update(self, update_dict): 

305 for key, value in update_dict.items(): 

306 setattr(self, key, value) 

307 self.collection().find_one_and_update( 

308 {"_id": ObjectId(self.id)}, 

309 {"$set": update_dict}, 

310 return_document=ReturnDocument.AFTER, 

311 ) 

312 

313 return self 

314 

315 def delete(self): 

316 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

317 return result.deleted_count > 0 

318 

319 

320class User(GenericMongo): 

321 collection_name: ClassVar[str] = "users" 

322 

323 firstname: str 

324 lastname: str 

325 email: str 

326 password: str 

327 is_active: bool | None = False 

328 is_admin: bool | None = False 

329 is_connected: bool | None = False 

330 company_id: str | None = None 

331 

332 def to_dict(self, exclude: set = set()): 

333 exclude.add("password") 

334 return GenericMongo.to_dict(self, exclude=exclude) 

335 

336 @classmethod 

337 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

338 users = cls.get_all() 

339 if not users: 

340 is_admin = True 

341 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

342 user_collection = get_collection(systems_database, "users", create=True) 

343 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

344 if new_user is None: 

345 return None 

346 return {"user_id": str(new_user.inserted_id)} 

347 

348 @classmethod 

349 def update_info(cls, user: "UserUpdate", user_id: str): 

350 updated_user = cls.collection().find_one_and_update( 

351 {"_id": ObjectId(user_id)}, 

352 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

353 return_document=ReturnDocument.AFTER, 

354 ) 

355 updated_user["id"] = str(updated_user["_id"]) 

356 del (updated_user["_id"], updated_user["is_connected"]) 

357 return cls(**updated_user) 

358 

359 

360UserUpdate = create_update_model(User) 

361 

362 

363class Mode(TwinPadModel): 

364 mode_id: int 

365 name: str 

366 frequency_multiplier: float 

367 min_frequency: float 

368 

369 

370class DeviceUpdate(TwinPadModel): 

371 mode_id: int 

372 

373 

374class Device(GenericMongo): 

375 collection_name: ClassVar[str] = "devices" 

376 

377 device_id: DeviceId 

378 name: str 

379 description: str = "" 

380 modes: list[Mode] 

381 current_mode_id: int | None = None 

382 last_ping: float | None = None 

383 petri_network: Any 

384 pid: Any 

385 load: float | None = None 

386 tokens: list[int] = Field(default_factory=list) 

387 status: str 

388 

389 async def change_mode(self, update_dict, current_user: User): 

390 has_error = False 

391 

392 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

393 has_error = True 

394 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

395 elif self.current_mode_id is not None: 

396 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

397 else: 

398 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

399 command = Command( 

400 sent_at=time.time(), 

401 command_type="Mode change", 

402 description=description, 

403 user_id=current_user.id, 

404 ) 

405 

406 if has_error: 

407 command.response_time = 0 

408 command.succeeded = False 

409 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

410 else: 

411 response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id) 

412 command.receive_response(response) 

413 

414 Command.create(command) 

415 return response 

416 

417 @classmethod 

418 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

419 devices_by_id = {} 

420 for signal_id in signal_ids: 

421 device_id = signal_id.split(".")[0] 

422 if device_id not in devices_by_id: 

423 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

424 return devices_by_id 

425 

426 

427class DeviceSetup(GenericMongo): 

428 collection_name: ClassVar[str] = "device_setups" 

429 

430 device_ids: list[str] 

431 active: bool = False 

432 variable_mapping: dict[str, str] 

433 

434 

435DeviceSetupUpdate = create_update_model(DeviceSetup) 

436 

437 

438class DeviceState(GenericMongo): 

439 collection_name: ClassVar[str] = "devices_states" 

440 

441 timestamp: float 

442 mode: str | None = None 

443 load: float | None = None 

444 tokens: list[int] = Field(default_factory=list) 

445 modified_properties: list[str] = Field(default_factory=list) 

446 

447 @classmethod 

448 def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]: 

449 req_filter = query.mongodb_filter() 

450 items = [] 

451 if ":" in query.sort_by: 

452 sort_field, sort_order = query.sort_by.split(":") 

453 sort_order = int(sort_order) 

454 else: 

455 sort_field = query.sort_by 

456 sort_order = 1 

457 collection = get_collection(devices_states_database, device_id) 

458 if collection is None: 

459 total = 0 

460 cursor = [] 

461 else: 

462 total = collection.count_documents(req_filter) 

463 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

464 if (query.limit is not None) and (query.limit != 0): 

465 cursor = cursor.limit(query.limit) 

466 for item_dict in cursor: 

467 items.append( 

468 cls( 

469 timestamp=item_dict.get("precise_timestamp"), 

470 mode=item_dict.get("mode", None), 

471 load=item_dict.get("load", None), 

472 tokens=item_dict.get("tokens", Field(default_factory=list)), 

473 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

474 ) 

475 ) 

476 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

477 

478 

479class SignalSample(TwinPadModel): 

480 signal_id: str 

481 timestamp: float 

482 value: float | int | str | bool | None 

483 forced_value: float | int | str | bool | None = None 

484 

485 @classmethod 

486 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

487 

488 collection = get_signal_collection(signal_id) 

489 if collection is None: 

490 return None 

491 

492 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

493 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

494 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

495 first_bucket = None 

496 if bucket is not None: 

497 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

498 if first_bucket is not None: 

499 sample_data = collection.find_one( 

500 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

501 ) 

502 else: 

503 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

504 

505 if sample_data is None: 

506 return None 

507 

508 timestamp = sample_data["precise_timestamp"] 

509 

510 return cls( 

511 signal_id=signal_id, 

512 timestamp=timestamp, 

513 value=sample_data.get("value", None), 

514 forced_value=sample_data.get("forced_value", None), 

515 ) 

516 

517 @classmethod 

518 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

519 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

520 

521 @classmethod 

522 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

523 last_value_collection = get_signal_collection("last_values", True) 

524 signals_collection = get_collection(systems_database, "signals", create=True) 

525 

526 sample_data = last_value_collection.find_one({"signal_id": signal_id}, sort={"precise_timestamp": -1}) 

527 

528 # If there is no data, check if the signal's type is anything other than float, as they don't send duplicate values 

529 if sample_data is None: 

530 if signals_collection.count_documents({"status.status": "up", "signal_id": signal_id}) < 1: 

531 return None 

532 

533 # Same workaround as above function, very effective to narrow down big sets of data 

534 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

535 last_bucket = None 

536 if bucket is not None: 

537 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

538 signal_collection = get_signal_collection(signal_id) 

539 if last_bucket is not None and signal_collection is not None: 

540 sample_data = signal_collection.find_one( 

541 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

542 sort={"precise_timestamp": -1}, 

543 ) 

544 else: 

545 sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1}) 

546 

547 if sample_data is None: 

548 return None 

549 

550 timestamp = sample_data.get("precise_timestamp") 

551 # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down 

552 if device is None: 

553 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

554 if device is not None and device.last_ping is not None: 

555 if timestamp is None: 

556 timestamp = device.last_ping 

557 else: 

558 timestamp = max(timestamp, device.last_ping) 

559 else: 

560 timestamp = sample_data.get("precise_timestamp") 

561 

562 return cls( 

563 signal_id=signal_id, 

564 timestamp=timestamp, 

565 value=sample_data.get("value", None), 

566 forced_value=sample_data.get("forced_value", None), 

567 ) 

568 

569 @classmethod 

570 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

571 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

572 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

573 

574 

575class SignalData(TwinPadModel): 

576 signal_id: str 

577 forcible: bool = True 

578 time_vector: list[float] 

579 values: list[float | int | str | None] 

580 forced_values: list[float | int | str | None] 

581 

582 data_start: float | None = None 

583 data_end: float | None = None 

584 

585 number_samples: int = 0 

586 number_samples_db: int = 0 

587 

588 db_query_time: float = 0.0 

589 init_time: float = 0.0 

590 data_processing_time: float = 0.0 

591 

592 phase_id: str | None = None 

593 

594 @classmethod 

595 def get_from_signal_id( 

596 cls, 

597 signal_id: str, 

598 min_timestamp: float = None, 

599 max_timestamp: float = None, 

600 window_min_timestamp: float = None, 

601 window_max_timestamp: float = None, 

602 interpolate_bounds: bool = True, 

603 max_documents: int = None, 

604 collection=None, 

605 ) -> Self: 

606 

607 now = time.time() 

608 

609 req_signal = {} 

610 if min_timestamp is not None: 

611 req_signal.setdefault("timestamp", {}) 

612 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

613 if max_timestamp is not None: 

614 req_signal.setdefault("timestamp", {}) 

615 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

616 

617 if collection is None: 

618 collection = get_signal_collection(signal_id) 

619 if collection is None: 

620 return cls( 

621 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

622 ) 

623 

624 db_req_start = time.time() 

625 

626 sort_step = {"$sort": {"precise_timestamp": 1}} 

627 number_results = collection.count_documents(req_signal) 

628 

629 pipeline = [] 

630 if req_signal: 

631 pipeline.append({"$match": req_signal}) # Filter data if needed 

632 

633 pipeline.extend( 

634 [ 

635 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

636 sort_step, 

637 ] 

638 ) 

639 

640 if max_documents is not None and max_documents < number_results: 

641 unsampling_ratio = math.ceil(number_results / max_documents) 

642 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

643 pipeline.extend( 

644 [ 

645 { 

646 "$setWindowFields": { 

647 "sortBy": {"precise_timestamp": 1}, 

648 "output": {"index": {"$documentNumber": {}}}, 

649 } 

650 }, 

651 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

652 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

653 {"$replaceRoot": {"newRoot": "$doc"}}, 

654 {"$unset": ["index", "group_id"]}, 

655 {"$sort": {"precise_timestamp": 1}}, 

656 ] 

657 ) 

658 

659 # logger.info(f"pipeline: %s", str(pipeline)) 

660 cursor = collection.aggregate(pipeline) 

661 db_req_time = time.time() - db_req_start 

662 

663 init_time = time.time() 

664 

665 results = cursor.to_list() 

666 time_vector = [] 

667 values = [] 

668 forced_values = [] 

669 for s in results: 

670 time_vector.append(s["precise_timestamp"]) 

671 values.append(s.get("value", None)) 

672 forced_values.append(s.get("forced_value", None)) 

673 

674 signal = Signal.get_from_signal_id(signal_id) 

675 if signal is None: 

676 return cls( 

677 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

678 ) 

679 class_ = signal.signal_data_class 

680 

681 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

682 time_vector, values, forced_values = cls.interpolate_bounds( 

683 class_, 

684 collection, 

685 signal_id, 

686 time_vector, 

687 values, 

688 forced_values, 

689 window_min_timestamp, 

690 window_max_timestamp, 

691 ) 

692 

693 if values: 

694 # TODO: check below. a bit strange 

695 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

696 # Adding last value as it should be repeated 

697 time_vector.append(now) 

698 values.append(values[-1]) 

699 forced_values.append(forced_values[-1]) 

700 

701 init_time = time.time() - init_time 

702 

703 # See line 292 for explanation 

704 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

705 first_bucket = None 

706 if bucket is not None: 

707 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

708 if first_bucket is not None: 

709 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

710 else: 

711 data_start = None 

712 

713 last_bucket = None 

714 if bucket is not None: 

715 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

716 if last_bucket is not None: 

717 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

718 else: 

719 data_end = None 

720 

721 return class_( 

722 signal_id=signal_id, 

723 forcible=signal.forcible, 

724 time_vector=time_vector, 

725 values=values, 

726 forced_values=forced_values, 

727 data_start=data_start, 

728 data_end=data_end, 

729 number_samples=len(values), 

730 number_samples_db=number_results, 

731 db_query_time=db_req_time, 

732 init_time=init_time, 

733 ) 

734 

735 @staticmethod 

736 def interpolate_bounds( 

737 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

738 ): 

739 sample_right = None 

740 # Fetching right side value & interpolation 

741 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

742 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

743 sample_right = collection.find_one( 

744 { 

745 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

746 "value": {"$exists": True}, 

747 }, 

748 sort={"precise_timestamp": -1}, 

749 ) 

750 if sample_right: 

751 if time_vector: 

752 right_sd = class_( 

753 signal_id=signal_id, 

754 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

755 values=[values[-1], sample_right.get("value", None)], 

756 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

757 ) 

758 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

759 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

760 else: 

761 max_ts_value = sample_right.get("value", None) 

762 max_ts_forced_value = sample_right.get("forced_value", None) 

763 time_vector.append(window_max_timestamp) 

764 values.append(max_ts_value) 

765 forced_values.append(max_ts_forced_value) 

766 

767 # Fetching left side value & interpolation 

768 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

769 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

770 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

771 sample_left = sample_right 

772 sample_left = collection.find_one( 

773 { 

774 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

775 "value": {"$exists": True}, 

776 }, 

777 sort={"precise_timestamp": -1}, 

778 ) 

779 

780 if sample_left: 

781 if time_vector: 

782 left_sd = class_( 

783 signal_id=signal_id, 

784 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

785 values=[sample_left["value"], values[0]], 

786 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

787 ) 

788 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

789 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

790 else: 

791 min_ts_value = sample_left.get("value", None) 

792 min_ts_forced_value = sample_left.get("forced_value", None) 

793 time_vector.insert(0, window_min_timestamp) 

794 values.insert(0, min_ts_value) 

795 forced_values.insert(0, min_ts_forced_value) 

796 

797 return time_vector, values, forced_values 

798 

799 def interpolate_values(self, new_time_vector: list[float]): 

800 return self.interpolate(new_time_vector, self.values) 

801 

802 def interpolate_forced_values(self, new_time_vector: list[float]): 

803 return self.interpolate(new_time_vector, self.forced_values) 

804 

805 def uniform_desampling(self, number_samples_max: int) -> Self: 

806 data_processing_time = time.time() 

807 if number_samples_max and self.number_samples > number_samples_max: 

808 new_time_vector = npy.linspace( 

809 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

810 ).tolist() 

811 values = self.interpolate_values(new_time_vector) 

812 forced_values = self.interpolate_forced_values(new_time_vector) 

813 time_vector = new_time_vector 

814 number_samples = len(time_vector) 

815 else: 

816 time_vector = self.time_vector 

817 number_samples = len(self.values) 

818 values = self.values[:] 

819 forced_values = self.forced_values[:] 

820 data_processing_time = time.time() - data_processing_time 

821 

822 return self.__class__( 

823 signal_id=self.signal_id, 

824 time_vector=time_vector, 

825 values=values, 

826 forced_values=forced_values, 

827 number_samples=number_samples, 

828 number_samples_db=self.number_samples, 

829 data_start=self.data_start, 

830 data_end=self.data_end, 

831 db_query_time=self.db_query_time, 

832 init_time=self.init_time, 

833 data_processing_time=self.data_processing_time + data_processing_time, 

834 phase_id=self.phase_id, 

835 ) 

836 

837 def min_max_downsampling(self, number_samples_max: int) -> Self: 

838 return self.uniform_desampling(number_samples_max) 

839 

840 def interest_window_desampling( 

841 self, 

842 window_max_number_samples: int, 

843 outside_max_number_samples: int, 

844 window_min_timestamp: float | None = None, 

845 window_max_timestamp: float | None = None, 

846 ) -> Self: 

847 """Performs a sampling in a window of interest and outside.""" 

848 

849 if not self.time_vector: 

850 return self 

851 

852 if window_min_timestamp is None: 

853 window_min_timestamp = self.time_vector[0] 

854 if window_max_timestamp is None: 

855 window_max_timestamp = self.time_vector[-1] 

856 

857 data_processing_time = time.time() 

858 

859 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

860 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

861 

862 time_vector_before = self.time_vector[:index_window_start] 

863 time_vector_window = self.time_vector[index_window_start:index_window_end] 

864 time_vector_after = self.time_vector[index_window_end:] 

865 

866 # Resampling window 

867 if time_vector_window: 

868 # Ensurring window bounds 

869 if time_vector_window[0] != window_min_timestamp: 

870 time_vector_window.insert(0, window_min_timestamp) 

871 if time_vector_window[-1] != window_max_timestamp: 

872 time_vector_window.append(window_max_timestamp) 

873 else: 

874 time_vector_window = [window_min_timestamp, window_max_timestamp] 

875 

876 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

877 # Resampling 

878 new_window_time_vector = npy.linspace( 

879 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

880 ).tolist() 

881 time_vector_window = new_window_time_vector 

882 

883 # Resampling outside 

884 number_samples_before = len(time_vector_before) 

885 number_samples_after = len(time_vector_after) 

886 if ( 

887 outside_max_number_samples is not None 

888 and (number_samples_before + number_samples_after) > outside_max_number_samples 

889 ): 

890 new_number_samples_before = min( 

891 number_samples_before, 

892 math.ceil( 

893 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

894 ), 

895 ) 

896 new_number_samples_after = min( 

897 number_samples_after, 

898 math.ceil( 

899 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

900 ), 

901 ) 

902 # Adjusting numbers as math.ceil can do +1 on sum 

903 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

904 if new_number_samples_before > new_number_samples_after: 

905 new_number_samples_before -= 1 

906 else: 

907 new_number_samples_after -= 1 

908 

909 if new_number_samples_before > 0: 

910 new_time_vector_before = npy.linspace( 

911 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

912 ).tolist() 

913 time_vector_before = new_time_vector_before 

914 

915 if new_number_samples_after > 0: 

916 new_time_vector_after = npy.linspace( 

917 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

918 ).tolist()[::-1] 

919 time_vector_after = new_time_vector_after 

920 

921 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

922 values = self.interpolate_values(new_time_vector) 

923 forced_values = self.interpolate_forced_values(new_time_vector) 

924 number_samples = len(values) 

925 

926 data_processing_time = time.time() - data_processing_time 

927 

928 return self.__class__( 

929 signal_id=self.signal_id, 

930 forcible=self.forcible, 

931 time_vector=new_time_vector, 

932 values=values, 

933 forced_values=forced_values, 

934 number_samples=number_samples, 

935 number_samples_db=self.number_samples, 

936 data_start=self.data_start, 

937 data_end=self.data_end, 

938 db_query_time=self.db_query_time, 

939 init_time=self.init_time, 

940 data_processing_time=self.data_processing_time + data_processing_time, 

941 ) 

942 

943 def zero_time_vector(self, data_start: float): 

944 data_processing_time = time.time() 

945 if len(self.time_vector) == 0: 

946 return self 

947 time_vector = npy.array(self.time_vector) - data_start 

948 data_processing_time = time.time() - data_processing_time 

949 

950 return self.__class__( 

951 signal_id=self.signal_id, 

952 time_vector=time_vector, 

953 values=self.values, 

954 forced_values=self.forced_values, 

955 number_samples=self.number_samples, 

956 number_samples_db=self.number_samples_db, 

957 data_start=time_vector[0], 

958 data_end=time_vector[-1], 

959 db_query_time=self.db_query_time, 

960 init_time=self.init_time, 

961 data_processing_time=self.data_processing_time + data_processing_time, 

962 ) 

963 

964 def csv_export(self): 

965 output = io.StringIO() 

966 writer = csv.writer(output) 

967 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

968 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

969 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

970 return output.getvalue().encode("utf-8") 

971 

972 def prestoplot_export(self): 

973 clean_signal_id = self.signal_id.replace(".", "_") 

974 if clean_signal_id[0].isnumeric(): 

975 clean_signal_id = "_" + clean_signal_id 

976 

977 output = io.StringIO() 

978 output.write("# Encoding:\tUTF-8\n") 

979 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

980 output.write("ISO8601\tnone\tnone\n") 

981 output.write(f"# Description :\t{clean_signal_id}\n") 

982 

983 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

984 output.write( 

985 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

986 ) 

987 return output.getvalue().encode("utf-8") 

988 

989 

990class NumericSignalData(SignalData): 

991 data_type: str = "float" 

992 values: list[float | int | None] 

993 forced_values: list[float | int | None] 

994 

995 def interpolate(self, new_time_vector: list[float], items): 

996 items = [npy.nan if s is None else s for s in items] 

997 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

998 

999 def uniform_desampling(self, number_samples_max: int) -> Self: 

1000 data_processing_time = time.time() 

1001 if number_samples_max and self.number_samples > number_samples_max: 

1002 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

1003 forced_values = self.interpolate_forced_values(time_vector) 

1004 number_samples = len(time_vector) 

1005 else: 

1006 time_vector = self.time_vector 

1007 number_samples = len(self.values) 

1008 values = self.values[:] 

1009 forced_values = self.forced_values[:] 

1010 data_processing_time = time.time() - data_processing_time 

1011 

1012 return self.__class__( 

1013 signal_id=self.signal_id, 

1014 time_vector=time_vector, 

1015 values=values, 

1016 forced_values=forced_values, 

1017 number_samples=number_samples, 

1018 number_samples_db=self.number_samples, 

1019 data_start=self.data_start, 

1020 data_end=self.data_end, 

1021 db_query_time=self.db_query_time, 

1022 init_time=self.init_time, 

1023 data_processing_time=self.data_processing_time + data_processing_time, 

1024 ) 

1025 

1026 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1027 if self.number_samples < number_samples_max: 

1028 return self 

1029 

1030 data_processing_time = time.time() 

1031 

1032 number_bins = number_samples_max // 2 

1033 

1034 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

1035 values = npy.array(self.values, dtype=npy.float64) 

1036 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1037 

1038 points_per_bin = self.number_samples // number_bins 

1039 

1040 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1041 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1042 if self.number_samples - number_bins * points_per_bin > 1: 

1043 points_per_bin += 1 

1044 number_bins = self.number_samples // points_per_bin + 1 

1045 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1046 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1047 values = npy.concatenate([values, nan_points_to_add]) 

1048 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1049 

1050 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1051 values_matrix = values.reshape(number_bins, points_per_bin) 

1052 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1053 

1054 indexes_min = npy.zeros(number_bins, dtype="int64") 

1055 indexes_max = npy.zeros(number_bins, dtype="int64") 

1056 

1057 for row in range(number_bins): 

1058 min_value = values_matrix[row, 0] 

1059 max_value = values_matrix[row, 0] 

1060 for column in range(points_per_bin): 

1061 if values_matrix[row, column] < min_value: 

1062 min_value = values_matrix[row, column] 

1063 indexes_min[row] = column 

1064 elif values_matrix[row, column] > max_value: 

1065 max_value = values_matrix[row, column] 

1066 indexes_max[row] = column 

1067 

1068 row_index = npy.repeat(npy.arange(number_bins), 2) 

1069 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1070 

1071 data_processing_time = time.time() - data_processing_time 

1072 

1073 new_time_vector = timestamps_matrix[row_index, column_index] 

1074 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1075 new_values = values_matrix[row_index, column_index] 

1076 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1077 new_forced_values = forced_values_matrix[row_index, column_index] 

1078 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1079 

1080 # Make sure there are no None values for the time vector 

1081 time_vector_filter = new_time_vector != None 

1082 new_time_vector = new_time_vector[time_vector_filter] 

1083 new_values = new_values[time_vector_filter] 

1084 new_forced_values = new_forced_values[time_vector_filter] 

1085 

1086 return self.__class__( 

1087 signal_id=self.signal_id, 

1088 time_vector=new_time_vector, 

1089 values=new_values, 

1090 forced_values=new_forced_values, 

1091 number_samples=number_bins * 2, 

1092 number_samples_db=self.number_samples_db, 

1093 data_start=self.data_start, 

1094 data_end=self.data_end, 

1095 db_query_time=self.db_query_time, 

1096 init_time=self.init_time, 

1097 data_processing_time=self.data_processing_time + data_processing_time, 

1098 phase_id=self.phase_id, 

1099 ) 

1100 

1101 def interest_window_desampling( 

1102 self, 

1103 window_max_number_samples: int, 

1104 outside_max_number_samples: int, 

1105 window_min_timestamp: float | None = None, 

1106 window_max_timestamp: float | None = None, 

1107 ) -> Self: 

1108 """Performs a sampling in a window of interest and outside.""" 

1109 

1110 if not self.time_vector: 

1111 return self 

1112 

1113 if window_min_timestamp is None: 

1114 window_min_timestamp = self.time_vector[0] 

1115 if window_max_timestamp is None: 

1116 window_max_timestamp = self.time_vector[-1] 

1117 

1118 data_processing_time = time.time() 

1119 

1120 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1121 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1122 

1123 time_vector_before = self.time_vector[:index_window_start] 

1124 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1125 time_vector_after = self.time_vector[index_window_end:] 

1126 

1127 values_before = self.values[:index_window_start] 

1128 values_window = self.values[index_window_start:index_window_end] 

1129 values_after = self.values[index_window_end:] 

1130 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1131 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1132 

1133 # Resampling window 

1134 if time_vector_window: 

1135 # Ensurring window bounds 

1136 if time_vector_window[0] != window_min_timestamp: 

1137 time_vector_window.insert(0, window_min_timestamp) 

1138 values_window.insert(0, window_min_value) 

1139 if time_vector_window[-1] != window_max_timestamp: 

1140 time_vector_window.append(window_max_timestamp) 

1141 values_window.append(window_max_value) 

1142 else: 

1143 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1144 values_window = [window_min_value, window_max_value] 

1145 

1146 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1147 # Resampling 

1148 time_vector_window, values_window = downsample_list( 

1149 time_vector_window, values_window, window_max_number_samples 

1150 ) 

1151 

1152 # Resampling outside 

1153 number_samples_before = len(time_vector_before) 

1154 number_samples_after = len(time_vector_after) 

1155 if ( 

1156 outside_max_number_samples is not None 

1157 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1158 ): 

1159 new_number_samples_before = min( 

1160 number_samples_before, 

1161 math.ceil( 

1162 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1163 ), 

1164 ) 

1165 new_number_samples_after = min( 

1166 number_samples_after, 

1167 math.ceil( 

1168 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1169 ), 

1170 ) 

1171 # Adjusting numbers as math.ceil can do +1 on sum 

1172 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1173 if new_number_samples_before > new_number_samples_after: 

1174 new_number_samples_before -= 1 

1175 else: 

1176 new_number_samples_after -= 1 

1177 

1178 if new_number_samples_before > 0: 

1179 time_vector_before, values_before = downsample_list( 

1180 time_vector_before, values_before, new_number_samples_before 

1181 ) 

1182 

1183 if new_number_samples_after > 0: 

1184 time_vector_after, values_after = downsample_list( 

1185 time_vector_after, values_after, new_number_samples_after 

1186 ) 

1187 

1188 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1189 values = values_before + values_window + values_after 

1190 forced_values = self.interpolate_forced_values(new_time_vector) 

1191 number_samples = len(values) 

1192 

1193 data_processing_time = time.time() - data_processing_time 

1194 

1195 return self.__class__( 

1196 signal_id=self.signal_id, 

1197 time_vector=new_time_vector, 

1198 values=values, 

1199 forced_values=forced_values, 

1200 number_samples=number_samples, 

1201 number_samples_db=self.number_samples, 

1202 data_start=self.data_start, 

1203 data_end=self.data_end, 

1204 db_query_time=self.db_query_time, 

1205 init_time=self.init_time, 

1206 data_processing_time=self.data_processing_time + data_processing_time, 

1207 ) 

1208 

1209 

1210class StringSignalData(SignalData): 

1211 data_type: str = "str" 

1212 values: list[str | None] 

1213 forced_values: list[str | None] 

1214 

1215 def interpolate(self, new_time_vector: list[float], items): 

1216 # Find the indices of the values in xp that are just smaller or equal to x 

1217 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1218 indices = npy.clip(indices, 0, len(items) - 1) 

1219 # Return the corresponding left string values from fp 

1220 return [items[i] for i in indices] 

1221 

1222 

1223class SignalsData(TwinPadModel): 

1224 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1225 data_processing_time: float 

1226 data_start: float | None 

1227 data_end: float | None 

1228 

1229 @classmethod 

1230 def get_from_signal_ids( 

1231 cls, 

1232 signal_ids: list[str], 

1233 min_timestamp: float = None, 

1234 max_timestamp: float = None, 

1235 window_min_timestamp: float = None, 

1236 window_max_timestamp: float = None, 

1237 interpolate_bounds: bool = True, 

1238 max_documents: int = None, 

1239 ) -> Self: 

1240 signals_data = [] 

1241 data_start = None 

1242 data_end = None 

1243 if max_timestamp is None: 

1244 max_timestamp = time.time() 

1245 data_processing_time = 0.0 

1246 

1247 signal_collections = get_signal_collections_batch(signal_ids) 

1248 

1249 for signal_id, collection in zip(signal_ids, signal_collections): 

1250 signal_data = SignalData.get_from_signal_id( 

1251 signal_id=signal_id, 

1252 min_timestamp=min_timestamp, 

1253 max_timestamp=max_timestamp, 

1254 window_min_timestamp=window_min_timestamp, 

1255 window_max_timestamp=window_max_timestamp, 

1256 interpolate_bounds=interpolate_bounds, 

1257 max_documents=max_documents, 

1258 collection=collection, 

1259 ) 

1260 data_processing_time += signal_data.data_processing_time 

1261 signals_data.append(signal_data) 

1262 if signal_data.data_start is not None: 

1263 if data_start is None: 

1264 data_start = signal_data.data_start 

1265 else: 

1266 data_start = min(signal_data.data_start, data_start) 

1267 if signal_data.data_end is not None: 

1268 if data_end is None: 

1269 data_end = signal_data.data_end 

1270 else: 

1271 data_end = max(signal_data.data_end, data_end) 

1272 

1273 return cls( 

1274 signals_data=signals_data, 

1275 data_processing_time=data_processing_time, 

1276 data_start=data_start, 

1277 data_end=data_end, 

1278 ) 

1279 

1280 @classmethod 

1281 def get_from_phase_and_signal_ids( 

1282 cls, 

1283 phases: list, 

1284 phase_sync_times: list[float | None], 

1285 signal_ids: list[str], 

1286 window_min_timestamps: list[float | None], 

1287 window_max_timestamps: list[float | None], 

1288 zero_time_vector: bool = True, 

1289 ): 

1290 signals_data: list[SignalData] = [] 

1291 computation_start = time.time() 

1292 

1293 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip( 

1294 phases, phase_sync_times, window_min_timestamps, window_max_timestamps 

1295 ): 

1296 min_timestamp = phase.start_at / 1000 

1297 max_timestamp = phase.end_at / 1000 

1298 

1299 if sync_time is None: 

1300 sync_time = min_timestamp 

1301 

1302 if window_max_timestamp is not None and window_min_timestamp is not None: 

1303 window_length = window_max_timestamp - window_min_timestamp 

1304 

1305 if window_min_timestamp != min_timestamp: 

1306 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20) 

1307 if window_max_timestamp != max_timestamp: 

1308 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20) 

1309 

1310 signal_collections = get_signal_collections_batch(signal_ids) 

1311 

1312 for signal_id, collection in zip(signal_ids, signal_collections): 

1313 signal_data = SignalData.get_from_signal_id( 

1314 signal_id, 

1315 min_timestamp, 

1316 max_timestamp, 

1317 window_min_timestamp, 

1318 window_max_timestamp, 

1319 interpolate_bounds=False, 

1320 max_documents=None, 

1321 collection=collection, 

1322 ) 

1323 

1324 if len(signal_data.time_vector) == 0: 

1325 continue 

1326 

1327 if zero_time_vector: 

1328 signal_data = signal_data.zero_time_vector(sync_time) 

1329 signal_data.phase_id = phase.id 

1330 

1331 signals_data.append(signal_data) 

1332 

1333 return cls( 

1334 signals_data=signals_data, 

1335 data_processing_time=time.time() - computation_start, 

1336 data_start=0, 

1337 data_end=0, 

1338 ) 

1339 

1340 def uniform_desampling(self, number_samples_max: int) -> Self: 

1341 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1342 return SignalsData( 

1343 signals_data=signals_data, 

1344 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1345 data_start=self.data_start, 

1346 data_end=self.data_end, 

1347 ) 

1348 

1349 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1350 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1351 return SignalsData( 

1352 signals_data=signals_data, 

1353 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1354 data_start=self.data_start, 

1355 data_end=self.data_end, 

1356 ) 

1357 

1358 def interest_window_desampling( 

1359 self, 

1360 window_max_number_samples: int, 

1361 outside_max_number_samples: int, 

1362 window_min_timestamp: float = None, 

1363 window_max_timestamp: float = None, 

1364 ) -> Self: 

1365 signals_data = [ 

1366 s.interest_window_desampling( 

1367 window_max_number_samples=window_max_number_samples, 

1368 outside_max_number_samples=outside_max_number_samples, 

1369 window_min_timestamp=window_min_timestamp, 

1370 window_max_timestamp=window_max_timestamp, 

1371 ) 

1372 for s in self.signals_data 

1373 ] 

1374 

1375 return SignalsData( 

1376 signals_data=signals_data, 

1377 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1378 data_start=self.data_start, 

1379 data_end=self.data_end, 

1380 ) 

1381 

1382 def zero_time_vector(self, data_start: float): 

1383 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data] 

1384 return SignalsData( 

1385 signals_data=signals_data, 

1386 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1387 data_start=0, 

1388 data_end=max([s.data_end for s in signals_data]), 

1389 ) 

1390 

1391 @classmethod 

1392 async def apply_single_function( 

1393 cls, 

1394 phase, 

1395 base_signal_id: str, 

1396 function: SINGLE_POST_PROCESSING_FUNCTION, 

1397 window_min_timestamp: float = None, 

1398 window_max_timestamp: float = None, 

1399 ): 

1400 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}" 

1401 

1402 processed_result_signal = Signal.get_from_signal_id(signal_id) 

1403 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids: 

1404 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp) 

1405 

1406 signals_data = cls.get_from_phase_and_signal_ids( 

1407 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False 

1408 ) 

1409 

1410 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0: 

1411 return None 

1412 

1413 new_values = None 

1414 new_forced_values = None 

1415 time_vector = npy.array(signals_data.signals_data[0].time_vector) 

1416 values = signals_data.signals_data[0].values 

1417 forced_values = signals_data.signals_data[0].forced_values 

1418 

1419 match (function): 

1420 case "Cumul": 

1421 new_values = cumul(values) 

1422 new_forced_values = cumul(forced_values) 

1423 # case "CumulDistrib": 

1424 # new_values = cumul_distrib(values) 

1425 # new_forced_values = cumul_distrib(forced_values) 

1426 case "Delta": 

1427 new_values = delta(values) 

1428 new_forced_values = delta(forced_values) 

1429 case "DeltaT": 

1430 new_values = delta(time_vector) 

1431 new_forced_values = new_values 

1432 case "Derive": 

1433 new_values = derive(time_vector, values) 

1434 new_forced_values = derive(time_vector, forced_values) 

1435 case "Integ": 

1436 new_values = integ(time_vector, values) 

1437 new_forced_values = integ(time_vector, forced_values) 

1438 

1439 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1440 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1441 

1442 loop = asyncio.get_running_loop() 

1443 loop.create_task( 

1444 cls.save_function_signal( 

1445 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible 

1446 ) 

1447 ) 

1448 

1449 if window_max_timestamp is not None: 

1450 max_timestamp_mask = time_vector <= window_max_timestamp 

1451 time_vector = time_vector[max_timestamp_mask] 

1452 new_values = new_values[max_timestamp_mask] 

1453 new_forced_values = new_forced_values[max_timestamp_mask] 

1454 if window_min_timestamp is not None: 

1455 min_timestamp_mask = time_vector >= window_min_timestamp 

1456 time_vector = time_vector[min_timestamp_mask] 

1457 new_values = new_values[min_timestamp_mask] 

1458 new_forced_values = new_forced_values[min_timestamp_mask] 

1459 

1460 signals_data.signals_data[0].time_vector = time_vector.tolist() 

1461 signals_data.signals_data[0].values = new_values.tolist() 

1462 signals_data.signals_data[0].forced_values = new_forced_values.tolist() 

1463 signals_data.signals_data[0].number_samples = time_vector.size 

1464 

1465 signals_data.signals_data[0].signal_id = signal_id 

1466 

1467 return signals_data 

1468 

1469 @classmethod 

1470 async def apply_multiple_function( 

1471 cls, 

1472 phases: list, 

1473 signal_ids: list, 

1474 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

1475 window_min_timestamp: float = None, 

1476 window_max_timestamp: float = None, 

1477 ): 

1478 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION): 

1479 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}" 

1480 else: 

1481 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}" 

1482 

1483 active_phase = phases[0] 

1484 if function in {"Align-X", "Using-X"}: 

1485 active_phase = phases[1] 

1486 

1487 processed_result_signal = Signal.get_from_signal_id(function_signal_id) 

1488 if processed_result_signal is not None and ( 

1489 active_phase.id in processed_result_signal.computed_phases_ids 

1490 ): # If signal has been computed for the correct phase 

1491 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp) 

1492 

1493 array_length = None 

1494 time_vector_list = [] 

1495 values_list = [] 

1496 forced_values_list = [] 

1497 forcible = True 

1498 for phase, signal_id in zip(phases, signal_ids): 

1499 signals_data = cls.get_from_phase_and_signal_ids( 

1500 [phase], [None], [signal_id], [None], [None], zero_time_vector=False 

1501 ) 

1502 

1503 if len(signals_data.signals_data) == 0: 

1504 return None 

1505 

1506 signal_data = signals_data.signals_data[0] 

1507 

1508 if array_length is None: 

1509 array_length = signal_data.number_samples 

1510 if ( 

1511 array_length != signal_data.number_samples and function != "Align-X" 

1512 ) or signal_data.number_samples == 0: 

1513 return None 

1514 

1515 time_vector_list.append(npy.array(signal_data.time_vector)) 

1516 values_list.append(npy.array(signal_data.values, dtype=npy.float64)) 

1517 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64)) 

1518 forcible = forcible and signal_data.forcible 

1519 

1520 time_vector = time_vector_list[0] 

1521 new_values = None 

1522 new_forced_values = None 

1523 

1524 match (function): 

1525 case "Align-X": 

1526 time_vector = time_vector_list[1] 

1527 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000 

1528 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000 

1529 new_values = align_x(old_time_vector, values_list[0], new_time_vector) 

1530 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector) 

1531 # case "Atan2": 

1532 # new_values = atan2(values_list[0], values_list[1]) 

1533 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1]) 

1534 case "Using-X": 

1535 if len(time_vector_list[0]) != len(time_vector_list[1]): 

1536 return None 

1537 time_vector = time_vector_list[1] 

1538 new_values = values_list[0] 

1539 new_forced_values = forced_values_list[0] 

1540 case "Mean": 

1541 new_values = mean(*values_list) 

1542 new_forced_values = mean(*forced_values_list) 

1543 case "Norm": 

1544 new_values = norm(*values_list) 

1545 new_forced_values = norm(*forced_values_list) 

1546 

1547 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1548 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1549 

1550 loop = asyncio.get_running_loop() 

1551 loop.create_task( 

1552 cls.save_function_signal( 

1553 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible 

1554 ) 

1555 ) 

1556 

1557 total_number_samples = time_vector.size 

1558 

1559 if window_max_timestamp is not None: 

1560 max_timestamp_mask = time_vector <= window_max_timestamp 

1561 time_vector = time_vector[max_timestamp_mask] 

1562 new_values = new_values[max_timestamp_mask] 

1563 new_forced_values = new_forced_values[max_timestamp_mask] 

1564 if window_min_timestamp is not None: 

1565 min_timestamp_mask = time_vector >= window_min_timestamp 

1566 time_vector = time_vector[min_timestamp_mask] 

1567 new_values = new_values[min_timestamp_mask] 

1568 new_forced_values = new_forced_values[min_timestamp_mask] 

1569 

1570 signals_data = cls( 

1571 signals_data=[ 

1572 NumericSignalData( 

1573 signal_id=function_signal_id, 

1574 forcible=forcible, 

1575 time_vector=time_vector.tolist(), 

1576 values=new_values.tolist(), 

1577 forced_values=new_forced_values.tolist(), 

1578 number_samples=time_vector.size, 

1579 number_samples_db=total_number_samples, 

1580 ) 

1581 ], 

1582 data_processing_time=0, 

1583 data_start=0, 

1584 data_end=0, 

1585 ) 

1586 

1587 return signals_data 

1588 

1589 @classmethod 

1590 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float): 

1591 signal_data_collection = get_signal_collection(signal_id, create=True) 

1592 pipeline = [] 

1593 match_filter = {} 

1594 if window_min_timestamp is not None or window_max_timestamp is not None: 

1595 match_filter["$match"] = {} 

1596 match_filter["$match"]["precise_timestamp"] = {} 

1597 if window_max_timestamp is not None: 

1598 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp 

1599 if window_min_timestamp is not None: 

1600 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp 

1601 

1602 total_number_samples = signal_data_collection.count_documents({}) 

1603 

1604 if match_filter: 

1605 pipeline.append(match_filter) 

1606 

1607 fetch_start = time.time() 

1608 

1609 samples = signal_data_collection.aggregate(pipeline).to_list() 

1610 new_time_vector = [] 

1611 new_values = [] 

1612 new_forced_values = [] 

1613 for sample in samples: 

1614 new_time_vector.append(sample["precise_timestamp"]) 

1615 new_values.append(sample["value"]) 

1616 new_forced_values.append(sample["forced_value"]) 

1617 

1618 return cls( 

1619 signals_data=[ 

1620 NumericSignalData( 

1621 signal_id=signal_id, 

1622 time_vector=new_time_vector, 

1623 values=new_values, 

1624 forced_values=new_forced_values, 

1625 number_samples=len(new_time_vector), 

1626 number_samples_db=total_number_samples, 

1627 ) 

1628 ], 

1629 data_processing_time=time.time() - fetch_start, 

1630 data_start=0, 

1631 data_end=0, 

1632 ) 

1633 

1634 @classmethod 

1635 async def save_function_signal( 

1636 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool 

1637 ): 

1638 # Insert data first so if it is requested by another user, it will be computed again 

1639 signal_collection = get_signal_collection(function_signal_id, create=True) 

1640 signal_collection.delete_many( 

1641 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}} 

1642 ) 

1643 signal_collection.insert_many( 

1644 [ 

1645 { 

1646 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]), 

1647 "precise_timestamp": time_vector[i], 

1648 "value": new_values[i], 

1649 "forced_value": new_forced_values[i], 

1650 } 

1651 for i in range(len(time_vector)) 

1652 ] 

1653 ) 

1654 

1655 signals_config_collection = get_collection(systems_database, "signals", create=True) 

1656 signals_config_collection.find_one_and_update( 

1657 {"signal_id": function_signal_id}, 

1658 { 

1659 "$set": { 

1660 "description": "", 

1661 "unit": None, 

1662 "type": "sensor", 

1663 "address": None, 

1664 "frequency": 1 / max(npy.mean(delta(time_vector)), 1), # avoid division by 0 

1665 "transfer_function": None, 

1666 "precision_digits": None, 

1667 "digitization_function": None, 

1668 "data_type": "float", 

1669 "formula": None, 

1670 "forcible": forcible, 

1671 "commandable": False, 

1672 "broadcastable": True, 

1673 "signal_id": function_signal_id, 

1674 "post_processing": True, 

1675 }, 

1676 "$push": {"computed_phases_ids": phase.id}, 

1677 }, 

1678 upsert=True, 

1679 ) 

1680 

1681 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1682 if post_processing: 

1683 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1684 zip_buffer = io.BytesIO() 

1685 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1686 for signal_data in self.signals_data: 

1687 file_name = signal_data.signal_id 

1688 if post_processing: 

1689 phase = phases_by_id.get( 

1690 signal_data.phase_id, 

1691 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1692 ) 

1693 file_name = f"{signal_data.signal_id} ({phase.name})" 

1694 if file_format == "csv": 

1695 export_io = signal_data.csv_export() 

1696 zip_file.writestr(f"{file_name}.csv", export_io) 

1697 elif file_format == "prestoplot": 

1698 export_io = signal_data.prestoplot_export() 

1699 zip_file.writestr(f"{file_name}.tab", export_io) 

1700 else: 

1701 raise ValueError(f"Format not found. Got: {file_format}") 

1702 zip_bytes = zip_buffer.getvalue() 

1703 return zip_bytes 

1704 

1705 def hdf5_export(self, post_processing: bool = False, phase_ids: list = []): 

1706 if post_processing: 

1707 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1708 hdf5_buffer = io.BytesIO() 

1709 custom_type_float = npy.dtype( 

1710 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1711 ) 

1712 custom_type_string = npy.dtype( 

1713 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1714 ) 

1715 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1716 for signal_data in self.signals_data: 

1717 if post_processing: 

1718 phase = phases_by_id.get( 

1719 signal_data.phase_id, 

1720 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1721 ) 

1722 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})") 

1723 else: 

1724 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1725 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1726 if signal_data.data_type == "str": 

1727 export_data = npy.array( 

1728 list( 

1729 zip( 

1730 date_vector, 

1731 signal_data.time_vector, 

1732 signal_data.values, 

1733 signal_data.forced_values, 

1734 ) 

1735 ), 

1736 dtype=custom_type_string, 

1737 ) 

1738 else: 

1739 export_data = npy.array( 

1740 list( 

1741 zip( 

1742 date_vector, 

1743 signal_data.time_vector, 

1744 signal_data.values, 

1745 signal_data.forced_values, 

1746 ) 

1747 ), 

1748 dtype=custom_type_float, 

1749 ) 

1750 signal_group["data"] = export_data 

1751 return hdf5_buffer.getvalue() 

1752 

1753 

1754class SignalStatus(TwinPadModel): 

1755 status: str = "down" 

1756 reason: str = "" 

1757 delay: float | None = None 

1758 

1759 

1760class DigitizationFunction(TwinPadModel): 

1761 bits: int | None = None 

1762 min_value: float 

1763 max_value: float 

1764 min_raw_value: float 

1765 max_raw_value: float 

1766 

1767 

1768class SignalUpdate(TwinPadModel): 

1769 value: float | str | bool | int | None = None 

1770 forced_value: float | str | bool | int | None = None 

1771 timestamp: int | None = None 

1772 

1773 

1774class SignalType(str, Enum): 

1775 command = "command" 

1776 sensor = "sensor" 

1777 external_sensor = "external_sensor" 

1778 

1779 

1780SIGNALDATA_TYPES = { 

1781 "int": NumericSignalData, 

1782 "float": NumericSignalData, 

1783 "str": StringSignalData, 

1784 "bool": NumericSignalData, 

1785 "epoch": NumericSignalData, 

1786} 

1787 

1788 

1789class Signal(GenericMongo): 

1790 collection_name: ClassVar[str] = "signals" 

1791 

1792 signal_id: str 

1793 frequency: float 

1794 unit: str | None 

1795 description: str 

1796 type: SignalType 

1797 data_type: str 

1798 precision_digits: int | None 

1799 forcible: bool 

1800 commandable: bool 

1801 broadcastable: bool 

1802 status: SignalStatus = SignalStatus() 

1803 

1804 post_processing: bool = False 

1805 computed_phases_ids: list[str] = [] 

1806 

1807 digitization_function: DigitizationFunction | None 

1808 

1809 @property 

1810 def device(self) -> Device: 

1811 device_id = self.signal_id.split(".")[0] 

1812 device = Device.get_one_by_attribute("device_id", device_id) 

1813 return device 

1814 

1815 @cached_property 

1816 def signal_data_class(self): 

1817 if self.data_type in SIGNALDATA_TYPES: 

1818 return SIGNALDATA_TYPES[self.data_type] 

1819 if self.data_type.startswith("enum"): 

1820 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1821 raise ValueError(f"Unhandled python type: {self.data_type}") 

1822 

1823 @cached_property 

1824 def python_type(self): 

1825 if self.data_type in TYPES: 

1826 return TYPES[self.data_type] 

1827 if self.data_type.startswith("enum"): 

1828 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1829 return Literal[*choices] 

1830 raise ValueError(f"Unhandled python type: {self.data_type}") 

1831 

1832 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1833 command = Command( 

1834 sent_at=time.time(), 

1835 command_type="Signal command", 

1836 user_id=current_user.id, 

1837 ) 

1838 

1839 has_input_error = False 

1840 error_message = "" 

1841 

1842 if self.data_type.startswith("enum"): 

1843 enum_options = get_args(self.python_type) 

1844 

1845 if update_dict.value is not None and update_dict.value not in enum_options: 

1846 has_input_error = True 

1847 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1848 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1849 has_input_error = True 

1850 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1851 else: 

1852 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1853 has_input_error = True 

1854 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1855 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1856 has_input_error = True 

1857 error_message += ( 

1858 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1859 ) 

1860 

1861 if has_input_error: 

1862 command.response_time = 0 

1863 command.succeeded = False 

1864 command.description = f"Tried to modify signal {self.signal_id}" 

1865 response = {"error": True, "status_code": 400, "message": error_message} 

1866 else: 

1867 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict) 

1868 command.receive_response(response) 

1869 

1870 Command.create(command) 

1871 return response 

1872 

1873 @classmethod 

1874 def get_from_signal_id(cls, signal_id) -> Self: 

1875 """Could be generic from mongo""" 

1876 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1877 if not raw_value: 

1878 return None 

1879 del raw_value["_id"] 

1880 return cls.dict_to_object(raw_value) 

1881 

1882 @classmethod 

1883 def get_all_ids(cls) -> list[str]: 

1884 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1885 

1886 return [signal["signal_id"] for signal in cursor] 

1887 

1888 @classmethod 

1889 def get_all_statuses(cls) -> list[dict]: 

1890 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1891 

1892 return [ 

1893 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1894 for signal in cursor 

1895 ] 

1896 

1897 async def number_samples(self): 

1898 collection = get_signal_collection(signal_id=self.signal_id) 

1899 if collection is None: 

1900 return 0 

1901 

1902 number_samples = collection.estimated_document_count() 

1903 

1904 number_samples_async_collection = await get_async_collection( 

1905 systems_async_database, "number_samples", create=True, time_series=True 

1906 ) 

1907 

1908 loop = asyncio.get_running_loop() 

1909 loop.create_task( 

1910 number_samples_async_collection.insert_one( 

1911 { 

1912 "timestamp": datetime.datetime.now(pytz.UTC), 

1913 "signal_id": self.signal_id, 

1914 "number_samples": number_samples, 

1915 } 

1916 ) 

1917 ) 

1918 

1919 return number_samples 

1920 

1921 @classmethod 

1922 def total_number_samples(cls) -> int: 

1923 TwinPadActivity.get_number_samples_timeframe(0, 0, False) 

1924 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1925 

1926 if number_samples_collection is None: 

1927 return 0 

1928 

1929 result = number_samples_collection.aggregate( 

1930 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}] 

1931 ) 

1932 

1933 result = result.to_list() 

1934 if len(result) == 0: 

1935 return 0 

1936 return result[0]["amount"] 

1937 

1938 def sample_datasize(self): 

1939 return signals_database.command("collstats", self.signal_id)["size"] 

1940 

1941 @classmethod 

1942 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1943 result = cls.collection().aggregate( 

1944 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1945 ) 

1946 

1947 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1948 

1949 

1950class ForcedSignal(GenericMongo): 

1951 collection_name: ClassVar[str] = "forced_signals" 

1952 

1953 signal_id: str 

1954 forcing_user_id: str 

1955 forced_at: float 

1956 value: str | float 

1957 

1958 def insert(self): 

1959 insert_result = self.collection().find_one_and_update( 

1960 {"signal_id": self.signal_id}, 

1961 {"$set": self.to_dict(exclude={"id"})}, 

1962 upsert=True, 

1963 return_document=ReturnDocument.AFTER, 

1964 ) 

1965 self.id = str(insert_result["_id"]) 

1966 return self.id 

1967 

1968 @classmethod 

1969 def can_force(cls, signal_id: str, current_user: User) -> bool: 

1970 """Checks whether user can force a given signal. 

1971 

1972 :param signal_id: Signal ID of the signal to force 

1973 :type signal_id: str 

1974 :param current_user: Current user 

1975 :type current_user: User 

1976 :return: False if the signal was forced by someone else than the user, True otherwise 

1977 :rtype: bool 

1978 """ 

1979 forced_signal = cls.get_one_by_attribute("signal_id", signal_id) 

1980 if forced_signal is not None: 

1981 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

1982 return False 

1983 return True 

1984 

1985 

1986class ServicesStatus(TwinPadModel): 

1987 backend: str 

1988 cloud_broker: str 

1989 time_series_database: str 

1990 signal_storage: str 

1991 heartbeat_storage: str 

1992 data_analyzer: str 

1993 

1994 @classmethod 

1995 def check(cls) -> Self: 

1996 return cls( 

1997 cloud_broker=ping(RABBITMQ_HOST), 

1998 backend="up", 

1999 time_series_database=ping(MONGO_HOST), 

2000 signal_storage=ping(SIGNAL_STORAGE_HOST), 

2001 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

2002 data_analyzer=ping(DATA_ANALYZER_HOST), 

2003 ) 

2004 

2005 

2006def ping(host): 

2007 try: 

2008 if ping3.ping(host, timeout=0.8): 

2009 return "up" 

2010 except PermissionError: 

2011 pass 

2012 return "down" 

2013 

2014 

2015class Event(GenericMongo): 

2016 collection_name: ClassVar[str] = "events" 

2017 

2018 name: str 

2019 timestamp: float 

2020 event_rule_id: str 

2021 

2022 @computed_field 

2023 @cached_property 

2024 def event_rule(self) -> "EventRule": 

2025 return EventRule.get_from_id(self.event_rule_id) 

2026 

2027 @classmethod 

2028 def dict_to_object(cls, dict_): 

2029 """Refine to convert timestamp to datetime for mongodb.""" 

2030 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

2031 return super().dict_to_object(dict_) 

2032 

2033 

2034class TwinPadActivity(GenericMongo): 

2035 timestamp: float 

2036 amount: int 

2037 

2038 @classmethod 

2039 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

2040 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2041 number_events_collection = get_collection(systems_database, "number_events") 

2042 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

2043 items = [] 

2044 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2045 if number_events_collection is None or recompute_amount: 

2046 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

2047 number_events_collection.delete_many({}) 

2048 first_event = events_collection.find_one(sort={"timestamp": 1}) 

2049 if first_event is None: 

2050 return items 

2051 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

2052 tzinfo=pytz.UTC 

2053 ) 

2054 while last_computed_day < TODAY: 

2055 day_nb_events = events_collection.count_documents( 

2056 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2057 ) 

2058 if day_nb_events > 0: 

2059 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

2060 last_computed_day += ONE_DAY_OFFSET 

2061 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2062 if number_events_today > 0: 

2063 number_events_collection.delete_many({"timestamp": TODAY}) 

2064 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

2065 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2066 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2067 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2068 for day in number_events: 

2069 day["timestamp"] = day["timestamp"].timestamp() 

2070 items.append(cls.mongo_dict_to_object(day)) 

2071 return items 

2072 

2073 @classmethod 

2074 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2075 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2076 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

2077 signals_number_samples_collection = get_collection( 

2078 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

2079 ) 

2080 items = [] 

2081 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2082 if number_samples_collection is None or recompute_amount: 

2083 number_samples_collection = get_collection( 

2084 systems_database, "number_received_samples", create=True, time_series=True 

2085 ) 

2086 number_samples_collection.delete_many({}) 

2087 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

2088 if first_sample is None: 

2089 return items 

2090 # compute from day of first found event 

2091 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

2092 tzinfo=pytz.UTC 

2093 ) 

2094 while last_computed_day < TODAY: 

2095 number_samples_request = signals_number_samples_collection.aggregate( 

2096 [ 

2097 { 

2098 "$match": { 

2099 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

2100 } 

2101 }, 

2102 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2103 ] 

2104 ).to_list() 

2105 if len(number_samples_request) == 0: 

2106 number_samples = 0 

2107 else: 

2108 number_samples = number_samples_request[0].get("number_samples", 0) 

2109 if number_samples > 0: 

2110 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

2111 last_computed_day += ONE_DAY_OFFSET 

2112 number_samples_request = signals_number_samples_collection.aggregate( 

2113 [ 

2114 {"$match": {"timestamp": {"$gte": TODAY}}}, 

2115 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2116 ] 

2117 ).to_list() 

2118 if len(number_samples_request) == 0: 

2119 number_samples_today = 0 

2120 else: 

2121 number_samples_today = number_samples_request[0].get("number_samples", 0) 

2122 if number_samples_today > 0: 

2123 number_samples_collection.delete_many({"timestamp": TODAY}) 

2124 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

2125 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2126 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2127 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2128 for day in number_events: 

2129 day["timestamp"] = day["timestamp"].timestamp() 

2130 items.append(cls.mongo_dict_to_object(day)) 

2131 return items 

2132 

2133 @classmethod 

2134 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2135 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2136 number_commands_collection = get_collection(systems_database, "number_commands") 

2137 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

2138 items = [] 

2139 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2140 if number_commands_collection is None or recompute_amount: 

2141 number_commands_collection = get_collection( 

2142 systems_database, "number_commands", create=True, time_series=True 

2143 ) 

2144 number_commands_collection.delete_many({}) 

2145 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

2146 if first_command is None: 

2147 return items 

2148 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

2149 tzinfo=pytz.UTC 

2150 ) 

2151 while last_computed_day < TODAY: 

2152 day_nb_commands = commands_collection.count_documents( 

2153 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2154 ) 

2155 if day_nb_commands > 0: 

2156 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

2157 last_computed_day += ONE_DAY_OFFSET 

2158 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2159 if number_commands_today > 0: 

2160 number_commands_collection.delete_many({"timestamp": TODAY}) 

2161 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

2162 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2163 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2164 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2165 for day in number_commands: 

2166 day["timestamp"] = day["timestamp"].timestamp() 

2167 items.append(cls.mongo_dict_to_object(day)) 

2168 return items 

2169 

2170 

2171class EventRule(GenericMongo): 

2172 collection_name: ClassVar[str] = "event_rules" 

2173 

2174 name: str 

2175 formula: str 

2176 variables: list[str] 

2177 

2178 @computed_field 

2179 @cached_property 

2180 def number_events(self) -> int: 

2181 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

2182 

2183 

2184class Company(GenericMongo): 

2185 collection_name: ClassVar[str] = "companies" 

2186 name: str 

2187 

2188 

2189class Campaign(GenericMongo): 

2190 collection_name: ClassVar[str] = "campaigns" 

2191 

2192 # Properties 

2193 id: str | None = None 

2194 name: str 

2195 description: str | None = None 

2196 

2197 

2198class Phase(GenericMongo): 

2199 collection_name: ClassVar[str] = "phases" 

2200 

2201 # Properties 

2202 id: str | None = None 

2203 name: str 

2204 description: str | None = None 

2205 start_at: float 

2206 end_at: float 

2207 

2208 # FK 

2209 campaign_id: MongoId 

2210 

2211 @classmethod 

2212 def deleteMany(cls, campaign_id): 

2213 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

2214 return delete_phases 

2215 

2216 

2217class CustomViewCreation(GenericMongo): 

2218 collection_name: ClassVar[str] = "custom_views" 

2219 

2220 name: str 

2221 configuration: list 

2222 

2223 

2224class CustomView(CustomViewCreation): 

2225 # Properties 

2226 id: str | None = None 

2227 

2228 # Foreign Key 

2229 user_id: str 

2230 

2231 

2232CustomViewUpdate = create_update_model(CustomView) 

2233 

2234 

2235class Video(GenericMongo): 

2236 collection_name: ClassVar[str] = "videos" 

2237 

2238 # Properties 

2239 name: str 

2240 ip_addr: str 

2241 username: str | None = None 

2242 password: str | None = None 

2243 

2244 # Methods 

2245 @classmethod 

2246 def get_all(cls, sort_by="_id") -> list[Self]: 

2247 items = [] 

2248 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

2249 items.append(cls.mongo_dict_to_object(dict_)) 

2250 return items 

2251 

2252 @classmethod 

2253 def get_video(cls, camera_id: ObjectId): 

2254 camera = cls.get_from_id(camera_id) 

2255 if camera is not None: 

2256 return camera.name 

2257 return None 

2258 

2259 

2260class Command(GenericMongo): 

2261 collection_name: ClassVar[str] = "commands" 

2262 

2263 # Properties 

2264 timestamp: datetime.datetime = None 

2265 sent_at: float 

2266 response_time: float = 0.0 

2267 command_type: str 

2268 description: str = "" 

2269 succeeded: bool = False 

2270 

2271 # Foreign key 

2272 user_id: str 

2273 

2274 @classmethod 

2275 def collection(cls): 

2276 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

2277 

2278 @classmethod 

2279 def create(cls, command: Self): 

2280 command = cls( 

2281 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

2282 sent_at=command.sent_at, 

2283 response_time=command.response_time, 

2284 command_type=command.command_type, 

2285 description=command.description, 

2286 succeeded=command.succeeded, 

2287 user_id=command.user_id, 

2288 ) 

2289 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

2290 if new_command is None: 

2291 return None 

2292 return {"command_id": str(new_command.inserted_id)} 

2293 

2294 def receive_response(self, response: dict): 

2295 self.response_time = time.time() - self.sent_at 

2296 self.succeeded = response.get("error", True) is False 

2297 if self.description == "": 

2298 self.description += response.get("message", "").rstrip() 

2299 

2300 

2301class SignalsPresetCreation(GenericMongo): 

2302 name: str 

2303 signal_ids: list[str] 

2304 

2305 

2306class SignalsPreset(SignalsPresetCreation): 

2307 collection_name: ClassVar[str] = "signals_presets" 

2308 

2309 user_id: str 

2310 

2311 @classmethod 

2312 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

2313 signals_preset = cls( 

2314 user_id=user_id, 

2315 name=signals_preset.name, 

2316 signal_ids=signals_preset.signal_ids, 

2317 ) 

2318 

2319 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

2320 

2321 return str(new_signal_preset.inserted_id) 

2322 

2323 

2324SignalsPresetUpdate = create_update_model(SignalsPreset) 

2325 

2326 

2327class LineStyle(str, Enum): 

2328 solid = "solid" 

2329 dotted = "dotted" 

2330 dashed = "dashed" 

2331 

2332 

2333class SignalAppearance: 

2334 value_color: str 

2335 forced_value_color: str 

2336 

2337 

2338class GraphThemeCreation(GenericMongo): 

2339 collection_name: ClassVar[str] = "graph_themes" 

2340 

2341 name: str 

2342 signal_id: str 

2343 value_color: str = "" 

2344 forced_value_color: str = "" 

2345 value_line_style: LineStyle = LineStyle.solid 

2346 forced_value_line_style: LineStyle = LineStyle.solid 

2347 private: bool = True 

2348 

2349 

2350class PublicGraphTheme(GraphThemeCreation): 

2351 created_by_user: bool 

2352 in_user_library: bool 

2353 active_for_user: bool 

2354 

2355 _current_user_id: str = "" 

2356 

2357 @classproperty 

2358 def custom_pipeline_steps(cls) -> dict[str, list]: 

2359 return { 

2360 "created_by_user": [ 

2361 { 

2362 "$addFields": { 

2363 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

2364 } 

2365 } 

2366 ], 

2367 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

2368 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

2369 ], 

2370 "in_user_library": [ 

2371 { 

2372 "$addFields": { 

2373 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

2374 } 

2375 } 

2376 ], 

2377 "active_for_user": [ 

2378 { 

2379 "$addFields": { 

2380 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

2381 } 

2382 } 

2383 ], 

2384 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

2385 "active": [ 

2386 { 

2387 "$addFields": { 

2388 "active": "$$REMOVE", 

2389 } 

2390 } 

2391 ], 

2392 "creator_id": [ 

2393 { 

2394 "$addFields": { 

2395 "creator_id": "$$REMOVE", 

2396 } 

2397 } 

2398 ], 

2399 } 

2400 

2401 @classmethod 

2402 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

2403 cls._current_user_id = user_id 

2404 return super().response_from_query(query) 

2405 

2406 @classmethod 

2407 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

2408 query.in_user_library = "true" 

2409 return cls.response_from_query(query, user_id) 

2410 

2411 @classmethod 

2412 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

2413 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

2414 

2415 @classmethod 

2416 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2417 cls._current_user_id = user_id 

2418 return super().get_by_attribute(attribute_name, attribute_value) 

2419 

2420 @classmethod 

2421 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2422 cls._current_user_id = user_id 

2423 return super().get_one_by_attribute(attribute_name, attribute_value) 

2424 

2425 @classmethod 

2426 def get_all(cls, sort_by: str, user_id: str): 

2427 cls._current_user_id = user_id 

2428 return super().get_all(sort_by) 

2429 

2430 @classmethod 

2431 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

2432 pipeline = [ 

2433 { 

2434 "$match": { 

2435 "active": {"$eq": user_id}, 

2436 "signal_id": {"$in": signal_ids}, 

2437 } 

2438 }, 

2439 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

2440 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

2441 { 

2442 "$project": { 

2443 "_id": 0, 

2444 "signal_id": 1, 

2445 "value_color": 1, 

2446 "forced_value_color": 1, 

2447 "value_line_style": 1, 

2448 "forced_value_line_style": 1, 

2449 } 

2450 }, 

2451 ] 

2452 

2453 result = {} 

2454 

2455 cursor = cls.collection().aggregate(pipeline) 

2456 for document in cursor: 

2457 signal_id = document["signal_id"] 

2458 del document["signal_id"] 

2459 result[signal_id] = document 

2460 

2461 return result 

2462 

2463 

2464GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2465 

2466 

2467class PrivateGraphTheme(GraphThemeCreation): 

2468 # private 

2469 creator_id: str 

2470 in_library: list[str] 

2471 active: list[str] 

2472 

2473 @classmethod 

2474 def create( 

2475 cls, 

2476 creator_id: str, 

2477 name: str, 

2478 signal_id: str, 

2479 value_color: str, 

2480 forced_value_color: str, 

2481 value_line_style: LineStyle, 

2482 forced_value_line_style: LineStyle, 

2483 private: bool, 

2484 ): 

2485 color_setting = cls( 

2486 creator_id=creator_id, 

2487 name=name, 

2488 signal_id=signal_id, 

2489 value_color=value_color, 

2490 forced_value_color=forced_value_color, 

2491 value_line_style=value_line_style, 

2492 forced_value_line_style=forced_value_line_style, 

2493 private=private, 

2494 in_library=[creator_id], 

2495 active=[creator_id], 

2496 ) 

2497 

2498 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2499 color_setting.id = str(new_color_setting.inserted_id) 

2500 return color_setting 

2501 

2502 def update(self, update_dict: dict, user_id: str): 

2503 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2504 if in_user_lib and user_id not in self.in_library: 

2505 self.in_library.append(user_id) 

2506 elif not in_user_lib and user_id in self.in_library: 

2507 self.in_library.remove(user_id) 

2508 update_dict["in_library"] = self.in_library 

2509 del update_dict["in_user_library"] 

2510 

2511 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2512 if active_for_user and user_id not in self.active: 

2513 self.active.append(user_id) 

2514 elif not active_for_user and user_id in self.active: 

2515 self.active.remove(user_id) 

2516 update_dict["active"] = self.active 

2517 del update_dict["active_for_user"] 

2518 

2519 if update_dict.get("created_by_user") is not None: 

2520 del update_dict["created_by_user"] 

2521 

2522 self.collection().find_one_and_update( 

2523 {"_id": ObjectId(self.id)}, 

2524 {"$set": update_dict}, 

2525 ) 

2526 

2527 return {} 

2528 

2529 

2530class DeviceStatus(str, Enum): 

2531 started = "started" 

2532 running = "running" 

2533 created = "created" 

2534 exited = "exited" 

2535 restarting = "restarting" 

2536 

2537 

2538class DeviceUpdateFromDeployer(BaseModel): 

2539 status: DeviceStatus 

2540 

2541 

2542class DeviceFromDeployerCreation(BaseModel): 

2543 name: str 

2544 description: str 

2545 

2546 

2547class DeviceFromDeployer(DeviceFromDeployerCreation): 

2548 status: DeviceStatus 

2549 device_id: DeviceId 

2550 logs: str = "" 

2551 

2552 

2553class DeviceDeployer(GenericMongo): 

2554 collection_name: ClassVar[str] = "device_deployers" 

2555 url: HttpUrl 

2556 

2557 def endpoint_url(self, endpoint): 

2558 return f"{str(self.url).rstrip('/')}/{endpoint}" 

2559 

2560 def devices(self) -> list[DeviceFromDeployer]: 

2561 devices = [] 

2562 try: 

2563 response = requests.get(self.endpoint_url("devices")) 

2564 except requests.exceptions.ConnectionError: 

2565 logger.info("connection error") 

2566 return None 

2567 if response.status_code != 200: 

2568 return None 

2569 for device_dict in response.json()["devices"]: 

2570 devices.append( 

2571 DeviceFromDeployer( 

2572 device_id=device_dict["device_id"], 

2573 name=device_dict["container_name"], 

2574 description="desc", 

2575 status=device_dict["status"], 

2576 logs=device_dict["logs"], 

2577 ) 

2578 ) 

2579 return devices 

2580 

2581 def get_device(self, device_id: DeviceId): 

2582 try: 

2583 response = requests.get(self.endpoint_url(f"devices/{device_id}")) 

2584 except requests.exceptions.ConnectionError: 

2585 return None 

2586 if response.status_code != 200: 

2587 return None 

2588 device_dict = response.json() 

2589 return DeviceFromDeployer( 

2590 device_id=device_dict["device_id"], 

2591 name=device_dict["container_name"], 

2592 description="desc", 

2593 status=device_dict["status"], 

2594 logs=device_dict["logs"], 

2595 ) 

2596 

2597 def create_device(self, device: DeviceFromDeployer) -> Device | None: 

2598 try: 

2599 response = requests.post(self.endpoint_url("devices"), json={"name": device.name}) 

2600 except requests.exceptions.ConnectionError: 

2601 return None 

2602 

2603 if response.status_code != 201: 

2604 return None 

2605 

2606 device_dict = response.json() 

2607 return DeviceFromDeployer( 

2608 device_id=device_dict["device_id"], 

2609 name="", 

2610 description="desc", 

2611 status=device_dict["status"], 

2612 ) 

2613 

2614 def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None: 

2615 try: 

2616 response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump()) 

2617 except requests.exceptions.ConnectionError: 

2618 return None 

2619 

2620 if response.status_code != 200: 

2621 return None 

2622 

2623 device_dict = response.json() 

2624 return Device( 

2625 device_id=device_dict["device_id"], 

2626 name="", 

2627 description="desc", 

2628 pid={}, 

2629 petri_network={}, 

2630 modes=[], 

2631 status=device_dict["status"], 

2632 ) 

2633 

2634 def delete_device(self, device_id: DeviceId) -> DeleteInfo: 

2635 try: 

2636 response = requests.delete(self.endpoint_url(f"devices/{device_id}")) 

2637 except requests.exceptions.ConnectionError: 

2638 return DeleteInfo(is_deleted=False, detail="Connection to deployer error") 

2639 if response.status_code not in [200, 202, 204]: 

2640 return DeleteInfo(is_deleted=False, detail=response.text) 

2641 

2642 return DeleteInfo(is_deleted=True, detail="") 

2643 

2644 

2645DeviceDeployerUpdate = create_update_model(DeviceDeployer)