Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 97%

1368 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 11:38 +0000

1from functools import cached_property 

2import os 

3import re 

4import io 

5import time 

6import csv 

7from typing import Self, ClassVar, Any, Literal, get_args, Annotated 

8import datetime 

9import math 

10import bisect 

11from enum import Enum 

12import logging 

13import copy 

14import asyncio 

15import requests 

16 

17import zipfile 

18import ping3 

19import pytz 

20from bson.objectid import ObjectId 

21from pymongo import ASCENDING, ReturnDocument 

22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator 

23import numpy as npy 

24import lttb 

25import h5py 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 signals_async_database, 

36 devices_states_database, 

37) 

38from twinpad_backend.responses import ListResponse 

39from twinpad_backend.messages import RabbitMQClient 

40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

41 

42TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

43SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"] 

44DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"] 

45MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"] 

46 

47 

48RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

49MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

50SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

51HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

52DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

53 

54DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

55NUMBER_SAMPLES_DATABASE_UPDATE = 120 

56 

57logger = logging.getLogger("uvicorn.error") 

58 

59 

60class DeleteInfo(BaseModel): 

61 is_deleted: bool 

62 detail: str 

63 

64 

65class classproperty: 

66 """ 

67 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

68 Found here: https://stackoverflow.com/a/76301341 

69 """ 

70 

71 def __init__(self, func): 

72 self.fget = func 

73 

74 def __get__(self, _, owner): 

75 return self.fget(owner) 

76 

77 

78def create_update_model(model): 

79 fields = {} 

80 

81 for field_name, field_annotation in model.model_fields.items(): 

82 if field_name != "id": 

83 fields[field_name] = (field_annotation.annotation | None, None) 

84 

85 query_name = model.__name__ + "Update" 

86 return create_model(query_name, **fields) 

87 

88 

89def get_utc_date_from_timestamp(timestamp: float): 

90 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

91 

92 

93def downsample_list(time_vector: list, values: list, max_number_samples: int): 

94 if len(time_vector) < max_number_samples: 

95 return time_vector, values 

96 

97 time_vector_copy = copy.deepcopy(time_vector) 

98 values_copy = copy.deepcopy(values) 

99 

100 none_group_bounds = [] 

101 none_group_index = -1 

102 index = -1 

103 # LTTB doesn't handle None values so remove them 

104 while values_copy.count(None) > 0: 

105 # Store bounds of None value groups so we can insert them back after the downsampling 

106 if (new_index := values_copy.index(None)) != index: 

107 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

108 none_group_index += 1 

109 elif len(none_group_bounds[none_group_index]) < 2: 

110 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

111 else: 

112 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

113 values_copy.pop(new_index) 

114 index = new_index 

115 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

116 

117 try: 

118 values_array = npy.array([time_vector_copy, values_copy]).T 

119 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

120 

121 new_time_vector = interpolated_values[:, 0].tolist() 

122 new_values = interpolated_values[:, 1].tolist() 

123 except ValueError: 

124 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

125 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

126 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

127 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

128 return new_time_vector, new_values_nan_to_none 

129 

130 # insert back None values at the correct timestamps 

131 for none_group in none_group_bounds: 

132 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

133 new_time_vector[start_index:start_index] = none_group 

134 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

135 

136 return new_time_vector, new_values 

137 

138 

139def is_of_type(value, wanted_type): 

140 if wanted_type is float: 

141 return isinstance(value, (int, float)) 

142 return isinstance(value, wanted_type) 

143 

144 

145# Models 

146class TwinPadModel(BaseModel): 

147 @classmethod 

148 def dict_to_object(cls, dict_): 

149 return cls.model_validate(dict_) 

150 

151 def to_dict(self, exclude=None): 

152 dict_ = self.model_dump(exclude=exclude, mode="json") 

153 return dict_ 

154 

155 

156def validate_mongo_id(v): 

157 if not ObjectId.is_valid(v): 

158 raise ValueError("Invalid MongoDB id") 

159 return str(v) 

160 

161 

162MongoId = Annotated[str, BeforeValidator(validate_mongo_id)] 

163 

164 

165def validate_12_hex(v: str) -> str: 

166 if not re.fullmatch(r"[0-9a-fA-F]{12}", v): 

167 raise ValueError("ID must be a 12-character hexadecimal string") 

168 return v 

169 

170 

171DeviceId = Annotated[str, BeforeValidator(validate_12_hex)] 

172 

173 

174class GenericMongo(TwinPadModel): 

175 id: MongoId | None = None 

176 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

177 

178 @classmethod 

179 def collection(cls): 

180 return get_collection(systems_database, cls.collection_name, create=True) 

181 

182 @classmethod 

183 def response_from_query(cls, query) -> ListResponse[Self]: 

184 request_filters = query.mongodb_filter() 

185 items = [] 

186 

187 # Allows for multi-sort, Python dicts are ordered so no issue while sorting 

188 sort_dict = {} 

189 for sort in query.sort_by.split(","): 

190 if ":" in sort: 

191 sort_field, sort_order = sort.split(":") 

192 sort_order = int(sort_order) 

193 else: 

194 sort_field = sort 

195 sort_order = 1 

196 sort_dict[sort_field] = sort_order 

197 

198 collection = get_collection(systems_database, cls.collection_name, create=True) 

199 total = collection.count_documents(request_filters) 

200 

201 pipeline = [] 

202 added_properties = [] 

203 if "$and" in request_filters: 

204 for request_filter in request_filters["$and"]: 

205 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

206 if filtered_property in request_filter: 

207 pipeline.extend(pipeline_steps) 

208 added_properties.append(filtered_property) 

209 else: 

210 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

211 if filtered_property in request_filters: 

212 pipeline.extend(pipeline_steps) 

213 added_properties.append(filtered_property) 

214 pipeline.append({"$match": request_filters}) 

215 

216 for sort_field in sort_dict.keys(): 

217 if sort_field in cls.custom_pipeline_steps: 

218 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

219 added_properties.append(sort_field) 

220 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}]) 

221 

222 if (query.limit is not None) and (query.limit != 0): 

223 pipeline.append({"$limit": query.limit}) 

224 

225 for filtered_property, step in cls.custom_pipeline_steps.items(): 

226 if filtered_property not in added_properties: 

227 pipeline.extend(step) 

228 

229 cursor = collection.aggregate(pipeline) 

230 

231 for item_dict in cursor: 

232 items.append(cls.mongo_dict_to_object(item_dict)) 

233 

234 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

235 

236 @classmethod 

237 def get_from_id(cls, item_id) -> Self | None: 

238 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

239 

240 @classmethod 

241 def mongo_dict_to_object(cls, mongo_dict): 

242 mongo_dict["id"] = str(mongo_dict["_id"]) 

243 del mongo_dict["_id"] 

244 return cls.dict_to_object(mongo_dict) 

245 

246 @classmethod 

247 def get_by_attribute(cls, attribute_name: str, attribute_value): 

248 """Returns all items that match the attribute with value.""" 

249 pipeline = [] 

250 if attribute_name in cls.custom_pipeline_steps: 

251 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

252 pipeline.append({"$match": {attribute_name: attribute_value}}) 

253 for key, step in cls.custom_pipeline_steps.items(): 

254 if key != attribute_name: 

255 pipeline.extend(step) 

256 items = cls.collection().aggregate(pipeline) 

257 if items is None: 

258 return None 

259 return [cls.mongo_dict_to_object(d) for d in items] 

260 

261 @classmethod 

262 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

263 pipeline = [] 

264 if attribute_name in cls.custom_pipeline_steps: 

265 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

266 pipeline.append({"$match": {attribute_name: attribute_value}}) 

267 pipeline.append({"$limit": 1}) 

268 for key, step in cls.custom_pipeline_steps.items(): 

269 if key != attribute_name: 

270 pipeline.extend(step) 

271 items = cls.collection().aggregate(pipeline).to_list() 

272 if len(items) == 0: 

273 return None 

274 return cls.mongo_dict_to_object(items[0]) 

275 

276 @classmethod 

277 def get_all(cls, sort_by="_id") -> list[Self]: 

278 items = [] 

279 pipeline = [] 

280 if sort_by in cls.custom_pipeline_steps: 

281 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

282 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

283 for key, step in cls.custom_pipeline_steps.items(): 

284 if key != sort_by: 

285 pipeline.extend(step) 

286 for dict_ in cls.collection().aggregate(pipeline): 

287 items.append(cls.mongo_dict_to_object(dict_)) 

288 return items 

289 

290 @classmethod 

291 def get_number_documents(cls): 

292 collection = get_collection(systems_database, cls.collection_name) 

293 if collection is None: 

294 return 0 

295 return collection.count_documents( 

296 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]} 

297 ) 

298 

299 def insert(self): 

300 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"})) 

301 self.id = str(insert_result.inserted_id) 

302 return self.id 

303 

304 def update(self, update_dict): 

305 for key, value in update_dict.items(): 

306 setattr(self, key, value) 

307 self.collection().find_one_and_update( 

308 {"_id": ObjectId(self.id)}, 

309 {"$set": update_dict}, 

310 return_document=ReturnDocument.AFTER, 

311 ) 

312 

313 return self 

314 

315 def delete(self): 

316 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

317 return result.deleted_count > 0 

318 

319 

320class User(GenericMongo): 

321 collection_name: ClassVar[str] = "users" 

322 

323 firstname: str 

324 lastname: str 

325 email: str 

326 password: str 

327 is_blocked: bool | None = False 

328 is_admin: bool | None = False 

329 is_connected: bool | None = False 

330 company_id: str | None = None 

331 

332 def to_dict(self, exclude: set = set()): 

333 exclude.add("password") 

334 return GenericMongo.to_dict(self, exclude=exclude) 

335 

336 @classmethod 

337 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

338 users = cls.get_all() 

339 if not users: 

340 is_admin = True 

341 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

342 user_collection = get_collection(systems_database, "users", create=True) 

343 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

344 if new_user is None: 

345 return None 

346 return {"user_id": str(new_user.inserted_id)} 

347 

348 @classmethod 

349 def update_info(cls, user: "UserUpdate", user_id: str): 

350 updated_user = cls.collection().find_one_and_update( 

351 {"_id": ObjectId(user_id)}, 

352 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

353 return_document=ReturnDocument.AFTER, 

354 ) 

