Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1163 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-23 11:24 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25# from scipy import signal as signal_scipy 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 systems_database, 

32 systems_async_database, 

33 signals_database, 

34 devices_states_database, 

35) 

36from twinpad_backend.responses import ListResponse 

37from twinpad_backend.messages import send_mode_change, send_signal_value 

38 

39TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

40 

41 

42RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

43MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

44SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

45HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

46DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

47 

48DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

49NUMBER_SAMPLES_DATABASE_UPDATE = 120 

50 

51logger = logging.getLogger("uvicorn.error") 

52 

53 

54class classproperty: 

55 """ 

56 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

57 Found here: https://stackoverflow.com/a/76301341 

58 """ 

59 

60 def __init__(self, func): 

61 self.fget = func 

62 

63 def __get__(self, _, owner): 

64 return self.fget(owner) 

65 

66 

67def create_update_model(model): 

68 fields = {} 

69 

70 for field_name, field_annotation in model.model_fields.items(): 

71 if field_name != "id": 

72 fields[field_name] = (field_annotation.annotation | None, None) 

73 

74 query_name = model.__name__ + "Update" 

75 return create_model(query_name, **fields) 

76 

77 

78def get_utc_date_from_timestamp(timestamp: float): 

79 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

80 

81 

82def downsample_list(time_vector: list, values: list, max_number_samples: int): 

83 if len(time_vector) < max_number_samples: 

84 return time_vector, values 

85 

86 time_vector_copy = copy.deepcopy(time_vector) 

87 values_copy = copy.deepcopy(values) 

88 

89 none_group_bounds = [] 

90 none_group_index = -1 

91 index = -1 

92 # LTTB doesn't handle None values so remove them 

93 while values_copy.count(None) > 0: 

94 # Store bounds of None value groups so we can insert them back after the downsampling 

95 if (new_index := values_copy.index(None)) != index: 

96 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

97 none_group_index += 1 

98 elif len(none_group_bounds[none_group_index]) < 2: 

99 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

100 else: 

101 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

102 values_copy.pop(new_index) 

103 index = new_index 

104 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

105 

106 try: 

107 values_array = npy.array([time_vector_copy, values_copy]).T 

108 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

109 

110 new_time_vector = interpolated_values[:, 0].tolist() 

111 new_values = interpolated_values[:, 1].tolist() 

112 except ValueError: 

113 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

114 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

115 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

116 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

117 return new_time_vector, new_values_nan_to_none 

118 

119 # insert back None values at the correct timestamps 

120 for none_group in none_group_bounds: 

121 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

122 new_time_vector[start_index:start_index] = none_group 

123 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

124 

125 return new_time_vector, new_values 

126 

127 

128def is_of_type(value, wanted_type): 

129 if wanted_type is float: 

130 return isinstance(value, (int, float)) 

131 return isinstance(value, wanted_type) 

132 

133 

134# Models 

135class TwinPadModel(BaseModel): 

136 @classmethod 

137 def dict_to_object(cls, dict_): 

138 return cls.model_validate(dict_) 

139 

140 def to_dict(self, exclude=None): 

141 dict_ = self.model_dump(exclude=exclude) 

142 return dict_ 

143 

144 

145class GenericMongo(TwinPadModel): 

146 id: str | None = None 

147 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

148 

149 @classmethod 

150 def collection(cls): 

151 return get_collection(systems_database, cls.collection_name, create=True) 

152 

153 @classmethod 

154 def response_from_query(cls, query) -> ListResponse[Self]: 

155 request_filters = query.mongodb_filter() 

156 items = [] 

157 if ":" in query.sort_by: 

158 sort_field, sort_order = query.sort_by.split(":") 

159 sort_order = int(sort_order) 

160 else: 

161 sort_field = query.sort_by 

162 sort_order = 1 

163 collection = get_collection(systems_database, cls.collection_name, create=True) 

164 total = collection.count_documents(request_filters) 

165 

166 pipeline = [] 

167 added_properties = [] 

168 if "$and" in request_filters: 

169 for request_filter in request_filters["$and"]: 

170 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

171 if filtered_property in request_filter: 

172 pipeline.extend(pipeline_steps) 

173 added_properties.append(filtered_property) 

174 else: 

175 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

176 if filtered_property in request_filters: 

177 pipeline.extend(pipeline_steps) 

178 added_properties.append(filtered_property) 

179 pipeline.append({"$match": request_filters}) 

180 if sort_field in cls.custom_pipeline_steps: 

181 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

182 added_properties.append(sort_field) 

183 pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}]) 

184 

185 if (query.limit is not None) and (query.limit != 0): 

186 pipeline.append({"$limit": query.limit}) 

187 

188 for filtered_property, step in cls.custom_pipeline_steps.items(): 

189 if filtered_property not in added_properties: 

190 pipeline.extend(step) 

191 

192 cursor = collection.aggregate(pipeline) 

193 

194 for item_dict in cursor: 

195 items.append(cls.mongo_dict_to_object(item_dict)) 

196 

197 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

198 

199 @classmethod 

200 def get_from_id(cls, item_id) -> Self | None: 

201 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

202 

203 @classmethod 

204 def mongo_dict_to_object(cls, mongo_dict): 

205 mongo_dict["id"] = str(mongo_dict["_id"]) 

206 del mongo_dict["_id"] 

207 return cls.dict_to_object(mongo_dict) 

208 

209 @classmethod 

210 def get_by_attribute(cls, attribute_name: str, attribute_value): 

211 """Returns all items that match the attribute with value.""" 

212 pipeline = [] 

213 if attribute_name in cls.custom_pipeline_steps: 

214 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

215 pipeline.append({"$match": {attribute_name: attribute_value}}) 

216 for key, step in cls.custom_pipeline_steps.items(): 

217 if key != attribute_name: 

218 pipeline.extend(step) 

219 items = cls.collection().aggregate(pipeline) 

220 if items is None: 

221 return None 

222 return [cls.mongo_dict_to_object(d) for d in items] 

223 

224 @classmethod 

225 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

226 pipeline = [] 

227 if attribute_name in cls.custom_pipeline_steps: 

228 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

229 pipeline.append({"$match": {attribute_name: attribute_value}}) 

230 pipeline.append({"$limit": 1}) 

231 for key, step in cls.custom_pipeline_steps.items(): 

232 if key != attribute_name: 

233 pipeline.extend(step) 

234 items = cls.collection().aggregate(pipeline).to_list() 

235 if len(items) == 0: 

236 return None 

237 return cls.mongo_dict_to_object(items[0]) 

238 

239 @classmethod 

240 def get_all(cls, sort_by="_id") -> list[Self]: 

241 items = [] 

242 pipeline = [] 

243 if sort_by in cls.custom_pipeline_steps: 

244 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

245 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

246 for key, step in cls.custom_pipeline_steps.items(): 

247 if key != sort_by: 

248 pipeline.extend(step) 

249 for dict_ in cls.collection().aggregate(pipeline): 

250 items.append(cls.mongo_dict_to_object(dict_)) 

251 return items 

252 

253 @classmethod 

254 def get_number_documents(cls): 

255 collection = get_collection(systems_database, cls.collection_name) 

256 if collection is None: 

257 return 0 

258 return collection.count_documents({}) 

259 

260 def insert(self): 

261 insert_result = self.collection().insert_one(self.to_dict(exclude={id})) 

262 self.id = str(insert_result.inserted_id) 

263 return self.id 

264 

265 def update(self, update_dict): 

266 for key, value in update_dict.items(): 

267 setattr(self, key, value) 

268 self.collection().find_one_and_update( 

269 {"_id": ObjectId(self.id)}, 

270 {"$set": update_dict}, 

271 return_document=ReturnDocument.AFTER, 

272 ) 

