Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%

1367 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-15 16:34 +0000

1from functools import cached_property 

2import os 

3import re 

4import io 

5import time 

6import csv 

7from typing import Self, ClassVar, Any, Literal, get_args, Annotated 

8import datetime 

9import math 

10import bisect 

11from enum import Enum 

12import logging 

13import copy 

14import asyncio 

15import requests 

16 

17import zipfile 

18import ping3 

19import pytz 

20from bson.objectid import ObjectId 

21from pymongo import ASCENDING, ReturnDocument 

22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator 

23import numpy as npy 

24import lttb 

25import h5py 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 signals_async_database, 

36 devices_states_database, 

37) 

38from twinpad_backend.responses import ListResponse 

39from twinpad_backend.messages import send_mode_change, send_signal_value 

40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

41 

42TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

43SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"] 

44DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"] 

45MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"] 

46 

47 

48RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

49MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

50SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

51HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

52DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

53 

54DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

55NUMBER_SAMPLES_DATABASE_UPDATE = 120 

56 

57logger = logging.getLogger("uvicorn.error") 

58 

59 

60class DeleteInfo(BaseModel): 

61 is_deleted: bool 

62 detail: str 

63 

64 

65class classproperty: 

66 """ 

67 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

68 Found here: https://stackoverflow.com/a/76301341 

69 """ 

70 

71 def __init__(self, func): 

72 self.fget = func 

73 

74 def __get__(self, _, owner): 

75 return self.fget(owner) 

76 

77 

78def create_update_model(model): 

79 fields = {} 

80 

81 for field_name, field_annotation in model.model_fields.items(): 

82 if field_name != "id": 

83 fields[field_name] = (field_annotation.annotation | None, None) 

84 

85 query_name = model.__name__ + "Update" 

86 return create_model(query_name, **fields) 

87 

88 

89def get_utc_date_from_timestamp(timestamp: float): 

90 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

91 

92 

93def downsample_list(time_vector: list, values: list, max_number_samples: int): 

94 if len(time_vector) < max_number_samples: 

95 return time_vector, values 

96 

97 time_vector_copy = copy.deepcopy(time_vector) 

98 values_copy = copy.deepcopy(values) 

99 

100 none_group_bounds = [] 

101 none_group_index = -1 

102 index = -1 

103 # LTTB doesn't handle None values so remove them 

104 while values_copy.count(None) > 0: 

105 # Store bounds of None value groups so we can insert them back after the downsampling 

106 if (new_index := values_copy.index(None)) != index: 

107 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

108 none_group_index += 1 

109 elif len(none_group_bounds[none_group_index]) < 2: 

110 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

111 else: 

112 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

113 values_copy.pop(new_index) 

114 index = new_index 

115 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

116 

117 try: 

118 values_array = npy.array([time_vector_copy, values_copy]).T 

119 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

120 

121 new_time_vector = interpolated_values[:, 0].tolist() 

122 new_values = interpolated_values[:, 1].tolist() 

123 except ValueError: 

124 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

125 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

126 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

127 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

128 return new_time_vector, new_values_nan_to_none 

129 

130 # insert back None values at the correct timestamps 

131 for none_group in none_group_bounds: 

132 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

133 new_time_vector[start_index:start_index] = none_group 

134 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

135 

136 return new_time_vector, new_values 

137 

138 

139def is_of_type(value, wanted_type): 

140 if wanted_type is float: 

141 return isinstance(value, (int, float)) 

142 return isinstance(value, wanted_type) 

143 

144 

145# Models 

146class TwinPadModel(BaseModel): 

147 @classmethod 

148 def dict_to_object(cls, dict_): 

149 return cls.model_validate(dict_) 

150 

151 def to_dict(self, exclude=None): 

152 dict_ = self.model_dump(exclude=exclude, mode="json") 

153 return dict_ 

154 

155 

156def validate_mongo_id(v): 

157 if not ObjectId.is_valid(v): 

158 raise ValueError("Invalid MongoDB id") 

159 return str(v) 

160 

161 

162MongoId = Annotated[str, BeforeValidator(validate_mongo_id)] 

163 

164 

165def validate_12_hex(v: str) -> str: 

166 if not re.fullmatch(r"[0-9a-fA-F]{12}", v): 

167 raise ValueError("ID must be a 12-character hexadecimal string") 

168 return v 

169 

170 

171DeviceId = Annotated[str, BeforeValidator(validate_12_hex)] 

172 

173 

174class GenericMongo(TwinPadModel): 

175 id: MongoId | None = None 

176 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

177 

178 @classmethod 

179 def collection(cls): 

180 return get_collection(systems_database, cls.collection_name, create=True) 

181 

182 @classmethod 

183 def response_from_query(cls, query) -> ListResponse[Self]: 

184 request_filters = query.mongodb_filter() 

185 items = [] 

186 

187 # Allows for multi-sort, Python dicts are ordered so no issue while sorting 

188 sort_dict = {} 

189 for sort in query.sort_by.split(","): 

190 if ":" in sort: 

191 sort_field, sort_order = sort.split(":") 

192 sort_order = int(sort_order) 

193 else: 

194 sort_field = sort 

195 sort_order = 1 

196 sort_dict[sort_field] = sort_order 

197 

198 collection = get_collection(systems_database, cls.collection_name, create=True) 

199 total = collection.count_documents(request_filters) 

200 

201 pipeline = [] 

202 added_properties = [] 

203 if "$and" in request_filters: 

204 for request_filter in request_filters["$and"]: 

205 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

206 if filtered_property in request_filter: 

207 pipeline.extend(pipeline_steps) 

208 added_properties.append(filtered_property) 

209 else: 

210 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

211 if filtered_property in request_filters: 

212 pipeline.extend(pipeline_steps) 

213 added_properties.append(filtered_property) 

214 pipeline.append({"$match": request_filters}) 

215 

216 for sort_field in sort_dict.keys(): 

217 if sort_field in cls.custom_pipeline_steps: 

218 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

219 added_properties.append(sort_field) 

220 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}]) 

221 

222 if (query.limit is not None) and (query.limit != 0): 

223 pipeline.append({"$limit": query.limit}) 

224 

225 for filtered_property, step in cls.custom_pipeline_steps.items(): 

226 if filtered_property not in added_properties: 

227 pipeline.extend(step) 

228 

229 cursor = collection.aggregate(pipeline) 

230 

231 for item_dict in cursor: 

232 items.append(cls.mongo_dict_to_object(item_dict)) 

233 

234 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

235 

236 @classmethod 

237 def get_from_id(cls, item_id) -> Self | None: 

238 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

239 

240 @classmethod 

241 def mongo_dict_to_object(cls, mongo_dict): 

242 mongo_dict["id"] = str(mongo_dict["_id"]) 

243 del mongo_dict["_id"] 

244 return cls.dict_to_object(mongo_dict) 

245 

246 @classmethod 

247 def get_by_attribute(cls, attribute_name: str, attribute_value): 

248 """Returns all items that match the attribute with value.""" 

249 pipeline = [] 

250 if attribute_name in cls.custom_pipeline_steps: 

251 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

252 pipeline.append({"$match": {attribute_name: attribute_value}}) 

253 for key, step in cls.custom_pipeline_steps.items(): 

254 if key != attribute_name: 

255 pipeline.extend(step) 

256 items = cls.collection().aggregate(pipeline) 

257 if items is None: 

258 return None 

259 return [cls.mongo_dict_to_object(d) for d in items] 

260 

261 @classmethod 

262 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

263 pipeline = [] 

264 if attribute_name in cls.custom_pipeline_steps: 

265 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

266 pipeline.append({"$match": {attribute_name: attribute_value}}) 

267 pipeline.append({"$limit": 1}) 

268 for key, step in cls.custom_pipeline_steps.items(): 

269 if key != attribute_name: 

270 pipeline.extend(step) 

271 items = cls.collection().aggregate(pipeline).to_list() 

272 if len(items) == 0: 

273 return None 

274 return cls.mongo_dict_to_object(items[0]) 

275 

276 @classmethod 

277 def get_all(cls, sort_by="_id") -> list[Self]: 

278 items = [] 

279 pipeline = [] 

280 if sort_by in cls.custom_pipeline_steps: 

281 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

282 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

283 for key, step in cls.custom_pipeline_steps.items(): 

284 if key != sort_by: 

285 pipeline.extend(step) 

286 for dict_ in cls.collection().aggregate(pipeline): 

287 items.append(cls.mongo_dict_to_object(dict_)) 

288 return items 

289 

290 @classmethod 

291 def get_number_documents(cls): 

292 collection = get_collection(systems_database, cls.collection_name) 

293 if collection is None: 

294 return 0 

295 return collection.count_documents( 

296 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]} 

297 ) 

298 

299 def insert(self): 

300 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"})) 

301 self.id = str(insert_result.inserted_id) 

302 return self.id 

303 

304 def update(self, update_dict): 

305 for key, value in update_dict.items(): 

306 setattr(self, key, value) 

307 self.collection().find_one_and_update( 

308 {"_id": ObjectId(self.id)}, 

309 {"$set": update_dict}, 

310 return_document=ReturnDocument.AFTER, 

311 ) 

312 

313 return self 

314 

315 def delete(self): 

316 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

317 return result.deleted_count > 0 

318 

319 

320class User(GenericMongo): 

321 collection_name: ClassVar[str] = "users" 

322 

323 firstname: str 

324 lastname: str 

325 email: str 

326 password: str 

327 is_active: bool | None = False 

328 is_admin: bool | None = False 

329 is_connected: bool | None = False 

330 company_id: str | None = None 

331 

332 def to_dict(self, exclude=None): 

333 if exclude is None: 

334 exclude = {"password"} 

335 return GenericMongo.to_dict(self, exclude=exclude) 

336 

337 @classmethod 

338 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

339 users = cls.get_all() 

340 if not users: 

341 is_admin = True 

342 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

343 user_collection = get_collection(systems_database, "users", create=True) 

344 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

345 if new_user is None: 

346 return None 

347 return {"user_id": str(new_user.inserted_id)} 

348 

349 @classmethod 

350 def update_info(cls, user: "UserUpdate", user_id: str): 

