Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 91%

944 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 14:27 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, Union 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23 

24# from scipy import signal as signal_scipy 

25 

26from twinpad_backend.db import ( 

27 get_collection, 

28 get_async_collection, 

29 get_signal_collection, 

30 systems_database, 

31 systems_async_database, 

32 signals_database, 

33 devices_states_database, 

34) 

35from twinpad_backend.responses import ListResponse 

36from twinpad_backend.messages import send_mode_change, send_signal_value 

37 

38TYPES = ({"int": int, "float": float, "str": str, "bool": bool, "epoch": float},) 

39 

40 

41RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

42MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

43SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

44HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

45DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

46 

47DEVICE_TIMEOUT = 5.0 

48NUMBER_SAMPLES_DATABASE_UPDATE = 120 

49 

50logger = logging.getLogger("uvicorn.error") 

51 

52 

53def create_update_model(model): 

54 fields = {} 

55 

56 for field_name, field_annotation in model.model_fields.items(): 

57 if field_name != "id": 

58 fields[field_name] = (Union[field_annotation.annotation, None], None) 

59 

60 query_name = model.__name__ + "Update" 

61 return create_model(query_name, **fields) 

62 

63 

64def get_utc_date_from_timestamp(timestamp: float): 

65 return ( 

66 datetime.datetime.fromtimestamp(timestamp).replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %-H:%M:%S.%f") + " UTC" 

67 ) 

68 

69 

70def downsample_list(time_vector: list, values: list, max_number_samples: int): 

71 if len(time_vector) < max_number_samples: 

72 return time_vector, values 

73 

74 time_vector_copy = copy.deepcopy(time_vector) 

75 values_copy = copy.deepcopy(values) 

76 try: 

77 none_group_bounds = [] 

78 none_group_index = -1 

79 index = -1 

80 # LTTB doesn't handle None values so remove them 

81 while values_copy.count(None) > 0: 

82 # Store bounds of None value groups so we can insert them back after the downsampling 

83 if (new_index := values_copy.index(None)) != index: 

84 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

85 none_group_index += 1 

86 elif len(none_group_bounds[none_group_index]) < 2: 

87 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

88 else: 

89 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

90 values_copy.pop(new_index) 

91 index = new_index 

92 

93 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

94 

95 values_array = npy.array([time_vector_copy, values_copy]).T 

96 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

97 

98 new_time_vector = interpolated_values[:, 0].tolist() 

99 new_values = interpolated_values[:, 1].tolist() 

100 except ValueError: 

101 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

102 new_time_vector = npy.linspace( 

103 time_vector_copy[0], time_vector_copy[-1], max_number_samples, endpoint=True 

104 ).tolist() 

105 new_values = list(npy.interp(new_time_vector, time_vector_copy, values_copy)) 

106 return new_time_vector, new_values 

107 

108 # insert back None values at the correct timestamps 

109 for none_group in none_group_bounds: 

110 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

111 new_time_vector[start_index:start_index] = none_group 

112 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

113 

114 return new_time_vector, new_values 

115 

116 

117# Models 

118class TwinPadModel(BaseModel): 

119 @classmethod 

120 def dict_to_object(cls, dict_): 

121 return cls.model_validate(dict_) 

122 

123 def to_dict(self, exclude=None): 

124 dict_ = self.model_dump(exclude=exclude) 

125 return dict_ 

126 

127 

128class GenericMongo(TwinPadModel): 

129 id: str | None = None 

130 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

131 

132 @classmethod 

133 def collection(cls): 

134 return get_collection(systems_database, cls.collection_name, create=True) 

135 

136 @classmethod 

137 def response_from_query(cls, query) -> ListResponse[Self]: 

138 req_filter = query.mongodb_filter() 

139 items = [] 

140 if ":" in query.sort_by: 

141 sort_field, sort_order = query.sort_by.split(":") 

142 sort_order = int(sort_order) 

143 else: 

144 sort_field = query.sort_by 

145 sort_order = 1 

146 collection = get_collection(systems_database, cls.collection_name, create=True) 

147 total = collection.count_documents(req_filter) 

148 

149 pipeline = [] 

150 added_properties = [] 

151 

152 if "$and" in req_filter: 

153 for filter in req_filter["$and"]: 

154 for property in cls.custom_pipeline_steps: 

155 if property in filter: 

156 pipeline.extend(cls.custom_pipeline_steps[property]) 

157 added_properties.append(property) 

158 else: 

159 for property in cls.custom_pipeline_steps: 

160 if property in req_filter: 

161 pipeline.extend(cls.custom_pipeline_steps[property]) 

162 added_properties.append(property) 

163 

164 pipeline.append({"$match": req_filter}) 

165 

166 if sort_field in cls.custom_pipeline_steps: 

167 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

168 added_properties.append(sort_field) 

169 

170 pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}]) 

171 

172 if (query.limit is not None) and (query.limit != 0): 

173 pipeline.append({"$limit": query.limit}) 

174 

175 for property, step in cls.custom_pipeline_steps.items(): 

176 if property not in added_properties: 

177 pipeline.extend(step) 

178 

179 cursor = collection.aggregate(pipeline) 

180 

181 for item_dict in cursor: 

182 items.append(cls.mongo_dict_to_object(item_dict)) 

183 

184 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

185 

186 @classmethod 

187 def get_from_id(cls, item_id) -> Self | None: 

188 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

189 

190 @classmethod 

191 def mongo_dict_to_object(cls, mongo_dict): 

192 mongo_dict["id"] = str(mongo_dict["_id"]) 

193 del mongo_dict["_id"] 

194 return cls.dict_to_object(mongo_dict) 

195 

196 @classmethod 

197 def get_by_attribute(cls, attribute_name: str, attribute_value): 

198 """Returns all items that match the attribute with value.""" 

199 pipeline = [] 

200 if attribute_name in cls.custom_pipeline_steps: 

201 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

202 pipeline.append({"$match": {attribute_name: attribute_value}}) 

203 for key, step in cls.custom_pipeline_steps.items(): 

204 if key != attribute_name: 

205 pipeline.extend(step) 

206 items = cls.collection().aggregate(pipeline) 

207 if items is None: 

208 return None 

209 return [cls.mongo_dict_to_object(d) for d in items] 

210 

211 @classmethod 

212 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

213 pipeline = [] 

214 if attribute_name in cls.custom_pipeline_steps: 

215 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

