Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 91%

862 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-10 15:38 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, Union 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23 

24# from scipy import signal as signal_scipy 

25 

26from twinpad_backend.db import ( 

27 get_collection, 

28 get_async_collection, 

29 get_signal_collection, 

30 systems_database, 

31 systems_async_database, 

32 signals_database, 

33 devices_states_database, 

34) 

35from twinpad_backend.responses import ListResponse 

36from twinpad_backend.messages import send_mode_change, send_signal_value 

37 

38TYPES = ({"int": int, "float": float, "str": str, "bool": bool, "epoch": float},) 

39 

40 

41RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

42MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

43SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

44HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

45DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

46 

47DEVICE_TIMEOUT = 5.0 

48NUMBER_SAMPLES_DATABASE_UPDATE = 120 

49 

50logger = logging.getLogger("uvicorn.error") 

51 

52 

53def create_update_model(model): 

54 fields = {} 

55 

56 for field_name, field_annotation in model.model_fields.items(): 

57 if field_name != "id": 

58 fields[field_name] = (Union[field_annotation.annotation, None], None) 

59 

60 query_name = model.__name__ + "Update" 

61 return create_model(query_name, **fields) 

62 

63 

64def get_utc_date_from_timestamp(timestamp: float): 

65 return ( 

66 datetime.datetime.fromtimestamp(timestamp).replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %-H:%M:%S.%f") + " UTC" 

67 ) 

68 

69 

70def downsample_list(time_vector: list, values: list, max_number_samples: int): 

71 if len(time_vector) < max_number_samples: 

72 return time_vector, values 

73 

74 time_vector_copy = copy.deepcopy(time_vector) 

75 values_copy = copy.deepcopy(values) 

76 try: 

77 none_group_bounds = [] 

78 none_group_index = -1 

79 index = -1 

80 # LTTB doesn't handle None values so remove them 

81 while values_copy.count(None) > 0: 

82 # Store bounds of None value groups so we can insert them back after the downsampling 

83 if (new_index := values_copy.index(None)) != index: 

84 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

85 none_group_index += 1 

86 elif len(none_group_bounds[none_group_index]) < 2: 

87 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

88 else: 

89 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

90 values_copy.pop(new_index) 

91 index = new_index 

92 

93 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

94 

95 values_array = npy.array([time_vector_copy, values_copy]).T 

96 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

97 

98 new_time_vector = interpolated_values[:, 0].tolist() 

99 new_values = interpolated_values[:, 1].tolist() 

100 except ValueError: 

101 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

102 new_time_vector = npy.linspace( 

103 time_vector_copy[0], time_vector_copy[-1], max_number_samples, endpoint=True 

104 ).tolist() 

105 new_values = list(npy.interp(new_time_vector, time_vector_copy, values_copy)) 

106 return new_time_vector, new_values 

107 

108 # insert back None values at the correct timestamps 

109 for none_group in none_group_bounds: 

110 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

111 new_time_vector[start_index:start_index] = none_group 

112 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

113 

114 return new_time_vector, new_values 

115 

116 

117# Models 

118class TwinPadModel(BaseModel): 

119 @classmethod 

120 def dict_to_object(cls, dict_): 

121 return cls.model_validate(dict_) 

122 

123 def to_dict(self, exclude=None): 

124 dict_ = self.model_dump(exclude=exclude) 

125 return dict_ 

126 

127 

128class GenericMongo(TwinPadModel): 

129 id: str | None = None 

130 

131 @classmethod 

132 def collection(cls): 

133 return get_collection(systems_database, cls.collection_name, create=True) 

134 

135 @classmethod 

136 def response_from_query(cls, query) -> ListResponse[Self]: 

137 req_filter = query.mongodb_filter() 

138 items = [] 

139 if ":" in query.sort_by: 

140 sort_field, sort_order = query.sort_by.split(":") 

141 sort_order = int(sort_order) 

142 else: 

143 sort_field = query.sort_by 

144 sort_order = 1 

145 collection = get_collection(systems_database, cls.collection_name, create=True) 

146 total = collection.count_documents(req_filter) 

147 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

148 if (query.limit is not None) and (query.limit != 0): 

149 cursor = cursor.limit(query.limit) 

150 for item_dict in cursor: 

151 items.append(cls.mongo_dict_to_object(item_dict)) 

152 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

153 

154 @classmethod 

155 def get_from_id(cls, item_id) -> Self | None: 

156 # collection = get_collection(systems_database, cls.collection_name, create=True) 

157 

158 dict_ = cls.collection().find_one({"_id": ObjectId(item_id)}) 

159 if dict_ is None: 

160 return None 

161 return cls.mongo_dict_to_object(dict_) 

162 

163 @classmethod 

164 def mongo_dict_to_object(cls, mongo_dict): 

165 mongo_dict["id"] = str(mongo_dict["_id"]) 

166 del mongo_dict["_id"] 

167 return cls.dict_to_object(mongo_dict) 

168 

169 @classmethod 

170 def get_by_attribute(cls, attribute_name: str, attribute_value): 

171 """Returns all items that match the attribute with value.""" 

172 items = cls.collection().find({attribute_name: attribute_value}) 

173 if items is None: 

174 return None 

175 return [cls.mongo_dict_to_object(d) for d in items] 

176 

177 @classmethod 

178 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

179 item = cls.collection().find_one({attribute_name: attribute_value}) 

180 if item is None: 

181 return None 

182 return cls.mongo_dict_to_object(item) 

183 

184 @classmethod 

185 def get_all(cls, sort_by="_id") -> list[Self]: 

186 items = [] 

187 for dict_ in cls.collection().find({}).sort(sort_by, ASCENDING): 

188 items.append(cls.mongo_dict_to_object(dict_)) 

189 return items 

190 

191 @classmethod 

192 def get_number_documents(cls): 

