Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%
1214 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-17 08:53 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-17 08:53 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
23import h5py
25# from scipy import signal as signal_scipy
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 devices_states_database,
36)
37from twinpad_backend.responses import ListResponse
38from twinpad_backend.messages import send_mode_change, send_signal_value
# Maps a type name to the Python type standing for it; note that "epoch" is represented by a float.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}


# Host names of the other services, each one overridable through the environment variable of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared to a duration in seconds (difference of two time.time() values) in SignalData.get_from_signal_id
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Messages are emitted through the logger named "uvicorn.error"
logger = logging.getLogger("uvicorn.error")
class classproperty:
    """
    Read-only descriptor whose getter receives the owning class instead of the instance.

    Custom wrapper replacing the chaining of classmethod and property, which was deprecated in python 3.11
    and will be removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped getter is kept under the same attribute name as the builtin property uses
        self.fget = func

    def __get__(self, _, owner):
        # The instance (second argument) is ignored: the value only depends on the class
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """
    Build the "<ModelName>Update" model derived from ``model``.

    Every field of ``model`` except "id" is copied with its annotation widened to accept None and
    with None as default value.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float) -> str:
    """
    Convert an epoch timestamp (in seconds) to an ISO 8601 date expressed in UTC.

    The conversion is explicitly done in UTC so the result does not depend on the time zone of the
    machine. The offset is then dropped to keep the historical output format (no "+00:00" suffix).
    """
    utc_date = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_date.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a series to about ``max_number_samples`` points using LTTB.

    None values are not supported by LTTB: they are removed before the downsampling and the bounds
    (first and last timestamp) of each run of consecutive None values are inserted back afterwards,
    so gaps in the data stay visible. If LTTB raises a ValueError, the series is instead linearly
    interpolated on ``max_number_samples`` evenly spaced timestamps.

    The inputs are returned untouched when they hold fewer than ``max_number_samples`` samples.
    Returns the tuple (new_time_vector, new_values).
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass over the samples: keep non-None samples for LTTB and record, for each run of
    # consecutive None values, its first timestamp and (when the run is longer than one) its last one
    kept_times = []
    kept_values = []
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values):
        if value is not None:
            kept_times.append(timestamp)
            kept_values.append(value)
            in_none_group = False
        elif not in_none_group:
            none_group_bounds.append([timestamp])
            in_none_group = True
        elif len(none_group_bounds[-1]) < 2:
            none_group_bounds[-1].append(timestamp)
        else:
            none_group_bounds[-1][1] = timestamp
    # The None bounds are added on top of the LTTB output, so they are deducted from its budget
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When ``wanted_type`` is float, int values are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
135# Models
class TwinPadModel(BaseModel):
    """Base class of the models of this module, adding dict conversion helpers on top of pydantic."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model to a dict, leaving out the fields given in ``exclude``."""
        return self.model_dump(exclude=exclude)
class GenericMongo(TwinPadModel):
    """Base class of the models stored in a collection of the systems database."""

    # String version of the MongoDB "_id" (see mongo_dict_to_object); None until insert() is called
    id: str | None = None
    # Maps a property name to the aggregation steps that are added to the pipelines of this class:
    # before the $match / $sort when it targets that property, at the end of the pipeline otherwise
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}
150 @classmethod
151 def collection(cls):
152 return get_collection(systems_database, cls.collection_name, create=True)
    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated request and wrap the result in a ListResponse.

        ``query`` must provide ``mongodb_filter()``, ``sort_by``, ``offset`` and ``limit``.
        ``sort_by`` is a comma-separated list of "field" or "field:order" entries, where order is
        an int given to $sort (1 when omitted). A limit of None or 0 means no limit.

        ``total`` counts the documents matching the filter, regardless of offset and limit.
        """
        request_filters = query.mongodb_filter()
        items = []

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field = sort
                sort_order = 1
            sort_dict[sort_field] = sort_order

        collection = get_collection(systems_database, cls.collection_name, create=True)
        # NOTE(review): counted on the raw collection, before any custom pipeline step; presumably a
        # filter on a property only produced by these steps would be counted wrongly — confirm
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []
        # Custom steps of the filtered properties must run before the $match that uses them
        if "$and" in request_filters:
            for request_filter in request_filters["$and"]:
                for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                    if filtered_property in request_filter:
                        pipeline.extend(pipeline_steps)
                        added_properties.append(filtered_property)
        else:
            for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
                if filtered_property in request_filters:
                    pipeline.extend(pipeline_steps)
                    added_properties.append(filtered_property)
        pipeline.append({"$match": request_filters})

        # Same for the sorted properties: their custom steps run before the $sort
        # NOTE(review): steps of a property that is both filtered and sorted are added twice — confirm it is harmless
        for sort_field in sort_dict.keys():
            if sort_field in cls.custom_pipeline_steps:
                pipeline.extend(cls.custom_pipeline_steps[sort_field])
                added_properties.append(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom steps are added last, so they only run on the documents of the page
        for filtered_property, step in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(step)

        cursor = collection.aggregate(pipeline)

        for item_dict in cursor:
            items.append(cls.mongo_dict_to_object(item_dict))

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
208 @classmethod
209 def get_from_id(cls, item_id) -> Self | None:
210 return cls.get_one_by_attribute("_id", ObjectId(item_id))
212 @classmethod
213 def mongo_dict_to_object(cls, mongo_dict):
214 mongo_dict["id"] = str(mongo_dict["_id"])
215 del mongo_dict["_id"]
216 return cls.dict_to_object(mongo_dict)
218 @classmethod
219 def get_by_attribute(cls, attribute_name: str, attribute_value):
220 """Returns all items that match the attribute with value."""
221 pipeline = []
222 if attribute_name in cls.custom_pipeline_steps:
223 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
224 pipeline.append({"$match": {attribute_name: attribute_value}})
225 for key, step in cls.custom_pipeline_steps.items():
226 if key != attribute_name:
227 pipeline.extend(step)
228 items = cls.collection().aggregate(pipeline)
229 if items is None:
230 return None
231 return [cls.mongo_dict_to_object(d) for d in items]
233 @classmethod
234 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
235 pipeline = []
236 if attribute_name in cls.custom_pipeline_steps:
237 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
238 pipeline.append({"$match": {attribute_name: attribute_value}})
239 pipeline.append({"$limit": 1})
240 for key, step in cls.custom_pipeline_steps.items():
241 if key != attribute_name:
242 pipeline.extend(step)
243 items = cls.collection().aggregate(pipeline).to_list()
244 if len(items) == 0:
245 return None
246 return cls.mongo_dict_to_object(items[0])
248 @classmethod
249 def get_all(cls, sort_by="_id") -> list[Self]:
250 items = []
251 pipeline = []
252 if sort_by in cls.custom_pipeline_steps:
253 pipeline.extend(cls.custom_pipeline_steps[sort_by])
254 pipeline.append({"$sort": {sort_by: ASCENDING}})
255 for key, step in cls.custom_pipeline_steps.items():
256 if key != sort_by:
257 pipeline.extend(step)
258 for dict_ in cls.collection().aggregate(pipeline):
259 items.append(cls.mongo_dict_to_object(dict_))
260 return items
262 @classmethod
263 def get_number_documents(cls):
264 collection = get_collection(systems_database, cls.collection_name)
265 if collection is None:
266 return 0
267 return collection.count_documents({})
269 def insert(self):
270 insert_result = self.collection().insert_one(self.to_dict(exclude={id}))
271 self.id = str(insert_result.inserted_id)
272 return self.id
274 def update(self, update_dict):
275 for key, value in update_dict.items():
276 setattr(self, key, value)
277 self.collection().find_one_and_update(
278 {"_id": ObjectId(self.id)},
279 {"$set": update_dict},
280 return_document=ReturnDocument.AFTER,
281 )
283 return self
285 def delete(self):
286 result = self.collection().delete_one({"_id": ObjectId(self.id)})
287 return result.deleted_count > 0
class User(GenericMongo):
    """User account, stored in the "users" collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    # NOTE(review): stored as received by create(); presumably already hashed by the caller — confirm
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless ``exclude`` is given explicitly."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <new id>}``.

        The very first user created is always made admin, whatever ``is_admin`` is.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str) -> Self | None:
        """
        Update the stored user ``user_id`` with the fields of ``user`` that are not None.

        Returns the updated user, or None when no user has this identifier.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        # find_one_and_update returns None when no document matches the filter
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The connection state is not reported by this method; tolerate documents stored without it
        updated_user.pop("is_connected", None)
        return cls(**updated_user)
# Model built by create_update_model from User; used as payload type by User.update_info
UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    """Mode of a device (see ``Device.modes``)."""

    # Device.change_mode reads modes with ``modes[mode_id - 1]`` — presumably ids are 1-based positions; TODO confirm
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    """Update payload of a device; presumably the ``update_dict`` given to Device.change_mode — TODO confirm."""

    # Identifier of the requested mode
    mode_id: int
class Device(GenericMongo):
    """Device description, stored in the "devices" collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    # Modes are read with ``modes[mode_id - 1]``: valid mode ids go from 1 to len(modes)
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to mode ``update_dict.mode_id`` and record the attempt as a Command.

        An invalid mode id is not sent to the device: the command is stored as failed and an error
        response (status code 400) is returned. Otherwise the response of send_mode_change is returned.
        """
        number_modes = len(self.modes)
        # Mode ids are 1-based: 0 must be rejected too, otherwise modes[-1] would silently be used
        has_error = not 1 <= update_dict.mode_id <= number_modes

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is not None and 1 <= self.current_mode_id <= number_modes:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            # Current mode unknown (or not one of the declared modes): only the target mode is described
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
        else:
            response = await send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices of the given signals, each device being requested only once.

        The device id of a signal is the part of its id before the first ".". Returns a dict mapping
        each device id to its Device, or to None when no such device is found.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id
class DeviceSetup(GenericMongo):
    """Setup referencing several devices by id, stored in the "device_setups" collection."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]
