Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 90%

858 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-04 13:35 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, Union 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23 

24# from scipy import signal as signal_scipy 

25 

26from twinpad_backend.db import ( 

27 get_collection, 

28 get_async_collection, 

29 get_signal_collection, 

30 systems_database, 

31 systems_async_database, 

32 signals_database, 

33 devices_states_database, 

34) 

35from twinpad_backend.responses import ListResponse 

36from twinpad_backend.messages import send_mode_change, send_signal_value 

37 

# Mapping from type names to the Python types used to represent them.
# NOTE(review): the trailing comma makes TYPES a one-element tuple wrapping the dict — confirm this is intended.
TYPES = ({"int": int, "float": float, "str": str, "bool": bool, "epoch": float},)


# Host settings read from the environment, each with a fallback default.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Maximum age (in seconds) of the last ping / last sample before a device or signal is considered stale.
DEVICE_TIMEOUT = 5.0
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# The module logs through the "uvicorn.error" logger.
logger = logging.getLogger("uvicorn.error")

51 

52 

def create_update_model(model):
    """Build the "<ModelName>Update" companion model of ``model``.

    Every field of ``model`` except ``id`` is copied with its annotation
    widened to also accept ``None`` and with ``None`` as default value.
    """
    optional_fields = {
        name: (Union[info.annotation, None], None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(model.__name__ + "Update", **optional_fields)

62 

63 

def get_utc_date_from_timestamp(timestamp: float):
    """Format a POSIX timestamp as ``"YYYY-MM-DD H:MM:SS.ffffff UTC"``.

    The timestamp is converted directly to UTC. (Converting to the server's
    local time and then merely *labelling* it UTC gives a wrong date on any
    host whose timezone is not UTC.) The hour is intentionally not
    zero-padded, matching the historical output; it is formatted by hand
    because the ``%-H`` strftime directive is platform specific.
    """
    utc_date = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return f"{utc_date:%Y-%m-%d} {utc_date.hour}:{utc_date:%M:%S.%f} UTC"

68 

69 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """Downsample a ``(time_vector, values)`` series to about ``max_number_samples`` points.

    The series is returned untouched when it has fewer than
    ``max_number_samples`` points. Otherwise the non-None samples are
    downsampled with LTTB; each run of consecutive ``None`` values is reduced
    to its first and last timestamps and re-inserted afterwards. If LTTB
    raises ``ValueError`` the function falls back to a linear interpolation
    on an evenly spaced time grid (None groups are not re-inserted then).

    Returns a ``(new_time_vector, new_values)`` tuple of lists.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # LTTB doesn't handle None values: split them out in a single pass, remembering only the
    # bounds (first and last timestamp) of each run of consecutive None values.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    previous_was_none = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not previous_was_none:
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
            previous_was_none = True
        else:
            kept_times.append(timestamp)
            kept_values.append(value)
            previous_was_none = False

    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    if not kept_times:
        # Nothing but None values: there is nothing to downsample or interpolate, only the group bounds remain.
        new_time_vector = [timestamp for none_group in none_group_bounds for timestamp in none_group]
        return new_time_vector, [None] * len(new_time_vector)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        downsampled = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
        new_time_vector = downsampled[:, 0].tolist()
        new_values = downsampled[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(kept_times[0], kept_times[-1], max_number_samples, endpoint=True).tolist()
        # .tolist() yields plain Python floats rather than numpy scalars
        new_values = npy.interp(new_time_vector, kept_times, kept_values).tolist()
        return new_time_vector, new_values

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values

115 

116 

117# Models 

class TwinPadModel(BaseModel):
    # Common pydantic base of the models of this module: thin dict <-> object helpers.

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` and return the resulting ``cls`` instance."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model to a dict, forwarding ``exclude`` to ``model_dump``."""
        return self.model_dump(exclude=exclude)

126 

127 

class GenericMongo(TwinPadModel):
    # Base class of the models stored in a collection of ``systems_database``.
    # The classmethods below read ``cls.collection_name``, which subclasses are expected to declare.
    id: str | None = None

    @classmethod
    def collection(cls):
        """Return this model's collection, requested from ``get_collection`` with ``create=True``."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run ``query`` (filter, sort, offset, limit) and wrap the matching items in a ``ListResponse``."""
        req_filter = query.mongodb_filter()
        # ``sort_by`` is either "field" (ascending) or "field:order" where order is an integer
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = cls.collection()
        total = collection.count_documents(req_filter)
        cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
        # A limit of None or 0 means "no limit"
        if (query.limit is not None) and (query.limit != 0):
            cursor = cursor.limit(query.limit)
        items = [cls.mongo_dict_to_object(item_dict) for item_dict in cursor]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``item_id``, or None when there is no such document."""
        dict_ = cls.collection().find_one({"_id": ObjectId(item_id)})
        if dict_ is None:
            return None
        return cls.mongo_dict_to_object(dict_)

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an object from a MongoDB document; ``_id`` is moved (in place) to a string ``id``."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        items = cls.collection().find({attribute_name: attribute_value})
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return one item whose attribute equals the value, or None when nothing matches."""
        item = cls.collection().find_one({attribute_name: attribute_value})
        if item is None:
            return None
        return cls.mongo_dict_to_object(item)

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted ascending on ``sort_by``."""
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().find({}).sort(sort_by, ASCENDING)]

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents in the collection (0 when ``get_collection`` returns None)."""
        # No ``create=True`` here, so the collection lookup may return None
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert this object as a new document, store the generated id on ``self.id`` and return it."""
        # Exclude the "id" field (the string key, not the ``id`` builtin): the identifier lives in
        # MongoDB's ``_id`` and is restored by ``mongo_dict_to_object`` on read.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this object and ``$set`` the same values on its document; returns ``self``."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete this object's document; returns True when a document was actually deleted."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

217 

218 

class Mode(TwinPadModel):
    # One entry of ``Device.modes``.
    mode_id: int  # compared against ``Device.current_mode_id`` / ``DeviceUpdate.mode_id`` — presumably; verify against callers
    name: str
    frequency_multiplier: float
    min_frequency: float

224 

225 

class DeviceUpdate(TwinPadModel):
    # Payload consumed by ``Device.update``, which forwards ``mode_id`` to ``send_mode_change``.
    mode_id: int

228 

229 

class Device(GenericMongo):
    # A device document of the "devices" collection.
    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)

    def update(self, update_dict):
        """Request a mode change for this device.

        Unlike ``GenericMongo.update`` nothing is written to the database and
        nothing is returned: ``update_dict.mode_id`` is only forwarded to
        ``send_mode_change``.
        """
        requested_mode_id = update_dict.mode_id
        send_mode_change(self.device_id, requested_mode_id)

    @computed_field
    @property
    def status(self) -> str:
        # "up" only when a ping was received within the last DEVICE_TIMEOUT seconds
        if self.last_ping is None:
            return "down"
        elapsed = time.time() - self.last_ping
        return "down" if elapsed > DEVICE_TIMEOUT else "up"

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """Map each distinct device id (the part of a signal id before the first ".") to its device.

        Each device is fetched once; the value is None when no device matches.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id, _, _ = signal_id.partition(".")
            if device_id in devices_by_id:
                continue
            devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

265 

266 

class DeviceSetup(GenericMongo):
    # A document of the "device_setups" collection.
    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Companion model generated by ``create_update_model``: same fields as DeviceSetup except ``id``,
# all accepting None and defaulting to None.
DeviceSetupUpdate = create_update_model(DeviceSetup)

276 

277 

class DeviceState(GenericMongo):
    # A state record of a device; records are read from a per-device collection of ``devices_states_database``.
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """Return the states of ``device_id`` matching ``query`` (filter, sort, offset, limit).

        The states are read from the collection named ``device_id``; an empty
        response is returned when that collection does not exist.
        """
        req_filter = query.mongodb_filter()
        # ``sort_by`` is either "field" (ascending) or "field:order" where order is an integer
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            # A limit of None or 0 means "no limit"
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                # Missing lists default to a fresh empty list. ``Field(default_factory=list)`` is only
                # meaningful in a field declaration; used as a value it is a FieldInfo object.
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

317 

318 

class SignalSample(TwinPadModel):
    # A single sample (value and optional forced value) of a signal at a given timestamp.
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @staticmethod
    def _find_boundary_sample(signal_id: str, collection, newest: bool):
        """Return the raw document of the oldest (or newest) sample of ``collection``, or None.

        This is a workaround using the internal collection associated to a timeseries to find
        the most/least recent document, found here:
        https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        When no bucket is available, the collection itself is sorted on ``precise_timestamp``.
        """
        bound, direction = ("max", -1) if newest else ("min", 1)
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        boundary_bucket = None
        if bucket is not None:
            boundary_bucket = bucket.find_one({}, sort={f"control.{bound}.timestamp": direction})
        if boundary_bucket is not None:
            return collection.find_one({"precise_timestamp": boundary_bucket["control"][bound]["precise_timestamp"]})
        return collection.find_one({}, sort=[("precise_timestamp", direction)])

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or None when it has no collection or no sample."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = cls._find_boundary_sample(signal_id, collection, newest=False)
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample (or None) of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """Return the newest sample of the signal, or None when it has no collection or no sample.

        The returned timestamp is the later of the sample timestamp and the
        last ping of the device owning the signal. The device is looked up
        from the signal id prefix (before the first ".") when ``device`` is
        not given.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = cls._find_boundary_sample(signal_id, collection, newest=True)
        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the newest sample (or None) of each signal, fetching each involved device only once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

402 

403 

class SignalData(TwinPadModel):
    # Samples of one signal over a time range, with the statistics of the query that produced them.
    # ``interpolate_values`` / ``interpolate_forced_values`` rely on an ``interpolate`` method that is
    # not defined on this class; NumericSignalData provides one.
    signal_id: str
    forcible: bool = True
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    data_start: float | None = None
    data_end: float | None = None

    number_samples: int = 0
    number_samples_db: int = 0

    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the samples of ``signal_id`` from its collection.

        Args:
            signal_id: id of the signal, also the name of its collection.
            min_timestamp, max_timestamp: optional bounds of the database query.
            window_min_timestamp, window_max_timestamp: when both are given and
                ``interpolate_bounds`` is true, samples are added at the window
                bounds (see ``interpolate_bounds``).
            interpolate_bounds: enables the window bound interpolation.
            max_documents: when the query matches more documents, only the
                first document of every ``ceil(matches / max_documents)``
                consecutive documents is kept.

        Returns:
            An instance of the data class of the signal (``signal.signal_data_class``),
            or an empty ``cls`` instance when the signal has no collection.
        """
        now = time.time()

        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

        collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        if max_documents is not None and max_documents < number_results:
            # Too many documents: number them, group them by blocks of ``unsampling_ratio`` consecutive
            # documents and keep the first document of each block.
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        init_time = time.time()

        results = cursor.to_list()
        time_vector = []
        values = []
        forced_values = []
        for s in results:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        # NOTE(review): assumes the signal exists; a None result here raises AttributeError on the next line.
        signal = Signal.get_from_signal_id(signal_id)
        class_ = signal.signal_data_class

        if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
            time_vector, values, forced_values = cls.interpolate_bounds(
                class_,
                collection,
                signal_id,
                time_vector,
                values,
                forced_values,
                window_min_timestamp,
                window_max_timestamp,
            )

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # First and last timestamps of the whole signal, read from the internal bucket collection
        # associated to the timeseries (same workaround as in SignalSample).
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["precise_timestamp"]
        else:
            data_start = None

        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            data_end = last_bucket["control"]["max"]["precise_timestamp"]
        else:
            data_end = None

        return class_(
            signal_id=signal_id,
            forcible=signal.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_results,
            db_query_time=db_req_time,
            init_time=init_time,
        )

    @staticmethod
    def interpolate_bounds(
        class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
    ):
        """Add samples at the bounds of the window when the data does not reach them.

        A bound sample is added when the data is empty or stops/starts more
        than a fifth of the window width away from that bound. Its value comes
        from the latest stored sample at or before the bound, interpolated
        with the nearest loaded sample when there is one (through ``class_``).
        The three lists are modified in place and returned.
        """
        sample_right = None
        # Fetching right side value & interpolation
        window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
        if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
            sample_right = collection.find_one(
                {
                    "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
                    "value": {"$exists": True},
                },
                sort=[("precise_timestamp", -1)],
            )
            if sample_right:
                if time_vector:
                    right_sd = class_(
                        signal_id=signal_id,
                        time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                        values=[values[-1], sample_right.get("value", None)],
                        forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                    )
                    max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                    max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
                else:
                    max_ts_value = sample_right.get("value", None)
                    max_ts_forced_value = sample_right.get("forced_value", None)
                time_vector.append(window_max_timestamp)
                values.append(max_ts_value)
                forced_values.append(max_ts_forced_value)

        # Fetching left side value & interpolation
        window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
        if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
            if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
                # The latest sample before the right bound is already before the left bound: it is
                # also the latest sample before the left bound, no need to query the database again.
                sample_left = sample_right
            else:
                sample_left = collection.find_one(
                    {
                        "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
                        "value": {"$exists": True},
                    },
                    sort=[("precise_timestamp", -1)],
                )

            if sample_left:
                if time_vector:
                    left_sd = class_(
                        signal_id=signal_id,
                        time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                        values=[sample_left["value"], values[0]],
                        forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                    )
                    min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                    min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
                else:
                    min_ts_value = sample_left.get("value", None)
                    min_ts_forced_value = sample_left.get("forced_value", None)
                time_vector.insert(0, window_min_timestamp)
                values.insert(0, min_ts_value)
                forced_values.insert(0, min_ts_forced_value)

        return time_vector, values, forced_values

    def interpolate_values(self, new_time_vector: list[float]):
        """Return ``values`` interpolated at ``new_time_vector`` (delegates to ``self.interpolate``)."""
        return self.interpolate(new_time_vector, self.values)

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return ``forced_values`` interpolated at ``new_time_vector`` (delegates to ``self.interpolate``)."""
        return self.interpolate(new_time_vector, self.forced_values)

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy resampled on an evenly spaced time grid of ``number_samples_max`` points.

        The data is copied unchanged when ``number_samples_max`` is falsy or
        not exceeded. The time spent is added to ``data_processing_time``.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            new_time_vector = npy.linspace(
                self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
            ).tolist()
            values = self.interpolate_values(new_time_vector)
            forced_values = self.interpolate_forced_values(new_time_vector)
            time_vector = new_time_vector
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carry the flag over (as interest_window_desampling does) instead of resetting it to its default
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        The window keeps at most ``window_max_number_samples`` points and the
        data before and after it share ``outside_max_number_samples`` points,
        proportionally to their sizes. Missing window bounds default to the
        bounds of the data. Returns ``self`` when there is no data.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            new_window_time_vector = npy.linspace(
                time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
            ).tolist()
            time_vector_window = new_window_time_vector

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            new_number_samples_before = math.ceil(
                outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
            )
            new_number_samples_after = math.ceil(
                outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before:
                new_time_vector_before = npy.linspace(
                    time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
                ).tolist()
                time_vector_before = new_time_vector_before

            # NOTE(review): tests the original count while the "before" side tests the new count — confirm intended.
            if number_samples_after:
                # Built backwards from the last timestamp, then reversed, so that the last timestamp is kept
                new_time_vector_after = npy.linspace(
                    time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
                ).tolist()[::-1]
                time_vector_after = new_time_vector_after

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def csv_export(self):
        """Return the samples as UTF-8 encoded CSV bytes with columns timestamp, value, forced_value."""
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["timestamp", "value", "forced_value"])  # Write header
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
        return output.getvalue().encode("utf-8")

    def prestoplot_export(self):
        """Return the samples as UTF-8 encoded, tab separated text with a four line header.

        Dots of the signal id are replaced by underscores and an underscore is
        prepended when the id starts with a digit. None values are written as
        ``none``.
        """
        clean_signal_id = self.signal_id.replace(".", "_")
        if clean_signal_id[0].isnumeric():
            clean_signal_id = "_" + clean_signal_id

        output = io.StringIO()
        output.write("# Encoding:\tUTF-8\n")
        output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
        output.write("ISO8601\tnone\tnone\n")
        output.write(f"# Description :\t{clean_signal_id}\n")

        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            output.write(
                f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
            )
        return output.getvalue().encode("utf-8")

775 

776 

777class NumericSignalData(SignalData): 

778 data_type: str = "float" 

779 values: list[float | int | None] 

780 forced_values: list[float | int | None] 

781 

def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` (sampled at ``self.time_vector``) at ``new_time_vector``.

    None entries are treated as NaN during the interpolation and every NaN
    result is turned back into None.
    """
    with_nans = [npy.nan if item is None else item for item in items]
    interpolated = npy.interp(new_time_vector, self.time_vector, with_nans)
    return [None if npy.isnan(sample) else sample for sample in interpolated]

785 

786 def uniform_desampling(self, number_samples_max: int) -> Self: 

787 data_processing_time = time.time() 

788 if number_samples_max and self.number_samples > number_samples_max: 

789 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

790 forced_values = self.interpolate_forced_values(time_vector) 

791 number_samples = len(time_vector) 

792 else: 

793 time_vector = self.time_vector 

794 number_samples = len(self.values) 

795 values = self.values[:] 

796 forced_values = self.forced_values[:] 

797 data_processing_time = time.time() - data_processing_time 

798 

799 return self.__class__( 

800 signal_id=self.signal_id, 

801 time_vector=time_vector, 

802 values=values, 

803 forced_values=forced_values, 

804 number_samples=number_samples, 

805 number_samples_db=self.number_samples, 

806 data_start=self.data_start, 

807 data_end=self.data_end, 

808 db_query_time=self.db_query_time, 

809 init_time=self.init_time, 

810 data_processing_time=self.data_processing_time + data_processing_time, 

811 ) 

812 

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Downsample with one sample budget inside a window of interest and another outside.

        The samples are split into three contiguous parts (before, inside and
        after the window). The window part always carries a point on each
        window bound (interpolated when no sample sits exactly there) and is
        downsampled when it exceeds ``window_max_number_samples``. The two
        outside parts share ``outside_max_number_samples`` proportionally to
        their sizes.

        Args:
            window_max_number_samples: maximum number of samples kept in the window part.
            outside_max_number_samples: maximum number of samples kept before and after the window, combined.
            window_min_timestamp: lower window bound; defaults to the first timestamp of the signal.
            window_max_timestamp: upper window bound; defaults to the last timestamp of the signal.

        Returns:
            ``self`` when the signal has no sample, otherwise a new instance of
            the same class holding the reduced vectors. The time spent here is
            added to ``data_processing_time``.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        # Start of the timing; turned into a duration just before returning.
        data_processing_time = time.time()

        # NOTE(review): bisect assumes self.time_vector is sorted in ascending order — confirm.
        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are copies, so the in-place insert/append below never touch self.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        # Values at the exact window bounds, used when no sample falls on them.
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            # No sample inside the window: it is represented by its two bounds only.
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            # Split the outside budget proportionally to the size of each side.
            new_number_samples_before = math.ceil(
                outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
            )
            new_number_samples_after = math.ceil(
                outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # NOTE(review): the two guards below are not symmetric: "before" tests the
            # new target count while "after" tests the original count, so "after" can be
            # downsampled to a target of 0 while "before" is left untouched when its
            # target is 0. Confirm which behaviour is intended.
            if new_number_samples_before:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            if number_samples_after:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        # Forced values are re-evaluated on the final time vector instead of being sliced.
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            # The sample count of the source object is reported as the "db" count.
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

911 

912 

class StringSignalData(SignalData):
    """Signal data whose samples are strings, interpolated by holding the previous sample."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the item of the latest sample at or before it.

        Timestamps located before the first sample get the first item.
        """
        # Position of the last known timestamp that is <= each requested timestamp.
        last_known = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        # Keep every position inside the bounds of ``items``.
        upper_bound = len(items) - 1
        positions = npy.clip(last_known, 0, upper_bound)
        return [items[position] for position in positions]

924 

925 

class SignalsData(TwinPadModel):
    """Data of several signals together with the overall time range and processing time."""

    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float = None,
        max_timestamp: float = None,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
        interpolate_bounds: bool = True,
        max_documents: int = None,
    ) -> Self:
        """Load every requested signal and aggregate processing time and data bounds.

        ``max_timestamp`` defaults to the current time. ``data_start`` and
        ``data_end`` are the earliest start and latest end over the signals
        that report one, or ``None`` when none does.
        """
        if max_timestamp is None:
            max_timestamp = time.time()

        loaded = []
        total_processing_time = 0.0
        earliest_start = None
        latest_end = None
        for signal_id in signal_ids:
            current = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            total_processing_time += current.data_processing_time
            loaded.append(current)

            start = current.data_start
            if start is not None:
                earliest_start = start if earliest_start is None else min(start, earliest_start)

            end = current.data_end
            if end is not None:
                latest_end = end if latest_end is None else max(end, latest_end)

        return cls(
            signals_data=loaded,
            data_processing_time=total_processing_time,
            data_start=earliest_start,
            data_end=latest_end,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and wrap the results in a new ``SignalsData``."""
        reduced = []
        for signal_data in self.signals_data:
            reduced.append(signal_data.uniform_desampling(number_samples_max=number_samples_max))
        processing_time = sum(signal_data.data_processing_time for signal_data in reduced)
        return SignalsData(
            signals_data=reduced,
            data_processing_time=processing_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and wrap the results in a new ``SignalsData``."""
        reduced = []
        for signal_data in self.signals_data:
            reduced.append(
                signal_data.interest_window_desampling(
                    window_max_number_samples=window_max_number_samples,
                    outside_max_number_samples=outside_max_number_samples,
                    window_min_timestamp=window_min_timestamp,
                    window_max_timestamp=window_max_timestamp,
                )
            )
        processing_time = sum(signal_data.data_processing_time for signal_data in reduced)

        return SignalsData(
            signals_data=reduced,
            data_processing_time=processing_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return the bytes of a zip archive holding one export file per signal.

        ``file_format`` is ``"csv"`` (``<signal_id>.csv``) or ``"prestoplot"``
        (``<signal_id>.tab``). Any other value raises ``ValueError`` as soon
        as a signal has to be exported.
        """
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
            for signal_data in self.signals_data:
                if file_format == "csv":
                    extension = "csv"
                    payload = signal_data.csv_export()
                elif file_format == "prestoplot":
                    extension = "tab"
                    payload = signal_data.prestoplot_export()
                else:
                    raise ValueError(f"Format not found. Got: {file_format}")
                archive.writestr(f"{signal_data.signal_id}.{extension}", payload)
        # Read the buffer only after the archive is closed, so it is complete.
        return buffer.getvalue()

1028 

1029 

class SignalStatus(TwinPadModel):
    """Status of a signal as reported by ``Signal.status``."""

    # Values set in this module: "up", "down" or "no data".
    status: str
    # Human-readable explanation; an empty string when nothing is wrong.
    reason: str
    # Seconds elapsed since the last sign of life, or None when unknown.
    delay: float | None

1034 

1035 

class DigitizationFunction(TwinPadModel):
    """Bounds describing how raw values relate to signal values.

    NOTE(review): presumably a linear mapping of [min_raw_value, max_raw_value]
    onto [min_value, max_value]; the conversion itself is not in this part of
    the file, so confirm against the code that uses it.
    """

    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

1042 

1043 

class SignalUpdate(TwinPadModel):
    """Payload passed to ``Signal.update``; every field is optional."""

    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    # NOTE(review): unit of the timestamp is not visible here — confirm against the message consumer.
    timestamp: int | None = None

1048 

1049 

class SignalType(str, Enum):
    """Kind of signal; members are also plain strings."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1054 

1055 

# Maps a signal ``data_type`` name to the SignalData subclass handling it.
# Every type except "str" is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}

1063 

1064 

1065class Signal(GenericMongo): 

1066 collection_name: ClassVar[str] = "signals" 

1067 

1068 signal_id: str 

1069 frequency: float 

1070 unit: str | None 

1071 description: str 

1072 type: SignalType 

1073 data_type: str 

1074 precision_digits: int | None 

1075 forcible: bool 

1076 

1077 digitization_function: DigitizationFunction | None 

1078 

1079 @property 

1080 def device(self) -> Device: 

1081 device_id = self.signal_id.split(".")[0] 

1082 device = Device.get_one_by_attribute("device_id", device_id) 

1083 return device 

1084 

1085 @cached_property 

1086 def signal_data_class(self): 

1087 if self.data_type in SIGNALDATA_TYPES: 

1088 return SIGNALDATA_TYPES[self.data_type] 

1089 if self.data_type.startswith("enum"): 

1090 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1091 raise ValueError(f"Unhandled python type: {self.data_type}") 

1092 

1093 @cached_property 

1094 def python_type(self): 

1095 if self.data_type in TYPES: 

1096 return TYPES[self.data_type] 

1097 if self.data_type.startswith("enum"): 

1098 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1099 return Literal[*choices] 

1100 raise ValueError(f"Unhandled python type: {self.data_type}") 

1101 

1102 @computed_field 

1103 @property 

1104 def status(self) -> SignalStatus: 

1105 now = time.time() 

1106 status = "up" 

1107 reason = "" 

1108 

1109 # See line 292 for explanation 

1110 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1111 last_bucket = None 

1112 if bucket is not None: 

1113 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1114 if last_bucket is None: 

1115 status = "no data" 

1116 reason = "signal does not exist" 

1117 return SignalStatus(status=status, reason=reason, delay=None) 

1118 

1119 try: 

1120 last_date = last_bucket["control"]["max"]["timestamp"] 

1121 last_date = last_date.replace(tzinfo=pytz.UTC) 

1122 last_value_ts = last_date.timestamp() 

1123 except IndexError: 

1124 last_value_ts = None 

1125 

1126 if last_value_ts is None: 

1127 delay = None 

1128 reason = "No data from signal" 

1129 else: 

1130 # Since device is a computed property, only compute it once 

1131 device = self.device 

1132 if device is not None and device.last_ping is not None: 

1133 last_value_ts = max(last_value_ts, device.last_ping) 

1134 delay = now - last_value_ts 

1135 if delay > DEVICE_TIMEOUT: 

1136 status = "down" 

1137 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1138 return SignalStatus(status=status, reason=reason, delay=delay) 

1139 

1140 def update(self, update_dict: SignalUpdate): 

1141 send_signal_value(self.signal_id, update_dict) 

1142 

1143 @classmethod 

1144 def get_from_signal_id(cls, signal_id) -> Self: 

1145 """Could be generic from mongo""" 

1146 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1147 if not raw_value: 

1148 return None 

1149 del raw_value["_id"] 

1150 return cls.dict_to_object(raw_value) 

1151 

1152 async def number_samples(self): 

1153 collection = get_signal_collection(signal_id=self.signal_id) 

1154 if collection is None: 

1155 return 0 

1156 

1157 number_samples = collection.estimated_document_count() 

1158 

1159 number_samples_async_collection = await get_async_collection( 

1160 systems_async_database, "number_samples", create=True, time_series=True 

1161 ) 

1162 

1163 loop = asyncio.get_running_loop() 

1164 loop.create_task( 

1165 number_samples_async_collection.insert_one( 

1166 { 

1167 "timestamp": datetime.datetime.now(pytz.UTC), 

1168 "signal_id": self.signal_id, 

1169 "number_samples": number_samples, 

1170 } 

1171 ) 

1172 ) 

1173 

1174 return number_samples 

1175 

1176 def sample_datasize(self): 

1177 return signals_database.command("collstats", self.signal_id)["size"] 

1178 

1179 @classmethod 

1180 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1181 result = cls.collection().aggregate( 

1182 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1183 ) 

1184 

1185 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1186 

1187 

class ServicesStatus(TwinPadModel):
    """Up/down status of the backend and of the services it depends on."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the collected statuses.

        The backend itself is always reported as "up" since it is the one answering.
        """
        statuses = {
            "cloud_broker": ping(RABBITMQ_HOST),
            "backend": "up",
            "time_series_database": ping(MONGO_HOST),
            "signal_storage": ping(SIGNAL_STORAGE_HOST),
            "heartbeat_storage": ping(HEARTBEAT_STORAGE_HOST),
            "data_analyzer": ping(DATA_ANALYZER_HOST),
        }
        return cls(**statuses)

1206 

1207 

def ping(host):
    """Return "up" when ``host`` answers a ping within 0.8 second, "down" otherwise.

    A ``PermissionError`` raised while pinging is reported as "down".
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # ping3 reports a timeout as None and an error as False; any other result is
    # a measured delay. A plain truthiness test would wrongly treat a delay of
    # 0.0 as a failure.
    if delay is None or delay is False:
        return "down"
    return "up"

1215 

1216 

class Event(GenericMongo):
    """Occurrence of an event rule, stored in the ``events`` collection."""

    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Rule that produced this event, fetched once from its id."""
        rule = EventRule.get_from_id(self.event_rule_id)
        return rule

    @classmethod
    def dict_to_object(cls, dict_):
        """Build the object after converting the stored datetime into an epoch timestamp.

        The ``timestamp`` entry of ``dict_`` is replaced in place.
        """
        # NOTE(review): a naive datetime is interpreted in the host's local time zone
        # by datetime.timestamp() — confirm the stored values match that assumption.
        stored_datetime = dict_["timestamp"]
        dict_["timestamp"] = datetime.datetime.timestamp(stored_datetime)
        return super().dict_to_object(dict_)

1234 

1235 

class EventDay(GenericMongo):
    """Number of events counted for one day, stored in the ``number_events`` collection."""

    collection_name: ClassVar[str] = "number_events"

    # Start of the day, as an epoch timestamp once read back from the database.
    timestamp: float
    number_events: int

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_events) -> list[Self]:
        """Return the daily event counts whose day lies between the two timestamps.

        The per-day counts are (re)built from the ``events`` collection when the
        ``number_events`` collection does not exist yet or when
        ``recompute_events`` is true. The count of the current day is refreshed
        on every call.

        NOTE(review): the nesting below was reconstructed from a listing that
        had lost its indentation — confirm against the original file.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        items = []
        # Midnight of the current day.
        # NOTE(review): the local date is labelled as UTC here — confirm this is intended.
        TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
        if number_events_collection is None or recompute_events:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                # No event at all: nothing to count.
                return items
            # compute from day of first found event
            last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
                tzinfo=pytz.UTC
            )
            # NOTE(review): existing day documents are not removed before inserting,
            # so recomputing may create duplicates — confirm.
            while last_computed_day < TODAY:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
                )
                # Days without any event are not stored.
                if day_nb_events > 0:
                    number_events_collection.insert_one(
                        {"timestamp": last_computed_day, "number_events": day_nb_events}
                    )
                last_computed_day += ONE_DAY_OFFSET
        # The current day is still changing: replace its document with a fresh count.
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
        if number_events_today > 0:
            number_events_collection.delete_one({"timestamp": TODAY})
            number_events_collection.insert_one({"timestamp": TODAY, "number_events": number_events_today})
        # NOTE(review): fromtimestamp() yields local time, which is then labelled as UTC — confirm.
        data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
        number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
        for day in number_events:
            # The model stores an epoch timestamp, not a datetime.
            day["timestamp"] = day["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

1278 

1279 

class EventRule(GenericMongo):
    """Rule stored in the ``event_rules`` collection: a named formula and its variables."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events referencing this rule (counted once, then cached)."""
        events = get_collection(systems_database, "events")
        rule_filter = {"event_rule_id": self.id}
        return events.count_documents(rule_filter)

1291 

1292 

class Company(GenericMongo):
    """Company stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"
    name: str

1296 

1297 

class User(GenericMongo):
    """Application user stored in the ``users`` collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    # NOTE(review): stored exactly as received by this class — confirm the caller hashes it.
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Serialize the user; the password is left out unless ``exclude`` is given explicitly."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Insert a new user and return ``{"user_id": <id>}``.

        The very first user created is always made administrator.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update(cls, user: "UserUpdate", user_id: str):
        """Apply the non-``None`` fields of ``user`` to the stored user and return the result.

        Returns:
            The updated user, or ``None`` when no user has this id.
        """
        # Fields left to None in the update model mean "do not change".
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # No document matched: report it like the other update methods of this
            # module do, instead of failing on a None subscript.
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The stored connection flag is not propagated; tolerate documents lacking it.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)

1337 

1338 

# Update variant of User built by create_update_model (defined near the top of this
# module), which gives every field except "id" a None default.
UserUpdate = create_update_model(User)

1340 

1341 

class Campaign(GenericMongo):
    """Campaign stored in the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` (without its id) and return ``{"campaign_id": <id>}``."""
        campaigns = cls.collection()
        insertion = campaigns.insert_one(campaign.model_dump(exclude={"id"}))
        if insertion is not None:
            return {"campaign_id": str(insertion.inserted_id)}
        return None

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the stored campaign; return the updated raw document."""
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return cls.collection().find_one_and_update(
            selector,
            changes,
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with this id and return the driver's delete result."""
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})

