Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1214 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-05 12:35 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25# from scipy import signal as signal_scipy 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 devices_states_database, 

36) 

37from twinpad_backend.responses import ListResponse 

38from twinpad_backend.messages import send_mode_change, send_signal_value 

39 

# Type names (as strings) mapped to the Python type used for them;
# "epoch" values are handled as floats.
TYPES = {
    "int": int,
    "float": float,
    "str": str,
    "bool": bool,
    "epoch": float,
}

# Host names of the surrounding services, overridable through the environment.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Delay after which the last stored sample is considered stale
# (presumably seconds, it is compared with ``time.time()`` differences).
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Reuse uvicorn's error logger so messages land in the server log.
logger = logging.getLogger("uvicorn.error")

53 

54 

class classproperty:
    """
    Read-only property evaluated on the class instead of on an instance.

    Replaces the ``@classmethod`` + ``@property`` chaining, which was deprecated
    in Python 3.11 and removed in Python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The getter receives the owning class as its only argument.
        self.fget = func

    def __get__(self, instance, owner):
        # The instance (if any) is ignored: the value only depends on the class.
        getter = self.fget
        return getter(owner)

66 

67 

def create_update_model(model):
    """Build the ``<Model>Update`` pydantic model derived from ``model``.

    Every field of ``model`` except ``id`` is copied with its annotation widened
    to ``... | None`` and a default of ``None``, so an update payload may carry
    any subset of the fields.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(model.__name__ + "Update", **optional_fields)

77 

78 

def get_utc_date_from_timestamp(timestamp: float):
    """Return the ISO 8601 representation of ``timestamp`` expressed in UTC.

    The conversion is done explicitly in UTC so the result does not depend on
    the time zone of the machine. The string stays naive (no ``+00:00``
    suffix), which is the format produced so far on hosts running in UTC.

    Args:
        timestamp: POSIX timestamp in seconds.

    Returns:
        The date as a string such as ``"1970-01-01T00:00:00"``.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()

81 

82 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """Downsample a series to ``max_number_samples`` points with LTTB.

    ``time_vector`` and ``values`` are expected to have the same length.
    ``None`` values are not supported by LTTB: they are taken out before the
    downsampling and each run of consecutive ``None`` values is put back
    afterwards as its first timestamp (and its last one when the run is longer
    than one sample).

    Args:
        time_vector: Timestamps of the samples.
        values: Sample values, possibly containing ``None``.
        max_number_samples: Size of the returned series.

    Returns:
        A ``(time_vector, values)`` tuple. The inputs are returned untouched
        when they hold fewer than ``max_number_samples`` samples.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass: split the series into the samples LTTB can use and the
    # bounds of every run of None values.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    in_none_run = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not in_none_run:
                none_group_bounds.append([timestamp])
                in_none_run = True
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                # Only the first and last timestamps of a run are kept.
                none_group_bounds[-1][1] = timestamp
        else:
            in_none_run = False
            kept_times.append(timestamp)
            kept_values.append(value)
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # Insert the None values back at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values

127 

128 

def is_of_type(value, wanted_type):
    """Tell whether ``value`` is acceptable for ``wanted_type``.

    Integers are accepted where a float is wanted; any other type is checked
    with a plain ``isinstance``.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

133 

134 

135# Models 

class TwinPadModel(BaseModel):
    """Common base of the models of this module: dict <-> object helpers."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` and return the corresponding instance of ``cls``."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model to a dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude)

144 

145 

