Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1154 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-17 10:20 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25# from scipy import signal as signal_scipy 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 devices_states_database, 

36) 

37from twinpad_backend.responses import ListResponse 

38from twinpad_backend.messages import send_mode_change, send_signal_value 

39 

40TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

41 

42 

43RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

44MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

45SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

46HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

47DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

48 

49DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

50NUMBER_SAMPLES_DATABASE_UPDATE = 120 

51 

52logger = logging.getLogger("uvicorn.error") 

53 

54 

55class classproperty: 

56 """ 

57 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

58 Found here: https://stackoverflow.com/a/76301341 

59 """ 

60 

61 def __init__(self, func): 

62 self.fget = func 

63 

64 def __get__(self, _, owner): 

65 return self.fget(owner) 

66 

67 

68def create_update_model(model): 

69 fields = {} 

70 

71 for field_name, field_annotation in model.model_fields.items(): 

72 if field_name != "id": 

73 fields[field_name] = (field_annotation.annotation | None, None) 

74 

75 query_name = model.__name__ + "Update" 

76 return create_model(query_name, **fields) 

77 

78 

79def get_utc_date_from_timestamp(timestamp: float): 

80 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

81 

82 

83def downsample_list(time_vector: list, values: list, max_number_samples: int): 

84 if len(time_vector) < max_number_samples: 

85 return time_vector, values 

86 

87 time_vector_copy = copy.deepcopy(time_vector) 

88 values_copy = copy.deepcopy(values) 

89 

90 none_group_bounds = [] 

91 none_group_index = -1 

92 index = -1 

93 # LTTB doesn't handle None values so remove them 

94 while values_copy.count(None) > 0: 

95 # Store bounds of None value groups so we can insert them back after the downsampling 

96 if (new_index := values_copy.index(None)) != index: 

97 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

98 none_group_index += 1 

99 elif len(none_group_bounds[none_group_index]) < 2: 

100 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

101 else: 

102 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

103 values_copy.pop(new_index) 

104 index = new_index 

105 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

106 

107 try: 

108 values_array = npy.array([time_vector_copy, values_copy]).T 

109 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

110 

111 new_time_vector = interpolated_values[:, 0].tolist() 

112 new_values = interpolated_values[:, 1].tolist() 

113 except ValueError: 

114 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

115 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

116 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

117 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

118 return new_time_vector, new_values_nan_to_none 

119 

120 # insert back None values at the correct timestamps 

121 for none_group in none_group_bounds: 

122 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

123 new_time_vector[start_index:start_index] = none_group 

124 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

125 

126 return new_time_vector, new_values 

127 

128 

129def is_of_type(value, wanted_type): 

130 if wanted_type is float: 

131 return isinstance(value, (int, float)) 

132 return isinstance(value, wanted_type) 

133 

134 

135# Models 

136class TwinPadModel(BaseModel): 

137 @classmethod 

138 def dict_to_object(cls, dict_): 

139 return cls.model_validate(dict_) 

140 

141 def to_dict(self, exclude=None): 

142 dict_ = self.model_dump(exclude=exclude) 

143 return dict_ 

144 

145 

146class GenericMongo(TwinPadModel): 

147 id: str | None = None 

148 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

149 

150 @classmethod 

151 def collection(cls): 

152 return get_collection(systems_database, cls.collection_name, create=True) 

153 

154 @classmethod 

155 def response_from_query(cls, query) -> ListResponse[Self]: 

156 request_filters = query.mongodb_filter() 

157 items = [] 

158 

159 # Allows for multi-sort, Python dicts are ordered so no issue while sorting 

160 sort_dict = {} 

161 for sort in query.sort_by.split(","): 

162 if ":" in sort: 

163 sort_field, sort_order = sort.split(":") 

164 sort_order = int(sort_order) 

165 else: 

166 sort_field = sort 

167 sort_order = 1 

168 sort_dict[sort_field] = sort_order 

169 

170 collection = get_collection(systems_database, cls.collection_name, create=True) 

171 total = collection.count_documents(request_filters) 

172 

173 pipeline = [] 

174 added_properties = [] 

175 if "$and" in request_filters: 

176 for request_filter in request_filters["$and"]: 

177 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

178 if filtered_property in request_filter: 

179 pipeline.extend(pipeline_steps) 

180 added_properties.append(filtered_property) 

181 else: 

182 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

183 if filtered_property in request_filters: 

184 pipeline.extend(pipeline_steps) 

185 added_properties.append(filtered_property) 

186 pipeline.append({"$match": request_filters}) 

187 

188 for sort_field in sort_dict.keys(): 

189 if sort_field in cls.custom_pipeline_steps: 

190 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

191 added_properties.append(sort_field) 

192 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}]) 

193 

194 if (query.limit is not None) and (query.limit != 0): 

195 pipeline.append({"$limit": query.limit}) 

196 

197 for filtered_property, step in cls.custom_pipeline_steps.items(): 

198 if filtered_property not in added_properties: 

199 pipeline.extend(step) 

200 

201 cursor = collection.aggregate(pipeline) 

202 

203 for item_dict in cursor: 

204 items.append(cls.mongo_dict_to_object(item_dict)) 

205 

206 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

207 

208 @classmethod 

209 def get_from_id(cls, item_id) -> Self | None: 

210 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

211 

212 @classmethod 

213 def mongo_dict_to_object(cls, mongo_dict): 

214 mongo_dict["id"] = str(mongo_dict["_id"]) 

215 del mongo_dict["_id"] 

216 return cls.dict_to_object(mongo_dict) 

217 

218 @classmethod 

219 def get_by_attribute(cls, attribute_name: str, attribute_value): 

220 """Returns all items that match the attribute with value.""" 

221 pipeline = [] 

222 if attribute_name in cls.custom_pipeline_steps: 

223 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

224 pipeline.append({"$match": {attribute_name: attribute_value}}) 

225 for key, step in cls.custom_pipeline_steps.items(): 

226 if key != attribute_name: 

227 pipeline.extend(step) 

228 items = cls.collection().aggregate(pipeline) 

229 if items is None: 

230 return None 

231 return [cls.mongo_dict_to_object(d) for d in items] 

232 

233 @classmethod 

234 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

235 pipeline = [] 

236 if attribute_name in cls.custom_pipeline_steps: 

237 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

238 pipeline.append({"$match": {attribute_name: attribute_value}}) 

239 pipeline.append({"$limit": 1}) 

240 for key, step in cls.custom_pipeline_steps.items(): 

241 if key != attribute_name: 

242 pipeline.extend(step) 

243 items = cls.collection().aggregate(pipeline).to_list() 

244 if len(items) == 0: 

245 return None 

246 return cls.mongo_dict_to_object(items[0]) 

247 

248 @classmethod 

249 def get_all(cls, sort_by="_id") -> list[Self]: 

250 items = [] 

251 pipeline = [] 

252 if sort_by in cls.custom_pipeline_steps: 

253 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

254 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

255 for key, step in cls.custom_pipeline_steps.items(): 

256 if key != sort_by: 

257 pipeline.extend(step) 

258 for dict_ in cls.collection().aggregate(pipeline): 

259 items.append(cls.mongo_dict_to_object(dict_)) 

260 return items 

261 

262 @classmethod 

263 def get_number_documents(cls): 

264 collection = get_collection(systems_database, cls.collection_name) 

265 if collection is None: 

266 return 0 

267 return collection.count_documents({}) 

268 

269 def insert(self): 

270 insert_result = self.collection().insert_one(self.to_dict(exclude={id})) 

271 self.id = str(insert_result.inserted_id) 

272 return self.id 

273 

274 def update(self, update_dict): 

275 for key, value in update_dict.items(): 

