Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%

1365 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-28 09:12 +0000

1from functools import cached_property 

2import os 

3import re 

4import io 

5import time 

6import csv 

7from typing import Self, ClassVar, Any, Literal, get_args, Annotated 

8import datetime 

9import math 

10import bisect 

11from enum import Enum 

12import logging 

13import copy 

14import asyncio 

15import requests 

16 

17import zipfile 

18import ping3 

19import pytz 

20from bson.objectid import ObjectId 

21from pymongo import ASCENDING, ReturnDocument 

22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator 

23import numpy as npy 

24import lttb 

25import h5py 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 signals_async_database, 

36 devices_states_database, 

37) 

38from twinpad_backend.responses import ListResponse 

39from twinpad_backend.messages import RabbitMQClient 

40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

41 

42TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

43SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"] 

44DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"] 

45MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"] 

46 

47 

48RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

49MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

50SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

51HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

52DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

53 

54DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

55NUMBER_SAMPLES_DATABASE_UPDATE = 120 

56 

57logger = logging.getLogger("uvicorn.error") 

58 

59 

60class DeleteInfo(BaseModel): 

61 is_deleted: bool 

62 detail: str 

63 

64 

65class classproperty: 

66 """ 

67 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

68 Found here: https://stackoverflow.com/a/76301341 

69 """ 

70 

71 def __init__(self, func): 

72 self.fget = func 

73 

74 def __get__(self, _, owner): 

75 return self.fget(owner) 

76 

77 

78def create_update_model(model): 

79 fields = {} 

80 

81 for field_name, field_annotation in model.model_fields.items(): 

82 if field_name != "id": 

83 fields[field_name] = (field_annotation.annotation | None, None) 

84 

85 query_name = model.__name__ + "Update" 

86 return create_model(query_name, **fields) 

87 

88 

89def get_utc_date_from_timestamp(timestamp: float): 

90 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

91 

92 

93def downsample_list(time_vector: list, values: list, max_number_samples: int): 

94 if len(time_vector) < max_number_samples: 

95 return time_vector, values 

96 

97 time_vector_copy = copy.deepcopy(time_vector) 

98 values_copy = copy.deepcopy(values) 

99 

100 none_group_bounds = [] 

101 none_group_index = -1 

102 index = -1 

103 # LTTB doesn't handle None values so remove them 

104 while values_copy.count(None) > 0: 

105 # Store bounds of None value groups so we can insert them back after the downsampling 

106 if (new_index := values_copy.index(None)) != index: 

107 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

108 none_group_index += 1 

109 elif len(none_group_bounds[none_group_index]) < 2: 

110 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

111 else: 

112 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

113 values_copy.pop(new_index) 

114 index = new_index 

115 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

116 

117 try: 

118 values_array = npy.array([time_vector_copy, values_copy]).T 

119 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

120 

121 new_time_vector = interpolated_values[:, 0].tolist() 

122 new_values = interpolated_values[:, 1].tolist() 

123 except ValueError: 

124 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

125 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

126 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

127 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

128 return new_time_vector, new_values_nan_to_none 

129 

130 # insert back None values at the correct timestamps 

131 for none_group in none_group_bounds: 

132 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

133 new_time_vector[start_index:start_index] = none_group 

134 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

135 

136 return new_time_vector, new_values 

137 

138 

139def is_of_type(value, wanted_type): 

140 if wanted_type is float: 

141 return isinstance(value, (int, float)) 

142 return isinstance(value, wanted_type) 

143 

144 

145# Models 

146class TwinPadModel(BaseModel): 

147 @classmethod 

148 def dict_to_object(cls, dict_): 

149 return cls.model_validate(dict_) 

150 

151 def to_dict(self, exclude=None): 

152 dict_ = self.model_dump(exclude=exclude, mode="json") 

153 return dict_ 

154 

155 

156def validate_mongo_id(v): 

157 if not ObjectId.is_valid(v): 

158 raise ValueError("Invalid MongoDB id") 

159 return str(v) 

160 

161 

162MongoId = Annotated[str, BeforeValidator(validate_mongo_id)] 

163 

164 

165def validate_12_hex(v: str) -> str: 

166 if not re.fullmatch(r"[0-9a-fA-F]{12}", v): 

167 raise ValueError("ID must be a 12-character hexadecimal string") 

168 return v 

169 

170 

171DeviceId = Annotated[str, BeforeValidator(validate_12_hex)] 

172 

173 

174class GenericMongo(TwinPadModel): 

175 id: MongoId | None = None 

176 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

177 

178 @classmethod 

179 def collection(cls): 

180 return get_collection(systems_database, cls.collection_name, create=True) 

181 

182 @classmethod 

183 def response_from_query(cls, query) -> ListResponse[Self]: 

184 request_filters = query.mongodb_filter() 

185 items = [] 

186 

187 # Allows for multi-sort, Python dicts are ordered so no issue while sorting 

188 sort_dict = {} 

189 for sort in query.sort_by.split(","): 

190 if ":" in sort: 

191 sort_field, sort_order = sort.split(":") 

192 sort_order = int(sort_order) 

193 else: 

194 sort_field = sort 

195 sort_order = 1 

196 sort_dict[sort_field] = sort_order 

197 

198 collection = get_collection(systems_database, cls.collection_name, create=True) 

199 total = collection.count_documents(request_filters) 

200 

201 pipeline = [] 

202 added_properties = [] 

203 if "$and" in request_filters: 

204 for request_filter in request_filters["$and"]: 

205 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

206 if filtered_property in request_filter: 

207 pipeline.extend(pipeline_steps) 

208 added_properties.append(filtered_property) 

209 else: 

210 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

211 if filtered_property in request_filters: 

212 pipeline.extend(pipeline_steps) 

213 added_properties.append(filtered_property) 

214 pipeline.append({"$match": request_filters}) 

215 

216 for sort_field in sort_dict.keys(): 

217 if sort_field in cls.custom_pipeline_steps: 

218 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

219 added_properties.append(sort_field) 

220 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}]) 

221 

222 if (query.limit is not None) and (query.limit != 0): 

223 pipeline.append({"$limit": query.limit}) 

224 

225 for filtered_property, step in cls.custom_pipeline_steps.items(): 

226 if filtered_property not in added_properties: 

227 pipeline.extend(step) 

228 

229 cursor = collection.aggregate(pipeline) 

230 

231 for item_dict in cursor: 

232 items.append(cls.mongo_dict_to_object(item_dict)) 

233 

234 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

235 

236 @classmethod 

237 def get_from_id(cls, item_id) -> Self | None: 

238 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

239 

240 @classmethod 

241 def mongo_dict_to_object(cls, mongo_dict): 

242 mongo_dict["id"] = str(mongo_dict["_id"]) 

243 del mongo_dict["_id"] 

244 return cls.dict_to_object(mongo_dict) 

245 

246 @classmethod 

247 def get_by_attribute(cls, attribute_name: str, attribute_value): 

248 """Returns all items that match the attribute with value.""" 

249 pipeline = [] 

250 if attribute_name in cls.custom_pipeline_steps: 

251 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

252 pipeline.append({"$match": {attribute_name: attribute_value}}) 

253 for key, step in cls.custom_pipeline_steps.items(): 

254 if key != attribute_name: 

255 pipeline.extend(step) 

256 items = cls.collection().aggregate(pipeline) 

257 if items is None: 

258 return None 

259 return [cls.mongo_dict_to_object(d) for d in items] 

260 

261 @classmethod 

262 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

263 pipeline = [] 

264 if attribute_name in cls.custom_pipeline_steps: 

265 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

266 pipeline.append({"$match": {attribute_name: attribute_value}}) 

267 pipeline.append({"$limit": 1}) 

268 for key, step in cls.custom_pipeline_steps.items(): 

269 if key != attribute_name: 

270 pipeline.extend(step) 

271 items = cls.collection().aggregate(pipeline).to_list() 

272 if len(items) == 0: 

273 return None 

274 return cls.mongo_dict_to_object(items[0]) 

275 

276 @classmethod 

277 def get_all(cls, sort_by="_id") -> list[Self]: 

278 items = [] 

279 pipeline = [] 

280 if sort_by in cls.custom_pipeline_steps: 

281 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

282 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

283 for key, step in cls.custom_pipeline_steps.items(): 

284 if key != sort_by: 

285 pipeline.extend(step) 

286 for dict_ in cls.collection().aggregate(pipeline): 

287 items.append(cls.mongo_dict_to_object(dict_)) 

288 return items 

289 

290 @classmethod 

291 def get_number_documents(cls): 

292 collection = get_collection(systems_database, cls.collection_name) 

293 if collection is None: 

294 return 0 

295 return collection.count_documents( 

296 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]} 

297 ) 

298 

299 def insert(self): 

300 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"})) 

301 self.id = str(insert_result.inserted_id) 

302 return self.id 

303 

304 def update(self, update_dict): 

305 for key, value in update_dict.items(): 

306 setattr(self, key, value) 

307 self.collection().find_one_and_update( 

308 {"_id": ObjectId(self.id)}, 

309 {"$set": update_dict}, 

310 return_document=ReturnDocument.AFTER, 

311 ) 

312 

313 return self 

314 

315 def delete(self): 

316 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

317 return result.deleted_count > 0 

318 

319 

320class User(GenericMongo): 

321 collection_name: ClassVar[str] = "users" 

322 

323 firstname: str 

324 lastname: str 

325 email: str 

326 password: str 

327 is_active: bool | None = False 

328 is_admin: bool | None = False 

329 is_connected: bool | None = False 

330 company_id: str | None = None 

331 

332 def to_dict(self, exclude: set = set()): 

333 exclude.add("password") 

334 return GenericMongo.to_dict(self, exclude=exclude) 

335 

336 @classmethod 

337 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

338 users = cls.get_all() 

339 if not users: 

340 is_admin = True 

341 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

342 user_collection = get_collection(systems_database, "users", create=True) 

343 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

344 if new_user is None: 

345 return None 

346 return {"user_id": str(new_user.inserted_id)} 

347 

348 @classmethod 

349 def update_info(cls, user: "UserUpdate", user_id: str): 