class GenericMongo(TwinPadModel):
    """Base class of the models stored as documents of a systems-database collection.

    Subclasses provide a ``collection_name`` class attribute and may fill
    ``custom_pipeline_steps`` with aggregation stages, keyed by the property
    those stages are attached to.
    """

    # String form of the MongoDB ``_id`` (set by ``insert`` and ``mongo_dict_to_object``).
    id: str | None = None
    # property name -> aggregation stages added to the pipelines built below.
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of this model (requested with ``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (comma-separated
        ``field`` or ``field:order`` entries), ``offset`` and ``limit`` (``None``
        or ``0`` means no limit).

        Returns:
            A ``ListResponse`` holding the page of items and the total number of
            documents matching the filter.
        """
        request_filters = query.mongodb_filter()
        items = []

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field = sort
                sort_order = 1
            sort_dict[sort_field] = sort_order

        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []
        # Custom stages of the filtered properties go before the $match stage.
        if "$and" in request_filters:
            for request_filter in request_filters["$and"]:
                for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                    if filtered_property in request_filter:
                        pipeline.extend(pipeline_steps)
                        added_properties.append(filtered_property)
        else:
            for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                if filtered_property in request_filters:
                    pipeline.extend(pipeline_steps)
                    added_properties.append(filtered_property)
        pipeline.append({"$match": request_filters})

        # Same for the properties used for sorting.
        # NOTE(review): a property that is both filtered and sorted on gets its
        # stages added twice - confirm this is harmless for every custom step.
        for sort_field in sort_dict:
            if sort_field in cls.custom_pipeline_steps:
                pipeline.extend(cls.custom_pipeline_steps[sort_field])
                added_properties.append(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only run on the selected page.
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        cursor = collection.aggregate(pipeline)

        for item_dict in cursor:
            items.append(cls.mongo_dict_to_object(item_dict))

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``item_id``, or ``None`` if there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document.

        ``mongo_dict`` is modified in place: ``_id`` is replaced by its string
        form under the ``id`` key.
        """
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute with value, or ``None``."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if len(items) == 0:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted by ``sort_by`` in ascending order."""
        items = []
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        for dict_ in cls.collection().aggregate(pipeline):
            items.append(cls.mongo_dict_to_object(dict_))
        return items

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents of the collection (0 if it is not available)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert the item in its collection and return its new id.

        The ``id`` field is left out of the stored document: MongoDB's ``_id``
        is the source of truth and is copied back to ``self.id``.
        """
        # The exclusion must name the field ("id"), not the ``id`` builtin.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this object and to its stored document; return ``self``."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; return ``True`` if a document was removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

288 

289 

class User(GenericMongo):
    """User account stored in the ``users`` collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user; the password is left out unless ``exclude`` is given explicitly."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Store a new user and return ``{"user_id": <id>}``.

        The first user ever created is always made an administrator.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """Update the stored user ``user_id`` with the non-``None`` fields of ``user``.

        Returns:
            The updated user, or ``None`` when no user has this id. The returned
            object does not carry the stored ``is_connected`` value (the field
            falls back to its default).
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        # find_one_and_update returns None when no document matches the filter.
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # Tolerate documents stored without the key.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


UserUpdate = create_update_model(User)

332 

333 

class Mode(TwinPadModel):
    """One of the modes listed in ``Device.modes``."""

    # Identifier of the mode.
    mode_id: int
    # Name of the mode, used in the description of mode-change commands.
    name: str
    frequency_multiplier: float
    min_frequency: float

339 

340 

class DeviceUpdate(TwinPadModel):
    """Update payload of a device (presumably what ``Device.change_mode`` receives)."""

    # Identifier of the requested mode.
    mode_id: int

343 

344 

class Device(GenericMongo):
    """Device stored in the ``devices`` collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    def _mode_label(self, mode_id: int) -> str:
        """Return the name of the mode at 1-based position ``mode_id``, or ``#<id>`` when out of range."""
        if 1 <= mode_id <= len(self.modes):
            return self.modes[mode_id - 1].name
        return f"#{mode_id}"

    async def change_mode(self, update_dict, current_user: User):
        """Ask the device to switch mode and log the request as a ``Command``.

        Args:
            update_dict: Object carrying the requested ``mode_id``.
            current_user: User issuing the command.

        Returns:
            The response of ``send_mode_change``, or an error dict with status
            code 400 when the requested mode id is rejected.
        """
        has_error = False

        if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes):
            has_error = True
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is not None:
            # Labels are bounds-checked: an id of 0 or a stale current mode id
            # must neither wrap around to the last mode nor raise IndexError.
            description = (
                f"Change mode of #{self.device_id} from {self._mode_label(self.current_mode_id)} "
                f"to {self._mode_label(update_dict.mode_id)}"
            )
        else:
            description = f"Change mode of #{self.device_id} to {self._mode_label(update_dict.mode_id)}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
        else:
            response = await send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """Return the devices owning ``signal_ids``, keyed by device id.

        The device id is the part of a signal id before the first ``"."``; each
        device is fetched once (the value is ``None`` when it is not found).
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

396 

397 

class DeviceSetup(GenericMongo):
    """Setup stored in the ``device_setups`` collection, referencing devices by id."""

    collection_name: ClassVar[str] = "device_setups"

    # Ids of the devices that are part of the setup.
    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Same fields as DeviceSetup (without ``id``), all optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)

407 

408 

class DeviceState(GenericMongo):
    """State of a device at a given time."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """Return the states of ``device_id`` matching ``query``.

        The states are read from the collection named ``device_id`` of the
        devices-states database. ``query`` must provide ``mongodb_filter()``,
        ``sort_by`` (``field`` or ``field:order``), ``offset`` and ``limit``
        (``None`` or ``0`` means no limit).

        Returns:
            A ``ListResponse``; it is empty when the collection is not available.
        """
        req_filter = query.mongodb_filter()
        items = []
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)
        for item_dict in cursor:
            items.append(
                cls(
                    timestamp=item_dict.get("precise_timestamp"),
                    mode=item_dict.get("mode", None),
                    load=item_dict.get("load", None),
                    # Plain empty lists when the keys are missing: a ``Field(...)``
                    # object is a field declaration, not a valid list value.
                    tokens=item_dict.get("tokens", []),
                    modified_properties=item_dict.get("modified_properties", []),
                )
            )
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

448 

449 

class SignalSample(TwinPadModel):
    """Single sample of a signal: value and forced value at a timestamp."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest stored sample of ``signal_id``, or ``None`` if there is none."""

        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """Return the most recent sample of ``signal_id``, or ``None`` if there is none.

        The timestamp of the returned sample is moved forward to the last ping
        of the device when that ping is more recent than the sample.

        Args:
            signal_id: Id of the signal; the part before the first ``"."`` is the device id.
            device: Device owning the signal; fetched from the database when omitted.
        """

        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same time-series bucket workaround as in get_first_from_signal_id,
        # very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample of ``signal_id`` not older than ``min_timestamp``, or ``None``."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, in the order of ``signal_ids``.

        Devices are fetched once per device id and shared between their signals.
        """
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, its most recent sample not older than ``min_timestamp``."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]

558 

559 

560class SignalData(TwinPadModel): 

561 signal_id: str 

562 forcible: bool = True 

563 time_vector: list[float] 

564 values: list[float | int | str | None] 

565 forced_values: list[float | int | str | None] 

566 

567 data_start: float | None = None 

568 data_end: float | None = None 

569 

570 number_samples: int = 0 

571 number_samples_db: int = 0 

572 

573 db_query_time: float = 0.0 

574 init_time: float = 0.0 

575 data_processing_time: float = 0.0 

576 

@classmethod
def get_from_signal_id(
    cls,
    signal_id: str,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
    max_documents: int | None = None,
) -> Self:
    """Load the samples of ``signal_id`` from its collection.

    Args:
        signal_id: Id of the signal to load.
        min_timestamp: Lower bound of the samples to fetch (no bound if ``None``).
        max_timestamp: Upper bound of the samples to fetch (no bound if ``None``).
        window_min_timestamp: Start of the window whose bounds may be interpolated.
        window_max_timestamp: End of the window whose bounds may be interpolated.
        interpolate_bounds: Whether to add samples at the window bounds
            (only done when both window timestamps are given).
        max_documents: Maximum number of documents to fetch; above it, only
            one document out of N is kept. ``None`` or ``0`` means no cap.

    Returns:
        An instance of the signal's data class; an empty ``cls`` instance when
        the signal has no collection.
    """

    now = time.time()

    req_signal = {}
    if min_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
    if max_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

    collection = get_signal_collection(signal_id)
    if collection is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )

    db_req_start = time.time()

    sort_step = {"$sort": {"precise_timestamp": 1}}
    number_results = collection.count_documents(req_signal)

    pipeline = []
    if req_signal:
        pipeline.append({"$match": req_signal})  # Filter data if needed

    pipeline.extend(
        [
            {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
            sort_step,
        ]
    )

    # A cap of 0 (or less) is treated as "no cap" instead of dividing by zero.
    if max_documents is not None and 0 < max_documents < number_results:
        unsampling_ratio = math.ceil(number_results / max_documents)
        logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
        # Number the documents, group them by blocks of `unsampling_ratio`
        # and keep the first document of each block.
        pipeline.extend(
            [
                {
                    "$setWindowFields": {
                        "sortBy": {"precise_timestamp": 1},
                        "output": {"index": {"$documentNumber": {}}},
                    }
                },
                {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                {"$replaceRoot": {"newRoot": "$doc"}},
                {"$unset": ["index", "group_id"]},
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

    cursor = collection.aggregate(pipeline)
    db_req_time = time.time() - db_req_start

    init_time = time.time()

    results = cursor.to_list()
    time_vector = []
    values = []
    forced_values = []
    for s in results:
        time_vector.append(s["precise_timestamp"])
        values.append(s.get("value", None))
        forced_values.append(s.get("forced_value", None))

    signal = Signal.get_from_signal_id(signal_id)
    class_ = signal.signal_data_class

    if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
        time_vector, values, forced_values = cls.interpolate_bounds(
            class_,
            collection,
            signal_id,
            time_vector,
            values,
            forced_values,
            window_min_timestamp,
            window_max_timestamp,
        )

    if values:
        # TODO: check below. a bit strange
        if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
            # Adding last value as it should be repeated
            time_vector.append(now)
            values.append(values[-1])
            forced_values.append(forced_values[-1])

    init_time = time.time() - init_time

    # The overall data range is read from the control metadata of the
    # internal `system.buckets.*` collection of the time series, which avoids
    # scanning the samples themselves.
    bucket = get_signal_collection(f"system.buckets.{signal_id}")
    first_bucket = None
    if bucket is not None:
        first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
    if first_bucket is not None:
        data_start = first_bucket["control"]["min"]["precise_timestamp"]
    else:
        data_start = None

    last_bucket = None
    if bucket is not None:
        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
    if last_bucket is not None:
        data_end = last_bucket["control"]["max"]["precise_timestamp"]
    else:
        data_end = None

    return class_(
        signal_id=signal_id,
        forcible=signal.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        data_start=data_start,
        data_end=data_end,
        number_samples=len(values),
        number_samples_db=number_results,
        db_query_time=db_req_time,
        init_time=init_time,
    )

711 

@staticmethod
def interpolate_bounds(
    class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
):
    """Add samples at the bounds of the window when the data stops short of them.

    A bound sample is added when the data is empty or ends (resp. starts) more
    than a fifth of the window away from the bound, and a stored sample with a
    value exists at or before that bound. Its value is interpolated with
    ``class_`` when data is available, otherwise copied from the stored sample.

    Args:
        class_: Signal data class used to interpolate the bound values.
        collection: Collection holding the samples of the signal.
        signal_id: Id of the signal.
        time_vector: Timestamps of the loaded samples (modified in place).
        values: Values of the loaded samples (modified in place).
        forced_values: Forced values of the loaded samples (modified in place).
        window_min_timestamp: Start of the window.
        window_max_timestamp: End of the window.

    Returns:
        The ``(time_vector, values, forced_values)`` lists.
    """
    sample_right = None
    # Fetching right side value & interpolation
    window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
    if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
        sample_right = collection.find_one(
            {
                "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
                "value": {"$exists": True},
            },
            sort={"precise_timestamp": -1},
        )
        if sample_right:
            if time_vector:
                right_sd = class_(
                    signal_id=signal_id,
                    time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                    values=[values[-1], sample_right.get("value", None)],
                    forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                )
                max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
            else:
                max_ts_value = sample_right.get("value", None)
                max_ts_forced_value = sample_right.get("forced_value", None)
            time_vector.append(window_max_timestamp)
            values.append(max_ts_value)
            forced_values.append(max_ts_forced_value)

    # Fetching left side value & interpolation
    window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
    if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
        if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
            # The latest sample before the window end is already before the
            # window start: it is also the left sample, no need to query again.
            sample_left = sample_right
        else:
            sample_left = collection.find_one(
                {
                    "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
                    "value": {"$exists": True},
                },
                sort={"precise_timestamp": -1},
            )

        if sample_left:
            if time_vector:
                left_sd = class_(
                    signal_id=signal_id,
                    time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                    values=[sample_left["value"], values[0]],
                    forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                )
                min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
            else:
                min_ts_value = sample_left.get("value", None)
                min_ts_forced_value = sample_left.get("forced_value", None)
            time_vector.insert(0, window_min_timestamp)
            values.insert(0, min_ts_value)
            forced_values.insert(0, min_ts_forced_value)

    return time_vector, values, forced_values

775 

def interpolate_values(self, new_time_vector: list[float]):
    """Interpolate the values of the signal at the timestamps of ``new_time_vector``."""
    interpolated = self.interpolate(new_time_vector, self.values)
    return interpolated

778 

def interpolate_forced_values(self, new_time_vector: list[float]):
    """Interpolate the forced values of the signal at the timestamps of ``new_time_vector``."""
    interpolated = self.interpolate(new_time_vector, self.forced_values)
    return interpolated

781 

def uniform_desampling(self, number_samples_max: int) -> Self:
    """Return a copy of the data resampled on at most ``number_samples_max`` points.

    When the data holds more samples than ``number_samples_max``, values and
    forced values are interpolated on evenly spaced timestamps starting at the
    first sample (the last timestamp is excluded). Otherwise the samples are
    copied as they are; ``number_samples_max`` of ``0`` or ``None`` disables
    the resampling.
    """
    data_processing_time = time.time()
    if number_samples_max and self.number_samples > number_samples_max:
        new_time_vector = npy.linspace(
            self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
        ).tolist()
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        time_vector = new_time_vector
        number_samples = len(time_vector)
    else:
        time_vector = self.time_vector
        number_samples = len(self.values)
        values = self.values[:]
        forced_values = self.forced_values[:]
    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        # Carried over so the copy does not fall back to the default value.
        forcible=self.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
    )

812 

def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
) -> Self:
    """Performs a sampling in a window of interest and outside.

    Args:
        window_max_number_samples: Maximum number of samples inside the window.
        outside_max_number_samples: Maximum number of samples outside the
            window, shared between both sides in proportion to their number
            of samples. A value of ``0`` or less leaves the outside samples
            untouched.
        window_min_timestamp: Start of the window (first timestamp if ``None``).
        window_max_timestamp: End of the window (last timestamp if ``None``).

    Returns:
        A new object whose values are interpolated on the resampled
        timestamps, or ``self`` when there is no data.
    """

    if not self.time_vector:
        return self

    if window_min_timestamp is None:
        window_min_timestamp = self.time_vector[0]
    if window_max_timestamp is None:
        window_max_timestamp = self.time_vector[-1]

    data_processing_time = time.time()

    index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
    index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

    time_vector_before = self.time_vector[:index_window_start]
    time_vector_window = self.time_vector[index_window_start:index_window_end]
    time_vector_after = self.time_vector[index_window_end:]

    # Resampling window
    if time_vector_window:
        # Ensuring window bounds
        if time_vector_window[0] != window_min_timestamp:
            time_vector_window.insert(0, window_min_timestamp)
        if time_vector_window[-1] != window_max_timestamp:
            time_vector_window.append(window_max_timestamp)
    else:
        time_vector_window = [window_min_timestamp, window_max_timestamp]

    if len(time_vector_window) > window_max_number_samples:
        # Resampling
        new_window_time_vector = npy.linspace(
            time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
        ).tolist()
        time_vector_window = new_window_time_vector

    # Resampling outside
    number_samples_before = len(time_vector_before)
    number_samples_after = len(time_vector_after)
    if (number_samples_before + number_samples_after) > outside_max_number_samples:
        new_number_samples_before = min(
            number_samples_before,
            math.ceil(
                outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
            ),
        )
        new_number_samples_after = min(
            number_samples_after,
            math.ceil(
                outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
            ),
        )
        # Adjusting numbers as math.ceil can do +1 on sum
        if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
            if new_number_samples_before > new_number_samples_after:
                new_number_samples_before -= 1
            else:
                new_number_samples_after -= 1

        if new_number_samples_before > 0:
            new_time_vector_before = npy.linspace(
                time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
            ).tolist()
            time_vector_before = new_time_vector_before
        elif outside_max_number_samples > 0:
            # This side got no share of the budget: keeping it untouched would
            # exceed the requested maximum.
            time_vector_before = []

        if new_number_samples_after > 0:
            # Built backwards so the last timestamp is kept and the one next
            # to the window is dropped.
            new_time_vector_after = npy.linspace(
                time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
            ).tolist()[::-1]
            time_vector_after = new_time_vector_after
        elif outside_max_number_samples > 0:
            time_vector_after = []

    new_time_vector = time_vector_before + time_vector_window + time_vector_after
    values = self.interpolate_values(new_time_vector)
    forced_values = self.interpolate_forced_values(new_time_vector)
    number_samples = len(values)

    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=new_time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
    )

912 

def csv_export(self):
    """Render the samples as UTF-8 encoded CSV bytes.

    The first row is the header ``timestamp,value,forced_value``; each following
    row holds the date produced by ``get_utc_date_from_timestamp`` for the sample
    timestamp, then the value and the forced value.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), sample, forced_sample]
        for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")