351 updated_user = cls.collection().find_one_and_update( 

352 {"_id": ObjectId(user_id)}, 

353 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

354 return_document=ReturnDocument.AFTER, 

355 ) 

356 updated_user["id"] = str(updated_user["_id"]) 

357 del (updated_user["_id"], updated_user["is_connected"]) 

358 return cls(**updated_user) 

359 

360 

361UserUpdate = create_update_model(User) 

362 

363 

364class Mode(TwinPadModel): 

365 mode_id: int 

366 name: str 

367 frequency_multiplier: float 

368 min_frequency: float 

369 

370 

371class DeviceUpdate(TwinPadModel): 

372 mode_id: int 

373 

374 

375class Device(GenericMongo): 

376 collection_name: ClassVar[str] = "devices" 

377 

378 device_id: DeviceId 

379 name: str 

380 description: str = "" 

381 modes: list[Mode] 

382 current_mode_id: int | None = None 

383 last_ping: float | None = None 

384 petri_network: Any 

385 pid: Any 

386 load: float | None = None 

387 tokens: list[int] = Field(default_factory=list) 

388 status: str 

389 

390 async def change_mode(self, update_dict, current_user: User): 

391 has_error = False 

392 

393 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

394 has_error = True 

395 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

396 elif self.current_mode_id is not None: 

397 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

398 else: 

399 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

400 command = Command( 

401 sent_at=time.time(), 

402 command_type="Mode change", 

403 description=description, 

404 user_id=current_user.id, 

405 ) 

406 

407 if has_error: 

408 command.response_time = 0 

409 command.succeeded = False 

410 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

411 else: 

412 response = await send_mode_change(self.device_id, update_dict.mode_id) 

413 command.receive_response(response) 

414 

415 Command.create(command) 

416 return response 

417 

418 @classmethod 

419 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

420 devices_by_id = {} 

421 for signal_id in signal_ids: 

422 device_id = signal_id.split(".")[0] 

423 if device_id not in devices_by_id: 

424 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

425 return devices_by_id 

426 

427 

428class DeviceSetup(GenericMongo): 

429 collection_name: ClassVar[str] = "device_setups" 

430 

431 device_ids: list[str] 

432 active: bool = False 

433 variable_mapping: dict[str, str] 

434 

435 

436DeviceSetupUpdate = create_update_model(DeviceSetup) 

437 

438 

439class DeviceState(GenericMongo): 

440 collection_name: ClassVar[str] = "devices_states" 

441 

442 timestamp: float 

443 mode: str | None = None 

444 load: float | None = None 

445 tokens: list[int] = Field(default_factory=list) 

446 modified_properties: list[str] = Field(default_factory=list) 

447 

448 @classmethod 

449 def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]: 

450 req_filter = query.mongodb_filter() 

451 items = [] 

452 if ":" in query.sort_by: 

453 sort_field, sort_order = query.sort_by.split(":") 

454 sort_order = int(sort_order) 

455 else: 

456 sort_field = query.sort_by 

457 sort_order = 1 

458 collection = get_collection(devices_states_database, device_id) 

459 if collection is None: 

460 total = 0 

461 cursor = [] 

462 else: 

463 total = collection.count_documents(req_filter) 

464 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

465 if (query.limit is not None) and (query.limit != 0): 

466 cursor = cursor.limit(query.limit) 

467 for item_dict in cursor: 

468 items.append( 

469 cls( 

470 timestamp=item_dict.get("precise_timestamp"), 

471 mode=item_dict.get("mode", None), 

472 load=item_dict.get("load", None), 

473 tokens=item_dict.get("tokens", Field(default_factory=list)), 

474 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

475 ) 

476 ) 

477 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

478 

479 

480class SignalSample(TwinPadModel): 

481 signal_id: str 

482 timestamp: float 

483 value: float | int | str | bool | None 

484 forced_value: float | int | str | bool | None = None 

485 

486 @classmethod 

487 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

488 

489 collection = get_signal_collection(signal_id) 

490 if collection is None: 

491 return None 

492 

493 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

494 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

495 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

496 first_bucket = None 

497 if bucket is not None: 

498 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

499 if first_bucket is not None: 

500 sample_data = collection.find_one( 

501 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

502 ) 

503 else: 

504 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

505 

506 if sample_data is None: 

507 return None 

508 

509 timestamp = sample_data["precise_timestamp"] 

510 

511 return cls( 

512 signal_id=signal_id, 

513 timestamp=timestamp, 

514 value=sample_data.get("value", None), 

515 forced_value=sample_data.get("forced_value", None), 

516 ) 

517 

518 @classmethod 

519 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

520 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

521 

522 @classmethod 

523 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

524 collection = get_signal_collection(signal_id) 

525 if collection is None: 

526 return None 

527 

528 # Same workaround as above function, very effective to narrow down big sets of data 

529 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

530 last_bucket = None 

531 if bucket is not None: 

532 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

533 if last_bucket is not None: 

534 sample_data = collection.find_one( 

535 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

536 sort={"precise_timestamp": -1}, 

537 ) 

538 else: 

539 sample_data = collection.find_one({}, sort={"precise_timestamp": -1}) 

540 

541 if sample_data is None: 

542 return None 

543 

544 timestamp = sample_data["precise_timestamp"] 

545 

546 if device is None: 

547 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

548 if device is not None and device.last_ping is not None: 

549 if timestamp is None: 

550 timestamp = device.last_ping 

551 else: 

552 timestamp = max(timestamp, device.last_ping) 

553 return cls( 

554 signal_id=signal_id, 

555 timestamp=timestamp, 

556 value=sample_data.get("value", None), 

557 forced_value=sample_data.get("forced_value", None), 

558 ) 

559 

560 @classmethod 

561 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

562 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

563 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

564 

565 

566class SignalData(TwinPadModel): 

567 signal_id: str 

568 forcible: bool = True 

569 time_vector: list[float] 

570 values: list[float | int | str | None] 

571 forced_values: list[float | int | str | None] 

572 

573 data_start: float | None = None 

574 data_end: float | None = None 

575 

576 number_samples: int = 0 

577 number_samples_db: int = 0 

578 

579 db_query_time: float = 0.0 

580 init_time: float = 0.0 

581 data_processing_time: float = 0.0 

582 

583 phase_id: str | None = None 

584 

585 @classmethod 

586 def get_from_signal_id( 

587 cls, 

588 signal_id: str, 

589 min_timestamp: float = None, 

590 max_timestamp: float = None, 

591 window_min_timestamp: float = None, 

592 window_max_timestamp: float = None, 

593 interpolate_bounds: bool = True, 

594 max_documents: int = None, 

595 ) -> Self: 

596 

597 now = time.time() 

598 

599 req_signal = {} 

600 if min_timestamp is not None: 

601 req_signal.setdefault("timestamp", {}) 

602 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

603 if max_timestamp is not None: 

604 req_signal.setdefault("timestamp", {}) 

605 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

606 

607 collection = get_signal_collection(signal_id) 

608 if collection is None: 

609 return cls( 

610 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

611 ) 

612 

613 db_req_start = time.time() 

614 

615 sort_step = {"$sort": {"precise_timestamp": 1}} 

616 number_results = collection.count_documents(req_signal) 

617 

618 pipeline = [] 

619 if req_signal: 

620 pipeline.append({"$match": req_signal}) # Filter data if needed 

621 

622 pipeline.extend( 

623 [ 

624 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

625 sort_step, 

626 ] 

627 ) 

628 

629 if max_documents is not None and max_documents < number_results: 

630 unsampling_ratio = math.ceil(number_results / max_documents) 

631 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

632 pipeline.extend( 

633 [ 

634 { 

635 "$setWindowFields": { 

636 "sortBy": {"precise_timestamp": 1}, 

637 "output": {"index": {"$documentNumber": {}}}, 

638 } 

639 }, 

640 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

641 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

642 {"$replaceRoot": {"newRoot": "$doc"}}, 

643 {"$unset": ["index", "group_id"]}, 

644 {"$sort": {"precise_timestamp": 1}}, 

645 ] 

646 ) 

647 

648 # logger.info(f"pipeline: %s", str(pipeline)) 

649 cursor = collection.aggregate(pipeline) 

650 db_req_time = time.time() - db_req_start 

651 

652 init_time = time.time() 

653 

654 results = cursor.to_list() 

655 time_vector = [] 

656 values = [] 

657 forced_values = [] 

658 for s in results: 

659 time_vector.append(s["precise_timestamp"]) 

660 values.append(s.get("value", None)) 

661 forced_values.append(s.get("forced_value", None)) 

662 

663 signal = Signal.get_from_signal_id(signal_id) 

664 if signal is None: 

665 return cls( 

666 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

667 ) 

668 class_ = signal.signal_data_class 

669 

670 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

671 time_vector, values, forced_values = cls.interpolate_bounds( 

672 class_, 

673 collection, 

674 signal_id, 

675 time_vector, 

676 values, 

677 forced_values, 

678 window_min_timestamp, 

679 window_max_timestamp, 

680 ) 

681 

682 if values: 

683 # TODO: check below. a bit strange 

684 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

685 # Adding last value as it should be repeated 

686 time_vector.append(now) 

687 values.append(values[-1]) 

688 forced_values.append(forced_values[-1]) 

689 

690 init_time = time.time() - init_time 

691 

692 # See line 292 for explanation 

693 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

694 first_bucket = None 

695 if bucket is not None: 

696 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

697 if first_bucket is not None: 

698 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

699 else: 

700 data_start = None 

701 

702 last_bucket = None 

703 if bucket is not None: 

704 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

705 if last_bucket is not None: 

706 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

707 else: 

708 data_end = None 

709 

710 return class_( 

711 signal_id=signal_id, 

712 forcible=signal.forcible, 

713 time_vector=time_vector, 

714 values=values, 

715 forced_values=forced_values, 

716 data_start=data_start, 

717 data_end=data_end, 

718 number_samples=len(values), 

719 number_samples_db=number_results, 

720 db_query_time=db_req_time, 

721 init_time=init_time, 

722 ) 

723 

724 @staticmethod 

725 def interpolate_bounds( 

726 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

727 ): 

728 sample_right = None 

729 # Fetching right side value & interpolation 

