Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%

1155 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-25 07:17 +0000

1from functools import cached_property 

2import os 

3import io 

4import time 

5import csv 

6from typing import Self, ClassVar, Any, Literal, get_args 

7import datetime 

8import math 

9import bisect 

10from enum import Enum 

11import logging 

12import copy 

13import asyncio 

14 

15import zipfile 

16import ping3 

17import pytz 

18from bson.objectid import ObjectId 

19from pymongo import ASCENDING, ReturnDocument 

20from pydantic import BaseModel, computed_field, Field, create_model 

21import numpy as npy 

22import lttb 

23import h5py 

24 

25# from scipy import signal as signal_scipy 

26 

27from twinpad_backend.db import ( 

28 get_collection, 

29 get_async_collection, 

30 get_signal_collection, 

31 systems_database, 

32 systems_async_database, 

33 signals_database, 

34 devices_states_database, 

35) 

36from twinpad_backend.responses import ListResponse 

37from twinpad_backend.messages import send_mode_change, send_signal_value 

38 

39TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float} 

40 

41 

42RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker") 

43MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db") 

44SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage") 

45HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage") 

46DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer") 

47 

48DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0)) 

49NUMBER_SAMPLES_DATABASE_UPDATE = 120 

50 

51logger = logging.getLogger("uvicorn.error") 

52 

53 

54class classproperty: 

55 """ 

56 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13. 

57 Found here: https://stackoverflow.com/a/76301341 

58 """ 

59 

60 def __init__(self, func): 

61 self.fget = func 

62 

63 def __get__(self, _, owner): 

64 return self.fget(owner) 

65 

66 

67def create_update_model(model): 

68 fields = {} 

69 

70 for field_name, field_annotation in model.model_fields.items(): 

71 if field_name != "id": 

72 fields[field_name] = (field_annotation.annotation | None, None) 

73 

74 query_name = model.__name__ + "Update" 

75 return create_model(query_name, **fields) 

76 

77 

78def get_utc_date_from_timestamp(timestamp: float): 

79 return datetime.datetime.fromtimestamp(timestamp).isoformat() 

80 

81 

82def downsample_list(time_vector: list, values: list, max_number_samples: int): 

83 if len(time_vector) < max_number_samples: 

84 return time_vector, values 

85 

86 time_vector_copy = copy.deepcopy(time_vector) 

87 values_copy = copy.deepcopy(values) 

88 

89 none_group_bounds = [] 

90 none_group_index = -1 

91 index = -1 

92 # LTTB doesn't handle None values so remove them 

93 while values_copy.count(None) > 0: 

94 # Store bounds of None value groups so we can insert them back after the downsampling 

95 if (new_index := values_copy.index(None)) != index: 

96 none_group_bounds.append([time_vector_copy.pop(new_index)]) 

97 none_group_index += 1 

98 elif len(none_group_bounds[none_group_index]) < 2: 

99 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index)) 

100 else: 

101 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index) 

102 values_copy.pop(new_index) 

103 index = new_index 

104 number_none_values = sum(len(none_group) for none_group in none_group_bounds) 

105 

106 try: 

107 values_array = npy.array([time_vector_copy, values_copy]).T 

108 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values) 

109 

110 new_time_vector = interpolated_values[:, 0].tolist() 

111 new_values = interpolated_values[:, 1].tolist() 

112 except ValueError: 

113 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace 

114 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist() 

115 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64"))) 

116 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist() 

117 return new_time_vector, new_values_nan_to_none 

118 

119 # insert back None values at the correct timestamps 

120 for none_group in none_group_bounds: 

121 start_index = npy.searchsorted(new_time_vector, none_group[0]) 

122 new_time_vector[start_index:start_index] = none_group 

123 new_values[start_index:start_index] = [None for _ in range(len(none_group))] 

124 

125 return new_time_vector, new_values 

126 

127 

128def is_of_type(value, wanted_type): 

129 if wanted_type is float: 

130 return isinstance(value, (int, float)) 

131 return isinstance(value, wanted_type) 

132 

133 

134# Models 

135class TwinPadModel(BaseModel): 

136 @classmethod 

137 def dict_to_object(cls, dict_): 

138 return cls.model_validate(dict_) 

139 

140 def to_dict(self, exclude=None): 

141 dict_ = self.model_dump(exclude=exclude) 

142 return dict_ 

143 

144 

145class GenericMongo(TwinPadModel): 

146 id: str | None = None 

147 custom_pipeline_steps: ClassVar[dict[str, list]] = {} 

148 

149 @classmethod 

150 def collection(cls): 

151 return get_collection(systems_database, cls.collection_name, create=True) 

152 

153 @classmethod 

154 def response_from_query(cls, query) -> ListResponse[Self]: 

155 request_filters = query.mongodb_filter() 

156 items = [] 

157 if ":" in query.sort_by: 

158 sort_field, sort_order = query.sort_by.split(":") 

159 sort_order = int(sort_order) 

160 else: 

161 sort_field = query.sort_by 

162 sort_order = 1 

163 collection = get_collection(systems_database, cls.collection_name, create=True) 

164 total = collection.count_documents(request_filters) 

165 

166 pipeline = [] 

167 added_properties = [] 

168 if "$and" in request_filters: 

169 for request_filter in request_filters["$and"]: 

170 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

171 if filtered_property in request_filter: 

172 pipeline.extend(pipeline_steps) 

173 added_properties.append(filtered_property) 

174 else: 

175 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items(): 

176 if filtered_property in request_filters: 

177 pipeline.extend(pipeline_steps) 

178 added_properties.append(filtered_property) 

179 pipeline.append({"$match": request_filters}) 

180 if sort_field in cls.custom_pipeline_steps: 

181 pipeline.extend(cls.custom_pipeline_steps[sort_field]) 

182 added_properties.append(sort_field) 

183 pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}]) 

184 

185 if (query.limit is not None) and (query.limit != 0): 

186 pipeline.append({"$limit": query.limit}) 

187 

188 for filtered_property, step in cls.custom_pipeline_steps.items(): 

189 if filtered_property not in added_properties: 

190 pipeline.extend(step) 

191 

192 cursor = collection.aggregate(pipeline) 

193 

194 for item_dict in cursor: 

195 items.append(cls.mongo_dict_to_object(item_dict)) 

196 

197 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

198 

199 @classmethod 

200 def get_from_id(cls, item_id) -> Self | None: 

201 return cls.get_one_by_attribute("_id", ObjectId(item_id)) 

202 

203 @classmethod 

204 def mongo_dict_to_object(cls, mongo_dict): 

205 mongo_dict["id"] = str(mongo_dict["_id"]) 

206 del mongo_dict["_id"] 

207 return cls.dict_to_object(mongo_dict) 

208 

209 @classmethod 

210 def get_by_attribute(cls, attribute_name: str, attribute_value): 

211 """Returns all items that match the attribute with value.""" 

212 pipeline = [] 

213 if attribute_name in cls.custom_pipeline_steps: 

214 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

215 pipeline.append({"$match": {attribute_name: attribute_value}}) 

216 for key, step in cls.custom_pipeline_steps.items(): 

217 if key != attribute_name: 

218 pipeline.extend(step) 

219 items = cls.collection().aggregate(pipeline) 

220 if items is None: 

221 return None 

222 return [cls.mongo_dict_to_object(d) for d in items] 

223 

224 @classmethod 

225 def get_one_by_attribute(cls, attribute_name: str, attribute_value): 

226 pipeline = [] 

227 if attribute_name in cls.custom_pipeline_steps: 

228 pipeline.extend(cls.custom_pipeline_steps[attribute_name]) 

229 pipeline.append({"$match": {attribute_name: attribute_value}}) 

230 pipeline.append({"$limit": 1}) 

231 for key, step in cls.custom_pipeline_steps.items(): 

232 if key != attribute_name: 

233 pipeline.extend(step) 

234 items = cls.collection().aggregate(pipeline).to_list() 

235 if len(items) == 0: 

236 return None 

237 return cls.mongo_dict_to_object(items[0]) 

238 

239 @classmethod 

240 def get_all(cls, sort_by="_id") -> list[Self]: 

241 items = [] 

242 pipeline = [] 

243 if sort_by in cls.custom_pipeline_steps: 

244 pipeline.extend(cls.custom_pipeline_steps[sort_by]) 

