Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%
1153 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-23 15:18 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-23 15:18 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
23import h5py
25# from scipy import signal as signal_scipy
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 systems_database,
32 systems_async_database,
33 signals_database,
34 devices_states_database,
35)
36from twinpad_backend.responses import ListResponse
37from twinpad_backend.messages import send_mode_change, send_signal_value
39TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
42RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
43MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
44SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
45HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
46DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
48DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
49NUMBER_SAMPLES_DATABASE_UPDATE = 120
51logger = logging.getLogger("uvicorn.error")
54class classproperty:
55 """
56 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13.
57 Found here: https://stackoverflow.com/a/76301341
58 """
60 def __init__(self, func):
61 self.fget = func
63 def __get__(self, _, owner):
64 return self.fget(owner)
67def create_update_model(model):
68 fields = {}
70 for field_name, field_annotation in model.model_fields.items():
71 if field_name != "id":
72 fields[field_name] = (field_annotation.annotation | None, None)
74 query_name = model.__name__ + "Update"
75 return create_model(query_name, **fields)
78def get_utc_date_from_timestamp(timestamp: float):
79 return datetime.datetime.fromtimestamp(timestamp).isoformat()
82def downsample_list(time_vector: list, values: list, max_number_samples: int):
83 if len(time_vector) < max_number_samples:
84 return time_vector, values
86 time_vector_copy = copy.deepcopy(time_vector)
87 values_copy = copy.deepcopy(values)
89 none_group_bounds = []
90 none_group_index = -1
91 index = -1
92 # LTTB doesn't handle None values so remove them
93 while values_copy.count(None) > 0:
94 # Store bounds of None value groups so we can insert them back after the downsampling
95 if (new_index := values_copy.index(None)) != index:
96 none_group_bounds.append([time_vector_copy.pop(new_index)])
97 none_group_index += 1
98 elif len(none_group_bounds[none_group_index]) < 2:
99 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
100 else:
101 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
102 values_copy.pop(new_index)
103 index = new_index
104 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
106 try:
107 values_array = npy.array([time_vector_copy, values_copy]).T
108 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
110 new_time_vector = interpolated_values[:, 0].tolist()
111 new_values = interpolated_values[:, 1].tolist()
112 except ValueError:
113 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
114 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
115 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
116 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
117 return new_time_vector, new_values_nan_to_none
119 # insert back None values at the correct timestamps
120 for none_group in none_group_bounds:
121 start_index = npy.searchsorted(new_time_vector, none_group[0])
122 new_time_vector[start_index:start_index] = none_group
123 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
125 return new_time_vector, new_values
128def is_of_type(value, wanted_type):
129 if wanted_type is float:
130 return isinstance(value, (int, float))
131 return isinstance(value, wanted_type)
134# Models
135class TwinPadModel(BaseModel):
136 @classmethod
137 def dict_to_object(cls, dict_):
138 return cls.model_validate(dict_)
140 def to_dict(self, exclude=None):
141 dict_ = self.model_dump(exclude=exclude)
142 return dict_
145class GenericMongo(TwinPadModel):
146 id: str | None = None
147 custom_pipeline_steps: ClassVar[dict[str, list]] = {}
149 @classmethod
150 def collection(cls):
151 return get_collection(systems_database, cls.collection_name, create=True)
153 @classmethod
154 def response_from_query(cls, query) -> ListResponse[Self]:
155 request_filters = query.mongodb_filter()
156 items = []
157 if ":" in query.sort_by:
158 sort_field, sort_order = query.sort_by.split(":")
159 sort_order = int(sort_order)
160 else:
161 sort_field = query.sort_by
162 sort_order = 1
163 collection = get_collection(systems_database, cls.collection_name, create=True)
164 total = collection.count_documents(request_filters)
166 pipeline = []
167 added_properties = []
168 if "$and" in request_filters:
169 for request_filter in request_filters["$and"]:
170 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
171 if filtered_property in request_filter:
172 pipeline.extend(pipeline_steps)
173 added_properties.append(filtered_property)
174 else:
175 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
176 if filtered_property in request_filters:
177 pipeline.extend(pipeline_steps)
178 added_properties.append(filtered_property)
179 pipeline.append({"$match": request_filters})
180 if sort_field in cls.custom_pipeline_steps:
181 pipeline.extend(cls.custom_pipeline_steps[sort_field])
182 added_properties.append(sort_field)
183 pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])
185 if (query.limit is not None) and (query.limit != 0):
186 pipeline.append({"$limit": query.limit})
188 for filtered_property, step in cls.custom_pipeline_steps.items():
189 if filtered_property not in added_properties:
190 pipeline.extend(step)
192 cursor = collection.aggregate(pipeline)
194 for item_dict in cursor:
195 items.append(cls.mongo_dict_to_object(item_dict))
197 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
199 @classmethod
200 def get_from_id(cls, item_id) -> Self | None:
201 return cls.get_one_by_attribute("_id", ObjectId(item_id))
203 @classmethod
204 def mongo_dict_to_object(cls, mongo_dict):
205 mongo_dict["id"] = str(mongo_dict["_id"])
206 del mongo_dict["_id"]
207 return cls.dict_to_object(mongo_dict)
209 @classmethod
210 def get_by_attribute(cls, attribute_name: str, attribute_value):
211 """Returns all items that match the attribute with value."""
212 pipeline = []
213 if attribute_name in cls.custom_pipeline_steps:
214 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
215 pipeline.append({"$match": {attribute_name: attribute_value}})
216 for key, step in cls.custom_pipeline_steps.items():
217 if key != attribute_name:
218 pipeline.extend(step)
219 items = cls.collection().aggregate(pipeline)
220 if items is None:
221 return None
222 return [cls.mongo_dict_to_object(d) for d in items]
224 @classmethod
225 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
226 pipeline = []
227 if attribute_name in cls.custom_pipeline_steps:
228 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
229 pipeline.append({"$match": {attribute_name: attribute_value}})
230 pipeline.append({"$limit": 1})
231 for key, step in cls.custom_pipeline_steps.items():
232 if key != attribute_name:
233 pipeline.extend(step)
234 items = cls.collection().aggregate(pipeline).to_list()
235 if len(items) == 0:
236 return None
237 return cls.mongo_dict_to_object(items[0])
239 @classmethod
240 def get_all(cls, sort_by="_id") -> list[Self]:
241 items = []
242 pipeline = []
243 if sort_by in cls.custom_pipeline_steps:
244 pipeline.extend(cls.custom_pipeline_steps[sort_by])
245 pipeline.append({"$sort": {sort_by: ASCENDING}})
246 for key, step in cls.custom_pipeline_steps.items():
247 if key != sort_by:
248 pipeline.extend(step)
249 for dict_ in cls.collection().aggregate(pipeline):
250 items.append(cls.mongo_dict_to_object(dict_))
251 return items
253 @classmethod
254 def get_number_documents(cls):
255 collection = get_collection(systems_database, cls.collection_name)
256 if collection is None:
257 return 0
258 return collection.count_documents({})
260 def insert(self):
261 insert_result = self.collection().insert_one(self.to_dict(exclude={id}))
262 self.id = str(insert_result.inserted_id)
263 return self.id
265 def update(self, update_dict):
266 for key, value in update_dict.items():
267 setattr(self, key, value)
268 self.collection().find_one_and_update(
269 {"_id": ObjectId(self.id)},
270 {"$set": update_dict},
271 return_document=ReturnDocument.AFTER,
272 )
274 return self
276 def delete(self):
277 result = self.collection().delete_one({"_id": ObjectId(self.id)})
278 return result.deleted_count > 0
281class User(GenericMongo):
282 collection_name: ClassVar[str] = "users"
284 firstname: str
285 lastname: str
286 email: str
287 password: str
288 is_active: bool | None = False
289 is_admin: bool | None = False
290 is_connected: bool | None = False
291 company_id: str | None = None
293 def to_dict(self, exclude=None):
294 if exclude is None:
295 exclude = {"password"}
296 return GenericMongo.to_dict(self, exclude=exclude)
298 @classmethod
299 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
300 users = cls.get_all()
301 if not users:
302 is_admin = True
303 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
304 user_collection = get_collection(systems_database, "users", create=True)
305 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
306 if new_user is None:
307 return None
308 return {"user_id": str(new_user.inserted_id)}
310 @classmethod
311 def update_info(cls, user: "UserUpdate", user_id: str):
312 updated_user = cls.collection().find_one_and_update(
313 {"_id": ObjectId(user_id)},
314 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
315 return_document=ReturnDocument.AFTER,
316 )
317 updated_user["id"] = str(updated_user["_id"])
318 del (updated_user["_id"], updated_user["is_connected"])
319 return cls(**updated_user)
322UserUpdate = create_update_model(User)
325class Mode(TwinPadModel):
326 mode_id: int
327 name: str
328 frequency_multiplier: float
329 min_frequency: float
332class DeviceUpdate(TwinPadModel):
333 mode_id: int
336class Device(GenericMongo):
337 collection_name: ClassVar[str] = "devices"
339 device_id: str
340 name: str
341 description: str = ""
342 modes: list[Mode]
343 current_mode_id: int | None = None
344 last_ping: float | None = None
345 petri_network: Any
346 pid: Any
347 load: float | None = None
348 tokens: list[int] = Field(default_factory=list)
349 status: str
351 async def change_mode(self, update_dict, current_user: User):
352 has_error = False
354 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes):
355 has_error = True
356 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
357 elif self.current_mode_id is not None:
358 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
359 else:
360 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
361 command = Command(
362 sent_at=time.time(),
363 command_type="Mode change",
364 description=description,
365 user_id=current_user.id,
366 )
368 if has_error:
369 command.response_time = 0
370 command.succeeded = False
371 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
372 else:
373 response = await send_mode_change(self.device_id, update_dict.mode_id)
374 command.receive_response(response)
376 Command.create(command)
377 return response
379 @classmethod
380 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
381 devices_by_id = {}
382 for signal_id in signal_ids:
383 device_id = signal_id.split(".")[0]
384 if device_id not in devices_by_id:
385 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
386 return devices_by_id
389class DeviceSetup(GenericMongo):
390 collection_name: ClassVar[str] = "device_setups"
392 device_ids: list[str]
393 active: bool = False
394 variable_mapping: dict[str, str]
397DeviceSetupUpdate = create_update_model(DeviceSetup)
400class DeviceState(GenericMongo):
401 collection_name: ClassVar[str] = "devices_states"
403 timestamp: float
404 mode: str | None = None
405 load: float | None = None
406 tokens: list[int] = Field(default_factory=list)
407 modified_properties: list[str] = Field(default_factory=list)
409 @classmethod
410 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
411 req_filter = query.mongodb_filter()
412 items = []
413 if ":" in query.sort_by:
414 sort_field, sort_order = query.sort_by.split(":")
415 sort_order = int(sort_order)
416 else:
417 sort_field = query.sort_by
418 sort_order = 1
419 collection = get_collection(devices_states_database, device_id)
420 if collection is None:
421 total = 0
422 cursor = []
423 else:
424 total = collection.count_documents(req_filter)
425 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
426 if (query.limit is not None) and (query.limit != 0):
427 cursor = cursor.limit(query.limit)
428 for item_dict in cursor:
429 items.append(
430 cls(
431 timestamp=item_dict.get("precise_timestamp"),
432 mode=item_dict.get("mode", None),
433 load=item_dict.get("load", None),
434 tokens=item_dict.get("tokens", Field(default_factory=list)),
435 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)),
436 )
437 )
438 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
441class SignalSample(TwinPadModel):
442 signal_id: str
443 timestamp: float
444 value: float | int | str | bool | None
445 forced_value: float | int | str | bool | None = None
447 @classmethod
448 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
450 collection = get_signal_collection(signal_id)
451 if collection is None:
452 return None
454 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
455 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
456 bucket = get_signal_collection(f"system.buckets.{signal_id}")
457 first_bucket = None
458 if bucket is not None:
459 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
460 if first_bucket is not None:
461 sample_data = collection.find_one(
462 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
463 )
464 else:
465 sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)])
467 if sample_data is None:
468 return None
470 timestamp = sample_data["precise_timestamp"]
472 return cls(
473 signal_id=signal_id,
474 timestamp=timestamp,
475 value=sample_data.get("value", None),
476 forced_value=sample_data.get("forced_value", None),
477 )
479 @classmethod
480 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
481 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
483 @classmethod
484 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
486 collection = get_signal_collection(signal_id)
487 if collection is None:
488 return None
490 sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)])
492 if sample_data is None:
493 return None
495 timestamp = sample_data["precise_timestamp"]
497 if device is None:
498 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
499 if device is not None and device.last_ping is not None:
500 if timestamp is None:
501 timestamp = device.last_ping
502 else:
503 timestamp = max(timestamp, device.last_ping)
504 return cls(
505 signal_id=signal_id,
506 timestamp=timestamp,
507 value=sample_data.get("value", None),
508 forced_value=sample_data.get("forced_value", None),
509 )
511 @classmethod
512 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
513 collection = get_signal_collection(signal_id)
514 if collection is None:
515 return None
517 sample_data = collection.find_one(
518 {"precise_timestamp": {"$gte": min_timestamp}}, sort=[("precise_timestamp", -1)]
519 )
520 if sample_data is None:
521 return None
523 return cls(
524 signal_id=signal_id,
525 timestamp=sample_data.get("precise_timestamp"),
526 value=sample_data.get("value"),
527 forced_value=sample_data.get("forced_value"),
528 )
530 @classmethod
531 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
532 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
533 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
535 @classmethod
536 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None:
537 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
540class SignalData(TwinPadModel):
541 signal_id: str
542 forcible: bool = True
543 time_vector: list[float]
544 values: list[float | int | str | None]
545 forced_values: list[float | int | str | None]
547 data_start: float | None = None
548 data_end: float | None = None
550 number_samples: int = 0
551 number_samples_db: int = 0
553 db_query_time: float = 0.0
554 init_time: float = 0.0
555 data_processing_time: float = 0.0
557 @classmethod
558 def get_from_signal_id(
559 cls,
560 signal_id: str,
561 min_timestamp: float = None,
562 max_timestamp: float = None,
563 window_min_timestamp: float = None,
564 window_max_timestamp: float = None,
565 interpolate_bounds: bool = True,
566 max_documents: int = None,
567 ) -> Self:
569 now = time.time()
571 req_signal = {}
572 if min_timestamp is not None:
573 req_signal.setdefault("timestamp", {})
574 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
575 if max_timestamp is not None:
576 req_signal.setdefault("timestamp", {})
577 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
579 collection = get_signal_collection(signal_id)
580 if collection is None:
581 return cls(
582 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
583 )
585 db_req_start = time.time()
587 sort_step = {"$sort": {"precise_timestamp": 1}}
588 number_results = collection.count_documents(req_signal)
590 pipeline = []
591 if req_signal:
592 pipeline.append({"$match": req_signal}) # Filter data if needed
594 pipeline.extend(
595 [
596 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
597 sort_step,
598 ]
599 )
601 if max_documents is not None and max_documents < number_results:
602 unsampling_ratio = math.ceil(number_results / max_documents)
603 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
604 pipeline.extend(
605 [
606 {
607 "$setWindowFields": {
608 "sortBy": {"precise_timestamp": 1},
609 "output": {"index": {"$documentNumber": {}}},
610 }
611 },
612 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
613 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
614 {"$replaceRoot": {"newRoot": "$doc"}},
615 {"$unset": ["index", "group_id"]},
616 {"$sort": {"precise_timestamp": 1}},
617 ]
618 )
620 # logger.info(f"pipeline: %s", str(pipeline))
621 cursor = collection.aggregate(pipeline)
622 db_req_time = time.time() - db_req_start
624 init_time = time.time()
626 results = cursor.to_list()
627 time_vector = []
628 values = []
629 forced_values = []
630 for s in results:
631 time_vector.append(s["precise_timestamp"])
632 values.append(s.get("value", None))
633 forced_values.append(s.get("forced_value", None))
635 signal = Signal.get_from_signal_id(signal_id)
636 class_ = signal.signal_data_class
638 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
639 time_vector, values, forced_values = cls.interpolate_bounds(
640 class_,
641 collection,
642 signal_id,
643 time_vector,
644 values,
645 forced_values,
646 window_min_timestamp,
647 window_max_timestamp,
648 )
650 if values:
651 # TODO: check below. a bit strange
652 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
653 # Adding last value as it should be repeated
654 time_vector.append(now)
655 values.append(values[-1])
656 forced_values.append(forced_values[-1])
658 init_time = time.time() - init_time
660 # See line 292 for explanation
661 bucket = get_signal_collection(f"system.buckets.{signal_id}")
662 first_bucket = None
663 if bucket is not None:
664 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
665 if first_bucket is not None:
666 data_start = first_bucket["control"]["min"]["precise_timestamp"]
667 else:
668 data_start = None
670 last_bucket = None
671 if bucket is not None:
672 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
673 if last_bucket is not None:
674 data_end = last_bucket["control"]["max"]["precise_timestamp"]
675 else:
676 data_end = None
678 return class_(
679 signal_id=signal_id,
680 forcible=signal.forcible,
681 time_vector=time_vector,
682 values=values,
683 forced_values=forced_values,
684 data_start=data_start,
685 data_end=data_end,
686 number_samples=len(values),
687 number_samples_db=number_results,
688 db_query_time=db_req_time,
689 init_time=init_time,
690 )
692 @staticmethod
693 def interpolate_bounds(
694 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
695 ):
696 sample_right = None
697 # Fetching right side value & interpolation
698 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
699 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
700 sample_right = collection.find_one(
701 {
702 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
703 "value": {"$exists": True},
704 },
705 sort=[("precise_timestamp", -1)],
706 )
707 if sample_right:
708 if time_vector:
709 right_sd = class_(
710 signal_id=signal_id,
711 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
712 values=[values[-1], sample_right.get("value", None)],
713 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
714 )
715 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
716 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
717 else:
718 max_ts_value = sample_right.get("value", None)
719 max_ts_forced_value = sample_right.get("forced_value", None)
720 time_vector.append(window_max_timestamp)
721 values.append(max_ts_value)
722 forced_values.append(max_ts_forced_value)
724 # Fetching left side value & interpolation
725 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
726 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
727 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
728 sample_left = sample_right
729 sample_left = collection.find_one(
730 {
731 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
732 "value": {"$exists": True},
733 },
734 sort=[("precise_timestamp", -1)],
735 )
737 if sample_left:
738 if time_vector:
739 left_sd = class_(
740 signal_id=signal_id,
741 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
742 values=[sample_left["value"], values[0]],
743 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
744 )
745 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
746 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
747 else:
748 min_ts_value = sample_left.get("value", None)
749 min_ts_forced_value = sample_left.get("forced_value", None)
750 time_vector.insert(0, window_min_timestamp)
751 values.insert(0, min_ts_value)
752 forced_values.insert(0, min_ts_forced_value)
754 return time_vector, values, forced_values
756 def interpolate_values(self, new_time_vector: list[float]):
757 return self.interpolate(new_time_vector, self.values)
759 def interpolate_forced_values(self, new_time_vector: list[float]):
760 return self.interpolate(new_time_vector, self.forced_values)
762 def uniform_desampling(self, number_samples_max: int) -> Self:
763 data_processing_time = time.time()
764 if number_samples_max and self.number_samples > number_samples_max:
765 new_time_vector = npy.linspace(
766 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
767 ).tolist()
768 values = self.interpolate_values(new_time_vector)
769 forced_values = self.interpolate_forced_values(new_time_vector)
770 time_vector = new_time_vector
771 number_samples = len(time_vector)
772 else:
773 time_vector = self.time_vector
774 number_samples = len(self.values)
775 values = self.values[:]
776 forced_values = self.forced_values[:]
777 data_processing_time = time.time() - data_processing_time
779 return self.__class__(
780 signal_id=self.signal_id,
781 time_vector=time_vector,
782 values=values,
783 forced_values=forced_values,
784 number_samples=number_samples,
785 number_samples_db=self.number_samples,
786 data_start=self.data_start,
787 data_end=self.data_end,
788 db_query_time=self.db_query_time,
789 init_time=self.init_time,
790 data_processing_time=self.data_processing_time + data_processing_time,
791 )
793 def interest_window_desampling(
794 self,
795 window_max_number_samples: int,
796 outside_max_number_samples: int,
797 window_min_timestamp: float | None = None,
798 window_max_timestamp: float | None = None,
799 ) -> Self:
800 """Performs a sampling in a window of interest and outside."""
802 if not self.time_vector:
803 return self
805 if window_min_timestamp is None:
806 window_min_timestamp = self.time_vector[0]
807 if window_max_timestamp is None:
808 window_max_timestamp = self.time_vector[-1]
810 data_processing_time = time.time()
812 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
813 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
815 time_vector_before = self.time_vector[:index_window_start]
816 time_vector_window = self.time_vector[index_window_start:index_window_end]
817 time_vector_after = self.time_vector[index_window_end:]
819 # Resampling window
820 if time_vector_window:
821 # Ensurring window bounds
822 if time_vector_window[0] != window_min_timestamp:
823 time_vector_window.insert(0, window_min_timestamp)
824 if time_vector_window[-1] != window_max_timestamp:
825 time_vector_window.append(window_max_timestamp)
826 else:
827 time_vector_window = [window_min_timestamp, window_max_timestamp]
829 if len(time_vector_window) > window_max_number_samples:
830 # Resampling
831 new_window_time_vector = npy.linspace(
832 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
833 ).tolist()
834 time_vector_window = new_window_time_vector
836 # Resampling outside
837 number_samples_before = len(time_vector_before)
838 number_samples_after = len(time_vector_after)
839 if (number_samples_before + number_samples_after) > outside_max_number_samples:
840 new_number_samples_before = min(
841 number_samples_before,
842 math.ceil(
843 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
844 ),
845 )
846 new_number_samples_after = min(
847 number_samples_after,
848 math.ceil(
849 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
850 ),
851 )
852 # Adjusting numbers as math.ceil can do +1 on sum
853 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
854 if new_number_samples_before > new_number_samples_after:
855 new_number_samples_before -= 1
856 else:
857 new_number_samples_after -= 1
859 if new_number_samples_before > 0:
860 new_time_vector_before = npy.linspace(
861 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
862 ).tolist()
863 time_vector_before = new_time_vector_before
865 if new_number_samples_after > 0:
866 new_time_vector_after = npy.linspace(
867 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
868 ).tolist()[::-1]
869 time_vector_after = new_time_vector_after
871 new_time_vector = time_vector_before + time_vector_window + time_vector_after
872 values = self.interpolate_values(new_time_vector)
873 forced_values = self.interpolate_forced_values(new_time_vector)
874 number_samples = len(values)
876 data_processing_time = time.time() - data_processing_time
878 return self.__class__(
879 signal_id=self.signal_id,
880 forcible=self.forcible,
881 time_vector=new_time_vector,
882 values=values,
883 forced_values=forced_values,
884 number_samples=number_samples,
885 number_samples_db=self.number_samples,
886 data_start=self.data_start,
887 data_end=self.data_end,
888 db_query_time=self.db_query_time,
889 init_time=self.init_time,
890 data_processing_time=self.data_processing_time + data_processing_time,
891 )
893 def csv_export(self):
894 output = io.StringIO()
895 writer = csv.writer(output)
896 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
897 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
898 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
899 return output.getvalue().encode("utf-8")
901 def prestoplot_export(self):
902 clean_signal_id = self.signal_id.replace(".", "_")
903 if clean_signal_id[0].isnumeric():
904 clean_signal_id = "_" + clean_signal_id
906 output = io.StringIO()
907 output.write("# Encoding:\tUTF-8\n")
908 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
909 output.write("ISO8601\tnone\tnone\n")
910 output.write(f"# Description :\t{clean_signal_id}\n")
912 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
913 output.write(
914 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
915 )
916 return output.getvalue().encode("utf-8")
919class NumericSignalData(SignalData):
920 data_type: str = "float"
921 values: list[float | int | None]
922 forced_values: list[float | int | None]
924 def interpolate(self, new_time_vector: list[float], items):
925 items = [npy.nan if s is None else s for s in items]
926 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
928 def uniform_desampling(self, number_samples_max: int) -> Self:
929 data_processing_time = time.time()
930 if number_samples_max and self.number_samples > number_samples_max:
931 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
932 forced_values = self.interpolate_forced_values(time_vector)
933 number_samples = len(time_vector)
934 else:
935 time_vector = self.time_vector
936 number_samples = len(self.values)
937 values = self.values[:]
938 forced_values = self.forced_values[:]
939 data_processing_time = time.time() - data_processing_time
941 return self.__class__(
942 signal_id=self.signal_id,
943 time_vector=time_vector,
944 values=values,
945 forced_values=forced_values,
946 number_samples=number_samples,
947 number_samples_db=self.number_samples,
948 data_start=self.data_start,
949 data_end=self.data_end,
950 db_query_time=self.db_query_time,
951 init_time=self.init_time,
952 data_processing_time=self.data_processing_time + data_processing_time,
953 )
955 def interest_window_desampling(
956 self,
957 window_max_number_samples: int,
958 outside_max_number_samples: int,
959 window_min_timestamp: float | None = None,
960 window_max_timestamp: float | None = None,
961 ) -> Self:
962 """Performs a sampling in a window of interest and outside."""
964 if not self.time_vector:
965 return self
967 if window_min_timestamp is None:
968 window_min_timestamp = self.time_vector[0]
969 if window_max_timestamp is None:
970 window_max_timestamp = self.time_vector[-1]
972 data_processing_time = time.time()
974 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
975 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
977 time_vector_before = self.time_vector[:index_window_start]
978 time_vector_window = self.time_vector[index_window_start:index_window_end]
979 time_vector_after = self.time_vector[index_window_end:]
981 values_before = self.values[:index_window_start]
982 values_window = self.values[index_window_start:index_window_end]
983 values_after = self.values[index_window_end:]
984 window_min_value = self.interpolate_values([window_min_timestamp])[0]
985 window_max_value = self.interpolate_values([window_max_timestamp])[0]
987 # Resampling window
988 if time_vector_window:
989 # Ensurring window bounds
990 if time_vector_window[0] != window_min_timestamp:
991 time_vector_window.insert(0, window_min_timestamp)
992 values_window.insert(0, window_min_value)
993 if time_vector_window[-1] != window_max_timestamp:
994 time_vector_window.append(window_max_timestamp)
995 values_window.append(window_max_value)
996 else:
997 time_vector_window = [window_min_timestamp, window_max_timestamp]
998 values_window = [window_min_value, window_max_value]
1000 if len(time_vector_window) > window_max_number_samples:
1001 # Resampling
1002 time_vector_window, values_window = downsample_list(
1003 time_vector_window, values_window, window_max_number_samples
1004 )
1006 # Resampling outside
1007 number_samples_before = len(time_vector_before)
1008 number_samples_after = len(time_vector_after)
1009 if (number_samples_before + number_samples_after) > outside_max_number_samples:
1010 new_number_samples_before = min(
1011 number_samples_before,
1012 math.ceil(
1013 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1014 ),
1015 )
1016 new_number_samples_after = min(
1017 number_samples_after,
1018 math.ceil(
1019 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1020 ),
1021 )
1022 # Adjusting numbers as math.ceil can do +1 on sum
1023 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1024 if new_number_samples_before > new_number_samples_after:
1025 new_number_samples_before -= 1
1026 else:
1027 new_number_samples_after -= 1
1029 if new_number_samples_before > 0:
1030 time_vector_before, values_before = downsample_list(
1031 time_vector_before, values_before, new_number_samples_before
1032 )
1034 if new_number_samples_after > 0:
1035 time_vector_after, values_after = downsample_list(
1036 time_vector_after, values_after, new_number_samples_after
1037 )
1039 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1040 values = values_before + values_window + values_after
1041 forced_values = self.interpolate_forced_values(new_time_vector)
1042 number_samples = len(values)
1044 data_processing_time = time.time() - data_processing_time
1046 return self.__class__(
1047 signal_id=self.signal_id,
1048 time_vector=new_time_vector,
1049 values=values,
1050 forced_values=forced_values,
1051 number_samples=number_samples,
1052 number_samples_db=self.number_samples,
1053 data_start=self.data_start,
1054 data_end=self.data_end,
1055 db_query_time=self.db_query_time,
1056 init_time=self.init_time,
1057 data_processing_time=self.data_processing_time + data_processing_time,
1058 )
1061class StringSignalData(SignalData):
1062 data_type: str = "str"
1063 values: list[str | None]
1064 forced_values: list[str | None]
1066 def interpolate(self, new_time_vector: list[float], items):
1067 # Find the indices of the values in xp that are just smaller or equal to x
1068 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
1069 indices = npy.clip(indices, 0, len(items) - 1)
1070 # Return the corresponding left string values from fp
1071 return [items[i] for i in indices]
1074class SignalsData(TwinPadModel):
1075 signals_data: list[NumericSignalData | StringSignalData | SignalData]
1076 data_processing_time: float
1077 data_start: float | None
1078 data_end: float | None
1080 @classmethod
1081 def get_from_signal_ids(
1082 cls,
1083 signal_ids: list[str],
1084 min_timestamp: float = None,
1085 max_timestamp: float = None,
1086 window_min_timestamp: float = None,
1087 window_max_timestamp: float = None,
1088 interpolate_bounds: bool = True,
1089 max_documents: int = None,
1090 ) -> Self:
1091 signals_data = []
1092 data_start = None
1093 data_end = None
1094 if max_timestamp is None:
1095 max_timestamp = time.time()
1096 data_processing_time = 0.0
1097 for signal_id in signal_ids:
1098 signal_data = SignalData.get_from_signal_id(
1099 signal_id=signal_id,
1100 min_timestamp=min_timestamp,
1101 max_timestamp=max_timestamp,
1102 window_min_timestamp=window_min_timestamp,
1103 window_max_timestamp=window_max_timestamp,
1104 interpolate_bounds=interpolate_bounds,
1105 max_documents=max_documents,
1106 )
1107 data_processing_time += signal_data.data_processing_time
1108 signals_data.append(signal_data)
1109 if signal_data.data_start is not None:
1110 if data_start is None:
1111 data_start = signal_data.data_start
1112 else:
1113 data_start = min(signal_data.data_start, data_start)
1114 if signal_data.data_end is not None:
1115 if data_end is None:
1116 data_end = signal_data.data_end
1117 else:
1118 data_end = max(signal_data.data_end, data_end)
1120 return cls(
1121 signals_data=signals_data,
1122 data_processing_time=data_processing_time,
1123 data_start=data_start,
1124 data_end=data_end,
1125 )
1127 def uniform_desampling(self, number_samples_max: int) -> Self:
1128 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1129 return SignalsData(
1130 signals_data=signals_data,
1131 data_processing_time=sum(s.data_processing_time for s in signals_data),
1132 data_start=self.data_start,
1133 data_end=self.data_end,
1134 )
1136 def interest_window_desampling(
1137 self,
1138 window_max_number_samples: int,
1139 outside_max_number_samples: int,
1140 window_min_timestamp: float = None,
1141 window_max_timestamp: float = None,
1142 ) -> Self:
1143 signals_data = [
1144 s.interest_window_desampling(
1145 window_max_number_samples=window_max_number_samples,
1146 outside_max_number_samples=outside_max_number_samples,
1147 window_min_timestamp=window_min_timestamp,
1148 window_max_timestamp=window_max_timestamp,
1149 )
1150 for s in self.signals_data
1151 ]
1153 return SignalsData(
1154 signals_data=signals_data,
1155 data_processing_time=sum(s.data_processing_time for s in signals_data),
1156 data_start=self.data_start,
1157 data_end=self.data_end,
1158 )
1160 def zip_export(self, file_format: str = "csv"):
1161 # return self.signals_data[0].csv_export()
1162 zip_buffer = io.BytesIO()
1163 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1164 for signal_data in self.signals_data:
1165 if file_format == "csv":
1166 export_io = signal_data.csv_export()
1167 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io)
1168 elif file_format == "prestoplot":
1169 export_io = signal_data.prestoplot_export()
1170 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io)
1171 else:
1172 raise ValueError(f"Format not found. Got: {file_format}")
1173 zip_bytes = zip_buffer.getvalue()
1174 # zip_bytes.seek(0)
1175 return zip_bytes
1177 def hdf5_export(self):
1178 hdf5_buffer = io.BytesIO()
1179 custom_type_float = npy.dtype(
1180 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
1181 )
1182 custom_type_string = npy.dtype(
1183 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
1184 )
1185 with h5py.File(hdf5_buffer, "w") as hdf5_file:
1186 for signal_data in self.signals_data:
1187 signal_group = hdf5_file.create_group(signal_data.signal_id)
1188 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
1189 if signal_data.data_type == "str":
1190 export_data = npy.array(
1191 list(
1192 zip(
1193 date_vector,
1194 signal_data.time_vector,
1195 signal_data.values,
1196 signal_data.forced_values,
1197 )
1198 ),
1199 dtype=custom_type_string,
1200 )
1201 else:
1202 export_data = npy.array(
1203 list(
1204 zip(
1205 date_vector,
1206 signal_data.time_vector,
1207 signal_data.values,
1208 signal_data.forced_values,
1209 )
1210 ),
1211 dtype=custom_type_float,
1212 )
1213 signal_group["data"] = export_data
1214 return hdf5_buffer.getvalue()
1217class SignalStatus(TwinPadModel):
1218 status: str
1219 reason: str
1220 delay: float | None
1223class DigitizationFunction(TwinPadModel):
1224 bits: int | None = None
1225 min_value: float
1226 max_value: float
1227 min_raw_value: float
1228 max_raw_value: float
1231class SignalUpdate(TwinPadModel):
1232 value: float | str | bool | int | None = None
1233 forced_value: float | str | bool | int | None = None
1234 timestamp: int | None = None
1237class SignalType(str, Enum):
1238 command = "command"
1239 sensor = "sensor"
1240 external_sensor = "external_sensor"
1243SIGNALDATA_TYPES = {
1244 "int": NumericSignalData,
1245 "float": NumericSignalData,
1246 "str": StringSignalData,
1247 "bool": NumericSignalData,
1248 "epoch": NumericSignalData,
1249}
1252class Signal(GenericMongo):
1253 collection_name: ClassVar[str] = "signals"
1255 signal_id: str
1256 frequency: float
1257 unit: str | None
1258 description: str
1259 type: SignalType
1260 data_type: str
1261 precision_digits: int | None
1262 forcible: bool
1264 digitization_function: DigitizationFunction | None
1266 @property
1267 def device(self) -> Device:
1268 device_id = self.signal_id.split(".")[0]
1269 device = Device.get_one_by_attribute("device_id", device_id)
1270 return device
1272 @cached_property
1273 def signal_data_class(self):
1274 if self.data_type in SIGNALDATA_TYPES:
1275 return SIGNALDATA_TYPES[self.data_type]
1276 if self.data_type.startswith("enum"):
1277 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1278 raise ValueError(f"Unhandled python type: {self.data_type}")
1280 @cached_property
1281 def python_type(self):
1282 if self.data_type in TYPES:
1283 return TYPES[self.data_type]
1284 if self.data_type.startswith("enum"):
1285 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1286 return Literal[*choices]
1287 raise ValueError(f"Unhandled python type: {self.data_type}")
1289 @computed_field
1290 @property
1291 def status(self) -> SignalStatus:
1292 now = time.time()
1293 status = "up"
1294 reason = ""
1296 collection = get_signal_collection(self.signal_id)
1297 last_sample = collection.find_one({}, sort=[("precise_timestamp", -1)])
1298 if last_sample is None:
1299 status = "no data"
1300 reason = "signal does not exist"
1301 return SignalStatus(status=status, reason=reason, delay=None)
1303 try:
1304 last_date = last_sample.get("timestamp")
1305 last_date = last_date.replace(tzinfo=pytz.UTC)
1306 last_value_ts = last_date.timestamp()
1307 except IndexError:
1308 last_value_ts = None
1310 if last_value_ts is None:
1311 delay = None
1312 reason = "No data from signal"
1313 else:
1314 # Since device is a computed property, only compute it once
1315 device = self.device
1316 if device is not None and device.last_ping is not None:
1317 last_value_ts = max(last_value_ts, device.last_ping)
1318 delay = now - last_value_ts
1319 if delay > DEVICE_TIMEOUT:
1320 status = "down"
1321 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1322 return SignalStatus(status=status, reason=reason, delay=delay)
1324 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1325 command = Command(
1326 sent_at=time.time(),
1327 command_type="Signal command",
1328 user_id=current_user.id,
1329 )
1331 has_input_error = False
1332 error_message = ""
1334 if self.data_type.startswith("enum"):
1335 enum_options = get_args(self.python_type)
1337 if update_dict.value is not None and update_dict.value not in enum_options:
1338 has_input_error = True
1339 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1340 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1341 has_input_error = True
1342 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1343 else:
1344 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1345 has_input_error = True
1346 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1347 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1348 has_input_error = True
1349 error_message += (
1350 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1351 )
1353 if has_input_error:
1354 command.response_time = 0
1355 command.succeeded = False
1356 command.description = f"Tried to modify signal {self.signal_id}"
1357 response = {"error": True, "status_code": 400, "message": error_message}
1358 else:
1359 response = await send_signal_value(self.signal_id, update_dict)
1360 command.receive_response(response)
1362 Command.create(command)
1363 return response
1365 @classmethod
1366 def get_from_signal_id(cls, signal_id) -> Self:
1367 """Could be generic from mongo"""
1368 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1369 if not raw_value:
1370 return None
1371 del raw_value["_id"]
1372 return cls.dict_to_object(raw_value)
1374 @classmethod
1375 def get_all_ids(cls) -> list[str]:
1376 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1378 return [signal["signal_id"] for signal in cursor]
1380 async def number_samples(self):
1381 collection = get_signal_collection(signal_id=self.signal_id)
1382 if collection is None:
1383 return 0
1385 number_samples = collection.estimated_document_count()
1387 number_samples_async_collection = await get_async_collection(
1388 systems_async_database, "number_samples", create=True, time_series=True
1389 )
1391 loop = asyncio.get_running_loop()
1392 loop.create_task(
1393 number_samples_async_collection.insert_one(
1394 {
1395 "timestamp": datetime.datetime.now(pytz.UTC),
1396 "signal_id": self.signal_id,
1397 "number_samples": number_samples,
1398 }
1399 )
1400 )
1402 return number_samples
1404 def sample_datasize(self):
1405 return signals_database.command("collstats", self.signal_id)["size"]
1407 @classmethod
1408 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1409 result = cls.collection().aggregate(
1410 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1411 )
1413 return {signal["signal_id"]: signal["forcible"] for signal in result}
1416class ServicesStatus(TwinPadModel):
1417 backend: str
1418 cloud_broker: str
1419 time_series_database: str
1420 signal_storage: str
1421 heartbeat_storage: str
1422 data_analyzer: str
1424 @classmethod
1425 def check(cls) -> Self:
1426 return cls(
1427 cloud_broker=ping(RABBITMQ_HOST),
1428 backend="up",
1429 time_series_database=ping(MONGO_HOST),
1430 signal_storage=ping(SIGNAL_STORAGE_HOST),
1431 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
1432 data_analyzer=ping(DATA_ANALYZER_HOST),
1433 )
1436def ping(host):
1437 try:
1438 if ping3.ping(host, timeout=0.8):
1439 return "up"
1440 except PermissionError:
1441 pass
1442 return "down"
1445class Event(GenericMongo):
1446 collection_name: ClassVar[str] = "events"
1448 name: str
1449 timestamp: float
1450 event_rule_id: str
1452 @computed_field
1453 @cached_property
1454 def event_rule(self) -> "EventRule":
1455 return EventRule.get_from_id(self.event_rule_id)
1457 @classmethod
1458 def dict_to_object(cls, dict_):
1459 """Refine to convert timestamp to datetime for mongodb."""
1460 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
1461 return super().dict_to_object(dict_)
1464class TwinPadActivity(GenericMongo):
1465 timestamp: float
1466 amount: int
1468 @classmethod
1469 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
1470 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1471 number_events_collection = get_collection(systems_database, "number_events")
1472 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
1473 items = []
1474 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1475 if number_events_collection is None or recompute_amount:
1476 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
1477 number_events_collection.delete_many({})
1478 first_event = events_collection.find_one(sort={"timestamp": 1})
1479 if first_event is None:
1480 return items
1481 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
1482 tzinfo=pytz.UTC
1483 )
1484 while last_computed_day < TODAY:
1485 day_nb_events = events_collection.count_documents(
1486 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1487 )
1488 if day_nb_events > 0:
1489 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
1490 last_computed_day += ONE_DAY_OFFSET
1491 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
1492 if number_events_today > 0:
1493 number_events_collection.delete_many({"timestamp": TODAY})
1494 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
1495 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1496 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1497 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1498 for day in number_events:
1499 day["timestamp"] = day["timestamp"].timestamp()
1500 items.append(cls.mongo_dict_to_object(day))
1501 return items
1503 @classmethod
1504 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
1505 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1506 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
1507 signals_number_samples_collection = get_collection(
1508 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
1509 )
1510 items = []
1511 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1512 if number_samples_collection is None or recompute_amount:
1513 number_samples_collection = get_collection(
1514 systems_database, "number_received_samples", create=True, time_series=True
1515 )
1516 number_samples_collection.delete_many({})
1517 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
1518 if first_sample is None:
1519 return items
1520 # compute from day of first found event
1521 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace(
1522 tzinfo=pytz.UTC
1523 )
1524 while last_computed_day < TODAY:
1525 number_samples_request = signals_number_samples_collection.aggregate(
1526 [
1527 {
1528 "$match": {
1529 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}
1530 }
1531 },
1532 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
1533 ]
1534 ).to_list()
1535 if len(number_samples_request) == 0:
1536 number_samples = 0
1537 else:
1538 number_samples = number_samples_request[0].get("number_samples", 0)
1539 if number_samples > 0:
1540 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
1541 last_computed_day += ONE_DAY_OFFSET
1542 number_samples_request = signals_number_samples_collection.aggregate(
1543 [
1544 {"$match": {"timestamp": {"$gte": TODAY}}},
1545 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
1546 ]
1547 ).to_list()
1548 if len(number_samples_request) == 0:
1549 number_samples_today = 0
1550 else:
1551 number_samples_today = number_samples_request[0].get("number_samples", 0)
1552 if number_samples_today > 0:
1553 number_samples_collection.delete_many({"timestamp": TODAY})
1554 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
1555 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1556 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1557 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1558 for day in number_events:
1559 day["timestamp"] = day["timestamp"].timestamp()
1560 items.append(cls.mongo_dict_to_object(day))
1561 return items
1563 @classmethod
1564 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
1565 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1566 number_commands_collection = get_collection(systems_database, "number_commands")
1567 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
1568 items = []
1569 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1570 if number_commands_collection is None or recompute_amount:
1571 number_commands_collection = get_collection(
1572 systems_database, "number_commands", create=True, time_series=True
1573 )
1574 number_commands_collection.delete_many({})
1575 first_command = commands_collection.find_one(sort={"timestamp": 1})
1576 if first_command is None:
1577 return items
1578 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace(
1579 tzinfo=pytz.UTC
1580 )
1581 while last_computed_day < TODAY:
1582 day_nb_commands = commands_collection.count_documents(
1583 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1584 )
1585 if day_nb_commands > 0:
1586 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
1587 last_computed_day += ONE_DAY_OFFSET
1588 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
1589 if number_commands_today > 0:
1590 number_commands_collection.delete_many({"timestamp": TODAY})
1591 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
1592 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1593 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1594 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1595 for day in number_commands:
1596 day["timestamp"] = day["timestamp"].timestamp()
1597 items.append(cls.mongo_dict_to_object(day))
1598 return items
1601class EventRule(GenericMongo):
1602 collection_name: ClassVar[str] = "event_rules"
1604 name: str
1605 formula: str
1606 variables: list[str]
1608 @computed_field
1609 @cached_property
1610 def number_events(self) -> int:
1611 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
1614class Company(GenericMongo):
1615 collection_name: ClassVar[str] = "companies"
1616 name: str
1619class Campaign(GenericMongo):
1620 collection_name: ClassVar[str] = "campaigns"
1622 # Properties
1623 id: str | None = None
1624 name: str
1625 description: str | None = None
1627 @classmethod
1628 def create(cls, campaign: Self):
1629 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
1630 if new_campaign is None:
1631 return None
1632 return {"campaign_id": str(new_campaign.inserted_id)}
1634 @classmethod
1635 def update(cls, campaign: Self):
1636 updated_campaign = cls.collection().find_one_and_update(
1637 {"_id": ObjectId(campaign.id)},
1638 {"$set": {"name": campaign.name, "description": campaign.description}},
1639 return_document=ReturnDocument.AFTER,
1640 )
1641 return updated_campaign
1643 @classmethod
1644 def delete(cls, campaign_id):
1645 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)})
1646 return deleted_user
1649class Phase(GenericMongo):
1650 collection_name: ClassVar[str] = "phases"
1652 # Properties
1653 id: str | None = None
1654 name: str
1655 description: str | None = None
1656 start_at: float
1657 end_at: float
1659 # FK
1660 campaign_id: str
1662 # @classmethod
1663 # def get_by_date(cls, datetime: float):
1664 # phases = []
1665 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
1666 # phases.append(cls.dict_to_object(dict_).model_dump())
1667 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
1668 # phases.append(cls.dict_to_object(dict_).model_dump())
1669 # if phases is None:
1670 # return None
1671 # return phases
1673 @classmethod
1674 def create(cls, phase: Self):
1675 phase = Phase(
1676 name=phase.name,
1677 description=phase.description,
1678 start_at=phase.start_at,
1679 end_at=phase.end_at,
1680 campaign_id=phase.campaign_id,
1681 )
1682 phase_collection = get_collection(systems_database, "phases", create=True)
1683 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
1684 if new_phase is None:
1685 return None
1686 return {"phase_id": str(new_phase.inserted_id)}
1688 @classmethod
1689 def update(cls, phase: Self):
1690 updated_phase = cls.collection().find_one_and_update(
1691 {"_id": ObjectId(phase.id)},
1692 {
1693 "$set": {
1694 "name": phase.name,
1695 "description": phase.description,
1696 "start_at": phase.start_at,
1697 "end_at": phase.end_at,
1698 }
1699 },
1700 return_document=ReturnDocument.AFTER,
1701 )
1702 return updated_phase
1704 @classmethod
1705 def delete(cls, phase_id):
1706 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
1707 return delete_phase
1709 @classmethod
1710 def deleteMany(cls, campaign_id):
1711 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
1712 return delete_phases
1715class CustomViewCreation(GenericMongo):
1716 collection_name: ClassVar[str] = "custom_views"
1718 name: str
1719 configuration: list
1722class CustomView(CustomViewCreation):
1723 # Properties
1724 id: str | None = None
1726 # Foreign Key
1727 user_id: str
1729 # # Methods
1730 # @classmethod
1731 # def create(cls, form_custom_view: Self, user_id) -> list:
1732 # custom_view = CustomView(
1733 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
1734 # )
1735 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
1736 # return new_custom_view
1738 # @classmethod
1739 # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
1740 # updated_custom_view = cls.collection().find_one_and_update(
1741 # {"_id": ObjectId(custom_view_id)},
1742 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
1743 # return_document=ReturnDocument.AFTER,
1744 # )
1745 # updated_custom_view["id"] = str(updated_custom_view["_id"])
1746 # del updated_custom_view["_id"]
1747 # return cls(**updated_custom_view)
1749 # @classmethod
1750 # def delete(cls, custom_view_id) -> bool:
1751 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
1752 # return deleted_custom_view.acknowledged
1755CustomViewUpdate = create_update_model(CustomView)
1758class Video(GenericMongo):
1759 collection_name: ClassVar[str] = "videos"
1761 # Properties
1762 name: str
1763 ip_addr: str
1764 username: str | None = None
1765 password: str | None = None
1767 # Methods
1768 @classmethod
1769 def get_all(cls, sort_by="_id") -> list[Self]:
1770 items = []
1771 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
1772 items.append(cls.mongo_dict_to_object(dict_))
1773 return items
1775 @classmethod
1776 def get_video(cls, camera_id: ObjectId):
1777 camera = cls.get_from_id(camera_id)
1778 if camera is not None:
1779 return camera.name
1780 return None
1783class Command(GenericMongo):
1784 collection_name: ClassVar[str] = "commands"
1786 # Properties
1787 timestamp: datetime.datetime = None
1788 sent_at: float
1789 response_time: float = 0.0
1790 command_type: str
1791 description: str = ""
1792 succeeded: bool = False
1794 # Foreign key
1795 user_id: str
1797 @classmethod
1798 def collection(cls):
1799 return get_collection(systems_database, cls.collection_name, create=True, time_series=True)
1801 @classmethod
1802 def create(cls, command: Self):
1803 command = cls(
1804 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
1805 sent_at=command.sent_at,
1806 response_time=command.response_time,
1807 command_type=command.command_type,
1808 description=command.description,
1809 succeeded=command.succeeded,
1810 user_id=command.user_id,
1811 )
1812 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"}))
1813 if new_command is None:
1814 return None
1815 return {"command_id": str(new_command.inserted_id)}
1817 def receive_response(self, response: dict):
1818 self.response_time = time.time() - self.sent_at
1819 self.succeeded = response.get("error", True) is False
1820 if self.description == "":
1821 self.description += response.get("message", "").rstrip()
1824class SignalsPresetCreation(GenericMongo):
1825 name: str
1826 signal_ids: list[str]
1829class SignalsPreset(SignalsPresetCreation):
1830 collection_name: ClassVar[str] = "signals_presets"
1832 user_id: str
1834 @classmethod
1835 def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
1836 signals_preset = cls(
1837 user_id=user_id,
1838 name=signals_preset.name,
1839 signal_ids=signals_preset.signal_ids,
1840 )
1842 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"}))
1844 return str(new_signal_preset.inserted_id)
1847SignalsPresetUpdate = create_update_model(SignalsPreset)
1850class LineStyle(str, Enum):
1851 solid = "solid"
1852 dotted = "dotted"
1853 dashed = "dashed"
1856class SignalAppearance:
1857 value_color: str
1858 forced_value_color: str
1861class GraphThemeCreation(GenericMongo):
1862 collection_name: ClassVar[str] = "graph_themes"
1864 name: str
1865 signal_id: str
1866 value_color: str = ""
1867 forced_value_color: str = ""
1868 value_line_style: LineStyle = LineStyle.solid
1869 forced_value_line_style: LineStyle = LineStyle.solid
1870 private: bool = True
1873class PublicGraphTheme(GraphThemeCreation):
1874 created_by_user: bool
1875 in_user_library: bool
1876 active_for_user: bool
1878 _current_user_id: str = ""
1880 @classproperty
1881 def custom_pipeline_steps(cls) -> dict[str, list]:
1882 return {
1883 "created_by_user": [
1884 {
1885 "$addFields": {
1886 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]},
1887 }
1888 }
1889 ],
1890 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible
1891 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}
1892 ],
1893 "in_user_library": [
1894 {
1895 "$addFields": {
1896 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]},
1897 }
1898 }
1899 ],
1900 "active_for_user": [
1901 {
1902 "$addFields": {
1903 "active_for_user": {"$in": [cls._current_user_id, "$active"]},
1904 }
1905 }
1906 ],
1907 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}],
1908 "active": [
1909 {
1910 "$addFields": {
1911 "active": "$$REMOVE",
1912 }
1913 }
1914 ],
1915 "creator_id": [
1916 {
1917 "$addFields": {
1918 "creator_id": "$$REMOVE",
1919 }
1920 }
1921 ],
1922 }
1924 @classmethod
1925 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
1926 cls._current_user_id = user_id
1927 return super().response_from_query(query)
1929 @classmethod
1930 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
1931 query.in_user_library = "true"
1932 return cls.response_from_query(query, user_id)
1934 @classmethod
1935 def get_from_id(cls, item_id, user_id: str) -> Self | None:
1936 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)
1938 @classmethod
1939 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
1940 cls._current_user_id = user_id
1941 return super().get_by_attribute(attribute_name, attribute_value)
1943 @classmethod
1944 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
1945 cls._current_user_id = user_id
1946 return super().get_one_by_attribute(attribute_name, attribute_value)
1948 @classmethod
1949 def get_all(cls, sort_by: str, user_id: str):
1950 cls._current_user_id = user_id
1951 return super().get_all(sort_by)
1953 @classmethod
1954 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
1955 pipeline = [
1956 {
1957 "$match": {
1958 "active": {"$eq": user_id},
1959 "signal_id": {"$in": signal_ids},
1960 }
1961 },
1962 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
1963 {"$replaceRoot": {"newRoot": "$firstDocument"}},
1964 {
1965 "$project": {
1966 "_id": 0,
1967 "signal_id": 1,
1968 "value_color": 1,
1969 "forced_value_color": 1,
1970 "value_line_style": 1,
1971 "forced_value_line_style": 1,
1972 }
1973 },
1974 ]
1976 result = {}
1978 cursor = cls.collection().aggregate(pipeline)
1979 for document in cursor:
1980 signal_id = document["signal_id"]
1981 del document["signal_id"]
1982 result[signal_id] = document
1984 return result
1987GraphThemeUpdate = create_update_model(PublicGraphTheme)
1990class PrivateGraphTheme(GraphThemeCreation):
1991 # private
1992 creator_id: str
1993 in_library: list[str]
1994 active: list[str]
1996 @classmethod
1997 def create(
1998 cls,
1999 creator_id: str,
2000 name: str,
2001 signal_id: str,
2002 value_color: str,
2003 forced_value_color: str,
2004 value_line_style: LineStyle,
2005 forced_value_line_style: LineStyle,
2006 private: bool,
2007 ):
2008 color_setting = cls(
2009 creator_id=creator_id,
2010 name=name,
2011 signal_id=signal_id,
2012 value_color=value_color,
2013 forced_value_color=forced_value_color,
2014 value_line_style=value_line_style,
2015 forced_value_line_style=forced_value_line_style,
2016 private=private,
2017 in_library=[creator_id],
2018 active=[creator_id],
2019 )
2021 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
2022 color_setting.id = str(new_color_setting.inserted_id)
2023 return color_setting
2025 def update(self, update_dict: dict, user_id: str):
2026 if (in_user_lib := update_dict.get("in_user_library")) is not None:
2027 if in_user_lib and user_id not in self.in_library:
2028 self.in_library.append(user_id)
2029 elif not in_user_lib and user_id in self.in_library:
2030 self.in_library.remove(user_id)
2031 update_dict["in_library"] = self.in_library
2032 del update_dict["in_user_library"]
2034 if (active_for_user := update_dict.get("active_for_user")) is not None:
2035 if active_for_user and user_id not in self.active:
2036 self.active.append(user_id)
2037 elif not active_for_user and user_id in self.active:
2038 self.active.remove(user_id)
2039 update_dict["active"] = self.active
2040 del update_dict["active_for_user"]
2042 if update_dict.get("created_by_user") is not None:
2043 del update_dict["created_by_user"]
2045 self.collection().find_one_and_update(
2046 {"_id": ObjectId(self.id)},
2047 {"$set": update_dict},
2048 )
2050 return {}