Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1425 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-21 08:45 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25from twinpad_backend.db import ( 

26 get_collection, 

27 get_async_collection, 

28 get_signal_collection, 

29 get_signal_collections_batch, 

30 systems_database, 

31 systems_async_database, 

32 signals_database, 

33 signals_async_database, 

34 devices_states_database, 

35) 

36from twinpad_backend.responses import ListResponse 

37from twinpad_backend.messages import send_mode_change, send_signal_value 

38from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm 

39 

# Maps a type name (string) to the Python type used for it; "epoch" maps to float.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Names of the known post-processing functions, split into three Literal groups.
# NOTE(review): the SINGLE/DOUBLE/MULTIPLE prefixes presumably refer to the number of
# input signals each function takes — confirm against the post-processing code.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]


# Host names of companion services, each overridable through the environment variable
# of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared against "now - last sample time" when reading signal data, so it is a duration
# on the same scale as time.time() (seconds). Overridable through DEVICE_TIMEOUT.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
# NOTE(review): not used in the visible part of the file; meaning to be confirmed.
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Messages are sent through the "uvicorn.error" logger rather than a module-named one.
logger = logging.getLogger("uvicorn.error")

56 

57 

class classproperty:
    """
    Read-only descriptor that exposes a function as a property of the class itself.

    Stands in for stacking ``@classmethod`` on ``@property``, which was deprecated in
    python 3.11 and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # Function that receives the owning class and returns the property value.
        self.fget = func

    def __get__(self, instance, owner):
        # The instance (None on class access) is ignored: the getter always gets the class.
        getter = self.fget
        return getter(owner)

69 

70 

def create_update_model(model):
    """
    Build the "<ModelName>Update" pydantic model derived from *model*.

    Every field of *model* except ``id`` is copied with its annotation widened to accept
    ``None`` and with ``None`` as default, so that all fields become optional.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    update_model_name = model.__name__ + "Update"
    return create_model(update_model_name, **optional_fields)

80 

81 

def get_utc_date_from_timestamp(timestamp: float):
    """
    Return *timestamp* (seconds since the epoch) as an ISO 8601 string expressed in UTC.

    The conversion is done explicitly in UTC so the result does not depend on the time zone
    of the host. The offset is stripped before formatting, so the string keeps the naive
    ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` shape (no ``+00:00`` suffix).
    """
    utc_date = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_date.replace(tzinfo=None).isoformat()

84 

85 

def downsample_list(time_vector: list, values: list, max_number_samples: int) -> tuple[list, list]:
    """
    Downsample a series to about *max_number_samples* points using LTTB.

    When the series has fewer than *max_number_samples* points, the input lists are
    returned as-is (same objects). Otherwise the function works on deep copies:

    * ``None`` values are removed before calling LTTB. For each run of consecutive ``None``
      values only the first and the last timestamp of the run are remembered.
    * LTTB is asked for ``max_number_samples`` minus the number of remembered ``None``
      timestamps, and the remembered timestamps are then re-inserted with ``None`` values.
    * If building the array or the LTTB call raises ``ValueError``, the series is instead
      linearly interpolated on *max_number_samples* evenly spaced timestamps, with NaN
      results converted back to ``None``.

    Returns a ``(time_vector, values)`` tuple of lists.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    time_vector_copy = copy.deepcopy(time_vector)
    values_copy = copy.deepcopy(values)

    # One entry per run of consecutive None values: [first_timestamp] or [first_timestamp, last_timestamp]
    none_group_bounds = []
    none_group_index = -1
    # Position of the previously removed None. Since removing an item shifts the following
    # ones left, finding the next None at the same position means it belongs to the same run.
    index = -1
    # LTTB doesn't handle None values so remove them
    while values_copy.count(None) > 0:
        # Store bounds of None value groups so we can insert them back after the downsampling
        if (new_index := values_copy.index(None)) != index:
            # First None of a new run
            none_group_bounds.append([time_vector_copy.pop(new_index)])
            none_group_index += 1
        elif len(none_group_bounds[none_group_index]) < 2:
            # Second None of the current run: becomes its provisional end bound
            none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
        else:
            # Further None of the current run: only moves the end bound
            none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
        values_copy.pop(new_index)
        index = new_index
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        # Two-column array (timestamp, value), one row per remaining sample
        values_array = npy.array([time_vector_copy, values_copy]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        # (done on the original inputs, where the float64 conversion turns None into NaN)
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values

130 

131 

def is_of_type(value, wanted_type):
    """
    Tell whether *value* is an instance of *wanted_type*.

    When *wanted_type* is ``float``, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

136 

137 

138# Models 

class TwinPadModel(BaseModel):
    """Common pydantic base class providing dict <-> object conversion helpers."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate *dict_* against the model and return the resulting instance."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model as a dict, leaving out the fields listed in *exclude* (if any)."""
        return self.model_dump(exclude=exclude)

147 

148 

class GenericMongo(TwinPadModel):
    """
    Base class for models stored as documents in a collection of the systems database.

    The collection is resolved through the ``collection_name`` class attribute, which this
    class reads but does not define. ``custom_pipeline_steps`` maps a property name to a
    list of aggregation stages: the read methods insert those stages before the ``$match``
    or ``$sort`` that uses the property, and append the stages of all other properties at
    the end of the pipeline.
    """

    # String form of the MongoDB ``_id``; None until the document has been inserted.
    id: str | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of this model, requesting its creation if needed."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated query and wrap the result in a ListResponse.

        *query* must provide ``mongodb_filter()``, ``sort_by`` (comma-separated
        ``field[:order]`` items), ``offset`` and ``limit`` (None or 0 meaning no limit).
        The stages of a custom property are added to the pipeline at most once, even if
        the property appears in several ``$and`` clauses or is both filtered and sorted on.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field = sort
                sort_order = 1
            sort_dict[sort_field] = sort_order

        collection = cls.collection()
        # NOTE(review): the total is counted on the raw collection, without the custom
        # pipeline steps — confirm this is right when filtering on a custom property.
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Insert the stages of a custom property, unless they are already in the pipeline
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        # Stages needed by the filters go before the $match
        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for filter_clause in filter_clauses:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in filter_clause:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        # Stages needed by the sort go before the $sort
        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only run on the documents of the requested page
        for custom_property in cls.custom_pipeline_steps:
            add_custom_steps(custom_property)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``ObjectId(item_id)``, or None if there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document; *mongo_dict* is modified in place."""
        # The ObjectId stored under "_id" is exposed as a string under "id"
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item whose attribute equals the given value, or None."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted on *sort_by* in ascending order."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """
        Count the documents whose ``post_processing`` field is False or missing.

        Returns 0 when the collection lookup returns None (it is not created here).
        """
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert this object (without its ``id``) and return the new id, also stored on self."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply *update_dict* to this object and ``$set`` it on the stored document; returns self."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; returns True if a document was actually deleted."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

293 

294 

class User(GenericMongo):
    """User account stored in the "users" collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user as a dict; the password is left out unless *exclude* is given."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <new id as str>}``.

        When no user exists yet, the created user is made admin regardless of *is_admin*.
        The password is stored as received.
        """
        users = cls.get_all()
        if not users:
            # The very first user must be able to administrate the others
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used instead of to_dict so that the password is part of the document
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Apply the non-None fields of *user* to the stored user *user_id*.

        Returns the updated user, built without the stored ``is_connected`` value (so the
        field takes its default), or None when no user has this id.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # No document matched: nothing was updated
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


UserUpdate = create_update_model(User)

337 

338 

class Mode(TwinPadModel):
    """Description of one mode of a device, as listed in ``Device.modes``."""

    mode_id: int
    name: str
    # NOTE(review): frequency semantics (what is multiplied, unit of min_frequency) are not
    # visible in this part of the file — confirm against the code using them.
    frequency_multiplier: float
    min_frequency: float

344 

345 

class DeviceUpdate(TwinPadModel):
    """Update payload holding only a mode id (presumably what ``Device.change_mode`` receives — confirm)."""

    mode_id: int

348 

349 

class Device(GenericMongo):
    """Device stored in the "devices" collection, with its modes and last known state."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to ``update_dict.mode_id`` and record the attempt as a Command.

        A mode id below 0 or above the number of modes is rejected without contacting the
        device: the command is recorded as failed and an error dict (status code 400) is
        returned. Otherwise the response of ``send_mode_change`` is returned.
        """
        # NOTE(review): mode names are read at index ``mode_id - 1``, yet a mode id of 0 is
        # accepted here (and then reads the last mode's name) — confirm whether 0 is valid.
        has_error = not (0 <= update_dict.mode_id <= len(self.modes))

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is None:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"

        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if not has_error:
            response = await send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)
        else:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning *signal_ids*, each device being queried only once.

        The device id is the part of a signal id before its first ".". Returns a dict
        mapping device ids to the result of the lookup (None when no device matches).
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id, *_ = signal_id.split(".")
            if device_id in devices_by_id:
                continue
            devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

401 

402 

class DeviceSetup(GenericMongo):
    """Setup grouping several devices, stored in the "device_setups" collection."""

    collection_name: ClassVar[str] = "device_setups"

    # Ids of the devices that are part of the setup
    device_ids: list[str]
    active: bool = False
    # NOTE(review): string-to-string mapping; which side is the variable and which the
    # target is not visible in this part of the file — confirm.
    variable_mapping: dict[str, str]


# Same fields as DeviceSetup (except "id"), all optional with a None default
DeviceSetupUpdate = create_update_model(DeviceSetup)

412 

413 

class DeviceState(GenericMongo):
    """State of a device at a given time, read from the per-device states collections."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        Return the states of *device_id* matching *query*, sorted and paginated.

        States are read from the collection named *device_id* in the devices states
        database; when that collection does not exist the response is empty. *query* must
        provide ``mongodb_filter()``, ``sort_by`` (a single ``field[:order]`` item),
        ``offset`` and ``limit`` (None or 0 meaning no limit).
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            # Only a real cursor can be limited, the empty fallback list cannot
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode"),
                load=item_dict.get("load"),
                # Missing lists default to an empty list. Field(...) is only meaningful in a
                # class body; passed as a value it would be rejected by validation.
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