276 setattr(self, key, value) 

277 self.collection().find_one_and_update( 

278 {"_id": ObjectId(self.id)}, 

279 {"$set": update_dict}, 

280 return_document=ReturnDocument.AFTER, 

281 ) 

282 

283 return self 

284 

285 def delete(self): 

286 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

287 return result.deleted_count > 0 

288 

289 

290class User(GenericMongo): 

291 collection_name: ClassVar[str] = "users" 

292 

293 firstname: str 

294 lastname: str 

295 email: str 

296 password: str 

297 is_active: bool | None = False 

298 is_admin: bool | None = False 

299 is_connected: bool | None = False 

300 company_id: str | None = None 

301 

302 def to_dict(self, exclude=None): 

303 if exclude is None: 

304 exclude = {"password"} 

305 return GenericMongo.to_dict(self, exclude=exclude) 

306 

307 @classmethod 

308 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

309 users = cls.get_all() 

310 if not users: 

311 is_admin = True 

312 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

313 user_collection = get_collection(systems_database, "users", create=True) 

314 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

315 if new_user is None: 

316 return None 

317 return {"user_id": str(new_user.inserted_id)} 

318 

319 @classmethod 

320 def update_info(cls, user: "UserUpdate", user_id: str): 

321 updated_user = cls.collection().find_one_and_update( 

322 {"_id": ObjectId(user_id)}, 

323 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

324 return_document=ReturnDocument.AFTER, 

325 ) 

326 updated_user["id"] = str(updated_user["_id"]) 

327 del (updated_user["_id"], updated_user["is_connected"]) 

328 return cls(**updated_user) 

329 

330 

331UserUpdate = create_update_model(User) 

332 

333 

334class Mode(TwinPadModel): 

335 mode_id: int 

336 name: str 

337 frequency_multiplier: float 

338 min_frequency: float 

339 

340 

341class DeviceUpdate(TwinPadModel): 

342 mode_id: int 

343 

344 

345class Device(GenericMongo): 

346 collection_name: ClassVar[str] = "devices" 

347 

348 device_id: str 

349 name: str 

350 description: str = "" 

351 modes: list[Mode] 

352 current_mode_id: int | None = None 

353 last_ping: float | None = None 

354 petri_network: Any 

355 pid: Any 

356 load: float | None = None 

357 tokens: list[int] = Field(default_factory=list) 

358 status: str 

359 

360 async def change_mode(self, update_dict, current_user: User): 

361 has_error = False 

362 

363 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

364 has_error = True 

365 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

366 elif self.current_mode_id is not None: 

367 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

368 else: 

369 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

370 command = Command( 

371 sent_at=time.time(), 

372 command_type="Mode change", 

373 description=description, 

374 user_id=current_user.id, 

375 ) 

376 

377 if has_error: 

378 command.response_time = 0 

379 command.succeeded = False 

380 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

381 else: 

382 response = await send_mode_change(self.device_id, update_dict.mode_id) 

383 command.receive_response(response) 

384 

385 Command.create(command) 

386 return response 

387 

388 @classmethod 

389 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

390 devices_by_id = {} 

391 for signal_id in signal_ids: 

392 device_id = signal_id.split(".")[0] 

393 if device_id not in devices_by_id: 

394 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

395 return devices_by_id 

396 

397 

398class DeviceSetup(GenericMongo): 

399 collection_name: ClassVar[str] = "device_setups" 

400 

401 device_ids: list[str] 

402 active: bool = False 

403 variable_mapping: dict[str, str] 

404 

405 

406DeviceSetupUpdate = create_update_model(DeviceSetup) 

407 

408 

409class DeviceState(GenericMongo): 

410 collection_name: ClassVar[str] = "devices_states" 

411 

412 timestamp: float 

413 mode: str | None = None 

414 load: float | None = None 

415 tokens: list[int] = Field(default_factory=list) 

416 modified_properties: list[str] = Field(default_factory=list) 

417 

418 @classmethod 

419 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]: 

420 req_filter = query.mongodb_filter() 

421 items = [] 

422 if ":" in query.sort_by: 

423 sort_field, sort_order = query.sort_by.split(":") 

424 sort_order = int(sort_order) 

425 else: 

426 sort_field = query.sort_by 

427 sort_order = 1 

428 collection = get_collection(devices_states_database, device_id) 

429 if collection is None: 

430 total = 0 

431 cursor = [] 

432 else: 

433 total = collection.count_documents(req_filter) 

434 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

435 if (query.limit is not None) and (query.limit != 0): 

436 cursor = cursor.limit(query.limit) 

437 for item_dict in cursor: 

438 items.append( 

439 cls( 

440 timestamp=item_dict.get("precise_timestamp"), 

441 mode=item_dict.get("mode", None), 

442 load=item_dict.get("load", None), 

443 tokens=item_dict.get("tokens", Field(default_factory=list)), 

444 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

445 ) 

446 ) 

447 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

448 

449 

450class SignalSample(TwinPadModel): 

451 signal_id: str 

452 timestamp: float 

453 value: float | int | str | bool | None 

454 forced_value: float | int | str | bool | None = None 

455 

456 @classmethod 

457 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

458 

459 collection = get_signal_collection(signal_id) 

460 if collection is None: 

461 return None 

462 

463 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

464 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

465 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

466 first_bucket = None 

467 if bucket is not None: 

468 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

469 if first_bucket is not None: 

470 sample_data = collection.find_one( 

471 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

472 ) 

473 else: 

474 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

475 

476 if sample_data is None: 

477 return None 

478 

479 timestamp = sample_data["precise_timestamp"] 

480 

481 return cls( 

482 signal_id=signal_id, 

483 timestamp=timestamp, 

484 value=sample_data.get("value", None), 

485 forced_value=sample_data.get("forced_value", None), 

486 ) 

487 

488 @classmethod 

489 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

490 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

491 

492 @classmethod 

493 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

494 

495 collection = get_signal_collection(signal_id) 

496 if collection is None: 

497 return None 

498 

499 # Same workaround as above function, very effective to narrow down big sets of data 

500 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

501 last_bucket = None 

502 if bucket is not None: 

503 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

504 if last_bucket is not None: 

505 sample_data = collection.find_one( 

506 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

507 sort={"precise_timestamp": -1}, 

508 ) 

509 else: 

510 sample_data = collection.find_one({}, sort={"precise_timestamp": -1}) 

511 

512 if sample_data is None: 

513 return None 

514 

515 timestamp = sample_data["precise_timestamp"] 

516 

517 if device is None: 

518 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

519 if device is not None and device.last_ping is not None: 

520 if timestamp is None: 

521 timestamp = device.last_ping 

522 else: 

523 timestamp = max(timestamp, device.last_ping) 

524 return cls( 

525 signal_id=signal_id, 

526 timestamp=timestamp, 

527 value=sample_data.get("value", None), 

528 forced_value=sample_data.get("forced_value", None), 

529 ) 

530 

531 @classmethod 

532 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None: 

533 collection = get_signal_collection(signal_id) 

534 if collection is None: 

535 return None 

536 

537 sample_data = collection.find_one( 

538 {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1} 

539 ) 

540 if sample_data is None: 

541 return None 

542 

543 return cls( 

544 signal_id=signal_id, 

545 timestamp=sample_data.get("precise_timestamp"), 

546 value=sample_data.get("value"), 

547 forced_value=sample_data.get("forced_value"), 

548 ) 

549 

550 @classmethod 

551 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

552 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

553 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

554 

555 @classmethod 

556 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None: 

557 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids] 

558 

559 

560class SignalData(TwinPadModel): 

561 signal_id: str 

562 forcible: bool = True 

563 time_vector: list[float] 

564 values: list[float | int | str | None] 

