Coverage for  / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%

1322 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-08 08:24 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25from twinpad_backend.db import ( 

26 get_collection, 

27 get_async_collection, 

28 get_signal_collection, 

29 get_signal_collections_batch, 

30 systems_database, 

31 systems_async_database, 

32 signals_database, 

33 signals_async_database, 

34 devices_states_database, 

35) 

36from twinpad_backend.responses import ListResponse 

37from twinpad_backend.messages import send_mode_change, send_signal_value 

38from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

39 

# Maps the type names accepted in definitions to the Python type used for checks.
# NOTE(review): "epoch" maps to float — presumably a Unix timestamp; confirm with callers.
TYPES = {
    "int": int,
    "float": float,
    "str": str,
    "bool": bool,
    "epoch": float,
}

# Names of the supported post-processing functions.
# NOTE(review): the SINGLE/DOUBLE/MULTIPLE split presumably reflects the number of input signals.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]

# Host names, each overridable through the environment variable of the same name.
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.getenv("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.getenv("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.getenv("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.getenv("DATA_ANALYZER_HOST", "data-analyzer")

# Delay (compared against time.time() differences) after which the last known sample is repeated.
DEVICE_TIMEOUT = float(os.getenv("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

logger = logging.getLogger("uvicorn.error")

56 

57 

class classproperty:
    """
    Read-only descriptor that exposes the result of a function of the class as an attribute.

    Stands in for stacking ``@classmethod`` on ``@property``, which was deprecated in
    python 3.11 and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped function receives the owner class as its only argument.
        self.fget = func

    def __get__(self, instance, owner):
        # The instance is ignored on purpose: the value only depends on the class.
        getter = self.fget
        return getter(owner)

69 

70 

def create_update_model(model):
    """
    Build the "<ModelName>Update" companion of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened to
    ``annotation | None`` and a default of ``None``, so a partial payload validates.

    :param model: pydantic model class exposing ``model_fields``.
    :return: the model class produced by ``pydantic.create_model``.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(model.__name__ + "Update", **optional_fields)

80 

81 

def get_utc_date_from_timestamp(timestamp: float):
    """
    Convert a Unix timestamp to an ISO 8601 string expressed in UTC.

    The conversion is pinned to UTC so the result no longer depends on the time zone
    of the process (``fromtimestamp`` without ``tz`` returns local time).

    :param timestamp: seconds since the Unix epoch.
    :return: ISO 8601 string without a UTC offset suffix, e.g. "1970-01-01T00:00:00".
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    # Drop tzinfo so the string keeps its previous offset-less format.
    return utc_datetime.replace(tzinfo=None).isoformat()

84 

85 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a (time, value) series with LTTB while keeping gaps (None values) visible.

    Runs of consecutive None values are removed before downsampling and only the first
    and last timestamp of each run are inserted back afterwards. When LTTB raises a
    ValueError, the series is linearly interpolated on an evenly spaced time vector instead.

    :param time_vector: timestamps of the samples (expected sorted, as searchsorted is used).
    :param values: sample values, same length as ``time_vector``; may contain None.
    :param max_number_samples: target number of samples of the output.
    :return: tuple ``(new_time_vector, new_values)``; the inputs themselves are returned
        untouched when there are fewer samples than ``max_number_samples``.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # LTTB doesn't handle None values so remove them, in a single pass over the data.
    kept_times = []
    kept_values = []
    # One entry per run of consecutive None values: [first_timestamp] or [first_timestamp, last_timestamp]
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not in_none_group:
                none_group_bounds.append([timestamp])
                in_none_group = True
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                # Only the bounds of the run are kept: move the upper bound forward
                none_group_bounds[-1][1] = timestamp
        else:
            in_none_group = False
            kept_times.append(timestamp)
            kept_values.append(value)
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        downsampled = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = downsampled[:, 0].tolist()
        new_values = downsampled[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        # None values become NaN through the float64 conversion and are mapped back to None below
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # Insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values

130 

131 

def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When ``wanted_type`` is ``float``, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

136 

137 

138# Models 

class TwinPadModel(BaseModel):
    """Base pydantic model adding dict conversion helpers shared by the models below."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model to a dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude)

147 

148 

class GenericMongo(TwinPadModel):
    """
    Base class of the models persisted in a collection of the systems database.

    Subclasses must define a ``collection_name`` class attribute: it is read by every
    method below but not declared here.
    """

    id: str | None = None
    # Extra aggregation stages, keyed by the property name that triggers them.
    # NOTE(review): presumably each list computes the property it is keyed by
    # (e.g. through a $lookup) — confirm against the subclasses that fill it.
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of this model (``create=True`` is passed to ``get_collection``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated query and wrap the result in a ListResponse.

        :param query: object providing ``mongodb_filter()``, ``sort_by`` (comma-separated
            ``field`` or ``field:order`` items), ``offset`` and ``limit``.
        :return: the matching items with the pagination parameters and the total count.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field, sort_order = sort, 1
            sort_dict[sort_field] = sort_order

        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Each set of custom stages is added at most once, even when the property is
            # used by several filter clauses or is both filtered and sorted on.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        # Custom stages needed by the filter must come before the $match stage
        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for clause in filter_clauses:
            for property_name in cls.custom_pipeline_steps:
                if property_name in clause:
                    add_custom_steps(property_name)
        pipeline.append({"$match": request_filters})

        # Custom stages needed by the sort must come before the $sort stage
        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only run on the page that is returned
        for property_name, steps in cls.custom_pipeline_steps.items():
            if property_name not in added_properties:
                pipeline.extend(steps)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``ObjectId(item_id)``, or None when there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document; ``_id`` is moved to a string ``id`` (in place)."""
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, steps in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(steps)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(item_dict) for item_dict in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item that matches the attribute with value, or None."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, steps in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(steps)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection sorted by ``sort_by`` in ascending order."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, steps in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(steps)
        return [cls.mongo_dict_to_object(item_dict) for item_dict in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Count the documents whose ``post_processing`` is False or missing (0 without collection)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert this item (without ``id``) and store the generated id on it; returns the id."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` both to this instance and to its stored document; returns self."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; returns True when a document was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

293 

294 

class User(GenericMongo):
    """User account stored in the "users" collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless ``exclude`` is given."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user; the very first user ever created is forced to be an admin.

        :return: ``{"user_id": <inserted id as str>}``.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used instead of to_dict so that the password is stored
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Update the stored user with the fields of ``user`` that are not None.

        :return: the updated user (``is_connected`` reset to its default), or None when
            no user has the id ``user_id``.
        """
        changes = {key: value for key, value in user.model_dump(exclude={"id"}).items() if value is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # find_one_and_update returns None when no document matches the filter
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


UserUpdate = create_update_model(User)

337 

338 

class Mode(TwinPadModel):
    """Description of one mode of a device (see ``Device.modes``)."""

    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float

344 

345 

class DeviceUpdate(TwinPadModel):
    """Payload carrying the id of the mode requested for a device (read by ``Device.change_mode``)."""

    mode_id: int

348 

349 

class Device(GenericMongo):
    """Device stored in the "devices" collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    def _mode_label(self, mode_id: int) -> str:
        """Return the name of the mode at 1-based position ``mode_id``, or "#<mode_id>" if out of range."""
        if 1 <= mode_id <= len(self.modes):
            return self.modes[mode_id - 1].name
        return f"#{mode_id}"

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch mode and record the attempt as a Command.

        Mode ids are treated as 1-based positions in ``self.modes`` (the mode name is read
        at ``mode_id - 1``), so valid ids range from 1 to ``len(self.modes)``. An invalid
        id is not sent to the device: a failed Command is recorded and a 400 error
        dict is returned instead.

        :param update_dict: object exposing the requested ``mode_id``.
        :param current_user: user issuing the command.
        :return: the response of ``send_mode_change``, or the error dict.
        """
        requested_mode_id = update_dict.mode_id
        # NOTE(review): id 0 used to be accepted and silently resolved to the last mode
        # through modes[-1]; it is now rejected — confirm no device relies on a mode #0.
        has_error = not 1 <= requested_mode_id <= len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{requested_mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self._mode_label(self.current_mode_id)} to {self._mode_label(requested_mode_id)}"
        else:
            description = f"Change mode of #{self.device_id} to {self._mode_label(requested_mode_id)}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {requested_mode_id}"}
        else:
            response = await send_mode_change(self.device_id, requested_mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning the given signals, querying each device only once.

        The device id is the part of the signal id located before the first ".".

        :return: dict mapping each device id to its Device (None when it is not found).
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

401 

402 

class DeviceSetup(GenericMongo):
    """Setup grouping several devices, stored in the "device_setups" collection."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Companion model where every field of DeviceSetup except "id" is optional
DeviceSetupUpdate = create_update_model(DeviceSetup)

412 

413 

class DeviceState(GenericMongo):
    """State of a device at a given timestamp, read from the devices states database."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        Return the states of a device matching a filtered, sorted and paginated query.

        The states are read from the collection named after ``device_id``; when that
        collection does not exist, an empty response is returned.

        :param device_id: id of the device, used as collection name.
        :param query: object providing ``mongodb_filter()``, ``sort_by`` (``field`` or
            ``field:order``), ``offset`` and ``limit``.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field, sort_order = query.sort_by, 1

        items = []
        total = 0
        collection = get_collection(devices_states_database, device_id)
        if collection is not None:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)
            for item_dict in cursor:
                items.append(
                    cls(
                        timestamp=item_dict.get("precise_timestamp"),
                        mode=item_dict.get("mode", None),
                        load=item_dict.get("load", None),
                        # Plain empty lists as fallback: Field(...) is only meaningful in a
                        # class body and would be rejected as a value by the validation.
                        tokens=item_dict.get("tokens", []),
                        modified_properties=item_dict.get("modified_properties", []),
                    )
                )
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

453 

454 

class SignalSample(TwinPadModel):
    """Single sample of a signal: its timestamp, value and optional forced value."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of a signal, or None when it has no collection or no sample."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})

        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information available: sort the whole collection
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent sample of a signal, or None when it has no collection or no sample.

        The timestamp of the returned sample is moved forward to the last ping of the
        device when that ping is more recent than the sample.

        :param signal_id: id of the signal; the part before the first "." is the device id.
        :param device: device owning the signal, fetched from the database when omitted.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same workaround as get_first_from_signal_id, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})

        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, fetching each device only once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

539 

540 

541class SignalData(TwinPadModel): 

542 signal_id: str 

543 forcible: bool = True 

544 time_vector: list[float] 

545 values: list[float | int | str | None] 

546 forced_values: list[float | int | str | None] 

547 

548 data_start: float | None = None 

549 data_end: float | None = None 

550 

551 number_samples: int = 0 

552 number_samples_db: int = 0 

553 

554 db_query_time: float = 0.0 

555 init_time: float = 0.0 

556 data_processing_time: float = 0.0 

557 

558 phase_id: str | None = None 

559 

@classmethod
def get_from_signal_id(
    cls,
    signal_id: str,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
    max_documents: int | None = None,
) -> Self:
    """
    Load the samples of a signal from its collection.

    :param signal_id: id of the signal.
    :param min_timestamp: lower bound (Unix seconds) of the samples to load, if any.
    :param max_timestamp: upper bound (Unix seconds) of the samples to load, if any.
    :param window_min_timestamp: start of the window whose bounds may be interpolated.
    :param window_max_timestamp: end of the window whose bounds may be interpolated.
    :param interpolate_bounds: add interpolated samples at the window bounds; only
        applied when both window timestamps are given.
    :param max_documents: when fewer than the number of matching documents, only the
        first document of each group of consecutive documents is fetched, so that at
        most ``max_documents`` documents are returned by the database.
    :return: an instance of the signal's ``signal_data_class``, or an empty ``cls``
        instance when the collection or the signal does not exist.
    """
    now = time.time()

    # NOTE(review): fromtimestamp() without tz builds naive local datetimes — confirm this
    # matches how the "timestamp" field is stored.
    timestamp_filter = {}
    if min_timestamp is not None:
        timestamp_filter["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
    if max_timestamp is not None:
        timestamp_filter["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
    req_signal = {"timestamp": timestamp_filter} if timestamp_filter else {}

    collection = get_signal_collection(signal_id)
    if collection is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )

    db_req_start = time.time()

    number_results = collection.count_documents(req_signal)

    pipeline = []
    if req_signal:
        pipeline.append({"$match": req_signal})  # Filter data if needed

    pipeline.extend(
        [
            {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
            {"$sort": {"precise_timestamp": 1}},
        ]
    )

    if max_documents is not None and max_documents < number_results:
        unsampling_ratio = math.ceil(number_results / max_documents)
        logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
        pipeline.extend(
            [
                {
                    "$setWindowFields": {
                        "sortBy": {"precise_timestamp": 1},
                        "output": {"index": {"$documentNumber": {}}},
                    }
                },
                # $documentNumber starts at 1: shift it to 0 so that every group holds
                # unsampling_ratio documents and at most max_documents groups are produced
                {
                    "$set": {
                        "group_id": {"$floor": {"$divide": [{"$subtract": ["$index", 1]}, unsampling_ratio]}}
                    }
                },
                {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                {"$replaceRoot": {"newRoot": "$doc"}},
                {"$unset": ["index", "group_id"]},
                # $group does not preserve the order
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

    cursor = collection.aggregate(pipeline)
    db_req_time = time.time() - db_req_start

    init_time = time.time()

    time_vector = []
    values = []
    forced_values = []
    for sample in cursor.to_list():
        time_vector.append(sample["precise_timestamp"])
        values.append(sample.get("value", None))
        forced_values.append(sample.get("forced_value", None))

    signal = Signal.get_from_signal_id(signal_id)
    if signal is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )
    class_ = signal.signal_data_class

    if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
        time_vector, values, forced_values = cls.interpolate_bounds(
            class_,
            collection,
            signal_id,
            time_vector,
            values,
            forced_values,
            window_min_timestamp,
            window_max_timestamp,
        )

    if values:
        # TODO: check below. a bit strange
        if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
            # Adding last value as it should be repeated
            time_vector.append(now)
            values.append(values[-1])
            forced_values.append(forced_values[-1])

    init_time = time.time() - init_time

    # The internal "system.buckets.<collection>" collection of a timeseries stores the
    # min/max of each bucket, which gives the bounds of the data without scanning it.
    # Found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
    data_start = None
    data_end = None
    bucket = get_signal_collection(f"system.buckets.{signal_id}")
    if bucket is not None:
        first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["precise_timestamp"]

        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            data_end = last_bucket["control"]["max"]["precise_timestamp"]

    return class_(
        signal_id=signal_id,
        forcible=signal.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        data_start=data_start,
        data_end=data_end,
        number_samples=len(values),
        number_samples_db=number_results,
        db_query_time=db_req_time,
        init_time=init_time,
    )

698 

699 @staticmethod 

700 def interpolate_bounds( 

701 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

702 ): 

703 sample_right = None 

704 # Fetching right side value & interpolation 

705 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

706 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

707 sample_right = collection.find_one( 

708 { 

709 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

710 "value": {"$exists": True}, 

711 }, 

712 sort={"precise_timestamp": -1}, 

713 ) 

714 if sample_right: 

715 if time_vector: 

716 right_sd = class_( 

717 signal_id=signal_id, 

718 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

719 values=[values[-1], sample_right.get("value", None)], 

720 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

721 ) 

722 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

723 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

724 else: 

725 max_ts_value = sample_right.get("value", None) 

726 max_ts_forced_value = sample_right.get("forced_value", None) 

727 time_vector.append(window_max_timestamp) 

728 values.append(max_ts_value) 

729 forced_values.append(max_ts_forced_value) 

730 

731 # Fetching left side value & interpolation 

732 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

733 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

734 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

735 sample_left = sample_right 

736 sample_left = collection.find_one( 

737 { 

738 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

739 "value": {"$exists": True}, 

740 }, 

741 sort={"precise_timestamp": -1}, 

742 ) 

743 

744 if sample_left: 

745 if time_vector: 

746 left_sd = class_( 

747 signal_id=signal_id, 

748 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

749 values=[sample_left["value"], values[0]], 

750 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

751 ) 

752 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

753 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

754 else: 

755 min_ts_value = sample_left.get("value", None) 

756 min_ts_forced_value = sample_left.get("forced_value", None) 

757 time_vector.insert(0, window_min_timestamp) 

758 values.insert(0, min_ts_value) 

759 forced_values.insert(0, min_ts_forced_value) 

760 

761 return time_vector, values, forced_values 

762 

def interpolate_values(self, new_time_vector: list[float]):
    """Interpolate the values of this signal data on ``new_time_vector`` (delegates to ``self.interpolate``)."""
    source_values = self.values
    return self.interpolate(new_time_vector, source_values)

765 

def interpolate_forced_values(self, new_time_vector: list[float]):
    """Interpolate the forced values of this signal data on ``new_time_vector`` (delegates to ``self.interpolate``)."""
    source_values = self.forced_values
    return self.interpolate(new_time_vector, source_values)

768 

769 def uniform_desampling(self, number_samples_max: int) -> Self: 

770 data_processing_time = time.time() 

771 if number_samples_max and self.number_samples > number_samples_max: 

772 new_time_vector = npy.linspace( 

773 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

774 ).tolist() 

775 values = self.interpolate_values(new_time_vector) 

776 forced_values = self.interpolate_forced_values(new_time_vector) 

777 time_vector = new_time_vector 

778 number_samples = len(time_vector) 

779 else: 

780 time_vector = self.time_vector 

781 number_samples = len(self.values) 

782 values = self.values[:] 

783 forced_values = self.forced_values[:] 

784 data_processing_time = time.time() - data_processing_time 

785 

786 return self.__class__( 

787 signal_id=self.signal_id, 

788 time_vector=time_vector, 

789 values=values, 

790 forced_values=forced_values, 

791 number_samples=number_samples, 

792 number_samples_db=self.number_samples, 

793 data_start=self.data_start, 

794 data_end=self.data_end, 

795 db_query_time=self.db_query_time, 

796 init_time=self.init_time, 

797 data_processing_time=self.data_processing_time + data_processing_time, 

798 phase_id=self.phase_id, 

799 ) 

800 

801 def min_max_downsampling(self, number_samples_max: int) -> Self: 

802 return self.uniform_desampling(number_samples_max) 

803 

804 def interest_window_desampling( 

805 self, 

806 window_max_number_samples: int, 

807 outside_max_number_samples: int, 

808 window_min_timestamp: float | None = None, 

809 window_max_timestamp: float | None = None, 

810 ) -> Self: 

811 """Performs a sampling in a window of interest and outside.""" 

812 

813 if not self.time_vector: 

814 return self 

815 

816 if window_min_timestamp is None: 

817 window_min_timestamp = self.time_vector[0] 

818 if window_max_timestamp is None: 

819 window_max_timestamp = self.time_vector[-1] 

820 

821 data_processing_time = time.time() 

822 

823 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

824 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

825 

826 time_vector_before = self.time_vector[:index_window_start] 

827 time_vector_window = self.time_vector[index_window_start:index_window_end] 

828 time_vector_after = self.time_vector[index_window_end:] 

829 

830 # Resampling window 

831 if time_vector_window: 

832 # Ensurring window bounds 

833 if time_vector_window[0] != window_min_timestamp: 

834 time_vector_window.insert(0, window_min_timestamp) 

835 if time_vector_window[-1] != window_max_timestamp: 

836 time_vector_window.append(window_max_timestamp) 

837 else: 

838 time_vector_window = [window_min_timestamp, window_max_timestamp] 

839 

840 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

841 # Resampling 

842 new_window_time_vector = npy.linspace( 

843 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

844 ).tolist() 

845 time_vector_window = new_window_time_vector 

846 

847 # Resampling outside 

848 number_samples_before = len(time_vector_before) 

849 number_samples_after = len(time_vector_after) 

850 if ( 

851 outside_max_number_samples is not None 

852 and (number_samples_before + number_samples_after) > outside_max_number_samples 

853 ): 

854 new_number_samples_before = min( 

855 number_samples_before, 

856 math.ceil( 

857 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

858 ), 

859 ) 

860 new_number_samples_after = min( 

861 number_samples_after, 

862 math.ceil( 

863 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

864 ), 

865 ) 

866 # Adjusting numbers as math.ceil can do +1 on sum 

867 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

868 if new_number_samples_before > new_number_samples_after: 

869 new_number_samples_before -= 1 

870 else: 

871 new_number_samples_after -= 1 

872 

873 if new_number_samples_before > 0: 

874 new_time_vector_before = npy.linspace( 

875 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

876 ).tolist() 

877 time_vector_before = new_time_vector_before 

878 

879 if new_number_samples_after > 0: 

880 new_time_vector_after = npy.linspace( 

881 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

882 ).tolist()[::-1] 

883 time_vector_after = new_time_vector_after 

884 

885 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

886 values = self.interpolate_values(new_time_vector) 

887 forced_values = self.interpolate_forced_values(new_time_vector) 

888 number_samples = len(values) 

889 

890 data_processing_time = time.time() - data_processing_time 

891 

892 return self.__class__( 

893 signal_id=self.signal_id, 

894 forcible=self.forcible, 

895 time_vector=new_time_vector, 

896 values=values, 

897 forced_values=forced_values, 

898 number_samples=number_samples, 

899 number_samples_db=self.number_samples, 

900 data_start=self.data_start, 

901 data_end=self.data_end, 

902 db_query_time=self.db_query_time, 

903 init_time=self.init_time, 

904 data_processing_time=self.data_processing_time + data_processing_time, 

905 ) 

906 

def zero_time_vector(self, data_start: float):
    """Return a copy of this signal whose timestamps are shifted by ``-data_start``.

    Args:
        data_start: Value subtracted from every entry of ``self.time_vector``.

    Returns:
        ``self`` unchanged when the time vector is empty. Otherwise a new
        instance of the same class whose ``time_vector`` is the shifted vector,
        whose ``data_start`` / ``data_end`` are the first / last shifted
        timestamps and whose ``data_processing_time`` includes the time spent
        here. All other fields, including ``forcible`` and ``phase_id``, are
        carried over from ``self``.
    """
    if len(self.time_vector) == 0:
        return self

    data_processing_time = time.time()
    time_vector = npy.array(self.time_vector) - data_start
    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        # Carried over explicitly: a rebuilt instance would otherwise fall back
        # to the field defaults and silently lose this metadata.
        forcible=self.forcible,
        phase_id=self.phase_id,
        time_vector=time_vector,
        values=self.values,
        forced_values=self.forced_values,
        number_samples=self.number_samples,
        number_samples_db=self.number_samples_db,
        data_start=time_vector[0],
        data_end=time_vector[-1],
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
    )

927 

def csv_export(self):
    """Serialise the samples as CSV and return the UTF-8 encoded bytes.

    The output starts with a ``timestamp,value,forced_value`` header followed
    by one row per sample; each timestamp is rendered with
    ``get_utc_date_from_timestamp``.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])  # Header row
    samples = zip(self.time_vector, self.values, self.forced_values)
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), sample, forced_sample]
        for timestamp, sample, forced_sample in samples
    )
    return buffer.getvalue().encode("utf-8")

935 

def prestoplot_export(self):
    """Serialise the samples as a tab-separated text table, UTF-8 encoded.

    The column name is the signal id with dots replaced by underscores and a
    leading underscore added when it starts with a numeric character. Four
    header lines are written, then one line per sample where missing values
    are rendered as the literal ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        date_text = get_utc_date_from_timestamp(timestamp)
        sample_text = "none" if sample is None else sample
        forced_text = "none" if forced_sample is None else forced_sample
        lines.append(f"{date_text}\t{sample_text}\t{forced_text}\n")

    return "".join(lines).encode("utf-8")

952 

953 

954class NumericSignalData(SignalData): 

955 data_type: str = "float" 

956 values: list[float | int | None] 

957 forced_values: list[float | int | None] 

958 

def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

    ``None`` entries of ``items`` take part in the interpolation as NaN, and
    every NaN in the interpolated result is returned as ``None``.
    """
    numeric_items = []
    for item in items:
        numeric_items.append(npy.nan if item is None else item)

    interpolated = npy.interp(new_time_vector, self.time_vector, numeric_items)

    result = []
    for sample in interpolated:
        result.append(None if npy.isnan(sample) else sample)
    return result

962 

963 def uniform_desampling(self, number_samples_max: int) -> Self: 

964 data_processing_time = time.time() 

965 if number_samples_max and self.number_samples > number_samples_max: 

966 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

967 forced_values = self.interpolate_forced_values(time_vector) 

968 number_samples = len(time_vector) 

969 else: 

970 time_vector = self.time_vector 

971 number_samples = len(self.values) 

972 values = self.values[:] 

973 forced_values = self.forced_values[:] 

974 data_processing_time = time.time() - data_processing_time 

975 

976 return self.__class__( 

977 signal_id=self.signal_id, 

978 time_vector=time_vector, 

979 values=values, 

980 forced_values=forced_values, 

981 number_samples=number_samples, 

982 number_samples_db=self.number_samples, 

983 data_start=self.data_start, 

984 data_end=self.data_end, 

985 db_query_time=self.db_query_time, 

986 init_time=self.init_time, 

987 data_processing_time=self.data_processing_time + data_processing_time, 

988 ) 

989 

990 def min_max_downsampling(self, number_samples_max: int) -> Self: 

991 if self.number_samples < number_samples_max: 

992 return self 

993 

994 data_processing_time = time.time() 

995 

996 number_bins = number_samples_max // 2 

997 

998 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

999 values = npy.array(self.values, dtype=npy.float64) 

1000 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1001 

1002 points_per_bin = self.number_samples // number_bins 

1003 

1004 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1005 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1006 if self.number_samples - number_bins * points_per_bin > 1: 

1007 points_per_bin += 1 

1008 number_bins = self.number_samples // points_per_bin + 1 

1009 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1010 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1011 values = npy.concatenate([values, nan_points_to_add]) 

1012 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1013 

1014 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1015 values_matrix = values.reshape(number_bins, points_per_bin) 

1016 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1017 

1018 indexes_min = npy.zeros(number_bins, dtype="int64") 

1019 indexes_max = npy.zeros(number_bins, dtype="int64") 

1020 

1021 for row in range(number_bins): 

1022 min_value = values_matrix[row, 0] 

1023 max_value = values_matrix[row, 0] 

1024 for column in range(points_per_bin): 

1025 if values_matrix[row, column] < min_value: 

1026 min_value = values_matrix[row, column] 

1027 indexes_min[row] = column 

1028 elif values_matrix[row, column] > max_value: 

1029 max_value = values_matrix[row, column] 

1030 indexes_max[row] = column 

1031 

1032 row_index = npy.repeat(npy.arange(number_bins), 2) 

1033 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1034 

1035 data_processing_time = time.time() - data_processing_time 

1036 

1037 new_time_vector = timestamps_matrix[row_index, column_index] 

1038 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1039 new_values = values_matrix[row_index, column_index] 

1040 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1041 new_forced_values = forced_values_matrix[row_index, column_index] 

1042 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1043 

1044 # Make sure there are no None values for the time vector 

1045 time_vector_filter = new_time_vector != None 

1046 new_time_vector = new_time_vector[time_vector_filter] 

1047 new_values = new_values[time_vector_filter] 

1048 new_forced_values = new_forced_values[time_vector_filter] 

1049 

1050 return self.__class__( 

1051 signal_id=self.signal_id, 

1052 time_vector=new_time_vector, 

1053 values=new_values, 

1054 forced_values=new_forced_values, 

1055 number_samples=number_bins * 2, 

1056 number_samples_db=self.number_samples_db, 

1057 data_start=self.data_start, 

1058 data_end=self.data_end, 

1059 db_query_time=self.db_query_time, 

1060 init_time=self.init_time, 

1061 data_processing_time=self.data_processing_time + data_processing_time, 

1062 phase_id=self.phase_id, 

1063 ) 

1064 

1065 def interest_window_desampling( 

1066 self, 

1067 window_max_number_samples: int, 

1068 outside_max_number_samples: int, 

1069 window_min_timestamp: float | None = None, 

1070 window_max_timestamp: float | None = None, 

1071 ) -> Self: 

1072 """Performs a sampling in a window of interest and outside.""" 

1073 

1074 if not self.time_vector: 

1075 return self 

1076 

1077 if window_min_timestamp is None: 

1078 window_min_timestamp = self.time_vector[0] 

1079 if window_max_timestamp is None: 

1080 window_max_timestamp = self.time_vector[-1] 

1081 

1082 data_processing_time = time.time() 

1083 

1084 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1085 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1086 

1087 time_vector_before = self.time_vector[:index_window_start] 

1088 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1089 time_vector_after = self.time_vector[index_window_end:] 

1090 

1091 values_before = self.values[:index_window_start] 

1092 values_window = self.values[index_window_start:index_window_end] 

1093 values_after = self.values[index_window_end:] 

1094 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1095 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1096 

1097 # Resampling window 

1098 if time_vector_window: 

1099 # Ensurring window bounds 

1100 if time_vector_window[0] != window_min_timestamp: 

1101 time_vector_window.insert(0, window_min_timestamp) 

1102 values_window.insert(0, window_min_value) 

1103 if time_vector_window[-1] != window_max_timestamp: 

1104 time_vector_window.append(window_max_timestamp) 

1105 values_window.append(window_max_value) 

1106 else: 

1107 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1108 values_window = [window_min_value, window_max_value] 

1109 

1110 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1111 # Resampling 

1112 time_vector_window, values_window = downsample_list( 

1113 time_vector_window, values_window, window_max_number_samples 

1114 ) 

1115 

1116 # Resampling outside 

1117 number_samples_before = len(time_vector_before) 

1118 number_samples_after = len(time_vector_after) 

1119 if ( 

1120 outside_max_number_samples is not None 

1121 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1122 ): 

1123 new_number_samples_before = min( 

1124 number_samples_before, 

1125 math.ceil( 

1126 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1127 ), 

1128 ) 

1129 new_number_samples_after = min( 

1130 number_samples_after, 

1131 math.ceil( 

1132 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1133 ), 

1134 ) 

1135 # Adjusting numbers as math.ceil can do +1 on sum 

1136 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1137 if new_number_samples_before > new_number_samples_after: 

1138 new_number_samples_before -= 1 

1139 else: 

1140 new_number_samples_after -= 1 

1141 

1142 if new_number_samples_before > 0: 

1143 time_vector_before, values_before = downsample_list( 

1144 time_vector_before, values_before, new_number_samples_before 

1145 ) 

1146 

1147 if new_number_samples_after > 0: 

1148 time_vector_after, values_after = downsample_list( 

1149 time_vector_after, values_after, new_number_samples_after 

1150 ) 

1151 

1152 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1153 values = values_before + values_window + values_after 

1154 forced_values = self.interpolate_forced_values(new_time_vector) 

1155 number_samples = len(values) 

1156 

1157 data_processing_time = time.time() - data_processing_time 

1158 

1159 return self.__class__( 

1160 signal_id=self.signal_id, 

1161 time_vector=new_time_vector, 

1162 values=values, 

1163 forced_values=forced_values, 

1164 number_samples=number_samples, 

1165 number_samples_db=self.number_samples, 

1166 data_start=self.data_start, 

1167 data_end=self.data_end, 

1168 db_query_time=self.db_query_time, 

1169 init_time=self.init_time, 

1170 data_processing_time=self.data_processing_time + data_processing_time, 

1171 ) 

1172 

1173 

class StringSignalData(SignalData):
    # Signal data whose samples are strings (or None).
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the item of the latest sample at or before it.

        Timestamps located before the first sample get the first item.
        """
        last_index = len(items) - 1
        # Position of the last sample whose timestamp is <= the requested one
        # (-1 when the request precedes every sample, clipped to 0 below).
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        held_items = []
        for position in npy.clip(positions, 0, last_index):
            held_items.append(items[position])
        return held_items

1185 

1186 

1187class SignalsData(TwinPadModel): 

1188 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1189 data_processing_time: float 

1190 data_start: float | None 

1191 data_end: float | None 

1192 

1193 @classmethod 

1194 def get_from_signal_ids( 

1195 cls, 

1196 signal_ids: list[str], 

1197 min_timestamp: float = None, 

1198 max_timestamp: float = None, 

1199 window_min_timestamp: float = None, 

1200 window_max_timestamp: float = None, 

1201 interpolate_bounds: bool = True, 

1202 max_documents: int = None, 

1203 ) -> Self: 

1204 signals_data = [] 

1205 data_start = None 

1206 data_end = None 

1207 if max_timestamp is None: 

1208 max_timestamp = time.time() 

1209 data_processing_time = 0.0 

1210 for signal_id in signal_ids: 

1211 signal_data = SignalData.get_from_signal_id( 

1212 signal_id=signal_id, 

1213 min_timestamp=min_timestamp, 

1214 max_timestamp=max_timestamp, 

1215 window_min_timestamp=window_min_timestamp, 

1216 window_max_timestamp=window_max_timestamp, 

1217 interpolate_bounds=interpolate_bounds, 

1218 max_documents=max_documents, 

1219 ) 

1220 data_processing_time += signal_data.data_processing_time 

1221 signals_data.append(signal_data) 

1222 if signal_data.data_start is not None: 

1223 if data_start is None: 

1224 data_start = signal_data.data_start 

1225 else: 

1226 data_start = min(signal_data.data_start, data_start) 

1227 if signal_data.data_end is not None: 

1228 if data_end is None: 

1229 data_end = signal_data.data_end 

1230 else: 

1231 data_end = max(signal_data.data_end, data_end) 

1232 

1233 return cls( 

1234 signals_data=signals_data, 

1235 data_processing_time=data_processing_time, 

1236 data_start=data_start, 

1237 data_end=data_end, 

1238 ) 

1239 

1240 @classmethod 

1241 def get_from_phase_and_signal_ids( 

1242 cls, 

1243 phases: list, 

1244 phase_sync_times: list[float | None], 

1245 signal_ids: list[str], 

1246 window_min_timestamps: list[float | None], 

1247 window_max_timestamps: list[float | None], 

1248 zero_time_vector: bool = True, 

1249 ): 

1250 signals_data: list[SignalData] = [] 

1251 computation_start = time.time() 

1252 

1253 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip( 

1254 phases, phase_sync_times, window_min_timestamps, window_max_timestamps 

1255 ): 

1256 min_timestamp = phase.start_at / 1000 

1257 max_timestamp = phase.end_at / 1000 

1258 

1259 if sync_time is None: 

1260 sync_time = min_timestamp 

1261 

1262 if window_max_timestamp is not None and window_min_timestamp is not None: 

1263 window_length = window_max_timestamp - window_min_timestamp 

1264 

1265 if window_min_timestamp != min_timestamp: 

1266 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20) 

1267 if window_max_timestamp != max_timestamp: 

1268 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20) 

1269 

1270 for signal_id in signal_ids: 

1271 signal_data = SignalData.get_from_signal_id( 

1272 signal_id, 

1273 min_timestamp, 

1274 max_timestamp, 

1275 window_min_timestamp, 

1276 window_max_timestamp, 

1277 interpolate_bounds=False, 

1278 max_documents=None, 

1279 ) 

1280 

1281 if len(signal_data.time_vector) == 0: 

1282 continue 

1283 

1284 if zero_time_vector: 

1285 signal_data = signal_data.zero_time_vector(sync_time) 

1286 signal_data.phase_id = phase.id 

1287 

1288 signals_data.append(signal_data) 

1289 

1290 return cls( 

1291 signals_data=signals_data, 

1292 data_processing_time=time.time() - computation_start, 

1293 data_start=0, 

1294 data_end=0, 

1295 ) 

1296 

def uniform_desampling(self, number_samples_max: int) -> Self:
    """Apply ``uniform_desampling`` to every signal and wrap the results.

    The returned ``SignalsData`` keeps this instance's ``data_start`` /
    ``data_end``; its processing time is the sum over the resampled signals.
    """
    resampled_signals = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        resampled = signal_data.uniform_desampling(number_samples_max=number_samples_max)
        resampled_signals.append(resampled)
        total_processing_time += resampled.data_processing_time

    return SignalsData(
        signals_data=resampled_signals,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1305 

def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Apply ``min_max_downsampling`` to every signal and wrap the results.

    The returned ``SignalsData`` keeps this instance's ``data_start`` /
    ``data_end``; its processing time is the sum over the resampled signals.
    """
    resampled_signals = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        resampled = signal_data.min_max_downsampling(number_samples_max=number_samples_max)
        resampled_signals.append(resampled)
        total_processing_time += resampled.data_processing_time

    return SignalsData(
        signals_data=resampled_signals,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1314 

def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
) -> Self:
    """Apply ``interest_window_desampling`` to every signal and wrap the results.

    All arguments are forwarded unchanged to each signal. The returned
    ``SignalsData`` keeps this instance's ``data_start`` / ``data_end``; its
    processing time is the sum over the resampled signals.
    """
    resampled_signals = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        resampled = signal_data.interest_window_desampling(
            window_max_number_samples=window_max_number_samples,
            outside_max_number_samples=outside_max_number_samples,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
        )
        resampled_signals.append(resampled)
        total_processing_time += resampled.data_processing_time

    return SignalsData(
        signals_data=resampled_signals,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1338 

def zero_time_vector(self, data_start: float):
    """Shift the time vector of every signal by ``-data_start`` and wrap the results.

    Args:
        data_start: Value forwarded to ``zero_time_vector`` of each signal.

    Returns:
        A ``SignalsData`` whose ``data_start`` is 0 and whose ``data_end`` is
        the largest ``data_end`` reported by the shifted signals, or ``None``
        when there is no signal or none of them reports a ``data_end``.
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
    # max() raises on an empty sequence and cannot compare None with a float,
    # so unknown ends are skipped and None is the fallback.
    known_ends = (s.data_end for s in signals_data if s.data_end is not None)
    return SignalsData(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(known_ends, default=None),
    )

1347 

@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute (or reload) a single-input post-processing function for one phase.

    The result is identified as ``post_processing.<base id>.<function>``. When
    a signal with that id already lists ``phase.id`` in its
    ``computed_phases_ids``, the stored samples are returned through
    ``get_existing_function_signal``. Otherwise the base signal is loaded for
    the phase, the function is applied to its values and forced values, a
    background task is scheduled to store the full result, and the result
    restricted to the requested window is returned.

    Returns:
        The signals data holding the processed signal, or ``None`` when the
        base signal has no samples for the phase.
    """
    base_name = base_signal_id.removeprefix("post_processing.")
    signal_id = f"post_processing.{base_name}.{function}"

    stored_signal = Signal.get_from_signal_id(signal_id)
    if stored_signal is not None and phase.id in stored_signal.computed_phases_ids:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )
    if len(signals_data.signals_data) == 0:
        return None
    source = signals_data.signals_data[0]
    if source.number_samples == 0:
        return None

    time_vector = npy.array(source.time_vector)
    values = source.values
    forced_values = source.forced_values

    new_values = None
    new_forced_values = None
    if function == "Cumul":
        new_values = cumul(values)
        new_forced_values = cumul(forced_values)
    elif function == "Delta":
        new_values = delta(values)
        new_forced_values = delta(forced_values)
    elif function == "DeltaT":
        # Time step between samples: identical for values and forced values.
        new_values = delta(time_vector)
        new_forced_values = new_values
    elif function == "Derive":
        new_values = derive(time_vector, values)
        new_forced_values = derive(time_vector, forced_values)
    elif function == "Integ":
        new_values = integ(time_vector, values)
        new_forced_values = integ(time_vector, forced_values)

    # NaN results are reported as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The complete result is stored in the background, before the window is applied.
    save_coroutine = cls.save_function_signal(
        phase, signal_id, time_vector, new_values, new_forced_values, source.forcible
    )
    asyncio.get_running_loop().create_task(save_coroutine)

    if window_max_timestamp is not None:
        not_after_window = time_vector <= window_max_timestamp
        time_vector = time_vector[not_after_window]
        new_values = new_values[not_after_window]
        new_forced_values = new_forced_values[not_after_window]
    if window_min_timestamp is not None:
        not_before_window = time_vector >= window_min_timestamp
        time_vector = time_vector[not_before_window]
        new_values = new_values[not_before_window]
        new_forced_values = new_forced_values[not_before_window]

    # The loaded signal is reused as the container of the processed data.
    source.time_vector = time_vector.tolist()
    source.values = new_values.tolist()
    source.forced_values = new_forced_values.tolist()
    source.number_samples = time_vector.size

    source.signal_id = signal_id

    return signals_data

1425 

@classmethod
async def apply_multiple_function(
    cls,
    phases: list,
    signal_ids: list,
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute (or reload) a post-processing function combining several signals.

    ``phases`` and ``signal_ids`` are walked in lockstep: signal ``i`` is
    loaded for phase ``i``. The result id is built from the input ids and the
    function name. When a signal with that id already lists the active phase
    in its ``computed_phases_ids``, the stored samples are returned through
    ``get_existing_function_signal``; otherwise the inputs are loaded, the
    function is applied, a background task is scheduled to store the full
    result and the result restricted to the requested window is returned.

    Returns:
        A signals data instance holding one ``NumericSignalData``, or ``None``
        when an input has no samples or the inputs have incompatible lengths.
    """
    # Two-input functions put the function name between the two ids, the
    # others append it after the dot-joined ids.
    if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
        function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
    else:
        function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"

    # The result lives on the time axis of the second input for "Align-X" and
    # "Using-X" (see the match below), so the second phase is the one the
    # result is stored for; otherwise it is the first phase.
    active_phase = phases[0]
    if function in {"Align-X", "Using-X"}:
        active_phase = phases[1]

    processed_result_signal = Signal.get_from_signal_id(function_signal_id)
    if processed_result_signal is not None and (
        active_phase.id in processed_result_signal.computed_phases_ids
    ):  # If signal has been computed for the correct phase
        return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)

    array_length = None
    time_vector_list = []
    values_list = []
    forced_values_list = []
    forcible = True
    for phase, signal_id in zip(phases, signal_ids):
        signals_data = cls.get_from_phase_and_signal_ids(
            [phase], [None], [signal_id], [None], [None], zero_time_vector=False
        )

        if len(signals_data.signals_data) == 0:
            return None

        signal_data = signals_data.signals_data[0]

        # The first input sets the reference length; every other input must
        # match it, except for "Align-X" which resamples onto the second input.
        if array_length is None:
            array_length = signal_data.number_samples
        if (
            array_length != signal_data.number_samples and function != "Align-X"
        ) or signal_data.number_samples == 0:
            return None

        time_vector_list.append(npy.array(signal_data.time_vector))
        # dtype=float64 turns None entries into NaN.
        values_list.append(npy.array(signal_data.values, dtype=npy.float64))
        forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
        # The result is forcible only if every input is.
        forcible = forcible and signal_data.forcible

    time_vector = time_vector_list[0]
    new_values = None
    new_forced_values = None

    # NOTE(review): no case handles "Atan2" or "Merge", so both results stay
    # None for them and npy.isnan below presumably raises a TypeError —
    # confirm whether these functions are meant to be rejected earlier.
    match (function):
        case "Align-X":
            time_vector = time_vector_list[1]
            # Both time vectors are made relative to the start of their own
            # phase before the first input is aligned on the second one.
            old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
            new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
            new_values = align_x(old_time_vector, values_list[0], new_time_vector)
            new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
        # case "Atan2":
        #     new_values = atan2(values_list[0], values_list[1])
        #     new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
        case "Using-X":
            if len(time_vector_list[0]) != len(time_vector_list[1]):
                return None
            # Values of the first input, time axis of the second one.
            time_vector = time_vector_list[1]
            new_values = values_list[0]
            new_forced_values = forced_values_list[0]
        case "Mean":
            new_values = mean(*values_list)
            new_forced_values = mean(*forced_values_list)
        case "Norm":
            new_values = norm(*values_list)
            new_forced_values = norm(*forced_values_list)

    # NaN results are reported as None (this yields object arrays).
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The complete result is stored in the background, before the window is applied.
    loop = asyncio.get_running_loop()
    loop.create_task(
        cls.save_function_signal(
            active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
        )
    )

    # Sample count before the window restriction, reported as number_samples_db.
    total_number_samples = time_vector.size

    if window_max_timestamp is not None:
        max_timestamp_mask = time_vector <= window_max_timestamp
        time_vector = time_vector[max_timestamp_mask]
        new_values = new_values[max_timestamp_mask]
        new_forced_values = new_forced_values[max_timestamp_mask]
    if window_min_timestamp is not None:
        min_timestamp_mask = time_vector >= window_min_timestamp
        time_vector = time_vector[min_timestamp_mask]
        new_values = new_values[min_timestamp_mask]
        new_forced_values = new_forced_values[min_timestamp_mask]

    signals_data = cls(
        signals_data=[
            NumericSignalData(
                signal_id=function_signal_id,
                forcible=forcible,
                time_vector=time_vector.tolist(),
                values=new_values.tolist(),
                forced_values=new_forced_values.tolist(),
                number_samples=time_vector.size,
                number_samples_db=total_number_samples,
            )
        ],
        data_processing_time=0,
        data_start=0,
        data_end=0,
    )

    return signals_data

1545 

@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Load the stored samples of a post-processing signal, optionally restricted to a window.

    Args:
        signal_id: Id of the signal whose collection is read.
        window_min_timestamp: Inclusive lower bound on ``precise_timestamp``,
            or ``None`` for no lower bound.
        window_max_timestamp: Inclusive upper bound on ``precise_timestamp``,
            or ``None`` for no upper bound.

    Returns:
        An instance holding one ``NumericSignalData`` built from the matching
        samples; ``number_samples_db`` is the total number of documents of the
        collection and ``data_processing_time`` the duration of the fetch.
    """
    collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp

    # Without any bound the pipeline stays empty and every sample is returned.
    pipeline = []
    if timestamp_bounds:
        pipeline.append({"$match": {"precise_timestamp": timestamp_bounds}})

    total_number_samples = collection.count_documents({})

    fetch_start = time.time()

    samples = collection.aggregate(pipeline).to_list()
    timestamps = [sample["precise_timestamp"] for sample in samples]
    stored_values = [sample["value"] for sample in samples]
    stored_forced_values = [sample["forced_value"] for sample in samples]

    processed_signal = NumericSignalData(
        signal_id=signal_id,
        time_vector=timestamps,
        values=stored_values,
        forced_values=stored_forced_values,
        number_samples=len(timestamps),
        number_samples_db=total_number_samples,
    )
    return cls(
        signals_data=[processed_signal],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )

1590 

@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Store the result of a post-processing function and register it for ``phase``.

    The samples already stored within the time range of the phase are
    replaced by the new ones, then the configuration of the signal is
    upserted and ``phase.id`` is recorded in its ``computed_phases_ids``.

    Args:
        phase: Phase the result was computed for (``start_at`` / ``end_at``
            are divided by 1000 to obtain the range of ``precise_timestamp``).
        function_signal_id: Id of the post-processing signal.
        time_vector: Timestamps of the samples.
        new_values: Values, one per timestamp.
        new_forced_values: Forced values, one per timestamp.
        forcible: Stored as the ``forcible`` flag of the signal.

    Raises:
        ValueError: If the three vectors do not have the same length.
    """
    # NOTE(review): every database call below is synchronous although this is
    # a coroutine — presumably it blocks the event loop while it runs; confirm.

    # Insert data first so if it is requested by another user, it will be computed again
    signal_collection = get_signal_collection(function_signal_id, create=True)
    signal_collection.delete_many(
        {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
    )
    signal_collection.insert_many(
        [
            {
                # NOTE(review): naive local-time datetime — confirm this is
                # the convention expected for the "timestamp" field.
                "timestamp": datetime.datetime.fromtimestamp(timestamp),
                "precise_timestamp": timestamp,
                "value": value,
                "forced_value": forced_value,
            }
            for timestamp, value, forced_value in zip(time_vector, new_values, new_forced_values, strict=True)
        ]
    )

    # Mean sampling frequency. Only a non-positive or undefined mean period
    # needs a fallback; clamping the period to 1 would report every signal
    # sampled faster than once per time unit as having a frequency of 1.
    periods = npy.diff(npy.asarray(time_vector, dtype=npy.float64))
    mean_period = float(periods.mean()) if periods.size else 0.0
    frequency = 1 / mean_period if mean_period > 0 else 1.0

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": {
                "description": "",
                "unit": None,
                "type": "sensor",
                "address": None,
                "frequency": frequency,
                "transfer_function": None,
                "precision_digits": None,
                "digitization_function": None,
                "data_type": "float",
                "formula": None,
                "forcible": forcible,
                "commandable": False,
                "broadcastable": False,
                "signal_id": function_signal_id,
                "post_processing": True,
            },
            # $addToSet keeps the list free of duplicates when the same phase
            # is saved more than once (e.g. two concurrent computations).
            "$addToSet": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )

1637 

1638 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1639 if post_processing: 

1640 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1641 zip_buffer = io.BytesIO() 

1642 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1643 for signal_data in self.signals_data: 

1644 file_name = signal_data.signal_id 

1645 if post_processing: 

1646 phase = phases_by_id.get( 

1647 signal_data.phase_id, Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="0") 

1648 ) 