245 pipeline.append({"$sort": {sort_by: ASCENDING}}) 

246 for key, step in cls.custom_pipeline_steps.items(): 

247 if key != sort_by: 

248 pipeline.extend(step) 

249 for dict_ in cls.collection().aggregate(pipeline): 

250 items.append(cls.mongo_dict_to_object(dict_)) 

251 return items 

252 

253 @classmethod 

254 def get_number_documents(cls): 

255 collection = get_collection(systems_database, cls.collection_name) 

256 if collection is None: 

257 return 0 

258 return collection.count_documents({}) 

259 

260 def insert(self): 

261 insert_result = self.collection().insert_one(self.to_dict(exclude={id})) 

262 self.id = str(insert_result.inserted_id) 

263 return self.id 

264 

265 def update(self, update_dict): 

266 for key, value in update_dict.items(): 

267 setattr(self, key, value) 

268 self.collection().find_one_and_update( 

269 {"_id": ObjectId(self.id)}, 

270 {"$set": update_dict}, 

271 return_document=ReturnDocument.AFTER, 

272 ) 

273 

274 return self 

275 

276 def delete(self): 

277 result = self.collection().delete_one({"_id": ObjectId(self.id)}) 

278 return result.deleted_count > 0 

279 

280 

281class User(GenericMongo): 

282 collection_name: ClassVar[str] = "users" 

283 

284 firstname: str 

285 lastname: str 

286 email: str 

287 password: str 

288 is_active: bool | None = False 

289 is_admin: bool | None = False 

290 is_connected: bool | None = False 

291 company_id: str | None = None 

292 

293 def to_dict(self, exclude=None): 

294 if exclude is None: 

295 exclude = {"password"} 

296 return GenericMongo.to_dict(self, exclude=exclude) 

297 

298 @classmethod 

299 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False): 

300 users = cls.get_all() 

301 if not users: 

302 is_admin = True 

303 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin) 

304 user_collection = get_collection(systems_database, "users", create=True) 

305 new_user = user_collection.insert_one(user.model_dump(exclude={"id"})) 

306 if new_user is None: 

307 return None 

308 return {"user_id": str(new_user.inserted_id)} 

309 

310 @classmethod 

311 def update_info(cls, user: "UserUpdate", user_id: str): 

312 updated_user = cls.collection().find_one_and_update( 

313 {"_id": ObjectId(user_id)}, 

314 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}}, 

315 return_document=ReturnDocument.AFTER, 

316 ) 

317 updated_user["id"] = str(updated_user["_id"]) 

318 del (updated_user["_id"], updated_user["is_connected"]) 

319 return cls(**updated_user) 

320 

321 

322UserUpdate = create_update_model(User) 

323 

324 

325class Mode(TwinPadModel): 

326 mode_id: int 

327 name: str 

328 frequency_multiplier: float 

329 min_frequency: float 

330 

331 

332class DeviceUpdate(TwinPadModel): 

333 mode_id: int 

334 

335 

336class Device(GenericMongo): 

337 collection_name: ClassVar[str] = "devices" 

338 

339 device_id: str 

340 name: str 

341 description: str = "" 

342 modes: list[Mode] 

343 current_mode_id: int | None = None 

344 last_ping: float | None = None 

345 petri_network: Any 

346 pid: Any 

347 load: float | None = None 

348 tokens: list[int] = Field(default_factory=list) 

349 status: str 

350 

351 async def change_mode(self, update_dict, current_user: User): 

352 has_error = False 

353 

354 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes): 

355 has_error = True 

356 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}" 

357 elif self.current_mode_id is not None: 

358 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}" 

359 else: 

360 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}" 

361 command = Command( 

362 sent_at=time.time(), 

363 command_type="Mode change", 

364 description=description, 

365 user_id=current_user.id, 

366 ) 

367 

368 if has_error: 

369 command.response_time = 0 

370 command.succeeded = False 

371 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"} 

372 else: 

373 response = await send_mode_change(self.device_id, update_dict.mode_id) 

374 command.receive_response(response) 

375 

376 Command.create(command) 

377 return response 

378 

379 @classmethod 

380 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict: 

381 devices_by_id = {} 

382 for signal_id in signal_ids: 

383 device_id = signal_id.split(".")[0] 

384 if device_id not in devices_by_id: 

385 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id) 

386 return devices_by_id 

387 

388 

389class DeviceSetup(GenericMongo): 

390 collection_name: ClassVar[str] = "device_setups" 

391 

392 device_ids: list[str] 

393 active: bool = False 

394 variable_mapping: dict[str, str] 

395 

396 

397DeviceSetupUpdate = create_update_model(DeviceSetup) 

398 

399 

400class DeviceState(GenericMongo): 

401 collection_name: ClassVar[str] = "devices_states" 

402 

403 timestamp: float 

404 mode: str | None = None 

405 load: float | None = None 

406 tokens: list[int] = Field(default_factory=list) 

407 modified_properties: list[str] = Field(default_factory=list) 

408 

409 @classmethod 

410 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]: 

411 req_filter = query.mongodb_filter() 

412 items = [] 

413 if ":" in query.sort_by: 

414 sort_field, sort_order = query.sort_by.split(":") 

415 sort_order = int(sort_order) 

416 else: 

417 sort_field = query.sort_by 

418 sort_order = 1 

419 collection = get_collection(devices_states_database, device_id) 

420 if collection is None: 

421 total = 0 

422 cursor = [] 

423 else: 

424 total = collection.count_documents(req_filter) 

425 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset) 

426 if (query.limit is not None) and (query.limit != 0): 

427 cursor = cursor.limit(query.limit) 

428 for item_dict in cursor: 

429 items.append( 

430 cls( 

431 timestamp=item_dict.get("precise_timestamp"), 

432 mode=item_dict.get("mode", None), 

433 load=item_dict.get("load", None), 

434 tokens=item_dict.get("tokens", Field(default_factory=list)), 

435 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)), 

436 ) 

437 ) 

438 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total) 

439 

440 

441class SignalSample(TwinPadModel): 

442 signal_id: str 

443 timestamp: float 

444 value: float | int | str | bool | None 

445 forced_value: float | int | str | bool | None = None 

446 

447 @classmethod 

448 def get_first_from_signal_id(cls, signal_id: str) -> Self | None: 

449 

450 collection = get_signal_collection(signal_id) 

451 if collection is None: 

452 return None 

453 

454 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document 

455 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4 

456 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

457 first_bucket = None 

458 if bucket is not None: 

459 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

460 if first_bucket is not None: 

461 sample_data = collection.find_one( 

462 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]} 

463 ) 

464 else: 

465 sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)]) 

466 

467 if sample_data is None: 

468 return None 

469 

470 timestamp = sample_data["precise_timestamp"] 

471 

472 return cls( 

473 signal_id=signal_id, 

474 timestamp=timestamp, 

475 value=sample_data.get("value", None), 

476 forced_value=sample_data.get("forced_value", None), 

477 ) 

478 

479 @classmethod 

480 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

481 return [cls.get_first_from_signal_id(sid) for sid in signal_ids] 

482 

483 @classmethod 

484 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None: 

485 

486 collection = get_signal_collection(signal_id) 

487 if collection is None: 

488 return None 

489 

490 sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)]) 

491 

492 if sample_data is None: 

493 return None 

494 

495 timestamp = sample_data["precise_timestamp"] 

496 

497 if device is None: 

498 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0]) 

499 if device is not None and device.last_ping is not None: 

500 if timestamp is None: 

501 timestamp = device.last_ping 

502 else: 

503 timestamp = max(timestamp, device.last_ping) 

504 return cls( 

505 signal_id=signal_id, 

506 timestamp=timestamp, 

507 value=sample_data.get("value", None), 

508 forced_value=sample_data.get("forced_value", None), 

509 ) 

510 

511 @classmethod 

512 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None: 

513 collection = get_signal_collection(signal_id) 

514 if collection is None: 

515 return None 

516 

517 sample_data = collection.find_one( 

518 {"precise_timestamp": {"$gte": min_timestamp}}, sort=[("precise_timestamp", -1)] 

519 ) 

520 if sample_data is None: 

521 return None 

522 

