Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/models.py: 96%
1368 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 09:57 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 09:57 +0000
1from functools import cached_property
2import os
3import re
4import io
5import time
6import csv
7from typing import Self, ClassVar, Any, Literal, get_args, Annotated
8import datetime
9import math
10import bisect
11from enum import Enum
12import logging
13import copy
14import asyncio
15import requests
17import zipfile
18import ping3
19import pytz
20from bson.objectid import ObjectId
21from pymongo import ASCENDING, ReturnDocument
22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator
23import numpy as npy
24import lttb
25import h5py
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 signals_async_database,
36 devices_states_database,
37)
38from twinpad_backend.responses import ListResponse
39from twinpad_backend.messages import RabbitMQClient
40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
# Maps a type name to the Python type used for it; "epoch" is handled as a float.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Allowed post-processing function names, grouped in three Literal types.
# NOTE(review): Single/Double/Multiple presumably refer to the number of input signals - confirm.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]

# Host names of the collaborating services; each one can be overridden by the
# environment variable of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Read from the DEVICE_TIMEOUT environment variable (default 5.0) and converted to float.
# NOTE(review): compared against differences of time.time() values, so presumably seconds - confirm.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Module logger, bound to the "uvicorn.error" logger name.
logger = logging.getLogger("uvicorn.error")
class DeleteInfo(BaseModel):
    """Model carrying a deletion flag and a free-text detail message."""

    is_deleted: bool
    detail: str
class classproperty:
    """
    Descriptor exposing a function as a read-only property of the class.

    Stands in for stacking ``@classmethod`` on ``@property``, which was
    deprecated in python 3.11 and is removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped function receives the owning class as its only argument.
        self.fget = func

    def __get__(self, _, owner):
        # The instance (second argument) is ignored: the value depends only
        # on the class the attribute is looked up on.
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """
    Build the "<ModelName>Update" model derived from `model`.

    Every field of `model` except "id" is copied with its annotation widened
    to ``annotation | None`` and a default of None, then passed to
    ``create_model``.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(model.__name__ + "Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float) -> str:
    """
    Convert a POSIX timestamp to an ISO 8601 string expressed in UTC.

    The conversion is done explicitly in UTC so the result no longer depends
    on the time zone of the host (``fromtimestamp`` without ``tz`` returns
    local time). The tzinfo is dropped before formatting so the string keeps
    its previous naive layout, without a "+00:00" suffix.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a (time, value) series to about `max_number_samples` points.

    The inputs are returned untouched when they hold fewer than
    `max_number_samples` points. Otherwise the non-None samples are
    downsampled with LTTB and the bounds of every run of consecutive None
    values (one timestamp for a single None, first and last timestamp for a
    longer run) are inserted back with a None value. If LTTB raises
    ValueError, the series is linearly interpolated on an evenly spaced time
    vector instead, with NaN results mapped to None.

    Returns a ``(time_vector, values)`` tuple of lists.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # LTTB doesn't handle None values so remove them. This is a single pass
    # that builds new lists: the inputs are never mutated and no deep copy or
    # repeated count()/index()/pop() scan is needed.
    kept_time_vector = []
    kept_values = []
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            # Store bounds of None value groups so we can insert them back after the downsampling
            if not in_none_group:
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
            in_none_group = True
        else:
            kept_time_vector.append(timestamp)
            kept_values.append(value)
            in_none_group = False
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_time_vector, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """
    Tell whether `value` is an instance of `wanted_type`.

    When `wanted_type` is ``float``, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
145# Models
class TwinPadModel(BaseModel):
    """Base model adding dict conversion helpers on top of ``BaseModel``."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate `dict_` and return the resulting instance of `cls`."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model in JSON mode, leaving out the fields named in `exclude`."""
        return self.model_dump(exclude=exclude, mode="json")
def validate_mongo_id(v):
    """
    Return `v` as a string when ``ObjectId.is_valid`` accepts it.

    Raises:
        ValueError: if `v` is not a valid MongoDB ObjectId.
    """
    if ObjectId.is_valid(v):
        return str(v)
    raise ValueError("Invalid MongoDB id")
# String type whose input is first passed through validate_mongo_id.
MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]
def validate_12_hex(v: str) -> str:
    """
    Return `v` unchanged when it is a 12-character hexadecimal string.

    Non-string input is rejected with the same ValueError as a malformed
    string: ``re.fullmatch`` would otherwise raise TypeError, which pydantic
    does not turn into a validation error when this runs as a validator.

    Raises:
        ValueError: if `v` is not a string of exactly 12 hexadecimal digits.
    """
    if not isinstance(v, str) or not re.fullmatch(r"[0-9a-fA-F]{12}", v):
        raise ValueError("ID must be a 12-character hexadecimal string")
    return v
