Coverage for  / usr / local / lib / python3.11 / site-packages / twinpad_backend / models.py: 97%

1171 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-20 06:48 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25# from scipy import signal as signal_scipy 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 get_signal_collections_batch, 

32 systems_database, 

33 systems_async_database, 

34 signals_database, 

35 devices_states_database, 

36) 

37from twinpad_backend.responses import ListResponse 

38from twinpad_backend.messages import send_mode_change, send_signal_value 

39 

# Python type associated with each supported type name ("epoch" values are handled as floats).
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}


# Host names of the companion services; each one can be overridden through the environment.
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.getenv("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.getenv("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.getenv("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.getenv("DATA_ANALYZER_HOST", "data-analyzer")

# Delay (compared against time.time() differences, i.e. seconds) after which the last sample is repeated.
DEVICE_TIMEOUT = float(os.getenv("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Reuse the uvicorn error logger for the messages emitted by this module.
logger = logging.getLogger("uvicorn.error")

53 

54 

class classproperty:
    """
    Read-only descriptor exposing a function as a property of the class itself.

    Stands in for stacking ``@classmethod`` on top of ``@property``, which was deprecated in python 3.11
    and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # Function called with the owner class each time the attribute is read
        self.fget = func

    def __get__(self, instance, owner):
        # The instance (if any) is ignored: the value only depends on the class
        return self.fget(owner)

66 

67 

def create_update_model(model):
    """
    Build the "<ModelName>Update" counterpart of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened to accept None
    and with None as default value.
    """
    optional_fields = {
        name: (field_info.annotation | None, None)
        for name, field_info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)

77 

78 

def get_utc_date_from_timestamp(timestamp: float):
    """
    Convert a POSIX timestamp to an ISO 8601 date string expressed in UTC.

    The conversion is explicitly done in UTC so the result no longer depends on the timezone of the host.
    The timezone information is then dropped to keep the historical output format (no "+00:00" suffix),
    which is also what the previous implementation produced on a host running in UTC.

    :param timestamp: number of seconds since the epoch
    :return: ISO formatted date, e.g. "1970-01-01T00:00:00"
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()

81 

82 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a series to about ``max_number_samples`` points with the LTTB algorithm.

    LTTB cannot handle None values: they are removed before the downsampling and, for each run of
    consecutive None values, the first and last timestamps of the run are inserted back afterwards
    (a single timestamp when the run contains only one value).
    If LTTB raises a ValueError, a linear interpolation on an evenly spaced time vector is returned instead.

    ``time_vector`` and ``values`` are expected to have the same length.

    :param time_vector: timestamps of the samples, in increasing order
    :param values: value of each sample, None for a missing value
    :param max_number_samples: number of samples above which the series is downsampled
    :return: tuple (time vector, values); the inputs themselves when they are short enough
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass splitting the series between real samples and bounds of the None runs
    # (the inputs are left untouched)
    kept_time_vector = []
    kept_values = []
    none_group_bounds = []
    previous_is_none = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not previous_is_none:
                # First None of a new run
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                # Only the bounds are kept: move the end of the run forward
                none_group_bounds[-1][1] = timestamp
            previous_is_none = True
        else:
            kept_time_vector.append(timestamp)
            kept_values.append(value)
            previous_is_none = False
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_time_vector, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # Insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values

127 

128 

def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When a float is wanted, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

133 

134 

135# Models 

# Base class of the models: thin naming layer over the pydantic validation / dump API.
class TwinPadModel(BaseModel):
    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` and return the corresponding instance of the class."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the object to a dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude)

144 

145 

# Base class of the models stored in a collection of the systems database.
# Subclasses provide a `collection_name` class attribute and may fill `custom_pipeline_steps`
# with the aggregation steps required to compute a given property (key: property name).
class GenericMongo(TwinPadModel):
    id: str | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of the class (requested with ``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated search and wrap the result in a ListResponse.

        The custom pipeline steps of a property are inserted before the first step that needs the
        property (``$match`` or ``$sort``), the remaining ones are applied after the pagination.
        The steps of a given property are added only once, even when the property appears in several
        filters or in both the filters and the sort.

        :param query: object providing ``mongodb_filter()``, ``sort_by``, ``offset`` and ``limit``
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field, sort_order = sort, 1
            sort_dict[sort_field] = sort_order

        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []

        def add_custom_steps(property_name):
            # Steps of a property must appear only once in the pipeline
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.append(property_name)

        # Only the top level "$and" clauses (or the filter itself) are inspected
        inspected_filters = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for request_filter in inspected_filters:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in request_filter:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        # A limit of None or 0 means no limit
        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining properties are computed on the paginated documents only
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose MongoDB ``_id`` is ``item_id``, None when there is no such item."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an object from a MongoDB document; ``_id`` is replaced by a string ``id`` in ``mongo_dict``."""
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute with value, None when nothing matches."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        # Limit before the remaining custom steps so they only run on the returned document
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted by ``sort_by`` in ascending order."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents of the collection, 0 when ``get_collection`` returns None."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert the object in its collection, store the generated id on the object and return it."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to both the object and its stored document, then return the object."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document of the object; return True when a document was deleted."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

288 

289 

# User account stored in the "users" collection.
class User(GenericMongo):
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless ``exclude`` is given explicitly."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <id>}``.

        The very first user created is always an administrator, whatever ``is_admin`` is.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used instead of to_dict so the password is part of the stored document
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Update the stored user ``user_id`` with the fields of ``user`` that are not None.

        :return: the updated user, or None when no user has this id
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # find_one_and_update returns None when no document matches the filter
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The connection state is not part of the returned object; tolerate documents without it
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


UserUpdate = create_update_model(User)

332 

333 

# Operating mode of a device, as listed in Device.modes.
class Mode(TwinPadModel):
    # Identifier compared with the mode_id carried by a DeviceUpdate
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float

339 

340 

# Payload of a mode change request (see Device.change_mode).
class DeviceUpdate(TwinPadModel):
    # Identifier of the requested mode
    mode_id: int

343 

344 

# Device stored in the "devices" collection.
class Device(GenericMongo):
    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to the mode ``update_dict.mode_id`` and log the request as a Command.

        A Command is created in every case; when the mode id is rejected nothing is sent to the device
        and a 400 error dict is returned instead of the device response.

        :param update_dict: object carrying the requested ``mode_id``
        :param current_user: user issuing the command
        :return: response of ``send_mode_change`` or the error dict
        """
        # NOTE(review): mode ids are used as 1-based indexes below (modes[mode_id - 1]), yet mode_id == 0
        # passes this check and would describe the last mode of the list - confirm whether 0 is valid.
        has_error = update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"

        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if not has_error:
            response = await send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)
        else:
            # Rejected locally: the command is recorded as failed without any response time
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning the given signals.

        The device id is the part of the signal id located before the first "."; each device is
        requested only once.

        :return: dict of the devices (None for an unknown device) by device id
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id in devices_by_id:
                continue
            devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

396 

397 

# Setup grouping several devices, stored in the "device_setups" collection.
class DeviceSetup(GenericMongo):
    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Same fields as DeviceSetup (except id), all optional
DeviceSetupUpdate = create_update_model(DeviceSetup)

407 

408 

# State of a device at a given time; the states of a device are read from the collection
# named after the device id in the devices states database.
class DeviceState(GenericMongo):
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        Return the states of the device ``device_id`` matching ``query``.

        An empty response is returned when the device has no states collection.

        :param device_id: id of the device, also name of the collection to read
        :param query: object providing ``mongodb_filter()``, ``sort_by`` ("field" or "field:order"),
            ``offset`` and ``limit`` (None or 0 meaning no limit)
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field, sort_order = query.sort_by, 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            # Only a real cursor can be limited (the empty list fallback has no limit method)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                # Plain empty lists as fallback: a pydantic Field() object is not a valid value here
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

448 

449 

# Single sample (value and optional forced value) of a signal at a given timestamp.
class SignalSample(TwinPadModel):
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, None when the signal has no collection or no sample."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})

        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information available: plain sorted search
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample (or None) of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent sample of the signal, None when the signal has no collection or no sample.

        When the device owning the signal has a ``last_ping`` more recent than the sample, the timestamp
        of the returned sample is moved to this last ping.

        :param device: device owning the signal; fetched from the signal id prefix when not given
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same workaround as above function, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})

        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample of the signal not older than ``min_timestamp``, None if there is none."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample (or None) of each signal, in the order of ``signal_ids``."""
        # Devices are fetched once per device instead of once per signal
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, the most recent sample not older than ``min_timestamp`` (or None)."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]

558 

559 

560class SignalData(TwinPadModel): 

561 signal_id: str 

562 forcible: bool = True 

563 time_vector: list[float] 

564 values: list[float | int | str | None] 

565 forced_values: list[float | int | str | None] 

566 

567 data_start: float | None = None 

568 data_end: float | None = None 

569 

570 number_samples: int = 0 

571 number_samples_db: int = 0 

572 

573 db_query_time: float = 0.0 

574 init_time: float = 0.0 

575 data_processing_time: float = 0.0 

576 

577 @classmethod 

578 def get_from_signal_id( 

579 cls, 

580 signal_id: str, 

581 min_timestamp: float = None, 

582 max_timestamp: float = None, 

583 window_min_timestamp: float = None, 

584 window_max_timestamp: float = None, 

585 interpolate_bounds: bool = True, 

586 max_documents: int = None, 

587 ) -> Self: 

588 

589 now = time.time() 

590 

591 req_signal = {} 

592 if min_timestamp is not None: 

593 req_signal.setdefault("timestamp", {}) 

594 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

595 if max_timestamp is not None: 

596 req_signal.setdefault("timestamp", {}) 

597 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

598 

599 collection = get_signal_collection(signal_id) 

600 if collection is None: 

601 return cls( 

602 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

603 ) 

604 

605 db_req_start = time.time() 

606 

607 sort_step = {"$sort": {"precise_timestamp": 1}} 

608 number_results = collection.count_documents(req_signal) 

609 

610 pipeline = [] 

611 if req_signal: 

612 pipeline.append({"$match": req_signal}) # Filter data if needed 

613 

614 pipeline.extend( 

615 [ 

616 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

617 sort_step, 

618 ] 

619 ) 

620 

621 if max_documents is not None and max_documents < number_results: 

622 unsampling_ratio = math.ceil(number_results / max_documents) 

623 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

624 pipeline.extend( 

625 [ 

626 { 

627 "$setWindowFields": { 

628 "sortBy": {"precise_timestamp": 1}, 

629 "output": {"index": {"$documentNumber": {}}}, 

630 } 

631 }, 

632 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

633 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

634 {"$replaceRoot": {"newRoot": "$doc"}}, 

635 {"$unset": ["index", "group_id"]}, 

636 {"$sort": {"precise_timestamp": 1}}, 

637 ] 

638 ) 

639 

640 # logger.info(f"pipeline: %s", str(pipeline)) 

641 cursor = collection.aggregate(pipeline) 

642 db_req_time = time.time() - db_req_start 

643 

644 init_time = time.time() 

645 

646 results = cursor.to_list() 

647 time_vector = [] 

648 values = [] 

649 forced_values = [] 

650 for s in results: 

651 time_vector.append(s["precise_timestamp"]) 

652 values.append(s.get("value", None)) 

653 forced_values.append(s.get("forced_value", None)) 

654 

655 signal = Signal.get_from_signal_id(signal_id) 

656 class_ = signal.signal_data_class 

657 

658 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

659 time_vector, values, forced_values = cls.interpolate_bounds( 

660 class_, 

661 collection, 

662 signal_id, 

663 time_vector, 

664 values, 

665 forced_values, 

666 window_min_timestamp, 

667 window_max_timestamp, 

668 ) 

669 

670 if values: 

671 # TODO: check below. a bit strange 

672 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

673 # Adding last value as it should be repeated 

674 time_vector.append(now) 

675 values.append(values[-1]) 

676 forced_values.append(forced_values[-1]) 

677 

678 init_time = time.time() - init_time 

679 

680 # See line 292 for explanation 

681 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

682 first_bucket = None 

683 if bucket is not None: 

684 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

685 if first_bucket is not None: 

686 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

687 else: 

688 data_start = None 

689 

690 last_bucket = None 

691 if bucket is not None: 

692 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

693 if last_bucket is not None: 

694 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

695 else: 

696 data_end = None 

697 

698 return class_( 

699 signal_id=signal_id, 

700 forcible=signal.forcible, 

701 time_vector=time_vector, 

702 values=values, 

703 forced_values=forced_values, 

704 data_start=data_start, 

705 data_end=data_end, 

706 number_samples=len(values), 

707 number_samples_db=number_results, 

708 db_query_time=db_req_time, 

709 init_time=init_time, 

710 ) 

711 

712 @staticmethod 

713 def interpolate_bounds( 

714 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

715 ): 

716 sample_right = None 

717 # Fetching right side value & interpolation 

718 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

719 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

720 sample_right = collection.find_one( 

721 { 

722 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

723 "value": {"$exists": True}, 

724 }, 

725 sort={"precise_timestamp": -1}, 

726 ) 

727 if sample_right: 

728 if time_vector: 

729 right_sd = class_( 

730 signal_id=signal_id, 

731 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

732 values=[values[-1], sample_right.get("value", None)], 

733 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

734 ) 

735 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

736 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

737 else: 

738 max_ts_value = sample_right.get("value", None) 

739 max_ts_forced_value = sample_right.get("forced_value", None) 

740 time_vector.append(window_max_timestamp) 

741 values.append(max_ts_value) 

742 forced_values.append(max_ts_forced_value) 

743 

744 # Fetching left side value & interpolation 

745 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

746 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

747 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

748 sample_left = sample_right 

749 sample_left = collection.find_one( 

750 { 

751 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

752 "value": {"$exists": True}, 

753 }, 

754 sort={"precise_timestamp": -1}, 

755 ) 

756 

757 if sample_left: 

758 if time_vector: 

759 left_sd = class_( 

760 signal_id=signal_id, 

761 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

762 values=[sample_left["value"], values[0]], 

763 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

764 ) 

765 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

766 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

767 else: 

768 min_ts_value = sample_left.get("value", None) 

769 min_ts_forced_value = sample_left.get("forced_value", None) 

770 time_vector.insert(0, window_min_timestamp) 

771 values.insert(0, min_ts_value) 

772 forced_values.insert(0, min_ts_forced_value) 

773 

774 return time_vector, values, forced_values 

775 

776 def interpolate_values(self, new_time_vector: list[float]): 

777 return self.interpolate(new_time_vector, self.values) 

778 

779 def interpolate_forced_values(self, new_time_vector: list[float]): 

780 return self.interpolate(new_time_vector, self.forced_values) 

781 

782 def uniform_desampling(self, number_samples_max: int) -> Self: 

783 data_processing_time = time.time() 

784 if number_samples_max and self.number_samples > number_samples_max: 

785 new_time_vector = npy.linspace( 

786 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

787 ).tolist() 

788 values = self.interpolate_values(new_time_vector) 

789 forced_values = self.interpolate_forced_values(new_time_vector) 

790 time_vector = new_time_vector 

791 number_samples = len(time_vector) 

792 else: 

793 time_vector = self.time_vector 

794 number_samples = len(self.values) 

795 values = self.values[:] 

796 forced_values = self.forced_values[:] 

797 data_processing_time = time.time() - data_processing_time 

798 

799 return self.__class__( 

800 signal_id=self.signal_id, 

801 time_vector=time_vector, 

802 values=values, 

803 forced_values=forced_values, 

804 number_samples=number_samples, 

805 number_samples_db=self.number_samples, 

806 data_start=self.data_start, 

807 data_end=self.data_end, 

808 db_query_time=self.db_query_time, 

809 init_time=self.init_time, 

810 data_processing_time=self.data_processing_time + data_processing_time, 

811 ) 

812 

813 def interest_window_desampling( 

814 self, 

815 window_max_number_samples: int, 

816 outside_max_number_samples: int, 

817 window_min_timestamp: float | None = None, 

818 window_max_timestamp: float | None = None, 

819 ) -> Self: 

820 """Performs a sampling in a window of interest and outside.""" 

821 

822 if not self.time_vector: 

823 return self 

824 

825 if window_min_timestamp is None: 

826 window_min_timestamp = self.time_vector[0] 

827 if window_max_timestamp is None: 

828 window_max_timestamp = self.time_vector[-1] 

829 

830 data_processing_time = time.time() 

831 

832 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

833 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

834 

835 time_vector_before = self.time_vector[:index_window_start] 

836 time_vector_window = self.time_vector[index_window_start:index_window_end] 

837 time_vector_after = self.time_vector[index_window_end:] 

838 

839 # Resampling window 

840 if time_vector_window: 

841 # Ensurring window bounds 

842 if time_vector_window[0] != window_min_timestamp: 

843 time_vector_window.insert(0, window_min_timestamp) 

844 if time_vector_window[-1] != window_max_timestamp: 

845 time_vector_window.append(window_max_timestamp) 

846 else: 

847 time_vector_window = [window_min_timestamp, window_max_timestamp] 

848 

849 if len(time_vector_window) > window_max_number_samples: 

850 # Resampling 

851 new_window_time_vector = npy.linspace( 

852 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

853 ).tolist() 

854 time_vector_window = new_window_time_vector 

855 

856 # Resampling outside 

857 number_samples_before = len(time_vector_before) 

858 number_samples_after = len(time_vector_after) 

859 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

860 new_number_samples_before = min( 

861 number_samples_before, 

862 math.ceil( 

863 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

864 ), 

865 ) 

866 new_number_samples_after = min( 

867 number_samples_after, 

868 math.ceil( 

869 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

870 ), 

871 ) 

872 # Adjusting numbers as math.ceil can do +1 on sum 

873 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

874 if new_number_samples_before > new_number_samples_after: 

875 new_number_samples_before -= 1 

876 else: 

877 new_number_samples_after -= 1 

878 

879 if new_number_samples_before > 0: 

880 new_time_vector_before = npy.linspace( 

881 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

882 ).tolist() 

883 time_vector_before = new_time_vector_before 

884 

885 if new_number_samples_after > 0: 

886 new_time_vector_after = npy.linspace( 

887 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

888 ).tolist()[::-1] 

889 time_vector_after = new_time_vector_after 

890 

891 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

892 values = self.interpolate_values(new_time_vector) 

893 forced_values = self.interpolate_forced_values(new_time_vector) 

894 number_samples = len(values) 

895 

896 data_processing_time = time.time() - data_processing_time 

897 

898 return self.__class__( 

899 signal_id=self.signal_id, 

900 forcible=self.forcible, 

901 time_vector=new_time_vector, 

902 values=values, 

903 forced_values=forced_values, 

904 number_samples=number_samples, 

905 number_samples_db=self.number_samples, 

906 data_start=self.data_start, 

907 data_end=self.data_end, 

908 db_query_time=self.db_query_time, 

909 init_time=self.init_time, 

910 data_processing_time=self.data_processing_time + data_processing_time, 

911 ) 

912 

def csv_export(self):
    """Serialise the samples as UTF-8 encoded CSV bytes.

    The first row is the header ``timestamp,value,forced_value``. Every
    following row holds the result of ``get_utc_date_from_timestamp`` for the
    sample timestamp, then the sample value and its forced value.

    :return: The CSV document encoded in UTF-8
    :rtype: bytes
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    # zip() stops at the shortest of the three vectors.
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), sample, forced_sample]
        for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")

920 

def prestoplot_export(self):
    """Serialise the samples as a tab-separated text document, UTF-8 encoded.

    The column name is the signal id with dots replaced by underscores, and
    prefixed with an underscore when its first character is numeric. Four
    header lines are written, then one line per sample; missing values are
    written as ``none``.

    :return: The document encoded in UTF-8
    :rtype: bytes
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    # zip() stops at the shortest of the three vectors.
    for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        sample_text = "none" if sample is None else sample
        forced_text = "none" if forced_sample is None else forced_sample
        lines.append(f"{get_utc_date_from_timestamp(timestamp)}\t{sample_text}\t{forced_text}\n")
    return "".join(lines).encode("utf-8")

937 

938 

class NumericSignalData(SignalData):
    """Signal samples whose values are numbers (or ``None`` when missing)."""

    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` at the timestamps of ``new_time_vector``.

        ``items`` is taken as sampled on ``self.time_vector``. ``None`` entries
        are replaced by NaN for the computation and NaN results are returned
        as ``None``.

        :param new_time_vector: Timestamps at which values are wanted
        :param items: Values matching ``self.time_vector``
        :return: One interpolated value (or ``None``) per requested timestamp
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy holding at most ``number_samples_max`` samples.

        When the limit is falsy or not exceeded, the samples are copied
        unchanged. Otherwise ``downsample_list`` picks the kept samples and the
        forced values are re-interpolated on the kept timestamps.

        :param number_samples_max: Maximum number of samples to keep
        :return: A new object of the same class
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Forwarded so the flag is not reset on the resampled copy.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        The samples are split in three parts: before, inside and after the
        window. The window part always starts and ends exactly on the window
        bounds (interpolated values are inserted when needed) and is reduced
        with ``downsample_list`` when it exceeds ``window_max_number_samples``.
        The two outside parts share ``outside_max_number_samples`` in proportion
        to their sizes.

        :param window_max_number_samples: Maximum number of samples inside the window
        :param outside_max_number_samples: Maximum number of samples outside the window
        :param window_min_timestamp: Window start, defaults to the first timestamp
        :param window_max_timestamp: Window end, defaults to the last timestamp
        :return: ``self`` when there is no sample, a new object of the same class otherwise
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are copies, so the insert/append below never touch self.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )
            else:
                # A side allotted no sample must be dropped, otherwise it would
                # be kept in full and the outside budget would be exceeded.
                time_vector_before, values_before = [], []

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )
            else:
                time_vector_after, values_after = [], []

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Forwarded so the flag is not reset on the resampled copy.
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

1079 

1080 

class StringSignalData(SignalData):
    """Signal samples whose values are strings (or ``None`` when missing)."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Pick, for each requested timestamp, the item of the latest sample at or before it.

        Requested timestamps located before the first sample get the first item.

        :param new_time_vector: Timestamps at which values are wanted
        :param items: Values matching ``self.time_vector``
        :return: One item per requested timestamp
        """
        # Index of the last sample whose timestamp is <= each requested one
        # (-1 when the requested timestamp precedes every sample).
        previous_sample = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        last_index = len(items) - 1
        return [items[position] for position in npy.clip(previous_sample, 0, last_index)]

1092 

1093 

class SignalsData(TwinPadModel):
    """Samples of several signals, with the overall data bounds and processing time."""

    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the data of every given signal through ``SignalData.get_from_signal_id``.

        ``max_timestamp`` defaults to the current time. The resulting
        ``data_start`` / ``data_end`` are the earliest start and latest end
        among the signals that report one, ``None`` when none does.

        :param signal_ids: Ids of the signals to load
        :return: The aggregated data
        """
        signals_data = []
        data_start = None
        data_end = None
        if max_timestamp is None:
            max_timestamp = time.time()
        data_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            data_processing_time += signal_data.data_processing_time
            signals_data.append(signal_data)
            if signal_data.data_start is not None:
                data_start = (
                    signal_data.data_start if data_start is None else min(signal_data.data_start, data_start)
                )
            if signal_data.data_end is not None:
                data_end = signal_data.data_end if data_end is None else max(signal_data.data_end, data_end)

        return cls(
            signals_data=signals_data,
            data_processing_time=data_processing_time,
            data_start=data_start,
            data_end=data_end,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and wrap the results."""
        signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and wrap the results."""
        signals_data = [
            s.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
            for s in self.signals_data
        ]

        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Build a deflated ZIP archive holding one exported file per signal.

        :param file_format: ``"csv"`` (``<signal_id>.csv`` entries) or
            ``"prestoplot"`` (``<signal_id>.tab`` entries)
        :raises ValueError: If ``file_format`` is not supported
        :return: The archive content
        :rtype: bytes
        """
        # Validated before the loop so that an unsupported format is rejected
        # even when there is no signal to export.
        if file_format == "csv":
            extension = "csv"
        elif file_format == "prestoplot":
            extension = "tab"
        else:
            raise ValueError(f"Format not found. Got: {file_format}")

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                if file_format == "csv":
                    exported = signal_data.csv_export()
                else:
                    exported = signal_data.prestoplot_export()
                zip_file.writestr(f"{signal_data.signal_id}.{extension}", exported)
        return zip_buffer.getvalue()

    def hdf5_export(self):
        """Build an in-memory HDF5 file with one group per signal.

        Each group is named after the signal id and holds a ``data`` dataset of
        records ``(date, timestamp, value, forced_value)``.

        :return: The HDF5 file content
        :rtype: bytes
        """
        hdf5_buffer = io.BytesIO()
        custom_type_float = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        # NOTE(review): string values are stored in fixed 30-byte fields; longer
        # values presumably get truncated - confirm this is acceptable.
        custom_type_string = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for signal_data in self.signals_data:
                signal_group = hdf5_file.create_group(signal_data.signal_id)
                date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
                records = list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                )
                record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
                signal_group["data"] = npy.array(records, dtype=record_type)
        return hdf5_buffer.getvalue()

1235 

1236 

class SignalStatus(TwinPadModel):
    """Status attached to a signal; a freshly built status is ``"down"`` with no reason and no delay."""

    status: str = "down"
    reason: str = ""
    # NOTE(review): unit of the delay is not visible here (presumably seconds) - confirm.
    delay: float | None = None

1241 

1242 

class DigitizationFunction(TwinPadModel):
    """Bounds of a signal in value units and in raw units, with an optional bit count.

    NOTE(review): presumably describes the mapping between raw and physical
    values of a digitized signal - confirm against the code using it.
    """

    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

1249 

1250 

class SignalUpdate(TwinPadModel):
    """Requested change of a signal: a value and/or a forced value, every field being optional."""

    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None

1255 

1256 

class SignalType(str, Enum):
    """Kinds of signals; members are also plain strings equal to their name."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1261 

1262 

# Maps a signal data type name to the SignalData subclass holding its samples:
# every type except "str" is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}

1270 

1271 

1272class Signal(GenericMongo): 

1273 collection_name: ClassVar[str] = "signals" 

1274 

1275 signal_id: str 

1276 frequency: float 

1277 unit: str | None 

1278 description: str 

1279 type: SignalType 

1280 data_type: str 

1281 precision_digits: int | None 

1282 forcible: bool 

1283 status: SignalStatus = SignalStatus() 

1284 

1285 digitization_function: DigitizationFunction | None 

1286 

1287 @property 

1288 def device(self) -> Device: 

1289 device_id = self.signal_id.split(".")[0] 

1290 device = Device.get_one_by_attribute("device_id", device_id) 

1291 return device 

1292 

1293 @cached_property 

1294 def signal_data_class(self): 

1295 if self.data_type in SIGNALDATA_TYPES: 

1296 return SIGNALDATA_TYPES[self.data_type] 

1297 if self.data_type.startswith("enum"): 

1298 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1299 raise ValueError(f"Unhandled python type: {self.data_type}") 

1300 

1301 @cached_property 

1302 def python_type(self): 

1303 if self.data_type in TYPES: 

1304 return TYPES[self.data_type] 

1305 if self.data_type.startswith("enum"): 

1306 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1307 return Literal[*choices] 

1308 raise ValueError(f"Unhandled python type: {self.data_type}") 

1309 

1310 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1311 command = Command( 

1312 sent_at=time.time(), 

1313 command_type="Signal command", 

1314 user_id=current_user.id, 

1315 ) 

1316 

1317 has_input_error = False 

1318 error_message = "" 

1319 

1320 if self.data_type.startswith("enum"): 

1321 enum_options = get_args(self.python_type) 

1322 

1323 if update_dict.value is not None and update_dict.value not in enum_options: 

1324 has_input_error = True 

1325 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1326 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1327 has_input_error = True 

1328 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1329 else: 

1330 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1331 has_input_error = True 

1332 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1333 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1334 has_input_error = True 

1335 error_message += ( 

1336 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1337 ) 

1338 

1339 if has_input_error: 

1340 command.response_time = 0 

1341 command.succeeded = False 

1342 command.description = f"Tried to modify signal {self.signal_id}" 

1343 response = {"error": True, "status_code": 400, "message": error_message} 

1344 else: 

1345 response = await send_signal_value(self.signal_id, update_dict) 

1346 command.receive_response(response) 

1347 

1348 Command.create(command) 

1349 return response 

1350 

1351 @classmethod 

1352 def get_from_signal_id(cls, signal_id) -> Self: 

1353 """Could be generic from mongo""" 

1354 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1355 if not raw_value: 

1356 return None 

1357 del raw_value["_id"] 

1358 return cls.dict_to_object(raw_value) 

1359 

1360 @classmethod 

1361 def get_all_ids(cls) -> list[str]: 

1362 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1363 

1364 return [signal["signal_id"] for signal in cursor] 

1365 

1366 @classmethod 

1367 def get_all_statuses(cls) -> list[dict]: 

1368 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}]) 

1369 

1370 return [ 

1371 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]} 

1372 for signal in cursor 

1373 ] 

1374 

1375 async def number_samples(self): 

1376 collection = get_signal_collection(signal_id=self.signal_id) 

1377 if collection is None: 

1378 return 0 

1379 

1380 number_samples = collection.estimated_document_count() 

1381 

1382 number_samples_async_collection = await get_async_collection( 

1383 systems_async_database, "number_samples", create=True, time_series=True 

1384 ) 

1385 

1386 loop = asyncio.get_running_loop() 

1387 loop.create_task( 

1388 number_samples_async_collection.insert_one( 

1389 { 

1390 "timestamp": datetime.datetime.now(pytz.UTC), 

1391 "signal_id": self.signal_id, 

1392 "number_samples": number_samples, 

1393 } 

1394 ) 

1395 ) 

1396 

1397 return number_samples 

1398 

1399 @classmethod 

1400 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]: 

1401 number_samples_by_id = {} 

1402 collections = get_signal_collections_batch(signal_ids) 

1403 number_samples_async_collection = await get_async_collection( 

1404 systems_async_database, "number_samples", create=True, time_series=True 

1405 ) 

1406 

1407 for signal_id, collection in zip(signal_ids, collections): 

1408 if collection is None: 

1409 number_samples_by_id[signal_id] = 0 

1410 continue 

1411 

1412 number_samples = collection.estimated_document_count() 

1413 

1414 number_samples_by_id[signal_id] = number_samples 

1415 

1416 now = datetime.datetime.now(pytz.UTC) 

1417 loop = asyncio.get_running_loop() 

1418 loop.create_task( 

1419 number_samples_async_collection.insert_many( 

1420 [ 

1421 { 

1422 "timestamp": now, 

1423 "signal_id": signal_id, 

1424 "number_samples": number_samples, 

1425 } 

1426 for signal_id, number_samples in number_samples_by_id.items() 

1427 ] 

1428 ) 

1429 ) 

1430 

1431 return number_samples_by_id 

1432 

1433 def sample_datasize(self): 

1434 return signals_database.command("collstats", self.signal_id)["size"] 

1435 

1436 @classmethod 

1437 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1438 result = cls.collection().aggregate( 

1439 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1440 ) 

1441 

1442 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1443 

1444 

class ForcedSignal(GenericMongo):
    """Record of a forced signal: who forced it, when, and to which value."""

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Create or replace the record of this signal and store its id on the object.

        :return: Id of the stored document
        :rtype: str
        """
        new_content = {"$set": self.to_dict(exclude={"id"})}
        stored_document = self.collection().find_one_and_update(
            {"signal_id": self.signal_id},
            new_content,
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Tell whether a user is allowed to force a signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal is forced by another user and the current
            user is not an administrator, True otherwise
        :rtype: bool
        """
        existing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing is None:
            return True
        forced_by_someone_else = existing.forcing_user_id != current_user.id
        return not (forced_by_someone_else and not current_user.is_admin)

1479 

1480 

class ServicesStatus(TwinPadModel):
    """Status of the backend and of the services it relies on."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host, one after the other, and report the statuses.

        The backend itself is always reported ``"up"``.
        """
        host_by_service = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        statuses = {service: ping(host) for service, host in host_by_service.items()}
        return cls(backend="up", **statuses)

1499 

1500 

def ping(host):
    """Tell whether a host answers a ping within 0.8 second.

    :param host: Host name or address to ping
    :return: ``"up"`` when a reply was received, ``"down"`` otherwise
    :rtype: str
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Pinging needs a privilege the process does not have: the state of the
        # host is unknown, it is reported down but the cause is made visible.
        logger.warning("Not allowed to ping %s, reporting it as down", host)
        return "down"
    # ping3 gives None on timeout and False on error. Comparing by identity
    # keeps a measured delay of exactly 0.0 from being reported as "down".
    if delay is None or delay is False:
        return "down"
    return "up"

1508 

1509 

class Event(GenericMongo):
    """Event stored in the ``events`` collection, raised by an event rule."""

    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Rule referenced by ``event_rule_id``, loaded on first access."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Refined to convert the stored datetime into the float timestamp of the model.

        ``dict_["timestamp"]`` is replaced in place. A timestamp that is not a
        datetime is left untouched.

        :param dict_: Document read from the database
        :return: The matching object
        """
        stored_at = dict_["timestamp"]
        if isinstance(stored_at, datetime.datetime):
            if stored_at.tzinfo is None:
                # Naive datetimes are taken as UTC, as done for the other
                # collections of this module. Converting them as-is would
                # interpret them in the local time zone of the server.
                stored_at = stored_at.replace(tzinfo=pytz.UTC)
            dict_["timestamp"] = stored_at.timestamp()
        return super().dict_to_object(dict_)

1527 

1528 

class TwinPadActivity(GenericMongo):
    """Amount of activity (events, samples or commands) counted for one day."""

    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight(moment) -> datetime.datetime:
        """Midnight of the date of ``moment``, stamped as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _day_range(start, end=None) -> dict:
        """Query operators selecting ``start <= timestamp`` (``< end`` when an end is given)."""
        operators = {"$gte": start}
        if end is not None:
            operators["$lt"] = end
        return operators

    @classmethod
    def _daily_amounts(
        cls,
        cache_name: str,
        cache_collection,
        source_collection,
        count_between,
        min_timestamp,
        max_timestamp,
        recompute_amount: bool,
    ) -> list[Self]:
        """Refresh a collection of daily amounts and return the days of a time frame.

        When the cache collection is missing or a recomputation is requested, it
        is (re)created, emptied and filled with one document per past day having
        a positive amount, starting at the day of the oldest source document. The
        amount of the current day is refreshed on every call.

        :param cache_name: Name of the collection caching the daily amounts
        :param cache_collection: That collection, ``None`` when it does not exist
        :param source_collection: Collection holding the counted documents
        :param count_between: ``count_between(start, end)`` giving the amount of
            a period, ``end`` being ``None`` for an open-ended period
        :param min_timestamp: Start of the requested time frame
        :param max_timestamp: End of the requested time frame
        :param recompute_amount: Whether to rebuild the cache from scratch
        :return: One object per cached day of the time frame
        """
        one_day = datetime.timedelta(days=1)
        # Day buckets are UTC midnights, so "today" must be the UTC date, not the
        # local date of the server.
        today = cls._utc_midnight(datetime.datetime.now(pytz.UTC))

        if cache_collection is None or recompute_amount:
            cache_collection = get_collection(systems_database, cache_name, create=True, time_series=True)
            cache_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            # Compute from the day of the first found document.
            day = cls._utc_midnight(first_document["timestamp"])
            while day < today:
                amount = count_between(day, day + one_day)
                if amount > 0:
                    cache_collection.insert_one({"timestamp": day, "amount": amount})
                day += one_day

        amount_today = count_between(today, None)
        if amount_today > 0:
            cache_collection.delete_many({"timestamp": today})
            cache_collection.insert_one({"timestamp": today, "amount": amount_today})

        # Built directly in UTC: fromtimestamp() without tz gives a local time,
        # which must not be stamped as UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)

        items = []
        for day_document in cache_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            stored_day = day_document["timestamp"]
            if stored_day.tzinfo is None:
                # Naive datetimes read from the database are taken as UTC.
                stored_day = stored_day.replace(tzinfo=pytz.UTC)
            day_document["timestamp"] = stored_day.timestamp()
            items.append(cls.mongo_dict_to_object(day_document))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Number of events per day between two timestamps.

        :param min_timestamp: Start of the time frame
        :param max_timestamp: End of the time frame
        :param recompute_amount: Whether to rebuild the ``number_events`` collection
        :return: One object per day having events
        """
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)

        def count_events(start, end):
            return events_collection.count_documents({"timestamp": cls._day_range(start, end)})

        return cls._daily_amounts(
            "number_events",
            number_events_collection,
            events_collection,
            count_events,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Number of received samples per day between two timestamps.

        The amount of a period is the sum of the ``value`` field of the
        ``signal_storage._NUMBER_SAMPLES`` documents of that period.

        :param min_timestamp: Start of the time frame
        :param max_timestamp: End of the time frame
        :param recompute_amount: Whether to rebuild the ``number_received_samples`` collection
        :return: One object per day having samples
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )

        def sum_samples(start, end):
            totals = signals_number_samples_collection.aggregate(
                [
                    {"$match": {"timestamp": cls._day_range(start, end)}},
                    {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
                ]
            ).to_list()
            if len(totals) == 0:
                return 0
            return totals[0].get("number_samples", 0)

        return cls._daily_amounts(
            "number_received_samples",
            number_samples_collection,
            signals_number_samples_collection,
            sum_samples,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Number of commands per day between two timestamps.

        :param min_timestamp: Start of the time frame
        :param max_timestamp: End of the time frame
        :param recompute_amount: Whether to rebuild the ``number_commands`` collection
        :return: One object per day having commands
        """
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)

        def count_commands(start, end):
            return commands_collection.count_documents({"timestamp": cls._day_range(start, end)})

        return cls._daily_amounts(
            "number_commands",
            number_commands_collection,
            commands_collection,
            count_commands,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

1664 

1665 

class EventRule(GenericMongo):
    """Rule stored in the ``event_rules`` collection: a named formula and its variables."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events referencing this rule, computed on first access."""
        events_collection = get_collection(systems_database, "events")
        # Without create=True the lookup may give None (the other callers of this
        # module check for it): no collection means no event.
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})