193 collection = get_collection(systems_database, cls.collection_name) 

194 if collection is None: 

195 return 0 

196 return collection.count_documents({}) 

197 

198 def insert(self): 

199 insert_result = self.collection().insert_one(self.to_dict(exclude={id})) 

200 self.id = str(insert_result.inserted_id) 

201 return self.id 

202 

203 def update(self, update_dict): 

204 for key, value in update_dict.items(): 

205 setattr(self, key, value) 

206 self.collection().find_one_and_update( 

207 {"_id": ObjectId(self.id)}, 

208 {"$set": update_dict}, 

209 return_document=ReturnDocument.AFTER, 

210 ) 

211 

212 return self 

213 

214 def delete(self): 

215 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

216 return result.deleted_count > 0 

217 

218 

219class Mode(TwinPadModel): 

220 mode_id: int 

221 name: str 

222 frequency_multiplier: float 

223 min_frequency: float 

224 

225 

226class DeviceUpdate(TwinPadModel): 

227 mode_id: int 

228 

229 

230class Device(GenericMongo): 

231 collection_name: ClassVar[str] = "devices" 

232 

233 device_id: str 

234 name: str 

235 description: str = "" 

236 modes: list[Mode] 

237 current_mode_id: int | None = None 

238 last_ping: float | None = None 

239 petri_network: Any 

240 pid: Any 

241 load: float | None = None 

242 tokens: list[int] = Field(default_factory=list) 

243 

244 def update(self, update_dict): 

245 send_mode_change(self.device_id, update_dict.mode_id) 

246 

247 @computed_field 

248 @property 

249 def status(self) -> str: 

250 now = time.time() 

251 if self.last_ping is None: 

252 return "down" 

253 if (now - self.last_ping) > DEVICE_TIMEOUT: 

254 return "down" 

255 return "up" 

256 

257 @classmethod 

258 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

259 devices_by_id = {} 

260 for signal_id in signal_ids: 

261 device_id = signal_id.split(".")[0] 

262 if device_id not in devices_by_id: 

263 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

264 return devices_by_id 

265 

266 

267class DeviceSetup(GenericMongo): 

268 collection_name: ClassVar[str] = "device_setups" 

269 

270 device_ids: list[str] 

271 active: bool = False 

272 variable_mapping: dict[str, str] 

273 

274 

275DeviceSetupUpdate = create_update_model(DeviceSetup) 

276 

277 

278class DeviceState(GenericMongo): 

279 collection_name: ClassVar[str] = "devices_states" 

280 

281 timestamp: float 

282 mode: str | None = None 

283 load: float | None = None 

284 tokens: list[int] = Field(default_factory=list) 

285 modified_properties: list[str] = Field(default_factory=list) 

286 

287 @classmethod 

288 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]: 

289 req_filter = query.mongodb_filter() 

290 items = [] 

291 if ":" in query.sort_by: 

292 sort_field, sort_order = query.sort_by.split(":") 

293 sort_order = int(sort_order) 

294 else: 

295 sort_field = query.sort_by 

296 sort_order = 1 

297 collection = get_collection(devices_states_database, device_id) 

298 if collection is None: 

299 total = 0 

300 cursor = [] 

301 else: 

302 total = collection.count_documents(req_filter) 

303 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

304 if (query.limit is not None) and (query.limit != 0): 

305 cursor = cursor.limit(query.limit) 

306 for item_dict in cursor: 

307 items.append( 

308 cls( 

309 timestamp=item_dict.get("precise_timestamp"), 

310 mode=item_dict.get("mode", None), 

311 load=item_dict.get("load", None), 

312 tokens=item_dict.get("tokens", Field(default_factory=list)), 

313 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

314 ) 

315 ) 

316 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

317 

318 

319class SignalSample(TwinPadModel): 

320 signal_id: str 

321 timestamp: float 

322 value: float | int | str | bool | None 

323 forced_value: float | int | str | bool | None = None 

324 

325 @classmethod 

326 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

327 

328 collection = get_signal_collection(signal_id) 

329 if collection is None: 

330 return None 

331 

332 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

333 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

334 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

335 first_bucket = None 

336 if bucket is not None: 

337 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

338 if first_bucket is not None: 

339 sample_data = collection.find_one( 

340 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

341 ) 

342 else: 

343 sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)]) 

344 

345 if sample_data is None: 

346 return None 

347 

348 timestamp = sample_data["precise_timestamp"] 

349 

350 return cls( 

351 signal_id=signal_id, 

352 timestamp=timestamp, 

353 value=sample_data.get("value", None), 

354 forced_value=sample_data.get("forced_value", None), 

355 ) 

356 

357 @classmethod 

358 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

359 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

360 

361 @classmethod 

362 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

363 

364 collection = get_signal_collection(signal_id) 

365 if collection is None: 

366 return None 

367 

368 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

369 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

370 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

371 last_bucket = None 

372 if bucket is not None: 

373 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

374 if last_bucket is not None: 

375 sample_data = collection.find_one({"precise_timestamp": last_bucket["control"]["max"]["precise_timestamp"]}) 

376 else: 

377 sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)]) 

378 

379 if sample_data is None: 

380 return None 

381 

382 timestamp = sample_data["precise_timestamp"] 

383 

384 if device is None: 

385 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

386 if device is not None and device.last_ping is not None: 

387 if timestamp is None: 

388 timestamp = device.last_ping 

389 else: 

390 timestamp = max(timestamp, device.last_ping) 

391 return cls( 

392 signal_id=signal_id, 

393 timestamp=timestamp, 

394 value=sample_data.get("value", None), 

395 forced_value=sample_data.get("forced_value", None), 

396 ) 

397 

398 @classmethod 

399 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

400 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

401 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

402 

403 

404class SignalData(TwinPadModel): 

405 signal_id: str 