523 return cls( 

524 signal_id=signal_id, 

525 timestamp=sample_data.get("precise_timestamp"), 

526 value=sample_data.get("value"), 

527 forced_value=sample_data.get("forced_value"), 

528 ) 

529 

530 @classmethod 

531 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None: 

532 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids) 

533 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids] 

534 

535 @classmethod 

536 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None: 

537 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids] 

538 

539 

540class SignalData(TwinPadModel): 

541 signal_id: str 

542 forcible: bool = True 

543 time_vector: list[float] 

544 values: list[float | int | str | None] 

545 forced_values: list[float | int | str | None] 

546 

547 data_start: float | None = None 

548 data_end: float | None = None 

549 

550 number_samples: int = 0 

551 number_samples_db: int = 0 

552 

553 db_query_time: float = 0.0 

554 init_time: float = 0.0 

555 data_processing_time: float = 0.0 

556 

557 @classmethod 

558 def get_from_signal_id( 

559 cls, 

560 signal_id: str, 

561 min_timestamp: float = None, 

562 max_timestamp: float = None, 

563 window_min_timestamp: float = None, 

564 window_max_timestamp: float = None, 

565 interpolate_bounds: bool = True, 

566 max_documents: int = None, 

567 ) -> Self: 

568 

569 now = time.time() 

570 

571 req_signal = {} 

572 if min_timestamp is not None: 

573 req_signal.setdefault("timestamp", {}) 

574 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp) 

575 if max_timestamp is not None: 

576 req_signal.setdefault("timestamp", {}) 

577 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp) 

578 

579 collection = get_signal_collection(signal_id) 

580 if collection is None: 

581 return cls( 

582 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0 

583 ) 

584 

585 db_req_start = time.time() 

586 

587 sort_step = {"$sort": {"precise_timestamp": 1}} 

588 number_results = collection.count_documents(req_signal) 

589 

590 pipeline = [] 

591 if req_signal: 

592 pipeline.append({"$match": req_signal}) # Filter data if needed 

593 

594 pipeline.extend( 

595 [ 

596 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}}, 

597 sort_step, 

598 ] 

599 ) 

600 

601 if max_documents is not None and max_documents < number_results: 

602 unsampling_ratio = math.ceil(number_results / max_documents) 

603 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results) 

604 pipeline.extend( 

605 [ 

606 { 

607 "$setWindowFields": { 

608 "sortBy": {"precise_timestamp": 1}, 

609 "output": {"index": {"$documentNumber": {}}}, 

610 } 

611 }, 

612 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}}, 

613 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}}, 

614 {"$replaceRoot": {"newRoot": "$doc"}}, 

615 {"$unset": ["index", "group_id"]}, 

616 {"$sort": {"precise_timestamp": 1}}, 

617 ] 

618 ) 

619 

620 # logger.info(f"pipeline: %s", str(pipeline)) 

621 cursor = collection.aggregate(pipeline) 

622 db_req_time = time.time() - db_req_start 

623 

624 init_time = time.time() 

625 

626 results = cursor.to_list() 

627 time_vector = [] 

628 values = [] 

629 forced_values = [] 

630 for s in results: 

631 time_vector.append(s["precise_timestamp"]) 

632 values.append(s.get("value", None)) 

633 forced_values.append(s.get("forced_value", None)) 

634 

635 signal = Signal.get_from_signal_id(signal_id) 

636 class_ = signal.signal_data_class 

637 

638 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None: 

639 time_vector, values, forced_values = cls.interpolate_bounds( 

640 class_, 

641 collection, 

642 signal_id, 

643 time_vector, 

644 values, 

645 forced_values, 

646 window_min_timestamp, 

647 window_max_timestamp, 

648 ) 

649 

650 if values: 

651 # TODO: check below. a bit strange 

652 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT: 

653 # Adding last value as it should be repeated 

654 time_vector.append(now) 

655 values.append(values[-1]) 

656 forced_values.append(forced_values[-1]) 

657 

658 init_time = time.time() - init_time 

659 

660 # See line 292 for explanation 

661 bucket = get_signal_collection(f"system.buckets.{signal_id}") 

662 first_bucket = None 

663 if bucket is not None: 

664 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1}) 

665 if first_bucket is not None: 

666 data_start = first_bucket["control"]["min"]["precise_timestamp"] 

667 else: 

668 data_start = None 

669 

670 last_bucket = None 

671 if bucket is not None: 

672 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

673 if last_bucket is not None: 

674 data_end = last_bucket["control"]["max"]["precise_timestamp"] 

675 else: 

676 data_end = None 

677 

678 return class_( 

679 signal_id=signal_id, 

680 forcible=signal.forcible, 

681 time_vector=time_vector, 

682 values=values, 

683 forced_values=forced_values, 

684 data_start=data_start, 

685 data_end=data_end, 

686 number_samples=len(values), 

687 number_samples_db=number_results, 

688 db_query_time=db_req_time, 

689 init_time=init_time, 

690 ) 

691 

692 @staticmethod 

693 def interpolate_bounds( 

694 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp 

695 ): 

696 sample_right = None 

697 # Fetching right side value & interpolation 

698 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5 

699 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit): 

700 sample_right = collection.find_one( 

701 { 

702 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)}, 

703 "value": {"$exists": True}, 

704 }, 

705 sort=[("precise_timestamp", -1)], 

706 ) 

707 if sample_right: 

708 if time_vector: 

709 right_sd = class_( 

710 signal_id=signal_id, 

711 time_vector=[time_vector[-1], sample_right["precise_timestamp"]], 

712 values=[values[-1], sample_right.get("value", None)], 

713 forced_values=[forced_values[-1], sample_right.get("forced_value", None)], 

714 ) 

715 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0] 

716 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0] 

717 else: 

718 max_ts_value = sample_right.get("value", None) 

719 max_ts_forced_value = sample_right.get("forced_value", None) 

720 time_vector.append(window_max_timestamp) 

721 values.append(max_ts_value) 

722 forced_values.append(max_ts_forced_value) 

723 

724 # Fetching left side value & interpolation 

725 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5 

726 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit): 

727 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp: 

728 sample_left = sample_right 

729 sample_left = collection.find_one( 

730 { 

731 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)}, 

732 "value": {"$exists": True}, 

733 }, 

734 sort=[("precise_timestamp", -1)], 

735 ) 

736 

737 if sample_left: 

738 if time_vector: 

739 left_sd = class_( 

740 signal_id=signal_id, 

741 time_vector=[sample_left["precise_timestamp"], time_vector[0]], 

742 values=[sample_left["value"], values[0]], 

743 forced_values=[sample_left.get("forced_value", None), forced_values[0]], 

744 ) 

745 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0] 

746 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0] 

747 else: 

748 min_ts_value = sample_left.get("value", None) 

749 min_ts_forced_value = sample_left.get("forced_value", None) 

750 time_vector.insert(0, window_min_timestamp) 

751 values.insert(0, min_ts_value) 

752 forced_values.insert(0, min_ts_forced_value) 

753 

754 return time_vector, values, forced_values 

755 

756 def interpolate_values(self, new_time_vector: list[float]): 

757 return self.interpolate(new_time_vector, self.values) 

758 

759 def interpolate_forced_values(self, new_time_vector: list[float]): 

760 return self.interpolate(new_time_vector, self.forced_values) 

761 

762 def uniform_desampling(self, number_samples_max: int) -> Self: 

763 data_processing_time = time.time() 

764 if number_samples_max and self.number_samples > number_samples_max: 

765 new_time_vector = npy.linspace( 

766 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False 

767 ).tolist() 

768 values = self.interpolate_values(new_time_vector) 

769 forced_values = self.interpolate_forced_values(new_time_vector) 

770 time_vector = new_time_vector 

771 number_samples = len(time_vector) 

772 else: 

773 time_vector = self.time_vector 

774 number_samples = len(self.values) 

775 values = self.values[:] 

776 forced_values = self.forced_values[:] 

777 data_processing_time = time.time() - data_processing_time 

778 

779 return self.__class__( 

780 signal_id=self.signal_id, 

781 time_vector=time_vector, 

782 values=values, 

783 forced_values=forced_values, 

784 number_samples=number_samples, 

785 number_samples_db=self.number_samples, 

786 data_start=self.data_start, 

787 data_end=self.data_end, 

788 db_query_time=self.db_query_time, 

789 init_time=self.init_time, 

790 data_processing_time=self.data_processing_time + data_processing_time, 

791 ) 