216 pipeline.append({"$match": {attribute_name: attribute_value}}) 

217 pipeline.append({"$limit": 1}) 

218 for key, step in cls.custom_pipeline_steps.items(): 

219 if key != attribute_name: 

220 pipeline.extend(step) 

221 items = cls.collection().aggregate(pipeline).to_list() 

222 if len(items) == 0: 

223 return None 

224 return cls.mongo_dict_to_object(items[0]) 

225 

226 @classmethod 

227 def get_all(cls, sort_by="_id") -> list[Self]: 

228 items = [] 

229 pipeline = [] 

230 if sort_by in cls.custom_pipeline_steps: 

231 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

232 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

233 for key, step in cls.custom_pipeline_steps.items(): 

234 if key != sort_by: 

235 pipeline.extend(step) 

236 for dict_ in cls.collection().aggregate(pipeline): 

237 items.append(cls.mongo_dict_to_object(dict_)) 

238 return items 

239 

240 @classmethod 

241 def get_number_documents(cls): 

242 collection = get_collection(systems_database, cls.collection_name) 

243 if collection is None: 

244 return 0 

245 return collection.count_documents({}) 

246 

247 def insert(self): 

248 insert_result = self.collection().insert_one(self.to_dict(exclude={id})) 

249 self.id = str(insert_result.inserted_id) 

250 return self.id 

251 

252 def update(self, update_dict): 

253 for key, value in update_dict.items(): 

254 setattr(self, key, value) 

255 self.collection().find_one_and_update( 

256 {"_id": ObjectId(self.id)}, 

257 {"$set": update_dict}, 

258 return_document=ReturnDocument.AFTER, 

259 ) 

260 

261 return self 

262 

263 def delete(self): 

264 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

265 return result.deleted_count > 0 

266 

267 

268class User(GenericMongo): 

269 collection_name: ClassVar[str] = "users" 

270 

271 firstname: str 

272 lastname: str 

273 email: str 

274 password: str 

275 is_active: bool | None = False 

276 is_admin: bool | None = False 

277 is_connected: bool | None = False 

278 company_id: str | None = None 

279 

280 def to_dict(self, exclude=None): 

281 if exclude is None: 

282 exclude = {"password"} 

283 return GenericMongo.to_dict(self, exclude=exclude) 

284 

285 @classmethod 

286 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

287 users = cls.get_all() 

288 if not users: 

289 is_admin = True 

290 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

291 user_collection = get_collection(systems_database, "users", create=True) 

292 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

293 if new_user is None: 

294 return None 

295 return {"user_id": str(new_user.inserted_id)} 

296 

297 @classmethod 

298 def update(cls, user: "UserUpdate", user_id: str): 

299 updated_user = cls.collection().find_one_and_update( 

300 {"_id": ObjectId(user_id)}, 

301 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

302 return_document=ReturnDocument.AFTER, 

303 ) 

304 updated_user["id"] = str(updated_user["_id"]) 

305 del (updated_user["_id"], updated_user["is_connected"]) 

306 return cls(**updated_user) 

307 

308 

309UserUpdate = create_update_model(User) 

310 

311 

312class Mode(TwinPadModel): 

313 mode_id: int 

314 name: str 

315 frequency_multiplier: float 

316 min_frequency: float 

317 

318 

319class DeviceUpdate(TwinPadModel): 

320 mode_id: int 

321 

322 

323class Device(GenericMongo): 

324 collection_name: ClassVar[str] = "devices" 

325 

326 device_id: str 

327 name: str 

328 description: str = "" 

329 modes: list[Mode] 

330 current_mode_id: int | None = None 

331 last_ping: float | None = None 

332 petri_network: Any 

333 pid: Any 

334 load: float | None = None 

335 tokens: list[int] = Field(default_factory=list) 

336 status: str 

337 

338 custom_pipeline_steps: ClassVar[dict[str, list]] = { 

339 "status": [ 

340 { 

341 "$addFields": { 

342 "status": { 

343 "$cond": { 

344 "if": { 

345 "$or": [ 

346 {"$eq": [{"$ifNull": ["$last_ping", None]}, None]}, 

347 { 

348 "$gt": [ 

349 {"$subtract": ["$$NOW", {"$multiply": ["$last_ping", 1000]}]}, 

350 {"$toDate": float(DEVICE_TIMEOUT * 1000)}, 

351 ] 

352 }, 

353 ] 

354 }, 

355 "then": "down", 

356 "else": "up", 

357 } 

358 } 

359 } 

360 } 

361 ] 

362 } 

363 

364 async def update(self, update_dict, current_user: User): 

365 if self.current_mode_id is not None: 

366 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

367 else: 

368 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

369 command = Command( 

370 sent_at=time.time(), 

371 command_type="Mode change", 

372 description=description, 

373 user_id=current_user.id, 

374 ) 

375 response = await send_mode_change(self.device_id, update_dict.mode_id) 

376 command.receive_response(response) 

377 Command.create(command) 

378 

379 return response 

380 

381 @classmethod 

382 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

383 devices_by_id = {} 

384 for signal_id in signal_ids: 

385 device_id = signal_id.split(".")[0] 

386 if device_id not in devices_by_id: 

387 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

388 return devices_by_id 

389 

390 

391class DeviceSetup(GenericMongo): 

392 collection_name: ClassVar[str] = "device_setups" 

393 

394 device_ids: list[str] 

395 active: bool = False 

396 variable_mapping: dict[str, str] 

397 

398 

399DeviceSetupUpdate = create_update_model(DeviceSetup) 

400 

401 

402class DeviceState(GenericMongo): 

403 collection_name: ClassVar[str] = "devices_states" 

404 

405 timestamp: float 

406 mode: str | None = None 

407 load: float | None = None 

408 tokens: list[int] = Field(default_factory=list) 

409 modified_properties: list[str] = Field(default_factory=list) 

410 

411 @classmethod 

412 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]: 

413 req_filter = query.mongodb_filter() 

414 items = [] 

415 if ":" in query.sort_by: 

416 sort_field, sort_order = query.sort_by.split(":") 

417 sort_order = int(sort_order) 

418 else: 

419 sort_field = query.sort_by 

420 sort_order = 1 

421 collection = get_collection(devices_states_database, device_id) 

422 if collection is None: 

423 total = 0 

424 cursor = [] 

425 else: 

426 total = collection.count_documents(req_filter) 

427 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

428 if (query.limit is not None) and (query.limit != 0): 