453 

454 

class SignalSample(TwinPadModel):
    """Single sample of a signal: timestamp, value and optional forced value."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest stored sample of *signal_id*, or None if there is no data."""

        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information: sort the whole collection instead
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of *signal_ids* (None where there is no data)."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent stored sample of *signal_id*, or None if there is no data.

        The returned timestamp is the latest of the sample timestamp and the ``last_ping``
        of the owning device, when the device is known and has one. *device* avoids the
        device lookup when the caller already fetched it.
        """

        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same workaround as in get_first_from_signal_id, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            # The device id is the part of the signal id before the first "."
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample of *signal_id* not older than *min_timestamp*, or None."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, in the order of *signal_ids*."""
        # Devices are fetched once per device id rather than once per signal
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, its most recent sample not older than *min_timestamp*."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]

563 

564 

565class SignalData(TwinPadModel): 

566 signal_id: str 

567 forcible: bool = True 

568 time_vector: list[float] 

569 values: list[float | int | str | None] 

570 forced_values: list[float | int | str | None] 

571 

572 data_start: float | None = None 

573 data_end: float | None = None 

574 

575 number_samples: int = 0 

576 number_samples_db: int = 0 

577 

578 db_query_time: float = 0.0 

579 init_time: float = 0.0 

580 data_processing_time: float = 0.0 

581 

582 phase_id: str | None = None 

583 

@classmethod
def get_from_signal_id(
    cls,
    signal_id: str,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
    max_documents: int | None = None,
) -> Self:
    """
    Load the samples of *signal_id* from its collection.

    * *min_timestamp* / *max_timestamp* restrict the query on the ``timestamp`` field.
    * When *max_documents* is given and more documents match, only the first document of
      each group of consecutive documents is kept, the group size being
      ``ceil(matching documents / max_documents)``.
    * When *interpolate_bounds* is set and both window timestamps are given, samples may be
      added at the window bounds (see ``interpolate_bounds``).
    * When the query is not bounded in the past (no *max_timestamp*, or one that is not
      before now) and the last sample is older than DEVICE_TIMEOUT, the last value is
      repeated at the current time.

    Returns an empty ``cls`` instance when the collection or the Signal does not exist,
    otherwise an instance of the signal's ``signal_data_class``.
    """

    now = time.time()

    req_signal = {}
    if min_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
    if max_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

    collection = get_signal_collection(signal_id)
    if collection is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )

    # Measures the count query and the aggregate call; reading the cursor is part of init_time
    db_req_start = time.time()

    sort_step = {"$sort": {"precise_timestamp": 1}}
    number_results = collection.count_documents(req_signal)

    pipeline = []
    if req_signal:
        pipeline.append({"$match": req_signal})  # Filter data if needed

    pipeline.extend(
        [
            {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
            sort_step,
        ]
    )

    if max_documents is not None and max_documents < number_results:
        unsampling_ratio = math.ceil(number_results / max_documents)
        logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
        # Number the sorted documents, group them by floor(number / ratio) and keep the
        # first document of each group
        pipeline.extend(
            [
                {
                    "$setWindowFields": {
                        "sortBy": {"precise_timestamp": 1},
                        "output": {"index": {"$documentNumber": {}}},
                    }
                },
                {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                {"$replaceRoot": {"newRoot": "$doc"}},
                {"$unset": ["index", "group_id"]},
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

    # logger.info(f"pipeline: %s", str(pipeline))
    cursor = collection.aggregate(pipeline)
    db_req_time = time.time() - db_req_start

    init_time = time.time()

    results = cursor.to_list()
    time_vector = []
    values = []
    forced_values = []
    for s in results:
        time_vector.append(s["precise_timestamp"])
        values.append(s.get("value", None))
        forced_values.append(s.get("forced_value", None))

    signal = Signal.get_from_signal_id(signal_id)
    if signal is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )
    # Class to instantiate for this signal, provided by the Signal itself
    class_ = signal.signal_data_class

    if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
        time_vector, values, forced_values = cls.interpolate_bounds(
            class_,
            collection,
            signal_id,
            time_vector,
            values,
            forced_values,
            window_min_timestamp,
            window_max_timestamp,
        )

    if values:
        # TODO: check below. a bit strange
        if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
            # Adding last value as it should be repeated
            time_vector.append(now)
            values.append(values[-1])
            forced_values.append(forced_values[-1])

    init_time = time.time() - init_time

    # The internal "system.buckets" collection of the time series is used to find the
    # timestamps of the oldest and most recent stored samples without sorting all the data
    bucket = get_signal_collection(f"system.buckets.{signal_id}")
    first_bucket = None
    if bucket is not None:
        first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
    if first_bucket is not None:
        data_start = first_bucket["control"]["min"]["precise_timestamp"]
    else:
        data_start = None

    last_bucket = None
    if bucket is not None:
        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
    if last_bucket is not None:
        data_end = last_bucket["control"]["max"]["precise_timestamp"]
    else:
        data_end = None

    return class_(
        signal_id=signal_id,
        forcible=signal.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        data_start=data_start,
        data_end=data_end,
        number_samples=len(values),
        number_samples_db=number_results,
        db_query_time=db_req_time,
        init_time=init_time,
    )

722 

723 @staticmethod 

724 def interpolate_bounds( 

725 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

726 ): 

727 sample_right = None 

728 # Fetching right side value & interpolation 

729 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

730 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

731 sample_right = collection.find_one( 

732 { 

733 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

734 "value": {"$exists": True}, 

735 }, 

736 sort={"precise_timestamp": -1}, 

737 ) 

738 if sample_right: 

739 if time_vector: 

740 right_sd = class_( 

741 signal_id=signal_id, 

742 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

743 values=[values[-1], sample_right.get("value", None)], 

744 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

745 ) 

746 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

747 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

748 else: 

749 max_ts_value = sample_right.get("value", None) 

750 max_ts_forced_value = sample_right.get("forced_value", None) 

751 time_vector.append(window_max_timestamp) 

752 values.append(max_ts_value) 

753 forced_values.append(max_ts_forced_value) 

754 

755 # Fetching left side value & interpolation 

756 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

757 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

758 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

759 sample_left = sample_right 

760 sample_left = collection.find_one( 

761 { 

762 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

763 "value": {"$exists": True}, 

764 }, 

765 sort={"precise_timestamp": -1}, 

766 ) 

767 

768 if sample_left: 

769 if time_vector: 

770 left_sd = class_( 

771 signal_id=signal_id, 

772 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

773 values=[sample_left["value"], values[0]], 

774 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

775 ) 

776 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

777 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

778 else: 

779 min_ts_value = sample_left.get("value", None) 

780 min_ts_forced_value = sample_left.get("forced_value", None) 

781 time_vector.insert(0, window_min_timestamp) 

782 values.insert(0, min_ts_value) 

783 forced_values.insert(0, min_ts_forced_value) 

784 

785 return time_vector, values, forced_values 

786 

def interpolate_values(self, new_time_vector: list[float]):
    """Return ``self.values`` interpolated on *new_time_vector* (delegates to ``self.interpolate``)."""
    return self.interpolate(new_time_vector, self.values)

789 

def interpolate_forced_values(self, new_time_vector: list[float]):
    """Return ``self.forced_values`` interpolated on *new_time_vector* (delegates to ``self.interpolate``)."""
    return self.interpolate(new_time_vector, self.forced_values)

792 

def uniform_desampling(self, number_samples_max: int) -> Self:
    """
    Return a copy of the data holding at most *number_samples_max* samples.

    When there are more samples than allowed, values and forced values are interpolated
    on *number_samples_max* evenly spaced timestamps starting at the first timestamp (the
    last timestamp itself is not part of the new time vector). Otherwise the data is
    copied unchanged. A falsy *number_samples_max* disables the resampling.

    The time spent here is added to ``data_processing_time`` of the returned object, and
    the ``forcible`` flag is carried over.
    """
    data_processing_time = time.time()
    if number_samples_max and self.number_samples > number_samples_max:
        time_vector = npy.linspace(
            self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
        ).tolist()
        values = self.interpolate_values(time_vector)
        forced_values = self.interpolate_forced_values(time_vector)
        number_samples = len(time_vector)
    else:
        time_vector = self.time_vector
        number_samples = len(self.values)
        values = self.values[:]
        forced_values = self.forced_values[:]
    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        # Without this the copy would silently fall back to the default (True)
        forcible=self.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        # NOTE(review): this is the sample count before resampling, not self.number_samples_db
        # — confirm which one is intended.
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
        phase_id=self.phase_id,
    )

824 

def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Downsample to at most *number_samples_max* samples; here this is plain uniform desampling."""
    return self.uniform_desampling(number_samples_max)

827 

828 def interest_window_desampling( 

829 self, 

830 window_max_number_samples: int, 

831 outside_max_number_samples: int, 

832 window_min_timestamp: float | None = None, 

833 window_max_timestamp: float | None = None, 

834 ) -> Self: 

835 """Performs a sampling in a window of interest and outside.""" 

836 

837 if not self.time_vector: 

838 return self 

839 

840 if window_min_timestamp is None: 

841 window_min_timestamp = self.time_vector[0] 

842 if window_max_timestamp is None: 

843 window_max_timestamp = self.time_vector[-1] 

844 

845 data_processing_time = time.time() 

846 

847 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

848 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

849 

850 time_vector_before = self.time_vector[:index_window_start] 

851 time_vector_window = self.time_vector[index_window_start:index_window_end] 

852 time_vector_after = self.time_vector[index_window_end:] 

853 

854 # Resampling window 

855 if time_vector_window: 

856 # Ensurring window bounds 

857 if time_vector_window[0] != window_min_timestamp: 

858 time_vector_window.insert(0, window_min_timestamp) 

859 if time_vector_window[-1] != window_max_timestamp: 

860 time_vector_window.append(window_max_timestamp) 

861 else: 

862 time_vector_window = [window_min_timestamp, window_max_timestamp] 

863 

864 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

865 # Resampling 

866 new_window_time_vector = npy.linspace( 

867 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

868 ).tolist() 

869 time_vector_window = new_window_time_vector 

870 

871 # Resampling outside 

872 number_samples_before = len(time_vector_before) 

873 number_samples_after = len(time_vector_after) 

874 if ( 

875 outside_max_number_samples is not None 

876 and (number_samples_before + number_samples_after) > outside_max_number_samples 

877 ): 

878 new_number_samples_before = min( 

879 number_samples_before, 

880 math.ceil( 

881 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

882 ), 

883 ) 

884 new_number_samples_after = min( 

885 number_samples_after, 

886 math.ceil( 

887 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

888 ), 

889 ) 

890 # Adjusting numbers as math.ceil can do +1 on sum 

891 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

892 if new_number_samples_before > new_number_samples_after: 

893 new_number_samples_before -= 1 

894 else: 

895 new_number_samples_after -= 1 

896 

897 if new_number_samples_before > 0: 

898 new_time_vector_before = npy.linspace( 

899 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

900 ).tolist() 

901 time_vector_before = new_time_vector_before 

902 

903 if new_number_samples_after > 0: 

904 new_time_vector_after = npy.linspace( 

905 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

906 ).tolist()[::-1] 

907 time_vector_after = new_time_vector_after 

908 

909 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

910 values = self.interpolate_values(new_time_vector) 

911 forced_values = self.interpolate_forced_values(new_time_vector) 

912 number_samples = len(values) 

913 

914 data_processing_time = time.time() - data_processing_time 

915 

916 return self.__class__( 

917 signal_id=self.signal_id, 

918 forcible=self.forcible, 

919 time_vector=new_time_vector, 

920 values=values, 

921 forced_values=forced_values, 

922 number_samples=number_samples, 

923 number_samples_db=self.number_samples, 

924 data_start=self.data_start, 

925 data_end=self.data_end, 

926 db_query_time=self.db_query_time, 

927 init_time=self.init_time, 

928 data_processing_time=self.data_processing_time + data_processing_time, 

929 ) 

930 

def zero_time_vector(self, data_start: float):
    """Return a copy whose timestamps are expressed relative to ``data_start``.

    Args:
        data_start: Reference timestamp subtracted from every entry of the
            time vector.

    Returns:
        ``self`` when the time vector is empty, otherwise a new instance of the
        same class with shifted timestamps; ``data_start`` / ``data_end`` of the
        result are the first and last shifted timestamps.
    """
    data_processing_time = time.time()
    if len(self.time_vector) == 0:
        return self
    # Hand plain Python floats to the constructor rather than a numpy array.
    time_vector = (npy.array(self.time_vector) - data_start).tolist()
    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        # Shifting the time axis must not lose the signal's metadata.
        forcible=self.forcible,
        time_vector=time_vector,
        values=self.values,
        forced_values=self.forced_values,
        number_samples=self.number_samples,
        number_samples_db=self.number_samples_db,
        data_start=time_vector[0],
        data_end=time_vector[-1],
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
        phase_id=self.phase_id,
    )

951 

def csv_export(self):
    """Serialise the samples as UTF-8 encoded CSV bytes.

    The output starts with a ``timestamp,value,forced_value`` header followed
    by one row per sample; timestamps are rendered with
    ``get_utc_date_from_timestamp``.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), value, forced_value]
        for timestamp, value, forced_value in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")