406 forcible: bool = True 

407 time_vector: list[float] 

408 values: list[float | int | str | None] 

409 forced_values: list[float | int | str | None] 

410 

411 data_start: float | None = None 

412 data_end: float | None = None 

413 

414 number_samples: int = 0 

415 number_samples_db: int = 0 

416 

417 db_query_time: float = 0.0 

418 init_time: float = 0.0 

419 data_processing_time: float = 0.0 

420 

421 @classmethod 

422 def get_from_signal_id( 

423 cls, 

424 signal_id: str, 

425 min_timestamp: float = None, 

426 max_timestamp: float = None, 

427 window_min_timestamp: float = None, 

428 window_max_timestamp: float = None, 

429 interpolate_bounds: bool = True, 

430 max_documents: int = None, 

431 ) -> Self: 

432 

433 now = time.time() 

434 

435 req_signal = {} 

436 if min_timestamp is not None: 

437 req_signal.setdefault("timestamp", {}) 

438 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

439 if max_timestamp is not None: 

440 req_signal.setdefault("timestamp", {}) 

441 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

442 

443 collection = get_signal_collection(signal_id) 

444 if collection is None: 

445 return cls( 

446 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

447 ) 

448 

449 db_req_start = time.time() 

450 

451 sort_step = {"$sort": {"precise_timestamp": 1}} 

452 number_results = collection.count_documents(req_signal) 

453 

454 pipeline = [] 

455 if req_signal: 

456 pipeline.append({"$match": req_signal}) # Filter data if needed 

457 

458 pipeline.extend( 

459 [ 

460 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

461 sort_step, 

462 ] 

463 ) 

464 

465 if max_documents is not None and max_documents < number_results: 

466 unsampling_ratio = math.ceil(number_results / max_documents) 

467 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

468 pipeline.extend( 

469 [ 

470 { 

471 "$setWindowFields": { 

472 "sortBy": {"precise_timestamp": 1}, 

473 "output": {"index": {"$documentNumber": {}}}, 

474 } 

475 }, 

476 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

477 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

478 {"$replaceRoot": {"newRoot": "$doc"}}, 

479 {"$unset": ["index", "group_id"]}, 

480 {"$sort": {"precise_timestamp": 1}}, 

481 ] 

482 ) 

483 

484 # logger.info(f"pipeline: %s", str(pipeline)) 

485 cursor = collection.aggregate(pipeline) 

486 db_req_time = time.time() - db_req_start 

487 

488 init_time = time.time() 

489 

490 results = cursor.to_list() 

491 time_vector = [] 

492 values = [] 

493 forced_values = [] 

494 for s in results: 

495 time_vector.append(s["precise_timestamp"]) 

496 values.append(s.get("value", None)) 

497 forced_values.append(s.get("forced_value", None)) 

498 

499 signal = Signal.get_from_signal_id(signal_id) 

500 class_ = signal.signal_data_class 

501 

502 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

503 time_vector, values, forced_values = cls.interpolate_bounds( 

504 class_, 

505 collection, 

506 signal_id, 

507 time_vector, 

508 values, 

509 forced_values, 

510 window_min_timestamp, 

511 window_max_timestamp, 

512 ) 

513 

514 if values: 

515 # TODO: check below. a bit strange 

516 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

517 # Adding last value as it should be repeated 

518 time_vector.append(now) 

519 values.append(values[-1]) 

520 forced_values.append(forced_values[-1]) 

521 

522 init_time = time.time() - init_time 

523 

524 # See line 292 for explanation 

525 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

526 first_bucket = None 

527 if bucket is not None: 

528 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

529 if first_bucket is not None: 

530 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

531 else: 

532 data_start = None 

533 

534 last_bucket = None 

535 if bucket is not None: 

536 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

537 if last_bucket is not None: 

538 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

539 else: 

540 data_end = None 

541 

542 return class_( 

543 signal_id=signal_id, 

544 forcible=signal.forcible, 

545 time_vector=time_vector, 

546 values=values, 

547 forced_values=forced_values, 

548 data_start=data_start, 

549 data_end=data_end, 

550 number_samples=len(values), 

551 number_samples_db=number_results, 

552 db_query_time=db_req_time, 

553 init_time=init_time, 

554 ) 

555 

556 @staticmethod 

557 def interpolate_bounds( 

558 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

559 ): 

560 sample_right = None 

561 # Fetching right side value & interpolation 

562 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

563 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

564 sample_right = collection.find_one( 

565 { 

566 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

567 "value": {"$exists": True}, 

568 }, 

569 sort=[("precise_timestamp", -1)], 

570 ) 

571 if sample_right: 

572 if time_vector: 

573 right_sd = class_( 

574 signal_id=signal_id, 

575 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

576 values=[values[-1], sample_right.get("value", None)], 

577 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

578 ) 

579 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

580 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

581 else: 

582 max_ts_value = sample_right.get("value", None) 

583 max_ts_forced_value = sample_right.get("forced_value", None) 

584 time_vector.append(window_max_timestamp) 

585 values.append(max_ts_value) 

586 forced_values.append(max_ts_forced_value) 

587 

588 # Fetching left side value & interpolation 

589 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

590 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

591 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

592 sample_left = sample_right 

593 sample_left = collection.find_one( 

594 { 

595 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

596 "value": {"$exists": True}, 

597 }, 

598 sort=[("precise_timestamp", -1)], 

599 ) 

600 

601 if sample_left: 

602 if time_vector: 

603 left_sd = class_( 

604 signal_id=signal_id, 

605 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

606 values=[sample_left["value"], values[0]], 

607 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

608 ) 

609 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

610 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

611 else: 

612 min_ts_value = sample_left.get("value", None) 

613 min_ts_forced_value = sample_left.get("forced_value", None) 

614 time_vector.insert(0, window_min_timestamp) 

