Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%
1210 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 10:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 10:53 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
23import h5py
25# from scipy import signal as signal_scipy
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 devices_states_database,
36)
37from twinpad_backend.responses import ListResponse
38from twinpad_backend.messages import send_mode_change, send_signal_value
# Maps the type names used as strings to the Python type they stand for
# ("epoch" values are handled as floats).
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}


# Host names of the companion services; each can be overridden with the
# environment variable of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared against "now - timestamp of the newest sample" when loading signal data
# (presumably seconds, since it is compared with time.time() differences - TODO confirm).
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Reuse uvicorn's error logger for this module's messages.
logger = logging.getLogger("uvicorn.error")
class classproperty:
    """
    Read-only descriptor whose value is computed by calling a function with the owner class.

    Replaces stacking ``@classmethod`` on ``@property``, which was deprecated in
    python 3.11 and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped callable is invoked with the owner class on every attribute access.
        self.fget = func

    def __get__(self, _instance, owner):
        # The instance (if any) is ignored: only the class is handed to the getter.
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """
    Build the ``<ModelName>Update`` pydantic model derived from ``model``.

    Every field of ``model`` except ``id`` is copied with its annotation widened with
    ``| None`` and a default value of ``None``.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float):
    """
    Format a POSIX timestamp as a naive ISO 8601 string expressed in UTC.

    The conversion is done explicitly in UTC so the result does not depend on the
    timezone configured on the server. The UTC offset is then dropped so the string
    keeps the ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` shape, without a ``+00:00`` suffix.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a series to at most ``max_number_samples`` points with LTTB.

    ``None`` values are not supported by LTTB: they are taken out before the
    downsampling and, for each run of consecutive ``None`` values, the first and last
    timestamps of the run (or the single timestamp of a one-element run) are inserted
    back afterwards with a ``None`` value.

    If LTTB raises ``ValueError``, the series is instead linearly interpolated on
    ``max_number_samples`` evenly spaced timestamps, NaN results being returned as ``None``.

    Returns the ``(time_vector, values)`` pair. The inputs are returned untouched when
    they hold fewer than ``max_number_samples`` points, and are never mutated.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass splitting the series into non-None samples and the bounds of each
    # None run (the previous count/index/pop loop was quadratic in the number of Nones).
    kept_times = []
    kept_values = []
    none_group_bounds = []
    in_none_run = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not in_none_run:
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                # Only the first and last timestamps of a run are kept
                none_group_bounds[-1][1] = timestamp
            in_none_run = True
        else:
            kept_times.append(timestamp)
            kept_values.append(value)
            in_none_run = False
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When ``float`` is wanted, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
135# Models
class TwinPadModel(BaseModel):
    """Base pydantic model adding dict conversion helpers."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against this model and return the resulting instance."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model to a dict, leaving out the fields given in ``exclude``."""
        return self.model_dump(exclude=exclude)
class GenericMongo(TwinPadModel):
    """
    Base class of the models stored in a collection of the systems database.

    Subclasses define ``collection_name``. ``custom_pipeline_steps`` maps a property
    name to the aggregation stages that must run before that property can be used
    in a ``$match`` or ``$sort`` stage; all of them are applied to every read.
    """

    id: str | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of this model, requested with ``create=True``."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated listing described by ``query``.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (``"field"`` or
        ``"field:order"`` with an integer order), ``offset`` and ``limit`` (``None`` or
        ``0`` meaning no limit). ``total`` is the number of documents matching the
        filter, before pagination.
        """
        request_filters = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []
        # Custom steps of every filtered property must run before the $match stage.
        # A top-level "$and" holds the individual filters, otherwise the filter dict
        # itself is inspected.
        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for request_filter in filter_clauses:
            for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                if filtered_property in request_filter:
                    pipeline.extend(pipeline_steps)
                    added_properties.append(filtered_property)
        pipeline.append({"$match": request_filters})
        if sort_field in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_field])
            added_properties.append(sort_field)
        pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom steps only decorate the page of results, so they run last.
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``ObjectId(item_id)``, or ``None`` if there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document; ``_id`` is moved to ``id`` as a string (in place)."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item whose attribute matches the value, or ``None`` if there is none."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted by ``sort_by`` in ascending order."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents in the collection, 0 when the collection is not available."""
        # No create=True here: counting must not create the collection.
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert this item as a new document, store the generated id in ``self.id`` and return it."""
        # The field name must be the string "id": the bare name `id` is the builtin
        # function and does not exclude anything. MongoDB generates the `_id`.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Set the entries of ``update_dict`` on this instance and on its document; return ``self``."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the document of this item; return whether a document was deleted."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    """User account stored in the ``users`` collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless ``exclude`` is given explicitly."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <id>}``.

        The very first user created is always made admin.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Apply the non-``None`` fields of ``user`` to the stored user ``user_id``.

        Returns the updated user (built without the stored ``is_connected`` value, so the
        field default applies), or ``None`` when no user has this id.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # find_one_and_update returns None when no document matches the filter
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # Tolerate documents stored without the key instead of raising KeyError
        updated_user.pop("is_connected", None)
        return cls(**updated_user)
# Partial-update model for User: same fields except `id`, all optional with a None default.
UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    """One entry of the ``modes`` list declared by a device."""

    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    """Update payload holding a target ``mode_id`` (presumably the ``update_dict`` of ``Device.change_mode`` - verify against caller)."""

    mode_id: int