350 updated_user = cls.collection().find_one_and_update( 

351 {"_id": ObjectId(user_id)}, 

352 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

353 return_document=ReturnDocument.AFTER, 

354 ) 

355 updated_user["id"] = str(updated_user["_id"]) 

356 del (updated_user["_id"], updated_user["is_connected"]) 

357 return cls(**updated_user) 

358 

359 

360UserUpdate = create_update_model(User) 

361 

362 

363class Mode(TwinPadModel): 

364 mode_id: int 

365 name: str 

366 frequency_multiplier: float 

367 min_frequency: float 

368 

369 

370class DeviceUpdate(TwinPadModel): 

371 mode_id: int 

372 

373 

374class Device(GenericMongo): 

375 collection_name: ClassVar[str] = "devices" 

376 

377 device_id: DeviceId 

378 name: str 

379 description: str = "" 

380 modes: list[Mode] 

381 current_mode_id: int | None = None 

382 last_ping: float | None = None 

383 petri_network: Any 

384 pid: Any 

385 load: float | None = None 

386 tokens: list[int] = Field(default_factory=list) 

387 status: str 

388 

389 async def change_mode(self, update_dict, current_user: User): 

390 has_error = False 

391 

392 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

393 has_error = True 

394 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

395 elif self.current_mode_id is not None: 

396 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

397 else: 

398 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

399 command = Command( 

400 sent_at=time.time(), 

401 command_type="Mode change", 

402 description=description, 

403 user_id=current_user.id, 

404 ) 

405 

406 if has_error: 

407 command.response_time = 0 

408 command.succeeded = False 

409 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

410 else: 

411 response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id) 

412 command.receive_response(response) 

413 

414 Command.create(command) 

415 return response 

416 

417 @classmethod 

418 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

419 devices_by_id = {} 

420 for signal_id in signal_ids: 

421 device_id = signal_id.split(".")[0] 

422 if device_id not in devices_by_id: 

423 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

424 return devices_by_id 

425 

426 

427class DeviceSetup(GenericMongo): 

428 collection_name: ClassVar[str] = "device_setups" 

429 

430 device_ids: list[str] 

431 active: bool = False 

432 variable_mapping: dict[str, str] 

433 

434 

435DeviceSetupUpdate = create_update_model(DeviceSetup) 

436 

437 

438class DeviceState(GenericMongo): 

439 collection_name: ClassVar[str] = "devices_states" 

440 

441 timestamp: float 

442 mode: str | None = None 

443 load: float | None = None 

444 tokens: list[int] = Field(default_factory=list) 

445 modified_properties: list[str] = Field(default_factory=list) 

446 

447 @classmethod 

448 def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]: 

449 req_filter = query.mongodb_filter() 

450 items = [] 

451 if ":" in query.sort_by: 

452 sort_field, sort_order = query.sort_by.split(":") 

453 sort_order = int(sort_order) 

454 else: 

455 sort_field = query.sort_by 

456 sort_order = 1 

457 collection = get_collection(devices_states_database, device_id) 

458 if collection is None: 

459 total = 0 

460 cursor = [] 

461 else: 

462 total = collection.count_documents(req_filter) 

463 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

464 if (query.limit is not None) and (query.limit != 0): 

465 cursor = cursor.limit(query.limit) 

466 for item_dict in cursor: 

467 items.append( 

468 cls( 

469 timestamp=item_dict.get("precise_timestamp"), 

470 mode=item_dict.get("mode", None), 

471 load=item_dict.get("load", None), 

472 tokens=item_dict.get("tokens", Field(default_factory=list)), 

473 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

474 ) 

475 ) 

476 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

477 

478 

479class SignalSample(TwinPadModel): 

480 signal_id: str 

481 timestamp: float 

482 value: float | int | str | bool | None 

483 forced_value: float | int | str | bool | None = None 

484 

485 @classmethod 

486 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

487 

488 collection = get_signal_collection(signal_id) 

489 if collection is None: 

490 return None 

491 

492 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

493 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

494 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

495 first_bucket = None 

496 if bucket is not None: 

497 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

498 if first_bucket is not None: 

499 sample_data = collection.find_one( 

500 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

501 ) 

502 else: 

503 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

504 

505 if sample_data is None: 

506 return None 

507 

508 timestamp = sample_data["precise_timestamp"] 

509 

510 return cls( 

511 signal_id=signal_id, 

512 timestamp=timestamp, 

513 value=sample_data.get("value", None), 

514 forced_value=sample_data.get("forced_value", None), 

515 ) 

516 

517 @classmethod 

518 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

519 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

520 

521 @classmethod 

522 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

523 collection = get_signal_collection(signal_id) 

524 if collection is None: 

525 return None 

526 

527 # Same workaround as above function, very effective to narrow down big sets of data 

528 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

529 last_bucket = None 

530 if bucket is not None: 

531 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

532 if last_bucket is not None: 

533 sample_data = collection.find_one( 

534 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

535 sort={"precise_timestamp": -1}, 

536 ) 

537 else: 

538 sample_data = collection.find_one({}, sort={"precise_timestamp": -1}) 

539 

540 if sample_data is None: 

541 return None 

542 

543 timestamp = sample_data["precise_timestamp"] 

544 

545 if device is None: 

546 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

547 if device is not None and device.last_ping is not None: 

548 if timestamp is None: 

549 timestamp = device.last_ping 

550 else: 

551 timestamp = max(timestamp, device.last_ping) 

552 return cls( 

553 signal_id=signal_id, 

554 timestamp=timestamp, 

555 value=sample_data.get("value", None), 

556 forced_value=sample_data.get("forced_value", None), 

557 ) 

558 

559 @classmethod 

560 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

561 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

562 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

563 

564 

565class SignalData(TwinPadModel): 

566 signal_id: str 

567 forcible: bool = True 

568 time_vector: list[float] 

569 values: list[float | int | str | None] 

570 forced_values: list[float | int | str | None] 

571 

572 data_start: float | None = None 

573 data_end: float | None = None 

574 

575 number_samples: int = 0 

576 number_samples_db: int = 0 

577 

578 db_query_time: float = 0.0 

579 init_time: float = 0.0 

580 data_processing_time: float = 0.0 

581 

582 phase_id: str | None = None 

583 

584 @classmethod 

585 def get_from_signal_id( 

586 cls, 

587 signal_id: str, 

588 min_timestamp: float = None, 

589 max_timestamp: float = None, 

590 window_min_timestamp: float = None, 

591 window_max_timestamp: float = None, 

592 interpolate_bounds: bool = True, 

593 max_documents: int = None, 

594 collection=None, 

595 ) -> Self: 

596 

597 now = time.time() 

598 

599 req_signal = {} 

600 if min_timestamp is not None: 

601 req_signal.setdefault("timestamp", {}) 

602 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

603 if max_timestamp is not None: 

604 req_signal.setdefault("timestamp", {}) 

605 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

606 

607 if collection is None: 

608 collection = get_signal_collection(signal_id) 

609 if collection is None: 

610 return cls( 

611 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

612 ) 

613 

614 db_req_start = time.time() 

615 

616 sort_step = {"$sort": {"precise_timestamp": 1}} 

617 number_results = collection.count_documents(req_signal) 

618 

619 pipeline = [] 

620 if req_signal: 

621 pipeline.append({"$match": req_signal}) # Filter data if needed 

622 

623 pipeline.extend( 

624 [ 

625 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

626 sort_step, 

627 ] 

628 ) 

629 

630 if max_documents is not None and max_documents < number_results: 

631 unsampling_ratio = math.ceil(number_results / max_documents) 

632 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

633 pipeline.extend( 

634 [ 

635 { 

636 "$setWindowFields": { 

637 "sortBy": {"precise_timestamp": 1}, 

638 "output": {"index": {"$documentNumber": {}}}, 

639 } 

640 }, 

641 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

642 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

643 {"$replaceRoot": {"newRoot": "$doc"}}, 

644 {"$unset": ["index", "group_id"]}, 

645 {"$sort": {"precise_timestamp": 1}}, 

646 ] 

647 ) 

648 

649 # logger.info(f"pipeline: %s", str(pipeline)) 

650 cursor = collection.aggregate(pipeline) 

651 db_req_time = time.time() - db_req_start 

652 

653 init_time = time.time() 

654 

655 results = cursor.to_list() 

656 time_vector = [] 

657 values = [] 

658 forced_values = [] 

659 for s in results: 

660 time_vector.append(s["precise_timestamp"]) 

661 values.append(s.get("value", None)) 

662 forced_values.append(s.get("forced_value", None)) 

663 

664 signal = Signal.get_from_signal_id(signal_id) 

665 if signal is None: 

666 return cls( 

667 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

668 ) 

669 class_ = signal.signal_data_class 

670 

671 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

672 time_vector, values, forced_values = cls.interpolate_bounds( 

673 class_, 

674 collection, 

675 signal_id, 

676 time_vector, 

677 values, 

678 forced_values, 

679 window_min_timestamp, 

680 window_max_timestamp, 

681 ) 

682 

683 if values: 

684 # TODO: check below. a bit strange 

685 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

686 # Adding last value as it should be repeated 

687 time_vector.append(now) 

688 values.append(values[-1]) 

689 forced_values.append(forced_values[-1]) 

690 

691 init_time = time.time() - init_time 

692 

693 # See line 292 for explanation 

694 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

695 first_bucket = None 

696 if bucket is not None: 

697 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

698 if first_bucket is not None: 

699 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

700 else: 

701 data_start = None 

702 

703 last_bucket = None 

704 if bucket is not None: 

705 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

706 if last_bucket is not None: 

707 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

708 else: 

709 data_end = None 

710 

711 return class_( 

712 signal_id=signal_id, 

713 forcible=signal.forcible, 

714 time_vector=time_vector, 

715 values=values, 

716 forced_values=forced_values, 

717 data_start=data_start, 

718 data_end=data_end, 

719 number_samples=len(values), 

720 number_samples_db=number_results, 

721 db_query_time=db_req_time, 

722 init_time=init_time, 

723 ) 

724 

725 @staticmethod 

726 def interpolate_bounds( 

727 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

728 ): 