429 cursor = cursor.limit(query.limit) 

430 for item_dict in cursor: 

431 items.append( 

432 cls( 

433 timestamp=item_dict.get("precise_timestamp"), 

434 mode=item_dict.get("mode", None), 

435 load=item_dict.get("load", None), 

436 tokens=item_dict.get("tokens", Field(default_factory=list)), 

437 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

438 ) 

439 ) 

440 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

441 

442 

443class SignalSample(TwinPadModel): 

444 signal_id: str 

445 timestamp: float 

446 value: float | int | str | bool | None 

447 forced_value: float | int | str | bool | None = None 

448 

449 @classmethod 

450 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

451 

452 collection = get_signal_collection(signal_id) 

453 if collection is None: 

454 return None 

455 

456 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

457 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

458 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

459 first_bucket = None 

460 if bucket is not None: 

461 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

462 if first_bucket is not None: 

463 sample_data = collection.find_one( 

464 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

465 ) 

466 else: 

467 sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)]) 

468 

469 if sample_data is None: 

470 return None 

471 

472 timestamp = sample_data["precise_timestamp"] 

473 

474 return cls( 

475 signal_id=signal_id, 

476 timestamp=timestamp, 

477 value=sample_data.get("value", None), 

478 forced_value=sample_data.get("forced_value", None), 

479 ) 

480 

481 @classmethod 

482 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

483 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

484 

485 @classmethod 

486 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

487 

488 collection = get_signal_collection(signal_id) 

489 if collection is None: 

490 return None 

491 

492 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

493 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

494 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

495 last_bucket = None 

496 if bucket is not None: 

497 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

498 if last_bucket is not None: 

499 sample_data = collection.find_one({"precise_timestamp": last_bucket["control"]["max"]["precise_timestamp"]}) 

500 else: 

501 sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)]) 

502 

503 if sample_data is None: 

504 return None 

505 

506 timestamp = sample_data["precise_timestamp"] 

507 

508 if device is None: 

509 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

510 if device is not None and device.last_ping is not None: 

511 if timestamp is None: 

512 timestamp = device.last_ping 

513 else: 

514 timestamp = max(timestamp, device.last_ping) 

515 return cls( 

516 signal_id=signal_id, 

517 timestamp=timestamp, 

518 value=sample_data.get("value", None), 

519 forced_value=sample_data.get("forced_value", None), 

520 ) 

521 

522 @classmethod 

523 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None: 

524 collection = get_signal_collection(signal_id) 

525 if collection is None: 

526 return None 

527 

528 cursor = collection.aggregate( 

529 [ 

530 {"$match": {"timestamp": {"$gte": datetime.datetime.fromtimestamp(min_timestamp, pytz.UTC)}}}, 

531 {"$sort": {"timestamp": -1}}, 

532 {"$limit": 1}, 

533 ] 

534 ) 

535 cursor_data = cursor.to_list() 

536 

537 if len(cursor_data) == 0: 

538 return None 

539 

540 sample_data = cursor_data[0] 

541 return cls( 

542 signal_id=signal_id, 

543 timestamp=sample_data.get("precise_timestamp"), 

544 value=sample_data.get("value"), 

545 forced_value=sample_data.get("forced_value"), 

546 ) 

547 

548 @classmethod 

549 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

550 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

551 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

552 

553 @classmethod 

554 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None: 

555 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids] 

556 

557 

558class SignalData(TwinPadModel): 

559 signal_id: str 

560 forcible: bool = True 

561 time_vector: list[float] 

562 values: list[float | int | str | None] 

563 forced_values: list[float | int | str | None] 

564 

565 data_start: float | None = None 

566 data_end: float | None = None 

567 

568 number_samples: int = 0 

569 number_samples_db: int = 0 

570 

571 db_query_time: float = 0.0 

572 init_time: float = 0.0 

573 data_processing_time: float = 0.0 

574 

575 @classmethod 

576 def get_from_signal_id( 

577 cls, 

578 signal_id: str, 

579 min_timestamp: float = None, 

580 max_timestamp: float = None, 

581 window_min_timestamp: float = None, 

582 window_max_timestamp: float = None, 

583 interpolate_bounds: bool = True, 

584 max_documents: int = None, 

585 ) -> Self: 

586 

587 now = time.time() 

588 

589 req_signal = {} 

590 if min_timestamp is not None: 

591 req_signal.setdefault("timestamp", {}) 

592 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

593 if max_timestamp is not None: 

594 req_signal.setdefault("timestamp", {}) 

595 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

596 

597 collection = get_signal_collection(signal_id) 

598 if collection is None: 

599 return cls( 

600 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

601 ) 

602 

603 db_req_start = time.time() 

604 

605 sort_step = {"$sort": {"precise_timestamp": 1}} 

606 number_results = collection.count_documents(req_signal) 

607 

608 pipeline = [] 

609 if req_signal: 

610 pipeline.append({"$match": req_signal}) # Filter data if needed 

611 

612 pipeline.extend( 

613 [ 

614 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

615 sort_step, 

616 ] 

617 ) 

618 

619 if max_documents is not None and max_documents < number_results: 

620 unsampling_ratio = math.ceil(number_results / max_documents) 

621 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

622 pipeline.extend( 

623 [ 

624 { 

625 "$setWindowFields": { 

626 "sortBy": {"precise_timestamp": 1}, 

627 "output": {"index": {"$documentNumber": {}}}, 

628 } 

629 }, 

630 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

631 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

632 {"$replaceRoot": {"newRoot": "$doc"}}, 

633 {"$unset": ["index", "group_id"]}, 

634 {"$sort": {"precise_timestamp": 1}}, 

635 ] 

636 ) 

637 

638 # logger.info(f"pipeline: %s", str(pipeline)) 

639 cursor = collection.aggregate(pipeline) 

640 db_req_time = time.time() - db_req_start 

641 

642 init_time = time.time() 

643 

644 results = cursor.to_list() 

645 time_vector = [] 

646 values = [] 

647 forced_values = [] 

648 for s in results: 

649 time_vector.append(s["precise_timestamp"]) 

650 values.append(s.get("value", None)) 

651 forced_values.append(s.get("forced_value", None)) 

652 

653 signal = Signal.get_from_signal_id(signal_id) 

654 class_ = signal.signal_data_class 

655 

656 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

