Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1226 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-27 13:40 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pymongo.collation import Collation 

21from pydantic import BaseModel, computed_field, Field, create_model 

22import numpy as npy 

23import lttb 

24import h5py 

25 

26# from scipy import signal as signal_scipy 

27 

28from twinpad_backend.db import ( 

29 get_collection, 

30 get_async_collection, 

31 get_signal_collection, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 devices_states_database, 

36) 

37from twinpad_backend.responses import ListResponse 

38from twinpad_backend.messages import send_mode_change, send_signal_value 

39 

40TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

41 

42 

43RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

44MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

45SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

46HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

47DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

48 

49DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

50NUMBER_SAMPLES_DATABASE_UPDATE = 120 

51 

52logger = logging.getLogger("uvicorn.error") 

53 

54 

55class classproperty: 

56 """ 

57 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

58 Found here: https://stackoverflow.com/a/76301341 

59 """ 

60 

61 def __init__(self, func): 

62 self.fget = func 

63 

64 def __get__(self, _, owner): 

65 return self.fget(owner) 

66 

67 

68def create_update_model(model): 

69 fields = {} 

70 

71 for field_name, field_annotation in model.model_fields.items(): 

72 if field_name != "id": 

73 fields[field_name] = (field_annotation.annotation | None, None) 

74 

75 query_name = model.__name__ + "Update" 

76 return create_model(query_name, **fields) 

77 

78 

79def get_utc_date_from_timestamp(timestamp: float): 

80 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

81 

82 

83def downsample_list(time_vector: list, values: list, max_number_samples: int): 

84 if len(time_vector) < max_number_samples: 

85 return time_vector, values 

86 

87 time_vector_copy = copy.deepcopy(time_vector) 

88 values_copy = copy.deepcopy(values) 

89 

90 none_group_bounds = [] 

91 none_group_index = -1 

92 index = -1 

93 # LTTB doesn't handle None values so remove them 

94 while values_copy.count(None) > 0: 

95 # Store bounds of None value groups so we can insert them back after the downsampling 

96 if (new_index := values_copy.index(None)) != index: 

97 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

98 none_group_index += 1 

99 elif len(none_group_bounds[none_group_index]) < 2: 

100 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

101 else: 

102 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

103 values_copy.pop(new_index) 

104 index = new_index 

105 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

106 

107 try: 

108 values_array = npy.array([time_vector_copy, values_copy]).T 

109 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

110 

111 new_time_vector = interpolated_values[:, 0].tolist() 

112 new_values = interpolated_values[:, 1].tolist() 

113 except ValueError: 

114 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

115 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

116 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

117 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

118 return new_time_vector, new_values_nan_to_none 

119 

120 # insert back None values at the correct timestamps 

121 for none_group in none_group_bounds: 

122 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

123 new_time_vector[start_index:start_index] = none_group 

124 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

125 

126 return new_time_vector, new_values 

127 

128 

129def is_of_type(value, wanted_type): 

130 if wanted_type is float: 

131 return isinstance(value, (int, float)) 

132 return isinstance(value, wanted_type) 

133 

134 

135# Models 

136class TwinPadModel(BaseModel): 

137 @classmethod 

138 def dict_to_object(cls, dict_): 

139 return cls.model_validate(dict_) 

140 

141 def to_dict(self, exclude=None): 

142 dict_ = self.model_dump(exclude=exclude) 

143 return dict_ 

144 

145 

146class GenericMongo(TwinPadModel): 

147 id: str | None = None 

148 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

149 

150 @classmethod 

151 def collection(cls): 

152 return get_collection(systems_database, cls.collection_name, create=True) 

153 

154 @classmethod 

155 def response_from_query(cls, query) -> ListResponse[Self]: 

156 request_filters = query.mongodb_filter() 

157 items = [] 

158 if ":" in query.sort_by: 

159 sort_field, sort_order = query.sort_by.split(":") 

160 sort_order = int(sort_order) 

161 else: 

162 sort_field = query.sort_by 

163 sort_order = 1 

164 collection = get_collection(systems_database, cls.collection_name, create=True) 

165 total = collection.count_documents(request_filters) 

166 

167 pipeline = [] 

168 added_properties = [] 

169 if "$and" in request_filters: 

170 for request_filter in request_filters["$and"]: 

171 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

172 if filtered_property in request_filter: 

173 pipeline.extend(pipeline_steps) 

174 added_properties.append(filtered_property) 

175 else: 

176 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

177 if filtered_property in request_filters: 

178 pipeline.extend(pipeline_steps) 

179 added_properties.append(filtered_property) 

180 pipeline.append({"$match": request_filters}) 

181 if sort_field in cls.custom_pipeline_steps: 

182 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

183 added_properties.append(sort_field) 

184 pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}]) 

185 

186 if (query.limit is not None) and (query.limit != 0): 

187 pipeline.append({"$limit": query.limit}) 

188 

189 for filtered_property, step in cls.custom_pipeline_steps.items(): 

190 if filtered_property not in added_properties: 

191 pipeline.extend(step) 

192 

193 cursor = collection.aggregate(pipeline) 

194 

195 for item_dict in cursor: 

196 items.append(cls.mongo_dict_to_object(item_dict)) 

197 

198 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

199 

200 @classmethod 

201 def get_from_id(cls, item_id) -> Self | None: 

202 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

203 

204 @classmethod 

205 def mongo_dict_to_object(cls, mongo_dict): 

206 mongo_dict["id"] = str(mongo_dict["_id"]) 

207 del mongo_dict["_id"] 

208 return cls.dict_to_object(mongo_dict) 

209 

210 @classmethod 

211 def get_by_attribute(cls, attribute_name: str, attribute_value): 

212 """Returns all items that match the attribute with value.""" 

213 pipeline = [] 

214 if attribute_name in cls.custom_pipeline_steps: 

215 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

216 pipeline.append({"$match": {attribute_name: attribute_value}}) 

217 for key, step in cls.custom_pipeline_steps.items(): 

218 if key != attribute_name: 

219 pipeline.extend(step) 

220 items = cls.collection().aggregate(pipeline) 

221 if items is None: 

222 return None 

223 return [cls.mongo_dict_to_object(d) for d in items] 

224 

225 @classmethod 

226 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

227 pipeline = [] 

228 if attribute_name in cls.custom_pipeline_steps: 

229 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

230 pipeline.append({"$match": {attribute_name: attribute_value}}) 

231 pipeline.append({"$limit": 1}) 

232 for key, step in cls.custom_pipeline_steps.items(): 

233 if key != attribute_name: 

234 pipeline.extend(step) 

235 items = cls.collection().aggregate(pipeline).to_list() 

236 if len(items) == 0: 

237 return None 

238 return cls.mongo_dict_to_object(items[0]) 

239 

240 @classmethod 

241 def get_all(cls, sort_by="_id") -> list[Self]: 

242 items = [] 

243 pipeline = [] 

244 if sort_by in cls.custom_pipeline_steps: 

245 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

246 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

247 for key, step in cls.custom_pipeline_steps.items(): 

248 if key != sort_by: 

249 pipeline.extend(step) 

250 for dict_ in cls.collection().aggregate(pipeline): 

251 items.append(cls.mongo_dict_to_object(dict_)) 

252 return items 

253 

254 @classmethod 

255 def get_number_documents(cls): 

256 collection = get_collection(systems_database, cls.collection_name) 

257 if collection is None: 

258 return 0 

259 return collection.count_documents({}) 

260 

261 def insert(self): 

262 insert_result = self.collection().insert_one(self.to_dict(exclude={id})) 

263 self.id = str(insert_result.inserted_id) 

264 return self.id 

265 

266 def update(self, update_dict): 

267 for key, value in update_dict.items(): 

268 setattr(self, key, value) 

269 self.collection().find_one_and_update( 

270 {"_id": ObjectId(self.id)}, 

271 {"$set": update_dict}, 

272 return_document=ReturnDocument.AFTER, 

273 ) 

274 

275 return self 

276 

277 def delete(self): 

278 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

279 return result.deleted_count > 0 

280 

281 

282class User(GenericMongo): 

283 collection_name: ClassVar[str] = "users" 

284 

285 firstname: str 

286 lastname: str 

287 email: str 

288 password: str 

289 is_active: bool | None = False 