565 forced_values: list[float | int | str | None] 

566 

567 data_start: float | None = None 

568 data_end: float | None = None 

569 

570 number_samples: int = 0 

571 number_samples_db: int = 0 

572 

573 db_query_time: float = 0.0 

574 init_time: float = 0.0 

575 data_processing_time: float = 0.0 

576 

577 @classmethod 

578 def get_from_signal_id( 

579 cls, 

580 signal_id: str, 

581 min_timestamp: float = None, 

582 max_timestamp: float = None, 

583 window_min_timestamp: float = None, 

584 window_max_timestamp: float = None, 

585 interpolate_bounds: bool = True, 

586 max_documents: int = None, 

587 ) -> Self: 

588 

589 now = time.time() 

590 

591 req_signal = {} 

592 if min_timestamp is not None: 

593 req_signal.setdefault("timestamp", {}) 

594 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

595 if max_timestamp is not None: 

596 req_signal.setdefault("timestamp", {}) 

597 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

598 

599 collection = get_signal_collection(signal_id) 

600 if collection is None: 

601 return cls( 

602 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

603 ) 

604 

605 db_req_start = time.time() 

606 

607 sort_step = {"$sort": {"precise_timestamp": 1}} 

608 number_results = collection.count_documents(req_signal) 

609 

610 pipeline = [] 

611 if req_signal: 

612 pipeline.append({"$match": req_signal}) # Filter data if needed 

613 

614 pipeline.extend( 

615 [ 

616 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

617 sort_step, 

618 ] 

619 ) 

620 

621 if max_documents is not None and max_documents < number_results: 

622 unsampling_ratio = math.ceil(number_results / max_documents) 

623 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

624 pipeline.extend( 

625 [ 

626 { 

627 "$setWindowFields": { 

628 "sortBy": {"precise_timestamp": 1}, 

629 "output": {"index": {"$documentNumber": {}}}, 

630 } 

631 }, 

632 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

633 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

634 {"$replaceRoot": {"newRoot": "$doc"}}, 

635 {"$unset": ["index", "group_id"]}, 

636 {"$sort": {"precise_timestamp": 1}}, 

637 ] 

638 ) 

639 

640 # logger.info(f"pipeline: %s", str(pipeline)) 

641 cursor = collection.aggregate(pipeline) 

642 db_req_time = time.time() - db_req_start 

643 

644 init_time = time.time() 

645 

646 results = cursor.to_list() 

647 time_vector = [] 

648 values = [] 

649 forced_values = [] 

650 for s in results: 

651 time_vector.append(s["precise_timestamp"]) 

652 values.append(s.get("value", None)) 

653 forced_values.append(s.get("forced_value", None)) 

654 

655 signal = Signal.get_from_signal_id(signal_id) 

656 class_ = signal.signal_data_class 

657 

658 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

659 time_vector, values, forced_values = cls.interpolate_bounds( 

660 class_, 

661 collection, 

662 signal_id, 

663 time_vector, 

664 values, 

665 forced_values, 

666 window_min_timestamp, 

667 window_max_timestamp, 

668 ) 

669 

670 if values: 

671 # TODO: check below. a bit strange 

672 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

673 # Adding last value as it should be repeated 

674 time_vector.append(now) 

675 values.append(values[-1]) 

676 forced_values.append(forced_values[-1]) 

677 

678 init_time = time.time() - init_time 

679 

680 # See line 292 for explanation 

681 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

682 first_bucket = None 

683 if bucket is not None: 

684 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

685 if first_bucket is not None: 

686 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

687 else: 

688 data_start = None 

689 

690 last_bucket = None 

691 if bucket is not None: 

692 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

693 if last_bucket is not None: 

694 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

695 else: 

696 data_end = None 

697 

698 return class_( 

699 signal_id=signal_id, 

700 forcible=signal.forcible, 

701 time_vector=time_vector, 

702 values=values, 

703 forced_values=forced_values, 

704 data_start=data_start, 

705 data_end=data_end, 

706 number_samples=len(values), 

707 number_samples_db=number_results, 

708 db_query_time=db_req_time, 

709 init_time=init_time, 

710 ) 

711 

712 @staticmethod 

713 def interpolate_bounds( 

714 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

715 ): 

716 sample_right = None 

717 # Fetching right side value & interpolation 

718 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

719 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

720 sample_right = collection.find_one( 

721 { 

722 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

723 "value": {"$exists": True}, 

724 }, 

725 sort={"precise_timestamp": -1}, 

726 ) 

727 if sample_right: 

728 if time_vector: 

729 right_sd = class_( 

730 signal_id=signal_id, 

731 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

732 values=[values[-1], sample_right.get("value", None)], 

733 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

734 ) 

735 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

736 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

737 else: 

738 max_ts_value = sample_right.get("value", None) 

739 max_ts_forced_value = sample_right.get("forced_value", None) 

740 time_vector.append(window_max_timestamp) 

741 values.append(max_ts_value) 

742 forced_values.append(max_ts_forced_value) 

743 

744 # Fetching left side value & interpolation 

745 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

746 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

747 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

748 sample_left = sample_right 

749 sample_left = collection.find_one( 

750 { 

751 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

752 "value": {"$exists": True}, 

753 }, 

754 sort={"precise_timestamp": -1}, 

755 ) 

756 

757 if sample_left: 

758 if time_vector: 

759 left_sd = class_( 

760 signal_id=signal_id, 

761 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

762 values=[sample_left["value"], values[0]], 

763 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

764 ) 

765 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

766 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

767 else: 

768 min_ts_value = sample_left.get("value", None) 

769 min_ts_forced_value = sample_left.get("forced_value", None) 

770 time_vector.insert(0, window_min_timestamp) 

771 values.insert(0, min_ts_value) 

772 forced_values.insert(0, min_ts_forced_value) 

773 

774 return time_vector, values, forced_values 

775 

776 def interpolate_values(self, new_time_vector: list[float]): 

777 return self.interpolate(new_time_vector, self.values) 

778 

779 def interpolate_forced_values(self, new_time_vector: list[float]): 

780 return self.interpolate(new_time_vector, self.forced_values) 

781 

782 def uniform_desampling(self, number_samples_max: int) -> Self: 

783 data_processing_time = time.time() 

784 if number_samples_max and self.number_samples > number_samples_max: 

785 new_time_vector = npy.linspace( 

786 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

787 ).tolist() 

788 values = self.interpolate_values(new_time_vector) 

789 forced_values = self.interpolate_forced_values(new_time_vector) 

790 time_vector = new_time_vector 

791 number_samples = len(time_vector) 

792 else: 

793 time_vector = self.time_vector 

794 number_samples = len(self.values) 

795 values = self.values[:] 

796 forced_values = self.forced_values[:] 

797 data_processing_time = time.time() - data_processing_time 

798 

799 return self.__class__( 

800 signal_id=self.signal_id, 

801 time_vector=time_vector, 

802 values=values, 

803 forced_values=forced_values, 

804 number_samples=number_samples, 

805 number_samples_db=self.number_samples, 

806 data_start=self.data_start, 

807 data_end=self.data_end, 

808 db_query_time=self.db_query_time, 

809 init_time=self.init_time, 

810 data_processing_time=self.data_processing_time + data_processing_time, 

811 ) 

812 

813 def interest_window_desampling( 

814 self, 

815 window_max_number_samples: int, 

816 outside_max_number_samples: int, 

817 window_min_timestamp: float | None = None, 

818 window_max_timestamp: float | None = None, 

819 ) -> Self: 

820 """Performs a sampling in a window of interest and outside.""" 

821 

822 if not self.time_vector: 

823 return self 

824 

825 if window_min_timestamp is None: 