792 

793 def interest_window_desampling( 

794 self, 

795 window_max_number_samples: int, 

796 outside_max_number_samples: int, 

797 window_min_timestamp: float | None = None, 

798 window_max_timestamp: float | None = None, 

799 ) -> Self: 

800 """Performs a sampling in a window of interest and outside.""" 

801 

802 if not self.time_vector: 

803 return self 

804 

805 if window_min_timestamp is None: 

806 window_min_timestamp = self.time_vector[0] 

807 if window_max_timestamp is None: 

808 window_max_timestamp = self.time_vector[-1] 

809 

810 data_processing_time = time.time() 

811 

812 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

813 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

814 

815 time_vector_before = self.time_vector[:index_window_start] 

816 time_vector_window = self.time_vector[index_window_start:index_window_end] 

817 time_vector_after = self.time_vector[index_window_end:] 

818 

819 # Resampling window 

820 if time_vector_window: 

821 # Ensurring window bounds 

822 if time_vector_window[0] != window_min_timestamp: 

823 time_vector_window.insert(0, window_min_timestamp) 

824 if time_vector_window[-1] != window_max_timestamp: 

825 time_vector_window.append(window_max_timestamp) 

826 else: 

827 time_vector_window = [window_min_timestamp, window_max_timestamp] 

828 

829 if len(time_vector_window) > window_max_number_samples: 

830 # Resampling 

831 new_window_time_vector = npy.linspace( 

832 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True 

833 ).tolist() 

834 time_vector_window = new_window_time_vector 

835 

836 # Resampling outside 

837 number_samples_before = len(time_vector_before) 

838 number_samples_after = len(time_vector_after) 

839 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

840 new_number_samples_before = min( 

841 number_samples_before, 

842 math.ceil( 

843 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

844 ), 

845 ) 

846 new_number_samples_after = min( 

847 number_samples_after, 

848 math.ceil( 

849 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

850 ), 

851 ) 

852 # Adjusting numbers as math.ceil can do +1 on sum 

853 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

854 if new_number_samples_before > new_number_samples_after: 

855 new_number_samples_before -= 1 

856 else: 

857 new_number_samples_after -= 1 

858 

859 if new_number_samples_before > 0: 

860 new_time_vector_before = npy.linspace( 

861 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False 

862 ).tolist() 

863 time_vector_before = new_time_vector_before 

864 

865 if new_number_samples_after > 0: 

866 new_time_vector_after = npy.linspace( 

867 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False 

868 ).tolist()[::-1] 

869 time_vector_after = new_time_vector_after 

870 

871 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

872 values = self.interpolate_values(new_time_vector) 

873 forced_values = self.interpolate_forced_values(new_time_vector) 

874 number_samples = len(values) 

875 

876 data_processing_time = time.time() - data_processing_time 

877 

878 return self.__class__( 

879 signal_id=self.signal_id, 

880 forcible=self.forcible, 

881 time_vector=new_time_vector, 

882 values=values, 

883 forced_values=forced_values, 

884 number_samples=number_samples, 

885 number_samples_db=self.number_samples, 

886 data_start=self.data_start, 

887 data_end=self.data_end, 

888 db_query_time=self.db_query_time, 

889 init_time=self.init_time, 

890 data_processing_time=self.data_processing_time + data_processing_time, 

891 ) 

892 

893 def csv_export(self): 

894 output = io.StringIO() 

895 writer = csv.writer(output) 

896 writer.writerow(["timestamp", "value", "forced_value"]) # Write header 

897 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

898 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value]) 

899 return output.getvalue().encode("utf-8") 

900 

901 def prestoplot_export(self): 

902 clean_signal_id = self.signal_id.replace(".", "_") 

903 if clean_signal_id[0].isnumeric(): 

904 clean_signal_id = "_" + clean_signal_id 

905 

906 output = io.StringIO() 

907 output.write("# Encoding:\tUTF-8\n") 

908 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n") 

909 output.write("ISO8601\tnone\tnone\n") 

910 output.write(f"# Description :\t{clean_signal_id}\n") 

911 

912 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values): 

913 output.write( 

914 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n" 

915 ) 

916 return output.getvalue().encode("utf-8") 

917 

918 

919class NumericSignalData(SignalData): 

920 data_type: str = "float" 

921 values: list[float | int | None] 

922 forced_values: list[float | int | None] 

923 

924 def interpolate(self, new_time_vector: list[float], items): 

925 items = [npy.nan if s is None else s for s in items] 

926 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)] 

927 

928 def uniform_desampling(self, number_samples_max: int) -> Self: 

929 data_processing_time = time.time() 

930 if number_samples_max and self.number_samples > number_samples_max: 

931 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max) 

932 forced_values = self.interpolate_forced_values(time_vector) 

933 number_samples = len(time_vector) 

934 else: 

935 time_vector = self.time_vector 

936 number_samples = len(self.values) 

937 values = self.values[:] 

938 forced_values = self.forced_values[:] 

939 data_processing_time = time.time() - data_processing_time 

940 

941 return self.__class__( 

942 signal_id=self.signal_id, 

943 time_vector=time_vector, 

944 values=values, 

945 forced_values=forced_values, 

946 number_samples=number_samples, 

947 number_samples_db=self.number_samples, 

948 data_start=self.data_start, 

949 data_end=self.data_end, 

950 db_query_time=self.db_query_time, 

951 init_time=self.init_time, 

952 data_processing_time=self.data_processing_time + data_processing_time, 

953 ) 

954 

955 def interest_window_desampling( 

956 self, 

957 window_max_number_samples: int, 

958 outside_max_number_samples: int, 

959 window_min_timestamp: float | None = None, 

960 window_max_timestamp: float | None = None, 

961 ) -> Self: 

962 """Performs a sampling in a window of interest and outside.""" 

963 

964 if not self.time_vector: 

965 return self 

966 

967 if window_min_timestamp is None: 

968 window_min_timestamp = self.time_vector[0] 

969 if window_max_timestamp is None: 

970 window_max_timestamp = self.time_vector[-1] 

971 

972 data_processing_time = time.time() 

973 

974 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp) 

975 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp) 

976 

977 time_vector_before = self.time_vector[:index_window_start] 

978 time_vector_window = self.time_vector[index_window_start:index_window_end] 

979 time_vector_after = self.time_vector[index_window_end:] 

980 

981 values_before = self.values[:index_window_start] 

982 values_window = self.values[index_window_start:index_window_end] 

983 values_after = self.values[index_window_end:] 

984 window_min_value = self.interpolate_values([window_min_timestamp])[0] 

985 window_max_value = self.interpolate_values([window_max_timestamp])[0] 

986 

987 # Resampling window 

988 if time_vector_window: 

989 # Ensurring window bounds 

990 if time_vector_window[0] != window_min_timestamp: 

991 time_vector_window.insert(0, window_min_timestamp) 

992 values_window.insert(0, window_min_value) 

993 if time_vector_window[-1] != window_max_timestamp: 

994 time_vector_window.append(window_max_timestamp) 

995 values_window.append(window_max_value) 

996 else: 

997 time_vector_window = [window_min_timestamp, window_max_timestamp] 

998 values_window = [window_min_value, window_max_value] 

999 

1000 if len(time_vector_window) > window_max_number_samples: 

1001 # Resampling 

1002 time_vector_window, values_window = downsample_list( 

1003 time_vector_window, values_window, window_max_number_samples 

1004 ) 

1005 

1006 # Resampling outside 

1007 number_samples_before = len(time_vector_before) 

1008 number_samples_after = len(time_vector_after) 

1009 if (number_samples_before + number_samples_after) > outside_max_number_samples: 

1010 new_number_samples_before = min( 

1011 number_samples_before, 

1012 math.ceil( 

1013 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after) 

1014 ), 

1015 ) 

1016 new_number_samples_after = min( 

1017 number_samples_after, 

1018 math.ceil( 

1019 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after) 

1020 ), 

1021 ) 

1022 # Adjusting numbers as math.ceil can do +1 on sum 

1023 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples: 

1024 if new_number_samples_before > new_number_samples_after: 

1025 new_number_samples_before -= 1 

1026 else: 

1027 new_number_samples_after -= 1 

1028 