273 

274 return self 

275 

276 def delete(self): 

277 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

278 return result.deleted_count > 0 

279 

280 

281class User(GenericMongo): 

282 collection_name: ClassVar[str] = "users" 

283 

284 firstname: str 

285 lastname: str 

286 email: str 

287 password: str 

288 is_active: bool | None = False 

289 is_admin: bool | None = False 

290 is_connected: bool | None = False 

291 company_id: str | None = None 

292 

293 def to_dict(self, exclude=None): 

294 if exclude is None: 

295 exclude = {"password"} 

296 return GenericMongo.to_dict(self, exclude=exclude) 

297 

298 @classmethod 

299 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

300 users = cls.get_all() 

301 if not users: 

302 is_admin = True 

303 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

304 user_collection = get_collection(systems_database, "users", create=True) 

305 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

306 if new_user is None: 

307 return None 

308 return {"user_id": str(new_user.inserted_id)} 

309 

310 @classmethod 

311 def update_info(cls, user: "UserUpdate", user_id: str): 

312 updated_user = cls.collection().find_one_and_update( 

313 {"_id": ObjectId(user_id)}, 

314 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

315 return_document=ReturnDocument.AFTER, 

316 ) 

317 updated_user["id"] = str(updated_user["_id"]) 

318 del (updated_user["_id"], updated_user["is_connected"]) 

319 return cls(**updated_user) 

320 

321 

322UserUpdate = create_update_model(User) 

323 

324 

325class Mode(TwinPadModel): 

326 mode_id: int 

327 name: str 

328 frequency_multiplier: float 

329 min_frequency: float 

330 

331 

332class DeviceUpdate(TwinPadModel): 

333 mode_id: int 

334 

335 

336class Device(GenericMongo): 

337 collection_name: ClassVar[str] = "devices" 

338 

339 device_id: str 

340 name: str 

341 description: str = "" 

342 modes: list[Mode] 

343 current_mode_id: int | None = None 

344 last_ping: float | None = None 

345 petri_network: Any 

346 pid: Any 

347 load: float | None = None 

348 tokens: list[int] = Field(default_factory=list) 

349 status: str 

350 

351 async def change_mode(self, update_dict, current_user: User): 

352 has_error = False 

353 

354 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

355 has_error = True 

356 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

357 elif self.current_mode_id is not None: 

358 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

359 else: 

360 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

361 command = Command( 

362 sent_at=time.time(), 

363 command_type="Mode change", 

364 description=description, 

365 user_id=current_user.id, 

366 ) 

367 

368 if has_error: 

369 command.response_time = 0 

370 command.succeeded = False 

371 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

372 else: 

373 response = await send_mode_change(self.device_id, update_dict.mode_id) 

374 command.receive_response(response) 

375 

376 Command.create(command) 

377 return response 

378 

379 @classmethod 

380 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

381 devices_by_id = {} 

382 for signal_id in signal_ids: 

383 device_id = signal_id.split(".")[0] 

384 if device_id not in devices_by_id: 

385 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

386 return devices_by_id 

387 

388 

389class DeviceSetup(GenericMongo): 

390 collection_name: ClassVar[str] = "device_setups" 

391 

392 device_ids: list[str] 

393 active: bool = False 

394 variable_mapping: dict[str, str] 

395 

396 

397DeviceSetupUpdate = create_update_model(DeviceSetup) 

398 

399 

400class DeviceState(GenericMongo): 

401 collection_name: ClassVar[str] = "devices_states" 

402 

403 timestamp: float 

404 mode: str | None = None 

405 load: float | None = None 

406 tokens: list[int] = Field(default_factory=list) 

407 modified_properties: list[str] = Field(default_factory=list) 

408 

409 @classmethod 

410 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]: 

411 req_filter = query.mongodb_filter() 

412 items = [] 

413 if ":" in query.sort_by: 

414 sort_field, sort_order = query.sort_by.split(":") 

415 sort_order = int(sort_order) 

416 else: 

417 sort_field = query.sort_by 

418 sort_order = 1 

419 collection = get_collection(devices_states_database, device_id) 

420 if collection is None: 

421 total = 0 

422 cursor = [] 

423 else: 

424 total = collection.count_documents(req_filter) 

425 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

426 if (query.limit is not None) and (query.limit != 0): 

427 cursor = cursor.limit(query.limit) 

428 for item_dict in cursor: 

429 items.append( 

430 cls( 

431 timestamp=item_dict.get("precise_timestamp"), 

432 mode=item_dict.get("mode", None), 

433 load=item_dict.get("load", None), 

434 tokens=item_dict.get("tokens", Field(default_factory=list)), 

435 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

436 ) 

437 ) 

438 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

439 

440 

441class SignalSample(TwinPadModel): 

442 signal_id: str 

443 timestamp: float 

444 value: float | int | str | bool | None 

445 forced_value: float | int | str | bool | None = None 

446 

447 @classmethod 

448 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

449 

450 collection = get_signal_collection(signal_id) 

451 if collection is None: 

452 return None 

453 

454 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

455 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

456 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

457 first_bucket = None 

458 if bucket is not None: 

459 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

460 if first_bucket is not None: 

461 sample_data = collection.find_one( 

462 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

463 ) 

464 else: 

465 sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)]) 

466 

467 if sample_data is None: 

468 return None 

469 

470 timestamp = sample_data["precise_timestamp"] 

471 

472 return cls( 

473 signal_id=signal_id, 

474 timestamp=timestamp, 

475 value=sample_data.get("value", None), 

476 forced_value=sample_data.get("forced_value", None), 

477 ) 

478 

479 @classmethod 

480 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

481 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

482 

483 @classmethod 

484 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

485 

486 collection = get_signal_collection(signal_id) 

487 if collection is None: 

488 return None 

489 

490 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

491 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

492 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

493 last_bucket = None 

494 if bucket is not None: 

495 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

496 if last_bucket is not None: 

497 sample_data = collection.find_one({"precise_timestamp": last_bucket["control"]["max"]["precise_timestamp"]}) 

498 else: 

499 sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)]) 

500 

501 if sample_data is None: 

502 return None 

503 

504 timestamp = sample_data["precise_timestamp"] 

505 

506 if device is None: 

507 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

508 if device is not None and device.last_ping is not None: 

509 if timestamp is None: 

510 timestamp = device.last_ping 

511 else: 

512 timestamp = max(timestamp, device.last_ping) 

513 return cls( 

514 signal_id=signal_id, 

515 timestamp=timestamp, 

516 value=sample_data.get("value", None), 

517 forced_value=sample_data.get("forced_value", None), 

518 ) 

519 

520 @classmethod 

521 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None: 

522 collection = get_signal_collection(signal_id) 

523 if collection is None: 

524 return None 

525 

526 cursor = collection.aggregate( 

527 [ 

528 {"$match": {"timestamp": {"$gte": datetime.datetime.fromtimestamp(min_timestamp, pytz.UTC)}}}, 

529 {"$sort": {"timestamp": -1}}, 

530 {"$limit": 1}, 

531 ] 

532 ) 

533 cursor_data = cursor.to_list() 

534 

535 if len(cursor_data) == 0: 

536 return None 

537 

538 sample_data = cursor_data[0] 

539 return cls( 

540 signal_id=signal_id, 

541 timestamp=sample_data.get("precise_timestamp"), 

542 value=sample_data.get("value"), 

543 forced_value=sample_data.get("forced_value"), 

544 ) 

545 

546 @classmethod 

547 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

548 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

549 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

550 

551 @classmethod 

552 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None: 

553 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids] 

554 

555 

556class SignalData(TwinPadModel): 

557 signal_id: str 

