Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%

1366 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 08:17 +0000

1from functools import cached_property 

2import os 

3import re 

4import io 

5import time 

6import csv 

7from typing import Self, ClassVar, Any, Literal, get_args, Annotated 

8import datetime 

9import math 

10import bisect 

11from enum import Enum 

12import logging 

13import copy 

14import asyncio 

15import requests 

16 

17import zipfile 

18import ping3 

19import pytz 

20from bson.objectid import ObjectId 

21from pymongo import ASCENDING, ReturnDocument 

22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator 

23import numpy as npy 

24import lttb 

25import h5py 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 signals_async_database, 

36 devices_states_database, 

37) 

38from twinpad_backend.responses import ListResponse 

39from twinpad_backend.messages import RabbitMQClient 

40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

41 

# Maps type names to the Python types they stand for ("epoch" values are handled as floats).
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Names of the supported post-processing functions.
# NOTE(review): presumably grouped by the number of input signals (one / two / several),
# going by the constant names — confirm against the code that consumes them.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]


# Host names of the companion services; each one can be overridden with the
# environment variable of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Delay in seconds; SignalData.get_from_signal_id repeats the last sample at "now"
# when the newest sample is older than this.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
# NOTE(review): not used in the visible part of the file — presumably a batch size
# for database updates; confirm against its users.
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Messages go through the "uvicorn.error" logger.
logger = logging.getLogger("uvicorn.error")

58 

59 

class DeleteInfo(BaseModel):
    """Outcome of a delete operation."""

    # True when the item was deleted.
    is_deleted: bool
    # Human-readable explanation of the outcome.
    detail: str

63 

64 

class classproperty:
    """
    Read-only descriptor exposing a method as a property of the class itself.

    Replaces the stacking of ``@classmethod`` on top of ``@property``, which was
    deprecated in Python 3.11 and removed in Python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped function; it is called with the owning class as only argument.
        self.fget = func

    def __get__(self, instance, owner):
        # The instance is deliberately ignored: access through the class or through
        # one of its instances both evaluate the getter against the class.
        getter = self.fget
        return getter(owner)

76 

77 

def create_update_model(model):
    """Build the "<Model>Update" companion of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened
    to accept ``None`` and with ``None`` as default, so that an update payload
    may carry any subset of the fields.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)

87 

88 

def get_utc_date_from_timestamp(timestamp: float):
    """Return the ISO 8601 representation (without offset) of an epoch timestamp, in UTC.

    ``datetime.fromtimestamp`` without ``tz`` converts to the *local* time of the
    server, which only matches UTC when the process happens to run with a UTC
    time zone. The conversion is therefore done explicitly in UTC; the tzinfo is
    then dropped so the output keeps the offset-less format callers already get.

    Args:
        timestamp: seconds since the Unix epoch.

    Returns:
        A string such as ``"1970-01-01T00:00:00"``.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()

91 

92 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """Reduce a sampled series to roughly ``max_number_samples`` points.

    The series is returned untouched when it has fewer than ``max_number_samples``
    points. Otherwise the ``None`` values are set aside (LTTB cannot handle them),
    the remaining points are downsampled with ``lttb.downsample`` and the bounds of
    each run of consecutive ``None`` values are inserted back at their timestamps.
    When LTTB raises ``ValueError`` the series is instead linearly interpolated on
    an evenly spaced time vector, NaN results being turned back into ``None``.

    Args:
        time_vector: timestamps of the samples, in increasing order.
        values: one value per timestamp; ``None`` marks a missing value.
        max_number_samples: target number of points of the result.

    Returns:
        A ``(time_vector, values)`` tuple of lists.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass splitting real samples from None values. For each run of
    # consecutive None values only the first and last timestamps are kept
    # (a single timestamp for a run of length one).
    kept_times = []
    kept_values = []
    none_group_bounds = []
    previous_was_none = False
    for timestamp, value in zip(time_vector, values, strict=True):
        if value is None:
            if not previous_was_none:
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
        else:
            kept_times.append(timestamp)
            kept_values.append(value)
        previous_was_none = value is None
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        # Leave room for the None markers inserted back below.
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # Insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values

137 

138 

def is_of_type(value, wanted_type):
    """Tell whether ``value`` is acceptable where ``wanted_type`` is expected.

    Integers are accepted wherever a float is wanted; every other type is
    checked with a plain ``isinstance``.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

143 

144 

145# Models 

class TwinPadModel(BaseModel):
    """Common base of the models, adding dict conversion helpers on top of pydantic."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` and return the corresponding instance of ``cls``."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model in JSON mode, leaving out the fields listed in ``exclude``."""
        return self.model_dump(mode="json", exclude=exclude)

154 

155 

def validate_mongo_id(v):
    """Return ``v`` as a string when it is a valid MongoDB ObjectId.

    Raises:
        ValueError: when ``ObjectId.is_valid`` rejects ``v``.
    """
    if ObjectId.is_valid(v):
        return str(v)
    raise ValueError("Invalid MongoDB id")


# String type whose value is checked by validate_mongo_id before pydantic's own validation.
MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]

163 

164 

def validate_12_hex(v: str) -> str:
    """Check that ``v`` is a string of exactly 12 hexadecimal characters.

    The function runs as a "before" validator, i.e. on raw input that is not
    guaranteed to be a string. Non-string input is rejected with ``ValueError``
    (which pydantic reports as a validation error) instead of letting
    ``re.fullmatch`` fail with a ``TypeError``.

    Args:
        v: candidate identifier.

    Returns:
        ``v`` unchanged.

    Raises:
        ValueError: when ``v`` is not a 12-character hexadecimal string.
    """
    if not isinstance(v, str) or not re.fullmatch(r"[0-9a-fA-F]{12}", v):
        raise ValueError("ID must be a 12-character hexadecimal string")
    return v

169 

170 

# String type whose value is checked by validate_12_hex before pydantic's own validation.
DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]

172 

173 

class GenericMongo(TwinPadModel):
    """Base class of the models stored in a collection of the systems database.

    Subclasses are expected to define a ``collection_name`` class variable.
    ``custom_pipeline_steps`` maps a property name to the aggregation stages
    producing that property; the stages of a property are placed before any
    ``$match``/``$sort`` stage relying on it and the remaining ones are appended
    at the end of the pipeline.
    """

    id: MongoId | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the MongoDB collection of the class, creating it when missing."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def _pipeline_around(cls, leading_property: str, core_stages: list) -> list:
        """Wrap ``core_stages`` with the custom stages of the class.

        The stages registered for ``leading_property`` (if any) come first since
        ``core_stages`` depend on them; the stages of every other property follow.
        """
        pipeline = list(cls.custom_pipeline_steps.get(leading_property, []))
        pipeline.extend(core_stages)
        for property_name, stages in cls.custom_pipeline_steps.items():
            if property_name != leading_property:
                pipeline.extend(stages)
        return pipeline

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query and wrap the result.

        Args:
            query: object providing ``mongodb_filter()``, ``sort_by`` (comma
                separated ``field`` or ``field:order`` items), ``offset`` and
                ``limit`` (``None`` or 0 meaning no limit).

        Returns:
            A ListResponse with the matching items and the total number of
            documents matching the filter.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field, sort_order = sort, 1
            sort_dict[sort_field] = sort_order

        collection = cls.collection()
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Each set of custom stages is added at most once, even when the
            # property is filtered in several clauses or filtered and sorted.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for filter_clause in filter_clauses:
            for property_name in cls.custom_pipeline_steps:
                if property_name in filter_clause:
                    add_custom_steps(property_name)
        pipeline.append({"$match": request_filters})

        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if query.limit is not None and query.limit != 0:
            pipeline.append({"$limit": query.limit})

        # Properties not needed for filtering/sorting are computed on the final page only.
        for property_name in cls.custom_pipeline_steps:
            add_custom_steps(property_name)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose MongoDB ``_id`` is ``item_id``, or None."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Convert a MongoDB document into an instance (``_id`` becomes the string ``id``).

        ``mongo_dict`` is modified in place.
        """
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = cls._pipeline_around(attribute_name, [{"$match": {attribute_name: attribute_value}}])
        return [cls.mongo_dict_to_object(item_dict) for item_dict in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute with value, or None."""
        pipeline = cls._pipeline_around(
            attribute_name,
            [{"$match": {attribute_name: attribute_value}}, {"$limit": 1}],
        )
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted by ``sort_by`` in ascending order."""
        pipeline = cls._pipeline_around(sort_by, [{"$sort": {sort_by: ASCENDING}}])
        return [cls.mongo_dict_to_object(item_dict) for item_dict in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Count the documents whose ``post_processing`` flag is False or absent.

        Returns 0 when the collection does not exist (it is not created here).
        """
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert the item in its collection, store the generated id on it and return that id."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to the instance and to its stored document; return the instance."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; return True when a document was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

318 

319 

class User(GenericMongo):
    """User account stored in the "users" collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude: set | None = None):
        """Dump the user without its password.

        A new exclusion set is built on every call: the previous implementation
        added "password" to a shared default set and to the caller's own set.
        """
        return GenericMongo.to_dict(self, exclude={*(exclude or ()), "password"})

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Insert a new user and return ``{"user_id": <id>}``.

        The first user ever created is made administrator regardless of ``is_admin``.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used rather than to_dict so that the password is stored.
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """Update the stored user with the non-None fields of ``user``.

        Returns:
            The updated user, or None when no user has the id ``user_id``.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        # find_one_and_update returns None when nothing matches the filter.
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The connection state is not reported back; older documents may not have it.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


# Payload model for user updates: every field of User except "id", all optional.
UserUpdate = create_update_model(User)

361 

362 

class Mode(TwinPadModel):
    """One operating mode of a device (see Device.modes)."""

    mode_id: int
    name: str
    # NOTE(review): the meaning and units of the two frequency fields are not
    # visible in this part of the file — confirm before documenting further.
    frequency_multiplier: float
    min_frequency: float

368 

369 

class DeviceUpdate(TwinPadModel):
    """Payload of a mode change request (see Device.change_mode)."""

    # Identifier of the mode to switch to.
    mode_id: int

372 

373 

class Device(GenericMongo):
    """Device stored in the "devices" collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: DeviceId
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """Ask the device to switch mode and record the attempt as a Command.

        Mode ids are used as 1-based positions in ``self.modes``: valid ids range
        from 1 to ``len(self.modes)``. Id 0 is rejected as well, since it would
        otherwise silently designate the last mode through index -1.

        Args:
            update_dict: object carrying the requested ``mode_id``.
            current_user: user issuing the command.

        Returns:
            The response of the message broker, or an error dict with status
            code 400 when the mode id is invalid.
        """
        mode_id = update_dict.mode_id
        number_modes = len(self.modes)
        has_error = mode_id < 1 or mode_id > number_modes

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{mode_id}"
        elif self.current_mode_id is not None and 1 <= self.current_mode_id <= number_modes:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[mode_id - 1].name}"
        else:
            # Current mode unknown (or not one of the declared modes): only name the target.
            description = f"Change mode of #{self.device_id} to {self.modes[mode_id - 1].name}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {mode_id}"}
        else:
            response = await RabbitMQClient().send_mode_change(self.device_id, mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """Fetch the devices owning the given signals.

        The device id is the part of a signal id before the first dot.

        Returns:
            A dict mapping each device id to its Device, or to None when the
            device is unknown. Each device is queried once.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

425 

426 

class DeviceSetup(GenericMongo):
    """Device setup stored in the "device_setups" collection."""

    collection_name: ClassVar[str] = "device_setups"

    # Identifiers of the devices belonging to the setup.
    device_ids: list[str]
    active: bool = False
    # NOTE(review): presumably maps variable names to signal identifiers — confirm against callers.
    variable_mapping: dict[str, str]


# Payload model for setup updates: every field of DeviceSetup except "id", all optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)

436 

437 

class DeviceState(GenericMongo):
    """State of a device at a given time."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
        """Return the states of a device matching a filtered, sorted and paginated query.

        The states are read from the collection named after ``device_id`` in the
        devices states database; a missing collection yields an empty response.

        Args:
            device_id: identifier of the device.
            query: object providing ``mongodb_filter()``, ``sort_by`` (``field``
                or ``field:order``), ``offset`` and ``limit`` (``None`` or 0
                meaning no limit).
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode"),
                load=item_dict.get("load"),
                # Plain empty lists as fallback: Field(...) is a declaration helper,
                # not a value, and fails validation when passed as field content.
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

477 

478 

class SignalSample(TwinPadModel):
    """Single sample of a signal."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of a signal, or None when there is none."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})

        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """Return the most recent sample of a signal, or None when there is none.

        The timestamp of the returned sample is moved forward to the last ping
        of the owning device when that ping is more recent.

        Args:
            signal_id: identifier of the signal; the part before the first dot
                is the device id.
            device: owning device, when already known; it is fetched otherwise.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same workaround as in get_first_from_signal_id, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})

        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, in the order of ``signal_ids``.

        The owning devices are fetched once per device rather than once per signal.
        """
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