920 

def prestoplot_export(self):
    """Render the samples as a tab-separated PrestoPlot text file (UTF-8 bytes).

    Dots in the signal id are replaced by underscores and an underscore is
    prepended when the id starts with a digit. Missing values are written as
    the literal ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    # Four header lines: encoding, column names, column units, description.
    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]

    for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        sample_text = sample if sample is not None else "none"
        forced_text = forced_sample if forced_sample is not None else "none"
        lines.append(f"{get_utc_date_from_timestamp(timestamp)}\t{sample_text}\t{forced_text}\n")

    return "".join(lines).encode("utf-8")

937 

938 

class NumericSignalData(SignalData):
    # Signal data whose samples are numbers (or None for a missing sample).
    # Values are linearly interpolated; down-sampling goes through `downsample_list`.
    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

        ``None`` entries are fed to numpy as NaN, and every NaN in the result is
        returned as ``None``.
        """
        items = [npy.nan if item is None else item for item in items]
        interpolated = npy.interp(new_time_vector, self.time_vector, items)
        return [None if npy.isnan(sample) else sample for sample in interpolated]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy holding at most ``number_samples_max`` samples.

        When ``number_samples_max`` is falsy, or the data is already small enough,
        the samples are copied unchanged. Forced values are re-interpolated on the
        reduced time vector.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carry the flag over, as the base-class desampling does; omitting it
            # silently reset `forcible` to its default on the returned object.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Down-sample with one sample budget inside a window of interest and another outside it.

        :param window_max_number_samples: Maximum number of samples kept inside the window.
        :param outside_max_number_samples: Maximum number of samples kept before and after
            the window, shared proportionally to the number of samples on each side.
        :param window_min_timestamp: Window start; defaults to the first timestamp.
        :param window_max_timestamp: Window end; defaults to the last timestamp.
        :return: A new object of the same class, or ``self`` when there is no data.
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are copies, so the inserts below never touch self.time_vector / self.values.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Make sure the window starts and ends exactly on the requested bounds.
        if time_vector_window:
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        # Resampling inside the window
        if len(time_vector_window) > window_max_number_samples:
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside the window
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
            )
            # math.ceil on both sides can overshoot the budget by one: take it back
            # from the larger side.
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # NOTE(review): a side whose budget ends up at 0 is left untouched (not
            # emptied), so the outside budget can be exceeded in that case — confirm
            # whether this is intended.
            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carry the flag over, as the base-class desampling does; omitting it
            # silently reset `forcible` to its default on the returned object.
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

