Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%
1161 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 15:13 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 15:13 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
23import h5py
25# from scipy import signal as signal_scipy
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 systems_database,
32 systems_async_database,
33 signals_database,
34 devices_states_database,
35)
36from twinpad_backend.responses import ListResponse
37from twinpad_backend.messages import send_mode_change, send_signal_value
# Maps a type name to the Python type it stands for; "epoch" maps to float.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}


# Host names of the surrounding services, each overridable through an environment variable
# of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared against the age of the newest sample (time.time() minus its timestamp) when signal
# data is loaded; presumably expressed in seconds -- TODO confirm.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Reuses the "uvicorn.error" logger.
logger = logging.getLogger("uvicorn.error")
class classproperty:
    """
    Read-only descriptor whose getter receives the owning class instead of an instance.

    Custom replacement for stacking ``@classmethod`` and ``@property``, which was deprecated in
    python 3.11 and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped function is called with the owner class on every attribute access.
        self.fget = func

    def __get__(self, _, owner):
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """
    Build the partial-update counterpart of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened to ``| None`` and a
    default of ``None``. The generated class is named ``<ModelName>Update``.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float):
    """
    Return the ISO 8601 representation of a POSIX timestamp, expressed in UTC.

    The conversion is done explicitly in UTC so the result does not depend on the time zone of the
    machine running the code. The tzinfo is dropped before formatting so the string keeps the
    offset-less shape (e.g. ``1970-01-01T00:00:00``) this function has always produced.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a series to about ``max_number_samples`` points using LTTB.

    ``None`` values are not supported by LTTB: they are taken out before the downsampling and each
    run of consecutive ``None`` is put back afterwards as its first timestamp and, when the run has
    more than one element, its last timestamp. If LTTB rejects the input (``ValueError``), the
    series is instead linearly interpolated on an evenly spaced grid of ``max_number_samples``
    points and NaN results are turned back into ``None``.

    :param time_vector: timestamps, one per value (searched with ``searchsorted``, so expected to
        be sorted in ascending order).
    :param values: values, ``None`` marking a missing sample.
    :param max_number_samples: target number of samples.
    :return: ``(time_vector, values)``; the inputs themselves when they are shorter than
        ``max_number_samples``.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass: split the series into the non-None samples and the bounds of each None run.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not in_none_group:
                none_group_bounds.append([timestamp])
                in_none_group = True
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                # Only the bounds of the run are kept: move the end bound forward.
                none_group_bounds[-1][1] = timestamp
        else:
            kept_times.append(timestamp)
            kept_values.append(value)
            in_none_group = False
    # The None bounds put back at the end count against the requested number of samples.
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none
    else:
        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()

    # Insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When ``wanted_type`` is ``float``, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
134# Models
class TwinPadModel(BaseModel):
    # Base pydantic model of this module: adds dict conversion helpers on top of BaseModel.

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model to a plain dict, leaving out the fields given in ``exclude``."""
        return self.model_dump(exclude=exclude)