959 

def prestoplot_export(self):
    """Serialise the samples as a UTF-8 encoded, tab-separated PrestoPlot file.

    Dots in the signal id are replaced by underscores and an underscore is
    prepended when the id starts with a digit. Missing values are written as
    the literal ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = f"_{column_name}"

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    for timestamp, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
        value_text = "none" if value is None else value
        forced_value_text = "none" if forced_value is None else forced_value
        lines.append(f"{get_utc_date_from_timestamp(timestamp)}\t{value_text}\t{forced_value_text}\n")

    return "".join(lines).encode("utf-8")

976 

977 

class NumericSignalData(SignalData):
    """Signal data whose samples are numbers.

    ``None`` entries are allowed in both value lists; ``interpolate`` treats
    them as NaN.
    """

    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

982 

def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at new timestamps.

    ``None`` samples are treated as NaN for the interpolation, and any NaN in
    the result is reported back as ``None``.
    """
    samples = [npy.nan if item is None else item for item in items]
    interpolated = npy.interp(new_time_vector, self.time_vector, samples)
    result = []
    for sample in interpolated:
        result.append(None if npy.isnan(sample) else sample)
    return result

986 

987 def uniform_desampling(self, number_samples_max: int) -> Self: 

988 data_processing_time = time.time() 

989 if number_samples_max and self.number_samples > number_samples_max: 

990 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

991 forced_values = self.interpolate_forced_values(time_vector) 

992 number_samples = len(time_vector) 

993 else: 

994 time_vector = self.time_vector 

995 number_samples = len(self.values) 

996 values = self.values[:] 

997 forced_values = self.forced_values[:] 

998 data_processing_time = time.time() - data_processing_time 

999 

1000 return self.__class__( 

1001 signal_id=self.signal_id, 

1002 time_vector=time_vector, 

1003 values=values, 

1004 forced_values=forced_values, 

1005 number_samples=number_samples, 

1006 number_samples_db=self.number_samples, 

1007 data_start=self.data_start, 

1008 data_end=self.data_end, 

1009 db_query_time=self.db_query_time, 

1010 init_time=self.init_time, 

1011 data_processing_time=self.data_processing_time + data_processing_time, 

1012 ) 

1013 

1014 def min_max_downsampling(self, number_samples_max: int) -> Self: 

1015 if self.number_samples < number_samples_max: 

1016 return self 

1017 

1018 data_processing_time = time.time() 

1019 

1020 number_bins = number_samples_max // 2 

1021 

1022 time_vector = npy.array(self.time_vector, dtype=npy.float64) 

1023 values = npy.array(self.values, dtype=npy.float64) 

1024 forced_values = npy.array(self.forced_values, dtype=npy.float64) 

1025 

1026 points_per_bin = self.number_samples // number_bins 

1027 

1028 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above 

1029 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for 

1030 if self.number_samples - number_bins * points_per_bin > 1: 

1031 points_per_bin += 1 

1032 number_bins = self.number_samples // points_per_bin + 1 

1033 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan) 

1034 time_vector = npy.concatenate([time_vector, nan_points_to_add]) 

1035 values = npy.concatenate([values, nan_points_to_add]) 

1036 forced_values = npy.concatenate([forced_values, nan_points_to_add]) 

1037 

1038 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin) 

1039 values_matrix = values.reshape(number_bins, points_per_bin) 

1040 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin) 

1041 

1042 indexes_min = npy.zeros(number_bins, dtype="int64") 

1043 indexes_max = npy.zeros(number_bins, dtype="int64") 

1044 

1045 for row in range(number_bins): 

1046 min_value = values_matrix[row, 0] 

1047 max_value = values_matrix[row, 0] 

1048 for column in range(points_per_bin): 

1049 if values_matrix[row, column] < min_value: 

1050 min_value = values_matrix[row, column] 

1051 indexes_min[row] = column 

1052 elif values_matrix[row, column] > max_value: 

1053 max_value = values_matrix[row, column] 

1054 indexes_max[row] = column 

1055 

1056 row_index = npy.repeat(npy.arange(number_bins), 2) 

1057 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel() 

1058 

1059 data_processing_time = time.time() - data_processing_time 

1060 

1061 new_time_vector = timestamps_matrix[row_index, column_index] 

1062 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector) 

1063 new_values = values_matrix[row_index, column_index] 

1064 new_values = npy.where(npy.isnan(new_values), None, new_values) 

1065 new_forced_values = forced_values_matrix[row_index, column_index] 

1066 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values) 

1067 

1068 # Make sure there are no None values for the time vector 

1069 time_vector_filter = new_time_vector != None 

1070 new_time_vector = new_time_vector[time_vector_filter] 

1071 new_values = new_values[time_vector_filter] 

1072 new_forced_values = new_forced_values[time_vector_filter] 

1073 

1074 return self.__class__( 

1075 signal_id=self.signal_id, 

1076 time_vector=new_time_vector, 

1077 values=new_values, 

1078 forced_values=new_forced_values, 

1079 number_samples=number_bins * 2, 

1080 number_samples_db=self.number_samples_db, 

1081 data_start=self.data_start, 

1082 data_end=self.data_end, 

1083 db_query_time=self.db_query_time, 

1084 init_time=self.init_time, 

1085 data_processing_time=self.data_processing_time + data_processing_time, 

1086 phase_id=self.phase_id, 

1087 ) 