290 is_admin: bool | None = False 

291 is_connected: bool | None = False 

292 company_id: str | None = None 

293 

294 def to_dict(self, exclude=None): 

295 if exclude is None: 

296 exclude = {"password"} 

297 return GenericMongo.to_dict(self, exclude=exclude) 

298 

299 @classmethod 

300 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

301 users = cls.get_all() 

302 if not users: 

303 is_admin = True 

304 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

305 user_collection = get_collection(systems_database, "users", create=True) 

306 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

307 if new_user is None: 

308 return None 

309 return {"user_id": str(new_user.inserted_id)} 

310 

311 @classmethod 

312 def update_info(cls, user: "UserUpdate", user_id: str): 

313 updated_user = cls.collection().find_one_and_update( 

314 {"_id": ObjectId(user_id)}, 

315 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

316 return_document=ReturnDocument.AFTER, 

317 ) 

318 updated_user["id"] = str(updated_user["_id"]) 

319 del (updated_user["_id"], updated_user["is_connected"]) 

320 return cls(**updated_user) 

321 

322 

323UserUpdate = create_update_model(User) 

324 

325 

326class Mode(TwinPadModel): 

327 mode_id: int 

328 name: str 

329 frequency_multiplier: float 

330 min_frequency: float 

331 

332 

333class DeviceUpdate(TwinPadModel): 

334 mode_id: int 

335 

336 

337class Device(GenericMongo): 

338 collection_name: ClassVar[str] = "devices" 

339 

340 device_id: str 

341 config_id: str | None = None 

342 config_name: str | None = None 

343 name: str 

344 description: str = "" 

345 modes: list[Mode] 

346 current_mode_id: int | None = None 

347 last_ping: float | None = None 

348 petri_network: Any 

349 pid: Any 

350 load: float | None = None 

351 tokens: list[int] = Field(default_factory=list) 

352 status: str 

353 

354 async def change_mode(self, update_dict, current_user: User): 

355 has_error = False 

356 

357 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

358 has_error = True 

359 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

360 elif self.current_mode_id is not None: 

361 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

362 else: 

363 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

364 command = Command( 

365 sent_at=time.time(), 

366 command_type="Mode change", 

367 description=description, 

368 user_id=current_user.id, 

369 ) 

370 

371 if has_error: 

372 command.response_time = 0 

373 command.succeeded = False 

374 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

375 else: 

376 response = await send_mode_change(self.device_id, update_dict.mode_id) 

377 command.receive_response(response) 

378 

379 Command.create(command) 

380 return response 

381 

382 @classmethod 

383 def get_from_device_or_config_id(cls, device_or_config_id: str): 

384 items = ( 

385 cls.collection() 

386 .aggregate( 

387 [ 

388 {"$match": {"$or": [{"device_id": device_or_config_id}, {"config_id": device_or_config_id}]}}, 

389 {"$limit": 1}, 

390 ] 

391 ) 

392 .to_list() 

393 ) 

394 if len(items) == 0: 

395 return None 

396 return cls.mongo_dict_to_object(items[0]) 

397 

398 @classmethod 

399 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

400 devices_by_id = {} 

401 for signal_id in signal_ids: 

402 device_or_config_id = signal_id.split(".")[0] 

403 if device_or_config_id not in devices_by_id: 

404 devices_by_id[device_or_config_id] = cls.get_from_device_or_config_id(device_or_config_id) 

405 return devices_by_id 

406 

407 

408class DeviceSetup(GenericMongo): 

409 collection_name: ClassVar[str] = "device_setups" 

410 

411 device_ids: list[str] 

412 active: bool = False 

413 variable_mapping: dict[str, str] 

414 

415 

416DeviceSetupUpdate = create_update_model(DeviceSetup) 

417 

418 

419class DeviceState(GenericMongo): 

420 collection_name: ClassVar[str] = "devices_states" 

421 

422 timestamp: float 

423 mode: str | None = None 

424 load: float | None = None 

425 tokens: list[int] = Field(default_factory=list) 

426 config_id: str | None = None 

427 modified_properties: list[str] = Field(default_factory=list) 

428 

429 @classmethod 

430 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]: 

431 req_filter = query.mongodb_filter() 

432 items = [] 

433 if ":" in query.sort_by: 

434 sort_field, sort_order = query.sort_by.split(":") 

435 sort_order = int(sort_order) 

436 else: 

437 sort_field = query.sort_by 

438 sort_order = 1 

439 collection = get_collection(devices_states_database, device_id) 

440 if collection is None: 

441 total = 0 

442 cursor = [] 

443 else: 

444 total = collection.count_documents(req_filter) 

445 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

446 if (query.limit is not None) and (query.limit != 0): 

447 cursor = cursor.limit(query.limit) 

448 for item_dict in cursor: 

449 items.append( 

450 cls( 

451 timestamp=item_dict.get("precise_timestamp"), 

452 mode=item_dict.get("mode"), 

453 load=item_dict.get("load"), 

454 tokens=item_dict.get("tokens", Field(default_factory=list)), 

455 config_id=item_dict.get("config_id"), 

456 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

457 ) 

458 ) 

459 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

460 

461 

462class SignalSample(TwinPadModel): 

463 signal_id: str 

464 timestamp: float 

465 value: float | int | str | bool | None 

466 forced_value: float | int | str | bool | None = None 

467 

468 @classmethod 

469 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

470 collection = get_signal_collection(signal_id) 

471 real_signal_id = signal_id 

472 

473 if collection is None: 

474 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

475 if device is not None: 

476 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

477 collection = get_signal_collection(real_signal_id) 

478 

479 if collection is None: 

480 return None 

481 

482 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

483 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

484 bucket = get_signal_collection(f"system.buckets.{real_signal_id}") 

485 first_bucket = None 

486 if bucket is not None: 

487 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

488 if first_bucket is not None: 

489 sample_data = collection.find_one( 

490 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

491 ) 

492 else: 

493 sample_data = collection.find_one({}, sort={"precise_timestamp": 1}) 

494 

495 if sample_data is None: 

496 return None 

497 

498 timestamp = sample_data["precise_timestamp"] 

499 

500 return cls( 

501 signal_id=real_signal_id, 

502 timestamp=timestamp, 

503 value=sample_data.get("value", None), 

504 forced_value=sample_data.get("forced_value", None), 

505 ) 

506 

507 @classmethod 

508 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

509 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

510 

511 @classmethod 

512 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

513 collection = get_signal_collection(signal_id) 

514 real_signal_id = signal_id 

515 

516 if collection is None: 

517 if device is None: 

518 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

519 if device is not None: 

520 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

521 collection = get_signal_collection(real_signal_id) 

522 

523 if collection is None: 

524 return None 

525 

526 # Same workaround as above function, very effective to narrow down big sets of data 

527 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

528 last_bucket = None 

529 if bucket is not None: 

530 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

531 if last_bucket is not None: 

532 sample_data = collection.find_one( 

533 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}}, 

534 sort={"precise_timestamp": -1}, 

535 ) 

536 else: 

537 sample_data = collection.find_one({}, sort={"precise_timestamp": -1}) 

538 

539 if sample_data is None: 

540 return None 

541 

542 timestamp = sample_data["precise_timestamp"] 

543 

544 if device is None: 

545 device = Device.get_from_device_or_config_id(real_signal_id.split(".")[0]) 

546 if device is not None and device.last_ping is not None: 

547 if timestamp is None: 

548 timestamp = device.last_ping 

549 else: 

550 timestamp = max(timestamp, device.last_ping) 

551 return cls( 

552 signal_id=real_signal_id, 

553 timestamp=timestamp, 

554 value=sample_data.get("value", None), 

555 forced_value=sample_data.get("forced_value", None), 

556 ) 

557 

558 @classmethod 

559 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None: 

560 collection = get_signal_collection(signal_id) 

561 real_signal_id = signal_id 

562 

563 if collection is None: 

564 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

565 if device is not None: 

566 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

567 collection = get_signal_collection(real_signal_id) 

568 

569 if collection is None: 

570 return None 

571 

572 sample_data = collection.find_one( 

573 {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1} 

574 ) 

575 if sample_data is None: 

576 return None 

577 

578 return cls( 

579 signal_id=real_signal_id, 

580 timestamp=sample_data.get("precise_timestamp"), 

581 value=sample_data.get("value"), 

582 forced_value=sample_data.get("forced_value"), 

583 ) 