657 time_vector, values, forced_values = cls.interpolate_bounds( 

658 class_, 

659 collection, 

660 signal_id, 

661 time_vector, 

662 values, 

663 forced_values, 

664 window_min_timestamp, 

665 window_max_timestamp, 

666 ) 

667 

668 if values: 

669 # TODO: check below. a bit strange 

670 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

671 # Adding last value as it should be repeated 

672 time_vector.append(now) 

673 values.append(values[-1]) 

674 forced_values.append(forced_values[-1]) 

675 

676 init_time = time.time() - init_time 

677 

678 # See line 292 for explanation 

679 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

680 first_bucket = None 

681 if bucket is not None: 

682 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

683 if first_bucket is not None: 

684 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

685 else: 

686 data_start = None 

687 

688 last_bucket = None 

689 if bucket is not None: 

690 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

691 if last_bucket is not None: 

692 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

693 else: 

694 data_end = None 

695 

696 return class_( 

697 signal_id=signal_id, 

698 forcible=signal.forcible, 

699 time_vector=time_vector, 

700 values=values, 

701 forced_values=forced_values, 

702 data_start=data_start, 

703 data_end=data_end, 

704 number_samples=len(values), 

705 number_samples_db=number_results, 

706 db_query_time=db_req_time, 

707 init_time=init_time, 

708 ) 

709 

710 @staticmethod 

711 def interpolate_bounds( 

712 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

713 ): 

714 sample_right = None 

715 # Fetching right side value & interpolation 

716 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

717 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

718 sample_right = collection.find_one( 

719 { 

720 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

721 "value": {"$exists": True}, 

722 }, 

723 sort=[("precise_timestamp", -1)], 

724 ) 

725 if sample_right: 

726 if time_vector: 

727 right_sd = class_( 

728 signal_id=signal_id, 

729 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

730 values=[values[-1], sample_right.get("value", None)], 

731 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

732 ) 

733 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

734 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

735 else: 

736 max_ts_value = sample_right.get("value", None) 

737 max_ts_forced_value = sample_right.get("forced_value", None) 

738 time_vector.append(window_max_timestamp) 

739 values.append(max_ts_value) 

740 forced_values.append(max_ts_forced_value) 

741 

742 # Fetching left side value & interpolation 

743 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

744 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

745 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

746 sample_left = sample_right 

747 sample_left = collection.find_one( 

748 { 

749 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

750 "value": {"$exists": True}, 

751 }, 

752 sort=[("precise_timestamp", -1)], 

753 ) 

754 

755 if sample_left: 

756 if time_vector: 

757 left_sd = class_( 

758 signal_id=signal_id, 

759 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

760 values=[sample_left["value"], values[0]], 

761 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

762 ) 

763 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

764 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

765 else: 

766 min_ts_value = sample_left.get("value", None) 

767 min_ts_forced_value = sample_left.get("forced_value", None) 

768 time_vector.insert(0, window_min_timestamp) 

769 values.insert(0, min_ts_value) 

770 forced_values.insert(0, min_ts_forced_value) 

771 

772 return time_vector, values, forced_values 

773 

774 def interpolate_values(self, new_time_vector: list[float]): 

775 return self.interpolate(new_time_vector, self.values) 

776 

777 def interpolate_forced_values(self, new_time_vector: list[float]): 

778 return self.interpolate(new_time_vector, self.forced_values) 

779 

780 def uniform_desampling(self, number_samples_max: int) -> Self: 

781 data_processing_time = time.time() 

782 if number_samples_max and self.number_samples > number_samples_max: 

783 new_time_vector = npy.linspace( 

784 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

785 ).tolist() 

786 values = self.interpolate_values(new_time_vector) 

787 forced_values = self.interpolate_forced_values(new_time_vector) 

788 time_vector = new_time_vector 

789 number_samples = len(time_vector) 

790 else: 

791 time_vector = self.time_vector 

792 number_samples = len(self.values) 

793 values = self.values[:] 

794 forced_values = self.forced_values[:] 

795 data_processing_time = time.time() - data_processing_time 

796 

797 return self.__class__( 

798 signal_id=self.signal_id, 

799 time_vector=time_vector, 

800 values=values, 

801 forced_values=forced_values, 

802 number_samples=number_samples, 

803 number_samples_db=self.number_samples, 

804 data_start=self.data_start, 

805 data_end=self.data_end, 

806 db_query_time=self.db_query_time, 

807 init_time=self.init_time, 

808 data_processing_time=self.data_processing_time + data_processing_time, 

809 ) 

810 

811 def interest_window_desampling( 

812 self, 

813 window_max_number_samples: int, 

814 outside_max_number_samples: int, 

815 window_min_timestamp: float | None = None, 

816 window_max_timestamp: float | None = None, 

817 ) -> Self: 

818 """Performs a sampling in a window of interest and outside.""" 

819 

820 if not self.time_vector: 

821 return self 

822 

823 if window_min_timestamp is None: 

824 window_min_timestamp = self.time_vector[0] 

825 if window_max_timestamp is None: 

826 window_max_timestamp = self.time_vector[-1] 

827 

828 data_processing_time = time.time() 

829 

830 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

831 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

832 

833 time_vector_before = self.time_vector[:index_window_start] 

834 time_vector_window = self.time_vector[index_window_start:index_window_end] 

835 time_vector_after = self.time_vector[index_window_end:] 

836 

837 # Resampling window 

838 if time_vector_window: 

839 # Ensurring window bounds 

840 if time_vector_window[0] != window_min_timestamp: 

841 time_vector_window.insert(0, window_min_timestamp) 

842 if time_vector_window[-1] != window_max_timestamp: 

843 time_vector_window.append(window_max_timestamp) 

844 else: 

845 time_vector_window = [window_min_timestamp, window_max_timestamp] 

846 

847 if len(time_vector_window) > window_max_number_samples: 

848 # Resampling 

849 new_window_time_vector = npy.linspace( 

850 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

851 ).tolist() 

852 time_vector_window = new_window_time_vector 

853 

854 # Resampling outside 

855 number_samples_before = len(time_vector_before) 

856 number_samples_after = len(time_vector_after) 

857 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

858 new_number_samples_before = math.ceil( 

859 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

860 ) 

861 new_number_samples_after = math.ceil( 

862 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

863 ) 

864 # Adjusting numbers as math.ceil can do +1 on sum 

865 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

866 if new_number_samples_before > new_number_samples_after: 

867 new_number_samples_before -= 1 

868 else: 

869 new_number_samples_after -= 1 

870 