# String type whose input is first passed through validate_12_hex.
DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]
class GenericMongo(TwinPadModel):
    """
    Base class for models stored in a collection of the systems database.

    Subclasses provide `collection_name` and may fill `custom_pipeline_steps`,
    which maps a property name to the aggregation stages associated with it.
    NOTE(review): those stages presumably compute the property they are keyed
    by (which is why they are placed before any $match/$sort using it) - confirm.
    """

    id: MongoId | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection named `cls.collection_name`, requested with create=True."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def _pipeline_around(cls, attribute_name: str, core_steps: list) -> list:
        """
        Assemble: custom steps of `attribute_name`, then `core_steps`, then
        the custom steps of every other property.
        """
        pipeline = list(cls.custom_pipeline_steps.get(attribute_name, []))
        pipeline.extend(core_steps)
        for key, steps in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(steps)
        return pipeline

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated query and wrap it in a ListResponse.

        `query` must provide `mongodb_filter()`, `sort_by` (comma-separated
        "field" or "field:order" items), `offset` and `limit` (None or 0
        means no limit). The custom steps of each property are added to the
        pipeline at most once: before the $match when the property is
        filtered on, before the $sort when it is sorted on, and after
        pagination otherwise.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field, sort_order = sort, 1
            sort_dict[sort_field] = sort_order

        collection = get_collection(systems_database, cls.collection_name, create=True)
        # NOTE(review): the total is counted on the raw collection, without the
        # custom steps; verify it is right when filtering on a custom property.
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Each property's steps are added once, even if the property shows
            # up in several $and clauses or is both filtered and sorted on.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for clause in filter_clauses:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in clause:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom steps run after pagination, on the returned page only
        for filtered_property, steps in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(steps)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose `_id` is `ObjectId(item_id)`, or None."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Turn a document into an object; `_id` is moved to a string `id` (mutates `mongo_dict`)."""
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = cls._pipeline_around(attribute_name, [{"$match": {attribute_name: attribute_value}}])
        return [cls.mongo_dict_to_object(d) for d in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute with value, or None."""
        pipeline = cls._pipeline_around(
            attribute_name,
            [{"$match": {attribute_name: attribute_value}}, {"$limit": 1}],
        )
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted ascending on `sort_by`."""
        pipeline = cls._pipeline_around(sort_by, [{"$sort": {sort_by: ASCENDING}}])
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """
        Count documents whose `post_processing` is False or absent.

        Returns 0 when `get_collection` returns None for the collection.
        """
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert the object (without `id`), store the new id on `self` and return it."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply `update_dict` to `self` and $set it on the stored document; returns `self`."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; returns True when a document was removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    """User account stored in the "users" collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude: set | None = None):
        """
        Dump the user without its password.

        `exclude` is copied before "password" is added to it, so neither a
        shared default nor the caller's set is mutated.
        """
        excluded = set(exclude) if exclude else set()
        excluded.add("password")
        return GenericMongo.to_dict(self, exclude=excluded)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <inserted id>}``.

        The first user ever created is made admin regardless of `is_admin`.
        The password is stored as received.
        """
        user_collection = cls.collection()
        # Only emptiness matters here, so count at most one document instead
        # of loading and validating every user.
        if user_collection.count_documents({}, limit=1) == 0:
            is_admin = True
        user = cls(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        $set the non-None fields of `user` on the stored user `user_id`.

        Returns the updated user built from the stored document without its
        `is_connected` value (so the field default applies), or None when no
        document has this id.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # Documents stored without the key must not raise KeyError
        updated_user.pop("is_connected", None)
        return cls(**updated_user)
# Update model generated from User by create_update_model.
UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    """One entry of a device's `modes` list."""

    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    """Payload holding the id of the mode a device should change to."""

    mode_id: int
class Device(GenericMongo):
    """Device stored in the "devices" collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: DeviceId
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    def _mode_name(self, mode_id: int) -> str:
        """Return the name of the mode whose `mode_id` matches, or "#<mode_id>" if none does."""
        for mode in self.modes:
            if mode.mode_id == mode_id:
                return mode.name
        return f"#{mode_id}"

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to `update_dict.mode_id` and record the command.

        An out-of-range mode id is not sent: a failed command is recorded and
        a 400 error dict is returned. Otherwise the request goes through
        RabbitMQ and its response is stored on the command and returned.

        Mode names in the description are looked up by `Mode.mode_id` rather
        than by list position, so an id of 0 no longer wraps to the last mode
        and a stale `current_mode_id` no longer raises IndexError.
        """
        requested_mode_id = update_dict.mode_id
        # NOTE(review): the accepted range (0..len(modes)) is kept as it was;
        # confirm whether mode id 0 is really meant to be accepted.
        has_error = requested_mode_id < 0 or requested_mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{requested_mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self._mode_name(self.current_mode_id)} to {self._mode_name(requested_mode_id)}"
        else:
            description = f"Change mode of #{self.device_id} to {self._mode_name(requested_mode_id)}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {requested_mode_id}"}
        else:
            response = await RabbitMQClient().send_mode_change(self.device_id, requested_mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Map each device id found in `signal_ids` to its device (or None).

        The device id is the part of a signal id before the first "."; each
        distinct device is looked up once.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id
class DeviceSetup(GenericMongo):
    """Device setup stored in the "device_setups" collection."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]
# Update model generated from DeviceSetup by create_update_model.
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    """State of a device at a given timestamp."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
        """
        Return the states of `device_id` matching `query`, sorted and paginated.

        States are read from the collection named `device_id` in the devices
        states database; when that collection does not exist the response is
        empty with a total of 0. `query.sort_by` is a single "field" or
        "field:order" item; a limit of None or 0 means no limit.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field, sort_order = query.sort_by, 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            # Only a real cursor can be limited, the empty placeholder list cannot
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        # Missing list keys fall back to a real empty list: a Field(...) object
        # is only meaningful in a class body, not as a value.
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    """Single sample (value and optional forced value) of a signal."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of `signal_id`, or None when there is no collection or no data."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample (or None) of each signal, in the order of `signal_ids`."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
        """
        Return the most recent sample of `signal_id`, or None.

        The "last_values" collection is read first. Without an entry there,
        the signal must be registered with status "up" and its own collection
        is searched, narrowed through the time-series bucket collection when
        available. The timestamp is aligned with the device's last ping when
        that one is known; `device` is looked up from the signal id prefix
        when not given.
        """
        last_value_collection = get_signal_collection("last_values", True)
        signals_collection = get_collection(systems_database, "signals", create=True)

        sample_data = last_value_collection.find_one({"signal_id": signal_id}, sort={"precise_timestamp": -1})

        # If there is no data, check if the signal's type is anything other than float, as they don't send duplicate values
        if sample_data is None:
            if signals_collection.count_documents({"status.status": "up", "signal_id": signal_id}) < 1:
                return None

            signal_collection = get_signal_collection(signal_id)
            if signal_collection is None:
                # No collection for this signal: nothing to read
                return None

            # Same bucket workaround as in get_first_from_signal_id, very effective to narrow down big sets of data
            bucket = get_signal_collection(f"system.buckets.{signal_id}")
            last_bucket = None
            if bucket is not None:
                last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
            if last_bucket is not None:
                sample_data = signal_collection.find_one(
                    {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                    sort={"precise_timestamp": -1},
                )
            else:
                sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1})

            if sample_data is None:
                return None

        timestamp = sample_data.get("precise_timestamp")
        # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down
        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            timestamp = device.last_ping if timestamp is None else max(timestamp, device.last_ping)

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample (or None) of each signal, loading each device only once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
class SignalData(TwinPadModel):
    """Time series of a signal: parallel lists of timestamps, values and forced values."""

    signal_id: str
    forcible: bool = True
    # time_vector, values and forced_values are filled in parallel, one entry per sample
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    # Filled by get_from_signal_id from the first/last time-series bucket of the signal
    data_start: float | None = None
    data_end: float | None = None

    # number_samples: length of `values`; number_samples_db: documents matching the database request
    number_samples: int = 0
    number_samples_db: int = 0

    # Timings measured with time.time() differences
    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    phase_id: str | None = None
@classmethod
def get_from_signal_id(
    cls,
    signal_id: str,
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
    collection=None,
) -> Self:
    """
    Load the samples of `signal_id` from its collection.

    Args:
        signal_id: id of the signal to load.
        min_timestamp, max_timestamp: optional bounds applied on `timestamp`.
        window_min_timestamp, window_max_timestamp: when both are given and
            `interpolate_bounds` is True, samples are added on the window
            bounds (see `interpolate_bounds`).
        interpolate_bounds: enables the bound interpolation above.
        max_documents: when the request matches more documents, only the
            first document of each group of `ceil(matches / max_documents)`
            consecutive documents is kept, so at most `max_documents` are
            returned.
        collection: collection to read; looked up from `signal_id` if None.

    Returns:
        An instance of the signal's `signal_data_class`, or an empty instance
        of `cls` when the collection or the signal does not exist.
    """
    now = time.time()

    # NOTE(review): these datetimes are naive local times; confirm they match
    # the way `timestamp` is written when the host is not running in UTC.
    req_signal = {}
    if min_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
    if max_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

    if collection is None:
        collection = get_signal_collection(signal_id)
    if collection is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )

    db_req_start = time.time()

    sort_step = {"$sort": {"precise_timestamp": 1}}
    number_results = collection.count_documents(req_signal)

    pipeline = []
    if req_signal:
        pipeline.append({"$match": req_signal})  # Filter data if needed

    pipeline.extend(
        [
            {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
            sort_step,
        ]
    )

    if max_documents is not None and max_documents < number_results:
        unsampling_ratio = math.ceil(number_results / max_documents)
        logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
        pipeline.extend(
            [
                {
                    "$setWindowFields": {
                        "sortBy": {"precise_timestamp": 1},
                        "output": {"index": {"$documentNumber": {}}},
                    }
                },
                # $documentNumber starts at 1: shift it to 0 so that every
                # group holds `unsampling_ratio` documents and the number of
                # groups never exceeds max_documents.
                {
                    "$set": {
                        "group_id": {"$floor": {"$divide": [{"$subtract": ["$index", 1]}, unsampling_ratio]}}
                    }
                },
                {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                {"$replaceRoot": {"newRoot": "$doc"}},
                {"$unset": ["index", "group_id"]},
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

    cursor = collection.aggregate(pipeline)
    db_req_time = time.time() - db_req_start

    init_time = time.time()

    results = cursor.to_list()
    time_vector = []
    values = []
    forced_values = []
    for s in results:
        time_vector.append(s["precise_timestamp"])
        values.append(s.get("value", None))
        forced_values.append(s.get("forced_value", None))

    signal = Signal.get_from_signal_id(signal_id)
    if signal is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )
    class_ = signal.signal_data_class

    if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
        time_vector, values, forced_values = cls.interpolate_bounds(
            class_,
            collection,
            signal_id,
            time_vector,
            values,
            forced_values,
            window_min_timestamp,
            window_max_timestamp,
        )

    if values:
        # TODO: check below. a bit strange
        if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
            # Adding last value as it should be repeated
            time_vector.append(now)
            values.append(values[-1])
            forced_values.append(forced_values[-1])

    init_time = time.time() - init_time

    # The internal "system.buckets.<signal_id>" collection of the time series
    # gives the extent of the stored data through the min/max control values
    # of its first and last buckets.
    bucket = get_signal_collection(f"system.buckets.{signal_id}")
    first_bucket = None
    last_bucket = None
    if bucket is not None:
        first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
    data_start = first_bucket["control"]["min"]["precise_timestamp"] if first_bucket is not None else None
    data_end = last_bucket["control"]["max"]["precise_timestamp"] if last_bucket is not None else None

    return class_(
        signal_id=signal_id,
        forcible=signal.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        data_start=data_start,
        data_end=data_end,
        number_samples=len(values),
        number_samples_db=number_results,
        db_query_time=db_req_time,
        init_time=init_time,
    )
@staticmethod
def interpolate_bounds(
    class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
):
    """
    Add samples on the window bounds when the data stops short of them.

    A bound is handled when there is no data, or when the data ends more than
    a fifth of the window width away from it. The bound value is interpolated
    between the nearest loaded sample and the latest stored sample that has a
    value and is not after the bound; without loaded data that stored sample
    is used as is. The three lists are modified in place and returned.

    Both window timestamps must be numbers (the caller checks for None).
    """
    window_fifth = (window_max_timestamp - window_min_timestamp) / 5

    # Fetching right side value & interpolation
    sample_right = None
    window_right_limit = window_max_timestamp - window_fifth
    if not time_vector or time_vector[-1] < window_right_limit:
        sample_right = collection.find_one(
            {
                "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
                "value": {"$exists": True},
            },
            sort={"precise_timestamp": -1},
        )
        if sample_right:
            if time_vector:
                right_sd = class_(
                    signal_id=signal_id,
                    time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                    values=[values[-1], sample_right.get("value", None)],
                    forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                )
                max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
            else:
                max_ts_value = sample_right.get("value", None)
                max_ts_forced_value = sample_right.get("forced_value", None)
            time_vector.append(window_max_timestamp)
            values.append(max_ts_value)
            forced_values.append(max_ts_forced_value)

    # Fetching left side value & interpolation
    window_left_limit = window_min_timestamp + window_fifth
    if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
        if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
            # The sample found for the right bound already precedes the
            # window: it is also the latest one before the left bound, so the
            # second query is skipped.
            sample_left = sample_right
        else:
            sample_left = collection.find_one(
                {
                    "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
                    "value": {"$exists": True},
                },
                sort={"precise_timestamp": -1},
            )

        if sample_left:
            if time_vector:
                left_sd = class_(
                    signal_id=signal_id,
                    time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                    values=[sample_left["value"], values[0]],
                    forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                )
                min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
            else:
                min_ts_value = sample_left.get("value", None)
                min_ts_forced_value = sample_left.get("forced_value", None)
            time_vector.insert(0, window_min_timestamp)
            values.insert(0, min_ts_value)
            forced_values.insert(0, min_ts_forced_value)

    return time_vector, values, forced_values
def interpolate_values(self, new_time_vector: list[float]):
    """Return `self.values` interpolated on `new_time_vector` by `self.interpolate`."""
    return self.interpolate(new_time_vector, self.values)
def interpolate_forced_values(self, new_time_vector: list[float]):
    """Return `self.forced_values` interpolated on `new_time_vector` by `self.interpolate`."""
    return self.interpolate(new_time_vector, self.forced_values)
def uniform_desampling(self, number_samples_max: int = None) -> Self:
    """
    Resample the data on `number_samples_max` evenly spaced timestamps.

    Returns `self` when no maximum is given or when the data already fits.
    Otherwise a new object of the same class is returned; values and forced
    values are interpolated on the new time vector and every other attribute
    (including `forcible`) is carried over.
    """
    if number_samples_max is None or self.number_samples <= number_samples_max:
        return self

    processing_start = time.time()

    new_time_vector = npy.linspace(self.time_vector[0], self.time_vector[-1], number_samples_max).tolist()
    values = self.interpolate_values(new_time_vector)
    forced_values = self.interpolate_forced_values(new_time_vector)

    elapsed = time.time() - processing_start

    return self.__class__(
        signal_id=self.signal_id,
        # Without this the copy silently falls back to the default (True)
        forcible=self.forcible,
        time_vector=new_time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=len(new_time_vector),
        number_samples_db=self.number_samples_db,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + elapsed,
        phase_id=self.phase_id,
    )
def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Downsample to at most `number_samples_max` samples; delegates to `uniform_desampling`."""
    return self.uniform_desampling(number_samples_max)
def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
) -> Self:
    """
    Performs a sampling in a window of interest and outside.

    The timestamps inside [window_min_timestamp, window_max_timestamp] (both
    defaulting to the data extent) always include the window bounds and are
    replaced by `window_max_number_samples` evenly spaced ones when there are
    more than that. Timestamps outside the window are reduced through
    `resample_outside_window`. Values and forced values are interpolated on
    the resulting time vector.

    Returns `self` when there is no data, else a new object of the same class
    carrying over the other attributes (`number_samples_db` and `phase_id`
    included).
    """
    if not self.time_vector:
        return self

    if window_min_timestamp is None:
        window_min_timestamp = self.time_vector[0]
    if window_max_timestamp is None:
        window_max_timestamp = self.time_vector[-1]

    processing_start = time.time()

    index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
    index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

    # Slices are copies, so self.time_vector is never modified below
    time_vector_before = self.time_vector[:index_window_start]
    time_vector_window = self.time_vector[index_window_start:index_window_end]
    time_vector_after = self.time_vector[index_window_end:]

    # Resampling window
    if time_vector_window:
        # Ensuring window bounds
        if time_vector_window[0] != window_min_timestamp:
            time_vector_window.insert(0, window_min_timestamp)
        if time_vector_window[-1] != window_max_timestamp:
            time_vector_window.append(window_max_timestamp)
    else:
        time_vector_window = [window_min_timestamp, window_max_timestamp]

    if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
        time_vector_window = npy.linspace(
            time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
        ).tolist()

    # Resampling outside
    time_vector_before, time_vector_after = SignalData.resample_outside_window(
        time_vector_before, time_vector_after, outside_max_number_samples
    )

    new_time_vector = time_vector_before + time_vector_window + time_vector_after
    values = self.interpolate_values(new_time_vector)
    forced_values = self.interpolate_forced_values(new_time_vector)

    elapsed = time.time() - processing_start

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=new_time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=len(values),
        # Keep the database count, as uniform_desampling does, rather than
        # replacing it with the number of loaded samples
        number_samples_db=self.number_samples_db,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + elapsed,
        phase_id=self.phase_id,
    )
def zero_time_vector(self, data_start: float):
    """
    Return a copy whose timestamps are expressed relative to `data_start`.

    Returns `self` when there is no data. In the copy, `data_start` and
    `data_end` are the first and last shifted timestamps; values, forced
    values and the other attributes (`forcible` and `phase_id` included) are
    carried over.
    """
    if len(self.time_vector) == 0:
        return self

    processing_start = time.time()
    # tolist() hands plain Python floats to the model instead of a numpy array
    time_vector = (npy.array(self.time_vector) - data_start).tolist()
    elapsed = time.time() - processing_start

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=time_vector,
        values=self.values,
        forced_values=self.forced_values,
        number_samples=self.number_samples,
        number_samples_db=self.number_samples_db,
        data_start=time_vector[0],
        data_end=time_vector[-1],
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + elapsed,
        phase_id=self.phase_id,
    )
def csv_export(self):
    """
    Serialize the samples as UTF-8 encoded CSV bytes.

    The first row is the header; each following row holds the date string
    given by `get_utc_date_from_timestamp`, the value and the forced value of
    one sample.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])  # Write header
    csv_writer.writerows(
        [get_utc_date_from_timestamp(ts), value, forced_value]
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")
def prestoplot_export(self):
    """Serialise the samples as a tab-separated PrestoPlot text file (UTF-8 bytes).

    The column name is the signal id with dots replaced by underscores,
    prefixed with ``_`` when it starts with a numeric character. Four header
    lines are written, then one line per sample; missing values and forced
    values are written as ``none``.
    """
    column = self.signal_id.replace(".", "_")
    if column[0].isnumeric():
        column = "_" + column

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column}\t{column}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column}\n",
    ]
    for stamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        date_text = get_utc_date_from_timestamp(stamp)
        sample_text = sample if sample is not None else "none"
        forced_text = forced_sample if forced_sample is not None else "none"
        lines.append(f"{date_text}\t{sample_text}\t{forced_text}\n")

    return "".join(lines).encode("utf-8")