729 sample_right = None 

730 # Fetching right side value & interpolation 

731 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

732 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

733 sample_right = collection.find_one( 

734 { 

735 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

736 "value": {"$exists": True}, 

737 }, 

738 sort={"precise_timestamp": -1}, 

739 ) 

740 if sample_right: 

741 if time_vector: 

742 right_sd = class_( 

743 signal_id=signal_id, 

744 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

745 values=[values[-1], sample_right.get("value", None)], 

746 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

747 ) 

748 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

749 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

750 else: 

751 max_ts_value = sample_right.get("value", None) 

752 max_ts_forced_value = sample_right.get("forced_value", None) 

753 time_vector.append(window_max_timestamp) 

754 values.append(max_ts_value) 

755 forced_values.append(max_ts_forced_value) 

756 

757 # Fetching left side value & interpolation 

758 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

759 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

760 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

761 sample_left = sample_right 

762 sample_left = collection.find_one( 

763 { 

764 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

765 "value": {"$exists": True}, 

766 }, 

767 sort={"precise_timestamp": -1}, 

768 ) 

769 

770 if sample_left: 

771 if time_vector: 

772 left_sd = class_( 

773 signal_id=signal_id, 

774 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

775 values=[sample_left["value"], values[0]], 

776 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

777 ) 

778 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

779 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

780 else: 

781 min_ts_value = sample_left.get("value", None) 

782 min_ts_forced_value = sample_left.get("forced_value", None) 

783 time_vector.insert(0, window_min_timestamp) 

784 values.insert(0, min_ts_value) 

785 forced_values.insert(0, min_ts_forced_value) 

786 

787 return time_vector, values, forced_values 

788 

789 def interpolate_values(self, new_time_vector: list[float]): 

790 return self.interpolate(new_time_vector, self.values) 

791 

792 def interpolate_forced_values(self, new_time_vector: list[float]): 

793 return self.interpolate(new_time_vector, self.forced_values) 

794 

795 def uniform_desampling(self, number_samples_max: int) -> Self: 

796 data_processing_time = time.time() 

797 if number_samples_max and self.number_samples > number_samples_max: 

798 new_time_vector = npy.linspace( 

799 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

800 ).tolist() 

801 values = self.interpolate_values(new_time_vector) 

802 forced_values = self.interpolate_forced_values(new_time_vector) 

803 time_vector = new_time_vector 

804 number_samples = len(time_vector) 

805 else: 

806 time_vector = self.time_vector 

807 number_samples = len(self.values) 

808 values = self.values[:] 

809 forced_values = self.forced_values[:] 

810 data_processing_time = time.time() - data_processing_time 

811 

812 return self.__class__( 

813 signal_id=self.signal_id, 

814 time_vector=time_vector, 

815 values=values, 

816 forced_values=forced_values, 

817 number_samples=number_samples, 

818 number_samples_db=self.number_samples, 

819 data_start=self.data_start, 

820 data_end=self.data_end, 

821 db_query_time=self.db_query_time, 

822 init_time=self.init_time, 

823 data_processing_time=self.data_processing_time + data_processing_time, 

824 phase_id=self.phase_id, 

825 ) 

826 

827 def min_max_downsampling(self, number_samples_max: int) -> Self: 

828 return self.uniform_desampling(number_samples_max) 

829 

830 def interest_window_desampling( 

831 self, 

832 window_max_number_samples: int, 

833 outside_max_number_samples: int, 

834 window_min_timestamp: float | None = None, 

835 window_max_timestamp: float | None = None, 

836 ) -> Self: 

837 """Performs a sampling in a window of interest and outside.""" 

838 

839 if not self.time_vector: 

840 return self 

841 

842 if window_min_timestamp is None: 

843 window_min_timestamp = self.time_vector[0] 

844 if window_max_timestamp is None: 

845 window_max_timestamp = self.time_vector[-1] 

846 

847 data_processing_time = time.time() 

848 

849 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

850 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

851 

852 time_vector_before = self.time_vector[:index_window_start] 

853 time_vector_window = self.time_vector[index_window_start:index_window_end] 

854 time_vector_after = self.time_vector[index_window_end:] 

855 

856 # Resampling window 

857 if time_vector_window: 

858 # Ensurring window bounds 

859 if time_vector_window[0] != window_min_timestamp: 

860 time_vector_window.insert(0, window_min_timestamp) 

861 if time_vector_window[-1] != window_max_timestamp: 

862 time_vector_window.append(window_max_timestamp) 

863 else: 

864 time_vector_window = [window_min_timestamp, window_max_timestamp] 

865 

866 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

867 # Resampling 

868 new_window_time_vector = npy.linspace( 

869 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

870 ).tolist() 

871 time_vector_window = new_window_time_vector 

872 

873 # Resampling outside 

874 number_samples_before = len(time_vector_before) 

875 number_samples_after = len(time_vector_after) 

876 if ( 

877 outside_max_number_samples is not None 

878 and (number_samples_before + number_samples_after) > outside_max_number_samples 

879 ): 

880 new_number_samples_before = min( 

881 number_samples_before, 

882 math.ceil( 

883 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

884 ), 

885 ) 

886 new_number_samples_after = min( 

887 number_samples_after, 

888 math.ceil( 

889 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

890 ), 

891 ) 

892 # Adjusting numbers as math.ceil can do +1 on sum 

893 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

894 if new_number_samples_before > new_number_samples_after: 

895 new_number_samples_before -= 1 

896 else: 

897 new_number_samples_after -= 1 

898 

899 if new_number_samples_before > 0: 

900 new_time_vector_before = npy.linspace( 

901 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

902 ).tolist() 

903 time_vector_before = new_time_vector_before 

904 

905 if new_number_samples_after > 0: 

906 new_time_vector_after = npy.linspace( 

907 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

908 ).tolist()[::-1] 

909 time_vector_after = new_time_vector_after 

910 

911 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

912 values = self.interpolate_values(new_time_vector) 

913 forced_values = self.interpolate_forced_values(new_time_vector) 

914 number_samples = len(values) 

915 

916 data_processing_time = time.time() - data_processing_time 

917 

918 return self.__class__( 

919 signal_id=self.signal_id, 

920 forcible=self.forcible, 

921 time_vector=new_time_vector, 

922 values=values, 

923 forced_values=forced_values, 

924 number_samples=number_samples, 

925 number_samples_db=self.number_samples, 

926 data_start=self.data_start, 

927 data_end=self.data_end, 

928 db_query_time=self.db_query_time, 

929 init_time=self.init_time, 

930 data_processing_time=self.data_processing_time + data_processing_time, 

931 ) 

932 

933 def zero_time_vector(self, data_start: float): 

934 data_processing_time = time.time() 

935 if len(self.time_vector) == 0: 

936 return self 

937 time_vector = npy.array(self.time_vector) - data_start 

938 data_processing_time = time.time() - data_processing_time 

939 

940 return self.__class__( 

941 signal_id=self.signal_id, 

942 time_vector=time_vector, 

943 values=self.values, 

944 forced_values=self.forced_values, 

945 number_samples=self.number_samples, 

946 number_samples_db=self.number_samples_db, 

947 data_start=time_vector[0], 

948 data_end=time_vector[-1], 

949 db_query_time=self.db_query_time, 

950 init_time=self.init_time, 

951 data_processing_time=self.data_processing_time + data_processing_time, 

952 ) 

953 

954 def csv_export(self): 

955 output = io.StringIO() 

956 writer = csv.writer(output) 

957 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

958 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

959 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

960 return output.getvalue().encode("utf-8") 

961 

962 def prestoplot_export(self): 

963 clean_signal_id = self.signal_id.replace(".", "_") 

964 if clean_signal_id[0].isnumeric(): 

965 clean_signal_id = "_" + clean_signal_id 

966 

967 output = io.StringIO() 

968 output.write("# Encoding:\tUTF-8\n") 

969 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

970 output.write("ISO8601\tnone\tnone\n") 

971 output.write(f"# Description :\t{clean_signal_id}\n") 

972 

973 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

974 output.write( 

975 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

976 ) 

977 return output.getvalue().encode("utf-8") 

978 

979 

980class NumericSignalData(SignalData): 

981 data_type: str = "float" 

982 values: list[float | int | None] 

983 forced_values: list[float | int | None] 

984 

985 def interpolate(self, new_time_vector: list[float], items): 

986 items = [npy.nan if s is None else s for s in items] 

987 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

988 

989 def uniform_desampling(self, number_samples_max: int) -> Self: 

990 data_processing_time = time.time() 

991 if number_samples_max and self.number_samples > number_samples_max: 

992 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

993 forced_values = self.interpolate_forced_values(time_vector) 

994 number_samples = len(time_vector) 

995 else: 

996 time_vector = self.time_vector 

997 number_samples = len(self.values) 

998 values = self.values[:] 

999 forced_values = self.forced_values[:] 

1000 data_processing_time = time.time() - data_processing_time 

1001 

1002 return self.__class__( 

1003 signal_id=self.signal_id, 

1004 time_vector=time_vector, 

1005 values=values, 

1006 forced_values=forced_values, 

1007 number_samples=number_samples, 

1008 number_samples_db=self.number_samples, 

1009 data_start=self.data_start, 

1010 data_end=self.data_end, 

1011 db_query_time=self.db_query_time, 

1012 init_time=self.init_time, 

1013 data_processing_time=self.data_processing_time + data_processing_time, 

1014 ) 

1015 

1016 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1017 if self.number_samples < number_samples_max: 

1018 return self 

1019 

1020 data_processing_time = time.time() 

1021 

1022 number_bins = number_samples_max // 2 

1023 

1024 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

1025 values = npy.array(self.values, dtype=npy.float64) 

1026 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1027 

1028 points_per_bin = self.number_samples // number_bins 

1029 

1030 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1031 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1032 if self.number_samples - number_bins * points_per_bin > 1: 

1033 points_per_bin += 1 

1034 number_bins = self.number_samples // points_per_bin + 1 

1035 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1036 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1037 values = npy.concatenate([values, nan_points_to_add]) 

1038 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1039 