584 

585 @classmethod 

586 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

587 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

588 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

589 

590 @classmethod 

591 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None: 

592 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids] 

593 

594 

595class SignalData(TwinPadModel): 

596 signal_id: str 

597 forcible: bool = True 

598 time_vector: list[float] 

599 values: list[float | int | str | None] 

600 forced_values: list[float | int | str | None] 

601 

602 data_start: float | None = None 

603 data_end: float | None = None 

604 

605 number_samples: int = 0 

606 number_samples_db: int = 0 

607 

608 db_query_time: float = 0.0 

609 init_time: float = 0.0 

610 data_processing_time: float = 0.0 

611 

612 @classmethod 

613 def get_from_signal_id( 

614 cls, 

615 signal_id: str, 

616 min_timestamp: float = None, 

617 max_timestamp: float = None, 

618 window_min_timestamp: float = None, 

619 window_max_timestamp: float = None, 

620 interpolate_bounds: bool = True, 

621 max_documents: int = None, 

622 ) -> Self: 

623 

624 now = time.time() 

625 

626 req_signal = {} 

627 if min_timestamp is not None: 

628 req_signal.setdefault("timestamp", {}) 

629 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

630 if max_timestamp is not None: 

631 req_signal.setdefault("timestamp", {}) 

632 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

633 

634 collection = get_signal_collection(signal_id) 

635 

636 real_signal_id = signal_id 

637 

638 if collection is None: 

639 device = Device.get_from_device_or_config_id(signal_id.split(".")[0]) 

640 if device is not None: 

641 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}" 

642 collection = get_signal_collection(real_signal_id) 

643 

644 if collection is None: 

645 return cls( 

646 signal_id=real_signal_id, 

647 time_vector=[], 

648 values=[], 

649 forced_values=[], 

650 number_samples=0, 

651 number_samples_db=0, 

652 ) 

653 

654 db_req_start = time.time() 

655 

656 sort_step = {"$sort": {"precise_timestamp": 1}} 

657 number_results = collection.count_documents(req_signal) 

658 

659 pipeline = [] 

660 if req_signal: 

661 pipeline.append({"$match": req_signal}) # Filter data if needed 

662 

663 pipeline.extend( 

664 [ 

665 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

666 sort_step, 

667 ] 

668 ) 

669 

670 if max_documents is not None and max_documents < number_results: 

671 unsampling_ratio = math.ceil(number_results / max_documents) 

672 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

673 pipeline.extend( 

674 [ 

675 { 

676 "$setWindowFields": { 

677 "sortBy": {"precise_timestamp": 1}, 

678 "output": {"index": {"$documentNumber": {}}}, 

679 } 

680 }, 

681 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

682 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

683 {"$replaceRoot": {"newRoot": "$doc"}}, 

684 {"$unset": ["index", "group_id"]}, 

685 {"$sort": {"precise_timestamp": 1}}, 

686 ] 

687 ) 

688 

689 # logger.info(f"pipeline: %s", str(pipeline)) 

690 cursor = collection.aggregate(pipeline) 

691 db_req_time = time.time() - db_req_start 

692 

693 init_time = time.time() 

694 

695 results = cursor.to_list() 

696 time_vector = [] 

697 values = [] 

698 forced_values = [] 

699 for s in results: 

700 time_vector.append(s["precise_timestamp"]) 

701 values.append(s.get("value", None)) 

702 forced_values.append(s.get("forced_value", None)) 

703 

704 signal = Signal.get_from_signal_id(real_signal_id) 

705 class_ = signal.signal_data_class 

706 

707 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

708 time_vector, values, forced_values = cls.interpolate_bounds( 

709 class_, 

710 collection, 

711 real_signal_id, 

712 time_vector, 

713 values, 

714 forced_values, 

715 window_min_timestamp, 

716 window_max_timestamp, 

717 ) 

718 

719 if values: 

720 # TODO: check below. a bit strange 

721 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

722 # Adding last value as it should be repeated 

723 time_vector.append(now) 

724 values.append(values[-1]) 

725 forced_values.append(forced_values[-1]) 

726 

727 init_time = time.time() - init_time 

728 

729 # See line 292 for explanation 

730 bucket = get_signal_collection(f"system.buckets.{real_signal_id}") 

731 first_bucket = None 

732 if bucket is not None: 

733 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

734 if first_bucket is not None: 

735 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

736 else: 

737 data_start = None 

738 

739 last_bucket = None 

740 if bucket is not None: 

741 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

742 if last_bucket is not None: 

743 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

744 else: 

745 data_end = None 

746 

747 return class_( 

748 signal_id=real_signal_id, 

749 forcible=signal.forcible, 

750 time_vector=time_vector, 

751 values=values, 

752 forced_values=forced_values, 

753 data_start=data_start, 

754 data_end=data_end, 

755 number_samples=len(values), 

756 number_samples_db=number_results, 

757 db_query_time=db_req_time, 

758 init_time=init_time, 

759 ) 

760 

761 @staticmethod 

762 def interpolate_bounds( 

763 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

764 ): 

765 sample_right = None 

766 # Fetching right side value & interpolation 

767 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

768 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

769 sample_right = collection.find_one( 

770 { 

771 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

772 "value": {"$exists": True}, 

773 }, 

774 sort={"precise_timestamp": -1}, 

775 ) 

776 if sample_right: 

777 if time_vector: 

778 right_sd = class_( 

779 signal_id=signal_id, 

780 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

781 values=[values[-1], sample_right.get("value", None)], 

782 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

783 ) 

784 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

785 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

786 else: 

787 max_ts_value = sample_right.get("value", None) 

788 max_ts_forced_value = sample_right.get("forced_value", None) 

789 time_vector.append(window_max_timestamp) 

790 values.append(max_ts_value) 

791 forced_values.append(max_ts_forced_value) 

792 

793 # Fetching left side value & interpolation 

794 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

795 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

796 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

797 sample_left = sample_right 

798 sample_left = collection.find_one( 

799 { 

800 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

801 "value": {"$exists": True}, 

802 }, 

803 sort={"precise_timestamp": -1}, 

804 ) 

805 

806 if sample_left: 

807 if time_vector: 

808 left_sd = class_( 

809 signal_id=signal_id, 

810 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

811 values=[sample_left["value"], values[0]], 

812 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

813 ) 

814 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

815 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

816 else: 

817 min_ts_value = sample_left.get("value", None) 

818 min_ts_forced_value = sample_left.get("forced_value", None) 

819 time_vector.insert(0, window_min_timestamp) 

820 values.insert(0, min_ts_value) 

821 forced_values.insert(0, min_ts_forced_value) 

822 

823 return time_vector, values, forced_values 

824 

825 def interpolate_values(self, new_time_vector: list[float]): 

826 return self.interpolate(new_time_vector, self.values) 

827 

828 def interpolate_forced_values(self, new_time_vector: list[float]): 

829 return self.interpolate(new_time_vector, self.forced_values) 

830 

831 def uniform_desampling(self, number_samples_max: int) -> Self: 

832 data_processing_time = time.time() 

833 if number_samples_max and self.number_samples > number_samples_max: 

834 new_time_vector = npy.linspace( 

835 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

836 ).tolist() 

837 values = self.interpolate_values(new_time_vector) 

838 forced_values = self.interpolate_forced_values(new_time_vector) 

839 time_vector = new_time_vector 

840 number_samples = len(time_vector) 

841 else: 

842 time_vector = self.time_vector 

843 number_samples = len(self.values) 

844 values = self.values[:] 

845 forced_values = self.forced_values[:] 

846 data_processing_time = time.time() - data_processing_time 

847 

848 return self.__class__( 

849 signal_id=self.signal_id, 

850 time_vector=time_vector, 

851 values=values, 

852 forced_values=forced_values, 

853 number_samples=number_samples, 

854 number_samples_db=self.number_samples, 

855 data_start=self.data_start, 

856 data_end=self.data_end, 

857 db_query_time=self.db_query_time, 

858 init_time=self.init_time, 

859 data_processing_time=self.data_processing_time + data_processing_time, 

860 ) 

861 