1029 if new_number_samples_before > 0: 

1030 time_vector_before, values_before = downsample_list( 

1031 time_vector_before, values_before, new_number_samples_before 

1032 ) 

1033 

1034 if new_number_samples_after > 0: 

1035 time_vector_after, values_after = downsample_list( 

1036 time_vector_after, values_after, new_number_samples_after 

1037 ) 

1038 

1039 new_time_vector = time_vector_before + time_vector_window + time_vector_after 

1040 values = values_before + values_window + values_after 

1041 forced_values = self.interpolate_forced_values(new_time_vector) 

1042 number_samples = len(values) 

1043 

1044 data_processing_time = time.time() - data_processing_time 

1045 

1046 return self.__class__( 

1047 signal_id=self.signal_id, 

1048 time_vector=new_time_vector, 

1049 values=values, 

1050 forced_values=forced_values, 

1051 number_samples=number_samples, 

1052 number_samples_db=self.number_samples, 

1053 data_start=self.data_start, 

1054 data_end=self.data_end, 

1055 db_query_time=self.db_query_time, 

1056 init_time=self.init_time, 

1057 data_processing_time=self.data_processing_time + data_processing_time, 

1058 ) 

1059 

1060 

1061class StringSignalData(SignalData): 

1062 data_type: str = "str" 

1063 values: list[str | None] 

1064 forced_values: list[str | None] 

1065 

1066 def interpolate(self, new_time_vector: list[float], items): 

1067 # Find the indices of the values in xp that are just smaller or equal to x 

1068 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1 

1069 indices = npy.clip(indices, 0, len(items) - 1) 

1070 # Return the corresponding left string values from fp 

1071 return [items[i] for i in indices] 

1072 

1073 

1074class SignalsData(TwinPadModel): 

1075 signals_data: list[NumericSignalData | StringSignalData | SignalData] 

1076 data_processing_time: float 

1077 data_start: float | None 

1078 data_end: float | None 

1079 

1080 @classmethod 

1081 def get_from_signal_ids( 

1082 cls, 

1083 signal_ids: list[str], 

1084 min_timestamp: float = None, 

1085 max_timestamp: float = None, 

1086 window_min_timestamp: float = None, 

1087 window_max_timestamp: float = None, 

1088 interpolate_bounds: bool = True, 

1089 max_documents: int = None, 

1090 ) -> Self: 

1091 signals_data = [] 

1092 data_start = None 

1093 data_end = None 

1094 if max_timestamp is None: 

1095 max_timestamp = time.time() 

1096 data_processing_time = 0.0 

1097 for signal_id in signal_ids: 

1098 signal_data = SignalData.get_from_signal_id( 

1099 signal_id=signal_id, 

1100 min_timestamp=min_timestamp, 

1101 max_timestamp=max_timestamp, 

1102 window_min_timestamp=window_min_timestamp, 

1103 window_max_timestamp=window_max_timestamp, 

1104 interpolate_bounds=interpolate_bounds, 

1105 max_documents=max_documents, 

1106 ) 

1107 data_processing_time += signal_data.data_processing_time 

1108 signals_data.append(signal_data) 

1109 if signal_data.data_start is not None: 

1110 if data_start is None: 

1111 data_start = signal_data.data_start 

1112 else: 

1113 data_start = min(signal_data.data_start, data_start) 

1114 if signal_data.data_end is not None: 

1115 if data_end is None: 

1116 data_end = signal_data.data_end 

1117 else: 

1118 data_end = max(signal_data.data_end, data_end) 

1119 

1120 return cls( 

1121 signals_data=signals_data, 

1122 data_processing_time=data_processing_time, 

1123 data_start=data_start, 

1124 data_end=data_end, 

1125 ) 

1126 

1127 def uniform_desampling(self, number_samples_max: int) -> Self: 

1128 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data] 

1129 return SignalsData( 

1130 signals_data=signals_data, 

1131 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1132 data_start=self.data_start, 

1133 data_end=self.data_end, 

1134 ) 

1135 

1136 def interest_window_desampling( 

1137 self, 

1138 window_max_number_samples: int, 

1139 outside_max_number_samples: int, 

1140 window_min_timestamp: float = None, 

1141 window_max_timestamp: float = None, 

1142 ) -> Self: 

1143 signals_data = [ 

1144 s.interest_window_desampling( 

1145 window_max_number_samples=window_max_number_samples, 

1146 outside_max_number_samples=outside_max_number_samples, 

1147 window_min_timestamp=window_min_timestamp, 

1148 window_max_timestamp=window_max_timestamp, 

1149 ) 

1150 for s in self.signals_data 

1151 ] 

1152 

1153 return SignalsData( 

1154 signals_data=signals_data, 

1155 data_processing_time=sum(s.data_processing_time for s in signals_data), 

1156 data_start=self.data_start, 

1157 data_end=self.data_end, 

1158 ) 

1159 

1160 def zip_export(self, file_format: str = "csv"): 

1161 # return self.signals_data[0].csv_export() 

1162 zip_buffer = io.BytesIO() 

1163 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: 

1164 for signal_data in self.signals_data: 

1165 if file_format == "csv": 

1166 export_io = signal_data.csv_export() 

1167 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io) 

1168 elif file_format == "prestoplot": 

1169 export_io = signal_data.prestoplot_export() 

1170 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io) 

1171 else: 

1172 raise ValueError(f"Format not found. Got: {file_format}") 

1173 zip_bytes = zip_buffer.getvalue() 

1174 # zip_bytes.seek(0) 

1175 return zip_bytes 

1176 

1177 def hdf5_export(self): 

1178 hdf5_buffer = io.BytesIO() 

1179 custom_type_float = npy.dtype( 

1180 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)] 

1181 ) 

1182 custom_type_string = npy.dtype( 

1183 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")] 

1184 ) 

1185 with h5py.File(hdf5_buffer, "w") as hdf5_file: 

1186 for signal_data in self.signals_data: 

1187 signal_group = hdf5_file.create_group(signal_data.signal_id) 

1188 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector] 

1189 if signal_data.data_type == "str": 

1190 export_data = npy.array( 

1191 list( 

1192 zip( 

1193 date_vector, 

1194 signal_data.time_vector, 

1195 signal_data.values, 

1196 signal_data.forced_values, 

1197 ) 

1198 ), 

1199 dtype=custom_type_string, 

1200 ) 

1201 else: 

1202 export_data = npy.array( 

1203 list( 

1204 zip( 

1205 date_vector, 

1206 signal_data.time_vector, 

1207 signal_data.values, 

1208 signal_data.forced_values, 

1209 ) 

1210 ), 

1211 dtype=custom_type_float, 

1212 ) 

1213 signal_group["data"] = export_data 

1214 return hdf5_buffer.getvalue() 

1215 

1216 

1217class SignalStatus(TwinPadModel): 

1218 status: str 

1219 reason: str 

1220 delay: float | None 

1221 

1222 

1223class DigitizationFunction(TwinPadModel): 

1224 bits: int | None = None 

1225 min_value: float 

1226 max_value: float 

1227 min_raw_value: float 

1228 max_raw_value: float 

1229 

1230 

1231class SignalUpdate(TwinPadModel): 

1232 value: float | str | bool | int | None = None 

1233 forced_value: float | str | bool | int | None = None 

1234 timestamp: int | None = None 

1235 

1236 

1237class SignalType(str, Enum): 

1238 command = "command" 

1239 sensor = "sensor" 

1240 external_sensor = "external_sensor" 

1241 

1242 

1243SIGNALDATA_TYPES = { 

1244 "int": NumericSignalData, 

1245 "float": NumericSignalData, 

1246 "str": StringSignalData, 

1247 "bool": NumericSignalData, 

1248 "epoch": NumericSignalData, 

1249} 

1250 

1251 

1252class Signal(GenericMongo): 

1253 collection_name: ClassVar[str] = "signals" 

1254 

1255 signal_id: str 

1256 frequency: float 

1257 unit: str | None 

1258 description: str 

1259 type: SignalType 

1260 data_type: str 

1261 precision_digits: int | None 

1262 forcible: bool 

1263 

1264 digitization_function: DigitizationFunction | None 

1265 

1266 @property 

1267 def device(self) -> Device: 

1268 device_id = self.signal_id.split(".")[0] 