@staticmethod
def resample_outside_window(time_vector_left, time_vector_right, outside_max_number_samples):
    """Resample the time vectors on both sides of a window under a shared sample cap.

    Args:
        time_vector_left: timestamps located before the window.
        time_vector_right: timestamps located after the window.
        outside_max_number_samples: maximum total number of samples allowed
            for both sides together; ``None`` disables resampling.

    Returns:
        A ``(left, right)`` tuple. When no resampling is needed the inputs
        are returned unchanged. Otherwise each side is replaced by evenly
        spaced timestamps between its first and last sample, the cap being
        shared proportionally to the original sample counts. A side whose
        share is 0 becomes an empty list, so the cap is never exceeded.
    """
    number_samples_left = len(time_vector_left)
    number_samples_right = len(time_vector_right)
    number_samples_total = number_samples_left + number_samples_right

    if (
        outside_max_number_samples is None
        or number_samples_total == 0
        or number_samples_total <= outside_max_number_samples
    ):
        return time_vector_left, time_vector_right

    new_number_samples_before = min(
        number_samples_left,
        math.ceil(outside_max_number_samples * number_samples_left / number_samples_total),
    )
    new_number_samples_after = min(
        number_samples_right,
        math.ceil(outside_max_number_samples * number_samples_right / number_samples_total),
    )
    # Rounding both shares up can overshoot the cap by one: take it back from the larger side.
    if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
        if new_number_samples_before > new_number_samples_after:
            new_number_samples_before -= 1
        else:
            new_number_samples_after -= 1

    # A share of 0 must yield no sample at all; keeping the original vector
    # (as was done before) would break the requested cap.
    new_time_vector_left = []
    if new_number_samples_before > 0:
        new_time_vector_left = npy.linspace(
            time_vector_left[0], time_vector_left[-1], new_number_samples_before
        ).tolist()

    new_time_vector_right = []
    if new_number_samples_after > 0:
        # Generated from the end and reversed, so that a single-sample share
        # keeps the last timestamp of the right side.
        new_time_vector_right = npy.linspace(
            time_vector_right[-1], time_vector_right[0], new_number_samples_after
        ).tolist()[::-1]

    return new_time_vector_left, new_time_vector_right
# Signal data whose samples are numbers (int or float) or None.
class NumericSignalData(SignalData):
    # Data-type tag; defaults to "float".
    data_type: str = "float"
    # Sample values; None entries are handled as NaN by `interpolate`.
    values: list[float | int | None]
    # Forced values, with the same element types as `values`.
    forced_values: list[float | int | None]
