Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%
1394 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-25 12:17 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-25 12:17 +0000
1from functools import cached_property
2import os
3import re
4import io
5import time
6import csv
7from typing import Self, ClassVar, Any, Literal, get_args, Annotated
8import datetime
9import math
10import bisect
11from enum import Enum
12import logging
13import copy
14import asyncio
15import zipfile
17import requests
18import ping3
19import pytz
20from bson.objectid import ObjectId
21from pymongo import ASCENDING, ReturnDocument
22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator
23import numpy as npy
24import lttb
25import h5py
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 devices_states_database,
36)
37from twinpad_backend.responses import ListResponse
38from twinpad_backend.messages import RabbitMQClient
39from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
41TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
42SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
43DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
44MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]
47RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
48MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
49SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
50HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
51DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
53DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
54NUMBER_SAMPLES_DATABASE_UPDATE = 120
56logger = logging.getLogger("uvicorn.error")
59class DeleteInfo(BaseModel):
60 is_deleted: bool
61 detail: str
64class classproperty:
65 """
66 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13.
67 Found here: https://stackoverflow.com/a/76301341
68 """
70 def __init__(self, func):
71 self.fget = func
73 def __get__(self, _, owner):
74 return self.fget(owner)
77def create_update_model(model):
78 fields = {}
80 for field_name, field_annotation in model.model_fields.items():
81 if field_name != "id":
82 fields[field_name] = (field_annotation.annotation | None, None)
84 query_name = model.__name__ + "Update"
85 return create_model(query_name, **fields)
88def get_utc_date_from_timestamp(timestamp: float):
89 return datetime.datetime.fromtimestamp(timestamp).isoformat()
92def downsample_list(time_vector: list, values: list, max_number_samples: int):
93 if len(time_vector) < max_number_samples:
94 return time_vector, values
96 time_vector_copy = copy.deepcopy(time_vector)
97 values_copy = copy.deepcopy(values)
99 none_group_bounds = []
100 none_group_index = -1
101 index = -1
102 # LTTB doesn't handle None values so remove them
103 while values_copy.count(None) > 0:
104 # Store bounds of None value groups so we can insert them back after the downsampling
105 if (new_index := values_copy.index(None)) != index:
106 none_group_bounds.append([time_vector_copy.pop(new_index)])
107 none_group_index += 1
108 elif len(none_group_bounds[none_group_index]) < 2:
109 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
110 else:
111 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
112 values_copy.pop(new_index)
113 index = new_index
114 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
116 try:
117 values_array = npy.array([time_vector_copy, values_copy]).T
118 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
120 new_time_vector = interpolated_values[:, 0].tolist()
121 new_values = interpolated_values[:, 1].tolist()
122 except ValueError:
123 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
124 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
125 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
126 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
127 return new_time_vector, new_values_nan_to_none
129 # insert back None values at the correct timestamps
130 for none_group in none_group_bounds:
131 start_index = npy.searchsorted(new_time_vector, none_group[0])
132 new_time_vector[start_index:start_index] = none_group
133 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
135 return new_time_vector, new_values
138def is_of_type(value, wanted_type):
139 if wanted_type is float:
140 return isinstance(value, (int, float))
141 return isinstance(value, wanted_type)
144# Models
145class TwinPadModel(BaseModel):
146 @classmethod
147 def dict_to_object(cls, dict_):
148 return cls.model_validate(dict_)
150 def to_dict(self, exclude=None):
151 dict_ = self.model_dump(exclude=exclude, mode="json")
152 return dict_
155def validate_mongo_id(v):
156 if not ObjectId.is_valid(v):
157 raise ValueError("Invalid MongoDB id")
158 return str(v)
161MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]
164def validate_12_hex(v: str) -> str:
165 if not re.fullmatch(r"[0-9a-fA-F]{12}", v):
166 raise ValueError("ID must be a 12-character hexadecimal string")
167 return v
170DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]
173class GenericMongo(TwinPadModel):
174 id: MongoId | None = None
175 custom_pipeline_steps: ClassVar[dict[str, list]] = {}
177 @classmethod
178 def collection(cls):
179 return get_collection(systems_database, cls.collection_name, create=True)
181 @classmethod
182 def response_from_query(cls, query) -> ListResponse[Self]:
183 request_filters = query.mongodb_filter()
184 items = []
186 # Allows for multi-sort, Python dicts are ordered so no issue while sorting
187 sort_dict = {}
188 for sort in query.sort_by.split(","):
189 if ":" in sort:
190 sort_field, sort_order = sort.split(":")
191 sort_order = int(sort_order)
192 else:
193 sort_field = sort
194 sort_order = 1
195 sort_dict[sort_field] = sort_order
197 collection = get_collection(systems_database, cls.collection_name, create=True)
198 total = collection.count_documents(request_filters)
200 pipeline = []
201 added_properties = []
202 if "$and" in request_filters:
203 for request_filter in request_filters["$and"]:
204 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
205 if filtered_property in request_filter:
206 pipeline.extend(pipeline_steps)
207 added_properties.append(filtered_property)
208 else:
209 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
210 if filtered_property in request_filters:
211 pipeline.extend(pipeline_steps)
212 added_properties.append(filtered_property)
213 pipeline.append({"$match": request_filters})
215 for sort_field in sort_dict:
216 if sort_field in cls.custom_pipeline_steps:
217 pipeline.extend(cls.custom_pipeline_steps[sort_field])
218 added_properties.append(sort_field)
219 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])
221 if (query.limit is not None) and (query.limit != 0):
222 pipeline.append({"$limit": query.limit})
224 for filtered_property, step in cls.custom_pipeline_steps.items():
225 if filtered_property not in added_properties:
226 pipeline.extend(step)
228 cursor = collection.aggregate(pipeline)
230 for item_dict in cursor:
231 items.append(cls.mongo_dict_to_object(item_dict))
233 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
235 @classmethod
236 def get_from_id(cls, item_id) -> Self | None:
237 return cls.get_one_by_attribute("_id", ObjectId(item_id))
239 @classmethod
240 def mongo_dict_to_object(cls, mongo_dict):
241 mongo_dict["id"] = str(mongo_dict["_id"])
242 del mongo_dict["_id"]
243 return cls.dict_to_object(mongo_dict)
245 @classmethod
246 def get_by_attribute(cls, attribute_name: str, attribute_value):
247 """Returns all items that match the attribute with value."""
248 pipeline = []
249 if attribute_name in cls.custom_pipeline_steps:
250 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
251 pipeline.append({"$match": {attribute_name: attribute_value}})
252 for key, step in cls.custom_pipeline_steps.items():
253 if key != attribute_name:
254 pipeline.extend(step)
255 items = cls.collection().aggregate(pipeline)
256 if items is None:
257 return None
258 return [cls.mongo_dict_to_object(d) for d in items]
260 @classmethod
261 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
262 pipeline = []
263 if attribute_name in cls.custom_pipeline_steps:
264 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
265 pipeline.append({"$match": {attribute_name: attribute_value}})
266 pipeline.append({"$limit": 1})
267 for key, step in cls.custom_pipeline_steps.items():
268 if key != attribute_name:
269 pipeline.extend(step)
270 items = cls.collection().aggregate(pipeline).to_list()
271 if len(items) == 0:
272 return None
273 return cls.mongo_dict_to_object(items[0])
275 @classmethod
276 def get_all(cls, sort_by="_id") -> list[Self]:
277 items = []
278 pipeline = []
279 if sort_by in cls.custom_pipeline_steps:
280 pipeline.extend(cls.custom_pipeline_steps[sort_by])
281 pipeline.append({"$sort": {sort_by: ASCENDING}})
282 for key, step in cls.custom_pipeline_steps.items():
283 if key != sort_by:
284 pipeline.extend(step)
285 for dict_ in cls.collection().aggregate(pipeline):
286 items.append(cls.mongo_dict_to_object(dict_))
287 return items
289 @classmethod
290 def get_number_documents(cls):
291 collection = get_collection(systems_database, cls.collection_name)
292 if collection is None:
293 return 0
294 return collection.count_documents(
295 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
296 )
298 def insert(self):
299 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
300 self.id = str(insert_result.inserted_id)
301 return self.id
303 def update(self, update_dict):
304 for key, value in update_dict.items():
305 setattr(self, key, value)
306 self.collection().find_one_and_update(
307 {"_id": ObjectId(self.id)},
308 {"$set": update_dict},
309 return_document=ReturnDocument.AFTER,
310 )
312 return self
314 def delete(self):
315 result = self.collection().delete_one({"_id": ObjectId(self.id)})
316 return result.deleted_count > 0
319class User(GenericMongo):
320 collection_name: ClassVar[str] = "users"
322 firstname: str
323 lastname: str
324 email: str
325 password: str
326 is_blocked: bool | None = False
327 is_admin: bool | None = False
328 is_connected: bool | None = False
329 company_id: str | None = None
331 def to_dict(self, exclude: set = None):
332 if exclude is None:
333 exclude = set()
334 exclude.add("password")
335 return GenericMongo.to_dict(self, exclude=exclude)
337 @classmethod
338 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
339 users = cls.get_all()
340 if not users:
341 is_admin = True
342 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
343 user_collection = get_collection(systems_database, "users", create=True)
344 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
345 if new_user is None:
346 return None
347 return {"user_id": str(new_user.inserted_id)}
349 @classmethod
350 def update_info(cls, user: "UserUpdate", user_id: str): # type: ignore
351 updated_user = cls.collection().find_one_and_update(
352 {"_id": ObjectId(user_id)},
353 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
354 return_document=ReturnDocument.AFTER,
355 )
356 updated_user["id"] = str(updated_user["_id"])
357 del (updated_user["_id"], updated_user["is_connected"])
358 return cls(**updated_user)
361UserUpdate = create_update_model(User)
364class Mode(TwinPadModel):
365 mode_id: int
366 name: str
367 frequency_multiplier: float
368 min_frequency: float
371class DeviceUpdate(TwinPadModel):
372 mode_id: int
375class Device(GenericMongo):
376 collection_name: ClassVar[str] = "devices"
378 device_id: DeviceId
379 name: str
380 description: str = ""
381 modes: list[Mode]
382 current_mode_id: int | None = None
383 last_ping: float | None = None
384 petri_network: Any
385 pid: Any
386 load: float | None = None
387 tokens: list[int] = Field(default_factory=list)
388 status: str
390 async def change_mode(self, update_dict, current_user: User):
391 has_error = False
393 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes):
394 has_error = True
395 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
396 elif self.current_mode_id is not None:
397 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
398 else:
399 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
400 command = Command(
401 sent_at=time.time(),
402 command_type="Mode change",
403 description=description,
404 user_id=current_user.id,
405 )
407 if has_error:
408 command.response_time = 0
409 command.succeeded = False
410 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
411 else:
412 response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id)
413 command.receive_response(response)
415 Command.create(command)
416 return response
418 @classmethod
419 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
420 devices_by_id = {}
421 for signal_id in signal_ids:
422 device_id = signal_id.split(".")[0]
423 if device_id not in devices_by_id:
424 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
425 return devices_by_id
428class DeviceSetup(GenericMongo):
429 collection_name: ClassVar[str] = "device_setups"
431 device_ids: list[str]
432 active: bool = False
433 variable_mapping: dict[str, str]
436DeviceSetupUpdate = create_update_model(DeviceSetup)
439class DeviceState(GenericMongo):
440 collection_name: ClassVar[str] = "devices_states"
442 timestamp: float
443 mode: str | None = None
444 load: float | None = None
445 places: list[str] = Field(default_factory=list)
446 modified_properties: list[str] = Field(default_factory=list)
448 @classmethod
449 def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
450 req_filter = query.mongodb_filter()
451 items = []
452 if ":" in query.sort_by:
453 sort_field, sort_order = query.sort_by.split(":")
454 sort_order = int(sort_order)
455 else:
456 sort_field = query.sort_by
457 sort_order = 1
458 collection = get_collection(devices_states_database, device_id)
459 if collection is None:
460 total = 0
461 cursor = []
462 else:
463 total = collection.count_documents(req_filter)
464 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
465 if (query.limit is not None) and (query.limit != 0):
466 cursor = cursor.limit(query.limit)
467 for item_dict in cursor:
468 items.append(
469 cls(
470 timestamp=item_dict.get("precise_timestamp"),
471 mode=item_dict.get("mode", None),
472 load=item_dict.get("load", None),
473 places=item_dict.get("places", []),
474 modified_properties=item_dict.get("modified_properties", []),
475 )
476 )
477 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
480class SignalSample(TwinPadModel):
481 signal_id: str
482 timestamp: float
483 value: float | int | str | bool | None
484 forced_value: float | int | str | bool | None = None
486 @classmethod
487 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
489 collection = get_signal_collection(signal_id)
490 if collection is None:
491 return None
493 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
494 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
495 bucket = get_signal_collection(f"system.buckets.{signal_id}")
496 first_bucket = None
497 if bucket is not None:
498 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
499 if first_bucket is not None:
500 sample_data = collection.find_one(
501 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
502 )
503 else:
504 sample_data = collection.find_one({}, sort={"precise_timestamp": 1})
506 if sample_data is None:
507 return None
509 timestamp = sample_data["precise_timestamp"]
511 return cls(
512 signal_id=signal_id,
513 timestamp=timestamp,
514 value=sample_data.get("value", None),
515 forced_value=sample_data.get("forced_value", None),
516 )
518 @classmethod
519 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
520 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
522 @classmethod
523 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
524 last_value_collection = get_signal_collection("last_values", create=False)
525 signals_collection = get_collection(systems_database, "signals", create=True)
527 sample_data = None
528 if last_value_collection is not None:
529 sample_data = last_value_collection.find_one({"signal_id": signal_id}, sort={"precise_timestamp": -1})
531 # If there is no data, check if the signal's is up, as they don't send duplicate values
532 if sample_data is None:
533 if signals_collection.count_documents({"status.status": "up", "signal_id": signal_id}) < 1:
534 return None
536 # Same workaround as above function, very effective to narrow down big sets of data
537 bucket = get_signal_collection(f"system.buckets.{signal_id}")
538 last_bucket = None
539 if bucket is not None:
540 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
541 signal_collection = get_signal_collection(signal_id)
542 if last_bucket is not None and signal_collection is not None:
543 sample_data = signal_collection.find_one(
544 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
545 sort={"precise_timestamp": -1},
546 )
547 else:
548 sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1})
550 if sample_data is None:
551 return None
553 timestamp = sample_data.get("precise_timestamp")
554 # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down
555 if device is None:
556 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
557 if device is not None and device.last_ping is not None:
558 if timestamp is None:
559 timestamp = device.last_ping
560 else:
561 timestamp = max(timestamp, device.last_ping)
563 return cls(
564 signal_id=signal_id,
565 timestamp=timestamp,
566 value=sample_data.get("value", None),
567 forced_value=sample_data.get("forced_value", None),
568 )
570 @classmethod
571 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
572 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
573 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
576class SignalData(TwinPadModel):
577 signal_id: str
578 forcible: bool = True
579 time_vector: list[float]
580 values: list[float | int | str | None]
581 forced_values: list[float | int | str | None]
583 data_start: float | None = None
584 data_end: float | None = None
586 number_samples: int = 0
587 number_samples_db: int = 0
589 db_query_time: float = 0.0
590 init_time: float = 0.0
591 data_processing_time: float = 0.0
593 phase_id: str | None = None
595 @classmethod
596 def get_from_signal_id(
597 cls,
598 signal_id: str,
599 min_timestamp: float = None,
600 max_timestamp: float = None,
601 window_min_timestamp: float = None,
602 window_max_timestamp: float = None,
603 interpolate_bounds: bool = True,
604 max_documents: int = None,
605 collection=None,
606 ) -> Self:
608 now = time.time()
610 req_signal = {}
611 if min_timestamp is not None:
612 req_signal.setdefault("timestamp", {})
613 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
614 if max_timestamp is not None:
615 req_signal.setdefault("timestamp", {})
616 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
618 if collection is None:
619 collection = get_signal_collection(signal_id)
620 if collection is None:
621 return cls(
622 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
623 )
625 db_req_start = time.time()
627 sort_step = {"$sort": {"precise_timestamp": 1}}
628 number_results = collection.count_documents(req_signal)
630 pipeline = []
631 if req_signal:
632 pipeline.append({"$match": req_signal}) # Filter data if needed
634 pipeline.extend(
635 [
636 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
637 sort_step,
638 ]
639 )
641 if max_documents is not None and max_documents < number_results:
642 unsampling_ratio = math.ceil(number_results / max_documents)
643 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
644 pipeline.extend(
645 [
646 {
647 "$setWindowFields": {
648 "sortBy": {"precise_timestamp": 1},
649 "output": {"index": {"$documentNumber": {}}},
650 }
651 },
652 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
653 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
654 {"$replaceRoot": {"newRoot": "$doc"}},
655 {"$unset": ["index", "group_id"]},
656 {"$sort": {"precise_timestamp": 1}},
657 ]
658 )
660 # logger.info(f"pipeline: %s", str(pipeline))
661 cursor = collection.aggregate(pipeline)
662 db_req_time = time.time() - db_req_start
664 init_time = time.time()
666 results = cursor.to_list()
667 time_vector = []
668 values = []
669 forced_values = []
670 for s in results:
671 time_vector.append(s["precise_timestamp"])
672 values.append(s.get("value", None))
673 forced_values.append(s.get("forced_value", None))
675 signal = Signal.get_from_signal_id(signal_id)
676 if signal is None:
677 return cls(
678 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
679 )
680 class_ = signal.signal_data_class
682 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
683 time_vector, values, forced_values = cls.interpolate_bounds(
684 class_,
685 collection,
686 signal_id,
687 time_vector,
688 values,
689 forced_values,
690 window_min_timestamp,
691 window_max_timestamp,
692 )
694 if values:
695 # TODO: check below. a bit strange
696 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
697 # Adding last value as it should be repeated
698 time_vector.append(now)
699 values.append(values[-1])
700 forced_values.append(forced_values[-1])
702 init_time = time.time() - init_time
704 # See line 292 for explanation
705 bucket = get_signal_collection(f"system.buckets.{signal_id}")
706 first_bucket = None
707 if bucket is not None:
708 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
709 if first_bucket is not None:
710 data_start = first_bucket["control"]["min"]["precise_timestamp"]
711 else:
712 data_start = None
714 last_bucket = None
715 if bucket is not None:
716 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
717 if last_bucket is not None:
718 data_end = last_bucket["control"]["max"]["precise_timestamp"]
719 else:
720 data_end = None
722 return class_(
723 signal_id=signal_id,
724 forcible=signal.forcible,
725 time_vector=time_vector,
726 values=values,
727 forced_values=forced_values,
728 data_start=data_start,
729 data_end=data_end,
730 number_samples=len(values),
731 number_samples_db=number_results,
732 db_query_time=db_req_time,
733 init_time=init_time,
734 )
736 @staticmethod
737 def interpolate_bounds(
738 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
739 ):
740 sample_right = None
741 # Fetching right side value & interpolation
742 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
743 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
744 sample_right = collection.find_one(
745 {
746 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
747 "value": {"$exists": True},
748 },
749 sort={"precise_timestamp": -1},
750 )
751 if sample_right:
752 if time_vector:
753 right_sd = class_(
754 signal_id=signal_id,
755 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
756 values=[values[-1], sample_right.get("value", None)],
757 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
758 )
759 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
760 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
761 else:
762 max_ts_value = sample_right.get("value", None)
763 max_ts_forced_value = sample_right.get("forced_value", None)
764 time_vector.append(window_max_timestamp)
765 values.append(max_ts_value)
766 forced_values.append(max_ts_forced_value)
768 # Fetching left side value & interpolation
769 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
770 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
771 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
772 sample_left = sample_right
773 sample_left = collection.find_one(
774 {
775 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
776 "value": {"$exists": True},
777 },
778 sort={"precise_timestamp": -1},
779 )
781 if sample_left:
782 if time_vector:
783 left_sd = class_(
784 signal_id=signal_id,
785 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
786 values=[sample_left["value"], values[0]],
787 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
788 )
789 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
790 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
791 else:
792 min_ts_value = sample_left.get("value", None)
793 min_ts_forced_value = sample_left.get("forced_value", None)
794 time_vector.insert(0, window_min_timestamp)
795 values.insert(0, min_ts_value)
796 forced_values.insert(0, min_ts_forced_value)
798 return time_vector, values, forced_values
800 def interpolate_values(self, new_time_vector: list[float]):
801 return self.interpolate(new_time_vector, self.values)
803 def interpolate_forced_values(self, new_time_vector: list[float]):
804 return self.interpolate(new_time_vector, self.forced_values)
806 def uniform_desampling(self, number_samples_max: int = None) -> Self:
807 if number_samples_max is None or self.number_samples <= number_samples_max:
808 return self
810 data_processing_time = time.time()
812 new_time_vector = npy.linspace(self.time_vector[0], self.time_vector[-1], number_samples_max).tolist()
813 values = self.interpolate_values(new_time_vector)
814 forced_values = self.interpolate_forced_values(new_time_vector)
815 number_samples = len(new_time_vector)
817 data_processing_time = time.time() - data_processing_time
819 return self.__class__(
820 signal_id=self.signal_id,
821 time_vector=new_time_vector,
822 values=values,
823 forced_values=forced_values,
824 number_samples=number_samples,
825 number_samples_db=self.number_samples_db,
826 data_start=self.data_start,
827 data_end=self.data_end,
828 db_query_time=self.db_query_time,
829 init_time=self.init_time,
830 data_processing_time=self.data_processing_time + data_processing_time,
831 phase_id=self.phase_id,
832 )
834 def min_max_downsampling(self, number_samples_max: int) -> Self:
835 return self.uniform_desampling(number_samples_max)
837 def interest_window_desampling(
838 self,
839 window_max_number_samples: int,
840 outside_max_number_samples: int,
841 window_min_timestamp: float | None = None,
842 window_max_timestamp: float | None = None,
843 ) -> Self:
844 """Performs a sampling in a window of interest and outside."""
846 if not self.time_vector:
847 return self
849 if window_min_timestamp is None:
850 window_min_timestamp = self.time_vector[0]
851 if window_max_timestamp is None:
852 window_max_timestamp = self.time_vector[-1]
854 data_processing_time = time.time()
856 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
857 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
859 time_vector_before = self.time_vector[:index_window_start]
860 time_vector_window = self.time_vector[index_window_start:index_window_end]
861 time_vector_after = self.time_vector[index_window_end:]
863 # Resampling window
864 if time_vector_window:
865 # Ensurring window bounds
866 if time_vector_window[0] != window_min_timestamp:
867 time_vector_window.insert(0, window_min_timestamp)
868 if time_vector_window[-1] != window_max_timestamp:
869 time_vector_window.append(window_max_timestamp)
870 else:
871 time_vector_window = [window_min_timestamp, window_max_timestamp]
873 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
874 # Resampling
875 new_window_time_vector = npy.linspace(
876 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
877 ).tolist()
878 time_vector_window = new_window_time_vector
880 # Resampling outside
881 time_vector_before, time_vector_after = SignalData.resample_outside_window(
882 time_vector_before, time_vector_after, outside_max_number_samples
883 )
885 new_time_vector = time_vector_before + time_vector_window + time_vector_after
886 values = self.interpolate_values(new_time_vector)
887 forced_values = self.interpolate_forced_values(new_time_vector)
888 number_samples = len(values)
890 data_processing_time = time.time() - data_processing_time
892 return self.__class__(
893 signal_id=self.signal_id,
894 forcible=self.forcible,
895 time_vector=new_time_vector,
896 values=values,
897 forced_values=forced_values,
898 number_samples=number_samples,
899 number_samples_db=self.number_samples,
900 data_start=self.data_start,
901 data_end=self.data_end,
902 db_query_time=self.db_query_time,
903 init_time=self.init_time,
904 data_processing_time=self.data_processing_time + data_processing_time,
905 )
907 def zero_time_vector(self, data_start: float):
908 data_processing_time = time.time()
909 if len(self.time_vector) == 0:
910 return self
911 time_vector = npy.array(self.time_vector) - data_start
912 data_processing_time = time.time() - data_processing_time
914 return self.__class__(
915 signal_id=self.signal_id,
916 time_vector=time_vector,
917 values=self.values,
918 forced_values=self.forced_values,
919 number_samples=self.number_samples,
920 number_samples_db=self.number_samples_db,
921 data_start=time_vector[0],
922 data_end=time_vector[-1],
923 db_query_time=self.db_query_time,
924 init_time=self.init_time,
925 data_processing_time=self.data_processing_time + data_processing_time,
926 )
928 def csv_export(self):
929 output = io.StringIO()
930 writer = csv.writer(output)
931 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
932 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
933 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
934 return output.getvalue().encode("utf-8")
936 def prestoplot_export(self):
937 clean_signal_id = self.signal_id.replace(".", "_")
938 if clean_signal_id[0].isnumeric():
939 clean_signal_id = "_" + clean_signal_id
941 output = io.StringIO()
942 output.write("# Encoding:\tUTF-8\n")
943 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
944 output.write("ISO8601\tnone\tnone\n")
945 output.write(f"# Description :\t{clean_signal_id}\n")
947 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
948 output.write(
949 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
950 )
951 return output.getvalue().encode("utf-8")
953 @staticmethod
954 def resample_outside_window(time_vector_left, time_vector_right, outside_max_number_samples):
955 number_samples_left = len(time_vector_left)
956 number_samples_right = len(time_vector_right)
958 if (
959 outside_max_number_samples is None
960 or (number_samples_left + number_samples_right) <= outside_max_number_samples
961 ):
962 return time_vector_left, time_vector_right
964 new_time_vector_left = time_vector_left
965 new_time_vector_right = time_vector_right
967 new_number_samples_before = min(
968 number_samples_left,
969 math.ceil(outside_max_number_samples * number_samples_left / (number_samples_left + number_samples_right)),
970 )
971 new_number_samples_after = min(
972 number_samples_right,
973 math.ceil(outside_max_number_samples * number_samples_right / (number_samples_left + number_samples_right)),
974 )
975 # Adjusting numbers as math.ceil can do +1 on sum
976 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
977 if new_number_samples_before > new_number_samples_after:
978 new_number_samples_before -= 1
979 else:
980 new_number_samples_after -= 1
982 if new_number_samples_before > 0:
983 new_time_vector_left = npy.linspace(
984 time_vector_left[0], time_vector_left[-1], new_number_samples_before
985 ).tolist()
987 if new_number_samples_after > 0:
988 new_time_vector_right = npy.linspace(
989 time_vector_right[-1], time_vector_right[0], new_number_samples_after
990 ).tolist()[::-1]
992 return new_time_vector_left, new_time_vector_right
995class NumericSignalData(SignalData):
996 data_type: str = "float"
997 values: list[float | int | None]
998 forced_values: list[float | int | None]
1000 def interpolate(self, new_time_vector: list[float], items):
1001 items = [npy.nan if s is None else s for s in items]
1002 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
1004 def uniform_desampling(self, number_samples_max: int = None) -> Self:
1005 if number_samples_max is None or self.number_samples <= number_samples_max:
1006 return self
1008 data_processing_time = time.time()
1010 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
1011 forced_values = self.interpolate_forced_values(time_vector)
1012 number_samples = len(time_vector)
1014 data_processing_time = time.time() - data_processing_time
1016 return self.__class__(
1017 signal_id=self.signal_id,
1018 time_vector=time_vector,
1019 values=values,
1020 forced_values=forced_values,
1021 number_samples=number_samples,
1022 number_samples_db=self.number_samples_db,
1023 data_start=self.data_start,
1024 data_end=self.data_end,
1025 db_query_time=self.db_query_time,
1026 init_time=self.init_time,
1027 data_processing_time=self.data_processing_time + data_processing_time,
1028 )
1030 def min_max_downsampling(self, number_samples_max: int) -> Self:
1031 if self.number_samples < number_samples_max:
1032 return self
1034 data_processing_time = time.time()
1036 number_bins = number_samples_max // 2
1038 time_vector = npy.array(self.time_vector, dtype=npy.float64)
1039 values = npy.array(self.values, dtype=npy.float64)
1040 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1042 points_per_bin = self.number_samples // number_bins
1044 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1045 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1046 if self.number_samples - number_bins * points_per_bin > 1:
1047 points_per_bin += 1
1048 number_bins = self.number_samples // points_per_bin + 1
1049 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1050 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1051 values = npy.concatenate([values, nan_points_to_add])
1052 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1054 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1055 values_matrix = values.reshape(number_bins, points_per_bin)
1056 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1058 indexes_min = npy.zeros(number_bins, dtype="int64")
1059 indexes_max = npy.zeros(number_bins, dtype="int64")
1061 for row in range(number_bins):
1062 min_value = values_matrix[row, 0]
1063 max_value = values_matrix[row, 0]
1064 for column in range(points_per_bin):
1065 if values_matrix[row, column] < min_value:
1066 min_value = values_matrix[row, column]
1067 indexes_min[row] = column
1068 elif values_matrix[row, column] > max_value:
1069 max_value = values_matrix[row, column]
1070 indexes_max[row] = column
1072 row_index = npy.repeat(npy.arange(number_bins), 2)
1073 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1075 data_processing_time = time.time() - data_processing_time
1077 new_time_vector = timestamps_matrix[row_index, column_index]
1078 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1079 new_values = values_matrix[row_index, column_index]
1080 new_values = npy.where(npy.isnan(new_values), None, new_values)
1081 new_forced_values = forced_values_matrix[row_index, column_index]
1082 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1084 # Make sure there are no None values for the time vector
1085 # Numpy operator to ensure no element in new time vector is None
1086 time_vector_filter = new_time_vector != None # pylint: disable=singleton-comparison
1087 new_time_vector = new_time_vector[time_vector_filter]
1088 new_values = new_values[time_vector_filter]
1089 new_forced_values = new_forced_values[time_vector_filter]
1091 return self.__class__(
1092 signal_id=self.signal_id,
1093 time_vector=new_time_vector,
1094 values=new_values,
1095 forced_values=new_forced_values,
1096 number_samples=number_bins * 2,
1097 number_samples_db=self.number_samples_db,
1098 data_start=self.data_start,
1099 data_end=self.data_end,
1100 db_query_time=self.db_query_time,
1101 init_time=self.init_time,
1102 data_processing_time=self.data_processing_time + data_processing_time,
1103 phase_id=self.phase_id,
1104 )
1106 def interest_window_desampling(
1107 self,
1108 window_max_number_samples: int,
1109 outside_max_number_samples: int,
1110 window_min_timestamp: float | None = None,
1111 window_max_timestamp: float | None = None,
1112 ) -> Self:
1113 """Performs a sampling in a window of interest and outside."""
1115 if not self.time_vector:
1116 return self
1118 if window_min_timestamp is None:
1119 window_min_timestamp = self.time_vector[0]
1120 if window_max_timestamp is None:
1121 window_max_timestamp = self.time_vector[-1]
1123 data_processing_time = time.time()
1125 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1126 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1128 time_vector_before = self.time_vector[:index_window_start]
1129 time_vector_window = self.time_vector[index_window_start:index_window_end]
1130 time_vector_after = self.time_vector[index_window_end:]
1132 values_before = self.values[:index_window_start]
1133 values_window = self.values[index_window_start:index_window_end]
1134 values_after = self.values[index_window_end:]
1135 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1136 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1138 # Resampling window
1139 if time_vector_window:
1140 # Ensurring window bounds
1141 if time_vector_window[0] != window_min_timestamp:
1142 time_vector_window.insert(0, window_min_timestamp)
1143 values_window.insert(0, window_min_value)
1144 if time_vector_window[-1] != window_max_timestamp:
1145 time_vector_window.append(window_max_timestamp)
1146 values_window.append(window_max_value)
1147 else:
1148 time_vector_window = [window_min_timestamp, window_max_timestamp]
1149 values_window = [window_min_value, window_max_value]
1151 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1152 # Resampling
1153 time_vector_window, values_window = downsample_list(
1154 time_vector_window, values_window, window_max_number_samples
1155 )
1157 # Resampling outside
1158 number_samples_before = len(time_vector_before)
1159 number_samples_after = len(time_vector_after)
1160 if (
1161 outside_max_number_samples is not None
1162 and (number_samples_before + number_samples_after) > outside_max_number_samples
1163 ):
1164 new_number_samples_before = min(
1165 number_samples_before,
1166 math.ceil(
1167 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1168 ),
1169 )
1170 new_number_samples_after = min(
1171 number_samples_after,
1172 math.ceil(
1173 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1174 ),
1175 )
1176 # Adjusting numbers as math.ceil can do +1 on sum
1177 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1178 if new_number_samples_before > new_number_samples_after:
1179 new_number_samples_before -= 1
1180 else:
1181 new_number_samples_after -= 1
1183 if new_number_samples_before > 0:
1184 time_vector_before, values_before = downsample_list(
1185 time_vector_before, values_before, new_number_samples_before
1186 )
1188 if new_number_samples_after > 0:
1189 time_vector_after, values_after = downsample_list(
1190 time_vector_after, values_after, new_number_samples_after
1191 )
1193 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1194 values = values_before + values_window + values_after
1195 forced_values = self.interpolate_forced_values(new_time_vector)
1196 number_samples = len(values)
1198 data_processing_time = time.time() - data_processing_time
1200 return self.__class__(
1201 signal_id=self.signal_id,
1202 time_vector=new_time_vector,
1203 values=values,
1204 forced_values=forced_values,
1205 number_samples=number_samples,
1206 number_samples_db=self.number_samples,
1207 data_start=self.data_start,
1208 data_end=self.data_end,
1209 db_query_time=self.db_query_time,
1210 init_time=self.init_time,
1211 data_processing_time=self.data_processing_time + data_processing_time,
1212 )
1215class StringSignalData(SignalData):
1216 data_type: str = "str"
1217 values: list[str | None]
1218 forced_values: list[str | None]
1220 def interpolate(self, new_time_vector: list[float], items):
1221 # Find the indices of the values in xp that are just smaller or equal to x
1222 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
1223 indices = npy.clip(indices, 0, len(items) - 1)
1224 # Return the corresponding left string values from fp
1225 return [items[i] for i in indices]
1228class SignalsData(TwinPadModel):
1229 signals_data: list[NumericSignalData | StringSignalData | SignalData]
1230 data_processing_time: float
1231 data_start: float | None
1232 data_end: float | None
1234 @classmethod
1235 def get_from_signal_ids(
1236 cls,
1237 signal_ids: list[str],
1238 min_timestamp: float = None,
1239 max_timestamp: float = None,
1240 window_min_timestamp: float = None,
1241 window_max_timestamp: float = None,
1242 interpolate_bounds: bool = True,
1243 max_documents: int = None,
1244 ) -> Self:
1245 signals_data = []
1246 data_start = None
1247 data_end = None
1248 if max_timestamp is None:
1249 max_timestamp = time.time()
1250 data_processing_time = 0.0
1252 signal_collections = get_signal_collections_batch(signal_ids)
1254 for signal_id, collection in zip(signal_ids, signal_collections):
1255 signal_data = SignalData.get_from_signal_id(
1256 signal_id=signal_id,
1257 min_timestamp=min_timestamp,
1258 max_timestamp=max_timestamp,
1259 window_min_timestamp=window_min_timestamp,
1260 window_max_timestamp=window_max_timestamp,
1261 interpolate_bounds=interpolate_bounds,
1262 max_documents=max_documents,
1263 collection=collection,
1264 )
1265 data_processing_time += signal_data.data_processing_time
1266 signals_data.append(signal_data)
1267 if signal_data.data_start is not None:
1268 if data_start is None:
1269 data_start = signal_data.data_start
1270 else:
1271 data_start = min(signal_data.data_start, data_start)
1272 if signal_data.data_end is not None:
1273 if data_end is None:
1274 data_end = signal_data.data_end
1275 else:
1276 data_end = max(signal_data.data_end, data_end)
1278 return cls(
1279 signals_data=signals_data,
1280 data_processing_time=data_processing_time,
1281 data_start=data_start,
1282 data_end=data_end,
1283 )
1285 @classmethod
1286 def get_from_phase_and_signal_ids(
1287 cls,
1288 phases: list,
1289 phase_sync_times: list[float | None],
1290 signal_ids: list[str],
1291 window_min_timestamps: list[float | None],
1292 window_max_timestamps: list[float | None],
1293 zero_time_vector: bool = True,
1294 ):
1295 signals_data: list[SignalData] = []
1296 computation_start = time.time()
1298 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip(
1299 phases, phase_sync_times, window_min_timestamps, window_max_timestamps
1300 ):
1301 min_timestamp = phase.start_at / 1000
1302 max_timestamp = phase.end_at / 1000
1304 if sync_time is None:
1305 sync_time = min_timestamp
1307 if window_max_timestamp is not None and window_min_timestamp is not None:
1308 window_length = window_max_timestamp - window_min_timestamp
1310 if window_min_timestamp != min_timestamp:
1311 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20)
1312 if window_max_timestamp != max_timestamp:
1313 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20)
1315 signal_collections = get_signal_collections_batch(signal_ids)
1317 for signal_id, collection in zip(signal_ids, signal_collections):
1318 signal_data = SignalData.get_from_signal_id(
1319 signal_id,
1320 min_timestamp,
1321 max_timestamp,
1322 window_min_timestamp,
1323 window_max_timestamp,
1324 interpolate_bounds=False,
1325 max_documents=None,
1326 collection=collection,
1327 )
1329 if len(signal_data.time_vector) == 0:
1330 continue
1332 if zero_time_vector:
1333 signal_data = signal_data.zero_time_vector(sync_time)
1334 signal_data.phase_id = phase.id
1336 signals_data.append(signal_data)
1338 return cls(
1339 signals_data=signals_data,
1340 data_processing_time=time.time() - computation_start,
1341 data_start=0,
1342 data_end=0,
1343 )
1345 def uniform_desampling(self, number_samples_max: int) -> Self:
1346 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1347 return SignalsData(
1348 signals_data=signals_data,
1349 data_processing_time=sum(s.data_processing_time for s in signals_data),
1350 data_start=self.data_start,
1351 data_end=self.data_end,
1352 )
1354 def min_max_downsampling(self, number_samples_max: int) -> Self:
1355 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data]
1356 return SignalsData(
1357 signals_data=signals_data,
1358 data_processing_time=sum(s.data_processing_time for s in signals_data),
1359 data_start=self.data_start,
1360 data_end=self.data_end,
1361 )
1363 def interest_window_desampling(
1364 self,
1365 window_max_number_samples: int,
1366 outside_max_number_samples: int,
1367 window_min_timestamp: float = None,
1368 window_max_timestamp: float = None,
1369 ) -> Self:
1370 signals_data = [
1371 s.interest_window_desampling(
1372 window_max_number_samples=window_max_number_samples,
1373 outside_max_number_samples=outside_max_number_samples,
1374 window_min_timestamp=window_min_timestamp,
1375 window_max_timestamp=window_max_timestamp,
1376 )
1377 for s in self.signals_data
1378 ]
1380 return SignalsData(
1381 signals_data=signals_data,
1382 data_processing_time=sum(s.data_processing_time for s in signals_data),
1383 data_start=self.data_start,
1384 data_end=self.data_end,
1385 )
1387 def zero_time_vector(self, data_start: float):
1388 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
1389 return SignalsData(
1390 signals_data=signals_data,
1391 data_processing_time=sum(s.data_processing_time for s in signals_data),
1392 data_start=0,
1393 data_end=max([s.data_end for s in signals_data]),
1394 )
1396 @classmethod
1397 async def apply_single_function(
1398 cls,
1399 phase,
1400 base_signal_id: str,
1401 function: SINGLE_POST_PROCESSING_FUNCTION,
1402 window_min_timestamp: float = None,
1403 window_max_timestamp: float = None,
1404 ):
1405 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"
1407 processed_result_signal = Signal.get_from_signal_id(signal_id)
1408 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids:
1409 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)
1411 signals_data = cls.get_from_phase_and_signal_ids(
1412 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
1413 )
1415 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
1416 return None
1418 new_values = None
1419 new_forced_values = None
1420 time_vector = npy.array(signals_data.signals_data[0].time_vector)
1421 values = signals_data.signals_data[0].values
1422 forced_values = signals_data.signals_data[0].forced_values
1424 match (function):
1425 case "Cumul":
1426 new_values = cumul(values)
1427 new_forced_values = cumul(forced_values)
1428 # case "CumulDistrib":
1429 # new_values = cumul_distrib(values)
1430 # new_forced_values = cumul_distrib(forced_values)
1431 case "Delta":
1432 new_values = delta(values)
1433 new_forced_values = delta(forced_values)
1434 case "DeltaT":
1435 new_values = delta(time_vector)
1436 new_forced_values = new_values
1437 case "Derive":
1438 new_values = derive(time_vector, values)
1439 new_forced_values = derive(time_vector, forced_values)
1440 case "Integ":
1441 new_values = integ(time_vector, values)
1442 new_forced_values = integ(time_vector, forced_values)
1444 new_values = npy.where(npy.isnan(new_values), None, new_values)
1445 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1447 loop = asyncio.get_running_loop()
1448 loop.create_task(
1449 cls.save_function_signal(
1450 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible
1451 )
1452 )
1454 if window_max_timestamp is not None:
1455 max_timestamp_mask = time_vector <= window_max_timestamp
1456 time_vector = time_vector[max_timestamp_mask]
1457 new_values = new_values[max_timestamp_mask]
1458 new_forced_values = new_forced_values[max_timestamp_mask]
1459 if window_min_timestamp is not None:
1460 min_timestamp_mask = time_vector >= window_min_timestamp
1461 time_vector = time_vector[min_timestamp_mask]
1462 new_values = new_values[min_timestamp_mask]
1463 new_forced_values = new_forced_values[min_timestamp_mask]
1465 signals_data.signals_data[0].time_vector = time_vector.tolist()
1466 signals_data.signals_data[0].values = new_values.tolist()
1467 signals_data.signals_data[0].forced_values = new_forced_values.tolist()
1468 signals_data.signals_data[0].number_samples = time_vector.size
1470 signals_data.signals_data[0].signal_id = signal_id
1472 return signals_data
1474 @classmethod
1475 async def apply_multiple_function(
1476 cls,
1477 phases: list,
1478 signal_ids: list,
1479 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
1480 window_min_timestamp: float = None,
1481 window_max_timestamp: float = None,
1482 ):
1483 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
1484 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
1485 else:
1486 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"
1488 active_phase = phases[0]
1489 if function in {"Align-X", "Using-X"}:
1490 active_phase = phases[1]
1492 processed_result_signal = Signal.get_from_signal_id(function_signal_id)
1493 if processed_result_signal is not None and (
1494 active_phase.id in processed_result_signal.computed_phases_ids
1495 ): # If signal has been computed for the correct phase
1496 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)
1498 array_length = None
1499 time_vector_list = []
1500 values_list = []
1501 forced_values_list = []
1502 forcible = True
1503 for phase, signal_id in zip(phases, signal_ids):
1504 signals_data = cls.get_from_phase_and_signal_ids(
1505 [phase], [None], [signal_id], [None], [None], zero_time_vector=False
1506 )
1508 if len(signals_data.signals_data) == 0:
1509 return None
1511 signal_data = signals_data.signals_data[0]
1513 if array_length is None:
1514 array_length = signal_data.number_samples
1515 if (
1516 array_length != signal_data.number_samples and function != "Align-X"
1517 ) or signal_data.number_samples == 0:
1518 return None
1520 time_vector_list.append(npy.array(signal_data.time_vector))
1521 values_list.append(npy.array(signal_data.values, dtype=npy.float64))
1522 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
1523 forcible = forcible and signal_data.forcible
1525 time_vector = time_vector_list[0]
1526 new_values = None
1527 new_forced_values = None
1529 match (function):
1530 case "Align-X":
1531 time_vector = time_vector_list[1]
1532 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
1533 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
1534 new_values = align_x(old_time_vector, values_list[0], new_time_vector)
1535 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
1536 # case "Atan2":
1537 # new_values = atan2(values_list[0], values_list[1])
1538 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
1539 case "Using-X":
1540 if len(time_vector_list[0]) != len(time_vector_list[1]):
1541 return None
1542 time_vector = time_vector_list[1]
1543 new_values = values_list[0]
1544 new_forced_values = forced_values_list[0]
1545 case "Mean":
1546 new_values = mean(*values_list)
1547 new_forced_values = mean(*forced_values_list)
1548 case "Norm":
1549 new_values = norm(*values_list)
1550 new_forced_values = norm(*forced_values_list)
1552 new_values = npy.where(npy.isnan(new_values), None, new_values)
1553 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1555 loop = asyncio.get_running_loop()
1556 loop.create_task(
1557 cls.save_function_signal(
1558 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
1559 )
1560 )
1562 total_number_samples = time_vector.size
1564 if window_max_timestamp is not None:
1565 max_timestamp_mask = time_vector <= window_max_timestamp
1566 time_vector = time_vector[max_timestamp_mask]
1567 new_values = new_values[max_timestamp_mask]
1568 new_forced_values = new_forced_values[max_timestamp_mask]
1569 if window_min_timestamp is not None:
1570 min_timestamp_mask = time_vector >= window_min_timestamp
1571 time_vector = time_vector[min_timestamp_mask]
1572 new_values = new_values[min_timestamp_mask]
1573 new_forced_values = new_forced_values[min_timestamp_mask]
1575 signals_data = cls(
1576 signals_data=[
1577 NumericSignalData(
1578 signal_id=function_signal_id,
1579 forcible=forcible,
1580 time_vector=time_vector.tolist(),
1581 values=new_values.tolist(),
1582 forced_values=new_forced_values.tolist(),
1583 number_samples=time_vector.size,
1584 number_samples_db=total_number_samples,
1585 )
1586 ],
1587 data_processing_time=0,
1588 data_start=0,
1589 data_end=0,
1590 )
1592 return signals_data
1594 @classmethod
1595 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
1596 signal_data_collection = get_signal_collection(signal_id, create=True)
1597 pipeline = []
1598 match_filter = {}
1599 if window_min_timestamp is not None or window_max_timestamp is not None:
1600 match_filter["$match"] = {}
1601 match_filter["$match"]["precise_timestamp"] = {}
1602 if window_max_timestamp is not None:
1603 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp
1604 if window_min_timestamp is not None:
1605 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp
1607 total_number_samples = signal_data_collection.count_documents({})
1609 if match_filter:
1610 pipeline.append(match_filter)
1612 fetch_start = time.time()
1614 samples = signal_data_collection.aggregate(pipeline).to_list()
1615 new_time_vector = []
1616 new_values = []
1617 new_forced_values = []
1618 for sample in samples:
1619 new_time_vector.append(sample["precise_timestamp"])
1620 new_values.append(sample["value"])
1621 new_forced_values.append(sample["forced_value"])
1623 return cls(
1624 signals_data=[
1625 NumericSignalData(
1626 signal_id=signal_id,
1627 time_vector=new_time_vector,
1628 values=new_values,
1629 forced_values=new_forced_values,
1630 number_samples=len(new_time_vector),
1631 number_samples_db=total_number_samples,
1632 )
1633 ],
1634 data_processing_time=time.time() - fetch_start,
1635 data_start=0,
1636 data_end=0,
1637 )
1639 @classmethod
1640 async def save_function_signal(
1641 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
1642 ):
1643 # Insert data first so if it is requested by another user, it will be computed again
1644 signal_collection = get_signal_collection(function_signal_id, create=True)
1645 signal_collection.delete_many(
1646 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
1647 )
1648 signal_collection.insert_many(
1649 [
1650 {
1651 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]),
1652 "precise_timestamp": time_vector[i],
1653 "value": new_values[i],
1654 "forced_value": new_forced_values[i],
1655 }
1656 for i in range(len(time_vector))
1657 ]
1658 )
1660 signals_config_collection = get_collection(systems_database, "signals", create=True)
1661 signals_config_collection.find_one_and_update(
1662 {"signal_id": function_signal_id},
1663 {
1664 "$set": {
1665 "description": "",
1666 "unit": None,
1667 "type": "sensor",
1668 "address": None,
1669 "frequency": 1 / max(npy.mean(delta(time_vector)), 1), # avoid division by 0
1670 "transfer_function": None,
1671 "precision_digits": None,
1672 "digitization_function": None,
1673 "data_type": "float",
1674 "formula": None,
1675 "forcible": forcible,
1676 "commandable": False,
1677 "broadcastable": True,
1678 "signal_id": function_signal_id,
1679 "post_processing": True,
1680 },
1681 "$push": {"computed_phases_ids": phase.id},
1682 },
1683 upsert=True,
1684 )
1686 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = None):
1687 if phase_ids is None:
1688 phase_ids = []
1689 if post_processing:
1690 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1691 zip_buffer = io.BytesIO()
1692 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1693 for signal_data in self.signals_data:
1694 file_name = signal_data.signal_id
1695 if post_processing:
1696 phase = phases_by_id.get(
1697 signal_data.phase_id,
1698 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1699 )
1700 file_name = f"{signal_data.signal_id} ({phase.name})"
1701 if file_format == "csv":
1702 export_io = signal_data.csv_export()
1703 zip_file.writestr(f"{file_name}.csv", export_io)
1704 elif file_format == "prestoplot":
1705 export_io = signal_data.prestoplot_export()
1706 zip_file.writestr(f"{file_name}.tab", export_io)
1707 else:
1708 raise ValueError(f"Format not found. Got: {file_format}")
1709 zip_bytes = zip_buffer.getvalue()
1710 return zip_bytes
1712 def hdf5_export(self, post_processing: bool = False, phase_ids: list = None):
1713 if phase_ids is None:
1714 phase_ids = []
1715 if post_processing:
1716 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1717 hdf5_buffer = io.BytesIO()
1718 custom_type_float = npy.dtype(
1719 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
1720 )
1721 custom_type_string = npy.dtype(
1722 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
1723 )
1724 with h5py.File(hdf5_buffer, "w") as hdf5_file:
1725 for signal_data in self.signals_data:
1726 if post_processing:
1727 phase = phases_by_id.get(
1728 signal_data.phase_id,
1729 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1730 )
1731 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})")
1732 else:
1733 signal_group = hdf5_file.create_group(signal_data.signal_id)
1734 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
1735 if signal_data.data_type == "str":
1736 export_data = npy.array(
1737 list(
1738 zip(
1739 date_vector,
1740 signal_data.time_vector,
1741 signal_data.values,
1742 signal_data.forced_values,
1743 )
1744 ),
1745 dtype=custom_type_string,
1746 )
1747 else:
1748 export_data = npy.array(
1749 list(
1750 zip(
1751 date_vector,
1752 signal_data.time_vector,
1753 signal_data.values,
1754 signal_data.forced_values,
1755 )
1756 ),
1757 dtype=custom_type_float,
1758 )
1759 signal_group["data"] = export_data
1760 return hdf5_buffer.getvalue()
1763class SignalStatus(TwinPadModel):
1764 status: str = "down"
1765 reason: str = ""
1766 delay: float | None = None
1769class DigitizationFunction(TwinPadModel):
1770 bits: int | None = None
1771 min_value: float
1772 max_value: float
1773 min_raw_value: float
1774 max_raw_value: float
1777class SignalUpdate(TwinPadModel):
1778 value: float | str | bool | int | None = None
1779 forced_value: float | str | bool | int | None = None
1780 timestamp: int | None = None
1783class SignalType(str, Enum):
1784 command = "command"
1785 sensor = "sensor"
1786 external_sensor = "external_sensor"
1789SIGNALDATA_TYPES = {
1790 "int": NumericSignalData,
1791 "float": NumericSignalData,
1792 "str": StringSignalData,
1793 "bool": NumericSignalData,
1794 "epoch": NumericSignalData,
1795}
1798class TransferFunction(TwinPadModel):
1799 intervals: list[list[float]]
1802class Signal(GenericMongo):
1803 collection_name: ClassVar[str] = "signals"
1805 signal_id: str
1806 frequency: float
1807 unit: str | None
1808 description: str
1809 type: SignalType
1810 data_type: str
1811 transfer_function: TransferFunction | None = None
1812 precision_digits: int | None
1813 forcible: bool
1814 commandable: bool
1815 broadcastable: bool
1816 status: SignalStatus = SignalStatus()
1818 post_processing: bool = False
1819 computed_phases_ids: list[str] = []
1821 digitization_function: DigitizationFunction | None
1823 @property
1824 def device(self) -> Device:
1825 device_id = self.signal_id.split(".")[0]
1826 device = Device.get_one_by_attribute("device_id", device_id)
1827 return device
1829 @cached_property
1830 def signal_data_class(self):
1831 if self.data_type in SIGNALDATA_TYPES:
1832 return SIGNALDATA_TYPES[self.data_type]
1833 if self.data_type.startswith("enum"):
1834 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1835 raise ValueError(f"Unhandled python type: {self.data_type}")
1837 @cached_property
1838 def python_type(self):
1839 if self.data_type in TYPES:
1840 return TYPES[self.data_type]
1841 if self.data_type.startswith("enum"):
1842 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1843 return Literal[*choices]
1844 raise ValueError(f"Unhandled python type: {self.data_type}")
1846 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1847 command = Command(
1848 sent_at=time.time(),
1849 command_type="Signal command",
1850 user_id=current_user.id,
1851 )
1853 has_input_error = False
1854 error_message = ""
1856 if self.data_type.startswith("enum"):
1857 enum_options = get_args(self.python_type)
1859 if update_dict.value is not None and update_dict.value not in enum_options:
1860 has_input_error = True
1861 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1862 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1863 has_input_error = True
1864 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1865 else:
1866 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1867 has_input_error = True
1868 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1869 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1870 has_input_error = True
1871 error_message += (
1872 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1873 )
1874 if (
1875 self.transfer_function is not None
1876 and update_dict.value is not None
1877 and (
1878 (min_value := self.transfer_function.intervals[0][1]) > update_dict.value
1879 or update_dict.value > (max_value := self.transfer_function.intervals[1][1])
1880 )
1881 ):
1882 has_input_error = True
1883 error_message += (
1884 f"Impossible value: {update_dict.value} is not within the transfer function's bounds"
1885 f"([{min_value}, {max_value}])\n"
1886 )
1888 if has_input_error:
1889 command.response_time = 0
1890 command.succeeded = False
1891 command.description = f"Tried to modify signal {self.signal_id}"
1892 response = {"error": True, "status_code": 400, "message": error_message}
1893 else:
1894 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict)
1895 command.receive_response(response)
1897 Command.create(command)
1898 return response
1900 @classmethod
1901 def get_from_signal_id(cls, signal_id) -> Self:
1902 """Could be generic from mongo"""
1903 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1904 if not raw_value:
1905 return None
1906 del raw_value["_id"]
1907 return cls.dict_to_object(raw_value)
1909 @classmethod
1910 def get_all_ids(cls) -> list[str]:
1911 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1913 return [signal["signal_id"] for signal in cursor]
1915 @classmethod
1916 def get_all_statuses(cls) -> list[dict]:
1917 cursor = cls.collection().aggregate(
1918 [{"$project": {"signal_id": 1, "status": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}]
1919 )
1921 return [
1922 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1923 for signal in cursor
1924 ]
1926 async def number_samples(self):
1927 collection = get_signal_collection(signal_id=self.signal_id)
1928 if collection is None:
1929 return 0
1931 number_samples = collection.estimated_document_count()
1933 number_samples_async_collection = await get_async_collection(
1934 systems_async_database, "number_samples", create=True, time_series=True
1935 )
1937 loop = asyncio.get_running_loop()
1938 loop.create_task(
1939 number_samples_async_collection.insert_one(
1940 {
1941 "timestamp": datetime.datetime.now(pytz.UTC),
1942 "signal_id": self.signal_id,
1943 "number_samples": number_samples,
1944 }
1945 )
1946 )
1948 return number_samples
1950 @classmethod
1951 def total_number_samples(cls) -> int:
1952 TwinPadActivity.get_number_samples_timeframe(0, 0, False)
1953 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
1955 if number_samples_collection is None:
1956 return 0
1958 result = number_samples_collection.aggregate(
1959 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}]
1960 )
1962 result = result.to_list()
1963 if len(result) == 0:
1964 return 0
1965 return result[0]["amount"]
1967 def sample_datasize(self):
1968 return signals_database.command("collstats", self.signal_id)["size"]
1970 @classmethod
1971 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1972 result = cls.collection().aggregate(
1973 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1974 )
1976 return {signal["signal_id"]: signal["forcible"] for signal in result}
1979class ForcedSignal(GenericMongo):
1980 collection_name: ClassVar[str] = "forced_signals"
1982 signal_id: str
1983 forcing_user_id: str
1984 forced_at: float
1985 value: str | float
1987 def insert(self):
1988 insert_result = self.collection().find_one_and_update(
1989 {"signal_id": self.signal_id},
1990 {"$set": self.to_dict(exclude={"id"})},
1991 upsert=True,
1992 return_document=ReturnDocument.AFTER,
1993 )
1994 self.id = str(insert_result["_id"])
1995 return self.id
1997 @classmethod
1998 def can_force(cls, signal_id: str, current_user: User) -> bool:
1999 """Checks whether user can force a given signal.
2001 :param signal_id: Signal ID of the signal to force
2002 :type signal_id: str
2003 :param current_user: Current user
2004 :type current_user: User
2005 :return: False if the signal was forced by someone else than the user, True otherwise
2006 :rtype: bool
2007 """
2008 forced_signal = cls.get_one_by_attribute("signal_id", signal_id)
2009 if forced_signal is not None:
2010 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
2011 return False
2012 return True
2015class ServicesStatus(TwinPadModel):
2016 backend: str
2017 cloud_broker: str
2018 time_series_database: str
2019 signal_storage: str
2020 heartbeat_storage: str
2021 data_analyzer: str
2023 @classmethod
2024 def check(cls) -> Self:
2025 return cls(
2026 cloud_broker=ping(RABBITMQ_HOST),
2027 backend="up",
2028 time_series_database=ping(MONGO_HOST),
2029 signal_storage=ping(SIGNAL_STORAGE_HOST),
2030 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
2031 data_analyzer=ping(DATA_ANALYZER_HOST),
2032 )
2035def ping(host):
2036 try:
2037 if ping3.ping(host, timeout=0.8):
2038 return "up"
2039 except PermissionError:
2040 pass
2041 return "down"
2044class Event(GenericMongo):
2045 collection_name: ClassVar[str] = "events"
2047 name: str
2048 timestamp: float
2049 event_rule_id: str
2051 @computed_field
2052 @cached_property
2053 def event_rule(self) -> "EventRule":
2054 return EventRule.get_from_id(self.event_rule_id)
2056 @classmethod
2057 def dict_to_object(cls, dict_):
2058 """Refine to convert timestamp to datetime for mongodb."""
2059 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
2060 return super().dict_to_object(dict_)
2063class TwinPadActivity(GenericMongo):
2064 timestamp: float
2065 amount: int
2067 @classmethod
2068 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
2069 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2070 number_events_collection = get_collection(systems_database, "number_events")
2071 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
2072 items = []
2073 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2074 if number_events_collection is None or recompute_amount:
2075 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
2076 number_events_collection.delete_many({})
2077 first_event = events_collection.find_one(sort={"timestamp": 1})
2078 if first_event is None:
2079 return items
2080 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
2081 tzinfo=pytz.UTC
2082 )
2083 while last_computed_day < TODAY:
2084 day_nb_events = events_collection.count_documents(
2085 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
2086 )
2087 if day_nb_events > 0:
2088 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
2089 last_computed_day += ONE_DAY_OFFSET
2090 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
2091 if number_events_today > 0:
2092 number_events_collection.delete_many({"timestamp": TODAY})
2093 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
2094 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2095 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2096 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2097 for day in number_events:
2098 day["timestamp"] = day["timestamp"].timestamp()
2099 items.append(cls.mongo_dict_to_object(day))
2100 return items
2102 @classmethod
2103 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
2104 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2105 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
2106 signals_number_samples_collection = get_collection(
2107 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
2108 )
2109 items = []
2110 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2111 if number_samples_collection is None or recompute_amount:
2112 number_samples_collection = get_collection(
2113 systems_database, "number_received_samples", create=True, time_series=True
2114 )
2115 number_samples_collection.delete_many({})
2116 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
2117 if first_sample is None:
2118 return items
2119 # compute from day of first found event
2120 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace(
2121 tzinfo=pytz.UTC
2122 )
2123 while last_computed_day < TODAY:
2124 number_samples_request = signals_number_samples_collection.aggregate(
2125 [
2126 {
2127 "$match": {
2128 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}
2129 }
2130 },
2131 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
2132 ]
2133 ).to_list()
2134 if len(number_samples_request) == 0:
2135 number_samples = 0
2136 else:
2137 number_samples = number_samples_request[0].get("number_samples", 0)
2138 if number_samples > 0:
2139 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
2140 last_computed_day += ONE_DAY_OFFSET
2141 number_samples_request = signals_number_samples_collection.aggregate(
2142 [
2143 {"$match": {"timestamp": {"$gte": TODAY}}},
2144 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
2145 ]
2146 ).to_list()
2147 if len(number_samples_request) == 0:
2148 number_samples_today = 0
2149 else:
2150 number_samples_today = number_samples_request[0].get("number_samples", 0)
2151 if number_samples_today > 0:
2152 number_samples_collection.delete_many({"timestamp": TODAY})
2153 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
2154 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2155 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2156 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2157 for day in number_events:
2158 day["timestamp"] = day["timestamp"].timestamp()
2159 items.append(cls.mongo_dict_to_object(day))
2160 return items
2162 @classmethod
2163 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
2164 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2165 number_commands_collection = get_collection(systems_database, "number_commands")
2166 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
2167 items = []
2168 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2169 if number_commands_collection is None or recompute_amount:
2170 number_commands_collection = get_collection(
2171 systems_database, "number_commands", create=True, time_series=True
2172 )
2173 number_commands_collection.delete_many({})
2174 first_command = commands_collection.find_one(sort={"timestamp": 1})
2175 if first_command is None:
2176 return items
2177 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace(
2178 tzinfo=pytz.UTC
2179 )
2180 while last_computed_day < TODAY:
2181 day_nb_commands = commands_collection.count_documents(
2182 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
2183 )
2184 if day_nb_commands > 0:
2185 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
2186 last_computed_day += ONE_DAY_OFFSET
2187 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
2188 if number_commands_today > 0:
2189 number_commands_collection.delete_many({"timestamp": TODAY})
2190 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
2191 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2192 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2193 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2194 for day in number_commands:
2195 day["timestamp"] = day["timestamp"].timestamp()
2196 items.append(cls.mongo_dict_to_object(day))
2197 return items
2200class EventRule(GenericMongo):
2201 collection_name: ClassVar[str] = "event_rules"
2203 name: str
2204 formula: str
2205 variables: list[str]
2207 @computed_field
2208 @cached_property
2209 def number_events(self) -> int:
2210 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
2213class Company(GenericMongo):
2214 collection_name: ClassVar[str] = "companies"
2215 name: str
2218class Campaign(GenericMongo):
2219 collection_name: ClassVar[str] = "campaigns"
2221 # Properties
2222 id: str | None = None
2223 name: str
2224 description: str | None = None
2227class Phase(GenericMongo):
2228 collection_name: ClassVar[str] = "phases"
2230 # Properties
2231 id: str | None = None
2232 name: str
2233 description: str | None = None
2234 start_at: float
2235 end_at: float
2237 # FK
2238 campaign_id: MongoId
2240 @classmethod
2241 def deleteMany(cls, campaign_id):
2242 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
2243 return delete_phases
2246class CustomViewCreation(GenericMongo):
2247 collection_name: ClassVar[str] = "custom_views"
2249 name: str
2250 configuration: list
2253class CustomView(CustomViewCreation):
2254 # Properties
2255 id: str | None = None
2257 # Foreign Key
2258 user_id: str
2261CustomViewUpdate = create_update_model(CustomView)
2264class Video(GenericMongo):
2265 collection_name: ClassVar[str] = "videos"
2267 # Properties
2268 name: str
2269 ip_addr: str
2270 username: str | None = None
2271 password: str | None = None
2273 # Methods
2274 @classmethod
2275 def get_all(cls, sort_by="_id") -> list[Self]:
2276 items = []
2277 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
2278 items.append(cls.mongo_dict_to_object(dict_))
2279 return items
2281 @classmethod
2282 def get_video(cls, camera_id: ObjectId):
2283 camera = cls.get_from_id(camera_id)
2284 if camera is not None:
2285 return camera.name
2286 return None
2289class Command(GenericMongo):
2290 collection_name: ClassVar[str] = "commands"
2292 # Properties
2293 timestamp: datetime.datetime = None
2294 sent_at: float
2295 response_time: float = 0.0
2296 command_type: str
2297 description: str = ""
2298 succeeded: bool = False
2300 # Foreign key
2301 user_id: str
2303 @classmethod
2304 def collection(cls):
2305 return get_collection(systems_database, cls.collection_name, create=True, time_series=True)
2307 @classmethod
2308 def create(cls, command: Self):
2309 command = cls(
2310 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
2311 sent_at=command.sent_at,
2312 response_time=command.response_time,
2313 command_type=command.command_type,
2314 description=command.description,
2315 succeeded=command.succeeded,
2316 user_id=command.user_id,
2317 )
2318 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"}))
2319 if new_command is None:
2320 return None
2321 return {"command_id": str(new_command.inserted_id)}
2323 def receive_response(self, response: dict):
2324 self.response_time = time.time() - self.sent_at
2325 self.succeeded = response.get("error", True) is False
2326 if self.description == "":
2327 self.description += response.get("message", "").rstrip()
2330class SignalsPresetCreation(GenericMongo):
2331 name: str
2332 signal_ids: list[str]
2335class SignalsPreset(SignalsPresetCreation):
2336 collection_name: ClassVar[str] = "signals_presets"
2338 user_id: str
2340 @classmethod
2341 def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
2342 signals_preset = cls(
2343 user_id=user_id,
2344 name=signals_preset.name,
2345 signal_ids=signals_preset.signal_ids,
2346 )
2348 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"}))
2350 return str(new_signal_preset.inserted_id)
2353SignalsPresetUpdate = create_update_model(SignalsPreset)
2356class LineStyle(str, Enum):
2357 solid = "solid"
2358 dotted = "dotted"
2359 dashed = "dashed"
2362class SignalAppearance:
2363 value_color: str
2364 forced_value_color: str
2367class GraphThemeCreation(GenericMongo):
2368 collection_name: ClassVar[str] = "graph_themes"
2370 name: str
2371 signal_id: str
2372 value_color: str = ""
2373 forced_value_color: str = ""
2374 value_line_style: LineStyle = LineStyle.solid
2375 forced_value_line_style: LineStyle = LineStyle.solid
2376 private: bool = True
2379class PublicGraphTheme(GraphThemeCreation):
2380 created_by_user: bool
2381 in_user_library: bool
2382 active_for_user: bool
2384 _current_user_id: str = ""
2386 @classproperty
2387 def custom_pipeline_steps(cls) -> dict[str, list]: # pylint: disable=no-self-argument
2388 return {
2389 "created_by_user": [
2390 {
2391 "$addFields": {
2392 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]},
2393 }
2394 }
2395 ],
2396 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible
2397 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}
2398 ],
2399 "in_user_library": [
2400 {
2401 "$addFields": {
2402 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]},
2403 }
2404 }
2405 ],
2406 "active_for_user": [
2407 {
2408 "$addFields": {
2409 "active_for_user": {"$in": [cls._current_user_id, "$active"]},
2410 }
2411 }
2412 ],
2413 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}],
2414 "active": [
2415 {
2416 "$addFields": {
2417 "active": "$$REMOVE",
2418 }
2419 }
2420 ],
2421 "creator_id": [
2422 {
2423 "$addFields": {
2424 "creator_id": "$$REMOVE",
2425 }
2426 }
2427 ],
2428 }
2430 @classmethod
2431 def response_from_query(cls, query, user_id: str = None) -> ListResponse[Self]:
2432 if user_id is None:
2433 return ListResponse(items=[], limit=0, offset=0, sort_by="", total=0)
2434 cls._current_user_id = user_id
2435 return super().response_from_query(query)
2437 @classmethod
2438 def response_from_query_in_user_library(cls, query, user_id: str = None) -> ListResponse[Self]:
2439 if user_id is None:
2440 return ListResponse(items=[], limit=0, offset=0, sort_by="", total=0)
2441 query.in_user_library = "true"
2442 return cls.response_from_query(query, user_id)
2444 @classmethod
2445 def get_from_id(cls, item_id, user_id: str = None) -> Self | None:
2446 if user_id is None:
2447 return None
2448 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)
2450 @classmethod
2451 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str = None):
2452 if user_id is None:
2453 return None
2454 cls._current_user_id = user_id
2455 return super().get_by_attribute(attribute_name, attribute_value)
2457 @classmethod
2458 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str = None):
2459 if user_id is None:
2460 return None
2461 cls._current_user_id = user_id
2462 return super().get_one_by_attribute(attribute_name, attribute_value)
2464 @classmethod
2465 def get_all(cls, sort_by: str, user_id: str = None):
2466 if user_id is None:
2467 return []
2468 cls._current_user_id = user_id
2469 return super().get_all(sort_by)
2471 @classmethod
2472 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
2473 pipeline = [
2474 {
2475 "$match": {
2476 "active": {"$eq": user_id},
2477 "signal_id": {"$in": signal_ids},
2478 }
2479 },
2480 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
2481 {"$replaceRoot": {"newRoot": "$firstDocument"}},
2482 {
2483 "$project": {
2484 "_id": 0,
2485 "signal_id": 1,
2486 "value_color": 1,
2487 "forced_value_color": 1,
2488 "value_line_style": 1,
2489 "forced_value_line_style": 1,
2490 }
2491 },
2492 ]
2494 result = {}
2496 cursor = cls.collection().aggregate(pipeline)
2497 for document in cursor:
2498 signal_id = document["signal_id"]
2499 del document["signal_id"]
2500 result[signal_id] = document
2502 return result
2505GraphThemeUpdate = create_update_model(PublicGraphTheme)
2508class PrivateGraphTheme(GraphThemeCreation):
2509 # private
2510 creator_id: str
2511 in_library: list[str]
2512 active: list[str]
2514 @classmethod
2515 def create(
2516 cls,
2517 creator_id: str,
2518 name: str,
2519 signal_id: str,
2520 value_color: str,
2521 forced_value_color: str,
2522 value_line_style: LineStyle,
2523 forced_value_line_style: LineStyle,
2524 private: bool,
2525 ):
2526 color_setting = cls(
2527 creator_id=creator_id,
2528 name=name,
2529 signal_id=signal_id,
2530 value_color=value_color,
2531 forced_value_color=forced_value_color,
2532 value_line_style=value_line_style,
2533 forced_value_line_style=forced_value_line_style,
2534 private=private,
2535 in_library=[creator_id],
2536 active=[creator_id],
2537 )
2539 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
2540 color_setting.id = str(new_color_setting.inserted_id)
2541 return color_setting
2543 def update(self, update_dict: dict, user_id: str = None):
2544 if user_id is None:
2545 return {}
2546 if (in_user_lib := update_dict.get("in_user_library")) is not None:
2547 if in_user_lib and user_id not in self.in_library:
2548 self.in_library.append(user_id)
2549 elif not in_user_lib and user_id in self.in_library:
2550 self.in_library.remove(user_id)
2551 update_dict["in_library"] = self.in_library
2552 del update_dict["in_user_library"]
2554 if (active_for_user := update_dict.get("active_for_user")) is not None:
2555 if active_for_user and user_id not in self.active:
2556 self.active.append(user_id)
2557 elif not active_for_user and user_id in self.active:
2558 self.active.remove(user_id)
2559 update_dict["active"] = self.active
2560 del update_dict["active_for_user"]
2562 if update_dict.get("created_by_user") is not None:
2563 del update_dict["created_by_user"]
2565 self.collection().find_one_and_update(
2566 {"_id": ObjectId(self.id)},
2567 {"$set": update_dict},
2568 )
2570 return {}
2573class DeviceStatus(str, Enum):
2574 started = "started"
2575 running = "running"
2576 created = "created"
2577 exited = "exited"
2578 restarting = "restarting"
2581class DeviceUpdateFromDeployer(BaseModel):
2582 status: DeviceStatus
2585class DeviceFromDeployerCreation(BaseModel):
2586 name: str
2587 description: str
2590class DeviceFromDeployer(DeviceFromDeployerCreation):
2591 status: DeviceStatus
2592 device_id: DeviceId
2593 logs: str = ""
2596class DeviceDeployer(GenericMongo):
2597 collection_name: ClassVar[str] = "device_deployers"
2598 url: HttpUrl
2600 def endpoint_url(self, endpoint):
2601 return f"{str(self.url).rstrip('/')}/{endpoint}"
2603 def devices(self) -> list[DeviceFromDeployer]:
2604 devices = []
2605 try:
2606 response = requests.get(self.endpoint_url("devices"))
2607 except requests.exceptions.ConnectionError:
2608 logger.info("connection error")
2609 return None
2610 if response.status_code != 200:
2611 return None
2612 for device_dict in response.json()["devices"]:
2613 devices.append(
2614 DeviceFromDeployer(
2615 device_id=device_dict["device_id"],
2616 name=device_dict["container_name"],
2617 description="desc",
2618 status=device_dict["status"],
2619 logs=device_dict["logs"],
2620 )
2621 )
2622 return devices
2624 def get_device(self, device_id: DeviceId):
2625 try:
2626 response = requests.get(self.endpoint_url(f"devices/{device_id}"))
2627 except requests.exceptions.ConnectionError:
2628 return None
2629 if response.status_code != 200:
2630 return None
2631 device_dict = response.json()
2632 return DeviceFromDeployer(
2633 device_id=device_dict["device_id"],
2634 name=device_dict["container_name"],
2635 description="desc",
2636 status=device_dict["status"],
2637 logs=device_dict["logs"],
2638 )
2640 def create_device(self, device: DeviceFromDeployer) -> Device | None:
2641 try:
2642 response = requests.post(self.endpoint_url("devices"), json={"name": device.name})
2643 except requests.exceptions.ConnectionError:
2644 return None
2646 if response.status_code != 201:
2647 return None
2649 device_dict = response.json()
2650 return DeviceFromDeployer(
2651 device_id=device_dict["device_id"],
2652 name="",
2653 description="desc",
2654 status=device_dict["status"],
2655 )
2657 def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
2658 try:
2659 response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump())
2660 except requests.exceptions.ConnectionError:
2661 return None
2663 if response.status_code != 200:
2664 return None
2666 device_dict = response.json()
2667 return Device(
2668 device_id=device_dict["device_id"],
2669 name="",
2670 description="desc",
2671 pid={},
2672 petri_network={},
2673 modes=[],
2674 status=device_dict["status"],
2675 )
2677 def delete_device(self, device_id: DeviceId) -> DeleteInfo:
2678 try:
2679 response = requests.delete(self.endpoint_url(f"devices/{device_id}"))
2680 except requests.exceptions.ConnectionError:
2681 return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
2682 if response.status_code not in [200, 202, 204]:
2683 return DeleteInfo(is_deleted=False, detail=response.text)
2685 return DeleteInfo(is_deleted=True, detail="")
2688DeviceDeployerUpdate = create_update_model(DeviceDeployer)