class GenericMongo(TwinPadModel):
    # Base class of the models stored as documents of a collection of the systems database.
    #
    # Subclasses are expected to declare a ``collection_name`` class variable.
    # ``custom_pipeline_steps`` maps a property name to a list of aggregation stages. Those stages
    # are placed before the $match/$sort stage that uses the property, and after it otherwise
    # (presumably because they compute that property -- confirm against the subclasses).
    id: str | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection ``cls.collection_name`` of the systems database (``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def _build_pipeline(cls, property_name: str, stages: list) -> list:
        """Return the custom steps of ``property_name``, then ``stages``, then all other custom steps."""
        pipeline = list(cls.custom_pipeline_steps.get(property_name, []))
        pipeline.extend(stages)
        for key, steps in cls.custom_pipeline_steps.items():
            if key != property_name:
                pipeline.extend(steps)
        return pipeline

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated listing.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (``"field"`` or ``"field:order"``
        with an integer order), ``offset`` and ``limit`` (``None`` or ``0`` meaning no limit).
        ``total`` is the number of documents matching the filter, regardless of pagination.
        """
        request_filters = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field, sort_order = query.sort_by, 1
        collection = cls.collection()
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Each set of custom steps goes into the pipeline at most once, even when the property
            # is used by several filter clauses or by both the filter and the sort.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        # Steps of the filtered properties must come before the $match stage.
        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for clause in filter_clauses:
            for property_name in cls.custom_pipeline_steps:
                if property_name in clause:
                    add_custom_steps(property_name)
        pipeline.append({"$match": request_filters})

        # Same for the sort property and the $sort stage.
        add_custom_steps(sort_field)
        pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom steps only need to run on the page that is returned.
        for property_name in cls.custom_pipeline_steps:
            add_custom_steps(property_name)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``ObjectId(item_id)``, or ``None`` when there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document; ``_id`` is moved (in place) to a string ``id``."""
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = cls._build_pipeline(attribute_name, [{"$match": {attribute_name: attribute_value}}])
        return [cls.mongo_dict_to_object(document) for document in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute with value, or ``None`` when nothing matches."""
        pipeline = cls._build_pipeline(
            attribute_name,
            [{"$match": {attribute_name: attribute_value}}, {"$limit": 1}],
        )
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted by ``sort_by`` in ascending order."""
        pipeline = cls._build_pipeline(sort_by, [{"$sort": {sort_by: ASCENDING}}])
        return [cls.mongo_dict_to_object(document) for document in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Return the number of documents of the collection, 0 when the collection is unavailable."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert the item as a new document, store the generated id on ``self.id`` and return it."""
        # The field name must be given as a string: the bare name ``id`` is the builtin function and
        # would not exclude anything, leaving a redundant ``id`` key in the stored document.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Set the given attributes on the instance, ``$set`` them on its document and return ``self``."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the document of this item; return whether a document was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    # Account stored in the "users" collection.
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user to a dict; the password is left out unless an explicit ``exclude`` is given."""
        if exclude is None:
            exclude = {"password"}
        return super().to_dict(exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <id>}``.

        The very first user of the collection is always made administrator.
        """
        if not cls.get_all():
            is_admin = True
        user = cls(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # The password has to be stored, so the model is dumped directly instead of using to_dict().
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Apply the fields of ``user`` that are not ``None`` to the stored user ``user_id``.

        :return: the updated user, or ``None`` when no user has this id.
        """
        changes = {key: value for key, value in user.model_dump(exclude={"id"}).items() if value is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # find_one_and_update returns None when no document matches the filter.
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The stored connection state is not reported: the returned user carries the field default.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    # One entry of a device's ``modes`` list.
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    # Payload of a mode change request: the id of the mode to switch to.
    mode_id: int
class Device(GenericMongo):
    # Device stored in the "devices" collection.
    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to mode ``update_dict.mode_id`` and record the attempt as a Command.

        Mode ids are positions in ``self.modes`` counted from 1. An id outside ``1..len(self.modes)``
        is not sent to the device: the command is recorded as failed and a 400 error response is
        returned.

        :return: the response of ``send_mode_change``, or the error response described above.
        """
        mode_id = update_dict.mode_id
        # Ids are 1-based (names are read at ``modes[mode_id - 1]``), so 0 is not a valid mode: it
        # would otherwise silently resolve to the last mode of the list.
        has_error = mode_id < 1 or mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} to {self.modes[mode_id - 1].name}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {mode_id}"}
        else:
            response = await send_mode_change(self.device_id, mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Return the devices owning the given signals, keyed by device id.

        The device id is the part of a signal id located before the first dot. Each device is
        fetched once; the value is ``None`` when no device has this id.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id
class DeviceSetup(GenericMongo):
    # Stored in the "device_setups" collection.
    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Partial-update variant of DeviceSetup: same fields except ``id``, all optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    # State of a device at a given timestamp. States are read from the devices states database,
    # in the collection named after the device id.
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """
        Return the filtered, sorted and paginated states of device ``device_id``.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (``"field"`` or ``"field:order"``
        with an integer order), ``offset`` and ``limit`` (``None`` or ``0`` meaning no limit).
        The response is empty when the device has no states collection.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field, sort_order = query.sort_by, 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        # Documents lacking the list fields get an empty list. ``Field(...)`` only has a meaning in
        # a class body; used as a runtime default it is a FieldInfo object that fails validation.
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    # A single sample of a signal: its timestamp, its value and the forced value, if any.
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or ``None`` when it has no collection or no sample."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})

        if first_bucket is None:
            # No bucket information available: plain sorted lookup.
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})
        else:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent sample of the signal, or ``None`` when there is none.

        The timestamp of the returned sample is moved forward to the last ping of the owning device
        when that ping is more recent. ``device`` can be given to avoid fetching it; otherwise it is
        looked up from the part of ``signal_id`` located before the first dot.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same workaround as above function, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})

        if last_bucket is None:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})
        else:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample not older than ``min_timestamp``, or ``None`` when there is none."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        sample_data = collection.find_one(
            {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
        )
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, fetching each owning device only once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, its most recent sample not older than ``min_timestamp``."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
class SignalData(TwinPadModel):
    # Samples of one signal over a period: parallel lists of timestamps, values and forced values,
    # plus the bounds of the stored data and a few timing/size statistics.
    #
    # ``interpolate(new_time_vector, items)`` is not defined here: the resampling methods rely on
    # the concrete class (``signal.signal_data_class``) to provide it.
    signal_id: str
    forcible: bool = True
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    # Timestamps of the oldest and most recent stored samples, when known.
    data_start: float | None = None
    data_end: float | None = None

    number_samples: int = 0
    number_samples_db: int = 0

    # Durations measured with time.time().
    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """
        Load the samples of a signal from its collection.

        :param min_timestamp: lower bound (inclusive) of the samples to fetch, if any.
        :param max_timestamp: upper bound (inclusive) of the samples to fetch, if any.
        :param window_min_timestamp: start of the window of interest.
        :param window_max_timestamp: end of the window of interest.
        :param interpolate_bounds: when both window bounds are given, add points on the window
            bounds that the fetched data does not reach (see the ``interpolate_bounds`` method).
        :param max_documents: when the query matches more documents than this, only the first
            document of each group of ``ceil(matches / max_documents)`` consecutive documents is kept.
        :return: an instance of the data class of the signal, or an empty ``cls`` instance when the
            signal has no collection.
        """
        now = time.time()

        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

        collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        if max_documents is not None and max_documents < number_results:
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
            # Number the documents in time order, group them by blocks of `unsampling_ratio` and
            # keep the first document of each block.
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        init_time = time.time()

        results = cursor.to_list()
        time_vector = [sample["precise_timestamp"] for sample in results]
        values = [sample.get("value", None) for sample in results]
        forced_values = [sample.get("forced_value", None) for sample in results]

        signal = Signal.get_from_signal_id(signal_id)
        class_ = signal.signal_data_class

        if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
            time_vector, values, forced_values = cls.interpolate_bounds(
                class_,
                collection,
                signal_id,
                time_vector,
                values,
                forced_values,
                window_min_timestamp,
                window_max_timestamp,
            )

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # The bounds of the stored data are read from the internal bucket collection associated to
        # the time series, a workaround to find the oldest/most recent document quickly.
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        last_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        data_start = first_bucket["control"]["min"]["precise_timestamp"] if first_bucket is not None else None
        data_end = last_bucket["control"]["max"]["precise_timestamp"] if last_bucket is not None else None

        return class_(
            signal_id=signal_id,
            forcible=signal.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_results,
            db_query_time=db_req_time,
            init_time=init_time,
        )

    @staticmethod
    def interpolate_bounds(
        class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
    ):
        """
        Add a point on the bounds of the window of interest that the data does not reach.

        On the right side: when there is no data in the last fifth of the window, the most recent
        stored sample having a value at or before ``window_max_timestamp`` is fetched and a point is
        appended at ``window_max_timestamp``. Its value is interpolated (by ``class_``) between the
        last point and that sample, or copied from that sample when there is no data at all.
        The left side is handled symmetrically with the first fifth of the window and
        ``window_min_timestamp``.

        The three lists are modified in place and returned.
        """
        window_fifth = (window_max_timestamp - window_min_timestamp) / 5

        sample_right = None
        # Fetching right side value & interpolation
        window_right_limit = window_max_timestamp - window_fifth
        if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
            sample_right = collection.find_one(
                {
                    "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
                    "value": {"$exists": True},
                },
                sort={"precise_timestamp": -1},
            )
            if sample_right:
                if time_vector:
                    right_sd = class_(
                        signal_id=signal_id,
                        time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                        values=[values[-1], sample_right.get("value", None)],
                        forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                    )
                    max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                    max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
                else:
                    max_ts_value = sample_right.get("value", None)
                    max_ts_forced_value = sample_right.get("forced_value", None)
                time_vector.append(window_max_timestamp)
                values.append(max_ts_value)
                forced_values.append(max_ts_forced_value)

        # Fetching left side value & interpolation
        window_left_limit = window_min_timestamp + window_fifth
        if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
            if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
                # The sample found for the right side is already older than the window start: it is
                # also the most recent sample before the window start, no need to query again.
                sample_left = sample_right
            else:
                sample_left = collection.find_one(
                    {
                        "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
                        "value": {"$exists": True},
                    },
                    sort={"precise_timestamp": -1},
                )

            if sample_left:
                if time_vector:
                    left_sd = class_(
                        signal_id=signal_id,
                        time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                        values=[sample_left["value"], values[0]],
                        forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                    )
                    min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                    min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
                else:
                    min_ts_value = sample_left.get("value", None)
                    min_ts_forced_value = sample_left.get("forced_value", None)
                time_vector.insert(0, window_min_timestamp)
                values.insert(0, min_ts_value)
                forced_values.insert(0, min_ts_forced_value)

        return time_vector, values, forced_values

    def interpolate_values(self, new_time_vector: list[float]):
        """Return the values of the signal at the timestamps of ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.values)

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return the forced values of the signal at the timestamps of ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.forced_values)

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """
        Return a copy of the data holding at most ``number_samples_max`` evenly spaced samples.

        Nothing is resampled when ``number_samples_max`` is falsy or when the data is already small
        enough. The resampling time is added to ``data_processing_time``.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector = npy.linspace(
                self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
            ).tolist()
            values = self.interpolate_values(time_vector)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carried over like in interest_window_desampling; it would otherwise fall back to True.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """
        Performs a sampling in a window of interest and outside.

        The window always holds its two bounds and at most ``window_max_number_samples`` samples.
        The samples before and after the window share a budget of ``outside_max_number_samples``,
        split in proportion to the number of samples on each side. A missing window bound defaults
        to the corresponding end of the data. Returns ``self`` when there is no data.
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]

        if len(time_vector_window) > window_max_number_samples:
            time_vector_window = npy.linspace(
                time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
            ).tolist()

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # A side that is allotted no sample is emptied: keeping it untouched would exceed the
            # budget of samples outside the window.
            if new_number_samples_before > 0:
                time_vector_before = npy.linspace(
                    time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
                ).tolist()
            else:
                time_vector_before = []

            if new_number_samples_after > 0:
                # Built backwards so that the last timestamp is kept rather than the first one.
                time_vector_after = npy.linspace(
                    time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
                ).tolist()[::-1]
            else:
                time_vector_after = []

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def csv_export(self):
        """Return the data as UTF-8 encoded CSV with a ``timestamp,value,forced_value`` header."""
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["timestamp", "value", "forced_value"])  # Write header
        writer.writerows(
            [get_utc_date_from_timestamp(ts), value, forced_value]
            for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values)
        )
        return output.getvalue().encode("utf-8")

    def prestoplot_export(self):
        """
        Return the data as UTF-8 encoded, tab separated text for PrestoPlot.

        Dots of the signal id are replaced by underscores and an underscore is prepended when the
        id starts with a digit. Missing values are written as ``none``.
        """
        clean_signal_id = self.signal_id.replace(".", "_")
        # Slicing instead of indexing so that an empty signal id does not raise.
        if clean_signal_id[:1].isnumeric():
            clean_signal_id = "_" + clean_signal_id

        output = io.StringIO()
        output.write("# Encoding:\tUTF-8\n")
        output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
        output.write("ISO8601\tnone\tnone\n")
        output.write(f"# Description :\t{clean_signal_id}\n")

        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            output.write(
                f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
            )
        return output.getvalue().encode("utf-8")
