Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 89%

714 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-03 07:30 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Optional 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12from typing import Literal, Dict 

13 

14import zipfile 

15import ping3 

16import pytz 

17from bson.objectid import ObjectId 

18from pymongo import ASCENDING, ReturnDocument 

19from pydantic import BaseModel, computed_field, Field, create_model 

20import numpy as npy 

21 

22# from scipy import signal as signal_scipy 

23 

24from twinpad_backend.db import ( 

25 get_collection, 

26 get_signal_collection, 

27 systems_database, 

28 signals_database, 

29 devices_states_database, 

30) 

31from twinpad_backend.responses import ListResponse 

32from twinpad_backend.messages import send_mode_change, send_signal_value 

33 

# Maps the type names found in ``Signal.data_type`` to the Python type of the values.
# This has to be a plain dict: a trailing comma after the literal turns it into a 1-tuple,
# which breaks both ``name in TYPES`` and ``TYPES[name]``.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}

35 

36 

# Host names of the companion services; each one can be overridden through the environment.
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.getenv("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.getenv("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.getenv("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.getenv("DATA_ANALYZER_HOST", "data-analyzer")

# Maximum age, in seconds, of the last ping/sample before a device or signal is reported "down".
DEVICE_TIMEOUT = 5.0
NUMBER_SAMPLES_DATABASE_UPDATE = 120

45 

# Module logger, bound to the "uvicorn.error" logger name.
logger = logging.getLogger("uvicorn.error")

# Time-series collection that receives one document per ``Signal.number_samples()`` call.
number_samples_collection = get_collection(
    systems_database,
    "number_samples",
    create=True,
    time_series=True,
)

49 

50 

def create_update_model(model):
    """Derive a partial-update variant of a pydantic model.

    Every field of ``model`` except ``id`` is copied into a new model with its
    annotation wrapped in ``Optional`` and a default of ``None``.

    Args:
        model: pydantic model class exposing ``model_fields``.

    Returns:
        A new model class named ``<ModelName>Update``.
    """
    optional_fields = {
        name: (Optional[info.annotation], None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)

60 

61 

def get_utc_date_from_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as ``YYYY-MM-DD H:MM:SS.mmm UTC``.

    The hour is not zero-padded and the fractional part is truncated (not
    rounded) to milliseconds.

    Args:
        timestamp: seconds since the Unix epoch.

    Returns:
        The formatted UTC date, e.g. ``"1970-01-01 0:00:01.500 UTC"``.
    """
    # Convert straight to UTC: a naive ``fromtimestamp`` yields *local* wall-clock time,
    # and tagging that as UTC afterwards is wrong on any host whose timezone is not UTC.
    moment = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    milliseconds = moment.microsecond // 1000
    # The hour is formatted by hand because the "%-H" strftime flag is a glibc extension.
    return f"{moment:%Y-%m-%d} {moment.hour}:{moment:%M:%S}.{milliseconds:03d} UTC"

67 

68 

69# Models 

class TwinPadModel(BaseModel):
    """Common base of the models in this module: dict <-> object helpers on top of pydantic."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model to a plain dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude)

78 

79 

class GenericMongo(TwinPadModel):
    """Base class of the models persisted in a collection of the systems database.

    Subclasses must define a ``collection_name`` class variable. MongoDB's ``_id``
    is exposed on the model as the string field ``id``.
    """

    id: str | None = None

    @classmethod
    def collection(cls):
        """Return the MongoDB collection backing this model (created if missing)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query and wrap it in a ``ListResponse``.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (either ``"field"`` or
        ``"field:order"`` with an integer order), ``offset`` and ``limit``. A limit of
        ``None`` or ``0`` means "no limit". ``total`` counts every document matching the
        filter, regardless of pagination.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = cls.collection()
        total = collection.count_documents(req_filter)
        cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
        if (query.limit is not None) and (query.limit != 0):
            cursor = cursor.limit(query.limit)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in cursor]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``item_id``, or ``None`` when there is none."""
        dict_ = cls.collection().find_one({"_id": ObjectId(item_id)})
        if dict_ is None:
            return None
        return cls.mongo_dict_to_object(dict_)

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build a model from a raw MongoDB document.

        The document is modified in place: ``_id`` is removed and replaced by its
        string form under ``id``.
        """
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value) -> list[Self]:
        """Return all items whose ``attribute_name`` equals ``attribute_value``."""
        # ``find`` always returns a cursor (never None), so an empty match yields [].
        cursor = cls.collection().find({attribute_name: attribute_value})
        return [cls.mongo_dict_to_object(d) for d in cursor]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value) -> Self | None:
        """Return one item whose ``attribute_name`` equals ``attribute_value``, or ``None``."""
        item = cls.collection().find_one({attribute_name: attribute_value})
        if item is None:
            return None
        return cls.mongo_dict_to_object(item)

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted ascending on ``sort_by``."""
        cursor = cls.collection().find({}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cursor]

    def insert(self):
        """Insert this object as a new document and return the generated id as a string."""
        # "id" must be the *string* field name: ``{id}`` would be a set holding the builtin
        # function, which excludes nothing and stores a stray ``id`` field in the document.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this object and ``$set`` the same keys in the database.

        Returns:
            ``self``, with the new attribute values.
        """
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )
        return self

    def delete(self):
        """Delete the matching document; return ``True`` when one was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

162 

163 

class Mode(TwinPadModel):
    """One entry of the ``modes`` list of a ``Device``."""

    # Identifier of the mode (``Device.current_mode_id`` / ``DeviceUpdate.mode_id`` are ints too).
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float

169 

170 

class DeviceUpdate(TwinPadModel):
    """Payload of a device update: the mode the device is asked to switch to."""

    # Forwarded to ``send_mode_change`` by ``Device.update``.
    mode_id: int

173 

174 

class Device(GenericMongo):
    """A device stored in the ``devices`` collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    # Compared against ``time.time()`` in ``status``; None until a ping is recorded.
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)

    def update(self, update_device):
        """Send a mode-change message for this device.

        Unlike ``GenericMongo.update`` this writes nothing to the database and
        returns ``None``; only ``update_device.mode_id`` is used.
        """
        send_mode_change(self.device_id, update_device.mode_id)

    @computed_field
    @property
    def status(self) -> str:
        """``"up"`` when the last ping is at most ``DEVICE_TIMEOUT`` seconds old, else ``"down"``."""
        now = time.time()
        if self.last_ping is None:
            return "down"
        elapsed = now - self.last_ping
        return "down" if elapsed > DEVICE_TIMEOUT else "up"

201 

202 

class DeviceSetup(GenericMongo):
    """A device setup stored in the ``device_setups`` collection."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: Dict[str, str]


# Partial-update variant of DeviceSetup: same fields minus ``id``, all optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)

212 

213 

class DeviceState(GenericMongo):
    """One recorded state of a device (mode, load, tokens) at a given timestamp."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode_id: int | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """Return the states of one device, filtered, sorted and paginated by ``query``.

        States are read from the collection named ``device_id`` in the devices-states
        database. ``query`` must provide ``mongodb_filter()``, ``sort_by`` (``"field"``
        or ``"field:order"``), ``offset`` and ``limit`` (``None`` or ``0`` = no limit).
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id, create=True)
        total = collection.count_documents(req_filter)
        cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
        if (query.limit is not None) and (query.limit != 0):
            cursor = cursor.limit(query.limit)

        items = []
        for item_dict in cursor:
            # Fall back to a real empty list: a pydantic ``Field(...)`` object is not a valid
            # value for ``tokens`` and fails validation when the document has no tokens.
            tokens = item_dict.get("tokens")
            items.append(
                cls(
                    timestamp=item_dict.get("precise_timestamp"),
                    mode_id=item_dict.get("mode_id", None),
                    load=item_dict.get("load", None),
                    tokens=[] if tokens is None else tokens,
                )
            )
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

247 

248 

class SignalSample(TwinPadModel):
    """A single sample (value and optional forced value) of a signal."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def _find_boundary_sample(cls, signal_id: str, newest: bool):
        """Return the raw oldest (``newest=False``) or newest document of a signal, or ``None``.

        This is a workaround using the internal bucket collection associated to a
        time series to find the most/least recent document, found here:
        https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        When no bucket is available, a sorted ``find_one`` on the signal collection is used.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        bound, direction = ("max", -1) if newest else ("min", 1)
        buckets = get_signal_collection(f"system.buckets.{signal_id}")
        boundary_bucket = None
        if buckets is not None:
            boundary_bucket = buckets.find_one({}, sort={f"control.{bound}.timestamp": direction})

        if boundary_bucket is not None:
            return collection.find_one({"timestamp": boundary_bucket["control"][bound]["timestamp"]})
        return collection.find_one({}, sort=[("timestamp", direction)])

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of ``signal_id``, or ``None`` when there is no data."""
        sample_data = cls._find_boundary_sample(signal_id, newest=False)
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the newest sample of ``signal_id``, or ``None`` when there is no data.

        The returned timestamp is the later of the sample's own timestamp and the
        last ping of the owning device (the part of ``signal_id`` before the first dot).
        """
        sample_data = cls._find_boundary_sample(signal_id, newest=True)
        if sample_data is None:
            return None

        # ``.get`` so that a document without ``precise_timestamp`` reaches the
        # ``timestamp is None`` fallback below instead of raising KeyError.
        timestamp = sample_data.get("precise_timestamp")
        device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the newest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_last_from_signal_id(sid) for sid in signal_ids]

327 

328 

class SignalData(TwinPadModel):
    """Time series of one signal: parallel lists of timestamps, values and forced values.

    Subclasses provide ``interpolate`` for their value type; the resampling
    methods of this class rely on it.
    """

    signal_id: str
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    # First / last timestamp available for the signal (not only in the requested range).
    data_start: float | None = None
    data_end: float | None = None

    number_samples: int = 0
    number_samples_db: int = 0

    # Timings, in seconds.
    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float = None,
        max_timestamp: float = None,
        interpolate_bounds: bool = True,
        max_documents: int = None,
    ) -> Self:
        """Load the samples of a signal, optionally bounded in time and downsampled.

        Args:
            signal_id: identifier of the signal (also the name of its collection).
            min_timestamp: lower bound (POSIX seconds), inclusive; ``None`` for no bound.
            max_timestamp: upper bound (POSIX seconds), inclusive; ``None`` for no bound.
            interpolate_bounds: when True, add a point exactly at each given bound,
                interpolated with the nearest sample outside the range.
            max_documents: when set and smaller than the number of matching documents,
                keep only the first document of every ``ceil(matching / max_documents)``
                consecutive documents.

        Returns:
            An instance of the data class matching the signal's data type, or an
            empty ``cls`` instance when the signal has no collection.
        """
        now = time.time()

        # pymongo treats naive datetimes as UTC, and the rest of this module reads stored
        # timestamps back as UTC, so the bounds are built timezone-aware in UTC instead of
        # in the host's local time.
        utc = datetime.timezone.utc
        min_date = None if min_timestamp is None else datetime.datetime.fromtimestamp(min_timestamp, tz=utc)
        max_date = None if max_timestamp is None else datetime.datetime.fromtimestamp(max_timestamp, tz=utc)

        req_signal = {}
        if min_date is not None:
            req_signal.setdefault("timestamp", {})["$gte"] = min_date
        if max_date is not None:
            req_signal.setdefault("timestamp", {})["$lte"] = max_date

        collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed
        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

        if max_documents is not None and max_documents < number_results:
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info(f"unsampling ratio: {unsampling_ratio} (number_results:{number_results})")
            # Number the documents, group them by floor(index / ratio) and keep the first of each group.
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        init_time = time.time()
        # Same filter as above: reuse the count instead of querying the database twice.
        number_samples_db = number_results

        time_vector = []
        values = []
        forced_values = []
        for s in cursor:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        # NOTE(review): ``signal`` is None when the signal has samples but no entry in the
        # "signals" collection; the next line then raises AttributeError — confirm whether
        # that case can happen.
        signal = Signal.get_from_signal_id(signal_id)
        class_ = signal.signal_data_class

        if interpolate_bounds:
            # Fetching left side value & interpolation
            if min_timestamp is not None and (not time_vector or time_vector[0] != min_timestamp):
                sample_left = collection.find_one(
                    {"timestamp": {"$lte": min_date}, "value": {"$exists": True}},
                    sort=[("timestamp", -1)],
                )
                if sample_left:
                    if time_vector:
                        left_sd = class_(
                            signal_id=signal_id,
                            time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                            values=[sample_left["value"], values[0]],
                            forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                        )
                        min_ts_value = left_sd.interpolate_values([min_timestamp])[0]
                        min_ts_forced_value = left_sd.interpolate_forced_values([min_timestamp])[0]
                    else:
                        min_ts_value = sample_left.get("value", None)
                        min_ts_forced_value = sample_left.get("forced_value", None)
                    time_vector.insert(0, min_timestamp)
                    values.insert(0, min_ts_value)
                    forced_values.insert(0, min_ts_forced_value)

            # Fetching right side value & interpolation
            if max_timestamp is not None and (not time_vector or time_vector[-1] != max_timestamp):
                sample_right = collection.find_one(
                    {"timestamp": {"$gte": max_date}, "value": {"$exists": True}},
                    sort=[("timestamp", 1)],
                )
                if sample_right:
                    if time_vector:
                        right_sd = class_(
                            signal_id=signal_id,
                            time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                            values=[values[-1], sample_right.get("value", None)],
                            forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                        )
                        max_ts_value = right_sd.interpolate_values([max_timestamp])[0]
                        max_ts_forced_value = right_sd.interpolate_forced_values([max_timestamp])[0]
                    else:
                        max_ts_value = sample_right.get("value", None)
                        max_ts_forced_value = sample_right.get("forced_value", None)
                    time_vector.append(max_timestamp)
                    values.append(max_ts_value)
                    forced_values.append(max_ts_forced_value)

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # Workaround: the internal "system.buckets.<name>" collection of a time series gives
        # its oldest/newest timestamps, see
        # https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["timestamp"].replace(tzinfo=pytz.UTC).timestamp()
        else:
            data_start = None

        if signal.repeated_sample():
            # The last value is still being repeated: data extends up to now.
            data_end = time.time()
        else:
            last_bucket = None
            if bucket is not None:
                last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
            if last_bucket is not None:
                data_end = last_bucket["control"]["max"]["timestamp"].replace(tzinfo=pytz.UTC).timestamp()
            else:
                data_end = time.time()

        return class_(
            signal_id=signal_id,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_samples_db,
            db_query_time=db_req_time,
            init_time=init_time,
        )

    def interpolate(self, new_time_vector: list[float], items):
        """Resample ``items`` (aligned with ``time_vector``) at ``new_time_vector``.

        Implemented by the typed subclasses; the base class cannot interpolate.
        """
        raise NotImplementedError(f"{type(self).__name__} does not implement interpolate()")

    def interpolate_values(self, new_time_vector: list[float]):
        """Return ``values`` resampled at ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.values)

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return ``forced_values`` resampled at ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.forced_values)

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy resampled on at most ``number_samples_max`` evenly spaced timestamps.

        When ``number_samples_max`` is falsy or the data already fits, the data is
        returned unchanged. The time spent is added to ``data_processing_time``.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector = npy.linspace(
                self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
            ).tolist()
            values = self.interpolate_values(time_vector)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        Args:
            window_max_number_samples: maximum number of points kept inside the window.
            outside_max_number_samples: maximum number of points kept outside the window,
                shared between both sides proportionally to their original sample counts.
            window_min_timestamp: start of the window; defaults to the first timestamp.
            window_max_timestamp: end of the window; defaults to the last timestamp.

        Returns:
            A new instance of the same class, or ``self`` when there is no data.
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]

        if len(time_vector_window) > window_max_number_samples:
            time_vector_window = npy.linspace(
                time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
            ).tolist()

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = math.ceil(
                outside_max_number_samples * number_samples_before / number_samples_outside
            )
            new_number_samples_after = math.ceil(
                outside_max_number_samples * number_samples_after / number_samples_outside
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # Both sides are guarded on "does this side have samples at all" (needed to index
            # [0] / [-1]). Guarding on the *new* count would leave a side untouched, i.e. at
            # full resolution, whenever its budget is 0.
            if number_samples_before:
                time_vector_before = npy.linspace(
                    time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
                ).tolist()

            if number_samples_after:
                # Built backwards so the excluded endpoint is the one adjacent to the window.
                time_vector_after = npy.linspace(
                    time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
                ).tolist()[::-1]

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def csv_export(self):
        """Return the data as UTF-8 encoded CSV bytes (timestamp, value, forced_value)."""
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["timestamp", "value", "forced_value"])  # Write header
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
        return output.getvalue().encode("utf-8")

    def prestoplot_export(self):
        """Return the data as UTF-8 encoded, tab-separated bytes in the PrestoPlot layout.

        Dots in the signal id are replaced by underscores and an underscore is
        prepended when the id starts with a digit. Missing values are written as ``none``.
        """
        clean_signal_id = self.signal_id.replace(".", "_")
        if clean_signal_id[0].isnumeric():
            clean_signal_id = "_" + clean_signal_id

        output = io.StringIO()
        output.write("# Encoding:\tUTF-8\n")
        output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
        output.write("ISO8601\tnone\tnone\n")
        output.write(f"# Description :\t{clean_signal_id}\n")

        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            output.write(
                f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
            )
        return output.getvalue().encode("utf-8")

679 

680 

class NumericSignalData(SignalData):
    """Signal data holding numeric values, resampled by linear interpolation."""

    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` (aligned with ``time_vector``) at ``new_time_vector``.

        ``None`` entries are treated as NaN; any NaN in the result is returned as ``None``.
        """
        known = [npy.nan if item is None else item for item in items]
        resampled = npy.interp(new_time_vector, self.time_vector, known)
        return [None if npy.isnan(point) else point for point in resampled]