871 if new_number_samples_before: 

872 new_time_vector_before = npy.linspace( 

873 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

874 ).tolist() 

875 time_vector_before = new_time_vector_before 

876 

877 if number_samples_after: 

878 new_time_vector_after = npy.linspace( 

879 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

880 ).tolist()[::-1] 

881 time_vector_after = new_time_vector_after 

882 

883 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

884 values = self.interpolate_values(new_time_vector) 

885 forced_values = self.interpolate_forced_values(new_time_vector) 

886 number_samples = len(values) 

887 

888 data_processing_time = time.time() - data_processing_time 

889 

890 return self.__class__( 

891 signal_id=self.signal_id, 

892 forcible=self.forcible, 

893 time_vector=new_time_vector, 

894 values=values, 

895 forced_values=forced_values, 

896 number_samples=number_samples, 

897 number_samples_db=self.number_samples, 

898 data_start=self.data_start, 

899 data_end=self.data_end, 

900 db_query_time=self.db_query_time, 

901 init_time=self.init_time, 

902 data_processing_time=self.data_processing_time + data_processing_time, 

903 ) 

904 

905 def csv_export(self): 

906 output = io.StringIO() 

907 writer = csv.writer(output) 

908 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

909 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

910 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

911 return output.getvalue().encode("utf-8") 

912 

913 def prestoplot_export(self): 

914 clean_signal_id = self.signal_id.replace(".", "_") 

915 if clean_signal_id[0].isnumeric(): 

916 clean_signal_id = "_" + clean_signal_id 

917 

918 output = io.StringIO() 

919 output.write("# Encoding:\tUTF-8\n") 

920 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

921 output.write("ISO8601\tnone\tnone\n") 

922 output.write(f"# Description :\t{clean_signal_id}\n") 

923 

924 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

925 output.write( 

926 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

927 ) 

928 return output.getvalue().encode("utf-8") 

929 

930 

931class NumericSignalData(SignalData): 

932 data_type: str = "float" 

933 values: list[float | int | None] 

934 forced_values: list[float | int | None] 

935 

936 def interpolate(self, new_time_vector: list[float], items): 

937 items = [npy.nan if s is None else s for s in items] 

938 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

939 

940 def uniform_desampling(self, number_samples_max: int) -> Self: 

941 data_processing_time = time.time() 

942 if number_samples_max and self.number_samples > number_samples_max: 

943 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

944 forced_values = self.interpolate_forced_values(time_vector) 

945 number_samples = len(time_vector) 

946 else: 

947 time_vector = self.time_vector 

948 number_samples = len(self.values) 

949 values = self.values[:] 

950 forced_values = self.forced_values[:] 

951 data_processing_time = time.time() - data_processing_time 

952 

953 return self.__class__( 

954 signal_id=self.signal_id, 

955 time_vector=time_vector, 

956 values=values, 

957 forced_values=forced_values, 

958 number_samples=number_samples, 

959 number_samples_db=self.number_samples, 

960 data_start=self.data_start, 

961 data_end=self.data_end, 

962 db_query_time=self.db_query_time, 

963 init_time=self.init_time, 

964 data_processing_time=self.data_processing_time + data_processing_time, 

965 ) 

966 

967 def interest_window_desampling( 

968 self, 

969 window_max_number_samples: int, 

970 outside_max_number_samples: int, 

971 window_min_timestamp: float | None = None, 

972 window_max_timestamp: float | None = None, 

973 ) -> Self: 

974 """Performs a sampling in a window of interest and outside.""" 

975 

976 if not self.time_vector: 

977 return self 

978 

979 if window_min_timestamp is None: 

980 window_min_timestamp = self.time_vector[0] 

981 if window_max_timestamp is None: 

982 window_max_timestamp = self.time_vector[-1] 

983 

984 data_processing_time = time.time() 

985 

986 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

987 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

988 

989 time_vector_before = self.time_vector[:index_window_start] 

990 time_vector_window = self.time_vector[index_window_start:index_window_end] 

991 time_vector_after = self.time_vector[index_window_end:] 

992 

993 values_before = self.values[:index_window_start] 

994 values_window = self.values[index_window_start:index_window_end] 

995 values_after = self.values[index_window_end:] 

996 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

997 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

998 

999 # Resampling window 

1000 if time_vector_window: 

1001 # Ensurring window bounds 

1002 if time_vector_window[0] != window_min_timestamp: 

1003 time_vector_window.insert(0, window_min_timestamp) 

1004 values_window.insert(0, window_min_value) 

1005 if time_vector_window[-1] != window_max_timestamp: 

1006 time_vector_window.append(window_max_timestamp) 

1007 values_window.append(window_max_value) 

1008 else: 

1009 time_vector_window = [window_min_timestamp, window_max_timestamp] 

1010 values_window = [window_min_value, window_max_value] 

1011 

1012 if len(time_vector_window) > window_max_number_samples: 

1013 # Resampling 

1014 time_vector_window, values_window = downsample_list( 

1015 time_vector_window, values_window, window_max_number_samples 

1016 ) 

1017 

1018 # Resampling outside 

1019 number_samples_before = len(time_vector_before) 

1020 number_samples_after = len(time_vector_after) 

1021 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

1022 new_number_samples_before = math.ceil( 

1023 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1024 ) 

1025 new_number_samples_after = math.ceil( 

1026 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1027 ) 

1028 # Adjusting numbers as math.ceil can do +1 on sum 

1029 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1030 if new_number_samples_before > new_number_samples_after: 

1031 new_number_samples_before -= 1 

1032 else: 

1033 new_number_samples_after -= 1 

1034 

1035 if new_number_samples_before: 

1036 time_vector_before, values_before = downsample_list( 

1037 time_vector_before, values_before, new_number_samples_before 

1038 ) 

1039 

1040 if number_samples_after: 

1041 time_vector_after, values_after = downsample_list( 

1042 time_vector_after, values_after, new_number_samples_after 

1043 ) 

1044 

1045 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1046 values = values_before + values_window + values_after 

1047 forced_values = self.interpolate_forced_values(new_time_vector) 

1048 number_samples = len(values) 

1049 

1050 data_processing_time = time.time() - data_processing_time 

1051 