1088 

1089 def interest_window_desampling( 

1090 self, 

1091 window_max_number_samples: int, 

1092 outside_max_number_samples: int, 

1093 window_min_timestamp: float | None = None, 

1094 window_max_timestamp: float | None = None, 

1095 ) -> Self: 

1096 """Performs a sampling in a window of interest and outside.""" 

1097 

1098 if not self.time_vector: 

1099 return self 

1100 

1101 if window_min_timestamp is None: 

1102 window_min_timestamp = self.time_vector[0] 

1103 if window_max_timestamp is None: 

1104 window_max_timestamp = self.time_vector[-1] 

1105 

1106 data_processing_time = time.time() 

1107 

1108 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1109 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1110 

1111 time_vector_before = self.time_vector[:index_window_start] 

1112 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1113 time_vector_after = self.time_vector[index_window_end:] 

1114 

1115 values_before = self.values[:index_window_start] 

1116 values_window = self.values[index_window_start:index_window_end] 

1117 values_after = self.values[index_window_end:] 

1118 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1119 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1120 

1121 # Resampling window 

1122 if time_vector_window: 

1123 # Ensurring window bounds 

1124 if time_vector_window[0] != window_min_timestamp: 

1125 time_vector_window.insert(0, window_min_timestamp) 

1126 values_window.insert(0, window_min_value) 

1127 if time_vector_window[-1] != window_max_timestamp: 

1128 time_vector_window.append(window_max_timestamp) 

1129 values_window.append(window_max_value) 

1130 else: 

1131 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1132 values_window = [window_min_value, window_max_value] 

1133 

1134 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples: 

1135 # Resampling 

1136 time_vector_window, values_window = downsample_list( 

1137 time_vector_window, values_window, window_max_number_samples 

1138 ) 

1139 

1140 # Resampling outside 

1141 number_samples_before = len(time_vector_before) 

1142 number_samples_after = len(time_vector_after) 

1143 if ( 

1144 outside_max_number_samples is not None 

1145 and (number_samples_before + number_samples_after) > outside_max_number_samples 

1146 ): 

1147 new_number_samples_before = min( 

1148 number_samples_before, 

1149 math.ceil( 

1150 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1151 ), 

1152 ) 

1153 new_number_samples_after = min( 

1154 number_samples_after, 

1155 math.ceil( 

1156 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1157 ), 

1158 ) 

1159 # Adjusting numbers as math.ceil can do +1 on sum 

1160 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1161 if new_number_samples_before > new_number_samples_after: 

1162 new_number_samples_before -= 1 

1163 else: 

1164 new_number_samples_after -= 1 

1165 

1166 if new_number_samples_before > 0: 

1167 time_vector_before, values_before = downsample_list( 

1168 time_vector_before, values_before, new_number_samples_before 

1169 ) 

1170 

1171 if new_number_samples_after > 0: 

1172 time_vector_after, values_after = downsample_list( 

1173 time_vector_after, values_after, new_number_samples_after 

1174 ) 

1175 

1176 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1177 values = values_before + values_window + values_after 

1178 forced_values = self.interpolate_forced_values(new_time_vector) 

1179 number_samples = len(values) 

1180 

1181 data_processing_time = time.time() - data_processing_time 

1182 

1183 return self.__class__( 

1184 signal_id=self.signal_id, 

1185 time_vector=new_time_vector, 

1186 values=values, 

1187 forced_values=forced_values, 

1188 number_samples=number_samples, 

1189 number_samples_db=self.number_samples, 

1190 data_start=self.data_start, 

1191 data_end=self.data_end, 

1192 db_query_time=self.db_query_time, 

1193 init_time=self.init_time, 

1194 data_processing_time=self.data_processing_time + data_processing_time, 

1195 ) 

1196 

1197 

class StringSignalData(SignalData):
    """Signal data whose samples are strings (``None`` entries are allowed)."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each new timestamp, the item of the latest sample at or before it.

        Timestamps located before the first sample get the first item.
        """
        last_index = len(items) - 1
        # Index of the last original timestamp that is <= each new timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        held_items = []
        for position in npy.clip(positions, 0, last_index):
            held_items.append(items[position])
        return held_items

1209 

1210 

class SignalsData(TwinPadModel):
    """Set of per-signal data objects handled together."""

    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    # Processing time reported for the whole set (derived from ``time.time()`` measurements).
    data_processing_time: float
    # Overall time bounds of the set; None when unknown.
    data_start: float | None
    data_end: float | None

1216 

@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Load the data of several signals through ``SignalData.get_from_signal_id``.

    ``max_timestamp`` defaults to the current time. The resulting
    ``data_start`` / ``data_end`` are the earliest start and the latest end
    among the signals that define them (``None`` when no signal does), and the
    processing time is the sum of the per-signal processing times.
    """
    if max_timestamp is None:
        max_timestamp = time.time()

    loaded_signals = [
        SignalData.get_from_signal_id(
            signal_id=signal_id,
            min_timestamp=min_timestamp,
            max_timestamp=max_timestamp,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
            interpolate_bounds=interpolate_bounds,
            max_documents=max_documents,
        )
        for signal_id in signal_ids
    ]

    total_processing_time = 0.0
    for loaded_signal in loaded_signals:
        total_processing_time += loaded_signal.data_processing_time

    known_starts = [s.data_start for s in loaded_signals if s.data_start is not None]
    known_ends = [s.data_end for s in loaded_signals if s.data_end is not None]

    return cls(
        signals_data=loaded_signals,
        data_processing_time=total_processing_time,
        data_start=min(known_starts, default=None),
        data_end=max(known_ends, default=None),
    )

1263 

@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Load every requested signal for every phase.

    ``phases``, ``phase_sync_times``, ``window_min_timestamps`` and
    ``window_max_timestamps`` are iterated in lockstep. For each phase the
    query range is ``start_at / 1000`` .. ``end_at / 1000`` (presumably
    milliseconds converted to seconds — confirm), narrowed to the window plus
    a 5 % margin on each side when both window bounds are given. Signals
    without samples are skipped. When ``zero_time_vector`` is true the
    timestamps are made relative to the phase sync time (the phase start by
    default). Each returned signal is tagged with the id of its phase.
    """
    started_at = time.time()
    collected: list[SignalData] = []

    per_phase_arguments = zip(phases, phase_sync_times, window_min_timestamps, window_max_timestamps)
    for phase, sync_time, window_min_timestamp, window_max_timestamp in per_phase_arguments:
        min_timestamp = phase.start_at / 1000
        max_timestamp = phase.end_at / 1000

        if sync_time is None:
            sync_time = min_timestamp

        has_window = window_min_timestamp is not None and window_max_timestamp is not None
        if has_window:
            window_length = window_max_timestamp - window_min_timestamp

            if window_min_timestamp != min_timestamp:
                min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20)
            if window_max_timestamp != max_timestamp:
                max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20)

        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id,
                min_timestamp,
                max_timestamp,
                window_min_timestamp,
                window_max_timestamp,
                interpolate_bounds=False,
                max_documents=None,
            )

            if len(signal_data.time_vector) == 0:
                continue

            if zero_time_vector:
                signal_data = signal_data.zero_time_vector(sync_time)
            signal_data.phase_id = phase.id

            collected.append(signal_data)

    elapsed = time.time() - started_at
    return cls(
        signals_data=collected,
        data_processing_time=elapsed,
        data_start=0,
        data_end=0,
    )

1320 