862 def interest_window_desampling( 

863 self, 

864 window_max_number_samples: int, 

865 outside_max_number_samples: int, 

866 window_min_timestamp: float | None = None, 

867 window_max_timestamp: float | None = None, 

868 ) -> Self: 

869 """Performs a sampling in a window of interest and outside.""" 

870 

871 if not self.time_vector: 

872 return self 

873 

874 if window_min_timestamp is None: 

875 window_min_timestamp = self.time_vector[0] 

876 if window_max_timestamp is None: 

877 window_max_timestamp = self.time_vector[-1] 

878 

879 data_processing_time = time.time() 

880 

881 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

882 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

883 

884 time_vector_before = self.time_vector[:index_window_start] 

885 time_vector_window = self.time_vector[index_window_start:index_window_end] 

886 time_vector_after = self.time_vector[index_window_end:] 

887 

888 # Resampling window 

889 if time_vector_window: 

890 # Ensurring window bounds 

891 if time_vector_window[0] != window_min_timestamp: 

892 time_vector_window.insert(0, window_min_timestamp) 

893 if time_vector_window[-1] != window_max_timestamp: 

894 time_vector_window.append(window_max_timestamp) 

895 else: 

896 time_vector_window = [window_min_timestamp, window_max_timestamp] 

897 

898 if len(time_vector_window) > window_max_number_samples: 

899 # Resampling 

900 new_window_time_vector = npy.linspace( 

901 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

902 ).tolist() 

903 time_vector_window = new_window_time_vector 

904 

905 # Resampling outside 

906 number_samples_before = len(time_vector_before) 

907 number_samples_after = len(time_vector_after) 

908 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

909 new_number_samples_before = min( 

910 number_samples_before, 

911 math.ceil( 

912 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

913 ), 

914 ) 

915 new_number_samples_after = min( 

916 number_samples_after, 

917 math.ceil( 

918 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

919 ), 

920 ) 

921 # Adjusting numbers as math.ceil can do +1 on sum 

922 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

923 if new_number_samples_before > new_number_samples_after: 

924 new_number_samples_before -= 1 

925 else: 

926 new_number_samples_after -= 1 

927 

928 if new_number_samples_before > 0: 

929 new_time_vector_before = npy.linspace( 

930 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

931 ).tolist() 

932 time_vector_before = new_time_vector_before 

933 

934 if new_number_samples_after > 0: 

935 new_time_vector_after = npy.linspace( 

936 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

937 ).tolist()[::-1] 

938 time_vector_after = new_time_vector_after 

939 

940 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

941 values = self.interpolate_values(new_time_vector) 

942 forced_values = self.interpolate_forced_values(new_time_vector) 

943 number_samples = len(values) 

944 

945 data_processing_time = time.time() - data_processing_time 

946 

947 return self.__class__( 

948 signal_id=self.signal_id, 

949 forcible=self.forcible, 

950 time_vector=new_time_vector, 

951 values=values, 

952 forced_values=forced_values, 

953 number_samples=number_samples, 

954 number_samples_db=self.number_samples, 

955 data_start=self.data_start, 

956 data_end=self.data_end, 

957 db_query_time=self.db_query_time, 

958 init_time=self.init_time, 

959 data_processing_time=self.data_processing_time + data_processing_time, 

960 ) 

961 

962 def csv_export(self): 

963 output = io.StringIO() 

964 writer = csv.writer(output) 

965 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

966 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

967 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

968 return output.getvalue().encode("utf-8") 

969 

970 def prestoplot_export(self): 

971 clean_signal_id = self.signal_id.replace(".", "_") 

972 if clean_signal_id[0].isnumeric(): 

973 clean_signal_id = "_" + clean_signal_id 

974 

975 output = io.StringIO() 

976 output.write("# Encoding:\tUTF-8\n") 

977 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

978 output.write("ISO8601\tnone\tnone\n") 

979 output.write(f"# Description :\t{clean_signal_id}\n") 

980 

981 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

982 output.write( 

983 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

984 ) 

985 return output.getvalue().encode("utf-8") 

986 

987 

988class NumericSignalData(SignalData): 

989 data_type: str = "float" 

990 values: list[float | int | None] 

991 forced_values: list[float | int | None] 

992 

993 def interpolate(self, new_time_vector: list[float], items): 

994 items = [npy.nan if s is None else s for s in items] 

995 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

996 

997 def uniform_desampling(self, number_samples_max: int) -> Self: 

998 data_processing_time = time.time() 

999 if number_samples_max and self.number_samples > number_samples_max: 

1000 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

1001 forced_values = self.interpolate_forced_values(time_vector) 

1002 number_samples = len(time_vector) 

1003 else: 

1004 time_vector = self.time_vector 

1005 number_samples = len(self.values) 

1006 values = self.values[:] 

1007 forced_values = self.forced_values[:] 

1008 data_processing_time = time.time() - data_processing_time 

1009 

1010 return self.__class__( 

1011 signal_id=self.signal_id, 

1012 time_vector=time_vector, 

1013 values=values, 

1014 forced_values=forced_values, 

1015 number_samples=number_samples, 

1016 number_samples_db=self.number_samples, 

1017 data_start=self.data_start, 

1018 data_end=self.data_end, 

1019 db_query_time=self.db_query_time, 

1020 init_time=self.init_time, 

1021 data_processing_time=self.data_processing_time + data_processing_time, 

1022 ) 

1023 

1024 def interest_window_desampling( 

1025 self, 

1026 window_max_number_samples: int, 

1027 outside_max_number_samples: int, 

1028 window_min_timestamp: float | None = None, 

1029 window_max_timestamp: float | None = None, 

1030 ) -> Self: 

1031 """Performs a sampling in a window of interest and outside.""" 

1032 

1033 if not self.time_vector: 

1034 return self 

1035 

1036 if window_min_timestamp is None: 

1037 window_min_timestamp = self.time_vector[0] 

1038 if window_max_timestamp is None: 

1039 window_max_timestamp = self.time_vector[-1] 

1040 

1041 data_processing_time = time.time() 

1042 

1043 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

1044 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

1045 

1046 time_vector_before = self.time_vector[:index_window_start] 

1047 time_vector_window = self.time_vector[index_window_start:index_window_end] 

1048 time_vector_after = self.time_vector[index_window_end:] 

1049 

1050 values_before = self.values[:index_window_start] 

1051 values_window = self.values[index_window_start:index_window_end] 

1052 values_after = self.values[index_window_end:] 

1053 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

1054 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

1055 

1056 # Resampling window 

1057 if time_vector_window: 

1058 # Ensurring window bounds 

1059 if time_vector_window[0] != window_min_timestamp: 

1060 time_vector_window.insert(0, window_min_timestamp) 

1061 values_window.insert(0, window_min_value) 

1062 if time_vector_window[-1] != window_max_timestamp: 

1063 time_vector_window.append(window_max_timestamp) 

1064 values_window.append(window_max_value) 

1065 else: 

1066 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1067 values_window = [window_min_value, window_max_value] 

1068 

1069 if len(time_vector_window) > window_max_number_samples: 

1070 # Resampling 

1071 time_vector_window, values_window = downsample_list( 

1072 time_vector_window, values_window, window_max_number_samples 

1073 ) 

1074 

1075 # Resampling outside 

1076 number_samples_before = len(time_vector_before) 

1077 number_samples_after = len(time_vector_after) 

1078 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

1079 new_number_samples_before = min( 

1080 number_samples_before, 

1081 math.ceil( 

1082 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1083 ), 

1084 ) 

1085 new_number_samples_after = min( 

1086 number_samples_after, 

1087 math.ceil( 

1088 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1089 ), 

1090 ) 

1091 # Adjusting numbers as math.ceil can do +1 on sum 

1092 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1093 if new_number_samples_before > new_number_samples_after: 

1094 new_number_samples_before -= 1 

1095 else: 

1096 new_number_samples_after -= 1 

1097 

1098 if new_number_samples_before > 0: 

1099 time_vector_before, values_before = downsample_list( 

1100 time_vector_before, values_before, new_number_samples_before 

1101 ) 

1102 

1103 if new_number_samples_after > 0: 

1104 time_vector_after, values_after = downsample_list( 

1105 time_vector_after, values_after, new_number_samples_after 

1106 ) 

1107 

1108 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1109 values = values_before + values_window + values_after 

1110 forced_values = self.interpolate_forced_values(new_time_vector) 