558 forcible: bool = True 

559 time_vector: list[float] 

560 values: list[float | int | str | None] 

561 forced_values: list[float | int | str | None] 

562 

563 data_start: float | None = None 

564 data_end: float | None = None 

565 

566 number_samples: int = 0 

567 number_samples_db: int = 0 

568 

569 db_query_time: float = 0.0 

570 init_time: float = 0.0 

571 data_processing_time: float = 0.0 

572 

573 @classmethod 

574 def get_from_signal_id( 

575 cls, 

576 signal_id: str, 

577 min_timestamp: float = None, 

578 max_timestamp: float = None, 

579 window_min_timestamp: float = None, 

580 window_max_timestamp: float = None, 

581 interpolate_bounds: bool = True, 

582 max_documents: int = None, 

583 ) -> Self: 

584 

585 now = time.time() 

586 

587 req_signal = {} 

588 if min_timestamp is not None: 

589 req_signal.setdefault("timestamp", {}) 

590 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

591 if max_timestamp is not None: 

592 req_signal.setdefault("timestamp", {}) 

593 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

594 

595 collection = get_signal_collection(signal_id) 

596 if collection is None: 

597 return cls( 

598 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

599 ) 

600 

601 db_req_start = time.time() 

602 

603 sort_step = {"$sort": {"precise_timestamp": 1}} 

604 number_results = collection.count_documents(req_signal) 

605 

606 pipeline = [] 

607 if req_signal: 

608 pipeline.append({"$match": req_signal}) # Filter data if needed 

609 

610 pipeline.extend( 

611 [ 

612 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

613 sort_step, 

614 ] 

615 ) 

616 

617 if max_documents is not None and max_documents < number_results: 

618 unsampling_ratio = math.ceil(number_results / max_documents) 

619 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

620 pipeline.extend( 

621 [ 

622 { 

623 "$setWindowFields": { 

624 "sortBy": {"precise_timestamp": 1}, 

625 "output": {"index": {"$documentNumber": {}}}, 

626 } 

627 }, 

628 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

629 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

630 {"$replaceRoot": {"newRoot": "$doc"}}, 

631 {"$unset": ["index", "group_id"]}, 

632 {"$sort": {"precise_timestamp": 1}}, 

633 ] 

634 ) 

635 

636 # logger.info(f"pipeline: %s", str(pipeline)) 

637 cursor = collection.aggregate(pipeline) 

638 db_req_time = time.time() - db_req_start 

639 

640 init_time = time.time() 

641 

642 results = cursor.to_list() 

643 time_vector = [] 

644 values = [] 

645 forced_values = [] 

646 for s in results: 

647 time_vector.append(s["precise_timestamp"]) 

648 values.append(s.get("value", None)) 

649 forced_values.append(s.get("forced_value", None)) 

650 

651 signal = Signal.get_from_signal_id(signal_id) 

652 class_ = signal.signal_data_class 

653 

654 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

655 time_vector, values, forced_values = cls.interpolate_bounds( 

656 class_, 

657 collection, 

658 signal_id, 

659 time_vector, 

660 values, 

661 forced_values, 

662 window_min_timestamp, 

663 window_max_timestamp, 

664 ) 

665 

666 if values: 

667 # TODO: check below. a bit strange 

668 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

669 # Adding last value as it should be repeated 

670 time_vector.append(now) 

671 values.append(values[-1]) 

672 forced_values.append(forced_values[-1]) 

673 

674 init_time = time.time() - init_time 

675 

676 # See line 292 for explanation 

677 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

678 first_bucket = None 

679 if bucket is not None: 

680 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

681 if first_bucket is not None: 

682 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

683 else: 

684 data_start = None 

685 

686 last_bucket = None 

687 if bucket is not None: 

688 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

689 if last_bucket is not None: 

690 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

691 else: 

692 data_end = None 

693 

694 return class_( 

695 signal_id=signal_id, 

696 forcible=signal.forcible, 

697 time_vector=time_vector, 

698 values=values, 

699 forced_values=forced_values, 

700 data_start=data_start, 

701 data_end=data_end, 

702 number_samples=len(values), 

703 number_samples_db=number_results, 

704 db_query_time=db_req_time, 

705 init_time=init_time, 

706 ) 

707 

708 @staticmethod 

709 def interpolate_bounds( 

710 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

711 ): 

712 sample_right = None 

713 # Fetching right side value & interpolation 

714 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

715 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

716 sample_right = collection.find_one( 

717 { 

718 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

719 "value": {"$exists": True}, 

720 }, 

721 sort=[("precise_timestamp", -1)], 

722 ) 

723 if sample_right: 

724 if time_vector: 

725 right_sd = class_( 

726 signal_id=signal_id, 

727 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

728 values=[values[-1], sample_right.get("value", None)], 

729 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

730 ) 

731 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

732 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

733 else: 

734 max_ts_value = sample_right.get("value", None) 

735 max_ts_forced_value = sample_right.get("forced_value", None) 

736 time_vector.append(window_max_timestamp) 

737 values.append(max_ts_value) 

738 forced_values.append(max_ts_forced_value) 

739 

740 # Fetching left side value & interpolation 

741 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

742 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

743 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

744 sample_left = sample_right 

745 sample_left = collection.find_one( 

746 { 

747 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

748 "value": {"$exists": True}, 

749 }, 

750 sort=[("precise_timestamp", -1)], 

751 ) 

752 

753 if sample_left: 

754 if time_vector: 

755 left_sd = class_( 

756 signal_id=signal_id, 

757 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

758 values=[sample_left["value"], values[0]], 

759 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

760 ) 

761 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

762 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

763 else: 

764 min_ts_value = sample_left.get("value", None) 

765 min_ts_forced_value = sample_left.get("forced_value", None) 

766 time_vector.insert(0, window_min_timestamp) 

767 values.insert(0, min_ts_value) 

768 forced_values.insert(0, min_ts_forced_value) 

769 

770 return time_vector, values, forced_values 

771 

772 def interpolate_values(self, new_time_vector: list[float]): 

773 return self.interpolate(new_time_vector, self.values) 

774 

775 def interpolate_forced_values(self, new_time_vector: list[float]): 

776 return self.interpolate(new_time_vector, self.forced_values) 

777 

778 def uniform_desampling(self, number_samples_max: int) -> Self: 

779 data_processing_time = time.time() 

780 if number_samples_max and self.number_samples > number_samples_max: 

781 new_time_vector = npy.linspace( 

782 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

783 ).tolist() 

784 values = self.interpolate_values(new_time_vector) 

785 forced_values = self.interpolate_forced_values(new_time_vector) 

786 time_vector = new_time_vector 

787 number_samples = len(time_vector) 

788 else: 

789 time_vector = self.time_vector 

790 number_samples = len(self.values) 

791 values = self.values[:] 

792 forced_values = self.forced_values[:] 

793 data_processing_time = time.time() - data_processing_time 

794 

795 return self.__class__( 

796 signal_id=self.signal_id, 

797 time_vector=time_vector, 

798 values=values, 

799 forced_values=forced_values, 

800 number_samples=number_samples, 

801 number_samples_db=self.number_samples, 

802 data_start=self.data_start, 

803 data_end=self.data_end, 

804 db_query_time=self.db_query_time, 

805 init_time=self.init_time, 

806 data_processing_time=self.data_processing_time + data_processing_time, 

807 ) 

808 

809 def interest_window_desampling( 

810 self, 

811 window_max_number_samples: int, 

812 outside_max_number_samples: int, 

813 window_min_timestamp: float | None = None, 

814 window_max_timestamp: float | None = None, 

815 ) -> Self: 

816 """Performs a sampling in a window of interest and outside.""" 

817 