class NumericSignalData(SignalData):
    # Signal samples whose values are numeric and can therefore be linearly
    # interpolated and downsampled with `downsample_list`.
    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate *items* (sampled on ``self.time_vector``) at *new_time_vector*.

        ``None`` entries are treated as NaN for the interpolation and any NaN
        in the result is mapped back to ``None``.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def _resampled(self, time_vector, values, forced_values, number_samples, elapsed) -> Self:
        """Build a new instance carrying resampled data plus this instance's metadata.

        ``forcible`` is forwarded explicitly so that resampling does not reset
        it, in line with the base-class resampling method.
        """
        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + elapsed,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy downsampled to at most *number_samples_max* samples.

        When *number_samples_max* is falsy or the signal is already small
        enough, the data is returned unchanged (values are shallow-copied).
        Forced values are re-interpolated on the downsampled time vector.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self._resampled(time_vector, values, forced_values, number_samples, data_processing_time)

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        The samples are split into three parts: before, inside and after the
        window. The window part is limited to *window_max_number_samples*; the
        two outside parts share *outside_max_number_samples* proportionally to
        their original sizes. Window bounds default to the first and last
        timestamps. An empty signal is returned as is.
        """

        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds: add interpolated samples exactly on the bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = min(
                number_samples_before,
                math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
            )
            new_number_samples_after = min(
                number_samples_after,
                math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before > 0:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            if new_number_samples_after > 0:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)

        data_processing_time = time.time() - data_processing_time

        return self._resampled(new_time_vector, values, forced_values, len(values), data_processing_time)
class StringSignalData(SignalData):
    # Signal samples whose values are strings; "interpolation" holds the
    # previous sample instead of blending values.
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the item of the latest sample at or before it.

        Positions are clipped to the valid index range, so timestamps earlier
        than the first sample map to the first item.
        """
        # Index of the last stored timestamp that is <= each requested timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        last_index = len(items) - 1
        return [items[position] for position in npy.clip(positions, 0, last_index)]