355 updated_user["id"] = str(updated_user["_id"]) 

356 del (updated_user["_id"], updated_user["is_connected"]) 

357 return cls(**updated_user) 

358 

359 

360UserUpdate = create_update_model(User) 

361 

362 

363class Mode(TwinPadModel): 

364 mode_id: int 

365 name: str 

366 frequency_multiplier: float 

367 min_frequency: float 

368 

369 

370class DeviceUpdate(TwinPadModel): 

371 mode_id: int 

372 

373 

374class Device(GenericMongo): 

375 collection_name: ClassVar[str] = "devices" 

376 

377 device_id: DeviceId 

378 name: str 

379 description: str = "" 

380 modes: list[Mode] 

381 current_mode_id: int | None = None 

382 last_ping: float | None = None 

383 petri_network: Any 

384 pid: Any 

385 load: float | None = None 

386 tokens: list[int] = Field(default_factory=list) 

387 status: str 

388 

389 async def change_mode(self, update_dict, current_user: User): 

390 has_error = False 

391 

392 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

393 has_error = True 

394 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

395 elif self.current_mode_id is not None: 

396 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

397 else: 

398 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

399 command = Command( 

400 sent_at=time.time(), 

401 command_type="Mode change", 

402 description=description, 

403 user_id=current_user.id, 

404 ) 

405 

406 if has_error: 

407 command.response_time = 0 

408 command.succeeded = False 

409 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

410 else: 

411 response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id) 

412 command.receive_response(response) 

413 

414 Command.create(command) 

415 return response 

416 

417 @classmethod 

418 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

419 devices_by_id = {} 

420 for signal_id in signal_ids: 

421 device_id = signal_id.split(".")[0] 

422 if device_id not in devices_by_id: 

423 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

424 return devices_by_id 

425 

426 

427class DeviceSetup(GenericMongo): 

428 collection_name: ClassVar[str] = "device_setups" 

429 

430 device_ids: list[str] 

431 active: bool = False 

432 variable_mapping: dict[str, str] 

433 

434 

435DeviceSetupUpdate = create_update_model(DeviceSetup) 

436 

437 

438class DeviceState(GenericMongo): 

439 collection_name: ClassVar[str] = "devices_states" 

440 

441 timestamp: float 

442 mode: str | None = None 

443 load: float | None = None 

444 tokens: list[int] = Field(default_factory=list) 

445 modified_properties: list[str] = Field(default_factory=list) 

446 

447 @classmethod 

448 def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]: 

449 req_filter = query.mongodb_filter() 

450 items = [] 

451 if ":" in query.sort_by: 

452 sort_field, sort_order = query.sort_by.split(":") 

453 sort_order = int(sort_order) 

454 else: 

455 sort_field = query.sort_by 

456 sort_order = 1 

457 collection = get_collection(devices_states_database, device_id) 

458 if collection is None: 

459 total = 0 

460 cursor = [] 

461 else: 

462 total = collection.count_documents(req_filter) 

463 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

464 if (query.limit is not None) and (query.limit != 0): 

465 cursor = cursor.limit(query.limit) 

466 for item_dict in cursor: 

467 items.append( 

468 cls( 

469 timestamp=item_dict.get("precise_timestamp"), 

470 mode=item_dict.get("mode", None), 

471 load=item_dict.get("load", None), 

472 tokens=item_dict.get("tokens", Field(default_factory=list)), 

473 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

474 ) 

475 ) 

476 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

477 

478 

479class SignalSample(TwinPadModel): 

480 signal_id: str 

481 timestamp: float 

482 value: float | int | str | bool | None 

483 forced_value: float | int | str | bool | None = None 

484 

485 @classmethod 

486 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

487 

488 collection = get_signal_collection(signal_id) 

489 if collection is None: 

490 return None 

491 

492 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

493 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

494 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

495 first_bucket = None 

496 if bucket is not None: 

497 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

498 if first_bucket is not None: 

499 sample_data = collection.find_one( 

500 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

501 ) 

502 else: 

503 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

504 

505 if sample_data is None: 

506 return None 

507 

508 timestamp = sample_data["precise_timestamp"] 

509 

510 return cls( 

511 signal_id=signal_id, 

512 timestamp=timestamp, 

513 value=sample_data.get("value", None), 

514 forced_value=sample_data.get("forced_value", None), 

515 ) 

516 

517 @classmethod 

518 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

519 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

520 

521 @classmethod 

522 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

523 last_value_collection = get_signal_collection("last_values", True) 

524 signals_collection = get_collection(systems_database, "signals", create=True) 

525 

526 sample_data = last_value_collection.find_one({"signal_id": signal_id}, sort={"precise_timestamp": -1}) 

527 

528 # If there is no data, check if the signal's type is anything other than float, as they don't send duplicate values 

529 if sample_data is None: 

530 if signals_collection.count_documents({"status.status": "up", "signal_id": signal_id}) < 1: 

531 return None 

532 

533 # Same workaround as above function, very effective to narrow down big sets of data 

534 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

535 last_bucket = None 

536 if bucket is not None: 

537 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

538 signal_collection = get_signal_collection(signal_id) 

539 if last_bucket is not None and signal_collection is not None: 

540 sample_data = signal_collection.find_one( 

541 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

542 sort={"precise_timestamp": -1}, 

543 ) 

544 else: 

545 sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1}) 

546 

547 if sample_data is None: 

548 return None 

549 

550 timestamp = sample_data.get("precise_timestamp") 

551 # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down 

552 if device is None: 

553 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

554 if device is not None and device.last_ping is not None: 

555 if timestamp is None: 

556 timestamp = device.last_ping 

557 else: 

558 timestamp = max(timestamp, device.last_ping) 

559 else: 

560 timestamp = sample_data.get("precise_timestamp") 

561 

562 return cls( 

563 signal_id=signal_id, 

564 timestamp=timestamp, 

565 value=sample_data.get("value", None), 

566 forced_value=sample_data.get("forced_value", None), 

567 ) 

568 

569 @classmethod 

570 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

571 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

572 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

573 

574 

575class SignalData(TwinPadModel): 

576 signal_id: str 

577 forcible: bool = True 

578 time_vector: list[float] 

579 values: list[float | int | str | None] 

580 forced_values: list[float | int | str | None] 

581 

582 data_start: float | None = None 

583 data_end: float | None = None 

584 

585 number_samples: int = 0 

586 number_samples_db: int = 0 

587 

588 db_query_time: float = 0.0 

589 init_time: float = 0.0 

590 data_processing_time: float = 0.0 

591 

592 phase_id: str | None = None 

593 

594 @classmethod 

595 def get_from_signal_id( 

596 cls, 

597 signal_id: str, 

598 min_timestamp: float = None, 

599 max_timestamp: float = None, 

600 window_min_timestamp: float = None, 

601 window_max_timestamp: float = None, 

602 interpolate_bounds: bool = True, 

603 max_documents: int = None, 

604 collection=None, 

605 ) -> Self: 

606 

607 now = time.time() 

608 

609 req_signal = {} 

610 if min_timestamp is not None: 

611 req_signal.setdefault("timestamp", {}) 

612 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

613 if max_timestamp is not None: 

614 req_signal.setdefault("timestamp", {}) 

615 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

616 

617 if collection is None: 

618 collection = get_signal_collection(signal_id) 

619 if collection is None: 

620 return cls( 

621 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

622 ) 

623 

624 db_req_start = time.time() 

625 

626 sort_step = {"$sort": {"precise_timestamp": 1}} 

627 number_results = collection.count_documents(req_signal) 

628 

629 pipeline = [] 

630 if req_signal: 

631 pipeline.append({"$match": req_signal}) # Filter data if needed 

632 

633 pipeline.extend( 

634 [ 

635 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

636 sort_step, 

637 ] 

638 ) 

639 

640 if max_documents is not None and max_documents < number_results: 

641 unsampling_ratio = math.ceil(number_results / max_documents) 

642 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

643 pipeline.extend( 

644 [ 

645 { 

646 "$setWindowFields": { 

647 "sortBy": {"precise_timestamp": 1}, 

648 "output": {"index": {"$documentNumber": {}}}, 

649 } 

650 }, 

651 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

652 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

653 {"$replaceRoot": {"newRoot": "$doc"}}, 

654 {"$unset": ["index", "group_id"]}, 

655 {"$sort": {"precise_timestamp": 1}}, 

656 ] 

657 ) 

658 

659 # logger.info(f"pipeline: %s", str(pipeline)) 

660 cursor = collection.aggregate(pipeline) 

661 db_req_time = time.time() - db_req_start 

662 

663 init_time = time.time() 

664 

665 results = cursor.to_list() 

666 time_vector = [] 

667 values = [] 

668 forced_values = [] 

669 for s in results: 

670 time_vector.append(s["precise_timestamp"]) 

671 values.append(s.get("value", None)) 

672 forced_values.append(s.get("forced_value", None)) 

673 

674 signal = Signal.get_from_signal_id(signal_id) 

675 if signal is None: 

676 return cls( 

677 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

678 ) 

679 class_ = signal.signal_data_class 

680 

681 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

682 time_vector, values, forced_values = cls.interpolate_bounds( 

683 class_, 

684 collection, 

685 signal_id, 

686 time_vector, 

687 values, 

688 forced_values, 

689 window_min_timestamp, 

690 window_max_timestamp, 

691 ) 

692 

693 if values: 

694 # TODO: check below. a bit strange 

695 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

696 # Adding last value as it should be repeated 

697 time_vector.append(now) 

698 values.append(values[-1]) 

699 forced_values.append(forced_values[-1]) 

700 

701 init_time = time.time() - init_time 

702 

703 # See line 292 for explanation 

704 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

705 first_bucket = None 

706 if bucket is not None: 

707 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

708 if first_bucket is not None: 

709 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

710 else: 

711 data_start = None 

712 

713 last_bucket = None 

714 if bucket is not None: 

715 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

716 if last_bucket is not None: 

717 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

718 else: 

719 data_end = None 

720 

721 return class_( 

722 signal_id=signal_id, 

723 forcible=signal.forcible, 

724 time_vector=time_vector, 

725 values=values, 

726 forced_values=forced_values, 

727 data_start=data_start, 

728 data_end=data_end, 

729 number_samples=len(values), 

730 number_samples_db=number_results, 

731 db_query_time=db_req_time, 

732 init_time=init_time, 

733 ) 