def uniform_desampling(self, number_samples_max: int) -> Self:
    """Apply ``uniform_desampling`` to every signal and return the new set.

    The processing time of the result is the sum of the per-signal values.
    """
    resampled = []
    for signal_data in self.signals_data:
        resampled.append(signal_data.uniform_desampling(number_samples_max=number_samples_max))
    total_processing_time = sum(signal_data.data_processing_time for signal_data in resampled)
    return SignalsData(
        signals_data=resampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1329 

def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Apply ``min_max_downsampling`` to every signal and return the new set.

    The processing time of the result is the sum of the per-signal values.
    """
    resampled = []
    for signal_data in self.signals_data:
        resampled.append(signal_data.min_max_downsampling(number_samples_max=number_samples_max))
    total_processing_time = sum(signal_data.data_processing_time for signal_data in resampled)
    return SignalsData(
        signals_data=resampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1338 

def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
) -> Self:
    """Apply ``interest_window_desampling`` to every signal and return the new set.

    All arguments are forwarded unchanged; the processing time of the result
    is the sum of the per-signal values.
    """
    resampled = []
    for signal_data in self.signals_data:
        resampled.append(
            signal_data.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
        )

    total_processing_time = sum(signal_data.data_processing_time for signal_data in resampled)
    return SignalsData(
        signals_data=resampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )

1362 

def zero_time_vector(self, data_start: float):
    """Make the timestamps of every signal relative to ``data_start``.

    Args:
        data_start: Reference timestamp subtracted from every time vector.

    Returns:
        A new ``SignalsData`` starting at 0. Its ``data_end`` is the latest end
        among the shifted signals, or ``None`` when there is no signal or no
        signal defines an end (a signal without samples is returned unchanged
        by its own ``zero_time_vector`` and may have no ``data_end``).
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
    known_ends = [s.data_end for s in signals_data if s.data_end is not None]
    return SignalsData(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(known_ends, default=None),
    )

1371 

@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute (or fetch) a single-input post-processing signal for one phase.

    The result signal is named ``post_processing.<base id>.<function>``. When
    it has already been computed for ``phase`` it is read back through
    ``get_existing_function_signal``; otherwise it is computed from the base
    signal over the whole phase, scheduled for saving, and then restricted to
    the requested window.

    Returns:
        A ``SignalsData`` holding the result signal, or ``None`` when the base
        signal has no data for the phase.
    """
    signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"

    # Reuse the stored result when this phase has already been processed.
    processed_result_signal = Signal.get_from_signal_id(signal_id)
    if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    # Whole phase, absolute timestamps: the window is only applied after the computation.
    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )

    if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
        return None

    new_values = None
    new_forced_values = None
    time_vector = npy.array(signals_data.signals_data[0].time_vector)
    values = signals_data.signals_data[0].values
    forced_values = signals_data.signals_data[0].forced_values

    # NOTE(review): if no case matches, new_values / new_forced_values stay None and
    # are passed as-is to npy.isnan below — confirm callers validate `function`.
    match (function):
        case "Cumul":
            new_values = cumul(values)
            new_forced_values = cumul(forced_values)
        # case "CumulDistrib":
        #     new_values = cumul_distrib(values)
        #     new_forced_values = cumul_distrib(forced_values)
        case "Delta":
            new_values = delta(values)
            new_forced_values = delta(forced_values)
        case "DeltaT":
            # Applied to the timestamps, so values and forced values are the same array.
            new_values = delta(time_vector)
            new_forced_values = new_values
        case "Derive":
            new_values = derive(time_vector, values)
            new_forced_values = derive(time_vector, forced_values)
        case "Integ":
            new_values = integ(time_vector, values)
            new_forced_values = integ(time_vector, forced_values)

    # NaN results are reported as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The full-phase result is saved in the background, before the window is applied.
    # NOTE(review): the task returned by create_task is not kept; the asyncio
    # documentation recommends holding a reference so it cannot be garbage-collected
    # before completion.
    loop = asyncio.get_running_loop()
    loop.create_task(
        cls.save_function_signal(
            phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible
        )
    )

    # Restrict the returned data to the requested window.
    if window_max_timestamp is not None:
        max_timestamp_mask = time_vector <= window_max_timestamp
        time_vector = time_vector[max_timestamp_mask]
        new_values = new_values[max_timestamp_mask]
        new_forced_values = new_forced_values[max_timestamp_mask]
    if window_min_timestamp is not None:
        min_timestamp_mask = time_vector >= window_min_timestamp
        time_vector = time_vector[min_timestamp_mask]
        new_values = new_values[min_timestamp_mask]
        new_forced_values = new_forced_values[min_timestamp_mask]

    # The loaded base signal object is reused to carry the result.
    signals_data.signals_data[0].time_vector = time_vector.tolist()
    signals_data.signals_data[0].values = new_values.tolist()
    signals_data.signals_data[0].forced_values = new_forced_values.tolist()
    signals_data.signals_data[0].number_samples = time_vector.size

    signals_data.signals_data[0].signal_id = signal_id

    return signals_data

1449 

@classmethod
async def apply_multiple_function(
    cls,
    phases: list,
    signal_ids: list,
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute (or fetch) a post-processing signal built from several input signals.

    ``phases`` and ``signal_ids`` are paired element by element. The result is
    attached to the first phase, except for "Align-X" and "Using-X" where it is
    attached to the second one. When the result already exists for that phase
    it is read back through ``get_existing_function_signal``; otherwise it is
    computed over the whole phases, scheduled for saving, and then restricted
    to the requested window.

    Returns:
        A ``SignalsData`` holding the result signal, or ``None`` when an input
        has no data or the inputs do not have compatible lengths.
    """
    # Two-input functions are named "<a>.<function>.<b>", the others "<a>.<b>...<function>".
    if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
        function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
    else:
        function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"

    active_phase = phases[0]
    if function in {"Align-X", "Using-X"}:
        active_phase = phases[1]

    processed_result_signal = Signal.get_from_signal_id(function_signal_id)
    if processed_result_signal is not None and (
        active_phase.id in processed_result_signal.computed_phases_ids
    ):  # If signal has been computed for the correct phase
        return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)

    array_length = None
    time_vector_list = []
    values_list = []
    forced_values_list = []
    forcible = True
    for phase, signal_id in zip(phases, signal_ids):
        # Whole phase, absolute timestamps: the window is only applied after the computation.
        signals_data = cls.get_from_phase_and_signal_ids(
            [phase], [None], [signal_id], [None], [None], zero_time_vector=False
        )

        if len(signals_data.signals_data) == 0:
            return None

        signal_data = signals_data.signals_data[0]

        # Every input must have as many samples as the first one, except for "Align-X".
        if array_length is None:
            array_length = signal_data.number_samples
        if (
            array_length != signal_data.number_samples and function != "Align-X"
        ) or signal_data.number_samples == 0:
            return None

        time_vector_list.append(npy.array(signal_data.time_vector))
        # dtype=float64 turns None entries into NaN.
        values_list.append(npy.array(signal_data.values, dtype=npy.float64))
        forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
        # The result is forcible only when every input is.
        forcible = forcible and signal_data.forcible

    time_vector = time_vector_list[0]
    new_values = None
    new_forced_values = None

    # NOTE(review): "Atan2" and "Merge" are accepted by the type hints but have no case
    # here; new_values / new_forced_values then stay None and are passed as-is to
    # npy.isnan below — confirm callers never request them.
    match (function):
        case "Align-X":
            # Both time vectors are made relative to the start of their own phase before aligning.
            time_vector = time_vector_list[1]
            old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
            new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
            new_values = align_x(old_time_vector, values_list[0], new_time_vector)
            new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
        # case "Atan2":
        #     new_values = atan2(values_list[0], values_list[1])
        #     new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
        case "Using-X":
            # Values of the first signal on the time vector of the second one.
            if len(time_vector_list[0]) != len(time_vector_list[1]):
                return None
            time_vector = time_vector_list[1]
            new_values = values_list[0]
            new_forced_values = forced_values_list[0]
        case "Mean":
            new_values = mean(*values_list)
            new_forced_values = mean(*forced_values_list)
        case "Norm":
            new_values = norm(*values_list)
            new_forced_values = norm(*forced_values_list)

    # NaN results are reported as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The full result is saved in the background, before the window is applied.
    # NOTE(review): the task returned by create_task is not kept; the asyncio
    # documentation recommends holding a reference so it cannot be garbage-collected
    # before completion.
    loop = asyncio.get_running_loop()
    loop.create_task(
        cls.save_function_signal(
            active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
        )
    )

    total_number_samples = time_vector.size

    # Restrict the returned data to the requested window.
    if window_max_timestamp is not None:
        max_timestamp_mask = time_vector <= window_max_timestamp
        time_vector = time_vector[max_timestamp_mask]
        new_values = new_values[max_timestamp_mask]
        new_forced_values = new_forced_values[max_timestamp_mask]
    if window_min_timestamp is not None:
        min_timestamp_mask = time_vector >= window_min_timestamp
        time_vector = time_vector[min_timestamp_mask]
        new_values = new_values[min_timestamp_mask]
        new_forced_values = new_forced_values[min_timestamp_mask]

    signals_data = cls(
        signals_data=[
            NumericSignalData(
                signal_id=function_signal_id,
                forcible=forcible,
                time_vector=time_vector.tolist(),
                values=new_values.tolist(),
                forced_values=new_forced_values.tolist(),
                number_samples=time_vector.size,
                number_samples_db=total_number_samples,
            )
        ],
        data_processing_time=0,
        data_start=0,
        data_end=0,
    )

    return signals_data

1569 

@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Read an already computed post-processing signal back from its collection.

    Samples are filtered on ``precise_timestamp`` by whichever window bounds
    are given (both inclusive). ``number_samples_db`` is the total number of
    documents in the collection, regardless of the window, and the reported
    processing time only covers the fetch.
    """
    collection = get_signal_collection(signal_id, create=True)

    timestamp_range = {}
    if window_max_timestamp is not None:
        timestamp_range["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_range["$gte"] = window_min_timestamp
    pipeline = [{"$match": {"precise_timestamp": timestamp_range}}] if timestamp_range else []

    total_number_samples = collection.count_documents({})

    fetch_start = time.time()

    documents = collection.aggregate(pipeline).to_list()
    fetched_time_vector = [document["precise_timestamp"] for document in documents]
    fetched_values = [document["value"] for document in documents]
    fetched_forced_values = [document["forced_value"] for document in documents]

    result_signal = NumericSignalData(
        signal_id=signal_id,
        time_vector=fetched_time_vector,
        values=fetched_values,
        forced_values=fetched_forced_values,
        number_samples=len(fetched_time_vector),
        number_samples_db=total_number_samples,
    )
    return cls(
        signals_data=[result_signal],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )

1614 

@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Store a computed post-processing signal and register it for ``phase``.

    The samples previously stored within the time range of the phase are
    replaced, then the signal configuration is upserted and the phase id is
    recorded in ``computed_phases_ids``.

    Args:
        phase: Phase the signal was computed for (``start_at`` / ``end_at`` are
            divided by 1000 to build the timestamp range).
        function_signal_id: Identifier of the post-processing signal.
        time_vector: Timestamps of the samples.
        new_values: Computed values, one per timestamp.
        new_forced_values: Computed forced values, one per timestamp.
        forcible: Stored as the ``forcible`` flag of the signal configuration.

    Raises:
        ValueError: When the three vectors do not have the same length.
    """
    # Insert data first so if it is requested by another user, it will be computed again
    signal_collection = get_signal_collection(function_signal_id, create=True)
    signal_collection.delete_many(
        {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
    )
    # NOTE(review): fromtimestamp() without tz builds a naive datetime in the server's
    # local time zone — confirm this matches how the other signal documents are stored.
    signal_collection.insert_many(
        [
            {
                "timestamp": datetime.datetime.fromtimestamp(timestamp),
                "precise_timestamp": timestamp,
                "value": value,
                "forced_value": forced_value,
            }
            for timestamp, value, forced_value in zip(time_vector, new_values, new_forced_values, strict=True)
        ]
    )

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": {
                "description": "",
                "unit": None,
                "type": "sensor",
                "address": None,
                # NOTE(review): max(..., 1) avoids a division by 0 but also caps the
                # stored frequency at 1 — confirm this is intended for faster signals.
                "frequency": 1 / max(npy.mean(delta(time_vector)), 1),
                "transfer_function": None,
                "precision_digits": None,
                "digitization_function": None,
                "data_type": "float",
                "formula": None,
                "forcible": forcible,
                "commandable": False,
                "broadcastable": False,
                "signal_id": function_signal_id,
                "post_processing": True,
            },
            # $addToSet instead of $push: saving the same phase again (e.g. two
            # concurrent requests) must not register its id twice.
            "$addToSet": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )

1661 

1662 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []): 

1663 if post_processing: 

1664 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids} 

1665 zip_buffer = io.BytesIO() 

1666 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1667 for signal_data in self.signals_data: 

1668 file_name = signal_data.signal_id 

1669 if post_processing: 

1670 phase = phases_by_id.get( 

1671 signal_data.phase_id, Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="0") 

1672 ) 

1673 file_name = f"{signal_data.signal_id} ({phase.name})" 

1674 if file_format == "csv": 

1675 export_io = signal_data.csv_export() 

1676 zip_file.writestr(f"{file_name}.csv", export_io) 

1677 elif file_format == "prestoplot": 

1678 export_io = signal_data.prestoplot_export() 

1679 zip_file.writestr(f"{file_name}.tab", export_io) 

1680 else: 

1681 raise ValueError(f"Format not found. Got: {file_format}") 

1682 zip_bytes = zip_buffer.getvalue() 

1683 return zip_bytes 

1684 

def hdf5_export(self, post_processing: bool = False, phase_ids: list | None = None):
    """Write every entry of ``self.signals_data`` into an in-memory HDF5 file.

    Each signal gets its own group holding a single ``"data"`` dataset of
    records ``(date, timestamp, value, forced_value)``.

    :param post_processing: When true, groups are named
        ``"<signal_id> (<phase name>)"`` instead of ``"<signal_id>"``.
    :param phase_ids: IDs of the phases whose names may be needed; only read
        when ``post_processing`` is true. ``None`` means no phase.
    :return: The bytes of the HDF5 file.
    """
    # Record layouts: string signals store value/forced_value as fixed-width
    # byte strings, every other data type stores them as float64.
    custom_type_float = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    custom_type_string = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )

    phases_by_id = {}
    unknown_phase = None
    if post_processing:
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids or []}
        # Built once and shared by every signal whose phase cannot be resolved.
        unknown_phase = Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="0")

    hdf5_buffer = io.BytesIO()
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                # The lookup can also hold None (phase ID not found in the
                # database); fall back instead of failing on ``.name``.
                phase = phases_by_id.get(signal_data.phase_id)
                if phase is None:
                    phase = unknown_phase
                group_name = f"{signal_data.signal_id} ({phase.name})"
            signal_group = hdf5_file.create_group(group_name)

            date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
            signal_group["data"] = npy.array(
                list(zip(date_vector, signal_data.time_vector, signal_data.values, signal_data.forced_values)),
                dtype=record_type,
            )
    return hdf5_buffer.getvalue()

1731 

1732 

class SignalStatus(TwinPadModel):
    # Health snapshot attached to a signal.
    # A plain comment (not a docstring) is used so the model metadata stays as it was.

    # Current state; a freshly built status is "down".
    status: str = "down"
    # Free-text explanation for the state; empty when none is given.
    reason: str = ""
    # Delay associated with the status, None when unknown (unit not visible here).
    delay: float | None = None

1737 

1738 

class DigitizationFunction(TwinPadModel):
    # Parameters describing how a raw reading relates to a value range.
    # NOTE(review): presumably a linear raw -> physical mapping; confirm with the consumer.

    # Resolution in bits; optional.
    bits: int | None = None
    # Bounds of the converted value.
    min_value: float
    max_value: float
    # Bounds of the raw value.
    min_raw_value: float
    max_raw_value: float

1745 

1746 

class SignalUpdate(TwinPadModel):
    # Payload of a signal command: every field is optional and defaults to None.

    # New value for the signal.
    value: float | str | bool | int | None = None
    # New forced value for the signal.
    forced_value: float | str | bool | int | None = None
    # Timestamp attached to the update (unit not visible from this file section).
    timestamp: int | None = None

1751 

1752 

class SignalType(str, Enum):
    # Kinds of signal; members are also plain strings because of the ``str`` mixin.

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1757 

1758 

# Container class used for a signal's samples, keyed by the signal's data type name.
# Only "str" uses the string container; every other listed type is handled as numeric.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}

1766 

1767 

1768class Signal(GenericMongo): 

1769 collection_name: ClassVar[str] = "signals" 

1770 

1771 signal_id: str 

1772 frequency: float 

1773 unit: str | None 

1774 description: str 

1775 type: SignalType 

1776 data_type: str 

1777 precision_digits: int | None 

1778 forcible: bool 

1779 status: SignalStatus = SignalStatus() 

1780 

1781 post_processing: bool = False 

1782 computed_phases_ids: list[str] = [] 

1783 

1784 digitization_function: DigitizationFunction | None 

1785 

1786 @property 

1787 def device(self) -> Device: 

1788 device_id = self.signal_id.split(".")[0] 

1789 device = Device.get_one_by_attribute("device_id", device_id) 

1790 return device 

1791 

1792 @cached_property 

1793 def signal_data_class(self): 

1794 if self.data_type in SIGNALDATA_TYPES: 

1795 return SIGNALDATA_TYPES[self.data_type] 

1796 if self.data_type.startswith("enum"): 

1797 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1798 raise ValueError(f"Unhandled python type: {self.data_type}") 

1799 

1800 @cached_property 

1801 def python_type(self): 

1802 if self.data_type in TYPES: 

1803 return TYPES[self.data_type] 

1804 if self.data_type.startswith("enum"): 

1805 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1806 return Literal[*choices] 

1807 raise ValueError(f"Unhandled python type: {self.data_type}") 

1808 

1809 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1810 command = Command( 

1811 sent_at=time.time(), 

1812 command_type="Signal command", 

1813 user_id=current_user.id, 

1814 ) 

1815 

1816 has_input_error = False 

1817 error_message = "" 

1818 

1819 if self.data_type.startswith("enum"): 

1820 enum_options = get_args(self.python_type) 

1821 

1822 if update_dict.value is not None and update_dict.value not in enum_options: 

1823 has_input_error = True 

1824 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1825 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1826 has_input_error = True 

1827 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1828 else: 

1829 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1830 has_input_error = True 

1831 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1832 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1833 has_input_error = True 

1834 error_message += ( 

1835 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1836 ) 

1837 

1838 if has_input_error: 

1839 command.response_time = 0 

1840 command.succeeded = False 

1841 command.description = f"Tried to modify signal {self.signal_id}" 

1842 response = {"error": True, "status_code": 400, "message": error_message} 

1843 else: 

1844 response = await send_signal_value(self.signal_id, update_dict) 

1845 command.receive_response(response) 

1846 

1847 Command.create(command) 

1848 return response 

1849 

1850 @classmethod 

1851 def get_from_signal_id(cls, signal_id) -> Self: 

1852 """Could be generic from mongo""" 

1853 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1854 if not raw_value: 

1855 return None 

1856 del raw_value["_id"] 

1857 return cls.dict_to_object(raw_value) 

1858 

1859 @classmethod 

1860 def get_all_ids(cls) -> list[str]: 

1861 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1862 

1863 return [signal["signal_id"] for signal in cursor] 

1864 

1865 @classmethod 

1866 def get_all_statuses(cls) -> list[dict]: 

1867 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1868 

1869 return [ 

1870 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1871 for signal in cursor 

1872 ] 

1873 

1874 async def number_samples(self): 

1875 collection = get_signal_collection(signal_id=self.signal_id) 

1876 if collection is None: 

1877 return 0 

1878 

1879 number_samples = collection.estimated_document_count() 

1880 

1881 number_samples_async_collection = await get_async_collection( 

1882 systems_async_database, "number_samples", create=True, time_series=True 

1883 ) 

1884 

1885 loop = asyncio.get_running_loop() 

1886 loop.create_task( 

1887 number_samples_async_collection.insert_one( 

1888 { 

1889 "timestamp": datetime.datetime.now(pytz.UTC), 

1890 "signal_id": self.signal_id, 

1891 "number_samples": number_samples, 

1892 } 

1893 ) 

1894 ) 

1895 

1896 return number_samples 

1897 

1898 @classmethod 

1899 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1900 number_samples_by_id = {} 

1901 collections = get_signal_collections_batch(signal_ids) 

1902 number_samples_async_collection = await get_async_collection( 

1903 systems_async_database, "number_samples", create=True, time_series=True 

1904 ) 

1905 

1906 for signal_id, collection in zip(signal_ids, collections): 

1907 if collection is None: 