1269 device = Device.get_one_by_attribute("device_id", device_id) 

1270 return device 

1271 

1272 @cached_property 

1273 def signal_data_class(self): 

1274 if self.data_type in SIGNALDATA_TYPES: 

1275 return SIGNALDATA_TYPES[self.data_type] 

1276 if self.data_type.startswith("enum"): 

1277 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")] 

1278 raise ValueError(f"Unhandled python type: {self.data_type}") 

1279 

1280 @cached_property 

1281 def python_type(self): 

1282 if self.data_type in TYPES: 

1283 return TYPES[self.data_type] 

1284 if self.data_type.startswith("enum"): 

1285 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")")) 

1286 return Literal[*choices] 

1287 raise ValueError(f"Unhandled python type: {self.data_type}") 

1288 

1289 @computed_field 

1290 @property 

1291 def status(self) -> SignalStatus: 

1292 now = time.time() 

1293 status = "up" 

1294 reason = "" 

1295 

1296 # See line 285 for explanation 

1297 bucket = get_signal_collection(f"system.buckets.{self.signal_id}") 

1298 last_bucket = None 

1299 if bucket is not None: 

1300 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1}) 

1301 if last_bucket is None: 

1302 status = "no data" 

1303 reason = "signal does not exist" 

1304 return SignalStatus(status=status, reason=reason, delay=None) 

1305 

1306 try: 

1307 last_date = last_bucket["control"]["max"]["timestamp"] 

1308 last_date = last_date.replace(tzinfo=pytz.UTC) 

1309 last_value_ts = last_date.timestamp() 

1310 except IndexError: 

1311 last_value_ts = None 

1312 

1313 if last_value_ts is None: 

1314 delay = None 

1315 reason = "No data from signal" 

1316 else: 

1317 # Since device is a computed property, only compute it once 

1318 device = self.device 

1319 if device is not None and device.last_ping is not None: 

1320 last_value_ts = max(last_value_ts, device.last_ping) 

1321 delay = now - last_value_ts 

1322 if delay > DEVICE_TIMEOUT: 

1323 status = "down" 

1324 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) " 

1325 return SignalStatus(status=status, reason=reason, delay=delay) 

1326 

1327 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict: 

1328 command = Command( 

1329 sent_at=time.time(), 

1330 command_type="Signal command", 

1331 user_id=current_user.id, 

1332 ) 

1333 

1334 has_input_error = False 

1335 error_message = "" 

1336 

1337 if self.data_type.startswith("enum"): 

1338 enum_options = get_args(self.python_type) 

1339 

1340 if update_dict.value is not None and update_dict.value not in enum_options: 

1341 has_input_error = True 

1342 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n" 

1343 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options: 

1344 has_input_error = True 

1345 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n" 

1346 else: 

1347 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type): 

1348 has_input_error = True 

1349 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n" 

1350 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type): 

1351 has_input_error = True 

1352 error_message += ( 

1353 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n" 

1354 ) 

1355 

1356 if has_input_error: 

1357 command.response_time = 0 

1358 command.succeeded = False 

1359 command.description = f"Tried to modify signal {self.signal_id}" 

1360 response = {"error": True, "status_code": 400, "message": error_message} 

1361 else: 

1362 response = await send_signal_value(self.signal_id, update_dict) 

1363 command.receive_response(response) 

1364 

1365 Command.create(command) 

1366 return response 

1367 

1368 @classmethod 

1369 def get_from_signal_id(cls, signal_id) -> Self: 

1370 """Could be generic from mongo""" 

1371 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id}) 

1372 if not raw_value: 

1373 return None 

1374 del raw_value["_id"] 

1375 return cls.dict_to_object(raw_value) 

1376 

1377 @classmethod 

1378 def get_all_ids(cls) -> list[str]: 

1379 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]) 

1380 

1381 return [signal["signal_id"] for signal in cursor] 

1382 

1383 async def number_samples(self): 

1384 collection = get_signal_collection(signal_id=self.signal_id) 

1385 if collection is None: 

1386 return 0 

1387 

1388 number_samples = collection.estimated_document_count() 

1389 

1390 number_samples_async_collection = await get_async_collection( 

1391 systems_async_database, "number_samples", create=True, time_series=True 

1392 ) 

1393 

1394 loop = asyncio.get_running_loop() 

1395 loop.create_task( 

1396 number_samples_async_collection.insert_one( 

1397 { 

1398 "timestamp": datetime.datetime.now(pytz.UTC), 

1399 "signal_id": self.signal_id, 

1400 "number_samples": number_samples, 

1401 } 

1402 ) 

1403 ) 

1404 

1405 return number_samples 

1406 

1407 def sample_datasize(self): 

1408 return signals_database.command("collstats", self.signal_id)["size"] 

1409 

1410 @classmethod 

1411 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]: 

1412 result = cls.collection().aggregate( 

1413 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}] 

1414 ) 

1415 

1416 return {signal["signal_id"]: signal["forcible"] for signal in result} 

1417 

1418 

1419class ServicesStatus(TwinPadModel): 

1420 backend: str 

1421 cloud_broker: str 

1422 time_series_database: str 

1423 signal_storage: str 

1424 heartbeat_storage: str 

1425 data_analyzer: str 

1426 

1427 @classmethod 

1428 def check(cls) -> Self: 

1429 return cls( 

1430 cloud_broker=ping(RABBITMQ_HOST), 

1431 backend="up", 

1432 time_series_database=ping(MONGO_HOST), 

1433 signal_storage=ping(SIGNAL_STORAGE_HOST), 

1434 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST), 

1435 data_analyzer=ping(DATA_ANALYZER_HOST), 

1436 ) 

1437 

1438 

1439def ping(host): 

1440 try: 

1441 if ping3.ping(host, timeout=0.8): 

1442 return "up" 

1443 except PermissionError: 

1444 pass 

1445 return "down" 

1446 

1447 

1448class Event(GenericMongo): 

1449 collection_name: ClassVar[str] = "events" 

1450 

1451 name: str 

1452 timestamp: float 

1453 event_rule_id: str 

1454 

1455 @computed_field 

1456 @cached_property 

1457 def event_rule(self) -> "EventRule": 

1458 return EventRule.get_from_id(self.event_rule_id) 

1459 

1460 @classmethod 

1461 def dict_to_object(cls, dict_): 

1462 """Refine to convert timestamp to datetime for mongodb.""" 

1463 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"]) 

1464 return super().dict_to_object(dict_) 

1465 

1466 

1467class TwinPadActivity(GenericMongo): 

1468 timestamp: float 

1469 amount: int 

1470 

1471 @classmethod 

1472 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]: 

1473 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1474 number_events_collection = get_collection(systems_database, "number_events") 

1475 events_collection = get_collection(systems_database, "events", create=True, time_series=True) 

1476 items = [] 

1477 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1478 if number_events_collection is None or recompute_amount: 

1479 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True) 

1480 number_events_collection.delete_many({}) 

1481 first_event = events_collection.find_one(sort={"timestamp": 1}) 

1482 if first_event is None: 

1483 return items 

1484 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace( 

1485 tzinfo=pytz.UTC 

1486 ) 

1487 while last_computed_day < TODAY: 

1488 day_nb_events = events_collection.count_documents( 

1489 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1490 ) 

1491 if day_nb_events > 0: 

1492 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events}) 

1493 last_computed_day += ONE_DAY_OFFSET 

1494 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1495 if number_events_today > 0: 

1496 number_events_collection.delete_many({"timestamp": TODAY}) 

1497 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today}) 

1498 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1499 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1500 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1501 for day in number_events: 

1502 day["timestamp"] = day["timestamp"].timestamp() 

1503 items.append(cls.mongo_dict_to_object(day)) 

1504 return items 

1505 

1506 @classmethod 

1507 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1508 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1509 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False) 

1510 signals_number_samples_collection = get_collection( 

1511 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True 

1512 ) 

1513 items = [] 

1514 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1515 if number_samples_collection is None or recompute_amount: 

1516 number_samples_collection = get_collection( 

1517 systems_database, "number_received_samples", create=True, time_series=True 

1518 ) 

1519 number_samples_collection.delete_many({}) 

1520 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1}) 

1521 if first_sample is None: 

1522 return items 

1523 # compute from day of first found event 