734 

735 @staticmethod 

736 def interpolate_bounds( 

737 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

738 ): 

739 sample_right = None 

740 # Fetching right side value & interpolation 

741 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

742 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

743 sample_right = collection.find_one( 

744 { 

745 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

746 "value": {"$exists": True}, 

747 }, 

748 sort={"precise_timestamp": -1}, 

749 ) 

750 if sample_right: 

751 if time_vector: 

752 right_sd = class_( 

753 signal_id=signal_id, 

754 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

755 values=[values[-1], sample_right.get("value", None)], 

756 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

757 ) 

758 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

759 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

760 else: 

761 max_ts_value = sample_right.get("value", None) 

762 max_ts_forced_value = sample_right.get("forced_value", None) 

763 time_vector.append(window_max_timestamp) 

764 values.append(max_ts_value) 

765 forced_values.append(max_ts_forced_value) 

766 

767 # Fetching left side value & interpolation 

768 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

769 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

770 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

771 sample_left = sample_right 

772 sample_left = collection.find_one( 

773 { 

774 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

775 "value": {"$exists": True}, 

776 }, 

777 sort={"precise_timestamp": -1}, 

778 ) 

779 

780 if sample_left: 

781 if time_vector: 

782 left_sd = class_( 

783 signal_id=signal_id, 

784 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

785 values=[sample_left["value"], values[0]], 

786 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

787 ) 

788 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

789 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

790 else: 

791 min_ts_value = sample_left.get("value", None) 

792 min_ts_forced_value = sample_left.get("forced_value", None) 

793 time_vector.insert(0, window_min_timestamp) 

794 values.insert(0, min_ts_value) 

795 forced_values.insert(0, min_ts_forced_value) 

796 

797 return time_vector, values, forced_values 

798 

799 def interpolate_values(self, new_time_vector: list[float]): 

800 return self.interpolate(new_time_vector, self.values) 

801 

802 def interpolate_forced_values(self, new_time_vector: list[float]): 

803 return self.interpolate(new_time_vector, self.forced_values) 

804 

805 def uniform_desampling(self, number_samples_max: int = None) -> Self: 

806 if number_samples_max is None or self.number_samples <= number_samples_max: 

807 return self 

808 

809 data_processing_time = time.time() 

810 

811 new_time_vector = npy.linspace(self.time_vector[0], self.time_vector[-1], number_samples_max).tolist() 

812 values = self.interpolate_values(new_time_vector) 

813 forced_values = self.interpolate_forced_values(new_time_vector) 

814 number_samples = len(new_time_vector) 

815 

816 data_processing_time = time.time() - data_processing_time 

817 

818 return self.__class__( 

819 signal_id=self.signal_id, 

820 time_vector=new_time_vector, 

821 values=values, 

822 forced_values=forced_values, 

823 number_samples=number_samples, 

824 number_samples_db=self.number_samples_db, 

825 data_start=self.data_start, 

826 data_end=self.data_end, 

827 db_query_time=self.db_query_time, 

828 init_time=self.init_time, 

829 data_processing_time=self.data_processing_time + data_processing_time, 

830 phase_id=self.phase_id, 

831 ) 

832 

833 def min_max_downsampling(self, number_samples_max: int) -> Self: 

834 return self.uniform_desampling(number_samples_max) 

835 

836 def interest_window_desampling( 

837 self, 

838 window_max_number_samples: int, 

839 outside_max_number_samples: int, 

840 window_min_timestamp: float | None = None, 

841 window_max_timestamp: float | None = None, 

842 ) -> Self: 

843 """Performs a sampling in a window of interest and outside.""" 

844 

845 if not self.time_vector: 

846 return self 

847 

848 if window_min_timestamp is None: 

849 window_min_timestamp = self.time_vector[0] 

850 if window_max_timestamp is None: 

851 window_max_timestamp = self.time_vector[-1] 

852 

853 data_processing_time = time.time() 

854 

855 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

856 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

857 

858 time_vector_before = self.time_vector[:index_window_start] 

859 time_vector_window = self.time_vector[index_window_start:index_window_end] 

860 time_vector_after = self.time_vector[index_window_end:] 

861 

862 # Resampling window 

863 if time_vector_window: 

864 # Ensurring window bounds 

865 if time_vector_window[0] != window_min_timestamp: 

866 time_vector_window.insert(0, window_min_timestamp) 

867 if time_vector_window[-1] != window_max_timestamp: 

868 time_vector_window.append(window_max_timestamp) 

869 else: 

870 time_vector_window = [window_min_timestamp, window_max_timestamp] 

871 

872 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

873 # Resampling 

874 new_window_time_vector = npy.linspace( 

875 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

876 ).tolist() 

877 time_vector_window = new_window_time_vector 

878 

879 # Resampling outside 

880 time_vector_before, time_vector_after = SignalData.resample_outside_window( 

881 time_vector_before, time_vector_after, outside_max_number_samples 

882 ) 

883 

884 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

885 values = self.interpolate_values(new_time_vector) 

886 forced_values = self.interpolate_forced_values(new_time_vector) 

887 number_samples = len(values) 

888 

889 data_processing_time = time.time() - data_processing_time 

890 

891 return self.__class__( 

892 signal_id=self.signal_id, 

893 forcible=self.forcible, 

894 time_vector=new_time_vector, 

895 values=values, 

896 forced_values=forced_values, 

897 number_samples=number_samples, 

898 number_samples_db=self.number_samples, 

899 data_start=self.data_start, 

900 data_end=self.data_end, 

901 db_query_time=self.db_query_time, 

902 init_time=self.init_time, 

903 data_processing_time=self.data_processing_time + data_processing_time, 

904 ) 

905 

906 def zero_time_vector(self, data_start: float): 

907 data_processing_time = time.time() 

908 if len(self.time_vector) == 0: 

909 return self 

910 time_vector = npy.array(self.time_vector) - data_start 

911 data_processing_time = time.time() - data_processing_time 

912 

913 return self.__class__( 

914 signal_id=self.signal_id, 

915 time_vector=time_vector, 

916 values=self.values, 

917 forced_values=self.forced_values, 

918 number_samples=self.number_samples, 

919 number_samples_db=self.number_samples_db, 

920 data_start=time_vector[0], 

921 data_end=time_vector[-1], 

922 db_query_time=self.db_query_time, 

923 init_time=self.init_time, 

924 data_processing_time=self.data_processing_time + data_processing_time, 

925 ) 

926 

927 def csv_export(self): 

928 output = io.StringIO() 

929 writer = csv.writer(output) 

930 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

931 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

932 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

933 return output.getvalue().encode("utf-8") 

934 

935 def prestoplot_export(self): 

936 clean_signal_id = self.signal_id.replace(".", "_") 

937 if clean_signal_id[0].isnumeric(): 

938 clean_signal_id = "_" + clean_signal_id 

939 

940 output = io.StringIO() 

941 output.write("# Encoding:\tUTF-8\n") 

942 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

943 output.write("ISO8601\tnone\tnone\n") 

944 output.write(f"# Description :\t{clean_signal_id}\n") 

945 

946 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

947 output.write( 

948 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

949 ) 

950 return output.getvalue().encode("utf-8") 

951 

952 @staticmethod 

953 def resample_outside_window(time_vector_left, time_vector_right, outside_max_number_samples): 

954 number_samples_left = len(time_vector_left) 

955 number_samples_right = len(time_vector_right) 

956 

957 if ( 

958 outside_max_number_samples is None 

959 or (number_samples_left + number_samples_right) <= outside_max_number_samples 

960 ): 

961 return time_vector_left, time_vector_right 

962 

963 new_time_vector_left = time_vector_left 

964 new_time_vector_right = time_vector_right 

965 

966 new_number_samples_before = min( 

967 number_samples_left, 

968 math.ceil(outside_max_number_samples * number_samples_left / (number_samples_left + number_samples_right)), 

969 ) 

970 new_number_samples_after = min( 

971 number_samples_right, 

972 math.ceil(outside_max_number_samples * number_samples_right / (number_samples_left + number_samples_right)), 

973 ) 

974 # Adjusting numbers as math.ceil can do +1 on sum 

975 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

976 if new_number_samples_before > new_number_samples_after: 

977 new_number_samples_before -= 1 

978 else: 

979 new_number_samples_after -= 1 

980 

981 if new_number_samples_before > 0: 

982 new_time_vector_left = npy.linspace( 

983 time_vector_left[0], time_vector_left[-1], new_number_samples_before 

984 ).tolist() 

985 

986 if new_number_samples_after > 0: 

987 new_time_vector_right = npy.linspace( 

988 time_vector_right[-1], time_vector_right[0], new_number_samples_after 

989 ).tolist()[::-1] 

990 

991 return new_time_vector_left, new_time_vector_right 

992 

993 

994class NumericSignalData(SignalData): 

995 data_type: str = "float" 

996 values: list[float | int | None] 

997 forced_values: list[float | int | None] 

998 

999 def interpolate(self, new_time_vector: list[float], items): 

1000 items = [npy.nan if s is None else s for s in items] 

1001 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

1002 

1003 def uniform_desampling(self, number_samples_max: int) -> Self: 

1004 if number_samples_max is None or self.number_samples <= number_samples_max: 

1005 return self 

1006 

1007 data_processing_time = time.time() 

1008 

1009 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

1010 forced_values = self.interpolate_forced_values(time_vector) 

1011 number_samples = len(time_vector) 

1012 

1013 data_processing_time = time.time() - data_processing_time 

1014 

1015 return self.__class__( 

1016 signal_id=self.signal_id, 

1017 time_vector=time_vector, 

1018 values=values, 

1019 forced_values=forced_values, 

1020 number_samples=number_samples, 

1021 number_samples_db=self.number_samples_db, 

1022 data_start=self.data_start, 

1023 data_end=self.data_end, 

1024 db_query_time=self.db_query_time, 

1025 init_time=self.init_time, 

1026 data_processing_time=self.data_processing_time + data_processing_time, 

1027 ) 

1028 

1029 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1030 if self.number_samples < number_samples_max: 

1031 return self 