class SignalsData(TwinPadModel):
    # Bundle of per-signal data sets together with the overall data time span
    # and the cumulated processing time.
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the data of every signal in *signal_ids* via ``SignalData.get_from_signal_id``.

        *max_timestamp* defaults to the current time. ``data_start`` and
        ``data_end`` of the result are the earliest start and latest end among
        the signals that report one (``None`` when none does), and
        ``data_processing_time`` is the sum over all signals.
        """
        signals_data = []
        data_start = None
        data_end = None
        if max_timestamp is None:
            max_timestamp = time.time()
        data_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            data_processing_time += signal_data.data_processing_time
            signals_data.append(signal_data)
            if signal_data.data_start is not None:
                if data_start is None:
                    data_start = signal_data.data_start
                else:
                    data_start = min(signal_data.data_start, data_start)
            if signal_data.data_end is not None:
                if data_end is None:
                    data_end = signal_data.data_end
                else:
                    data_end = max(signal_data.data_end, data_end)

        return cls(
            signals_data=signals_data,
            data_processing_time=data_processing_time,
            data_start=data_start,
            data_end=data_end,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and return the new bundle."""
        signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and return the new bundle."""
        signals_data = [
            s.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
            for s in self.signals_data
        ]

        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return the bytes of a ZIP archive holding one exported file per signal.

        *file_format* is ``"csv"`` (``<signal_id>.csv``) or ``"prestoplot"``
        (``<signal_id>.tab``).

        Raises:
            ValueError: if *file_format* is not supported. The format is
                checked up front, so the error is raised even when there is
                no signal to export.
        """
        exporters = {
            "csv": ("csv", lambda signal_data: signal_data.csv_export()),
            "prestoplot": ("tab", lambda signal_data: signal_data.prestoplot_export()),
        }
        if file_format not in exporters:
            raise ValueError(f"Format not found. Got: {file_format}")
        extension, export = exporters[file_format]

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                zip_file.writestr(f"{signal_data.signal_id}.{extension}", export(signal_data))
        # Read the buffer only after the archive is closed, i.e. once it is complete.
        return zip_buffer.getvalue()

    def hdf5_export(self):
        """Return the bytes of an HDF5 file with one group per signal.

        Each group holds a ``data`` dataset of records
        ``(date, timestamp, value, forced_value)``. Values are stored as
        float64, or as 30-byte strings when the signal's ``data_type`` is
        ``"str"``.
        """
        hdf5_buffer = io.BytesIO()
        custom_type_float = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
        )
        custom_type_string = npy.dtype(
            [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
        )
        with h5py.File(hdf5_buffer, "w") as hdf5_file:
            for signal_data in self.signals_data:
                signal_group = hdf5_file.create_group(signal_data.signal_id)
                date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
                # Only the record type differs between string and numeric signals.
                row_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
                signal_group["data"] = npy.array(
                    list(
                        zip(
                            date_vector,
                            signal_data.time_vector,
                            signal_data.values,
                            signal_data.forced_values,
                        )
                    ),
                    dtype=row_type,
                )
        return hdf5_buffer.getvalue()
# Health report of a signal: a status label, a human-readable reason and the
# delay in seconds since the last known activity (None when it cannot be computed).
class SignalStatus(TwinPadModel):
    status: str
    reason: str
    delay: float | None
# Parameters of a signal's digitization.
# NOTE(review): presumably maps the raw range [min_raw_value, max_raw_value]
# onto the value range [min_value, max_value] — confirm against the consumer.
class DigitizationFunction(TwinPadModel):
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float
# Payload of a signal command: a new value and/or forced value, each optional,
# plus an optional timestamp.
class SignalUpdate(TwinPadModel):
    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None
# Closed set of signal kinds; members are also plain strings (str mixin).
class SignalType(str, Enum):
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Maps a signal data type name to the SignalData subclass that handles it.
# Every type except "str" is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
1263class Signal(GenericMongo):
1264 collection_name: ClassVar[str] = "signals"
1266 signal_id: str
1267 frequency: float
1268 unit: str | None
1269 description: str
1270 type: SignalType
1271 data_type: str
1272 precision_digits: int | None
1273 forcible: bool
1275 digitization_function: DigitizationFunction | None
1277 @property
1278 def device(self) -> Device:
1279 device_id = self.signal_id.split(".")[0]
1280 device = Device.get_one_by_attribute("device_id", device_id)
1281 return device
1283 @cached_property
1284 def signal_data_class(self):
1285 if self.data_type in SIGNALDATA_TYPES:
1286 return SIGNALDATA_TYPES[self.data_type]
1287 if self.data_type.startswith("enum"):
1288 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1289 raise ValueError(f"Unhandled python type: {self.data_type}")
1291 @cached_property
1292 def python_type(self):
1293 if self.data_type in TYPES:
1294 return TYPES[self.data_type]
1295 if self.data_type.startswith("enum"):
1296 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1297 return Literal[*choices]
1298 raise ValueError(f"Unhandled python type: {self.data_type}")
1300 @computed_field
1301 @property
1302 def status(self) -> SignalStatus:
1303 now = time.time()
1304 status = "up"
1305 reason = ""
1307 # See line 285 for explanation
1308 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1309 last_bucket = None
1310 if bucket is not None:
1311 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1312 if last_bucket is None:
1313 status = "no data"
1314 reason = "signal does not exist"
1315 return SignalStatus(status=status, reason=reason, delay=None)
1317 try:
1318 last_date = last_bucket["control"]["max"]["timestamp"]
1319 last_date = last_date.replace(tzinfo=pytz.UTC)
1320 last_value_ts = last_date.timestamp()
1321 except IndexError:
1322 last_value_ts = None
1324 if last_value_ts is None:
1325 delay = None
1326 reason = "No data from signal"
1327 else:
1328 # Since device is a computed property, only compute it once
1329 device = self.device
1330 if device is not None and device.last_ping is not None:
1331 last_value_ts = max(last_value_ts, device.last_ping)
1332 delay = now - last_value_ts
1333 if delay > DEVICE_TIMEOUT:
1334 status = "down"
1335 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1336 return SignalStatus(status=status, reason=reason, delay=delay)
1338 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1339 command = Command(
1340 sent_at=time.time(),
1341 command_type="Signal command",
1342 user_id=current_user.id,
1343 )
1345 has_input_error = False
1346 error_message = ""
1348 if self.data_type.startswith("enum"):
1349 enum_options = get_args(self.python_type)
1351 if update_dict.value is not None and update_dict.value not in enum_options:
1352 has_input_error = True
1353 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1354 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1355 has_input_error = True
1356 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1357 else:
1358 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1359 has_input_error = True
1360 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1361 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1362 has_input_error = True
1363 error_message += (
1364 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1365 )
1367 if has_input_error:
1368 command.response_time = 0
1369 command.succeeded = False
1370 command.description = f"Tried to modify signal {self.signal_id}"
1371 response = {"error": True, "status_code": 400, "message": error_message}
1372 else:
1373 response = await send_signal_value(self.signal_id, update_dict)
1374 command.receive_response(response)
1376 Command.create(command)
1377 return response
1379 @classmethod
1380 def get_from_signal_id(cls, signal_id) -> Self:
1381 """Could be generic from mongo"""
1382 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1383 if not raw_value:
1384 return None
1385 del raw_value["_id"]
1386 return cls.dict_to_object(raw_value)
1388 @classmethod
1389 def get_all_ids(cls) -> list[str]:
1390 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1392 return [signal["signal_id"] for signal in cursor]
1394 async def number_samples(self):
1395 collection = get_signal_collection(signal_id=self.signal_id)
1396 if collection is None:
1397 return 0
1399 number_samples = collection.estimated_document_count()
1401 number_samples_async_collection = await get_async_collection(
1402 systems_async_database, "number_samples", create=True, time_series=True
1403 )
1405 loop = asyncio.get_running_loop()
1406 loop.create_task(
1407 number_samples_async_collection.insert_one(
1408 {
1409 "timestamp": datetime.datetime.now(pytz.UTC),
1410 "signal_id": self.signal_id,
1411 "number_samples": number_samples,
1412 }
1413 )
1414 )
1416 return number_samples
1418 def sample_datasize(self):
1419 return signals_database.command("collstats", self.signal_id)["size"]
1421 @classmethod
1422 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1423 result = cls.collection().aggregate(
1424 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1425 )
1427 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ServicesStatus(TwinPadModel):
    # "up"/"down" state of the backend and of each service it depends on.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every dependent service host and report the results.

        The backend itself is always reported as ``"up"``.
        """
        service_hosts = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        states = {service: ping(host) for service, host in service_hosts.items()}
        return cls(backend="up", **states)
def ping(host):
    """Return ``"up"`` when *host* answers a ping within 0.8 s, ``"down"`` otherwise.

    A ``PermissionError`` raised by the ping is logged and reported as
    ``"down"`` rather than propagated.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Best effort: keep reporting "down", but leave a trace of the real cause.
        logger.warning("Permission denied while pinging %s; reporting it as down", host)
        return "down"
    # Failures are reported as None/False; compare by identity so that a
    # (theoretical) 0.0 delay is still considered a successful ping.
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    # Occurrence of an event rule, stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    # Rule that produced this event, fetched once and cached.
    # (Documented with a comment: a docstring here would leak into the schema.)
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Convert the stored datetime ``timestamp`` to an epoch float, then build the object.

        A naive datetime is interpreted as UTC, as is done for bucket
        timestamps in ``Signal.status``; without this it would be read as
        local time. A ``timestamp`` that is not a datetime is left unchanged.
        The conversion is written back into *dict_*.
        """
        raw_timestamp = dict_["timestamp"]
        if isinstance(raw_timestamp, datetime.datetime):
            if raw_timestamp.tzinfo is None:
                raw_timestamp = raw_timestamp.replace(tzinfo=pytz.UTC)
            dict_["timestamp"] = raw_timestamp.timestamp()
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    # Daily activity counter: `timestamp` is the start of the day (epoch seconds)
    # and `amount` the number of events / samples / commands counted for it.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight(moment) -> datetime.datetime:
        """Return 00:00 UTC of the calendar day of *moment*."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @classmethod
    def _items_in_timeframe(cls, collection, min_timestamp, max_timestamp) -> list[Self]:
        """Load the daily counters of *collection* between two epoch timestamps (inclusive)."""
        # Convert directly to aware UTC datetimes; going through a naive local
        # datetime and relabelling it as UTC shifts the bounds on non-UTC hosts.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            day_start = day["timestamp"]
            if day_start.tzinfo is None:
                # Naive datetimes read back from the database are taken as UTC.
                day_start = day_start.replace(tzinfo=pytz.UTC)
            day["timestamp"] = day_start.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @staticmethod
    def _sum_sample_counts(collection, timestamp_filter) -> int:
        """Sum the ``value`` field of the documents whose timestamp matches *timestamp_filter*."""
        result = collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(result) == 0:
            return 0
        return result[0].get("number_samples", 0)

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the daily event counts between *min_timestamp* and *max_timestamp*.

        The cached counts are rebuilt from the first event onwards when the
        cache collection does not exist or *recompute_amount* is true. The
        count of the current day is refreshed on every call.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        TODAY = cls._utc_midnight(datetime.datetime.now(pytz.UTC))
        if number_events_collection is None or recompute_amount:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            number_events_collection.delete_many({})
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return []
            last_computed_day = cls._utc_midnight(first_event["timestamp"])
            while last_computed_day < TODAY:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
                )
                if day_nb_events > 0:
                    number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
                last_computed_day += ONE_DAY_OFFSET
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
        if number_events_today > 0:
            # Replace today's entry so that it is not counted twice.
            number_events_collection.delete_many({"timestamp": TODAY})
            number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
        return cls._items_in_timeframe(number_events_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily received-sample counts between *min_timestamp* and *max_timestamp*.

        Daily amounts are the sum of the ``value`` fields recorded in the
        ``signal_storage._NUMBER_SAMPLES`` collection. Caching and refresh
        rules are the same as for ``get_number_events_timeframe``.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        TODAY = cls._utc_midnight(datetime.datetime.now(pytz.UTC))
        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # compute from day of first found sample
            last_computed_day = cls._utc_midnight(first_sample["timestamp"])
            while last_computed_day < TODAY:
                number_samples = cls._sum_sample_counts(
                    signals_number_samples_collection,
                    {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET},
                )
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
                last_computed_day += ONE_DAY_OFFSET
        number_samples_today = cls._sum_sample_counts(signals_number_samples_collection, {"$gte": TODAY})
        if number_samples_today > 0:
            # Replace today's entry so that it is not counted twice.
            number_samples_collection.delete_many({"timestamp": TODAY})
            number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
        return cls._items_in_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily command counts between *min_timestamp* and *max_timestamp*.

        Caching and refresh rules are the same as for
        ``get_number_events_timeframe``.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        TODAY = cls._utc_midnight(datetime.datetime.now(pytz.UTC))
        if number_commands_collection is None or recompute_amount:
            number_commands_collection = get_collection(
                systems_database, "number_commands", create=True, time_series=True
            )
            number_commands_collection.delete_many({})
            first_command = commands_collection.find_one(sort={"timestamp": 1})
            if first_command is None:
                return []
            last_computed_day = cls._utc_midnight(first_command["timestamp"])
            while last_computed_day < TODAY:
                day_nb_commands = commands_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
                )
                if day_nb_commands > 0:
                    number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
                last_computed_day += ONE_DAY_OFFSET
        number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
        if number_commands_today > 0:
            # Replace today's entry so that it is not counted twice.
            number_commands_collection.delete_many({"timestamp": TODAY})
            number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
        return cls._items_in_timeframe(number_commands_collection, min_timestamp, max_timestamp)