730 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

731 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

732 sample_right = collection.find_one( 

733 { 

734 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

735 "value": {"$exists": True}, 

736 }, 

737 sort={"precise_timestamp": -1}, 

738 ) 

739 if sample_right: 

740 if time_vector: 

741 right_sd = class_( 

742 signal_id=signal_id, 

743 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

744 values=[values[-1], sample_right.get("value", None)], 

745 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

746 ) 

747 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

748 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

749 else: 

750 max_ts_value = sample_right.get("value", None) 

751 max_ts_forced_value = sample_right.get("forced_value", None) 

752 time_vector.append(window_max_timestamp) 

753 values.append(max_ts_value) 

754 forced_values.append(max_ts_forced_value) 

755 

756 # Fetching left side value & interpolation 

757 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

758 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

759 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

760 sample_left = sample_right 

761 sample_left = collection.find_one( 

762 { 

763 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

764 "value": {"$exists": True}, 

765 }, 

766 sort={"precise_timestamp": -1}, 

767 ) 

768 

769 if sample_left: 

770 if time_vector: 

771 left_sd = class_( 

772 signal_id=signal_id, 

773 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

774 values=[sample_left["value"], values[0]], 

775 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

776 ) 

777 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

778 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

779 else: 

780 min_ts_value = sample_left.get("value", None) 

781 min_ts_forced_value = sample_left.get("forced_value", None) 

782 time_vector.insert(0, window_min_timestamp) 

783 values.insert(0, min_ts_value) 

784 forced_values.insert(0, min_ts_forced_value) 

785 

786 return time_vector, values, forced_values 

787 

788 def interpolate_values(self, new_time_vector: list[float]): 

789 return self.interpolate(new_time_vector, self.values) 

790 

791 def interpolate_forced_values(self, new_time_vector: list[float]): 

792 return self.interpolate(new_time_vector, self.forced_values) 

793 

794 def uniform_desampling(self, number_samples_max: int) -> Self: 

795 data_processing_time = time.time() 

796 if number_samples_max and self.number_samples > number_samples_max: 

797 new_time_vector = npy.linspace( 

798 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

799 ).tolist() 

800 values = self.interpolate_values(new_time_vector) 

801 forced_values = self.interpolate_forced_values(new_time_vector) 

802 time_vector = new_time_vector 

803 number_samples = len(time_vector) 

804 else: 

805 time_vector = self.time_vector 

806 number_samples = len(self.values) 

807 values = self.values[:] 

808 forced_values = self.forced_values[:] 

809 data_processing_time = time.time() - data_processing_time 

810 

811 return self.__class__( 

812 signal_id=self.signal_id, 

813 time_vector=time_vector, 

814 values=values, 

815 forced_values=forced_values, 

816 number_samples=number_samples, 

817 number_samples_db=self.number_samples, 

818 data_start=self.data_start, 

819 data_end=self.data_end, 

820 db_query_time=self.db_query_time, 

821 init_time=self.init_time, 

822 data_processing_time=self.data_processing_time + data_processing_time, 

823 phase_id=self.phase_id, 

824 ) 

825 

826 def min_max_downsampling(self, number_samples_max: int) -> Self: 

827 return self.uniform_desampling(number_samples_max) 

828 

829 def interest_window_desampling( 

830 self, 

831 window_max_number_samples: int, 

832 outside_max_number_samples: int, 

833 window_min_timestamp: float | None = None, 

834 window_max_timestamp: float | None = None, 

835 ) -> Self: 

836 """Performs a sampling in a window of interest and outside.""" 

837 

838 if not self.time_vector: 

839 return self 

840 

841 if window_min_timestamp is None: 

842 window_min_timestamp = self.time_vector[0] 

843 if window_max_timestamp is None: 

844 window_max_timestamp = self.time_vector[-1] 

845 

846 data_processing_time = time.time() 

847 

848 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

849 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

850 

851 time_vector_before = self.time_vector[:index_window_start] 

852 time_vector_window = self.time_vector[index_window_start:index_window_end] 

853 time_vector_after = self.time_vector[index_window_end:] 

854 

855 # Resampling window 

856 if time_vector_window: 

857 # Ensurring window bounds 

858 if time_vector_window[0] != window_min_timestamp: 

859 time_vector_window.insert(0, window_min_timestamp) 

860 if time_vector_window[-1] != window_max_timestamp: 

861 time_vector_window.append(window_max_timestamp) 

862 else: 

863 time_vector_window = [window_min_timestamp, window_max_timestamp] 

864 

865 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

866 # Resampling 

867 new_window_time_vector = npy.linspace( 

868 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

869 ).tolist() 

870 time_vector_window = new_window_time_vector 

871 

872 # Resampling outside 

873 number_samples_before = len(time_vector_before) 

874 number_samples_after = len(time_vector_after) 

875 if ( 

876 outside_max_number_samples is not None 

877 and (number_samples_before + number_samples_after) > outside_max_number_samples 

878 ): 

879 new_number_samples_before = min( 

880 number_samples_before, 

881 math.ceil( 

882 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

883 ), 

884 ) 

885 new_number_samples_after = min( 

886 number_samples_after, 

887 math.ceil( 

888 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

889 ), 

890 ) 

891 # Adjusting numbers as math.ceil can do +1 on sum 

892 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

893 if new_number_samples_before > new_number_samples_after: 

894 new_number_samples_before -= 1 

895 else: 

896 new_number_samples_after -= 1 

897 

898 if new_number_samples_before > 0: 

899 new_time_vector_before = npy.linspace( 

900 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

901 ).tolist() 

902 time_vector_before = new_time_vector_before 

903 

904 if new_number_samples_after > 0: 

905 new_time_vector_after = npy.linspace( 

906 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

907 ).tolist()[::-1] 

908 time_vector_after = new_time_vector_after 

909 

910 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

911 values = self.interpolate_values(new_time_vector) 

912 forced_values = self.interpolate_forced_values(new_time_vector) 

913 number_samples = len(values) 

914 

915 data_processing_time = time.time() - data_processing_time 

916 

917 return self.__class__( 

918 signal_id=self.signal_id, 

919 forcible=self.forcible, 

920 time_vector=new_time_vector, 

921 values=values, 

922 forced_values=forced_values, 

923 number_samples=number_samples, 

924 number_samples_db=self.number_samples, 

925 data_start=self.data_start, 

926 data_end=self.data_end, 

927 db_query_time=self.db_query_time, 

928 init_time=self.init_time, 

929 data_processing_time=self.data_processing_time + data_processing_time, 

930 ) 

931 

932 def zero_time_vector(self, data_start: float): 

933 data_processing_time = time.time() 

934 if len(self.time_vector) == 0: 

935 return self 

936 time_vector = npy.array(self.time_vector) - data_start 

937 data_processing_time = time.time() - data_processing_time 

938 

939 return self.__class__( 

940 signal_id=self.signal_id, 

941 time_vector=time_vector, 

942 values=self.values, 

943 forced_values=self.forced_values, 

944 number_samples=self.number_samples, 

945 number_samples_db=self.number_samples_db, 

946 data_start=time_vector[0], 

947 data_end=time_vector[-1], 

948 db_query_time=self.db_query_time, 

949 init_time=self.init_time, 

950 data_processing_time=self.data_processing_time + data_processing_time, 

951 ) 

952 

953 def csv_export(self): 

954 output = io.StringIO() 

955 writer = csv.writer(output) 

956 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

957 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

958 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

959 return output.getvalue().encode("utf-8") 

960 

961 def prestoplot_export(self): 

962 clean_signal_id = self.signal_id.replace(".", "_") 

963 if clean_signal_id[0].isnumeric(): 

964 clean_signal_id = "_" + clean_signal_id 

965 

966 output = io.StringIO() 

967 output.write("# Encoding:\tUTF-8\n") 

968 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

969 output.write("ISO8601\tnone\tnone\n") 

970 output.write(f"# Description :\t{clean_signal_id}\n") 

971 

972 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

973 output.write( 

974 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

975 ) 

976 return output.getvalue().encode("utf-8") 

977 

978 

979class NumericSignalData(SignalData): 

980 data_type: str = "float" 

981 values: list[float | int | None] 

982 forced_values: list[float | int | None] 

983 

984 def interpolate(self, new_time_vector: list[float], items): 

985 items = [npy.nan if s is None else s for s in items] 

986 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

987 

988 def uniform_desampling(self, number_samples_max: int) -> Self: 

989 data_processing_time = time.time() 

990 if number_samples_max and self.number_samples > number_samples_max: 

991 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

992 forced_values = self.interpolate_forced_values(time_vector) 

993 number_samples = len(time_vector) 

994 else: 

995 time_vector = self.time_vector 

996 number_samples = len(self.values) 

997 values = self.values[:] 

998 forced_values = self.forced_values[:] 

999 data_processing_time = time.time() - data_processing_time 

1000 

1001 return self.__class__( 

1002 signal_id=self.signal_id, 

1003 time_vector=time_vector, 

1004 values=values, 

1005 forced_values=forced_values, 

1006 number_samples=number_samples, 

1007 number_samples_db=self.number_samples, 

1008 data_start=self.data_start, 

1009 data_end=self.data_end, 

1010 db_query_time=self.db_query_time, 

1011 init_time=self.init_time, 

1012 data_processing_time=self.data_processing_time + data_processing_time, 

1013 ) 

1014 

1015 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1016 if self.number_samples < number_samples_max: 

1017 return self 

1018 

1019 data_processing_time = time.time() 

1020 

1021 number_bins = number_samples_max // 2 

1022 

1023 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

1024 values = npy.array(self.values, dtype=npy.float64) 

1025 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1026 

1027 points_per_bin = self.number_samples // number_bins 

1028 

1029 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1030 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1031 if self.number_samples - number_bins * points_per_bin > 1: 

1032 points_per_bin += 1 

1033 number_bins = self.number_samples // points_per_bin + 1 

1034 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1035 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1036 values = npy.concatenate([values, nan_points_to_add]) 

1037 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1038 

1039 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1040 values_matrix = values.reshape(number_bins, points_per_bin) 

1041 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1042 

1043 indexes_min = npy.zeros(number_bins, dtype="int64") 