615 values.insert(0, min_ts_value) 

616 forced_values.insert(0, min_ts_forced_value) 

617 

618 return time_vector, values, forced_values 

619 

620 def interpolate_values(self, new_time_vector: list[float]): 

621 return self.interpolate(new_time_vector, self.values) 

622 

623 def interpolate_forced_values(self, new_time_vector: list[float]): 

624 return self.interpolate(new_time_vector, self.forced_values) 

625 

626 def uniform_desampling(self, number_samples_max: int) -> Self: 

627 data_processing_time = time.time() 

628 if number_samples_max and self.number_samples > number_samples_max: 

629 new_time_vector = npy.linspace( 

630 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

631 ).tolist() 

632 values = self.interpolate_values(new_time_vector) 

633 forced_values = self.interpolate_forced_values(new_time_vector) 

634 time_vector = new_time_vector 

635 number_samples = len(time_vector) 

636 else: 

637 time_vector = self.time_vector 

638 number_samples = len(self.values) 

639 values = self.values[:] 

640 forced_values = self.forced_values[:] 

641 data_processing_time = time.time() - data_processing_time 

642 

643 return self.__class__( 

644 signal_id=self.signal_id, 

645 time_vector=time_vector, 

646 values=values, 

647 forced_values=forced_values, 

648 number_samples=number_samples, 

649 number_samples_db=self.number_samples, 

650 data_start=self.data_start, 

651 data_end=self.data_end, 

652 db_query_time=self.db_query_time, 

653 init_time=self.init_time, 

654 data_processing_time=self.data_processing_time + data_processing_time, 

655 ) 

656 

657 def interest_window_desampling( 

658 self, 

659 window_max_number_samples: int, 

660 outside_max_number_samples: int, 

661 window_min_timestamp: float | None = None, 

662 window_max_timestamp: float | None = None, 

663 ) -> Self: 

664 """Performs a sampling in a window of interest and outside.""" 

665 

666 if not self.time_vector: 

667 return self 

668 

669 if window_min_timestamp is None: 

670 window_min_timestamp = self.time_vector[0] 

671 if window_max_timestamp is None: 

672 window_max_timestamp = self.time_vector[-1] 

673 

674 data_processing_time = time.time() 

675 

676 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

677 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

678 

679 time_vector_before = self.time_vector[:index_window_start] 

680 time_vector_window = self.time_vector[index_window_start:index_window_end] 

681 time_vector_after = self.time_vector[index_window_end:] 

682 

683 # Resampling window 

684 if time_vector_window: 

685 # Ensurring window bounds 

686 if time_vector_window[0] != window_min_timestamp: 

687 time_vector_window.insert(0, window_min_timestamp) 

688 if time_vector_window[-1] != window_max_timestamp: 

689 time_vector_window.append(window_max_timestamp) 

690 else: 

691 time_vector_window = [window_min_timestamp, window_max_timestamp] 

692 

693 if len(time_vector_window) > window_max_number_samples: 

694 # Resampling 

695 new_window_time_vector = npy.linspace( 

696 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

697 ).tolist() 

698 time_vector_window = new_window_time_vector 

699 

700 # Resampling outside 

701 number_samples_before = len(time_vector_before) 

702 number_samples_after = len(time_vector_after) 

703 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

704 new_number_samples_before = math.ceil( 

705 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

706 ) 

707 new_number_samples_after = math.ceil( 

708 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

709 ) 

710 # Adjusting numbers as math.ceil can do +1 on sum 

711 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

712 if new_number_samples_before > new_number_samples_after: 

713 new_number_samples_before -= 1 

714 else: 

715 new_number_samples_after -= 1 

716 

717 if new_number_samples_before: 

718 new_time_vector_before = npy.linspace( 

719 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

720 ).tolist() 

721 time_vector_before = new_time_vector_before 

722 

723 if number_samples_after: 

724 new_time_vector_after = npy.linspace( 

725 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

726 ).tolist()[::-1] 

727 time_vector_after = new_time_vector_after 

728 

729 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

730 values = self.interpolate_values(new_time_vector) 

731 forced_values = self.interpolate_forced_values(new_time_vector) 

732 number_samples = len(values) 

733 

734 data_processing_time = time.time() - data_processing_time 

735 

736 return self.__class__( 

737 signal_id=self.signal_id, 

738 forcible=self.forcible, 

739 time_vector=new_time_vector, 

740 values=values, 

741 forced_values=forced_values, 

742 number_samples=number_samples, 

743 number_samples_db=self.number_samples, 

744 data_start=self.data_start, 

745 data_end=self.data_end, 

746 db_query_time=self.db_query_time, 

747 init_time=self.init_time, 

748 data_processing_time=self.data_processing_time + data_processing_time, 

749 ) 

750 

751 def csv_export(self): 

752 output = io.StringIO() 

753 writer = csv.writer(output) 

754 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

755 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

756 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

757 return output.getvalue().encode("utf-8") 

758 

759 def prestoplot_export(self): 

760 clean_signal_id = self.signal_id.replace(".", "_") 

761 if clean_signal_id[0].isnumeric(): 

762 clean_signal_id = "_" + clean_signal_id 

763 

764 output = io.StringIO() 

765 output.write("# Encoding:\tUTF-8\n") 

766 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

767 output.write("ISO8601\tnone\tnone\n") 

768 output.write(f"# Description :\t{clean_signal_id}\n") 

769 

770 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

771 output.write( 

772 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

773 ) 

774 return output.getvalue().encode("utf-8") 

775 

776 

777class NumericSignalData(SignalData): 

778 data_type: str = "float" 

779 values: list[float | int | None] 

780 forced_values: list[float | int | None] 

781 

782 def interpolate(self, new_time_vector: list[float], items): 

783 items = [npy.nan if s is None else s for s in items] 

784 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

785 