1649 file_name = f"{signal_data.signal_id} ({phase.name})" 

1650 if file_format == "csv": 

1651 export_io = signal_data.csv_export() 

1652 zip_file.writestr(f"{file_name}.csv", export_io) 

1653 elif file_format == "prestoplot": 

1654 export_io = signal_data.prestoplot_export() 

1655 zip_file.writestr(f"{file_name}.tab", export_io) 

1656 else: 

1657 raise ValueError(f"Format not found. Got: {file_format}") 

1658 zip_bytes = zip_buffer.getvalue() 

1659 return zip_bytes 

1660 

def hdf5_export(self, post_processing: bool = False, phase_ids: list | None = None):
    """Serialize every entry of ``self.signals_data`` into an in-memory HDF5 file.

    Each signal gets its own group holding a ``data`` dataset whose rows are
    ``(date, timestamp, value, forced_value)``.

    :param post_processing: when True, group names are suffixed with the name of the
        phase referenced by ``signal_data.phase_id``
    :type post_processing: bool
    :param phase_ids: ids of the phases to resolve; only used when ``post_processing`` is True
    :type phase_ids: list | None
    :return: the raw bytes of the HDF5 file
    :rtype: bytes
    """
    phases_by_id = {}
    if post_processing:
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids or []}

    hdf5_buffer = io.BytesIO()
    # Row layouts: values are stored as float64, except for "str" signals which
    # use fixed-width byte strings.
    custom_type_float = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    custom_type_string = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                # A phase id that was not requested, or that could not be loaded
                # (lookup returned None), is labelled "Unknown phase" instead of crashing.
                phase = phases_by_id.get(signal_data.phase_id)
                phase_name = phase.name if phase is not None else "Unknown phase"
                group_name = f"{signal_data.signal_id} ({phase_name})"
            signal_group = hdf5_file.create_group(group_name)

            date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            row_dtype = custom_type_string if signal_data.data_type == "str" else custom_type_float
            signal_group["data"] = npy.array(
                list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                ),
                dtype=row_dtype,
            )
    return hdf5_buffer.getvalue()