1040 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1041 values_matrix = values.reshape(number_bins, points_per_bin) 

1042 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1043 

1044 indexes_min = npy.zeros(number_bins, dtype="int64") 

1045 indexes_max = npy.zeros(number_bins, dtype="int64") 

1046 

1047 for row in range(number_bins): 

1048 min_value = values_matrix[row, 0] 

1049 max_value = values_matrix[row, 0] 

1050 for column in range(points_per_bin): 

1051 if values_matrix[row, column] < min_value: 

1052 min_value = values_matrix[row, column] 

1053 indexes_min[row] = column 

1054 elif values_matrix[row, column] > max_value: 

1055 max_value = values_matrix[row, column] 

1056 indexes_max[row] = column 

1057 

1058 row_index = npy.repeat(npy.arange(number_bins), 2) 

1059 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1060 

1061 data_processing_time = time.time() - data_processing_time 

1062 

1063 new_time_vector = timestamps_matrix[row_index, column_index] 

1064 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1065 new_values = values_matrix[row_index, column_index] 

1066 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1067 new_forced_values = forced_values_matrix[row_index, column_index] 

1068 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1069 

1070 # Make sure there are no None values for the time vector 

1071 time_vector_filter = new_time_vector != None 

1072 new_time_vector = new_time_vector[time_vector_filter] 

1073 new_values = new_values[time_vector_filter] 

1074 new_forced_values = new_forced_values[time_vector_filter] 

1075 

1076 return self.__class__( 

1077 signal_id=self.signal_id, 

1078 time_vector=new_time_vector, 

1079 values=new_values, 

1080 forced_values=new_forced_values, 

1081 number_samples=number_bins * 2, 

1082 number_samples_db=self.number_samples_db, 

1083 data_start=self.data_start, 

1084 data_end=self.data_end, 

1085 db_query_time=self.db_query_time, 

1086 init_time=self.init_time, 

1087 data_processing_time=self.data_processing_time + data_processing_time, 

1088 phase_id=self.phase_id, 

1089 ) 

1090 

1091 def interest_window_desampling( 

1092 self, 

1093 window_max_number_samples: int, 

1094 outside_max_number_samples: int, 

1095 window_min_timestamp: float | None = None, 

1096 window_max_timestamp: float | None = None, 

1097 ) -> Self: 

1098 """Performs a sampling in a window of interest and outside.""" 

1099 

1100 if not self.time_vector: 

1101 return self 

1102 

1103 if window_min_timestamp is None: 

1104 window_min_timestamp = self.time_vector[0] 

1105 if window_max_timestamp is None: 

1106 window_max_timestamp = self.time_vector[-1] 

1107 

1108 data_processing_time = time.time() 

1109 

1110 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1111 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1112 

1113 time_vector_before = self.time_vector[:index_window_start] 

1114 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1115 time_vector_after = self.time_vector[index_window_end:] 

1116 

1117 values_before = self.values[:index_window_start] 

1118 values_window = self.values[index_window_start:index_window_end] 

1119 values_after = self.values[index_window_end:] 

1120 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1121 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1122 

1123 # Resampling window 

1124 if time_vector_window: 

1125 # Ensurring window bounds 

1126 if time_vector_window[0] != window_min_timestamp: 

1127 time_vector_window.insert(0, window_min_timestamp) 

1128 values_window.insert(0, window_min_value) 

1129 if time_vector_window[-1] != window_max_timestamp: 

1130 time_vector_window.append(window_max_timestamp) 

1131 values_window.append(window_max_value) 

1132 else: 

1133 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1134 values_window = [window_min_value, window_max_value] 

1135 

1136 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1137 # Resampling 

1138 time_vector_window, values_window = downsample_list( 

1139 time_vector_window, values_window, window_max_number_samples 

1140 ) 

1141 

1142 # Resampling outside 

1143 number_samples_before = len(time_vector_before) 

1144 number_samples_after = len(time_vector_after) 

1145 if ( 

1146 outside_max_number_samples is not None 

1147 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1148 ): 

1149 new_number_samples_before = min( 

1150 number_samples_before, 

1151 math.ceil( 

1152 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1153 ), 

1154 ) 

1155 new_number_samples_after = min( 

1156 number_samples_after, 

1157 math.ceil( 

1158 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1159 ), 

1160 ) 

1161 # Adjusting numbers as math.ceil can do +1 on sum 

1162 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1163 if new_number_samples_before > new_number_samples_after: 

1164 new_number_samples_before -= 1 

1165 else: 

1166 new_number_samples_after -= 1 

1167 

1168 if new_number_samples_before > 0: 

1169 time_vector_before, values_before = downsample_list( 

1170 time_vector_before, values_before, new_number_samples_before 

1171 ) 

1172 

1173 if new_number_samples_after > 0: 

1174 time_vector_after, values_after = downsample_list( 

1175 time_vector_after, values_after, new_number_samples_after 

1176 ) 

1177 

1178 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1179 values = values_before + values_window + values_after 

1180 forced_values = self.interpolate_forced_values(new_time_vector) 

1181 number_samples = len(values) 

1182 

1183 data_processing_time = time.time() - data_processing_time 

1184 

1185 return self.__class__( 

1186 signal_id=self.signal_id, 

1187 time_vector=new_time_vector, 

1188 values=values, 

1189 forced_values=forced_values, 

1190 number_samples=number_samples, 

1191 number_samples_db=self.number_samples, 

1192 data_start=self.data_start, 

1193 data_end=self.data_end, 

1194 db_query_time=self.db_query_time, 

1195 init_time=self.init_time, 

1196 data_processing_time=self.data_processing_time + data_processing_time, 

1197 ) 

1198 

1199 

1200class StringSignalData(SignalData): 

1201 data_type: str = "str" 

1202 values: list[str | None] 

1203 forced_values: list[str | None] 

1204 

1205 def interpolate(self, new_time_vector: list[float], items): 

1206 # Find the indices of the values in xp that are just smaller or equal to x 

1207 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1208 indices = npy.clip(indices, 0, len(items) - 1) 

1209 # Return the corresponding left string values from fp 

1210 return [items[i] for i in indices] 

1211 

1212 

1213class SignalsData(TwinPadModel): 

1214 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1215 data_processing_time: float 

1216 data_start: float | None 

1217 data_end: float | None 

1218 

1219 @classmethod 

1220 def get_from_signal_ids( 

1221 cls, 

1222 signal_ids: list[str], 

1223 min_timestamp: float = None, 

1224 max_timestamp: float = None, 

1225 window_min_timestamp: float = None, 

1226 window_max_timestamp: float = None, 

1227 interpolate_bounds: bool = True, 

1228 max_documents: int = None, 

1229 ) -> Self: 

1230 signals_data = [] 

1231 data_start = None 

1232 data_end = None 

1233 if max_timestamp is None: 

1234 max_timestamp = time.time() 

1235 data_processing_time = 0.0 

1236 

1237 signal_collections = get_signal_collections_batch(signal_ids) 

1238 

1239 for signal_id, collection in zip(signal_ids, signal_collections): 

1240 signal_data = SignalData.get_from_signal_id( 

1241 signal_id=signal_id, 

1242 min_timestamp=min_timestamp, 

1243 max_timestamp=max_timestamp, 

1244 window_min_timestamp=window_min_timestamp, 

1245 window_max_timestamp=window_max_timestamp, 

1246 interpolate_bounds=interpolate_bounds, 

1247 max_documents=max_documents, 

1248 collection=collection, 

1249 ) 

1250 data_processing_time += signal_data.data_processing_time 

1251 signals_data.append(signal_data) 

1252 if signal_data.data_start is not None: 

1253 if data_start is None: 

1254 data_start = signal_data.data_start 

1255 else: 

1256 data_start = min(signal_data.data_start, data_start) 

1257 if signal_data.data_end is not None: 

1258 if data_end is None: 

1259 data_end = signal_data.data_end 

1260 else: 

1261 data_end = max(signal_data.data_end, data_end) 

1262 

1263 return cls( 

1264 signals_data=signals_data, 

1265 data_processing_time=data_processing_time, 

1266 data_start=data_start, 

1267 data_end=data_end, 

1268 ) 

1269 

1270 @classmethod 

1271 def get_from_phase_and_signal_ids( 

1272 cls, 

1273 phases: list, 

1274 phase_sync_times: list[float | None], 

1275 signal_ids: list[str], 

1276 window_min_timestamps: list[float | None], 

1277 window_max_timestamps: list[float | None], 

1278 zero_time_vector: bool = True, 

1279 ): 

1280 signals_data: list[SignalData] = [] 

1281 computation_start = time.time() 

1282 

1283 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip( 

1284 phases, phase_sync_times, window_min_timestamps, window_max_timestamps 

1285 ): 

1286 min_timestamp = phase.start_at / 1000 

1287 max_timestamp = phase.end_at / 1000 

1288 

1289 if sync_time is None: 

1290 sync_time = min_timestamp 

1291 

1292 if window_max_timestamp is not None and window_min_timestamp is not None: 

1293 window_length = window_max_timestamp - window_min_timestamp 

1294 

1295 if window_min_timestamp != min_timestamp: 

1296 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20) 

1297 if window_max_timestamp != max_timestamp: 

1298 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20) 

1299 

1300 signal_collections = get_signal_collections_batch(signal_ids) 

1301 

1302 for signal_id, collection in zip(signal_ids, signal_collections): 

1303 signal_data = SignalData.get_from_signal_id( 

1304 signal_id, 

1305 min_timestamp, 

1306 max_timestamp, 

1307 window_min_timestamp, 

1308 window_max_timestamp, 

1309 interpolate_bounds=False, 

1310 max_documents=None, 

1311 collection=collection, 

1312 ) 

1313 

1314 if len(signal_data.time_vector) == 0: 

1315 continue 

1316 

1317 if zero_time_vector: 

1318 signal_data = signal_data.zero_time_vector(sync_time) 

1319 signal_data.phase_id = phase.id 

1320 

1321 signals_data.append(signal_data) 

1322 

1323 return cls( 

1324 signals_data=signals_data, 

1325 data_processing_time=time.time() - computation_start, 

1326 data_start=0, 

1327 data_end=0, 

1328 ) 