# Model built by create_update_model from DeviceSetup
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    """State of a device at a given time, read from the per-device collections of the device states database."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        Return the states of device ``device_id`` matching ``query``, sorted and paginated.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` ("field" or "field:order"),
        ``offset`` and ``limit`` (None or 0 meaning no limit). An empty response is returned when
        the device has no states collection.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        items = []
        total = 0
        collection = get_collection(devices_states_database, device_id)
        if collection is not None:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)
            for item_dict in cursor:
                items.append(
                    cls(
                        timestamp=item_dict.get("precise_timestamp"),
                        mode=item_dict.get("mode", None),
                        load=item_dict.get("load", None),
                        # Plain empty lists as fallback: ``Field(...)`` is only meaningful in a model
                        # declaration, as a value it would be rejected by the validation
                        tokens=item_dict.get("tokens", []),
                        modified_properties=item_dict.get("modified_properties", []),
                    )
                )
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    """Single sample of a signal."""

    signal_id: str
    # Filled from the "precise_timestamp" field of the stored sample
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None
456 @classmethod
457 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
459 collection = get_signal_collection(signal_id)
460 if collection is None:
461 return None
463 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
464 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
465 bucket = get_signal_collection(f"system.buckets.{signal_id}")
466 first_bucket = None
467 if bucket is not None:
468 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
469 if first_bucket is not None:
470 sample_data = collection.find_one(
471 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
472 )
473 else:
474 sample_data = collection.find_one({}, sort={"precise_timestamp": 1})
476 if sample_data is None:
477 return None
479 timestamp = sample_data["precise_timestamp"]
481 return cls(
482 signal_id=signal_id,
483 timestamp=timestamp,
484 value=sample_data.get("value", None),
485 forced_value=sample_data.get("forced_value", None),
486 )
488 @classmethod
489 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
490 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
492 @classmethod
493 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
495 collection = get_signal_collection(signal_id)
496 if collection is None:
497 return None
499 # Same workaround as above function, very effective to narrow down big sets of data
500 bucket = get_signal_collection(f"system.buckets.{signal_id}")
501 last_bucket = None
502 if bucket is not None:
503 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
504 if last_bucket is not None:
505 sample_data = collection.find_one(
506 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
507 sort={"precise_timestamp": -1},
508 )
509 else:
510 sample_data = collection.find_one({}, sort={"precise_timestamp": -1})
512 if sample_data is None:
513 return None
515 timestamp = sample_data["precise_timestamp"]
517 if device is None:
518 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
519 if device is not None and device.last_ping is not None:
520 if timestamp is None:
521 timestamp = device.last_ping
522 else:
523 timestamp = max(timestamp, device.last_ping)
524 return cls(
525 signal_id=signal_id,
526 timestamp=timestamp,
527 value=sample_data.get("value", None),
528 forced_value=sample_data.get("forced_value", None),
529 )
531 @classmethod
532 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
533 collection = get_signal_collection(signal_id)
534 if collection is None:
535 return None
537 sample_data = collection.find_one(
538 {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
539 )
540 if sample_data is None:
541 return None
543 return cls(
544 signal_id=signal_id,
545 timestamp=sample_data.get("precise_timestamp"),
546 value=sample_data.get("value"),
547 forced_value=sample_data.get("forced_value"),
548 )
550 @classmethod
551 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
552 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
553 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
555 @classmethod
556 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None:
557 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
class SignalData(TwinPadModel):
    """
    Samples of a signal stored as parallel lists (time_vector, values, forced_values).

    The ``interpolate`` method used by interpolate_values / interpolate_forced_values is not
    defined here: it is provided by subclasses such as NumericSignalData.
    """

    signal_id: str
    forcible: bool = True
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    # Timestamps of the oldest and most recent data stored for the signal (None when unknown)
    data_start: float | None = None
    data_end: float | None = None

    # Number of samples held by this object, and number of samples it was built from
    number_samples: int = 0
    number_samples_db: int = 0

    # Durations measured with time.time()
    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0
    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """
        Load the samples of a signal from its collection.

        ``min_timestamp`` / ``max_timestamp`` restrict the samples that are read. When more than
        ``max_documents`` samples match, only one sample out of ``ceil(matching / max_documents)``
        is kept. When ``interpolate_bounds`` is set and both window timestamps are given, samples
        are added at the window bounds (see interpolate_bounds).

        The returned object is an instance of the ``signal_data_class`` of the signal, except when
        the signal has no collection: an empty instance of ``cls`` is then returned.
        """

        now = time.time()

        # NOTE(review): fromtimestamp gives naive datetimes in the local time zone — confirm it
        # matches the way the "timestamp" field is stored
        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

        collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        if max_documents is not None and max_documents < number_results:
            # Too many samples: number them in time order, group them by blocks of about
            # `unsampling_ratio` consecutive samples and only keep the first sample of each block
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        # logger.info(f"pipeline: %s", str(pipeline))
        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        init_time = time.time()

        results = cursor.to_list()
        time_vector = []
        values = []
        forced_values = []
        for s in results:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        # NOTE(review): if no Signal is returned here (None), the attribute accesses below raise —
        # confirm it cannot happen for a signal that has a collection
        signal = Signal.get_from_signal_id(signal_id)
        class_ = signal.signal_data_class

        if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
            time_vector, values, forced_values = cls.interpolate_bounds(
                class_,
                collection,
                signal_id,
                time_vector,
                values,
                forced_values,
                window_min_timestamp,
                window_max_timestamp,
            )

        if values:
            # TODO: check below. a bit strange
            # The request reaches the present time and the last sample is older than DEVICE_TIMEOUT
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # Bounds of the stored data are read from the internal "system.buckets" collection of the
        # timeseries, which avoids sorting the whole collection (same workaround as in
        # SignalSample.get_first_from_signal_id)
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["precise_timestamp"]
        else:
            data_start = None

        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            data_end = last_bucket["control"]["max"]["precise_timestamp"]
        else:
            data_end = None

        return class_(
            signal_id=signal_id,
            forcible=signal.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_results,
            db_query_time=db_req_time,
            init_time=init_time,
        )
712 @staticmethod
713 def interpolate_bounds(
714 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
715 ):
716 sample_right = None
717 # Fetching right side value & interpolation
718 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
719 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
720 sample_right = collection.find_one(
721 {
722 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
723 "value": {"$exists": True},
724 },
725 sort={"precise_timestamp": -1},
726 )
727 if sample_right:
728 if time_vector:
729 right_sd = class_(
730 signal_id=signal_id,
731 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
732 values=[values[-1], sample_right.get("value", None)],
733 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
734 )
735 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
736 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
737 else:
738 max_ts_value = sample_right.get("value", None)
739 max_ts_forced_value = sample_right.get("forced_value", None)
740 time_vector.append(window_max_timestamp)
741 values.append(max_ts_value)
742 forced_values.append(max_ts_forced_value)
744 # Fetching left side value & interpolation
745 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
746 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
747 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
748 sample_left = sample_right
749 sample_left = collection.find_one(
750 {
751 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
752 "value": {"$exists": True},
753 },
754 sort={"precise_timestamp": -1},
755 )
757 if sample_left:
758 if time_vector:
759 left_sd = class_(
760 signal_id=signal_id,
761 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
762 values=[sample_left["value"], values[0]],
763 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
764 )
765 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
766 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
767 else:
768 min_ts_value = sample_left.get("value", None)
769 min_ts_forced_value = sample_left.get("forced_value", None)
770 time_vector.insert(0, window_min_timestamp)
771 values.insert(0, min_ts_value)
772 forced_values.insert(0, min_ts_forced_value)
774 return time_vector, values, forced_values
776 def interpolate_values(self, new_time_vector: list[float]):
777 return self.interpolate(new_time_vector, self.values)
779 def interpolate_forced_values(self, new_time_vector: list[float]):
780 return self.interpolate(new_time_vector, self.forced_values)
782 def uniform_desampling(self, number_samples_max: int) -> Self:
783 data_processing_time = time.time()
784 if number_samples_max and self.number_samples > number_samples_max:
785 new_time_vector = npy.linspace(
786 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
787 ).tolist()
788 values = self.interpolate_values(new_time_vector)
789 forced_values = self.interpolate_forced_values(new_time_vector)
790 time_vector = new_time_vector
791 number_samples = len(time_vector)
792 else:
793 time_vector = self.time_vector
794 number_samples = len(self.values)
795 values = self.values[:]
796 forced_values = self.forced_values[:]
797 data_processing_time = time.time() - data_processing_time
799 return self.__class__(
800 signal_id=self.signal_id,
801 time_vector=time_vector,
802 values=values,
803 forced_values=forced_values,
804 number_samples=number_samples,
805 number_samples_db=self.number_samples,
806 data_start=self.data_start,
807 data_end=self.data_end,
808 db_query_time=self.db_query_time,
809 init_time=self.init_time,
810 data_processing_time=self.data_processing_time + data_processing_time,
811 )
    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """
        Performs a sampling in a window of interest and outside.

        At most ``window_max_number_samples`` timestamps are kept inside the window and about
        ``outside_max_number_samples`` outside of it, shared between the parts before and after the
        window in proportion to their number of samples. Values and forced values are then
        interpolated on the resulting timestamps.

        A missing window bound defaults to the first / last timestamp of the data. The object
        itself is returned when it holds no sample.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        # Split timestamps in three parts: before, inside (bounds included) and after the window
        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            new_window_time_vector = npy.linspace(
                time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
            ).tolist()
            time_vector_window = new_window_time_vector

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            # The outside budget is shared in proportion to the number of samples on each side
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(
                    outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
                ),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(
                    outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
                ),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before > 0:
                # Starts at the first timestamp and stops before the last one (endpoint excluded)
                new_time_vector_before = npy.linspace(
                    time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
                ).tolist()
                time_vector_before = new_time_vector_before

            if new_number_samples_after > 0:
                # Built backwards then reversed: the last timestamp is kept, the first one is excluded
                new_time_vector_after = npy.linspace(
                    time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
                ).tolist()[::-1]
                time_vector_after = new_time_vector_after

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )
913 def csv_export(self):
914 output = io.StringIO()
915 writer = csv.writer(output)
916 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
917 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
918 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
919 return output.getvalue().encode("utf-8")
def prestoplot_export(self):
    """Serialize the samples as a tab-separated PrestoPlot file (UTF-8 bytes).

    Dots in the signal id are replaced by underscores and an underscore is
    prepended when the resulting name starts with a numeric character.
    Missing values are written as ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    def cell(item):
        # Missing samples are written with the literal "none" marker.
        return "none" if item is None else item

    rows = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    rows.extend(
        f"{get_utc_date_from_timestamp(stamp)}\t{cell(sample)}\t{cell(forced_sample)}\n"
        for stamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return "".join(rows).encode("utf-8")
class NumericSignalData(SignalData):
    # Signal samples whose values are numeric; a missing sample is stored as None.
    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

        ``None`` entries are turned into NaN before calling ``npy.interp``; NaN
        results are turned back into ``None``.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy holding at most ``number_samples_max`` samples.

        :param number_samples_max: Maximum number of samples to keep. A falsy value
            (``0`` or ``None``) disables the downsampling.
        :return: A new object of the same class. ``number_samples_db`` receives the
            sample count of this object and the time spent here is added to
            ``data_processing_time``.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            # Forced values are re-evaluated on the downsampled time vector
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Forward the forcible flag so that it is not reset on the desampled copy
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        :param window_max_number_samples: Maximum number of samples kept inside the window.
        :param outside_max_number_samples: Maximum number of samples kept outside the window
            (shared between the part before and the part after, proportionally to their sizes).
        :param window_min_timestamp: Start of the window, defaults to the first timestamp.
        :param window_max_timestamp: End of the window, defaults to the last timestamp.
        :return: ``self`` when there is no sample, otherwise a new object of the same class.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds: add interpolated samples exactly on the window limits
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(
                    outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
                ),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(
                    outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
                ),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Forward the forcible flag so that it is not reset on the desampled copy
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )
class StringSignalData(SignalData):
    # Signal samples whose values are strings; a missing sample is stored as None.
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Hold the previous sample: return, for each requested time, the item whose
        timestamp is the last one smaller than or equal to it.

        Indices are clipped to the valid range of ``items``, so times located before
        the first timestamp receive the first item.
        """
        last_index = len(items) - 1
        held_positions = npy.clip(
            npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1,
            0,
            last_index,
        )
        return [items[position] for position in held_positions]