def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

    ``None`` entries of ``items`` are turned into NaN before calling
    ``numpy.interp``; NaN results are turned back into ``None``.
    """
    samples = [npy.nan if item is None else item for item in items]
    interpolated = npy.interp(new_time_vector, self.time_vector, samples)
    return [None if npy.isnan(point) else point for point in interpolated]
1003 def uniform_desampling(self, number_samples_max: int) -> Self:
1004 if number_samples_max is None or self.number_samples <= number_samples_max:
1005 return self
1007 data_processing_time = time.time()
1009 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
1010 forced_values = self.interpolate_forced_values(time_vector)
1011 number_samples = len(time_vector)
1013 data_processing_time = time.time() - data_processing_time
1015 return self.__class__(
1016 signal_id=self.signal_id,
1017 time_vector=time_vector,
1018 values=values,
1019 forced_values=forced_values,
1020 number_samples=number_samples,
1021 number_samples_db=self.number_samples_db,
1022 data_start=self.data_start,
1023 data_end=self.data_end,
1024 db_query_time=self.db_query_time,
1025 init_time=self.init_time,
1026 data_processing_time=self.data_processing_time + data_processing_time,
1027 )
1029 def min_max_downsampling(self, number_samples_max: int) -> Self:
1030 if self.number_samples < number_samples_max:
1031 return self
1033 data_processing_time = time.time()
1035 number_bins = number_samples_max // 2
1037 time_vector = npy.array(self.time_vector, dtype=npy.float64)
1038 values = npy.array(self.values, dtype=npy.float64)
1039 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1041 points_per_bin = self.number_samples // number_bins
1043 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1044 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1045 if self.number_samples - number_bins * points_per_bin > 1:
1046 points_per_bin += 1
1047 number_bins = self.number_samples // points_per_bin + 1
1048 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1049 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1050 values = npy.concatenate([values, nan_points_to_add])
1051 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1053 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1054 values_matrix = values.reshape(number_bins, points_per_bin)
1055 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1057 indexes_min = npy.zeros(number_bins, dtype="int64")
1058 indexes_max = npy.zeros(number_bins, dtype="int64")
1060 for row in range(number_bins):
1061 min_value = values_matrix[row, 0]
1062 max_value = values_matrix[row, 0]
1063 for column in range(points_per_bin):
1064 if values_matrix[row, column] < min_value:
1065 min_value = values_matrix[row, column]
1066 indexes_min[row] = column
1067 elif values_matrix[row, column] > max_value:
1068 max_value = values_matrix[row, column]
1069 indexes_max[row] = column
1071 row_index = npy.repeat(npy.arange(number_bins), 2)
1072 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1074 data_processing_time = time.time() - data_processing_time
1076 new_time_vector = timestamps_matrix[row_index, column_index]
1077 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1078 new_values = values_matrix[row_index, column_index]
1079 new_values = npy.where(npy.isnan(new_values), None, new_values)
1080 new_forced_values = forced_values_matrix[row_index, column_index]
1081 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1083 # Make sure there are no None values for the time vector
1084 time_vector_filter = new_time_vector != None
1085 new_time_vector = new_time_vector[time_vector_filter]
1086 new_values = new_values[time_vector_filter]
1087 new_forced_values = new_forced_values[time_vector_filter]
1089 return self.__class__(
1090 signal_id=self.signal_id,
1091 time_vector=new_time_vector,
1092 values=new_values,
1093 forced_values=new_forced_values,
1094 number_samples=number_bins * 2,
1095 number_samples_db=self.number_samples_db,
1096 data_start=self.data_start,
1097 data_end=self.data_end,
1098 db_query_time=self.db_query_time,
1099 init_time=self.init_time,
1100 data_processing_time=self.data_processing_time + data_processing_time,
1101 phase_id=self.phase_id,
1102 )
1104 def interest_window_desampling(
1105 self,
1106 window_max_number_samples: int,
1107 outside_max_number_samples: int,
1108 window_min_timestamp: float | None = None,
1109 window_max_timestamp: float | None = None,
1110 ) -> Self:
1111 """Performs a sampling in a window of interest and outside."""
1113 if not self.time_vector:
1114 return self
1116 if window_min_timestamp is None:
1117 window_min_timestamp = self.time_vector[0]
1118 if window_max_timestamp is None:
1119 window_max_timestamp = self.time_vector[-1]
1121 data_processing_time = time.time()
1123 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1124 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1126 time_vector_before = self.time_vector[:index_window_start]
1127 time_vector_window = self.time_vector[index_window_start:index_window_end]
1128 time_vector_after = self.time_vector[index_window_end:]
1130 values_before = self.values[:index_window_start]
1131 values_window = self.values[index_window_start:index_window_end]
1132 values_after = self.values[index_window_end:]
1133 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1134 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1136 # Resampling window
1137 if time_vector_window:
1138 # Ensurring window bounds
1139 if time_vector_window[0] != window_min_timestamp:
1140 time_vector_window.insert(0, window_min_timestamp)
1141 values_window.insert(0, window_min_value)
1142 if time_vector_window[-1] != window_max_timestamp:
1143 time_vector_window.append(window_max_timestamp)
1144 values_window.append(window_max_value)
1145 else:
1146 time_vector_window = [window_min_timestamp, window_max_timestamp]
1147 values_window = [window_min_value, window_max_value]
1149 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1150 # Resampling
1151 time_vector_window, values_window = downsample_list(
1152 time_vector_window, values_window, window_max_number_samples
1153 )
1155 # Resampling outside
1156 number_samples_before = len(time_vector_before)
1157 number_samples_after = len(time_vector_after)
1158 if (
1159 outside_max_number_samples is not None
1160 and (number_samples_before + number_samples_after) > outside_max_number_samples
1161 ):
1162 new_number_samples_before = min(
1163 number_samples_before,
1164 math.ceil(
1165 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1166 ),
1167 )
1168 new_number_samples_after = min(
1169 number_samples_after,
1170 math.ceil(
1171 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1172 ),
1173 )
1174 # Adjusting numbers as math.ceil can do +1 on sum
1175 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1176 if new_number_samples_before > new_number_samples_after:
1177 new_number_samples_before -= 1
1178 else:
1179 new_number_samples_after -= 1
1181 if new_number_samples_before > 0:
1182 time_vector_before, values_before = downsample_list(
1183 time_vector_before, values_before, new_number_samples_before
1184 )
1186 if new_number_samples_after > 0:
1187 time_vector_after, values_after = downsample_list(
1188 time_vector_after, values_after, new_number_samples_after
1189 )
1191 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1192 values = values_before + values_window + values_after
1193 forced_values = self.interpolate_forced_values(new_time_vector)
1194 number_samples = len(values)
1196 data_processing_time = time.time() - data_processing_time
1198 return self.__class__(
1199 signal_id=self.signal_id,
1200 time_vector=new_time_vector,
1201 values=values,
1202 forced_values=forced_values,
1203 number_samples=number_samples,
1204 number_samples_db=self.number_samples,
1205 data_start=self.data_start,
1206 data_end=self.data_end,
1207 db_query_time=self.db_query_time,
1208 init_time=self.init_time,
1209 data_processing_time=self.data_processing_time + data_processing_time,
1210 )
# Signal data whose samples are strings (or None).
class StringSignalData(SignalData):
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Look up, for each requested timestamp, the item of the latest known sample.

        Strings cannot be interpolated, so each timestamp takes the item
        whose own timestamp is the closest one at or before it. Timestamps
        located before the first sample take the first item.
        """
        # Position of the last sample whose timestamp is <= the requested one (-1 when none).
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        last_position = len(items) - 1
        positions = npy.clip(positions, 0, last_position)
        return [items[position] for position in positions]
# Container grouping the data of several signals.
class SignalsData(TwinPadModel):
    # One data object per signal.
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    # Processing time reported for the whole container (set by the factory and resampling methods).
    data_processing_time: float
    # Start and end of the data; None when unknown.
    data_start: float | None
    data_end: float | None
@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Load the data of several signals and gather them in one container.

    Each signal is loaded with ``SignalData.get_from_signal_id`` using the
    collection returned for it by ``get_signal_collections_batch``. When
    ``max_timestamp`` is not given, the current time is used.

    Returns:
        An instance whose ``data_processing_time`` is the sum of the
        per-signal processing times and whose ``data_start``/``data_end``
        are the smallest start and largest end among the signals that have
        one (``None`` when no signal has one).
    """
    loaded = []
    earliest_start = None
    latest_end = None
    if max_timestamp is None:
        max_timestamp = time.time()
    total_processing_time = 0.0

    collections = get_signal_collections_batch(signal_ids)

    for signal_id, collection in zip(signal_ids, collections):
        signal_data = SignalData.get_from_signal_id(
            signal_id=signal_id,
            min_timestamp=min_timestamp,
            max_timestamp=max_timestamp,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
            interpolate_bounds=interpolate_bounds,
            max_documents=max_documents,
            collection=collection,
        )
        total_processing_time += signal_data.data_processing_time
        loaded.append(signal_data)

        if signal_data.data_start is not None:
            earliest_start = (
                signal_data.data_start
                if earliest_start is None
                else min(signal_data.data_start, earliest_start)
            )
        if signal_data.data_end is not None:
            latest_end = (
                signal_data.data_end if latest_end is None else max(signal_data.data_end, latest_end)
            )

    return cls(
        signals_data=loaded,
        data_processing_time=total_processing_time,
        data_start=earliest_start,
        data_end=latest_end,
    )
@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Load the data of every requested signal for every given phase.

    ``phases``, ``phase_sync_times``, ``window_min_timestamps`` and
    ``window_max_timestamps`` are read in parallel, one entry per phase.
    For each phase, data is queried between the phase bounds
    (``start_at``/``end_at`` divided by 1000). When both window bounds are
    given, the query range is narrowed to the window extended by 1/20 of
    its length on each side that differs from the phase bound, without ever
    leaving the phase. Signals without samples are skipped. When
    ``zero_time_vector`` is true, timestamps are shifted by the phase sync
    time (the phase start when no sync time is given). Every kept signal is
    tagged with the phase id.

    Returns:
        An instance holding all kept signals, with the elapsed wall-clock
        time as ``data_processing_time`` and ``data_start``/``data_end``
        set to 0.
    """
    collected: list[SignalData] = []
    computation_start = time.time()

    per_phase = zip(phases, phase_sync_times, window_min_timestamps, window_max_timestamps)
    for phase, sync_time, window_min_timestamp, window_max_timestamp in per_phase:
        min_timestamp = phase.start_at / 1000
        max_timestamp = phase.end_at / 1000

        if sync_time is None:
            sync_time = min_timestamp

        has_window = window_max_timestamp is not None and window_min_timestamp is not None
        if has_window:
            window_length = window_max_timestamp - window_min_timestamp
            if window_min_timestamp != min_timestamp:
                min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20)
            if window_max_timestamp != max_timestamp:
                max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20)

        collections = get_signal_collections_batch(signal_ids)
        for signal_id, collection in zip(signal_ids, collections):
            signal_data = SignalData.get_from_signal_id(
                signal_id,
                min_timestamp,
                max_timestamp,
                window_min_timestamp,
                window_max_timestamp,
                interpolate_bounds=False,
                max_documents=None,
                collection=collection,
            )
            if len(signal_data.time_vector) == 0:
                continue

            if zero_time_vector:
                signal_data = signal_data.zero_time_vector(sync_time)
            signal_data.phase_id = phase.id
            collected.append(signal_data)

    elapsed = time.time() - computation_start
    return cls(
        signals_data=collected,
        data_processing_time=elapsed,
        data_start=0,
        data_end=0,
    )
