Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1210 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-05 09:52 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25# from scipy import signal as signal_scipy 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 devices_states_database, 

36) 

37from twinpad_backend.responses import ListResponse 

38from twinpad_backend.messages import send_mode_change, send_signal_value 

39 

# Maps the type names accepted as strings to the Python type used for them.
# NOTE(review): "epoch" maps to float, presumably a Unix timestamp in seconds — confirm.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}


# Host names of the backing services; each can be overridden through the
# environment variable of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared against differences of time.time() values, hence expressed in seconds.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
# NOTE(review): not used in the part of the file reviewed here — confirm its meaning at the use site.
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Log through the "uvicorn.error" logger rather than a module-specific one.
logger = logging.getLogger("uvicorn.error")

53 

54 

class classproperty:
    """
    Read-only descriptor whose getter receives the owning class instead of an instance.

    Replaces the ``@classmethod`` + ``@property`` stacking, which was deprecated in python 3.11
    and will be removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped getter; it is invoked again on every attribute access.
        self.fget = func

    def __get__(self, _, owner):
        # The instance argument is deliberately ignored: only the class is handed to the getter.
        getter = self.fget
        return getter(owner)

66 

67 

def create_update_model(model):
    """
    Derive a partial-update model from a pydantic model.

    The generated model is named ``<ModelName>Update`` and carries every field of
    ``model`` except ``id``; each field additionally accepts ``None`` and defaults to ``None``.

    :param model: pydantic model class to derive the update model from.
    :return: the newly created pydantic model class.
    """
    optional_fields = {
        name: (field_info.annotation | None, None)
        for name, field_info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)

77 

78 

def get_utc_date_from_timestamp(timestamp: float):
    """
    Convert a Unix timestamp to an ISO-8601 date string expressed in UTC.

    The conversion is done explicitly in UTC so the result does not depend on the
    time zone configured on the host.

    :param timestamp: seconds since the Unix epoch.
    :return: ISO-8601 string without a UTC offset suffix, e.g. ``"1970-01-01T00:00:00"``.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    # Drop tzinfo so the string keeps the offset-free format produced so far.
    return utc_datetime.replace(tzinfo=None).isoformat()

81 

82 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a time series to roughly ``max_number_samples`` points.

    ``None`` values are not supported by LTTB, so they are stripped before the
    downsampling and every run of consecutive ``None`` values is re-inserted afterwards
    as its first timestamp (and its last one when the run holds more than one sample).
    If LTTB rejects the input with a ``ValueError``, the series is instead linearly
    interpolated on an evenly spaced time grid, with NaN results turned back into ``None``.

    ``time_vector`` and ``values`` are expected to have the same length; neither is mutated.

    :param time_vector: timestamps of the samples.
    :param values: sample values, possibly containing ``None``.
    :param max_number_samples: target number of samples.
    :return: tuple ``(time_vector, values)``; the inputs themselves are returned when
        they hold fewer than ``max_number_samples`` samples.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass: keep the non-None samples and record the bounds of every None run.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not in_none_group:
                none_group_bounds.append([timestamp])
                in_none_group = True
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                # Only the first and last timestamps of a run are kept.
                none_group_bounds[-1][1] = timestamp
        else:
            in_none_group = False
            kept_times.append(timestamp)
            kept_values.append(value)
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        # Reserve room for the None markers that are inserted back below.
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values

127 

128 

def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When ``wanted_type`` is ``float``, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

133 

134 

135# Models 

class TwinPadModel(BaseModel):
    """Common pydantic base class adding dict conversion helpers."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model to a plain dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude)

144 

145 

class GenericMongo(TwinPadModel):
    """
    Base class for models stored in a collection of the systems database.

    Subclasses provide a ``collection_name`` class variable. They may also fill
    ``custom_pipeline_steps`` with aggregation stages, keyed by the property name
    those stages are needed for; the stages are inserted before a ``$match``/``$sort``
    on that property and appended at the end of the pipeline otherwise.
    """

    id: str | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection backing this model, creating it if needed."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated query and wrap the result in a ListResponse.

        ``query.sort_by`` is either ``"field"`` (ascending) or ``"field:order"`` where
        ``order`` is an integer handed to ``$sort``. A ``limit`` of ``None`` or ``0``
        means no limit. ``total`` counts every document matching the filter, regardless
        of pagination.
        """
        request_filters = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = cls.collection()
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []
        # Custom stages must run before the $match that filters on the property they compute.
        if "$and" in request_filters:
            for request_filter in request_filters["$and"]:
                for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                    if filtered_property in request_filter:
                        pipeline.extend(pipeline_steps)
                        added_properties.append(filtered_property)
        else:
            for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                if filtered_property in request_filters:
                    pipeline.extend(pipeline_steps)
                    added_properties.append(filtered_property)
        pipeline.append({"$match": request_filters})
        if sort_field in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_field])
            added_properties.append(sort_field)
        pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only have to run on the documents that are returned.
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        cursor = collection.aggregate(pipeline)
        items = [cls.mongo_dict_to_object(item_dict) for item_dict in cursor]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``item_id``, or None when there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a Mongo document; ``_id`` is moved to ``id`` as a string (in place)."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item whose attribute equals the value, or None when nothing matches."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if len(items) == 0:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted ascending on ``sort_by``."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents in the collection, 0 when the collection does not exist."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert this object as a new document, store the generated id on it and return that id."""
        # "id" mirrors Mongo's own "_id" and must not be stored as a separate field.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this object and to its stored document; returns ``self``."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; returns True when a document was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

280 

281 

class User(GenericMongo):
    """User account stored in the ``users`` collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless ``exclude`` is given explicitly."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Create and store a new user.

        The very first user of the collection is always made admin.

        :return: ``{"user_id": <id of the inserted document>}``.
        """
        # Counting is enough to detect an empty collection; no need to load and validate every user.
        if cls.get_number_documents() == 0:
            is_admin = True
        user = cls(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        new_user = cls.collection().insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Update the stored user ``user_id`` with the non-None fields of ``user``.

        :return: the updated user (``is_connected`` is not carried over and falls back
            to its default), or None when no user has this id.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        # find_one_and_update returns None when no document matches the filter.
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        updated_user.pop("is_connected", None)
        return cls(**updated_user)

321 

322 

# Partial-update counterpart of User: same fields except "id", all optional with a None default.
UserUpdate = create_update_model(User)

324 

325 

class Mode(TwinPadModel):
    """One operating mode a device can be switched to (see ``Device.modes``)."""

    mode_id: int
    name: str
    # NOTE(review): semantics and units of the two frequency fields are not visible here — confirm.
    frequency_multiplier: float
    min_frequency: float

331 

332 

class DeviceUpdate(TwinPadModel):
    """Payload of a mode change request; ``Device.change_mode`` reads its ``mode_id``."""

    mode_id: int

335 

336 

class Device(GenericMongo):
    """Device stored in the ``devices`` collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    def _mode_name(self, mode_id: int) -> str:
        """Return a printable name for ``mode_id`` without ever raising."""
        # Mode ids are used as 1-based positions in ``modes``.
        if 1 <= mode_id <= len(self.modes):
            return self.modes[mode_id - 1].name
        # Outside of that range, fall back to the declared ids, then to the bare id.
        for mode in self.modes:
            if mode.mode_id == mode_id:
                return mode.name
        return f"#{mode_id}"

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to ``update_dict.mode_id`` and record the attempt as a Command.

        An out-of-range mode id is not sent to the device: the command is recorded as
        failed and an error response with status code 400 is returned.

        :param update_dict: object exposing the requested ``mode_id``.
        :param current_user: user issuing the command; its id is stored on the command.
        :return: the response of ``send_mode_change`` or the error dict described above.
        """
        requested_mode_id = update_dict.mode_id
        # NOTE(review): 0 is accepted here although ids are otherwise used as 1-based
        # positions — confirm whether 0 is a legitimate mode id.
        has_error = requested_mode_id < 0 or requested_mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{requested_mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self._mode_name(self.current_mode_id)} to {self._mode_name(requested_mode_id)}"
        else:
            description = f"Change mode of #{self.device_id} to {self._mode_name(requested_mode_id)}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {requested_mode_id}"}
        else:
            response = await send_mode_change(self.device_id, requested_mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning the given signals, querying each device only once.

        The device id is the part of a signal id before its first ``"."``.

        :return: dict mapping device id to the device, or to None when it is not found.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

388 

389 

class DeviceSetup(GenericMongo):
    """Device setup stored in the ``device_setups`` collection."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    # NOTE(review): what the keys and values of the mapping refer to is not visible here — confirm.
    variable_mapping: dict[str, str]