class SignalsData(TwinPadModel):
    # Container gathering the data of several signals plus aggregated metadata.
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float = None,
        max_timestamp: float = None,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
        interpolate_bounds: bool = True,
        max_documents: int = None,
    ) -> Self:
        """Load the data of every signal in ``signal_ids`` through ``SignalData.get_from_signal_id``.

        ``max_timestamp`` defaults to the current time. The resulting ``data_start`` /
        ``data_end`` are the earliest start and the latest end among the loaded signals
        (``None`` when no signal provides one), and ``data_processing_time`` is the sum
        of the per-signal processing times.
        """
        if max_timestamp is None:
            max_timestamp = time.time()

        loaded = []
        earliest = None
        latest = None
        total_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            total_processing_time += signal_data.data_processing_time
            loaded.append(signal_data)

            start = signal_data.data_start
            if start is not None:
                earliest = start if earliest is None else min(start, earliest)
            end = signal_data.data_end
            if end is not None:
                latest = end if latest is None else max(end, latest)

        return cls(
            signals_data=loaded,
            data_processing_time=total_processing_time,
            data_start=earliest,
            data_end=latest,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and wrap the results in a new container."""
        desampled = []
        for entry in self.signals_data:
            desampled.append(entry.uniform_desampling(number_samples_max=number_samples_max))
        total_processing_time = sum(entry.data_processing_time for entry in desampled)
        return SignalsData(
            signals_data=desampled,
            data_processing_time=total_processing_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and wrap the results in a new container."""
        desampling_options = {
            "window_max_number_samples": window_max_number_samples,
            "outside_max_number_samples": outside_max_number_samples,
            "window_min_timestamp": window_min_timestamp,
            "window_max_timestamp": window_max_timestamp,
        }
        desampled = [entry.interest_window_desampling(**desampling_options) for entry in self.signals_data]
        total_processing_time = sum(entry.data_processing_time for entry in desampled)

        return SignalsData(
            signals_data=desampled,
            data_processing_time=total_processing_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return the bytes of a deflated zip archive holding one export file per signal.

        :param file_format: ``"csv"`` (``<signal_id>.csv`` entries) or ``"prestoplot"``
            (``<signal_id>.tab`` entries).
        :raises ValueError: When a signal has to be exported with another format.
        """
        archive_buffer = io.BytesIO()
        with zipfile.ZipFile(archive_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
            for entry in self.signals_data:
                if file_format == "csv":
                    payload = entry.csv_export()
                    archive.writestr(f"{entry.signal_id}.csv", payload)
                elif file_format == "prestoplot":
                    payload = entry.prestoplot_export()
                    archive.writestr(f"{entry.signal_id}.tab", payload)
                else:
                    raise ValueError(f"Format not found. Got: {file_format}")
        return archive_buffer.getvalue()

    def hdf5_export(self):
        """Return the bytes of an in-memory HDF5 file with one group per signal.

        Each group holds a ``data`` dataset of records (date, timestamp, value,
        forced_value). Values are stored as 30-byte strings for ``"str"`` signals
        and as float64 otherwise.
        """
        float_record = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        string_record = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        hdf5_buffer = io.BytesIO()
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for entry in self.signals_data:
                group = hdf5_file.create_group(entry.signal_id)
                dates = [get_utc_date_from_timestamp(stamp) for stamp in entry.time_vector]
                record_type = string_record if entry.data_type == "str" else float_record
                records = list(zip(dates, entry.time_vector, entry.values, entry.forced_values))
                group["data"] = npy.array(records, dtype=record_type)
        return hdf5_buffer.getvalue()
class SignalStatus(TwinPadModel):
    # Availability report of a signal, as built by Signal.status / Signal.status_batch.
    # State label; the values produced in this file are "up", "down" and "no data".
    status: str
    # Human readable explanation of the state (empty string when there is nothing to report).
    reason: str
    # Seconds elapsed since the last known activity; None when no data is available.
    delay: float | None
class DigitizationFunction(TwinPadModel):
    # Optional digitization description attached to a Signal (see Signal.digitization_function).
    # NOTE(review): presumably describes a mapping between a raw range and a physical
    # range, with `bits` being the resolution — confirm against the code using it.
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float
class SignalUpdate(TwinPadModel):
    # Payload of a signal command (see Signal.send_command); every field is optional.
    # New value; checked against the signal data type in Signal.send_command.
    value: float | str | bool | int | None = None
    # New forced value; checked against the signal data type in Signal.send_command.
    forced_value: float | str | bool | int | None = None
    # NOTE(review): unit and usage of this timestamp are not visible in this block — confirm.
    timestamp: int | None = None
class SignalType(str, Enum):
    """Closed set of signal kinds; members are also plain strings equal to their value."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Maps a signal data type name to the SignalData subclass able to hold its samples.
# Every type except "str" is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
1272class Signal(GenericMongo):
1273 collection_name: ClassVar[str] = "signals"
1275 signal_id: str
1276 frequency: float
1277 unit: str | None
1278 description: str
1279 type: SignalType
1280 data_type: str
1281 precision_digits: int | None
1282 forcible: bool
1284 digitization_function: DigitizationFunction | None
1286 @property
1287 def device(self) -> Device:
1288 device_id = self.signal_id.split(".")[0]
1289 device = Device.get_one_by_attribute("device_id", device_id)
1290 return device
1292 @cached_property
1293 def signal_data_class(self):
1294 if self.data_type in SIGNALDATA_TYPES:
1295 return SIGNALDATA_TYPES[self.data_type]
1296 if self.data_type.startswith("enum"):
1297 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1298 raise ValueError(f"Unhandled python type: {self.data_type}")
1300 @cached_property
1301 def python_type(self):
1302 if self.data_type in TYPES:
1303 return TYPES[self.data_type]
1304 if self.data_type.startswith("enum"):
1305 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1306 return Literal[*choices]
1307 raise ValueError(f"Unhandled python type: {self.data_type}")
1309 @computed_field
1310 @property
1311 def status(self) -> SignalStatus:
1312 now = time.time()
1313 status = "up"
1314 reason = ""
1316 # See line 285 for explanation
1317 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1318 last_bucket = None
1319 if bucket is not None:
1320 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1321 if last_bucket is None:
1322 status = "no data"
1323 reason = "signal does not exist"
1324 return SignalStatus(status=status, reason=reason, delay=None)
1326 try:
1327 last_date = last_bucket["control"]["max"]["timestamp"]
1328 last_date = last_date.replace(tzinfo=pytz.UTC)
1329 last_value_ts = last_date.timestamp()
1330 except IndexError:
1331 last_value_ts = None
1333 if last_value_ts is None:
1334 delay = None
1335 reason = "No data from signal"
1336 else:
1337 # Since device is a computed property, only compute it once
1338 device = self.device
1339 if device is not None and device.last_ping is not None:
1340 last_value_ts = max(last_value_ts, device.last_ping)
1341 delay = now - last_value_ts
1342 if delay > DEVICE_TIMEOUT:
1343 status = "down"
1344 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1345 return SignalStatus(status=status, reason=reason, delay=delay)
1347 @classmethod
1348 def status_batch(cls, signal_ids: list[str], devices_by_ids: dict[str, Device]) -> dict[str, SignalStatus]:
1349 """Computes the status of multiple signals in batch from their signal ids.
1351 :param signal_ids: Signal IDs of the wanted signals
1352 :type signal_ids: list[str]
1353 :param devices_by_ids: A pre-computed map of all signal IDs linked to their :py:class:`Device`.
1354 :type devices_by_ids: dict[str, Device]
1355 :return: A dictionary with the signal ID as the keys and their respective :py:class:`SignalStatus` as its values.
1356 :rtype: dict[str, SignalStatus]
1357 """
1358 statuses_by_signal_id = {}
1360 buckets = get_signal_collections_batch([f"system.buckets.{signal_id}" for signal_id in signal_ids])
1361 for signal_id, bucket in zip(signal_ids, buckets):
1362 now = time.time()
1363 status = "up"
1364 reason = ""
1365 last_bucket = None
1366 if bucket is not None:
1367 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1368 if last_bucket is None:
1369 status = "no data"
1370 reason = "signal does not exist"
1371 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=None)
1372 continue
1374 try:
1375 last_date = last_bucket["control"]["max"]["timestamp"]
1376 last_date = last_date.replace(tzinfo=pytz.UTC)
1377 last_value_ts = last_date.timestamp()
1378 except IndexError:
1379 last_value_ts = None
1381 if last_value_ts is None:
1382 delay = None
1383 reason = "No data from signal"
1384 else:
1385 # Since device is a computed property, only compute it once
1386 device = devices_by_ids.get(signal_id.split(".")[0], None)
1387 delay = now - last_value_ts
1388 if device is not None and device.status == "up":
1389 delay = 0
1390 if delay > DEVICE_TIMEOUT:
1391 status = "down"
1392 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1393 statuses_by_signal_id[signal_id] = SignalStatus(status=status, reason=reason, delay=delay)
1395 return statuses_by_signal_id
1397 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1398 command = Command(
1399 sent_at=time.time(),
1400 command_type="Signal command",
1401 user_id=current_user.id,
1402 )
1404 has_input_error = False
1405 error_message = ""
1407 if self.data_type.startswith("enum"):
1408 enum_options = get_args(self.python_type)
1410 if update_dict.value is not None and update_dict.value not in enum_options:
1411 has_input_error = True
1412 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1413 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1414 has_input_error = True
1415 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1416 else:
1417 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1418 has_input_error = True
1419 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1420 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1421 has_input_error = True
1422 error_message += (
1423 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1424 )
1426 if has_input_error:
1427 command.response_time = 0
1428 command.succeeded = False
1429 command.description = f"Tried to modify signal {self.signal_id}"
1430 response = {"error": True, "status_code": 400, "message": error_message}
1431 else:
1432 response = await send_signal_value(self.signal_id, update_dict)
1433 command.receive_response(response)
1435 Command.create(command)
1436 return response
1438 @classmethod
1439 def get_from_signal_id(cls, signal_id) -> Self:
1440 """Could be generic from mongo"""
1441 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1442 if not raw_value:
1443 return None
1444 del raw_value["_id"]
1445 return cls.dict_to_object(raw_value)
1447 @classmethod
1448 def get_all_ids(cls) -> list[str]:
1449 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1451 return [signal["signal_id"] for signal in cursor]
1453 async def number_samples(self):
1454 collection = get_signal_collection(signal_id=self.signal_id)
1455 if collection is None:
1456 return 0
1458 number_samples = collection.estimated_document_count()
1460 number_samples_async_collection = await get_async_collection(
1461 systems_async_database, "number_samples", create=True, time_series=True
1462 )
1464 loop = asyncio.get_running_loop()
1465 loop.create_task(
1466 number_samples_async_collection.insert_one(
1467 {
1468 "timestamp": datetime.datetime.now(pytz.UTC),
1469 "signal_id": self.signal_id,
1470 "number_samples": number_samples,
1471 }
1472 )
1473 )
1475 return number_samples
1477 @classmethod
1478 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]:
1479 number_samples_by_id = {}
1480 collections = get_signal_collections_batch(signal_ids)
1481 number_samples_async_collection = await get_async_collection(
1482 systems_async_database, "number_samples", create=True, time_series=True
1483 )
1485 for signal_id, collection in zip(signal_ids, collections):
1486 if collection is None:
1487 number_samples_by_id[signal_id] = 0
1488 continue
1490 number_samples = collection.estimated_document_count()
1492 number_samples_by_id[signal_id] = number_samples
1494 now = datetime.datetime.now(pytz.UTC)
1495 loop = asyncio.get_running_loop()
1496 loop.create_task(
1497 number_samples_async_collection.insert_many(
1498 [
1499 {
1500 "timestamp": now,
1501 "signal_id": signal_id,
1502 "number_samples": number_samples,
1503 }
1504 for signal_id, number_samples in number_samples_by_id.items()
1505 ]
1506 )
1507 )
1509 return number_samples_by_id
1511 def sample_datasize(self):
1512 return signals_database.command("collstats", self.signal_id)["size"]
1514 @classmethod
1515 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1516 result = cls.collection().aggregate(
1517 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1518 )
1520 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ServicesStatus(TwinPadModel):
    # State ("up" / "down") of the services of the platform.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every external service host and report the backend itself as up."""
        hosts_by_service = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        states = {service: ping(host) for service, host in hosts_by_service.items()}
        return cls(backend="up", **states)