1707 

1708 

class SignalStatus(TwinPadModel):
    """Status block attached to a signal; a fresh instance reports ``"down"``."""

    # Status label; "down" until something sets it otherwise.
    status: str = "down"
    # Free-text explanation of the status (empty by default).
    reason: str = ""
    # Optional delay figure; unit is not visible from this model.
    delay: float | None = None

1713 

1714 

class DigitizationFunction(TwinPadModel):
    """Bounds describing how raw values relate to physical values.

    NOTE(review): presumably a linear mapping from
    ``[min_raw_value, max_raw_value]`` to ``[min_value, max_value]`` -- confirm
    against the code that consumes this model.
    """

    # Optional bit count of the digitized representation.
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

1721 

1722 

class SignalUpdate(TwinPadModel):
    """Payload of a signal command: every field is optional and defaults to ``None``."""

    # New value to apply to the signal.
    value: float | str | bool | int | None = None
    # New forced value to apply to the signal.
    forced_value: float | str | bool | int | None = None
    # Optional integer timestamp accompanying the update.
    timestamp: int | None = None

1727 

1728 

class SignalType(str, Enum):
    """Closed set of signal kinds; members are also plain strings."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1733 

1734 

# Signal-data container class to use for each base data type: only "str"
# signals use StringSignalData, every other type is handled as numeric.
SIGNALDATA_TYPES = {
    data_type: (StringSignalData if data_type == "str" else NumericSignalData)
    for data_type in ("int", "float", "str", "bool", "epoch")
}

1742 

1743 

1744class Signal(GenericMongo): 

1745 collection_name: ClassVar[str] = "signals" 

1746 

1747 signal_id: str 

1748 frequency: float 

1749 unit: str | None 

1750 description: str 

1751 type: SignalType 

1752 data_type: str 

1753 precision_digits: int | None 

1754 forcible: bool 

1755 status: SignalStatus = SignalStatus() 

1756 

1757 post_processing: bool = False 

1758 computed_phases_ids: list[str] = [] 

1759 

1760 digitization_function: DigitizationFunction | None 

1761 

1762 @property 

1763 def device(self) -> Device: 

1764 device_id = self.signal_id.split(".")[0] 

1765 device = Device.get_one_by_attribute("device_id", device_id) 

1766 return device 

1767 

1768 @cached_property 

1769 def signal_data_class(self): 

1770 if self.data_type in SIGNALDATA_TYPES: 

1771 return SIGNALDATA_TYPES[self.data_type] 

1772 if self.data_type.startswith("enum"): 

1773 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1774 raise ValueError(f"Unhandled python type: {self.data_type}") 

1775 

1776 @cached_property 

1777 def python_type(self): 

1778 if self.data_type in TYPES: 

1779 return TYPES[self.data_type] 

1780 if self.data_type.startswith("enum"): 

1781 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1782 return Literal[*choices] 

1783 raise ValueError(f"Unhandled python type: {self.data_type}") 

1784 

1785 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1786 command = Command( 

1787 sent_at=time.time(), 

1788 command_type="Signal command", 

1789 user_id=current_user.id, 

1790 ) 

1791 

1792 has_input_error = False 

1793 error_message = "" 

1794 

1795 if self.data_type.startswith("enum"): 

1796 enum_options = get_args(self.python_type) 

1797 

1798 if update_dict.value is not None and update_dict.value not in enum_options: 

1799 has_input_error = True 

1800 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1801 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1802 has_input_error = True 

1803 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1804 else: 

1805 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1806 has_input_error = True 

1807 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1808 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1809 has_input_error = True 

1810 error_message += ( 

1811 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1812 ) 

1813 

1814 if has_input_error: 

1815 command.response_time = 0 

1816 command.succeeded = False 

1817 command.description = f"Tried to modify signal {self.signal_id}" 

1818 response = {"error": True, "status_code": 400, "message": error_message} 

1819 else: 

1820 response = await send_signal_value(self.signal_id, update_dict) 

1821 command.receive_response(response) 

1822 

1823 Command.create(command) 

1824 return response 

1825 

1826 @classmethod 

1827 def get_from_signal_id(cls, signal_id) -> Self: 

1828 """Could be generic from mongo""" 

1829 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1830 if not raw_value: 

1831 return None 

1832 del raw_value["_id"] 

1833 return cls.dict_to_object(raw_value) 

1834 

1835 @classmethod 

1836 def get_all_ids(cls) -> list[str]: 

1837 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1838 

1839 return [signal["signal_id"] for signal in cursor] 

1840 

1841 @classmethod 

1842 def get_all_statuses(cls) -> list[dict]: 

1843 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1844 

1845 return [ 

1846 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1847 for signal in cursor 

1848 ] 

1849 

1850 async def number_samples(self): 

1851 collection = get_signal_collection(signal_id=self.signal_id) 

1852 if collection is None: 

1853 return 0 

1854 

1855 number_samples = collection.estimated_document_count() 

1856 

1857 number_samples_async_collection = await get_async_collection( 

1858 systems_async_database, "number_samples", create=True, time_series=True 

1859 ) 

1860 

1861 loop = asyncio.get_running_loop() 

1862 loop.create_task( 

1863 number_samples_async_collection.insert_one( 

1864 { 

1865 "timestamp": datetime.datetime.now(pytz.UTC), 

1866 "signal_id": self.signal_id, 

1867 "number_samples": number_samples, 

1868 } 

1869 ) 

1870 ) 

1871 

1872 return number_samples 

1873 

1874 @classmethod 

1875 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1876 number_samples_by_id = {} 

1877 collections = get_signal_collections_batch(signal_ids) 

1878 number_samples_async_collection = await get_async_collection( 

1879 systems_async_database, "number_samples", create=True, time_series=True 

1880 ) 

1881 

1882 for signal_id, collection in zip(signal_ids, collections): 

1883 if collection is None: 

1884 number_samples_by_id[signal_id] = 0 

1885 continue 

1886 

1887 number_samples = collection.estimated_document_count() 

1888 

1889 number_samples_by_id[signal_id] = number_samples 

1890 

1891 now = datetime.datetime.now(pytz.UTC) 

1892 loop = asyncio.get_running_loop() 

1893 loop.create_task( 

1894 number_samples_async_collection.insert_many( 

1895 [ 

1896 { 

1897 "timestamp": now, 

1898 "signal_id": signal_id, 

1899 "number_samples": number_samples, 

1900 } 

1901 for signal_id, number_samples in number_samples_by_id.items() 

1902 ] 

1903 ) 

1904 ) 

1905 

1906 return number_samples_by_id 

1907 

1908 def sample_datasize(self): 

1909 return signals_database.command("collstats", self.signal_id)["size"] 

1910 

1911 @classmethod 

1912 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1913 result = cls.collection().aggregate( 

1914 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1915 ) 

1916 

1917 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1918 

1919 

class ForcedSignal(GenericMongo):
    """Record of who forced a signal, when, and to which value (one document per signal)."""

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record by ``signal_id`` and return the id of the stored document.

        :return: the document id as a string (also stored on ``self.id``)
        :rtype: str
        """
        stored_document = self.collection().find_one_and_update(
            {"signal_id": self.signal_id},
            {"$set": self.to_dict(exclude={"id"})},
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Tell whether ``current_user`` may force the given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal is currently forced by another user and the
            current user is not an admin, True otherwise
        :rtype: bool
        """
        existing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing is None:
            # Nobody is forcing this signal yet.
            return True
        forced_by_someone_else = existing.forcing_user_id != current_user.id
        return not (forced_by_someone_else and not current_user.is_admin)

1954 

1955 

class ServicesStatus(TwinPadModel):
    """Up/down status of the backend and of the services it depends on."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping each dependency host and return the resulting statuses.

        The backend itself is always reported as ``"up"``.
        """
        cloud_broker = ping(RABBITMQ_HOST)
        time_series_database = ping(MONGO_HOST)
        signal_storage = ping(SIGNAL_STORAGE_HOST)
        heartbeat_storage = ping(HEARTBEAT_STORAGE_HOST)
        data_analyzer = ping(DATA_ANALYZER_HOST)
        return cls(
            backend="up",
            cloud_broker=cloud_broker,
            time_series_database=time_series_database,
            signal_storage=signal_storage,
            heartbeat_storage=heartbeat_storage,
            data_analyzer=data_analyzer,
        )

1974 

1975 

def ping(host):
    """Ping ``host`` once (0.8 s timeout) and report ``"up"`` or ``"down"``.

    :param host: host name or address to ping
    :return: ``"up"`` if a reply was received, ``"down"`` otherwise
    :rtype: str
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Sending ICMP may require privileges the process does not have; this
        # is deliberately reported as "down" rather than raised.
        return "down"
    # ping3 signals failure with None (timeout) or False (error). A measured
    # delay of 0.0 is a successful reply and must not be treated as falsy.
    if delay is None or delay is False:
        return "down"
    return "up"

1983 

1984 

class Event(GenericMongo):
    """An occurrence of an event rule, stored in the ``events`` collection."""

    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Event rule referenced by ``event_rule_id`` (loaded once, then cached)."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an Event from a stored document, converting its datetime to epoch seconds.

        ``dict_["timestamp"]`` is replaced in place by its float POSIX timestamp.
        """
        stored_at = dict_["timestamp"]
        if stored_at.tzinfo is None:
            # PyMongo returns naive datetimes expressed in UTC by default;
            # without this, .timestamp() would interpret them as local time.
            stored_at = stored_at.replace(tzinfo=pytz.UTC)
        dict_["timestamp"] = stored_at.timestamp()
        return super().dict_to_object(dict_)

2002 

2003 

class TwinPadActivity(GenericMongo):
    """One daily activity bucket: start of the day (epoch seconds) and the amount counted."""

    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return the start of the current UTC day as an aware datetime."""
        return datetime.datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)

    @classmethod
    def _items_in_timeframe(cls, cached_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Load the cached daily buckets whose day lies within the given epoch bounds (inclusive)."""
        # Epoch seconds are converted straight to UTC, with no local-time detour.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in cached_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            day_start = day["timestamp"]
            if day_start.tzinfo is None:
                # PyMongo returns naive datetimes expressed in UTC by default.
                day_start = day_start.replace(tzinfo=pytz.UTC)
            day["timestamp"] = day_start.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def _daily_amounts(
        cls,
        cached_collection,
        cache_name: str,
        source_collection,
        count_matching,
        min_timestamp,
        max_timestamp,
        recompute_amount: bool,
    ) -> list[Self]:
        """Refresh the per-day cache ``cache_name`` and return the buckets in the timeframe.

        When the cache does not exist yet (``cached_collection`` is None) or
        ``recompute_amount`` is set, every day from the first source document up
        to yesterday is recounted. The bucket of the current day is refreshed on
        every call. ``count_matching`` receives a filter on ``timestamp`` and
        returns the amount for that range.

        NOTE(review): when the cache already exists, only today's bucket is
        refreshed. Days elapsed since the last call keep whatever partial amount
        was stored for them, or are missing, until a recompute is requested.
        """
        one_day = datetime.timedelta(days=1)
        today = cls._utc_midnight_today()

        if cached_collection is None or recompute_amount:
            cached_collection = get_collection(systems_database, cache_name, create=True, time_series=True)
            cached_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            # Start counting at the beginning of the day of the first source document.
            current_day = datetime.datetime.combine(first_document["timestamp"], datetime.time.min).replace(
                tzinfo=pytz.UTC
            )
            while current_day < today:
                amount = count_matching({"$gte": current_day, "$lt": current_day + one_day})
                if amount > 0:
                    cached_collection.insert_one({"timestamp": current_day, "amount": amount})
                current_day += one_day

        amount_today = count_matching({"$gte": today})
        if amount_today > 0:
            # Replace, rather than duplicate, the bucket of the current day.
            cached_collection.delete_many({"timestamp": today})
            cached_collection.insert_one({"timestamp": today, "amount": amount_today})

        return cls._items_in_timeframe(cached_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the number of events per day between two epoch timestamps.

        :param min_timestamp: lower bound (epoch seconds, inclusive)
        :param max_timestamp: upper bound (epoch seconds, inclusive)
        :param recompute_amount: rebuild the whole ``number_events`` cache first
        :return: one item per cached day within the bounds
        """
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        return cls._daily_amounts(
            number_events_collection,
            "number_events",
            events_collection,
            lambda timestamp_filter: events_collection.count_documents({"timestamp": timestamp_filter}),
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of received samples per day between two epoch timestamps.

        Daily amounts are the sum of ``value`` over the documents of
        ``signal_storage._NUMBER_SAMPLES`` falling in each day.

        :param min_timestamp: lower bound (epoch seconds, inclusive)
        :param max_timestamp: upper bound (epoch seconds, inclusive)
        :param recompute_amount: rebuild the whole ``number_received_samples`` cache first
        :return: one item per cached day within the bounds
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )

        def sum_samples(timestamp_filter) -> int:
            totals = signals_number_samples_collection.aggregate(
                [
                    {"$match": {"timestamp": timestamp_filter}},
                    {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
                ]
            ).to_list()
            if not totals:
                return 0
            return totals[0].get("number_samples", 0)

        return cls._daily_amounts(
            number_samples_collection,
            "number_received_samples",
            signals_number_samples_collection,
            sum_samples,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of commands per day between two epoch timestamps.

        :param min_timestamp: lower bound (epoch seconds, inclusive)
        :param max_timestamp: upper bound (epoch seconds, inclusive)
        :param recompute_amount: rebuild the whole ``number_commands`` cache first
        :return: one item per cached day within the bounds
        """
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        return cls._daily_amounts(
            number_commands_collection,
            "number_commands",
            commands_collection,
            lambda timestamp_filter: commands_collection.count_documents({"timestamp": timestamp_filter}),
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

2139 

2140 

class EventRule(GenericMongo):
    """Rule (name, formula and the variables it uses) stored in ``event_rules``."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events referencing this rule (computed once, then cached)."""
        events_collection = get_collection(systems_database, "events")
        if events_collection is None:
            # The lookup does not create the collection; no collection means no events.
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})

2152 

2153 

class Company(GenericMongo):
    """Company document of the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"

    name: str

2157 

2158 

class Campaign(GenericMongo):
    """Campaign document of the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` (without its id) and return ``{"campaign_id": <new id>}``."""
        document = campaign.model_dump(exclude={"id"})
        insertion = cls.collection().insert_one(document)
        if insertion is None:
            return None
        return {"campaign_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the stored campaign; return the updated document."""
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return cls.collection().find_one_and_update(
            selector,
            changes,
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with the given id and return the driver's delete result."""
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})

2187 

2188 

class Phase(GenericMongo):
    """Phase of a campaign, bounded by ``start_at`` and ``end_at``; stored in ``phases``."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` (without id) and return ``{"phase_id": <new id>}``."""
        # Rebuild the phase from its own fields so that any incoming id is dropped.
        fresh_phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phases = get_collection(systems_database, "phases", create=True)
        insertion = phases.insert_one(fresh_phase.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"phase_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description and bounds of the stored phase; return the updated document.

        The campaign the phase belongs to is left untouched.
        """
        new_values = {
            "name": phase.name,
            "description": phase.description,
            "start_at": phase.start_at,
            "end_at": phase.end_at,
        }
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {"$set": new_values},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with the given id and return the driver's delete result."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the driver's delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})