563 

564 

565class SignalData(TwinPadModel): 

566 signal_id: str 

567 forcible: bool = True 

568 time_vector: list[float] 

569 values: list[float | int | str | None] 

570 forced_values: list[float | int | str | None] 

571 

572 data_start: float | None = None 

573 data_end: float | None = None 

574 

575 number_samples: int = 0 

576 number_samples_db: int = 0 

577 

578 db_query_time: float = 0.0 

579 init_time: float = 0.0 

580 data_processing_time: float = 0.0 

581 

582 phase_id: str | None = None 

583 

@classmethod
def get_from_signal_id(
    cls,
    signal_id: str,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
    max_documents: int | None = None,
) -> Self:
    """Load the samples of a signal and wrap them in the data class of the signal.

    Args:
        signal_id: identifier of the signal, also used to find its collection.
        min_timestamp: optional lower bound (epoch seconds, inclusive) on the
            ``timestamp`` field of the documents.
        max_timestamp: optional upper bound (epoch seconds, inclusive).
        window_min_timestamp: start of the window handed to interpolate_bounds.
        window_max_timestamp: end of the window handed to interpolate_bounds.
        interpolate_bounds: when true and both window bounds are given, samples
            may be added at the window bounds (see interpolate_bounds).
        max_documents: when given and lower than the number of matching
            documents, the documents are thinned out in the database.

    Returns:
        An instance of ``signal.signal_data_class``, or an empty instance of
        ``cls`` when the collection or the Signal cannot be found.
    """

    now = time.time()

    # Filter on the "timestamp" field of the documents
    req_signal = {}
    if min_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
    if max_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

    collection = get_signal_collection(signal_id)
    if collection is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )

    db_req_start = time.time()

    sort_step = {"$sort": {"precise_timestamp": 1}}
    number_results = collection.count_documents(req_signal)

    pipeline = []
    if req_signal:
        pipeline.append({"$match": req_signal})  # Filter data if needed

    pipeline.extend(
        [
            {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
            sort_step,
        ]
    )

    if max_documents is not None and max_documents < number_results:
        # Too many documents: number them in time order, group them by
        # floor(number / ratio) and keep the first document of each group.
        unsampling_ratio = math.ceil(number_results / max_documents)
        logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
        pipeline.extend(
            [
                {
                    "$setWindowFields": {
                        "sortBy": {"precise_timestamp": 1},
                        "output": {"index": {"$documentNumber": {}}},
                    }
                },
                {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                {"$replaceRoot": {"newRoot": "$doc"}},
                {"$unset": ["index", "group_id"]},
                # Sort again after the $group stage
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

    cursor = collection.aggregate(pipeline)
    db_req_time = time.time() - db_req_start

    init_time = time.time()

    results = cursor.to_list()
    time_vector = []
    values = []
    forced_values = []
    for s in results:
        time_vector.append(s["precise_timestamp"])
        values.append(s.get("value", None))
        forced_values.append(s.get("forced_value", None))

    signal = Signal.get_from_signal_id(signal_id)
    if signal is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )
    # The concrete data class is chosen by the signal
    class_ = signal.signal_data_class

    if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
        time_vector, values, forced_values = cls.interpolate_bounds(
            class_,
            collection,
            signal_id,
            time_vector,
            values,
            forced_values,
            window_min_timestamp,
            window_max_timestamp,
        )

    if values:
        # TODO: check below. a bit strange
        # When the request reaches the present and the newest sample is older than
        # DEVICE_TIMEOUT, the last sample is repeated at the current time.
        if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
            # Adding last value as it should be repeated
            time_vector.append(now)
            values.append(values[-1])
            forced_values.append(forced_values[-1])

    init_time = time.time() - init_time

    # Bounds of the stored data, read from the internal "system.buckets.*"
    # collection of the time series (same workaround as in
    # SignalSample.get_first_from_signal_id).
    bucket = get_signal_collection(f"system.buckets.{signal_id}")
    first_bucket = None
    if bucket is not None:
        first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
    if first_bucket is not None:
        data_start = first_bucket["control"]["min"]["precise_timestamp"]
    else:
        data_start = None

    last_bucket = None
    if bucket is not None:
        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
    if last_bucket is not None:
        data_end = last_bucket["control"]["max"]["precise_timestamp"]
    else:
        data_end = None

    return class_(
        signal_id=signal_id,
        forcible=signal.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        data_start=data_start,
        data_end=data_end,
        number_samples=len(values),
        number_samples_db=number_results,
        db_query_time=db_req_time,
        init_time=init_time,
    )

722 

@staticmethod
def interpolate_bounds(
    class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
):
    """Add samples at the bounds of a window when the data leaves them uncovered.

    A bound is considered uncovered when there is no sample at all, or when the
    nearest sample is more than a fifth of the window width away from it. The
    value at the bound is interpolated between the nearest loaded sample and the
    last stored sample before the bound; without loaded samples, the stored
    sample is used as is.

    Args:
        class_: data class used to perform the interpolation.
        collection: collection holding the samples of the signal.
        signal_id: identifier of the signal.
        time_vector: timestamps of the loaded samples; modified in place.
        values: values of the loaded samples; modified in place.
        forced_values: forced values of the loaded samples; modified in place.
        window_min_timestamp: start of the window (epoch seconds).
        window_max_timestamp: end of the window (epoch seconds).

    Returns:
        The ``(time_vector, values, forced_values)`` lists.
    """
    sample_right = None
    # Fetching right side value & interpolation
    window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
    if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
        sample_right = collection.find_one(
            {
                "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
                "value": {"$exists": True},
            },
            sort={"precise_timestamp": -1},
        )
        if sample_right:
            if time_vector:
                right_sd = class_(
                    signal_id=signal_id,
                    time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                    values=[values[-1], sample_right.get("value", None)],
                    forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                )
                max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
            else:
                max_ts_value = sample_right.get("value", None)
                max_ts_forced_value = sample_right.get("forced_value", None)
            time_vector.append(window_max_timestamp)
            values.append(max_ts_value)
            forced_values.append(max_ts_forced_value)

    # Fetching left side value & interpolation
    window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
    if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
        if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
            # The last sample before the end of the window already lies before its
            # start: it is also the last one before the start, no need to query again.
            sample_left = sample_right
        else:
            sample_left = collection.find_one(
                {
                    "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
                    "value": {"$exists": True},
                },
                sort={"precise_timestamp": -1},
            )

        if sample_left:
            if time_vector:
                left_sd = class_(
                    signal_id=signal_id,
                    time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                    values=[sample_left["value"], values[0]],
                    forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                )
                min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
            else:
                min_ts_value = sample_left.get("value", None)
                min_ts_forced_value = sample_left.get("forced_value", None)
            time_vector.insert(0, window_min_timestamp)
            values.insert(0, min_ts_value)
            forced_values.insert(0, min_ts_forced_value)

    return time_vector, values, forced_values

786 

def interpolate_values(self, new_time_vector: list[float]):
    """Return the values of the signal interpolated at the timestamps of ``new_time_vector``.

    NOTE(review): ``self.interpolate`` is not defined in the visible part of the
    class — presumably provided further down or by the concrete data classes; confirm.
    """
    return self.interpolate(new_time_vector, self.values)

789 

def interpolate_forced_values(self, new_time_vector: list[float]):
    """Return the forced values of the signal interpolated at the timestamps of ``new_time_vector``.

    NOTE(review): ``self.interpolate`` is not defined in the visible part of the
    class — presumably provided further down or by the concrete data classes; confirm.
    """
    return self.interpolate(new_time_vector, self.forced_values)

792 

def uniform_desampling(self, number_samples_max: int) -> Self:
    """Return a copy of the data resampled on at most ``number_samples_max`` evenly spaced timestamps.

    When ``number_samples_max`` is falsy or the data already has few enough
    samples, the samples are kept as they are. Otherwise values and forced
    values are interpolated on ``number_samples_max`` timestamps evenly spread
    from the first timestamp (included) to the last one (excluded).

    Args:
        number_samples_max: maximum number of samples of the result.

    Returns:
        A new instance of the same class; the time spent here is added to
        ``data_processing_time``.
    """
    data_processing_time = time.time()
    if number_samples_max and self.number_samples > number_samples_max:
        time_vector = npy.linspace(
            self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
        ).tolist()
        values = self.interpolate_values(time_vector)
        forced_values = self.interpolate_forced_values(time_vector)
        number_samples = len(time_vector)
    else:
        time_vector = self.time_vector
        number_samples = len(self.values)
        values = self.values[:]
        forced_values = self.forced_values[:]
    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        # Carried over like in interest_window_desampling; it was previously
        # reset to the default value of the field.
        forcible=self.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
        phase_id=self.phase_id,
    )

824 

def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Downsample to at most ``number_samples_max`` samples.

    This implementation simply delegates to uniform_desampling.
    NOTE(review): presumably meant to be overridden by data classes needing a
    min/max preserving strategy — confirm against the subclasses.
    """
    return self.uniform_desampling(number_samples_max)

827 

828 def interest_window_desampling( 

829 self, 

830 window_max_number_samples: int, 

831 outside_max_number_samples: int, 

832 window_min_timestamp: float | None = None, 

833 window_max_timestamp: float | None = None, 

834 ) -> Self: 

835 """Performs a sampling in a window of interest and outside.""" 

836 

837 if not self.time_vector: 

838 return self 

839 

840 if window_min_timestamp is None: 

841 window_min_timestamp = self.time_vector[0] 

842 if window_max_timestamp is None: 

843 window_max_timestamp = self.time_vector[-1] 

844 

845 data_processing_time = time.time() 

846 

847 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

848 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

849 

850 time_vector_before = self.time_vector[:index_window_start] 

851 time_vector_window = self.time_vector[index_window_start:index_window_end] 

852 time_vector_after = self.time_vector[index_window_end:] 

853 

854 # Resampling window 

855 if time_vector_window: 

856 # Ensurring window bounds 

857 if time_vector_window[0] != window_min_timestamp: 

858 time_vector_window.insert(0, window_min_timestamp) 

859 if time_vector_window[-1] != window_max_timestamp: 

860 time_vector_window.append(window_max_timestamp) 

861 else: 

862 time_vector_window = [window_min_timestamp, window_max_timestamp] 

863 

864 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

865 # Resampling 

866 new_window_time_vector = npy.linspace( 

867 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

868 ).tolist() 

869 time_vector_window = new_window_time_vector 

870 

871 # Resampling outside 

872 number_samples_before = len(time_vector_before) 

873 number_samples_after = len(time_vector_after) 

874 if ( 

875 outside_max_number_samples is not None 

876 and (number_samples_before + number_samples_after) > outside_max_number_samples 

877 ): 

878 new_number_samples_before = min( 

879 number_samples_before, 

880 math.ceil( 

881 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

882 ), 

883 ) 

884 new_number_samples_after = min( 

885 number_samples_after, 

886 math.ceil( 

887 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

888 ), 

889 ) 

890 # Adjusting numbers as math.ceil can do +1 on sum 

891 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

892 if new_number_samples_before > new_number_samples_after: 

893 new_number_samples_before -= 1 

894 else: 

895 new_number_samples_after -= 1 

896 

897 if new_number_samples_before > 0: 

898 new_time_vector_before = npy.linspace( 

899 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

900 ).tolist() 

901 time_vector_before = new_time_vector_before 

902 

903 if new_number_samples_after > 0: 

904 new_time_vector_after = npy.linspace( 

905 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

906 ).tolist()[::-1] 

907 time_vector_after = new_time_vector_after 

908 

909 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

910 values = self.interpolate_values(new_time_vector) 

911 forced_values = self.interpolate_forced_values(new_time_vector) 

912 number_samples = len(values) 

913 

914 data_processing_time = time.time() - data_processing_time 

915 

916 return self.__class__( 

917 signal_id=self.signal_id, 

918 forcible=self.forcible, 

919 time_vector=new_time_vector, 

920 values=values, 

921 forced_values=forced_values, 

922 number_samples=number_samples, 

923 number_samples_db=self.number_samples, 

924 data_start=self.data_start, 

925 data_end=self.data_end, 

926 db_query_time=self.db_query_time, 

927 init_time=self.init_time, 

928 data_processing_time=self.data_processing_time + data_processing_time, 

929 ) 

930 

def zero_time_vector(self, data_start: float):
    """Return a copy of the signal whose timestamps are expressed relative to ``data_start``.

    An empty signal is returned unchanged. Otherwise ``data_start`` is
    subtracted from every timestamp and the new instance's ``data_start`` /
    ``data_end`` become the first and last shifted timestamps.
    """
    started_at = time.time()
    if len(self.time_vector) == 0:
        return self

    shifted_time_vector = npy.array(self.time_vector) - data_start
    elapsed = time.time() - started_at

    fields = {
        "signal_id": self.signal_id,
        "time_vector": shifted_time_vector,
        "values": self.values,
        "forced_values": self.forced_values,
        "number_samples": self.number_samples,
        "number_samples_db": self.number_samples_db,
        "data_start": shifted_time_vector[0],
        "data_end": shifted_time_vector[-1],
        "db_query_time": self.db_query_time,
        "init_time": self.init_time,
        "data_processing_time": self.data_processing_time + elapsed,
    }
    return self.__class__(**fields)

951 

def csv_export(self):
    """Serialise the signal as UTF-8 encoded CSV bytes.

    The first row is the header ``timestamp,value,forced_value``; each sample
    then produces one row whose timestamp is rendered by
    ``get_utc_date_from_timestamp``.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), sample, forced_sample]
        for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")