1032 

1033 data_processing_time = time.time() 

1034 

1035 number_bins = number_samples_max // 2 

1036 

1037 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

1038 values = npy.array(self.values, dtype=npy.float64) 

1039 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1040 

1041 points_per_bin = self.number_samples // number_bins 

1042 

1043 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1044 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1045 if self.number_samples - number_bins * points_per_bin > 1: 

1046 points_per_bin += 1 

1047 number_bins = self.number_samples // points_per_bin + 1 

1048 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1049 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1050 values = npy.concatenate([values, nan_points_to_add]) 

1051 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1052 

1053 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1054 values_matrix = values.reshape(number_bins, points_per_bin) 

1055 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1056 

1057 indexes_min = npy.zeros(number_bins, dtype="int64") 

1058 indexes_max = npy.zeros(number_bins, dtype="int64") 

1059 

1060 for row in range(number_bins): 

1061 min_value = values_matrix[row, 0] 

1062 max_value = values_matrix[row, 0] 

1063 for column in range(points_per_bin): 

1064 if values_matrix[row, column] < min_value: 

1065 min_value = values_matrix[row, column] 

1066 indexes_min[row] = column 

1067 elif values_matrix[row, column] > max_value: 

1068 max_value = values_matrix[row, column] 

1069 indexes_max[row] = column 

1070 

1071 row_index = npy.repeat(npy.arange(number_bins), 2) 

1072 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1073 

1074 data_processing_time = time.time() - data_processing_time 

1075 

1076 new_time_vector = timestamps_matrix[row_index, column_index] 

1077 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1078 new_values = values_matrix[row_index, column_index] 

1079 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1080 new_forced_values = forced_values_matrix[row_index, column_index] 

1081 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1082 

1083 # Make sure there are no None values for the time vector 

1084 time_vector_filter = new_time_vector != None 

1085 new_time_vector = new_time_vector[time_vector_filter] 

1086 new_values = new_values[time_vector_filter] 

1087 new_forced_values = new_forced_values[time_vector_filter] 

1088 

1089 return self.__class__( 

1090 signal_id=self.signal_id, 

1091 time_vector=new_time_vector, 

1092 values=new_values, 

1093 forced_values=new_forced_values, 

1094 number_samples=number_bins * 2, 

1095 number_samples_db=self.number_samples_db, 

1096 data_start=self.data_start, 

1097 data_end=self.data_end, 

1098 db_query_time=self.db_query_time, 

1099 init_time=self.init_time, 

1100 data_processing_time=self.data_processing_time + data_processing_time, 

1101 phase_id=self.phase_id, 

1102 ) 

1103 

1104 def interest_window_desampling( 

1105 self, 

1106 window_max_number_samples: int, 

1107 outside_max_number_samples: int, 

1108 window_min_timestamp: float | None = None, 

1109 window_max_timestamp: float | None = None, 

1110 ) -> Self: 

1111 """Performs a sampling in a window of interest and outside.""" 

1112 

1113 if not self.time_vector: 

1114 return self 

1115 

1116 if window_min_timestamp is None: 

1117 window_min_timestamp = self.time_vector[0] 

1118 if window_max_timestamp is None: 

1119 window_max_timestamp = self.time_vector[-1] 

1120 

1121 data_processing_time = time.time() 

1122 

1123 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1124 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1125 

1126 time_vector_before = self.time_vector[:index_window_start] 

1127 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1128 time_vector_after = self.time_vector[index_window_end:] 

1129 

1130 values_before = self.values[:index_window_start] 

1131 values_window = self.values[index_window_start:index_window_end] 

1132 values_after = self.values[index_window_end:] 

1133 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1134 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1135 

1136 # Resampling window 

1137 if time_vector_window: 

1138 # Ensurring window bounds 

1139 if time_vector_window[0] != window_min_timestamp: 

1140 time_vector_window.insert(0, window_min_timestamp) 

1141 values_window.insert(0, window_min_value) 

1142 if time_vector_window[-1] != window_max_timestamp: 

1143 time_vector_window.append(window_max_timestamp) 

1144 values_window.append(window_max_value) 

1145 else: 

1146 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1147 values_window = [window_min_value, window_max_value] 

1148 

1149 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1150 # Resampling 

1151 time_vector_window, values_window = downsample_list( 

1152 time_vector_window, values_window, window_max_number_samples 

1153 ) 

1154 

1155 # Resampling outside 

1156 number_samples_before = len(time_vector_before) 

1157 number_samples_after = len(time_vector_after) 

1158 if ( 

1159 outside_max_number_samples is not None 

1160 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1161 ): 

1162 new_number_samples_before = min( 

1163 number_samples_before, 

1164 math.ceil( 

1165 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1166 ), 

1167 ) 

1168 new_number_samples_after = min( 

1169 number_samples_after, 

1170 math.ceil( 

1171 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1172 ), 

1173 ) 

1174 # Adjusting numbers as math.ceil can do +1 on sum 

1175 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1176 if new_number_samples_before > new_number_samples_after: 

1177 new_number_samples_before -= 1 

1178 else: 

1179 new_number_samples_after -= 1 

1180 

1181 if new_number_samples_before > 0: 

1182 time_vector_before, values_before = downsample_list( 

1183 time_vector_before, values_before, new_number_samples_before 

1184 ) 

1185 

1186 if new_number_samples_after > 0: 

1187 time_vector_after, values_after = downsample_list( 

1188 time_vector_after, values_after, new_number_samples_after 

1189 ) 

1190 

1191 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1192 values = values_before + values_window + values_after 

1193 forced_values = self.interpolate_forced_values(new_time_vector) 

1194 number_samples = len(values) 

1195 

1196 data_processing_time = time.time() - data_processing_time 

1197 

1198 return self.__class__( 

1199 signal_id=self.signal_id, 

1200 time_vector=new_time_vector, 

1201 values=values, 

1202 forced_values=forced_values, 

1203 number_samples=number_samples, 

1204 number_samples_db=self.number_samples, 

1205 data_start=self.data_start, 

1206 data_end=self.data_end, 

1207 db_query_time=self.db_query_time, 

1208 init_time=self.init_time, 

1209 data_processing_time=self.data_processing_time + data_processing_time, 

1210 ) 

1211 

1212 

1213class StringSignalData(SignalData): 

1214 data_type: str = "str" 

1215 values: list[str | None] 

1216 forced_values: list[str | None] 

1217 

1218 def interpolate(self, new_time_vector: list[float], items): 

1219 # Find the indices of the values in xp that are just smaller or equal to x 

1220 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1221 indices = npy.clip(indices, 0, len(items) - 1) 

1222 # Return the corresponding left string values from fp 

1223 return [items[i] for i in indices] 

1224 

1225 

1226class SignalsData(TwinPadModel): 

1227 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1228 data_processing_time: float 

1229 data_start: float | None 

1230 data_end: float | None 

1231 

1232 @classmethod 

1233 def get_from_signal_ids( 

1234 cls, 

1235 signal_ids: list[str], 

1236 min_timestamp: float = None, 

1237 max_timestamp: float = None, 

1238 window_min_timestamp: float = None, 

1239 window_max_timestamp: float = None, 

1240 interpolate_bounds: bool = True, 

1241 max_documents: int = None, 

1242 ) -> Self: 

1243 signals_data = [] 

1244 data_start = None 

1245 data_end = None 

1246 if max_timestamp is None: 

1247 max_timestamp = time.time() 

1248 data_processing_time = 0.0 

1249 

1250 signal_collections = get_signal_collections_batch(signal_ids) 

1251 

1252 for signal_id, collection in zip(signal_ids, signal_collections): 

1253 signal_data = SignalData.get_from_signal_id( 

1254 signal_id=signal_id, 

1255 min_timestamp=min_timestamp, 

1256 max_timestamp=max_timestamp, 

1257 window_min_timestamp=window_min_timestamp, 

1258 window_max_timestamp=window_max_timestamp, 

1259 interpolate_bounds=interpolate_bounds, 

1260 max_documents=max_documents, 

1261 collection=collection, 

1262 ) 

1263 data_processing_time += signal_data.data_processing_time 

1264 signals_data.append(signal_data) 

1265 if signal_data.data_start is not None: 

1266 if data_start is None: 

1267 data_start = signal_data.data_start 

1268 else: 

1269 data_start = min(signal_data.data_start, data_start) 

1270 if signal_data.data_end is not None: 

1271 if data_end is None: 

1272 data_end = signal_data.data_end 

1273 else: 

1274 data_end = max(signal_data.data_end, data_end) 

1275 

1276 return cls( 

1277 signals_data=signals_data, 

1278 data_processing_time=data_processing_time, 

1279 data_start=data_start, 

1280 data_end=data_end, 

1281 ) 

1282 

1283 @classmethod 

1284 def get_from_phase_and_signal_ids( 

1285 cls, 

1286 phases: list, 

1287 phase_sync_times: list[float | None], 

1288 signal_ids: list[str], 

1289 window_min_timestamps: list[float | None], 

1290 window_max_timestamps: list[float | None], 

1291 zero_time_vector: bool = True, 

1292 ): 

1293 signals_data: list[SignalData] = [] 

1294 computation_start = time.time() 

1295 

1296 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip( 

1297 phases, phase_sync_times, window_min_timestamps, window_max_timestamps 

1298 ): 

1299 min_timestamp = phase.start_at / 1000 

1300 max_timestamp = phase.end_at / 1000 

1301 

1302 if sync_time is None: 

1303 sync_time = min_timestamp 

1304 

1305 if window_max_timestamp is not None and window_min_timestamp is not None: 

1306 window_length = window_max_timestamp - window_min_timestamp 

1307 

1308 if window_min_timestamp != min_timestamp: 

1309 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20) 

1310 if window_max_timestamp != max_timestamp: 

1311 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20) 

1312 

1313 signal_collections = get_signal_collections_batch(signal_ids) 

1314 

1315 for signal_id, collection in zip(signal_ids, signal_collections): 

1316 signal_data = SignalData.get_from_signal_id( 

1317 signal_id, 

1318 min_timestamp, 

1319 max_timestamp, 

1320 window_min_timestamp, 

1321 window_max_timestamp, 

1322 interpolate_bounds=False, 

1323 max_documents=None, 

1324 collection=collection, 

1325 ) 