1052 return self.__class__( 

1053 signal_id=self.signal_id, 

1054 time_vector=new_time_vector, 

1055 values=values, 

1056 forced_values=forced_values, 

1057 number_samples=number_samples, 

1058 number_samples_db=self.number_samples, 

1059 data_start=self.data_start, 

1060 data_end=self.data_end, 

1061 db_query_time=self.db_query_time, 

1062 init_time=self.init_time, 

1063 data_processing_time=self.data_processing_time + data_processing_time, 

1064 ) 

1065 

1066 

1067class StringSignalData(SignalData): 

1068 data_type: str = "str" 

1069 values: list[str | None] 

1070 forced_values: list[str | None] 

1071 

1072 def interpolate(self, new_time_vector: list[float], items): 

1073 # Find the indices of the values in xp that are just smaller or equal to x 

1074 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1075 indices = npy.clip(indices, 0, len(items) - 1) 

1076 # Return the corresponding left string values from fp 

1077 return [items[i] for i in indices] 

1078 

1079 

1080class SignalsData(TwinPadModel): 

1081 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1082 data_processing_time: float 

1083 data_start: float | None 

1084 data_end: float | None 

1085 

1086 @classmethod 

1087 def get_from_signal_ids( 

1088 cls, 

1089 signal_ids: list[str], 

1090 min_timestamp: float = None, 

1091 max_timestamp: float = None, 

1092 window_min_timestamp: float = None, 

1093 window_max_timestamp: float = None, 

1094 interpolate_bounds: bool = True, 

1095 max_documents: int = None, 

1096 ) -> Self: 

1097 signals_data = [] 

1098 data_start = None 

1099 data_end = None 

1100 if max_timestamp is None: 

1101 max_timestamp = time.time() 

1102 data_processing_time = 0.0 

1103 for signal_id in signal_ids: 

1104 signal_data = SignalData.get_from_signal_id( 

1105 signal_id=signal_id, 

1106 min_timestamp=min_timestamp, 

1107 max_timestamp=max_timestamp, 

1108 window_min_timestamp=window_min_timestamp, 

1109 window_max_timestamp=window_max_timestamp, 

1110 interpolate_bounds=interpolate_bounds, 

1111 max_documents=max_documents, 

1112 ) 

1113 data_processing_time += signal_data.data_processing_time 

1114 signals_data.append(signal_data) 

1115 if signal_data.data_start is not None: 

1116 if data_start is None: 

1117 data_start = signal_data.data_start 

1118 else: 

1119 data_start = min(signal_data.data_start, data_start) 

1120 if signal_data.data_end is not None: 

1121 if data_end is None: 

1122 data_end = signal_data.data_end 

1123 else: 

1124 data_end = max(signal_data.data_end, data_end) 

1125 

1126 return cls( 

1127 signals_data=signals_data, 

1128 data_processing_time=data_processing_time, 

1129 data_start=data_start, 

1130 data_end=data_end, 

1131 ) 

1132 

1133 def uniform_desampling(self, number_samples_max: int) -> Self: 

1134 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1135 return SignalsData( 

1136 signals_data=signals_data, 

1137 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1138 data_start=self.data_start, 

1139 data_end=self.data_end, 

1140 ) 

1141 

1142 def interest_window_desampling( 

1143 self, 

1144 window_max_number_samples: int, 

1145 outside_max_number_samples: int, 

1146 window_min_timestamp: float = None, 

1147 window_max_timestamp: float = None, 

1148 ) -> Self: 

1149 signals_data = [ 

1150 s.interest_window_desampling( 

1151 window_max_number_samples=window_max_number_samples, 

1152 outside_max_number_samples=outside_max_number_samples, 

1153 window_min_timestamp=window_min_timestamp, 

1154 window_max_timestamp=window_max_timestamp, 

1155 ) 

1156 for s in self.signals_data 

1157 ] 

1158 

1159 return SignalsData( 

1160 signals_data=signals_data, 

1161 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1162 data_start=self.data_start, 

1163 data_end=self.data_end, 

1164 ) 

1165 

1166 def zip_export(self, file_format: str = "csv"): 

1167 # return self.signals_data[0].csv_export() 

1168 zip_buffer = io.BytesIO() 

1169 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1170 for signal_data in self.signals_data: 

1171 if file_format == "csv": 

1172 export_io = signal_data.csv_export() 

1173 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io) 

1174 elif file_format == "prestoplot": 

1175 export_io = signal_data.prestoplot_export() 

1176 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io) 

1177 else: 

1178 raise ValueError(f"Format not found. Got: {file_format}") 

1179 zip_bytes = zip_buffer.getvalue() 

1180 # zip_bytes.seek(0) 

1181 return zip_bytes 

1182 

1183 

1184class SignalStatus(TwinPadModel): 

1185 status: str 

1186 reason: str 

1187 delay: float | None 

1188 

1189 

1190class DigitizationFunction(TwinPadModel): 

1191 bits: int | None = None 

1192 min_value: float 

1193 max_value: float 

1194 min_raw_value: float 

1195 max_raw_value: float 

1196 

1197 

1198class SignalUpdate(TwinPadModel): 

1199 value: float | str | bool | int | None = None 

1200 forced_value: float | str | bool | int | None = None 

1201 timestamp: int | None = None 

1202 

1203 

1204class SignalType(str, Enum): 

1205 command = "command" 

1206 sensor = "sensor" 

1207 external_sensor = "external_sensor" 

1208 

1209 

1210SIGNALDATA_TYPES = { 

1211 "int": NumericSignalData, 

1212 "float": NumericSignalData, 

1213 "str": StringSignalData, 

1214 "bool": NumericSignalData, 

1215 "epoch": NumericSignalData, 

1216} 

1217 

1218 

1219class Signal(GenericMongo): 

1220 collection_name: ClassVar[str] = "signals" 

1221 

1222 signal_id: str 

1223 frequency: float 

1224 unit: str | None 

1225 description: str 

1226 type: SignalType 

1227 data_type: str 

1228 precision_digits: int | None 

1229 forcible: bool 

1230 

1231 digitization_function: DigitizationFunction | None 

1232 

1233 @property 

1234 def device(self) -> Device: 

1235 device_id = self.signal_id.split(".")[0] 

1236 device = Device.get_one_by_attribute("device_id", device_id) 

1237 return device 

1238 

1239 @cached_property 

1240 def signal_data_class(self): 

1241 if self.data_type in SIGNALDATA_TYPES: 

1242 return SIGNALDATA_TYPES[self.data_type] 

1243 if self.data_type.startswith("enum"): 

1244 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1245 raise ValueError(f"Unhandled python type: {self.data_type}") 