2253 

2254 

class CustomViewCreation(GenericMongo):
    """Fields supplied when creating a custom view (``custom_views`` collection)."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    # Layout description of the view; element structure is not constrained here.
    configuration: list

2260 

2261 

class CustomView(CustomViewCreation):
    """Stored custom view: the creation fields plus its id and owning user."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

2293 

2294 

# Update model derived from CustomView.
# NOTE(review): presumably the same fields made optional -- confirm in create_update_model.
CustomViewUpdate = create_update_model(CustomView)

2296 

2297 

class Video(GenericMongo):
    """Camera/video source stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are fetched, so credentials are never loaded here.
        """
        visible_fields = {"name": 1, "ip_addr": 1}
        documents = cls.collection().find({}, visible_fields).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in documents]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with the given id, or ``None`` if it does not exist."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name

2321 

2322 

class Command(GenericMongo):
    """Log entry of a command sent by a user, stored in the ``commands`` time series."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, created as a time series if missing."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store ``command`` with a UTC ``timestamp`` derived from ``sent_at``.

        :return: ``{"command_id": <new id>}``
        """
        copied_fields = ("sent_at", "response_time", "command_type", "description", "succeeded", "user_id")
        stamped_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            **{field_name: getattr(command, field_name) for field_name in copied_fields},
        )
        insertion = cls.collection().insert_one(stamped_command.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"command_id": str(insertion.inserted_id)}

    def receive_response(self, response: dict):
        """Record the outcome of the command from the service ``response``.

        The command succeeded only if ``response["error"]`` is exactly ``False``.
        The response message is used as description only when none was set.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            self.description = response.get("message", "").rstrip()

2362 

2363 

class SignalsPresetCreation(GenericMongo):
    """Fields supplied when creating a signals preset: a name and the signal ids it groups."""

    name: str
    signal_ids: list[str]

2367 

2368 

class SignalsPreset(SignalsPresetCreation):
    """Signals preset owned by a user, stored in ``signals_presets``."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store ``signals_preset`` for ``user_id`` and return the new document id as a string."""
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        document = owned_preset.model_dump(exclude={"id"})
        insertion = cls.collection().insert_one(document)
        return str(insertion.inserted_id)

