Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%
1226 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-27 13:40 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-27 13:40 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pymongo.collation import Collation
21from pydantic import BaseModel, computed_field, Field, create_model
22import numpy as npy
23import lttb
24import h5py
26# from scipy import signal as signal_scipy
28from twinpad_backend.db import (
29 get_collection,
30 get_async_collection,
31 get_signal_collection,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 devices_states_database,
36)
37from twinpad_backend.responses import ListResponse
38from twinpad_backend.messages import send_mode_change, send_signal_value
# Lookup from a type name to the Python type it stands for ("epoch" is handled as a float).
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}

# Host names of the companion services; each one can be overridden by the environment
# variable of the same name.
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.getenv("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.getenv("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.getenv("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.getenv("DATA_ANALYZER_HOST", "data-analyzer")

# Compared against "now - last sample timestamp" when reading signal data
# (presumably expressed in seconds -- TODO confirm).
DEVICE_TIMEOUT = float(os.getenv("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Log through uvicorn's error logger.
logger = logging.getLogger("uvicorn.error")
class classproperty:
    """
    Minimal read-only descriptor exposing a function of the owning class as an attribute.

    Custom replacement for stacking ``@classmethod`` and ``@property``, which was deprecated
    in python 3.11 and will be removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped function; it is called with the owner class on every access.
        self.fget = func

    def __get__(self, instance, owner):
        # The instance (if any) is ignored: the value only depends on the class.
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """
    Build the "<ModelName>Update" companion of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened to
    ``<annotation> | None`` and a default of ``None``, so that all fields become optional.
    """
    optional_fields = {
        name: (info.annotation | None, None) for name, info in model.model_fields.items() if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float) -> str:
    """
    Return the ISO 8601 representation of an epoch timestamp, expressed in UTC.

    The conversion is done explicitly in UTC so the result does not depend on the time zone
    of the machine running the code. The tzinfo is then dropped to keep the offset-less
    output format (``YYYY-MM-DDTHH:MM:SS[.ffffff]``).
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a (time_vector, values) series to about ``max_number_samples`` points.

    The series is returned untouched when it is empty or shorter than ``max_number_samples``.
    Otherwise None values are removed (LTTB doesn't handle them), the remaining points are
    downsampled with LTTB, and the bounds of each run of consecutive None values are inserted
    back at the matching timestamps. If LTTB raises a ValueError, a linear interpolation on a
    regular grid is returned instead, with NaN results converted back to None.

    Returns a ``(new_time_vector, new_values)`` tuple. The inputs are never modified.
    """
    if not time_vector or len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass: split the series into the points LTTB can process and the bounds of each
    # run of consecutive None values ([first_timestamp] or [first_timestamp, last_timestamp]).
    kept_times = []
    kept_values = []
    none_group_bounds = []
    previous_was_none = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not previous_was_none:
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
            previous_was_none = True
        else:
            kept_times.append(timestamp)
            kept_values.append(value)
            previous_was_none = False
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        # Keep room for the None bounds that are inserted back below
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        # None values become NaN through the float64 conversion
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """Tell whether ``value`` is an instance of ``wanted_type``; integers are accepted where a float is wanted."""
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
135# Models
# Common base of the models of this module: thin dict <-> object helpers on top of pydantic.
class TwinPadModel(BaseModel):
    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model to a plain dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude)
# Base class of the models stored in a MongoDB collection of ``systems_database``.
# Subclasses must define a ``collection_name`` class attribute: it is read here but not declared.
class GenericMongo(TwinPadModel):
    # String form of the MongoDB ``_id``; None until the object has been inserted.
    id: str | None = None
    # Extra aggregation stages, keyed by property name. The stages of a property are placed before
    # the $match / $sort stage that uses it, and otherwise appended at the end of the pipeline.
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the MongoDB collection of this model (requested with ``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered / sorted / paginated query and wrap the result in a ListResponse.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` ("field" or "field:order"),
        ``offset`` and ``limit`` (None or 0 means no limit).
        """
        request_filters = query.mongodb_filter()
        items = []
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []
        # Custom stages of the filtered properties must come before the $match stage
        if "$and" in request_filters:
            for request_filter in request_filters["$and"]:
                for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                    if filtered_property in request_filter:
                        pipeline.extend(pipeline_steps)
                        added_properties.append(filtered_property)
        else:
            for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                if filtered_property in request_filters:
                    pipeline.extend(pipeline_steps)
                    added_properties.append(filtered_property)
        pipeline.append({"$match": request_filters})
        # Same for the sort property, before the $sort stage
        if sort_field in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_field])
            added_properties.append(sort_field)
        pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only run on the documents that are actually returned
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        cursor = collection.aggregate(pipeline)

        for item_dict in cursor:
            items.append(cls.mongo_dict_to_object(item_dict))

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``ObjectId(item_id)``, or None if there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Convert a MongoDB document to a model instance; ``_id`` is moved to ``id`` as a string (in place)."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item whose attribute equals ``attribute_value``, or None if nothing matches."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if len(items) == 0:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted in ascending order of ``sort_by``."""
        items = []
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        for dict_ in cls.collection().aggregate(pipeline):
            items.append(cls.mongo_dict_to_object(dict_))
        return items

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents of the collection (0 when the collection is not available)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert the object in its collection, store the generated id on ``self.id`` and return it."""
        # The field name "id" must be excluded (not the ``id`` builtin), so that no spurious
        # "id" field is stored next to the "_id" generated by MongoDB.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this object and to its document with a ``$set``; returns ``self``."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the document of this object; returns True if a document was actually deleted."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
# User account, stored in the "users" collection.
class User(GenericMongo):
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless an explicit ``exclude`` is given."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <id>}``.

        When no user exists yet, the created user is made admin regardless of ``is_admin``.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Update the stored user ``user_id`` with the fields of ``user`` that are not None.

        Returns the updated user, or None when no user has this id.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # No document matched the id
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The connection state is not carried over to the returned object (it falls back to the
        # field default); documents without this key are tolerated.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


# Variant of User where every field except ``id`` is optional.
UserUpdate = create_update_model(User)
# One entry of the ``modes`` list of a Device.
class Mode(TwinPadModel):
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
# Payload of a mode change request: the id of the requested mode (read by Device.change_mode).
class DeviceUpdate(TwinPadModel):
    mode_id: int
# Device description, stored in the "devices" collection.
class Device(GenericMongo):
    collection_name: ClassVar[str] = "devices"

    device_id: str
    config_id: str | None = None
    config_name: str | None = None
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to mode ``update_dict.mode_id`` and record the attempt as a Command.

        An out-of-range mode id is not sent to the device: a failed command is recorded and a
        400 error response is returned. Otherwise the response of ``send_mode_change`` is returned.
        """
        # NOTE(review): mode names are read at index ``mode_id - 1``, which suggests 1-based ids,
        # yet id 0 passes this check and resolves to the last mode -- confirm whether 0 is valid.
        has_error = update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"

        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if not has_error:
            response = await send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)
        else:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}

        # The command is stored whether it succeeded or not
        Command.create(command)
        return response

    @classmethod
    def get_from_device_or_config_id(cls, device_or_config_id: str):
        """Return the first device whose ``device_id`` or ``config_id`` equals the given id, or None."""
        id_filter = {"$or": [{"device_id": device_or_config_id}, {"config_id": device_or_config_id}]}
        matches = cls.collection().aggregate([{"$match": id_filter}, {"$limit": 1}]).to_list()
        if not matches:
            return None
        return cls.mongo_dict_to_object(matches[0])

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Look up the device of each signal id, querying every distinct device only once.

        The result is keyed by the first dot-separated part of the signal ids; the value is
        None when no device matches.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            id_prefix = signal_id.split(".")[0]
            if id_prefix in devices_by_id:
                continue
            devices_by_id[id_prefix] = cls.get_from_device_or_config_id(id_prefix)
        return devices_by_id
# Group of devices with a variable mapping, stored in the "device_setups" collection.
class DeviceSetup(GenericMongo):
    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Variant of DeviceSetup where every field except ``id`` is optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)
# State of a device at a given time, read from the per-device collections of ``devices_states_database``.
class DeviceState(GenericMongo):
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    config_id: str | None = None
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        Return the states of device ``device_id`` matching ``query`` (filter, sort, offset, limit).

        ``query.sort_by`` is either "field" or "field:order"; a limit of None or 0 means no limit.
        An empty response is returned when the device has no state collection.
        """
        req_filter = query.mongodb_filter()
        items = []
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)
        for item_dict in cursor:
            items.append(
                cls(
                    timestamp=item_dict.get("precise_timestamp"),
                    mode=item_dict.get("mode"),
                    load=item_dict.get("load"),
                    # A missing key must fall back to an actual empty list: a pydantic ``Field(...)``
                    # is only meaningful in a class body and is not a valid value here.
                    tokens=item_dict.get("tokens", []),
                    config_id=item_dict.get("config_id"),
                    modified_properties=item_dict.get("modified_properties", []),
                )
            )
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
# A single sample (value and forced value at a timestamp) of a signal.
class SignalSample(TwinPadModel):
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def _resolve_collection(cls, signal_id: str, device: Device | None = None):
        """
        Find the collection storing ``signal_id``.

        When no collection is named after ``signal_id``, its first dot-separated part is looked up
        as a device or config id (unless ``device`` is provided) and the collection of
        "<device_id>.<config_id>.<last part of signal_id>" is used instead.

        Returns ``(collection, real_signal_id, device)``; ``collection`` is None when nothing was found.
        """
        collection = get_signal_collection(signal_id)
        real_signal_id = signal_id

        if collection is None:
            if device is None:
                device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
            if device is not None:
                real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
                collection = get_signal_collection(real_signal_id)

        return collection, real_signal_id, device

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or None when the signal has no collection or no data."""
        collection, real_signal_id, _ = cls._resolve_collection(signal_id)

        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        return cls(
            signal_id=real_signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids`` (None where there is none)."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent sample of the signal, or None when the signal has no collection or no data.

        The returned timestamp is the latest of the sample timestamp and the ``last_ping`` of the
        device. ``device`` can be given to avoid looking it up again.
        """
        collection, real_signal_id, device = cls._resolve_collection(signal_id, device)

        if collection is None:
            return None

        # Same bucket workaround as in get_first_from_signal_id, very effective to narrow down big
        # sets of data. The bucket collection is named after the resolved signal id, like the
        # collection itself.
        bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_from_device_or_config_id(real_signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=real_signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample of the signal not older than ``min_timestamp``, or None."""
        collection, real_signal_id, _ = cls._resolve_collection(signal_id)

        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=real_signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, in the order of ``signal_ids`` (None where there is none)."""
        # Devices are fetched once per distinct id prefix and handed to the per-signal lookup
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, its most recent sample not older than ``min_timestamp`` (None where there is none)."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
595class SignalData(TwinPadModel):
596 signal_id: str
597 forcible: bool = True
598 time_vector: list[float]
599 values: list[float | int | str | None]
600 forced_values: list[float | int | str | None]
602 data_start: float | None = None
603 data_end: float | None = None
605 number_samples: int = 0
606 number_samples_db: int = 0
608 db_query_time: float = 0.0
609 init_time: float = 0.0
610 data_processing_time: float = 0.0
612 @classmethod
613 def get_from_signal_id(
614 cls,
615 signal_id: str,
616 min_timestamp: float = None,
617 max_timestamp: float = None,
618 window_min_timestamp: float = None,
619 window_max_timestamp: float = None,
620 interpolate_bounds: bool = True,
621 max_documents: int = None,
622 ) -> Self:
624 now = time.time()
626 req_signal = {}
627 if min_timestamp is not None:
628 req_signal.setdefault("timestamp", {})
629 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
630 if max_timestamp is not None:
631 req_signal.setdefault("timestamp", {})
632 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
634 collection = get_signal_collection(signal_id)
636 real_signal_id = signal_id
638 if collection is None:
639 device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
640 if device is not None:
641 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
642 collection = get_signal_collection(real_signal_id)
644 if collection is None:
645 return cls(
646 signal_id=real_signal_id,
647 time_vector=[],
648 values=[],
649 forced_values=[],
650 number_samples=0,
651 number_samples_db=0,
652 )
654 db_req_start = time.time()
656 sort_step = {"$sort": {"precise_timestamp": 1}}
657 number_results = collection.count_documents(req_signal)
659 pipeline = []
660 if req_signal:
661 pipeline.append({"$match": req_signal}) # Filter data if needed
663 pipeline.extend(
664 [
665 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
666 sort_step,
667 ]
668 )
670 if max_documents is not None and max_documents < number_results:
671 unsampling_ratio = math.ceil(number_results / max_documents)
672 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
673 pipeline.extend(
674 [
675 {
676 "$setWindowFields": {
677 "sortBy": {"precise_timestamp": 1},
678 "output": {"index": {"$documentNumber": {}}},
679 }
680 },
681 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
682 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
683 {"$replaceRoot": {"newRoot": "$doc"}},
684 {"$unset": ["index", "group_id"]},
685 {"$sort": {"precise_timestamp": 1}},
686 ]
687 )
689 # logger.info(f"pipeline: %s", str(pipeline))
690 cursor = collection.aggregate(pipeline)
691 db_req_time = time.time() - db_req_start
693 init_time = time.time()
695 results = cursor.to_list()
696 time_vector = []
697 values = []
698 forced_values = []
699 for s in results:
700 time_vector.append(s["precise_timestamp"])
701 values.append(s.get("value", None))
702 forced_values.append(s.get("forced_value", None))
704 signal = Signal.get_from_signal_id(real_signal_id)
705 class_ = signal.signal_data_class
707 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
708 time_vector, values, forced_values = cls.interpolate_bounds(
709 class_,
710 collection,
711 real_signal_id,
712 time_vector,
713 values,
714 forced_values,
715 window_min_timestamp,
716 window_max_timestamp,
717 )
719 if values:
720 # TODO: check below. a bit strange
721 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
722 # Adding last value as it should be repeated
723 time_vector.append(now)
724 values.append(values[-1])
725 forced_values.append(forced_values[-1])
727 init_time = time.time() - init_time
729 # See line 292 for explanation
730 bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
731 first_bucket = None
732 if bucket is not None:
733 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
734 if first_bucket is not None:
735 data_start = first_bucket["control"]["min"]["precise_timestamp"]
736 else:
737 data_start = None
739 last_bucket = None
740 if bucket is not None:
741 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
742 if last_bucket is not None:
743 data_end = last_bucket["control"]["max"]["precise_timestamp"]
744 else:
745 data_end = None
747 return class_(
748 signal_id=real_signal_id,
749 forcible=signal.forcible,
750 time_vector=time_vector,
751 values=values,
752 forced_values=forced_values,
753 data_start=data_start,
754 data_end=data_end,
755 number_samples=len(values),
756 number_samples_db=number_results,
757 db_query_time=db_req_time,
758 init_time=init_time,
759 )
761 @staticmethod
762 def interpolate_bounds(
763 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
764 ):
765 sample_right = None
766 # Fetching right side value & interpolation
767 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
768 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
769 sample_right = collection.find_one(
770 {
771 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
772 "value": {"$exists": True},
773 },
774 sort={"precise_timestamp": -1},
775 )
776 if sample_right:
777 if time_vector:
778 right_sd = class_(
779 signal_id=signal_id,
780 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
781 values=[values[-1], sample_right.get("value", None)],
782 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
783 )
784 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
785 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
786 else:
787 max_ts_value = sample_right.get("value", None)
788 max_ts_forced_value = sample_right.get("forced_value", None)
789 time_vector.append(window_max_timestamp)
790 values.append(max_ts_value)
791 forced_values.append(max_ts_forced_value)
793 # Fetching left side value & interpolation
794 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
795 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
796 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
797 sample_left = sample_right
798 sample_left = collection.find_one(
799 {
800 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
801 "value": {"$exists": True},
802 },
803 sort={"precise_timestamp": -1},
804 )
806 if sample_left:
807 if time_vector:
808 left_sd = class_(
809 signal_id=signal_id,
810 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
811 values=[sample_left["value"], values[0]],
812 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
813 )
814 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
815 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
816 else:
817 min_ts_value = sample_left.get("value", None)
818 min_ts_forced_value = sample_left.get("forced_value", None)
819 time_vector.insert(0, window_min_timestamp)
820 values.insert(0, min_ts_value)
821 forced_values.insert(0, min_ts_forced_value)
823 return time_vector, values, forced_values
825 def interpolate_values(self, new_time_vector: list[float]):
826 return self.interpolate(new_time_vector, self.values)
828 def interpolate_forced_values(self, new_time_vector: list[float]):
829 return self.interpolate(new_time_vector, self.forced_values)
831 def uniform_desampling(self, number_samples_max: int) -> Self:
832 data_processing_time = time.time()
833 if number_samples_max and self.number_samples > number_samples_max:
834 new_time_vector = npy.linspace(
835 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
836 ).tolist()
837 values = self.interpolate_values(new_time_vector)
838 forced_values = self.interpolate_forced_values(new_time_vector)
839 time_vector = new_time_vector
840 number_samples = len(time_vector)
841 else:
842 time_vector = self.time_vector
843 number_samples = len(self.values)
844 values = self.values[:]
845 forced_values = self.forced_values[:]
846 data_processing_time = time.time() - data_processing_time
848 return self.__class__(
849 signal_id=self.signal_id,
850 time_vector=time_vector,
851 values=values,
852 forced_values=forced_values,
853 number_samples=number_samples,
854 number_samples_db=self.number_samples,
855 data_start=self.data_start,
856 data_end=self.data_end,
857 db_query_time=self.db_query_time,
858 init_time=self.init_time,
859 data_processing_time=self.data_processing_time + data_processing_time,
860 )
862 def interest_window_desampling(
863 self,
864 window_max_number_samples: int,
865 outside_max_number_samples: int,
866 window_min_timestamp: float | None = None,
867 window_max_timestamp: float | None = None,
868 ) -> Self:
869 """Performs a sampling in a window of interest and outside."""
871 if not self.time_vector:
872 return self
874 if window_min_timestamp is None:
875 window_min_timestamp = self.time_vector[0]
876 if window_max_timestamp is None:
877 window_max_timestamp = self.time_vector[-1]
879 data_processing_time = time.time()
881 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
882 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
884 time_vector_before = self.time_vector[:index_window_start]
885 time_vector_window = self.time_vector[index_window_start:index_window_end]
886 time_vector_after = self.time_vector[index_window_end:]
888 # Resampling window
889 if time_vector_window:
890 # Ensurring window bounds
891 if time_vector_window[0] != window_min_timestamp:
892 time_vector_window.insert(0, window_min_timestamp)
893 if time_vector_window[-1] != window_max_timestamp:
894 time_vector_window.append(window_max_timestamp)
895 else:
896 time_vector_window = [window_min_timestamp, window_max_timestamp]
898 if len(time_vector_window) > window_max_number_samples:
899 # Resampling
900 new_window_time_vector = npy.linspace(
901 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
902 ).tolist()
903 time_vector_window = new_window_time_vector
905 # Resampling outside
906 number_samples_before = len(time_vector_before)
907 number_samples_after = len(time_vector_after)
908 if (number_samples_before + number_samples_after) > outside_max_number_samples:
909 new_number_samples_before = min(
910 number_samples_before,
911 math.ceil(
912 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
913 ),
914 )
915 new_number_samples_after = min(
916 number_samples_after,
917 math.ceil(
918 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
919 ),
920 )
921 # Adjusting numbers as math.ceil can do +1 on sum
922 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
923 if new_number_samples_before > new_number_samples_after:
924 new_number_samples_before -= 1
925 else:
926 new_number_samples_after -= 1
928 if new_number_samples_before > 0:
929 new_time_vector_before = npy.linspace(
930 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
931 ).tolist()
932 time_vector_before = new_time_vector_before
934 if new_number_samples_after > 0:
935 new_time_vector_after = npy.linspace(
936 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
937 ).tolist()[::-1]
938 time_vector_after = new_time_vector_after
940 new_time_vector = time_vector_before + time_vector_window + time_vector_after
941 values = self.interpolate_values(new_time_vector)
942 forced_values = self.interpolate_forced_values(new_time_vector)
943 number_samples = len(values)
945 data_processing_time = time.time() - data_processing_time
947 return self.__class__(
948 signal_id=self.signal_id,
949 forcible=self.forcible,
950 time_vector=new_time_vector,
951 values=values,
952 forced_values=forced_values,
953 number_samples=number_samples,
954 number_samples_db=self.number_samples,
955 data_start=self.data_start,
956 data_end=self.data_end,
957 db_query_time=self.db_query_time,
958 init_time=self.init_time,
959 data_processing_time=self.data_processing_time + data_processing_time,
960 )
def csv_export(self):
    """Serialise the samples as UTF-8 encoded CSV bytes.

    The first row is the header ``timestamp,value,forced_value``. Every
    following row holds one sample whose timestamp is rendered with
    ``get_utc_date_from_timestamp``.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), sample, forced_sample]
        for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")
def prestoplot_export(self):
    """Serialise the samples as a tab-separated PrestoPlot file (UTF-8 bytes).

    Dots in the signal id are replaced by underscores and an underscore is
    prepended when the id starts with a numeric character. Missing values and
    forced values are written as the literal ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    header = (
        "# Encoding:\tUTF-8\n"
        f"t\t{column_name}\t{column_name}_forced_value\n"
        "ISO8601\tnone\tnone\n"
        f"# Description :\t{column_name}\n"
    )
    rows = [
        f"{get_utc_date_from_timestamp(timestamp)}\t{'none' if sample is None else sample}\t{'none' if forced_sample is None else forced_sample}\n"
        for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    ]
    return (header + "".join(rows)).encode("utf-8")
class NumericSignalData(SignalData):
    # Signal data whose samples are numbers: values are linearly interpolated
    # and down-sampled with ``downsample_list``.
    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

        ``None`` entries are replaced by NaN before calling ``numpy.interp`` and
        NaN results are mapped back to ``None``.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy holding at most ``number_samples_max`` samples.

        When ``number_samples_max`` is falsy or the signal is already small
        enough, the samples are copied unchanged. Forced values are
        re-interpolated on the down-sampled time vector.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Keep the flag on the derived object, as the base-class desampling does.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Down-sample separately inside and outside a window of interest.

        Args:
            window_max_number_samples: maximum number of samples kept inside the window.
            outside_max_number_samples: sample budget shared by the parts before
                and after the window, split proportionally to their sizes.
            window_min_timestamp: window start; defaults to the first timestamp.
            window_max_timestamp: window end; defaults to the last timestamp.

        Returns:
            A new object of the same class, or ``self`` when there is no data.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are new lists, so the insertions below never touch self.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds are present as explicit samples
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # NOTE(review): a side whose budget ends up at 0 is left untouched
            # (not emptied), so the result can exceed the budget — confirm intent.
            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Keep the flag on the derived object, as the base-class desampling does.
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )
class StringSignalData(SignalData):
    # Signal data whose samples are strings: "interpolation" is a hold of the
    # most recent sample rather than a numeric blend.
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the item of the latest sample at or before it.

        Timestamps located before the first sample are clamped to the first item.
        """
        last_index = len(items) - 1
        # Position of the last sample whose timestamp is <= the requested one.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_index)]
class SignalsData(TwinPadModel):
    # Bundle of per-signal data sets plus aggregated timing and time-range information.
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float = None,
        max_timestamp: float = None,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
        interpolate_bounds: bool = True,
        max_documents: int = None,
    ) -> Self:
        """Load every signal of ``signal_ids`` through ``SignalData.get_from_signal_id``.

        ``max_timestamp`` defaults to the current time. The resulting
        ``data_start`` / ``data_end`` are the earliest start and latest end among
        the loaded signals (``None`` when no signal reports one), and the
        processing time is the sum of the per-signal processing times.
        """
        if max_timestamp is None:
            max_timestamp = time.time()

        loaded = []
        earliest_start = None
        latest_end = None
        total_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            total_processing_time += signal_data.data_processing_time
            loaded.append(signal_data)

            if signal_data.data_start is not None:
                earliest_start = (
                    signal_data.data_start
                    if earliest_start is None
                    else min(signal_data.data_start, earliest_start)
                )
            if signal_data.data_end is not None:
                latest_end = (
                    signal_data.data_end if latest_end is None else max(signal_data.data_end, latest_end)
                )

        return cls(
            signals_data=loaded,
            data_processing_time=total_processing_time,
            data_start=earliest_start,
            data_end=latest_end,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every contained signal and wrap the results."""
        desampled = [
            signal_data.uniform_desampling(number_samples_max=number_samples_max)
            for signal_data in self.signals_data
        ]
        return SignalsData(
            signals_data=desampled,
            data_processing_time=sum(signal_data.data_processing_time for signal_data in desampled),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every contained signal and wrap the results."""
        window_options = {
            "window_max_number_samples": window_max_number_samples,
            "outside_max_number_samples": outside_max_number_samples,
            "window_min_timestamp": window_min_timestamp,
            "window_max_timestamp": window_max_timestamp,
        }
        desampled = [signal_data.interest_window_desampling(**window_options) for signal_data in self.signals_data]

        return SignalsData(
            signals_data=desampled,
            data_processing_time=sum(signal_data.data_processing_time for signal_data in desampled),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return the bytes of a deflated ZIP archive holding one export file per signal.

        ``file_format`` is ``"csv"`` (``<signal_id>.csv``) or ``"prestoplot"``
        (``<signal_id>.tab``).

        Raises:
            ValueError: for any other format, when the first signal is exported.
        """
        archive_buffer = io.BytesIO()
        with zipfile.ZipFile(archive_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
            for signal_data in self.signals_data:
                if file_format == "csv":
                    archive.writestr(f"{signal_data.signal_id}.csv", signal_data.csv_export())
                elif file_format == "prestoplot":
                    archive.writestr(f"{signal_data.signal_id}.tab", signal_data.prestoplot_export())
                else:
                    raise ValueError(f"Format not found. Got: {file_format}")
        return archive_buffer.getvalue()

    def hdf5_export(self):
        """Return the bytes of an in-memory HDF5 file with one group per signal.

        Each group gets a ``data`` dataset of (date, timestamp, value,
        forced_value) records; values are stored as 30-byte strings for ``str``
        signals and as float64 otherwise.
        """
        float_record = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        string_record = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        hdf5_buffer = io.BytesIO()
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for signal_data in self.signals_data:
                signal_group = hdf5_file.create_group(signal_data.signal_id)
                dates = [get_utc_date_from_timestamp(timestamp) for timestamp in signal_data.time_vector]
                record_type = string_record if signal_data.data_type == "str" else float_record
                records = list(
                    zip(
                        dates,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                )
                signal_group["data"] = npy.array(records, dtype=record_type)
        return hdf5_buffer.getvalue()
class SignalStatus(TwinPadModel):
    # Health summary of a signal, as built by ``Signal.status``.
    status: str  # "up", "down" or "no data" when produced by Signal.status
    reason: str  # human-readable explanation; empty when there is nothing to report
    delay: float | None  # seconds since the last known activity; None when unknown
class DigitizationFunction(TwinPadModel):
    # Parameters of a signal's digitization, attached to ``Signal.digitization_function``.
    # NOTE(review): presumably describes a mapping between the raw range
    # [min_raw_value, max_raw_value] and the physical range [min_value, max_value]
    # — confirm against the code that consumes it.
    bits: int | None = None  # optional; presumably the resolution in bits — TODO confirm
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float
class SignalUpdate(TwinPadModel):
    # Payload of a signal write request; validated against the signal's type
    # in ``Signal.send_command`` before being sent to the device.
    value: float | str | bool | int | None = None  # new value; None means "not provided"
    forced_value: float | str | bool | int | None = None  # new forced value; None means "not provided"
    timestamp: int | None = None  # optional timestamp; unit not visible here — TODO confirm
class SignalType(str, Enum):
    # Closed set of values accepted for ``Signal.type``; members are also plain strings.
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Maps a signal ``data_type`` name to the SignalData subclass used to handle it
# (looked up by ``Signal.signal_data_class``). Every non-string type is treated
# as numeric.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
1321class Signal(GenericMongo):
1322 collection_name: ClassVar[str] = "signals"
1324 signal_id: str
1325 frequency: float
1326 unit: str | None
1327 description: str
1328 type: SignalType
1329 data_type: str
1330 precision_digits: int | None
1331 forcible: bool
1333 digitization_function: DigitizationFunction | None
1335 @property
1336 def device(self) -> Device:
1337 device_or_config_id = self.signal_id.split(".")[0]
1338 return Device.get_from_device_or_config_id(device_or_config_id)
1340 @cached_property
1341 def signal_data_class(self):
1342 if self.data_type in SIGNALDATA_TYPES:
1343 return SIGNALDATA_TYPES[self.data_type]
1344 if self.data_type.startswith("enum"):
1345 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1346 raise ValueError(f"Unhandled python type: {self.data_type}")
1348 @cached_property
1349 def python_type(self):
1350 if self.data_type in TYPES:
1351 return TYPES[self.data_type]
1352 if self.data_type.startswith("enum"):
1353 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1354 return Literal[*choices]
1355 raise ValueError(f"Unhandled python type: {self.data_type}")
1357 @computed_field
1358 @property
1359 def status(self) -> SignalStatus:
1360 now = time.time()
1361 status = "up"
1362 reason = ""
1364 # See line 285 for explanation
1365 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1366 last_bucket = None
1367 if bucket is not None:
1368 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1369 if last_bucket is None:
1370 status = "no data"
1371 reason = "signal does not exist"
1372 return SignalStatus(status=status, reason=reason, delay=None)
1374 try:
1375 last_date = last_bucket["control"]["max"]["timestamp"]
1376 last_date = last_date.replace(tzinfo=pytz.UTC)
1377 last_value_ts = last_date.timestamp()
1378 except IndexError:
1379 last_value_ts = None
1381 if last_value_ts is None:
1382 delay = None
1383 reason = "No data from signal"
1384 else:
1385 # Since device is a computed property, only compute it once
1386 device = self.device
1387 if device is not None and device.last_ping is not None:
1388 last_value_ts = max(last_value_ts, device.last_ping)
1389 delay = now - last_value_ts
1390 if delay > DEVICE_TIMEOUT:
1391 status = "down"
1392 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1393 return SignalStatus(status=status, reason=reason, delay=delay)
1395 async def send_command(self, device_id: str, update_dict: SignalUpdate, current_user: User) -> dict:
1396 command = Command(
1397 sent_at=time.time(),
1398 command_type="Signal command",
1399 user_id=current_user.id,
1400 )
1402 has_input_error = False
1403 error_message = ""
1405 if self.data_type.startswith("enum"):
1406 enum_options = get_args(self.python_type)
1408 if update_dict.value is not None and update_dict.value not in enum_options:
1409 has_input_error = True
1410 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1411 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1412 has_input_error = True
1413 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1414 else:
1415 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1416 has_input_error = True
1417 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1418 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1419 has_input_error = True
1420 error_message += (
1421 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1422 )
1424 if has_input_error:
1425 command.response_time = 0
1426 command.succeeded = False
1427 command.description = f"Tried to modify signal {self.signal_id}"
1428 response = {"error": True, "status_code": 400, "message": error_message}
1429 else:
1430 response = await send_signal_value(device_id, self.signal_id, update_dict)
1431 command.receive_response(response)
1433 Command.create(command)
1434 return response
1436 @classmethod
1437 def get_from_signal_id(cls, signal_id: str) -> Self:
1438 """Could be generic from mongo"""
1439 signal = Signal.get_one_by_attribute("signal_id", signal_id)
1440 if signal is None:
1441 split_signal_id = signal_id.split(".")
1442 device_or_config_id = split_signal_id[0]
1443 ticker = split_signal_id[-1]
1444 possible_device = Device.get_from_device_or_config_id(device_or_config_id)
1445 if possible_device is not None:
1446 signal = Signal.get_one_by_attribute(
1447 "signal_id", f"{possible_device.device_id}.{possible_device.config_id}.{ticker}"
1448 )
1449 if not signal:
1450 return None
1451 return cls.dict_to_object(signal)
1453 @classmethod
1454 def get_all_ids(cls) -> list[str]:
1455 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1457 return [signal["signal_id"] for signal in cursor]
1459 async def number_samples(self):
1460 collection = get_signal_collection(signal_id=self.signal_id)
1461 if collection is None:
1462 return 0
1464 number_samples = collection.estimated_document_count()
1466 number_samples_async_collection = await get_async_collection(
1467 systems_async_database, "number_samples", create=True, time_series=True
1468 )
1470 loop = asyncio.get_running_loop()
1471 loop.create_task(
1472 number_samples_async_collection.insert_one(
1473 {
1474 "timestamp": datetime.datetime.now(pytz.UTC),
1475 "signal_id": self.signal_id,
1476 "number_samples": number_samples,
1477 }
1478 )
1479 )
1481 return number_samples
1483 def sample_datasize(self):
1484 return signals_database.command("collstats", self.signal_id)["size"]
1486 @classmethod
1487 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1488 result = cls.collection().aggregate(
1489 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1490 )
1492 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ServicesStatus(TwinPadModel):
    # "up"/"down" state of the backend and of each service it depends on.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every dependent service host and report the results.

        The backend itself is always reported as "up" since it is the one answering.
        """
        service_hosts = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        states = {service: ping(host) for service, host in service_hosts.items()}
        return cls(backend="up", **states)
def ping(host):
    """Return ``"up"`` when ``host`` answers an ICMP ping within 0.8 seconds, else ``"down"``.

    A ``PermissionError`` raised while pinging is reported as ``"down"``
    instead of being propagated.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # ping3 returns None on timeout and False on error; any measured delay,
    # including 0.0, means the host answered.
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    # Occurrence of an event rule, stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        # Rule that produced this event, looked up once and then cached.
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an event from a MongoDB document.

        The document's ``timestamp`` datetime is replaced in place by epoch
        seconds (the type of the ``timestamp`` field). The datetime is pinned
        to UTC first, as ``Signal.status`` does, so the result does not depend
        on the server's local timezone.
        """
        dict_["timestamp"] = dict_["timestamp"].replace(tzinfo=pytz.UTC).timestamp()
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    # Daily activity counter: ``amount`` items recorded on the day starting at ``timestamp``.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_day_start(moment) -> datetime.datetime:
        """Return midnight (UTC) of the calendar day of ``moment``."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @classmethod
    def _items_in_timeframe(cls, counters_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Read the daily counters between two epoch timestamps (inclusive) as objects."""
        # Build aware UTC datetimes directly: fromtimestamp() without tz yields
        # local time, which must not be relabelled as UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in counters_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            # Pin to UTC before converting so the result is timezone independent.
            day["timestamp"] = day["timestamp"].replace(tzinfo=pytz.UTC).timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def _daily_document_counts(
        cls, source_name, counters_name, min_timestamp, max_timestamp, recompute_amount
    ) -> list[Self]:
        """Maintain and read per-day document counts of a time-series collection.

        When the counters collection is missing or ``recompute_amount`` is set,
        all past days are recounted from the first source document. The counter
        of the current day is refreshed on every call.
        """
        one_day = datetime.timedelta(days=1)
        counters_collection = get_collection(systems_database, counters_name)
        source_collection = get_collection(systems_database, source_name, create=True, time_series=True)
        today = cls._utc_day_start(datetime.datetime.now(tz=pytz.UTC))

        if counters_collection is None or recompute_amount:
            counters_collection = get_collection(systems_database, counters_name, create=True, time_series=True)
            counters_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            current_day = cls._utc_day_start(first_document["timestamp"])
            while current_day < today:
                next_day = current_day + one_day
                day_count = source_collection.count_documents({"timestamp": {"$gte": current_day, "$lt": next_day}})
                if day_count > 0:
                    counters_collection.insert_one({"timestamp": current_day, "amount": day_count})
                current_day = next_day

        today_count = source_collection.count_documents({"timestamp": {"$gte": today}})
        if today_count > 0:
            # Replace rather than accumulate: the count is absolute for the day.
            counters_collection.delete_many({"timestamp": today})
            counters_collection.insert_one({"timestamp": today, "amount": today_count})

        return cls._items_in_timeframe(counters_collection, min_timestamp, max_timestamp)

    @staticmethod
    def _sum_sample_values(collection, timestamp_filter) -> int:
        """Sum the ``value`` field of the documents whose timestamp matches the filter."""
        result = collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(result) == 0:
            return 0
        return result[0].get("number_samples", 0)

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of events per day between two epoch timestamps.

        Args:
            min_timestamp: start of the time frame, in epoch seconds.
            max_timestamp: end of the time frame, in epoch seconds.
            recompute_amount: rebuild all daily counters from the events first.
        """
        return cls._daily_document_counts("events", "number_events", min_timestamp, max_timestamp, recompute_amount)

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of received samples per day between two epoch timestamps.

        Daily amounts are the sums of the ``value`` field of the
        ``signal_storage._NUMBER_SAMPLES`` signal collection. Past days are
        rebuilt when the counters collection is missing or ``recompute_amount``
        is set; the current day is refreshed on every call.
        """
        one_day = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        today = cls._utc_day_start(datetime.datetime.now(tz=pytz.UTC))

        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # compute from day of first found sample
            current_day = cls._utc_day_start(first_sample["timestamp"])
            while current_day < today:
                next_day = current_day + one_day
                number_samples = cls._sum_sample_values(
                    signals_number_samples_collection, {"$gte": current_day, "$lt": next_day}
                )
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": current_day, "amount": number_samples})
                current_day = next_day

        number_samples_today = cls._sum_sample_values(signals_number_samples_collection, {"$gte": today})
        if number_samples_today > 0:
            number_samples_collection.delete_many({"timestamp": today})
            number_samples_collection.insert_one({"timestamp": today, "amount": number_samples_today})

        return cls._items_in_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of commands per day between two epoch timestamps.

        Args:
            min_timestamp: start of the time frame, in epoch seconds.
            max_timestamp: end of the time frame, in epoch seconds.
            recompute_amount: rebuild all daily counters from the commands first.
        """
        return cls._daily_document_counts(
            "commands", "number_commands", min_timestamp, max_timestamp, recompute_amount
        )
class EventRule(GenericMongo):
    # Rule that generates events, stored in the "event_rules" collection.
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        # Number of stored events referencing this rule, computed once per instance.
        events_collection = get_collection(systems_database, "events")
        if events_collection is None:
            # get_collection yields None when the collection does not exist yet:
            # no collection means no events.
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    # Company record stored in the "companies" collection.
    collection_name: ClassVar[str] = "companies"
    name: str
class Campaign(GenericMongo):
    # Campaign record stored in the "campaigns" collection.
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` (without its ``id``) and return ``{"campaign_id": <new id>}``."""
        document = campaign.model_dump(exclude={"id"})
        insertion = cls.collection().insert_one(document)
        if insertion is not None:
            return {"campaign_id": str(insertion.inserted_id)}
        return None

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the stored campaign; return the updated document."""
        changes = {"name": campaign.name, "description": campaign.description}
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(campaign.id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with the given id and return the pymongo delete result."""
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})
class Phase(GenericMongo):
    # Phase of a campaign, stored in the "phases" collection.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` (without ``id``) and return ``{"phase_id": <new id>}``.

        The "phases" collection is created when it does not exist yet.
        """
        fresh_phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phases = get_collection(systems_database, "phases", create=True)
        insertion = phases.insert_one(fresh_phase.model_dump(exclude={"id"}))
        if insertion is not None:
            return {"phase_id": str(insertion.inserted_id)}
        return None

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description and dates of the stored phase; return the updated document."""
        changes = {
            "name": phase.name,
            "description": phase.description,
            "start_at": phase.start_at,
            "end_at": phase.end_at,
        }
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with the given id and return the pymongo delete result."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the pymongo delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
class CustomViewCreation(GenericMongo):
    # Fields needed to create a custom view, stored in the "custom_views"
    # collection; ``CustomView`` extends it with the id and owner.
    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list  # content is not constrained here — presumably the view layout; TODO confirm
class CustomView(CustomViewCreation):
    # Stored custom view: the creation fields plus its id and owning user.
    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

    # NOTE(review): the create/update/delete methods below are disabled; the
    # reason is not visible here — confirm they are superseded before removing.
    # # Methods
    # @classmethod
    # def create(cls, form_custom_view: Self, user_id) -> list:
    #     custom_view = CustomView(
    #         name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
    #     )
    #     new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
    #     return new_custom_view

    # @classmethod
    # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
    #     updated_custom_view = cls.collection().find_one_and_update(
    #         {"_id": ObjectId(custom_view_id)},
    #         {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
    #         return_document=ReturnDocument.AFTER,
    #     )
    #     updated_custom_view["id"] = str(updated_custom_view["_id"])
    #     del updated_custom_view["_id"]
    #     return cls(**updated_custom_view)

    # @classmethod
    # def delete(cls, custom_view_id) -> bool:
    #     deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
    #     return deleted_custom_view.acknowledged
# Update-payload model derived from CustomView by create_update_model
# (presumably every field becomes optional -- confirm against that helper).
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    # A video source reachable at an IP address, stored in the "videos" collection.
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """
        List all stored videos, ordered ascending by ``sort_by``.

        The query projects only ``name`` and ``ip_addr``, so ``username`` and
        ``password`` are not fetched and keep their ``None`` default.

        :param sort_by: Document field used for the ascending sort.
        :return: One object per document, built with ``mongo_dict_to_object``.
        """
        visible_fields = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, visible_fields).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """
        Resolve a video id to the video's name.

        :param camera_id: Identifier passed to ``get_from_id``.
        :return: The ``name`` of the matching video, or ``None`` when
            ``get_from_id`` returns ``None``.
        """
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    # Record of a command sent by a user, stored in the "commands" collection.
    collection_name: ClassVar[str] = "commands"

    # Properties
    # Timezone-aware copy of `sent_at`; filled in by `create`. Declared optional
    # because the default is None (the previous annotation excluded None).
    timestamp: datetime.datetime | None = None
    # Epoch seconds at which the command was sent (compared against time.time())
    sent_at: float
    # Seconds between `sent_at` and the moment the response was received
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """
        Return the commands collection, requested from ``get_collection`` with
        ``create=True`` and ``time_series=True``.
        """
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """
        Store a command, deriving ``timestamp`` from its ``sent_at`` value (UTC).

        :param command: Command to persist; its own ``timestamp`` is ignored.
        :return: ``{"command_id": <inserted id as str>}``, or ``None`` if the
            insert call returned ``None``.
        """
        stored_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        new_command = cls.collection().insert_one(stored_command.model_dump(exclude={"id"}))
        if new_command is None:
            return None
        return {"command_id": str(new_command.inserted_id)}

    def receive_response(self, response: dict):
        """
        Update this command in place from the response it received.

        ``response_time`` becomes the time elapsed since ``sent_at``.
        ``succeeded`` is True only when the response carries ``"error": False``;
        a missing ``error`` key counts as a failure. An empty ``description`` is
        filled with the response's ``message`` (trailing whitespace removed).

        :param response: Response payload; ``error`` and ``message`` are read.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            # `or ""` covers a response that carries "message": None, which
            # would otherwise fail on .rstrip()
            self.description = (response.get("message") or "").rstrip()
class SignalsPresetCreation(GenericMongo):
    # Fields a client provides when creating a signals preset.

    # Name given to the preset
    name: str
    # Identifiers of the signals grouped by the preset
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    # A signals preset as stored in the "signals_presets" collection, tied to a user.
    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """
        Persist a preset for the given user.

        :param signals_preset: Creation payload providing ``name`` and ``signal_ids``.
        :param user_id: Identifier stored as the preset's ``user_id``.
        :return: The inserted document's id, as a string.
        """
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        presets = cls.collection()
        insertion = presets.insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insertion.inserted_id)
# Update-payload model derived from SignalsPreset by create_update_model
# (presumably every field becomes optional -- confirm against that helper).
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    # Closed set of line styles; members are also plain strings equal to their value.
    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    # Annotation-only holder for the two colors of a signal; no values are assigned here.
    value_color: str
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    # Fields a client provides when creating a graph theme for one signal.
    # Documents of this family are kept in the "graph_themes" collection.
    collection_name: ClassVar[str] = "graph_themes"

    name: str
    # Signal the theme applies to
    signal_id: str
    # Colors default to an empty string; their format is not constrained here
    value_color: str = ""
    forced_value_color: str = ""
    # Line styles default to solid
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    # Graph theme as seen by one user: the stored membership lists and creator id
    # are replaced by three per-user booleans computed in the aggregation pipeline.
    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # User the pipeline steps are evaluated for; set by the query methods below.
    # NOTE(review): the value is assigned on the class, so it is shared by every
    # caller -- confirm that concurrent requests cannot interleave here.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """
        Build the per-field aggregation stages for the current user.

        The per-user booleans are computed from ``creator_id``, ``in_library``
        and ``active``; those three stored fields are then removed from the
        output with ``$$REMOVE``. The ``default`` entry filters out themes that
        are private and were not created by the current user.

        :return: Mapping from field name (or ``"default"``) to a list of stages.
        """
        viewer_id = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            # Single-stage pipeline that sets (or removes) one field
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", viewer_id]}),
            # Never return a theme the user isn't allowed to see (i.e. not theirs
            # and not shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [viewer_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [viewer_id, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """
        Run the inherited ``response_from_query`` on behalf of ``user_id``.
        """
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """
        Same as ``response_from_query`` after setting ``query.in_user_library``
        to the string ``"true"`` (the query object is modified in place).
        """
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """
        Fetch one theme by id, as seen by ``user_id``.
        """
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """
        Run the inherited ``get_by_attribute`` on behalf of ``user_id``.
        """
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """
        Run the inherited ``get_one_by_attribute`` on behalf of ``user_id``.
        """
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """
        Run the inherited ``get_all`` on behalf of ``user_id``.
        """
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """
        Collect, per signal, the colors and line styles of a theme active for the user.

        Themes are matched on ``active`` equal to ``user_id`` and ``signal_id``
        within ``signal_ids``; one document per signal is kept via ``$first``.
        NOTE(review): no ``$sort`` precedes the ``$group``, so which theme wins
        when several match the same signal looks unspecified -- confirm.

        :param signal_ids: Signals to look up.
        :param user_id: User whose active themes are considered.
        :return: Mapping ``signal_id -> {value_color, forced_value_color,
            value_line_style, forced_value_line_style}``.
        """
        exposed_fields = (
            "signal_id",
            "value_color",
            "forced_value_color",
            "value_line_style",
            "forced_value_line_style",
        )
        pipeline = [
            {"$match": {"active": {"$eq": user_id}, "signal_id": {"$in": signal_ids}}},
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
            {"$project": {"_id": 0, **{field: 1 for field in exposed_fields}}},
        ]

        appearances = {}
        for document in cls.collection().aggregate(pipeline):
            # The signal id becomes the key and is dropped from the value
            appearances[document.pop("signal_id")] = document
        return appearances
# Update-payload model derived from PublicGraphTheme by create_update_model
# (presumably every field becomes optional -- confirm against that helper).
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    # Graph theme as stored, including the fields that are never sent to clients.

    # private
    # User who created the theme
    creator_id: str
    # Users who have the theme in their library
    in_library: list[str]
    # Users for whom the theme is active
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """
        Insert a new theme; the creator starts with it in their library and active.

        :return: The created theme, with ``id`` set to the inserted document's id.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id"}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """
        Apply a user's update to this theme and persist it.

        The per-user flags ``in_user_library`` and ``active_for_user`` are
        translated into membership of ``user_id`` in ``in_library`` / ``active``.
        They, and the computed ``created_by_user`` flag, are never written to
        the database themselves, even when their value is ``None``.

        :param update_dict: Fields to change; modified in place (virtual keys
            are removed, ``in_library`` / ``active`` may be added).
        :param user_id: User on whose behalf the flags are toggled.
        :return: An empty dict.
        """
        virtual_to_stored = (
            ("in_user_library", "in_library"),
            ("active_for_user", "active"),
        )
        for flag_name, members_name in virtual_to_stored:
            # Always drop the virtual key so it can never reach `$set`
            flag = update_dict.pop(flag_name, None)
            if flag is None:
                continue
            members = getattr(self, members_name)
            if flag and user_id not in members:
                members.append(user_id)
            elif not flag and user_id in members:
                members.remove(user_id)
            update_dict[members_name] = members

        # Computed per user in the read pipeline; not a stored field
        update_dict.pop("created_by_user", None)

        # Skip the write when nothing is left: MongoDB versions before 5.0
        # reject an empty `$set`, and later versions treat it as a no-op.
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}
2132class Configuration(GenericMongo):
2133 collection_name: ClassVar[str] = "configs"
2135 # Properties
2136 config_id: str | None = None
2137 generated_at: float
2138 config: dict
2139 components: list
2140 hardware_topology: dict
2141 received_at: float
2142 in_use_by_devices: list[str] = []
2143 is_in_use: bool = False
2145 custom_pipeline_steps = {
2146 "is_in_use": [
2147 {
2148 "$addFields": {
2149 "is_in_use": {
2150 "$cond": [
2151 {"$gt": [{"$size": {"$ifNull": ["$in_use_by_devices", []]}}, 0]},
2152 True,
2153 False,
2154 ]
2155 },
2156 }
2157 }
2158 ],
2159 }
@classmethod
def get_all_ids(cls) -> list[dict]:
    """
    List the ``config_id`` and ``config_name`` of every stored configuration.

    :return: One ``{"config_id": ..., "config_name": ...}`` dict per document.
        A field that is absent from a document is reported as ``None``
        instead of raising ``KeyError``.
    """
    cursor = cls.collection().aggregate([{"$project": {"config_id": 1, "config_name": 1, "_id": 0}}])

    # $project leaves out fields a document does not have, hence .get()
    return [
        {"config_id": config.get("config_id"), "config_name": config.get("config_name")}
        for config in cursor
    ]
@classmethod
def get_from_config_id(cls, config_id: str) -> Self | None:
    """
    Fetch the configuration with the given ``config_id``, with a filtered ``config``.

    :param config_id: Value matched against the document's ``config_id`` field.
    :return: The configuration object, or ``None`` when no document matches.
        Its ``config`` dict holds only the allowed keys that are present in
        the stored document.
    """
    items = (
        cls.collection()
        .aggregate(
            [
                {"$match": {"config_id": config_id}},
                {"$limit": 1},
            ]
        )
        .to_list()
    )
    if not items:
        return None
    dict_ = items[0]
    # There is some protected information in the config dict, so keep only specific keys
    allowed_config_keys = ["description", "broker_host", "target_device_id", "device_name"]
    # Tolerate a missing/None config and configs lacking some of the allowed keys
    config_dict = dict_.get("config") or {}
    dict_["config"] = {k: config_dict[k] for k in allowed_config_keys if k in config_dict}
    return cls.mongo_dict_to_object(dict_)