959 

def prestoplot_export(self):
    """Serialise the signal as a UTF-8 encoded, tab-separated PrestoPlot file.

    The column name is the signal id with dots replaced by underscores,
    prefixed with an underscore when it starts with a numeric character.
    Missing values and forced values are written as ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = f"_{column_name}"

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]

    for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        date_text = get_utc_date_from_timestamp(timestamp)
        sample_text = sample if sample is not None else "none"
        forced_text = forced_sample if forced_sample is not None else "none"
        lines.append(f"{date_text}\t{sample_text}\t{forced_text}\n")

    return "".join(lines).encode("utf-8")

976 

977 

978class NumericSignalData(SignalData): 

979 data_type: str = "float" 

980 values: list[float | int | None] 

981 forced_values: list[float | int | None] 

982 

def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

    ``None`` entries are treated as NaN during the interpolation, and every
    NaN in the result is reported back as ``None``.
    """
    known_points = [npy.nan if item is None else item for item in items]
    interpolated = npy.interp(new_time_vector, self.time_vector, known_points)

    result = []
    for sample in interpolated:
        result.append(None if npy.isnan(sample) else sample)
    return result

986 

987 def uniform_desampling(self, number_samples_max: int) -> Self: 

988 data_processing_time = time.time() 

989 if number_samples_max and self.number_samples > number_samples_max: 

990 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

991 forced_values = self.interpolate_forced_values(time_vector) 

992 number_samples = len(time_vector) 

993 else: 

994 time_vector = self.time_vector 

995 number_samples = len(self.values) 

996 values = self.values[:] 

997 forced_values = self.forced_values[:] 

998 data_processing_time = time.time() - data_processing_time 

999 

1000 return self.__class__( 

1001 signal_id=self.signal_id, 

1002 time_vector=time_vector, 

1003 values=values, 

1004 forced_values=forced_values, 

1005 number_samples=number_samples, 

1006 number_samples_db=self.number_samples, 

1007 data_start=self.data_start, 

1008 data_end=self.data_end, 

1009 db_query_time=self.db_query_time, 

1010 init_time=self.init_time, 

1011 data_processing_time=self.data_processing_time + data_processing_time, 

1012 ) 

1013 

1014 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1015 if self.number_samples < number_samples_max: 

1016 return self 

1017 

1018 data_processing_time = time.time() 

1019 

1020 number_bins = number_samples_max // 2 

1021 

1022 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

1023 values = npy.array(self.values, dtype=npy.float64) 

1024 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1025 

1026 points_per_bin = self.number_samples // number_bins 

1027 

1028 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1029 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1030 if self.number_samples - number_bins * points_per_bin > 1: 

1031 points_per_bin += 1 

1032 number_bins = self.number_samples // points_per_bin + 1 

1033 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1034 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1035 values = npy.concatenate([values, nan_points_to_add]) 

1036 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1037 

1038 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1039 values_matrix = values.reshape(number_bins, points_per_bin) 