1677 

1678 

class Company(GenericMongo):
    """Company stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"
    name: str

1682 

1683 

class Campaign(GenericMongo):
    """Campaign stored in the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Store a new campaign (its ``id`` is ignored).

        :return: ``{"campaign_id": <id of the stored document>}``
        """
        document = campaign.model_dump(exclude={"id"})
        insertion = cls.collection().insert_one(document)
        if insertion is not None:
            return {"campaign_id": str(insertion.inserted_id)}
        return None

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite the name and description of the campaign having the id of ``campaign``.

        :return: The document after the update, as given by ``find_one_and_update``
        """
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return cls.collection().find_one_and_update(
            selector,
            changes,
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign having this id.

        :return: The result of ``delete_one``
        """
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})

1712 

1713 

class Phase(GenericMongo):
    # Phase of a campaign, persisted in the "phases" collection.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    @classmethod
    def create(cls, phase: Self) -> dict[str, str]:
        """Insert ``phase`` as a new document.

        The phase is rebuilt from its persisted fields only, so any ``id``
        carried by the argument is ignored.

        Returns:
            ``{"phase_id": <string form of the inserted _id>}``.
        """
        new_phase = cls(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        # create=True is kept from the original call (presumably it creates the
        # collection when it does not exist yet - see get_collection).
        phase_collection = get_collection(systems_database, cls.collection_name, create=True)
        # insert_one() returns a result object or raises; it never returns None.
        result = phase_collection.insert_one(new_phase.model_dump(exclude={"id"}))
        return {"phase_id": str(result.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description, ``start_at`` and ``end_at`` of a phase.

        ``campaign_id`` is not part of the update.

        Returns:
            The raw document as it is after the update, or ``None`` when
            ``phase.id`` is unset or matches no document.
        """
        # ObjectId(None) would generate a brand-new id, which can never match
        # an existing document: answer "not found" without a database call.
        if phase.id is None:
            return None
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {
                "$set": {
                    "name": phase.name,
                    "description": phase.description,
                    "start_at": phase.start_at,
                    "end_at": phase.end_at,
                }
            },
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase whose ``_id`` is ``phase_id``; return the ``delete_one`` result."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase whose ``campaign_id`` field equals ``campaign_id``.

        The camelCase name is part of the existing interface and is kept as is.

        Returns:
            The result object of ``delete_many``.
        """
        return cls.collection().delete_many({"campaign_id": campaign_id})

1778 

1779 

class CustomViewCreation(GenericMongo):
    # Name and configuration of a custom view, bound to the "custom_views"
    # collection (presumably the payload accepted when a view is created -
    # confirm against the API layer).
    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list

1785 

1786 

class CustomView(CustomViewCreation):
    # Stored custom view: the creation fields plus an optional id and the id of
    # the user it belongs to.
    # NOTE: this class defines no create/update/delete methods of its own; only
    # inherited behaviour applies.

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

1818 

1819 

# Update-payload model derived from CustomView by create_update_model
# (presumably with every field made optional - confirm in that helper).
CustomViewUpdate = create_update_model(CustomView)

1821 

1822 

class Video(GenericMongo):
    # Video source persisted in the "videos" collection.
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return all videos, sorted ascending by ``sort_by``.

        The query projects only ``name`` and ``ip_addr``, so ``username`` and
        ``password`` are not loaded from the database by this method.
        """
        projection = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(raw) for raw in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the video with id ``camera_id``, or ``None`` when ``get_from_id`` finds nothing."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name

1846 

1847 

class Command(GenericMongo):
    # Command sent by a user, persisted in the "commands" collection.
    collection_name: ClassVar[str] = "commands"

    # Properties
    # Filled in by create() from sent_at; the annotation now admits the None
    # default it already had.
    timestamp: datetime.datetime | None = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, requested with ``create=True`` and ``time_series=True``."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self) -> dict[str, str]:
        """Store ``command`` with ``timestamp`` derived from ``sent_at``.

        ``sent_at`` is interpreted as epoch seconds and converted to a
        timezone-aware UTC datetime.

        Returns:
            ``{"command_id": <string form of the inserted _id>}``.
        """
        stored = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        # insert_one() returns a result object or raises; it never returns None.
        result = cls.collection().insert_one(stored.model_dump(exclude={"id"}))
        return {"command_id": str(result.inserted_id)}

    def receive_response(self, response: dict):
        """Record the outcome of the command from ``response``.

        Sets ``response_time`` to the seconds elapsed since ``sent_at`` and
        ``succeeded`` to ``True`` only when ``response["error"]`` is exactly
        ``False`` (a missing key counts as a failure). When no description was
        set, the right-stripped ``response["message"]`` is used instead.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            # Tolerate an explicit ``"message": None`` as well as a missing key.
            self.description = (response.get("message") or "").rstrip()

1887 

1888 

class SignalsPresetCreation(GenericMongo):
    # Name and signal ids of a preset; SignalsPreset.create() reads exactly
    # these two fields from its argument.
    name: str
    signal_ids: list[str]

1892 

1893 

class SignalsPreset(SignalsPresetCreation):
    # Preset of signal ids owned by a user, persisted in "signals_presets".
    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store a new preset owned by ``user_id``.

        Returns:
            The inserted document id as a string.
        """
        preset = cls(
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
            user_id=user_id,
        )
        document = preset.model_dump(exclude={"id"})
        insertion = cls.collection().insert_one(document)
        return str(insertion.inserted_id)

1910 

1911 

# Update-payload model derived from SignalsPreset by create_update_model
# (presumably with every field made optional - confirm in that helper).
SignalsPresetUpdate = create_update_model(SignalsPreset)

1913 

1914 

class LineStyle(str, Enum):
    # Line styles selectable for a graph theme. Members are also plain strings,
    # so they compare equal to their literal values.
    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

1919 

1920 

class SignalAppearance:
    """Annotation-only description of a signal's two colors.

    The class declares the attribute types but assigns no values and has no
    base class, so it presumably serves as a type hint only (TODO confirm usage).
    """

    value_color: str
    forced_value_color: str

1924 

1925 

class GraphThemeCreation(GenericMongo):
    # Fields shared by every graph-theme model; all of them are bound to the
    # "graph_themes" collection.
    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str

    # Colors and line styles for the value and the forced value of the signal.
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid

    # Themes are private unless stated otherwise.
    private: bool = True

1936 

1937 

class PublicGraphTheme(GraphThemeCreation):
    # Graph theme as presented to one user: the three flags below are computed
    # per user by the aggregation steps of custom_pipeline_steps, and the
    # stored creator_id / in_library / active fields are removed from the
    # output by those same steps.
    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # NOTE(review): every query entry point below stores the caller's id on the
    # class before delegating to the parent implementation. This is state
    # shared by all callers, so overlapping requests from different users could
    # see each other's id - confirm that callers are serialized.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Return the aggregation stages of this model, keyed by step name.

        The stages are rebuilt on every access with the user id currently
        stored in ``_current_user_id``.
        """
        user_id = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            # One-stage step that sets (or, with "$$REMOVE", drops) one field.
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user is not allowed to see (i.e. neither
            # his own nor shared); meant to run as early as possible. The match
            # reads created_by_user, which the step above adds.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the parent ``response_from_query`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` after setting ``query.in_user_library`` to ``"true"``.

        The ``query`` object passed in is modified in place.
        """
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Look up one theme by ``_id`` on behalf of ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_by_attribute`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_one_by_attribute`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Run the parent ``get_all`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return the colors and line styles active for ``user_id``, per signal.

        Themes are matched on ``active`` (against ``user_id``) and on
        ``signal_id`` (within ``signal_ids``); one theme is kept per signal.
        No sort stage precedes the grouping, so which theme is kept when
        several match the same signal is left to the database.

        Returns:
            A dict mapping each signal id to a dict with the keys
            ``value_color``, ``forced_value_color``, ``value_line_style`` and
            ``forced_value_line_style`` (as far as they exist on the document).
        """
        match_stage = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        keep_first_per_signal = [
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
        ]
        project_stage = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [match_stage, *keep_first_per_signal, project_stage]

        appearances = {}
        for document in cls.collection().aggregate(pipeline):
            # The signal id becomes the key and is dropped from the value.
            signal_id = document.pop("signal_id")
            appearances[signal_id] = document
        return appearances

2050 

2051 

# Update-payload model derived from PublicGraphTheme by create_update_model
# (presumably with every field made optional - confirm in that helper).
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2053 

2054 

class PrivateGraphTheme(GraphThemeCreation):
    # Graph theme with its ownership and membership fields: who created it, and
    # the ids of the users who have it in their library / have it active.

    # private
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Insert a new theme and return it with its ``id`` filled in.

        The creator is put both in the theme's library list and in its active
        list.
        """
        theme = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        insertion = cls.collection().insert_one(theme.model_dump(exclude={"id": True}))
        theme.id = str(insertion.inserted_id)
        return theme

    def update(self, update_dict: dict, user_id: str):
        """Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The per-user flags ``in_user_library`` and ``active_for_user`` are
        translated into membership of ``user_id`` in ``in_library`` and
        ``active``; ``created_by_user`` is ignored. None of these three keys is
        ever written to the database, whatever its value. ``update_dict`` and
        the membership lists of this instance are modified in place.

        Returns:
            An empty dict.
        """

        def toggle_membership(members: list[str], wanted: bool) -> list[str]:
            # Add or remove user_id so that its presence matches ``wanted``.
            if wanted and user_id not in members:
                members.append(user_id)
            elif not wanted and user_id in members:
                members.remove(user_id)
            return members

        # pop() strips the virtual keys even when their value is None, so they
        # can never reach the $set payload below.
        in_user_library = update_dict.pop("in_user_library", None)
        if in_user_library is not None:
            update_dict["in_library"] = toggle_membership(self.in_library, in_user_library)

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            update_dict["active"] = toggle_membership(self.active, active_for_user)

        update_dict.pop("created_by_user", None)

        # An empty $set is rejected by MongoDB servers older than 5.0 and is a
        # no-op on newer ones: skip the call when nothing is left to write.
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}