786 def uniform_desampling(self, number_samples_max: int) -> Self: 

787 data_processing_time = time.time() 

788 if number_samples_max and self.number_samples > number_samples_max: 

789 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

790 forced_values = self.interpolate_forced_values(time_vector) 

791 number_samples = len(time_vector) 

792 else: 

793 time_vector = self.time_vector 

794 number_samples = len(self.values) 

795 values = self.values[:] 

796 forced_values = self.forced_values[:] 

797 data_processing_time = time.time() - data_processing_time 

798 

799 return self.__class__( 

800 signal_id=self.signal_id, 

801 time_vector=time_vector, 

802 values=values, 

803 forced_values=forced_values, 

804 number_samples=number_samples, 

805 number_samples_db=self.number_samples, 

806 data_start=self.data_start, 

807 data_end=self.data_end, 

808 db_query_time=self.db_query_time, 

809 init_time=self.init_time, 

810 data_processing_time=self.data_processing_time + data_processing_time, 

811 ) 

812 

813 def interest_window_desampling( 

814 self, 

815 window_max_number_samples: int, 

816 outside_max_number_samples: int, 

817 window_min_timestamp: float | None = None, 

818 window_max_timestamp: float | None = None, 

819 ) -> Self: 

820 """Performs a sampling in a window of interest and outside.""" 

821 

822 if not self.time_vector: 

823 return self 

824 

825 if window_min_timestamp is None: 

826 window_min_timestamp = self.time_vector[0] 

827 if window_max_timestamp is None: 

828 window_max_timestamp = self.time_vector[-1] 

829 

830 data_processing_time = time.time() 

831 

832 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

833 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

834 

835 time_vector_before = self.time_vector[:index_window_start] 

836 time_vector_window = self.time_vector[index_window_start:index_window_end] 

837 time_vector_after = self.time_vector[index_window_end:] 

838 

839 values_before = self.values[:index_window_start] 

840 values_window = self.values[index_window_start:index_window_end] 

841 values_after = self.values[index_window_end:] 

842 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

843 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

844 

845 # Resampling window 

846 if time_vector_window: 

847 # Ensurring window bounds 

848 if time_vector_window[0] != window_min_timestamp: 

849 time_vector_window.insert(0, window_min_timestamp) 

850 values_window.insert(0, window_min_value) 

851 if time_vector_window[-1] != window_max_timestamp: 

852 time_vector_window.append(window_max_timestamp) 

853 values_window.append(window_max_value) 

854 else: 

855 time_vector_window = [window_min_timestamp, window_max_timestamp] 

856 values_window = [window_min_value, window_max_value] 

857 

858 if len(time_vector_window) > window_max_number_samples: 

859 # Resampling 

860 time_vector_window, values_window = downsample_list( 

861 time_vector_window, values_window, window_max_number_samples 

862 ) 

863 

864 # Resampling outside 

865 number_samples_before = len(time_vector_before) 

866 number_samples_after = len(time_vector_after) 

867 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

868 new_number_samples_before = math.ceil( 

869 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

870 ) 

871 new_number_samples_after = math.ceil( 

872 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

873 ) 

874 # Adjusting numbers as math.ceil can do +1 on sum 

875 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

876 if new_number_samples_before > new_number_samples_after: 

877 new_number_samples_before -= 1 

878 else: 

879 new_number_samples_after -= 1 

880 

881 if new_number_samples_before: 

882 time_vector_before, values_before = downsample_list( 

883 time_vector_before, values_before, new_number_samples_before 

884 ) 

885 

886 if number_samples_after: 

887 time_vector_after, values_after = downsample_list( 

888 time_vector_after, values_after, new_number_samples_after 

889 ) 

890 

891 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

892 values = values_before + values_window + values_after 

893 forced_values = self.interpolate_forced_values(new_time_vector) 

894 number_samples = len(values) 

895 

896 data_processing_time = time.time() - data_processing_time 

897 

898 return self.__class__( 

899 signal_id=self.signal_id, 

900 time_vector=new_time_vector, 

901 values=values, 

902 forced_values=forced_values, 

903 number_samples=number_samples, 

904 number_samples_db=self.number_samples, 

905 data_start=self.data_start, 

906 data_end=self.data_end, 

907 db_query_time=self.db_query_time, 

908 init_time=self.init_time, 

909 data_processing_time=self.data_processing_time + data_processing_time, 

910 ) 

911 

912 

913class StringSignalData(SignalData): 

914 data_type: str = "str" 

915 values: list[str | None] 

916 forced_values: list[str | None] 

917 

918 def interpolate(self, new_time_vector: list[float], items): 

919 # Find the indices of the values in xp that are just smaller or equal to x 

920 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

921 indices = npy.clip(indices, 0, len(items) - 1) 

922 # Return the corresponding left string values from fp 

923 return [items[i] for i in indices] 

924 

925 

926class SignalsData(TwinPadModel): 

927 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

928 data_processing_time: float 

929 data_start: float | None 

930 data_end: float | None 

931 

932 @classmethod 

933 def get_from_signal_ids( 

934 cls, 

935 signal_ids: list[str], 

936 min_timestamp: float = None, 

937 max_timestamp: float = None, 

938 window_min_timestamp: float = None, 

939 window_max_timestamp: float = None, 

940 interpolate_bounds: bool = True, 

941 max_documents: int = None, 

942 ) -> Self: 

943 signals_data = [] 

944 data_start = None 

945 data_end = None 

946 if max_timestamp is None: 

947 max_timestamp = time.time() 

948 data_processing_time = 0.0 

949 for signal_id in signal_ids: 

950 signal_data = SignalData.get_from_signal_id( 

951 signal_id=signal_id, 

952 min_timestamp=min_timestamp, 

953 max_timestamp=max_timestamp, 

954 window_min_timestamp=window_min_timestamp, 

955 window_max_timestamp=window_max_timestamp, 

956 interpolate_bounds=interpolate_bounds, 

957 max_documents=max_documents, 

958 ) 