1044 indexes_max = npy.zeros(number_bins, dtype="int64") 

1045 

1046 for row in range(number_bins): 

1047 min_value = values_matrix[row, 0] 

1048 max_value = values_matrix[row, 0] 

1049 for column in range(points_per_bin): 

1050 if values_matrix[row, column] < min_value: 

1051 min_value = values_matrix[row, column] 

1052 indexes_min[row] = column 

1053 elif values_matrix[row, column] > max_value: 

1054 max_value = values_matrix[row, column] 

1055 indexes_max[row] = column 

1056 

1057 row_index = npy.repeat(npy.arange(number_bins), 2) 

1058 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1059 

1060 data_processing_time = time.time() - data_processing_time 

1061 

1062 new_time_vector = timestamps_matrix[row_index, column_index] 

1063 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1064 new_values = values_matrix[row_index, column_index] 

1065 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1066 new_forced_values = forced_values_matrix[row_index, column_index] 

1067 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1068 

1069 # Make sure there are no None values for the time vector 

1070 time_vector_filter = new_time_vector != None 

1071 new_time_vector = new_time_vector[time_vector_filter] 

1072 new_values = new_values[time_vector_filter] 

1073 new_forced_values = new_forced_values[time_vector_filter] 

1074 

1075 return self.__class__( 

1076 signal_id=self.signal_id, 

1077 time_vector=new_time_vector, 

1078 values=new_values, 

1079 forced_values=new_forced_values, 

1080 number_samples=number_bins * 2, 

1081 number_samples_db=self.number_samples_db, 

1082 data_start=self.data_start, 

1083 data_end=self.data_end, 

1084 db_query_time=self.db_query_time, 

1085 init_time=self.init_time, 

1086 data_processing_time=self.data_processing_time + data_processing_time, 

1087 phase_id=self.phase_id, 

1088 ) 

1089 

1090 def interest_window_desampling( 

1091 self, 

1092 window_max_number_samples: int, 

1093 outside_max_number_samples: int, 

1094 window_min_timestamp: float | None = None, 

1095 window_max_timestamp: float | None = None, 

1096 ) -> Self: 

1097 """Performs a sampling in a window of interest and outside.""" 

1098 

1099 if not self.time_vector: 

1100 return self 

1101 

1102 if window_min_timestamp is None: 

1103 window_min_timestamp = self.time_vector[0] 

1104 if window_max_timestamp is None: 

1105 window_max_timestamp = self.time_vector[-1] 

1106 

1107 data_processing_time = time.time() 

1108 

1109 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1110 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1111 

1112 time_vector_before = self.time_vector[:index_window_start] 

1113 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1114 time_vector_after = self.time_vector[index_window_end:] 

1115 

1116 values_before = self.values[:index_window_start] 

1117 values_window = self.values[index_window_start:index_window_end] 

1118 values_after = self.values[index_window_end:] 

1119 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1120 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1121 

1122 # Resampling window 

1123 if time_vector_window: 

1124 # Ensurring window bounds 

1125 if time_vector_window[0] != window_min_timestamp: 

1126 time_vector_window.insert(0, window_min_timestamp) 

1127 values_window.insert(0, window_min_value) 

1128 if time_vector_window[-1] != window_max_timestamp: 

1129 time_vector_window.append(window_max_timestamp) 

1130 values_window.append(window_max_value) 

1131 else: 

1132 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1133 values_window = [window_min_value, window_max_value] 

1134 

1135 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1136 # Resampling 

1137 time_vector_window, values_window = downsample_list( 

1138 time_vector_window, values_window, window_max_number_samples 

1139 ) 

1140 

1141 # Resampling outside 

1142 number_samples_before = len(time_vector_before) 

1143 number_samples_after = len(time_vector_after) 

1144 if ( 

1145 outside_max_number_samples is not None 

1146 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1147 ): 

1148 new_number_samples_before = min( 

1149 number_samples_before, 

1150 math.ceil( 

1151 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1152 ), 

1153 ) 

1154 new_number_samples_after = min( 

1155 number_samples_after, 

1156 math.ceil( 

1157 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1158 ), 

1159 ) 

1160 # Adjusting numbers as math.ceil can do +1 on sum 

1161 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1162 if new_number_samples_before > new_number_samples_after: 

1163 new_number_samples_before -= 1 

1164 else: 

1165 new_number_samples_after -= 1 

1166 

1167 if new_number_samples_before > 0: 

1168 time_vector_before, values_before = downsample_list( 

1169 time_vector_before, values_before, new_number_samples_before 

1170 ) 

1171 

1172 if new_number_samples_after > 0: 

1173 time_vector_after, values_after = downsample_list( 

1174 time_vector_after, values_after, new_number_samples_after 

1175 ) 

1176 

1177 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1178 values = values_before + values_window + values_after 

1179 forced_values = self.interpolate_forced_values(new_time_vector) 

1180 number_samples = len(values) 

1181 

1182 data_processing_time = time.time() - data_processing_time 

1183 

1184 return self.__class__( 

1185 signal_id=self.signal_id, 

1186 time_vector=new_time_vector, 

1187 values=values, 

1188 forced_values=forced_values, 

1189 number_samples=number_samples, 

1190 number_samples_db=self.number_samples, 

1191 data_start=self.data_start, 

1192 data_end=self.data_end, 

1193 db_query_time=self.db_query_time, 

1194 init_time=self.init_time, 

1195 data_processing_time=self.data_processing_time + data_processing_time, 

1196 ) 

1197 

1198 

1199class StringSignalData(SignalData): 

1200 data_type: str = "str" 

1201 values: list[str | None] 

1202 forced_values: list[str | None] 

1203 

1204 def interpolate(self, new_time_vector: list[float], items): 

1205 # Find the indices of the values in xp that are just smaller or equal to x 

1206 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1207 indices = npy.clip(indices, 0, len(items) - 1) 

1208 # Return the corresponding left string values from fp 

1209 return [items[i] for i in indices] 

1210 

1211 

1212class SignalsData(TwinPadModel): 

1213 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1214 data_processing_time: float 

1215 data_start: float | None 

1216 data_end: float | None 

1217 

1218 @classmethod 

1219 def get_from_signal_ids( 

1220 cls, 

1221 signal_ids: list[str], 

1222 min_timestamp: float = None, 

1223 max_timestamp: float = None, 

1224 window_min_timestamp: float = None, 

1225 window_max_timestamp: float = None, 

1226 interpolate_bounds: bool = True, 

1227 max_documents: int = None, 

1228 ) -> Self: 

1229 signals_data = [] 

1230 data_start = None 

1231 data_end = None 

1232 if max_timestamp is None: 

1233 max_timestamp = time.time() 

1234 data_processing_time = 0.0 

1235 for signal_id in signal_ids: 

1236 signal_data = SignalData.get_from_signal_id( 

1237 signal_id=signal_id, 

1238 min_timestamp=min_timestamp, 

1239 max_timestamp=max_timestamp, 

1240 window_min_timestamp=window_min_timestamp, 

1241 window_max_timestamp=window_max_timestamp, 

1242 interpolate_bounds=interpolate_bounds, 

1243 max_documents=max_documents, 

1244 ) 

1245 data_processing_time += signal_data.data_processing_time 

1246 signals_data.append(signal_data) 

1247 if signal_data.data_start is not None: 

1248 if data_start is None: 

1249 data_start = signal_data.data_start 

1250 else: 

1251 data_start = min(signal_data.data_start, data_start) 

1252 if signal_data.data_end is not None: 

1253 if data_end is None: 

1254 data_end = signal_data.data_end 

1255 else: 

1256 data_end = max(signal_data.data_end, data_end) 

1257 

1258 return cls( 

1259 signals_data=signals_data, 

1260 data_processing_time=data_processing_time, 

1261 data_start=data_start, 

1262 data_end=data_end, 

1263 ) 

1264 

1265 @classmethod 

1266 def get_from_phase_and_signal_ids( 

1267 cls, 

1268 phases: list, 

1269 phase_sync_times: list[float | None], 

1270 signal_ids: list[str], 

1271 window_min_timestamps: list[float | None], 

1272 window_max_timestamps: list[float | None], 

1273 zero_time_vector: bool = True, 

1274 ): 

1275 signals_data: list[SignalData] = [] 

1276 computation_start = time.time() 

1277 

1278 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip( 

1279 phases, phase_sync_times, window_min_timestamps, window_max_timestamps 

1280 ): 

1281 min_timestamp = phase.start_at / 1000 

1282 max_timestamp = phase.end_at / 1000 

1283 

1284 if sync_time is None: 

1285 sync_time = min_timestamp 

1286 

1287 if window_max_timestamp is not None and window_min_timestamp is not None: 

1288 window_length = window_max_timestamp - window_min_timestamp 

1289 

1290 if window_min_timestamp != min_timestamp: 

1291 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20) 

1292 if window_max_timestamp != max_timestamp: 

1293 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20) 

1294 

1295 for signal_id in signal_ids: 

1296 signal_data = SignalData.get_from_signal_id( 

1297 signal_id, 

1298 min_timestamp, 

1299 max_timestamp, 

1300 window_min_timestamp, 

1301 window_max_timestamp, 

1302 interpolate_bounds=False, 

1303 max_documents=None, 

1304 ) 

1305 

1306 if len(signal_data.time_vector) == 0: 

1307 continue 

1308 

1309 if zero_time_vector: 

1310 signal_data = signal_data.zero_time_vector(sync_time) 

1311 signal_data.phase_id = phase.id 

1312 

1313 signals_data.append(signal_data) 

1314 

1315 return cls( 

1316 signals_data=signals_data, 

1317 data_processing_time=time.time() - computation_start, 

1318 data_start=0, 

1319 data_end=0, 

1320 ) 

1321 

1322 def uniform_desampling(self, number_samples_max: int) -> Self: 

1323 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1324 return SignalsData( 

1325 signals_data=signals_data, 

1326 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1327 data_start=self.data_start, 

1328 data_end=self.data_end, 

1329 ) 

1330 

1331 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1332 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1333 return SignalsData( 

1334 signals_data=signals_data, 

1335 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1336 data_start=self.data_start, 

1337 data_end=self.data_end, 

1338 ) 