1246 

1247 @cached_property 

1248 def python_type(self): 

1249 if self.data_type in TYPES: 

1250 return TYPES[self.data_type] 

1251 if self.data_type.startswith("enum"): 

1252 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1253 return Literal[*choices] 

1254 raise ValueError(f"Unhandled python type: {self.data_type}") 

1255 

1256 @computed_field 

1257 @property 

1258 def status(self) -> SignalStatus: 

1259 now = time.time() 

1260 status = "up" 

1261 reason = "" 

1262 

1263 # See line 292 for explanation 

1264 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1265 last_bucket = None 

1266 if bucket is not None: 

1267 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1268 if last_bucket is None: 

1269 status = "no data" 

1270 reason = "signal does not exist" 

1271 return SignalStatus(status=status, reason=reason, delay=None) 

1272 

1273 try: 

1274 last_date = last_bucket["control"]["max"]["timestamp"] 

1275 last_date = last_date.replace(tzinfo=pytz.UTC) 

1276 last_value_ts = last_date.timestamp() 

1277 except IndexError: 

1278 last_value_ts = None 

1279 

1280 if last_value_ts is None: 

1281 delay = None 

1282 reason = "No data from signal" 

1283 else: 

1284 # Since device is a computed property, only compute it once 

1285 device = self.device 

1286 if device is not None and device.last_ping is not None: 

1287 last_value_ts = max(last_value_ts, device.last_ping) 

1288 delay = now - last_value_ts 

1289 if delay > DEVICE_TIMEOUT: 

1290 status = "down" 

1291 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1292 return SignalStatus(status=status, reason=reason, delay=delay) 

1293 

1294 async def update(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1295 command = Command( 

1296 sent_at=time.time(), 

1297 command_type="Signal command", 

1298 user_id=current_user.id, 

1299 ) 

1300 response = await send_signal_value(self.signal_id, update_dict) 

1301 command.receive_response(response) 

1302 Command.create(command) 

1303 

1304 return response 

1305 

1306 @classmethod 

1307 def get_from_signal_id(cls, signal_id) -> Self: 

1308 """Could be generic from mongo""" 

1309 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1310 if not raw_value: 

1311 return None 

1312 del raw_value["_id"] 

1313 return cls.dict_to_object(raw_value) 

1314 

1315 @classmethod 

1316 def get_all_ids(cls) -> list[str]: 

1317 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1318 

1319 return [signal["signal_id"] for signal in cursor] 

1320 

1321 async def number_samples(self): 

1322 collection = get_signal_collection(signal_id=self.signal_id) 

1323 if collection is None: 

1324 return 0 

1325 

1326 number_samples = collection.estimated_document_count() 

1327 

1328 number_samples_async_collection = await get_async_collection( 

1329 systems_async_database, "number_samples", create=True, time_series=True 

1330 ) 

1331 

1332 loop = asyncio.get_running_loop() 

1333 loop.create_task( 

1334 number_samples_async_collection.insert_one( 

1335 { 

1336 "timestamp": datetime.datetime.now(pytz.UTC), 

1337 "signal_id": self.signal_id, 

1338 "number_samples": number_samples, 

1339 } 

1340 ) 

1341 ) 

1342 

1343 return number_samples 

1344 

1345 def sample_datasize(self): 

1346 return signals_database.command("collstats", self.signal_id)["size"] 

1347 

1348 @classmethod 

1349 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1350 result = cls.collection().aggregate( 

1351 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1352 ) 

1353 

1354 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1355 

1356 

1357class ServicesStatus(TwinPadModel): 

1358 backend: str 

1359 cloud_broker: str 

1360 time_series_database: str 

1361 signal_storage: str 

1362 heartbeat_storage: str 

1363 data_analyzer: str 

1364 

1365 @classmethod 

1366 def check(cls) -> Self: 

1367 return cls( 

1368 cloud_broker=ping(RABBITMQ_HOST), 

1369 backend="up", 

1370 time_series_database=ping(MONGO_HOST), 

1371 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1372 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1373 data_analyzer=ping(DATA_ANALYZER_HOST), 

1374 ) 

1375 

1376 

1377def ping(host): 

1378 try: 

1379 if ping3.ping(host, timeout=0.8): 

1380 return "up" 

1381 except PermissionError: 

1382 pass 

1383 return "down" 

1384 

1385 

1386class Event(GenericMongo): 

1387 collection_name: ClassVar[str] = "events" 

1388 

1389 name: str 

1390 timestamp: float 

1391 event_rule_id: str 

1392 

1393 @computed_field 

1394 @cached_property 

1395 def event_rule(self) -> "EventRule": 

1396 return EventRule.get_from_id(self.event_rule_id) 

1397 

1398 @classmethod 

1399 def dict_to_object(cls, dict_): 

1400 """Refine to convert timestamp to datetime for mongodb.""" 

1401 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

1402 return super().dict_to_object(dict_) 

1403 

1404 

1405class EventDay(GenericMongo): 

1406 collection_name: ClassVar[str] = "number_events" 

1407 

1408 timestamp: float 

1409 number_events: int 

1410 

1411 @classmethod 

1412 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_events) -> list[Self]: 

1413 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1414 number_events_collection = get_collection(systems_database, "number_events") 

1415 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

1416 items = [] 

1417 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1418 if number_events_collection is None or recompute_events: 

1419 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

1420 first_event = events_collection.find_one(sort={"timestamp": 1}) 

1421 if first_event is None: 

1422 return items 

1423 # compute from day of first found event 

1424 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

1425 tzinfo=pytz.UTC 

1426 ) 

1427 while last_computed_day < TODAY: 

1428 day_nb_events = events_collection.count_documents( 

1429 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1430 ) 

1431 if day_nb_events > 0: 

1432 number_events_collection.insert_one( 

1433 {"timestamp": last_computed_day, "number_events": day_nb_events} 

1434 ) 

1435 last_computed_day += ONE_DAY_OFFSET 

1436 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1437 if number_events_today > 0: 

1438 number_events_collection.delete_one({"timestamp": TODAY}) 

1439 number_events_collection.insert_one({"timestamp": TODAY, "number_events": number_events_today}) 

1440 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1441 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1442 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1443 for day in number_events: 

1444 day["timestamp"] = day["timestamp"].timestamp() 

1445 items.append(cls.mongo_dict_to_object(day)) 

1446 return items 

1447 

1448 

1449class EventRule(GenericMongo): 