1326 

1327 if len(signal_data.time_vector) == 0: 

1328 continue 

1329 

1330 if zero_time_vector: 

1331 signal_data = signal_data.zero_time_vector(sync_time) 

1332 signal_data.phase_id = phase.id 

1333 

1334 signals_data.append(signal_data) 

1335 

1336 return cls( 

1337 signals_data=signals_data, 

1338 data_processing_time=time.time() - computation_start, 

1339 data_start=0, 

1340 data_end=0, 

1341 ) 

1342 

1343 def uniform_desampling(self, number_samples_max: int) -> Self: 

1344 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1345 return SignalsData( 

1346 signals_data=signals_data, 

1347 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1348 data_start=self.data_start, 

1349 data_end=self.data_end, 

1350 ) 

1351 

1352 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1353 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1354 return SignalsData( 

1355 signals_data=signals_data, 

1356 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1357 data_start=self.data_start, 

1358 data_end=self.data_end, 

1359 ) 

1360 

1361 def interest_window_desampling( 

1362 self, 

1363 window_max_number_samples: int, 

1364 outside_max_number_samples: int, 

1365 window_min_timestamp: float = None, 

1366 window_max_timestamp: float = None, 

1367 ) -> Self: 

1368 signals_data = [ 

1369 s.interest_window_desampling( 

1370 window_max_number_samples=window_max_number_samples, 

1371 outside_max_number_samples=outside_max_number_samples, 

1372 window_min_timestamp=window_min_timestamp, 

1373 window_max_timestamp=window_max_timestamp, 

1374 ) 

1375 for s in self.signals_data 

1376 ] 

1377 

1378 return SignalsData( 

1379 signals_data=signals_data, 

1380 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1381 data_start=self.data_start, 

1382 data_end=self.data_end, 

1383 ) 

1384 

1385 def zero_time_vector(self, data_start: float): 

1386 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data] 

1387 return SignalsData( 

1388 signals_data=signals_data, 

1389 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1390 data_start=0, 

1391 data_end=max([s.data_end for s in signals_data]), 

1392 ) 

1393 

1394 @classmethod 

1395 async def apply_single_function( 

1396 cls, 

1397 phase, 

1398 base_signal_id: str, 

1399 function: SINGLE_POST_PROCESSING_FUNCTION, 

1400 window_min_timestamp: float = None, 

1401 window_max_timestamp: float = None, 

1402 ): 

1403 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}" 

1404 

1405 processed_result_signal = Signal.get_from_signal_id(signal_id) 

1406 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids: 

1407 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp) 

1408 

1409 signals_data = cls.get_from_phase_and_signal_ids( 

1410 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False 

1411 ) 

1412 

1413 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0: 

1414 return None 

1415 

1416 new_values = None 

1417 new_forced_values = None 

1418 time_vector = npy.array(signals_data.signals_data[0].time_vector) 

1419 values = signals_data.signals_data[0].values 

1420 forced_values = signals_data.signals_data[0].forced_values 

1421 

1422 match (function): 

1423 case "Cumul": 

1424 new_values = cumul(values) 

1425 new_forced_values = cumul(forced_values) 

1426 # case "CumulDistrib": 

1427 # new_values = cumul_distrib(values) 

1428 # new_forced_values = cumul_distrib(forced_values) 

1429 case "Delta": 

1430 new_values = delta(values) 

1431 new_forced_values = delta(forced_values) 

1432 case "DeltaT": 

1433 new_values = delta(time_vector) 

1434 new_forced_values = new_values 

1435 case "Derive": 

1436 new_values = derive(time_vector, values) 

1437 new_forced_values = derive(time_vector, forced_values) 

1438 case "Integ": 

1439 new_values = integ(time_vector, values) 

1440 new_forced_values = integ(time_vector, forced_values) 

1441 

1442 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1443 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1444 

1445 loop = asyncio.get_running_loop() 

1446 loop.create_task( 

1447 cls.save_function_signal( 

1448 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible 

1449 ) 

1450 ) 

1451 

1452 if window_max_timestamp is not None: 

1453 max_timestamp_mask = time_vector <= window_max_timestamp 

1454 time_vector = time_vector[max_timestamp_mask] 

1455 new_values = new_values[max_timestamp_mask] 

1456 new_forced_values = new_forced_values[max_timestamp_mask] 

1457 if window_min_timestamp is not None: 

1458 min_timestamp_mask = time_vector >= window_min_timestamp 

1459 time_vector = time_vector[min_timestamp_mask] 

1460 new_values = new_values[min_timestamp_mask] 

1461 new_forced_values = new_forced_values[min_timestamp_mask] 

1462 

1463 signals_data.signals_data[0].time_vector = time_vector.tolist() 

1464 signals_data.signals_data[0].values = new_values.tolist() 

1465 signals_data.signals_data[0].forced_values = new_forced_values.tolist() 

1466 signals_data.signals_data[0].number_samples = time_vector.size 

1467 

1468 signals_data.signals_data[0].signal_id = signal_id 

1469 

1470 return signals_data 

1471 

1472 @classmethod 

1473 async def apply_multiple_function( 

1474 cls, 

1475 phases: list, 

1476 signal_ids: list, 

1477 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

1478 window_min_timestamp: float = None, 

1479 window_max_timestamp: float = None, 

1480 ): 

1481 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION): 

1482 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}" 

1483 else: 

1484 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}" 

1485 

1486 active_phase = phases[0] 

1487 if function in {"Align-X", "Using-X"}: 

1488 active_phase = phases[1] 

1489 

1490 processed_result_signal = Signal.get_from_signal_id(function_signal_id) 

1491 if processed_result_signal is not None and ( 

1492 active_phase.id in processed_result_signal.computed_phases_ids 

1493 ): # If signal has been computed for the correct phase 

1494 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp) 

1495 

1496 array_length = None 

1497 time_vector_list = [] 

1498 values_list = [] 

1499 forced_values_list = [] 

1500 forcible = True 

1501 for phase, signal_id in zip(phases, signal_ids): 

1502 signals_data = cls.get_from_phase_and_signal_ids( 

1503 [phase], [None], [signal_id], [None], [None], zero_time_vector=False 

1504 ) 

1505 

1506 if len(signals_data.signals_data) == 0: 

1507 return None 

1508 

1509 signal_data = signals_data.signals_data[0] 

1510 

1511 if array_length is None: 

1512 array_length = signal_data.number_samples 

1513 if ( 

1514 array_length != signal_data.number_samples and function != "Align-X" 

1515 ) or signal_data.number_samples == 0: 

1516 return None 

1517 

1518 time_vector_list.append(npy.array(signal_data.time_vector)) 

1519 values_list.append(npy.array(signal_data.values, dtype=npy.float64)) 

1520 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64)) 

1521 forcible = forcible and signal_data.forcible 

1522 

1523 time_vector = time_vector_list[0] 

1524 new_values = None 

1525 new_forced_values = None 

1526 

1527 match (function): 

1528 case "Align-X": 

1529 time_vector = time_vector_list[1] 

1530 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000 

1531 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000 

1532 new_values = align_x(old_time_vector, values_list[0], new_time_vector) 

1533 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector) 

1534 # case "Atan2": 

1535 # new_values = atan2(values_list[0], values_list[1]) 

1536 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1]) 

1537 case "Using-X": 

1538 if len(time_vector_list[0]) != len(time_vector_list[1]): 

1539 return None 

1540 time_vector = time_vector_list[1] 

1541 new_values = values_list[0] 

1542 new_forced_values = forced_values_list[0] 

1543 case "Mean": 

1544 new_values = mean(*values_list) 

1545 new_forced_values = mean(*forced_values_list) 

1546 case "Norm": 

1547 new_values = norm(*values_list) 

1548 new_forced_values = norm(*forced_values_list) 

1549 

1550 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1551 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1552 

1553 loop = asyncio.get_running_loop() 

1554 loop.create_task( 

1555 cls.save_function_signal( 

1556 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible 

1557 ) 

1558 ) 

1559 

1560 total_number_samples = time_vector.size 

1561 

1562 if window_max_timestamp is not None: 

1563 max_timestamp_mask = time_vector <= window_max_timestamp 

1564 time_vector = time_vector[max_timestamp_mask] 

1565 new_values = new_values[max_timestamp_mask] 

1566 new_forced_values = new_forced_values[max_timestamp_mask] 

1567 if window_min_timestamp is not None: 

1568 min_timestamp_mask = time_vector >= window_min_timestamp 

1569 time_vector = time_vector[min_timestamp_mask] 

1570 new_values = new_values[min_timestamp_mask] 

1571 new_forced_values = new_forced_values[min_timestamp_mask] 

1572 

1573 signals_data = cls( 

1574 signals_data=[ 

1575 NumericSignalData( 

1576 signal_id=function_signal_id, 

1577 forcible=forcible, 

1578 time_vector=time_vector.tolist(), 

1579 values=new_values.tolist(), 

1580 forced_values=new_forced_values.tolist(), 

1581 number_samples=time_vector.size, 

1582 number_samples_db=total_number_samples, 

1583 ) 

1584 ], 

1585 data_processing_time=0, 

1586 data_start=0, 

1587 data_end=0, 

1588 ) 

1589 

1590 return signals_data 

1591 

1592 @classmethod 

1593 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float): 

1594 signal_data_collection = get_signal_collection(signal_id, create=True) 

1595 pipeline = [] 

1596 match_filter = {} 

1597 if window_min_timestamp is not None or window_max_timestamp is not None: 

1598 match_filter["$match"] = {} 

1599 match_filter["$match"]["precise_timestamp"] = {} 

1600 if window_max_timestamp is not None: 

1601 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp 

1602 if window_min_timestamp is not None: 

1603 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp 

1604 

1605 total_number_samples = signal_data_collection.count_documents({}) 

1606 

1607 if match_filter: 

1608 pipeline.append(match_filter) 

1609 

1610 fetch_start = time.time() 

1611 

1612 samples = signal_data_collection.aggregate(pipeline).to_list() 

1613 new_time_vector = [] 

1614 new_values = [] 

1615 new_forced_values = [] 

1616 for sample in samples: 

1617 new_time_vector.append(sample["precise_timestamp"]) 

1618 new_values.append(sample["value"]) 