class Device(GenericMongo):
    """Device stored in the ``devices`` collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to ``update_dict.mode_id`` and record the attempt as a Command.

        A mode id below 0 or above the number of modes is rejected without contacting
        the device. The response (from the device, or the 400 error dict) is returned.
        """
        # NOTE(review): mode names are looked up with `mode_id - 1`, yet a mode id of 0
        # passes the range check and therefore reads `modes[-1]` - confirm the valid id range.
        out_of_range = update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes)

        if out_of_range:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is None:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"

        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if not out_of_range:
            response = await send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)
        else:
            # Nothing was sent: the command is stored as an immediate failure.
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning the given signals.

        The device id is the part of a signal id before the first ``"."``. Returns a dict
        keyed by device id, each device being looked up once (``None`` when not found).
        """
        devices_by_id = {}
        for device_id in (signal_id.split(".")[0] for signal_id in signal_ids):
            if device_id in devices_by_id:
                continue
            devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id
class DeviceSetup(GenericMongo):
    """Setup grouping several device ids with a variable mapping, stored in ``device_setups``."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]
# Partial-update model for DeviceSetup: same fields except `id`, all optional with a None default.
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    """State of a device at a given timestamp."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        List the states of ``device_id`` matching ``query``.

        States are read from the collection named ``device_id`` in the devices states
        database; an unavailable collection yields an empty response. ``query`` must
        provide ``mongodb_filter()``, ``sort_by`` (``"field"`` or ``"field:order"``),
        ``offset`` and ``limit`` (``None`` or ``0`` meaning no limit).
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                # Missing keys fall back to an empty list. `Field(default_factory=list)`
                # is only meaningful in a class body; as a value it is a FieldInfo object.
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    """Single sample of a signal: timestamp, value and optional forced value."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or ``None`` when there is no collection or no sample."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information: fall back to sorting the whole collection
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the newest sample of the signal, or ``None`` when there is no collection or no sample.

        ``device`` is the device owning the signal; when omitted it is fetched from the
        part of ``signal_id`` before the first ``"."``. If the device has a ``last_ping``,
        the returned timestamp is the latest of the sample timestamp and that ping.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same bucket-collection workaround as in get_first_from_signal_id, very
        # effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the newest sample not older than ``min_timestamp``, or ``None`` if there is none."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the newest sample of each signal, in the order of ``signal_ids``."""
        # Devices are fetched once per device id rather than once per signal
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, its newest sample not older than ``min_timestamp``."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
class SignalData(TwinPadModel):
    """
    Samples of one signal over a time range, with query and processing metadata.

    ``interpolate_values`` / ``interpolate_forced_values`` rely on an
    ``interpolate(new_time_vector, items)`` method that this class does not define;
    it has to be provided by the subclass actually instantiated.
    """

    signal_id: str
    forcible: bool = True
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    data_start: float | None = None
    data_end: float | None = None

    number_samples: int = 0
    number_samples_db: int = 0

    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """
        Load the samples of a signal from its collection.

        ``min_timestamp`` / ``max_timestamp`` restrict the samples read. When
        ``max_documents`` is lower than the number of matching samples, only the first
        sample of each group of ``ceil(matching / max_documents)`` consecutive samples is
        kept. When ``interpolate_bounds`` is set and both window timestamps are given,
        samples are added at the window bounds (see ``interpolate_bounds``).

        The returned object is an instance of the signal's ``signal_data_class``, or an
        empty instance of ``cls`` when the signal has no collection.
        """
        now = time.time()

        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

        collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        if max_documents is not None and max_documents < number_results:
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
            # Number the documents, group them by blocks of `unsampling_ratio` and keep
            # the first document of each block
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        init_time = time.time()

        results = cursor.to_list()
        time_vector = []
        values = []
        forced_values = []
        for s in results:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        signal = Signal.get_from_signal_id(signal_id)
        class_ = signal.signal_data_class

        if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
            time_vector, values, forced_values = cls.interpolate_bounds(
                class_,
                collection,
                signal_id,
                time_vector,
                values,
                forced_values,
                window_min_timestamp,
                window_max_timestamp,
            )

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # The internal "system.buckets.<name>" collection associated to a timeseries
        # gives the overall first/last timestamps without scanning the samples, see
        # https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["precise_timestamp"]
        else:
            data_start = None

        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            data_end = last_bucket["control"]["max"]["precise_timestamp"]
        else:
            data_end = None

        return class_(
            signal_id=signal_id,
            forcible=signal.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_results,
            db_query_time=db_req_time,
            init_time=init_time,
        )

    @staticmethod
    def interpolate_bounds(
        class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
    ):
        """
        Add a sample at each bound of the window when the data does not reach it.

        A bound is considered not reached when there is no data, or when the nearest
        sample is further than a fifth of the window width from that bound. The value at
        the bound is interpolated (with ``class_``) between the nearest loaded sample and
        the last stored sample having a value at or before the bound; without loaded
        data the stored sample is used as is.

        The three lists are modified in place and returned as
        ``(time_vector, values, forced_values)``.
        """
        sample_right = None
        # Fetching right side value & interpolation
        window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
        if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
            sample_right = collection.find_one(
                {
                    "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
                    "value": {"$exists": True},
                },
                sort={"precise_timestamp": -1},
            )
            if sample_right:
                if time_vector:
                    right_sd = class_(
                        signal_id=signal_id,
                        time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                        values=[values[-1], sample_right.get("value", None)],
                        forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                    )
                    max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                    max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
                else:
                    max_ts_value = sample_right.get("value", None)
                    max_ts_forced_value = sample_right.get("forced_value", None)
                time_vector.append(window_max_timestamp)
                values.append(max_ts_value)
                forced_values.append(max_ts_forced_value)

        # Fetching left side value & interpolation
        window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
        if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
            if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
                # The last sample before the right bound is already before the left
                # bound: it is also the last sample before the left bound, no need
                # to query the database again
                sample_left = sample_right
            else:
                sample_left = collection.find_one(
                    {
                        "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
                        "value": {"$exists": True},
                    },
                    sort={"precise_timestamp": -1},
                )

            if sample_left:
                if time_vector:
                    left_sd = class_(
                        signal_id=signal_id,
                        time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                        values=[sample_left["value"], values[0]],
                        forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                    )
                    min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                    min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
                else:
                    min_ts_value = sample_left.get("value", None)
                    min_ts_forced_value = sample_left.get("forced_value", None)
                time_vector.insert(0, window_min_timestamp)
                values.insert(0, min_ts_value)
                forced_values.insert(0, min_ts_forced_value)

        return time_vector, values, forced_values

    def interpolate_values(self, new_time_vector: list[float]):
        """Return ``values`` interpolated at the timestamps of ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.values)

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return ``forced_values`` interpolated at the timestamps of ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.forced_values)

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """
        Return a copy resampled on ``number_samples_max`` evenly spaced timestamps.

        Nothing is resampled when ``number_samples_max`` is falsy or not exceeded.
        The time spent is added to ``data_processing_time``.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            new_time_vector = npy.linspace(
                self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
            ).tolist()
            values = self.interpolate_values(new_time_vector)
            forced_values = self.interpolate_forced_values(new_time_vector)
            time_vector = new_time_vector
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carried over like in interest_window_desampling, otherwise the copy
            # silently falls back to the field default
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """
        Performs a sampling in a window of interest and outside.

        Inside the window at most ``window_max_number_samples`` timestamps are kept, the
        window bounds always being part of them. Outside, at most
        ``outside_max_number_samples`` timestamps are shared between the parts before
        and after the window, proportionally to their number of samples. Values are
        interpolated on the resulting timestamps. Missing window bounds default to the
        first / last timestamp. ``self`` is returned when there is no data.
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            new_window_time_vector = npy.linspace(
                time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
            ).tolist()
            time_vector_window = new_window_time_vector

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(
                    outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
                ),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(
                    outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
                ),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before > 0:
                new_time_vector_before = npy.linspace(
                    time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
                ).tolist()
                time_vector_before = new_time_vector_before

            if new_number_samples_after > 0:
                # Built from the end and reversed so that the last timestamp is kept
                new_time_vector_after = npy.linspace(
                    time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
                ).tolist()[::-1]
                time_vector_after = new_time_vector_after

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def csv_export(self):
        """Return the samples as UTF-8 encoded CSV with a ``timestamp,value,forced_value`` header."""
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["timestamp", "value", "forced_value"])  # Write header
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
        return output.getvalue().encode("utf-8")

    def prestoplot_export(self):
        """
        Return the samples as UTF-8 encoded tab-separated text with a four-line header.

        Dots of the signal id are replaced by underscores and an underscore is
        prepended when the id starts with a digit. ``None`` is written as ``none``.
        """
        clean_signal_id = self.signal_id.replace(".", "_")
        if clean_signal_id[0].isnumeric():
            clean_signal_id = "_" + clean_signal_id

        output = io.StringIO()
        output.write("# Encoding:\tUTF-8\n")
        output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
        output.write("ISO8601\tnone\tnone\n")
        output.write(f"# Description :\t{clean_signal_id}\n")

        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            output.write(
                f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
            )
        return output.getvalue().encode("utf-8")
class NumericSignalData(SignalData):
    """Signal data whose samples are numbers (or ``None`` when missing).

    Adds linear interpolation and two down-sampling strategies on top of
    :py:class:`SignalData`. Both strategies return a new instance and leave
    ``self`` untouched.
    """

    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

        ``None`` samples are replaced by NaN for the computation and every NaN
        of the result is mapped back to ``None``.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy holding the result of ``downsample_list`` when there are too many samples.

        :param number_samples_max: Sample budget; a falsy value disables the down-sampling.
        :type number_samples_max: int
        :return: A new instance of the same class; ``number_samples_db`` keeps the original sample count.
        :rtype: Self
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            # Forced values are re-evaluated on the reduced time base.
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Keep the forcible flag, as the base-class resampling does.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        :param window_max_number_samples: Sample budget inside the window.
        :type window_max_number_samples: int
        :param outside_max_number_samples: Sample budget shared by the data before and after the window.
        :type outside_max_number_samples: int
        :param window_min_timestamp: Window start; defaults to the first sample time.
        :type window_min_timestamp: float | None
        :param window_max_timestamp: Window end; defaults to the last sample time.
        :type window_max_timestamp: float | None
        :return: ``self`` when there is no data, otherwise a new instance of the same class.
        :rtype: Self
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds: add interpolated samples exactly on the
            # window limits when no real sample sits there.
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            # No real sample inside the window: keep only its two bounds.
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside: the budget is split between the two sides
        # proportionally to their sample counts.
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(
                    outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
                ),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(
                    outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
                ),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Keep the forcible flag, as the base-class resampling does.
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )
class StringSignalData(SignalData):
    """Signal data whose samples are strings (or ``None`` when missing)."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested time, the item of the latest sample at or before it.

        Times earlier than the first sample get the first item; the index is
        clipped to the valid range of ``items``.
        """
        last_index = len(items) - 1
        # Position of the sample just at or before each requested time.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_index)]
class SignalsData(TwinPadModel):
    """Data of several signals over a common time frame, with export helpers."""

    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the data of every signal through ``SignalData.get_from_signal_id``.

        :param signal_ids: IDs of the signals to load, in output order.
        :type signal_ids: list[str]
        :param max_timestamp: Upper time bound; defaults to the current time.
        :type max_timestamp: float | None
        :return: The aggregated data. ``data_start``/``data_end`` are the
            earliest start and latest end over the signals that report one,
            ``data_processing_time`` is the sum of the per-signal times.
        :rtype: Self

        The remaining arguments are forwarded unchanged to
        ``SignalData.get_from_signal_id``.
        """
        signals_data = []
        data_start = None
        data_end = None
        if max_timestamp is None:
            max_timestamp = time.time()
        data_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            data_processing_time += signal_data.data_processing_time
            signals_data.append(signal_data)
            if signal_data.data_start is not None:
                if data_start is None:
                    data_start = signal_data.data_start
                else:
                    data_start = min(signal_data.data_start, data_start)
            if signal_data.data_end is not None:
                if data_end is None:
                    data_end = signal_data.data_end
                else:
                    data_end = max(signal_data.data_end, data_end)

        return cls(
            signals_data=signals_data,
            data_processing_time=data_processing_time,
            data_start=data_start,
            data_end=data_end,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and wrap the results in a new object."""
        signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and wrap the results in a new object."""
        signals_data = [
            s.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
            for s in self.signals_data
        ]

        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Build an in-memory ZIP archive holding one export file per signal.

        :param file_format: ``"csv"`` (``<signal_id>.csv`` entries) or
            ``"prestoplot"`` (``<signal_id>.tab`` entries).
        :type file_format: str
        :return: The bytes of the ZIP archive.
        :rtype: bytes
        :raises ValueError: If ``file_format`` is not supported.
        """
        # (file extension, name of the per-signal export method) by format.
        exporters = {
            "csv": ("csv", "csv_export"),
            "prestoplot": ("tab", "prestoplot_export"),
        }
        # Validate before writing anything, so that an unknown format is
        # rejected even when there is no signal to export.
        if file_format not in exporters:
            raise ValueError(f"Format not found. Got: {file_format}")
        extension, export_method_name = exporters[file_format]

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                export_bytes = getattr(signal_data, export_method_name)()
                zip_file.writestr(f"{signal_data.signal_id}.{extension}", export_bytes)
        return zip_buffer.getvalue()

    def hdf5_export(self):
        """Build an in-memory HDF5 file with one group per signal.

        Each group is named after the signal id and holds a ``data`` dataset
        of ``(date, timestamp, value, forced_value)`` records. Values are
        stored as 30-byte strings for ``"str"`` signals and as 64-bit floats
        otherwise.

        :return: The bytes of the HDF5 file.
        :rtype: bytes
        """
        hdf5_buffer = io.BytesIO()
        custom_type_float = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        custom_type_string = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for signal_data in self.signals_data:
                signal_group = hdf5_file.create_group(signal_data.signal_id)
                date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
                # Only the record layout depends on the signal type.
                record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
                records = list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                )
                signal_group["data"] = npy.array(records, dtype=record_type)
        return hdf5_buffer.getvalue()
class SignalStatus(TwinPadModel):
    """Liveness report of a signal, as produced by ``Signal.status`` and ``Signal.status_batch``."""

    # "up", "down" or "no data" in the statuses built by Signal.
    status: str
    # Human-readable explanation; empty when there is nothing to report.
    reason: str
    # Seconds elapsed since the last known activity, or None when unknown.
    delay: float | None
class DigitizationFunction(TwinPadModel):
    """Bounds describing the digitization of a signal.

    NOTE(review): presumably maps the raw range onto the value range;
    confirm against the code consuming this model.
    """

    # Optional bit count.
    bits: int | None = None
    # Bounds of the value range.
    min_value: float
    max_value: float
    # Bounds of the raw value range.
    min_raw_value: float
    max_raw_value: float
class SignalUpdate(TwinPadModel):
    """Payload of a signal command (see ``Signal.send_command``); every field is optional."""

    # New value for the signal.
    value: float | str | bool | int | None = None
    # New forced value for the signal.
    forced_value: float | str | bool | int | None = None
    # Optional integer timestamp attached to the update.
    timestamp: int | None = None
class SignalType(str, Enum):
    """Kinds of signal; members are also plain strings equal to their value."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Signal data class used for each base data type name: only "str" signals
# use the string implementation, every other type is handled as numeric.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
1264class Signal(GenericMongo):
1265 collection_name: ClassVar[str] = "signals"
1267 signal_id: str
1268 frequency: float
1269 unit: str | None
1270 description: str
1271 type: SignalType
1272 data_type: str
1273 precision_digits: int | None
1274 forcible: bool
1276 digitization_function: DigitizationFunction | None
1278 @property
1279 def device(self) -> Device:
1280 device_id = self.signal_id.split(".")[0]
1281 device = Device.get_one_by_attribute("device_id", device_id)
1282 return device
1284 @cached_property
1285 def signal_data_class(self):
1286 if self.data_type in SIGNALDATA_TYPES:
1287 return SIGNALDATA_TYPES[self.data_type]
1288 if self.data_type.startswith("enum"):
1289 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1290 raise ValueError(f"Unhandled python type: {self.data_type}")
1292 @cached_property
1293 def python_type(self):
1294 if self.data_type in TYPES:
1295 return TYPES[self.data_type]
1296 if self.data_type.startswith("enum"):
1297 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1298 return Literal[*choices]
1299 raise ValueError(f"Unhandled python type: {self.data_type}")
1301 @computed_field
1302 @property
1303 def status(self) -> SignalStatus:
1304 now = time.time()
1305 status = "up"
1306 reason = ""
1308 # See line 285 for explanation
1309 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1310 last_bucket = None
1311 if bucket is not None:
1312 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1313 if last_bucket is None:
1314 status = "no data"
1315 reason = "signal does not exist"
1316 return SignalStatus(status=status, reason=reason, delay=None)
1318 try:
1319 last_date = last_bucket["control"]["max"]["timestamp"]
1320 last_date = last_date.replace(tzinfo=pytz.UTC)
1321 last_value_ts = last_date.timestamp()
1322 except IndexError:
1323 last_value_ts = None
1325 if last_value_ts is None:
1326 delay = None
1327 reason = "No data from signal"
1328 else:
1329 # Since device is a computed property, only compute it once
1330 device = self.device
1331 if device is not None and device.last_ping is not None:
1332 last_value_ts = max(last_value_ts, device.last_ping)
1333 delay = now - last_value_ts
1334 if delay > DEVICE_TIMEOUT:
1335 status = "down"
1336 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1337 return SignalStatus(status=status, reason=reason, delay=delay)
1339 @classmethod
1340 def status_batch(cls, signal_ids: list[str], devices_by_ids: dict[str, Device]) -> dict[str, SignalStatus]:
1341 """Computes the status of multiple signals in batch from their signal ids.
1343 :param signal_ids: Signal IDs of the wanted signals
1344 :type signal_ids: list[str]
1345 :param devices_by_ids: A pre-computed map of all signal IDs linked to their :py:class:`Device`.
1346 :type devices_by_ids: dict[str, Device]
1347 :return: A dictionary with the signal ID as the keys and their respective :py:class:`SignalStatus` as its values.
1348 :rtype: dict[str, SignalStatus]
1349 """
1350 statuses_by_signal_id = {}
1352 buckets = get_signal_collections_batch([f"system.buckets.{signal_id}" for signal_id in signal_ids])
1353 for signal_id, bucket in zip(signal_ids, buckets):
1354 now = time.time()
1355 status = "up"
1356 reason = ""
1357 last_bucket = None
1358 if bucket is not None:
1359 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1360 if last_bucket is None:
1361 status = "no data"
1362 reason = "signal does not exist"
1363 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=None)
1364 continue
1366 try:
1367 last_date = last_bucket["control"]["max"]["timestamp"]
1368 last_date = last_date.replace(tzinfo=pytz.UTC)
1369 last_value_ts = last_date.timestamp()
1370 except IndexError:
1371 last_value_ts = None
1373 if last_value_ts is None:
1374 delay = None
1375 reason = "No data from signal"
1376 else:
1377 # Since device is a computed property, only compute it once
1378 device = devices_by_ids.get(signal_id.split(".")[0], None)
1379 delay = now - last_value_ts
1380 if device is not None and device.status == "up":
1381 delay = 0
1382 if delay > DEVICE_TIMEOUT:
1383 status = "down"
1384 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1385 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=delay)
1387 return statuses_by_signal_id
1389 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1390 command = Command(
1391 sent_at=time.time(),
1392 command_type="Signal command",
1393 user_id=current_user.id,
1394 )
1396 has_input_error = False
1397 error_message = ""
1399 if self.data_type.startswith("enum"):
1400 enum_options = get_args(self.python_type)
1402 if update_dict.value is not None and update_dict.value not in enum_options:
1403 has_input_error = True
1404 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1405 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1406 has_input_error = True
1407 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1408 else:
1409 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1410 has_input_error = True
1411 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1412 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1413 has_input_error = True
1414 error_message += (
1415 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1416 )
1418 if has_input_error:
1419 command.response_time = 0
1420 command.succeeded = False
1421 command.description = f"Tried to modify signal {self.signal_id}"
1422 response = {"error": True, "status_code": 400, "message": error_message}
1423 else:
1424 response = await send_signal_value(self.signal_id, update_dict)
1425 command.receive_response(response)
1427 Command.create(command)
1428 return response
1430 @classmethod
1431 def get_from_signal_id(cls, signal_id) -> Self:
1432 """Could be generic from mongo"""
1433 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1434 if not raw_value:
1435 return None
1436 del raw_value["_id"]
1437 return cls.dict_to_object(raw_value)
1439 @classmethod
1440 def get_all_ids(cls) -> list[str]:
1441 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1443 return [signal["signal_id"] for signal in cursor]
1445 async def number_samples(self):
1446 collection = get_signal_collection(signal_id=self.signal_id)
1447 if collection is None:
1448 return 0
1450 number_samples = collection.estimated_document_count()
1452 number_samples_async_collection = await get_async_collection(
1453 systems_async_database, "number_samples", create=True, time_series=True
1454 )
1456 loop = asyncio.get_running_loop()
1457 loop.create_task(
1458 number_samples_async_collection.insert_one(
1459 {
1460 "timestamp": datetime.datetime.now(pytz.UTC),
1461 "signal_id": self.signal_id,
1462 "number_samples": number_samples,
1463 }
1464 )
1465 )
1467 return number_samples
1469 @classmethod
1470 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]:
1471 number_samples_by_id = {}
1472 collections = get_signal_collections_batch(signal_ids)
1473 number_samples_async_collection = await get_async_collection(
1474 systems_async_database, "number_samples", create=True, time_series=True
1475 )
1477 for signal_id, collection in zip(signal_ids, collections):
1478 if collection is None:
1479 number_samples_by_id[signal_id] = 0
1480 continue
1482 number_samples = collection.estimated_document_count()
1484 number_samples_by_id[signal_id] = number_samples
1486 now = datetime.datetime.now(pytz.UTC)
1487 loop = asyncio.get_running_loop()
1488 loop.create_task(
1489 number_samples_async_collection.insert_many(
1490 [
1491 {
1492 "timestamp": now,
1493 "signal_id": signal_id,
1494 "number_samples": number_samples,
1495 }
1496 for signal_id, number_samples in number_samples_by_id.items()
1497 ]
1498 )
1499 )
1501 return number_samples_by_id
1503 def sample_datasize(self):
1504 return signals_database.command("collstats", self.signal_id)["size"]
1506 @classmethod
1507 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1508 result = cls.collection().aggregate(
1509 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1510 )
1512 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ServicesStatus(TwinPadModel):
    """Reachability ("up"/"down") of the services of the platform."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        ``backend`` is reported as "up" unconditionally.
        """
        hosts_by_field = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        statuses = {field_name: ping(host) for field_name, host in hosts_by_field.items()}
        return cls(backend="up", **statuses)