826 window_min_timestamp = self.time_vector[0] 

827 if window_max_timestamp is None: 

828 window_max_timestamp = self.time_vector[-1] 

829 

830 data_processing_time = time.time() 

831 

832 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

833 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

834 

835 time_vector_before = self.time_vector[:index_window_start] 

836 time_vector_window = self.time_vector[index_window_start:index_window_end] 

837 time_vector_after = self.time_vector[index_window_end:] 

838 

839 # Resampling window 

840 if time_vector_window: 

841 # Ensurring window bounds 

842 if time_vector_window[0] != window_min_timestamp: 

843 time_vector_window.insert(0, window_min_timestamp) 

844 if time_vector_window[-1] != window_max_timestamp: 

845 time_vector_window.append(window_max_timestamp) 

846 else: 

847 time_vector_window = [window_min_timestamp, window_max_timestamp] 

848 

849 if len(time_vector_window) > window_max_number_samples: 

850 # Resampling 

851 new_window_time_vector = npy.linspace( 

852 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

853 ).tolist() 

854 time_vector_window = new_window_time_vector 

855 

856 # Resampling outside 

857 number_samples_before = len(time_vector_before) 

858 number_samples_after = len(time_vector_after) 

859 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

860 new_number_samples_before = min( 

861 number_samples_before, 

862 math.ceil( 

863 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

864 ), 

865 ) 

866 new_number_samples_after = min( 

867 number_samples_after, 

868 math.ceil( 

869 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

870 ), 

871 ) 

872 # Adjusting numbers as math.ceil can do +1 on sum 

873 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

874 if new_number_samples_before > new_number_samples_after: 

875 new_number_samples_before -= 1 

876 else: 

877 new_number_samples_after -= 1 

878 

879 if new_number_samples_before > 0: 

880 new_time_vector_before = npy.linspace( 

881 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

882 ).tolist() 

883 time_vector_before = new_time_vector_before 

884 

885 if new_number_samples_after > 0: 

886 new_time_vector_after = npy.linspace( 

887 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

888 ).tolist()[::-1] 

889 time_vector_after = new_time_vector_after 

890 

891 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

892 values = self.interpolate_values(new_time_vector) 

893 forced_values = self.interpolate_forced_values(new_time_vector) 

894 number_samples = len(values) 

895 

896 data_processing_time = time.time() - data_processing_time 

897 

898 return self.__class__( 

899 signal_id=self.signal_id, 

900 forcible=self.forcible, 

901 time_vector=new_time_vector, 

902 values=values, 

903 forced_values=forced_values, 

904 number_samples=number_samples, 

905 number_samples_db=self.number_samples, 

906 data_start=self.data_start, 

907 data_end=self.data_end, 

908 db_query_time=self.db_query_time, 

909 init_time=self.init_time, 

910 data_processing_time=self.data_processing_time + data_processing_time, 

911 ) 

912 

913 def csv_export(self): 

914 output = io.StringIO() 

915 writer = csv.writer(output) 

916 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

917 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

918 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

919 return output.getvalue().encode("utf-8") 

920 

921 def prestoplot_export(self): 

922 clean_signal_id = self.signal_id.replace(".", "_") 

923 if clean_signal_id[0].isnumeric(): 

924 clean_signal_id = "_" + clean_signal_id 

925 

926 output = io.StringIO() 

927 output.write("# Encoding:\tUTF-8\n") 

928 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

929 output.write("ISO8601\tnone\tnone\n") 

930 output.write(f"# Description :\t{clean_signal_id}\n") 

931 

932 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

933 output.write( 

934 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

935 ) 

936 return output.getvalue().encode("utf-8") 

937 

938 

939class NumericSignalData(SignalData): 

940 data_type: str = "float" 

941 values: list[float | int | None] 

942 forced_values: list[float | int | None] 

943 

944 def interpolate(self, new_time_vector: list[float], items): 

945 items = [npy.nan if s is None else s for s in items] 

946 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

947 

948 def uniform_desampling(self, number_samples_max: int) -> Self: 

949 data_processing_time = time.time() 

950 if number_samples_max and self.number_samples > number_samples_max: 

951 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

952 forced_values = self.interpolate_forced_values(time_vector) 

953 number_samples = len(time_vector) 

954 else: 

955 time_vector = self.time_vector 

956 number_samples = len(self.values) 

957 values = self.values[:] 

958 forced_values = self.forced_values[:] 

959 data_processing_time = time.time() - data_processing_time 

960 

961 return self.__class__( 

962 signal_id=self.signal_id, 

963 time_vector=time_vector, 

964 values=values, 

965 forced_values=forced_values, 

966 number_samples=number_samples, 

967 number_samples_db=self.number_samples, 

968 data_start=self.data_start, 

969 data_end=self.data_end, 

970 db_query_time=self.db_query_time, 

971 init_time=self.init_time, 

972 data_processing_time=self.data_processing_time + data_processing_time, 

973 ) 

974 

975 def interest_window_desampling( 

976 self, 

977 window_max_number_samples: int, 

978 outside_max_number_samples: int, 

979 window_min_timestamp: float | None = None, 

980 window_max_timestamp: float | None = None, 

981 ) -> Self: 

982 """Performs a sampling in a window of interest and outside.""" 

983 

984 if not self.time_vector: 

985 return self 

986 

987 if window_min_timestamp is None: 

988 window_min_timestamp = self.time_vector[0] 

989 if window_max_timestamp is None: 

990 window_max_timestamp = self.time_vector[-1] 

991 

992 data_processing_time = time.time() 

993 

994 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

995 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

996 

997 time_vector_before = self.time_vector[:index_window_start] 

998 time_vector_window = self.time_vector[index_window_start:index_window_end] 

999 time_vector_after = self.time_vector[index_window_end:] 

1000 

1001 values_before = self.values[:index_window_start] 

1002 values_window = self.values[index_window_start:index_window_end] 

1003 values_after = self.values[index_window_end:] 

1004 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1005 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1006 

1007 # Resampling window 

1008 if time_vector_window: 

1009 # Ensurring window bounds 

1010 if time_vector_window[0] != window_min_timestamp: 

1011 time_vector_window.insert(0, window_min_timestamp) 

1012 values_window.insert(0, window_min_value) 

1013 if time_vector_window[-1] != window_max_timestamp: 

1014 time_vector_window.append(window_max_timestamp) 

1015 values_window.append(window_max_value) 

1016 else: 

1017 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1018 values_window = [window_min_value, window_max_value] 

1019 

1020 if len(time_vector_window) > window_max_number_samples: 

1021 # Resampling 

1022 time_vector_window, values_window = downsample_list( 

1023 time_vector_window, values_window, window_max_number_samples 

1024 ) 

1025 

1026 # Resampling outside 

1027 number_samples_before = len(time_vector_before) 

1028 number_samples_after = len(time_vector_after) 

1029 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

1030 new_number_samples_before = min( 

1031 number_samples_before, 

1032 math.ceil( 

1033 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1034 ), 

1035 ) 

1036 new_number_samples_after = min( 

1037 number_samples_after, 

1038 math.ceil( 

1039 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1040 ), 

1041 ) 

1042 # Adjusting numbers as math.ceil can do +1 on sum 

1043 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1044 if new_number_samples_before > new_number_samples_after: 

1045 new_number_samples_before -= 1 

1046 else: 

1047 new_number_samples_after -= 1 

1048 

1049 if new_number_samples_before > 0: 

1050 time_vector_before, values_before = downsample_list( 

1051 time_vector_before, values_before, new_number_samples_before 

1052 ) 

1053 

1054 if new_number_samples_after > 0: 

1055 time_vector_after, values_after = downsample_list( 

1056 time_vector_after, values_after, new_number_samples_after 

1057 ) 

