Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1161 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-09 14:47 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25# from scipy import signal as signal_scipy 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 systems_database, 

32 systems_async_database, 

33 signals_database, 

34 devices_states_database, 

35) 

36from twinpad_backend.responses import ListResponse 

37from twinpad_backend.messages import send_mode_change, send_signal_value 

38 

# Maps a type name (string) to the Python type used for it; "epoch" shares float.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}


# Host names read from the environment, each with a fallback default.
# NOTE(review): presumably the names of companion services/containers - confirm.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared against differences of time.time() values when reading signal data.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
# NOTE(review): not used in the visible part of this module - meaning to be confirmed.
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Module logger, obtained under the "uvicorn.error" logger name.
logger = logging.getLogger("uvicorn.error")

52 

53 

class classproperty:
    """
    Read-only descriptor that exposes a function of the class as an attribute.

    Stand-in for stacking ``classmethod`` and ``property``, which was deprecated
    in python 3.11 and is slated for removal in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped getter; it is called with the owning class as sole argument.
        self.fget = func

    def __get__(self, instance, owner):
        # The instance (if any) is ignored: the value only depends on the class.
        getter = self.fget
        return getter(owner)

65 

66 

def create_update_model(model):
    """Build a "<Model>Update" pydantic model from ``model``.

    Every field of ``model`` except ``id`` is copied with its annotation widened
    to accept ``None`` and with ``None`` as default value.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(model.__name__ + "Update", **optional_fields)

76 

77 

def get_utc_date_from_timestamp(timestamp: float):
    """Return the ISO-8601 representation of ``timestamp`` expressed in UTC.

    ``timestamp`` is a POSIX timestamp in seconds. The conversion is done
    explicitly in UTC so the result does not depend on the server's local time
    zone. The UTC offset is stripped afterwards so the string keeps the naive
    ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` layout this function has always produced.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()

80 

81 

def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """Downsample a series with LTTB while keeping track of ``None`` gaps.

    :param time_vector: timestamps of the samples.
    :param values: sample values, same length as ``time_vector``; may contain ``None``.
    :param max_number_samples: target number of samples of the result.
    :return: a ``(time_vector, values)`` tuple. The inputs are returned untouched
        when they hold fewer than ``max_number_samples`` samples.

    ``None`` values are removed before calling ``lttb.downsample`` and each run of
    consecutive ``None`` values is re-inserted afterwards as one or two samples
    (the first and last timestamp of the run). If the LTTB path raises
    ``ValueError``, the series is linearly interpolated on an evenly spaced time
    grid instead, and NaN results are mapped back to ``None``.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Work on copies: the loop below pops elements.
    time_vector_copy = copy.deepcopy(time_vector)
    values_copy = copy.deepcopy(values)

    none_group_bounds = []
    none_group_index = -1
    index = -1
    # LTTB doesn't handle None values so remove them
    while values_copy.count(None) > 0:
        # Store bounds of None value groups so we can insert them back after the downsampling
        # After a pop, the next None of the same run shows up at the same index,
        # so an unchanged index means "still inside the current group".
        if (new_index := values_copy.index(None)) != index:
            none_group_bounds.append([time_vector_copy.pop(new_index)])
            none_group_index += 1
        elif len(none_group_bounds[none_group_index]) < 2:
            none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
        else:
            # Group already has two bounds: only move its end bound forward.
            none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
        values_copy.pop(new_index)
        index = new_index
    # Number of samples that will be re-inserted (one or two per group).
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([time_vector_copy, values_copy]).T
        # Leave room for the None samples added back below.
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        # This fallback uses the original inputs; None becomes NaN through the float64 cast.
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values

126 

127 

def is_of_type(value, wanted_type):
    """Tell whether ``value`` is an instance of ``wanted_type``.

    When ``wanted_type`` is ``float``, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)

132 

133 

134# Models 

# Common base of the models of this module: thin dict <-> object helpers on top
# of pydantic's BaseModel.
class TwinPadModel(BaseModel):
    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` and return the resulting instance of ``cls``."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model to a dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude)

143 

144 

# Base class of the models stored in a collection of the systems database.
# Subclasses provide a `collection_name` class attribute; documents are exposed
# with their Mongo `_id` converted to a string `id` field.
class GenericMongo(TwinPadModel):
    id: str | None = None
    # Maps a property name to the aggregation stages that have to run before
    # that property can be used in a $match or $sort stage.
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of this model (requested with ``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query and wrap it in a ListResponse.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (either
        ``"field"`` or ``"field:order"``), ``offset`` and ``limit`` (``None`` or
        ``0`` meaning "no limit"). ``total`` is the number of documents matching
        the filter, regardless of pagination.
        """
        request_filters = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []
        # Custom stages needed by the filter must come before the $match stage.
        # The filter is either a flat dict or a {"$and": [...]} list of dicts.
        filters_to_scan = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for request_filter in filters_to_scan:
            for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                if filtered_property in request_filter:
                    pipeline.extend(pipeline_steps)
                    added_properties.append(filtered_property)
        pipeline.append({"$match": request_filters})

        # Same for the sort field: its custom stages must precede $sort.
        if sort_field in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_field])
            added_properties.append(sort_field)
        pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only decorate the output, so they run last,
        # on the already paginated documents.
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        cursor = collection.aggregate(pipeline)
        items = [cls.mongo_dict_to_object(item_dict) for item_dict in cursor]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose Mongo ``_id`` is ``item_id``, or ``None``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Convert a raw Mongo document into an instance of ``cls``.

        ``mongo_dict`` is modified in place: ``_id`` is replaced by a string ``id``.
        """
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute value, or ``None``."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        # Limit before the remaining custom stages so they only process one document.
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if len(items) == 0:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted ascending on ``sort_by``."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents of the collection (0 if it is unavailable)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert this object as a new document and return its new id.

        The ``id`` field is left out of the stored document: Mongo's ``_id`` is
        the source of truth and is copied back into ``self.id`` after insertion.
        """
        # The exclusion set must hold the field *name*; the previous `{id}` held
        # the builtin function `id`, so nothing was actually excluded.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this object and to its stored document.

        Returns ``self`` (the in-memory object, not the document read back).
        """
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; return ``True`` if one was removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0

279 

280 

# User account stored in the "users" collection.
class User(GenericMongo):
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless ``exclude`` is given."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Store a new user and return ``{"user_id": <new id>}``.

        The very first user created is always made admin, whatever ``is_admin`` says.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used directly (not to_dict) so the password is stored too.
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """Update the stored user ``user_id`` with the non-``None`` fields of ``user``.

        Returns the updated user, or ``None`` when no user has this id (same
        convention as ``get_from_id``). ``is_connected`` is not read back from
        the document, so the returned object carries its default value.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        # find_one_and_update yields None when nothing matched the filter.
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # Tolerate documents stored without the field instead of raising KeyError.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)