818 if not self.time_vector: 

819 return self 

820 

821 if window_min_timestamp is None: 

822 window_min_timestamp = self.time_vector[0] 

823 if window_max_timestamp is None: 

824 window_max_timestamp = self.time_vector[-1] 

825 

826 data_processing_time = time.time() 

827 

828 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

829 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

830 

831 time_vector_before = self.time_vector[:index_window_start] 

832 time_vector_window = self.time_vector[index_window_start:index_window_end] 

833 time_vector_after = self.time_vector[index_window_end:] 

834 

835 # Resampling window 

836 if time_vector_window: 

837 # Ensurring window bounds 

838 if time_vector_window[0] != window_min_timestamp: 

839 time_vector_window.insert(0, window_min_timestamp) 

840 if time_vector_window[-1] != window_max_timestamp: 

841 time_vector_window.append(window_max_timestamp) 

842 else: 

843 time_vector_window = [window_min_timestamp, window_max_timestamp] 

844 

845 if len(time_vector_window) > window_max_number_samples: 

846 # Resampling 

847 new_window_time_vector = npy.linspace( 

848 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

849 ).tolist() 

850 time_vector_window = new_window_time_vector 

851 

852 # Resampling outside 

853 number_samples_before = len(time_vector_before) 

854 number_samples_after = len(time_vector_after) 

855 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

856 new_number_samples_before = min( 

857 number_samples_before, 

858 math.ceil( 

859 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

860 ), 

861 ) 

862 new_number_samples_after = min( 

863 number_samples_after, 

864 math.ceil( 

865 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

866 ), 

867 ) 

868 # Adjusting numbers as math.ceil can do +1 on sum 

869 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

870 if new_number_samples_before > new_number_samples_after: 

871 new_number_samples_before -= 1 

872 else: 

873 new_number_samples_after -= 1 

874 

875 if new_number_samples_before > 0: 

876 new_time_vector_before = npy.linspace( 

877 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

878 ).tolist() 

879 time_vector_before = new_time_vector_before 

880 

881 if new_number_samples_after > 0: 

882 new_time_vector_after = npy.linspace( 

883 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

884 ).tolist()[::-1] 

885 time_vector_after = new_time_vector_after 

886 

887 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

888 values = self.interpolate_values(new_time_vector) 

889 forced_values = self.interpolate_forced_values(new_time_vector) 

890 number_samples = len(values) 

891 

892 data_processing_time = time.time() - data_processing_time 

893 

894 return self.__class__( 

895 signal_id=self.signal_id, 

896 forcible=self.forcible, 

897 time_vector=new_time_vector, 

898 values=values, 

899 forced_values=forced_values, 

900 number_samples=number_samples, 

901 number_samples_db=self.number_samples, 

902 data_start=self.data_start, 

903 data_end=self.data_end, 

904 db_query_time=self.db_query_time, 

905 init_time=self.init_time, 

906 data_processing_time=self.data_processing_time + data_processing_time, 

907 ) 

908 

909 def csv_export(self): 

910 output = io.StringIO() 

911 writer = csv.writer(output) 

912 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

913 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

914 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

915 return output.getvalue().encode("utf-8") 

916 

917 def prestoplot_export(self): 

918 clean_signal_id = self.signal_id.replace(".", "_") 

919 if clean_signal_id[0].isnumeric(): 

920 clean_signal_id = "_" + clean_signal_id 

921 

922 output = io.StringIO() 

923 output.write("# Encoding:\tUTF-8\n") 

924 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

925 output.write("ISO8601\tnone\tnone\n") 

926 output.write(f"# Description :\t{clean_signal_id}\n") 

927 

928 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

929 output.write( 

930 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

931 ) 

932 return output.getvalue().encode("utf-8") 

933 

934 

935class NumericSignalData(SignalData): 

936 data_type: str = "float" 

937 values: list[float | int | None] 

938 forced_values: list[float | int | None] 

939 

940 def interpolate(self, new_time_vector: list[float], items): 

941 items = [npy.nan if s is None else s for s in items] 

942 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

943 

944 def uniform_desampling(self, number_samples_max: int) -> Self: 

945 data_processing_time = time.time() 

946 if number_samples_max and self.number_samples > number_samples_max: 

947 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

948 forced_values = self.interpolate_forced_values(time_vector) 

949 number_samples = len(time_vector) 

950 else: 

951 time_vector = self.time_vector 

952 number_samples = len(self.values) 

953 values = self.values[:] 

954 forced_values = self.forced_values[:] 

955 data_processing_time = time.time() - data_processing_time 

956 

957 return self.__class__( 

958 signal_id=self.signal_id, 

959 time_vector=time_vector, 

960 values=values, 

961 forced_values=forced_values, 

962 number_samples=number_samples, 

963 number_samples_db=self.number_samples, 

964 data_start=self.data_start, 

965 data_end=self.data_end, 

966 db_query_time=self.db_query_time, 

967 init_time=self.init_time, 

968 data_processing_time=self.data_processing_time + data_processing_time, 

969 ) 

970 

971 def interest_window_desampling( 

972 self, 

973 window_max_number_samples: int, 

974 outside_max_number_samples: int, 

975 window_min_timestamp: float | None = None, 

976 window_max_timestamp: float | None = None, 

977 ) -> Self: 

978 """Performs a sampling in a window of interest and outside.""" 

979 

980 if not self.time_vector: 

981 return self 

982 

983 if window_min_timestamp is None: 

984 window_min_timestamp = self.time_vector[0] 

985 if window_max_timestamp is None: 

986 window_max_timestamp = self.time_vector[-1] 

987 

988 data_processing_time = time.time() 

989 

990 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

991 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

992 

993 time_vector_before = self.time_vector[:index_window_start] 

994 time_vector_window = self.time_vector[index_window_start:index_window_end] 

995 time_vector_after = self.time_vector[index_window_end:] 

996 

997 values_before = self.values[:index_window_start] 

998 values_window = self.values[index_window_start:index_window_end] 

999 values_after = self.values[index_window_end:] 

1000 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1001 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1002 

1003 # Resampling window 

1004 if time_vector_window: 

1005 # Ensurring window bounds 

1006 if time_vector_window[0] != window_min_timestamp: 

1007 time_vector_window.insert(0, window_min_timestamp) 

1008 values_window.insert(0, window_min_value) 

1009 if time_vector_window[-1] != window_max_timestamp: 

1010 time_vector_window.append(window_max_timestamp) 

1011 values_window.append(window_max_value) 

1012 else: 

1013 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1014 values_window = [window_min_value, window_max_value] 

1015 

1016 if len(time_vector_window) > window_max_number_samples: 

1017 # Resampling 

1018 time_vector_window, values_window = downsample_list( 

1019 time_vector_window, values_window, window_max_number_samples 

1020 ) 

1021 

1022 # Resampling outside 

1023 number_samples_before = len(time_vector_before) 

1024 number_samples_after = len(time_vector_after) 

1025 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

1026 new_number_samples_before = min( 

1027 number_samples_before, 

1028 math.ceil( 

1029 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1030 ), 

1031 ) 

1032 new_number_samples_after = min( 

1033 number_samples_after, 

1034 math.ceil( 

1035 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1036 ), 

1037 ) 

1038 # Adjusting numbers as math.ceil can do +1 on sum 

1039 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1040 if new_number_samples_before > new_number_samples_after: 

1041 new_number_samples_before -= 1 

1042 else: 

1043 new_number_samples_after -= 1 

1044 

1045 if new_number_samples_before > 0: 

1046 time_vector_before, values_before = downsample_list( 

1047 time_vector_before, values_before, new_number_samples_before 

1048 ) 

1049 

1050 if new_number_samples_after > 0: 