class EventRule(GenericMongo):
    # Rule (name, formula and the variables it uses) from which events are generated.
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of stored events that reference this rule (computed once, then cached).
    # (Documented with a comment: a docstring here would leak into the schema.)
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events_collection = get_collection(systems_database, "events")
        # The lookup is done without `create=True`, so the collection may not exist yet.
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
# Company record stored in the "companies" collection.
class Company(GenericMongo):
    collection_name: ClassVar[str] = "companies"
    name: str
class Campaign(GenericMongo):
    # Campaign record stored in the "campaigns" collection.
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert *campaign* (without its ``id``) and return ``{"campaign_id": <new id>}``."""
        insert_result = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
        if insert_result is not None:
            return {"campaign_id": str(insert_result.inserted_id)}
        return None

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the stored campaign and return the updated document."""
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(campaign.id)},
            {"$set": {"name": campaign.name, "description": campaign.description}},
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with the given id and return the delete result."""
        return cls.collection().delete_one({"_id": ObjectId(campaign_id)})
class Phase(GenericMongo):
    # Time-bounded phase of a campaign, stored in the "phases" collection.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of *phase* (without ``id``) and return ``{"phase_id": <new id>}``."""
        copied_fields = ("name", "description", "start_at", "end_at", "campaign_id")
        phase = Phase(**{field_name: getattr(phase, field_name) for field_name in copied_fields})
        insert_result = get_collection(systems_database, "phases", create=True).insert_one(
            phase.model_dump(exclude={"id"})
        )
        if insert_result is not None:
            return {"phase_id": str(insert_result.inserted_id)}
        return None

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description and bounds of the stored phase; return the updated document."""
        return cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {
                "$set": {
                    "name": phase.name,
                    "description": phase.description,
                    "start_at": phase.start_at,
                    "end_at": phase.end_at,
                }
            },
            return_document=ReturnDocument.AFTER,
        )

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with the given id and return the delete result."""
        return cls.collection().delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