1111 number_samples = len(values) 

1112 

1113 data_processing_time = time.time() - data_processing_time 

1114 

1115 return self.__class__( 

1116 signal_id=self.signal_id, 

1117 time_vector=new_time_vector, 

1118 values=values, 

1119 forced_values=forced_values, 

1120 number_samples=number_samples, 

1121 number_samples_db=self.number_samples, 

1122 data_start=self.data_start, 

1123 data_end=self.data_end, 

1124 db_query_time=self.db_query_time, 

1125 init_time=self.init_time, 

1126 data_processing_time=self.data_processing_time + data_processing_time, 

1127 ) 

1128 

1129 

1130class StringSignalData(SignalData): 

1131 data_type: str = "str" 

1132 values: list[str | None] 

1133 forced_values: list[str | None] 

1134 

1135 def interpolate(self, new_time_vector: list[float], items): 

1136 # Find the indices of the values in xp that are just smaller or equal to x 

1137 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1138 indices = npy.clip(indices, 0, len(items) - 1) 

1139 # Return the corresponding left string values from fp 

1140 return [items[i] for i in indices] 

1141 

1142 

1143class SignalsData(TwinPadModel): 

1144 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1145 data_processing_time: float 

1146 data_start: float | None 

1147 data_end: float | None 

1148 

1149 @classmethod 

1150 def get_from_signal_ids( 

1151 cls, 

1152 signal_ids: list[str], 

1153 min_timestamp: float = None, 

1154 max_timestamp: float = None, 

1155 window_min_timestamp: float = None, 

1156 window_max_timestamp: float = None, 

1157 interpolate_bounds: bool = True, 

1158 max_documents: int = None, 

1159 ) -> Self: 

1160 signals_data = [] 

1161 data_start = None 

1162 data_end = None 

1163 if max_timestamp is None: 

1164 max_timestamp = time.time() 

1165 data_processing_time = 0.0 

1166 for signal_id in signal_ids: 

1167 signal_data = SignalData.get_from_signal_id( 

1168 signal_id=signal_id, 

1169 min_timestamp=min_timestamp, 

1170 max_timestamp=max_timestamp, 

1171 window_min_timestamp=window_min_timestamp, 

1172 window_max_timestamp=window_max_timestamp, 

1173 interpolate_bounds=interpolate_bounds, 

1174 max_documents=max_documents, 

1175 ) 

1176 data_processing_time += signal_data.data_processing_time 

1177 signals_data.append(signal_data) 

1178 if signal_data.data_start is not None: 

1179 if data_start is None: 

1180 data_start = signal_data.data_start 

1181 else: 

1182 data_start = min(signal_data.data_start, data_start) 

1183 if signal_data.data_end is not None: 

1184 if data_end is None: 

1185 data_end = signal_data.data_end 

1186 else: 

1187 data_end = max(signal_data.data_end, data_end) 

1188 

1189 return cls( 

1190 signals_data=signals_data, 

1191 data_processing_time=data_processing_time, 

1192 data_start=data_start, 

1193 data_end=data_end, 

1194 ) 

1195 

1196 def uniform_desampling(self, number_samples_max: int) -> Self: 

1197 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1198 return SignalsData( 

1199 signals_data=signals_data, 

1200 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1201 data_start=self.data_start, 

1202 data_end=self.data_end, 

1203 ) 

1204 

1205 def interest_window_desampling( 

1206 self, 

1207 window_max_number_samples: int, 

1208 outside_max_number_samples: int, 

1209 window_min_timestamp: float = None, 

1210 window_max_timestamp: float = None, 

1211 ) -> Self: 

1212 signals_data = [ 

1213 s.interest_window_desampling( 

1214 window_max_number_samples=window_max_number_samples, 

1215 outside_max_number_samples=outside_max_number_samples, 

1216 window_min_timestamp=window_min_timestamp, 

1217 window_max_timestamp=window_max_timestamp, 

1218 ) 

1219 for s in self.signals_data 

1220 ] 

1221 

1222 return SignalsData( 

1223 signals_data=signals_data, 

1224 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1225 data_start=self.data_start, 

1226 data_end=self.data_end, 

1227 ) 

1228 

1229 def zip_export(self, file_format: str = "csv"): 

1230 # return self.signals_data[0].csv_export() 

1231 zip_buffer = io.BytesIO() 

1232 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1233 for signal_data in self.signals_data: 

1234 if file_format == "csv": 

1235 export_io = signal_data.csv_export() 

1236 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io) 

1237 elif file_format == "prestoplot": 

1238 export_io = signal_data.prestoplot_export() 

1239 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io) 

1240 else: 

1241 raise ValueError(f"Format not found. Got: {file_format}") 

1242 zip_bytes = zip_buffer.getvalue() 

1243 # zip_bytes.seek(0) 

1244 return zip_bytes 

1245 

1246 def hdf5_export(self): 

1247 hdf5_buffer = io.BytesIO() 

1248 custom_type_float = npy.dtype( 

1249 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1250 ) 

1251 custom_type_string = npy.dtype( 

1252 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1253 ) 

1254 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1255 for signal_data in self.signals_data: 

1256 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1257 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1258 if signal_data.data_type == "str": 

1259 export_data = npy.array( 

1260 list( 

1261 zip( 

1262 date_vector, 

1263 signal_data.time_vector, 

1264 signal_data.values, 

1265 signal_data.forced_values, 

1266 ) 

1267 ), 

1268 dtype=custom_type_string, 

1269 ) 

1270 else: 

1271 export_data = npy.array( 

1272 list( 

1273 zip( 

1274 date_vector, 

1275 signal_data.time_vector, 

1276 signal_data.values, 

1277 signal_data.forced_values, 

1278 ) 

1279 ), 

1280 dtype=custom_type_float, 

1281 ) 

1282 signal_group["data"] = export_data 

1283 return hdf5_buffer.getvalue() 

1284 

1285 

1286class SignalStatus(TwinPadModel): 

1287 status: str 

1288 reason: str 

1289 delay: float | None 

1290 

1291 

1292class DigitizationFunction(TwinPadModel): 

1293 bits: int | None = None 

1294 min_value: float 

1295 max_value: float 

1296 min_raw_value: float 

1297 max_raw_value: float 

1298 

1299 

1300class SignalUpdate(TwinPadModel): 

1301 value: float | str | bool | int | None = None 

1302 forced_value: float | str | bool | int | None = None 

1303 timestamp: int | None = None 

1304 

1305 

1306class SignalType(str, Enum): 

1307 command = "command" 

1308 sensor = "sensor" 

1309 external_sensor = "external_sensor" 

1310 

1311 

1312SIGNALDATA_TYPES = { 

1313 "int": NumericSignalData, 

1314 "float": NumericSignalData, 

1315 "str": StringSignalData, 

1316 "bool": NumericSignalData, 

1317 "epoch": NumericSignalData, 

1318} 

1319 

1320 

1321class Signal(GenericMongo): 

1322 collection_name: ClassVar[str] = "signals" 

1323 

1324 signal_id: str 

1325 frequency: float 

1326 unit: str | None 

1327 description: str 

1328 type: SignalType 

1329 data_type: str 

1330 precision_digits: int | None 

1331 forcible: bool 

1332 

1333 digitization_function: DigitizationFunction | None 

1334 

1335 @property 

1336 def device(self) -> Device: 

1337 device_or_config_id = self.signal_id.split(".")[0] 

1338 return Device.get_from_device_or_config_id(device_or_config_id) 

1339 

1340 @cached_property 

1341 def signal_data_class(self): 

1342 if self.data_type in SIGNALDATA_TYPES: 

1343 return SIGNALDATA_TYPES[self.data_type] 

1344 if self.data_type.startswith("enum"): 

1345 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1346 raise ValueError(f"Unhandled python type: {self.data_type}") 

1347 

1348 @cached_property 

1349 def python_type(self): 

1350 if self.data_type in TYPES: 

1351 return TYPES[self.data_type] 

1352 if self.data_type.startswith("enum"): 

1353 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1354 return Literal[*choices] 

1355 raise ValueError(f"Unhandled python type: {self.data_type}") 

1356 

1357 @computed_field 

1358 @property 

1359 def status(self) -> SignalStatus: 

1360 now = time.time() 

1361 status = "up" 