1051 time_vector_after, values_after = downsample_list( 

1052 time_vector_after, values_after, new_number_samples_after 

1053 ) 

1054 

1055 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1056 values = values_before + values_window + values_after 

1057 forced_values = self.interpolate_forced_values(new_time_vector) 

1058 number_samples = len(values) 

1059 

1060 data_processing_time = time.time() - data_processing_time 

1061 

1062 return self.__class__( 

1063 signal_id=self.signal_id, 

1064 time_vector=new_time_vector, 

1065 values=values, 

1066 forced_values=forced_values, 

1067 number_samples=number_samples, 

1068 number_samples_db=self.number_samples, 

1069 data_start=self.data_start, 

1070 data_end=self.data_end, 

1071 db_query_time=self.db_query_time, 

1072 init_time=self.init_time, 

1073 data_processing_time=self.data_processing_time + data_processing_time, 

1074 ) 

1075 

1076 

1077class StringSignalData(SignalData): 

1078 data_type: str = "str" 

1079 values: list[str | None] 

1080 forced_values: list[str | None] 

1081 

1082 def interpolate(self, new_time_vector: list[float], items): 

1083 # Find the indices of the values in xp that are just smaller or equal to x 

1084 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1085 indices = npy.clip(indices, 0, len(items) - 1) 

1086 # Return the corresponding left string values from fp 

1087 return [items[i] for i in indices] 

1088 

1089 

1090class SignalsData(TwinPadModel): 

1091 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1092 data_processing_time: float 

1093 data_start: float | None 

1094 data_end: float | None 

1095 

1096 @classmethod 

1097 def get_from_signal_ids( 

1098 cls, 

1099 signal_ids: list[str], 

1100 min_timestamp: float = None, 

1101 max_timestamp: float = None, 

1102 window_min_timestamp: float = None, 

1103 window_max_timestamp: float = None, 

1104 interpolate_bounds: bool = True, 

1105 max_documents: int = None, 

1106 ) -> Self: 

1107 signals_data = [] 

1108 data_start = None 

1109 data_end = None 

1110 if max_timestamp is None: 

1111 max_timestamp = time.time() 

1112 data_processing_time = 0.0 

1113 for signal_id in signal_ids: 

1114 signal_data = SignalData.get_from_signal_id( 

1115 signal_id=signal_id, 

1116 min_timestamp=min_timestamp, 

1117 max_timestamp=max_timestamp, 

1118 window_min_timestamp=window_min_timestamp, 

1119 window_max_timestamp=window_max_timestamp, 

1120 interpolate_bounds=interpolate_bounds, 

1121 max_documents=max_documents, 

1122 ) 

1123 data_processing_time += signal_data.data_processing_time 

1124 signals_data.append(signal_data) 

1125 if signal_data.data_start is not None: 

1126 if data_start is None: 

1127 data_start = signal_data.data_start 

1128 else: 

1129 data_start = min(signal_data.data_start, data_start) 

1130 if signal_data.data_end is not None: 

1131 if data_end is None: 

1132 data_end = signal_data.data_end 

1133 else: 

1134 data_end = max(signal_data.data_end, data_end) 

1135 

1136 return cls( 

1137 signals_data=signals_data, 

1138 data_processing_time=data_processing_time, 

1139 data_start=data_start, 

1140 data_end=data_end, 

1141 ) 

1142 

1143 def uniform_desampling(self, number_samples_max: int) -> Self: 

1144 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1145 return SignalsData( 

1146 signals_data=signals_data, 

1147 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1148 data_start=self.data_start, 

1149 data_end=self.data_end, 

1150 ) 

1151 

1152 def interest_window_desampling( 

1153 self, 

1154 window_max_number_samples: int, 

1155 outside_max_number_samples: int, 

1156 window_min_timestamp: float = None, 

1157 window_max_timestamp: float = None, 

1158 ) -> Self: 

1159 signals_data = [ 

1160 s.interest_window_desampling( 

1161 window_max_number_samples=window_max_number_samples, 

1162 outside_max_number_samples=outside_max_number_samples, 

1163 window_min_timestamp=window_min_timestamp, 

1164 window_max_timestamp=window_max_timestamp, 

1165 ) 

1166 for s in self.signals_data 

1167 ] 

1168 

1169 return SignalsData( 

1170 signals_data=signals_data, 

1171 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1172 data_start=self.data_start, 

1173 data_end=self.data_end, 

1174 ) 

1175 

1176 def zip_export(self, file_format: str = "csv"): 

1177 # return self.signals_data[0].csv_export() 

1178 zip_buffer = io.BytesIO() 

1179 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1180 for signal_data in self.signals_data: 

1181 if file_format == "csv": 

1182 export_io = signal_data.csv_export() 

1183 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io) 

1184 elif file_format == "prestoplot": 

1185 export_io = signal_data.prestoplot_export() 

1186 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io) 

1187 else: 

1188 raise ValueError(f"Format not found. Got: {file_format}") 

1189 zip_bytes = zip_buffer.getvalue() 

1190 # zip_bytes.seek(0) 

1191 return zip_bytes 

1192 

1193 def hdf5_export(self): 

1194 hdf5_buffer = io.BytesIO() 

1195 custom_type_float = npy.dtype( 

1196 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1197 ) 

1198 custom_type_string = npy.dtype( 

1199 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1200 ) 

1201 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1202 for signal_data in self.signals_data: 

1203 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1204 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1205 if signal_data.data_type == "str": 

1206 export_data = npy.array( 

1207 list( 

1208 zip( 

1209 date_vector, 

1210 signal_data.time_vector, 

1211 signal_data.values, 

1212 signal_data.forced_values, 

1213 ) 

1214 ), 

1215 dtype=custom_type_string, 

1216 ) 

1217 else: 

1218 export_data = npy.array( 

1219 list( 

1220 zip( 

1221 date_vector, 

1222 signal_data.time_vector, 

1223 signal_data.values, 

1224 signal_data.forced_values, 

1225 ) 

1226 ), 

1227 dtype=custom_type_float, 

1228 ) 

1229 signal_group["data"] = export_data 

1230 return hdf5_buffer.getvalue() 

1231 

1232 

1233class SignalStatus(TwinPadModel): 

1234 status: str 

1235 reason: str 

1236 delay: float | None 

1237 

1238 

1239class DigitizationFunction(TwinPadModel): 

1240 bits: int | None = None 

1241 min_value: float 

1242 max_value: float 

1243 min_raw_value: float 

1244 max_raw_value: float 

1245 

1246 

1247class SignalUpdate(TwinPadModel): 

1248 value: float | str | bool | int | None = None 

1249 forced_value: float | str | bool | int | None = None 

1250 timestamp: int | None = None 

1251 

1252 

1253class SignalType(str, Enum): 

1254 command = "command" 

1255 sensor = "sensor" 

1256 external_sensor = "external_sensor" 

1257 

1258 

1259SIGNALDATA_TYPES = { 

1260 "int": NumericSignalData, 

1261 "float": NumericSignalData, 

1262 "str": StringSignalData, 

1263 "bool": NumericSignalData, 

1264 "epoch": NumericSignalData, 

1265} 

1266 

1267 

1268class Signal(GenericMongo): 

1269 collection_name: ClassVar[str] = "signals" 

1270 

1271 signal_id: str 

1272 frequency: float 

1273 unit: str | None 

1274 description: str 

1275 type: SignalType 

1276 data_type: str 

1277 precision_digits: int | None 

1278 forcible: bool 

1279 

1280 digitization_function: DigitizationFunction | None 

1281 

1282 @property 

1283 def device(self) -> Device: 

1284 device_id = self.signal_id.split(".")[0] 