959 data_processing_time += signal_data.data_processing_time 

960 signals_data.append(signal_data) 

961 if signal_data.data_start is not None: 

962 if data_start is None: 

963 data_start = signal_data.data_start 

964 else: 

965 data_start = min(signal_data.data_start, data_start) 

966 if signal_data.data_end is not None: 

967 if data_end is None: 

968 data_end = signal_data.data_end 

969 else: 

970 data_end = max(signal_data.data_end, data_end) 

971 

972 return cls( 

973 signals_data=signals_data, 

974 data_processing_time=data_processing_time, 

975 data_start=data_start, 

976 data_end=data_end, 

977 ) 

978 

979 def uniform_desampling(self, number_samples_max: int) -> Self: 

980 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

981 return SignalsData( 

982 signals_data=signals_data, 

983 data_processing_time=sum(s.data_processing_time for s in signals_data), 

984 data_start=self.data_start, 

985 data_end=self.data_end, 

986 ) 

987 

988 def interest_window_desampling( 

989 self, 

990 window_max_number_samples: int, 

991 outside_max_number_samples: int, 

992 window_min_timestamp: float = None, 

993 window_max_timestamp: float = None, 

994 ) -> Self: 

995 signals_data = [ 

996 s.interest_window_desampling( 

997 window_max_number_samples=window_max_number_samples, 

998 outside_max_number_samples=outside_max_number_samples, 

999 window_min_timestamp=window_min_timestamp, 

1000 window_max_timestamp=window_max_timestamp, 

1001 ) 

1002 for s in self.signals_data 

1003 ] 

1004 

1005 return SignalsData( 

1006 signals_data=signals_data, 

1007 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1008 data_start=self.data_start, 

1009 data_end=self.data_end, 

1010 ) 

1011 

1012 def zip_export(self, file_format: str = "csv"): 

1013 # return self.signals_data[0].csv_export() 

1014 zip_buffer = io.BytesIO() 

1015 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1016 for signal_data in self.signals_data: 

1017 if file_format == "csv": 

1018 export_io = signal_data.csv_export() 

1019 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io) 

1020 elif file_format == "prestoplot": 

1021 export_io = signal_data.prestoplot_export() 

1022 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io) 

1023 else: 

1024 raise ValueError(f"Format not found. Got: {file_format}") 

1025 zip_bytes = zip_buffer.getvalue() 

1026 # zip_bytes.seek(0) 

1027 return zip_bytes 

1028 

1029 

1030class SignalStatus(TwinPadModel): 

1031 status: str 

1032 reason: str 

1033 delay: float | None 

1034 

1035 

1036class DigitizationFunction(TwinPadModel): 

1037 bits: int | None = None 

1038 min_value: float 

1039 max_value: float 

1040 min_raw_value: float 

1041 max_raw_value: float 

1042 

1043 

1044class SignalUpdate(TwinPadModel): 

1045 value: float | str | bool | int | None = None 

1046 forced_value: float | str | bool | int | None = None 

1047 timestamp: int | None = None 

1048 

1049 

1050class SignalType(str, Enum): 

1051 command = "command" 

1052 sensor = "sensor" 

1053 external_sensor = "external_sensor" 

1054 

1055 

1056SIGNALDATA_TYPES = { 

1057 "int": NumericSignalData, 

1058 "float": NumericSignalData, 

1059 "str": StringSignalData, 

1060 "bool": NumericSignalData, 

1061 "epoch": NumericSignalData, 

1062} 

1063 

1064 

1065class Signal(GenericMongo): 

1066 collection_name: ClassVar[str] = "signals" 

1067 

1068 signal_id: str 

1069 frequency: float 

1070 unit: str | None 

1071 description: str 

1072 type: SignalType 

1073 data_type: str 

1074 precision_digits: int | None 

1075 forcible: bool 

1076 

1077 digitization_function: DigitizationFunction | None 

1078 

1079 @property 

1080 def device(self) -> Device: 

1081 device_id = self.signal_id.split(".")[0] 

1082 device = Device.get_one_by_attribute("device_id", device_id) 

1083 return device 

1084 

1085 @cached_property 

1086 def signal_data_class(self): 

1087 if self.data_type in SIGNALDATA_TYPES: 

1088 return SIGNALDATA_TYPES[self.data_type] 

1089 if self.data_type.startswith("enum"): 

1090 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1091 raise ValueError(f"Unhandled python type: {self.data_type}") 

1092 

1093 @cached_property 

1094 def python_type(self): 

1095 if self.data_type in TYPES: 

1096 return TYPES[self.data_type] 

1097 if self.data_type.startswith("enum"): 

1098 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1099 return Literal[*choices] 

1100 raise ValueError(f"Unhandled python type: {self.data_type}") 

1101 

1102 @computed_field 

1103 @property 

1104 def status(self) -> SignalStatus: 

1105 now = time.time() 

1106 status = "up" 

1107 reason = "" 

1108 

1109 # See line 292 for explanation 

1110 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1111 last_bucket = None 

1112 if bucket is not None: 

1113 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1114 if last_bucket is None: 

1115 status = "no data" 

1116 reason = "signal does not exist" 

1117 return SignalStatus(status=status, reason=reason, delay=None) 

1118 

1119 try: 

1120 last_date = last_bucket["control"]["max"]["timestamp"] 

1121 last_date = last_date.replace(tzinfo=pytz.UTC) 

1122 last_value_ts = last_date.timestamp() 

1123 except IndexError: 

1124 last_value_ts = None 

1125 

1126 if last_value_ts is None: 

1127 delay = None 

1128 reason = "No data from signal" 

1129 else: 