1339 

1340 def interest_window_desampling( 

1341 self, 

1342 window_max_number_samples: int, 

1343 outside_max_number_samples: int, 

1344 window_min_timestamp: float = None, 

1345 window_max_timestamp: float = None, 

1346 ) -> Self: 

1347 signals_data = [ 

1348 s.interest_window_desampling( 

1349 window_max_number_samples=window_max_number_samples, 

1350 outside_max_number_samples=outside_max_number_samples, 

1351 window_min_timestamp=window_min_timestamp, 

1352 window_max_timestamp=window_max_timestamp, 

1353 ) 

1354 for s in self.signals_data 

1355 ] 

1356 

1357 return SignalsData( 

1358 signals_data=signals_data, 

1359 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1360 data_start=self.data_start, 

1361 data_end=self.data_end, 

1362 ) 

1363 

1364 def zero_time_vector(self, data_start: float): 

1365 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data] 

1366 return SignalsData( 

1367 signals_data=signals_data, 

1368 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1369 data_start=0, 

1370 data_end=max([s.data_end for s in signals_data]), 

1371 ) 

1372 

1373 @classmethod 

1374 async def apply_single_function( 

1375 cls, 

1376 phase, 

1377 base_signal_id: str, 

1378 function: SINGLE_POST_PROCESSING_FUNCTION, 

1379 window_min_timestamp: float = None, 

1380 window_max_timestamp: float = None, 

1381 ): 

1382 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}" 

1383 

1384 processed_result_signal = Signal.get_from_signal_id(signal_id) 

1385 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids: 

1386 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp) 

1387 

1388 signals_data = cls.get_from_phase_and_signal_ids( 

1389 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False 

1390 ) 

1391 

1392 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0: 

1393 return None 

1394 

1395 new_values = None 

1396 new_forced_values = None 

1397 time_vector = npy.array(signals_data.signals_data[0].time_vector) 

1398 values = signals_data.signals_data[0].values 

1399 forced_values = signals_data.signals_data[0].forced_values 

1400 

1401 match (function): 

1402 case "Cumul": 

1403 new_values = cumul(values) 

1404 new_forced_values = cumul(forced_values) 

1405 # case "CumulDistrib": 

1406 # new_values = cumul_distrib(values) 

1407 # new_forced_values = cumul_distrib(forced_values) 

1408 case "Delta": 

1409 new_values = delta(values) 

1410 new_forced_values = delta(forced_values) 

1411 case "DeltaT": 

1412 new_values = delta(time_vector) 

1413 new_forced_values = new_values 

1414 case "Derive": 

1415 new_values = derive(time_vector, values) 

1416 new_forced_values = derive(time_vector, forced_values) 

1417 case "Integ": 

1418 new_values = integ(time_vector, values) 

1419 new_forced_values = integ(time_vector, forced_values) 

1420 

1421 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1422 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1423 

1424 loop = asyncio.get_running_loop() 

1425 loop.create_task( 

1426 cls.save_function_signal( 

1427 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible 

1428 ) 

1429 ) 

1430 

1431 if window_max_timestamp is not None: 

1432 max_timestamp_mask = time_vector <= window_max_timestamp 

1433 time_vector = time_vector[max_timestamp_mask] 

1434 new_values = new_values[max_timestamp_mask] 

1435 new_forced_values = new_forced_values[max_timestamp_mask] 

1436 if window_min_timestamp is not None: 

1437 min_timestamp_mask = time_vector >= window_min_timestamp 

1438 time_vector = time_vector[min_timestamp_mask] 

1439 new_values = new_values[min_timestamp_mask] 

1440 new_forced_values = new_forced_values[min_timestamp_mask] 

1441 

1442 signals_data.signals_data[0].time_vector = time_vector.tolist() 

1443 signals_data.signals_data[0].values = new_values.tolist() 

1444 signals_data.signals_data[0].forced_values = new_forced_values.tolist() 

1445 signals_data.signals_data[0].number_samples = time_vector.size 

1446 

1447 signals_data.signals_data[0].signal_id = signal_id 

1448 

1449 return signals_data 

1450 

1451 @classmethod 

1452 async def apply_multiple_function( 

1453 cls, 

1454 phases: list, 

1455 signal_ids: list, 

1456 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION, 

1457 window_min_timestamp: float = None, 

1458 window_max_timestamp: float = None, 

1459 ): 

1460 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION): 

1461 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}" 

1462 else: 

1463 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}" 

1464 

1465 active_phase = phases[0] 

1466 if function in {"Align-X", "Using-X"}: 

1467 active_phase = phases[1] 

1468 

1469 processed_result_signal = Signal.get_from_signal_id(function_signal_id) 

1470 if processed_result_signal is not None and ( 

1471 active_phase.id in processed_result_signal.computed_phases_ids 

1472 ): # If signal has been computed for the correct phase 

1473 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp) 

1474 

1475 array_length = None 

1476 time_vector_list = [] 

1477 values_list = [] 

1478 forced_values_list = [] 

1479 forcible = True 

1480 for phase, signal_id in zip(phases, signal_ids): 

1481 signals_data = cls.get_from_phase_and_signal_ids( 

1482 [phase], [None], [signal_id], [None], [None], zero_time_vector=False 

1483 ) 

1484 

1485 if len(signals_data.signals_data) == 0: 

1486 return None 

1487 

1488 signal_data = signals_data.signals_data[0] 

1489 

1490 if array_length is None: 

1491 array_length = signal_data.number_samples 

1492 if ( 

1493 array_length != signal_data.number_samples and function != "Align-X" 

1494 ) or signal_data.number_samples == 0: 

1495 return None 

1496 

1497 time_vector_list.append(npy.array(signal_data.time_vector)) 

1498 values_list.append(npy.array(signal_data.values, dtype=npy.float64)) 

1499 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64)) 

1500 forcible = forcible and signal_data.forcible 

1501 

1502 time_vector = time_vector_list[0] 

1503 new_values = None 

1504 new_forced_values = None 

1505 

1506 match (function): 

1507 case "Align-X": 

1508 time_vector = time_vector_list[1] 

1509 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000 

1510 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000 

1511 new_values = align_x(old_time_vector, values_list[0], new_time_vector) 

1512 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector) 

1513 # case "Atan2": 

1514 # new_values = atan2(values_list[0], values_list[1]) 

1515 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1]) 

1516 case "Using-X": 

1517 if len(time_vector_list[0]) != len(time_vector_list[1]): 

1518 return None 

1519 time_vector = time_vector_list[1] 

1520 new_values = values_list[0] 

1521 new_forced_values = forced_values_list[0] 

1522 case "Mean": 

1523 new_values = mean(*values_list) 

1524 new_forced_values = mean(*forced_values_list) 

1525 case "Norm": 

1526 new_values = norm(*values_list) 

1527 new_forced_values = norm(*forced_values_list) 

1528 

1529 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1530 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1531 

1532 loop = asyncio.get_running_loop() 

1533 loop.create_task( 

1534 cls.save_function_signal( 

1535 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible 

1536 ) 

1537 ) 

1538 

1539 total_number_samples = time_vector.size 

1540 

1541 if window_max_timestamp is not None: 

1542 max_timestamp_mask = time_vector <= window_max_timestamp 

1543 time_vector = time_vector[max_timestamp_mask] 

1544 new_values = new_values[max_timestamp_mask] 

1545 new_forced_values = new_forced_values[max_timestamp_mask] 

1546 if window_min_timestamp is not None: 

1547 min_timestamp_mask = time_vector >= window_min_timestamp 

1548 time_vector = time_vector[min_timestamp_mask] 

1549 new_values = new_values[min_timestamp_mask] 

1550 new_forced_values = new_forced_values[min_timestamp_mask] 

1551 

1552 signals_data = cls( 

1553 signals_data=[ 

1554 NumericSignalData( 

1555 signal_id=function_signal_id, 

1556 forcible=forcible, 

1557 time_vector=time_vector.tolist(), 

1558 values=new_values.tolist(), 

1559 forced_values=new_forced_values.tolist(), 

1560 number_samples=time_vector.size, 

1561 number_samples_db=total_number_samples, 

1562 ) 

1563 ], 

1564 data_processing_time=0, 

1565 data_start=0, 

1566 data_end=0, 

1567 ) 

1568 

1569 return signals_data 

1570 

1571 @classmethod 

1572 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float): 

1573 signal_data_collection = get_signal_collection(signal_id, create=True) 

1574 pipeline = [] 

1575 match_filter = {} 

1576 if window_min_timestamp is not None or window_max_timestamp is not None: 

1577 match_filter["$match"] = {} 

1578 match_filter["$match"]["precise_timestamp"] = {} 

1579 if window_max_timestamp is not None: 

1580 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp 

1581 if window_min_timestamp is not None: 

1582 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp 

1583 

1584 total_number_samples = signal_data_collection.count_documents({}) 

1585 

1586 if match_filter: 

1587 pipeline.append(match_filter) 

1588 

1589 fetch_start = time.time() 

1590 

1591 samples = signal_data_collection.aggregate(pipeline).to_list() 

1592 new_time_vector = [] 

1593 new_values = [] 

1594 new_forced_values = [] 

1595 for sample in samples: 

1596 new_time_vector.append(sample["precise_timestamp"]) 

1597 new_values.append(sample["value"]) 

1598 new_forced_values.append(sample["forced_value"]) 

1599 

1600 return cls( 

1601 signals_data=[ 

1602 NumericSignalData( 

1603 signal_id=signal_id, 

1604 time_vector=new_time_vector, 

1605 values=new_values, 

1606 forced_values=new_forced_values, 

1607 number_samples=len(new_time_vector), 

1608 number_samples_db=total_number_samples, 

1609 ) 

1610 ], 

1611 data_processing_time=time.time() - fetch_start, 

1612 data_start=0, 

1613 data_end=0, 

1614 ) 

1615 

1616 @classmethod 

1617 async def save_function_signal( 

1618 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool 

1619 ): 

1620 # Insert data first so if it is requested by another user, it will be computed again 

1621 signal_collection = get_signal_collection(function_signal_id, create=True) 