1343 def uniform_desampling(self, number_samples_max: int) -> Self:
1344 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1345 return SignalsData(
1346 signals_data=signals_data,
1347 data_processing_time=sum(s.data_processing_time for s in signals_data),
1348 data_start=self.data_start,
1349 data_end=self.data_end,
1350 )
1352 def min_max_downsampling(self, number_samples_max: int) -> Self:
1353 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data]
1354 return SignalsData(
1355 signals_data=signals_data,
1356 data_processing_time=sum(s.data_processing_time for s in signals_data),
1357 data_start=self.data_start,
1358 data_end=self.data_end,
1359 )
1361 def interest_window_desampling(
1362 self,
1363 window_max_number_samples: int,
1364 outside_max_number_samples: int,
1365 window_min_timestamp: float = None,
1366 window_max_timestamp: float = None,
1367 ) -> Self:
1368 signals_data = [
1369 s.interest_window_desampling(
1370 window_max_number_samples=window_max_number_samples,
1371 outside_max_number_samples=outside_max_number_samples,
1372 window_min_timestamp=window_min_timestamp,
1373 window_max_timestamp=window_max_timestamp,
1374 )
1375 for s in self.signals_data
1376 ]
1378 return SignalsData(
1379 signals_data=signals_data,
1380 data_processing_time=sum(s.data_processing_time for s in signals_data),
1381 data_start=self.data_start,
1382 data_end=self.data_end,
1383 )
def zero_time_vector(self, data_start: float):
    """Shift the time axis of every contained signal by ``-data_start``.

    Returns a new container of the same class as ``self`` with
    ``data_start`` set to 0 and ``data_end`` set to the largest ``data_end``
    among the shifted signals. Signals without a ``data_end`` are ignored;
    ``data_end`` is ``None`` when no signal has one, instead of raising.
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
    known_ends = (s.data_end for s in signals_data if s.data_end is not None)
    return self.__class__(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(known_ends, default=None),
    )
@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute a one-signal post-processing function over a phase.

    The result signal id is ``post_processing.<base id>.<function>``. When
    that signal already lists the phase in its computed phases, the stored
    samples are returned through ``get_existing_function_signal``.
    Otherwise the base signal is loaded over the whole phase, the function
    is applied to its values and forced values ("DeltaT" works on the
    timestamps instead), and a background task is scheduled to store the
    full result. The returned data is then restricted to the optional
    window.

    Returns:
        ``None`` when the base signal has no sample for the phase,
        otherwise the container with its first signal replaced in place by
        the computed result.
    """
    signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"

    stored_signal = Signal.get_from_signal_id(signal_id)
    if stored_signal is not None and phase.id in stored_signal.computed_phases_ids:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )
    if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
        return None

    source = signals_data.signals_data[0]
    time_vector = npy.array(source.time_vector)
    values = source.values
    forced_values = source.forced_values

    new_values = None
    new_forced_values = None
    if function == "Cumul":
        new_values = cumul(values)
        new_forced_values = cumul(forced_values)
    elif function == "Delta":
        new_values = delta(values)
        new_forced_values = delta(forced_values)
    elif function == "DeltaT":
        # Time deltas do not depend on the values: both outputs share the same result.
        new_values = delta(time_vector)
        new_forced_values = new_values
    elif function == "Derive":
        new_values = derive(time_vector, values)
        new_forced_values = derive(time_vector, forced_values)
    elif function == "Integ":
        new_values = integ(time_vector, values)
        new_forced_values = integ(time_vector, forced_values)

    # NaN results are exposed as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The full (not yet windowed) result is what gets stored.
    save_coroutine = cls.save_function_signal(
        phase, signal_id, time_vector, new_values, new_forced_values, source.forcible
    )
    asyncio.get_running_loop().create_task(save_coroutine)

    if window_max_timestamp is not None:
        keep = time_vector <= window_max_timestamp
        time_vector, new_values, new_forced_values = time_vector[keep], new_values[keep], new_forced_values[keep]
    if window_min_timestamp is not None:
        keep = time_vector >= window_min_timestamp
        time_vector, new_values, new_forced_values = time_vector[keep], new_values[keep], new_forced_values[keep]

    source.time_vector = time_vector.tolist()
    source.values = new_values.tolist()
    source.forced_values = new_forced_values.tolist()
    source.number_samples = time_vector.size
    source.signal_id = signal_id

    return signals_data
1472 @classmethod
1473 async def apply_multiple_function(
1474 cls,
1475 phases: list,
1476 signal_ids: list,
1477 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
1478 window_min_timestamp: float = None,
1479 window_max_timestamp: float = None,
1480 ):
1481 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
1482 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
1483 else:
1484 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"
1486 active_phase = phases[0]
1487 if function in {"Align-X", "Using-X"}:
1488 active_phase = phases[1]
1490 processed_result_signal = Signal.get_from_signal_id(function_signal_id)
1491 if processed_result_signal is not None and (
1492 active_phase.id in processed_result_signal.computed_phases_ids
1493 ): # If signal has been computed for the correct phase
1494 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)
1496 array_length = None
1497 time_vector_list = []
1498 values_list = []
1499 forced_values_list = []
1500 forcible = True
1501 for phase, signal_id in zip(phases, signal_ids):
1502 signals_data = cls.get_from_phase_and_signal_ids(
1503 [phase], [None], [signal_id], [None], [None], zero_time_vector=False
1504 )
1506 if len(signals_data.signals_data) == 0:
1507 return None
1509 signal_data = signals_data.signals_data[0]
1511 if array_length is None:
1512 array_length = signal_data.number_samples
1513 if (
1514 array_length != signal_data.number_samples and function != "Align-X"
1515 ) or signal_data.number_samples == 0:
1516 return None
1518 time_vector_list.append(npy.array(signal_data.time_vector))
1519 values_list.append(npy.array(signal_data.values, dtype=npy.float64))
1520 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
1521 forcible = forcible and signal_data.forcible
1523 time_vector = time_vector_list[0]
1524 new_values = None
1525 new_forced_values = None
1527 match (function):
1528 case "Align-X":
1529 time_vector = time_vector_list[1]
1530 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
1531 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
1532 new_values = align_x(old_time_vector, values_list[0], new_time_vector)
1533 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
1534 # case "Atan2":
1535 # new_values = atan2(values_list[0], values_list[1])
1536 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
1537 case "Using-X":
1538 if len(time_vector_list[0]) != len(time_vector_list[1]):
1539 return None
1540 time_vector = time_vector_list[1]
1541 new_values = values_list[0]
1542 new_forced_values = forced_values_list[0]
1543 case "Mean":
1544 new_values = mean(*values_list)
1545 new_forced_values = mean(*forced_values_list)
1546 case "Norm":
1547 new_values = norm(*values_list)
1548 new_forced_values = norm(*forced_values_list)
1550 new_values = npy.where(npy.isnan(new_values), None, new_values)
1551 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1553 loop = asyncio.get_running_loop()
1554 loop.create_task(
1555 cls.save_function_signal(
1556 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
1557 )
1558 )
1560 total_number_samples = time_vector.size
1562 if window_max_timestamp is not None:
1563 max_timestamp_mask = time_vector <= window_max_timestamp
1564 time_vector = time_vector[max_timestamp_mask]
1565 new_values = new_values[max_timestamp_mask]
1566 new_forced_values = new_forced_values[max_timestamp_mask]
1567 if window_min_timestamp is not None:
1568 min_timestamp_mask = time_vector >= window_min_timestamp
1569 time_vector = time_vector[min_timestamp_mask]
1570 new_values = new_values[min_timestamp_mask]
1571 new_forced_values = new_forced_values[min_timestamp_mask]
1573 signals_data = cls(
1574 signals_data=[
1575 NumericSignalData(
1576 signal_id=function_signal_id,
1577 forcible=forcible,
1578 time_vector=time_vector.tolist(),
1579 values=new_values.tolist(),
1580 forced_values=new_forced_values.tolist(),
1581 number_samples=time_vector.size,
1582 number_samples_db=total_number_samples,
1583 )
1584 ],
1585 data_processing_time=0,
1586 data_start=0,
1587 data_end=0,
1588 )
1590 return signals_data
@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Read the stored samples of an already computed post-processing signal.

    Samples are read from the signal's collection, filtered on
    ``precise_timestamp`` by whichever window bounds are not ``None``.

    Returns:
        A container holding one ``NumericSignalData``;
        ``number_samples_db`` is the unfiltered document count of the
        collection and ``data_processing_time`` is the time spent fetching.
    """
    collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp
    pipeline = [{"$match": {"precise_timestamp": timestamp_bounds}}] if timestamp_bounds else []

    total_number_samples = collection.count_documents({})

    fetch_start = time.time()
    samples = collection.aggregate(pipeline).to_list()
    stored_times = [sample["precise_timestamp"] for sample in samples]
    stored_values = [sample["value"] for sample in samples]
    stored_forced_values = [sample["forced_value"] for sample in samples]

    stored_signal = NumericSignalData(
        signal_id=signal_id,
        time_vector=stored_times,
        values=stored_values,
        forced_values=stored_forced_values,
        number_samples=len(stored_times),
        number_samples_db=total_number_samples,
    )
    return cls(
        signals_data=[stored_signal],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )
@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Store a computed post-processing signal and register it in the signals configuration.

    The samples previously stored within the phase time range are removed
    from the signal's collection and replaced by the new ones. The signal
    configuration document is then created or updated, and the phase id is
    appended to its ``computed_phases_ids``.
    """
    # Samples are written before the configuration is updated: until the
    # phase is registered, another request recomputes the signal.
    signal_collection = get_signal_collection(function_signal_id, create=True)
    phase_range = {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}
    signal_collection.delete_many({"precise_timestamp": phase_range})

    documents = []
    for index, precise_timestamp in enumerate(time_vector):
        documents.append(
            {
                "timestamp": datetime.datetime.fromtimestamp(precise_timestamp),
                "precise_timestamp": precise_timestamp,
                "value": new_values[index],
                "forced_value": new_forced_values[index],
            }
        )
    signal_collection.insert_many(documents)

    signal_configuration = {
        "description": "",
        "unit": None,
        "type": "sensor",
        "address": None,
        "frequency": 1 / max(npy.mean(delta(time_vector)), 1),  # avoid division by 0
        "transfer_function": None,
        "precision_digits": None,
        "digitization_function": None,
        "data_type": "float",
        "formula": None,
        "forcible": forcible,
        "commandable": False,
        "broadcastable": True,
        "signal_id": function_signal_id,
        "post_processing": True,
    }
    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": signal_configuration,
            "$push": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )
1684 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []):
1685 if post_processing:
1686 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1687 zip_buffer = io.BytesIO()
1688 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1689 for signal_data in self.signals_data:
1690 file_name = signal_data.signal_id
1691 if post_processing:
1692 phase = phases_by_id.get(
1693 signal_data.phase_id,
1694 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1695 )
1696 file_name = f"{signal_data.signal_id} ({phase.name})"
1697 if file_format == "csv":
1698 export_io = signal_data.csv_export()
1699 zip_file.writestr(f"{file_name}.csv", export_io)
1700 elif file_format == "prestoplot":
1701 export_io = signal_data.prestoplot_export()
1702 zip_file.writestr(f"{file_name}.tab", export_io)
1703 else:
1704 raise ValueError(f"Format not found. Got: {file_format}")
1705 zip_bytes = zip_buffer.getvalue()
1706 return zip_bytes
def hdf5_export(self, post_processing: bool = False, phase_ids: list | None = None):
    """Export every contained signal as one group of an in-memory HDF5 file.

    Each group holds a ``data`` dataset of records
    ``(date, timestamp, value, forced_value)``. Values are stored as 64-bit
    floats, or as byte strings for signals whose ``data_type`` is ``"str"``.

    Args:
        post_processing: when true, the group name is
            ``<signal id> (<phase name>)`` instead of the signal id.
        phase_ids: ids of the phases to look up when ``post_processing`` is
            true. A signal whose phase is not among them, or whose phase
            lookup returns nothing, is named with "Unknown phase".

    Returns:
        The bytes of the HDF5 file.
    """
    phases_by_id = {}
    if post_processing:
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in (phase_ids or [])}

    custom_type_float = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    custom_type_string = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )

    hdf5_buffer = io.BytesIO()
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                phase = phases_by_id.get(signal_data.phase_id)
                phase_name = phase.name if phase is not None else "Unknown phase"
                group_name = f"{signal_data.signal_id} ({phase_name})"
            signal_group = hdf5_file.create_group(group_name)

            date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            # Only the record type differs between string and numeric signals.
            record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
            signal_group["data"] = npy.array(
                list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                ),
                dtype=record_type,
            )

    # Read the buffer only once the file is closed, so that it is complete.
    return hdf5_buffer.getvalue()
# Status record of a signal (used as the `status` field of `Signal`).
class SignalStatus(TwinPadModel):
    # Status label; defaults to "down".
    status: str = "down"
    # Free-text reason; empty by default.
    reason: str = ""
    # Optional delay; unit not visible from here — TODO confirm.
    delay: float | None = None
# Digitization parameters of a signal (used as `Signal.digitization_function`).
# NOTE(review): presumably describes a mapping between a raw range and a value
# range — confirm against the code that consumes it.
class DigitizationFunction(TwinPadModel):
    # Optional number of bits.
    bits: int | None = None
    # Bounds of the value range.
    min_value: float
    max_value: float
    # Bounds of the raw value range.
    min_raw_value: float
    max_raw_value: float
# Requested update of a signal; `Signal.send_command` validates `value` and
# `forced_value` against the signal's type before sending them.
class SignalUpdate(TwinPadModel):
    # New value; None when not provided.
    value: float | str | bool | int | None = None
    # New forced value; None when not provided.
    forced_value: float | str | bool | int | None = None
    # Optional timestamp; unit not visible from here — TODO confirm.
    timestamp: int | None = None
# Kinds of signal; members are plain strings thanks to the `str` mixin.
class SignalType(str, Enum):
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Maps a signal data-type name to the SignalData subclass holding its samples
# (looked up by `Signal.signal_data_class`). Every type except "str" uses
# NumericSignalData.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
# Configuration of a signal, stored in the "signals" collection.
class Signal(GenericMongo):
    collection_name: ClassVar[str] = "signals"

    # Dotted identifier; the part before the first "." is the id of the owning device (see `device`).
    signal_id: str
    frequency: float
    unit: str | None
    description: str
    type: SignalType
    # Either a key of TYPES / SIGNALDATA_TYPES or a string starting with "enum" (see `python_type`).
    data_type: str
    precision_digits: int | None
    forcible: bool
    commandable: bool
    broadcastable: bool
    status: SignalStatus = SignalStatus()

    # Post-processing signals record the ids of the phases they were computed for.
    post_processing: bool = False
    computed_phases_ids: list[str] = []

    digitization_function: DigitizationFunction | None
@property
def device(self) -> Device:
    """Device owning this signal, looked up by the part of ``signal_id`` before the first dot."""
    owner_id, _, _ = self.signal_id.partition(".")
    return Device.get_one_by_attribute("device_id", owner_id)
@cached_property
def signal_data_class(self):
    """SignalData subclass used to hold the samples of this signal.

    Plain data types are looked up directly in ``SIGNALDATA_TYPES``. For a
    data type starting with ``enum``, the text between ``enum(`` and the
    first comma is used as the lookup key.

    Raises:
        ValueError: if the data type is neither known nor an enum.
    """
    known_class = SIGNALDATA_TYPES.get(self.data_type)
    if known_class is not None:
        return known_class

    if self.data_type.startswith("enum"):
        head = self.data_type.split(",")[0]
        base_type = head.replace("enum(", "").lstrip(" ")
        return SIGNALDATA_TYPES[base_type]

    raise ValueError(f"Unhandled python type: {self.data_type}")
1826 @cached_property
1827 def python_type(self):
1828 if self.data_type in TYPES:
1829 return TYPES[self.data_type]
1830 if self.data_type.startswith("enum"):
1831 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1832 return Literal[*choices]
1833 raise ValueError(f"Unhandled python type: {self.data_type}")
1835 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1836 command = Command(
1837 sent_at=time.time(),
1838 command_type="Signal command",
1839 user_id=current_user.id,
1840 )
1842 has_input_error = False
1843 error_message = ""
1845 if self.data_type.startswith("enum"):
1846 enum_options = get_args(self.python_type)
1848 if update_dict.value is not None and update_dict.value not in enum_options:
1849 has_input_error = True
1850 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1851 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1852 has_input_error = True
1853 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1854 else:
1855 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1856 has_input_error = True
1857 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1858 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1859 has_input_error = True
1860 error_message += (
1861 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1862 )
1864 if has_input_error:
1865 command.response_time = 0
1866 command.succeeded = False
1867 command.description = f"Tried to modify signal {self.signal_id}"
1868 response = {"error": True, "status_code": 400, "message": error_message}
1869 else:
1870 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict)
1871 command.receive_response(response)
1873 Command.create(command)
1874 return response
1876 @classmethod
1877 def get_from_signal_id(cls, signal_id) -> Self:
1878 """Could be generic from mongo"""
1879 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1880 if not raw_value:
1881 return None
1882 del raw_value["_id"]
1883 return cls.dict_to_object(raw_value)
1885 @classmethod
1886 def get_all_ids(cls) -> list[str]:
1887 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1889 return [signal["signal_id"] for signal in cursor]
1891 @classmethod
1892 def get_all_statuses(cls) -> list[dict]:
1893 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
1895 return [
1896 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1897 for signal in cursor
1898 ]
1900 async def number_samples(self):
1901 collection = get_signal_collection(signal_id=self.signal_id)
1902 if collection is None:
1903 return 0
1905 number_samples = collection.estimated_document_count()
1907 number_samples_async_collection = await get_async_collection(
1908 systems_async_database, "number_samples", create=True, time_series=True
1909 )
1911 loop = asyncio.get_running_loop()
1912 loop.create_task(
1913 number_samples_async_collection.insert_one(
1914 {
1915 "timestamp": datetime.datetime.now(pytz.UTC),
1916 "signal_id": self.signal_id,
1917 "number_samples": number_samples,
1918 }
1919 )
1920 )
1922 return number_samples
1924 @classmethod
1925 def total_number_samples(cls) -> int:
1926 TwinPadActivity.get_number_samples_timeframe(0, 0, False)
1927 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
1929 if number_samples_collection is None:
1930 return 0
1932 result = number_samples_collection.aggregate(
1933 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}]
1934 )
1936 result = result.to_list()
1937 if len(result) == 0:
1938 return 0
1939 return result[0]["amount"]
1941 def sample_datasize(self):
1942 return signals_database.command("collstats", self.signal_id)["size"]
1944 @classmethod
1945 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1946 result = cls.collection().aggregate(
1947 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1948 )
1950 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ForcedSignal(GenericMongo):
    """Record of a signal currently forced by a user (``forced_signals`` collection)."""

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record, keyed by ``signal_id``, and return its id.

        The document id assigned by MongoDB is stored on ``self.id`` as a string.
        """
        selector = {"signal_id": self.signal_id}
        changes = {"$set": self.to_dict(exclude={"id"})}
        stored_document = self.collection().find_one_and_update(
            selector,
            changes,
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Checks whether user can force a given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal was forced by someone else and the user
            is not an admin, True otherwise
        :rtype: bool
        """
        forced_signal = cls.get_one_by_attribute("signal_id", signal_id)
        if forced_signal is None:
            # Nobody is forcing the signal yet.
            return True
        if forced_signal.forcing_user_id == current_user.id:
            return True
        # Forced by another user: only admins may take over.
        return bool(current_user.is_admin)