1450 collection_name: ClassVar[str] = "event_rules" 

1451 

1452 name: str 

1453 formula: str 

1454 variables: list[str] 

1455 

1456 @computed_field 

1457 @cached_property 

1458 def number_events(self) -> int: 

1459 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

1460 

1461 

1462class Company(GenericMongo): 

1463 collection_name: ClassVar[str] = "companies" 

1464 name: str 

1465 

1466 

1467class Campaign(GenericMongo): 

1468 collection_name: ClassVar[str] = "campaigns" 

1469 

1470 # Properties 

1471 id: str | None = None 

1472 name: str 

1473 description: str | None = None 

1474 

1475 @classmethod 

1476 def create(cls, campaign: Self): 

1477 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"})) 

1478 if new_campaign is None: 

1479 return None 

1480 return {"campaign_id": str(new_campaign.inserted_id)} 

1481 

1482 @classmethod 

1483 def update(cls, campaign: Self): 

1484 updated_campaign = cls.collection().find_one_and_update( 

1485 {"_id": ObjectId(campaign.id)}, 

1486 {"$set": {"name": campaign.name, "description": campaign.description}}, 

1487 return_document=ReturnDocument.AFTER, 

1488 ) 

1489 return updated_campaign 

1490 

1491 @classmethod 

1492 def delete(cls, campaign_id): 

1493 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)}) 

1494 return deleted_user 

1495 

1496 

1497class Phase(GenericMongo): 

1498 collection_name: ClassVar[str] = "phases" 

1499 

1500 # Properties 

1501 id: str | None = None 

1502 name: str 

1503 description: str | None = None 

1504 start_at: float 

1505 end_at: float 

1506 

1507 # FK 

1508 campaign_id: str 

1509 

1510 # @classmethod 

1511 # def get_by_date(cls, datetime: float): 

1512 # phases = [] 

1513 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING): 

1514 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1515 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING): 

1516 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1517 # if phases is None: 

1518 # return None 

1519 # return phases 

1520 

1521 @classmethod 

1522 def create(cls, phase: Self): 

1523 phase = Phase( 

1524 name=phase.name, 

1525 description=phase.description, 

1526 start_at=phase.start_at, 

1527 end_at=phase.end_at, 

1528 campaign_id=phase.campaign_id, 

1529 ) 

1530 phase_collection = get_collection(systems_database, "phases", create=True) 

1531 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"})) 

1532 if new_phase is None: 

1533 return None 

1534 return {"phase_id": str(new_phase.inserted_id)} 

1535 

1536 @classmethod 

1537 def update(cls, phase: Self): 

1538 updated_phase = cls.collection().find_one_and_update( 

1539 {"_id": ObjectId(phase.id)}, 

1540 { 

1541 "$set": { 

1542 "name": phase.name, 

1543 "description": phase.description, 

1544 "start_at": phase.start_at, 

1545 "end_at": phase.end_at, 

1546 } 

1547 }, 

1548 return_document=ReturnDocument.AFTER, 

1549 ) 

1550 return updated_phase 

1551 

1552 @classmethod 

1553 def delete(cls, phase_id): 

1554 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)}) 

1555 return delete_phase 

1556 

1557 @classmethod 

1558 def deleteMany(cls, campaign_id): 

1559 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

1560 return delete_phases 

1561 

1562 

1563class CustomViewCreation(GenericMongo): 

1564 collection_name: ClassVar[str] = "custom_views" 

1565 

1566 name: str 

1567 configuration: list 

1568 

1569 

1570class CustomView(CustomViewCreation): 

1571 # Properties 

1572 id: str | None = None 

1573 

1574 # Foreign Key 

1575 user_id: str 

1576 

1577 # # Methods 

1578 # @classmethod 

1579 # def create(cls, form_custom_view: Self, user_id) -> list: 

1580 # custom_view = CustomView( 

1581 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1582 # ) 

1583 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1584 # return new_custom_view 

1585 

1586 # @classmethod 

1587 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1588 # updated_custom_view = cls.collection().find_one_and_update( 

1589 # {"_id": ObjectId(custom_view_id)}, 

1590 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1591 # return_document=ReturnDocument.AFTER, 

1592 # ) 

1593 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1594 # del updated_custom_view["_id"] 

1595 # return cls(**updated_custom_view) 

1596 

1597 # @classmethod 

1598 # def delete(cls, custom_view_id) -> bool: 

1599 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1600 # return deleted_custom_view.acknowledged 

1601 

1602 

1603CustomViewUpdate = create_update_model(CustomView) 

1604 

1605 

1606class Video(GenericMongo): 

1607 collection_name: ClassVar[str] = "videos" 

1608 

1609 # Properties 

1610 name: str 

1611 ip_addr: str 

1612 username: str | None = None 

1613 password: str | None = None 

1614 

1615 # Methods 

1616 @classmethod 

1617 def get_all(cls, sort_by="_id") -> list[Self]: 

1618 items = [] 

1619 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

1620 items.append(cls.mongo_dict_to_object(dict_)) 

1621 return items 

1622 

1623 @classmethod 

1624 def get_video(cls, camera_id: ObjectId): 

1625 camera = cls.get_from_id(camera_id) 

1626 return camera.name 

1627 

1628 

1629class Command(GenericMongo): 

1630 collection_name: ClassVar[str] = "commands" 

1631 

1632 # Properties 

1633 timestamp: datetime.datetime = None 

1634 sent_at: float 

1635 response_time: float = 0.0 

1636 command_type: str 

1637 description: str = "" 

1638 succeeded: bool = False 

1639 

1640 # Foreign key 

1641 user_id: str 

1642 

1643 @classmethod 

1644 def collection(cls): 

1645 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

1646 

1647 @classmethod 

1648 def create(cls, command: Self): 

1649 command = cls( 

1650 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

1651 sent_at=command.sent_at, 

1652 response_time=command.response_time, 

1653 command_type=command.command_type, 

1654 description=command.description, 

1655 succeeded=command.succeeded, 

1656 user_id=command.user_id, 

1657 ) 

1658 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

1659 if new_command is None: 

1660 return None 

1661 return {"command_id": str(new_command.inserted_id)} 

1662 

1663 def receive_response(self, response: dict): 

1664 self.response_time = time.time() - self.sent_at 

1665 self.succeeded = response.get("error", True) is False 

1666 if self.description == "": 

1667 self.description += response.get("message", "").rstrip()