# Fields needed to create a custom view: its name and its configuration list.
# Stored in the "custom_views" collection.
class CustomViewCreation(GenericMongo):
    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list
# Stored custom view: the creation fields plus an optional id and the owning user.
class CustomView(CustomViewCreation):
    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

1743 # # Methods
1744 # @classmethod
1745 # def create(cls, form_custom_view: Self, user_id) -> list:
1746 # custom_view = CustomView(
1747 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
1748 # )
1749 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
1750 # return new_custom_view
1752 # @classmethod
1753 # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
1754 # updated_custom_view = cls.collection().find_one_and_update(
1755 # {"_id": ObjectId(custom_view_id)},
1756 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
1757 # return_document=ReturnDocument.AFTER,
1758 # )
1759 # updated_custom_view["id"] = str(updated_custom_view["_id"])
1760 # del updated_custom_view["_id"]
1761 # return cls(**updated_custom_view)
1763 # @classmethod
1764 # def delete(cls, custom_view_id) -> bool:
1765 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
1766 # return deleted_custom_view.acknowledged
# Update model derived from CustomView by `create_update_model`.
# NOTE(review): presumably the same fields made optional — confirm in `create_update_model`.
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    # Camera / video source stored in the "videos" collection.
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return all videos sorted ascending by *sort_by*.

        Only ``name`` and ``ip_addr`` are requested from the database, so the
        credentials of the returned objects keep their defaults.
        """
        cursor = cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with the given id, or ``None`` if it does not exist."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    # Log entry of a command sent by a user, stored in the "commands"
    # time-series collection.
    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, creating it as a time series if needed."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of *command* whose ``timestamp`` is ``sent_at`` as a UTC datetime.

        Returns ``{"command_id": <new id>}``.
        """
        copied_fields = ("sent_at", "response_time", "command_type", "description", "succeeded", "user_id")
        command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            **{field_name: getattr(command, field_name) for field_name in copied_fields},
        )
        insert_result = cls.collection().insert_one(command.model_dump(exclude={"id"}))
        if insert_result is not None:
            return {"command_id": str(insert_result.inserted_id)}
        return None

    def receive_response(self, response: dict):
        """Record the outcome of the command from *response*.

        Sets the response time relative to ``sent_at``, marks the command as
        succeeded only when ``response["error"]`` is exactly ``False``, and
        uses the response message as description when none was set.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            self.description = self.description + response.get("message", "").rstrip()
# Fields needed to create a signals preset: a name and the list of signal ids it groups.
class SignalsPresetCreation(GenericMongo):
    name: str
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    # Signals preset owned by a user, stored in the "signals_presets" collection.
    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store *signals_preset* for *user_id* and return the new document id as a string."""
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        insert_result = cls.collection().insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insert_result.inserted_id)
# Update model generated from SignalsPreset by create_update_model (defined elsewhere in this module).
# NOTE(review): presumably a variant whose fields are all optional, for partial updates - confirm.
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    """
    Line styles available for the curves of a graph theme.

    Members are also ``str`` instances, so they compare equal to their plain string value.
    """

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    """
    Annotation-only holder naming the two colors of a signal; it declares no default values.
    """

    value_color: str
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    """
    Fields a graph theme is created with: the appearance of one signal, stored in ``graph_themes``.
    """

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str

    # Colors and line styles, given separately for the value and the forced value of the signal
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid

    # Themes are private unless stated otherwise
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    """
    Graph theme as presented to one particular user.

    The three boolean fields are computed for the current user by the aggregation steps of
    ``custom_pipeline_steps``; the same steps remove the stored ``creator_id``, ``in_library`` and ``active``
    fields from the returned documents.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # Id of the user the next query is built for. It is assigned on the class by the query classmethods below
    # right before they delegate to the parent implementation.
    # NOTE(review): being stored on the class, this value is shared by all callers; concurrent queries for
    # different users could overwrite each other - confirm how requests are served.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """
        Aggregation steps keyed by name, built for the user stored in ``_current_user_id``.

        NOTE(review): how the keys are selected and ordered is decided by the parent class, which is not
        visible here; the "default" step relies on ``created_by_user`` having been added beforehand.
        """
        user_id = cls._current_user_id

        def add_field(field_name: str, expression) -> list:
            # Single-stage step setting (or, with "$$REMOVE", dropping) one field
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user is not allowed to see (i.e. neither created by them nor shared);
            # this is done as early as possible
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """
        Run the parent ``response_from_query`` with *user_id* as the current user.
        """
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """
        Same as ``response_from_query``, after setting ``query.in_user_library`` to ``"true"`` on *query*.
        """
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """
        Look up a single theme by its ``_id`` (converted to an ``ObjectId``) for *user_id*.
        """
        object_id = ObjectId(item_id)
        return cls.get_one_by_attribute("_id", object_id, user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """
        Run the parent ``get_by_attribute`` with *user_id* as the current user.
        """
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """
        Run the parent ``get_one_by_attribute`` with *user_id* as the current user.
        """
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """
        Run the parent ``get_all`` with *user_id* as the current user.
        """
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """
        Return the appearance to use for each signal of *signal_ids*, keyed by signal id.

        Only themes whose ``active`` field matches *user_id* are considered and one theme is kept per signal.
        Each value holds the colors and line styles of the theme (``signal_id`` itself is removed from it).
        Signals without a matching theme are absent from the result.

        NOTE(review): the pipeline has no ``$sort`` before ``$group``/``$first``, so which theme is kept when
        several match the same signal depends on the order in which documents reach the ``$group`` stage.
        """
        select_active_themes = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        keep_one_theme_per_signal = [
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
        ]
        keep_appearance_fields = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [select_active_themes, *keep_one_theme_per_signal, keep_appearance_fields]

        themes = cls.collection().aggregate(pipeline)
        # The key is popped first, so the stored value no longer contains "signal_id"
        return {theme.pop("signal_id"): theme for theme in themes}
# Update model generated from PublicGraphTheme by create_update_model (defined elsewhere in this module).
# NOTE(review): presumably a variant whose fields are all optional, for partial updates - confirm.
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    """
    Graph theme together with the per-user bookkeeping fields that are written to the collection.
    """

    # private
    creator_id: str  # id of the user that created the theme
    in_library: list[str]  # ids of the users having the theme in their library
    active: list[str]  # ids of the users for which the theme is active

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """
        Insert a new theme into the collection and return it with its ``id`` set to the inserted id.

        The creator is put in both the ``in_library`` and ``active`` lists of the new theme.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        # Same exclusion form as the other ``create`` methods of this module
        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id"}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """
        Apply *update_dict* to the stored theme on behalf of *user_id*.

        The per-user flags are translated into the stored lists before writing:

        - ``in_user_library`` adds *user_id* to / removes it from ``in_library``;
        - ``active_for_user`` adds *user_id* to / removes it from ``active``;
        - ``created_by_user`` is computed at query time and is never written.

        These three keys are always removed from *update_dict*, even when their value is ``None``, so that
        they can never end up as fields of the stored document. A ``None`` flag leaves its list untouched.

        *update_dict* is modified in place, as are ``self.in_library`` and ``self.active``.
        Always returns an empty dict.
        """
        in_user_library = update_dict.pop("in_user_library", None)
        if in_user_library is not None:
            if in_user_library and user_id not in self.in_library:
                self.in_library.append(user_id)
            elif not in_user_library and user_id in self.in_library:
                self.in_library.remove(user_id)
            update_dict["in_library"] = self.in_library

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            if active_for_user and user_id not in self.active:
                self.active.append(user_id)
            elif not active_for_user and user_id in self.active:
                self.active.remove(user_id)
            update_dict["active"] = self.active

        # Read-only flag: drop it whatever its value
        update_dict.pop("created_by_user", None)

        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
        )

        return {}