class ServicesStatus(TwinPadModel):
    """Status label of the backend and of each service it relies on."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and build the resulting status report.

        The backend itself is always reported as ``"up"``.
        """
        broker_status = ping(RABBITMQ_HOST)
        database_status = ping(MONGO_HOST)
        signal_storage_status = ping(SIGNAL_STORAGE_HOST)
        heartbeat_storage_status = ping(HEARTBEAT_STORAGE_HOST)
        data_analyzer_status = ping(DATA_ANALYZER_HOST)
        return cls(
            backend="up",
            cloud_broker=broker_status,
            time_series_database=database_status,
            signal_storage=signal_storage_status,
            heartbeat_storage=heartbeat_storage_status,
            data_analyzer=data_analyzer_status,
        )
def ping(host):
    """Return ``"up"`` if ``host`` answers an ICMP ping within 0.8 s, ``"down"`` otherwise.

    ``ping3.ping`` returns the delay as a float on success, ``None`` on timeout
    and ``False`` on error. The result is compared against those two failure
    values explicitly, so that a successful reply measured as ``0.0`` seconds
    is not mistaken for a failure.

    :param host: Host name or address to ping
    :return: ``"up"`` or ``"down"``
    :rtype: str
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Best effort: without the privileges needed to send the ping, the
        # host is reported as down rather than failing the whole status check.
        return "down"
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    """Event stored in the ``events`` collection, produced by an event rule."""

    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Rule referenced by ``event_rule_id`` (loaded once, then cached)."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an event from a MongoDB document.

        The stored ``timestamp`` is a datetime; it is converted in place to a
        POSIX timestamp (float) before delegating to the parent implementation.
        NOTE(review): a naive datetime would be interpreted as local time by
        ``timestamp()`` — confirm the client returns timezone-aware values.
        """
        stored_datetime = dict_["timestamp"]
        dict_["timestamp"] = datetime.datetime.timestamp(stored_datetime)
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    """Amount of activity (events, samples or commands) recorded for one day.

    Daily amounts are cached in dedicated time-series collections
    (``number_events``, ``number_received_samples``, ``number_commands``); the
    ``get_number_*_timeframe`` methods refresh these caches and return the
    entries located in the requested time window.
    """

    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return today's midnight, computed in UTC, as an aware datetime."""
        now = datetime.datetime.now(tz=pytz.UTC)
        return now.replace(hour=0, minute=0, second=0, microsecond=0)

    @staticmethod
    def _utc_datetime(timestamp) -> datetime.datetime:
        """Convert a POSIX timestamp to an aware UTC datetime.

        The timezone is passed to ``fromtimestamp`` so that the conversion does
        not depend on the local timezone of the host.
        """
        return datetime.datetime.fromtimestamp(timestamp, tz=pytz.UTC)

    @classmethod
    def _daily_amounts(
        cls,
        daily_collection,
        daily_collection_name: str,
        source_collection,
        count_in_range,
        min_timestamp,
        max_timestamp,
        recompute_amount: bool,
    ) -> list[Self]:
        """Refresh a daily-amount cache and return its entries within a time window.

        :param daily_collection: Existing cache collection, or None if missing
        :param daily_collection_name: Name used to (re)create the cache collection
        :param source_collection: Collection whose first document gives the first day
        :param count_in_range: Callable taking a MongoDB range condition on
            ``timestamp`` and returning the matching amount
        :param min_timestamp: Start of the requested window (POSIX timestamp)
        :param max_timestamp: End of the requested window (POSIX timestamp)
        :param recompute_amount: Rebuild the whole cache when True
        :return: One item per cached day located in the window
        """
        one_day = datetime.timedelta(days=1)
        today = cls._utc_midnight_today()

        if daily_collection is None or recompute_amount:
            # Rebuild the cache day by day, starting from the first source document.
            daily_collection = get_collection(systems_database, daily_collection_name, create=True, time_series=True)
            daily_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            current_day = datetime.datetime.combine(first_document["timestamp"], datetime.time.min).replace(
                tzinfo=pytz.UTC
            )
            while current_day < today:
                day_amount = count_in_range({"$gte": current_day, "$lt": current_day + one_day})
                if day_amount > 0:
                    daily_collection.insert_one({"timestamp": current_day, "amount": day_amount})
                current_day += one_day

        # The current day is still changing: its entry is always replaced.
        amount_today = count_in_range({"$gte": today})
        if amount_today > 0:
            daily_collection.delete_many({"timestamp": today})
            daily_collection.insert_one({"timestamp": today, "amount": amount_today})

        window = {"$gte": cls._utc_datetime(min_timestamp), "$lte": cls._utc_datetime(max_timestamp)}
        items = []
        for day_document in daily_collection.find({"timestamp": window}):
            # NOTE(review): a naive datetime would be interpreted as local time
            # here — confirm the client returns timezone-aware values.
            day_document["timestamp"] = day_document["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day_document))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the daily number of events between two POSIX timestamps.

        :param min_timestamp: Start of the window (POSIX timestamp)
        :param max_timestamp: End of the window (POSIX timestamp)
        :param recompute_amount: Rebuild the ``number_events`` cache when True
        :return: One item per day with at least one event
        """
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)

        def count_events(timestamp_range):
            return events_collection.count_documents({"timestamp": timestamp_range})

        return cls._daily_amounts(
            number_events_collection,
            "number_events",
            events_collection,
            count_events,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of received samples between two POSIX timestamps.

        Amounts are obtained by summing the ``value`` field of the
        ``signal_storage._NUMBER_SAMPLES`` documents of each day.

        :param min_timestamp: Start of the window (POSIX timestamp)
        :param max_timestamp: End of the window (POSIX timestamp)
        :param recompute_amount: Rebuild the ``number_received_samples`` cache when True
        :return: One item per day with at least one sample
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )

        def sum_samples(timestamp_range):
            rows = signals_number_samples_collection.aggregate(
                [
                    {"$match": {"timestamp": timestamp_range}},
                    {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
                ]
            ).to_list()
            if not rows:
                return 0
            return rows[0].get("number_samples", 0)

        return cls._daily_amounts(
            number_samples_collection,
            "number_received_samples",
            signals_number_samples_collection,
            sum_samples,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of commands between two POSIX timestamps.

        :param min_timestamp: Start of the window (POSIX timestamp)
        :param max_timestamp: End of the window (POSIX timestamp)
        :param recompute_amount: Rebuild the ``number_commands`` cache when True
        :return: One item per day with at least one command
        """
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)

        def count_commands(timestamp_range):
            return commands_collection.count_documents({"timestamp": timestamp_range})

        return cls._daily_amounts(
            number_commands_collection,
            "number_commands",
            commands_collection,
            count_commands,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )
class EventRule(GenericMongo):
    """Rule stored in the ``event_rules`` collection, referenced by events."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    # Expression of the rule; its syntax is not established here.
    formula: str
    # Names referenced by the formula — presumably signal ids, TODO confirm.
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events referencing this rule (cached after first access).

        Returns 0 when the ``events`` collection does not exist.
        """
        events_collection = get_collection(systems_database, "events")
        if events_collection is None:
            # No event was ever stored: avoid calling a method on None.
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    """Company stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"

    name: str
class Campaign(GenericMongo):
    """Campaign stored in the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    # Identifier; None until one has been assigned.
    id: str | None = None
    name: str
    description: str | None = None
class Phase(GenericMongo):
    """Phase of a campaign, stored in the ``phases`` collection."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    # Bounds of the phase — presumably POSIX timestamps, TODO confirm.
    start_at: float
    end_at: float

    # FK
    campaign_id: MongoId

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase attached to ``campaign_id`` and return the delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
class CustomViewCreation(GenericMongo):
    """Fields provided when creating a custom view (``custom_views`` collection)."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    # Content of the view; the shape of its items is not established here.
    configuration: list