320 

321 

# Partial-update counterpart of User: same fields except `id`, all optional.
UserUpdate = create_update_model(User)

323 

324 

# One operating mode of a device (element of Device.modes).
class Mode(TwinPadModel):
    # Identifier of the mode; Device.change_mode reads modes[mode_id - 1].
    mode_id: int
    # Name used in the descriptions of mode-change commands.
    name: str
    # NOTE(review): units/semantics of the two fields below are not visible here - confirm.
    frequency_multiplier: float
    min_frequency: float

330 

331 

# Payload of a mode-change request (see Device.change_mode).
class DeviceUpdate(TwinPadModel):
    # Identifier of the mode to switch to.
    mode_id: int

334 

335 

# Device stored in the "devices" collection.
class Device(GenericMongo):
    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """Ask the device to switch mode and record the attempt as a Command.

        :param update_dict: object exposing the requested ``mode_id``.
        :param current_user: user issuing the request (its id is stored on the command).
        :return: the response of ``send_mode_change``, or an error dict with
            status code 400 when the requested mode id is out of range.

        Mode ids are used as 1-based positions in ``self.modes`` (the name of
        mode ``i`` is read from ``self.modes[i - 1]``), so valid ids are
        ``1..len(self.modes)``.
        """
        requested_mode_id = update_dict.mode_id
        # Lower bound is 1, not 0: an id of 0 would index self.modes[-1] and
        # silently describe/request the wrong mode.
        has_error = requested_mode_id < 1 or requested_mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{requested_mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[requested_mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} to {self.modes[requested_mode_id - 1].name}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            # Nothing is sent to the device: the command is stored as failed.
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {requested_mode_id}"}
        else:
            response = await send_mode_change(self.device_id, requested_mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """Return the devices owning ``signal_ids``, keyed by device id.

        The device id is the part of a signal id before the first ".". Each
        device is fetched once; the value is ``None`` for an unknown device.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id

387 

388 

# Device setup stored in the "device_setups" collection.
class DeviceSetup(GenericMongo):
    collection_name: ClassVar[str] = "device_setups"

    # Identifiers of the devices belonging to this setup.
    device_ids: list[str]
    active: bool = False
    # NOTE(review): direction of the mapping (key -> value) is not visible here - confirm.
    variable_mapping: dict[str, str]

395 

396 

# Partial-update counterpart of DeviceSetup: same fields except `id`, all optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)

398 

399 

# Snapshot of a device state at a given timestamp.
class DeviceState(GenericMongo):
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """Return the states of ``device_id`` matching ``query``.

        States are read from the collection named ``device_id`` of the devices
        states database. ``query`` must provide ``mongodb_filter()``,
        ``sort_by`` (``"field"`` or ``"field:order"``), ``offset`` and ``limit``
        (``None`` or ``0`` meaning "no limit"). An empty response is returned
        when the device has no collection.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        # Missing list fields fall back to a real empty list. The previous
        # fallback was a pydantic Field(...) object, which is not a valid list
        # value and made validation fail for documents lacking these keys.
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

439 

440 

# Single sample of a signal: value (and optional forced value) at a timestamp.
class SignalSample(TwinPadModel):
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest stored sample of ``signal_id``.

        Returns ``None`` when the signal has no collection or no sample.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information: fall back to sorting the whole collection.
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """Return the most recent stored sample of ``signal_id``.

        :param device: device owning the signal; looked up from the signal id
            prefix (part before the first ".") when not given.
        :return: the sample, or ``None`` when the signal has no collection or no
            sample. Its timestamp is moved forward to the device's last ping
            when that ping is more recent than the sample itself.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same workaround as above function, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample of ``signal_id`` not older than ``min_timestamp``.

        Returns ``None`` when the signal has no collection or no such sample.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, in the order of ``signal_ids``."""
        # Devices are fetched once per device id rather than once per signal.
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, its most recent sample not older than ``min_timestamp``."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]

549 

550 

551class SignalData(TwinPadModel): 

552 signal_id: str 

553 forcible: bool = True 

554 time_vector: list[float] 

555 values: list[float | int | str | None] 

556 forced_values: list[float | int | str | None] 

557 

558 data_start: float | None = None 

559 data_end: float | None = None 

560 

561 number_samples: int = 0 

562 number_samples_db: int = 0 

563 

564 db_query_time: float = 0.0 

565 init_time: float = 0.0 

566 data_processing_time: float = 0.0 

567 

@classmethod
def get_from_signal_id(
    cls,
    signal_id: str,
    min_timestamp: float | None = None,
    max_timestamp: float | None = None,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
    interpolate_bounds: bool = True,
    max_documents: int | None = None,
) -> Self:
    """Load the stored samples of a signal.

    :param signal_id: identifier of the signal; also used as collection name.
    :param min_timestamp: lower bound of the query (inclusive), if any.
    :param max_timestamp: upper bound of the query (inclusive), if any.
    :param window_min_timestamp: start of the window handed to ``interpolate_bounds``.
    :param window_max_timestamp: end of the window handed to ``interpolate_bounds``.
    :param interpolate_bounds: when true and both window bounds are given, values
        at the window bounds are added through ``cls.interpolate_bounds``.
    :param max_documents: when the query matches more documents than this, only
        the first document of each group of consecutive documents is kept.
    :return: an instance of the signal's ``signal_data_class``; an empty ``cls``
        instance when the signal has no collection.
    """

    now = time.time()

    # Range filter on the "timestamp" field.
    # NOTE(review): fromtimestamp() without tz yields naive local datetimes -
    # confirm this matches how the "timestamp" field is written.
    req_signal = {}
    if min_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
    if max_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

    collection = get_signal_collection(signal_id)
    if collection is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )

    db_req_start = time.time()

    sort_step = {"$sort": {"precise_timestamp": 1}}
    number_results = collection.count_documents(req_signal)

    pipeline = []
    if req_signal:
        pipeline.append({"$match": req_signal})  # Filter data if needed

    pipeline.extend(
        [
            {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
            sort_step,
        ]
    )

    if max_documents is not None and max_documents < number_results:
        # Too many documents: number them, group them by runs of
        # `unsampling_ratio` consecutive numbers and keep the first of each group.
        unsampling_ratio = math.ceil(number_results / max_documents)
        logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
        pipeline.extend(
            [
                {
                    "$setWindowFields": {
                        "sortBy": {"precise_timestamp": 1},
                        "output": {"index": {"$documentNumber": {}}},
                    }
                },
                {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                {"$replaceRoot": {"newRoot": "$doc"}},
                {"$unset": ["index", "group_id"]},
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

    # logger.info(f"pipeline: %s", str(pipeline))
    cursor = collection.aggregate(pipeline)
    # Measured before the cursor is converted to a list below.
    db_req_time = time.time() - db_req_start

    init_time = time.time()

    results = cursor.to_list()
    time_vector = []
    values = []
    forced_values = []
    for s in results:
        time_vector.append(s["precise_timestamp"])
        values.append(s.get("value", None))
        forced_values.append(s.get("forced_value", None))

    # The signal definition decides which class holds the data.
    signal = Signal.get_from_signal_id(signal_id)
    class_ = signal.signal_data_class

    if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
        time_vector, values, forced_values = cls.interpolate_bounds(
            class_,
            collection,
            signal_id,
            time_vector,
            values,
            forced_values,
            window_min_timestamp,
            window_max_timestamp,
        )

    if values:
        # TODO: check below. a bit strange
        # Only when the query reaches the present and the last sample is older
        # than DEVICE_TIMEOUT.
        if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
            # Adding last value as it should be repeated
            time_vector.append(now)
            values.append(values[-1])
            forced_values.append(forced_values[-1])

    init_time = time.time() - init_time

    # Same workaround as in SignalSample.get_first_from_signal_id: the internal
    # "system.buckets.<signal_id>" collection of the time series is read to get
    # the overall first/last sample timestamps.
    bucket = get_signal_collection(f"system.buckets.{signal_id}")
    first_bucket = None
    if bucket is not None:
        first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
    if first_bucket is not None:
        data_start = first_bucket["control"]["min"]["precise_timestamp"]
    else:
        data_start = None

    last_bucket = None
    if bucket is not None:
        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
    if last_bucket is not None:
        data_end = last_bucket["control"]["max"]["precise_timestamp"]
    else:
        data_end = None

    return class_(
        signal_id=signal_id,
        forcible=signal.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        data_start=data_start,
        data_end=data_end,
        number_samples=len(values),
        number_samples_db=number_results,
        db_query_time=db_req_time,
        init_time=init_time,
    )

702 

703 @staticmethod 

704 def interpolate_bounds( 

705 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

706 ): 

707 sample_right = None 

708 # Fetching right side value & interpolation 

709 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

710 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

711 sample_right = collection.find_one( 

712 { 

713 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

714 "value": {"$exists": True}, 

715 }, 

716 sort={"precise_timestamp": -1}, 

717 ) 

718 if sample_right: 

719 if time_vector: 

720 right_sd = class_( 

721 signal_id=signal_id, 

722 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

723 values=[values[-1], sample_right.get("value", None)], 

724 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

725 ) 

726 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

727 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

728 else: 

729 max_ts_value = sample_right.get("value", None) 

730 max_ts_forced_value = sample_right.get("forced_value", None) 

731 time_vector.append(window_max_timestamp) 

732 values.append(max_ts_value) 

733 forced_values.append(max_ts_forced_value) 

734 

735 # Fetching left side value & interpolation 

736 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

737 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

738 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

739 sample_left = sample_right 

740 sample_left = collection.find_one( 

741 { 

742 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

743 "value": {"$exists": True}, 

744 }, 

745 sort={"precise_timestamp": -1}, 

746 ) 

747 

748 if sample_left: 

749 if time_vector: 

750 left_sd = class_( 

751 signal_id=signal_id, 

752 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

753 values=[sample_left["value"], values[0]], 

754 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

755 ) 

756 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

757 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

758 else: 

759 min_ts_value = sample_left.get("value", None) 

760 min_ts_forced_value = sample_left.get("forced_value", None) 

761 time_vector.insert(0, window_min_timestamp) 

762 values.insert(0, min_ts_value) 

763 forced_values.insert(0, min_ts_forced_value) 

764 

765 return time_vector, values, forced_values 

766 

def interpolate_values(self, new_time_vector: list[float]):
    """Return ``self.values`` interpolated at the timestamps of ``new_time_vector``."""
    return self.interpolate(new_time_vector, self.values)

769 

def interpolate_forced_values(self, new_time_vector: list[float]):
    """Return ``self.forced_values`` interpolated at the timestamps of ``new_time_vector``."""
    return self.interpolate(new_time_vector, self.forced_values)

772 

773 def uniform_desampling(self, number_samples_max: int) -> Self: 

774 data_processing_time = time.time() 

775 if number_samples_max and self.number_samples > number_samples_max: 

776 new_time_vector = npy.linspace( 

777 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

778 ).tolist() 

779 values = self.interpolate_values(new_time_vector) 

780 forced_values = self.interpolate_forced_values(new_time_vector) 

781 time_vector = new_time_vector 

782 number_samples = len(time_vector) 

783 else: 

784 time_vector = self.time_vector 

785 number_samples = len(self.values) 

786 values = self.values[:] 

787 forced_values = self.forced_values[:] 

788 data_processing_time = time.time() - data_processing_time 

789 

790 return self.__class__( 

791 signal_id=self.signal_id, 

792 time_vector=time_vector, 

793 values=values, 

794 forced_values=forced_values, 

795 number_samples=number_samples, 

796 number_samples_db=self.number_samples, 

797 data_start=self.data_start, 

798 data_end=self.data_end, 

799 db_query_time=self.db_query_time, 

800 init_time=self.init_time, 

801 data_processing_time=self.data_processing_time + data_processing_time, 

802 ) 

803 

def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
) -> Self:
    """Performs a sampling in a window of interest and outside.

    :param window_max_number_samples: maximum number of timestamps kept inside the window.
    :param outside_max_number_samples: sample budget shared by the parts before
        and after the window, split proportionally to their sizes.
    :param window_min_timestamp: start of the window; defaults to the first timestamp.
    :param window_max_timestamp: end of the window; defaults to the last timestamp.
    :return: a new object whose values and forced values are interpolated on the
        resulting time vector; ``self`` when there is no data at all.
    """

    if not self.time_vector:
        return self

    if window_min_timestamp is None:
        window_min_timestamp = self.time_vector[0]
    if window_max_timestamp is None:
        window_max_timestamp = self.time_vector[-1]

    data_processing_time = time.time()

    # Split the timestamps in three parts: before, inside and after the window
    # (timestamps equal to a bound belong to the window).
    index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
    index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

    time_vector_before = self.time_vector[:index_window_start]
    time_vector_window = self.time_vector[index_window_start:index_window_end]
    time_vector_after = self.time_vector[index_window_end:]

    # Resampling window
    if time_vector_window:
        # Ensuring window bounds
        if time_vector_window[0] != window_min_timestamp:
            time_vector_window.insert(0, window_min_timestamp)
        if time_vector_window[-1] != window_max_timestamp:
            time_vector_window.append(window_max_timestamp)
    else:
        time_vector_window = [window_min_timestamp, window_max_timestamp]

    if len(time_vector_window) > window_max_number_samples:
        # Resampling
        new_window_time_vector = npy.linspace(
            time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
        ).tolist()
        time_vector_window = new_window_time_vector

    # Resampling outside
    number_samples_before = len(time_vector_before)
    number_samples_after = len(time_vector_after)
    if (number_samples_before + number_samples_after) > outside_max_number_samples:
        new_number_samples_before = min(
            number_samples_before,
            math.ceil(
                outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
            ),
        )
        new_number_samples_after = min(
            number_samples_after,
            math.ceil(
                outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
            ),
        )
        # Adjusting numbers as math.ceil can do +1 on sum
        if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
            if new_number_samples_before > new_number_samples_after:
                new_number_samples_before -= 1
            else:
                new_number_samples_after -= 1

        # NOTE(review): when a new count is 0 the corresponding part is left
        # untouched (not emptied) - confirm this is intended.
        if new_number_samples_before > 0:
            new_time_vector_before = npy.linspace(
                time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
            ).tolist()
            time_vector_before = new_time_vector_before

        if new_number_samples_after > 0:
            # Built backwards then reversed, so the last timestamp is kept and
            # the first one (next to the window) is the excluded endpoint.
            new_time_vector_after = npy.linspace(
                time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
            ).tolist()[::-1]
            time_vector_after = new_time_vector_after

    new_time_vector = time_vector_before + time_vector_window + time_vector_after
    values = self.interpolate_values(new_time_vector)
    forced_values = self.interpolate_forced_values(new_time_vector)
    number_samples = len(values)

    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=new_time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
    )

903 

def csv_export(self):
    """Render the samples as CSV and return the text encoded as UTF-8 bytes.

    The first row is the header ``timestamp,value,forced_value``. Each
    following row holds the date produced by ``get_utc_date_from_timestamp``
    for one entry of ``self.time_vector`` together with the matching entries
    of ``self.values`` and ``self.forced_values``.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    # zip stops at the shortest of the three vectors.
    csv_writer.writerows(
        [get_utc_date_from_timestamp(instant), sample, forced_sample]
        for instant, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")

911 

def prestoplot_export(self):
    """Render the samples as a tab-separated PrestoPlot file, encoded as UTF-8 bytes.

    The column name is derived from ``self.signal_id``: dots become
    underscores and an underscore is prepended when the name starts with a
    numeric character. Missing values and forced values are written as the
    text ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    # Four header lines, then one line per sample.
    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    for instant, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        shown_sample = "none" if sample is None else sample
        shown_forced = "none" if forced_sample is None else forced_sample
        lines.append(f"{get_utc_date_from_timestamp(instant)}\t{shown_sample}\t{shown_forced}\n")
    return "".join(lines).encode("utf-8")

928 

929 

class NumericSignalData(SignalData):
    # Signal data whose samples are numbers; a missing sample is stored as None.
    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` at the instants of ``new_time_vector``.

        ``items`` is used as the ordinate for ``self.time_vector``. ``None``
        entries are replaced by NaN before calling ``numpy.interp`` and NaN
        results are converted back to ``None``.

        Returns:
            A list with one entry per element of ``new_time_vector``.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a new instance downsampled over the whole time range.

        When ``number_samples_max`` is truthy and smaller than
        ``self.number_samples``, times and values go through
        ``downsample_list`` and the forced values are re-interpolated on the
        resulting time vector. Otherwise values and forced values are copied
        unchanged.

        The time spent here is added to ``data_processing_time`` of the result.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carry the flag over, as the base-class desampling does; without it
            # the result silently falls back to the field default.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Downsample separately inside and outside a window of interest.

        Args:
            window_max_number_samples: sample budget for the window.
            outside_max_number_samples: sample budget shared by the samples
                before and after the window, split proportionally to their
                original counts.
            window_min_timestamp: window start; defaults to the first timestamp.
            window_max_timestamp: window end; defaults to the last timestamp.

        Returns:
            ``self`` when there is no data, otherwise a new instance whose
            forced values are re-interpolated on the new time vector.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are copies, so the in-place inserts below never touch self.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds: the window always starts and ends exactly
            # on the requested timestamps, using interpolated values if needed.
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # NOTE(review): a side whose budget ends up at 0 is left untouched
            # (not emptied), exactly like the base-class implementation.
            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carry the flag over, as the base-class desampling does; without it
            # the result silently falls back to the field default.
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

1070 

1071 

class StringSignalData(SignalData):
    # Signal data whose samples are strings; a missing sample is stored as None.
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Pick, for every requested instant, the item sampled at or just before it.

        Strings cannot be blended, so this is a "previous value" lookup on
        ``self.time_vector``. Instants earlier than the first sample get the
        first item because the computed positions are clamped to the valid
        index range of ``items``.
        """
        last_position = len(items) - 1
        # side="right" gives the insertion point after equal timestamps, so
        # subtracting one lands on the latest sample that is <= the instant.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_position)]

1083 

1084 

class SignalsData(TwinPadModel):
    # Data of several signals plus the overall time span they cover.
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load every signal of ``signal_ids`` through ``SignalData.get_from_signal_id``.

        ``max_timestamp`` defaults to the current time. ``data_start`` and
        ``data_end`` of the result are the earliest start and the latest end
        among the signals that report one (``None`` when none does), and
        ``data_processing_time`` is the sum of the per-signal processing times.
        """
        signals_data = []
        data_start = None
        data_end = None
        if max_timestamp is None:
            max_timestamp = time.time()
        data_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            data_processing_time += signal_data.data_processing_time
            signals_data.append(signal_data)
            if signal_data.data_start is not None:
                if data_start is None:
                    data_start = signal_data.data_start
                else:
                    data_start = min(signal_data.data_start, data_start)
            if signal_data.data_end is not None:
                if data_end is None:
                    data_end = signal_data.data_end
                else:
                    data_end = max(signal_data.data_end, data_end)

        return cls(
            signals_data=signals_data,
            data_processing_time=data_processing_time,
            data_start=data_start,
            data_end=data_end,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and wrap the results."""
        signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and wrap the results."""
        signals_data = [
            s.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
            for s in self.signals_data
        ]

        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return the bytes of a ZIP archive holding one export file per signal.

        Args:
            file_format: ``"csv"`` (files named ``<signal_id>.csv``) or
                ``"prestoplot"`` (files named ``<signal_id>.tab``).

        Raises:
            ValueError: if ``file_format`` is not supported. The check is done
                before the archive is built, so it also applies when there is
                no signal to export.
        """
        if file_format == "csv":
            extension = "csv"
        elif file_format == "prestoplot":
            extension = "tab"
        else:
            raise ValueError(f"Format not found. Got: {file_format}")

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                if file_format == "csv":
                    export_bytes = signal_data.csv_export()
                else:
                    export_bytes = signal_data.prestoplot_export()
                zip_file.writestr(f"{signal_data.signal_id}.{extension}", export_bytes)
        # The archive is only complete once the ZipFile context has closed.
        return zip_buffer.getvalue()

    def hdf5_export(self):
        """Return the bytes of an HDF5 file with one group per signal.

        Each group is named after the signal id and holds a ``data`` dataset of
        records ``(date, timestamp, value, forced_value)``. Signals whose
        ``data_type`` is ``"str"`` store value columns as 30-byte strings, all
        other signals as float64.
        """
        hdf5_buffer = io.BytesIO()
        custom_type_float = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        custom_type_string = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for signal_data in self.signals_data:
                signal_group = hdf5_file.create_group(signal_data.signal_id)
                date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
                # Only the record dtype differs between string and numeric signals.
                record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
                records = list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                )
                signal_group["data"] = npy.array(records, dtype=record_type)
        return hdf5_buffer.getvalue()

1226 

1227 

# Health report of a signal, as built by ``Signal.status``.
class SignalStatus(TwinPadModel):
    # "up", "down" or "no data" (the values assigned in ``Signal.status``).
    status: str
    # Human-readable explanation; empty string when there is nothing to report.
    reason: str
    # Seconds elapsed since the last known activity, or None when unknown.
    delay: float | None

1232 

1233 

# Parameters of a signal's digitization, stored on ``Signal.digitization_function``.
# NOTE(review): the field names suggest a mapping between a raw range and a
# value range — confirm against the code that applies it.
class DigitizationFunction(TwinPadModel):
    # Optional; defaults to None.
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

1240 

1241 

# Requested change of a signal. ``Signal.send_command`` validates ``value`` and
# ``forced_value`` against the signal's type before forwarding the update.
class SignalUpdate(TwinPadModel):
    # New value; None means "not provided".
    value: float | str | bool | int | None = None
    # New forced value; None means "not provided".
    forced_value: float | str | bool | int | None = None
    # Optional timestamp of the update (unit not visible from this class).
    timestamp: int | None = None

1246 

1247 

# Closed set of signal kinds; the ``str`` mixin makes members compare equal to
# their string value.
class SignalType(str, Enum):
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"

1252 

1253 

# Maps a signal ``data_type`` name to the SignalData subclass that holds its
# samples (looked up by ``Signal.signal_data_class``). Everything except "str"
# is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}

1261 

1262 

1263class Signal(GenericMongo): 

1264 collection_name: ClassVar[str] = "signals" 

1265 

1266 signal_id: str 

1267 frequency: float 

1268 unit: str | None 

1269 description: str 

1270 type: SignalType 

1271 data_type: str 

1272 precision_digits: int | None 

1273 forcible: bool 

1274 

1275 digitization_function: DigitizationFunction | None 

1276 

1277 @property 

1278 def device(self) -> Device: 

1279 device_id = self.signal_id.split(".")[0] 

1280 device = Device.get_one_by_attribute("device_id", device_id) 

1281 return device 

1282 

1283 @cached_property 

1284 def signal_data_class(self): 

1285 if self.data_type in SIGNALDATA_TYPES: 

1286 return SIGNALDATA_TYPES[self.data_type] 

1287 if self.data_type.startswith("enum"): 

1288 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1289 raise ValueError(f"Unhandled python type: {self.data_type}") 

1290 

1291 @cached_property 

1292 def python_type(self): 

1293 if self.data_type in TYPES: 

1294 return TYPES[self.data_type] 

1295 if self.data_type.startswith("enum"): 

1296 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1297 return Literal[*choices] 

1298 raise ValueError(f"Unhandled python type: {self.data_type}") 

1299 

1300 @computed_field 

1301 @property 

1302 def status(self) -> SignalStatus: 

1303 now = time.time() 

1304 status = "up" 

1305 reason = "" 

1306 

1307 # See line 285 for explanation 

1308 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1309 last_bucket = None 

1310 if bucket is not None: 

1311 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1312 if last_bucket is None: 

1313 status = "no data" 

1314 reason = "signal does not exist" 

1315 return SignalStatus(status=status, reason=reason, delay=None) 

1316 

1317 try: 

1318 last_date = last_bucket["control"]["max"]["timestamp"] 

1319 last_date = last_date.replace(tzinfo=pytz.UTC) 

1320 last_value_ts = last_date.timestamp() 

1321 except IndexError: 

1322 last_value_ts = None 

1323 

1324 if last_value_ts is None: 

1325 delay = None 

1326 reason = "No data from signal" 

1327 else: 

1328 # Since device is a computed property, only compute it once 

1329 device = self.device 

1330 if device is not None and device.last_ping is not None: 

1331 last_value_ts = max(last_value_ts, device.last_ping) 

1332 delay = now - last_value_ts 

1333 if delay > DEVICE_TIMEOUT: 

1334 status = "down" 

1335 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1336 return SignalStatus(status=status, reason=reason, delay=delay) 

1337 

1338 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1339 command = Command( 

1340 sent_at=time.time(), 

1341 command_type="Signal command", 

1342 user_id=current_user.id, 

1343 ) 

1344 

1345 has_input_error = False 

1346 error_message = "" 

1347 

1348 if self.data_type.startswith("enum"): 

1349 enum_options = get_args(self.python_type) 

1350 

1351 if update_dict.value is not None and update_dict.value not in enum_options: 

1352 has_input_error = True 

1353 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1354 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1355 has_input_error = True 

1356 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1357 else: 

1358 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1359 has_input_error = True 

1360 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1361 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1362 has_input_error = True 

1363 error_message += ( 

1364 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1365 ) 

1366 

1367 if has_input_error: 

1368 command.response_time = 0 

1369 command.succeeded = False 

1370 command.description = f"Tried to modify signal {self.signal_id}" 

1371 response = {"error": True, "status_code": 400, "message": error_message} 

1372 else: 

1373 response = await send_signal_value(self.signal_id, update_dict) 

1374 command.receive_response(response) 

1375 

1376 Command.create(command) 

1377 return response 

1378 

1379 @classmethod 

1380 def get_from_signal_id(cls, signal_id) -> Self: 

1381 """Could be generic from mongo""" 

1382 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1383 if not raw_value: 

1384 return None 

1385 del raw_value["_id"] 

1386 return cls.dict_to_object(raw_value) 

1387 

1388 @classmethod 

1389 def get_all_ids(cls) -> list[str]: 

1390 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1391 

1392 return [signal["signal_id"] for signal in cursor] 

1393 

1394 async def number_samples(self): 

1395 collection = get_signal_collection(signal_id=self.signal_id) 

1396 if collection is None: 

1397 return 0 

1398 

1399 number_samples = collection.estimated_document_count() 

1400 

1401 number_samples_async_collection = await get_async_collection( 

1402 systems_async_database, "number_samples", create=True, time_series=True 

1403 ) 

1404 

1405 loop = asyncio.get_running_loop() 

1406 loop.create_task( 

1407 number_samples_async_collection.insert_one( 

1408 { 

1409 "timestamp": datetime.datetime.now(pytz.UTC), 

1410 "signal_id": self.signal_id, 

1411 "number_samples": number_samples, 

1412 } 

1413 ) 

1414 ) 

1415 

1416 return number_samples 

1417 

1418 def sample_datasize(self): 

1419 return signals_database.command("collstats", self.signal_id)["size"] 

1420 

1421 @classmethod 

1422 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1423 result = cls.collection().aggregate( 

1424 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1425 ) 

1426 

1427 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1428 

1429 

class ServicesStatus(TwinPadModel):
    # "up"/"down" state of each service, as reported by ``ping``.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every dependent service and report the states.

        The backend itself is always reported as ``"up"``, since it is the one
        answering. Hosts are pinged one after the other.
        """
        broker_state = ping(RABBITMQ_HOST)
        database_state = ping(MONGO_HOST)
        signal_storage_state = ping(SIGNAL_STORAGE_HOST)
        heartbeat_storage_state = ping(HEARTBEAT_STORAGE_HOST)
        data_analyzer_state = ping(DATA_ANALYZER_HOST)
        return cls(
            backend="up",
            cloud_broker=broker_state,
            time_series_database=database_state,
            signal_storage=signal_storage_state,
            heartbeat_storage=heartbeat_storage_state,
            data_analyzer=data_analyzer_state,
        )

1448 

1449 

def ping(host):
    """Return ``"up"`` when ``host`` answers an ICMP ping within 0.8 s, else ``"down"``.

    A ``PermissionError`` while pinging (e.g. not allowed to open the socket)
    is reported as ``"down"``.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # ping3 reports a timeout as None and an error as False; any measured
    # delay, including 0.0, is a reply. Identity checks are required because
    # 0.0 == False.
    if delay is None or delay is False:
        return "down"
    return "up"

1457 

1458 

class Event(GenericMongo):
    collection_name: ClassVar[str] = "events"

    name: str
    # Epoch seconds.
    timestamp: float
    event_rule_id: str

    # Rule that produced this event, loaded once per instance from ``event_rule_id``.
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an Event from a stored document.

        The document's ``timestamp`` is a datetime; it is replaced in
        ``dict_`` (in place) by the matching epoch seconds before delegating
        to the parent implementation.
        """
        moment = dict_["timestamp"]
        # A naive datetime is taken as UTC, like Signal.status does; otherwise
        # .timestamp() would interpret it in the host's local timezone.
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=pytz.UTC)
        dict_["timestamp"] = moment.timestamp()
        return super().dict_to_object(dict_)

1476 

1477 

class TwinPadActivity(GenericMongo):
    # One day of activity: ``timestamp`` is the epoch of the day start and
    # ``amount`` the count stored for that day.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_today() -> datetime.datetime:
        """Return midnight of the current UTC day as an aware datetime."""
        return datetime.datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)

    @staticmethod
    def _utc_day_start(moment: datetime.datetime) -> datetime.datetime:
        """Return midnight of the calendar day of ``moment``, stamped as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _sum_sample_values(collection, timestamp_filter) -> int:
        """Sum the ``value`` field of the documents whose timestamp matches the filter."""
        rows = collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(rows) == 0:
            return 0
        return rows[0].get("number_samples", 0)

    @classmethod
    def _items_in_timeframe(cls, daily_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Read the daily documents between two epoch timestamps (inclusive)."""
        # Convert straight to UTC; going through local time and relabelling
        # the result as UTC shifts the bounds on hosts that do not run in UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            moment = day["timestamp"]
            # A naive datetime is taken as UTC instead of local time.
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=pytz.UTC)
            day["timestamp"] = moment.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of events between two epoch timestamps.

        Daily counts are cached in the ``number_events`` collection. The cache
        is rebuilt from the first stored event when it does not exist or when
        ``recompute_amount`` is true; the entry of the current day is refreshed
        on every call. Days without events have no entry.
        """
        one_day = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        today = cls._utc_today()
        if number_events_collection is None or recompute_amount:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            number_events_collection.delete_many({})
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return []
            computed_day = cls._utc_day_start(first_event["timestamp"])
            while computed_day < today:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": computed_day, "$lt": computed_day + one_day}}
                )
                if day_nb_events > 0:
                    number_events_collection.insert_one({"timestamp": computed_day, "amount": day_nb_events})
                computed_day += one_day
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": today}})
        if number_events_today > 0:
            # Replace rather than update: the entry of the day is rewritten.
            number_events_collection.delete_many({"timestamp": today})
            number_events_collection.insert_one({"timestamp": today, "amount": number_events_today})
        return cls._items_in_timeframe(number_events_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of received samples between two epoch timestamps.

        Daily totals are the sum of ``value`` in the
        ``signal_storage._NUMBER_SAMPLES`` collection and are cached in
        ``number_received_samples``. The cache is rebuilt when it does not
        exist or when ``recompute_amount`` is true; the entry of the current
        day is refreshed on every call. Days without samples have no entry.
        """
        one_day = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        today = cls._utc_today()
        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # Compute from the day of the first stored sample count.
            computed_day = cls._utc_day_start(first_sample["timestamp"])
            while computed_day < today:
                number_samples = cls._sum_sample_values(
                    signals_number_samples_collection, {"$gte": computed_day, "$lt": computed_day + one_day}
                )
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": computed_day, "amount": number_samples})
                computed_day += one_day
        number_samples_today = cls._sum_sample_values(signals_number_samples_collection, {"$gte": today})
        if number_samples_today > 0:
            number_samples_collection.delete_many({"timestamp": today})
            number_samples_collection.insert_one({"timestamp": today, "amount": number_samples_today})
        return cls._items_in_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of commands between two epoch timestamps.

        Daily counts are cached in the ``number_commands`` collection. The
        cache is rebuilt from the first stored command when it does not exist
        or when ``recompute_amount`` is true; the entry of the current day is
        refreshed on every call. Days without commands have no entry.
        """
        one_day = datetime.timedelta(days=1)
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        today = cls._utc_today()
        if number_commands_collection is None or recompute_amount:
            number_commands_collection = get_collection(
                systems_database, "number_commands", create=True, time_series=True
            )
            number_commands_collection.delete_many({})
            first_command = commands_collection.find_one(sort={"timestamp": 1})
            if first_command is None:
                return []
            computed_day = cls._utc_day_start(first_command["timestamp"])
            while computed_day < today:
                day_nb_commands = commands_collection.count_documents(
                    {"timestamp": {"$gte": computed_day, "$lt": computed_day + one_day}}
                )
                if day_nb_commands > 0:
                    number_commands_collection.insert_one({"timestamp": computed_day, "amount": day_nb_commands})
                computed_day += one_day
        number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": today}})
        if number_commands_today > 0:
            number_commands_collection.delete_many({"timestamp": today})
            number_commands_collection.insert_one({"timestamp": today, "amount": number_commands_today})
        return cls._items_in_timeframe(number_commands_collection, min_timestamp, max_timestamp)

1613 

1614 

class EventRule(GenericMongo):
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of stored events that reference this rule, computed once per
    # instance. 0 when the "events" collection does not exist yet.
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events_collection = get_collection(systems_database, "events")
        # Without create=True the lookup can come back empty-handed.
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})

1626 

1627 

# Company record stored in the "companies" collection.
class Company(GenericMongo):
    collection_name: ClassVar[str] = "companies"
    name: str

1631 

1632 

class Campaign(GenericMongo):
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` (without its ``id``) and return ``{"campaign_id": <new id>}``."""
        campaigns = cls.collection()
        insertion = campaigns.insert_one(campaign.model_dump(exclude={"id"}))
        return None if insertion is None else {"campaign_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the stored campaign with ``campaign.id``.

        Returns the document as it is after the update, as given by
        ``find_one_and_update``.
        """
        campaigns = cls.collection()
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return campaigns.find_one_and_update(selector, changes, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with the given id and return the ``delete_one`` result."""
        campaigns = cls.collection()
        return campaigns.delete_one({"_id": ObjectId(campaign_id)})

1661 

1662 

class Phase(GenericMongo):
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Store a copy of ``phase`` (without ``id``) and return ``{"phase_id": <new id>}``.

        The "phases" collection is created when it does not exist yet.
        """
        copied_fields = ("name", "description", "start_at", "end_at", "campaign_id")
        fresh_phase = Phase(**{field: getattr(phase, field) for field in copied_fields})
        phases = get_collection(systems_database, "phases", create=True)
        insertion = phases.insert_one(fresh_phase.model_dump(exclude={"id"}))
        return None if insertion is None else {"phase_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description and both dates of the stored phase with ``phase.id``.

        ``campaign_id`` is left as stored. Returns the document as it is after
        the update, as given by ``find_one_and_update``.
        """
        phases = cls.collection()
        selector = {"_id": ObjectId(phase.id)}
        changes = {
            "$set": {
                "name": phase.name,
                "description": phase.description,
                "start_at": phase.start_at,
                "end_at": phase.end_at,
            }
        }
        return phases.find_one_and_update(selector, changes, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with the given id and return the ``delete_one`` result."""
        phases = cls.collection()
        return phases.delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the ``delete_many`` result."""
        phases = cls.collection()
        return phases.delete_many({"campaign_id": campaign_id})

1727 

1728 

class CustomViewCreation(GenericMongo):
    """Fields supplied when a custom view is created; bound to the ``custom_views`` collection."""

    collection_name: ClassVar[str] = "custom_views"

    # Display name of the view.
    name: str
    # View layout description; the element schema is not constrained here.
    configuration: list

1734 

1735 

class CustomView(CustomViewCreation):
    """Stored custom view: the creation fields plus an identifier and the owning user."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

1742 

1743 # # Methods 

1744 # @classmethod 

1745 # def create(cls, form_custom_view: Self, user_id) -> list: 

1746 # custom_view = CustomView( 

1747 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1748 # ) 

1749 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1750 # return new_custom_view 

1751 

1752 # @classmethod 

1753 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1754 # updated_custom_view = cls.collection().find_one_and_update( 

1755 # {"_id": ObjectId(custom_view_id)}, 

1756 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1757 # return_document=ReturnDocument.AFTER, 

1758 # ) 

1759 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1760 # del updated_custom_view["_id"] 

1761 # return cls(**updated_custom_view) 

1762 

1763 # @classmethod 

1764 # def delete(cls, custom_view_id) -> bool: 

1765 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1766 # return deleted_custom_view.acknowledged 

1767 

1768 

# Update-payload model generated from CustomView by create_update_model
# (NOTE(review): presumably every field becomes optional - confirm in create_update_model).
CustomViewUpdate = create_update_model(CustomView)

1770 

1771 

class Video(GenericMongo):
    """Video source description stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """
        Return every stored video, sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are requested in the projection, so
        ``username`` and ``password`` are not read from the database here.
        """
        projection = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the video identified by ``camera_id``, or ``None`` when the lookup finds nothing."""
        camera = cls.get_from_id(camera_id)
        return camera.name if camera is not None else None

1795 

1796 

class Command(GenericMongo):
    """Record of a command issued by a user, stored in the ``commands`` time-series collection."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    # Filled in by ``create`` from ``sent_at``. The annotation admits ``None``
    # because that is the declared default; otherwise a dumped default instance
    # cannot be validated again.
    timestamp: datetime.datetime | None = None
    # Moment the command was sent; compared against ``time.time()`` in ``receive_response``.
    sent_at: float
    # Delay between ``sent_at`` and the reception of the response.
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the collection for this class, requested with ``time_series=True``."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """
        Persist ``command`` with ``timestamp`` derived from its ``sent_at`` value (UTC).

        :param command: command to store; its own ``timestamp`` is ignored.
        :return: ``{"command_id": <inserted id as str>}``, or ``None`` when the
            insert call itself returned ``None``.
        """
        stored_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insert_result = cls.collection().insert_one(stored_command.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"command_id": str(insert_result.inserted_id)}

    def receive_response(self, response: dict):
        """
        Update this command from the response it triggered.

        ``response_time`` becomes the time elapsed since ``sent_at``.
        ``succeeded`` is true only when ``response["error"]`` is exactly
        ``False``; a missing key counts as a failure. An empty ``description``
        is replaced by the right-stripped response message.

        :param response: response payload; ``error`` and ``message`` keys are read.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            # A present-but-null message must not crash the bookkeeping.
            message = response.get("message") or ""
            self.description = message.rstrip()

1836 

1837 

class SignalsPresetCreation(GenericMongo):
    """Fields supplied when a signals preset is created."""

    # Display name of the preset.
    name: str
    # Identifiers of the signals grouped by the preset.
    signal_ids: list[str]

1841 

1842 

class SignalsPreset(SignalsPresetCreation):
    """Signals preset owned by a user, stored in the ``signals_presets`` collection."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """
        Store a preset built from ``signals_preset`` and owned by ``user_id``.

        :param signals_preset: creation payload providing ``name`` and ``signal_ids``.
        :param user_id: identifier of the owning user.
        :return: the inserted document id as a string.
        """
        preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        insert_result = cls.collection().insert_one(preset.model_dump(exclude={"id"}))
        return str(insert_result.inserted_id)

1859 

1860 

# Update-payload model generated from SignalsPreset by create_update_model
# (NOTE(review): presumably every field becomes optional - confirm in create_update_model).
SignalsPresetUpdate = create_update_model(SignalsPreset)

1862 

1863 

class LineStyle(str, Enum):
    """Line styles available for graph themes; each member is also a plain ``str``."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"

1868 

1869 

class SignalAppearance:
    """Plain holder declaring the two colour attributes of a signal; no values are assigned here."""

    # Colour used for the regular value.
    value_color: str
    # Colour used for the forced value.
    forced_value_color: str

1873 

1874 

class GraphThemeCreation(GenericMongo):
    """Fields supplied when a graph theme is created; bound to the ``graph_themes`` collection."""

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    # Signal the theme applies to.
    signal_id: str
    # Colours default to the empty string.
    value_color: str = ""
    forced_value_color: str = ""
    # Line styles default to solid.
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise.
    private: bool = True

1885 

1886 

class PublicGraphTheme(GraphThemeCreation):
    """
    Graph theme as presented to one particular user.

    The three boolean fields are computed per user by the aggregation steps in
    ``custom_pipeline_steps``; the same steps remove the stored ``creator_id``,
    ``in_library`` and ``active`` fields from the output.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # User on whose behalf the pipeline steps are built. Every query entry point
    # below assigns it on the class before delegating to the parent.
    # NOTE(review): this is class-level state shared by all callers - confirm
    # that overlapping requests cannot interleave between assignment and use.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Return the aggregation steps, keyed by name, built for the current ``_current_user_id``."""
        current_user = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            # One-stage pipeline fragment setting (or removing) a single field.
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", current_user]}),
            # Never return a theme the user isn't allowed to see (i.e. neither
            # theirs nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [current_user, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [current_user, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Record ``user_id`` as the current user, then delegate to the parent ``response_from_query``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` after setting ``query.in_user_library`` to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Look up a single theme by ``_id`` on behalf of ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Record ``user_id`` as the current user, then delegate to the parent ``get_by_attribute``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Record ``user_id`` as the current user, then delegate to the parent ``get_one_by_attribute``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Record ``user_id`` as the current user, then delegate to the parent ``get_all``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """
        Return the appearance settings of the themes active for ``user_id``.

        Themes are matched on ``active`` and on ``signal_id`` being one of
        ``signal_ids``; one document is kept per signal (the first of its group).

        :return: mapping ``signal_id`` -> dict holding the two colours and the
            two line styles.
        """
        match_stage = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        keep_first_per_signal = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        unwrap_stage = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        projection_stage = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [match_stage, keep_first_per_signal, unwrap_stage, projection_stage]

        appearances = {}
        for theme in cls.collection().aggregate(pipeline):
            # The signal id becomes the key and is dropped from the stored value.
            appearances[theme.pop("signal_id")] = theme
        return appearances

1999 

2000 

# Update-payload model generated from PublicGraphTheme by create_update_model
# (NOTE(review): presumably every field becomes optional - confirm in create_update_model).
GraphThemeUpdate = create_update_model(PublicGraphTheme)

2002 

2003 

class PrivateGraphTheme(GraphThemeCreation):
    """Graph theme as stored, including the ownership and per-user membership lists."""

    # private
    creator_id: str
    # Users that have the theme in their library.
    in_library: list[str]
    # Users for which the theme is active.
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """
        Store a new theme; the creator starts in both ``in_library`` and ``active``.

        :return: the created instance with ``id`` set to the inserted document id.
        """
        theme = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        insert_result = cls.collection().insert_one(theme.model_dump(exclude={"id": True}))
        theme.id = str(insert_result.inserted_id)
        return theme

    def update(self, update_dict: dict, user_id: str):
        """
        Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The per-user flags ``in_user_library`` and ``active_for_user`` are
        translated into membership of ``user_id`` in ``in_library`` / ``active``.
        They, and ``created_by_user``, are computed fields: they are always
        removed from ``update_dict`` (even when their value is ``None``) so they
        are never written to the database.

        ``update_dict`` is modified in place, as are ``self.in_library`` and
        ``self.active``.

        :param update_dict: field -> new value mapping.
        :param user_id: user whose library / active membership is adjusted.
        :return: an empty dict.
        """
        in_user_library = update_dict.pop("in_user_library", None)
        if in_user_library is not None:
            if in_user_library and user_id not in self.in_library:
                self.in_library.append(user_id)
            elif not in_user_library and user_id in self.in_library:
                self.in_library.remove(user_id)
            update_dict["in_library"] = self.in_library

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            if active_for_user and user_id not in self.active:
                self.active.append(user_id)
            elif not active_for_user and user_id in self.active:
                self.active.remove(user_id)
            update_dict["active"] = self.active

        # Read-only from the caller's point of view: never persisted.
        update_dict.pop("created_by_user", None)

        # Nothing left to write: skip the round trip instead of sending an empty ``$set``.
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}