1130 # Since device is a computed property, only compute it once 

1131 device = self.device 

1132 if device is not None and device.last_ping is not None: 

1133 last_value_ts = max(last_value_ts, device.last_ping) 

1134 delay = now - last_value_ts 

1135 if delay > DEVICE_TIMEOUT: 

1136 status = "down" 

1137 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1138 return SignalStatus(status=status, reason=reason, delay=delay) 

1139 

1140 def update(self, update_dict: SignalUpdate): 

1141 send_signal_value(self.signal_id, update_dict) 

1142 

1143 @classmethod 

1144 def get_from_signal_id(cls, signal_id) -> Self: 

1145 """Could be generic from mongo""" 

1146 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1147 if not raw_value: 

1148 return None 

1149 del raw_value["_id"] 

1150 return cls.dict_to_object(raw_value) 

1151 

1152 @classmethod 

1153 def get_all_ids(cls) -> list[str]: 

1154 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1155 

1156 return [signal["signal_id"] for signal in cursor] 

1157 

1158 async def number_samples(self): 

1159 collection = get_signal_collection(signal_id=self.signal_id) 

1160 if collection is None: 

1161 return 0 

1162 

1163 number_samples = collection.estimated_document_count() 

1164 

1165 number_samples_async_collection = await get_async_collection( 

1166 systems_async_database, "number_samples", create=True, time_series=True 

1167 ) 

1168 

1169 loop = asyncio.get_running_loop() 

1170 loop.create_task( 

1171 number_samples_async_collection.insert_one( 

1172 { 

1173 "timestamp": datetime.datetime.now(pytz.UTC), 

1174 "signal_id": self.signal_id, 

1175 "number_samples": number_samples, 

1176 } 

1177 ) 

1178 ) 

1179 

1180 return number_samples 

1181 

1182 def sample_datasize(self): 

1183 return signals_database.command("collstats", self.signal_id)["size"] 

1184 

1185 @classmethod 

1186 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1187 result = cls.collection().aggregate( 

1188 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1189 ) 

1190 

1191 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1192 

1193 

1194class ServicesStatus(TwinPadModel): 

1195 backend: str 

1196 cloud_broker: str 

1197 time_series_database: str 

1198 signal_storage: str 

1199 heartbeat_storage: str 

1200 data_analyzer: str 

1201 

1202 @classmethod 

1203 def check(cls) -> Self: 

1204 return cls( 

1205 cloud_broker=ping(RABBITMQ_HOST), 

1206 backend="up", 

1207 time_series_database=ping(MONGO_HOST), 

1208 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1209 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1210 data_analyzer=ping(DATA_ANALYZER_HOST), 

1211 ) 

1212 

1213 

1214def ping(host): 

1215 try: 

1216 if ping3.ping(host, timeout=0.8): 

1217 return "up" 

1218 except PermissionError: 

1219 pass 

1220 return "down" 

1221 

1222 

1223class Event(GenericMongo): 

1224 collection_name: ClassVar[str] = "events" 

1225 

1226 name: str 

1227 timestamp: float 

1228 event_rule_id: str 

1229 

1230 @computed_field 

1231 @cached_property 

1232 def event_rule(self) -> "EventRule": 

1233 return EventRule.get_from_id(self.event_rule_id) 

1234 

1235 @classmethod 

1236 def dict_to_object(cls, dict_): 

1237 """Refine to convert timestamp to datetime for mongodb.""" 

1238 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

1239 return super().dict_to_object(dict_) 

1240 

1241 

1242class EventDay(GenericMongo): 

1243 collection_name: ClassVar[str] = "number_events" 

1244 

1245 timestamp: float 

1246 number_events: int 

1247 

1248 @classmethod 

1249 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_events) -> list[Self]: 

1250 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1251 number_events_collection = get_collection(systems_database, "number_events") 

1252 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

1253 items = [] 

1254 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1255 if number_events_collection is None or recompute_events: 

1256 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

1257 first_event = events_collection.find_one(sort={"timestamp": 1}) 

1258 if first_event is None: 

1259 return items 

1260 # compute from day of first found event 

1261 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

1262 tzinfo=pytz.UTC 

1263 ) 

1264 while last_computed_day < TODAY: 

1265 day_nb_events = events_collection.count_documents( 

1266 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1267 ) 

1268 if day_nb_events > 0: 

1269 number_events_collection.insert_one( 

1270 {"timestamp": last_computed_day, "number_events": day_nb_events} 

1271 ) 

1272 last_computed_day += ONE_DAY_OFFSET 

1273 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1274 if number_events_today > 0: 

1275 number_events_collection.delete_one({"timestamp": TODAY}) 

1276 number_events_collection.insert_one({"timestamp": TODAY, "number_events": number_events_today}) 

1277 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1278 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1279 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1280 for day in number_events: 

1281 day["timestamp"] = day["timestamp"].timestamp() 

1282 items.append(cls.mongo_dict_to_object(day)) 

1283 return items 

1284 

1285 

1286class EventRule(GenericMongo): 

1287 collection_name: ClassVar[str] = "event_rules" 

1288 

1289 name: str 

1290 formula: str 

1291 variables: list[str] 

1292 

1293 @computed_field 

1294 @cached_property 

1295 def number_events(self) -> int: 

1296 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

1297 

1298 

1299class Company(GenericMongo): 

1300 collection_name: ClassVar[str] = "companies" 

1301 name: str 

1302 

1303 

1304class User(GenericMongo): 

1305 collection_name: ClassVar[str] = "users" 

1306 

1307 firstname: str 

1308 lastname: str 

1309 email: str 

1310 password: str 

1311 is_active: bool | None = False 

1312 is_admin: bool | None = False 

1313 is_connected: bool | None = False 

1314 company_id: str | None = None 

1315 

1316 def to_dict(self, exclude=None): 