1329 

1330 def uniform_desampling(self, number_samples_max: int) -> Self: 

1331 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1332 return SignalsData( 

1333 signals_data=signals_data, 

1334 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1335 data_start=self.data_start, 

1336 data_end=self.data_end, 

1337 ) 

1338 

1339 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1340 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1341 return SignalsData( 

1342 signals_data=signals_data, 

1343 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1344 data_start=self.data_start, 

1345 data_end=self.data_end, 

1346 ) 

1347 

1348 def interest_window_desampling( 

1349 self, 

1350 window_max_number_samples: int, 

1351 outside_max_number_samples: int, 

1352 window_min_timestamp: float = None, 

1353 window_max_timestamp: float = None, 

1354 ) -> Self: 

1355 signals_data = [ 

1356 s.interest_window_desampling( 

1357 window_max_number_samples=window_max_number_samples, 

1358 outside_max_number_samples=outside_max_number_samples, 

1359 window_min_timestamp=window_min_timestamp, 

1360 window_max_timestamp=window_max_timestamp, 

1361 ) 

1362 for s in self.signals_data 

1363 ] 

1364 

1365 return SignalsData( 

1366 signals_data=signals_data, 

1367 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1368 data_start=self.data_start, 

1369 data_end=self.data_end, 

1370 ) 

1371 

1372 def zero_time_vector(self, data_start: float): 

1373 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data] 

1374 return SignalsData( 

1375 signals_data=signals_data, 

1376 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1377 data_start=0, 

1378 data_end=max([s.data_end for s in signals_data]), 

1379 ) 

1380 

1381 @classmethod 

1382 async def apply_single_function( 

1383 cls, 

1384 phase, 

1385 base_signal_id: str, 

1386 function: SINGLE_POST_PROCESSING_FUNCTION, 

1387 window_min_timestamp: float = None, 

1388 window_max_timestamp: float = None, 

1389 ): 

1390 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}" 

1391 

1392 processed_result_signal = Signal.get_from_signal_id(signal_id) 

1393 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids: 

1394 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp) 

1395 

1396 signals_data = cls.get_from_phase_and_signal_ids( 

1397 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False 

1398 ) 

1399 

1400 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0: 

1401 return None 

1402 

1403 new_values = None 

1404 new_forced_values = None 

1405 time_vector = npy.array(signals_data.signals_data[0].time_vector) 

1406 values = signals_data.signals_data[0].values 

1407 forced_values = signals_data.signals_data[0].forced_values 

1408 

1409 match (function): 

1410 case "Cumul": 

1411 new_values = cumul(values) 

1412 new_forced_values = cumul(forced_values) 

1413 # case "CumulDistrib": 

1414 # new_values = cumul_distrib(values) 

1415 # new_forced_values = cumul_distrib(forced_values) 

1416 case "Delta": 

1417 new_values = delta(values) 

1418 new_forced_values = delta(forced_values) 

1419 case "DeltaT": 

1420 new_values = delta(time_vector) 

1421 new_forced_values = new_values 

1422 case "Derive": 

1423 new_values = derive(time_vector, values) 

1424 new_forced_values = derive(time_vector, forced_values) 

1425 case "Integ": 

1426 new_values = integ(time_vector, values) 

1427 new_forced_values = integ(time_vector, forced_values) 

1428 

1429 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1430 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1431 

1432 loop = asyncio.get_running_loop() 

1433 loop.create_task( 

1434 cls.save_function_signal( 

1435 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible 

1436 ) 

1437 ) 

1438 

1439 if window_max_timestamp is not None: 

1440 max_timestamp_mask = time_vector <= window_max_timestamp 

1441 time_vector = time_vector[max_timestamp_mask] 

1442 new_values = new_values[max_timestamp_mask] 

1443 new_forced_values = new_forced_values[max_timestamp_mask] 

1444 if window_min_timestamp is not None: 

1445 min_timestamp_mask = time_vector >= window_min_timestamp 

1446 time_vector = time_vector[min_timestamp_mask] 

1447 new_values = new_values[min_timestamp_mask] 

1448 new_forced_values = new_forced_values[min_timestamp_mask] 

1449 

1450 signals_data.signals_data[0].time_vector = time_vector.tolist() 

1451 signals_data.signals_data[0].values = new_values.tolist() 

1452 signals_data.signals_data[0].forced_values = new_forced_values.tolist() 

1453 signals_data.signals_data[0].number_samples = time_vector.size 

1454 

1455 signals_data.signals_data[0].signal_id = signal_id 

1456 

1457 return signals_data 

1458 

1459 @classmethod 

1460 async def apply_multiple_function( 

1461 cls, 

1462 phases: list, 

1463 signal_ids: list, 

1464 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

1465 window_min_timestamp: float = None, 

1466 window_max_timestamp: float = None, 

1467 ): 

1468 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION): 

1469 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}" 

1470 else: 

1471 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}" 

1472 

1473 active_phase = phases[0] 

1474 if function in {"Align-X", "Using-X"}: 

1475 active_phase = phases[1] 

1476 

1477 processed_result_signal = Signal.get_from_signal_id(function_signal_id) 

1478 if processed_result_signal is not None and ( 

1479 active_phase.id in processed_result_signal.computed_phases_ids 

1480 ): # If signal has been computed for the correct phase 

1481 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp) 

1482 

1483 array_length = None 

1484 time_vector_list = [] 

1485 values_list = [] 

1486 forced_values_list = [] 

1487 forcible = True 

1488 for phase, signal_id in zip(phases, signal_ids): 

1489 signals_data = cls.get_from_phase_and_signal_ids( 

1490 [phase], [None], [signal_id], [None], [None], zero_time_vector=False 

1491 ) 

1492 

1493 if len(signals_data.signals_data) == 0: 

1494 return None 

1495 

1496 signal_data = signals_data.signals_data[0] 

1497 

1498 if array_length is None: 

1499 array_length = signal_data.number_samples 

1500 if ( 

1501 array_length != signal_data.number_samples and function != "Align-X" 

1502 ) or signal_data.number_samples == 0: 

1503 return None 

1504 

1505 time_vector_list.append(npy.array(signal_data.time_vector)) 

1506 values_list.append(npy.array(signal_data.values, dtype=npy.float64)) 

1507 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64)) 

1508 forcible = forcible and signal_data.forcible 

1509 

1510 time_vector = time_vector_list[0] 

1511 new_values = None 

1512 new_forced_values = None 

1513 

1514 match (function): 

1515 case "Align-X": 

1516 time_vector = time_vector_list[1] 

1517 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000 

1518 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000 

1519 new_values = align_x(old_time_vector, values_list[0], new_time_vector) 

1520 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector) 

1521 # case "Atan2": 

1522 # new_values = atan2(values_list[0], values_list[1]) 

1523 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1]) 

1524 case "Using-X": 

1525 if len(time_vector_list[0]) != len(time_vector_list[1]): 

1526 return None 

1527 time_vector = time_vector_list[1] 

1528 new_values = values_list[0] 

1529 new_forced_values = forced_values_list[0] 

1530 case "Mean": 

1531 new_values = mean(*values_list) 

1532 new_forced_values = mean(*forced_values_list) 

1533 case "Norm": 

1534 new_values = norm(*values_list) 

1535 new_forced_values = norm(*forced_values_list) 

1536 

1537 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1538 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1539 

1540 loop = asyncio.get_running_loop() 

1541 loop.create_task( 

1542 cls.save_function_signal( 

1543 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible 

1544 ) 

1545 ) 

1546 

1547 total_number_samples = time_vector.size 

1548 

1549 if window_max_timestamp is not None: 

1550 max_timestamp_mask = time_vector <= window_max_timestamp 

1551 time_vector = time_vector[max_timestamp_mask] 

1552 new_values = new_values[max_timestamp_mask] 

1553 new_forced_values = new_forced_values[max_timestamp_mask] 

1554 if window_min_timestamp is not None: 

1555 min_timestamp_mask = time_vector >= window_min_timestamp 

1556 time_vector = time_vector[min_timestamp_mask] 

1557 new_values = new_values[min_timestamp_mask] 

1558 new_forced_values = new_forced_values[min_timestamp_mask] 

1559 

1560 signals_data = cls( 

1561 signals_data=[ 

1562 NumericSignalData( 

1563 signal_id=function_signal_id, 

1564 forcible=forcible, 

1565 time_vector=time_vector.tolist(), 

1566 values=new_values.tolist(), 

1567 forced_values=new_forced_values.tolist(), 

1568 number_samples=time_vector.size, 

1569 number_samples_db=total_number_samples, 

1570 ) 

1571 ], 

1572 data_processing_time=0, 

1573 data_start=0, 

1574 data_end=0, 

1575 ) 

1576 

1577 return signals_data 

1578 

1579 @classmethod 

1580 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float): 

1581 signal_data_collection = get_signal_collection(signal_id, create=True) 

1582 pipeline = [] 

1583 match_filter = {} 

1584 if window_min_timestamp is not None or window_max_timestamp is not None: 

1585 match_filter["$match"] = {} 

1586 match_filter["$match"]["precise_timestamp"] = {} 

1587 if window_max_timestamp is not None: 

1588 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp 

1589 if window_min_timestamp is not None: 

1590 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp 

1591 

1592 total_number_samples = signal_data_collection.count_documents({}) 

1593 

1594 if match_filter: 

1595 pipeline.append(match_filter) 

1596 

1597 fetch_start = time.time() 

1598 

1599 samples = signal_data_collection.aggregate(pipeline).to_list() 

1600 new_time_vector = [] 

1601 new_values = [] 

1602 new_forced_values = [] 

1603 for sample in samples: 

1604 new_time_vector.append(sample["precise_timestamp"]) 

1605 new_values.append(sample["value"]) 

1606 new_forced_values.append(sample["forced_value"]) 

1607 

1608 return cls( 

1609 signals_data=[ 

1610 NumericSignalData( 

1611 signal_id=signal_id, 

1612 time_vector=new_time_vector, 

1613 values=new_values, 

1614 forced_values=new_forced_values, 

1615 number_samples=len(new_time_vector), 

1616 number_samples_db=total_number_samples, 

1617 ) 

1618 ], 

1619 data_processing_time=time.time() - fetch_start, 

1620 data_start=0, 

1621 data_end=0, 

1622 ) 