def ping(host):
    """Return "up" when ``ping3.ping`` gives a truthy answer within 0.8 s, "down" otherwise.

    A ``PermissionError`` raised by the ping is reported as "down".
    """
    try:
        reachable = bool(ping3.ping(host, timeout=0.8))
    except PermissionError:
        reachable = False
    return "up" if reachable else "down"
class Event(GenericMongo):
    """Occurrence of an event rule, stored in the ``events`` collection."""

    collection_name: ClassVar[str] = "events"

    name: str
    # Epoch seconds.
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Event rule referenced by ``event_rule_id`` (fetched once, then cached)."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Refined to convert the stored ``datetime`` timestamp to epoch seconds.

        The conversion is made on a copy, so the dictionary given by the
        caller is left untouched. A naive datetime is read as UTC, which is
        how PyMongo returns dates by default.
        """
        moment = dict_["timestamp"]
        if moment.tzinfo is None:
            # Without this, timestamp() would assume the local time zone.
            moment = moment.replace(tzinfo=pytz.UTC)
        converted = {**dict_, "timestamp": moment.timestamp()}
        return super().dict_to_object(converted)
class TwinPadActivity(GenericMongo):
    """Amount of activity (events, samples or commands) recorded for one day.

    The three ``get_number_*_timeframe`` methods maintain a per-day summary
    collection and return its entries inside a time frame. All day boundaries
    are computed in UTC.
    """

    # Epoch seconds of the day the amount refers to.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return the start of the current UTC day as an aware datetime."""
        today = datetime.datetime.now(pytz.UTC).date()
        return datetime.datetime.combine(today, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _utc_midnight_of(moment: datetime.datetime) -> datetime.datetime:
        """Return midnight of the date of ``moment``, labelled as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _sum_sample_values(collection, timestamp_filter: dict):
        """Return the sum of the ``value`` field of the documents matching the timestamp filter (0 if none)."""
        request = collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(request) == 0:
            return 0
        return request[0].get("number_samples", 0)

    @classmethod
    def _items_in_timeframe(cls, summary_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Return the summary entries whose timestamp lies in ``[min_timestamp, max_timestamp]``.

        The bounds are epoch seconds and are converted directly to UTC;
        converting to local time and relabelling it as UTC would shift the
        window by the UTC offset of the server.
        """
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in summary_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            moment = day["timestamp"]
            if moment.tzinfo is None:
                # Naive datetimes are read as UTC rather than local time.
                moment = moment.replace(tzinfo=pytz.UTC)
            day["timestamp"] = moment.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the number of events per day between two epoch timestamps.

        The ``number_events`` summary is rebuilt from the ``events`` collection
        when it does not exist or when ``recompute_amount`` is true; the entry
        of the current day is refreshed on every call.
        """
        one_day = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        today = cls._utc_midnight_today()
        if number_events_collection is None or recompute_amount:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            number_events_collection.delete_many({})
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return []
            # Count day by day, from the day of the first event up to yesterday.
            current_day = cls._utc_midnight_of(first_event["timestamp"])
            while current_day < today:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": current_day, "$lt": current_day + one_day}}
                )
                if day_nb_events > 0:
                    number_events_collection.insert_one({"timestamp": current_day, "amount": day_nb_events})
                current_day += one_day
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": today}})
        if number_events_today > 0:
            # Replace the entry of today by the up-to-date count.
            number_events_collection.delete_many({"timestamp": today})
            number_events_collection.insert_one({"timestamp": today, "amount": number_events_today})
        return cls._items_in_timeframe(number_events_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of received samples per day between two epoch timestamps.

        The ``number_received_samples`` summary is rebuilt by summing the
        ``value`` field of ``signal_storage._NUMBER_SAMPLES`` when it does not
        exist or when ``recompute_amount`` is true; the entry of the current
        day is refreshed on every call.
        """
        one_day = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        today = cls._utc_midnight_today()
        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # Sum day by day, from the day of the first record up to yesterday.
            current_day = cls._utc_midnight_of(first_sample["timestamp"])
            while current_day < today:
                number_samples = cls._sum_sample_values(
                    signals_number_samples_collection, {"$gte": current_day, "$lt": current_day + one_day}
                )
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": current_day, "amount": number_samples})
                current_day += one_day
        number_samples_today = cls._sum_sample_values(signals_number_samples_collection, {"$gte": today})
        if number_samples_today > 0:
            # Replace the entry of today by the up-to-date count.
            number_samples_collection.delete_many({"timestamp": today})
            number_samples_collection.insert_one({"timestamp": today, "amount": number_samples_today})
        return cls._items_in_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of commands per day between two epoch timestamps.

        The ``number_commands`` summary is rebuilt from the ``commands``
        collection when it does not exist or when ``recompute_amount`` is
        true; the entry of the current day is refreshed on every call.
        """
        one_day = datetime.timedelta(days=1)
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        today = cls._utc_midnight_today()
        if number_commands_collection is None or recompute_amount:
            number_commands_collection = get_collection(
                systems_database, "number_commands", create=True, time_series=True
            )
            number_commands_collection.delete_many({})
            first_command = commands_collection.find_one(sort={"timestamp": 1})
            if first_command is None:
                return []
            # Count day by day, from the day of the first command up to yesterday.
            current_day = cls._utc_midnight_of(first_command["timestamp"])
            while current_day < today:
                day_nb_commands = commands_collection.count_documents(
                    {"timestamp": {"$gte": current_day, "$lt": current_day + one_day}}
                )
                if day_nb_commands > 0:
                    number_commands_collection.insert_one({"timestamp": current_day, "amount": day_nb_commands})
                current_day += one_day
        number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": today}})
        if number_commands_today > 0:
            # Replace the entry of today by the up-to-date count.
            number_commands_collection.delete_many({"timestamp": today})
            number_commands_collection.insert_one({"timestamp": today, "amount": number_commands_today})
        return cls._items_in_timeframe(number_commands_collection, min_timestamp, max_timestamp)
class EventRule(GenericMongo):
    """Rule producing events, stored in the ``event_rules`` collection."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events referencing this rule (computed once, then cached)."""
        events_collection = get_collection(systems_database, "events")
        # The collection is not created here, so it may not exist yet.
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    """Company, stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"
    name: str
class Campaign(GenericMongo):
    """Campaign, stored in the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert the campaign (without its ``id``) and return ``{"campaign_id": <new id>}``."""
        campaigns = cls.collection()
        insert_result = campaigns.insert_one(campaign.model_dump(exclude={"id"}))
        if insert_result is not None:
            return {"campaign_id": str(insert_result.inserted_id)}
        return None

    @classmethod
    def update(cls, campaign: Self):
        """Set the name and description of the stored campaign; return the document after the update."""
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return cls.collection().find_one_and_update(
            selector,
            changes,
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with the given id and return the result of ``delete_one``."""
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})
class Phase(GenericMongo):
    """Phase of a campaign, stored in the ``phases`` collection."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of the phase (without ``id``) and return ``{"phase_id": <new id>}``."""
        # Rebuild the phase from its fields, leaving out the id.
        phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phases = get_collection(systems_database, "phases", create=True)
        insert_result = phases.insert_one(phase.model_dump(exclude={"id"}))
        if insert_result is not None:
            return {"phase_id": str(insert_result.inserted_id)}
        return None

    @classmethod
    def update(cls, phase: Self):
        """Set name, description and bounds of the stored phase; return the document after the update."""
        selector = {"_id": ObjectId(phase.id)}
        changes = {
            "$set": {
                "name": phase.name,
                "description": phase.description,
                "start_at": phase.start_at,
                "end_at": phase.end_at,
            }
        }
        return cls.collection().find_one_and_update(
            selector,
            changes,
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with the given id and return the result of ``delete_one``."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the result of ``delete_many``."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
class CustomViewCreation(GenericMongo):
    """Fields needed to create a custom view, stored in the ``custom_views`` collection."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list
class CustomView(CustomViewCreation):
    """Custom view as stored: the creation fields plus its id and the id of its user."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

    # # Methods
    # @classmethod
    # def create(cls, form_custom_view: Self, user_id) -> list:
    #     custom_view = CustomView(
    #         name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
    #     )
    #     new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
    #     return new_custom_view

    # @classmethod
    # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
    #     updated_custom_view = cls.collection().find_one_and_update(
    #         {"_id": ObjectId(custom_view_id)},
    #         {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
    #         return_document=ReturnDocument.AFTER,
    #     )
    #     updated_custom_view["id"] = str(updated_custom_view["_id"])
    #     del updated_custom_view["_id"]
    #     return cls(**updated_custom_view)

    # @classmethod
    # def delete(cls, custom_view_id) -> bool:
    #     deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
    #     return deleted_custom_view.acknowledged
# Update model generated from CustomView by create_update_model
# (presumably a variant with optional fields for partial updates - confirm against create_update_model).
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    # Video source (camera) stored in the "videos" collection.
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """
        List all videos, sorted ascending on ``sort_by``.

        The query projects only ``name`` and ``ip_addr`` (plus MongoDB's default ``_id``),
        so ``username`` and ``password`` are not fetched.

        :param sort_by: document field used for the ascending sort.
        :return: one object per document, built with ``mongo_dict_to_object``.
        """
        projection = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(raw) for raw in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """
        Return the name of the video with the given id.

        :param camera_id: id passed to ``get_from_id``.
        :return: the ``name`` of the matching video, or None when ``get_from_id`` finds nothing.
        """
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    # Record of a command sent by a user, stored in the "commands" collection.
    collection_name: ClassVar[str] = "commands"

    # Properties
    # NOTE(review): the None default does not match the `datetime.datetime` annotation;
    # `create` always derives the value from `sent_at`. Consider `datetime.datetime | None`
    # once it is confirmed that nothing introspects this annotation.
    timestamp: datetime.datetime = None
    # Epoch seconds at which the command was sent (compared with time.time() below)
    sent_at: float
    # Seconds elapsed between `sent_at` and the reception of the response
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, requested with ``create=True`` and ``time_series=True``."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """
        Store a copy of the given command.

        The stored ``timestamp`` is always recomputed from ``command.sent_at`` as a
        UTC-aware datetime; every other field is copied as-is and the id is excluded
        from the stored document.

        :param command: command carrying the values to store.
        :return: ``{"command_id": <inserted id as str>}``, or None when the insert
            yields no result object.
        """
        stored = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insertion = cls.collection().insert_one(stored.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"command_id": str(insertion.inserted_id)}

    def receive_response(self, response: dict):
        """
        Update this command from the response it received.

        Sets ``response_time`` to the time elapsed since ``sent_at`` and marks the
        command as succeeded only when the response explicitly carries ``"error": False``
        (a missing ``error`` key counts as a failure). When no description was set, the
        right-stripped response ``message`` is used as description.

        :param response: response payload; the ``error`` and ``message`` keys are read.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            # `dict.get`'s default only applies to a missing key: a response carrying
            # `"message": None` must not crash on `.rstrip()`.
            self.description = (response.get("message") or "").rstrip()
class SignalsPresetCreation(GenericMongo):
    # Fields needed to create a signals preset: a name and the ids of the signals it groups.
    name: str
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    # Signals preset as stored in the "signals_presets" collection, tied to a user.
    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """
        Store a new preset for the given user.

        :param signals_preset: creation data; its ``name`` and ``signal_ids`` are copied.
        :param user_id: value stored in the preset's ``user_id`` field.
        :return: id of the inserted document, as a string.
        """
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        presets = cls.collection()
        insertion = presets.insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insertion.inserted_id)