396 

397 

# Partial-update counterpart of DeviceSetup: same fields except "id", all optional with a None default.
DeviceSetupUpdate = create_update_model(DeviceSetup)

399 

400 

class DeviceState(GenericMongo):
    """State of a device at a given time, read from the device's own collection of the devices states database."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        Return the states of ``device_id`` matching ``query``, sorted and paginated.

        ``query.sort_by`` is either ``"field"`` (ascending) or ``"field:order"``.
        A ``limit`` of ``None`` or ``0`` means no limit. When the device has no states
        collection, an empty response with ``total=0`` is returned.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            return ListResponse(items=[], limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=0)

        total = collection.count_documents(req_filter)
        cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
        if (query.limit is not None) and (query.limit != 0):
            cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                # Missing list fields fall back to an empty list, matching the model defaults.
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

440 

441 

class SignalSample(TwinPadModel):
    """Single sample of a signal: its timestamp, value and optional forced value."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest stored sample of the signal, or None when there is none."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information available: sort the collection itself.
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of every signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent stored sample of the signal, or None when there is none.

        The returned timestamp is pushed forward to the last ping of the owning device
        when that ping is more recent than the sample.

        :param device: device owning the signal; it is looked up from the signal id when omitted.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same system.buckets workaround as get_first_from_signal_id, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample not older than ``min_timestamp``, or None when there is none."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of every signal, in the order of ``signal_ids``."""
        # Fetch each owning device once instead of once per signal.
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for every signal, its most recent sample not older than ``min_timestamp``."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]

550 

551 

552class SignalData(TwinPadModel): 

553 signal_id: str 

554 forcible: bool = True 

555 time_vector: list[float] 

556 values: list[float | int | str | None] 

557 forced_values: list[float | int | str | None] 

558 

559 data_start: float | None = None 

560 data_end: float | None = None 

561 

562 number_samples: int = 0 

563 number_samples_db: int = 0 

564 

565 db_query_time: float = 0.0 

566 init_time: float = 0.0 

567 data_processing_time: float = 0.0 

568 

569 @classmethod 

570 def get_from_signal_id( 

571 cls, 

572 signal_id: str, 

573 min_timestamp: float = None, 

574 max_timestamp: float = None, 

575 window_min_timestamp: float = None, 

576 window_max_timestamp: float = None, 

577 interpolate_bounds: bool = True, 

578 max_documents: int = None, 

579 ) -> Self: 

580 

581 now = time.time() 

582 

583 req_signal = {} 

584 if min_timestamp is not None: 

585 req_signal.setdefault("timestamp", {}) 

586 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

587 if max_timestamp is not None: 

588 req_signal.setdefault("timestamp", {}) 

589 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

590 

591 collection = get_signal_collection(signal_id) 

592 if collection is None: 

593 return cls( 

594 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

595 ) 

596 

597 db_req_start = time.time() 

598 

599 sort_step = {"$sort": {"precise_timestamp": 1}} 

600 number_results = collection.count_documents(req_signal) 

601 

602 pipeline = [] 

603 if req_signal: 

604 pipeline.append({"$match": req_signal}) # Filter data if needed 

605 

606 pipeline.extend( 

607 [ 

608 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

609 sort_step, 

610 ] 

611 ) 

612 

613 if max_documents is not None and max_documents < number_results: 

614 unsampling_ratio = math.ceil(number_results / max_documents) 

615 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

616 pipeline.extend( 

617 [ 

618 { 

619 "$setWindowFields": { 

620 "sortBy": {"precise_timestamp": 1}, 

621 "output": {"index": {"$documentNumber": {}}}, 

622 } 

623 }, 

624 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

625 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

626 {"$replaceRoot": {"newRoot": "$doc"}}, 

627 {"$unset": ["index", "group_id"]}, 

628 {"$sort": {"precise_timestamp": 1}}, 

629 ] 

630 ) 

631 

632 # logger.info(f"pipeline: %s", str(pipeline)) 

633 cursor = collection.aggregate(pipeline) 

634 db_req_time = time.time() - db_req_start 

635 

636 init_time = time.time() 

637 

638 results = cursor.to_list() 

639 time_vector = [] 

640 values = [] 

641 forced_values = [] 

642 for s in results: 

643 time_vector.append(s["precise_timestamp"]) 

644 values.append(s.get("value", None)) 

645 forced_values.append(s.get("forced_value", None)) 

646 