2385 

2386 

# Update model derived from SignalsPreset.
# NOTE(review): presumably the same fields made optional -- confirm in create_update_model.
SignalsPresetUpdate = create_update_model(SignalsPreset)

2388 

2389 

class LineStyle(str, Enum):
    """Line styles available for graph curves; members are also plain strings."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

2394 

2395 

class SignalAppearance:
    """Plain annotation holder for the colors of a signal's value and forced-value curves."""

    value_color: str
    forced_value_color: str

2399 

2400 

class GraphThemeCreation(GenericMongo):
    """Fields supplied when creating a graph theme for one signal (``graph_themes`` collection)."""

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    # Colors default to an empty string.
    value_color: str = ""
    forced_value_color: str = ""
    # Both curves are drawn solid unless stated otherwise.
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless explicitly shared.
    private: bool = True

2411 

2412 

class PublicGraphTheme(GraphThemeCreation):
    """Graph theme as seen by one user.

    The stored ``creator_id`` / ``in_library`` / ``active`` fields are replaced by
    per-user booleans through the aggregation steps of ``custom_pipeline_steps``.

    NOTE(review): the current user id is kept on the class (``_current_user_id``)
    and overwritten by every query helper below, so concurrent requests for
    different users share this state -- confirm this is safe for the deployment.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Aggregation stages, keyed by field name, evaluated for the current user."""
        user_id = cls._current_user_id

        def add_fields(**fields) -> list:
            return [{"$addFields": fields}]

        return {
            "created_by_user": add_fields(created_by_user={"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user is not allowed to see (i.e. neither
            # his own nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_fields(in_user_library={"$in": [user_id, "$in_library"]}),
            "active_for_user": add_fields(active_for_user={"$in": [user_id, "$active"]}),
            # The raw per-user lists and the creator id are stripped from the output.
            "in_library": add_fields(in_library="$$REMOVE"),
            "active": add_fields(active="$$REMOVE"),
            "creator_id": add_fields(creator_id="$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the generic query on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` with ``query.in_user_library`` forced to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Load one theme by document id on behalf of ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Generic attribute lookup on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Generic single-item attribute lookup on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Generic full listing on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return colors and line styles of the theme active for ``user_id``, per signal.

        When several active themes match a signal, the first document of its
        group is used.

        :return: ``{signal_id: {"value_color", "forced_value_color",
            "value_line_style", "forced_value_line_style"}}``
        :rtype: dict
        """
        active_for_requested_signals = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        first_theme_per_signal = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        unwrap_theme = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        appearance_only = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        cursor = cls.collection().aggregate(
            [active_for_requested_signals, first_theme_per_signal, unwrap_theme, appearance_only]
        )
        # The signal id becomes the key and is removed from the appearance itself.
        return {document.pop("signal_id"): document for document in cursor}

2525 

2526 

# Update model derived from PublicGraphTheme.
# NOTE(review): presumably the same fields made optional -- confirm in create_update_model.
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2528 

2529 

class PrivateGraphTheme(GraphThemeCreation):
    """Graph theme as stored, including its creator and the per-user membership lists."""

    # private
    creator_id: str
    # Ids of the users having this theme in their library.
    in_library: list[str]
    # Ids of the users for whom this theme is active.
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Store a new theme, in the creator's library and active for the creator.

        :return: the created theme, with ``id`` set to the new document id
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The per-user flags ``in_user_library`` and ``active_for_user`` are
        translated into membership of the stored ``in_library`` / ``active``
        lists. These two keys and ``created_by_user`` are derived values, so they
        are always removed from ``update_dict`` (which is modified in place) and
        never written to the document, even when their value is ``None``.

        :param update_dict: fields to set, possibly including the per-user flags
        :type update_dict: dict
        :param user_id: user whose library / active membership is being changed
        :type user_id: str
        :return: an empty dict
        :rtype: dict
        """
        in_user_lib = update_dict.pop("in_user_library", None)
        if in_user_lib is not None:
            if in_user_lib and user_id not in self.in_library:
                self.in_library.append(user_id)
            elif not in_user_lib and user_id in self.in_library:
                self.in_library.remove(user_id)
            update_dict["in_library"] = self.in_library

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            if active_for_user and user_id not in self.active:
                self.active.append(user_id)
            elif not active_for_user and user_id in self.active:
                self.active.remove(user_id)
            update_dict["active"] = self.active

        update_dict.pop("created_by_user", None)

        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
        )

        return {}