1058 

1059 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1060 values = values_before + values_window + values_after 

1061 forced_values = self.interpolate_forced_values(new_time_vector) 

1062 number_samples = len(values) 

1063 

1064 data_processing_time = time.time() - data_processing_time 

1065 

1066 return self.__class__( 

1067 signal_id=self.signal_id, 

1068 time_vector=new_time_vector, 

1069 values=values, 

1070 forced_values=forced_values, 

1071 number_samples=number_samples, 

1072 number_samples_db=self.number_samples, 

1073 data_start=self.data_start, 

1074 data_end=self.data_end, 

1075 db_query_time=self.db_query_time, 

1076 init_time=self.init_time, 

1077 data_processing_time=self.data_processing_time + data_processing_time, 

1078 ) 

1079 

1080 

1081class StringSignalData(SignalData): 

1082 data_type: str = "str" 

1083 values: list[str | None] 

1084 forced_values: list[str | None] 

1085 

1086 def interpolate(self, new_time_vector: list[float], items): 

1087 # Find the indices of the values in xp that are just smaller or equal to x 

1088 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1089 indices = npy.clip(indices, 0, len(items) - 1) 

1090 # Return the corresponding left string values from fp 

1091 return [items[i] for i in indices] 

1092 

1093 

1094class SignalsData(TwinPadModel): 

1095 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1096 data_processing_time: float 

1097 data_start: float | None 

1098 data_end: float | None 

1099 

1100 @classmethod 

1101 def get_from_signal_ids( 

1102 cls, 

1103 signal_ids: list[str], 

1104 min_timestamp: float = None, 

1105 max_timestamp: float = None, 

1106 window_min_timestamp: float = None, 

1107 window_max_timestamp: float = None, 

1108 interpolate_bounds: bool = True, 

1109 max_documents: int = None, 

1110 ) -> Self: 

1111 signals_data = [] 

1112 data_start = None 

1113 data_end = None 

1114 if max_timestamp is None: 

1115 max_timestamp = time.time() 

1116 data_processing_time = 0.0 

1117 for signal_id in signal_ids: 

1118 signal_data = SignalData.get_from_signal_id( 

1119 signal_id=signal_id, 

1120 min_timestamp=min_timestamp, 

1121 max_timestamp=max_timestamp, 

1122 window_min_timestamp=window_min_timestamp, 

1123 window_max_timestamp=window_max_timestamp, 

1124 interpolate_bounds=interpolate_bounds, 

1125 max_documents=max_documents, 

1126 ) 

1127 data_processing_time += signal_data.data_processing_time 

1128 signals_data.append(signal_data) 

1129 if signal_data.data_start is not None: 

1130 if data_start is None: 

1131 data_start = signal_data.data_start 

1132 else: 

1133 data_start = min(signal_data.data_start, data_start) 

1134 if signal_data.data_end is not None: 

1135 if data_end is None: 

1136 data_end = signal_data.data_end 

1137 else: 

1138 data_end = max(signal_data.data_end, data_end) 

1139 

1140 return cls( 

1141 signals_data=signals_data, 

1142 data_processing_time=data_processing_time, 

1143 data_start=data_start, 

1144 data_end=data_end, 

1145 ) 

1146 

1147 def uniform_desampling(self, number_samples_max: int) -> Self: 

1148 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1149 return SignalsData( 

1150 signals_data=signals_data, 

1151 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1152 data_start=self.data_start, 

1153 data_end=self.data_end, 

1154 ) 

1155 

1156 def interest_window_desampling( 

1157 self, 

1158 window_max_number_samples: int, 

1159 outside_max_number_samples: int, 

1160 window_min_timestamp: float = None, 

1161 window_max_timestamp: float = None, 

1162 ) -> Self: 

1163 signals_data = [ 

1164 s.interest_window_desampling( 

1165 window_max_number_samples=window_max_number_samples, 

1166 outside_max_number_samples=outside_max_number_samples, 

1167 window_min_timestamp=window_min_timestamp, 

1168 window_max_timestamp=window_max_timestamp, 

1169 ) 

1170 for s in self.signals_data 

1171 ] 

1172 

1173 return SignalsData( 

1174 signals_data=signals_data, 

1175 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1176 data_start=self.data_start, 

1177 data_end=self.data_end, 

1178 ) 

1179 

1180 def zip_export(self, file_format: str = "csv"): 

1181 # return self.signals_data[0].csv_export() 

1182 zip_buffer = io.BytesIO() 

1183 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1184 for signal_data in self.signals_data: 

1185 if file_format == "csv": 

1186 export_io = signal_data.csv_export() 

1187 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io) 

1188 elif file_format == "prestoplot": 

1189 export_io = signal_data.prestoplot_export() 

1190 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io) 

1191 else: 

1192 raise ValueError(f"Format not found. Got: {file_format}") 

1193 zip_bytes = zip_buffer.getvalue() 

1194 # zip_bytes.seek(0) 

1195 return zip_bytes 

1196 

1197 def hdf5_export(self): 

1198 hdf5_buffer = io.BytesIO() 

1199 custom_type_float = npy.dtype( 

1200 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1201 ) 

1202 custom_type_string = npy.dtype( 

1203 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1204 ) 

1205 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1206 for signal_data in self.signals_data: 

1207 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1208 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1209 if signal_data.data_type == "str": 

1210 export_data = npy.array( 

1211 list( 

1212 zip( 

1213 date_vector, 

1214 signal_data.time_vector, 

1215 signal_data.values, 

1216 signal_data.forced_values, 

1217 ) 

1218 ), 

1219 dtype=custom_type_string, 

1220 ) 

1221 else: 

1222 export_data = npy.array( 

1223 list( 

1224 zip( 

1225 date_vector, 

1226 signal_data.time_vector, 

1227 signal_data.values, 

1228 signal_data.forced_values, 

1229 ) 

1230 ), 

1231 dtype=custom_type_float, 

1232 ) 

1233 signal_group["data"] = export_data 

1234 return hdf5_buffer.getvalue() 

1235 

1236 

1237class SignalStatus(TwinPadModel): 

1238 status: str = "down" 

1239 reason: str = "" 

1240 delay: float | None = None 

1241 

1242 

1243class DigitizationFunction(TwinPadModel): 

1244 bits: int | None = None 

1245 min_value: float 

1246 max_value: float 

1247 min_raw_value: float 

1248 max_raw_value: float 

1249 

1250 

1251class SignalUpdate(TwinPadModel): 

1252 value: float | str | bool | int | None = None 

1253 forced_value: float | str | bool | int | None = None 

1254 timestamp: int | None = None 

1255 

1256 

1257class SignalType(str, Enum): 

1258 command = "command" 

1259 sensor = "sensor" 

1260 external_sensor = "external_sensor" 

1261 

1262 

1263SIGNALDATA_TYPES = { 

1264 "int": NumericSignalData, 

1265 "float": NumericSignalData, 

1266 "str": StringSignalData, 

1267 "bool": NumericSignalData, 

1268 "epoch": NumericSignalData, 

1269} 

1270 

1271 

1272class Signal(GenericMongo): 

1273 collection_name: ClassVar[str] = "signals" 

1274 

1275 signal_id: str 

1276 frequency: float 

1277 unit: str | None 

1278 description: str 

1279 type: SignalType 

1280 data_type: str 

1281 precision_digits: int | None 

1282 forcible: bool 

1283 status: SignalStatus = SignalStatus() 

1284 

1285 digitization_function: DigitizationFunction | None 

1286 

1287 @property 

1288 def device(self) -> Device: 

1289 device_id = self.signal_id.split(".")[0] 

1290 device = Device.get_one_by_attribute("device_id", device_id) 