647 signal = Signal.get_from_signal_id(signal_id) 

648 class_ = signal.signal_data_class 

649 

650 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

651 time_vector, values, forced_values = cls.interpolate_bounds( 

652 class_, 

653 collection, 

654 signal_id, 

655 time_vector, 

656 values, 

657 forced_values, 

658 window_min_timestamp, 

659 window_max_timestamp, 

660 ) 

661 

662 if values: 

663 # TODO: check below. a bit strange 

664 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

665 # Adding last value as it should be repeated 

666 time_vector.append(now) 

667 values.append(values[-1]) 

668 forced_values.append(forced_values[-1]) 

669 

670 init_time = time.time() - init_time 

671 

672 # See line 292 for explanation 

673 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

674 first_bucket = None 

675 if bucket is not None: 

676 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

677 if first_bucket is not None: 

678 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

679 else: 

680 data_start = None 

681 

682 last_bucket = None 

683 if bucket is not None: 

684 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

685 if last_bucket is not None: 

686 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

687 else: 

688 data_end = None 

689 

690 return class_( 

691 signal_id=signal_id, 

692 forcible=signal.forcible, 

693 time_vector=time_vector, 

694 values=values, 

695 forced_values=forced_values, 

696 data_start=data_start, 

697 data_end=data_end, 

698 number_samples=len(values), 

699 number_samples_db=number_results, 

700 db_query_time=db_req_time, 

701 init_time=init_time, 

702 ) 

703 

704 @staticmethod 

705 def interpolate_bounds( 

706 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

707 ): 

708 sample_right = None 

709 # Fetching right side value & interpolation 

710 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

711 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

712 sample_right = collection.find_one( 

713 { 

714 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

715 "value": {"$exists": True}, 

716 }, 

717 sort={"precise_timestamp": -1}, 

718 ) 

719 if sample_right: 

720 if time_vector: 

721 right_sd = class_( 

722 signal_id=signal_id, 

723 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

724 values=[values[-1], sample_right.get("value", None)], 

725 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

726 ) 

727 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

728 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

729 else: 

730 max_ts_value = sample_right.get("value", None) 

731 max_ts_forced_value = sample_right.get("forced_value", None) 

732 time_vector.append(window_max_timestamp) 

733 values.append(max_ts_value) 

734 forced_values.append(max_ts_forced_value) 

735 

736 # Fetching left side value & interpolation 

737 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

738 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

739 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

740 sample_left = sample_right 

741 sample_left = collection.find_one( 

742 { 

743 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

744 "value": {"$exists": True}, 

745 }, 

746 sort={"precise_timestamp": -1}, 

747 ) 

748 

749 if sample_left: 

750 if time_vector: 

751 left_sd = class_( 

752 signal_id=signal_id, 

753 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

754 values=[sample_left["value"], values[0]], 

755 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

756 ) 

757 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

758 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

759 else: 

760 min_ts_value = sample_left.get("value", None) 

761 min_ts_forced_value = sample_left.get("forced_value", None) 

762 time_vector.insert(0, window_min_timestamp) 

763 values.insert(0, min_ts_value) 

764 forced_values.insert(0, min_ts_forced_value) 

765 

766 return time_vector, values, forced_values 

767 

    def interpolate_values(self, new_time_vector: list[float]):
        """Interpolate ``self.values`` at the timestamps of ``new_time_vector`` using ``self.interpolate``."""
        return self.interpolate(new_time_vector, self.values)

