Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 97%
1154 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-17 10:20 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-17 10:20 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
23import h5py
25# from scipy import signal as signal_scipy
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 devices_states_database,
36)
37from twinpad_backend.responses import ListResponse
38from twinpad_backend.messages import send_mode_change, send_signal_value
40TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
43RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
44MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
45SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
46HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
47DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
49DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
50NUMBER_SAMPLES_DATABASE_UPDATE = 120
52logger = logging.getLogger("uvicorn.error")
55class classproperty:
56 """
57 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13.
58 Found here: https://stackoverflow.com/a/76301341
59 """
61 def __init__(self, func):
62 self.fget = func
64 def __get__(self, _, owner):
65 return self.fget(owner)
68def create_update_model(model):
69 fields = {}
71 for field_name, field_annotation in model.model_fields.items():
72 if field_name != "id":
73 fields[field_name] = (field_annotation.annotation | None, None)
75 query_name = model.__name__ + "Update"
76 return create_model(query_name, **fields)
79def get_utc_date_from_timestamp(timestamp: float):
80 return datetime.datetime.fromtimestamp(timestamp).isoformat()
83def downsample_list(time_vector: list, values: list, max_number_samples: int):
84 if len(time_vector) < max_number_samples:
85 return time_vector, values
87 time_vector_copy = copy.deepcopy(time_vector)
88 values_copy = copy.deepcopy(values)
90 none_group_bounds = []
91 none_group_index = -1
92 index = -1
93 # LTTB doesn't handle None values so remove them
94 while values_copy.count(None) > 0:
95 # Store bounds of None value groups so we can insert them back after the downsampling
96 if (new_index := values_copy.index(None)) != index:
97 none_group_bounds.append([time_vector_copy.pop(new_index)])
98 none_group_index += 1
99 elif len(none_group_bounds[none_group_index]) < 2:
100 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
101 else:
102 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
103 values_copy.pop(new_index)
104 index = new_index
105 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
107 try:
108 values_array = npy.array([time_vector_copy, values_copy]).T
109 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
111 new_time_vector = interpolated_values[:, 0].tolist()
112 new_values = interpolated_values[:, 1].tolist()
113 except ValueError:
114 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
115 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
116 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
117 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
118 return new_time_vector, new_values_nan_to_none
120 # insert back None values at the correct timestamps
121 for none_group in none_group_bounds:
122 start_index = npy.searchsorted(new_time_vector, none_group[0])
123 new_time_vector[start_index:start_index] = none_group
124 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
126 return new_time_vector, new_values
129def is_of_type(value, wanted_type):
130 if wanted_type is float:
131 return isinstance(value, (int, float))
132 return isinstance(value, wanted_type)
135# Models
136class TwinPadModel(BaseModel):
137 @classmethod
138 def dict_to_object(cls, dict_):
139 return cls.model_validate(dict_)
141 def to_dict(self, exclude=None):
142 dict_ = self.model_dump(exclude=exclude)
143 return dict_
146class GenericMongo(TwinPadModel):
147 id: str | None = None
148 custom_pipeline_steps: ClassVar[dict[str, list]] = {}
150 @classmethod
151 def collection(cls):
152 return get_collection(systems_database, cls.collection_name, create=True)
154 @classmethod
155 def response_from_query(cls, query) -> ListResponse[Self]:
156 request_filters = query.mongodb_filter()
157 items = []
159 # Allows for multi-sort, Python dicts are ordered so no issue while sorting
160 sort_dict = {}
161 for sort in query.sort_by.split(","):
162 if ":" in sort:
163 sort_field, sort_order = sort.split(":")
164 sort_order = int(sort_order)
165 else:
166 sort_field = sort
167 sort_order = 1
168 sort_dict[sort_field] = sort_order
170 collection = get_collection(systems_database, cls.collection_name, create=True)
171 total = collection.count_documents(request_filters)
173 pipeline = []
174 added_properties = []
175 if "$and" in request_filters:
176 for request_filter in request_filters["$and"]:
177 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
178 if filtered_property in request_filter:
179 pipeline.extend(pipeline_steps)
180 added_properties.append(filtered_property)
181 else:
182 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
183 if filtered_property in request_filters:
184 pipeline.extend(pipeline_steps)
185 added_properties.append(filtered_property)
186 pipeline.append({"$match": request_filters})
188 for sort_field in sort_dict.keys():
189 if sort_field in cls.custom_pipeline_steps:
190 pipeline.extend(cls.custom_pipeline_steps[sort_field])
191 added_properties.append(sort_field)
192 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])
194 if (query.limit is not None) and (query.limit != 0):
195 pipeline.append({"$limit": query.limit})
197 for filtered_property, step in cls.custom_pipeline_steps.items():
198 if filtered_property not in added_properties:
199 pipeline.extend(step)
201 cursor = collection.aggregate(pipeline)
203 for item_dict in cursor:
204 items.append(cls.mongo_dict_to_object(item_dict))
206 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
208 @classmethod
209 def get_from_id(cls, item_id) -> Self | None:
210 return cls.get_one_by_attribute("_id", ObjectId(item_id))
212 @classmethod
213 def mongo_dict_to_object(cls, mongo_dict):
214 mongo_dict["id"] = str(mongo_dict["_id"])
215 del mongo_dict["_id"]
216 return cls.dict_to_object(mongo_dict)
218 @classmethod
219 def get_by_attribute(cls, attribute_name: str, attribute_value):
220 """Returns all items that match the attribute with value."""
221 pipeline = []
222 if attribute_name in cls.custom_pipeline_steps:
223 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
224 pipeline.append({"$match": {attribute_name: attribute_value}})
225 for key, step in cls.custom_pipeline_steps.items():
226 if key != attribute_name:
227 pipeline.extend(step)
228 items = cls.collection().aggregate(pipeline)
229 if items is None:
230 return None
231 return [cls.mongo_dict_to_object(d) for d in items]
233 @classmethod
234 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
235 pipeline = []
236 if attribute_name in cls.custom_pipeline_steps:
237 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
238 pipeline.append({"$match": {attribute_name: attribute_value}})
239 pipeline.append({"$limit": 1})
240 for key, step in cls.custom_pipeline_steps.items():
241 if key != attribute_name:
242 pipeline.extend(step)
243 items = cls.collection().aggregate(pipeline).to_list()
244 if len(items) == 0:
245 return None
246 return cls.mongo_dict_to_object(items[0])
248 @classmethod
249 def get_all(cls, sort_by="_id") -> list[Self]:
250 items = []
251 pipeline = []
252 if sort_by in cls.custom_pipeline_steps:
253 pipeline.extend(cls.custom_pipeline_steps[sort_by])
254 pipeline.append({"$sort": {sort_by: ASCENDING}})
255 for key, step in cls.custom_pipeline_steps.items():
256 if key != sort_by:
257 pipeline.extend(step)
258 for dict_ in cls.collection().aggregate(pipeline):
259 items.append(cls.mongo_dict_to_object(dict_))
260 return items
262 @classmethod
263 def get_number_documents(cls):
264 collection = get_collection(systems_database, cls.collection_name)
265 if collection is None:
266 return 0
267 return collection.count_documents({})
269 def insert(self):
270 insert_result = self.collection().insert_one(self.to_dict(exclude={id}))
271 self.id = str(insert_result.inserted_id)
272 return self.id
274 def update(self, update_dict):
275 for key, value in update_dict.items():
276 setattr(self, key, value)
277 self.collection().find_one_and_update(
278 {"_id": ObjectId(self.id)},
279 {"$set": update_dict},
280 return_document=ReturnDocument.AFTER,
281 )
283 return self
285 def delete(self):
286 result = self.collection().delete_one({"_id": ObjectId(self.id)})
287 return result.deleted_count > 0
290class User(GenericMongo):
291 collection_name: ClassVar[str] = "users"
293 firstname: str
294 lastname: str
295 email: str
296 password: str
297 is_active: bool | None = False
298 is_admin: bool | None = False
299 is_connected: bool | None = False
300 company_id: str | None = None
302 def to_dict(self, exclude=None):
303 if exclude is None:
304 exclude = {"password"}
305 return GenericMongo.to_dict(self, exclude=exclude)
307 @classmethod
308 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
309 users = cls.get_all()
310 if not users:
311 is_admin = True
312 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
313 user_collection = get_collection(systems_database, "users", create=True)
314 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
315 if new_user is None:
316 return None
317 return {"user_id": str(new_user.inserted_id)}
319 @classmethod
320 def update_info(cls, user: "UserUpdate", user_id: str):
321 updated_user = cls.collection().find_one_and_update(
322 {"_id": ObjectId(user_id)},
323 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
324 return_document=ReturnDocument.AFTER,
325 )
326 updated_user["id"] = str(updated_user["_id"])
327 del (updated_user["_id"], updated_user["is_connected"])
328 return cls(**updated_user)
331UserUpdate = create_update_model(User)
334class Mode(TwinPadModel):
335 mode_id: int
336 name: str
337 frequency_multiplier: float
338 min_frequency: float
341class DeviceUpdate(TwinPadModel):
342 mode_id: int
345class Device(GenericMongo):
346 collection_name: ClassVar[str] = "devices"
348 device_id: str
349 name: str
350 description: str = ""
351 modes: list[Mode]
352 current_mode_id: int | None = None
353 last_ping: float | None = None
354 petri_network: Any
355 pid: Any
356 load: float | None = None
357 tokens: list[int] = Field(default_factory=list)
358 status: str
360 async def change_mode(self, update_dict, current_user: User):
361 has_error = False
363 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes):
364 has_error = True
365 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
366 elif self.current_mode_id is not None:
367 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
368 else:
369 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
370 command = Command(
371 sent_at=time.time(),
372 command_type="Mode change",
373 description=description,
374 user_id=current_user.id,
375 )
377 if has_error:
378 command.response_time = 0
379 command.succeeded = False
380 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
381 else:
382 response = await send_mode_change(self.device_id, update_dict.mode_id)
383 command.receive_response(response)
385 Command.create(command)
386 return response
388 @classmethod
389 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
390 devices_by_id = {}
391 for signal_id in signal_ids:
392 device_id = signal_id.split(".")[0]
393 if device_id not in devices_by_id:
394 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
395 return devices_by_id
398class DeviceSetup(GenericMongo):
399 collection_name: ClassVar[str] = "device_setups"
401 device_ids: list[str]
402 active: bool = False
403 variable_mapping: dict[str, str]
406DeviceSetupUpdate = create_update_model(DeviceSetup)
409class DeviceState(GenericMongo):
410 collection_name: ClassVar[str] = "devices_states"
412 timestamp: float
413 mode: str | None = None
414 load: float | None = None
415 tokens: list[int] = Field(default_factory=list)
416 modified_properties: list[str] = Field(default_factory=list)
418 @classmethod
419 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
420 req_filter = query.mongodb_filter()
421 items = []
422 if ":" in query.sort_by:
423 sort_field, sort_order = query.sort_by.split(":")
424 sort_order = int(sort_order)
425 else:
426 sort_field = query.sort_by
427 sort_order = 1
428 collection = get_collection(devices_states_database, device_id)
429 if collection is None:
430 total = 0
431 cursor = []
432 else:
433 total = collection.count_documents(req_filter)
434 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
435 if (query.limit is not None) and (query.limit != 0):
436 cursor = cursor.limit(query.limit)
437 for item_dict in cursor:
438 items.append(
439 cls(
440 timestamp=item_dict.get("precise_timestamp"),
441 mode=item_dict.get("mode", None),
442 load=item_dict.get("load", None),
443 tokens=item_dict.get("tokens", Field(default_factory=list)),
444 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)),
445 )
446 )
447 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
450class SignalSample(TwinPadModel):
451 signal_id: str
452 timestamp: float
453 value: float | int | str | bool | None
454 forced_value: float | int | str | bool | None = None
456 @classmethod
457 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
459 collection = get_signal_collection(signal_id)
460 if collection is None:
461 return None
463 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
464 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
465 bucket = get_signal_collection(f"system.buckets.{signal_id}")
466 first_bucket = None
467 if bucket is not None:
468 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
469 if first_bucket is not None:
470 sample_data = collection.find_one(
471 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
472 )
473 else:
474 sample_data = collection.find_one({}, sort={"precise_timestamp": 1})
476 if sample_data is None:
477 return None
479 timestamp = sample_data["precise_timestamp"]
481 return cls(
482 signal_id=signal_id,
483 timestamp=timestamp,
484 value=sample_data.get("value", None),
485 forced_value=sample_data.get("forced_value", None),
486 )
488 @classmethod
489 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
490 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
492 @classmethod
493 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
495 collection = get_signal_collection(signal_id)
496 if collection is None:
497 return None
499 # Same workaround as above function, very effective to narrow down big sets of data
500 bucket = get_signal_collection(f"system.buckets.{signal_id}")
501 last_bucket = None
502 if bucket is not None:
503 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
504 if last_bucket is not None:
505 sample_data = collection.find_one(
506 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
507 sort={"precise_timestamp": -1},
508 )
509 else:
510 sample_data = collection.find_one({}, sort={"precise_timestamp": -1})
512 if sample_data is None:
513 return None
515 timestamp = sample_data["precise_timestamp"]
517 if device is None:
518 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
519 if device is not None and device.last_ping is not None:
520 if timestamp is None:
521 timestamp = device.last_ping
522 else:
523 timestamp = max(timestamp, device.last_ping)
524 return cls(
525 signal_id=signal_id,
526 timestamp=timestamp,
527 value=sample_data.get("value", None),
528 forced_value=sample_data.get("forced_value", None),
529 )
531 @classmethod
532 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
533 collection = get_signal_collection(signal_id)
534 if collection is None:
535 return None
537 sample_data = collection.find_one(
538 {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
539 )
540 if sample_data is None:
541 return None
543 return cls(
544 signal_id=signal_id,
545 timestamp=sample_data.get("precise_timestamp"),
546 value=sample_data.get("value"),
547 forced_value=sample_data.get("forced_value"),
548 )
550 @classmethod
551 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
552 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
553 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
555 @classmethod
556 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None:
557 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
560class SignalData(TwinPadModel):
561 signal_id: str
562 forcible: bool = True
563 time_vector: list[float]
564 values: list[float | int | str | None]
565 forced_values: list[float | int | str | None]
567 data_start: float | None = None
568 data_end: float | None = None
570 number_samples: int = 0
571 number_samples_db: int = 0
573 db_query_time: float = 0.0
574 init_time: float = 0.0
575 data_processing_time: float = 0.0
577 @classmethod
578 def get_from_signal_id(
579 cls,
580 signal_id: str,
581 min_timestamp: float = None,
582 max_timestamp: float = None,
583 window_min_timestamp: float = None,
584 window_max_timestamp: float = None,
585 interpolate_bounds: bool = True,
586 max_documents: int = None,
587 ) -> Self:
589 now = time.time()
591 req_signal = {}
592 if min_timestamp is not None:
593 req_signal.setdefault("timestamp", {})
594 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
595 if max_timestamp is not None:
596 req_signal.setdefault("timestamp", {})
597 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
599 collection = get_signal_collection(signal_id)
600 if collection is None:
601 return cls(
602 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
603 )
605 db_req_start = time.time()
607 sort_step = {"$sort": {"precise_timestamp": 1}}
608 number_results = collection.count_documents(req_signal)
610 pipeline = []
611 if req_signal:
612 pipeline.append({"$match": req_signal}) # Filter data if needed
614 pipeline.extend(
615 [
616 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
617 sort_step,
618 ]
619 )
621 if max_documents is not None and max_documents < number_results:
622 unsampling_ratio = math.ceil(number_results / max_documents)
623 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
624 pipeline.extend(
625 [
626 {
627 "$setWindowFields": {
628 "sortBy": {"precise_timestamp": 1},
629 "output": {"index": {"$documentNumber": {}}},
630 }
631 },
632 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
633 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
634 {"$replaceRoot": {"newRoot": "$doc"}},
635 {"$unset": ["index", "group_id"]},
636 {"$sort": {"precise_timestamp": 1}},
637 ]
638 )
640 # logger.info(f"pipeline: %s", str(pipeline))
641 cursor = collection.aggregate(pipeline)
642 db_req_time = time.time() - db_req_start
644 init_time = time.time()
646 results = cursor.to_list()
647 time_vector = []
648 values = []
649 forced_values = []
650 for s in results:
651 time_vector.append(s["precise_timestamp"])
652 values.append(s.get("value", None))
653 forced_values.append(s.get("forced_value", None))
655 signal = Signal.get_from_signal_id(signal_id)
656 class_ = signal.signal_data_class
658 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
659 time_vector, values, forced_values = cls.interpolate_bounds(
660 class_,
661 collection,
662 signal_id,
663 time_vector,
664 values,
665 forced_values,
666 window_min_timestamp,
667 window_max_timestamp,
668 )
670 if values:
671 # TODO: check below. a bit strange
672 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
673 # Adding last value as it should be repeated
674 time_vector.append(now)
675 values.append(values[-1])
676 forced_values.append(forced_values[-1])
678 init_time = time.time() - init_time
680 # See line 292 for explanation
681 bucket = get_signal_collection(f"system.buckets.{signal_id}")
682 first_bucket = None
683 if bucket is not None:
684 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
685 if first_bucket is not None:
686 data_start = first_bucket["control"]["min"]["precise_timestamp"]
687 else:
688 data_start = None
690 last_bucket = None
691 if bucket is not None:
692 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
693 if last_bucket is not None:
694 data_end = last_bucket["control"]["max"]["precise_timestamp"]
695 else:
696 data_end = None
698 return class_(
699 signal_id=signal_id,
700 forcible=signal.forcible,
701 time_vector=time_vector,
702 values=values,
703 forced_values=forced_values,
704 data_start=data_start,
705 data_end=data_end,
706 number_samples=len(values),
707 number_samples_db=number_results,
708 db_query_time=db_req_time,
709 init_time=init_time,
710 )
712 @staticmethod
713 def interpolate_bounds(
714 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
715 ):
716 sample_right = None
717 # Fetching right side value & interpolation
718 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
719 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
720 sample_right = collection.find_one(
721 {
722 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
723 "value": {"$exists": True},
724 },
725 sort={"precise_timestamp": -1},
726 )
727 if sample_right:
728 if time_vector:
729 right_sd = class_(
730 signal_id=signal_id,
731 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
732 values=[values[-1], sample_right.get("value", None)],
733 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
734 )
735 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
736 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
737 else:
738 max_ts_value = sample_right.get("value", None)
739 max_ts_forced_value = sample_right.get("forced_value", None)
740 time_vector.append(window_max_timestamp)
741 values.append(max_ts_value)
742 forced_values.append(max_ts_forced_value)
744 # Fetching left side value & interpolation
745 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
746 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
747 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
748 sample_left = sample_right
749 sample_left = collection.find_one(
750 {
751 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
752 "value": {"$exists": True},
753 },
754 sort={"precise_timestamp": -1},
755 )
757 if sample_left:
758 if time_vector:
759 left_sd = class_(
760 signal_id=signal_id,
761 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
762 values=[sample_left["value"], values[0]],
763 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
764 )
765 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
766 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
767 else:
768 min_ts_value = sample_left.get("value", None)
769 min_ts_forced_value = sample_left.get("forced_value", None)
770 time_vector.insert(0, window_min_timestamp)
771 values.insert(0, min_ts_value)
772 forced_values.insert(0, min_ts_forced_value)
774 return time_vector, values, forced_values
776 def interpolate_values(self, new_time_vector: list[float]):
777 return self.interpolate(new_time_vector, self.values)
779 def interpolate_forced_values(self, new_time_vector: list[float]):
780 return self.interpolate(new_time_vector, self.forced_values)
782 def uniform_desampling(self, number_samples_max: int) -> Self:
783 data_processing_time = time.time()
784 if number_samples_max and self.number_samples > number_samples_max:
785 new_time_vector = npy.linspace(
786 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
787 ).tolist()
788 values = self.interpolate_values(new_time_vector)
789 forced_values = self.interpolate_forced_values(new_time_vector)
790 time_vector = new_time_vector
791 number_samples = len(time_vector)
792 else:
793 time_vector = self.time_vector
794 number_samples = len(self.values)
795 values = self.values[:]
796 forced_values = self.forced_values[:]
797 data_processing_time = time.time() - data_processing_time
799 return self.__class__(
800 signal_id=self.signal_id,
801 time_vector=time_vector,
802 values=values,
803 forced_values=forced_values,
804 number_samples=number_samples,
805 number_samples_db=self.number_samples,
806 data_start=self.data_start,
807 data_end=self.data_end,
808 db_query_time=self.db_query_time,
809 init_time=self.init_time,
810 data_processing_time=self.data_processing_time + data_processing_time,
811 )
813 def interest_window_desampling(
814 self,
815 window_max_number_samples: int,
816 outside_max_number_samples: int,
817 window_min_timestamp: float | None = None,
818 window_max_timestamp: float | None = None,
819 ) -> Self:
820 """Performs a sampling in a window of interest and outside."""
822 if not self.time_vector:
823 return self
825 if window_min_timestamp is None:
826 window_min_timestamp = self.time_vector[0]
827 if window_max_timestamp is None:
828 window_max_timestamp = self.time_vector[-1]
830 data_processing_time = time.time()
832 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
833 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
835 time_vector_before = self.time_vector[:index_window_start]
836 time_vector_window = self.time_vector[index_window_start:index_window_end]
837 time_vector_after = self.time_vector[index_window_end:]
839 # Resampling window
840 if time_vector_window:
841 # Ensurring window bounds
842 if time_vector_window[0] != window_min_timestamp:
843 time_vector_window.insert(0, window_min_timestamp)
844 if time_vector_window[-1] != window_max_timestamp:
845 time_vector_window.append(window_max_timestamp)
846 else:
847 time_vector_window = [window_min_timestamp, window_max_timestamp]
849 if len(time_vector_window) > window_max_number_samples:
850 # Resampling
851 new_window_time_vector = npy.linspace(
852 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
853 ).tolist()
854 time_vector_window = new_window_time_vector
856 # Resampling outside
857 number_samples_before = len(time_vector_before)
858 number_samples_after = len(time_vector_after)
859 if (number_samples_before + number_samples_after) > outside_max_number_samples:
860 new_number_samples_before = min(
861 number_samples_before,
862 math.ceil(
863 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
864 ),
865 )
866 new_number_samples_after = min(
867 number_samples_after,
868 math.ceil(
869 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
870 ),
871 )
872 # Adjusting numbers as math.ceil can do +1 on sum
873 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
874 if new_number_samples_before > new_number_samples_after:
875 new_number_samples_before -= 1
876 else:
877 new_number_samples_after -= 1
879 if new_number_samples_before > 0:
880 new_time_vector_before = npy.linspace(
881 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
882 ).tolist()
883 time_vector_before = new_time_vector_before
885 if new_number_samples_after > 0:
886 new_time_vector_after = npy.linspace(
887 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
888 ).tolist()[::-1]
889 time_vector_after = new_time_vector_after
891 new_time_vector = time_vector_before + time_vector_window + time_vector_after
892 values = self.interpolate_values(new_time_vector)
893 forced_values = self.interpolate_forced_values(new_time_vector)
894 number_samples = len(values)
896 data_processing_time = time.time() - data_processing_time
898 return self.__class__(
899 signal_id=self.signal_id,
900 forcible=self.forcible,
901 time_vector=new_time_vector,
902 values=values,
903 forced_values=forced_values,
904 number_samples=number_samples,
905 number_samples_db=self.number_samples,
906 data_start=self.data_start,
907 data_end=self.data_end,
908 db_query_time=self.db_query_time,
909 init_time=self.init_time,
910 data_processing_time=self.data_processing_time + data_processing_time,
911 )
913 def csv_export(self):
914 output = io.StringIO()
915 writer = csv.writer(output)
916 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
917 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
918 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
919 return output.getvalue().encode("utf-8")
921 def prestoplot_export(self):
922 clean_signal_id = self.signal_id.replace(".", "_")
923 if clean_signal_id[0].isnumeric():
924 clean_signal_id = "_" + clean_signal_id
926 output = io.StringIO()
927 output.write("# Encoding:\tUTF-8\n")
928 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
929 output.write("ISO8601\tnone\tnone\n")
930 output.write(f"# Description :\t{clean_signal_id}\n")
932 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
933 output.write(
934 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
935 )
936 return output.getvalue().encode("utf-8")
939class NumericSignalData(SignalData):
940 data_type: str = "float"
941 values: list[float | int | None]
942 forced_values: list[float | int | None]
944 def interpolate(self, new_time_vector: list[float], items):
945 items = [npy.nan if s is None else s for s in items]
946 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
948 def uniform_desampling(self, number_samples_max: int) -> Self:
949 data_processing_time = time.time()
950 if number_samples_max and self.number_samples > number_samples_max:
951 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
952 forced_values = self.interpolate_forced_values(time_vector)
953 number_samples = len(time_vector)
954 else:
955 time_vector = self.time_vector
956 number_samples = len(self.values)
957 values = self.values[:]
958 forced_values = self.forced_values[:]
959 data_processing_time = time.time() - data_processing_time
961 return self.__class__(
962 signal_id=self.signal_id,
963 time_vector=time_vector,
964 values=values,
965 forced_values=forced_values,
966 number_samples=number_samples,
967 number_samples_db=self.number_samples,
968 data_start=self.data_start,
969 data_end=self.data_end,
970 db_query_time=self.db_query_time,
971 init_time=self.init_time,
972 data_processing_time=self.data_processing_time + data_processing_time,
973 )
975 def interest_window_desampling(
976 self,
977 window_max_number_samples: int,
978 outside_max_number_samples: int,
979 window_min_timestamp: float | None = None,
980 window_max_timestamp: float | None = None,
981 ) -> Self:
982 """Performs a sampling in a window of interest and outside."""
984 if not self.time_vector:
985 return self
987 if window_min_timestamp is None:
988 window_min_timestamp = self.time_vector[0]
989 if window_max_timestamp is None:
990 window_max_timestamp = self.time_vector[-1]
992 data_processing_time = time.time()
994 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
995 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
997 time_vector_before = self.time_vector[:index_window_start]
998 time_vector_window = self.time_vector[index_window_start:index_window_end]
999 time_vector_after = self.time_vector[index_window_end:]
1001 values_before = self.values[:index_window_start]
1002 values_window = self.values[index_window_start:index_window_end]
1003 values_after = self.values[index_window_end:]
1004 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1005 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1007 # Resampling window
1008 if time_vector_window:
1009 # Ensurring window bounds
1010 if time_vector_window[0] != window_min_timestamp:
1011 time_vector_window.insert(0, window_min_timestamp)
1012 values_window.insert(0, window_min_value)
1013 if time_vector_window[-1] != window_max_timestamp:
1014 time_vector_window.append(window_max_timestamp)
1015 values_window.append(window_max_value)
1016 else:
1017 time_vector_window = [window_min_timestamp, window_max_timestamp]
1018 values_window = [window_min_value, window_max_value]
1020 if len(time_vector_window) > window_max_number_samples:
1021 # Resampling
1022 time_vector_window, values_window = downsample_list(
1023 time_vector_window, values_window, window_max_number_samples
1024 )
1026 # Resampling outside
1027 number_samples_before = len(time_vector_before)
1028 number_samples_after = len(time_vector_after)
1029 if (number_samples_before + number_samples_after) > outside_max_number_samples:
1030 new_number_samples_before = min(
1031 number_samples_before,
1032 math.ceil(
1033 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1034 ),
1035 )
1036 new_number_samples_after = min(
1037 number_samples_after,
1038 math.ceil(
1039 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1040 ),
1041 )
1042 # Adjusting numbers as math.ceil can do +1 on sum
1043 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1044 if new_number_samples_before > new_number_samples_after:
1045 new_number_samples_before -= 1
1046 else:
1047 new_number_samples_after -= 1
1049 if new_number_samples_before > 0:
1050 time_vector_before, values_before = downsample_list(
1051 time_vector_before, values_before, new_number_samples_before
1052 )
1054 if new_number_samples_after > 0:
1055 time_vector_after, values_after = downsample_list(
1056 time_vector_after, values_after, new_number_samples_after
1057 )
1059 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1060 values = values_before + values_window + values_after
1061 forced_values = self.interpolate_forced_values(new_time_vector)
1062 number_samples = len(values)
1064 data_processing_time = time.time() - data_processing_time
1066 return self.__class__(
1067 signal_id=self.signal_id,
1068 time_vector=new_time_vector,
1069 values=values,
1070 forced_values=forced_values,
1071 number_samples=number_samples,
1072 number_samples_db=self.number_samples,
1073 data_start=self.data_start,
1074 data_end=self.data_end,
1075 db_query_time=self.db_query_time,
1076 init_time=self.init_time,
1077 data_processing_time=self.data_processing_time + data_processing_time,
1078 )
1081class StringSignalData(SignalData):
1082 data_type: str = "str"
1083 values: list[str | None]
1084 forced_values: list[str | None]
1086 def interpolate(self, new_time_vector: list[float], items):
1087 # Find the indices of the values in xp that are just smaller or equal to x
1088 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
1089 indices = npy.clip(indices, 0, len(items) - 1)
1090 # Return the corresponding left string values from fp
1091 return [items[i] for i in indices]
1094class SignalsData(TwinPadModel):
1095 signals_data: list[NumericSignalData | StringSignalData | SignalData]
1096 data_processing_time: float
1097 data_start: float | None
1098 data_end: float | None
1100 @classmethod
1101 def get_from_signal_ids(
1102 cls,
1103 signal_ids: list[str],
1104 min_timestamp: float = None,
1105 max_timestamp: float = None,
1106 window_min_timestamp: float = None,
1107 window_max_timestamp: float = None,
1108 interpolate_bounds: bool = True,
1109 max_documents: int = None,
1110 ) -> Self:
1111 signals_data = []
1112 data_start = None
1113 data_end = None
1114 if max_timestamp is None:
1115 max_timestamp = time.time()
1116 data_processing_time = 0.0
1117 for signal_id in signal_ids:
1118 signal_data = SignalData.get_from_signal_id(
1119 signal_id=signal_id,
1120 min_timestamp=min_timestamp,
1121 max_timestamp=max_timestamp,
1122 window_min_timestamp=window_min_timestamp,
1123 window_max_timestamp=window_max_timestamp,
1124 interpolate_bounds=interpolate_bounds,
1125 max_documents=max_documents,
1126 )
1127 data_processing_time += signal_data.data_processing_time
1128 signals_data.append(signal_data)
1129 if signal_data.data_start is not None:
1130 if data_start is None:
1131 data_start = signal_data.data_start
1132 else:
1133 data_start = min(signal_data.data_start, data_start)
1134 if signal_data.data_end is not None:
1135 if data_end is None:
1136 data_end = signal_data.data_end
1137 else:
1138 data_end = max(signal_data.data_end, data_end)
1140 return cls(
1141 signals_data=signals_data,
1142 data_processing_time=data_processing_time,
1143 data_start=data_start,
1144 data_end=data_end,
1145 )
1147 def uniform_desampling(self, number_samples_max: int) -> Self:
1148 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1149 return SignalsData(
1150 signals_data=signals_data,
1151 data_processing_time=sum(s.data_processing_time for s in signals_data),
1152 data_start=self.data_start,
1153 data_end=self.data_end,
1154 )
1156 def interest_window_desampling(
1157 self,
1158 window_max_number_samples: int,
1159 outside_max_number_samples: int,
1160 window_min_timestamp: float = None,
1161 window_max_timestamp: float = None,
1162 ) -> Self:
1163 signals_data = [
1164 s.interest_window_desampling(
1165 window_max_number_samples=window_max_number_samples,
1166 outside_max_number_samples=outside_max_number_samples,
1167 window_min_timestamp=window_min_timestamp,
1168 window_max_timestamp=window_max_timestamp,
1169 )
1170 for s in self.signals_data
1171 ]
1173 return SignalsData(
1174 signals_data=signals_data,
1175 data_processing_time=sum(s.data_processing_time for s in signals_data),
1176 data_start=self.data_start,
1177 data_end=self.data_end,
1178 )
1180 def zip_export(self, file_format: str = "csv"):
1181 # return self.signals_data[0].csv_export()
1182 zip_buffer = io.BytesIO()
1183 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1184 for signal_data in self.signals_data:
1185 if file_format == "csv":
1186 export_io = signal_data.csv_export()
1187 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io)
1188 elif file_format == "prestoplot":
1189 export_io = signal_data.prestoplot_export()
1190 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io)
1191 else:
1192 raise ValueError(f"Format not found. Got: {file_format}")
1193 zip_bytes = zip_buffer.getvalue()
1194 # zip_bytes.seek(0)
1195 return zip_bytes
1197 def hdf5_export(self):
1198 hdf5_buffer = io.BytesIO()
1199 custom_type_float = npy.dtype(
1200 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
1201 )
1202 custom_type_string = npy.dtype(
1203 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
1204 )
1205 with h5py.File(hdf5_buffer, "w") as hdf5_file:
1206 for signal_data in self.signals_data:
1207 signal_group = hdf5_file.create_group(signal_data.signal_id)
1208 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
1209 if signal_data.data_type == "str":
1210 export_data = npy.array(
1211 list(
1212 zip(
1213 date_vector,
1214 signal_data.time_vector,
1215 signal_data.values,
1216 signal_data.forced_values,
1217 )
1218 ),
1219 dtype=custom_type_string,
1220 )
1221 else:
1222 export_data = npy.array(
1223 list(
1224 zip(
1225 date_vector,
1226 signal_data.time_vector,
1227 signal_data.values,
1228 signal_data.forced_values,
1229 )
1230 ),
1231 dtype=custom_type_float,
1232 )
1233 signal_group["data"] = export_data
1234 return hdf5_buffer.getvalue()
1237class SignalStatus(TwinPadModel):
1238 status: str = "down"
1239 reason: str = ""
1240 delay: float | None = None
1243class DigitizationFunction(TwinPadModel):
1244 bits: int | None = None
1245 min_value: float
1246 max_value: float
1247 min_raw_value: float
1248 max_raw_value: float
1251class SignalUpdate(TwinPadModel):
1252 value: float | str | bool | int | None = None
1253 forced_value: float | str | bool | int | None = None
1254 timestamp: int | None = None
1257class SignalType(str, Enum):
1258 command = "command"
1259 sensor = "sensor"
1260 external_sensor = "external_sensor"
1263SIGNALDATA_TYPES = {
1264 "int": NumericSignalData,
1265 "float": NumericSignalData,
1266 "str": StringSignalData,
1267 "bool": NumericSignalData,
1268 "epoch": NumericSignalData,
1269}
1272class Signal(GenericMongo):
1273 collection_name: ClassVar[str] = "signals"
1275 signal_id: str
1276 frequency: float
1277 unit: str | None
1278 description: str
1279 type: SignalType
1280 data_type: str
1281 precision_digits: int | None
1282 forcible: bool
1283 status: SignalStatus = SignalStatus()
1285 digitization_function: DigitizationFunction | None
1287 @property
1288 def device(self) -> Device:
1289 device_id = self.signal_id.split(".")[0]
1290 device = Device.get_one_by_attribute("device_id", device_id)
1291 return device
1293 @cached_property
1294 def signal_data_class(self):
1295 if self.data_type in SIGNALDATA_TYPES:
1296 return SIGNALDATA_TYPES[self.data_type]
1297 if self.data_type.startswith("enum"):
1298 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1299 raise ValueError(f"Unhandled python type: {self.data_type}")
1301 @cached_property
1302 def python_type(self):
1303 if self.data_type in TYPES:
1304 return TYPES[self.data_type]
1305 if self.data_type.startswith("enum"):
1306 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1307 return Literal[*choices]
1308 raise ValueError(f"Unhandled python type: {self.data_type}")
1310 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1311 command = Command(
1312 sent_at=time.time(),
1313 command_type="Signal command",
1314 user_id=current_user.id,
1315 )
1317 has_input_error = False
1318 error_message = ""
1320 if self.data_type.startswith("enum"):
1321 enum_options = get_args(self.python_type)
1323 if update_dict.value is not None and update_dict.value not in enum_options:
1324 has_input_error = True
1325 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1326 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1327 has_input_error = True
1328 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1329 else:
1330 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1331 has_input_error = True
1332 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1333 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1334 has_input_error = True
1335 error_message += (
1336 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1337 )
1339 if has_input_error:
1340 command.response_time = 0
1341 command.succeeded = False
1342 command.description = f"Tried to modify signal {self.signal_id}"
1343 response = {"error": True, "status_code": 400, "message": error_message}
1344 else:
1345 response = await send_signal_value(self.signal_id, update_dict)
1346 command.receive_response(response)
1348 Command.create(command)
1349 return response
1351 @classmethod
1352 def get_from_signal_id(cls, signal_id) -> Self:
1353 """Could be generic from mongo"""
1354 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1355 if not raw_value:
1356 return None
1357 del raw_value["_id"]
1358 return cls.dict_to_object(raw_value)
1360 @classmethod
1361 def get_all_ids(cls) -> list[str]:
1362 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1364 return [signal["signal_id"] for signal in cursor]
1366 @classmethod
1367 def get_all_statuses(cls) -> list[dict]:
1368 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
1370 return [
1371 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1372 for signal in cursor
1373 ]
1375 async def number_samples(self):
1376 collection = get_signal_collection(signal_id=self.signal_id)
1377 if collection is None:
1378 return 0
1380 number_samples = collection.estimated_document_count()
1382 number_samples_async_collection = await get_async_collection(
1383 systems_async_database, "number_samples", create=True, time_series=True
1384 )
1386 loop = asyncio.get_running_loop()
1387 loop.create_task(
1388 number_samples_async_collection.insert_one(
1389 {
1390 "timestamp": datetime.datetime.now(pytz.UTC),
1391 "signal_id": self.signal_id,
1392 "number_samples": number_samples,
1393 }
1394 )
1395 )
1397 return number_samples
1399 @classmethod
1400 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]:
1401 number_samples_by_id = {}
1402 collections = get_signal_collections_batch(signal_ids)
1403 number_samples_async_collection = await get_async_collection(
1404 systems_async_database, "number_samples", create=True, time_series=True
1405 )
1407 for signal_id, collection in zip(signal_ids, collections):
1408 if collection is None:
1409 number_samples_by_id[signal_id] = 0
1410 continue
1412 number_samples = collection.estimated_document_count()
1414 number_samples_by_id[signal_id] = number_samples
1416 now = datetime.datetime.now(pytz.UTC)
1417 loop = asyncio.get_running_loop()
1418 loop.create_task(
1419 number_samples_async_collection.insert_many(
1420 [
1421 {
1422 "timestamp": now,
1423 "signal_id": signal_id,
1424 "number_samples": number_samples,
1425 }
1426 for signal_id, number_samples in number_samples_by_id.items()
1427 ]
1428 )
1429 )
1431 return number_samples_by_id
1433 def sample_datasize(self):
1434 return signals_database.command("collstats", self.signal_id)["size"]
1436 @classmethod
1437 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1438 result = cls.collection().aggregate(
1439 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1440 )
1442 return {signal["signal_id"]: signal["forcible"] for signal in result}
1445class ServicesStatus(TwinPadModel):
1446 backend: str
1447 cloud_broker: str
1448 time_series_database: str
1449 signal_storage: str
1450 heartbeat_storage: str
1451 data_analyzer: str
1453 @classmethod
1454 def check(cls) -> Self:
1455 return cls(
1456 cloud_broker=ping(RABBITMQ_HOST),
1457 backend="up",
1458 time_series_database=ping(MONGO_HOST),
1459 signal_storage=ping(SIGNAL_STORAGE_HOST),
1460 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
1461 data_analyzer=ping(DATA_ANALYZER_HOST),
1462 )
1465def ping(host):
1466 try:
1467 if ping3.ping(host, timeout=0.8):
1468 return "up"
1469 except PermissionError:
1470 pass
1471 return "down"
1474class Event(GenericMongo):
1475 collection_name: ClassVar[str] = "events"
1477 name: str
1478 timestamp: float
1479 event_rule_id: str
1481 @computed_field
1482 @cached_property
1483 def event_rule(self) -> "EventRule":
1484 return EventRule.get_from_id(self.event_rule_id)
1486 @classmethod
1487 def dict_to_object(cls, dict_):
1488 """Refine to convert timestamp to datetime for mongodb."""
1489 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
1490 return super().dict_to_object(dict_)
1493class TwinPadActivity(GenericMongo):
1494 timestamp: float
1495 amount: int
1497 @classmethod
1498 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
1499 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1500 number_events_collection = get_collection(systems_database, "number_events")
1501 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
1502 items = []
1503 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1504 if number_events_collection is None or recompute_amount:
1505 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
1506 number_events_collection.delete_many({})
1507 first_event = events_collection.find_one(sort={"timestamp": 1})
1508 if first_event is None:
1509 return items
1510 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
1511 tzinfo=pytz.UTC
1512 )
1513 while last_computed_day < TODAY:
1514 day_nb_events = events_collection.count_documents(
1515 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1516 )
1517 if day_nb_events > 0:
1518 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
1519 last_computed_day += ONE_DAY_OFFSET
1520 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
1521 if number_events_today > 0:
1522 number_events_collection.delete_many({"timestamp": TODAY})
1523 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
1524 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1525 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1526 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1527 for day in number_events:
1528 day["timestamp"] = day["timestamp"].timestamp()
1529 items.append(cls.mongo_dict_to_object(day))
1530 return items
1532 @classmethod
1533 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
1534 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1535 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
1536 signals_number_samples_collection = get_collection(
1537 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
1538 )
1539 items = []
1540 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1541 if number_samples_collection is None or recompute_amount:
1542 number_samples_collection = get_collection(
1543 systems_database, "number_received_samples", create=True, time_series=True
1544 )
1545 number_samples_collection.delete_many({})
1546 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
1547 if first_sample is None:
1548 return items
1549 # compute from day of first found event
1550 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace(
1551 tzinfo=pytz.UTC
1552 )
1553 while last_computed_day < TODAY:
1554 number_samples_request = signals_number_samples_collection.aggregate(
1555 [
1556 {
1557 "$match": {
1558 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}
1559 }
1560 },
1561 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
1562 ]
1563 ).to_list()
1564 if len(number_samples_request) == 0:
1565 number_samples = 0
1566 else:
1567 number_samples = number_samples_request[0].get("number_samples", 0)
1568 if number_samples > 0:
1569 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
1570 last_computed_day += ONE_DAY_OFFSET
1571 number_samples_request = signals_number_samples_collection.aggregate(
1572 [
1573 {"$match": {"timestamp": {"$gte": TODAY}}},
1574 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
1575 ]
1576 ).to_list()
1577 if len(number_samples_request) == 0:
1578 number_samples_today = 0
1579 else:
1580 number_samples_today = number_samples_request[0].get("number_samples", 0)
1581 if number_samples_today > 0:
1582 number_samples_collection.delete_many({"timestamp": TODAY})
1583 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
1584 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1585 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1586 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1587 for day in number_events:
1588 day["timestamp"] = day["timestamp"].timestamp()
1589 items.append(cls.mongo_dict_to_object(day))
1590 return items
1592 @classmethod
1593 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
1594 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1595 number_commands_collection = get_collection(systems_database, "number_commands")
1596 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
1597 items = []
1598 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1599 if number_commands_collection is None or recompute_amount:
1600 number_commands_collection = get_collection(
1601 systems_database, "number_commands", create=True, time_series=True
1602 )
1603 number_commands_collection.delete_many({})
1604 first_command = commands_collection.find_one(sort={"timestamp": 1})
1605 if first_command is None:
1606 return items
1607 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace(
1608 tzinfo=pytz.UTC
1609 )
1610 while last_computed_day < TODAY:
1611 day_nb_commands = commands_collection.count_documents(
1612 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1613 )
1614 if day_nb_commands > 0:
1615 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
1616 last_computed_day += ONE_DAY_OFFSET
1617 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
1618 if number_commands_today > 0:
1619 number_commands_collection.delete_many({"timestamp": TODAY})
1620 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
1621 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1622 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1623 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1624 for day in number_commands:
1625 day["timestamp"] = day["timestamp"].timestamp()
1626 items.append(cls.mongo_dict_to_object(day))
1627 return items
1630class EventRule(GenericMongo):
1631 collection_name: ClassVar[str] = "event_rules"
1633 name: str
1634 formula: str
1635 variables: list[str]
1637 @computed_field
1638 @cached_property
1639 def number_events(self) -> int:
1640 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
1643class Company(GenericMongo):
1644 collection_name: ClassVar[str] = "companies"
1645 name: str
1648class Campaign(GenericMongo):
1649 collection_name: ClassVar[str] = "campaigns"
1651 # Properties
1652 id: str | None = None
1653 name: str
1654 description: str | None = None
1656 @classmethod
1657 def create(cls, campaign: Self):
1658 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
1659 if new_campaign is None:
1660 return None
1661 return {"campaign_id": str(new_campaign.inserted_id)}
1663 @classmethod
1664 def update(cls, campaign: Self):
1665 updated_campaign = cls.collection().find_one_and_update(
1666 {"_id": ObjectId(campaign.id)},
1667 {"$set": {"name": campaign.name, "description": campaign.description}},
1668 return_document=ReturnDocument.AFTER,
1669 )
1670 return updated_campaign
1672 @classmethod
1673 def delete(cls, campaign_id):
1674 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)})
1675 return deleted_user
1678class Phase(GenericMongo):
1679 collection_name: ClassVar[str] = "phases"
1681 # Properties
1682 id: str | None = None
1683 name: str
1684 description: str | None = None
1685 start_at: float
1686 end_at: float
1688 # FK
1689 campaign_id: str
1691 # @classmethod
1692 # def get_by_date(cls, datetime: float):
1693 # phases = []
1694 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
1695 # phases.append(cls.dict_to_object(dict_).model_dump())
1696 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
1697 # phases.append(cls.dict_to_object(dict_).model_dump())
1698 # if phases is None:
1699 # return None
1700 # return phases
1702 @classmethod
1703 def create(cls, phase: Self):
1704 phase = Phase(
1705 name=phase.name,
1706 description=phase.description,
1707 start_at=phase.start_at,
1708 end_at=phase.end_at,
1709 campaign_id=phase.campaign_id,
1710 )
1711 phase_collection = get_collection(systems_database, "phases", create=True)
1712 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
1713 if new_phase is None:
1714 return None
1715 return {"phase_id": str(new_phase.inserted_id)}
1717 @classmethod
1718 def update(cls, phase: Self):
1719 updated_phase = cls.collection().find_one_and_update(
1720 {"_id": ObjectId(phase.id)},
1721 {
1722 "$set": {
1723 "name": phase.name,
1724 "description": phase.description,
1725 "start_at": phase.start_at,
1726 "end_at": phase.end_at,
1727 }
1728 },
1729 return_document=ReturnDocument.AFTER,
1730 )
1731 return updated_phase
1733 @classmethod
1734 def delete(cls, phase_id):
1735 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
1736 return delete_phase
1738 @classmethod
1739 def deleteMany(cls, campaign_id):
1740 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
1741 return delete_phases
1744class CustomViewCreation(GenericMongo):
1745 collection_name: ClassVar[str] = "custom_views"
1747 name: str
1748 configuration: list
1751class CustomView(CustomViewCreation):
1752 # Properties
1753 id: str | None = None
1755 # Foreign Key
1756 user_id: str
1758 # # Methods
1759 # @classmethod
1760 # def create(cls, form_custom_view: Self, user_id) -> list:
1761 # custom_view = CustomView(
1762 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
1763 # )
1764 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
1765 # return new_custom_view
1767 # @classmethod
1768 # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
1769 # updated_custom_view = cls.collection().find_one_and_update(
1770 # {"_id": ObjectId(custom_view_id)},
1771 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
1772 # return_document=ReturnDocument.AFTER,
1773 # )
1774 # updated_custom_view["id"] = str(updated_custom_view["_id"])
1775 # del updated_custom_view["_id"]
1776 # return cls(**updated_custom_view)
1778 # @classmethod
1779 # def delete(cls, custom_view_id) -> bool:
1780 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
1781 # return deleted_custom_view.acknowledged
1784CustomViewUpdate = create_update_model(CustomView)
1787class Video(GenericMongo):
1788 collection_name: ClassVar[str] = "videos"
1790 # Properties
1791 name: str
1792 ip_addr: str
1793 username: str | None = None
1794 password: str | None = None
1796 # Methods
1797 @classmethod
1798 def get_all(cls, sort_by="_id") -> list[Self]:
1799 items = []
1800 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
1801 items.append(cls.mongo_dict_to_object(dict_))
1802 return items
1804 @classmethod
1805 def get_video(cls, camera_id: ObjectId):
1806 camera = cls.get_from_id(camera_id)
1807 if camera is not None:
1808 return camera.name
1809 return None
1812class Command(GenericMongo):
1813 collection_name: ClassVar[str] = "commands"
1815 # Properties
1816 timestamp: datetime.datetime = None
1817 sent_at: float
1818 response_time: float = 0.0
1819 command_type: str
1820 description: str = ""
1821 succeeded: bool = False
1823 # Foreign key
1824 user_id: str
1826 @classmethod
1827 def collection(cls):
1828 return get_collection(systems_database, cls.collection_name, create=True, time_series=True)
1830 @classmethod
1831 def create(cls, command: Self):
1832 command = cls(
1833 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
1834 sent_at=command.sent_at,
1835 response_time=command.response_time,
1836 command_type=command.command_type,
1837 description=command.description,
1838 succeeded=command.succeeded,
1839 user_id=command.user_id,
1840 )
1841 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"}))
1842 if new_command is None:
1843 return None
1844 return {"command_id": str(new_command.inserted_id)}
1846 def receive_response(self, response: dict):
1847 self.response_time = time.time() - self.sent_at
1848 self.succeeded = response.get("error", True) is False
1849 if self.description == "":
1850 self.description += response.get("message", "").rstrip()
1853class SignalsPresetCreation(GenericMongo):
1854 name: str
1855 signal_ids: list[str]
1858class SignalsPreset(SignalsPresetCreation):
1859 collection_name: ClassVar[str] = "signals_presets"
1861 user_id: str
1863 @classmethod
1864 def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
1865 signals_preset = cls(
1866 user_id=user_id,
1867 name=signals_preset.name,
1868 signal_ids=signals_preset.signal_ids,
1869 )
1871 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"}))
1873 return str(new_signal_preset.inserted_id)
1876SignalsPresetUpdate = create_update_model(SignalsPreset)
1879class LineStyle(str, Enum):
1880 solid = "solid"
1881 dotted = "dotted"
1882 dashed = "dashed"
1885class SignalAppearance:
1886 value_color: str
1887 forced_value_color: str
1890class GraphThemeCreation(GenericMongo):
1891 collection_name: ClassVar[str] = "graph_themes"
1893 name: str
1894 signal_id: str
1895 value_color: str = ""
1896 forced_value_color: str = ""
1897 value_line_style: LineStyle = LineStyle.solid
1898 forced_value_line_style: LineStyle = LineStyle.solid
1899 private: bool = True
1902class PublicGraphTheme(GraphThemeCreation):
1903 created_by_user: bool
1904 in_user_library: bool
1905 active_for_user: bool
1907 _current_user_id: str = ""
1909 @classproperty
1910 def custom_pipeline_steps(cls) -> dict[str, list]:
1911 return {
1912 "created_by_user": [
1913 {
1914 "$addFields": {
1915 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]},
1916 }
1917 }
1918 ],
1919 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible
1920 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}
1921 ],
1922 "in_user_library": [
1923 {
1924 "$addFields": {
1925 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]},
1926 }
1927 }
1928 ],
1929 "active_for_user": [
1930 {
1931 "$addFields": {
1932 "active_for_user": {"$in": [cls._current_user_id, "$active"]},
1933 }
1934 }
1935 ],
1936 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}],
1937 "active": [
1938 {
1939 "$addFields": {
1940 "active": "$$REMOVE",
1941 }
1942 }
1943 ],
1944 "creator_id": [
1945 {
1946 "$addFields": {
1947 "creator_id": "$$REMOVE",
1948 }
1949 }
1950 ],
1951 }
1953 @classmethod
1954 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
1955 cls._current_user_id = user_id
1956 return super().response_from_query(query)
1958 @classmethod
1959 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
1960 query.in_user_library = "true"
1961 return cls.response_from_query(query, user_id)
1963 @classmethod
1964 def get_from_id(cls, item_id, user_id: str) -> Self | None:
1965 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)
1967 @classmethod
1968 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
1969 cls._current_user_id = user_id
1970 return super().get_by_attribute(attribute_name, attribute_value)
1972 @classmethod
1973 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
1974 cls._current_user_id = user_id
1975 return super().get_one_by_attribute(attribute_name, attribute_value)
1977 @classmethod
1978 def get_all(cls, sort_by: str, user_id: str):
1979 cls._current_user_id = user_id
1980 return super().get_all(sort_by)
1982 @classmethod
1983 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
1984 pipeline = [
1985 {
1986 "$match": {
1987 "active": {"$eq": user_id},
1988 "signal_id": {"$in": signal_ids},
1989 }
1990 },
1991 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
1992 {"$replaceRoot": {"newRoot": "$firstDocument"}},
1993 {
1994 "$project": {
1995 "_id": 0,
1996 "signal_id": 1,
1997 "value_color": 1,
1998 "forced_value_color": 1,
1999 "value_line_style": 1,
2000 "forced_value_line_style": 1,
2001 }
2002 },
2003 ]
2005 result = {}
2007 cursor = cls.collection().aggregate(pipeline)
2008 for document in cursor:
2009 signal_id = document["signal_id"]
2010 del document["signal_id"]
2011 result[signal_id] = document
2013 return result
2016GraphThemeUpdate = create_update_model(PublicGraphTheme)
2019class PrivateGraphTheme(GraphThemeCreation):
2020 # private
2021 creator_id: str
2022 in_library: list[str]
2023 active: list[str]
2025 @classmethod
2026 def create(
2027 cls,
2028 creator_id: str,
2029 name: str,
2030 signal_id: str,
2031 value_color: str,
2032 forced_value_color: str,
2033 value_line_style: LineStyle,
2034 forced_value_line_style: LineStyle,
2035 private: bool,
2036 ):
2037 color_setting = cls(
2038 creator_id=creator_id,
2039 name=name,
2040 signal_id=signal_id,
2041 value_color=value_color,
2042 forced_value_color=forced_value_color,
2043 value_line_style=value_line_style,
2044 forced_value_line_style=forced_value_line_style,
2045 private=private,
2046 in_library=[creator_id],
2047 active=[creator_id],
2048 )
2050 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
2051 color_setting.id = str(new_color_setting.inserted_id)
2052 return color_setting
2054 def update(self, update_dict: dict, user_id: str):
2055 if (in_user_lib := update_dict.get("in_user_library")) is not None:
2056 if in_user_lib and user_id not in self.in_library:
2057 self.in_library.append(user_id)
2058 elif not in_user_lib and user_id in self.in_library:
2059 self.in_library.remove(user_id)
2060 update_dict["in_library"] = self.in_library
2061 del update_dict["in_user_library"]
2063 if (active_for_user := update_dict.get("active_for_user")) is not None:
2064 if active_for_user and user_id not in self.active:
2065 self.active.append(user_id)
2066 elif not active_for_user and user_id in self.active:
2067 self.active.remove(user_id)
2068 update_dict["active"] = self.active
2069 del update_dict["active_for_user"]
2071 if update_dict.get("created_by_user") is not None:
2072 del update_dict["created_by_user"]
2074 self.collection().find_one_and_update(
2075 {"_id": ObjectId(self.id)},
2076 {"$set": update_dict},
2077 )
2079 return {}