1040 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1041 

1042 indexes_min = npy.zeros(number_bins, dtype="int64") 

1043 indexes_max = npy.zeros(number_bins, dtype="int64") 

1044 

1045 for row in range(number_bins): 

1046 min_value = values_matrix[row, 0] 

1047 max_value = values_matrix[row, 0] 

1048 for column in range(points_per_bin): 

1049 if values_matrix[row, column] < min_value: 

1050 min_value = values_matrix[row, column] 

1051 indexes_min[row] = column 

1052 elif values_matrix[row, column] > max_value: 

1053 max_value = values_matrix[row, column] 

1054 indexes_max[row] = column 

1055 

1056 row_index = npy.repeat(npy.arange(number_bins), 2) 

1057 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1058 

1059 data_processing_time = time.time() - data_processing_time 

1060 

1061 new_time_vector = timestamps_matrix[row_index, column_index] 

1062 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1063 new_values = values_matrix[row_index, column_index] 

1064 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1065 new_forced_values = forced_values_matrix[row_index, column_index] 

1066 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1067 

1068 # Make sure there are no None values for the time vector 

1069 time_vector_filter = new_time_vector != None 

1070 new_time_vector = new_time_vector[time_vector_filter] 

1071 new_values = new_values[time_vector_filter] 

1072 new_forced_values = new_forced_values[time_vector_filter] 

1073 

1074 return self.__class__( 

1075 signal_id=self.signal_id, 

1076 time_vector=new_time_vector, 

1077 values=new_values, 

1078 forced_values=new_forced_values, 

1079 number_samples=number_bins * 2, 

1080 number_samples_db=self.number_samples_db, 

1081 data_start=self.data_start, 

1082 data_end=self.data_end, 

1083 db_query_time=self.db_query_time, 

1084 init_time=self.init_time, 

1085 data_processing_time=self.data_processing_time + data_processing_time, 

1086 phase_id=self.phase_id, 

1087 ) 

1088 

1089 def interest_window_desampling( 

1090 self, 

1091 window_max_number_samples: int, 

1092 outside_max_number_samples: int, 

1093 window_min_timestamp: float | None = None, 

1094 window_max_timestamp: float | None = None, 

1095 ) -> Self: 

1096 """Performs a sampling in a window of interest and outside.""" 

1097 

1098 if not self.time_vector: 

1099 return self 

1100 

1101 if window_min_timestamp is None: 

1102 window_min_timestamp = self.time_vector[0] 

1103 if window_max_timestamp is None: 

1104 window_max_timestamp = self.time_vector[-1] 

1105 

1106 data_processing_time = time.time() 

1107 

1108 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1109 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1110 

1111 time_vector_before = self.time_vector[:index_window_start] 

1112 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1113 time_vector_after = self.time_vector[index_window_end:] 

1114 

1115 values_before = self.values[:index_window_start] 

1116 values_window = self.values[index_window_start:index_window_end] 

1117 values_after = self.values[index_window_end:] 

1118 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1119 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1120 

1121 # Resampling window 

1122 if time_vector_window: 

1123 # Ensurring window bounds 

1124 if time_vector_window[0] != window_min_timestamp: 

1125 time_vector_window.insert(0, window_min_timestamp) 

1126 values_window.insert(0, window_min_value) 

1127 if time_vector_window[-1] != window_max_timestamp: 

1128 time_vector_window.append(window_max_timestamp) 

1129 values_window.append(window_max_value) 

1130 else: 

1131 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1132 values_window = [window_min_value, window_max_value] 

1133 

1134 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1135 # Resampling 

1136 time_vector_window, values_window = downsample_list( 

1137 time_vector_window, values_window, window_max_number_samples 

1138 ) 

1139 

1140 # Resampling outside 

1141 number_samples_before = len(time_vector_before) 

1142 number_samples_after = len(time_vector_after) 

1143 if ( 

1144 outside_max_number_samples is not None 

1145 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1146 ): 

1147 new_number_samples_before = min( 

1148 number_samples_before, 

1149 math.ceil( 

1150 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1151 ), 

1152 ) 

1153 new_number_samples_after = min( 

1154 number_samples_after, 

1155 math.ceil( 

1156 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1157 ), 

1158 ) 

1159 # Adjusting numbers as math.ceil can do +1 on sum 

1160 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1161 if new_number_samples_before > new_number_samples_after: 

1162 new_number_samples_before -= 1 

1163 else: 

1164 new_number_samples_after -= 1 

1165 

1166 if new_number_samples_before > 0: 

1167 time_vector_before, values_before = downsample_list( 

1168 time_vector_before, values_before, new_number_samples_before 

1169 ) 

1170 

1171 if new_number_samples_after > 0: 

1172 time_vector_after, values_after = downsample_list( 

1173 time_vector_after, values_after, new_number_samples_after 

1174 ) 

1175 

1176 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1177 values = values_before + values_window + values_after 

1178 forced_values = self.interpolate_forced_values(new_time_vector) 

1179 number_samples = len(values) 

1180 

1181 data_processing_time = time.time() - data_processing_time 

1182 

1183 return self.__class__( 

1184 signal_id=self.signal_id, 

1185 time_vector=new_time_vector, 

1186 values=values, 

1187 forced_values=forced_values, 

1188 number_samples=number_samples, 

1189 number_samples_db=self.number_samples, 

1190 data_start=self.data_start, 

1191 data_end=self.data_end, 

1192 db_query_time=self.db_query_time, 

1193 init_time=self.init_time, 

1194 data_processing_time=self.data_processing_time + data_processing_time, 

1195 ) 

1196 

1197 

class StringSignalData(SignalData):
    # Signal data whose samples are strings; "interpolation" is a
    # previous-value hold instead of a numeric interpolation.
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the item of the latest sample at or before it.

        Timestamps located before the first sample get the first item.
        """
        last_position = len(items) - 1
        # side="right" minus one gives the index of the last timestamp <= the requested one.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_position)]

1209 

1210 

1211class SignalsData(TwinPadModel): 

1212 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1213 data_processing_time: float 

1214 data_start: float | None 

1215 data_end: float | None 

1216 

@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Load the data of several signals and aggregate their time span.

    Each signal is fetched through ``SignalData.get_from_signal_id``. The
    resulting ``data_start`` / ``data_end`` are the earliest start and latest
    end among the signals that report one, and ``data_processing_time`` is
    the sum of the per-signal processing times. ``max_timestamp`` defaults to
    the current time.
    """
    if max_timestamp is None:
        max_timestamp = time.time()

    loaded_signals = []
    total_processing_time = 0.0
    earliest_start = None
    latest_end = None

    for signal_id in signal_ids:
        signal_data = SignalData.get_from_signal_id(
            signal_id=signal_id,
            min_timestamp=min_timestamp,
            max_timestamp=max_timestamp,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
            interpolate_bounds=interpolate_bounds,
            max_documents=max_documents,
        )
        total_processing_time += signal_data.data_processing_time
        loaded_signals.append(signal_data)

        signal_start = signal_data.data_start
        if signal_start is not None:
            earliest_start = signal_start if earliest_start is None else min(signal_start, earliest_start)

        signal_end = signal_data.data_end
        if signal_end is not None:
            latest_end = signal_end if latest_end is None else max(signal_end, latest_end)

    return cls(
        signals_data=loaded_signals,
        data_processing_time=total_processing_time,
        data_start=earliest_start,
        data_end=latest_end,
    )

1263 