1623 

1624 @classmethod 

1625 async def save_function_signal( 

1626 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool 

1627 ): 

1628 # Insert data first so if it is requested by another user, it will be computed again 

1629 signal_collection = get_signal_collection(function_signal_id, create=True) 

1630 signal_collection.delete_many( 

1631 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}} 

1632 ) 

1633 signal_collection.insert_many( 

1634 [ 

1635 { 

1636 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]), 

1637 "precise_timestamp": time_vector[i], 

1638 "value": new_values[i], 

1639 "forced_value": new_forced_values[i], 

1640 } 

1641 for i in range(len(time_vector)) 

1642 ] 

1643 ) 

1644 

1645 signals_config_collection = get_collection(systems_database, "signals", create=True) 

1646 signals_config_collection.find_one_and_update( 

1647 {"signal_id": function_signal_id}, 

1648 { 

1649 "$set": { 

1650 "description": "", 

1651 "unit": None, 

1652 "type": "sensor", 

1653 "address": None, 

1654 "frequency": 1 / max(npy.mean(delta(time_vector)), 1), # avoid division by 0 

1655 "transfer_function": None, 

1656 "precision_digits": None, 

1657 "digitization_function": None, 

1658 "data_type": "float", 

1659 "formula": None, 

1660 "forcible": forcible, 

1661 "commandable": False, 

1662 "broadcastable": True, 

1663 "signal_id": function_signal_id, 

1664 "post_processing": True, 

1665 }, 

1666 "$push": {"computed_phases_ids": phase.id}, 

1667 }, 

1668 upsert=True, 

1669 ) 

1670 

1671 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1672 if post_processing: 

1673 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1674 zip_buffer = io.BytesIO() 

1675 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1676 for signal_data in self.signals_data: 

1677 file_name = signal_data.signal_id 

1678 if post_processing: 

1679 phase = phases_by_id.get( 

1680 signal_data.phase_id, 

1681 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1682 ) 

1683 file_name = f"{signal_data.signal_id} ({phase.name})" 

1684 if file_format == "csv": 

1685 export_io = signal_data.csv_export() 

1686 zip_file.writestr(f"{file_name}.csv", export_io) 

1687 elif file_format == "prestoplot": 

1688 export_io = signal_data.prestoplot_export() 

1689 zip_file.writestr(f"{file_name}.tab", export_io) 

1690 else: 

1691 raise ValueError(f"Format not found. Got: {file_format}") 

1692 zip_bytes = zip_buffer.getvalue() 

1693 return zip_bytes 

1694 

1695 def hdf5_export(self, post_processing: bool = False, phase_ids: list = []): 

1696 if post_processing: 

1697 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1698 hdf5_buffer = io.BytesIO() 

1699 custom_type_float = npy.dtype( 

1700 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1701 ) 

1702 custom_type_string = npy.dtype( 

1703 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1704 ) 

1705 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1706 for signal_data in self.signals_data: 

1707 if post_processing: 

1708 phase = phases_by_id.get( 

1709 signal_data.phase_id, 

1710 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1711 ) 

1712 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})") 

1713 else: 

1714 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1715 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1716 if signal_data.data_type == "str": 

1717 export_data = npy.array( 

1718 list( 

1719 zip( 

1720 date_vector, 

1721 signal_data.time_vector, 

1722 signal_data.values, 

1723 signal_data.forced_values, 

1724 ) 

1725 ), 

1726 dtype=custom_type_string, 

1727 ) 

1728 else: 

1729 export_data = npy.array( 

1730 list( 

1731 zip( 

1732 date_vector, 

1733 signal_data.time_vector, 

1734 signal_data.values, 

1735 signal_data.forced_values, 

1736 ) 

1737 ), 

1738 dtype=custom_type_float, 

1739 ) 

1740 signal_group["data"] = export_data 

1741 return hdf5_buffer.getvalue() 

1742 

1743 

1744class SignalStatus(TwinPadModel): 

1745 status: str = "down" 

1746 reason: str = "" 

1747 delay: float | None = None 

1748 

1749 

1750class DigitizationFunction(TwinPadModel): 

1751 bits: int | None = None 

1752 min_value: float 

1753 max_value: float 

1754 min_raw_value: float 

1755 max_raw_value: float 

1756 

1757 

1758class SignalUpdate(TwinPadModel): 

1759 value: float | str | bool | int | None = None 

1760 forced_value: float | str | bool | int | None = None 

1761 timestamp: int | None = None 

1762 

1763 

1764class SignalType(str, Enum): 

1765 command = "command" 

1766 sensor = "sensor" 

1767 external_sensor = "external_sensor" 

1768 

1769 

1770SIGNALDATA_TYPES = { 

1771 "int": NumericSignalData, 

1772 "float": NumericSignalData, 

1773 "str": StringSignalData, 

1774 "bool": NumericSignalData, 

1775 "epoch": NumericSignalData, 

1776} 

1777 

1778 

1779class Signal(GenericMongo): 

1780 collection_name: ClassVar[str] = "signals" 

1781 

1782 signal_id: str 

1783 frequency: float 

1784 unit: str | None 

1785 description: str 

1786 type: SignalType 

1787 data_type: str 

1788 precision_digits: int | None 

1789 forcible: bool 

1790 commandable: bool 

1791 broadcastable: bool 

1792 status: SignalStatus = SignalStatus() 

1793 

1794 post_processing: bool = False 

1795 computed_phases_ids: list[str] = [] 

1796 

1797 digitization_function: DigitizationFunction | None 

1798 

1799 @property 

1800 def device(self) -> Device: 

1801 device_id = self.signal_id.split(".")[0] 

1802 device = Device.get_one_by_attribute("device_id", device_id) 

1803 return device 

1804 

1805 @cached_property 

1806 def signal_data_class(self): 

1807 if self.data_type in SIGNALDATA_TYPES: 

1808 return SIGNALDATA_TYPES[self.data_type] 

1809 if self.data_type.startswith("enum"): 

1810 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1811 raise ValueError(f"Unhandled python type: {self.data_type}") 

1812 

1813 @cached_property 

1814 def python_type(self): 

1815 if self.data_type in TYPES: 

1816 return TYPES[self.data_type] 

1817 if self.data_type.startswith("enum"): 

1818 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1819 return Literal[*choices] 

1820 raise ValueError(f"Unhandled python type: {self.data_type}") 

1821 

1822 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1823 command = Command( 

1824 sent_at=time.time(), 

1825 command_type="Signal command", 

1826 user_id=current_user.id, 

1827 ) 

1828 

1829 has_input_error = False 

1830 error_message = "" 

1831 

1832 if self.data_type.startswith("enum"): 

1833 enum_options = get_args(self.python_type) 

1834 

1835 if update_dict.value is not None and update_dict.value not in enum_options: 

1836 has_input_error = True 

1837 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1838 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1839 has_input_error = True 

1840 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1841 else: 

1842 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1843 has_input_error = True 

1844 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1845 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1846 has_input_error = True 

1847 error_message += ( 

1848 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1849 ) 

1850 

1851 if has_input_error: 

1852 command.response_time = 0 

1853 command.succeeded = False 

1854 command.description = f"Tried to modify signal {self.signal_id}" 

1855 response = {"error": True, "status_code": 400, "message": error_message} 

1856 else: 

1857 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict) 

1858 command.receive_response(response) 

1859 

1860 Command.create(command) 

1861 return response 

1862 

1863 @classmethod 

1864 def get_from_signal_id(cls, signal_id) -> Self: 

1865 """Could be generic from mongo""" 

1866 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1867 if not raw_value: 

1868 return None 

1869 del raw_value["_id"] 

1870 return cls.dict_to_object(raw_value) 

1871 

1872 @classmethod 

1873 def get_all_ids(cls) -> list[str]: 

1874 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1875 

1876 return [signal["signal_id"] for signal in cursor] 

1877 

1878 @classmethod 

1879 def get_all_statuses(cls) -> list[dict]: 

1880 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1881 

1882 return [ 

1883 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1884 for signal in cursor 

1885 ] 

1886 

1887 async def number_samples(self): 

1888 collection = get_signal_collection(signal_id=self.signal_id) 

1889 if collection is None: 

1890 return 0 

1891 

1892 number_samples = collection.estimated_document_count() 

1893 

1894 number_samples_async_collection = await get_async_collection( 

1895 systems_async_database, "number_samples", create=True, time_series=True 

1896 ) 

1897 

1898 loop = asyncio.get_running_loop() 

1899 loop.create_task( 

1900 number_samples_async_collection.insert_one( 

1901 { 

1902 "timestamp": datetime.datetime.now(pytz.UTC), 

1903 "signal_id": self.signal_id, 

1904 "number_samples": number_samples, 

1905 } 

1906 ) 

1907 ) 

1908 

1909 return number_samples 

1910 

1911 @classmethod 

1912 def total_number_samples(cls) -> int: 

1913 TwinPadActivity.get_number_samples_timeframe(0, 0, False) 

1914 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1915 

1916 if number_samples_collection is None: 

1917 return 0 

1918 

1919 result = number_samples_collection.aggregate( 

1920 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}] 

1921 ) 

1922 

1923 result = result.to_list() 

1924 if len(result) == 0: 

1925 return 0 

1926 return result[0]["amount"] 

1927 

1928 def sample_datasize(self): 

1929 return signals_database.command("collstats", self.signal_id)["size"] 

1930 

1931 @classmethod 

1932 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1933 result = cls.collection().aggregate( 

1934 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1935 ) 

1936 

1937 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1938 

1939 

1940class ForcedSignal(GenericMongo): 

1941 collection_name: ClassVar[str] = "forced_signals" 

1942 

1943 signal_id: str 

1944 forcing_user_id: str 

1945 forced_at: float 

1946 value: str | float 

1947 

1948 def insert(self): 