class CustomView(CustomViewCreation):
    """Stored custom view: the creation fields plus an id and the owning user."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str
# Update model generated from CustomView by create_update_model.
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    """Video source (camera) stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video, sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are requested from the database, so the
        credentials keep their default value on the returned objects.
        """
        projection = {"name": 1, "ip_addr": 1}
        documents = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in documents]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera identified by ``camera_id``, or None if unknown."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    """Command sent by a user, logged in the ``commands`` time-series collection."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    # Filled by ``create`` from ``sent_at``.
    timestamp: datetime.datetime = None
    # POSIX timestamp at which the command was sent.
    sent_at: float
    # Seconds elapsed between sending and receiving the response.
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, creating it as a time series if needed."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of ``command`` whose ``timestamp`` is derived from ``sent_at`` (UTC).

        :return: ``{"command_id": <inserted id>}``, or None if the insertion returned nothing
        """
        sent_datetime = datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC)
        stored_command = cls(
            timestamp=sent_datetime,
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insert_result = cls.collection().insert_one(stored_command.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"command_id": str(insert_result.inserted_id)}

    def receive_response(self, response: dict):
        """Update the command from the response received for it.

        The command succeeded only if the response explicitly carries
        ``"error": False``. The response message is used as description only
        when no description was set beforehand.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if not self.description:
            self.description = response.get("message", "").rstrip()
class SignalsPresetCreation(GenericMongo):
    """Fields provided when creating a preset: a name and a list of signal ids."""

    name: str
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    """Signals preset owned by a user, stored in the ``signals_presets`` collection."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store ``signals_preset`` for ``user_id`` and return the inserted id as a string."""
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        insert_result = cls.collection().insert_one(owned_preset.model_dump(exclude={"id"}))
        return str(insert_result.inserted_id)
# Update model generated from SignalsPreset by create_update_model.
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    """Line styles available for a graph curve; members compare equal to their string value."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    """Pair of colors used to draw a signal: one for values, one for forced values."""

    value_color: str
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    """Fields provided when creating a graph theme (``graph_themes`` collection).

    A theme describes how one signal is drawn: colors and line styles for its
    values and forced values.
    """

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    # Colors default to an empty string.
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise.
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    """Graph theme as seen by a given user.

    The stored user lists (``creator_id``, ``in_library``, ``active``) are
    replaced by three booleans computed for the current user. Every query
    method takes the user id and stores it on the class before delegating to
    the parent implementation.
    NOTE(review): the user id is kept as class-level state, so concurrent
    requests for different users could overwrite each other — confirm how
    these methods are called.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Aggregation steps, keyed by name, computing the per-user view of a theme."""

        def add_field(field_name: str, expression) -> list:
            return [{"$addFields": {field_name: expression}}]

        user_id = cls._current_user_id
        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user isn't allowed to see (i.e. neither
            # his nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            # The stored user lists are removed from the output.
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run ``query`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Run ``query`` restricted to the themes of the user's library.

        ``query.in_user_library`` is set to ``"true"`` on the given object.
        """
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Return the theme identified by ``item_id`` as seen by ``user_id``."""
        object_id = ObjectId(item_id)
        return cls.get_one_by_attribute("_id", object_id, user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Parent ``get_by_attribute`` executed on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Parent ``get_one_by_attribute`` executed on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Parent ``get_all`` executed on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return the appearance of each signal having a theme active for ``user_id``.

        :param signal_ids: Signals to look up
        :type signal_ids: list[str]
        :param user_id: User whose active themes are considered
        :type user_id: str
        :return: Colors and line styles keyed by signal id; when several
            themes match a signal, the first one met by the aggregation is kept
        :rtype: dict
        """
        active_themes_filter = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        keep_first_per_signal = [
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
        ]
        appearance_fields = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [active_themes_filter, *keep_first_per_signal, appearance_fields]

        appearances = {}
        for appearance in cls.collection().aggregate(pipeline):
            signal_id = appearance.pop("signal_id")
            appearances[signal_id] = appearance
        return appearances
# Update model generated from PublicGraphTheme by create_update_model.
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    """Graph theme as stored in the database, including the per-user lists."""

    # private
    creator_id: str
    # Ids of the users having the theme in their library.
    in_library: list[str]
    # Ids of the users for whom the theme is active.
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Store a new theme and return it with its ``id`` filled in.

        The creator is automatically added to the library and active lists.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    @staticmethod
    def _set_membership(user_ids: list[str], user_id: str, member: bool) -> None:
        """Add ``user_id`` to ``user_ids`` or remove it, in place, without duplicates."""
        if member and user_id not in user_ids:
            user_ids.append(user_id)
        elif not member and user_id in user_ids:
            user_ids.remove(user_id)

    def update(self, update_dict: dict, user_id: str):
        """Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The per-user flags ``in_user_library`` and ``active_for_user`` are
        translated into changes of the stored ``in_library`` and ``active``
        lists. These flags and ``created_by_user`` are computed fields: they
        are always removed from ``update_dict`` (even when None) so that they
        are never written to the database. ``update_dict`` is modified in place.

        :param update_dict: Fields to update
        :type update_dict: dict
        :param user_id: User performing the update
        :type user_id: str
        :return: An empty dictionary
        :rtype: dict
        """
        in_user_library = update_dict.pop("in_user_library", None)
        if in_user_library is not None:
            self._set_membership(self.in_library, user_id, in_user_library)
            update_dict["in_library"] = self.in_library

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            self._set_membership(self.active, user_id, active_for_user)
            update_dict["active"] = self.active

        update_dict.pop("created_by_user", None)

        if update_dict:
            # Skipped when nothing is left to store: an empty "$set" is at
            # best a no-op.
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}
class DeviceStatus(str, Enum):
    """Status values accepted for a deployed device; members compare equal to their string value."""

    started = "started"
    running = "running"
    created = "created"
    exited = "exited"
    restarting = "restarting"
class DeviceUpdateFromDeployer(BaseModel):
    """Payload used to change the status of a device handled by a deployer."""

    status: DeviceStatus
class DeviceFromDeployerCreation(BaseModel):
    """Fields provided when asking a deployer to create a device."""

    name: str
    description: str
class DeviceFromDeployer(DeviceFromDeployerCreation):
    """Device as reported by a deployer: creation fields plus status, id and logs."""

    status: DeviceStatus
    device_id: DeviceId
    # Logs reported for the device (empty by default).
    logs: str = ""
class DeviceDeployer(GenericMongo):
    """Remote device deployer reachable over HTTP (``device_deployers`` collection).

    Every method calls the deployer REST API under ``url``. Connection errors
    and unexpected status codes are reported through the return value instead
    of an exception.
    NOTE(review): the requests are sent without a timeout, so an unresponsive
    deployer blocks the caller indefinitely — consider passing ``timeout=``
    once an acceptable delay is known.
    """

    collection_name: ClassVar[str] = "device_deployers"
    url: HttpUrl

    @staticmethod
    def _device_from_payload(device_dict: dict) -> DeviceFromDeployer:
        """Build a ``DeviceFromDeployer`` from a device description returned by the deployer."""
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name=device_dict["container_name"],
            description="desc",
            status=device_dict["status"],
            logs=device_dict["logs"],
        )

    def endpoint_url(self, endpoint):
        """Return the absolute URL of ``endpoint``, joined to ``url`` with a single slash."""
        return f"{str(self.url).rstrip('/')}/{endpoint}"

    def devices(self) -> list[DeviceFromDeployer] | None:
        """List the devices known by the deployer.

        :return: The devices, or None on connection error or non-200 response
        """
        try:
            response = requests.get(self.endpoint_url("devices"))
        except requests.exceptions.ConnectionError:
            logger.info("connection error")
            return None
        if response.status_code != 200:
            return None
        return [self._device_from_payload(device_dict) for device_dict in response.json()["devices"]]

    def get_device(self, device_id: DeviceId):
        """Fetch one device from the deployer.

        :return: The device, or None on connection error or non-200 response
        """
        try:
            response = requests.get(self.endpoint_url(f"devices/{device_id}"))
        except requests.exceptions.ConnectionError:
            return None
        if response.status_code != 200:
            return None
        return self._device_from_payload(response.json())

    def create_device(self, device: DeviceFromDeployer) -> DeviceFromDeployer | None:
        """Ask the deployer to create a device named ``device.name``.

        :return: The created device (with an empty name), or None on
            connection error or when the response status is not 201
        """
        try:
            response = requests.post(self.endpoint_url("devices"), json={"name": device.name})
        except requests.exceptions.ConnectionError:
            return None

        if response.status_code != 201:
            return None

        device_dict = response.json()
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            status=device_dict["status"],
        )

    def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
        """Send ``device_update`` to the deployer for ``device_id``.

        :return: A ``Device`` carrying the returned id and status (other
            fields are placeholders), or None on connection error or non-200
            response
        """
        try:
            response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump())
        except requests.exceptions.ConnectionError:
            return None

        if response.status_code != 200:
            return None

        device_dict = response.json()
        return Device(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            pid={},
            petri_network={},
            modes=[],
            status=device_dict["status"],
        )

    def delete_device(self, device_id: DeviceId) -> DeleteInfo:
        """Ask the deployer to delete ``device_id``.

        :return: Deletion outcome; on failure ``detail`` holds the reason
            (connection error message or response body)
        """
        try:
            response = requests.delete(self.endpoint_url(f"devices/{device_id}"))
        except requests.exceptions.ConnectionError:
            return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
        if response.status_code not in [200, 202, 204]:
            return DeleteInfo(is_deleted=False, detail=response.text)

        return DeleteInfo(is_deleted=True, detail="")
# Update model generated from DeviceDeployer by create_update_model.
DeviceDeployerUpdate = create_update_model(DeviceDeployer)