1079 

1080 

class StringSignalData(SignalData):
    # Signal data whose samples are strings (or None for a missing sample).
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the item of the latest sample at or before it.

        Timestamps earlier than the first sample get the first item.
        """
        last_index = len(items) - 1
        # Position of the last known timestamp that is <= each requested timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_index)]

1092 

1093 

class SignalsData(TwinPadModel):
    # Data of several signals plus aggregated metadata:
    #   data_processing_time: sum of the processing time of every signal
    #   data_start / data_end: earliest start / latest end among the signals (None if unknown)
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the data of every signal in ``signal_ids`` through ``SignalData.get_from_signal_id``.

        ``max_timestamp`` defaults to the current time; every other argument is
        forwarded unchanged for each signal.
        """
        if max_timestamp is None:
            max_timestamp = time.time()

        signals_data = [
            SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            for signal_id in signal_ids
        ]
        data_starts = [s.data_start for s in signals_data if s.data_start is not None]
        data_ends = [s.data_end for s in signals_data if s.data_end is not None]

        return cls(
            signals_data=signals_data,
            data_processing_time=sum((s.data_processing_time for s in signals_data), 0.0),
            data_start=min(data_starts) if data_starts else None,
            data_end=max(data_ends) if data_ends else None,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and return the new collection."""
        signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and return the new collection."""
        signals_data = [
            s.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
            for s in self.signals_data
        ]

        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return the bytes of a ZIP archive holding one exported file per signal.

        :param file_format: ``"csv"`` (``<signal_id>.csv``) or ``"prestoplot"`` (``<signal_id>.tab``).
        :raises ValueError: If ``file_format`` is not supported.
        """
        # Validate up front: the check used to live inside the loop, so an unknown
        # format went unnoticed whenever there was no signal to export.
        if file_format not in ("csv", "prestoplot"):
            raise ValueError(f"Format not found. Got: {file_format}")

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                if file_format == "csv":
                    zip_file.writestr(f"{signal_data.signal_id}.csv", signal_data.csv_export())
                else:
                    zip_file.writestr(f"{signal_data.signal_id}.tab", signal_data.prestoplot_export())
        return zip_buffer.getvalue()

    def hdf5_export(self):
        """Return the bytes of an HDF5 file with one group per signal.

        Each group holds a ``data`` dataset of (date, timestamp, value, forced_value)
        records; value columns are byte strings for ``"str"`` signals and float64
        otherwise.
        """
        hdf5_buffer = io.BytesIO()
        custom_type_float = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        custom_type_string = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for signal_data in self.signals_data:
                signal_group = hdf5_file.create_group(signal_data.signal_id)
                date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
                # Only the record dtype differs between string and numeric signals.
                record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
                signal_group["data"] = npy.array(
                    list(
                        zip(
                            date_vector,
                            signal_data.time_vector,
                            signal_data.values,
                            signal_data.forced_values,
                        )
                    ),
                    dtype=record_type,
                )
        return hdf5_buffer.getvalue()

1235 

1236 

class SignalStatus(TwinPadModel):
    # Health summary of a signal.
    status: str  # "up", "down" or "no data" in the code that builds it
    reason: str  # human-readable explanation, empty when nothing to report
    delay: float | None  # seconds since the last known activity, None when unknown

1241 

1242 

class DigitizationFunction(TwinPadModel):
    # Parameters describing how raw values relate to engineering values.
    # NOTE(review): presumably a linear mapping from [min_raw_value, max_raw_value]
    # to [min_value, max_value] — confirm against the code that applies it.
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

1249 

1250 

class SignalUpdate(TwinPadModel):
    # Payload of a signal command; every field is optional.
    value: float | str | bool | int | None = None  # new value
    forced_value: float | str | bool | int | None = None  # new forced value
    timestamp: int | None = None

1255 

1256 

class SignalType(str, Enum):
    # Kind of signal; members are also plain strings.
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1261 

1262 

# Signal data class used for each base data type name.
# Every type except "str" is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}

1270 

1271 

1272class Signal(GenericMongo): 

1273 collection_name: ClassVar[str] = "signals" 

1274 

1275 signal_id: str 

1276 frequency: float 

1277 unit: str | None 

1278 description: str 

1279 type: SignalType 

1280 data_type: str 

1281 precision_digits: int | None 

1282 forcible: bool 

1283 

1284 digitization_function: DigitizationFunction | None 

1285 

1286 @property 

1287 def device(self) -> Device: 

1288 device_id = self.signal_id.split(".")[0] 

1289 device = Device.get_one_by_attribute("device_id", device_id) 

1290 return device 

1291 

1292 @cached_property 

1293 def signal_data_class(self): 

1294 if self.data_type in SIGNALDATA_TYPES: 

1295 return SIGNALDATA_TYPES[self.data_type] 

1296 if self.data_type.startswith("enum"): 

1297 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1298 raise ValueError(f"Unhandled python type: {self.data_type}") 

1299 

1300 @cached_property 

1301 def python_type(self): 

1302 if self.data_type in TYPES: 

1303 return TYPES[self.data_type] 

1304 if self.data_type.startswith("enum"): 

1305 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1306 return Literal[*choices] 

1307 raise ValueError(f"Unhandled python type: {self.data_type}") 

1308 

1309 @computed_field 

1310 @property 

1311 def status(self) -> SignalStatus: 

1312 now = time.time() 

1313 status = "up" 

1314 reason = "" 

1315 

1316 # See line 285 for explanation 

1317 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1318 last_bucket = None 

1319 if bucket is not None: 

1320 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1321 if last_bucket is None: 

1322 status = "no data" 

1323 reason = "signal does not exist" 

1324 return SignalStatus(status=status, reason=reason, delay=None) 

1325 

1326 try: 

1327 last_date = last_bucket["control"]["max"]["timestamp"] 

1328 last_date = last_date.replace(tzinfo=pytz.UTC) 

1329 last_value_ts = last_date.timestamp() 

1330 except IndexError: 

1331 last_value_ts = None 

1332 

1333 if last_value_ts is None: 

1334 delay = None 

1335 reason = "No data from signal" 

1336 else: 

1337 # Since device is a computed property, only compute it once 

1338 device = self.device 

1339 if device is not None and device.last_ping is not None: 

1340 last_value_ts = max(last_value_ts, device.last_ping) 

1341 delay = now - last_value_ts 

1342 if delay > DEVICE_TIMEOUT: 

1343 status = "down" 

1344 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1345 return SignalStatus(status=status, reason=reason, delay=delay) 

1346 

1347 @classmethod 

1348 def status_batch(cls, signal_ids: list[str], devices_by_ids: dict[str, Device]) -> dict[str, SignalStatus]: 

1349 """Computes the status of multiple signals in batch from their signal ids. 

1350 

1351 :param signal_ids: Signal IDs of the wanted signals 

1352 :type signal_ids: list[str] 

1353 :param devices_by_ids: A pre-computed map of all signal IDs linked to their :py:class:`Device`. 

1354 :type devices_by_ids: dict[str, Device] 

1355 :return: A dictionary with the signal ID as the keys and their respective :py:class:`SignalStatus` as its values. 

1356 :rtype: dict[str, SignalStatus] 

1357 """ 

1358 statuses_by_signal_id = {} 

1359 

1360 buckets = get_signal_collections_batch([f"system.buckets.{signal_id}" for signal_id in signal_ids]) 

1361 for signal_id, bucket in zip(signal_ids, buckets): 

1362 now = time.time() 

1363 status = "up" 

1364 reason = "" 

1365 last_bucket = None 

1366 if bucket is not None: 

1367 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1368 if last_bucket is None: 

1369 status = "no data" 

1370 reason = "signal does not exist" 

1371 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=None) 

1372 continue 

1373 

1374 try: 

1375 last_date = last_bucket["control"]["max"]["timestamp"] 

1376 last_date = last_date.replace(tzinfo=pytz.UTC) 

1377 last_value_ts = last_date.timestamp() 

1378 except IndexError: 

1379 last_value_ts = None 

1380 

1381 if last_value_ts is None: 

1382 delay = None 

1383 reason = "No data from signal" 

1384 else: 

1385 # Since device is a computed property, only compute it once 

1386 device = devices_by_ids.get(signal_id.split(".")[0], None) 

1387 delay = now - last_value_ts 

1388 if device is not None and device.status == "up": 

1389 delay = 0 

1390 if delay > DEVICE_TIMEOUT: 

1391 status = "down" 

1392 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1393 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=delay) 

1394 

1395 return statuses_by_signal_id 

1396 

1397 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1398 command = Command( 

1399 sent_at=time.time(), 

1400 command_type="Signal command", 

1401 user_id=current_user.id, 

1402 ) 

1403 

1404 has_input_error = False 

1405 error_message = "" 

1406 

1407 if self.data_type.startswith("enum"): 

1408 enum_options = get_args(self.python_type) 

1409 

1410 if update_dict.value is not None and update_dict.value not in enum_options: 

1411 has_input_error = True 

1412 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1413 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1414 has_input_error = True 

1415 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1416 else: 

1417 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1418 has_input_error = True 

1419 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1420 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1421 has_input_error = True 

1422 error_message += ( 

1423 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1424 ) 

1425 

1426 if has_input_error: 

1427 command.response_time = 0 

1428 command.succeeded = False 

1429 command.description = f"Tried to modify signal {self.signal_id}" 

1430 response = {"error": True, "status_code": 400, "message": error_message} 

1431 else: 

1432 response = await send_signal_value(self.signal_id, update_dict) 

1433 command.receive_response(response) 

1434 

1435 Command.create(command) 

1436 return response 

1437 

1438 @classmethod 

1439 def get_from_signal_id(cls, signal_id) -> Self: 

1440 """Could be generic from mongo""" 

1441 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1442 if not raw_value: 

1443 return None 

1444 del raw_value["_id"] 

1445 return cls.dict_to_object(raw_value) 

1446 

1447 @classmethod 

1448 def get_all_ids(cls) -> list[str]: 

1449 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1450 

1451 return [signal["signal_id"] for signal in cursor] 

1452 

1453 async def number_samples(self): 

1454 collection = get_signal_collection(signal_id=self.signal_id) 

1455 if collection is None: 

1456 return 0 

1457 

1458 number_samples = collection.estimated_document_count() 

1459 

1460 number_samples_async_collection = await get_async_collection( 

1461 systems_async_database, "number_samples", create=True, time_series=True 

1462 ) 

1463 

1464 loop = asyncio.get_running_loop() 

1465 loop.create_task( 

1466 number_samples_async_collection.insert_one( 

1467 { 

1468 "timestamp": datetime.datetime.now(pytz.UTC), 

1469 "signal_id": self.signal_id, 

1470 "number_samples": number_samples, 

1471 } 

1472 ) 

1473 ) 

1474 

1475 return number_samples 

1476 

1477 @classmethod 

1478 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1479 number_samples_by_id = {} 

1480 collections = get_signal_collections_batch(signal_ids) 

1481 number_samples_async_collection = await get_async_collection( 

1482 systems_async_database, "number_samples", create=True, time_series=True 

1483 ) 

1484 

1485 for signal_id, collection in zip(signal_ids, collections): 

1486 if collection is None: 

1487 number_samples_by_id[signal_id] = 0 

1488 continue 

1489 

1490 number_samples = collection.estimated_document_count() 

1491 

1492 number_samples_by_id[signal_id] = number_samples 

1493 

1494 now = datetime.datetime.now(pytz.UTC) 

1495 loop = asyncio.get_running_loop() 

1496 loop.create_task( 

1497 number_samples_async_collection.insert_many( 

1498 [ 

1499 { 

1500 "timestamp": now, 

1501 "signal_id": signal_id, 

1502 "number_samples": number_samples, 

1503 } 

1504 for signal_id, number_samples in number_samples_by_id.items() 

1505 ] 

1506 ) 

1507 ) 

1508 

1509 return number_samples_by_id 

1510 

1511 def sample_datasize(self): 

1512 return signals_database.command("collstats", self.signal_id)["size"] 

1513 

1514 @classmethod 

1515 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1516 result = cls.collection().aggregate( 

1517 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1518 ) 

1519 

1520 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1521 

1522 

class ServicesStatus(TwinPadModel):
    # "up" / "down" state of each service of the platform.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping the host of every service; the backend itself is always reported as up."""
        host_by_service = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        # Hosts are pinged one after the other, in the order listed above.
        statuses = {service: ping(host) for service, host in host_by_service.items()}
        return cls(backend="up", **statuses)

1541 

1542 

def ping(host):
    """Return ``"up"`` when ``host`` answers an ICMP ping within 0.8 second, ``"down"`` otherwise.

    A ``PermissionError`` raised while pinging is reported as ``"down"``.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # ping3 returns None on timeout and False on error. Test for those explicitly:
    # a plain truthiness test would also reject a valid delay of 0.0.
    if delay is None or delay is False:
        return "down"
    return "up"

1550 

1551 

class Event(GenericMongo):
    # Occurrence of an event rule, stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float  # epoch seconds
    event_rule_id: str

    # Rule that produced the event, fetched from its id and then cached.
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Convert the stored ``timestamp`` datetime to epoch seconds, then build the object.

        ``dict_`` is modified in place. A naive datetime is read as UTC, as done
        for bucket timestamps in ``Signal.status``, instead of depending on the
        local timezone of the host.
        """
        moment = dict_["timestamp"]
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=pytz.UTC)
        dict_["timestamp"] = moment.timestamp()
        return super().dict_to_object(dict_)