1362 reason = "" 

1363 

1364 # See line 285 for explanation 

1365 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1366 last_bucket = None 

1367 if bucket is not None: 

1368 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1369 if last_bucket is None: 

1370 status = "no data" 

1371 reason = "signal does not exist" 

1372 return SignalStatus(status=status, reason=reason, delay=None) 

1373 

1374 try: 

1375 last_date = last_bucket["control"]["max"]["timestamp"] 

1376 last_date = last_date.replace(tzinfo=pytz.UTC) 

1377 last_value_ts = last_date.timestamp() 

1378 except IndexError: 

1379 last_value_ts = None 

1380 

1381 if last_value_ts is None: 

1382 delay = None 

1383 reason = "No data from signal" 

1384 else: 

1385 # Since device is a computed property, only compute it once 

1386 device = self.device 

1387 if device is not None and device.last_ping is not None: 

1388 last_value_ts = max(last_value_ts, device.last_ping) 

1389 delay = now - last_value_ts 

1390 if delay > DEVICE_TIMEOUT: 

1391 status = "down" 

1392 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1393 return SignalStatus(status=status, reason=reason, delay=delay) 

1394 

1395 async def send_command(self, device_id: str, update_dict: SignalUpdate, current_user: User) -> dict: 

1396 command = Command( 

1397 sent_at=time.time(), 

1398 command_type="Signal command", 

1399 user_id=current_user.id, 

1400 ) 

1401 

1402 has_input_error = False 

1403 error_message = "" 

1404 

1405 if self.data_type.startswith("enum"): 

1406 enum_options = get_args(self.python_type) 

1407 

1408 if update_dict.value is not None and update_dict.value not in enum_options: 

1409 has_input_error = True 

1410 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1411 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1412 has_input_error = True 

1413 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1414 else: 

1415 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1416 has_input_error = True 

1417 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1418 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1419 has_input_error = True 

1420 error_message += ( 

1421 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1422 ) 

1423 

1424 if has_input_error: 

1425 command.response_time = 0 

1426 command.succeeded = False 

1427 command.description = f"Tried to modify signal {self.signal_id}" 

1428 response = {"error": True, "status_code": 400, "message": error_message} 

1429 else: 

1430 response = await send_signal_value(device_id, self.signal_id, update_dict) 

1431 command.receive_response(response) 

1432 

1433 Command.create(command) 

1434 return response 

1435 

1436 @classmethod 

1437 def get_from_signal_id(cls, signal_id: str) -> Self: 

1438 """Could be generic from mongo""" 

1439 signal = Signal.get_one_by_attribute("signal_id", signal_id) 

1440 if signal is None: 

1441 split_signal_id = signal_id.split(".") 

1442 device_or_config_id = split_signal_id[0] 

1443 ticker = split_signal_id[-1] 

1444 possible_device = Device.get_from_device_or_config_id(device_or_config_id) 

1445 if possible_device is not None: 

1446 signal = Signal.get_one_by_attribute( 

1447 "signal_id", f"{possible_device.device_id}.{possible_device.config_id}.{ticker}" 

1448 ) 

1449 if not signal: 

1450 return None 

1451 return cls.dict_to_object(signal) 

1452 

1453 @classmethod 

1454 def get_all_ids(cls) -> list[str]: 

1455 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1456 

1457 return [signal["signal_id"] for signal in cursor] 

1458 

1459 async def number_samples(self): 

1460 collection = get_signal_collection(signal_id=self.signal_id) 

1461 if collection is None: 

1462 return 0 

1463 

1464 number_samples = collection.estimated_document_count() 

1465 

1466 number_samples_async_collection = await get_async_collection( 

1467 systems_async_database, "number_samples", create=True, time_series=True 

1468 ) 

1469 

1470 loop = asyncio.get_running_loop() 

1471 loop.create_task( 

1472 number_samples_async_collection.insert_one( 

1473 { 

1474 "timestamp": datetime.datetime.now(pytz.UTC), 

1475 "signal_id": self.signal_id, 

1476 "number_samples": number_samples, 

1477 } 

1478 ) 

1479 ) 

1480 

1481 return number_samples 

1482 

1483 def sample_datasize(self): 

1484 return signals_database.command("collstats", self.signal_id)["size"] 

1485 

1486 @classmethod 

1487 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1488 result = cls.collection().aggregate( 

1489 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1490 ) 

1491 

1492 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1493 

1494 

1495class ServicesStatus(TwinPadModel): 

1496 backend: str 

1497 cloud_broker: str 

1498 time_series_database: str 

1499 signal_storage: str 

1500 heartbeat_storage: str 

1501 data_analyzer: str 

1502 

1503 @classmethod 

1504 def check(cls) -> Self: 

1505 return cls( 

1506 cloud_broker=ping(RABBITMQ_HOST), 

1507 backend="up", 

1508 time_series_database=ping(MONGO_HOST), 

1509 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1510 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1511 data_analyzer=ping(DATA_ANALYZER_HOST), 

1512 ) 

1513 

1514 

1515def ping(host): 

1516 try: 

1517 if ping3.ping(host, timeout=0.8): 

1518 return "up" 

1519 except PermissionError: 

1520 pass 

1521 return "down" 

1522 

1523 

1524class Event(GenericMongo): 

1525 collection_name: ClassVar[str] = "events" 

1526 

1527 name: str 

1528 timestamp: float 

1529 event_rule_id: str 

1530 

1531 @computed_field 

1532 @cached_property 

1533 def event_rule(self) -> "EventRule": 

1534 return EventRule.get_from_id(self.event_rule_id) 

1535 

1536 @classmethod 

1537 def dict_to_object(cls, dict_): 

1538 """Refine to convert timestamp to datetime for mongodb.""" 

1539 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

1540 return super().dict_to_object(dict_) 

1541 

1542 

1543class TwinPadActivity(GenericMongo): 

1544 timestamp: float 

1545 amount: int 

1546 

1547 @classmethod 

1548 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

1549 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1550 number_events_collection = get_collection(systems_database, "number_events") 

1551 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

1552 items = [] 

1553 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1554 if number_events_collection is None or recompute_amount: 

1555 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

1556 number_events_collection.delete_many({}) 

1557 first_event = events_collection.find_one(sort={"timestamp": 1}) 

1558 if first_event is None: 

1559 return items 

1560 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

1561 tzinfo=pytz.UTC 

1562 ) 

1563 while last_computed_day < TODAY: 

1564 day_nb_events = events_collection.count_documents( 

1565 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1566 ) 

1567 if day_nb_events > 0: 

1568 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

1569 last_computed_day += ONE_DAY_OFFSET 

1570 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1571 if number_events_today > 0: 

1572 number_events_collection.delete_many({"timestamp": TODAY}) 

1573 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

1574 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1575 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1576 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1577 for day in number_events: 

1578 day["timestamp"] = day["timestamp"].timestamp() 

1579 items.append(cls.mongo_dict_to_object(day)) 

1580 return items 

1581 

1582 @classmethod 

1583 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1584 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1585 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1586 signals_number_samples_collection = get_collection( 

1587 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

1588 ) 

1589 items = [] 

1590 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1591 if number_samples_collection is None or recompute_amount: 

1592 number_samples_collection = get_collection( 

1593 systems_database, "number_received_samples", create=True, time_series=True 

1594 ) 

1595 number_samples_collection.delete_many({}) 

1596 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

1597 if first_sample is None: 

1598 return items 

1599 # compute from day of first found event 

1600 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

1601 tzinfo=pytz.UTC 

1602 ) 

1603 while last_computed_day < TODAY: 

1604 number_samples_request = signals_number_samples_collection.aggregate( 

1605 [ 

1606 { 

1607 "$match": { 

1608 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

1609 } 

1610 }, 

1611 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1612 ] 

1613 ).to_list() 

1614 if len(number_samples_request) == 0: 

1615 number_samples = 0 

1616 else: 

1617 number_samples = number_samples_request[0].get("number_samples", 0) 

1618 if number_samples > 0: 

1619 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

1620 last_computed_day += ONE_DAY_OFFSET 

1621 number_samples_request = signals_number_samples_collection.aggregate( 

1622 [ 

1623 {"$match": {"timestamp": {"$gte": TODAY}}}, 

1624 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1625 ] 

1626 ).to_list() 