1370 

1371 

class Phase(GenericMongo):
    """Phase of a campaign, stored in the ``phases`` collection."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` (without its id) and return ``{"phase_id": <id>}``."""
        # Rebuild the phase from its own fields so that any incoming id is dropped.
        phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phases = get_collection(systems_database, "phases", create=True)
        insertion = phases.insert_one(phase.model_dump(exclude={"id"}))
        if insertion is not None:
            return {"phase_id": str(insertion.inserted_id)}
        return None

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description and dates of the stored phase; return the updated raw document."""
        selector = {"_id": ObjectId(phase.id)}
        new_values = {
            "name": phase.name,
            "description": phase.description,
            "start_at": phase.start_at,
            "end_at": phase.end_at,
        }
        return cls.collection().find_one_and_update(
            selector,
            {"$set": new_values},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with this id and return the driver's delete result."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the driver's delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})

1436 

1437 

class CustomViewCreation(GenericMongo):
    """Fields needed to create a custom view (``custom_views`` collection)."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    # NOTE(review): the structure of the configuration items is not visible here.
    configuration: list

1443 

1444 

class CustomView(CustomViewCreation):
    """Stored custom view: the creation fields plus its id and owning user."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

    # The create/update/delete methods below are disabled (kept as comments).
    # # Methods
    # @classmethod
    # def create(cls, form_custom_view: Self, user_id) -> list:
    #     custom_view = CustomView(
    #         name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
    #     )
    #     new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
    #     return new_custom_view

    # @classmethod
    # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
    #     updated_custom_view = cls.collection().find_one_and_update(
    #         {"_id": ObjectId(custom_view_id)},
    #         {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
    #         return_document=ReturnDocument.AFTER,
    #     )
    #     updated_custom_view["id"] = str(updated_custom_view["_id"])
    #     del updated_custom_view["_id"]
    #     return cls(**updated_custom_view)

    # @classmethod
    # def delete(cls, custom_view_id) -> bool:
    #     deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
    #     return deleted_custom_view.acknowledged

1476 

1477 

# Update variant of CustomView built by create_update_model (defined near the top of
# this module), which gives every field except "id" a None default.
CustomViewUpdate = create_update_model(CustomView)

1479 

1480 

class Video(GenericMongo):
    """Video source (camera) stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted by ``sort_by`` in ascending order.

        Only ``name`` and ``ip_addr`` are requested from the database, so the
        credentials are not loaded.
        """
        projection = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with this id."""
        return cls.get_from_id(camera_id).name

1499 def get_video(cls, camera_id: ObjectId): 

1500 camera = cls.get_from_id(camera_id) 

1501 return camera.name