1291 return device 

1292 

1293 @cached_property 

1294 def signal_data_class(self): 

1295 if self.data_type in SIGNALDATA_TYPES: 

1296 return SIGNALDATA_TYPES[self.data_type] 

1297 if self.data_type.startswith("enum"): 

1298 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1299 raise ValueError(f"Unhandled python type: {self.data_type}") 

1300 

1301 @cached_property 

1302 def python_type(self): 

1303 if self.data_type in TYPES: 

1304 return TYPES[self.data_type] 

1305 if self.data_type.startswith("enum"): 

1306 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1307 return Literal[*choices] 

1308 raise ValueError(f"Unhandled python type: {self.data_type}") 

1309 

1310 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1311 command = Command( 

1312 sent_at=time.time(), 

1313 command_type="Signal command", 

1314 user_id=current_user.id, 

1315 ) 

1316 

1317 has_input_error = False 

1318 error_message = "" 

1319 

1320 if self.data_type.startswith("enum"): 

1321 enum_options = get_args(self.python_type) 

1322 

1323 if update_dict.value is not None and update_dict.value not in enum_options: 

1324 has_input_error = True 

1325 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1326 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1327 has_input_error = True 

1328 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1329 else: 

1330 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1331 has_input_error = True 

1332 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1333 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1334 has_input_error = True 

1335 error_message += ( 

1336 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1337 ) 

1338 

1339 if has_input_error: 

1340 command.response_time = 0 

1341 command.succeeded = False 

1342 command.description = f"Tried to modify signal {self.signal_id}" 

1343 response = {"error": True, "status_code": 400, "message": error_message} 

1344 else: 

1345 response = await send_signal_value(self.signal_id, update_dict) 

1346 command.receive_response(response) 

1347 

1348 Command.create(command) 

1349 return response 

1350 

1351 @classmethod 

1352 def get_from_signal_id(cls, signal_id) -> Self: 

1353 """Could be generic from mongo""" 

1354 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1355 if not raw_value: 

1356 return None 

1357 del raw_value["_id"] 

1358 return cls.dict_to_object(raw_value) 

1359 

1360 @classmethod 

1361 def get_all_ids(cls) -> list[str]: 

1362 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1363 

1364 return [signal["signal_id"] for signal in cursor] 

1365 

1366 @classmethod 

1367 def get_all_statuses(cls) -> list[dict]: 

1368 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1369 

1370 return [ 

1371 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1372 for signal in cursor 

1373 ] 

1374 

1375 async def number_samples(self): 

1376 collection = get_signal_collection(signal_id=self.signal_id) 

1377 if collection is None: 

1378 return 0 

1379 

1380 number_samples = collection.estimated_document_count() 

1381 

1382 number_samples_async_collection = await get_async_collection( 

1383 systems_async_database, "number_samples", create=True, time_series=True 

1384 ) 

1385 

1386 loop = asyncio.get_running_loop() 

1387 loop.create_task( 

1388 number_samples_async_collection.insert_one( 

1389 { 

1390 "timestamp": datetime.datetime.now(pytz.UTC), 

1391 "signal_id": self.signal_id, 

1392 "number_samples": number_samples, 

1393 } 

1394 ) 

1395 ) 

1396 

1397 return number_samples 

1398 

1399 @classmethod 

1400 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1401 number_samples_by_id = {} 

1402 collections = get_signal_collections_batch(signal_ids) 

1403 number_samples_async_collection = await get_async_collection( 

1404 systems_async_database, "number_samples", create=True, time_series=True 

1405 ) 

1406 

1407 for signal_id, collection in zip(signal_ids, collections): 

1408 if collection is None: 

1409 number_samples_by_id[signal_id] = 0 

1410 continue 

1411 

1412 number_samples = collection.estimated_document_count() 

1413 

1414 number_samples_by_id[signal_id] = number_samples 

1415 

1416 now = datetime.datetime.now(pytz.UTC) 

1417 loop = asyncio.get_running_loop() 

1418 loop.create_task( 

1419 number_samples_async_collection.insert_many( 

1420 [ 

1421 { 

1422 "timestamp": now, 

1423 "signal_id": signal_id, 

1424 "number_samples": number_samples, 

1425 } 

1426 for signal_id, number_samples in number_samples_by_id.items() 

1427 ] 

1428 ) 

1429 ) 

1430 

1431 return number_samples_by_id 

1432 

1433 def sample_datasize(self): 

1434 return signals_database.command("collstats", self.signal_id)["size"] 

1435 

1436 @classmethod 

1437 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1438 result = cls.collection().aggregate( 

1439 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1440 ) 

1441 

1442 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1443 

1444 

1445class ServicesStatus(TwinPadModel): 

1446 backend: str 

1447 cloud_broker: str 

1448 time_series_database: str 

1449 signal_storage: str 

1450 heartbeat_storage: str 

1451 data_analyzer: str 

1452 

1453 @classmethod 

1454 def check(cls) -> Self: 

1455 return cls( 

1456 cloud_broker=ping(RABBITMQ_HOST), 

1457 backend="up", 

1458 time_series_database=ping(MONGO_HOST), 

1459 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1460 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1461 data_analyzer=ping(DATA_ANALYZER_HOST), 

1462 ) 

1463 

1464 

1465def ping(host): 

1466 try: 

1467 if ping3.ping(host, timeout=0.8): 

1468 return "up" 

1469 except PermissionError: 

1470 pass 

1471 return "down" 

1472 

1473 

1474class Event(GenericMongo): 

1475 collection_name: ClassVar[str] = "events" 

1476 

1477 name: str 

1478 timestamp: float 

1479 event_rule_id: str 

1480 

1481 @computed_field 

1482 @cached_property 

1483 def event_rule(self) -> "EventRule": 

1484 return EventRule.get_from_id(self.event_rule_id) 

1485 

1486 @classmethod 

1487 def dict_to_object(cls, dict_): 

1488 """Refine to convert timestamp to datetime for mongodb.""" 

1489 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

1490 return super().dict_to_object(dict_) 

1491 

1492 

1493class TwinPadActivity(GenericMongo): 

1494 timestamp: float 

1495 amount: int 

1496 

1497 @classmethod 

1498 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

1499 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1500 number_events_collection = get_collection(systems_database, "number_events") 

1501 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

1502 items = [] 

1503 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1504 if number_events_collection is None or recompute_amount: 

1505 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

1506 number_events_collection.delete_many({}) 

1507 first_event = events_collection.find_one(sort={"timestamp": 1}) 

1508 if first_event is None: 

1509 return items 

1510 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

1511 tzinfo=pytz.UTC 

1512 ) 

1513 while last_computed_day < TODAY: 

1514 day_nb_events = events_collection.count_documents( 

1515 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1516 ) 

1517 if day_nb_events > 0: 

1518 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

1519 last_computed_day += ONE_DAY_OFFSET 

1520 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1521 if number_events_today > 0: 

1522 number_events_collection.delete_many({"timestamp": TODAY}) 

1523 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

1524 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1525 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1526 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1527 for day in number_events: 

1528 day["timestamp"] = day["timestamp"].timestamp() 

1529 items.append(cls.mongo_dict_to_object(day)) 

1530 return items 

1531 

1532 @classmethod 

1533 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1534 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1535 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1536 signals_number_samples_collection = get_collection( 

1537 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

1538 ) 

1539 items = [] 

1540 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1541 if number_samples_collection is None or recompute_amount: 

1542 number_samples_collection = get_collection( 

1543 systems_database, "number_received_samples", create=True, time_series=True 

1544 ) 

1545 number_samples_collection.delete_many({}) 

1546 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

1547 if first_sample is None: 

1548 return items 