@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Load every requested signal for every phase.

    ``phases``, ``phase_sync_times``, ``window_min_timestamps`` and
    ``window_max_timestamps`` are walked in lockstep. For each phase the
    query range is the phase span (``start_at`` / ``end_at`` divided by 1000),
    narrowed to the window plus a 5 % margin on each side when both window
    bounds are given. Signals without samples are skipped. When
    ``zero_time_vector`` is true, timestamps are made relative to the phase
    sync time (the phase start when no sync time is given).
    """
    computation_start = time.time()
    collected: list[SignalData] = []

    per_phase = zip(phases, phase_sync_times, window_min_timestamps, window_max_timestamps)
    for phase, sync_time, window_min_timestamp, window_max_timestamp in per_phase:
        min_timestamp = phase.start_at / 1000
        max_timestamp = phase.end_at / 1000

        if sync_time is None:
            sync_time = min_timestamp

        if window_min_timestamp is not None and window_max_timestamp is not None:
            margin = (window_max_timestamp - window_min_timestamp) / 20

            if window_min_timestamp != min_timestamp:
                min_timestamp = max(min_timestamp, window_min_timestamp - margin)
            if window_max_timestamp != max_timestamp:
                max_timestamp = min(max_timestamp, window_max_timestamp + margin)

        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id,
                min_timestamp,
                max_timestamp,
                window_min_timestamp,
                window_max_timestamp,
                interpolate_bounds=False,
                max_documents=None,
            )

            if len(signal_data.time_vector) == 0:
                continue

            if zero_time_vector:
                signal_data = signal_data.zero_time_vector(sync_time)
            # Tag the data with its phase so exports can tell phases apart.
            signal_data.phase_id = phase.id

            collected.append(signal_data)

    return cls(
        signals_data=collected,
        data_processing_time=time.time() - computation_start,
        data_start=0,
        data_end=0,
    )

1320 

def uniform_desampling(self, number_samples_max: int) -> Self:
    """Apply ``uniform_desampling`` to every signal and wrap the results in a new ``SignalsData``."""
    resampled = []
    for signal_data in self.signals_data:
        resampled.append(signal_data.uniform_desampling(number_samples_max=number_samples_max))

    total_processing_time = sum(signal_data.data_processing_time for signal_data in resampled)
    return SignalsData(
        signals_data=resampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1329 

def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Apply ``min_max_downsampling`` to every signal and wrap the results in a new ``SignalsData``."""
    downsampled = []
    for signal_data in self.signals_data:
        downsampled.append(signal_data.min_max_downsampling(number_samples_max=number_samples_max))

    total_processing_time = sum(signal_data.data_processing_time for signal_data in downsampled)
    return SignalsData(
        signals_data=downsampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1338 

def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
) -> Self:
    """Apply ``interest_window_desampling`` to every signal with the same window and budgets."""
    window_options = {
        "window_max_number_samples": window_max_number_samples,
        "outside_max_number_samples": outside_max_number_samples,
        "window_min_timestamp": window_min_timestamp,
        "window_max_timestamp": window_max_timestamp,
    }

    resampled = []
    for signal_data in self.signals_data:
        resampled.append(signal_data.interest_window_desampling(**window_options))

    total_processing_time = sum(signal_data.data_processing_time for signal_data in resampled)
    return SignalsData(
        signals_data=resampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1362 

def zero_time_vector(self, data_start: float):
    """Shift every signal so that its timestamps are relative to ``data_start``.

    Returns:
        A new ``SignalsData`` whose ``data_start`` is 0 and whose ``data_end``
        is the latest ``data_end`` among the shifted signals, or ``None`` when
        no signal reports one.
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
    # Signals without an end (e.g. empty ones, returned unchanged by the
    # per-signal method) are ignored; max() would otherwise fail on None or
    # on an empty list.
    data_ends = [s.data_end for s in signals_data if s.data_end is not None]
    return SignalsData(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(data_ends, default=None),
    )

1371 

@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Apply a single-input post-processing function to one signal over a phase.

    When the result was already computed for this phase, the stored signal is
    returned. Otherwise the base signal is loaded over the whole phase, the
    function is applied to values and forced values, saving the full result
    is scheduled as a background task, and the result restricted to the
    optional window is returned. Returns ``None`` when the base signal has no
    data for the phase.
    """
    base_name = base_signal_id.removeprefix("post_processing.")
    signal_id = f"post_processing.{base_name}.{function}"

    stored_signal = Signal.get_from_signal_id(signal_id)
    if stored_signal is not None and phase.id in stored_signal.computed_phases_ids:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )

    if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
        return None

    source = signals_data.signals_data[0]
    time_vector = npy.array(source.time_vector)
    values = source.values
    forced_values = source.forced_values

    new_values = None
    new_forced_values = None
    if function == "Cumul":
        new_values = cumul(values)
        new_forced_values = cumul(forced_values)
    elif function == "Delta":
        new_values = delta(values)
        new_forced_values = delta(forced_values)
    elif function == "DeltaT":
        # Time deltas do not depend on the values, so both outputs are the same.
        new_values = delta(time_vector)
        new_forced_values = new_values
    elif function == "Derive":
        new_values = derive(time_vector, values)
        new_forced_values = derive(time_vector, forced_values)
    elif function == "Integ":
        new_values = integ(time_vector, values)
        new_forced_values = integ(time_vector, forced_values)

    # NaN results are reported as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The full (unwindowed) result is persisted in the background.
    save_coroutine = cls.save_function_signal(
        phase, signal_id, time_vector, new_values, new_forced_values, source.forcible
    )
    asyncio.get_running_loop().create_task(save_coroutine)

    if window_max_timestamp is not None:
        keep = time_vector <= window_max_timestamp
        time_vector = time_vector[keep]
        new_values = new_values[keep]
        new_forced_values = new_forced_values[keep]
    if window_min_timestamp is not None:
        keep = time_vector >= window_min_timestamp
        time_vector = time_vector[keep]
        new_values = new_values[keep]
        new_forced_values = new_forced_values[keep]

    source.time_vector = time_vector.tolist()
    source.values = new_values.tolist()
    source.forced_values = new_forced_values.tolist()
    source.number_samples = time_vector.size

    source.signal_id = signal_id

    return signals_data

1449 

1450 @classmethod 

1451 async def apply_multiple_function( 

1452 cls, 

1453 phases: list, 

1454 signal_ids: list, 

1455 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

1456 window_min_timestamp: float = None, 

1457 window_max_timestamp: float = None, 

1458 ): 

1459 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION): 

1460 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}" 

1461 else: 

1462 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}" 

1463 

1464 active_phase = phases[0] 

1465 if function in {"Align-X", "Using-X"}: 

1466 active_phase = phases[1] 

1467 

1468 processed_result_signal = Signal.get_from_signal_id(function_signal_id) 

1469 if processed_result_signal is not None and ( 

1470 active_phase.id in processed_result_signal.computed_phases_ids 

1471 ): # If signal has been computed for the correct phase 

1472 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp) 

1473 

1474 array_length = None 

1475 time_vector_list = [] 

1476 values_list = [] 

1477 forced_values_list = [] 

1478 forcible = True 

1479 for phase, signal_id in zip(phases, signal_ids): 

1480 signals_data = cls.get_from_phase_and_signal_ids( 

1481 [phase], [None], [signal_id], [None], [None], zero_time_vector=False 

1482 ) 

1483 

1484 if len(signals_data.signals_data) == 0: 

1485 return None 

1486 

1487 signal_data = signals_data.signals_data[0] 

1488 

1489 if array_length is None: 

1490 array_length = signal_data.number_samples 

1491 if ( 

1492 array_length != signal_data.number_samples and function != "Align-X" 

1493 ) or signal_data.number_samples == 0: 

1494 return None 

1495 

1496 time_vector_list.append(npy.array(signal_data.time_vector)) 

1497 values_list.append(npy.array(signal_data.values, dtype=npy.float64)) 

1498 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64)) 

1499 forcible = forcible and signal_data.forcible 

1500 

1501 time_vector = time_vector_list[0] 

1502 new_values = None 

1503 new_forced_values = None 

1504 

1505 match (function): 

1506 case "Align-X": 

1507 time_vector = time_vector_list[1] 

1508 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000 

1509 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000 

1510 new_values = align_x(old_time_vector, values_list[0], new_time_vector) 

1511 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector) 

1512 # case "Atan2": 

1513 # new_values = atan2(values_list[0], values_list[1]) 

1514 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1]) 

1515 case "Using-X": 

1516 if len(time_vector_list[0]) != len(time_vector_list[1]): 

1517 return None 

1518 time_vector = time_vector_list[1] 

1519 new_values = values_list[0] 

1520 new_forced_values = forced_values_list[0] 

1521 case "Mean": 

1522 new_values = mean(*values_list) 

1523 new_forced_values = mean(*forced_values_list) 

1524 case "Norm": 

1525 new_values = norm(*values_list) 

1526 new_forced_values = norm(*forced_values_list) 

1527 

1528 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1529 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1530 

1531 loop = asyncio.get_running_loop() 

1532 loop.create_task( 

1533 cls.save_function_signal( 

1534 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible 

1535 ) 

1536 ) 

1537 

1538 total_number_samples = time_vector.size 

1539 

1540 if window_max_timestamp is not None: 

1541 max_timestamp_mask = time_vector <= window_max_timestamp 

1542 time_vector = time_vector[max_timestamp_mask] 

1543 new_values = new_values[max_timestamp_mask] 

1544 new_forced_values = new_forced_values[max_timestamp_mask] 

1545 if window_min_timestamp is not None: 

1546 min_timestamp_mask = time_vector >= window_min_timestamp 

1547 time_vector = time_vector[min_timestamp_mask] 

1548 new_values = new_values[min_timestamp_mask] 

1549 new_forced_values = new_forced_values[min_timestamp_mask] 

1550 

1551 signals_data = cls( 

1552 signals_data=[ 

1553 NumericSignalData( 

1554 signal_id=function_signal_id, 

1555 forcible=forcible, 

1556 time_vector=time_vector.tolist(), 

1557 values=new_values.tolist(), 

1558 forced_values=new_forced_values.tolist(), 

1559 number_samples=time_vector.size, 

1560 number_samples_db=total_number_samples, 

1561 ) 

1562 ], 

1563 data_processing_time=0, 

1564 data_start=0, 

1565 data_end=0, 

1566 ) 

1567 

1568 return signals_data 

1569 

@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Load an already computed post-processing signal from its collection.

    Samples are filtered on ``precise_timestamp`` with the optional window
    bounds (inclusive). ``number_samples_db`` is the unfiltered document
    count of the collection.
    """
    signal_data_collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp

    # Without bounds the pipeline stays empty and every sample is returned.
    pipeline = [{"$match": {"precise_timestamp": timestamp_bounds}}] if timestamp_bounds else []

    total_number_samples = signal_data_collection.count_documents({})

    fetch_start = time.time()

    samples = signal_data_collection.aggregate(pipeline).to_list()
    new_time_vector = [sample["precise_timestamp"] for sample in samples]
    new_values = [sample["value"] for sample in samples]
    new_forced_values = [sample["forced_value"] for sample in samples]

    function_signal = NumericSignalData(
        signal_id=signal_id,
        time_vector=new_time_vector,
        values=new_values,
        forced_values=new_forced_values,
        number_samples=len(new_time_vector),
        number_samples_db=total_number_samples,
    )
    return cls(
        signals_data=[function_signal],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )

1614 

@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Persist a computed post-processing signal and register it for ``phase``.

    Samples already stored for the phase time span are replaced by the new
    ones, then the signal configuration is upserted and the phase id is
    recorded in ``computed_phases_ids``.

    Args:
        phase: Phase the signal was computed for (``start_at`` / ``end_at``
            are divided by 1000 to obtain the stored timestamps).
        function_signal_id: Identifier of the post-processing signal.
        time_vector: Timestamps of the samples.
        new_values: Computed values, indexed like ``time_vector``.
        new_forced_values: Computed forced values, indexed like ``time_vector``.
        forcible: Whether the resulting signal is forcible.
    """
    # Insert data first so if it is requested by another user, it will be computed again
    signal_collection = get_signal_collection(function_signal_id, create=True)
    signal_collection.delete_many(
        {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
    )
    signal_collection.insert_many(
        [
            {
                "timestamp": datetime.datetime.fromtimestamp(timestamp),
                "precise_timestamp": timestamp,
                "value": new_values[i],
                "forced_value": new_forced_values[i],
            }
            for i, timestamp in enumerate(time_vector)
        ]
    )

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": {
                "description": "",
                "unit": None,
                "type": "sensor",
                "address": None,
                "frequency": 1 / max(npy.mean(delta(time_vector)), 1),  # avoid division by 0
                "transfer_function": None,
                "precision_digits": None,
                "digitization_function": None,
                "data_type": "float",
                "formula": None,
                "forcible": forcible,
                "commandable": False,
                "broadcastable": True,
                "signal_id": function_signal_id,
                "post_processing": True,
            },
            # $addToSet keeps the list free of duplicates when the same phase
            # is saved more than once (e.g. two concurrent requests).
            "$addToSet": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )

1661 

1662 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1663 if post_processing: 

1664 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1665 zip_buffer = io.BytesIO() 

1666 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1667 for signal_data in self.signals_data: 

1668 file_name = signal_data.signal_id 

1669 if post_processing: 

1670 phase = phases_by_id.get( 

1671 signal_data.phase_id, 

1672 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1673 ) 

1674 file_name = f"{signal_data.signal_id} ({phase.name})" 

1675 if file_format == "csv": 

1676 export_io = signal_data.csv_export() 

1677 zip_file.writestr(f"{file_name}.csv", export_io) 

1678 elif file_format == "prestoplot": 

1679 export_io = signal_data.prestoplot_export() 

1680 zip_file.writestr(f"{file_name}.tab", export_io) 

1681 else: 

1682 raise ValueError(f"Format not found. Got: {file_format}") 

1683 zip_bytes = zip_buffer.getvalue() 

1684 return zip_bytes 

1685 

def hdf5_export(self, post_processing: bool = False, phase_ids: list | None = None):
    """Serialize every entry of ``self.signals_data`` into an in-memory HDF5 file.

    Each entry gets its own group (named after the signal id, optionally
    followed by the phase name in parentheses) holding one ``data`` dataset
    with the columns ``date``, ``timestamp``, ``value`` and ``forced_value``.

    :param post_processing: when True, suffix each group name with the name
        of the phase referenced by the entry's ``phase_id``
    :type post_processing: bool
    :param phase_ids: ids of the phases to resolve; only read when
        ``post_processing`` is True. ``None`` is treated as an empty list.
    :type phase_ids: list | None
    :return: raw bytes of the HDF5 file
    :rtype: bytes
    """
    phases_by_id = {}
    unknown_phase = None
    if post_processing:
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids or []}
        # Placeholder used for entries whose phase is not among phase_ids or
        # could not be loaded; built once instead of once per entry.
        unknown_phase = Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000")

    hdf5_buffer = io.BytesIO()
    # Row layouts: string signals store value columns as fixed-width byte
    # strings, every other data type is stored as float64.
    custom_type_float = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    custom_type_string = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                phase = phases_by_id.get(signal_data.phase_id)
                if phase is None:
                    phase = unknown_phase
                group_name = f"{signal_data.signal_id} ({phase.name})"
            signal_group = hdf5_file.create_group(group_name)

            date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            row_dtype = custom_type_string if signal_data.data_type == "str" else custom_type_float
            rows = list(
                zip(
                    date_vector,
                    signal_data.time_vector,
                    signal_data.values,
                    signal_data.forced_values,
                )
            )
            signal_group["data"] = npy.array(rows, dtype=row_dtype)
    return hdf5_buffer.getvalue()

1733 

1734 

class SignalStatus(TwinPadModel):
    """Status information attached to a signal."""

    # Status label; a signal is reported "down" until stated otherwise.
    status: str = "down"
    # Free-text explanation of the status (empty when none was given).
    reason: str = ""
    # Delay associated with the status, if known.
    # NOTE(review): unit is not visible from this model - confirm.
    delay: float | None = None

1739 

1740 

class DigitizationFunction(TwinPadModel):
    """Parameters of a signal's digitization function.

    NOTE(review): presumably describes a mapping between the raw range
    (``min_raw_value``..``max_raw_value``) and the value range
    (``min_value``..``max_value``) - confirm against the consumer.
    """

    # Optional number of bits.
    bits: int | None = None
    # Bounds of the value range.
    min_value: float
    max_value: float
    # Bounds of the raw range.
    min_raw_value: float
    max_raw_value: float

1747 

1748 

class SignalUpdate(TwinPadModel):
    """Payload describing a requested change of a signal; every field is optional."""

    # New value to apply (None: leave unchanged / not provided).
    value: float | str | bool | int | None = None
    # New forced value to apply (None: not provided).
    forced_value: float | str | bool | int | None = None
    # Optional integer timestamp accompanying the update.
    timestamp: int | None = None

1753 

1754 