1569 

1570 

class TwinPadActivity(GenericMongo):
    # Amount of activity (events, received samples or commands) for one day.
    timestamp: float  # epoch seconds of the day (midnight UTC)
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return today's date in UTC at 00:00, as an aware datetime."""
        today = datetime.datetime.now(pytz.UTC).date()
        return datetime.datetime.combine(today, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _timestamp_bounds(start, end=None) -> dict:
        """Build the ``timestamp`` filter for ``start <= timestamp`` (``< end`` when given)."""
        bounds = {"$gte": start}
        if end is not None:
            bounds["$lt"] = end
        return bounds

    @classmethod
    def _count_documents_between(cls, collection, start, end=None) -> int:
        """Count the documents of ``collection`` whose timestamp lies in the period."""
        return collection.count_documents({"timestamp": cls._timestamp_bounds(start, end)})

    @classmethod
    def _sum_values_between(cls, collection, start, end=None):
        """Sum the ``value`` field of the documents of ``collection`` in the period (0 if none)."""
        request = collection.aggregate(
            [
                {"$match": {"timestamp": cls._timestamp_bounds(start, end)}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(request) == 0:
            return 0
        return request[0].get("number_samples", 0)

    @classmethod
    def _daily_amounts(
        cls,
        daily_collection,
        daily_collection_name: str,
        source_collection,
        amount_between,
        min_timestamp,
        max_timestamp,
        recompute_amount: bool,
    ) -> list[Self]:
        """Refresh the per-day amounts cached in ``daily_collection`` and return those in the timeframe.

        Past days are recomputed from ``source_collection`` only when the cache is
        missing or ``recompute_amount`` is set; the amount of the current day is
        always refreshed.
        """
        one_day = datetime.timedelta(days=1)
        today = cls._utc_midnight_today()

        if daily_collection is None or recompute_amount:
            daily_collection = get_collection(systems_database, daily_collection_name, create=True, time_series=True)
            daily_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            # Start from the day of the oldest source document.
            day = datetime.datetime.combine(first_document["timestamp"], datetime.time.min).replace(tzinfo=pytz.UTC)
            while day < today:
                amount = amount_between(source_collection, day, day + one_day)
                if amount > 0:
                    daily_collection.insert_one({"timestamp": day, "amount": amount})
                day += one_day

        amount_today = amount_between(source_collection, today)
        if amount_today > 0:
            daily_collection.delete_many({"timestamp": today})
            daily_collection.insert_one({"timestamp": today, "amount": amount_today})

        # Epoch seconds are UTC based: build aware datetimes directly instead of
        # labelling the host's local time as UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)

        items = []
        for day_document in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            moment = day_document["timestamp"]
            if moment.tzinfo is None:
                # Naive datetimes read back from the database are taken as UTC.
                moment = moment.replace(tzinfo=pytz.UTC)
            day_document["timestamp"] = moment.timestamp()
            items.append(cls.mongo_dict_to_object(day_document))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of events per day between two epoch timestamps.

        :param recompute_amount: Rebuild the cached daily amounts from the events.
        """
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        return cls._daily_amounts(
            number_events_collection,
            "number_events",
            events_collection,
            cls._count_documents_between,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of received samples per day between two epoch timestamps.

        :param recompute_amount: Rebuild the cached daily amounts from the sample counters.
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        return cls._daily_amounts(
            number_samples_collection,
            "number_received_samples",
            signals_number_samples_collection,
            cls._sum_values_between,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of commands per day between two epoch timestamps.

        :param recompute_amount: Rebuild the cached daily amounts from the commands.
        """
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        return cls._daily_amounts(
            number_commands_collection,
            "number_commands",
            commands_collection,
            cls._count_documents_between,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

1706 

1707 

class EventRule(GenericMongo):
    """Event rule document stored in the ``event_rules`` collection."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """
        Number of documents of the ``events`` collection whose ``event_rule_id`` is this rule's id.

        Returns 0 when the ``events`` collection is not available: other code in this
        module checks the result of ``get_collection`` (called without ``create=True``)
        against ``None``, so the same case is handled here instead of raising.
        """
        events_collection = get_collection(systems_database, "events")
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})

1719 

1720 

class Company(GenericMongo):
    """Company document stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"

    name: str

1724 

1725 

class Campaign(GenericMongo):
    """Campaign document stored in the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """
        Insert ``campaign`` (its ``id`` field is left out of the document).

        :return: ``{"campaign_id": <inserted id as str>}``, or ``None`` if the insert yields no result.
        """
        campaigns = cls.collection()
        insertion = campaigns.insert_one(campaign.model_dump(exclude={"id"}))
        if insertion is not None:
            return {"campaign_id": str(insertion.inserted_id)}
        return None

    @classmethod
    def update(cls, campaign: Self):
        """
        Overwrite ``name`` and ``description`` of the stored campaign identified by ``campaign.id``.

        :return: the document as returned by ``find_one_and_update`` with ``ReturnDocument.AFTER``.
        """
        campaigns = cls.collection()
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return campaigns.find_one_and_update(selector, changes, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with the given id and return the result of ``delete_one``."""
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})

1754 

1755 

class Phase(GenericMongo):
    """Phase document stored in the ``phases`` collection and linked to a campaign through ``campaign_id``."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # Disabled lookup, kept for reference:
    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """
        Insert a new phase built from the fields of ``phase`` (its ``id`` is not copied).

        The ``phases`` collection is requested with ``create=True``.

        :return: ``{"phase_id": <inserted id as str>}``, or ``None`` if the insert yields no result.
        """
        fresh_phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phases = get_collection(systems_database, "phases", create=True)
        insertion = phases.insert_one(fresh_phase.model_dump(exclude={"id"}))
        if insertion is not None:
            return {"phase_id": str(insertion.inserted_id)}
        return None

    @classmethod
    def update(cls, phase: Self):
        """
        Overwrite ``name``, ``description``, ``start_at`` and ``end_at`` of the phase identified by ``phase.id``.

        ``campaign_id`` is not modified.

        :return: the document as returned by ``find_one_and_update`` with ``ReturnDocument.AFTER``.
        """
        phases = cls.collection()
        selector = {"_id": ObjectId(phase.id)}
        new_values = {
            "name": phase.name,
            "description": phase.description,
            "start_at": phase.start_at,
            "end_at": phase.end_at,
        }
        return phases.find_one_and_update(
            selector,
            {"$set": new_values},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with the given id and return the result of ``delete_one``."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase whose ``campaign_id`` matches and return the result of ``delete_many``."""
        return cls.collection().delete_many({"campaign_id": campaign_id})

1820 

1821 

class CustomViewCreation(GenericMongo):
    """Fields needed to create a custom view (``custom_views`` collection)."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list

1827 

1828 

class CustomView(CustomViewCreation):
    """Custom view as stored: the creation fields plus an optional ``id`` and the owning ``user_id``."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

    # Disabled methods, kept for reference:
    # # Methods
    # @classmethod
    # def create(cls, form_custom_view: Self, user_id) -> list:
    #     custom_view = CustomView(
    #         name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
    #     )
    #     new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
    #     return new_custom_view

    # @classmethod
    # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
    #     updated_custom_view = cls.collection().find_one_and_update(
    #         {"_id": ObjectId(custom_view_id)},
    #         {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
    #         return_document=ReturnDocument.AFTER,
    #     )
    #     updated_custom_view["id"] = str(updated_custom_view["_id"])
    #     del updated_custom_view["_id"]
    #     return cls(**updated_custom_view)

    # @classmethod
    # def delete(cls, custom_view_id) -> bool:
    #     deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
    #     return deleted_custom_view.acknowledged

1860 

1861 

# Model derived from CustomView by create_update_model.
# NOTE(review): presumably the schema accepted for partial updates - confirm against the helper.
CustomViewUpdate = create_update_model(CustomView)

1863 

1864 

class Video(GenericMongo):
    """Video source document stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """
        List every video, sorted in ascending order of ``sort_by``.

        Only ``name`` and ``ip_addr`` are projected by the query, so ``username``
        and ``password`` are not read from the database here.
        """
        cursor = cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the ``name`` of the video with the given id, or ``None`` when ``get_from_id`` finds nothing."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name

1888 

1889 

class Command(GenericMongo):
    """Command record stored in the ``commands`` collection."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the collection of this class, requested with ``create=True`` and ``time_series=True``."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """
        Store a copy of ``command`` whose ``timestamp`` is ``sent_at`` converted to an aware UTC datetime.

        :return: ``{"command_id": <inserted id as str>}``, or ``None`` if the insert yields no result.
        """
        stamped = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insertion = cls.collection().insert_one(stamped.model_dump(exclude={"id"}))
        if insertion is not None:
            return {"command_id": str(insertion.inserted_id)}
        return None

    def receive_response(self, response: dict):
        """
        Record the outcome of the command from ``response``.

        ``response_time`` becomes the time elapsed since ``sent_at``. The command counts as
        succeeded only when ``response["error"]`` is exactly ``False`` (a missing key means
        failure). An empty ``description`` is filled with the right-stripped ``message``.
        """
        self.response_time = time.time() - self.sent_at
        error_flag = response.get("error", True)
        self.succeeded = error_flag is False
        if self.description == "":
            message = response.get("message", "")
            self.description += message.rstrip()

1929 

1930 

class SignalsPresetCreation(GenericMongo):
    """Fields needed to create a signals preset: a name and a list of signal ids."""

    name: str
    signal_ids: list[str]

1934 

1935 

class SignalsPreset(SignalsPresetCreation):
    """Signals preset stored in the ``signals_presets`` collection, owned by ``user_id``."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """
        Store a preset for ``user_id`` built from the name and signal ids of ``signals_preset``.

        :return: the id of the inserted document, as a string.
        """
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        insertion = cls.collection().insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insertion.inserted_id)

1952 

1953 

# Model derived from SignalsPreset by create_update_model.
# NOTE(review): presumably the schema accepted for partial updates - confirm against the helper.
SignalsPresetUpdate = create_update_model(SignalsPreset)

1955 

1956 

class LineStyle(str, Enum):
    """Line style names used by graph themes; each member is also a plain ``str``."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

1961 

1962 

class SignalAppearance:
    """Plain class that only declares two colour annotations; it defines no values or behaviour."""

    value_color: str
    forced_value_color: str

1966 

1967 

class GraphThemeCreation(GenericMongo):
    """Base fields of a graph theme (``graph_themes`` collection): colours and line styles for one signal."""

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise.
    private: bool = True

1978 

1979 

class PublicGraphTheme(GraphThemeCreation):
    """
    Graph theme as seen by one user.

    The three boolean fields are computed by the aggregation steps of
    ``custom_pipeline_steps`` from the stored ``creator_id``, ``in_library`` and
    ``active`` fields, which those steps then remove from the result.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # Id of the user the next query is run for; set on the class by the query helpers below.
    # NOTE(review): being stored on the class, this value is shared by every caller - confirm
    # that concurrent requests for different users cannot interleave.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Aggregation steps keyed by name, built for the user currently stored in ``_current_user_id``."""
        user_id = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            # One-step pipeline fragment setting (or removing) a single field.
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user isn't allowed to see (i.e. neither his own nor shared);
            # done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the parent ``response_from_query`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` after setting ``query.in_user_library`` to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Look a theme up by its id on behalf of ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_by_attribute`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_one_by_attribute`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Run the parent ``get_all`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """
        Map each signal id to the colours and line styles of one theme active for ``user_id``.

        When several matching themes exist for a signal, the first one produced by the
        ``$group`` stage is used (no sort is applied before grouping).

        :return: ``{signal_id: {"value_color": ..., "forced_value_color": ...,
            "value_line_style": ..., "forced_value_line_style": ...}}`` with the keys
            present in the stored document.
        """
        active_for_requested_signals = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        one_theme_per_signal = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        restore_document = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        appearance_fields_only = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [
            active_for_requested_signals,
            one_theme_per_signal,
            restore_document,
            appearance_fields_only,
        ]

        appearances = {}
        for document in cls.collection().aggregate(pipeline):
            # The signal id becomes the key and is removed from the value.
            appearances[document.pop("signal_id")] = document
        return appearances

2092 

2093 

# Model derived from PublicGraphTheme by create_update_model.
# NOTE(review): presumably the schema accepted for partial updates - confirm against the helper.
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2095 

2096 

2097class PrivateGraphTheme(GraphThemeCreation): 

2098 # private 

2099 creator_id: str 

2100 in_library: list[str] 

2101 active: list[str] 

2102 

@classmethod
def create(
    cls,
    creator_id: str,
    name: str,
    signal_id: str,
    value_color: str,
    forced_value_color: str,
    value_line_style: LineStyle,
    forced_value_line_style: LineStyle,
    private: bool,
):
    """
    Build a theme, insert it and return it with its new id.

    The creator is the only initial member of both ``in_library`` and ``active``.
    The ``id`` field is excluded from the inserted document and set on the
    returned object from the id of the inserted document.
    """
    theme = cls(
        creator_id=creator_id,
        name=name,
        signal_id=signal_id,
        value_color=value_color,
        forced_value_color=forced_value_color,
        value_line_style=value_line_style,
        forced_value_line_style=forced_value_line_style,
        private=private,
        in_library=[creator_id],
        active=[creator_id],
    )

    insertion = cls.collection().insert_one(theme.model_dump(exclude={"id": True}))
    theme.id = str(insertion.inserted_id)
    return theme

2131 

def update(self, update_dict: dict, user_id: str):
    """
    Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

    The per-user flags ``in_user_library`` and ``active_for_user`` are not stored as
    such: they are translated into membership of ``user_id`` in the ``in_library``
    and ``active`` lists. ``created_by_user`` is never written. These three keys are
    always removed from ``update_dict`` (which is modified in place), even when their
    value is ``None``, so that they cannot end up in the stored document.

    :param update_dict: fields to set; may contain the per-user flags described above.
    :param user_id: user the per-user flags apply to.
    :return: an empty dict.
    """
    for flag_key, stored_key in (("in_user_library", "in_library"), ("active_for_user", "active")):
        flag = update_dict.pop(flag_key, None)
        if flag is None:
            # Flag absent or explicitly unset: membership is left untouched.
            continue
        members = getattr(self, stored_key)
        if flag and user_id not in members:
            members.append(user_id)
        elif not flag and user_id in members:
            members.remove(user_id)
        update_dict[stored_key] = members

    # Derived from creator_id at read time; must never be persisted.
    update_dict.pop("created_by_user", None)

    if not update_dict:
        # Nothing left to write: avoid sending an empty "$set" to the database.
        return {}

    self.collection().find_one_and_update(
        {"_id": ObjectId(self.id)},
        {"$set": update_dict},
    )

    return {}