1622 signal_collection.delete_many( 

1623 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}} 

1624 ) 

1625 signal_collection.insert_many( 

1626 [ 

1627 { 

1628 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]), 

1629 "precise_timestamp": time_vector[i], 

1630 "value": new_values[i], 

1631 "forced_value": new_forced_values[i], 

1632 } 

1633 for i in range(len(time_vector)) 

1634 ] 

1635 ) 

1636 

1637 signals_config_collection = get_collection(systems_database, "signals", create=True) 

1638 signals_config_collection.find_one_and_update( 

1639 {"signal_id": function_signal_id}, 

1640 { 

1641 "$set": { 

1642 "description": "", 

1643 "unit": None, 

1644 "type": "sensor", 

1645 "address": None, 

1646 "frequency": 1 / max(npy.mean(delta(time_vector)), 1), # avoid division by 0 

1647 "transfer_function": None, 

1648 "precision_digits": None, 

1649 "digitization_function": None, 

1650 "data_type": "float", 

1651 "formula": None, 

1652 "forcible": forcible, 

1653 "commandable": False, 

1654 "broadcastable": False, 

1655 "signal_id": function_signal_id, 

1656 "post_processing": True, 

1657 }, 

1658 "$push": {"computed_phases_ids": phase.id}, 

1659 }, 

1660 upsert=True, 

1661 ) 

1662 

1663 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1664 if post_processing: 

1665 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1666 zip_buffer = io.BytesIO() 

1667 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1668 for signal_data in self.signals_data: 

1669 file_name = signal_data.signal_id 

1670 if post_processing: 

1671 phase = phases_by_id.get( 

1672 signal_data.phase_id, 

1673 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1674 ) 

1675 file_name = f"{signal_data.signal_id} ({phase.name})" 

1676 if file_format == "csv": 

1677 export_io = signal_data.csv_export() 

1678 zip_file.writestr(f"{file_name}.csv", export_io) 

1679 elif file_format == "prestoplot": 

1680 export_io = signal_data.prestoplot_export() 

1681 zip_file.writestr(f"{file_name}.tab", export_io) 

1682 else: 

1683 raise ValueError(f"Format not found. Got: {file_format}") 

1684 zip_bytes = zip_buffer.getvalue() 

1685 return zip_bytes 

1686 

1687 def hdf5_export(self, post_processing: bool = False, phase_ids: list = []): 

1688 if post_processing: 

1689 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1690 hdf5_buffer = io.BytesIO() 

1691 custom_type_float = npy.dtype( 

1692 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1693 ) 

1694 custom_type_string = npy.dtype( 

1695 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1696 ) 

1697 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1698 for signal_data in self.signals_data: 

1699 if post_processing: 

1700 phase = phases_by_id.get( 

1701 signal_data.phase_id, 

1702 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"), 

1703 ) 

1704 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})") 

1705 else: 

1706 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1707 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1708 if signal_data.data_type == "str": 

1709 export_data = npy.array( 

1710 list( 

1711 zip( 

1712 date_vector, 

1713 signal_data.time_vector, 

1714 signal_data.values, 

1715 signal_data.forced_values, 

1716 ) 

1717 ), 

1718 dtype=custom_type_string, 

1719 ) 

1720 else: 

1721 export_data = npy.array( 

1722 list( 

1723 zip( 

1724 date_vector, 

1725 signal_data.time_vector, 

1726 signal_data.values, 

1727 signal_data.forced_values, 

1728 ) 

1729 ), 

1730 dtype=custom_type_float, 

1731 ) 

1732 signal_group["data"] = export_data 

1733 return hdf5_buffer.getvalue() 

1734 

1735 

1736class SignalStatus(TwinPadModel): 

1737 status: str = "down" 

1738 reason: str = "" 

1739 delay: float | None = None 

1740 

1741 

1742class DigitizationFunction(TwinPadModel): 

1743 bits: int | None = None 

1744 min_value: float 

1745 max_value: float 

1746 min_raw_value: float 

1747 max_raw_value: float 

1748 

1749 

1750class SignalUpdate(TwinPadModel): 

1751 value: float | str | bool | int | None = None 

1752 forced_value: float | str | bool | int | None = None 

1753 timestamp: int | None = None 

1754 

1755 

1756class SignalType(str, Enum): 

1757 command = "command" 

1758 sensor = "sensor" 

1759 external_sensor = "external_sensor" 

1760 

1761 

1762SIGNALDATA_TYPES = { 

1763 "int": NumericSignalData, 

1764 "float": NumericSignalData, 

1765 "str": StringSignalData, 

1766 "bool": NumericSignalData, 

1767 "epoch": NumericSignalData, 

1768} 

1769 

1770 

1771class Signal(GenericMongo): 

1772 collection_name: ClassVar[str] = "signals" 

1773 

1774 signal_id: str 

1775 frequency: float 

1776 unit: str | None 

1777 description: str 

1778 type: SignalType 

1779 data_type: str 

1780 precision_digits: int | None 

1781 forcible: bool 

1782 status: SignalStatus = SignalStatus() 

1783 

1784 post_processing: bool = False 

1785 computed_phases_ids: list[str] = [] 

1786 

1787 digitization_function: DigitizationFunction | None 

1788 

1789 @property 

1790 def device(self) -> Device: 

1791 device_id = self.signal_id.split(".")[0] 

1792 device = Device.get_one_by_attribute("device_id", device_id) 

1793 return device 

1794 

1795 @cached_property 

1796 def signal_data_class(self): 

1797 if self.data_type in SIGNALDATA_TYPES: 

1798 return SIGNALDATA_TYPES[self.data_type] 

1799 if self.data_type.startswith("enum"): 

1800 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1801 raise ValueError(f"Unhandled python type: {self.data_type}") 

1802 

1803 @cached_property 

1804 def python_type(self): 

1805 if self.data_type in TYPES: 

1806 return TYPES[self.data_type] 

1807 if self.data_type.startswith("enum"): 

1808 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1809 return Literal[*choices] 

1810 raise ValueError(f"Unhandled python type: {self.data_type}") 

1811 

1812 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1813 command = Command( 

1814 sent_at=time.time(), 

1815 command_type="Signal command", 

1816 user_id=current_user.id, 

1817 ) 

1818 

1819 has_input_error = False 

1820 error_message = "" 

1821 

1822 if self.data_type.startswith("enum"): 

1823 enum_options = get_args(self.python_type) 

1824 

1825 if update_dict.value is not None and update_dict.value not in enum_options: 

1826 has_input_error = True 

1827 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1828 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1829 has_input_error = True 

1830 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1831 else: 

1832 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1833 has_input_error = True 

1834 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1835 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1836 has_input_error = True 

1837 error_message += ( 

1838 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1839 ) 

1840 

1841 if has_input_error: 

1842 command.response_time = 0 

1843 command.succeeded = False 

1844 command.description = f"Tried to modify signal {self.signal_id}" 

1845 response = {"error": True, "status_code": 400, "message": error_message} 

1846 else: 

1847 response = await send_signal_value(self.signal_id, update_dict) 

1848 command.receive_response(response) 

1849 

1850 Command.create(command) 

1851 return response 

1852 

1853 @classmethod 

1854 def get_from_signal_id(cls, signal_id) -> Self: 

1855 """Could be generic from mongo""" 

1856 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1857 if not raw_value: 

1858 return None 

1859 del raw_value["_id"] 

1860 return cls.dict_to_object(raw_value) 

1861 

1862 @classmethod 

1863 def get_all_ids(cls) -> list[str]: 

1864 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1865 

1866 return [signal["signal_id"] for signal in cursor] 

1867 

1868 @classmethod 

1869 def get_all_statuses(cls) -> list[dict]: 

1870 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1871 

1872 return [ 

1873 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1874 for signal in cursor 

1875 ] 

1876 

1877 async def number_samples(self): 

1878 collection = get_signal_collection(signal_id=self.signal_id) 

1879 if collection is None: 

1880 return 0 

1881 

1882 number_samples = collection.estimated_document_count() 

1883 

1884 number_samples_async_collection = await get_async_collection( 

1885 systems_async_database, "number_samples", create=True, time_series=True 

1886 ) 

1887 

1888 loop = asyncio.get_running_loop() 

1889 loop.create_task( 

1890 number_samples_async_collection.insert_one( 

1891 { 

1892 "timestamp": datetime.datetime.now(pytz.UTC), 

1893 "signal_id": self.signal_id, 

1894 "number_samples": number_samples, 

1895 } 

1896 ) 

1897 ) 

1898 

1899 return number_samples 

1900 

1901 @classmethod 

1902 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1903 number_samples_by_id = {} 

1904 collections = get_signal_collections_batch(signal_ids) 

1905 number_samples_async_collection = await get_async_collection( 

1906 systems_async_database, "number_samples", create=True, time_series=True 

1907 ) 

1908 

1909 for signal_id, collection in zip(signal_ids, collections): 

1910 if collection is None: 

1911 number_samples_by_id[signal_id] = 0 

1912 continue 

1913 

1914 number_samples = collection.estimated_document_count() 

1915 

1916 number_samples_by_id[signal_id] = number_samples 

1917 

1918 now = datetime.datetime.now(pytz.UTC) 

1919 loop = asyncio.get_running_loop() 

1920 loop.create_task( 

1921 number_samples_async_collection.insert_many( 

1922 [ 

1923 { 

1924 "timestamp": now, 

1925 "signal_id": signal_id, 

1926 "number_samples": number_samples, 

1927 } 

1928 for signal_id, number_samples in number_samples_by_id.items() 

1929 ] 

1930 ) 

1931 ) 

1932 

1933 return number_samples_by_id 

1934 

1935 def sample_datasize(self): 

1936 return signals_database.command("collstats", self.signal_id)["size"] 

1937 

1938 @classmethod 

1939 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1940 result = cls.collection().aggregate( 

1941 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1942 ) 

1943 

1944 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1945 

1946 

1947class ForcedSignal(GenericMongo): 

1948 collection_name: ClassVar[str] = "forced_signals" 

1949 

1950 signal_id: str 

1951 forcing_user_id: str 

1952 forced_at: float 

1953 value: str | float 

1954 

1955 def insert(self): 