1619 new_forced_values.append(sample["forced_value"]) 

1620 

1621 return cls( 

1622 signals_data=[ 

1623 NumericSignalData( 

1624 signal_id=signal_id, 

1625 time_vector=new_time_vector, 

1626 values=new_values, 

1627 forced_values=new_forced_values, 

1628 number_samples=len(new_time_vector), 

1629 number_samples_db=total_number_samples, 

1630 ) 

1631 ], 

1632 data_processing_time=time.time() - fetch_start, 

1633 data_start=0, 

1634 data_end=0, 

1635 ) 

1636 

1637 @classmethod 

1638 async def save_function_signal( 

1639 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool 

1640 ): 

1641 # Insert data first so if it is requested by another user, it will be computed again 

1642 signal_collection = get_signal_collection(function_signal_id, create=True) 

1643 signal_collection.delete_many( 

1644 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}} 

1645 ) 

1646 signal_collection.insert_many( 

1647 [ 

1648 { 

1649 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]), 

1650 "precise_timestamp": time_vector[i], 

1651 "value": new_values[i], 

1652 "forced_value": new_forced_values[i], 

1653 } 

1654 for i in range(len(time_vector)) 

1655 ] 

1656 ) 

1657 

1658 signals_config_collection = get_collection(systems_database, "signals", create=True) 

1659 signals_config_collection.find_one_and_update( 

1660 {"signal_id": function_signal_id}, 

1661 { 

1662 "$set": { 

1663 "description": "", 

1664 "unit": None, 

1665 "type": "sensor", 

1666 "address": None, 

1667 "frequency": 1 / max(npy.mean(delta(time_vector)), 1), # avoid division by 0 

1668 "transfer_function": None, 

1669 "precision_digits": None, 

1670 "digitization_function": None, 

1671 "data_type": "float", 

1672 "formula": None, 

1673 "forcible": forcible, 

1674 "commandable": False, 

1675 "broadcastable": True, 

1676 "signal_id": function_signal_id, 

1677 "post_processing": True, 

1678 }, 

1679 "$push": {"computed_phases_ids": phase.id}, 

1680 }, 

1681 upsert=True, 

1682 ) 

1683 

1684 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1685 if post_processing: 

1686 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1687 zip_buffer = io.BytesIO() 

1688 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1689 for signal_data in self.signals_data: 

1690 file_name = signal_data.signal_id 

1691 if post_processing: 

1692 phase = phases_by_id.get( 

1693 signal_data.phase_id, 

1694 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1695 ) 

1696 file_name = f"{signal_data.signal_id} ({phase.name})" 

1697 if file_format == "csv": 

1698 export_io = signal_data.csv_export() 

1699 zip_file.writestr(f"{file_name}.csv", export_io) 

1700 elif file_format == "prestoplot": 

1701 export_io = signal_data.prestoplot_export() 

1702 zip_file.writestr(f"{file_name}.tab", export_io) 

1703 else: 

1704 raise ValueError(f"Format not found. Got: {file_format}") 

1705 zip_bytes = zip_buffer.getvalue() 

1706 return zip_bytes 

1707 

1708 def hdf5_export(self, post_processing: bool = False, phase_ids: list = []): 

1709 if post_processing: 

1710 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1711 hdf5_buffer = io.BytesIO() 

1712 custom_type_float = npy.dtype( 

1713 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1714 ) 

1715 custom_type_string = npy.dtype( 

1716 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1717 ) 

1718 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1719 for signal_data in self.signals_data: 

1720 if post_processing: 

1721 phase = phases_by_id.get( 

1722 signal_data.phase_id, 

1723 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1724 ) 

1725 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})") 

1726 else: 

1727 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1728 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1729 if signal_data.data_type == "str": 

1730 export_data = npy.array( 

1731 list( 

1732 zip( 

1733 date_vector, 

1734 signal_data.time_vector, 

1735 signal_data.values, 

1736 signal_data.forced_values, 

1737 ) 

1738 ), 

1739 dtype=custom_type_string, 

1740 ) 

1741 else: 

1742 export_data = npy.array( 

1743 list( 

1744 zip( 

1745 date_vector, 

1746 signal_data.time_vector, 

1747 signal_data.values, 

1748 signal_data.forced_values, 

1749 ) 

1750 ), 

1751 dtype=custom_type_float, 

1752 ) 

1753 signal_group["data"] = export_data 

1754 return hdf5_buffer.getvalue() 

1755 

1756 

1757class SignalStatus(TwinPadModel): 

1758 status: str = "down" 

1759 reason: str = "" 

1760 delay: float | None = None 

1761 

1762 

1763class DigitizationFunction(TwinPadModel): 

1764 bits: int | None = None 

1765 min_value: float 

1766 max_value: float 

1767 min_raw_value: float 

1768 max_raw_value: float 

1769 

1770 

1771class SignalUpdate(TwinPadModel): 

1772 value: float | str | bool | int | None = None 

1773 forced_value: float | str | bool | int | None = None 

1774 timestamp: int | None = None 

1775 

1776 

1777class SignalType(str, Enum): 

1778 command = "command" 

1779 sensor = "sensor" 

1780 external_sensor = "external_sensor" 

1781 

1782 

1783SIGNALDATA_TYPES = { 

1784 "int": NumericSignalData, 

1785 "float": NumericSignalData, 

1786 "str": StringSignalData, 

1787 "bool": NumericSignalData, 

1788 "epoch": NumericSignalData, 

1789} 

1790 

1791 

1792class Signal(GenericMongo): 

1793 collection_name: ClassVar[str] = "signals" 

1794 

1795 signal_id: str 

1796 frequency: float 

1797 unit: str | None 

1798 description: str 

1799 type: SignalType 

1800 data_type: str 

1801 precision_digits: int | None 

1802 forcible: bool 

1803 commandable: bool 

1804 broadcastable: bool 

1805 status: SignalStatus = SignalStatus() 

1806 

1807 post_processing: bool = False 

1808 computed_phases_ids: list[str] = [] 

1809 

1810 digitization_function: DigitizationFunction | None 

1811 

1812 @property 

1813 def device(self) -> Device: 

1814 device_id = self.signal_id.split(".")[0] 

1815 device = Device.get_one_by_attribute("device_id", device_id) 

1816 return device 

1817 

1818 @cached_property 

1819 def signal_data_class(self): 

1820 if self.data_type in SIGNALDATA_TYPES: 

1821 return SIGNALDATA_TYPES[self.data_type] 

1822 if self.data_type.startswith("enum"): 

1823 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1824 raise ValueError(f"Unhandled python type: {self.data_type}") 

1825 

1826 @cached_property 

1827 def python_type(self): 

1828 if self.data_type in TYPES: 

1829 return TYPES[self.data_type] 

1830 if self.data_type.startswith("enum"): 

1831 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1832 return Literal[*choices] 

1833 raise ValueError(f"Unhandled python type: {self.data_type}") 

1834 

1835 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1836 command = Command( 

1837 sent_at=time.time(), 

1838 command_type="Signal command", 

1839 user_id=current_user.id, 

1840 ) 

1841 

1842 has_input_error = False 

1843 error_message = "" 

1844 

1845 if self.data_type.startswith("enum"): 

1846 enum_options = get_args(self.python_type) 

1847 

1848 if update_dict.value is not None and update_dict.value not in enum_options: 

1849 has_input_error = True 

1850 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1851 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1852 has_input_error = True 

1853 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1854 else: 

1855 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1856 has_input_error = True 

1857 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1858 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1859 has_input_error = True 

1860 error_message += ( 

1861 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1862 ) 

1863 

1864 if has_input_error: 

1865 command.response_time = 0 

1866 command.succeeded = False 

1867 command.description = f"Tried to modify signal {self.signal_id}" 

1868 response = {"error": True, "status_code": 400, "message": error_message} 

1869 else: 

1870 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict) 

1871 command.receive_response(response) 

1872 

1873 Command.create(command) 

1874 return response 

1875 

1876 @classmethod 

1877 def get_from_signal_id(cls, signal_id) -> Self: 

1878 """Could be generic from mongo""" 

1879 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1880 if not raw_value: 

1881 return None 

1882 del raw_value["_id"] 

1883 return cls.dict_to_object(raw_value) 

1884 

1885 @classmethod 

1886 def get_all_ids(cls) -> list[str]: 

1887 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1888 

1889 return [signal["signal_id"] for signal in cursor] 

1890 

1891 @classmethod 

1892 def get_all_statuses(cls) -> list[dict]: 

1893 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1894 

1895 return [ 

1896 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1897 for signal in cursor 

1898 ] 

1899 

1900 async def number_samples(self): 

1901 collection = get_signal_collection(signal_id=self.signal_id) 

1902 if collection is None: 

1903 return 0 

1904 

1905 number_samples = collection.estimated_document_count() 

1906 

1907 number_samples_async_collection = await get_async_collection( 

1908 systems_async_database, "number_samples", create=True, time_series=True 

1909 ) 

1910 

1911 loop = asyncio.get_running_loop() 

1912 loop.create_task( 

1913 number_samples_async_collection.insert_one( 

1914 { 

1915 "timestamp": datetime.datetime.now(pytz.UTC), 

1916 "signal_id": self.signal_id, 

1917 "number_samples": number_samples, 

1918 } 

1919 ) 

1920 ) 

1921 

1922 return number_samples 

1923 

1924 @classmethod 

1925 def total_number_samples(cls) -> int: 

1926 TwinPadActivity.get_number_samples_timeframe(0, 0, False) 

1927 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1928 

1929 if number_samples_collection is None: 

1930 return 0 

1931 

1932 result = number_samples_collection.aggregate( 

1933 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}] 

1934 ) 

1935 

1936 result = result.to_list() 

1937 if len(result) == 0: 

1938 return 0 

1939 return result[0]["amount"] 

1940 

1941 def sample_datasize(self): 

1942 return signals_database.command("collstats", self.signal_id)["size"] 

1943 

1944 @classmethod 

1945 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1946 result = cls.collection().aggregate( 

1947 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1948 ) 

1949 

1950 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1951 

1952 

1953class ForcedSignal(GenericMongo): 

1954 collection_name: ClassVar[str] = "forced_signals" 

1955 

1956 signal_id: str 