770 

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Interpolate ``self.forced_values`` at the timestamps of ``new_time_vector`` using ``self.interpolate``."""
        return self.interpolate(new_time_vector, self.forced_values)

773 

774 def uniform_desampling(self, number_samples_max: int) -> Self: 

775 data_processing_time = time.time() 

776 if number_samples_max and self.number_samples > number_samples_max: 

777 new_time_vector = npy.linspace( 

778 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

779 ).tolist() 

780 values = self.interpolate_values(new_time_vector) 

781 forced_values = self.interpolate_forced_values(new_time_vector) 

782 time_vector = new_time_vector 

783 number_samples = len(time_vector) 

784 else: 

785 time_vector = self.time_vector 

786 number_samples = len(self.values) 

787 values = self.values[:] 

788 forced_values = self.forced_values[:] 

789 data_processing_time = time.time() - data_processing_time 

790 

791 return self.__class__( 

792 signal_id=self.signal_id, 

793 time_vector=time_vector, 

794 values=values, 

795 forced_values=forced_values, 

796 number_samples=number_samples, 

797 number_samples_db=self.number_samples, 

798 data_start=self.data_start, 

799 data_end=self.data_end, 

800 db_query_time=self.db_query_time, 

801 init_time=self.init_time, 

802 data_processing_time=self.data_processing_time + data_processing_time, 

803 ) 

804 

805 def interest_window_desampling( 

806 self, 

807 window_max_number_samples: int, 

808 outside_max_number_samples: int, 

809 window_min_timestamp: float | None = None, 

810 window_max_timestamp: float | None = None, 

811 ) -> Self: 

812 """Performs a sampling in a window of interest and outside.""" 

813 

814 if not self.time_vector: 

815 return self 

816 

817 if window_min_timestamp is None: 

818 window_min_timestamp = self.time_vector[0] 

819 if window_max_timestamp is None: 

820 window_max_timestamp = self.time_vector[-1] 

821 

822 data_processing_time = time.time() 

823 

824 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

825 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

826 

827 time_vector_before = self.time_vector[:index_window_start] 

828 time_vector_window = self.time_vector[index_window_start:index_window_end] 

829 time_vector_after = self.time_vector[index_window_end:] 

830 

831 # Resampling window 

832 if time_vector_window: 

833 # Ensurring window bounds 

834 if time_vector_window[0] != window_min_timestamp: 

835 time_vector_window.insert(0, window_min_timestamp) 

836 if time_vector_window[-1] != window_max_timestamp: 

837 time_vector_window.append(window_max_timestamp) 

838 else: 

839 time_vector_window = [window_min_timestamp, window_max_timestamp] 

840 

841 if len(time_vector_window) > window_max_number_samples: 

842 # Resampling 

843 new_window_time_vector = npy.linspace( 

844 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

845 ).tolist() 

846 time_vector_window = new_window_time_vector 

847 

848 # Resampling outside 

849 number_samples_before = len(time_vector_before) 

850 number_samples_after = len(time_vector_after) 

851 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

852 new_number_samples_before = min( 

853 number_samples_before, 

854 math.ceil( 

855 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

856 ), 

857 ) 

858 new_number_samples_after = min( 

859 number_samples_after, 

860 math.ceil( 

861 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

862 ), 

863 ) 

864 # Adjusting numbers as math.ceil can do +1 on sum 

865 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

866 if new_number_samples_before > new_number_samples_after: 

867 new_number_samples_before -= 1 

868 else: 

869 new_number_samples_after -= 1 

870 

871 if new_number_samples_before > 0: 

872 new_time_vector_before = npy.linspace( 

873 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

874 ).tolist() 

875 time_vector_before = new_time_vector_before 

876 

877 if new_number_samples_after > 0: 

878 new_time_vector_after = npy.linspace( 

879 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

880 ).tolist()[::-1] 

881 time_vector_after = new_time_vector_after 

882 

883 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

884 values = self.interpolate_values(new_time_vector) 

885 forced_values = self.interpolate_forced_values(new_time_vector) 

886 number_samples = len(values) 

887 

888 data_processing_time = time.time() - data_processing_time 

889 

890 return self.__class__( 

891 signal_id=self.signal_id, 

892 forcible=self.forcible, 

893 time_vector=new_time_vector, 

894 values=values, 

895 forced_values=forced_values, 

896 number_samples=number_samples, 

897 number_samples_db=self.number_samples, 

898 data_start=self.data_start, 

899 data_end=self.data_end, 

900 db_query_time=self.db_query_time, 

901 init_time=self.init_time, 

902 data_processing_time=self.data_processing_time + data_processing_time, 

903 ) 

904 

905 def csv_export(self): 

906 output = io.StringIO() 

907 writer = csv.writer(output) 

908 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

909 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

910 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

911 return output.getvalue().encode("utf-8") 

912 

913 def prestoplot_export(self): 

914 clean_signal_id = self.signal_id.replace(".", "_") 

915 if clean_signal_id[0].isnumeric(): 

916 clean_signal_id = "_" + clean_signal_id 

917 

918 output = io.StringIO() 

919 output.write("# Encoding:\tUTF-8\n") 

920 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

921 output.write("ISO8601\tnone\tnone\n") 

922 output.write(f"# Description :\t{clean_signal_id}\n") 

923 

924 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

925 output.write( 

926 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

927 ) 

928 return output.getvalue().encode("utf-8") 

929 

930 

class NumericSignalData(SignalData):
    # Signal data whose samples are numbers; a missing sample is stored as None.
    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` at the timestamps of ``new_time_vector``.

        ``items`` is sampled on ``self.time_vector``. ``None`` entries are
        treated as NaN during the interpolation and NaN results are returned
        as ``None``.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy holding at most ``number_samples_max`` samples.

        When ``number_samples_max`` is falsy or not exceeded, the samples are
        copied unchanged. Otherwise values are reduced with ``downsample_list``
        and forced values are re-interpolated on the reduced time vector.
        """
        started_at = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - started_at

        return self.__class__(
            signal_id=self.signal_id,
            # Forwarded so the desampled copy keeps the flag of its source,
            # as the base-class window desampling does.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    @staticmethod
    def _outside_sample_budget(
        number_samples_before: int, number_samples_after: int, outside_max_number_samples: int
    ) -> tuple[int, int]:
        """Split ``outside_max_number_samples`` between both sides of the window.

        Each side receives a share proportional to its current sample count,
        never more than it already holds.
        """
        total = number_samples_before + number_samples_after
        new_before = min(number_samples_before, math.ceil(outside_max_number_samples * number_samples_before / total))
        new_after = min(number_samples_after, math.ceil(outside_max_number_samples * number_samples_after / total))
        # Adjusting numbers as math.ceil can do +1 on sum
        if (new_before + new_after) > outside_max_number_samples:
            if new_before > new_after:
                new_before -= 1
            else:
                new_after -= 1
        return new_before, new_after

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        The samples are split into three segments: before, inside and after
        the window. The window is guaranteed to start and end exactly on its
        bounds (interpolated values are inserted when needed) and is reduced
        to ``window_max_number_samples``. Samples outside the window share a
        budget of ``outside_max_number_samples``.

        :param window_max_number_samples: Maximum number of samples kept inside the window.
        :param outside_max_number_samples: Maximum number of samples kept outside the window.
        :param window_min_timestamp: Window start; defaults to the first timestamp.
        :param window_max_timestamp: Window end; defaults to the last timestamp.
        :return: A new instance, or ``self`` when there is no sample at all.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        started_at = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are copies, so the insertions below never touch self.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            new_number_samples_before, new_number_samples_after = self._outside_sample_budget(
                number_samples_before, number_samples_after, outside_max_number_samples
            )

            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )
            else:
                # No budget left for this side: drop it, otherwise the segment
                # would be kept at full size and exceed the outside cap.
                time_vector_before, values_before = [], []

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )
            else:
                time_vector_after, values_after = [], []

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - started_at

        return self.__class__(
            signal_id=self.signal_id,
            # Forwarded so the desampled copy keeps the flag of its source,
            # as the base-class window desampling does.
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

1071 

1072 

class StringSignalData(SignalData):
    # Signal data whose samples are strings; a missing sample is stored as None.
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the last item known at that time.

        Strings cannot be blended, so each timestamp of ``new_time_vector``
        takes the item of the latest entry of ``self.time_vector`` that is
        smaller or equal to it. Positions are clamped to the valid index range
        of ``items``, so timestamps before the first sample get the first item.
        """
        last_index = len(items) - 1
        # searchsorted(..., side="right") - 1 is the index of the last sample <= timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_index)]

1084 

1085 

class SignalsData(TwinPadModel):
    # Data of several signals, with the overall time range they cover and the
    # cumulated processing time of the individual signals.
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the data of every signal of ``signal_ids``.

        All arguments are forwarded to ``SignalData.get_from_signal_id``.
        ``max_timestamp`` defaults to the current time. ``data_start`` and
        ``data_end`` are the earliest start and latest end reported by the
        signals, ignoring signals that report ``None``.
        """
        if max_timestamp is None:
            max_timestamp = time.time()

        signals_data = []
        data_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            data_processing_time += signal_data.data_processing_time
            signals_data.append(signal_data)

        starts = [s.data_start for s in signals_data if s.data_start is not None]
        ends = [s.data_end for s in signals_data if s.data_end is not None]

        return cls(
            signals_data=signals_data,
            data_processing_time=data_processing_time,
            data_start=min(starts) if starts else None,
            data_end=max(ends) if ends else None,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and wrap the results."""
        signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
        return self.__class__(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and wrap the results."""
        signals_data = [
            s.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
            for s in self.signals_data
        ]

        return self.__class__(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return a deflated ZIP archive (bytes) holding one export file per signal.

        :param file_format: ``"csv"`` (``<signal_id>.csv`` entries) or
            ``"prestoplot"`` (``<signal_id>.tab`` entries).
        :raises ValueError: If ``file_format`` is not supported.
        """
        # file format -> (exporting method of the signal data, archive entry extension)
        exporters = {"csv": ("csv_export", "csv"), "prestoplot": ("prestoplot_export", "tab")}
        # Validated up front so an unknown format is rejected even when there
        # is no signal to export.
        if file_format not in exporters:
            raise ValueError(f"Format not found. Got: {file_format}")
        export_method, extension = exporters[file_format]

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                zip_file.writestr(f"{signal_data.signal_id}.{extension}", getattr(signal_data, export_method)())
        return zip_buffer.getvalue()

    def hdf5_export(self):
        """Return an HDF5 file (bytes) with one group per signal.

        Each group holds a ``data`` dataset of ``(date, timestamp, value,
        forced_value)`` records. Values are stored as 30-byte strings for
        signals whose ``data_type`` is ``"str"`` and as 64-bit floats otherwise.
        """
        hdf5_buffer = io.BytesIO()
        custom_type_float = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        custom_type_string = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for signal_data in self.signals_data:
                signal_group = hdf5_file.create_group(signal_data.signal_id)
                date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
                record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
                records = list(
                    zip(date_vector, signal_data.time_vector, signal_data.values, signal_data.forced_values)
                )
                signal_group["data"] = npy.array(records, dtype=record_type)
        return hdf5_buffer.getvalue()

1227 

1228 

class SignalStatus(TwinPadModel):
    # Liveness report of a signal, as built by Signal.status / Signal.status_batch.
    status: str  # "up", "down" or "no data"
    reason: str  # human-readable explanation, empty when there is nothing to report
    delay: float | None  # seconds since the last known activity, None when unknown

1233 

1234 

class DigitizationFunction(TwinPadModel):
    # Bounds of a signal in both its value range and its raw range.
    # NOTE(review): presumably describes a mapping between raw and physical
    # values; the conversion itself is not visible here - confirm.
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

1241 

1242 

class SignalUpdate(TwinPadModel):
    # Payload of a signal command; every field is optional.
    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None

1247 

1248 

class SignalType(str, Enum):
    # Kinds of signal; members are also plain strings equal to their value.
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1253 

1254 

# Signal data class handling each supported data type name: everything but
# strings is handled as numeric data.
SIGNALDATA_TYPES = dict(
    int=NumericSignalData,
    float=NumericSignalData,
    str=StringSignalData,
    bool=NumericSignalData,
    epoch=NumericSignalData,
)

1262 

1263 

1264class Signal(GenericMongo): 

1265 collection_name: ClassVar[str] = "signals" 

1266 

1267 signal_id: str 

1268 frequency: float 

1269 unit: str | None 

1270 description: str 

1271 type: SignalType 

1272 data_type: str 

1273 precision_digits: int | None 

1274 forcible: bool 

1275 

1276 digitization_function: DigitizationFunction | None 

1277 

1278 @property 

1279 def device(self) -> Device: 

1280 device_id = self.signal_id.split(".")[0] 

1281 device = Device.get_one_by_attribute("device_id", device_id) 

1282 return device 

1283 

1284 @cached_property 

1285 def signal_data_class(self): 

1286 if self.data_type in SIGNALDATA_TYPES: 

1287 return SIGNALDATA_TYPES[self.data_type] 

1288 if self.data_type.startswith("enum"): 

1289 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1290 raise ValueError(f"Unhandled python type: {self.data_type}") 

1291 

1292 @cached_property 

1293 def python_type(self): 

1294 if self.data_type in TYPES: 

1295 return TYPES[self.data_type] 

1296 if self.data_type.startswith("enum"): 

1297 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1298 return Literal[*choices] 

1299 raise ValueError(f"Unhandled python type: {self.data_type}") 

1300 

1301 @computed_field 

1302 @property 

1303 def status(self) -> SignalStatus: 

1304 now = time.time() 

1305 status = "up" 

1306 reason = "" 

1307 

1308 # See line 285 for explanation 

1309 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1310 last_bucket = None 

1311 if bucket is not None: 

1312 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1313 if last_bucket is None: 

1314 status = "no data" 

1315 reason = "signal does not exist" 

1316 return SignalStatus(status=status, reason=reason, delay=None) 

1317 

1318 try: 

1319 last_date = last_bucket["control"]["max"]["timestamp"] 

1320 last_date = last_date.replace(tzinfo=pytz.UTC) 

1321 last_value_ts = last_date.timestamp() 

1322 except IndexError: 

1323 last_value_ts = None 

1324 

1325 if last_value_ts is None: 

1326 delay = None 

1327 reason = "No data from signal" 

1328 else: 

1329 # Since device is a computed property, only compute it once 

1330 device = self.device 

1331 if device is not None and device.last_ping is not None: 

1332 last_value_ts = max(last_value_ts, device.last_ping) 

1333 delay = now - last_value_ts 

1334 if delay > DEVICE_TIMEOUT: 

1335 status = "down" 

1336 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1337 return SignalStatus(status=status, reason=reason, delay=delay) 

1338 

1339 @classmethod 

1340 def status_batch(cls, signal_ids: list[str], devices_by_ids: dict[str, Device]) -> dict[str, SignalStatus]: 

1341 """Computes the status of multiple signals in batch from their signal ids. 

1342 

1343 :param signal_ids: Signal IDs of the wanted signals 

1344 :type signal_ids: list[str] 

1345 :param devices_by_ids: A pre-computed map of all signal IDs linked to their :py:class:`Device`. 

1346 :type devices_by_ids: dict[str, Device] 

1347 :return: A dictionary with the signal ID as the keys and their respective :py:class:`SignalStatus` as its values. 

1348 :rtype: dict[str, SignalStatus] 

1349 """ 

1350 statuses_by_signal_id = {} 

1351 

1352 buckets = get_signal_collections_batch([f"system.buckets.{signal_id}" for signal_id in signal_ids]) 

1353 for signal_id, bucket in zip(signal_ids, buckets): 

1354 now = time.time() 

1355 status = "up" 

1356 reason = "" 

1357 last_bucket = None 

1358 if bucket is not None: 

1359 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1360 if last_bucket is None: 

1361 status = "no data" 

1362 reason = "signal does not exist" 

1363 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=None) 

1364 continue 

1365 

1366 try: 

1367 last_date = last_bucket["control"]["max"]["timestamp"] 

1368 last_date = last_date.replace(tzinfo=pytz.UTC) 

1369 last_value_ts = last_date.timestamp() 

1370 except IndexError: 

1371 last_value_ts = None 

1372 

1373 if last_value_ts is None: 

1374 delay = None 

1375 reason = "No data from signal" 

1376 else: 

1377 # Since device is a computed property, only compute it once 

1378 device = devices_by_ids.get(signal_id.split(".")[0], None) 

1379 delay = now - last_value_ts 

1380 if device is not None and device.status == "up": 

1381 delay = 0 

1382 if delay > DEVICE_TIMEOUT: 

1383 status = "down" 

1384 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1385 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=delay) 

1386 

1387 return statuses_by_signal_id 

1388 

1389 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1390 command = Command( 

1391 sent_at=time.time(), 

1392 command_type="Signal command", 

1393 user_id=current_user.id, 

1394 ) 

1395 

1396 has_input_error = False 

1397 error_message = "" 

1398 

1399 if self.data_type.startswith("enum"): 

1400 enum_options = get_args(self.python_type) 

1401 

1402 if update_dict.value is not None and update_dict.value not in enum_options: 

1403 has_input_error = True 

1404 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1405 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1406 has_input_error = True 

1407 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1408 else: 

1409 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1410 has_input_error = True 

1411 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1412 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1413 has_input_error = True 

1414 error_message += ( 

1415 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1416 ) 

1417 

1418 if has_input_error: 

1419 command.response_time = 0 

1420 command.succeeded = False 

1421 command.description = f"Tried to modify signal {self.signal_id}" 

1422 response = {"error": True, "status_code": 400, "message": error_message} 

1423 else: 

1424 response = await send_signal_value(self.signal_id, update_dict) 

1425 command.receive_response(response) 

1426 

1427 Command.create(command) 

1428 return response 

1429 

1430 @classmethod 

1431 def get_from_signal_id(cls, signal_id) -> Self: 

1432 """Could be generic from mongo""" 

1433 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1434 if not raw_value: 

1435 return None 

1436 del raw_value["_id"] 

1437 return cls.dict_to_object(raw_value) 

1438 

1439 @classmethod 

1440 def get_all_ids(cls) -> list[str]: 

1441 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1442 

1443 return [signal["signal_id"] for signal in cursor] 

1444 

1445 async def number_samples(self): 

1446 collection = get_signal_collection(signal_id=self.signal_id) 

1447 if collection is None: 

1448 return 0 

1449 

1450 number_samples = collection.estimated_document_count() 

1451 

1452 number_samples_async_collection = await get_async_collection( 

1453 systems_async_database, "number_samples", create=True, time_series=True 

1454 ) 

1455 

1456 loop = asyncio.get_running_loop() 

1457 loop.create_task( 

1458 number_samples_async_collection.insert_one( 

1459 { 

1460 "timestamp": datetime.datetime.now(pytz.UTC), 

1461 "signal_id": self.signal_id, 

1462 "number_samples": number_samples, 

1463 } 

1464 ) 

1465 ) 

1466 

1467 return number_samples 

1468 

1469 @classmethod 

1470 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1471 number_samples_by_id = {} 

1472 collections = get_signal_collections_batch(signal_ids) 

1473 number_samples_async_collection = await get_async_collection( 

1474 systems_async_database, "number_samples", create=True, time_series=True 

1475 ) 

1476 

1477 for signal_id, collection in zip(signal_ids, collections): 

1478 if collection is None: 

1479 number_samples_by_id[signal_id] = 0 

1480 continue 

1481 

1482 number_samples = collection.estimated_document_count() 

1483 

1484 number_samples_by_id[signal_id] = number_samples 

1485 

1486 now = datetime.datetime.now(pytz.UTC) 

1487 loop = asyncio.get_running_loop() 

1488 loop.create_task( 

1489 number_samples_async_collection.insert_many( 

1490 [ 

1491 { 

1492 "timestamp": now, 

1493 "signal_id": signal_id, 

1494 "number_samples": number_samples, 

1495 } 

1496 for signal_id, number_samples in number_samples_by_id.items() 

1497 ] 

1498 ) 

1499 ) 

1500 

1501 return number_samples_by_id 

1502 

1503 def sample_datasize(self): 

1504 return signals_database.command("collstats", self.signal_id)["size"] 

1505 

1506 @classmethod 

1507 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1508 result = cls.collection().aggregate( 

1509 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1510 ) 

1511 

1512 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1513 

1514 

class ServicesStatus(TwinPadModel):
    # "up" / "down" state of each service of the platform.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and report the result.

        The backend is always reported as up, since it is the one answering.
        """
        hosts_by_service = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        states = {service: ping(host) for service, host in hosts_by_service.items()}
        return cls(backend="up", **states)

1533 

1534 

def ping(host):
    """Return ``"up"`` when ``host`` answers a ping within 0.8 second, ``"down"`` otherwise.

    A ``PermissionError`` raised while pinging is reported as ``"down"``.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # NOTE(review): presumably raised when the process is not allowed to
        # open the socket used for pinging - confirm.
        return "down"
    # ping3 returns None on timeout and False on error. Any measured delay is
    # a reply, including 0.0, which a plain truthiness test would reject.
    if delay is None or delay is False:
        return "down"
    return "up"

1542 

1543 

class Event(GenericMongo):
    # Occurrence of an event rule, stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        # Rule that produced this event, loaded once per instance.
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an event from a MongoDB document.

        The ``timestamp`` entry of ``dict_`` is a datetime in the database; it
        is replaced in place by its epoch seconds before the object is built.
        """
        moment = dict_["timestamp"]
        if moment.tzinfo is None:
            # pymongo returns naive datetimes expressed in UTC by default;
            # without this, timestamp() would read them in the server's local
            # timezone. NOTE(review): assumes the client is not configured
            # otherwise - confirm.
            moment = moment.replace(tzinfo=pytz.UTC)
        dict_["timestamp"] = moment.timestamp()
        return super().dict_to_object(dict_)

1561 

1562 

class TwinPadActivity(GenericMongo):
    # Daily activity counter: `timestamp` is the day (epoch seconds of its
    # UTC midnight) and `amount` the number of items counted for that day.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return midnight of the current UTC day as an aware datetime."""
        today = datetime.datetime.now(pytz.UTC).date()
        return datetime.datetime.combine(today, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _count_documents(collection, timestamp_bounds: dict) -> int:
        """Count the documents of ``collection`` whose timestamp matches ``timestamp_bounds``."""
        return collection.count_documents({"timestamp": timestamp_bounds})

    @staticmethod
    def _sum_values(collection, timestamp_bounds: dict):
        """Sum the ``value`` of the documents of ``collection`` whose timestamp matches ``timestamp_bounds``."""
        rows = collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_bounds}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(rows) == 0:
            return 0
        return rows[0].get("number_samples", 0)

    @classmethod
    def _daily_amounts_timeframe(
        cls,
        daily_collection,
        daily_collection_name: str,
        source_collection,
        count_between,
        min_timestamp,
        max_timestamp,
        recompute_amount,
    ) -> list[Self]:
        """Refresh a collection of daily amounts and return its items of a timeframe.

        When ``daily_collection`` is missing or ``recompute_amount`` is set,
        the daily collection is emptied and rebuilt from the day of the first
        document of ``source_collection`` up to yesterday. The amount of the
        current day is refreshed on every call.

        :param count_between: Callable ``(collection, timestamp_bounds)`` returning an amount.
        :return: The stored days between ``min_timestamp`` and ``max_timestamp`` (epoch seconds).
        """
        one_day = datetime.timedelta(days=1)
        today = cls._utc_midnight_today()

        if daily_collection is None or recompute_amount:
            daily_collection = get_collection(systems_database, daily_collection_name, create=True, time_series=True)
            daily_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            # Compute from the day of the first document found.
            day = datetime.datetime.combine(first_document["timestamp"], datetime.time.min).replace(tzinfo=pytz.UTC)
            while day < today:
                amount = count_between(source_collection, {"$gte": day, "$lt": day + one_day})
                if amount > 0:
                    daily_collection.insert_one({"timestamp": day, "amount": amount})
                day += one_day

        amount_today = count_between(source_collection, {"$gte": today})
        if amount_today > 0:
            # Replace the entry of the current day, which is still growing.
            daily_collection.delete_many({"timestamp": today})
            daily_collection.insert_one({"timestamp": today, "amount": amount_today})

        # Build aware UTC datetimes directly: fromtimestamp() without tz gives
        # the local time, which must not be relabelled as UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)

        items = []
        for day_document in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            moment = day_document["timestamp"]
            if moment.tzinfo is None:
                # pymongo returns naive datetimes expressed in UTC by default.
                # NOTE(review): assumes the client is not configured otherwise - confirm.
                moment = moment.replace(tzinfo=pytz.UTC)
            day_document["timestamp"] = moment.timestamp()
            items.append(cls.mongo_dict_to_object(day_document))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the daily number of events between two epoch timestamps.

        :param recompute_amount: Rebuild the stored daily amounts from the events first.
        """
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        return cls._daily_amounts_timeframe(
            number_events_collection,
            "number_events",
            events_collection,
            cls._count_documents,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of received samples between two epoch timestamps.

        The amounts are the sums of the ``value`` entries of the
        ``signal_storage._NUMBER_SAMPLES`` collection.

        :param recompute_amount: Rebuild the stored daily amounts first.
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        return cls._daily_amounts_timeframe(
            number_samples_collection,
            "number_received_samples",
            signals_number_samples_collection,
            cls._sum_values,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of commands between two epoch timestamps.

        :param recompute_amount: Rebuild the stored daily amounts from the commands first.
        """
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        return cls._daily_amounts_timeframe(
            number_commands_collection,
            "number_commands",
            commands_collection,
            cls._count_documents,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

1698 

1699 

# Event rule document of the "event_rules" collection: a named formula and the
# list of variable names it refers to.
class EventRule(GenericMongo):
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of documents in the "events" collection whose "event_rule_id"
    # equals this rule's id; computed on first access, then cached.
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events = get_collection(systems_database, "events")
        return events.count_documents({"event_rule_id": self.id})

1711 

1712 

# Company document of the "companies" collection; it only carries a name.
class Company(GenericMongo):
    collection_name: ClassVar[str] = "companies"

    name: str

1716 

1717 

# Campaign document of the "campaigns" collection.
class Campaign(GenericMongo):
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` (its ``id`` is not stored) and return ``{"campaign_id": <new id as str>}``."""
        campaigns = cls.collection()
        insertion = campaigns.insert_one(campaign.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"campaign_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite the name and description of the document whose ``_id`` is ``campaign.id``.

        Returns whatever ``find_one_and_update`` yields with ``ReturnDocument.AFTER``.
        """
        campaigns = cls.collection()
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return campaigns.find_one_and_update(selector, changes, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign whose ``_id`` is ``campaign_id`` and return the ``delete_one`` result."""
        campaigns = cls.collection()
        return campaigns.delete_one({"_id": ObjectId(campaign_id)})

1746 

1747 

# Phase document of the "phases" collection; each phase references its campaign
# through "campaign_id".
class Phase(GenericMongo):
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` (without ``id``) and return ``{"phase_id": <new id as str>}``.

        The copy is rebuilt from the five stored fields, and the ``phases``
        collection is requested with ``create=True``.
        """
        copied_fields = ("name", "description", "start_at", "end_at", "campaign_id")
        clean_phase = Phase(**{field: getattr(phase, field) for field in copied_fields})
        phases = get_collection(systems_database, "phases", create=True)
        insertion = phases.insert_one(clean_phase.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"phase_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description, start and end of the document whose ``_id`` is ``phase.id``.

        ``campaign_id`` is left untouched. Returns whatever
        ``find_one_and_update`` yields with ``ReturnDocument.AFTER``.
        """
        phases = cls.collection()
        selector = {"_id": ObjectId(phase.id)}
        editable_fields = ("name", "description", "start_at", "end_at")
        changes = {"$set": {field: getattr(phase, field) for field in editable_fields}}
        return phases.find_one_and_update(selector, changes, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase whose ``_id`` is ``phase_id`` and return the ``delete_one`` result."""
        phases = cls.collection()
        return phases.delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase whose ``campaign_id`` matches and return the ``delete_many`` result."""
        phases = cls.collection()
        return phases.delete_many({"campaign_id": campaign_id})

1812 

1813 

# Fields needed to create a custom view; documents go to the "custom_views"
# collection.
class CustomViewCreation(GenericMongo):
    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list

1819 

1820 

# Stored custom view: the creation fields plus an optional id and the id of the
# owning user.
class CustomView(CustomViewCreation):
    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

    # # Methods
    # @classmethod
    # def create(cls, form_custom_view: Self, user_id) -> list:
    #     custom_view = CustomView(
    #         name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
    #     )
    #     new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
    #     return new_custom_view

    # @classmethod
    # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
    #     updated_custom_view = cls.collection().find_one_and_update(
    #         {"_id": ObjectId(custom_view_id)},
    #         {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
    #         return_document=ReturnDocument.AFTER,
    #     )
    #     updated_custom_view["id"] = str(updated_custom_view["_id"])
    #     del updated_custom_view["_id"]
    #     return cls(**updated_custom_view)

    # @classmethod
    # def delete(cls, custom_view_id) -> bool:
    #     deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
    #     return deleted_custom_view.acknowledged


# Update model derived from CustomView by create_update_model.
CustomViewUpdate = create_update_model(CustomView)

1855 

1856 

# Video source (camera) document of the "videos" collection.
class Video(GenericMongo):
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are requested from the database, so the
        credentials are not loaded.
        """
        cursor = cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with id ``camera_id``, or ``None`` when ``get_from_id`` finds nothing."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name

1880 

1881 

# Command sent by a user, stored in the "commands" collection (requested as a
# time-series collection, see collection()).
class Command(GenericMongo):
    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, requested with ``create=True`` and ``time_series=True``."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of ``command`` and return ``{"command_id": <new id as str>}``.

        The stored ``timestamp`` is derived from ``command.sent_at`` as a
        UTC-aware datetime; the ``id`` field is not stored.
        """
        copied_fields = ("sent_at", "response_time", "command_type", "description", "succeeded", "user_id")
        stamped = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            **{field: getattr(command, field) for field in copied_fields},
        )
        commands = cls.collection()
        insertion = commands.insert_one(stamped.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"command_id": str(insertion.inserted_id)}

    def receive_response(self, response: dict):
        """Record the outcome of the command from its ``response``.

        Sets ``response_time`` to the time elapsed since ``sent_at``, marks the
        command as succeeded only when ``response["error"]`` is exactly
        ``False``, and fills an empty ``description`` with the right-stripped
        ``response["message"]``.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description != "":
            return
        self.description += response.get("message", "").rstrip()

1921 

1922 

# Fields needed to create a signals preset: a name and the ids of its signals.
class SignalsPresetCreation(GenericMongo):
    name: str
    signal_ids: list[str]

1926 

1927 

# Stored signals preset of the "signals_presets" collection, owned by a user.
class SignalsPreset(SignalsPresetCreation):
    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store ``signals_preset`` for ``user_id`` and return the new document id as a string."""
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        presets = cls.collection()
        insertion = presets.insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insertion.inserted_id)


# Update model derived from SignalsPreset by create_update_model.
SignalsPresetUpdate = create_update_model(SignalsPreset)

1947 

1948 

class LineStyle(str, Enum):
    """Line styles a graph theme can use; each member's value is its own name."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

1953 

1954 

class SignalAppearance:
    """Plain annotation holder for the two colors of a signal (value and forced value)."""

    value_color: str
    forced_value_color: str

1958 

1959 

# Fields needed to create a graph theme for one signal; documents go to the
# "graph_themes" collection. Colors default to empty strings, line styles to
# solid, and a theme is private unless stated otherwise.
class GraphThemeCreation(GenericMongo):
    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    private: bool = True

1970 

1971 

# Graph theme as shown to one user: the stored "creator_id", "in_library" and
# "active" fields are replaced by three booleans computed for that user.
class PublicGraphTheme(GraphThemeCreation):
    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # Id of the user the pipeline steps are built for; the user-aware
    # classmethods below assign it on the class before delegating to the parent.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Return the aggregation stages keyed by field name (plus ``"default"``).

        The per-user booleans are computed from ``creator_id``, ``in_library``
        and ``active``, and those three stored fields are then removed with
        ``$$REMOVE``.
        """
        user_id = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user isn't allowed to see (i.e. neither
            # theirs nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Record ``user_id`` on the class, then delegate to the parent ``response_from_query``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` after setting ``query.in_user_library`` to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Look a theme up by ``_id`` on behalf of ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Record ``user_id`` on the class, then delegate to the parent ``get_by_attribute``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Record ``user_id`` on the class, then delegate to the parent ``get_one_by_attribute``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Record ``user_id`` on the class, then delegate to the parent ``get_all``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return, per signal id, the colors and line styles of one theme active for ``user_id``.

        Only themes whose ``signal_id`` is in ``signal_ids`` are considered;
        when several match a signal, the first one of its group is kept. Each
        value of the returned dict holds ``value_color``,
        ``forced_value_color``, ``value_line_style`` and
        ``forced_value_line_style``.
        """
        appearance_fields = ("value_color", "forced_value_color", "value_line_style", "forced_value_line_style")
        match_stage = {"$match": {"active": {"$eq": user_id}, "signal_id": {"$in": signal_ids}}}
        group_stage = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        unwrap_stage = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        project_stage = {"$project": {"_id": 0, "signal_id": 1, **dict.fromkeys(appearance_fields, 1)}}

        cursor = cls.collection().aggregate([match_stage, group_stage, unwrap_stage, project_stage])
        # The signal id becomes the key and is dropped from the stored value.
        return {document.pop("signal_id"): document for document in cursor}


# Update model derived from PublicGraphTheme by create_update_model.
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2087 

2088 

# Graph theme as stored in the database, including the fields that must not be
# exposed as-is: who created it, and which users have it in their library or
# have it active.
class PrivateGraphTheme(GraphThemeCreation):
    # private
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Insert a new theme and return it with its ``id`` filled in.

        The creator is the only user in both ``in_library`` and ``active`` at
        creation time.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The per-user flags ``in_user_library`` and ``active_for_user`` are
        translated into adding/removing ``user_id`` in the stored
        ``in_library`` / ``active`` lists. These two keys and
        ``created_by_user`` are always removed from ``update_dict`` (which is
        modified in place), even when their value is ``None``, so that they are
        never written to the database.

        :param update_dict: fields to set; modified in place.
        :param user_id: user the per-user flags apply to.
        :return: an empty dict.
        """
        per_user_flags = (("in_user_library", "in_library"), ("active_for_user", "active"))
        for flag_name, stored_name in per_user_flags:
            wanted = update_dict.pop(flag_name, None)
            if wanted is None:
                continue
            members = getattr(self, stored_name)
            if wanted and user_id not in members:
                members.append(user_id)
            elif not wanted and user_id in members:
                members.remove(user_id)
            update_dict[stored_name] = members

        # Computed per request, never stored.
        update_dict.pop("created_by_user", None)

        # Nothing left to write: skip the call rather than send an empty "$set".
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}