1908 number_samples_by_id[signal_id] = 0 

1909 continue 

1910 

1911 number_samples = collection.estimated_document_count() 

1912 

1913 number_samples_by_id[signal_id] = number_samples 

1914 

1915 now = datetime.datetime.now(pytz.UTC) 

1916 loop = asyncio.get_running_loop() 

1917 loop.create_task( 

1918 number_samples_async_collection.insert_many( 

1919 [ 

1920 { 

1921 "timestamp": now, 

1922 "signal_id": signal_id, 

1923 "number_samples": number_samples, 

1924 } 

1925 for signal_id, number_samples in number_samples_by_id.items() 

1926 ] 

1927 ) 

1928 ) 

1929 

1930 return number_samples_by_id 

1931 

1932 def sample_datasize(self): 

1933 return signals_database.command("collstats", self.signal_id)["size"] 

1934 

1935 @classmethod 

1936 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1937 result = cls.collection().aggregate( 

1938 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1939 ) 

1940 

1941 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1942 

1943 

class ForcedSignal(GenericMongo):
    # Record of a signal currently forced by a user ("forced_signals" collection).

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record, keyed by ``signal_id``, and return its database ID.

        The ID of the stored document is also written to ``self.id``.
        """
        stored_document = self.collection().find_one_and_update(
            {"signal_id": self.signal_id},
            {"$set": self.to_dict(exclude={"id"})},
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Tell whether the user may force the given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal is already forced by another user and the
            current user is not an administrator, True otherwise
        :rtype: bool
        """
        existing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing is None:
            # Nobody is forcing this signal yet.
            return True
        forced_by_other = existing.forcing_user_id != current_user.id
        return not (forced_by_other and not current_user.is_admin)

1978 

1979 

class ServicesStatus(TwinPadModel):
    # "up"/"down" state of each service, as produced by ``check``.

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every configured host and return the resulting status object.

        The backend itself is always reported as "up" (it is the one answering).
        Hosts are pinged one after the other.
        """
        cloud_broker = ping(RABBITMQ_HOST)
        time_series_database = ping(MONGO_HOST)
        signal_storage = ping(SIGNAL_STORAGE_HOST)
        heartbeat_storage = ping(HEARTBEAT_STORAGE_HOST)
        data_analyzer = ping(DATA_ANALYZER_HOST)
        return cls(
            backend="up",
            cloud_broker=cloud_broker,
            time_series_database=time_series_database,
            signal_storage=signal_storage,
            heartbeat_storage=heartbeat_storage,
            data_analyzer=data_analyzer,
        )

1998 

1999 

def ping(host):
    """Report whether ``host`` answers a ping within 0.8 second.

    :param host: Host name or address to ping.
    :return: ``"up"`` when a reply was received, ``"down"`` otherwise
        (timeout, ping error, or missing permission to send the ping).
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Best effort: without the permission to ping, report the host as down.
        return "down"
    # ping3 reports a timeout as None and an error as False; anything else is
    # a measured delay. Test for those two explicitly so that a reply measured
    # as 0.0 s is not mistaken for a failure.
    if delay is None or delay is False:
        return "down"
    return "up"

2007 

2008 

class Event(GenericMongo):
    # One occurrence of an event rule ("events" collection).

    collection_name: ClassVar[str] = "events"

    name: str
    # Epoch seconds, see ``dict_to_object``.
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Rule that produced this event, looked up by ``event_rule_id`` on first access."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an event from a stored document.

        The document's ``timestamp`` is a datetime; it is replaced, in the
        given dict itself, by the matching epoch seconds before delegating to
        the parent implementation.
        """
        # NOTE(review): a naive datetime is interpreted in local time by
        # ``timestamp()`` - confirm how the MongoDB client returns datetimes.
        stored_datetime = dict_["timestamp"]
        dict_["timestamp"] = datetime.datetime.timestamp(stored_datetime)
        return super().dict_to_object(dict_)

2026 

2027 

def _utc_midnight_today() -> datetime.datetime:
    """Return the start of the current UTC day as an aware datetime."""
    return datetime.datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)


def _utc_datetime_from_timestamp(timestamp: float) -> datetime.datetime:
    """Convert epoch seconds to an aware UTC datetime."""
    return datetime.datetime.fromtimestamp(timestamp, tz=pytz.UTC)


class TwinPadActivity(GenericMongo):
    # Daily activity counter: ``amount`` items on the day starting at ``timestamp``.

    # Epoch seconds of the start of the day.
    timestamp: float
    amount: int

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of events between two timestamps.

        Daily totals are cached in the "number_events" collection. When that
        collection does not exist yet, or when ``recompute_amount`` is true, it
        is rebuilt from the day of the first event up to yesterday. The total
        of the current day is refreshed on every call.

        :param min_timestamp: Start of the requested range, epoch seconds.
        :param max_timestamp: End of the requested range, epoch seconds.
        :param recompute_amount: Force a rebuild of the cached daily totals.
        :return: One item per day having at least one event in the range.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        items = []
        # Day boundaries are computed in UTC, independently of the server's time zone.
        TODAY = _utc_midnight_today()
        if number_events_collection is None or recompute_amount:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            number_events_collection.delete_many({})
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return items
            last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
                tzinfo=pytz.UTC
            )
            while last_computed_day < TODAY:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
                )
                if day_nb_events > 0:
                    number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
                last_computed_day += ONE_DAY_OFFSET
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
        if number_events_today > 0:
            # Replace today's entry rather than accumulating one per call.
            number_events_collection.delete_many({"timestamp": TODAY})
            number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
        data_start = _utc_datetime_from_timestamp(min_timestamp)
        data_end = _utc_datetime_from_timestamp(max_timestamp)
        number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
        for day in number_events:
            day["timestamp"] = day["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of received samples between two timestamps.

        Daily totals are the sums of the ``value`` field of the
        "signal_storage._NUMBER_SAMPLES" collection and are cached in
        "number_received_samples". The cache is rebuilt when it does not exist
        or when ``recompute_amount`` is true; the total of the current day is
        refreshed on every call.

        :param min_timestamp: Start of the requested range, epoch seconds.
        :param max_timestamp: End of the requested range, epoch seconds.
        :param recompute_amount: Force a rebuild of the cached daily totals.
        :return: One item per day having at least one sample in the range.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        items = []
        # Day boundaries are computed in UTC, independently of the server's time zone.
        TODAY = _utc_midnight_today()
        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return items
            # compute from the day of the first recorded sample count
            last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace(
                tzinfo=pytz.UTC
            )
            while last_computed_day < TODAY:
                number_samples_request = signals_number_samples_collection.aggregate(
                    [
                        {
                            "$match": {
                                "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}
                            }
                        },
                        {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
                    ]
                ).to_list()
                if len(number_samples_request) == 0:
                    number_samples = 0
                else:
                    number_samples = number_samples_request[0].get("number_samples", 0)
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
                last_computed_day += ONE_DAY_OFFSET
        number_samples_request = signals_number_samples_collection.aggregate(
            [
                {"$match": {"timestamp": {"$gte": TODAY}}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(number_samples_request) == 0:
            number_samples_today = 0
        else:
            number_samples_today = number_samples_request[0].get("number_samples", 0)
        if number_samples_today > 0:
            # Replace today's entry rather than accumulating one per call.
            number_samples_collection.delete_many({"timestamp": TODAY})
            number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
        data_start = _utc_datetime_from_timestamp(min_timestamp)
        data_end = _utc_datetime_from_timestamp(max_timestamp)
        number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
        for day in number_events:
            day["timestamp"] = day["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of commands between two timestamps.

        Daily totals are cached in the "number_commands" collection. When that
        collection does not exist yet, or when ``recompute_amount`` is true, it
        is rebuilt from the day of the first command up to yesterday. The total
        of the current day is refreshed on every call.

        :param min_timestamp: Start of the requested range, epoch seconds.
        :param max_timestamp: End of the requested range, epoch seconds.
        :param recompute_amount: Force a rebuild of the cached daily totals.
        :return: One item per day having at least one command in the range.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        items = []
        # Day boundaries are computed in UTC, independently of the server's time zone.
        TODAY = _utc_midnight_today()
        if number_commands_collection is None or recompute_amount:
            number_commands_collection = get_collection(
                systems_database, "number_commands", create=True, time_series=True
            )
            number_commands_collection.delete_many({})
            first_command = commands_collection.find_one(sort={"timestamp": 1})
            if first_command is None:
                return items
            last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace(
                tzinfo=pytz.UTC
            )
            while last_computed_day < TODAY:
                day_nb_commands = commands_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
                )
                if day_nb_commands > 0:
                    number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
                last_computed_day += ONE_DAY_OFFSET
        number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
        if number_commands_today > 0:
            # Replace today's entry rather than accumulating one per call.
            number_commands_collection.delete_many({"timestamp": TODAY})
            number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
        data_start = _utc_datetime_from_timestamp(min_timestamp)
        data_end = _utc_datetime_from_timestamp(max_timestamp)
        number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
        for day in number_commands:
            day["timestamp"] = day["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

2163 

2164 

class EventRule(GenericMongo):
    # Rule definition stored in the "event_rules" collection.

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events referencing this rule, counted on first access."""
        events_collection = get_collection(systems_database, "events")
        return events_collection.count_documents({"event_rule_id": self.id})

2176 

2177 

class Company(GenericMongo):
    # Company record stored in the "companies" collection.

    collection_name: ClassVar[str] = "companies"
    name: str

2181 

2182 