1524 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace( 

1525 tzinfo=pytz.UTC 

1526 ) 

1527 while last_computed_day < TODAY: 

1528 number_samples_request = signals_number_samples_collection.aggregate( 

1529 [ 

1530 { 

1531 "$match": { 

1532 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET} 

1533 } 

1534 }, 

1535 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1536 ] 

1537 ).to_list() 

1538 if len(number_samples_request) == 0: 

1539 number_samples = 0 

1540 else: 

1541 number_samples = number_samples_request[0].get("number_samples", 0) 

1542 if number_samples > 0: 

1543 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples}) 

1544 last_computed_day += ONE_DAY_OFFSET 

1545 number_samples_request = signals_number_samples_collection.aggregate( 

1546 [ 

1547 {"$match": {"timestamp": {"$gte": TODAY}}}, 

1548 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}}, 

1549 ] 

1550 ).to_list() 

1551 if len(number_samples_request) == 0: 

1552 number_samples_today = 0 

1553 else: 

1554 number_samples_today = number_samples_request[0].get("number_samples", 0) 

1555 if number_samples_today > 0: 

1556 number_samples_collection.delete_many({"timestamp": TODAY}) 

1557 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today}) 

1558 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1559 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1560 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1561 for day in number_events: 

1562 day["timestamp"] = day["timestamp"].timestamp() 

1563 items.append(cls.mongo_dict_to_object(day)) 

1564 return items 

1565 

1566 @classmethod 

1567 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]: 

1568 ONE_DAY_OFFSET = datetime.timedelta(days=1) 

1569 number_commands_collection = get_collection(systems_database, "number_commands") 

1570 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True) 

1571 items = [] 

1572 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC) 

1573 if number_commands_collection is None or recompute_amount: 

1574 number_commands_collection = get_collection( 

1575 systems_database, "number_commands", create=True, time_series=True 

1576 ) 

1577 number_commands_collection.delete_many({}) 

1578 first_command = commands_collection.find_one(sort={"timestamp": 1}) 

1579 if first_command is None: 

1580 return items 

1581 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace( 

1582 tzinfo=pytz.UTC 

1583 ) 

1584 while last_computed_day < TODAY: 

1585 day_nb_commands = commands_collection.count_documents( 

1586 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}} 

1587 ) 

1588 if day_nb_commands > 0: 

1589 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands}) 

1590 last_computed_day += ONE_DAY_OFFSET 

1591 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}}) 

1592 if number_commands_today > 0: 

1593 number_commands_collection.delete_many({"timestamp": TODAY}) 

1594 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today}) 

1595 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC) 

1596 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC) 

1597 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}) 

1598 for day in number_commands: 

1599 day["timestamp"] = day["timestamp"].timestamp() 

1600 items.append(cls.mongo_dict_to_object(day)) 

1601 return items 

1602 

1603 

1604class EventRule(GenericMongo): 

1605 collection_name: ClassVar[str] = "event_rules" 

1606 

1607 name: str 

1608 formula: str 

1609 variables: list[str] 

1610 

1611 @computed_field 

1612 @cached_property 

1613 def number_events(self) -> int: 

1614 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id}) 

1615 

1616 

1617class Company(GenericMongo): 

1618 collection_name: ClassVar[str] = "companies" 

1619 name: str 

1620 

1621 

1622class Campaign(GenericMongo): 

1623 collection_name: ClassVar[str] = "campaigns" 

1624 

1625 # Properties 

1626 id: str | None = None 

1627 name: str 

1628 description: str | None = None 

1629 

1630 @classmethod 

1631 def create(cls, campaign: Self): 

1632 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"})) 

1633 if new_campaign is None: 

1634 return None 

1635 return {"campaign_id": str(new_campaign.inserted_id)} 

1636 

1637 @classmethod 

1638 def update(cls, campaign: Self): 

1639 updated_campaign = cls.collection().find_one_and_update( 

1640 {"_id": ObjectId(campaign.id)}, 

1641 {"$set": {"name": campaign.name, "description": campaign.description}}, 

1642 return_document=ReturnDocument.AFTER, 

1643 ) 

1644 return updated_campaign 

1645 

1646 @classmethod 

1647 def delete(cls, campaign_id): 

1648 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)}) 

1649 return deleted_user 

1650 

1651 

1652class Phase(GenericMongo): 

1653 collection_name: ClassVar[str] = "phases" 

1654 

1655 # Properties 

1656 id: str | None = None 

1657 name: str 

1658 description: str | None = None 

1659 start_at: float 

1660 end_at: float 

1661 

1662 # FK 

1663 campaign_id: str 

1664 

1665 # @classmethod 

1666 # def get_by_date(cls, datetime: float): 

1667 # phases = [] 

1668 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING): 

1669 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1670 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING): 

1671 # phases.append(cls.dict_to_object(dict_).model_dump()) 

1672 # if phases is None: 

1673 # return None 

1674 # return phases 

1675 

1676 @classmethod 

1677 def create(cls, phase: Self): 

1678 phase = Phase( 

1679 name=phase.name, 

1680 description=phase.description, 

1681 start_at=phase.start_at, 

1682 end_at=phase.end_at, 

1683 campaign_id=phase.campaign_id, 

1684 ) 

1685 phase_collection = get_collection(systems_database, "phases", create=True) 

1686 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"})) 

1687 if new_phase is None: 

1688 return None 

1689 return {"phase_id": str(new_phase.inserted_id)} 

1690 

1691 @classmethod 

1692 def update(cls, phase: Self): 

1693 updated_phase = cls.collection().find_one_and_update( 

1694 {"_id": ObjectId(phase.id)}, 

1695 { 

1696 "$set": { 

1697 "name": phase.name, 

1698 "description": phase.description, 

1699 "start_at": phase.start_at, 

1700 "end_at": phase.end_at, 

1701 } 

1702 }, 

1703 return_document=ReturnDocument.AFTER, 

1704 ) 

1705 return updated_phase 

1706 

1707 @classmethod 

1708 def delete(cls, phase_id): 

1709 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)}) 

1710 return delete_phase 

1711 

1712 @classmethod 

1713 def deleteMany(cls, campaign_id): 

1714 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id}) 

1715 return delete_phases 

1716 

1717 

1718class CustomViewCreation(GenericMongo): 

1719 collection_name: ClassVar[str] = "custom_views" 

1720 

1721 name: str 

1722 configuration: list 

1723 

1724 

1725class CustomView(CustomViewCreation): 

1726 # Properties 

1727 id: str | None = None 

1728 

1729 # Foreign Key 

1730 user_id: str 

1731 

1732 # # Methods 

1733 # @classmethod 

1734 # def create(cls, form_custom_view: Self, user_id) -> list: 

1735 # custom_view = CustomView( 

1736 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id 

1737 # ) 

1738 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"})) 

1739 # return new_custom_view 

1740 

1741 # @classmethod 

1742 # def update(cls, custom_view: Self, custom_view_id: str) -> Self: 

1743 # updated_custom_view = cls.collection().find_one_and_update( 

1744 # {"_id": ObjectId(custom_view_id)}, 

1745 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}}, 

1746 # return_document=ReturnDocument.AFTER, 

1747 # ) 

1748 # updated_custom_view["id"] = str(updated_custom_view["_id"]) 

1749 # del updated_custom_view["_id"] 

1750 # return cls(**updated_custom_view) 

1751 

1752 # @classmethod 

1753 # def delete(cls, custom_view_id) -> bool: 

1754 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)}) 

1755 # return deleted_custom_view.acknowledged 

1756 

1757 

1758CustomViewUpdate = create_update_model(CustomView) 

1759 

1760 

1761class Video(GenericMongo): 

1762 collection_name: ClassVar[str] = "videos" 

1763 

1764 # Properties 

1765 name: str 

1766 ip_addr: str 

1767 username: str | None = None 

1768 password: str | None = None 

1769 

1770 # Methods 

1771 @classmethod 

1772 def get_all(cls, sort_by="_id") -> list[Self]: 

1773 items = [] 

1774 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING): 

1775 items.append(cls.mongo_dict_to_object(dict_)) 

1776 return items 

1777 

1778 @classmethod 

1779 def get_video(cls, camera_id: ObjectId): 

1780 camera = cls.get_from_id(camera_id) 

1781 if camera is not None: 

1782 return camera.name 