1285 device = Device.get_one_by_attribute("device_id", device_id) 

1286 return device 

1287 

1288 @cached_property 

1289 def signal_data_class(self): 

1290 if self.data_type in SIGNALDATA_TYPES: 

1291 return SIGNALDATA_TYPES[self.data_type] 

1292 if self.data_type.startswith("enum"): 

1293 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1294 raise ValueError(f"Unhandled python type: {self.data_type}") 

1295 

1296 @cached_property 

1297 def python_type(self): 

1298 if self.data_type in TYPES: 

1299 return TYPES[self.data_type] 

1300 if self.data_type.startswith("enum"): 

1301 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1302 return Literal[*choices] 

1303 raise ValueError(f"Unhandled python type: {self.data_type}") 

1304 

1305 @computed_field 

1306 @property 

1307 def status(self) -> SignalStatus: 

1308 now = time.time() 

1309 status = "up" 

1310 reason = "" 

1311 

1312 # See line 292 for explanation 

1313 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1314 last_bucket = None 

1315 if bucket is not None: 

1316 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1317 if last_bucket is None: 

1318 status = "no data" 

1319 reason = "signal does not exist" 

1320 return SignalStatus(status=status, reason=reason, delay=None) 

1321 

1322 try: 

1323 last_date = last_bucket["control"]["max"]["timestamp"] 

1324 last_date = last_date.replace(tzinfo=pytz.UTC) 

1325 last_value_ts = last_date.timestamp() 

1326 except IndexError: 

1327 last_value_ts = None 

1328 

1329 if last_value_ts is None: 

1330 delay = None 

1331 reason = "No data from signal" 

1332 else: 

1333 # Since device is a computed property, only compute it once 

1334 device = self.device 

1335 if device is not None and device.last_ping is not None: 

1336 last_value_ts = max(last_value_ts, device.last_ping) 

1337 delay = now - last_value_ts 

1338 if delay > DEVICE_TIMEOUT: 

1339 status = "down" 

1340 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1341 return SignalStatus(status=status, reason=reason, delay=delay) 

1342 

1343 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1344 command = Command( 

1345 sent_at=time.time(), 

1346 command_type="Signal command", 

1347 user_id=current_user.id, 

1348 ) 

1349 

1350 has_input_error = False 

1351 error_message = "" 

1352 

1353 if self.data_type.startswith("enum"): 

1354 enum_options = get_args(self.python_type) 

1355 

1356 if update_dict.value is not None and update_dict.value not in enum_options: 

1357 has_input_error = True 

1358 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1359 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1360 has_input_error = True 

1361 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1362 else: 

1363 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1364 has_input_error = True 

1365 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1366 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1367 has_input_error = True 

1368 error_message += ( 

1369 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1370 ) 

1371 

1372 if has_input_error: 

1373 command.response_time = 0 

1374 command.succeeded = False 

1375 command.description = f"Tried to modify signal {self.signal_id}" 

1376 response = {"error": True, "status_code": 400, "message": error_message} 

1377 else: 

1378 response = await send_signal_value(self.signal_id, update_dict) 

1379 command.receive_response(response) 

1380 

1381 Command.create(command) 

1382 return response 

1383 

1384 @classmethod 

1385 def get_from_signal_id(cls, signal_id) -> Self: 

1386 """Could be generic from mongo""" 

1387 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1388 if not raw_value: 

1389 return None 

1390 del raw_value["_id"] 

1391 return cls.dict_to_object(raw_value) 

1392 

1393 @classmethod 

1394 def get_all_ids(cls) -> list[str]: 

1395 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1396 

1397 return [signal["signal_id"] for signal in cursor] 

1398 

1399 async def number_samples(self): 

1400 collection = get_signal_collection(signal_id=self.signal_id) 

1401 if collection is None: 

1402 return 0 

1403 

1404 number_samples = collection.estimated_document_count() 

1405 

1406 number_samples_async_collection = await get_async_collection( 

1407 systems_async_database, "number_samples", create=True, time_series=True 

1408 ) 

1409 

1410 loop = asyncio.get_running_loop() 

1411 loop.create_task( 

1412 number_samples_async_collection.insert_one( 

1413 { 

1414 "timestamp": datetime.datetime.now(pytz.UTC), 

1415 "signal_id": self.signal_id, 

1416 "number_samples": number_samples, 

1417 } 

1418 ) 

1419 ) 

1420 

1421 return number_samples 

1422 

1423 def sample_datasize(self): 

1424 return signals_database.command("collstats", self.signal_id)["size"] 

1425 

1426 @classmethod 

1427 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1428 result = cls.collection().aggregate( 

1429 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1430 ) 

1431 

1432 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1433 

1434 

1435class ServicesStatus(TwinPadModel): 

1436 backend: str 

1437 cloud_broker: str 

1438 time_series_database: str 

1439 signal_storage: str 

1440 heartbeat_storage: str 

1441 data_analyzer: str 

1442 

1443 @classmethod 

1444 def check(cls) -> Self: 

1445 return cls( 

1446 cloud_broker=ping(RABBITMQ_HOST), 

1447 backend="up", 

1448 time_series_database=ping(MONGO_HOST), 

1449 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1450 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1451 data_analyzer=ping(DATA_ANALYZER_HOST), 

1452 ) 

1453 

1454 

1455def ping(host): 

1456 try: 

1457 if ping3.ping(host, timeout=0.8): 

1458 return "up" 

1459 except PermissionError: 

1460 pass 

1461 return "down" 

1462 

1463 

1464class Event(GenericMongo): 

1465 collection_name: ClassVar[str] = "events" 

1466 

1467 name: str 

1468 timestamp: float 

1469 event_rule_id: str 

1470 

1471 @computed_field 

1472 @cached_property 

1473 def event_rule(self) -> "EventRule": 

1474 return EventRule.get_from_id(self.event_rule_id) 

1475 

1476 @classmethod 

1477 def dict_to_object(cls, dict_): 

1478 """Refine to convert timestamp to datetime for mongodb.""" 

1479 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

1480 return super().dict_to_object(dict_) 

1481 

1482 

1483class TwinPadActivity(GenericMongo): 

1484 timestamp: float 

1485 amount: int 

1486 

1487 @classmethod 

1488 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

1489 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1490 number_events_collection = get_collection(systems_database, "number_events") 

1491 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

1492 items = [] 

1493 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1494 if number_events_collection is None or recompute_amount: 

1495 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

1496 number_events_collection.delete_many({}) 

1497 first_event = events_collection.find_one(sort={"timestamp": 1}) 

1498 if first_event is None: 

1499 return items 

1500 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

1501 tzinfo=pytz.UTC 

1502 ) 

1503 while last_computed_day < TODAY: 

1504 day_nb_events = events_collection.count_documents( 

1505 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1506 ) 

1507 if day_nb_events > 0: 

1508 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

1509 last_computed_day += ONE_DAY_OFFSET 

1510 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1511 if number_events_today > 0: 

1512 number_events_collection.delete_many({"timestamp": TODAY}) 

1513 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

1514 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1515 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1516 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1517 for day in number_events: 

1518 day["timestamp"] = day["timestamp"].timestamp() 

1519 items.append(cls.mongo_dict_to_object(day)) 

1520 return items 

1521 

1522 @classmethod 

1523 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1524 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1525 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1526 signals_number_samples_collection = get_collection( 

1527 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

1528 ) 

1529 items = [] 

1530 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1531 if number_samples_collection is None or recompute_amount: 

1532 number_samples_collection = get_collection( 

1533 systems_database, "number_received_samples", create=True, time_series=True 

1534 ) 

1535 number_samples_collection.delete_many({}) 

1536 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

1537 if first_sample is None: 

1538 return items 

1539 # compute from day of first found event 