class Campaign(GenericMongo):
    # Campaign record stored in the "campaigns" collection.

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert the campaign (without its ``id``) and return ``{"campaign_id": <new id>}``."""
        insert_result = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"campaign_id": str(insert_result.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the campaign identified by ``campaign.id``.

        :return: The updated document as returned by the database.
        """
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return cls.collection().find_one_and_update(
            selector,
            changes,
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with this ID and return the database's delete result."""
        delete_result = cls.collection().delete_one({"_id": ObjectId(campaign_id)})
        return delete_result

2211 

2212 

class Phase(GenericMongo):
    # Time span of a campaign, stored in the "phases" collection.

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    # Bounds of the phase (unit not established in this section of the file).
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of the phase (without ``id``) and return ``{"phase_id": <new id>}``."""
        new_phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phases = get_collection(systems_database, "phases", create=True)
        insert_result = phases.insert_one(new_phase.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"phase_id": str(insert_result.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description and bounds of the phase identified by ``phase.id``.

        The campaign the phase belongs to is not changed.

        :return: The updated document as returned by the database.
        """
        changes = {
            "name": phase.name,
            "description": phase.description,
            "start_at": phase.start_at,
            "end_at": phase.end_at,
        }
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with this ID and return the database's delete result."""
        delete_result = cls.collection().delete_one({"_id": ObjectId(phase_id)})
        return delete_result

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the database's delete result."""
        delete_result = cls.collection().delete_many({"campaign_id": campaign_id})
        return delete_result

2277 

2278 

class CustomViewCreation(GenericMongo):
    # Fields a client provides to create a custom view ("custom_views" collection).

    collection_name: ClassVar[str] = "custom_views"

    name: str
    # Content of the view; element structure is not constrained here.
    configuration: list

2284 

2285 

class CustomView(CustomViewCreation):
    # Stored custom view: the creation fields plus identifier and owner.

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

2292 

2293 # # Methods 

2294 # @classmethod 

2295 # def create(cls, form_custom_view: Self, user_id) -> list: 

2296 # custom_view = CustomView( 

2297 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

2298 # ) 

2299 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

2300 # return new_custom_view 

2301 

2302 # @classmethod 

2303 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

2304 # updated_custom_view = cls.collection().find_one_and_update( 

2305 # {"_id": ObjectId(custom_view_id)}, 

2306 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

2307 # return_document=ReturnDocument.AFTER, 

2308 # ) 

2309 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

2310 # del updated_custom_view["_id"] 

2311 # return cls(**updated_custom_view) 

2312 

2313 # @classmethod 

2314 # def delete(cls, custom_view_id) -> bool: 

2315 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

2316 # return deleted_custom_view.acknowledged 

2317 

2318 

# Update model derived from CustomView by ``create_update_model``.
# NOTE(review): presumably the same fields made optional - confirm in the helper.
CustomViewUpdate = create_update_model(CustomView)

2320 

2321 

class Video(GenericMongo):
    # Camera / video source stored in the "videos" collection.

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are requested from the database, so the
        credentials are left at their defaults on the returned objects.
        """
        cursor = cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with this ID, or None when it does not exist."""
        camera = cls.get_from_id(camera_id)
        if camera is None:
            return None
        return camera.name

2345 

2346 

class Command(GenericMongo):
    # Log entry of a command sent by a user ("commands" time-series collection).

    collection_name: ClassVar[str] = "commands"

    # Properties
    # NOTE(review): annotated as a datetime but defaults to None; it is filled in by ``create``.
    timestamp: datetime.datetime = None
    # Epoch seconds at which the command was sent.
    sent_at: float
    # Seconds between sending and receiving the response.
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, created as a time series if missing."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of the command and return ``{"command_id": <new id>}``.

        The stored ``timestamp`` is the UTC datetime matching ``sent_at``.
        """
        stamped_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insert_result = cls.collection().insert_one(stamped_command.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"command_id": str(insert_result.inserted_id)}

    def receive_response(self, response: dict):
        """Record the outcome of the command from its response.

        The command succeeded only when the response carries ``"error": False``.
        The response message becomes the description unless one is already set.
        """
        elapsed = time.time() - self.sent_at
        self.response_time = elapsed
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            message = response.get("message", "")
            self.description += message.rstrip()

2386 

2387 

class SignalsPresetCreation(GenericMongo):
    # Fields a client provides to create a preset: a name and the signals it groups.

    name: str
    signal_ids: list[str]

2391 

2392 

class SignalsPreset(SignalsPresetCreation):
    # Stored preset ("signals_presets" collection): creation fields plus owner.

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store the preset for the given user and return the new document ID as a string."""
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        document = owned_preset.model_dump(exclude={"id"})
        insert_result = cls.collection().insert_one(document)
        return str(insert_result.inserted_id)

2409 

2410 

# Update model derived from SignalsPreset by ``create_update_model``.
# NOTE(review): presumably the same fields made optional - confirm in the helper.
SignalsPresetUpdate = create_update_model(SignalsPreset)

2412 

2413 

class LineStyle(str, Enum):
    # Line styles available for a curve; members are also plain strings.

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

2418 

2419 

class SignalAppearance:
    # Plain annotation holder (no base class, no default values) naming the two
    # colours of a signal.
    # NOTE(review): not a model like its neighbours - confirm whether that is intended.

    value_color: str
    forced_value_color: str

2423 

2424 

class GraphThemeCreation(GenericMongo):
    # Fields a client provides to create a graph theme ("graph_themes" collection):
    # how the value and forced-value curves of one signal are drawn.

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    # Colours default to an empty string.
    value_color: str = ""
    forced_value_color: str = ""
    # Both curves are solid unless stated otherwise.
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless explicitly shared.
    private: bool = True

2435 

2436 

class PublicGraphTheme(GraphThemeCreation):
    # Graph theme as exposed to one user: the stored per-user lists are replaced
    # by three booleans computed for that user.

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # User the pipeline steps are computed for; set by the query entry points
    # below before they delegate to the parent implementation.
    # NOTE(review): this is class-level state shared by all callers - confirm it
    # cannot be overwritten between being set and being read.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Aggregation stages, per field, turning a stored theme into the user's view of it."""
        user_id = cls._current_user_id

        def add_field(name, expression):
            return [{"$addFields": {name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user is not allowed to see (neither
            # public nor created by them); filtered as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            # The raw per-user lists and the creator are stripped from the result.
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the parent's query handling on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` with ``query.in_user_library`` forced to "true"."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Return the theme with this ID as seen by ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent's ``get_by_attribute`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent's ``get_one_by_attribute`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Run the parent's ``get_all`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return colours and line styles of the user's active themes, keyed by signal ID.

        When several active themes target the same signal, the first one
        returned by the grouping stage is kept.
        """
        active_for_requested_signals = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        first_theme_per_signal = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        unwrap_theme = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        appearance_fields_only = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [active_for_requested_signals, first_theme_per_signal, unwrap_theme, appearance_fields_only]

        # ``pop`` both yields the key and removes it from the stored appearance.
        return {document.pop("signal_id"): document for document in cls.collection().aggregate(pipeline)}

2549 

2550 

# Update model derived from PublicGraphTheme by ``create_update_model``.
# NOTE(review): presumably the same fields made optional - confirm in the helper.
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2552 

2553 

2554class PrivateGraphTheme(GraphThemeCreation): 

2555 # private 

2556 creator_id: str 

2557 in_library: list[str] 

2558 active: list[str] 

2559 

@classmethod
def create(
    cls,
    creator_id: str,
    name: str,
    signal_id: str,
    value_color: str,
    forced_value_color: str,
    value_line_style: LineStyle,
    forced_value_line_style: LineStyle,
    private: bool,
):
    """Store a new theme and return it with its database ID set.

    The creator is the only user having the new theme in their library and
    the only one for whom it is active.
    """
    appearance = {
        "value_color": value_color,
        "forced_value_color": forced_value_color,
        "value_line_style": value_line_style,
        "forced_value_line_style": forced_value_line_style,
    }
    theme = cls(
        creator_id=creator_id,
        name=name,
        signal_id=signal_id,
        private=private,
        in_library=[creator_id],
        active=[creator_id],
        **appearance,
    )

    insert_result = cls.collection().insert_one(theme.model_dump(exclude={"id": True}))
    theme.id = str(insert_result.inserted_id)
    return theme

2588 

def update(self, update_dict: dict, user_id: str):
    """Apply an update requested by ``user_id`` to the stored theme.

    The per-user flags of the public representation are translated into the
    stored lists: ``in_user_library`` adds/removes the user in ``in_library``
    and ``active_for_user`` adds/removes the user in ``active``. These flags
    and ``created_by_user`` are derived values: they are always removed from
    ``update_dict`` (even when None) so they are never written to the database.

    :param update_dict: Fields to change; modified in place.
    :param user_id: User requesting the update.
    :return: An empty dict.
    """
    in_user_lib = update_dict.pop("in_user_library", None)
    if in_user_lib is not None:
        if in_user_lib and user_id not in self.in_library:
            self.in_library.append(user_id)
        elif not in_user_lib and user_id in self.in_library:
            self.in_library.remove(user_id)
        update_dict["in_library"] = self.in_library

    active_for_user = update_dict.pop("active_for_user", None)
    if active_for_user is not None:
        if active_for_user and user_id not in self.active:
            self.active.append(user_id)
        elif not active_for_user and user_id in self.active:
            self.active.remove(user_id)
        update_dict["active"] = self.active

    update_dict.pop("created_by_user", None)

    # Nothing left to write: skip the database call rather than sending an
    # empty ``$set``.
    if update_dict:
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
        )

    return {}