1549 # compute from day of first found event 

1550 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

1551 tzinfo=pytz.UTC 

1552 ) 

1553 while last_computed_day < TODAY: 

1554 number_samples_request = signals_number_samples_collection.aggregate( 

1555 [ 

1556 { 

1557 "$match": { 

1558 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

1559 } 

1560 }, 

1561 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1562 ] 

1563 ).to_list() 

1564 if len(number_samples_request) == 0: 

1565 number_samples = 0 

1566 else: 

1567 number_samples = number_samples_request[0].get("number_samples", 0) 

1568 if number_samples > 0: 

1569 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

1570 last_computed_day += ONE_DAY_OFFSET 

1571 number_samples_request = signals_number_samples_collection.aggregate( 

1572 [ 

1573 {"$match": {"timestamp": {"$gte": TODAY}}}, 

1574 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1575 ] 

1576 ).to_list() 

1577 if len(number_samples_request) == 0: 

1578 number_samples_today = 0 

1579 else: 

1580 number_samples_today = number_samples_request[0].get("number_samples", 0) 

1581 if number_samples_today > 0: 

1582 number_samples_collection.delete_many({"timestamp": TODAY}) 

1583 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

1584 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1585 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1586 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1587 for day in number_events: 

1588 day["timestamp"] = day["timestamp"].timestamp() 

1589 items.append(cls.mongo_dict_to_object(day)) 

1590 return items 

1591 

1592 @classmethod 

1593 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1594 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1595 number_commands_collection = get_collection(systems_database, "number_commands") 

1596 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

1597 items = [] 

1598 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1599 if number_commands_collection is None or recompute_amount: 

1600 number_commands_collection = get_collection( 

1601 systems_database, "number_commands", create=True, time_series=True 

1602 ) 

1603 number_commands_collection.delete_many({}) 

1604 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

1605 if first_command is None: 

1606 return items 

1607 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

1608 tzinfo=pytz.UTC 

1609 ) 

1610 while last_computed_day < TODAY: 

1611 day_nb_commands = commands_collection.count_documents( 

1612 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1613 ) 

1614 if day_nb_commands > 0: 

1615 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

1616 last_computed_day += ONE_DAY_OFFSET 

1617 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1618 if number_commands_today > 0: 

1619 number_commands_collection.delete_many({"timestamp": TODAY}) 

1620 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

1621 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1622 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1623 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1624 for day in number_commands: 

1625 day["timestamp"] = day["timestamp"].timestamp() 

1626 items.append(cls.mongo_dict_to_object(day)) 

1627 return items 

1628 

1629 

1630class EventRule(GenericMongo): 

1631 collection_name: ClassVar[str] = "event_rules" 

1632 

1633 name: str 

1634 formula: str 

1635 variables: list[str] 

1636 

1637 @computed_field 

1638 @cached_property 

1639 def number_events(self) -> int: 

1640 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

1641 

1642 

1643class Company(GenericMongo): 

1644 collection_name: ClassVar[str] = "companies" 

1645 name: str 

1646 

1647 

1648class Campaign(GenericMongo): 

1649 collection_name: ClassVar[str] = "campaigns" 

1650 

1651 # Properties 

1652 id: str | None = None 

1653 name: str 

1654 description: str | None = None 

1655 

1656 @classmethod 

1657 def create(cls, campaign: Self): 

1658 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"})) 

1659 if new_campaign is None: 

1660 return None 

1661 return {"campaign_id": str(new_campaign.inserted_id)} 

1662 

1663 @classmethod 

1664 def update(cls, campaign: Self): 

1665 updated_campaign = cls.collection().find_one_and_update( 

1666 {"_id": ObjectId(campaign.id)}, 

1667 {"$set": {"name": campaign.name, "description": campaign.description}}, 

1668 return_document=ReturnDocument.AFTER, 

1669 ) 

1670 return updated_campaign 

1671 

1672 @classmethod 

1673 def delete(cls, campaign_id): 

1674 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)}) 

1675 return deleted_user 

1676 

1677 

1678class Phase(GenericMongo): 

1679 collection_name: ClassVar[str] = "phases" 

1680 

1681 # Properties 

1682 id: str | None = None 

1683 name: str 

1684 description: str | None = None 

1685 start_at: float 

1686 end_at: float 

1687 

1688 # FK 

1689 campaign_id: str 

1690 

1691 # @classmethod 

1692 # def get_by_date(cls, datetime: float): 

1693 # phases = [] 

1694 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING): 

1695 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1696 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING): 

1697 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1698 # if phases is None: 

1699 # return None 

1700 # return phases 

1701 

1702 @classmethod 

1703 def create(cls, phase: Self): 

1704 phase = Phase( 

1705 name=phase.name, 

1706 description=phase.description, 

1707 start_at=phase.start_at, 

1708 end_at=phase.end_at, 

1709 campaign_id=phase.campaign_id, 

1710 ) 

1711 phase_collection = get_collection(systems_database, "phases", create=True) 

1712 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"})) 

1713 if new_phase is None: 

1714 return None 

1715 return {"phase_id": str(new_phase.inserted_id)} 

1716 

1717 @classmethod 

1718 def update(cls, phase: Self): 

1719 updated_phase = cls.collection().find_one_and_update( 

1720 {"_id": ObjectId(phase.id)}, 

1721 { 

1722 "$set": { 

1723 "name": phase.name, 

1724 "description": phase.description, 

1725 "start_at": phase.start_at, 

1726 "end_at": phase.end_at, 

1727 } 

1728 }, 

1729 return_document=ReturnDocument.AFTER, 

1730 ) 

1731 return updated_phase 

1732 

1733 @classmethod 

1734 def delete(cls, phase_id): 

1735 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)}) 

1736 return delete_phase 

1737 

1738 @classmethod 

1739 def deleteMany(cls, campaign_id): 

1740 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

1741 return delete_phases 

1742 

1743 

1744class CustomViewCreation(GenericMongo): 

1745 collection_name: ClassVar[str] = "custom_views" 

1746 

1747 name: str 

1748 configuration: list 

1749 

1750 

1751class CustomView(CustomViewCreation): 

1752 # Properties 

1753 id: str | None = None 

1754 

1755 # Foreign Key 

1756 user_id: str 

1757 

1758 # # Methods 

1759 # @classmethod 

1760 # def create(cls, form_custom_view: Self, user_id) -> list: 

1761 # custom_view = CustomView( 

1762 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1763 # ) 

1764 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1765 # return new_custom_view 

1766 

1767 # @classmethod 

1768 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1769 # updated_custom_view = cls.collection().find_one_and_update( 

1770 # {"_id": ObjectId(custom_view_id)}, 

1771 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1772 # return_document=ReturnDocument.AFTER, 

1773 # ) 

1774 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1775 # del updated_custom_view["_id"] 

1776 # return cls(**updated_custom_view) 

1777 

1778 # @classmethod 

1779 # def delete(cls, custom_view_id) -> bool: 

1780 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1781 # return deleted_custom_view.acknowledged 

1782 

1783 

1784CustomViewUpdate = create_update_model(CustomView) 

1785 

1786 

1787class Video(GenericMongo): 

1788 collection_name: ClassVar[str] = "videos" 

1789 

1790 # Properties 

1791 name: str 

1792 ip_addr: str 

1793 username: str | None = None 

1794 password: str | None = None 

1795 

1796 # Methods 

1797 @classmethod 

1798 def get_all(cls, sort_by="_id") -> list[Self]: 

1799 items = [] 

1800 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

1801 items.append(cls.mongo_dict_to_object(dict_)) 

1802 return items 

1803 

1804 @classmethod 

1805 def get_video(cls, camera_id: ObjectId): 