1540 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

1541 tzinfo=pytz.UTC 

1542 ) 

1543 while last_computed_day < TODAY: 

1544 number_samples_request = signals_number_samples_collection.aggregate( 

1545 [ 

1546 { 

1547 "$match": { 

1548 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

1549 } 

1550 }, 

1551 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1552 ] 

1553 ).to_list() 

1554 if len(number_samples_request) == 0: 

1555 number_samples = 0 

1556 else: 

1557 number_samples = number_samples_request[0].get("number_samples", 0) 

1558 if number_samples > 0: 

1559 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

1560 last_computed_day += ONE_DAY_OFFSET 

1561 number_samples_request = signals_number_samples_collection.aggregate( 

1562 [ 

1563 {"$match": {"timestamp": {"$gte": TODAY}}}, 

1564 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1565 ] 

1566 ).to_list() 

1567 if len(number_samples_request) == 0: 

1568 number_samples_today = 0 

1569 else: 

1570 number_samples_today = number_samples_request[0].get("number_samples", 0) 

1571 if number_samples_today > 0: 

1572 number_samples_collection.delete_many({"timestamp": TODAY}) 

1573 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

1574 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1575 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1576 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1577 for day in number_events: 

1578 day["timestamp"] = day["timestamp"].timestamp() 

1579 items.append(cls.mongo_dict_to_object(day)) 

1580 return items 

1581 

1582 @classmethod 

1583 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1584 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1585 number_commands_collection = get_collection(systems_database, "number_commands") 

1586 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

1587 items = [] 

1588 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1589 if number_commands_collection is None or recompute_amount: 

1590 number_commands_collection = get_collection( 

1591 systems_database, "number_commands", create=True, time_series=True 

1592 ) 

1593 number_commands_collection.delete_many({}) 

1594 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

1595 if first_command is None: 

1596 return items 

1597 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

1598 tzinfo=pytz.UTC 

1599 ) 

1600 while last_computed_day < TODAY: 

1601 day_nb_commands = commands_collection.count_documents( 

1602 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1603 ) 

1604 if day_nb_commands > 0: 

1605 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

1606 last_computed_day += ONE_DAY_OFFSET 

1607 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1608 if number_commands_today > 0: 

1609 number_commands_collection.delete_many({"timestamp": TODAY}) 

1610 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

1611 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1612 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1613 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1614 for day in number_commands: 

1615 day["timestamp"] = day["timestamp"].timestamp() 

1616 items.append(cls.mongo_dict_to_object(day)) 

1617 return items 

1618 

1619 

1620class EventRule(GenericMongo): 

1621 collection_name: ClassVar[str] = "event_rules" 

1622 

1623 name: str 

1624 formula: str 

1625 variables: list[str] 

1626 

1627 @computed_field 

1628 @cached_property 

1629 def number_events(self) -> int: 

1630 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

1631 

1632 

1633class Company(GenericMongo): 

1634 collection_name: ClassVar[str] = "companies" 

1635 name: str 

1636 

1637 

1638class Campaign(GenericMongo): 

1639 collection_name: ClassVar[str] = "campaigns" 

1640 

1641 # Properties 

1642 id: str | None = None 

1643 name: str 

1644 description: str | None = None 

1645 

1646 @classmethod 

1647 def create(cls, campaign: Self): 

1648 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"})) 

1649 if new_campaign is None: 

1650 return None 

1651 return {"campaign_id": str(new_campaign.inserted_id)} 

1652 

1653 @classmethod 

1654 def update(cls, campaign: Self): 

1655 updated_campaign = cls.collection().find_one_and_update( 

1656 {"_id": ObjectId(campaign.id)}, 

1657 {"$set": {"name": campaign.name, "description": campaign.description}}, 

1658 return_document=ReturnDocument.AFTER, 

1659 ) 

1660 return updated_campaign 

1661 

1662 @classmethod 

1663 def delete(cls, campaign_id): 

1664 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)}) 

1665 return deleted_user 

1666 

1667 

1668class Phase(GenericMongo): 

1669 collection_name: ClassVar[str] = "phases" 

1670 

1671 # Properties 

1672 id: str | None = None 

1673 name: str 

1674 description: str | None = None 

1675 start_at: float 

1676 end_at: float 

1677 

1678 # FK 

1679 campaign_id: str 

1680 

1681 # @classmethod 

1682 # def get_by_date(cls, datetime: float): 

1683 # phases = [] 

1684 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING): 

1685 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1686 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING): 

1687 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1688 # if phases is None: 

1689 # return None 

1690 # return phases 

1691 

1692 @classmethod 

1693 def create(cls, phase: Self): 

1694 phase = Phase( 

1695 name=phase.name, 

1696 description=phase.description, 

1697 start_at=phase.start_at, 

1698 end_at=phase.end_at, 

1699 campaign_id=phase.campaign_id, 

1700 ) 

1701 phase_collection = get_collection(systems_database, "phases", create=True) 

1702 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"})) 

1703 if new_phase is None: 

1704 return None 

1705 return {"phase_id": str(new_phase.inserted_id)} 

1706 

1707 @classmethod 

1708 def update(cls, phase: Self): 

1709 updated_phase = cls.collection().find_one_and_update( 

1710 {"_id": ObjectId(phase.id)}, 

1711 { 

1712 "$set": { 

1713 "name": phase.name, 

1714 "description": phase.description, 

1715 "start_at": phase.start_at, 

1716 "end_at": phase.end_at, 

1717 } 

1718 }, 

1719 return_document=ReturnDocument.AFTER, 

1720 ) 

1721 return updated_phase 

1722 

1723 @classmethod 

1724 def delete(cls, phase_id): 

1725 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)}) 

1726 return delete_phase 

1727 

1728 @classmethod 

1729 def deleteMany(cls, campaign_id): 

1730 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

1731 return delete_phases 

1732 

1733 

1734class CustomViewCreation(GenericMongo): 

1735 collection_name: ClassVar[str] = "custom_views" 

1736 

1737 name: str 

1738 configuration: list 

1739 

1740 

1741class CustomView(CustomViewCreation): 

1742 # Properties 

1743 id: str | None = None 

1744 

1745 # Foreign Key 

1746 user_id: str 

1747 

1748 # # Methods 

1749 # @classmethod 

1750 # def create(cls, form_custom_view: Self, user_id) -> list: 

1751 # custom_view = CustomView( 

1752 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1753 # ) 

1754 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1755 # return new_custom_view 

1756 

1757 # @classmethod 

1758 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1759 # updated_custom_view = cls.collection().find_one_and_update( 

1760 # {"_id": ObjectId(custom_view_id)}, 

1761 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1762 # return_document=ReturnDocument.AFTER, 

1763 # ) 

1764 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1765 # del updated_custom_view["_id"] 

1766 # return cls(**updated_custom_view) 

1767 

1768 # @classmethod 

1769 # def delete(cls, custom_view_id) -> bool: 

1770 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1771 # return deleted_custom_view.acknowledged 

1772 

1773 

1774CustomViewUpdate = create_update_model(CustomView) 

1775 

1776 

1777class Video(GenericMongo): 

1778 collection_name: ClassVar[str] = "videos" 

1779 

1780 # Properties 

1781 name: str 

1782 ip_addr: str 

1783 username: str | None = None 

1784 password: str | None = None 

1785 

1786 # Methods 

1787 @classmethod 

1788 def get_all(cls, sort_by="_id") -> list[Self]: 

1789 items = [] 

1790 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

1791 items.append(cls.mongo_dict_to_object(dict_)) 

1792 return items 

1793 

1794 @classmethod 

1795 def get_video(cls, camera_id: ObjectId): 