1949 insert_result = self.collection().find_one_and_update( 

1950 {"signal_id": self.signal_id}, 

1951 {"$set": self.to_dict(exclude={"id"})}, 

1952 upsert=True, 

1953 return_document=ReturnDocument.AFTER, 

1954 ) 

1955 self.id = str(insert_result["_id"]) 

1956 return self.id 

1957 

1958 @classmethod 

1959 def can_force(cls, signal_id: str, current_user: User) -> bool: 

1960 """Checks whether user can force a given signal. 

1961 

1962 :param signal_id: Signal ID of the signal to force 

1963 :type signal_id: str 

1964 :param current_user: Current user 

1965 :type current_user: User 

1966 :return: False if the signal was forced by someone else than the user, True otherwise 

1967 :rtype: bool 

1968 """ 

1969 forced_signal = cls.get_one_by_attribute("signal_id", signal_id) 

1970 if forced_signal is not None: 

1971 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

1972 return False 

1973 return True 

1974 

1975 

1976class ServicesStatus(TwinPadModel): 

1977 backend: str 

1978 cloud_broker: str 

1979 time_series_database: str 

1980 signal_storage: str 

1981 heartbeat_storage: str 

1982 data_analyzer: str 

1983 

1984 @classmethod 

1985 def check(cls) -> Self: 

1986 return cls( 

1987 cloud_broker=ping(RABBITMQ_HOST), 

1988 backend="up", 

1989 time_series_database=ping(MONGO_HOST), 

1990 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1991 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1992 data_analyzer=ping(DATA_ANALYZER_HOST), 

1993 ) 

1994 

1995 

1996def ping(host): 

1997 try: 

1998 if ping3.ping(host, timeout=0.8): 

1999 return "up" 

2000 except PermissionError: 

2001 pass 

2002 return "down" 

2003 

2004 

2005class Event(GenericMongo): 

2006 collection_name: ClassVar[str] = "events" 

2007 

2008 name: str 

2009 timestamp: float 

2010 event_rule_id: str 

2011 

2012 @computed_field 

2013 @cached_property 

2014 def event_rule(self) -> "EventRule": 

2015 return EventRule.get_from_id(self.event_rule_id) 

2016 

2017 @classmethod 

2018 def dict_to_object(cls, dict_): 

2019 """Refine to convert timestamp to datetime for mongodb.""" 

2020 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

2021 return super().dict_to_object(dict_) 

2022 

2023 

2024class TwinPadActivity(GenericMongo): 

2025 timestamp: float 

2026 amount: int 

2027 

2028 @classmethod 

2029 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

2030 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2031 number_events_collection = get_collection(systems_database, "number_events") 

2032 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

2033 items = [] 

2034 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2035 if number_events_collection is None or recompute_amount: 

2036 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

2037 number_events_collection.delete_many({}) 

2038 first_event = events_collection.find_one(sort={"timestamp": 1}) 

2039 if first_event is None: 

2040 return items 

2041 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

2042 tzinfo=pytz.UTC 

2043 ) 

2044 while last_computed_day < TODAY: 

2045 day_nb_events = events_collection.count_documents( 

2046 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2047 ) 

2048 if day_nb_events > 0: 

2049 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

2050 last_computed_day += ONE_DAY_OFFSET 

2051 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2052 if number_events_today > 0: 

2053 number_events_collection.delete_many({"timestamp": TODAY}) 

2054 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

2055 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2056 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2057 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2058 for day in number_events: 

2059 day["timestamp"] = day["timestamp"].timestamp() 

2060 items.append(cls.mongo_dict_to_object(day)) 

2061 return items 

2062 

2063 @classmethod 

2064 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2065 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2066 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

2067 signals_number_samples_collection = get_collection( 

2068 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

2069 ) 

2070 items = [] 

2071 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2072 if number_samples_collection is None or recompute_amount: 

2073 number_samples_collection = get_collection( 

2074 systems_database, "number_received_samples", create=True, time_series=True 

2075 ) 

2076 number_samples_collection.delete_many({}) 

2077 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

2078 if first_sample is None: 

2079 return items 

2080 # compute from day of first found event 

2081 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

2082 tzinfo=pytz.UTC 

2083 ) 

2084 while last_computed_day < TODAY: 

2085 number_samples_request = signals_number_samples_collection.aggregate( 

2086 [ 

2087 { 

2088 "$match": { 

2089 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

2090 } 

2091 }, 

2092 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2093 ] 

2094 ).to_list() 

2095 if len(number_samples_request) == 0: 

2096 number_samples = 0 

2097 else: 

2098 number_samples = number_samples_request[0].get("number_samples", 0) 

2099 if number_samples > 0: 

2100 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

2101 last_computed_day += ONE_DAY_OFFSET 

2102 number_samples_request = signals_number_samples_collection.aggregate( 

2103 [ 

2104 {"$match": {"timestamp": {"$gte": TODAY}}}, 

2105 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2106 ] 

2107 ).to_list() 

2108 if len(number_samples_request) == 0: 

2109 number_samples_today = 0 

2110 else: 

2111 number_samples_today = number_samples_request[0].get("number_samples", 0) 

2112 if number_samples_today > 0: 

2113 number_samples_collection.delete_many({"timestamp": TODAY}) 

2114 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

2115 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2116 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2117 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2118 for day in number_events: 

2119 day["timestamp"] = day["timestamp"].timestamp() 

2120 items.append(cls.mongo_dict_to_object(day)) 

2121 return items 

2122 

2123 @classmethod 

2124 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2125 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2126 number_commands_collection = get_collection(systems_database, "number_commands") 

2127 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

2128 items = [] 

2129 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2130 if number_commands_collection is None or recompute_amount: 

2131 number_commands_collection = get_collection( 

2132 systems_database, "number_commands", create=True, time_series=True 

2133 ) 

2134 number_commands_collection.delete_many({}) 

2135 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

2136 if first_command is None: 

2137 return items 

2138 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

2139 tzinfo=pytz.UTC 

2140 ) 

2141 while last_computed_day < TODAY: 

2142 day_nb_commands = commands_collection.count_documents( 

2143 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2144 ) 

2145 if day_nb_commands > 0: 

2146 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

2147 last_computed_day += ONE_DAY_OFFSET 

2148 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2149 if number_commands_today > 0: 

2150 number_commands_collection.delete_many({"timestamp": TODAY}) 

2151 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

2152 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2153 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2154 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2155 for day in number_commands: 

2156 day["timestamp"] = day["timestamp"].timestamp() 

2157 items.append(cls.mongo_dict_to_object(day)) 

2158 return items 

2159 

2160 

2161class EventRule(GenericMongo): 

2162 collection_name: ClassVar[str] = "event_rules" 

2163 

2164 name: str 

2165 formula: str 

2166 variables: list[str] 

2167 

2168 @computed_field 

2169 @cached_property 

2170 def number_events(self) -> int: 

2171 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

2172 

2173 

2174class Company(GenericMongo): 

2175 collection_name: ClassVar[str] = "companies" 

2176 name: str 

2177 

2178 

2179class Campaign(GenericMongo): 

2180 collection_name: ClassVar[str] = "campaigns" 

2181 

2182 # Properties 

2183 id: str | None = None 

2184 name: str 

2185 description: str | None = None 

2186 

2187 

2188class Phase(GenericMongo): 

2189 collection_name: ClassVar[str] = "phases" 

2190 

2191 # Properties 

2192 id: str | None = None 

2193 name: str 

2194 description: str | None = None 

2195 start_at: float 

2196 end_at: float 

2197 

2198 # FK 

2199 campaign_id: MongoId 

2200 

2201 @classmethod 

2202 def deleteMany(cls, campaign_id): 

2203 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

2204 return delete_phases 

2205 

2206 

2207class CustomViewCreation(GenericMongo): 

2208 collection_name: ClassVar[str] = "custom_views" 

2209 

2210 name: str 

2211 configuration: list 

2212 

2213 

2214class CustomView(CustomViewCreation): 

2215 # Properties 

2216 id: str | None = None 

2217 

2218 # Foreign Key 

2219 user_id: str 

2220 

2221 

2222CustomViewUpdate = create_update_model(CustomView) 

2223 

2224 

2225class Video(GenericMongo): 

2226 collection_name: ClassVar[str] = "videos" 

2227 

2228 # Properties 

2229 name: str 

2230 ip_addr: str 

2231 username: str | None = None 

2232 password: str | None = None 

2233 

2234 # Methods 

2235 @classmethod 

2236 def get_all(cls, sort_by="_id") -> list[Self]: 

2237 items = [] 

2238 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

2239 items.append(cls.mongo_dict_to_object(dict_)) 

2240 return items 

2241 

2242 @classmethod 

2243 def get_video(cls, camera_id: ObjectId): 

2244 camera = cls.get_from_id(camera_id) 

2245 if camera is not None: 

2246 return camera.name 

2247 return None 

2248 

2249 

2250class Command(GenericMongo): 

2251 collection_name: ClassVar[str] = "commands" 

2252 

2253 # Properties 

2254 timestamp: datetime.datetime = None 

2255 sent_at: float 

2256 response_time: float = 0.0 

2257 command_type: str 

2258 description: str = "" 

2259 succeeded: bool = False 

2260 

2261 # Foreign key 

2262 user_id: str 

2263 

2264 @classmethod 

2265 def collection(cls): 

2266 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

2267 

2268 @classmethod 

2269 def create(cls, command: Self): 

2270 command = cls( 

2271 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

2272 sent_at=command.sent_at, 

2273 response_time=command.response_time, 

2274 command_type=command.command_type, 

2275 description=command.description, 

2276 succeeded=command.succeeded, 

2277 user_id=command.user_id, 

2278 ) 

2279 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

2280 if new_command is None: 

2281 return None 

2282 return {"command_id": str(new_command.inserted_id)} 

2283 

2284 def receive_response(self, response: dict): 

2285 self.response_time = time.time() - self.sent_at 

2286 self.succeeded = response.get("error", True) is False 

2287 if self.description == "": 

2288 self.description += response.get("message", "").rstrip() 

2289 

2290 

2291class SignalsPresetCreation(GenericMongo): 

2292 name: str 

2293 signal_ids: list[str] 