1627 if len(number_samples_request) == 0: 

1628 number_samples_today = 0 

1629 else: 

1630 number_samples_today = number_samples_request[0].get("number_samples", 0) 

1631 if number_samples_today > 0: 

1632 number_samples_collection.delete_many({"timestamp": TODAY}) 

1633 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

1634 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1635 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1636 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1637 for day in number_events: 

1638 day["timestamp"] = day["timestamp"].timestamp() 

1639 items.append(cls.mongo_dict_to_object(day)) 

1640 return items 

1641 

1642 @classmethod 

1643 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1644 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1645 number_commands_collection = get_collection(systems_database, "number_commands") 

1646 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

1647 items = [] 

1648 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1649 if number_commands_collection is None or recompute_amount: 

1650 number_commands_collection = get_collection( 

1651 systems_database, "number_commands", create=True, time_series=True 

1652 ) 

1653 number_commands_collection.delete_many({}) 

1654 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

1655 if first_command is None: 

1656 return items 

1657 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

1658 tzinfo=pytz.UTC 

1659 ) 

1660 while last_computed_day < TODAY: 

1661 day_nb_commands = commands_collection.count_documents( 

1662 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1663 ) 

1664 if day_nb_commands > 0: 

1665 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

1666 last_computed_day += ONE_DAY_OFFSET 

1667 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1668 if number_commands_today > 0: 

1669 number_commands_collection.delete_many({"timestamp": TODAY}) 

1670 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

1671 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1672 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1673 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1674 for day in number_commands: 

1675 day["timestamp"] = day["timestamp"].timestamp() 

1676 items.append(cls.mongo_dict_to_object(day)) 

1677 return items 

1678 

1679 

1680class EventRule(GenericMongo): 

1681 collection_name: ClassVar[str] = "event_rules" 

1682 

1683 name: str 

1684 formula: str 

1685 variables: list[str] 

1686 

1687 @computed_field 

1688 @cached_property 

1689 def number_events(self) -> int: 

1690 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

1691 

1692 

1693class Company(GenericMongo): 

1694 collection_name: ClassVar[str] = "companies" 

1695 name: str 

1696 

1697 

1698class Campaign(GenericMongo): 

1699 collection_name: ClassVar[str] = "campaigns" 

1700 

1701 # Properties 

1702 id: str | None = None 

1703 name: str 

1704 description: str | None = None 

1705 

1706 @classmethod 

1707 def create(cls, campaign: Self): 

1708 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"})) 

1709 if new_campaign is None: 

1710 return None 

1711 return {"campaign_id": str(new_campaign.inserted_id)} 

1712 

1713 @classmethod 

1714 def update(cls, campaign: Self): 

1715 updated_campaign = cls.collection().find_one_and_update( 

1716 {"_id": ObjectId(campaign.id)}, 

1717 {"$set": {"name": campaign.name, "description": campaign.description}}, 

1718 return_document=ReturnDocument.AFTER, 

1719 ) 

1720 return updated_campaign 

1721 

1722 @classmethod 

1723 def delete(cls, campaign_id): 

1724 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)}) 

1725 return deleted_user 

1726 

1727 

1728class Phase(GenericMongo): 

1729 collection_name: ClassVar[str] = "phases" 

1730 

1731 # Properties 

1732 id: str | None = None 

1733 name: str 

1734 description: str | None = None 

1735 start_at: float 

1736 end_at: float 

1737 

1738 # FK 

1739 campaign_id: str 

1740 

1741 # @classmethod 

1742 # def get_by_date(cls, datetime: float): 

1743 # phases = [] 

1744 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING): 

1745 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1746 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING): 

1747 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1748 # if phases is None: 

1749 # return None 

1750 # return phases 

1751 

1752 @classmethod 

1753 def create(cls, phase: Self): 

1754 phase = Phase( 

1755 name=phase.name, 

1756 description=phase.description, 

1757 start_at=phase.start_at, 

1758 end_at=phase.end_at, 

1759 campaign_id=phase.campaign_id, 

1760 ) 

1761 phase_collection = get_collection(systems_database, "phases", create=True) 

1762 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"})) 

1763 if new_phase is None: 

1764 return None 

1765 return {"phase_id": str(new_phase.inserted_id)} 

1766 

1767 @classmethod 

1768 def update(cls, phase: Self): 

1769 updated_phase = cls.collection().find_one_and_update( 

1770 {"_id": ObjectId(phase.id)}, 

1771 { 

1772 "$set": { 

1773 "name": phase.name, 

1774 "description": phase.description, 

1775 "start_at": phase.start_at, 

1776 "end_at": phase.end_at, 

1777 } 

1778 }, 

1779 return_document=ReturnDocument.AFTER, 

1780 ) 

1781 return updated_phase 

1782 

1783 @classmethod 

1784 def delete(cls, phase_id): 

1785 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)}) 

1786 return delete_phase 

1787 

1788 @classmethod 

1789 def deleteMany(cls, campaign_id): 

1790 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

1791 return delete_phases 

1792 

1793 

1794class CustomViewCreation(GenericMongo): 

1795 collection_name: ClassVar[str] = "custom_views" 

1796 

1797 name: str 

1798 configuration: list 

1799 

1800 

1801class CustomView(CustomViewCreation): 

1802 # Properties 

1803 id: str | None = None 

1804 

1805 # Foreign Key 

1806 user_id: str 

1807 

1808 # # Methods 

1809 # @classmethod 

1810 # def create(cls, form_custom_view: Self, user_id) -> list: 

1811 # custom_view = CustomView( 

1812 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1813 # ) 

1814 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1815 # return new_custom_view 

1816 

1817 # @classmethod 

1818 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1819 # updated_custom_view = cls.collection().find_one_and_update( 

1820 # {"_id": ObjectId(custom_view_id)}, 

1821 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1822 # return_document=ReturnDocument.AFTER, 

1823 # ) 

1824 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1825 # del updated_custom_view["_id"] 

1826 # return cls(**updated_custom_view) 

1827 

1828 # @classmethod 

1829 # def delete(cls, custom_view_id) -> bool: 

1830 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1831 # return deleted_custom_view.acknowledged 

1832 

1833 

1834CustomViewUpdate = create_update_model(CustomView) 

1835 

1836 

1837class Video(GenericMongo): 

1838 collection_name: ClassVar[str] = "videos" 

1839 

1840 # Properties 

1841 name: str 

1842 ip_addr: str 

1843 username: str | None = None 

1844 password: str | None = None 

1845 

1846 # Methods 

1847 @classmethod 

1848 def get_all(cls, sort_by="_id") -> list[Self]: 

1849 items = [] 

1850 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

1851 items.append(cls.mongo_dict_to_object(dict_)) 

1852 return items 

1853 

1854 @classmethod 

1855 def get_video(cls, camera_id: ObjectId): 

1856 camera = cls.get_from_id(camera_id) 

1857 if camera is not None: 

1858 return camera.name 

1859 return None 

1860 

1861 

1862class Command(GenericMongo): 

1863 collection_name: ClassVar[str] = "commands" 

1864 

1865 # Properties 

1866 timestamp: datetime.datetime = None 

1867 sent_at: float 

1868 response_time: float = 0.0 

1869 command_type: str 

1870 description: str = "" 

1871 succeeded: bool = False 

1872 

1873 # Foreign key 

1874 user_id: str 

1875 

1876 @classmethod 

1877 def collection(cls): 

1878 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

1879 

1880 @classmethod 

1881 def create(cls, command: Self): 

1882 command = cls( 

1883 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

1884 sent_at=command.sent_at, 

1885 response_time=command.response_time, 

1886 command_type=command.command_type, 

1887 description=command.description, 

1888 succeeded=command.succeeded, 

1889 user_id=command.user_id, 

1890 ) 

1891 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

1892 if new_command is None: 

1893 return None 

1894 return {"command_id": str(new_command.inserted_id)} 

1895 

1896 def receive_response(self, response: dict): 

1897 self.response_time = time.time() - self.sent_at 

1898 self.succeeded = response.get("error", True) is False 

1899 if self.description == "": 

1900 self.description += response.get("message", "").rstrip() 

1901 

1902 

1903class SignalsPresetCreation(GenericMongo): 

1904 name: str 

1905 signal_ids: list[str] 