# Update model generated from SignalsPreset by create_update_model
# (presumably a variant with optional fields for partial updates - confirm against create_update_model).
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    # Line styles a graph theme can use; members are also plain strings (str mixin).
    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    # Annotation-only holder for the two colors of a signal: the one used for its
    # value and the one used for its forced value.
    value_color: str
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    # Fields needed to create a graph theme; stored in the "graph_themes" collection.
    collection_name: ClassVar[str] = "graph_themes"

    name: str
    # Signal this theme applies to
    signal_id: str
    # Colors and line styles for the signal's value and its forced value
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    # Graph theme as seen by one user: the stored fields plus three flags computed
    # for that user by the pipeline steps below.
    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # User the next query is evaluated for; written on the class by the query methods below.
    # NOTE(review): this is class-level state shared by every caller - confirm that
    # concurrent requests for different users cannot interleave between set and use.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """
        Aggregation steps specific to graph themes, keyed by step name.

        Per-user flags are computed from the stored ``creator_id``, ``in_library`` and
        ``active`` fields using ``_current_user_id``; those three stored fields are then
        removed from the output with ``$$REMOVE``.
        (Presumably consumed by the generic query pipeline builder - confirm.)
        """
        current_user = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", current_user]}),
            # Never return a theme the user is not allowed to see (i.e. neither theirs
            # nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [current_user, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [current_user, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the parent ``response_from_query`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query``, after forcing ``query.in_user_library`` to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Look up a single theme by its ``_id`` (converted with ``ObjectId``) for the given user."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_by_attribute`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_one_by_attribute`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Run the parent ``get_all`` with ``user_id`` as the current user."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """
        Collect the appearance settings the given user has active for some signals.

        Themes are matched on ``active`` against ``user_id`` and on ``signal_id`` against
        ``signal_ids``; one document is kept per signal (``$first`` of its group).

        :param signal_ids: signals to look up.
        :param user_id: user whose active themes are considered.
        :return: dict mapping each found signal id to a dict holding its colors and
            line styles for value and forced value.
        """
        appearance_fields = (
            "signal_id",
            "value_color",
            "forced_value_color",
            "value_line_style",
            "forced_value_line_style",
        )
        stages = [
            {"$match": {"active": {"$eq": user_id}, "signal_id": {"$in": signal_ids}}},
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
            {"$project": {"_id": 0, **{field: 1 for field in appearance_fields}}},
        ]

        # The signal id becomes the key and is removed from the per-signal dict.
        return {theme.pop("signal_id"): theme for theme in cls.collection().aggregate(stages)}
# Update model generated from PublicGraphTheme by create_update_model
# (presumably a variant with optional fields for partial updates - confirm against create_update_model).
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    # Graph theme as stored, including the bookkeeping fields below.

    # private
    # User who created the theme
    creator_id: str
    # Ids of the users having this theme in their library
    in_library: list[str]
    # Ids of the users for whom this theme is active
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """
        Store a new theme; its creator starts with the theme in their library and active.

        :return: the created theme, with ``id`` set to the inserted document id (as str).
        """
        theme = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        insertion = cls.collection().insert_one(theme.model_dump(exclude={"id": True}))
        theme.id = str(insertion.inserted_id)
        return theme

    def update(self, update_dict: dict, user_id: str):
        """
        Apply an update requested by ``user_id`` to this theme.

        The per-user flags ``in_user_library`` and ``active_for_user`` are translated into
        membership of ``user_id`` in the stored ``in_library`` / ``active`` lists, and
        ``created_by_user`` is dropped. These three keys are never written to the database,
        even when they were sent with a None value. Remaining entries are written with ``$set``.

        :param update_dict: fields to update; modified in place (per-user keys are
            removed, ``in_library`` / ``active`` may be added).
        :param user_id: user performing the update.
        :return: an empty dict.
        """
        membership_flags = (
            ("in_user_library", "in_library", self.in_library),
            ("active_for_user", "active", self.active),
        )
        for flag_key, stored_key, members in membership_flags:
            # Always strip the per-user key so it can never reach `$set`.
            wanted = update_dict.pop(flag_key, None)
            if wanted is None:
                continue
            if wanted and user_id not in members:
                members.append(user_id)
            elif not wanted and user_id in members:
                members.remove(user_id)
            update_dict[stored_key] = members

        update_dict.pop("created_by_user", None)

        # An empty `$set` is rejected by MongoDB servers older than 5.0 and is a no-op
        # on newer ones, so skip the round trip when nothing is left to write.
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}