1956 insert_result = self.collection().find_one_and_update( 

1957 {"signal_id": self.signal_id}, 

1958 {"$set": self.to_dict(exclude={"id"})}, 

1959 upsert=True, 

1960 return_document=ReturnDocument.AFTER, 

1961 ) 

1962 self.id = str(insert_result["_id"]) 

1963 return self.id 

1964 

1965 @classmethod 

1966 def can_force(cls, signal_id: str, current_user: User) -> bool: 

1967 """Checks whether user can force a given signal. 

1968 

1969 :param signal_id: Signal ID of the signal to force 

1970 :type signal_id: str 

1971 :param current_user: Current user 

1972 :type current_user: User 

1973 :return: False if the signal was forced by someone else than the user, True otherwise 

1974 :rtype: bool 

1975 """ 

1976 forced_signal = cls.get_one_by_attribute("signal_id", signal_id) 

1977 if forced_signal is not None: 

1978 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin: 

1979 return False 

1980 return True 

1981 

1982 

1983class ServicesStatus(TwinPadModel): 

1984 backend: str 

1985 cloud_broker: str 

1986 time_series_database: str 

1987 signal_storage: str 

1988 heartbeat_storage: str 

1989 data_analyzer: str 

1990 

1991 @classmethod 

1992 def check(cls) -> Self: 

1993 return cls( 

1994 cloud_broker=ping(RABBITMQ_HOST), 

1995 backend="up", 

1996 time_series_database=ping(MONGO_HOST), 

1997 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1998 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1999 data_analyzer=ping(DATA_ANALYZER_HOST), 

2000 ) 

2001 

2002 

2003def ping(host): 

2004 try: 

2005 if ping3.ping(host, timeout=0.8): 

2006 return "up" 

2007 except PermissionError: 

2008 pass 

2009 return "down" 

2010 

2011 

2012class Event(GenericMongo): 

2013 collection_name: ClassVar[str] = "events" 

2014 

2015 name: str 

2016 timestamp: float 

2017 event_rule_id: str 

2018 

2019 @computed_field 

2020 @cached_property 

2021 def event_rule(self) -> "EventRule": 

2022 return EventRule.get_from_id(self.event_rule_id) 

2023 

2024 @classmethod 

2025 def dict_to_object(cls, dict_): 

2026 """Refine to convert timestamp to datetime for mongodb.""" 

2027 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

2028 return super().dict_to_object(dict_) 

2029 

2030 

2031class TwinPadActivity(GenericMongo): 

2032 timestamp: float 

2033 amount: int 

2034 

2035 @classmethod 

2036 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

2037 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2038 number_events_collection = get_collection(systems_database, "number_events") 

2039 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

2040 items = [] 

2041 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2042 if number_events_collection is None or recompute_amount: 

2043 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

2044 number_events_collection.delete_many({}) 

2045 first_event = events_collection.find_one(sort={"timestamp": 1}) 

2046 if first_event is None: 

2047 return items 

2048 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

2049 tzinfo=pytz.UTC 

2050 ) 

2051 while last_computed_day < TODAY: 

2052 day_nb_events = events_collection.count_documents( 

2053 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2054 ) 

2055 if day_nb_events > 0: 

2056 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

2057 last_computed_day += ONE_DAY_OFFSET 

2058 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2059 if number_events_today > 0: 

2060 number_events_collection.delete_many({"timestamp": TODAY}) 

2061 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

2062 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2063 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2064 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2065 for day in number_events: 

2066 day["timestamp"] = day["timestamp"].timestamp() 

2067 items.append(cls.mongo_dict_to_object(day)) 

2068 return items 

2069 

2070 @classmethod 

2071 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2072 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2073 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

2074 signals_number_samples_collection = get_collection( 

2075 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

2076 ) 

2077 items = [] 

2078 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2079 if number_samples_collection is None or recompute_amount: 

2080 number_samples_collection = get_collection( 

2081 systems_database, "number_received_samples", create=True, time_series=True 

2082 ) 

2083 number_samples_collection.delete_many({}) 

2084 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

2085 if first_sample is None: 

2086 return items 

2087 # compute from day of first found event 

2088 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

2089 tzinfo=pytz.UTC 

2090 ) 

2091 while last_computed_day < TODAY: 

2092 number_samples_request = signals_number_samples_collection.aggregate( 

2093 [ 

2094 { 

2095 "$match": { 

2096 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

2097 } 

2098 }, 

2099 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2100 ] 

2101 ).to_list() 

2102 if len(number_samples_request) == 0: 

2103 number_samples = 0 

2104 else: 

2105 number_samples = number_samples_request[0].get("number_samples", 0) 

2106 if number_samples > 0: 

2107 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

2108 last_computed_day += ONE_DAY_OFFSET 

2109 number_samples_request = signals_number_samples_collection.aggregate( 

2110 [ 

2111 {"$match": {"timestamp": {"$gte": TODAY}}}, 

2112 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

2113 ] 

2114 ).to_list() 

2115 if len(number_samples_request) == 0: 

2116 number_samples_today = 0 

2117 else: 

2118 number_samples_today = number_samples_request[0].get("number_samples", 0) 

2119 if number_samples_today > 0: 

2120 number_samples_collection.delete_many({"timestamp": TODAY}) 

2121 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

2122 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2123 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2124 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2125 for day in number_events: 

2126 day["timestamp"] = day["timestamp"].timestamp() 

2127 items.append(cls.mongo_dict_to_object(day)) 

2128 return items 

2129 

2130 @classmethod 

2131 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

2132 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

2133 number_commands_collection = get_collection(systems_database, "number_commands") 

2134 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

2135 items = [] 

2136 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

2137 if number_commands_collection is None or recompute_amount: 

2138 number_commands_collection = get_collection( 

2139 systems_database, "number_commands", create=True, time_series=True 

2140 ) 

2141 number_commands_collection.delete_many({}) 

2142 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

2143 if first_command is None: 

2144 return items 

2145 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

2146 tzinfo=pytz.UTC 

2147 ) 

2148 while last_computed_day < TODAY: 

2149 day_nb_commands = commands_collection.count_documents( 

2150 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

2151 ) 

2152 if day_nb_commands > 0: 

2153 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

2154 last_computed_day += ONE_DAY_OFFSET 

2155 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

2156 if number_commands_today > 0: 

2157 number_commands_collection.delete_many({"timestamp": TODAY}) 

2158 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

2159 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

2160 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

2161 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

2162 for day in number_commands: 

2163 day["timestamp"] = day["timestamp"].timestamp() 

2164 items.append(cls.mongo_dict_to_object(day)) 

2165 return items 

2166 

2167 

2168class EventRule(GenericMongo): 

2169 collection_name: ClassVar[str] = "event_rules" 

2170 

2171 name: str 

2172 formula: str 

2173 variables: list[str] 

2174 

2175 @computed_field 

2176 @cached_property 

2177 def number_events(self) -> int: 

2178 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

2179 

2180 

2181class Company(GenericMongo): 

2182 collection_name: ClassVar[str] = "companies" 

2183 name: str 

2184 

2185 

2186class Campaign(GenericMongo): 

2187 collection_name: ClassVar[str] = "campaigns" 

2188 

2189 # Properties 

2190 id: str | None = None 

2191 name: str 

2192 description: str | None = None 

2193 

2194 

2195class Phase(GenericMongo): 

2196 collection_name: ClassVar[str] = "phases" 

2197 

2198 # Properties 

2199 id: str | None = None 

2200 name: str 

2201 description: str | None = None 

2202 start_at: float 

2203 end_at: float 

2204 

2205 # FK 

2206 campaign_id: MongoId 

2207 

2208 @classmethod 

2209 def deleteMany(cls, campaign_id): 

2210 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

2211 return delete_phases 

2212 

2213 

2214class CustomViewCreation(GenericMongo): 

2215 collection_name: ClassVar[str] = "custom_views" 

2216 

2217 name: str 

2218 configuration: list 

2219 

2220 

2221class CustomView(CustomViewCreation): 

2222 # Properties 

2223 id: str | None = None 

2224 

2225 # Foreign Key 

2226 user_id: str 

2227 

2228 

2229CustomViewUpdate = create_update_model(CustomView) 

2230 

2231 

2232class Video(GenericMongo): 

2233 collection_name: ClassVar[str] = "videos" 

2234 

2235 # Properties 

2236 name: str 

2237 ip_addr: str 

2238 username: str | None = None 

2239 password: str | None = None 

2240 

2241 # Methods 

2242 @classmethod 

2243 def get_all(cls, sort_by="_id") -> list[Self]: 

2244 items = [] 

2245 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

2246 items.append(cls.mongo_dict_to_object(dict_)) 

2247 return items 

2248 

2249 @classmethod 

2250 def get_video(cls, camera_id: ObjectId): 

2251 camera = cls.get_from_id(camera_id) 

2252 if camera is not None: 

2253 return camera.name 

2254 return None 

2255 

2256 

2257class Command(GenericMongo): 

2258 collection_name: ClassVar[str] = "commands" 

2259 

2260 # Properties 

2261 timestamp: datetime.datetime = None 

2262 sent_at: float 

2263 response_time: float = 0.0 

2264 command_type: str 

2265 description: str = "" 

2266 succeeded: bool = False 

2267 

2268 # Foreign key 

2269 user_id: str 

2270 

2271 @classmethod 

2272 def collection(cls): 

2273 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

2274 

2275 @classmethod 

2276 def create(cls, command: Self): 

2277 command = cls( 

2278 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

2279 sent_at=command.sent_at, 

2280 response_time=command.response_time, 

2281 command_type=command.command_type, 

2282 description=command.description, 

2283 succeeded=command.succeeded, 

2284 user_id=command.user_id, 

2285 ) 

2286 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

2287 if new_command is None: 

2288 return None 

2289 return {"command_id": str(new_command.inserted_id)} 

2290 

2291 def receive_response(self, response: dict): 

2292 self.response_time = time.time() - self.sent_at 

2293 self.succeeded = response.get("error", True) is False 

2294 if self.description == "": 

2295 self.description += response.get("message", "").rstrip() 

2296 

2297 

2298class SignalsPresetCreation(GenericMongo): 