def ping(host):
    """Return ``"up"`` when ``host`` answers an ICMP ping within 0.8 second, ``"down"`` otherwise.

    A host is also reported as down when the process is not allowed to send the ping.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Best effort: lacking the permission to ping is reported as a down host
        logger.debug("Not allowed to ping %s", host)
        return "down"
    # ping3 gives None on timeout and False on error; any other result is a measured
    # delay, including 0.0 which must not be mistaken for a failure.
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    # Occurrence of an event rule, stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    # Epoch of the event, in seconds
    timestamp: float
    event_rule_id: str

    # Rule which produced the event; looked up once and then cached on the instance.
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Refined to convert the MongoDB datetime of ``timestamp`` to an epoch float.

        ``dict_`` is modified in place. A datetime without timezone is considered
        to be UTC; a value which is not a datetime is left untouched.
        """
        stored = dict_["timestamp"]
        if isinstance(stored, datetime.datetime):
            if stored.tzinfo is None:
                # A naive datetime would be read as local time by timestamp()
                stored = stored.replace(tzinfo=pytz.UTC)
            dict_["timestamp"] = stored.timestamp()
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    # Daily amount of an activity (events, received samples or commands).
    # Epoch of the day (midnight UTC), in seconds
    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight(day) -> datetime.datetime:
        """Return midnight UTC of the date of ``day`` (a date or a datetime)."""
        return datetime.datetime.combine(day, datetime.time.min).replace(tzinfo=pytz.UTC)

    @classmethod
    def _utc_today(cls) -> datetime.datetime:
        """Return midnight UTC of the current UTC day."""
        # The date is taken in UTC: a local date labelled as UTC shifts the day boundary
        return cls._utc_midnight(datetime.datetime.now(pytz.UTC).date())

    @staticmethod
    def _sum_sample_values(samples_collection, timestamp_filter: dict):
        """Return the sum of the ``value`` field of the documents matching ``timestamp_filter`` (0 if none)."""
        grouped = samples_collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if not grouped:
            return 0
        return grouped[0].get("number_samples", 0)

    @classmethod
    def _load_timeframe(cls, daily_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Return the daily amounts of ``daily_collection`` between two epochs (inclusive)."""
        # Epochs are converted straight to aware UTC datetimes (no local time involved)
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            stored = day["timestamp"]
            if stored.tzinfo is None:
                # A naive datetime would be read as local time by timestamp()
                stored = stored.replace(tzinfo=pytz.UTC)
            day["timestamp"] = stored.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the daily number of events between two epochs.

        The "number_events" collection is rebuilt from the events when it does not
        exist or when ``recompute_amount`` is set; the amount of the current day is
        refreshed at each call.

        :return: One item per stored day of the timeframe; an empty list when a
            rebuild finds no event at all.
        """
        one_day = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        today = cls._utc_today()
        if number_events_collection is None or recompute_amount:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            number_events_collection.delete_many({})
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return []
            # Compute from the day of the first event, one day at a time
            last_computed_day = cls._utc_midnight(first_event["timestamp"])
            while last_computed_day < today:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + one_day}}
                )
                if day_nb_events > 0:
                    number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
                last_computed_day += one_day
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": today}})
        if number_events_today > 0:
            # Replace the amount of the current day, which may already be stored
            number_events_collection.delete_many({"timestamp": today})
            number_events_collection.insert_one({"timestamp": today, "amount": number_events_today})
        return cls._load_timeframe(number_events_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of received samples between two epochs.

        The "number_received_samples" collection is rebuilt from the
        "signal_storage._NUMBER_SAMPLES" collection when it does not exist or when
        ``recompute_amount`` is set; the amount of the current day is refreshed at each call.

        :return: One item per stored day of the timeframe; an empty list when a
            rebuild finds no sample count at all.
        """
        one_day = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        today = cls._utc_today()
        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # Compute from the day of the first found sample count, one day at a time
            last_computed_day = cls._utc_midnight(first_sample["timestamp"])
            while last_computed_day < today:
                number_samples = cls._sum_sample_values(
                    signals_number_samples_collection,
                    {"$gte": last_computed_day, "$lt": last_computed_day + one_day},
                )
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
                last_computed_day += one_day
        number_samples_today = cls._sum_sample_values(signals_number_samples_collection, {"$gte": today})
        if number_samples_today > 0:
            # Replace the amount of the current day, which may already be stored
            number_samples_collection.delete_many({"timestamp": today})
            number_samples_collection.insert_one({"timestamp": today, "amount": number_samples_today})
        return cls._load_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of commands between two epochs.

        The "number_commands" collection is rebuilt from the commands when it does
        not exist or when ``recompute_amount`` is set; the amount of the current day
        is refreshed at each call.

        :return: One item per stored day of the timeframe; an empty list when a
            rebuild finds no command at all.
        """
        one_day = datetime.timedelta(days=1)
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        today = cls._utc_today()
        if number_commands_collection is None or recompute_amount:
            number_commands_collection = get_collection(
                systems_database, "number_commands", create=True, time_series=True
            )
            number_commands_collection.delete_many({})
            first_command = commands_collection.find_one(sort={"timestamp": 1})
            if first_command is None:
                return []
            # Compute from the day of the first command, one day at a time
            last_computed_day = cls._utc_midnight(first_command["timestamp"])
            while last_computed_day < today:
                day_nb_commands = commands_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + one_day}}
                )
                if day_nb_commands > 0:
                    number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
                last_computed_day += one_day
        number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": today}})
        if number_commands_today > 0:
            # Replace the amount of the current day, which may already be stored
            number_commands_collection.delete_many({"timestamp": today})
            number_commands_collection.insert_one({"timestamp": today, "amount": number_commands_today})
        return cls._load_timeframe(number_commands_collection, min_timestamp, max_timestamp)