689 

690 

class StringSignalData(SignalData):
    """Signal data holding string values, resampled by holding the previous value."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each new timestamp, the item of the last sample at or before it.

        Timestamps earlier than the first sample get the first item.
        """
        # Position of the last known timestamp that is <= each requested timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        # Clamp to valid indices (requests before the first sample map to index 0).
        positions = npy.clip(positions, 0, len(items) - 1)
        return [items[position] for position in positions]

702 

703 

class SignalsData(TwinPadModel):
    """Data of several signals, with the overall time span they cover."""

    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls, signal_ids: list[str], min_timestamp: float = None, max_timestamp: float = None, max_documents: int = None
    ) -> Self:
        """Load every signal of ``signal_ids`` through ``SignalData.get_from_signal_id``.

        ``max_timestamp`` defaults to the current time. ``data_start`` / ``data_end``
        are the earliest start and latest end over all signals (``None`` when no
        signal provides one).
        """
        if max_timestamp is None:
            max_timestamp = time.time()

        signals_data = [
            SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                max_documents=max_documents,
            )
            for signal_id in signal_ids
        ]

        starts = [sd.data_start for sd in signals_data if sd.data_start is not None]
        ends = [sd.data_end for sd in signals_data if sd.data_end is not None]

        return cls(
            signals_data=signals_data,
            data_processing_time=sum((sd.data_processing_time for sd in signals_data), 0.0),
            data_start=min(starts) if starts else None,
            data_end=max(ends) if ends else None,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and return the new collection."""
        resampled = [sd.uniform_desampling(number_samples_max=number_samples_max) for sd in self.signals_data]
        total_processing_time = sum(sd.data_processing_time for sd in resampled)
        return SignalsData(
            signals_data=resampled,
            data_processing_time=total_processing_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and return the new collection."""
        resampled = []
        for sd in self.signals_data:
            resampled.append(
                sd.interest_window_desampling(
                    window_max_number_samples=window_max_number_samples,
                    outside_max_number_samples=outside_max_number_samples,
                    window_min_timestamp=window_min_timestamp,
                    window_max_timestamp=window_max_timestamp,
                )
            )
        total_processing_time = sum(sd.data_processing_time for sd in resampled)
        return SignalsData(
            signals_data=resampled,
            data_processing_time=total_processing_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, format: str = "csv"):
        """Return the bytes of a deflated zip archive holding one export file per signal.

        Args:
            format: ``"csv"`` (``<signal_id>.csv``) or ``"prestoplot"`` (``<signal_id>.tab``).

        Raises:
            ValueError: for any other format, raised when the first signal is exported.
        """
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                if format == "csv":
                    payload, extension = signal_data.csv_export(), "csv"
                elif format == "prestoplot":
                    payload, extension = signal_data.prestoplot_export(), "tab"
                else:
                    raise ValueError(f"Format not found. Got: {format}")
                zip_file.writestr(f"{signal_data.signal_id}.{extension}", payload)
        # Read the buffer only once the archive is closed, so the zip directory is written.
        return archive.getvalue()

796 

797 

class SignalStatus(TwinPadModel):
    """Result of ``Signal.status``."""

    # "up", "down" or "no data", as set by ``Signal.status``.
    status: str
    # Human-readable explanation; empty when there is nothing to report.
    reason: str
    # Seconds since the last known activity, or None when it cannot be computed.
    delay: float | None

802 

803 

class DigitizationFunction(TwinPadModel):
    """Digitization parameters attached to a ``Signal``: a value range and a raw-value range."""

    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

810 

811 

class SignalUpdate(TwinPadModel):
    """Payload passed to ``Signal.update``; every field is optional."""

    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None

816 

817 

class SignalType(str, Enum):
    """Kinds of signal; members are also plain strings equal to their value."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

822 

823 

# Data class used to hold the samples of a signal, keyed by the signal's base data type.
# Every type except "str" is handled as numeric data.
SIGNALDATA_TYPES = dict(
    int=NumericSignalData,
    float=NumericSignalData,
    str=StringSignalData,
    bool=NumericSignalData,
    epoch=NumericSignalData,
)

831 

832 

833class Signal(GenericMongo): 

834 collection_name: ClassVar[str] = "signals" 

835 

836 signal_id: str 

837 frequency: float 

838 unit: str | None 

839 description: str 

840 type: SignalType 

841 data_type: str 

842 precision_digits: int | None 

843 

844 digitization_function: DigitizationFunction | None 

845 

846 @property 

847 def device(self) -> Device: 

848 device_id = self.signal_id.split(".")[0] 

849 device = Device.get_one_by_attribute("device_id", device_id) 

850 return device 

851 

852 @cached_property 

853 def signal_data_class(self): 

854 if self.data_type in SIGNALDATA_TYPES: 

855 return SIGNALDATA_TYPES[self.data_type] 

856 if self.data_type.startswith("enum"): 

857 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

858 raise ValueError(f"Unhandled python type: {self.data_type}") 

859 

860 @cached_property 

861 def python_type(self): 

862 if self.data_type in TYPES: 

863 return TYPES[self.data_type] 

864 if self.data_type.startswith("enum"): 

865 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

866 return Literal[*choices] 

867 raise ValueError(f"Unhandled python type: {self.data_type}") 

868 

# Availability of the signal, derived from the timestamp of its most recent
# time-series bucket and from the owning device's last ping.
#   - "no data": no bucket collection or no bucket document was found.
#   - "down":    the most recent activity is older than DEVICE_TIMEOUT.
#   - "up":      otherwise.
# Documented with comments rather than a docstring so the generated schema
# description of this computed field is left untouched.
@computed_field
@property
def status(self) -> SignalStatus:
    now = time.time()

    # See line 292 for explanation
    bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
    last_bucket = None
    if bucket is not None:
        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
    if last_bucket is None:
        return SignalStatus(status="no data", reason="signal does not exist", delay=None)

    try:
        last_date = last_bucket["control"]["max"]["timestamp"]
    except (KeyError, IndexError):
        # Missing keys in the bucket document raise KeyError (the original
        # only caught IndexError, which a dict lookup never raises).
        last_value_ts = None
    else:
        # The stored datetime is interpreted as UTC before conversion.
        last_value_ts = last_date.replace(tzinfo=pytz.UTC).timestamp()

    if last_value_ts is None:
        # NOTE(review): status stays "up" here, as in the original code —
        # confirm this is intended when the bucket has no timestamp.
        return SignalStatus(status="up", reason="No data from signal", delay=None)

    # Since device is a computed property, only compute it once
    device = self.device
    # Guard against a missing device so the status endpoint does not crash;
    # presumably the lookup can return None — verify against Device.
    if device is not None and device.last_ping is not None:
        last_value_ts = max(last_value_ts, device.last_ping)

    delay = now - last_value_ts
    if delay > DEVICE_TIMEOUT:
        reason = f"delay of {delay:.3f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
        return SignalStatus(status="down", reason=reason, delay=delay)
    return SignalStatus(status="up", reason="", delay=delay)

906 

def update(self, update: SignalUpdate):
    """Pass ``update`` for this signal to ``send_signal_value``."""
    target_signal_id = self.signal_id
    send_signal_value(target_signal_id, update)

909 

@classmethod
def get_from_signal_id(cls, signal_id) -> Self:
    """Load the signal stored under ``signal_id``.

    Returns ``None`` when the "signals" collection has no matching document.
    (Original note: this could be made generic on the mongo base class.)
    """
    signals = get_collection(systems_database, "signals", create=True)
    document = signals.find_one({"signal_id": signal_id})
    if not document:
        return None
    # The Mongo primary key is dropped before building the object.
    document.pop("_id")
    return cls.dict_to_object(document)

918 

def number_samples(self):
    """Return the estimated sample count of this signal and record it.

    Returns 0 without recording anything when the signal has no collection.
    Otherwise the count is also inserted, with a UTC timestamp, into
    ``number_samples_collection``.
    """
    samples = get_signal_collection(signal_id=self.signal_id)
    if samples is None:
        return 0

    count = samples.estimated_document_count()
    record = {
        "timestamp": datetime.datetime.now(pytz.UTC),
        "signal_id": self.signal_id,
        "number_samples": count,
    }
    number_samples_collection.insert_one(record)
    return count

935 

def repeated_sample(self, now: float = None):
    """Tell whether the last sample is recent and its device is "up".

    Args:
        now: reference time as an epoch float; defaults to ``time.time()``.

    Returns:
        ``True`` when a last sample exists, the device status equals "up"
        and the sample is younger than ``DEVICE_TIMEOUT``; ``False``
        otherwise.
    """
    if now is None:
        now = time.time()

    last_sample = SignalSample.get_last_from_signal_id(self.signal_id)
    # Short-circuit order matters: the device is only queried when a sample
    # exists, and the age is only computed when the device is up.
    is_recent = (
        last_sample is not None
        and self.device.status == "up"
        and now - last_sample.timestamp < DEVICE_TIMEOUT
    )
    return bool(is_recent)

946 

def sample_datasize(self):
    """Return the ``size`` entry of the ``collstats`` command for this signal."""
    stats = signals_database.command("collstats", self.signal_id)
    return stats["size"]

949 

950 

# Snapshot of the "up"/"down" state of each service, as built by check().
class ServicesStatus(TwinPadModel):
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every configured host and return the resulting statuses.

        The backend itself is always reported as "up".
        """
        # Dict order keeps the hosts pinged in the same sequence as before.
        states = {
            "cloud_broker": ping(RABBITMQ_HOST),
            "backend": "up",
            "time_series_database": ping(MONGO_HOST),
            "signal_storage": ping(SIGNAL_STORAGE_HOST),
            "heartbeat_storage": ping(HEARTBEAT_STORAGE_HOST),
            "data_analyzer": ping(DATA_ANALYZER_HOST),
        }
        return cls(**states)

969 

970 

def ping(host):
    """Return "up" if ``host`` answers a ping within 0.8 seconds, else "down".

    A ``PermissionError`` raised by ``ping3`` is deliberately treated as
    "down" instead of being propagated.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Best effort: without the required privileges the host is reported down.
        return "down"

    # ping3 reports failures as None (timeout) or False (error). Any other
    # value is a measured delay, and a delay of 0.0 is still a valid answer,
    # so compare against the failure values instead of relying on truthiness.
    if delay is None or delay is False:
        return "down"
    return "up"

978 

979 

# An occurrence stored in the "events" collection and linked to an EventRule.
class Event(GenericMongo):
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    # The rule referenced by event_rule_id, fetched once and then cached.
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        rule_id = self.event_rule_id
        return EventRule.get_from_id(rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an Event from a stored document.

        The document's ``timestamp`` datetime is replaced, in place, by its
        float epoch value before delegating to the parent implementation.
        """
        # NOTE(review): a naive datetime is converted using the local
        # timezone here — confirm that matches how timestamps are stored.
        stored_datetime = dict_["timestamp"]
        dict_["timestamp"] = datetime.datetime.timestamp(stored_datetime)
        return super().dict_to_object(dict_)

997 

998 

# A named formula over a list of variables, stored in "event_rules".
class EventRule(GenericMongo):
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of documents in "events" whose event_rule_id equals this rule's
    # id; computed once per instance and then cached.
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events = get_collection(systems_database, "events")
        query = {"event_rule_id": self.id}
        return events.count_documents(query)

1010 

1011 

# A company record stored in the "companies" collection.
class Company(GenericMongo):
    collection_name: ClassVar[str] = "companies"

    name: str

1015 

1016 

# An application user stored in the "users" collection.
class User(GenericMongo):
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude={"password"}):
        """Serialize the user, leaving out the password by default."""
        # The default set is only forwarded, never mutated in this method.
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Insert a new user and return ``{"user_id": <id as str>}``.

        When no user exists yet, the new user is forced to be an admin.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update(cls, user: "UserUpdate", user_id: str):
        """Apply the non-``None`` fields of ``user`` to the stored user.

        Returns:
            The updated ``User``, or ``None`` when no user has ``user_id``.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        # find_one_and_update yields None when nothing matched; report that
        # to the caller instead of failing on the subscript below.
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The stored connection flag is not carried over to the returned
        # object; tolerate documents that never had the field.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


# Variant of User in which every field except "id" is optional.
UserUpdate = create_update_model(User)

1057 

1058 

# A campaign stored in the "campaigns" collection.
class Campaign(GenericMongo):
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` and return ``{"campaign_id": <id as str>}``."""
        campaigns = cls.collection()
        payload = campaign.model_dump(exclude={"id"})
        insert_result = campaigns.insert_one(payload)
        if insert_result is None:
            return None
        return {"campaign_id": str(insert_result.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the campaign with ``campaign.id``.

        Returns the raw document as it is after the update.
        """
        campaigns = cls.collection()
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return campaigns.find_one_and_update(
            selector,
            changes,
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with ``campaign_id`` and return the delete result."""
        campaigns = cls.collection()
        return campaigns.delete_one({"_id": ObjectId(campaign_id)})

1087 

1088 

# A time-bounded phase of a campaign, stored in the "phases" collection.
class Phase(GenericMongo):
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    @classmethod
    def get_by_date(cls, datetime: float):
        """Return the dumped phases that start or end exactly at ``datetime``.

        Phases matching on ``start_at`` come first, then those matching on
        ``end_at``; each group is sorted by name. A phase whose start and end
        both equal ``datetime`` appears in both groups.
        """
        # collection is a method everywhere else in this file; the original
        # accessed ``cls.collection.find`` without calling it.
        collection = cls.collection()
        phases = []
        for field_name in ("start_at", "end_at"):
            cursor = collection.find({field_name: datetime}).sort("name", ASCENDING)
            phases.extend(cls.dict_to_object(dict_).model_dump() for dict_ in cursor)
        return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` and return ``{"phase_id": <id as str>}``."""
        phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phase_collection = get_collection(systems_database, "phases", create=True)
        new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
        if new_phase is None:
            return None
        return {"phase_id": str(new_phase.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description, start and end of the phase ``phase.id``.

        Returns the raw document as it is after the update.
        """
        updated_phase = cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {
                "$set": {
                    "name": phase.name,
                    "description": phase.description,
                    "start_at": phase.start_at,
                    "end_at": phase.end_at,
                }
            },
            return_document=ReturnDocument.AFTER,
        )
        return updated_phase

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with ``phase_id`` and return the delete result."""
        delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
        return delete_phase

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of ``campaign_id`` and return the delete result."""
        delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
        return delete_phases

1153 

1154 

# A user-defined view configuration, stored in the "custom_views" collection.
class CustomView(GenericMongo):
    collection_name: ClassVar[str] = "custom_views"

    # Properties
    id: str | None = None
    name: str
    configuration: list

    # Foreign Key
    user_id: str

    # Methods
    @classmethod
    def create(cls, form_custom_view: Self) -> dict | None:
        """Insert a copy of ``form_custom_view``.

        Returns:
            ``{"custom_view_id": <id as str>}`` (the original ``-> list``
            annotation did not match the returned value).
        """
        custom_view = CustomView(
            name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=form_custom_view.user_id
        )
        new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
        if new_custom_view is None:
            return None
        return {"custom_view_id": str(new_custom_view.inserted_id)}

    @classmethod
    def update(cls, custom_view: Self, custom_view_id: str) -> Self | None:
        """Apply the non-``None`` fields of ``custom_view`` to the stored view.

        Returns:
            The updated ``CustomView``, or ``None`` when no document has
            ``custom_view_id``.
        """
        changes = {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}
        updated_custom_view = cls.collection().find_one_and_update(
            {"_id": ObjectId(custom_view_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        # Nothing matched: report it instead of failing on the subscript below.
        if updated_custom_view is None:
            return None
        updated_custom_view["id"] = str(updated_custom_view.pop("_id"))
        return cls(**updated_custom_view)

    @classmethod
    def delete(cls, custom_view_id) -> bool:
        """Delete the view with ``custom_view_id``; return the ``acknowledged`` flag."""
        deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
        return deleted_custom_view.acknowledged