2299 name: str 

2300 signal_ids: list[str] 

2301 

2302 

2303class SignalsPreset(SignalsPresetCreation): 

2304 collection_name: ClassVar[str] = "signals_presets" 

2305 

2306 user_id: str 

2307 

2308 @classmethod 

2309 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

2310 signals_preset = cls( 

2311 user_id=user_id, 

2312 name=signals_preset.name, 

2313 signal_ids=signals_preset.signal_ids, 

2314 ) 

2315 

2316 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

2317 

2318 return str(new_signal_preset.inserted_id) 

2319 

2320 

2321SignalsPresetUpdate = create_update_model(SignalsPreset) 

2322 

2323 

2324class LineStyle(str, Enum): 

2325 solid = "solid" 

2326 dotted = "dotted" 

2327 dashed = "dashed" 

2328 

2329 

2330class SignalAppearance: 

2331 value_color: str 

2332 forced_value_color: str 

2333 

2334 

2335class GraphThemeCreation(GenericMongo): 

2336 collection_name: ClassVar[str] = "graph_themes" 

2337 

2338 name: str 

2339 signal_id: str 

2340 value_color: str = "" 

2341 forced_value_color: str = "" 

2342 value_line_style: LineStyle = LineStyle.solid 

2343 forced_value_line_style: LineStyle = LineStyle.solid 

2344 private: bool = True 

2345 

2346 

2347class PublicGraphTheme(GraphThemeCreation): 

2348 created_by_user: bool 

2349 in_user_library: bool 

2350 active_for_user: bool 

2351 

2352 _current_user_id: str = "" 

2353 

2354 @classproperty 

2355 def custom_pipeline_steps(cls) -> dict[str, list]: 

2356 return { 

2357 "created_by_user": [ 

2358 { 

2359 "$addFields": { 

2360 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

2361 } 

2362 } 

2363 ], 

2364 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

2365 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

2366 ], 

2367 "in_user_library": [ 

2368 { 

2369 "$addFields": { 

2370 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

2371 } 

2372 } 

2373 ], 

2374 "active_for_user": [ 

2375 { 

2376 "$addFields": { 

2377 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

2378 } 

2379 } 

2380 ], 

2381 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

2382 "active": [ 

2383 { 

2384 "$addFields": { 

2385 "active": "$$REMOVE", 

2386 } 

2387 } 

2388 ], 

2389 "creator_id": [ 

2390 { 

2391 "$addFields": { 

2392 "creator_id": "$$REMOVE", 

2393 } 

2394 } 

2395 ], 

2396 } 

2397 

2398 @classmethod 

2399 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

2400 cls._current_user_id = user_id 

2401 return super().response_from_query(query) 

2402 

2403 @classmethod 

2404 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

2405 query.in_user_library = "true" 

2406 return cls.response_from_query(query, user_id) 

2407 

2408 @classmethod 

2409 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

2410 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

2411 

2412 @classmethod 

2413 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2414 cls._current_user_id = user_id 

2415 return super().get_by_attribute(attribute_name, attribute_value) 

2416 

2417 @classmethod 

2418 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2419 cls._current_user_id = user_id 

2420 return super().get_one_by_attribute(attribute_name, attribute_value) 

2421 

2422 @classmethod 

2423 def get_all(cls, sort_by: str, user_id: str): 

2424 cls._current_user_id = user_id 

2425 return super().get_all(sort_by) 

2426 

2427 @classmethod 

2428 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

2429 pipeline = [ 

2430 { 

2431 "$match": { 

2432 "active": {"$eq": user_id}, 

2433 "signal_id": {"$in": signal_ids}, 

2434 } 

2435 }, 

2436 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

2437 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

2438 { 

2439 "$project": { 

2440 "_id": 0, 

2441 "signal_id": 1, 

2442 "value_color": 1, 

2443 "forced_value_color": 1, 

2444 "value_line_style": 1, 

2445 "forced_value_line_style": 1, 

2446 } 

2447 }, 

2448 ] 

2449 

2450 result = {} 

2451 

2452 cursor = cls.collection().aggregate(pipeline) 

2453 for document in cursor: 

2454 signal_id = document["signal_id"] 

2455 del document["signal_id"] 

2456 result[signal_id] = document 

2457 

2458 return result 

2459 

2460 

2461GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2462 

2463 

2464class PrivateGraphTheme(GraphThemeCreation): 

2465 # private 

2466 creator_id: str 

2467 in_library: list[str] 

2468 active: list[str] 

2469 

2470 @classmethod 

2471 def create( 

2472 cls, 

2473 creator_id: str, 

2474 name: str, 

2475 signal_id: str, 

2476 value_color: str, 

2477 forced_value_color: str, 

2478 value_line_style: LineStyle, 

2479 forced_value_line_style: LineStyle, 

2480 private: bool, 

2481 ): 

2482 color_setting = cls( 

2483 creator_id=creator_id, 

2484 name=name, 

2485 signal_id=signal_id, 

2486 value_color=value_color, 

2487 forced_value_color=forced_value_color, 

2488 value_line_style=value_line_style, 

2489 forced_value_line_style=forced_value_line_style, 

2490 private=private, 

2491 in_library=[creator_id], 

2492 active=[creator_id], 

2493 ) 

2494 

2495 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2496 color_setting.id = str(new_color_setting.inserted_id) 

2497 return color_setting 

2498 

2499 def update(self, update_dict: dict, user_id: str): 

2500 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2501 if in_user_lib and user_id not in self.in_library: 

2502 self.in_library.append(user_id) 

2503 elif not in_user_lib and user_id in self.in_library: 

2504 self.in_library.remove(user_id) 

2505 update_dict["in_library"] = self.in_library 

2506 del update_dict["in_user_library"] 

2507 

2508 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2509 if active_for_user and user_id not in self.active: 

2510 self.active.append(user_id) 

2511 elif not active_for_user and user_id in self.active: 

2512 self.active.remove(user_id) 

2513 update_dict["active"] = self.active 

2514 del update_dict["active_for_user"] 

2515 

2516 if update_dict.get("created_by_user") is not None: 

2517 del update_dict["created_by_user"] 

2518 

2519 self.collection().find_one_and_update( 

2520 {"_id": ObjectId(self.id)}, 

2521 {"$set": update_dict}, 

2522 ) 

2523 

2524 return {} 

2525 

2526 

2527class DeviceStatus(str, Enum): 

2528 started = "started" 

2529 running = "running" 

2530 created = "created" 

2531 exited = "exited" 

2532 restarting = "restarting" 

2533 

2534 

2535class DeviceUpdateFromDeployer(BaseModel): 

2536 status: DeviceStatus 

2537 

2538 

2539class DeviceFromDeployerCreation(BaseModel): 

2540 name: str 

2541 description: str 

2542 

2543 

2544class DeviceFromDeployer(DeviceFromDeployerCreation): 

2545 status: DeviceStatus 

2546 device_id: DeviceId 

2547 logs: str = "" 

2548 

2549 

2550class DeviceDeployer(GenericMongo): 

2551 collection_name: ClassVar[str] = "device_deployers" 

2552 url: HttpUrl 

2553 

2554 def endpoint_url(self, endpoint): 

2555 return f"{str(self.url).rstrip('/')}/{endpoint}" 

2556 

2557 def devices(self) -> list[DeviceFromDeployer]: 

2558 devices = [] 

2559 try: 

2560 response = requests.get(self.endpoint_url("devices")) 

2561 except requests.exceptions.ConnectionError: 

2562 logger.info("connection error") 

2563 return None 

2564 if response.status_code != 200: 

2565 return None 

2566 for device_dict in response.json()["devices"]: 

2567 devices.append( 

2568 DeviceFromDeployer( 

2569 device_id=device_dict["device_id"], 

2570 name=device_dict["container_name"], 

2571 description="desc", 

2572 status=device_dict["status"], 

2573 logs=device_dict["logs"], 

2574 ) 

2575 ) 

2576 return devices 

2577 

2578 def get_device(self, device_id: DeviceId): 

2579 try: 

2580 response = requests.get(self.endpoint_url(f"devices/{device_id}")) 

2581 except requests.exceptions.ConnectionError: 

2582 return None 

2583 if response.status_code != 200: 

2584 return None 

2585 device_dict = response.json() 

2586 return DeviceFromDeployer( 

2587 device_id=device_dict["device_id"], 

2588 name=device_dict["container_name"], 

2589 description="desc", 

2590 status=device_dict["status"], 

2591 logs=device_dict["logs"], 

2592 ) 

2593 

2594 def create_device(self, device: DeviceFromDeployer) -> Device | None: 

2595 try: 

2596 response = requests.post(self.endpoint_url("devices"), json={"name": device.name}) 

2597 except requests.exceptions.ConnectionError: 

2598 return None 

2599 

2600 if response.status_code != 201: 

2601 return None 

2602 

2603 device_dict = response.json() 

2604 return DeviceFromDeployer( 

2605 device_id=device_dict["device_id"], 

2606 name="", 

2607 description="desc", 

2608 status=device_dict["status"], 

2609 ) 

2610 

2611 def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None: 

2612 try: 

2613 response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump()) 

2614 except requests.exceptions.ConnectionError: 

2615 return None 

2616 

2617 if response.status_code != 200: 

2618 return None 

2619 

2620 device_dict = response.json() 

2621 return Device( 

2622 device_id=device_dict["device_id"], 

2623 name="", 

2624 description="desc", 

2625 pid={}, 

2626 petri_network={}, 

2627 modes=[], 

2628 status=device_dict["status"], 

2629 ) 

2630 

2631 def delete_device(self, device_id: DeviceId) -> DeleteInfo: 

2632 try: 

2633 response = requests.delete(self.endpoint_url(f"devices/{device_id}")) 

2634 except requests.exceptions.ConnectionError: 

2635 return DeleteInfo(is_deleted=False, detail="Connection to deployer error") 

2636 if response.status_code not in [200, 202, 204]: 

2637 return DeleteInfo(is_deleted=False, detail=response.text) 

2638 

2639 return DeleteInfo(is_deleted=True, detail="") 

2640 

2641 

2642DeviceDeployerUpdate = create_update_model(DeviceDeployer)