class SignalType(str, Enum):
    """Closed set of signal kinds; members compare equal to their string value."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1759 

1760 

# Signal-data container class to use for each base ``data_type`` name.
# Only "str" signals use the string container; every other type is numeric.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}

1768 

1769 

1770class Signal(GenericMongo): 

1771 collection_name: ClassVar[str] = "signals" 

1772 

1773 signal_id: str 

1774 frequency: float 

1775 unit: str | None 

1776 description: str 

1777 type: SignalType 

1778 data_type: str 

1779 precision_digits: int | None 

1780 forcible: bool 

1781 commandable: bool 

1782 broadcastable: bool 

1783 status: SignalStatus = SignalStatus() 

1784 

1785 post_processing: bool = False 

1786 computed_phases_ids: list[str] = [] 

1787 

1788 digitization_function: DigitizationFunction | None 

1789 

1790 @property 

1791 def device(self) -> Device: 

1792 device_id = self.signal_id.split(".")[0] 

1793 device = Device.get_one_by_attribute("device_id", device_id) 

1794 return device 

1795 

1796 @cached_property 

1797 def signal_data_class(self): 

1798 if self.data_type in SIGNALDATA_TYPES: 

1799 return SIGNALDATA_TYPES[self.data_type] 

1800 if self.data_type.startswith("enum"): 

1801 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1802 raise ValueError(f"Unhandled python type: {self.data_type}") 

1803 

1804 @cached_property 

1805 def python_type(self): 

1806 if self.data_type in TYPES: 

1807 return TYPES[self.data_type] 

1808 if self.data_type.startswith("enum"): 

1809 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1810 return Literal[*choices] 

1811 raise ValueError(f"Unhandled python type: {self.data_type}") 

1812 

1813 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1814 command = Command( 

1815 sent_at=time.time(), 

1816 command_type="Signal command", 

1817 user_id=current_user.id, 

1818 ) 

1819 

1820 has_input_error = False 

1821 error_message = "" 

1822 

1823 if self.data_type.startswith("enum"): 

1824 enum_options = get_args(self.python_type) 

1825 

1826 if update_dict.value is not None and update_dict.value not in enum_options: 

1827 has_input_error = True 

1828 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1829 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1830 has_input_error = True 

1831 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1832 else: 

1833 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1834 has_input_error = True 

1835 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1836 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1837 has_input_error = True 

1838 error_message += ( 

1839 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1840 ) 

1841 

1842 if has_input_error: 

1843 command.response_time = 0 

1844 command.succeeded = False 

1845 command.description = f"Tried to modify signal {self.signal_id}" 

1846 response = {"error": True, "status_code": 400, "message": error_message} 

1847 else: 

1848 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict) 

1849 command.receive_response(response) 

1850 

1851 Command.create(command) 

1852 return response 

1853 

1854 @classmethod 

1855 def get_from_signal_id(cls, signal_id) -> Self: 

1856 """Could be generic from mongo""" 

1857 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1858 if not raw_value: 

1859 return None 

1860 del raw_value["_id"] 

1861 return cls.dict_to_object(raw_value) 

1862 

1863 @classmethod 

1864 def get_all_ids(cls) -> list[str]: 

1865 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1866 

1867 return [signal["signal_id"] for signal in cursor] 

1868 

1869 @classmethod 

1870 def get_all_statuses(cls) -> list[dict]: 

1871 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1872 

1873 return [ 

1874 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1875 for signal in cursor 

1876 ] 

1877 

1878 async def number_samples(self): 

1879 collection = get_signal_collection(signal_id=self.signal_id) 

1880 if collection is None: 

1881 return 0 

1882 

1883 number_samples = collection.estimated_document_count() 

1884 

1885 number_samples_async_collection = await get_async_collection( 

1886 systems_async_database, "number_samples", create=True, time_series=True 

1887 ) 

1888 

1889 loop = asyncio.get_running_loop() 

1890 loop.create_task( 

1891 number_samples_async_collection.insert_one( 

1892 { 

1893 "timestamp": datetime.datetime.now(pytz.UTC), 

1894 "signal_id": self.signal_id, 

1895 "number_samples": number_samples, 

1896 } 

1897 ) 

1898 ) 

1899 

1900 return number_samples 

1901 

1902 @classmethod 

1903 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1904 number_samples_by_id = {} 

1905 collections = get_signal_collections_batch(signal_ids) 

1906 number_samples_async_collection = await get_async_collection( 

1907 systems_async_database, "number_samples", create=True, time_series=True 

1908 ) 

1909 

1910 for signal_id, collection in zip(signal_ids, collections): 

1911 if collection is None: 

1912 number_samples_by_id[signal_id] = 0 

1913 continue 

1914 

1915 number_samples = collection.estimated_document_count() 

1916 

1917 number_samples_by_id[signal_id] = number_samples 

1918 

1919 now = datetime.datetime.now(pytz.UTC) 

1920 loop = asyncio.get_running_loop() 

1921 loop.create_task( 

1922 number_samples_async_collection.insert_many( 

1923 [ 

1924 { 

1925 "timestamp": now, 

1926 "signal_id": signal_id, 

1927 "number_samples": number_samples, 

1928 } 

1929 for signal_id, number_samples in number_samples_by_id.items() 

1930 ] 

1931 ) 

1932 ) 

1933 

1934 return number_samples_by_id 

1935 

1936 def sample_datasize(self): 

1937 return signals_database.command("collstats", self.signal_id)["size"] 

1938 

1939 @classmethod 

1940 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1941 result = cls.collection().aggregate( 

1942 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1943 ) 

1944 

1945 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1946 

1947 

class ForcedSignal(GenericMongo):
    """Record of who forced a signal, when, and to which value."""

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record keyed by ``signal_id`` and return its document id.

        The id of the stored document is also written to ``self.id``.
        """
        stored_document = self.collection().find_one_and_update(
            {"signal_id": self.signal_id},
            {"$set": self.to_dict(exclude={"id"})},
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Checks whether user can force a given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal was forced by someone else than the user
            and the user is not an admin, True otherwise
        :rtype: bool
        """
        existing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing is None:
            # Nobody forced the signal yet.
            return True
        forced_by_someone_else = existing.forcing_user_id != current_user.id
        return not (forced_by_someone_else and not current_user.is_admin)

1982 

1983 

class ServicesStatus(TwinPadModel):
    """Status label of each platform service."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        The backend is always reported "up": it is the process answering.
        """
        cloud_broker_status = ping(RABBITMQ_HOST)
        database_status = ping(MONGO_HOST)
        signal_storage_status = ping(SIGNAL_STORAGE_HOST)
        heartbeat_storage_status = ping(HEARTBEAT_STORAGE_HOST)
        data_analyzer_status = ping(DATA_ANALYZER_HOST)
        return cls(
            cloud_broker=cloud_broker_status,
            backend="up",
            time_series_database=database_status,
            signal_storage=signal_storage_status,
            heartbeat_storage=heartbeat_storage_status,
            data_analyzer=data_analyzer_status,
        )

2002 

2003 

def ping(host):
    """Return "up" when ``host`` answers a ping within 0.8 s, "down" otherwise.

    :param host: host name or address to ping
    :return: ``"up"`` or ``"down"``
    :rtype: str
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Raised when the process may not open the socket ping3 needs;
        # reachability cannot be determined, so the host is reported down.
        return "down"
    # ping3 returns None on timeout and False on error. Anything else is a
    # measured delay, including 0.0, which a plain truthiness test would
    # wrongly report as "down".
    if delay is None or delay is False:
        return "down"
    return "up"

2011 

2012 

class Event(GenericMongo):
    """Occurrence of an event rule, stored in the ``events`` collection."""

    collection_name: ClassVar[str] = "events"

    name: str
    # Seconds since the epoch.
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Rule that produced this event, loaded from its id on first access."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an Event from a stored document.

        The document's ``timestamp`` is a datetime; it is replaced in place by
        the matching epoch float before the generic conversion runs.
        """
        moment = dict_["timestamp"]
        # A naive datetime would be interpreted as local time by timestamp().
        # PyMongo returns naive datetimes that represent UTC, so tag them.
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=pytz.UTC)
        dict_["timestamp"] = moment.timestamp()
        return super().dict_to_object(dict_)

2030 

2031 

class TwinPadActivity(GenericMongo):
    """Amount of activity (events, samples or commands) recorded for one day."""

    # Start of the day, in seconds since the epoch.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return the start of the current UTC day as an aware datetime."""
        return datetime.datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)

    @staticmethod
    def _utc_day_start(moment: datetime.datetime) -> datetime.datetime:
        """Return midnight of ``moment``'s calendar day, tagged as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @classmethod
    def _items_in_timeframe(cls, daily_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Load the daily documents whose day lies within the epoch bounds (inclusive)."""
        # Convert the epoch bounds directly to UTC. fromtimestamp() without tz
        # yields local time, which must not simply be relabelled as UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            moment = day["timestamp"]
            # Naive datetimes read from MongoDB represent UTC.
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=pytz.UTC)
            day["timestamp"] = moment.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @staticmethod
    def _sum_sample_values(samples_collection, timestamp_filter: dict) -> int:
        """Sum the ``value`` field of the documents matching ``timestamp_filter`` (0 if none)."""
        grouped = samples_collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(grouped) == 0:
            return 0
        return grouped[0].get("number_samples", 0)

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the per-day number of events between two epoch timestamps.

        Daily totals are cached in ``number_events``. The cache is rebuilt
        from the first stored event when it does not exist yet or when
        ``recompute_amount`` is set. The current day's total is refreshed on
        every call.

        :param min_timestamp: lower bound, seconds since the epoch
        :param max_timestamp: upper bound, seconds since the epoch
        :param recompute_amount: force a rebuild of the cached daily totals
        :return: one item per day that has at least one event
        """
        one_day = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        today = cls._utc_midnight_today()
        if number_events_collection is None or recompute_amount:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            number_events_collection.delete_many({})
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return []
            current_day = cls._utc_day_start(first_event["timestamp"])
            while current_day < today:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": current_day, "$lt": current_day + one_day}}
                )
                if day_nb_events > 0:
                    number_events_collection.insert_one({"timestamp": current_day, "amount": day_nb_events})
                current_day += one_day
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": today}})
        if number_events_today > 0:
            # Replace rather than accumulate: today's total keeps growing.
            number_events_collection.delete_many({"timestamp": today})
            number_events_collection.insert_one({"timestamp": today, "amount": number_events_today})
        return cls._items_in_timeframe(number_events_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the per-day number of received samples between two epoch timestamps.

        Daily totals are cached in ``number_received_samples`` and computed by
        summing ``value`` over ``signal_storage._NUMBER_SAMPLES``. The cache is
        rebuilt when missing or when ``recompute_amount`` is set. The current
        day's total is refreshed on every call.

        :param min_timestamp: lower bound, seconds since the epoch
        :param max_timestamp: upper bound, seconds since the epoch
        :param recompute_amount: force a rebuild of the cached daily totals
        :return: one item per day that has at least one sample
        """
        one_day = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        today = cls._utc_midnight_today()
        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # Compute from the day of the first stored sample count.
            current_day = cls._utc_day_start(first_sample["timestamp"])
            while current_day < today:
                number_samples = cls._sum_sample_values(
                    signals_number_samples_collection, {"$gte": current_day, "$lt": current_day + one_day}
                )
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": current_day, "amount": number_samples})
                current_day += one_day
        number_samples_today = cls._sum_sample_values(signals_number_samples_collection, {"$gte": today})
        if number_samples_today > 0:
            # Replace rather than accumulate: today's total keeps growing.
            number_samples_collection.delete_many({"timestamp": today})
            number_samples_collection.insert_one({"timestamp": today, "amount": number_samples_today})
        return cls._items_in_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the per-day number of commands between two epoch timestamps.

        Daily totals are cached in ``number_commands``. The cache is rebuilt
        from the first stored command when it does not exist yet or when
        ``recompute_amount`` is set. The current day's total is refreshed on
        every call.

        :param min_timestamp: lower bound, seconds since the epoch
        :param max_timestamp: upper bound, seconds since the epoch
        :param recompute_amount: force a rebuild of the cached daily totals
        :return: one item per day that has at least one command
        """
        one_day = datetime.timedelta(days=1)
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        today = cls._utc_midnight_today()
        if number_commands_collection is None or recompute_amount:
            number_commands_collection = get_collection(
                systems_database, "number_commands", create=True, time_series=True
            )
            number_commands_collection.delete_many({})
            first_command = commands_collection.find_one(sort={"timestamp": 1})
            if first_command is None:
                return []
            current_day = cls._utc_day_start(first_command["timestamp"])
            while current_day < today:
                day_nb_commands = commands_collection.count_documents(
                    {"timestamp": {"$gte": current_day, "$lt": current_day + one_day}}
                )
                if day_nb_commands > 0:
                    number_commands_collection.insert_one({"timestamp": current_day, "amount": day_nb_commands})
                current_day += one_day
        number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": today}})
        if number_commands_today > 0:
            # Replace rather than accumulate: today's total keeps growing.
            number_commands_collection.delete_many({"timestamp": today})
            number_commands_collection.insert_one({"timestamp": today, "amount": number_commands_today})
        return cls._items_in_timeframe(number_commands_collection, min_timestamp, max_timestamp)

2167 

2168 

class EventRule(GenericMongo):
    """Rule producing events, stored in the ``event_rules`` collection."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events that reference this rule (computed once per instance)."""
        events_collection = get_collection(systems_database, "events")
        # Without create=True the lookup yields None while the collection
        # does not exist yet; no collection means no events.
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})

2180 

2181 

class Company(GenericMongo):
    """Company record, stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"

    name: str

2185 

2186 

class Campaign(GenericMongo):
    """Campaign record, stored in the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None  # None until an id has been assigned
    name: str
    description: str | None = None

2194 

2195 

class Phase(GenericMongo):
    """Time span belonging to a campaign, stored in the ``phases`` collection."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None  # None until an id has been assigned
    name: str
    description: str | None = None
    # Bounds of the phase.
    # NOTE(review): presumably seconds since the epoch - confirm.
    start_at: float
    end_at: float

    # FK
    campaign_id: MongoId

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of ``campaign_id`` and return the driver's delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})

2213 

2214 

class CustomViewCreation(GenericMongo):
    """Fields supplied when creating a custom view (``custom_views`` collection)."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    # View layout. NOTE(review): element schema is not defined here - confirm with the consumer.
    configuration: list

2220 

2221 

class CustomView(CustomViewCreation):
    """Stored custom view: the creation fields plus its id and owning user."""

    # Properties
    id: str | None = None  # None until an id has been assigned

    # Foreign Key
    user_id: str

2228 

2229 

# Update model derived from CustomView by create_update_model.
CustomViewUpdate = create_update_model(CustomView)