1957 forcing_user_id: str 

1958 forced_at: float 

1959 value: str | float 

1960 

1961 def insert(self): 

1962 insert_result = self.collection().find_one_and_update( 

1963 {"signal_id": self.signal_id}, 

1964 {"$set": self.to_dict(exclude={"id"})}, 

1965 upsert=True, 

1966 return_document=ReturnDocument.AFTER, 

1967 ) 

1968 self.id = str(insert_result["_id"]) 

1969 return self.id 

1970 

1971 @classmethod 

1972 def can_force(cls, signal_id: str, current_user: User) -> bool: 

1973 """Checks whether user can force a given signal. 

1974 

1975 :param signal_id: Signal ID of the signal to force 

1976 :type signal_id: str 

1977 :param current_user: Current user 

1978 :type current_user: User 

1979 :return: False if the signal was forced by someone else than the user, True otherwise 

1980 :rtype: bool 

1981 """ 

1982 forced_signal = cls.get_one_by_attribute("signal_id", signal_id) 

1983 if forced_signal is not None: 

1984 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

1985 return False 

1986 return True 

1987 

1988 

1989class ServicesStatus(TwinPadModel): 

1990 backend: str 

1991 cloud_broker: str 

1992 time_series_database: str 

1993 signal_storage: str 

1994 heartbeat_storage: str 

1995 data_analyzer: str 

1996 

1997 @classmethod 

1998 def check(cls) -> Self: 

1999 return cls( 

2000 cloud_broker=ping(RABBITMQ_HOST), 

2001 backend="up", 

2002 time_series_database=ping(MONGO_HOST), 

2003 signal_storage=ping(SIGNAL_STORAGE_HOST), 

2004 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

2005 data_analyzer=ping(DATA_ANALYZER_HOST), 

2006 ) 

2007 

2008 

2009def ping(host): 

2010 try: 

2011 if ping3.ping(host, timeout=0.8): 

2012 return "up" 

2013 except PermissionError: 

2014 pass 

2015 return "down" 

2016 

2017 

2018class Event(GenericMongo): 

2019 collection_name: ClassVar[str] = "events" 

2020 

2021 name: str 

2022 timestamp: float 

2023 event_rule_id: str 

2024 

2025 @computed_field 

2026 @cached_property 

2027 def event_rule(self) -> "EventRule": 

2028 return EventRule.get_from_id(self.event_rule_id) 

2029 

2030 @classmethod 

2031 def dict_to_object(cls, dict_): 

2032 """Refine to convert timestamp to datetime for mongodb.""" 

2033 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

2034 return super().dict_to_object(dict_) 

2035 

2036 

2037class TwinPadActivity(GenericMongo): 

2038 timestamp: float 

2039 amount: int 

2040 

2041 @classmethod 

2042 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

2043 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2044 number_events_collection = get_collection(systems_database, "number_events") 

2045 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

2046 items = [] 

2047 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2048 if number_events_collection is None or recompute_amount: 

2049 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

2050 number_events_collection.delete_many({}) 

2051 first_event = events_collection.find_one(sort={"timestamp": 1}) 

2052 if first_event is None: 

2053 return items 

2054 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

2055 tzinfo=pytz.UTC 

2056 ) 

2057 while last_computed_day < TODAY: 

2058 day_nb_events = events_collection.count_documents( 

2059 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2060 ) 

2061 if day_nb_events > 0: 

2062 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

2063 last_computed_day += ONE_DAY_OFFSET 

2064 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2065 if number_events_today > 0: 

2066 number_events_collection.delete_many({"timestamp": TODAY}) 

2067 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

2068 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2069 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2070 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2071 for day in number_events: 

2072 day["timestamp"] = day["timestamp"].timestamp() 

2073 items.append(cls.mongo_dict_to_object(day)) 

2074 return items 

2075 

2076 @classmethod 

2077 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2078 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2079 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

2080 signals_number_samples_collection = get_collection( 

2081 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

2082 ) 

2083 items = [] 

2084 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2085 if number_samples_collection is None or recompute_amount: 

2086 number_samples_collection = get_collection( 

2087 systems_database, "number_received_samples", create=True, time_series=True 

2088 ) 

2089 number_samples_collection.delete_many({}) 

2090 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

2091 if first_sample is None: 

2092 return items 

2093 # compute from day of first found event 

2094 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

2095 tzinfo=pytz.UTC 

2096 ) 

2097 while last_computed_day < TODAY: 

2098 number_samples_request = signals_number_samples_collection.aggregate( 

2099 [ 

2100 { 

2101 "$match": { 

2102 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

2103 } 

2104 }, 

2105 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2106 ] 

2107 ).to_list() 

2108 if len(number_samples_request) == 0: 

2109 number_samples = 0 

2110 else: 

2111 number_samples = number_samples_request[0].get("number_samples", 0) 

2112 if number_samples > 0: 

2113 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

2114 last_computed_day += ONE_DAY_OFFSET 

2115 number_samples_request = signals_number_samples_collection.aggregate( 

2116 [ 

2117 {"$match": {"timestamp": {"$gte": TODAY}}}, 

2118 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2119 ] 

2120 ).to_list() 

2121 if len(number_samples_request) == 0: 

2122 number_samples_today = 0 

2123 else: 

2124 number_samples_today = number_samples_request[0].get("number_samples", 0) 

2125 if number_samples_today > 0: 

2126 number_samples_collection.delete_many({"timestamp": TODAY}) 

2127 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

2128 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2129 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2130 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2131 for day in number_events: 

2132 day["timestamp"] = day["timestamp"].timestamp() 

2133 items.append(cls.mongo_dict_to_object(day)) 

2134 return items 

2135 

2136 @classmethod 

2137 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2138 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2139 number_commands_collection = get_collection(systems_database, "number_commands") 

2140 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

2141 items = [] 

2142 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2143 if number_commands_collection is None or recompute_amount: 

2144 number_commands_collection = get_collection( 

2145 systems_database, "number_commands", create=True, time_series=True 

2146 ) 

2147 number_commands_collection.delete_many({}) 

2148 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

2149 if first_command is None: 

2150 return items 

2151 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

2152 tzinfo=pytz.UTC 

2153 ) 

2154 while last_computed_day < TODAY: 

2155 day_nb_commands = commands_collection.count_documents( 

2156 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2157 ) 

2158 if day_nb_commands > 0: 

2159 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

2160 last_computed_day += ONE_DAY_OFFSET 

2161 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2162 if number_commands_today > 0: 

2163 number_commands_collection.delete_many({"timestamp": TODAY}) 

2164 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

2165 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2166 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2167 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2168 for day in number_commands: 

2169 day["timestamp"] = day["timestamp"].timestamp() 

2170 items.append(cls.mongo_dict_to_object(day)) 

2171 return items 

2172 

2173 

2174class EventRule(GenericMongo): 

2175 collection_name: ClassVar[str] = "event_rules" 

2176 

2177 name: str 

2178 formula: str 

2179 variables: list[str] 

2180 

2181 @computed_field 

2182 @cached_property 

2183 def number_events(self) -> int: 

2184 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

2185 

2186 

2187class Company(GenericMongo): 

2188 collection_name: ClassVar[str] = "companies" 

2189 name: str 

2190 

2191 

2192class Campaign(GenericMongo): 

2193 collection_name: ClassVar[str] = "campaigns" 

2194 

2195 # Properties 

2196 id: str | None = None 

2197 name: str 

2198 description: str | None = None 

2199 

2200 

2201class Phase(GenericMongo): 

2202 collection_name: ClassVar[str] = "phases" 

2203 

2204 # Properties 

2205 id: str | None = None 

2206 name: str 

2207 description: str | None = None 

2208 start_at: float 

2209 end_at: float 

2210 

2211 # FK 

2212 campaign_id: MongoId 

2213 

2214 @classmethod 

2215 def deleteMany(cls, campaign_id): 

2216 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

2217 return delete_phases 

2218 

2219 

2220class CustomViewCreation(GenericMongo): 

2221 collection_name: ClassVar[str] = "custom_views" 

2222 

2223 name: str 

2224 configuration: list 

2225 

2226 

2227class CustomView(CustomViewCreation): 

2228 # Properties 

2229 id: str | None = None 

2230 

2231 # Foreign Key 

2232 user_id: str 

2233 

2234 

2235CustomViewUpdate = create_update_model(CustomView) 

2236 

2237 

2238class Video(GenericMongo): 

2239 collection_name: ClassVar[str] = "videos" 

2240 

2241 # Properties 

2242 name: str 

2243 ip_addr: str 

2244 username: str | None = None 

2245 password: str | None = None 

2246 

2247 # Methods 

2248 @classmethod 

2249 def get_all(cls, sort_by="_id") -> list[Self]: 

2250 items = [] 

2251 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

2252 items.append(cls.mongo_dict_to_object(dict_)) 

2253 return items 

2254 

2255 @classmethod 

2256 def get_video(cls, camera_id: ObjectId): 

2257 camera = cls.get_from_id(camera_id) 

2258 if camera is not None: 

2259 return camera.name 

2260 return None 

2261 

2262 

2263class Command(GenericMongo): 

2264 collection_name: ClassVar[str] = "commands" 

2265 

2266 # Properties 

2267 timestamp: datetime.datetime = None 

2268 sent_at: float 

2269 response_time: float = 0.0 

2270 command_type: str 

2271 description: str = "" 

2272 succeeded: bool = False 

2273 

2274 # Foreign key 

2275 user_id: str 

2276 

2277 @classmethod 

2278 def collection(cls): 

2279 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

2280 

2281 @classmethod 

2282 def create(cls, command: Self): 

2283 command = cls( 

2284 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

2285 sent_at=command.sent_at, 

2286 response_time=command.response_time, 

2287 command_type=command.command_type, 

2288 description=command.description, 

2289 succeeded=command.succeeded, 

2290 user_id=command.user_id, 

2291 ) 

2292 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

2293 if new_command is None: 

2294 return None 

2295 return {"command_id": str(new_command.inserted_id)} 

2296 

2297 def receive_response(self, response: dict): 

2298 self.response_time = time.time() - self.sent_at 

2299 self.succeeded = response.get("error", True) is False 

2300 if self.description == "": 

2301 self.description += response.get("message", "").rstrip() 