2294 

2295 

2296class SignalsPreset(SignalsPresetCreation): 

2297 collection_name: ClassVar[str] = "signals_presets" 

2298 

2299 user_id: str 

2300 

2301 @classmethod 

2302 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

2303 signals_preset = cls( 

2304 user_id=user_id, 

2305 name=signals_preset.name, 

2306 signal_ids=signals_preset.signal_ids, 

2307 ) 

2308 

2309 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

2310 

2311 return str(new_signal_preset.inserted_id) 

2312 

2313 

2314SignalsPresetUpdate = create_update_model(SignalsPreset) 

2315 

2316 

2317class LineStyle(str, Enum): 

2318 solid = "solid" 

2319 dotted = "dotted" 

2320 dashed = "dashed" 

2321 

2322 

2323class SignalAppearance: 

2324 value_color: str 

2325 forced_value_color: str 

2326 

2327 

2328class GraphThemeCreation(GenericMongo): 

2329 collection_name: ClassVar[str] = "graph_themes" 

2330 

2331 name: str 

2332 signal_id: str 

2333 value_color: str = "" 

2334 forced_value_color: str = "" 

2335 value_line_style: LineStyle = LineStyle.solid 

2336 forced_value_line_style: LineStyle = LineStyle.solid 

2337 private: bool = True 

2338 

2339 

2340class PublicGraphTheme(GraphThemeCreation): 

2341 created_by_user: bool 

2342 in_user_library: bool 

2343 active_for_user: bool 

2344 

2345 _current_user_id: str = "" 

2346 

2347 @classproperty 

2348 def custom_pipeline_steps(cls) -> dict[str, list]: 

2349 return { 

2350 "created_by_user": [ 

2351 { 

2352 "$addFields": { 

2353 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

2354 } 

2355 } 

2356 ], 

2357 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

2358 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

2359 ], 

2360 "in_user_library": [ 

2361 { 

2362 "$addFields": { 

2363 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

2364 } 

2365 } 

2366 ], 

2367 "active_for_user": [ 

2368 { 

2369 "$addFields": { 

2370 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

2371 } 

2372 } 

2373 ], 

2374 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

2375 "active": [ 

2376 { 

2377 "$addFields": { 

2378 "active": "$$REMOVE", 

2379 } 

2380 } 

2381 ], 

2382 "creator_id": [ 

2383 { 

2384 "$addFields": { 

2385 "creator_id": "$$REMOVE", 

2386 } 

2387 } 

2388 ], 

2389 } 

2390 

2391 @classmethod 

2392 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

2393 cls._current_user_id = user_id 

2394 return super().response_from_query(query) 

2395 

2396 @classmethod 

2397 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

2398 query.in_user_library = "true" 

2399 return cls.response_from_query(query, user_id) 

2400 

2401 @classmethod 

2402 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

2403 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

2404 

2405 @classmethod 

2406 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2407 cls._current_user_id = user_id 

2408 return super().get_by_attribute(attribute_name, attribute_value) 

2409 

2410 @classmethod 

2411 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2412 cls._current_user_id = user_id 

2413 return super().get_one_by_attribute(attribute_name, attribute_value) 

2414 

2415 @classmethod 

2416 def get_all(cls, sort_by: str, user_id: str): 

2417 cls._current_user_id = user_id 

2418 return super().get_all(sort_by) 

2419 

2420 @classmethod 

2421 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

2422 pipeline = [ 

2423 { 

2424 "$match": { 

2425 "active": {"$eq": user_id}, 

2426 "signal_id": {"$in": signal_ids}, 

2427 } 

2428 }, 

2429 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

2430 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

2431 { 

2432 "$project": { 

2433 "_id": 0, 

2434 "signal_id": 1, 

2435 "value_color": 1, 

2436 "forced_value_color": 1, 

2437 "value_line_style": 1, 

2438 "forced_value_line_style": 1, 

2439 } 

2440 }, 

2441 ] 

2442 

2443 result = {} 

2444 

2445 cursor = cls.collection().aggregate(pipeline) 

2446 for document in cursor: 

2447 signal_id = document["signal_id"] 

2448 del document["signal_id"] 

2449 result[signal_id] = document 

2450 

2451 return result 

2452 

2453 

2454GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2455 

2456 

2457class PrivateGraphTheme(GraphThemeCreation): 

2458 # private 

2459 creator_id: str 

2460 in_library: list[str] 

2461 active: list[str] 

2462 

2463 @classmethod 

2464 def create( 

2465 cls, 

2466 creator_id: str, 

2467 name: str, 

2468 signal_id: str, 

2469 value_color: str, 

2470 forced_value_color: str, 

2471 value_line_style: LineStyle, 

2472 forced_value_line_style: LineStyle, 

2473 private: bool, 

2474 ): 

2475 color_setting = cls( 

2476 creator_id=creator_id, 

2477 name=name, 

2478 signal_id=signal_id, 

2479 value_color=value_color, 

2480 forced_value_color=forced_value_color, 

2481 value_line_style=value_line_style, 

2482 forced_value_line_style=forced_value_line_style, 

2483 private=private, 

2484 in_library=[creator_id], 

2485 active=[creator_id], 

2486 ) 

2487 

2488 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2489 color_setting.id = str(new_color_setting.inserted_id) 

2490 return color_setting 

2491 

2492 def update(self, update_dict: dict, user_id: str): 

2493 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2494 if in_user_lib and user_id not in self.in_library: 

2495 self.in_library.append(user_id) 

2496 elif not in_user_lib and user_id in self.in_library: 

2497 self.in_library.remove(user_id) 

2498 update_dict["in_library"] = self.in_library 

2499 del update_dict["in_user_library"] 

2500 

2501 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2502 if active_for_user and user_id not in self.active: 

2503 self.active.append(user_id) 

2504 elif not active_for_user and user_id in self.active: 

2505 self.active.remove(user_id) 

2506 update_dict["active"] = self.active 

2507 del update_dict["active_for_user"] 

2508 

2509 if update_dict.get("created_by_user") is not None: 

2510 del update_dict["created_by_user"] 

2511 

2512 self.collection().find_one_and_update( 

2513 {"_id": ObjectId(self.id)}, 

2514 {"$set": update_dict}, 

2515 ) 

2516 

2517 return {} 

2518 

2519 

2520class DeviceStatus(str, Enum): 

2521 started = "started" 

2522 running = "running" 

2523 created = "created" 

2524 exited = "exited" 

2525 restarting = "restarting" 

2526 

2527 

2528class DeviceUpdateFromDeployer(BaseModel): 

2529 status: DeviceStatus 

2530 

2531 

2532class DeviceFromDeployerCreation(BaseModel): 

2533 name: str 

2534 description: str 

2535 

2536 

2537class DeviceFromDeployer(DeviceFromDeployerCreation): 

2538 status: DeviceStatus 

2539 device_id: DeviceId 

2540 logs: str = "" 

2541 

2542 

2543class DeviceDeployer(GenericMongo): 

2544 collection_name: ClassVar[str] = "device_deployers" 

2545 url: HttpUrl 

2546 

2547 def endpoint_url(self, endpoint): 

2548 return f"{str(self.url).rstrip('/')}/{endpoint}" 

2549 

2550 def devices(self) -> list[DeviceFromDeployer]: 

2551 devices = [] 

2552 try: 

2553 response = requests.get(self.endpoint_url("devices")) 

2554 except requests.exceptions.ConnectionError: 

2555 logger.info("connection error") 

2556 return None 

2557 if response.status_code != 200: 

2558 return None 

2559 for device_dict in response.json()["devices"]: 

2560 devices.append( 

2561 DeviceFromDeployer( 

2562 device_id=device_dict["device_id"], 

2563 name=device_dict["container_name"], 

2564 description="desc", 

2565 status=device_dict["status"], 

2566 logs=device_dict["logs"], 

2567 ) 

2568 ) 

2569 return devices 

2570 

2571 def get_device(self, device_id: DeviceId): 

2572 try: 

2573 response = requests.get(self.endpoint_url(f"devices/{device_id}")) 

2574 except requests.exceptions.ConnectionError: 

2575 return None 

2576 if response.status_code != 200: 

2577 return None 

2578 device_dict = response.json() 

2579 return DeviceFromDeployer( 

2580 device_id=device_dict["device_id"], 

2581 name=device_dict["container_name"], 

2582 description="desc", 

2583 status=device_dict["status"], 

2584 logs=device_dict["logs"], 

2585 ) 

2586 

2587 def create_device(self, device: DeviceFromDeployer) -> Device | None: 

2588 try: 

2589 response = requests.post(self.endpoint_url("devices"), json={"name": device.name}) 

2590 except requests.exceptions.ConnectionError: 

2591 return None 

2592 

2593 if response.status_code != 201: 

2594 return None 

2595 

2596 device_dict = response.json() 

2597 return DeviceFromDeployer( 

2598 device_id=device_dict["device_id"], 

2599 name="", 

2600 description="desc", 

2601 status=device_dict["status"], 

2602 ) 

2603 

2604 def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None: 

2605 try: 

2606 response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump()) 

2607 except requests.exceptions.ConnectionError: 

2608 return None 

2609 

2610 if response.status_code != 200: 

2611 return None 

2612 

2613 device_dict = response.json() 

2614 return Device( 

2615 device_id=device_dict["device_id"], 

2616 name="", 

2617 description="desc", 

2618 pid={}, 

2619 petri_network={}, 

2620 modes=[], 

2621 status=device_dict["status"], 

2622 ) 

2623 

2624 def delete_device(self, device_id: DeviceId) -> DeleteInfo: 

2625 try: 

2626 response = requests.delete(self.endpoint_url(f"devices/{device_id}")) 

2627 except requests.exceptions.ConnectionError: 

2628 return DeleteInfo(is_deleted=False, detail="Connection to deployer error") 

2629 if response.status_code not in [200, 202, 204]: 

2630 return DeleteInfo(is_deleted=False, detail=response.text) 

2631 

2632 return DeleteInfo(is_deleted=True, detail="") 

2633 

2634 

2635DeviceDeployerUpdate = create_update_model(DeviceDeployer)