1806 camera = cls.get_from_id(camera_id) 

1807 if camera is not None: 

1808 return camera.name 

1809 return None 

1810 

1811 

1812class Command(GenericMongo): 

1813 collection_name: ClassVar[str] = "commands" 

1814 

1815 # Properties 

1816 timestamp: datetime.datetime = None 

1817 sent_at: float 

1818 response_time: float = 0.0 

1819 command_type: str 

1820 description: str = "" 

1821 succeeded: bool = False 

1822 

1823 # Foreign key 

1824 user_id: str 

1825 

1826 @classmethod 

1827 def collection(cls): 

1828 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

1829 

1830 @classmethod 

1831 def create(cls, command: Self): 

1832 command = cls( 

1833 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

1834 sent_at=command.sent_at, 

1835 response_time=command.response_time, 

1836 command_type=command.command_type, 

1837 description=command.description, 

1838 succeeded=command.succeeded, 

1839 user_id=command.user_id, 

1840 ) 

1841 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

1842 if new_command is None: 

1843 return None 

1844 return {"command_id": str(new_command.inserted_id)} 

1845 

1846 def receive_response(self, response: dict): 

1847 self.response_time = time.time() - self.sent_at 

1848 self.succeeded = response.get("error", True) is False 

1849 if self.description == "": 

1850 self.description += response.get("message", "").rstrip() 

1851 

1852 

1853class SignalsPresetCreation(GenericMongo): 

1854 name: str 

1855 signal_ids: list[str] 

1856 

1857 

1858class SignalsPreset(SignalsPresetCreation): 

1859 collection_name: ClassVar[str] = "signals_presets" 

1860 

1861 user_id: str 

1862 

1863 @classmethod 

1864 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

1865 signals_preset = cls( 

1866 user_id=user_id, 

1867 name=signals_preset.name, 

1868 signal_ids=signals_preset.signal_ids, 

1869 ) 

1870 

1871 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

1872 

1873 return str(new_signal_preset.inserted_id) 

1874 

1875 

1876SignalsPresetUpdate = create_update_model(SignalsPreset) 

1877 

1878 

1879class LineStyle(str, Enum): 

1880 solid = "solid" 

1881 dotted = "dotted" 

1882 dashed = "dashed" 

1883 

1884 

1885class SignalAppearance: 

1886 value_color: str 

1887 forced_value_color: str 

1888 

1889 

1890class GraphThemeCreation(GenericMongo): 

1891 collection_name: ClassVar[str] = "graph_themes" 

1892 

1893 name: str 

1894 signal_id: str 

1895 value_color: str = "" 

1896 forced_value_color: str = "" 

1897 value_line_style: LineStyle = LineStyle.solid 

1898 forced_value_line_style: LineStyle = LineStyle.solid 

1899 private: bool = True 

1900 

1901 

1902class PublicGraphTheme(GraphThemeCreation): 

1903 created_by_user: bool 

1904 in_user_library: bool 

1905 active_for_user: bool 

1906 

1907 _current_user_id: str = "" 

1908 

1909 @classproperty 

1910 def custom_pipeline_steps(cls) -> dict[str, list]: 

1911 return { 

1912 "created_by_user": [ 

1913 { 

1914 "$addFields": { 

1915 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

1916 } 

1917 } 

1918 ], 

1919 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

1920 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

1921 ], 

1922 "in_user_library": [ 

1923 { 

1924 "$addFields": { 

1925 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

1926 } 

1927 } 

1928 ], 

1929 "active_for_user": [ 

1930 { 

1931 "$addFields": { 

1932 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

1933 } 

1934 } 

1935 ], 

1936 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

1937 "active": [ 

1938 { 

1939 "$addFields": { 

1940 "active": "$$REMOVE", 

1941 } 

1942 } 

1943 ], 

1944 "creator_id": [ 

1945 { 

1946 "$addFields": { 

1947 "creator_id": "$$REMOVE", 

1948 } 

1949 } 

1950 ], 

1951 } 

1952 

1953 @classmethod 

1954 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

1955 cls._current_user_id = user_id 

1956 return super().response_from_query(query) 

1957 

1958 @classmethod 

1959 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

1960 query.in_user_library = "true" 

1961 return cls.response_from_query(query, user_id) 

1962 

1963 @classmethod 

1964 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

1965 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

1966 

1967 @classmethod 

1968 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

1969 cls._current_user_id = user_id 

1970 return super().get_by_attribute(attribute_name, attribute_value) 

1971 

1972 @classmethod 

1973 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

1974 cls._current_user_id = user_id 

1975 return super().get_one_by_attribute(attribute_name, attribute_value) 

1976 

1977 @classmethod 

1978 def get_all(cls, sort_by: str, user_id: str): 

1979 cls._current_user_id = user_id 

1980 return super().get_all(sort_by) 

1981 

1982 @classmethod 

1983 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

1984 pipeline = [ 

1985 { 

1986 "$match": { 

1987 "active": {"$eq": user_id}, 

1988 "signal_id": {"$in": signal_ids}, 

1989 } 

1990 }, 

1991 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

1992 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

1993 { 

1994 "$project": { 

1995 "_id": 0, 

1996 "signal_id": 1, 

1997 "value_color": 1, 

1998 "forced_value_color": 1, 

1999 "value_line_style": 1, 

2000 "forced_value_line_style": 1, 

2001 } 

2002 }, 

2003 ] 

2004 

2005 result = {} 

2006 

2007 cursor = cls.collection().aggregate(pipeline) 

2008 for document in cursor: 

2009 signal_id = document["signal_id"] 

2010 del document["signal_id"] 

2011 result[signal_id] = document 

2012 

2013 return result 

2014 

2015 

2016GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2017 

2018 

2019class PrivateGraphTheme(GraphThemeCreation): 

2020 # private 

2021 creator_id: str 

2022 in_library: list[str] 

2023 active: list[str] 

2024 

2025 @classmethod 

2026 def create( 

2027 cls, 

2028 creator_id: str, 

2029 name: str, 

2030 signal_id: str, 

2031 value_color: str, 

2032 forced_value_color: str, 

2033 value_line_style: LineStyle, 

2034 forced_value_line_style: LineStyle, 

2035 private: bool, 

2036 ): 

2037 color_setting = cls( 

2038 creator_id=creator_id, 

2039 name=name, 

2040 signal_id=signal_id, 

2041 value_color=value_color, 

2042 forced_value_color=forced_value_color, 

2043 value_line_style=value_line_style, 

2044 forced_value_line_style=forced_value_line_style, 

2045 private=private, 

2046 in_library=[creator_id], 

2047 active=[creator_id], 

2048 ) 

2049 

2050 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2051 color_setting.id = str(new_color_setting.inserted_id) 

2052 return color_setting 

2053 

2054 def update(self, update_dict: dict, user_id: str): 

2055 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2056 if in_user_lib and user_id not in self.in_library: 

2057 self.in_library.append(user_id) 

2058 elif not in_user_lib and user_id in self.in_library: 

2059 self.in_library.remove(user_id) 

2060 update_dict["in_library"] = self.in_library 

2061 del update_dict["in_user_library"] 

2062 

2063 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2064 if active_for_user and user_id not in self.active: 

2065 self.active.append(user_id) 

2066 elif not active_for_user and user_id in self.active: 

2067 self.active.remove(user_id) 

2068 update_dict["active"] = self.active 

2069 del update_dict["active_for_user"] 

2070 

2071 if update_dict.get("created_by_user") is not None: 

2072 del update_dict["created_by_user"] 

2073 

2074 self.collection().find_one_and_update( 

2075 {"_id": ObjectId(self.id)}, 

2076 {"$set": update_dict}, 

2077 ) 

2078 

2079 return {}