2302 

2303 

2304class SignalsPresetCreation(GenericMongo): 

2305 name: str 

2306 signal_ids: list[str] 

2307 

2308 

2309class SignalsPreset(SignalsPresetCreation): 

2310 collection_name: ClassVar[str] = "signals_presets" 

2311 

2312 user_id: str 

2313 

2314 @classmethod 

2315 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

2316 signals_preset = cls( 

2317 user_id=user_id, 

2318 name=signals_preset.name, 

2319 signal_ids=signals_preset.signal_ids, 

2320 ) 

2321 

2322 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

2323 

2324 return str(new_signal_preset.inserted_id) 

2325 

2326 

2327SignalsPresetUpdate = create_update_model(SignalsPreset) 

2328 

2329 

2330class LineStyle(str, Enum): 

2331 solid = "solid" 

2332 dotted = "dotted" 

2333 dashed = "dashed" 

2334 

2335 

2336class SignalAppearance: 

2337 value_color: str 

2338 forced_value_color: str 

2339 

2340 

2341class GraphThemeCreation(GenericMongo): 

2342 collection_name: ClassVar[str] = "graph_themes" 

2343 

2344 name: str 

2345 signal_id: str 

2346 value_color: str = "" 

2347 forced_value_color: str = "" 

2348 value_line_style: LineStyle = LineStyle.solid 

2349 forced_value_line_style: LineStyle = LineStyle.solid 

2350 private: bool = True 

2351 

2352 

2353class PublicGraphTheme(GraphThemeCreation): 

2354 created_by_user: bool 

2355 in_user_library: bool 

2356 active_for_user: bool 

2357 

2358 _current_user_id: str = "" 

2359 

2360 @classproperty 

2361 def custom_pipeline_steps(cls) -> dict[str, list]: 

2362 return { 

2363 "created_by_user": [ 

2364 { 

2365 "$addFields": { 

2366 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

2367 } 

2368 } 

2369 ], 

2370 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

2371 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

2372 ], 

2373 "in_user_library": [ 

2374 { 

2375 "$addFields": { 

2376 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

2377 } 

2378 } 

2379 ], 

2380 "active_for_user": [ 

2381 { 

2382 "$addFields": { 

2383 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

2384 } 

2385 } 

2386 ], 

2387 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

2388 "active": [ 

2389 { 

2390 "$addFields": { 

2391 "active": "$$REMOVE", 

2392 } 

2393 } 

2394 ], 

2395 "creator_id": [ 

2396 { 

2397 "$addFields": { 

2398 "creator_id": "$$REMOVE", 

2399 } 

2400 } 

2401 ], 

2402 } 

2403 

2404 @classmethod 

2405 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

2406 cls._current_user_id = user_id 

2407 return super().response_from_query(query) 

2408 

2409 @classmethod 

2410 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

2411 query.in_user_library = "true" 

2412 return cls.response_from_query(query, user_id) 

2413 

2414 @classmethod 

2415 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

2416 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

2417 

2418 @classmethod 

2419 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2420 cls._current_user_id = user_id 

2421 return super().get_by_attribute(attribute_name, attribute_value) 

2422 

2423 @classmethod 

2424 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2425 cls._current_user_id = user_id 

2426 return super().get_one_by_attribute(attribute_name, attribute_value) 

2427 

2428 @classmethod 

2429 def get_all(cls, sort_by: str, user_id: str): 

2430 cls._current_user_id = user_id 

2431 return super().get_all(sort_by) 

2432 

2433 @classmethod 

2434 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

2435 pipeline = [ 

2436 { 

2437 "$match": { 

2438 "active": {"$eq": user_id}, 

2439 "signal_id": {"$in": signal_ids}, 

2440 } 

2441 }, 

2442 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

2443 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

2444 { 

2445 "$project": { 

2446 "_id": 0, 

2447 "signal_id": 1, 

2448 "value_color": 1, 

2449 "forced_value_color": 1, 

2450 "value_line_style": 1, 

2451 "forced_value_line_style": 1, 

2452 } 

2453 }, 

2454 ] 

2455 

2456 result = {} 

2457 

2458 cursor = cls.collection().aggregate(pipeline) 

2459 for document in cursor: 

2460 signal_id = document["signal_id"] 

2461 del document["signal_id"] 

2462 result[signal_id] = document 

2463 

2464 return result 

2465 

2466 

2467GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2468 

2469 

2470class PrivateGraphTheme(GraphThemeCreation): 

2471 # private 

2472 creator_id: str 

2473 in_library: list[str] 

2474 active: list[str] 

2475 

2476 @classmethod 

2477 def create( 

2478 cls, 

2479 creator_id: str, 

2480 name: str, 

2481 signal_id: str, 

2482 value_color: str, 

2483 forced_value_color: str, 

2484 value_line_style: LineStyle, 

2485 forced_value_line_style: LineStyle, 

2486 private: bool, 

2487 ): 

2488 color_setting = cls( 

2489 creator_id=creator_id, 

2490 name=name, 

2491 signal_id=signal_id, 

2492 value_color=value_color, 

2493 forced_value_color=forced_value_color, 

2494 value_line_style=value_line_style, 

2495 forced_value_line_style=forced_value_line_style, 

2496 private=private, 

2497 in_library=[creator_id], 

2498 active=[creator_id], 

2499 ) 

2500 

2501 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2502 color_setting.id = str(new_color_setting.inserted_id) 

2503 return color_setting 

2504 

2505 def update(self, update_dict: dict, user_id: str): 

2506 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2507 if in_user_lib and user_id not in self.in_library: 

2508 self.in_library.append(user_id) 

2509 elif not in_user_lib and user_id in self.in_library: 

2510 self.in_library.remove(user_id) 

2511 update_dict["in_library"] = self.in_library 

2512 del update_dict["in_user_library"] 

2513 

2514 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2515 if active_for_user and user_id not in self.active: 

2516 self.active.append(user_id) 

2517 elif not active_for_user and user_id in self.active: 

2518 self.active.remove(user_id) 

2519 update_dict["active"] = self.active 

2520 del update_dict["active_for_user"] 

2521 

2522 if update_dict.get("created_by_user") is not None: 

2523 del update_dict["created_by_user"] 

2524 

2525 self.collection().find_one_and_update( 

2526 {"_id": ObjectId(self.id)}, 

2527 {"$set": update_dict}, 

2528 ) 

2529 

2530 return {} 

2531 

2532 

2533class DeviceStatus(str, Enum): 

2534 started = "started" 

2535 running = "running" 

2536 created = "created" 

2537 exited = "exited" 

2538 restarting = "restarting" 

2539 

2540 

2541class DeviceUpdateFromDeployer(BaseModel): 

2542 status: DeviceStatus 

2543 

2544 

2545class DeviceFromDeployerCreation(BaseModel): 

2546 name: str 

2547 description: str 

2548 

2549 

2550class DeviceFromDeployer(DeviceFromDeployerCreation): 

2551 status: DeviceStatus 

2552 device_id: DeviceId 

2553 logs: str = "" 

2554 

2555 

2556class DeviceDeployer(GenericMongo): 

2557 collection_name: ClassVar[str] = "device_deployers" 

2558 url: HttpUrl 

2559 

2560 def endpoint_url(self, endpoint): 

2561 return f"{str(self.url).rstrip('/')}/{endpoint}" 

2562 

2563 def devices(self) -> list[DeviceFromDeployer]: 

2564 devices = [] 

2565 try: 

2566 response = requests.get(self.endpoint_url("devices")) 

2567 except requests.exceptions.ConnectionError: 

2568 logger.info("connection error") 

2569 return None 

2570 if response.status_code != 200: 

2571 return None 

2572 for device_dict in response.json()["devices"]: 

2573 devices.append( 

2574 DeviceFromDeployer( 

2575 device_id=device_dict["device_id"], 

2576 name=device_dict["container_name"], 

2577 description="desc", 

2578 status=device_dict["status"], 

2579 logs=device_dict["logs"], 

2580 ) 

2581 ) 

2582 return devices 

2583 

2584 def get_device(self, device_id: DeviceId): 

2585 try: 

2586 response = requests.get(self.endpoint_url(f"devices/{device_id}")) 

2587 except requests.exceptions.ConnectionError: 

2588 return None 

2589 if response.status_code != 200: 

2590 return None 

2591 device_dict = response.json() 

2592 return DeviceFromDeployer( 

2593 device_id=device_dict["device_id"], 

2594 name=device_dict["container_name"], 

2595 description="desc", 

2596 status=device_dict["status"], 

2597 logs=device_dict["logs"], 

2598 ) 

2599 

2600 def create_device(self, device: DeviceFromDeployer) -> Device | None: 

2601 try: 

2602 response = requests.post(self.endpoint_url("devices"), json={"name": device.name}) 

2603 except requests.exceptions.ConnectionError: 

2604 return None 

2605 

2606 if response.status_code != 201: 

2607 return None 

2608 

2609 device_dict = response.json() 

2610 return DeviceFromDeployer( 

2611 device_id=device_dict["device_id"], 

2612 name="", 

2613 description="desc", 

2614 status=device_dict["status"], 

2615 ) 

2616 

2617 def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None: 

2618 try: 

2619 response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump()) 

2620 except requests.exceptions.ConnectionError: 

2621 return None 

2622 

2623 if response.status_code != 200: 

2624 return None 

2625 

2626 device_dict = response.json() 

2627 return Device( 

2628 device_id=device_dict["device_id"], 

2629 name="", 

2630 description="desc", 

2631 pid={}, 

2632 petri_network={}, 

2633 modes=[], 

2634 status=device_dict["status"], 

2635 ) 

2636 

2637 def delete_device(self, device_id: DeviceId) -> DeleteInfo: 

2638 try: 

2639 response = requests.delete(self.endpoint_url(f"devices/{device_id}")) 

2640 except requests.exceptions.ConnectionError: 

2641 return DeleteInfo(is_deleted=False, detail="Connection to deployer error") 

2642 if response.status_code not in [200, 202, 204]: 

2643 return DeleteInfo(is_deleted=False, detail=response.text) 

2644 

2645 return DeleteInfo(is_deleted=True, detail="") 

2646 

2647 

2648DeviceDeployerUpdate = create_update_model(DeviceDeployer)