1783 return None 

1784 

1785 

1786class Command(GenericMongo): 

1787 collection_name: ClassVar[str] = "commands" 

1788 

1789 # Properties 

1790 timestamp: datetime.datetime = None 

1791 sent_at: float 

1792 response_time: float = 0.0 

1793 command_type: str 

1794 description: str = "" 

1795 succeeded: bool = False 

1796 

1797 # Foreign key 

1798 user_id: str 

1799 

1800 @classmethod 

1801 def collection(cls): 

1802 return get_collection(systems_database, cls.collection_name, create=True, time_series=True) 

1803 

1804 @classmethod 

1805 def create(cls, command: Self): 

1806 command = cls( 

1807 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC), 

1808 sent_at=command.sent_at, 

1809 response_time=command.response_time, 

1810 command_type=command.command_type, 

1811 description=command.description, 

1812 succeeded=command.succeeded, 

1813 user_id=command.user_id, 

1814 ) 

1815 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"})) 

1816 if new_command is None: 

1817 return None 

1818 return {"command_id": str(new_command.inserted_id)} 

1819 

1820 def receive_response(self, response: dict): 

1821 self.response_time = time.time() - self.sent_at 

1822 self.succeeded = response.get("error", True) is False 

1823 if self.description == "": 

1824 self.description += response.get("message", "").rstrip() 

1825 

1826 

1827class SignalsPresetCreation(GenericMongo): 

1828 name: str 

1829 signal_ids: list[str] 

1830 

1831 

1832class SignalsPreset(SignalsPresetCreation): 

1833 collection_name: ClassVar[str] = "signals_presets" 

1834 

1835 user_id: str 

1836 

1837 @classmethod 

1838 def create(cls, signals_preset: SignalsPresetCreation, user_id: str): 

1839 signals_preset = cls( 

1840 user_id=user_id, 

1841 name=signals_preset.name, 

1842 signal_ids=signals_preset.signal_ids, 

1843 ) 

1844 

1845 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"})) 

1846 

1847 return str(new_signal_preset.inserted_id) 

1848 

1849 

1850SignalsPresetUpdate = create_update_model(SignalsPreset) 

1851 

1852 

1853class LineStyle(str, Enum): 

1854 solid = "solid" 

1855 dotted = "dotted" 

1856 dashed = "dashed" 

1857 

1858 

1859class SignalAppearance: 

1860 value_color: str 

1861 forced_value_color: str 

1862 

1863 

1864class GraphThemeCreation(GenericMongo): 

1865 collection_name: ClassVar[str] = "graph_themes" 

1866 

1867 name: str 

1868 signal_id: str 

1869 value_color: str = "" 

1870 forced_value_color: str = "" 

1871 value_line_style: LineStyle = LineStyle.solid 

1872 forced_value_line_style: LineStyle = LineStyle.solid 

1873 private: bool = True 

1874 

1875 

1876class PublicGraphTheme(GraphThemeCreation): 

1877 created_by_user: bool 

1878 in_user_library: bool 

1879 active_for_user: bool 

1880 

1881 _current_user_id: str = "" 

1882 

1883 @classproperty 

1884 def custom_pipeline_steps(cls) -> dict[str, list]: 

1885 return { 

1886 "created_by_user": [ 

1887 { 

1888 "$addFields": { 

1889 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]}, 

1890 } 

1891 } 

1892 ], 

1893 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible 

1894 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}} 

1895 ], 

1896 "in_user_library": [ 

1897 { 

1898 "$addFields": { 

1899 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]}, 

1900 } 

1901 } 

1902 ], 

1903 "active_for_user": [ 

1904 { 

1905 "$addFields": { 

1906 "active_for_user": {"$in": [cls._current_user_id, "$active"]}, 

1907 } 

1908 } 

1909 ], 

1910 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}], 

1911 "active": [ 

1912 { 

1913 "$addFields": { 

1914 "active": "$$REMOVE", 

1915 } 

1916 } 

1917 ], 

1918 "creator_id": [ 

1919 { 

1920 "$addFields": { 

1921 "creator_id": "$$REMOVE", 

1922 } 

1923 } 

1924 ], 

1925 } 

1926 

1927 @classmethod 

1928 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]: 

1929 cls._current_user_id = user_id 

1930 return super().response_from_query(query) 

1931 

1932 @classmethod 

1933 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]: 

1934 query.in_user_library = "true" 

1935 return cls.response_from_query(query, user_id) 

1936 

1937 @classmethod 

1938 def get_from_id(cls, item_id, user_id: str) -> Self | None: 

1939 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id) 

1940 

1941 @classmethod 

1942 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

1943 cls._current_user_id = user_id 

1944 return super().get_by_attribute(attribute_name, attribute_value) 

1945 

1946 @classmethod 

1947 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str): 

1948 cls._current_user_id = user_id 

1949 return super().get_one_by_attribute(attribute_name, attribute_value) 

1950 

1951 @classmethod 

1952 def get_all(cls, sort_by: str, user_id: str): 

1953 cls._current_user_id = user_id 

1954 return super().get_all(sort_by) 

1955 

1956 @classmethod 

1957 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict: 

1958 pipeline = [ 

1959 { 

1960 "$match": { 

1961 "active": {"$eq": user_id}, 

1962 "signal_id": {"$in": signal_ids}, 

1963 } 

1964 }, 

1965 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}, 

1966 {"$replaceRoot": {"newRoot": "$firstDocument"}}, 

1967 { 

1968 "$project": { 

1969 "_id": 0, 

1970 "signal_id": 1, 

1971 "value_color": 1, 

1972 "forced_value_color": 1, 

1973 "value_line_style": 1, 

1974 "forced_value_line_style": 1, 

1975 } 

1976 }, 

1977 ] 

1978 

1979 result = {} 

1980 

1981 cursor = cls.collection().aggregate(pipeline) 

1982 for document in cursor: 

1983 signal_id = document["signal_id"] 

1984 del document["signal_id"] 

1985 result[signal_id] = document 

1986 

1987 return result 

1988 

1989 

1990GraphThemeUpdate = create_update_model(PublicGraphTheme) 

1991 

1992 

1993class PrivateGraphTheme(GraphThemeCreation): 

1994 # private 

1995 creator_id: str 

1996 in_library: list[str] 

1997 active: list[str] 

1998 

1999 @classmethod 

2000 def create( 

2001 cls, 

2002 creator_id: str, 

2003 name: str, 

2004 signal_id: str, 

2005 value_color: str, 

2006 forced_value_color: str, 

2007 value_line_style: LineStyle, 

2008 forced_value_line_style: LineStyle, 

2009 private: bool, 

2010 ): 

2011 color_setting = cls( 

2012 creator_id=creator_id, 

2013 name=name, 

2014 signal_id=signal_id, 

2015 value_color=value_color, 

2016 forced_value_color=forced_value_color, 

2017 value_line_style=value_line_style, 

2018 forced_value_line_style=forced_value_line_style, 

2019 private=private, 

2020 in_library=[creator_id], 

2021 active=[creator_id], 

2022 ) 

2023 

2024 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True})) 

2025 color_setting.id = str(new_color_setting.inserted_id) 

2026 return color_setting 

2027 

2028 def update(self, update_dict: dict, user_id: str): 

2029 if (in_user_lib := update_dict.get("in_user_library")) is not None: 

2030 if in_user_lib and user_id not in self.in_library: 

2031 self.in_library.append(user_id) 

2032 elif not in_user_lib and user_id in self.in_library: 

2033 self.in_library.remove(user_id) 

2034 update_dict["in_library"] = self.in_library 

2035 del update_dict["in_user_library"] 

2036 

2037 if (active_for_user := update_dict.get("active_for_user")) is not None: 

2038 if active_for_user and user_id not in self.active: 

2039 self.active.append(user_id) 

2040 elif not active_for_user and user_id in self.active: 

2041 self.active.remove(user_id) 

2042 update_dict["active"] = self.active 

2043 del update_dict["active_for_user"] 

2044 

2045 if update_dict.get("created_by_user") is not None: 

2046 del update_dict["created_by_user"] 

2047 

2048 self.collection().find_one_and_update( 

2049 {"_id": ObjectId(self.id)}, 

2050 {"$set": update_dict}, 

2051 ) 

2052 

2053 return {}