class EventRule(GenericMongo):
    # Rule producing events, stored in the "event_rules" collection.
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of stored events referencing this rule; counted once and then cached.
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events_collection = get_collection(systems_database, "events")
        if events_collection is None:
            # The events collection is not created here: without it there is no event
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    # Company, stored in the "companies" collection.
    collection_name: ClassVar[str] = "companies"
    name: str
class Campaign(GenericMongo):
    # Campaign, stored in the "campaigns" collection.
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` (without its id) and return ``{"campaign_id": <new id>}``."""
        document = campaign.model_dump(exclude={"id"})
        insertion = cls.collection().insert_one(document)
        return None if insertion is None else {"campaign_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Set the name and description of the stored campaign and return the updated document."""
        changes = {"name": campaign.name, "description": campaign.description}
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(campaign.id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with the given id and return the result of the deletion."""
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})
class Phase(GenericMongo):
    # Phase of a campaign, stored in the "phases" collection.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` (without its id) and return ``{"phase_id": <new id>}``."""
        copied_fields = ("name", "description", "start_at", "end_at", "campaign_id")
        fresh_phase = Phase(**{field: getattr(phase, field) for field in copied_fields})
        insertion = get_collection(systems_database, "phases", create=True).insert_one(
            fresh_phase.model_dump(exclude={"id"})
        )
        return None if insertion is None else {"phase_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Set the name, description and dates of the stored phase and return the updated document."""
        changes = {
            "name": phase.name,
            "description": phase.description,
            "start_at": phase.start_at,
            "end_at": phase.end_at,
        }
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with the given id and return the result of the deletion."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the result of the deletion."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
class CustomViewCreation(GenericMongo):
    # Fields provided to create a custom view, stored in the "custom_views" collection.
    collection_name: ClassVar[str] = "custom_views"

    name: str
    # NOTE(review): the content of the configuration items is not visible in this file section.
    configuration: list