1906 

1907 

1908class SignalsPreset(SignalsPresetCreation): 

1909 collection_name: ClassVar[str] = "signals_presets" 

1910 

1911 user_id: str 

1912 

1913 @classmethod 

1914 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

1915 signals_preset = cls( 

1916 user_id=user_id, 

1917 name=signals_preset.name, 

1918 signal_ids=signals_preset.signal_ids, 

1919 ) 

1920 

1921 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

1922 

1923 return str(new_signal_preset.inserted_id) 

1924 

1925 

1926SignalsPresetUpdate = create_update_model(SignalsPreset) 

1927 

1928 

1929class LineStyle(str, Enum): 

1930 solid = "solid" 

1931 dotted = "dotted" 

1932 dashed = "dashed" 

1933 

1934 

1935class SignalAppearance: 

1936 value_color: str 

1937 forced_value_color: str 

1938 

1939 

1940class GraphThemeCreation(GenericMongo): 

1941 collection_name: ClassVar[str] = "graph_themes" 

1942 

1943 name: str 

1944 signal_id: str 

1945 value_color: str = "" 

1946 forced_value_color: str = "" 

1947 value_line_style: LineStyle = LineStyle.solid 

1948 forced_value_line_style: LineStyle = LineStyle.solid 

1949 private: bool = True 

1950 

1951 

1952class PublicGraphTheme(GraphThemeCreation): 

1953 created_by_user: bool 

1954 in_user_library: bool 

1955 active_for_user: bool 

1956 

1957 _current_user_id: str = "" 

1958 

1959 @classproperty 

1960 def custom_pipeline_steps(cls) -> dict[str, list]: 

1961 return { 

1962 "created_by_user": [ 

1963 { 

1964 "$addFields": { 

1965 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

1966 } 

1967 } 

1968 ], 

1969 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

1970 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

1971 ], 

1972 "in_user_library": [ 

1973 { 

1974 "$addFields": { 

1975 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

1976 } 

1977 } 

1978 ], 

1979 "active_for_user": [ 

1980 { 

1981 "$addFields": { 

1982 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

1983 } 

1984 } 

1985 ], 

1986 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

1987 "active": [ 

1988 { 

1989 "$addFields": { 

1990 "active": "$$REMOVE", 

1991 } 

1992 } 

1993 ], 

1994 "creator_id": [ 

1995 { 

1996 "$addFields": { 

1997 "creator_id": "$$REMOVE", 

1998 } 

1999 } 

2000 ], 

2001 } 

2002 

2003 @classmethod 

2004 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

2005 cls._current_user_id = user_id 

2006 return super().response_from_query(query) 

2007 

2008 @classmethod 

2009 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

2010 query.in_user_library = "true" 

2011 return cls.response_from_query(query, user_id) 

2012 

2013 @classmethod 

2014 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

2015 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

2016 

2017 @classmethod 

2018 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2019 cls._current_user_id = user_id 

2020 return super().get_by_attribute(attribute_name, attribute_value) 

2021 

2022 @classmethod 

2023 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

2024 cls._current_user_id = user_id 

2025 return super().get_one_by_attribute(attribute_name, attribute_value) 

2026 

2027 @classmethod 

2028 def get_all(cls, sort_by: str, user_id: str): 

2029 cls._current_user_id = user_id 

2030 return super().get_all(sort_by) 

2031 

2032 @classmethod 

2033 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

2034 pipeline = [ 

2035 { 

2036 "$match": { 

2037 "active": {"$eq": user_id}, 

2038 "signal_id": {"$in": signal_ids}, 

2039 } 

2040 }, 

2041 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

2042 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

2043 { 

2044 "$project": { 

2045 "_id": 0, 

2046 "signal_id": 1, 

2047 "value_color": 1, 

2048 "forced_value_color": 1, 

2049 "value_line_style": 1, 

2050 "forced_value_line_style": 1, 

2051 } 

2052 }, 

2053 ] 

2054 

2055 result = {} 

2056 

2057 cursor = cls.collection().aggregate(pipeline) 

2058 for document in cursor: 

2059 signal_id = document["signal_id"] 

2060 del document["signal_id"] 

2061 result[signal_id] = document 

2062 

2063 return result 

2064 

2065 

2066GraphThemeUpdate = create_update_model(PublicGraphTheme) 

2067 

2068 

2069class PrivateGraphTheme(GraphThemeCreation): 

2070 # private 

2071 creator_id: str 

2072 in_library: list[str] 

2073 active: list[str] 

2074 

2075 @classmethod 

2076 def create( 

2077 cls, 

2078 creator_id: str, 

2079 name: str, 

2080 signal_id: str, 

2081 value_color: str, 

2082 forced_value_color: str, 

2083 value_line_style: LineStyle, 

2084 forced_value_line_style: LineStyle, 

2085 private: bool, 

2086 ): 

2087 color_setting = cls( 

2088 creator_id=creator_id, 

2089 name=name, 

2090 signal_id=signal_id, 

2091 value_color=value_color, 

2092 forced_value_color=forced_value_color, 

2093 value_line_style=value_line_style, 

2094 forced_value_line_style=forced_value_line_style, 

2095 private=private, 

2096 in_library=[creator_id], 

2097 active=[creator_id], 

2098 ) 

2099 

2100 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2101 color_setting.id = str(new_color_setting.inserted_id) 

2102 return color_setting 

2103 

2104 def update(self, update_dict: dict, user_id: str): 

2105 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2106 if in_user_lib and user_id not in self.in_library: 

2107 self.in_library.append(user_id) 

2108 elif not in_user_lib and user_id in self.in_library: 

2109 self.in_library.remove(user_id) 

2110 update_dict["in_library"] = self.in_library 

2111 del update_dict["in_user_library"] 

2112 

2113 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2114 if active_for_user and user_id not in self.active: 

2115 self.active.append(user_id) 

2116 elif not active_for_user and user_id in self.active: 

2117 self.active.remove(user_id) 

2118 update_dict["active"] = self.active 

2119 del update_dict["active_for_user"] 

2120 

2121 if update_dict.get("created_by_user") is not None: 

2122 del update_dict["created_by_user"] 

2123 

2124 self.collection().find_one_and_update( 

2125 {"_id": ObjectId(self.id)}, 

2126 {"$set": update_dict}, 

2127 ) 

2128 

2129 return {} 

2130 

2131 

2132class Configuration(GenericMongo): 

2133 collection_name: ClassVar[str] = "configs" 

2134 

2135 # Properties 

2136 config_id: str | None = None 

2137 generated_at: float 

2138 config: dict 

2139 components: list 

2140 hardware_topology: dict 

2141 received_at: float 

2142 in_use_by_devices: list[str] = [] 

2143 is_in_use: bool = False 

2144 

2145 custom_pipeline_steps = { 

2146 "is_in_use": [ 

2147 { 

2148 "$addFields": { 

2149 "is_in_use": { 

2150 "$cond": [ 

2151 {"$gt": [{"$size": {"$ifNull": ["$in_use_by_devices", []]}}, 0]}, 

2152 True, 

2153 False, 

2154 ] 

2155 }, 

2156 } 

2157 } 

2158 ], 

2159 } 

2160 

2161 @classmethod 

2162 def get_all_ids(cls) -> list[str]: 

2163 cursor = cls.collection().aggregate([{"$project": {"config_id": 1, "config_name": 1, "_id": 0}}]) 

2164 

2165 return [{"config_id": config["config_id"], "config_name": config["config_name"]} for config in cursor] 

2166 

2167 @classmethod 

2168 def get_from_config_id(cls, config_id: str) -> Self: 

2169 items = ( 

2170 cls.collection() 

2171 .aggregate( 

2172 [ 

2173 {"$match": {"config_id": config_id}}, 

2174 {"$limit": 1}, 

2175 ] 

2176 ) 

2177 .to_list() 

2178 ) 

2179 if len(items) == 0: 

2180 return None 

2181 dict_ = items[0] 

2182 # There is some protected information in the config dict, so keep only specific keys 

2183 allowed_config_keys = ["description", "broker_host", "target_device_id", "device_name"] 

2184 config_dict = dict_.get("config") 

2185 dict_["config"] = {k: config_dict[k] for k in allowed_config_keys} 

2186 return cls.mongo_dict_to_object(dict_)