1796 camera = cls.get_from_id(camera_id) 

1797 if camera is not None: 

1798 return camera.name 

1799 return None 

1800 

1801 

1802class Command(GenericMongo): 

1803 collection_name: ClassVar[str] = "commands" 

1804 

1805 # Properties 

1806 timestamp: datetime.datetime = None 

1807 sent_at: float 

1808 response_time: float = 0.0 

1809 command_type: str 

1810 description: str = "" 

1811 succeeded: bool = False 

1812 

1813 # Foreign key 

1814 user_id: str 

1815 

1816 @classmethod 

1817 def collection(cls): 

1818 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

1819 

1820 @classmethod 

1821 def create(cls, command: Self): 

1822 command = cls( 

1823 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

1824 sent_at=command.sent_at, 

1825 response_time=command.response_time, 

1826 command_type=command.command_type, 

1827 description=command.description, 

1828 succeeded=command.succeeded, 

1829 user_id=command.user_id, 

1830 ) 

1831 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

1832 if new_command is None: 

1833 return None 

1834 return {"command_id": str(new_command.inserted_id)} 

1835 

1836 def receive_response(self, response: dict): 

1837 self.response_time = time.time() - self.sent_at 

1838 self.succeeded = response.get("error", True) is False 

1839 if self.description == "": 

1840 self.description += response.get("message", "").rstrip() 

1841 

1842 

1843class SignalsPresetCreation(GenericMongo): 

1844 name: str 

1845 signal_ids: list[str] 

1846 

1847 

1848class SignalsPreset(SignalsPresetCreation): 

1849 collection_name: ClassVar[str] = "signals_presets" 

1850 

1851 user_id: str 

1852 

1853 @classmethod 

1854 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

1855 signals_preset = cls( 

1856 user_id=user_id, 

1857 name=signals_preset.name, 

1858 signal_ids=signals_preset.signal_ids, 

1859 ) 

1860 

1861 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

1862 

1863 return str(new_signal_preset.inserted_id) 

1864 

1865 

1866SignalsPresetUpdate = create_update_model(SignalsPreset) 

1867 

1868 

1869class LineStyle(str, Enum): 

1870 solid = "solid" 

1871 dotted = "dotted" 

1872 dashed = "dashed" 

1873 

1874 

1875class SignalAppearance: 

1876 value_color: str 

1877 forced_value_color: str 

1878 

1879 

1880class GraphThemeCreation(GenericMongo): 

1881 collection_name: ClassVar[str] = "graph_themes" 

1882 

1883 name: str 

1884 signal_id: str 

1885 value_color: str = "" 

1886 forced_value_color: str = "" 

1887 value_line_style: LineStyle = LineStyle.solid 

1888 forced_value_line_style: LineStyle = LineStyle.solid 

1889 private: bool = True 

1890 

1891 

1892class PublicGraphTheme(GraphThemeCreation): 

1893 created_by_user: bool 

1894 in_user_library: bool 

1895 active_for_user: bool 

1896 

1897 _current_user_id: str = "" 

1898 

1899 @classproperty 

1900 def custom_pipeline_steps(cls) -> dict[str, list]: 

1901 return { 

1902 "created_by_user": [ 

1903 { 

1904 "$addFields": { 

1905 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

1906 } 

1907 } 

1908 ], 

1909 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

1910 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

1911 ], 

1912 "in_user_library": [ 

1913 { 

1914 "$addFields": { 

1915 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

1916 } 

1917 } 

1918 ], 

1919 "active_for_user": [ 

1920 { 

1921 "$addFields": { 

1922 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

1923 } 

1924 } 

1925 ], 

1926 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

1927 "active": [ 

1928 { 

1929 "$addFields": { 

1930 "active": "$$REMOVE", 

1931 } 

1932 } 

1933 ], 

1934 "creator_id": [ 

1935 { 

1936 "$addFields": { 

1937 "creator_id": "$$REMOVE", 

1938 } 

1939 } 

1940 ], 

1941 } 

1942 

1943 @classmethod 

1944 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

1945 cls._current_user_id = user_id 

1946 return super().response_from_query(query) 

1947 

1948 @classmethod 

1949 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

1950 query.in_user_library = "true" 

1951 return cls.response_from_query(query, user_id) 

1952 

1953 @classmethod 

1954 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

1955 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

1956 

1957 @classmethod 

1958 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

1959 cls._current_user_id = user_id 

1960 return super().get_by_attribute(attribute_name, attribute_value) 

1961 

1962 @classmethod 

1963 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

1964 cls._current_user_id = user_id 

1965 return super().get_one_by_attribute(attribute_name, attribute_value) 

1966 

1967 @classmethod 

1968 def get_all(cls, sort_by: str, user_id: str): 

1969 cls._current_user_id = user_id 

1970 return super().get_all(sort_by) 

1971 

1972 @classmethod 

1973 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

1974 pipeline = [ 

1975 { 

1976 "$match": { 

1977 "active": {"$eq": user_id}, 

1978 "signal_id": {"$in": signal_ids}, 

1979 } 

1980 }, 

1981 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

1982 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

1983 { 

1984 "$project": { 

1985 "_id": 0, 

1986 "signal_id": 1, 

1987 "value_color": 1, 

1988 "forced_value_color": 1, 

1989 "value_line_style": 1, 

1990 "forced_value_line_style": 1, 

1991 } 

1992 }, 

1993 ] 

1994 

1995 result = {} 

1996 

1997 cursor = cls.collection().aggregate(pipeline) 

1998 for document in cursor: 

1999 signal_id = document["signal_id"] 

2000 del document["signal_id"] 

2001 result[signal_id] = document 

2002 

2003 return result 

2004 

2005 

2006GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2007 

2008 

2009class PrivateGraphTheme(GraphThemeCreation): 

2010 # private 

2011 creator_id: str 

2012 in_library: list[str] 

2013 active: list[str] 

2014 

2015 @classmethod 

2016 def create( 

2017 cls, 

2018 creator_id: str, 

2019 name: str, 

2020 signal_id: str, 

2021 value_color: str, 

2022 forced_value_color: str, 

2023 value_line_style: LineStyle, 

2024 forced_value_line_style: LineStyle, 

2025 private: bool, 

2026 ): 

2027 color_setting = cls( 

2028 creator_id=creator_id, 

2029 name=name, 

2030 signal_id=signal_id, 

2031 value_color=value_color, 

2032 forced_value_color=forced_value_color, 

2033 value_line_style=value_line_style, 

2034 forced_value_line_style=forced_value_line_style, 

2035 private=private, 

2036 in_library=[creator_id], 

2037 active=[creator_id], 

2038 ) 

2039 

2040 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2041 color_setting.id = str(new_color_setting.inserted_id) 

2042 return color_setting 

2043 

2044 def update(self, update_dict: dict, user_id: str): 

2045 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2046 if in_user_lib and user_id not in self.in_library: 

2047 self.in_library.append(user_id) 

2048 elif not in_user_lib and user_id in self.in_library: 

2049 self.in_library.remove(user_id) 

2050 update_dict["in_library"] = self.in_library 

2051 del update_dict["in_user_library"] 

2052 

2053 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2054 if active_for_user and user_id not in self.active: 

2055 self.active.append(user_id) 

2056 elif not active_for_user and user_id in self.active: 

2057 self.active.remove(user_id) 

2058 update_dict["active"] = self.active 

2059 del update_dict["active_for_user"] 

2060 

2061 if update_dict.get("created_by_user") is not None: 

2062 del update_dict["created_by_user"] 

2063 

2064 self.collection().find_one_and_update( 

2065 {"_id": ObjectId(self.id)}, 

2066 {"$set": update_dict}, 

2067 ) 

2068 

2069 return {}