class CustomView(CustomViewCreation):
    # Stored custom view: the creation fields plus an id and the owning user.
    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

    # The methods below are disabled (kept as commented-out code).
    # # Methods
    # @classmethod
    # def create(cls, form_custom_view: Self, user_id) -> list:
    #     custom_view = CustomView(
    #         name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
    #     )
    #     new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
    #     return new_custom_view

    # @classmethod
    # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
    #     updated_custom_view = cls.collection().find_one_and_update(
    #         {"_id": ObjectId(custom_view_id)},
    #         {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
    #         return_document=ReturnDocument.AFTER,
    #     )
    #     updated_custom_view["id"] = str(updated_custom_view["_id"])
    #     del updated_custom_view["_id"]
    #     return cls(**updated_custom_view)

    # @classmethod
    # def delete(cls, custom_view_id) -> bool:
    #     deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
    #     return deleted_custom_view.acknowledged
# Model produced from CustomView by create_update_model (defined outside this
# section); presumably the partial-update variant of the model - confirm there.
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    """
    Video source described by a name, an IP address and optional credentials.
    """

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """
        List every video, sorted ascending on ``sort_by``.

        The query projects only ``name`` and ``ip_addr``, so ``username`` and
        ``password`` are not requested from the database.

        :param sort_by: Document key used for the ascending sort.
        :return: One object per document, built with ``mongo_dict_to_object``.
        """
        projection = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """
        Look up a video by id and return its name.

        :param camera_id: Identifier handed to ``get_from_id``.
        :return: The ``name`` of the matching video, or ``None`` when
            ``get_from_id`` returns ``None``.
        """
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    """
    Record of a command issued by a user, kept in the "commands" collection.

    The collection is requested with ``time_series=True``; ``timestamp`` is
    derived from ``sent_at`` when the record is created.
    """

    collection_name: ClassVar[str] = "commands"

    # Properties
    # UTC datetime derived from `sent_at` by `create`; None until then.
    # Annotated as optional so the declared type matches the None default.
    timestamp: datetime.datetime | None = None
    # Send time as a POSIX timestamp in seconds (it is compared with time.time()).
    sent_at: float
    # Seconds between `sent_at` and the moment the response was received.
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """
        Return the commands collection, requesting creation as a time-series collection.
        """
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """
        Store a copy of ``command`` with ``timestamp`` computed from ``sent_at``.

        :param command: Command whose field values are copied into the stored record.
        :return: ``{"command_id": <inserted id as str>}``, or ``None`` when
            the insert call yields ``None``.
        """
        stored_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        new_command = cls.collection().insert_one(stored_command.model_dump(exclude={"id"}))
        if new_command is None:
            return None
        return {"command_id": str(new_command.inserted_id)}

    def receive_response(self, response: dict):
        """
        Update this command from the response received for it.

        ``response_time`` becomes the time elapsed since ``sent_at``.
        ``succeeded`` is True only when ``response["error"]`` is exactly
        ``False``; a missing ``error`` key counts as a failure. When no
        description was set, the response message (right-stripped) is used.

        :param response: Response payload; the ``error`` and ``message`` keys are read.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            # `or ""` also covers an explicit `"message": None`, which would
            # otherwise fail on `.rstrip()`.
            message = response.get("message") or ""
            self.description = message.rstrip()
class SignalsPresetCreation(GenericMongo):
    """
    Base fields of a signals preset: a name and a list of signal ids.
    """

    # Display name of the preset.
    name: str
    # Identifiers of the signals grouped by this preset, kept as strings.
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    """
    Signals preset linked to a user, using the "signals_presets" collection name.
    """

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """
        Insert a preset for ``user_id`` built from the given name and signal ids.

        :param signals_preset: Source of the ``name`` and ``signal_ids`` values.
        :param user_id: Identifier stored in the ``user_id`` field.
        :return: The inserted document id as a string.
        """
        owned_preset = cls(
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
            user_id=user_id,
        )
        insert_result = cls.collection().insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insert_result.inserted_id)
# Model produced from SignalsPreset by create_update_model (defined outside this
# section); presumably the partial-update variant of the model - confirm there.
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    """
    Line styles available for graph themes.

    The enum also subclasses ``str``, so each member compares equal to its
    plain string value.
    """

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    """
    Annotation-only holder for the two colour attributes of a signal.

    The class has no base class, defaults or methods, and nothing in this
    section of the file references it.
    """

    # Colour for the signal value; the expected format is not specified here.
    value_color: str
    # Colour for the forced value; the expected format is not specified here.
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    """
    Base fields of a graph theme, i.e. the colours and line styles for one signal.

    ``collection_name`` is set to "graph_themes".
    """

    collection_name: ClassVar[str] = "graph_themes"

    # Display name of the theme.
    name: str
    # Identifier of the signal this theme applies to.
    signal_id: str
    # Colours default to an empty string; the expected format is not specified here.
    value_color: str = ""
    forced_value_color: str = ""
    # Line styles default to solid.
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise.
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    """
    Graph theme as seen by one particular user.

    The three boolean fields are computed by the aggregation steps of
    ``custom_pipeline_steps``, which also remove the stored ``creator_id``,
    ``in_library`` and ``active`` fields from the result.

    The user id reaches the pipeline through the class attribute
    ``_current_user_id``: every query entry point below stores its ``user_id``
    there before delegating to the parent implementation.
    NOTE(review): that attribute is shared by all callers of the class, so
    overlapping requests for different users look able to interfere - confirm.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """
        Map step names to aggregation stages built for the current user id.

        How the mapping is consumed is defined by the parent class, outside this section.
        """
        viewer_id = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            # Single-stage step that sets (or, with "$$REMOVE", drops) one field.
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", viewer_id]}),
            # Never return a theme the user is not allowed to see (i.e. neither
            # his own nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [viewer_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [viewer_id, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """
        Run the parent ``response_from_query`` with ``user_id`` as the current user.
        """
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """
        Same as ``response_from_query`` after setting ``query.in_user_library`` to "true".

        The ``query`` object passed in is modified in place.
        """
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """
        Fetch one theme by id, as seen by ``user_id``.
        """
        object_id = ObjectId(item_id)
        return cls.get_one_by_attribute("_id", object_id, user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """
        Run the parent ``get_by_attribute`` with ``user_id`` as the current user.
        """
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """
        Run the parent ``get_one_by_attribute`` with ``user_id`` as the current user.
        """
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """
        Run the parent ``get_all`` with ``user_id`` as the current user.
        """
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """
        Collect the colours and line styles of the themes matching ``user_id`` in ``active``.

        One theme is kept per signal: the first document of each ``signal_id``
        group, with no explicit sort applied beforehand.

        :param signal_ids: Signals to look up.
        :param user_id: Value matched against the stored ``active`` field.
        :return: Mapping of signal id to a dict holding ``value_color``,
            ``forced_value_color``, ``value_line_style`` and
            ``forced_value_line_style`` (whichever of them the document has).
        """
        appearance_keys = (
            "value_color",
            "forced_value_color",
            "value_line_style",
            "forced_value_line_style",
        )
        select_active = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        first_per_signal = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        unwrap_document = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        keep_appearance = {"$project": {"_id": 0, "signal_id": 1, **{key: 1 for key in appearance_keys}}}

        appearances = {}
        stages = [select_active, first_per_signal, unwrap_document, keep_appearance]
        for theme in cls.collection().aggregate(stages):
            # The signal id becomes the key and is removed from the value.
            appearances[theme.pop("signal_id")] = theme
        return appearances
# Model produced from PublicGraphTheme by create_update_model (defined outside
# this section); presumably the partial-update variant of the model - confirm there.
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    """
    Graph theme including the stored ownership fields.

    ``creator_id`` is the user who created the theme; ``in_library`` and
    ``active`` are lists of user ids.
    """

    # private
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """
        Insert a new theme owned by ``creator_id``.

        The creator is placed in both the ``in_library`` and ``active`` lists
        of the new theme.

        :return: The created theme, with ``id`` set to the inserted document id.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id"}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """
        Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The per-user view keys are translated before writing:
        ``in_user_library`` and ``active_for_user`` add ``user_id`` to, or
        remove it from, ``in_library`` and ``active`` respectively, and
        ``created_by_user`` is discarded. These three keys are always removed
        from ``update_dict``, even when their value is None, so they are never
        written to the database. Nothing is written when no field is left.

        :param update_dict: Fields to change; modified in place.
        :param user_id: User whose library / active membership is toggled.
        :return: An empty dict.
        """
        membership_keys = (
            ("in_user_library", "in_library", self.in_library),
            ("active_for_user", "active", self.active),
        )
        for view_key, stored_key, members in membership_keys:
            wanted = update_dict.pop(view_key, None)
            if wanted is None:
                continue
            if wanted and user_id not in members:
                members.append(user_id)
            elif not wanted and user_id in members:
                members.remove(user_id)
            update_dict[stored_key] = members

        # Computed per user by the read pipeline; never a stored field.
        update_dict.pop("created_by_user", None)

        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}