1317 if exclude is None: 

1318 exclude = {"password"} 

1319 return GenericMongo.to_dict(self, exclude=exclude) 

1320 

1321 @classmethod 

1322 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

1323 users = cls.get_all() 

1324 if not users: 

1325 is_admin = True 

1326 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

1327 user_collection = get_collection(systems_database, "users", create=True) 

1328 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

1329 if new_user is None: 

1330 return None 

1331 return {"user_id": str(new_user.inserted_id)} 

1332 

1333 @classmethod 

1334 def update(cls, user: "UserUpdate", user_id: str): 

1335 updated_user = cls.collection().find_one_and_update( 

1336 {"_id": ObjectId(user_id)}, 

1337 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

1338 return_document=ReturnDocument.AFTER, 

1339 ) 

1340 updated_user["id"] = str(updated_user["_id"]) 

1341 del (updated_user["_id"], updated_user["is_connected"]) 

1342 return cls(**updated_user) 

1343 

1344 

1345UserUpdate = create_update_model(User) 

1346 

1347 

1348class Campaign(GenericMongo): 

1349 collection_name: ClassVar[str] = "campaigns" 

1350 

1351 # Properties 

1352 id: str | None = None 

1353 name: str 

1354 description: str | None = None 

1355 

1356 @classmethod 

1357 def create(cls, campaign: Self): 

1358 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"})) 

1359 if new_campaign is None: 

1360 return None 

1361 return {"campaign_id": str(new_campaign.inserted_id)} 

1362 

1363 @classmethod 

1364 def update(cls, campaign: Self): 

1365 updated_campaign = cls.collection().find_one_and_update( 

1366 {"_id": ObjectId(campaign.id)}, 

1367 {"$set": {"name": campaign.name, "description": campaign.description}}, 

1368 return_document=ReturnDocument.AFTER, 

1369 ) 

1370 return updated_campaign 

1371 

1372 @classmethod 

1373 def delete(cls, campaign_id): 

1374 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)}) 

1375 return deleted_user 

1376 

1377 

1378class Phase(GenericMongo): 

1379 collection_name: ClassVar[str] = "phases" 

1380 

1381 # Properties 

1382 id: str | None = None 

1383 name: str 

1384 description: str | None = None 

1385 start_at: float 

1386 end_at: float 

1387 

1388 # FK 

1389 campaign_id: str 

1390 

1391 # @classmethod 

1392 # def get_by_date(cls, datetime: float): 

1393 # phases = [] 

1394 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING): 

1395 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1396 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING): 

1397 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1398 # if phases is None: 

1399 # return None 

1400 # return phases 

1401 

1402 @classmethod 

1403 def create(cls, phase: Self): 

1404 phase = Phase( 

1405 name=phase.name, 

1406 description=phase.description, 

1407 start_at=phase.start_at, 

1408 end_at=phase.end_at, 

1409 campaign_id=phase.campaign_id, 

1410 ) 

1411 phase_collection = get_collection(systems_database, "phases", create=True) 

1412 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"})) 

1413 if new_phase is None: 

1414 return None 

1415 return {"phase_id": str(new_phase.inserted_id)} 

1416 

1417 @classmethod 

1418 def update(cls, phase: Self): 

1419 updated_phase = cls.collection().find_one_and_update( 

1420 {"_id": ObjectId(phase.id)}, 

1421 { 

1422 "$set": { 

1423 "name": phase.name, 

1424 "description": phase.description, 

1425 "start_at": phase.start_at, 

1426 "end_at": phase.end_at, 

1427 } 

1428 }, 

1429 return_document=ReturnDocument.AFTER, 

1430 ) 

1431 return updated_phase 

1432 

1433 @classmethod 

1434 def delete(cls, phase_id): 

1435 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)}) 

1436 return delete_phase 

1437 

1438 @classmethod 

1439 def deleteMany(cls, campaign_id): 

1440 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

1441 return delete_phases 

1442 

1443 

1444class CustomViewCreation(GenericMongo): 

1445 collection_name: ClassVar[str] = "custom_views" 

1446 

1447 name: str 

1448 configuration: list 

1449 

1450 

1451class CustomView(CustomViewCreation): 

1452 # Properties 

1453 id: str | None = None 

1454 

1455 # Foreign Key 

1456 user_id: str 

1457 

1458 # # Methods 

1459 # @classmethod 

1460 # def create(cls, form_custom_view: Self, user_id) -> list: 

1461 # custom_view = CustomView( 

1462 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1463 # ) 

1464 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1465 # return new_custom_view 

1466 

1467 # @classmethod 

1468 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1469 # updated_custom_view = cls.collection().find_one_and_update( 

1470 # {"_id": ObjectId(custom_view_id)}, 

1471 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1472 # return_document=ReturnDocument.AFTER, 

1473 # ) 

1474 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1475 # del updated_custom_view["_id"] 

1476 # return cls(**updated_custom_view) 

1477 

1478 # @classmethod 

1479 # def delete(cls, custom_view_id) -> bool: 

1480 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1481 # return deleted_custom_view.acknowledged 

1482 

1483 

1484CustomViewUpdate = create_update_model(CustomView) 

1485 

1486 

1487class Video(GenericMongo): 

1488 collection_name: ClassVar[str] = "videos" 

1489 

1490 # Properties 

1491 name: str 

1492 ip_addr: str 

1493 username: str | None = None 

1494 password: str | None = None 

1495 

1496 # Methods 

1497 @classmethod 

1498 def get_all(cls, sort_by="_id") -> list[Self]: 

1499 items = [] 

1500 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

1501 items.append(cls.mongo_dict_to_object(dict_)) 

1502 return items 

1503 

1504 @classmethod 

1505 def get_video(cls, camera_id: ObjectId): 

1506 camera = cls.get_from_id(camera_id) 

1507 return camera.name