2231 

2232 

class Video(GenericMongo):
    """Video source (camera), stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` (plus the document id) are fetched, so
        the returned objects carry no username or password.
        """
        cursor = cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the video with id ``camera_id``, or None when it does not exist."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name

2256 

2257 

class Command(GenericMongo):
    """Log entry of a command issued by a user (``commands`` time-series collection)."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    # Filled by create() from sent_at.
    timestamp: datetime.datetime = None
    # Seconds since the epoch at which the command was sent.
    sent_at: float
    # Seconds elapsed between sending and receiving the response.
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, creating it as a time series if needed."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of ``command`` stamped with the UTC datetime of ``sent_at``.

        :return: ``{"command_id": <inserted id>}``, or None when the insert yields no result
        """
        stamped = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insert_result = cls.collection().insert_one(stamped.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"command_id": str(insert_result.inserted_id)}

    def receive_response(self, response: dict):
        """Record the outcome of the command from the response dict.

        The command succeeded only when ``response["error"]`` is exactly
        False. The response message becomes the description unless one was
        already set.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            self.description += response.get("message", "").rstrip()

2297 

2298 

class SignalsPresetCreation(GenericMongo):
    """Fields supplied when creating a signals preset: a name and the signal ids it groups."""

    name: str
    signal_ids: list[str]

2302 

2303 

class SignalsPreset(SignalsPresetCreation):
    """Signals preset owned by a user, stored in the ``signals_presets`` collection."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store ``signals_preset`` for ``user_id`` and return the inserted id as a string."""
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        insert_result = cls.collection().insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insert_result.inserted_id)

2320 

2321 

# Update model derived from SignalsPreset by create_update_model.
SignalsPresetUpdate = create_update_model(SignalsPreset)

2323 

2324 

class LineStyle(str, Enum):
    """Line styles available for a graph curve; members compare equal to their string value."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

2329 

2330 

class SignalAppearance:
    """Colors used to draw a signal; declares annotations only, no values or behaviour."""

    # Color of the value curve.
    value_color: str
    # Color of the forced-value curve.
    forced_value_color: str

2334 

2335 

class GraphThemeCreation(GenericMongo):
    """Fields supplied when creating a graph theme (``graph_themes`` collection)."""

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    # Signal the theme applies to.
    signal_id: str
    # Curve colors; empty string when not set.
    value_color: str = ""
    forced_value_color: str = ""
    # Curve line styles.
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless explicitly shared.
    private: bool = True

2346 

2347 

class PublicGraphTheme(GraphThemeCreation):
    """Graph theme as exposed to one user.

    The per-user flags are derived from the stored ``creator_id``,
    ``in_library`` and ``active`` fields, which are then removed from the
    result by the pipeline steps below.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # NOTE(review): the query methods below store the requesting user's id on
    # the class before delegating to the parent; this looks like state shared
    # between concurrent requests - confirm it is safe.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Aggregation stages, keyed by name, built for the current user id."""
        current_user_id = cls._current_user_id

        def add_field(field_name, expression):
            # Single-stage step that sets one field.
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", current_user_id]}),
            # Never return a theme the user is not allowed to see (i.e. neither
            # their own nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [current_user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [current_user_id, "$active"]}),
            # Strip the stored per-theme user lists and creator from the output.
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the parent's query response on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` with ``query.in_user_library`` forced to "true"."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Load the theme with document id ``item_id`` on behalf of ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent's attribute lookup on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent's single-item attribute lookup on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Run the parent's ``get_all`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return the colors and line styles of the themes active for ``user_id``.

        :param signal_ids: signals to look up
        :param user_id: user whose active themes are considered
        :return: appearance dict per signal id, one theme per signal; signals
            without an active theme are absent
        """
        match_stage = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        # Keep a single theme per signal.
        first_per_signal_stages = [
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
        ]
        projection_stage = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [match_stage, *first_per_signal_stages, projection_stage]

        appearances = {}
        for document in cls.collection().aggregate(pipeline):
            # The signal id becomes the key and is dropped from the value.
            appearances[document.pop("signal_id")] = document
        return appearances

2460 

2461 

# Update model derived from PublicGraphTheme by create_update_model.
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2463 

2464 

class PrivateGraphTheme(GraphThemeCreation):
    """Graph theme as stored, including its creator and per-user membership lists."""

    # private
    creator_id: str
    # Ids of the users who have the theme in their library.
    in_library: list[str]
    # Ids of the users for whom the theme is active.
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Store a new theme and return it with its id set.

        The creator starts with the theme in their library and active.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """Apply an update expressed with the public, per-user fields.

        ``in_user_library`` and ``active_for_user`` are translated into
        membership of ``user_id`` in the stored ``in_library`` / ``active``
        lists; ``created_by_user`` is derived and never stored. These three
        keys are always removed from ``update_dict`` (which is modified in
        place), even when their value is None, so they can never be written to
        the document.

        :param update_dict: fields to change
        :param user_id: user performing the update
        :return: an empty dict
        """
        in_user_lib = update_dict.pop("in_user_library", None)
        if in_user_lib is not None:
            if in_user_lib and user_id not in self.in_library:
                self.in_library.append(user_id)
            elif not in_user_lib and user_id in self.in_library:
                self.in_library.remove(user_id)
            update_dict["in_library"] = self.in_library

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            if active_for_user and user_id not in self.active:
                self.active.append(user_id)
            elif not active_for_user and user_id in self.active:
                self.active.remove(user_id)
            update_dict["active"] = self.active

        update_dict.pop("created_by_user", None)

        # Nothing left to write: skip the round trip (an empty $set is
        # rejected by some MongoDB versions).
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}

2526 

2527 

class DeviceStatus(str, Enum):
    """Status values a deployed device can report; members compare equal to their string value."""

    started = "started"
    running = "running"
    created = "created"
    exited = "exited"
    restarting = "restarting"

2534 

2535 

class DeviceUpdateFromDeployer(BaseModel):
    """Body of a device update sent to a deployer: the requested status."""

    status: DeviceStatus

2538 

2539 

class DeviceFromDeployerCreation(BaseModel):
    """Fields supplied when creating a device through a deployer."""

    name: str
    description: str

2543 

2544 

class DeviceFromDeployer(DeviceFromDeployerCreation):
    """Device as reported by a deployer: creation fields plus status, id and logs."""

    status: DeviceStatus
    device_id: DeviceId
    # Log output reported for the device; empty when none was provided.
    logs: str = ""

2549 

2550 

class DeviceDeployer(GenericMongo):
    """Remote deployer service, reached over HTTP at ``url`` (``device_deployers`` collection).

    NOTE(review): the HTTP calls below pass no timeout, so an unresponsive
    deployer blocks the caller indefinitely - consider adding one.
    """

    collection_name: ClassVar[str] = "device_deployers"
    url: HttpUrl

    def endpoint_url(self, endpoint):
        """Return ``url`` joined with ``endpoint`` by exactly one slash."""
        return f"{str(self.url).rstrip('/')}/{endpoint}"

    @staticmethod
    def _device_from_payload(device_dict: dict) -> DeviceFromDeployer:
        """Build a DeviceFromDeployer from one device entry of a deployer response."""
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name=device_dict["container_name"],
            description="desc",
            status=device_dict["status"],
            logs=device_dict["logs"],
        )

    def devices(self) -> list[DeviceFromDeployer] | None:
        """List the devices known to the deployer.

        :return: the devices, or None when the deployer is unreachable or
            does not answer with status 200
        """
        try:
            response = requests.get(self.endpoint_url("devices"))
        except requests.exceptions.ConnectionError:
            logger.info("connection error")
            return None
        if response.status_code != 200:
            return None
        return [self._device_from_payload(device_dict) for device_dict in response.json()["devices"]]

    def get_device(self, device_id: DeviceId) -> DeviceFromDeployer | None:
        """Fetch one device from the deployer.

        :return: the device, or None when the deployer is unreachable or
            does not answer with status 200
        """
        try:
            response = requests.get(self.endpoint_url(f"devices/{device_id}"))
        except requests.exceptions.ConnectionError:
            logger.info("connection error")
            return None
        if response.status_code != 200:
            return None
        return self._device_from_payload(response.json())

    def create_device(self, device: DeviceFromDeployer) -> DeviceFromDeployer | None:
        """Ask the deployer to create a device; only ``device.name`` is sent.

        :return: the created device (with an empty name), or None when the
            deployer is unreachable or does not answer with status 201
        """
        try:
            response = requests.post(self.endpoint_url("devices"), json={"name": device.name})
        except requests.exceptions.ConnectionError:
            logger.info("connection error")
            return None

        if response.status_code != 201:
            return None

        device_dict = response.json()
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            status=device_dict["status"],
        )

    def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
        """Send a status update for a device to the deployer.

        :return: a Device holding the returned id and status (other fields
            are placeholders), or None when the deployer is unreachable or
            does not answer with status 200
        """
        try:
            response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump())
        except requests.exceptions.ConnectionError:
            logger.info("connection error")
            return None

        if response.status_code != 200:
            return None

        device_dict = response.json()
        return Device(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            pid={},
            petri_network={},
            modes=[],
            status=device_dict["status"],
        )

    def delete_device(self, device_id: DeviceId) -> DeleteInfo:
        """Ask the deployer to delete a device.

        :return: deletion outcome; ``detail`` carries the failure reason
            (connection error or the response body) when not deleted
        """
        try:
            response = requests.delete(self.endpoint_url(f"devices/{device_id}"))
        except requests.exceptions.ConnectionError:
            logger.info("connection error")
            return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
        if response.status_code not in [200, 202, 204]:
            return DeleteInfo(is_deleted=False, detail=response.text)

        return DeleteInfo(is_deleted=True, detail="")

2641 

2642 

# Update model derived from DeviceDeployer by create_update_model.
DeviceDeployerUpdate = create_update_model(DeviceDeployer)