Coverage for /usr/local/lib/python3.14/site-packages/twinpad_backend/models.py: 96%
1370 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:54 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:54 +0000
1from functools import cached_property
2import os
3import re
4import io
5import time
6import csv
7from typing import Self, ClassVar, Any, Literal, get_args, Annotated
8import datetime
9import math
10import bisect
11from enum import Enum
12import logging
13import copy
14import asyncio
15import requests
17import zipfile
18import ping3
19import pytz
20from bson.objectid import ObjectId
21from pymongo import ASCENDING, ReturnDocument
22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator
23import numpy as npy
24import lttb
25import h5py
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 signals_async_database,
36 devices_states_database,
37)
38from twinpad_backend.responses import ListResponse
39from twinpad_backend.messages import RabbitMQClient
40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
# Lookup from a type name to the Python type it stands for ("epoch" is represented as a float).
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Accepted post-processing function names; the constant names suggest they are grouped by the
# number of input signals each function takes (one, two, several) -- TODO confirm against callers.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]


# Host names of the companion services; each one can be overridden with the environment
# variable of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Device timeout, read from the DEVICE_TIMEOUT environment variable (default 5.0). It is compared
# with differences of time.time() values elsewhere in this module, so it is expressed in seconds.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# The module logs through uvicorn's error logger.
logger = logging.getLogger("uvicorn.error")
# Result of a delete operation: a success flag plus a textual detail.
class DeleteInfo(BaseModel):
    is_deleted: bool
    detail: str
class classproperty:
    """
    Read-only descriptor exposing the result of a function of the class as a class-level attribute.

    Replaces the ``@classmethod`` + ``@property`` stacking, which was deprecated in python 3.11 and
    removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped function receives the owning class as its only argument
        self.fget = func

    def __get__(self, instance, owner):
        # The instance (if any) is ignored: the value only depends on the class
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """Build the ``<ModelName>Update`` companion of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened with ``None`` and
    a default of ``None``, so that all fields of the returned model are optional.
    """
    optional_fields = {
        name: (field_info.annotation | None, None)
        for name, field_info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float):
    """Return the ISO 8601 representation, in UTC, of a POSIX timestamp.

    The conversion is explicitly done in UTC so that the result does not depend on the time zone
    of the machine running the code. The UTC offset is stripped from the output, which keeps the
    naive ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` format.

    :param timestamp: number of seconds since the epoch.
    :return: the naive ISO 8601 string of the corresponding UTC date.
    """
    utc_date = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_date.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """Downsample a series to about ``max_number_samples`` points using LTTB.

    ``None`` values are not supported by LTTB: they are removed before the downsampling and, for
    each run of consecutive ``None`` values, the first and last timestamps of the run are inserted
    back afterwards (with a ``None`` value) so that gaps stay visible.
    If LTTB raises a ``ValueError``, the series is linearly interpolated on an evenly spaced time
    vector instead, NaN results being turned back into ``None``.

    :param time_vector: timestamps of the samples; the insertion of the ``None`` bounds relies on a
        sorted search, so they are expected to be in increasing order.
    :param values: values of the samples, same length as ``time_vector``; may contain ``None``.
    :param max_number_samples: target number of samples.
    :return: a ``(time_vector, values)`` tuple. The inputs are returned untouched when they hold
        fewer than ``max_number_samples`` samples; they are never modified.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # LTTB doesn't handle None values so remove them. This is done in a single pass: for each run
    # of consecutive None values, only the timestamps of its first and last samples are stored so
    # that they can be inserted back after the downsampling.
    kept_time_vector = []
    kept_values = []
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not in_none_group:
                none_group_bounds.append([timestamp])
                in_none_group = True
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
        else:
            in_none_group = False
            kept_time_vector.append(timestamp)
            kept_values.append(value)
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_time_vector, kept_values]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """Tell whether ``value`` is an instance of ``wanted_type``, integers being accepted as floats."""
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
145# Models
class TwinPadModel(BaseModel):
    # Common base of the models of this module: adds dict conversion helpers on top of pydantic.

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model in JSON mode as a dict, leaving out the fields listed in ``exclude``."""
        return self.model_dump(mode="json", exclude=exclude)
def validate_mongo_id(v):
    """Return ``v`` as a string when it is a valid MongoDB ObjectId, raise ``ValueError`` otherwise."""
    if ObjectId.is_valid(v):
        return str(v)
    raise ValueError("Invalid MongoDB id")


# String type whose raw input is first checked (and stringified) by `validate_mongo_id`
MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]
def validate_12_hex(v: str) -> str:
    """Check that ``v`` is a string made of exactly 12 hexadecimal characters.

    The function is used as a "before" validator, so it may receive any raw input: non-string
    values are rejected with the same ``ValueError`` as malformed strings instead of letting
    ``re.fullmatch`` fail with a ``TypeError``.

    :param v: raw value to check.
    :return: ``v`` unchanged when it is valid.
    :raises ValueError: if ``v`` is not a 12-character hexadecimal string.
    """
    if not isinstance(v, str) or not re.fullmatch(r"[0-9a-fA-F]{12}", v):
        raise ValueError("ID must be a 12-character hexadecimal string")
    return v
# String type whose raw input must pass `validate_12_hex` (12 hexadecimal characters)
DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]
class GenericMongo(TwinPadModel):
    # Base of the models stored as documents of a MongoDB collection of the systems database.
    # The methods below read a `collection_name` class attribute that subclasses have to define.
    # `custom_pipeline_steps` maps a property name to the aggregation steps that are inserted in
    # the pipelines: before the $match / $sort stage when that property is filtered or sorted on,
    # at the end of the pipeline otherwise.
    id: MongoId | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of the class, asking `get_collection` to create it if needed."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query and wrap the result in a ``ListResponse``.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (comma separated ``field`` or
        ``field:order`` items), ``offset`` and ``limit`` (``None`` or 0 meaning no limit).
        ``total`` is the number of documents of the collection matching the filter.
        The custom pipeline steps of a property are added at most once to the pipeline, even when
        the property appears in several filter clauses or is both filtered and sorted on.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field = sort
                sort_order = 1
            sort_dict[sort_field] = sort_order

        collection = cls.collection()
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = []

        def add_custom_steps(property_name):
            # Each set of custom steps only needs to run once per pipeline
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.append(property_name)

        # Custom steps of the filtered properties must run before the $match stage
        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for filter_clause in filter_clauses:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in filter_clause:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        # Custom steps of the sort fields must run before the $sort stage
        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom steps are applied last, on the paginated documents only
        for filtered_property, steps in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(steps)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``ObjectId(item_id)``, or ``None`` if there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an object from a MongoDB document, ``_id`` being moved (as a string) to ``id``.

        ``mongo_dict`` is modified in place.
        """
        mongo_dict["id"] = str(mongo_dict.pop("_id"))
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        # aggregate() always returns a cursor (possibly empty), so the result is always a list
        return [cls.mongo_dict_to_object(d) for d in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item that matches the attribute with value, or ``None`` if none does."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return all the items of the collection, sorted by ``sort_by`` in ascending order."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Count the documents whose ``post_processing`` field is ``False`` or missing.

        Returns 0 when `get_collection` returns no collection (it is not asked to create it here).
        """
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert the object (without its ``id``) in the collection and return its new id."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Set the given attributes on the object and on its stored document, then return ``self``."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document of the object; return whether a document was deleted."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    # User account, stored in the "users" collection.
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude: set | None = None):
        """Dump the user as a dict, the password always being left out.

        :param exclude: names of additional fields to leave out. The given set is not modified:
            the password is added to a copy, so neither the caller's set nor a shared default
            value is ever mutated.
        """
        excluded_fields = set(exclude) if exclude is not None else set()
        excluded_fields.add("password")
        return GenericMongo.to_dict(self, exclude=excluded_fields)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Insert a new user and return ``{"user_id": <id>}``.

        The very first user created is always made admin, whatever ``is_admin`` says.
        The stored document includes the password, which ``to_dict`` would leave out.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """Apply the fields of ``user`` that are not ``None`` to the stored user and return it.

        NOTE(review): `find_one_and_update` returns ``None`` when no document matches ``user_id``,
        in which case the lines below fail with a ``TypeError`` -- confirm that callers check the
        id first.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        updated_user["id"] = str(updated_user["_id"])
        del (updated_user["_id"], updated_user["is_connected"])
        return cls(**updated_user)


# Same fields as `User` (except `id`), all optional
UserUpdate = create_update_model(User)
# Operating mode of a device, as listed in `Device.modes`.
class Mode(TwinPadModel):
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
# Payload of a mode change request: the id of the mode to switch to (see `Device.change_mode`).
class DeviceUpdate(TwinPadModel):
    mode_id: int
class Device(GenericMongo):
    # Device description, stored in the "devices" collection.
    collection_name: ClassVar[str] = "devices"

    device_id: DeviceId
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """Ask the device to switch to ``update_dict.mode_id`` and record the attempt as a command.

        A mode id outside of the accepted range is answered with a 400 error dict without contacting
        the device; otherwise the request goes through RabbitMQ and its response is returned.
        In both cases a ``Command`` describing the attempt is stored.
        """
        # NOTE(review): mode names are looked up with `modes[mode_id - 1]`, which suggests 1-based
        # ids, yet a mode id of 0 passes this check (and then reads `modes[-1]`) -- confirm whether
        # the lower bound should be 1.
        has_error = update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is None:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"

        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if not has_error:
            response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)
        else:
            # Nothing was sent: the command is recorded as an immediate failure
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """Map the device id of each signal id (its part before the first dot) to its device.

        Each device is fetched once; the value is ``None`` when no device has that id.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.partition(".")[0]
            if device_id in devices_by_id:
                continue
            devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id
# Named group of devices with a variable mapping, stored in the "device_setups" collection.
class DeviceSetup(GenericMongo):
    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Same fields as `DeviceSetup` (except `id`), all optional
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    # State of a device at a given time. States are read from a per-device collection of the
    # devices states database (see `get_from_id_and_query`).
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
        """Return the states of a device matching ``query``, sorted and paginated.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (``field`` or ``field:order``),
        ``offset`` and ``limit`` (``None`` or 0 meaning no limit). An empty response is returned
        when the device has no states collection.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            # The limit only makes sense on a real cursor: the empty placeholder list has no limit()
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                # Missing lists default to real empty lists (a pydantic Field object is not a value)
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    # Single sample of a signal: its value (and forced value, if any) at a given timestamp.
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of a signal, or ``None`` if the signal has no data."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample (or ``None``) of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """Return the most recent sample of a signal, or ``None`` if there is none.

        The sample is first looked for in the "last_values" collection. When it is not there, the
        time series of the signal is searched, but only if the signal is registered with an "up"
        status. The timestamp of the returned sample is moved forward to the last ping of the
        device when that ping is more recent.

        :param signal_id: id of the signal; its part before the first dot is the device id.
        :param device: device of the signal, fetched from the database when not given.
        """
        last_value_collection = get_signal_collection("last_values", True)
        signals_collection = get_collection(systems_database, "signals", create=True)

        sample_data = last_value_collection.find_one({"signal_id": signal_id}, sort={"precise_timestamp": -1})

        if sample_data is None:
            # No stored last value: only signals registered as "up" fall back on their time series
            if signals_collection.count_documents({"status.status": "up", "signal_id": signal_id}) < 1:
                return None

            # Same workaround as above function, very effective to narrow down big sets of data
            bucket = get_signal_collection(f"system.buckets.{signal_id}")
            last_bucket = None
            if bucket is not None:
                last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
            signal_collection = get_signal_collection(signal_id)
            if signal_collection is None:
                # The signal has no time series at all, so there is nothing to fall back on
                return None
            if last_bucket is not None:
                sample_data = signal_collection.find_one(
                    {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                    sort={"precise_timestamp": -1},
                )
            else:
                sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data.get("precise_timestamp")
        # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down
        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample (or ``None``) of each signal, in the order of ``signal_ids``.

        The devices of the signals are fetched once each and shared between the lookups.
        """
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
# Samples of one signal over a time range, as parallel lists, with bookkeeping about the query.
class SignalData(TwinPadModel):
    signal_id: str
    forcible: bool = True
    # Parallel lists: time_vector[i] is the timestamp of values[i] and forced_values[i]
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    # Timestamps of the oldest and most recent data known for the signal (None when unknown)
    data_start: float | None = None
    data_end: float | None = None

    # Number of samples held by this object / number of samples found in the database
    number_samples: int = 0
    number_samples_db: int = 0

    # Timings, measured as differences of time.time() values
    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    phase_id: str | None = None
@classmethod
def get_from_signal_id(
    cls,
    signal_id: str,
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
    collection=None,
) -> Self:
    """Load the samples of a signal from its time series collection.

    :param signal_id: id of the signal.
    :param min_timestamp: if given, only samples whose ``timestamp`` is at or after it are read.
    :param max_timestamp: if given, only samples whose ``timestamp`` is at or before it are read.
    :param window_min_timestamp: start of the window used for the bounds interpolation.
    :param window_max_timestamp: end of the window used for the bounds interpolation.
    :param interpolate_bounds: whether to add samples on the window bounds; only done when both
        window timestamps are given.
    :param max_documents: if given and lower than the number of matching documents, only the
        first document of each group of consecutive documents is kept.
    :param collection: collection to read from; looked up from ``signal_id`` when not given.
    :return: an instance of the signal's ``signal_data_class``, or an empty instance of ``cls``
        when the collection or the signal cannot be found.
    """

    now = time.time()

    # Build the filter on the time bounds, if any
    req_signal = {}
    if min_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
    if max_timestamp is not None:
        req_signal.setdefault("timestamp", {})
        req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

    if collection is None:
        collection = get_signal_collection(signal_id)
    if collection is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )

    db_req_start = time.time()

    sort_step = {"$sort": {"precise_timestamp": 1}}
    number_results = collection.count_documents(req_signal)

    pipeline = []
    if req_signal:
        pipeline.append({"$match": req_signal})  # Filter data if needed

    pipeline.extend(
        [
            {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
            sort_step,
        ]
    )

    if max_documents is not None and max_documents < number_results:
        # Too many documents: number them in time order, group them by runs of consecutive
        # documents and only keep the first document of each group
        unsampling_ratio = math.ceil(number_results / max_documents)
        logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
        pipeline.extend(
            [
                {
                    "$setWindowFields": {
                        "sortBy": {"precise_timestamp": 1},
                        "output": {"index": {"$documentNumber": {}}},
                    }
                },
                {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                {"$replaceRoot": {"newRoot": "$doc"}},
                {"$unset": ["index", "group_id"]},
                {"$sort": {"precise_timestamp": 1}},
            ]
        )

    # logger.info(f"pipeline: %s", str(pipeline))
    cursor = collection.aggregate(pipeline)
    db_req_time = time.time() - db_req_start

    init_time = time.time()

    # Split the documents into the three parallel lists
    results = cursor.to_list()
    time_vector = []
    values = []
    forced_values = []
    for s in results:
        time_vector.append(s["precise_timestamp"])
        values.append(s.get("value", None))
        forced_values.append(s.get("forced_value", None))

    signal = Signal.get_from_signal_id(signal_id)
    if signal is None:
        return cls(
            signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
        )
    # The class of the result depends on the signal
    class_ = signal.signal_data_class

    if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
        time_vector, values, forced_values = cls.interpolate_bounds(
            class_,
            collection,
            signal_id,
            time_vector,
            values,
            forced_values,
            window_min_timestamp,
            window_max_timestamp,
        )

    if values:
        # TODO: check below. a bit strange
        if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
            # Adding last value as it should be repeated
            time_vector.append(now)
            values.append(values[-1])
            forced_values.append(forced_values[-1])

    init_time = time.time() - init_time

    # The overall data bounds are read from the internal "system.buckets" collection of the time
    # series: the minimum of its oldest bucket and the maximum of its most recent one
    bucket = get_signal_collection(f"system.buckets.{signal_id}")
    first_bucket = None
    if bucket is not None:
        first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
    if first_bucket is not None:
        data_start = first_bucket["control"]["min"]["precise_timestamp"]
    else:
        data_start = None

    last_bucket = None
    if bucket is not None:
        last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
    if last_bucket is not None:
        data_end = last_bucket["control"]["max"]["precise_timestamp"]
    else:
        data_end = None

    return class_(
        signal_id=signal_id,
        forcible=signal.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        data_start=data_start,
        data_end=data_end,
        number_samples=len(values),
        number_samples_db=number_results,
        db_query_time=db_req_time,
        init_time=init_time,
    )
@staticmethod
def interpolate_bounds(
    class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
):
    """Add a sample on each bound of the window when the data does not reach close enough to it.

    A bound is handled when there is no data, or when the closest sample is further than a fifth
    of the window away from it. The value on the bound comes from the most recent stored sample
    having a value at or before the bound: it is interpolated with the closest sample of the
    data when there is one, used as is otherwise.

    :param class_: signal data class used to build the temporary two-sample objects to interpolate.
    :param collection: time series collection of the signal.
    :param signal_id: id of the signal.
    :param time_vector: timestamps of the samples, modified in place.
    :param values: values of the samples, modified in place.
    :param forced_values: forced values of the samples, modified in place.
    :param window_min_timestamp: start of the window.
    :param window_max_timestamp: end of the window.
    :return: the ``(time_vector, values, forced_values)`` tuple of the completed lists.
    """
    window_span = window_max_timestamp - window_min_timestamp

    sample_right = None
    # Fetching right side value & interpolation
    window_right_limit = window_max_timestamp - window_span / 5
    if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
        sample_right = collection.find_one(
            {
                "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
                "value": {"$exists": True},
            },
            sort={"precise_timestamp": -1},
        )
        if sample_right:
            if time_vector:
                right_sd = class_(
                    signal_id=signal_id,
                    time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                    values=[values[-1], sample_right.get("value", None)],
                    forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                )
                max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
            else:
                max_ts_value = sample_right.get("value", None)
                max_ts_forced_value = sample_right.get("forced_value", None)
            time_vector.append(window_max_timestamp)
            values.append(max_ts_value)
            forced_values.append(max_ts_forced_value)

    # Fetching left side value & interpolation
    window_left_limit = window_min_timestamp + window_span / 5
    if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
        if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
            # The last sample before the end of the window is already older than its start, so it
            # is also the last sample before the start: no need to query the database again
            sample_left = sample_right
        else:
            sample_left = collection.find_one(
                {
                    "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
                    "value": {"$exists": True},
                },
                sort={"precise_timestamp": -1},
            )

        if sample_left:
            if time_vector:
                left_sd = class_(
                    signal_id=signal_id,
                    time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                    values=[sample_left["value"], values[0]],
                    forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                )
                min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
            else:
                min_ts_value = sample_left.get("value", None)
                min_ts_forced_value = sample_left.get("forced_value", None)
            time_vector.insert(0, window_min_timestamp)
            values.insert(0, min_ts_value)
            forced_values.insert(0, min_ts_forced_value)

    return time_vector, values, forced_values
def interpolate_values(self, new_time_vector: list[float]):
    """Return the values at the timestamps of ``new_time_vector``.

    Delegates to ``self.interpolate``, which is not defined in the visible part of this class.
    """
    return self.interpolate(new_time_vector, self.values)
def interpolate_forced_values(self, new_time_vector: list[float]):
    """Return the forced values at the timestamps of ``new_time_vector``.

    Delegates to ``self.interpolate``, which is not defined in the visible part of this class.
    """
    return self.interpolate(new_time_vector, self.forced_values)
def uniform_desampling(self, number_samples_max: int) -> Self:
    """Return a copy of the data resampled on at most ``number_samples_max`` evenly spaced timestamps.

    When the data holds more than ``number_samples_max`` samples, values and forced values are
    interpolated on an evenly spaced time vector starting at the first timestamp (the last
    timestamp is excluded). Otherwise the samples are kept as they are.
    The copy keeps the ``forcible`` flag and the phase of the original data.

    :param number_samples_max: maximum number of samples; a falsy value disables the resampling.
    """
    data_processing_time = time.time()
    if number_samples_max and self.number_samples > number_samples_max:
        time_vector = npy.linspace(
            self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
        ).tolist()
        values = self.interpolate_values(time_vector)
        forced_values = self.interpolate_forced_values(time_vector)
        number_samples = len(time_vector)
    else:
        time_vector = self.time_vector
        number_samples = len(self.values)
        values = self.values[:]
        forced_values = self.forced_values[:]
    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        # Without this the copy would silently fall back on the default (forcible) flag
        forcible=self.forcible,
        time_vector=time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
        phase_id=self.phase_id,
    )
def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Downsample the data to at most ``number_samples_max`` samples.

    This implementation simply delegates to ``uniform_desampling``.
    """
    return self.uniform_desampling(number_samples_max)
def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float | None = None,
    window_max_timestamp: float | None = None,
) -> Self:
    """Performs a sampling in a window of interest and outside.

    The time vector is split into the samples before, inside and after the window. The window
    always gets a sample on each of its bounds and is resampled evenly when it holds more than
    ``window_max_number_samples`` samples. The samples outside of the window are resampled evenly
    when there are more than ``outside_max_number_samples`` of them, the budget being shared
    between both sides proportionally to their number of samples; a side whose share is 0 gets
    no sample at all. Values and forced values are interpolated on the resulting time vector.

    :param window_max_number_samples: maximum number of samples in the window (``None``: no limit).
    :param outside_max_number_samples: maximum number of samples outside (``None``: no limit).
    :param window_min_timestamp: start of the window, defaults to the first timestamp.
    :param window_max_timestamp: end of the window, defaults to the last timestamp.
    :return: the resampled data, or ``self`` when there is no data.
    """

    if not self.time_vector:
        return self

    if window_min_timestamp is None:
        window_min_timestamp = self.time_vector[0]
    if window_max_timestamp is None:
        window_max_timestamp = self.time_vector[-1]

    data_processing_time = time.time()

    index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
    index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

    time_vector_before = self.time_vector[:index_window_start]
    time_vector_window = self.time_vector[index_window_start:index_window_end]
    time_vector_after = self.time_vector[index_window_end:]

    # Resampling window
    if time_vector_window:
        # Ensuring window bounds
        if time_vector_window[0] != window_min_timestamp:
            time_vector_window.insert(0, window_min_timestamp)
        if time_vector_window[-1] != window_max_timestamp:
            time_vector_window.append(window_max_timestamp)
    else:
        time_vector_window = [window_min_timestamp, window_max_timestamp]

    if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
        # Resampling
        time_vector_window = npy.linspace(
            time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
        ).tolist()

    # Resampling outside
    number_samples_before = len(time_vector_before)
    number_samples_after = len(time_vector_after)
    number_samples_outside = number_samples_before + number_samples_after
    if outside_max_number_samples is not None and number_samples_outside > outside_max_number_samples:
        new_number_samples_before = min(
            number_samples_before,
            math.ceil(outside_max_number_samples * number_samples_before / number_samples_outside),
        )
        new_number_samples_after = min(
            number_samples_after,
            math.ceil(outside_max_number_samples * number_samples_after / number_samples_outside),
        )
        # Adjusting numbers as math.ceil can do +1 on sum
        if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
            if new_number_samples_before > new_number_samples_after:
                new_number_samples_before -= 1
            else:
                new_number_samples_after -= 1

        if new_number_samples_before > 0:
            time_vector_before = npy.linspace(
                time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
            ).tolist()
        else:
            # No budget left for this side: keeping its samples would exceed the maximum
            time_vector_before = []

        if new_number_samples_after > 0:
            # Built backwards so that the last sample is kept rather than the first one
            time_vector_after = npy.linspace(
                time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
            ).tolist()[::-1]
        else:
            time_vector_after = []

    new_time_vector = time_vector_before + time_vector_window + time_vector_after
    values = self.interpolate_values(new_time_vector)
    forced_values = self.interpolate_forced_values(new_time_vector)
    number_samples = len(values)

    data_processing_time = time.time() - data_processing_time

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=new_time_vector,
        values=values,
        forced_values=forced_values,
        number_samples=number_samples,
        number_samples_db=self.number_samples,
        data_start=self.data_start,
        data_end=self.data_end,
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + data_processing_time,
        # Without this the resampled data would lose the phase it belongs to
        phase_id=self.phase_id,
    )
def zero_time_vector(self, data_start: float):
    """Return a copy of this signal with timestamps expressed relative to ``data_start``.

    Args:
        data_start: Reference timestamp subtracted from every entry of the time vector.

    Returns:
        ``self`` when the signal holds no samples; otherwise a new instance of the
        same class whose ``data_start`` / ``data_end`` are the first and last
        shifted timestamps. Values and forced values are carried over unchanged.
    """
    if len(self.time_vector) == 0:
        return self

    started_at = time.time()
    # Hand the model a plain list of Python floats rather than a numpy array.
    time_vector = (npy.array(self.time_vector) - data_start).tolist()
    elapsed = time.time() - started_at

    return self.__class__(
        signal_id=self.signal_id,
        # Forward the forcible flag so it is not reset to the field default.
        forcible=self.forcible,
        time_vector=time_vector,
        values=self.values,
        forced_values=self.forced_values,
        number_samples=self.number_samples,
        number_samples_db=self.number_samples_db,
        data_start=time_vector[0],
        data_end=time_vector[-1],
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + elapsed,
    )
def csv_export(self):
    """Serialise the signal as UTF-8 encoded CSV bytes.

    The first row is the header ``timestamp,value,forced_value``; each following
    row holds the UTC date of a sample, its value and its forced value.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), sample, forced_sample]
        for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")
def prestoplot_export(self):
    """Serialise the signal as a tab-separated PrestoPlot text file (UTF-8 bytes).

    The column name is the signal id with dots replaced by underscores, prefixed
    with an underscore when it would start with a digit. Missing values and
    forced values are written as ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values):
        sample_text = "none" if sample is None else sample
        forced_text = "none" if forced_sample is None else forced_sample
        lines.append(f"{get_utc_date_from_timestamp(timestamp)}\t{sample_text}\t{forced_text}\n")
    return "".join(lines).encode("utf-8")
# Signal data whose samples are numbers; the methods of this class interpolate
# and downsample them numerically.
class NumericSignalData(SignalData):
    data_type: str = "float"
    # Samples and forced samples; a None entry is treated as NaN by interpolate().
    values: list[float | int | None]
    forced_values: list[float | int | None]
def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

    None entries of ``items`` are treated as NaN for the interpolation, and any
    NaN in the interpolated result is returned as None.
    """
    filled = []
    for item in items:
        filled.append(npy.nan if item is None else item)

    interpolated = npy.interp(new_time_vector, self.time_vector, filled)

    result = []
    for sample in interpolated:
        result.append(None if npy.isnan(sample) else sample)
    return result
999 def uniform_desampling(self, number_samples_max: int) -> Self:
1000 data_processing_time = time.time()
1001 if number_samples_max and self.number_samples > number_samples_max:
1002 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
1003 forced_values = self.interpolate_forced_values(time_vector)
1004 number_samples = len(time_vector)
1005 else:
1006 time_vector = self.time_vector
1007 number_samples = len(self.values)
1008 values = self.values[:]
1009 forced_values = self.forced_values[:]
1010 data_processing_time = time.time() - data_processing_time
1012 return self.__class__(
1013 signal_id=self.signal_id,
1014 time_vector=time_vector,
1015 values=values,
1016 forced_values=forced_values,
1017 number_samples=number_samples,
1018 number_samples_db=self.number_samples,
1019 data_start=self.data_start,
1020 data_end=self.data_end,
1021 db_query_time=self.db_query_time,
1022 init_time=self.init_time,
1023 data_processing_time=self.data_processing_time + data_processing_time,
1024 )
1026 def min_max_downsampling(self, number_samples_max: int) -> Self:
1027 if self.number_samples < number_samples_max:
1028 return self
1030 data_processing_time = time.time()
1032 number_bins = number_samples_max // 2
1034 time_vector = npy.array(self.time_vector, dtype=npy.float64)
1035 values = npy.array(self.values, dtype=npy.float64)
1036 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1038 points_per_bin = self.number_samples // number_bins
1040 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1041 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1042 if self.number_samples - number_bins * points_per_bin > 1:
1043 points_per_bin += 1
1044 number_bins = self.number_samples // points_per_bin + 1
1045 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1046 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1047 values = npy.concatenate([values, nan_points_to_add])
1048 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1050 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1051 values_matrix = values.reshape(number_bins, points_per_bin)
1052 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1054 indexes_min = npy.zeros(number_bins, dtype="int64")
1055 indexes_max = npy.zeros(number_bins, dtype="int64")
1057 for row in range(number_bins):
1058 min_value = values_matrix[row, 0]
1059 max_value = values_matrix[row, 0]
1060 for column in range(points_per_bin):
1061 if values_matrix[row, column] < min_value:
1062 min_value = values_matrix[row, column]
1063 indexes_min[row] = column
1064 elif values_matrix[row, column] > max_value:
1065 max_value = values_matrix[row, column]
1066 indexes_max[row] = column
1068 row_index = npy.repeat(npy.arange(number_bins), 2)
1069 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1071 data_processing_time = time.time() - data_processing_time
1073 new_time_vector = timestamps_matrix[row_index, column_index]
1074 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1075 new_values = values_matrix[row_index, column_index]
1076 new_values = npy.where(npy.isnan(new_values), None, new_values)
1077 new_forced_values = forced_values_matrix[row_index, column_index]
1078 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1080 # Make sure there are no None values for the time vector
1081 time_vector_filter = new_time_vector != None
1082 new_time_vector = new_time_vector[time_vector_filter]
1083 new_values = new_values[time_vector_filter]
1084 new_forced_values = new_forced_values[time_vector_filter]
1086 return self.__class__(
1087 signal_id=self.signal_id,
1088 time_vector=new_time_vector,
1089 values=new_values,
1090 forced_values=new_forced_values,
1091 number_samples=number_bins * 2,
1092 number_samples_db=self.number_samples_db,
1093 data_start=self.data_start,
1094 data_end=self.data_end,
1095 db_query_time=self.db_query_time,
1096 init_time=self.init_time,
1097 data_processing_time=self.data_processing_time + data_processing_time,
1098 phase_id=self.phase_id,
1099 )
1101 def interest_window_desampling(
1102 self,
1103 window_max_number_samples: int,
1104 outside_max_number_samples: int,
1105 window_min_timestamp: float | None = None,
1106 window_max_timestamp: float | None = None,
1107 ) -> Self:
1108 """Performs a sampling in a window of interest and outside."""
1110 if not self.time_vector:
1111 return self
1113 if window_min_timestamp is None:
1114 window_min_timestamp = self.time_vector[0]
1115 if window_max_timestamp is None:
1116 window_max_timestamp = self.time_vector[-1]
1118 data_processing_time = time.time()
1120 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1121 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1123 time_vector_before = self.time_vector[:index_window_start]
1124 time_vector_window = self.time_vector[index_window_start:index_window_end]
1125 time_vector_after = self.time_vector[index_window_end:]
1127 values_before = self.values[:index_window_start]
1128 values_window = self.values[index_window_start:index_window_end]
1129 values_after = self.values[index_window_end:]
1130 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1131 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1133 # Resampling window
1134 if time_vector_window:
1135 # Ensurring window bounds
1136 if time_vector_window[0] != window_min_timestamp:
1137 time_vector_window.insert(0, window_min_timestamp)
1138 values_window.insert(0, window_min_value)
1139 if time_vector_window[-1] != window_max_timestamp:
1140 time_vector_window.append(window_max_timestamp)
1141 values_window.append(window_max_value)
1142 else:
1143 time_vector_window = [window_min_timestamp, window_max_timestamp]
1144 values_window = [window_min_value, window_max_value]
1146 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1147 # Resampling
1148 time_vector_window, values_window = downsample_list(
1149 time_vector_window, values_window, window_max_number_samples
1150 )
1152 # Resampling outside
1153 number_samples_before = len(time_vector_before)
1154 number_samples_after = len(time_vector_after)
1155 if (
1156 outside_max_number_samples is not None
1157 and (number_samples_before + number_samples_after) > outside_max_number_samples
1158 ):
1159 new_number_samples_before = min(
1160 number_samples_before,
1161 math.ceil(
1162 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1163 ),
1164 )
1165 new_number_samples_after = min(
1166 number_samples_after,
1167 math.ceil(
1168 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1169 ),
1170 )
1171 # Adjusting numbers as math.ceil can do +1 on sum
1172 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1173 if new_number_samples_before > new_number_samples_after:
1174 new_number_samples_before -= 1
1175 else:
1176 new_number_samples_after -= 1
1178 if new_number_samples_before > 0:
1179 time_vector_before, values_before = downsample_list(
1180 time_vector_before, values_before, new_number_samples_before
1181 )
1183 if new_number_samples_after > 0:
1184 time_vector_after, values_after = downsample_list(
1185 time_vector_after, values_after, new_number_samples_after
1186 )
1188 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1189 values = values_before + values_window + values_after
1190 forced_values = self.interpolate_forced_values(new_time_vector)
1191 number_samples = len(values)
1193 data_processing_time = time.time() - data_processing_time
1195 return self.__class__(
1196 signal_id=self.signal_id,
1197 time_vector=new_time_vector,
1198 values=values,
1199 forced_values=forced_values,
1200 number_samples=number_samples,
1201 number_samples_db=self.number_samples,
1202 data_start=self.data_start,
1203 data_end=self.data_end,
1204 db_query_time=self.db_query_time,
1205 init_time=self.init_time,
1206 data_processing_time=self.data_processing_time + data_processing_time,
1207 )
# Signal data whose samples are strings; "interpolation" holds the previous sample.
class StringSignalData(SignalData):
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each requested timestamp, the item of the latest sample at or before it.

        Timestamps that precede the first sample get the first item; positions
        are clamped to the valid index range of ``items``.
        """
        last_position = len(items) - 1
        # Index of the last known timestamp that is <= each requested timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        positions = npy.clip(positions, 0, last_position)

        held_items = []
        for position in positions:
            held_items.append(items[position])
        return held_items
# Bundle of several signal data series plus aggregate metadata for the whole request.
class SignalsData(TwinPadModel):
    # One entry per fetched signal (and per phase, for phase-based queries).
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    # Processing time reported for the bundle, as set by the constructor that built it.
    data_processing_time: float
    # Bounds of the bundle; None when unknown.
    data_start: float | None
    data_end: float | None
@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Fetch the data of several signals and bundle it.

    ``max_timestamp`` defaults to the current time. Each signal is loaded with
    ``SignalData.get_from_signal_id`` using its pre-resolved collection. The
    bundle's ``data_start`` / ``data_end`` are the smallest start and largest end
    among the signals that report one (None otherwise), and its processing time
    is the sum of the per-signal processing times.
    """
    if max_timestamp is None:
        max_timestamp = time.time()

    fetched = []
    earliest_start = None
    latest_end = None
    total_processing_time = 0.0

    collections = get_signal_collections_batch(signal_ids)
    for signal_id, collection in zip(signal_ids, collections):
        entry = SignalData.get_from_signal_id(
            signal_id=signal_id,
            min_timestamp=min_timestamp,
            max_timestamp=max_timestamp,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
            interpolate_bounds=interpolate_bounds,
            max_documents=max_documents,
            collection=collection,
        )
        total_processing_time += entry.data_processing_time
        fetched.append(entry)

        if entry.data_start is not None:
            earliest_start = entry.data_start if earliest_start is None else min(entry.data_start, earliest_start)
        if entry.data_end is not None:
            latest_end = entry.data_end if latest_end is None else max(entry.data_end, latest_end)

    return cls(
        signals_data=fetched,
        data_processing_time=total_processing_time,
        data_start=earliest_start,
        data_end=latest_end,
    )
@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Fetch the given signals once per phase and bundle every non-empty result.

    For each phase the query range is the phase range (``start_at`` / ``end_at``
    divided by 1000). When both window bounds are given, the range is narrowed
    to the window plus a margin of 1/20 of the window length on each side that
    differs from the phase bound. Signals without samples are skipped.

    Args:
        phases: Phases to query; the four list arguments are consumed in lockstep.
        phase_sync_times: Per-phase time origin used when zeroing; None means the
            phase start.
        signal_ids: Signals fetched for every phase.
        window_min_timestamps: Per-phase window start (or None).
        window_max_timestamps: Per-phase window end (or None).
        zero_time_vector: When True, timestamps are made relative to the sync time.

    Returns:
        A bundle whose ``data_start`` / ``data_end`` are 0 and whose processing
        time is the wall-clock duration of this call.
    """
    signals_data: list[SignalData] = []
    computation_start = time.time()

    # The collections only depend on signal_ids: resolve them once (lazily, so
    # nothing is resolved when there is no phase) instead of once per phase.
    signal_collections = None

    for phase, sync_time, window_min_timestamp, window_max_timestamp in zip(
        phases, phase_sync_times, window_min_timestamps, window_max_timestamps
    ):
        min_timestamp = phase.start_at / 1000
        max_timestamp = phase.end_at / 1000

        if sync_time is None:
            sync_time = min_timestamp

        if window_max_timestamp is not None and window_min_timestamp is not None:
            window_length = window_max_timestamp - window_min_timestamp

            if window_min_timestamp != min_timestamp:
                min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20)
            if window_max_timestamp != max_timestamp:
                max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20)

        if signal_collections is None:
            # list() so the result can be iterated again for the next phases.
            signal_collections = list(get_signal_collections_batch(signal_ids))

        for signal_id, collection in zip(signal_ids, signal_collections):
            signal_data = SignalData.get_from_signal_id(
                signal_id,
                min_timestamp,
                max_timestamp,
                window_min_timestamp,
                window_max_timestamp,
                interpolate_bounds=False,
                max_documents=None,
                collection=collection,
            )

            if len(signal_data.time_vector) == 0:
                continue

            if zero_time_vector:
                signal_data = signal_data.zero_time_vector(sync_time)
            # Tag after zeroing, on the instance that is actually returned.
            signal_data.phase_id = phase.id

            signals_data.append(signal_data)

    return cls(
        signals_data=signals_data,
        data_processing_time=time.time() - computation_start,
        data_start=0,
        data_end=0,
    )
1340 def uniform_desampling(self, number_samples_max: int) -> Self:
1341 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1342 return SignalsData(
1343 signals_data=signals_data,
1344 data_processing_time=sum(s.data_processing_time for s in signals_data),
1345 data_start=self.data_start,
1346 data_end=self.data_end,
1347 )
1349 def min_max_downsampling(self, number_samples_max: int) -> Self:
1350 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data]
1351 return SignalsData(
1352 signals_data=signals_data,
1353 data_processing_time=sum(s.data_processing_time for s in signals_data),
1354 data_start=self.data_start,
1355 data_end=self.data_end,
1356 )
1358 def interest_window_desampling(
1359 self,
1360 window_max_number_samples: int,
1361 outside_max_number_samples: int,
1362 window_min_timestamp: float = None,
1363 window_max_timestamp: float = None,
1364 ) -> Self:
1365 signals_data = [
1366 s.interest_window_desampling(
1367 window_max_number_samples=window_max_number_samples,
1368 outside_max_number_samples=outside_max_number_samples,
1369 window_min_timestamp=window_min_timestamp,
1370 window_max_timestamp=window_max_timestamp,
1371 )
1372 for s in self.signals_data
1373 ]
1375 return SignalsData(
1376 signals_data=signals_data,
1377 data_processing_time=sum(s.data_processing_time for s in signals_data),
1378 data_start=self.data_start,
1379 data_end=self.data_end,
1380 )
def zero_time_vector(self, data_start: float):
    """Make the timestamps of every signal relative to ``data_start``.

    Returns a new bundle whose ``data_start`` is 0 and whose ``data_end`` is the
    largest shifted end among the signals that hold samples, or None when there
    is none (empty bundle or only empty signals).
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]

    # A signal without samples is not shifted by its zero_time_vector(), so its
    # data_end (possibly None) is not in the shifted time base: leave it out.
    shifted_ends = [s.data_end for s in signals_data if len(s.time_vector) > 0 and s.data_end is not None]

    return self.__class__(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(shifted_ends) if shifted_ends else None,
    )
@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Apply a single-signal post-processing function to a signal over a phase.

    The result signal id is ``post_processing.<base id without prefix>.<function>``.
    When that signal already lists ``phase.id`` in its computed phases, the stored
    samples are returned. Otherwise the base signal is loaded for the whole phase,
    the function is applied to values and forced values, the full result is
    scheduled for saving as a background task, and the result restricted to the
    optional window is returned.

    Returns:
        A bundle holding the processed signal, or None when the base signal has
        no data for the phase.
    """
    signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"

    # Reuse a previously stored result for this phase when there is one.
    processed_result_signal = Signal.get_from_signal_id(signal_id)
    if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    # Absolute timestamps are kept (no zeroing) and no window is applied to the query.
    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )

    if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
        return None

    new_values = None
    new_forced_values = None
    time_vector = npy.array(signals_data.signals_data[0].time_vector)
    values = signals_data.signals_data[0].values
    forced_values = signals_data.signals_data[0].forced_values

    # NOTE(review): there is no default case; a function name outside the cases
    # below leaves new_values as None and npy.isnan() further down then fails.
    match (function):
        case "Cumul":
            new_values = cumul(values)
            new_forced_values = cumul(forced_values)
        # case "CumulDistrib":
        # new_values = cumul_distrib(values)
        # new_forced_values = cumul_distrib(forced_values)
        case "Delta":
            new_values = delta(values)
            new_forced_values = delta(forced_values)
        case "DeltaT":
            # Computed on the timestamps, so values and forced values are the same.
            new_values = delta(time_vector)
            new_forced_values = new_values
        case "Derive":
            new_values = derive(time_vector, values)
            new_forced_values = derive(time_vector, forced_values)
        case "Integ":
            new_values = integ(time_vector, values)
            new_forced_values = integ(time_vector, forced_values)

    # NaN results become None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The complete (unwindowed) result is saved in the background.
    # NOTE(review): the task object is not kept and is never awaited here, so a
    # failure of the save is not reported to the caller.
    loop = asyncio.get_running_loop()
    loop.create_task(
        cls.save_function_signal(
            phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible
        )
    )

    # Restrict the returned data to the requested window (bounds are inclusive).
    if window_max_timestamp is not None:
        max_timestamp_mask = time_vector <= window_max_timestamp
        time_vector = time_vector[max_timestamp_mask]
        new_values = new_values[max_timestamp_mask]
        new_forced_values = new_forced_values[max_timestamp_mask]
    if window_min_timestamp is not None:
        min_timestamp_mask = time_vector >= window_min_timestamp
        time_vector = time_vector[min_timestamp_mask]
        new_values = new_values[min_timestamp_mask]
        new_forced_values = new_forced_values[min_timestamp_mask]

    # The fetched base signal object is reused as the container of the result.
    signals_data.signals_data[0].time_vector = time_vector.tolist()
    signals_data.signals_data[0].values = new_values.tolist()
    signals_data.signals_data[0].forced_values = new_forced_values.tolist()
    signals_data.signals_data[0].number_samples = time_vector.size

    signals_data.signals_data[0].signal_id = signal_id

    return signals_data
1469 @classmethod
1470 async def apply_multiple_function(
1471 cls,
1472 phases: list,
1473 signal_ids: list,
1474 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
1475 window_min_timestamp: float = None,
1476 window_max_timestamp: float = None,
1477 ):
1478 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
1479 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
1480 else:
1481 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"
1483 active_phase = phases[0]
1484 if function in {"Align-X", "Using-X"}:
1485 active_phase = phases[1]
1487 processed_result_signal = Signal.get_from_signal_id(function_signal_id)
1488 if processed_result_signal is not None and (
1489 active_phase.id in processed_result_signal.computed_phases_ids
1490 ): # If signal has been computed for the correct phase
1491 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)
1493 array_length = None
1494 time_vector_list = []
1495 values_list = []
1496 forced_values_list = []
1497 forcible = True
1498 for phase, signal_id in zip(phases, signal_ids):
1499 signals_data = cls.get_from_phase_and_signal_ids(
1500 [phase], [None], [signal_id], [None], [None], zero_time_vector=False
1501 )
1503 if len(signals_data.signals_data) == 0:
1504 return None
1506 signal_data = signals_data.signals_data[0]
1508 if array_length is None:
1509 array_length = signal_data.number_samples
1510 if (
1511 array_length != signal_data.number_samples and function != "Align-X"
1512 ) or signal_data.number_samples == 0:
1513 return None
1515 time_vector_list.append(npy.array(signal_data.time_vector))
1516 values_list.append(npy.array(signal_data.values, dtype=npy.float64))
1517 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
1518 forcible = forcible and signal_data.forcible
1520 time_vector = time_vector_list[0]
1521 new_values = None
1522 new_forced_values = None
1524 match (function):
1525 case "Align-X":
1526 time_vector = time_vector_list[1]
1527 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
1528 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
1529 new_values = align_x(old_time_vector, values_list[0], new_time_vector)
1530 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
1531 # case "Atan2":
1532 # new_values = atan2(values_list[0], values_list[1])
1533 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
1534 case "Using-X":
1535 if len(time_vector_list[0]) != len(time_vector_list[1]):
1536 return None
1537 time_vector = time_vector_list[1]
1538 new_values = values_list[0]
1539 new_forced_values = forced_values_list[0]
1540 case "Mean":
1541 new_values = mean(*values_list)
1542 new_forced_values = mean(*forced_values_list)
1543 case "Norm":
1544 new_values = norm(*values_list)
1545 new_forced_values = norm(*forced_values_list)
1547 new_values = npy.where(npy.isnan(new_values), None, new_values)
1548 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1550 loop = asyncio.get_running_loop()
1551 loop.create_task(
1552 cls.save_function_signal(
1553 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
1554 )
1555 )
1557 total_number_samples = time_vector.size
1559 if window_max_timestamp is not None:
1560 max_timestamp_mask = time_vector <= window_max_timestamp
1561 time_vector = time_vector[max_timestamp_mask]
1562 new_values = new_values[max_timestamp_mask]
1563 new_forced_values = new_forced_values[max_timestamp_mask]
1564 if window_min_timestamp is not None:
1565 min_timestamp_mask = time_vector >= window_min_timestamp
1566 time_vector = time_vector[min_timestamp_mask]
1567 new_values = new_values[min_timestamp_mask]
1568 new_forced_values = new_forced_values[min_timestamp_mask]
1570 signals_data = cls(
1571 signals_data=[
1572 NumericSignalData(
1573 signal_id=function_signal_id,
1574 forcible=forcible,
1575 time_vector=time_vector.tolist(),
1576 values=new_values.tolist(),
1577 forced_values=new_forced_values.tolist(),
1578 number_samples=time_vector.size,
1579 number_samples_db=total_number_samples,
1580 )
1581 ],
1582 data_processing_time=0,
1583 data_start=0,
1584 data_end=0,
1585 )
1587 return signals_data
@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Load the stored samples of a post-processing signal.

    Args:
        signal_id: Id of the stored post-processing signal.
        window_min_timestamp: Inclusive lower bound on ``precise_timestamp``, or None.
        window_max_timestamp: Inclusive upper bound on ``precise_timestamp``, or None.

    Returns:
        A bundle with one ``NumericSignalData`` sorted by timestamp.
        ``number_samples_db`` is the total number of stored documents, regardless
        of the window. The bundle's processing time is the fetch duration.
    """
    signal_data_collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp
    pipeline = [{"$match": {"precise_timestamp": timestamp_bounds}}] if timestamp_bounds else []

    total_number_samples = signal_data_collection.count_documents({})

    fetch_start = time.time()

    samples = signal_data_collection.aggregate(pipeline).to_list()
    # The pipeline requests no ordering, so the order of the documents is not
    # guaranteed. Sort here (cheap on already ordered data) so that the time
    # vector is monotonic.
    samples.sort(key=lambda sample: sample["precise_timestamp"])

    new_time_vector = [sample["precise_timestamp"] for sample in samples]
    new_values = [sample["value"] for sample in samples]
    new_forced_values = [sample["forced_value"] for sample in samples]

    return cls(
        signals_data=[
            NumericSignalData(
                signal_id=signal_id,
                time_vector=new_time_vector,
                values=new_values,
                forced_values=new_forced_values,
                number_samples=len(new_time_vector),
                number_samples_db=total_number_samples,
            )
        ],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )
@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Store the samples of a computed post-processing signal and register the phase.

    Existing samples within the phase range are deleted first, then the new ones
    are inserted, and finally the signal configuration is upserted with
    ``phase.id`` added to ``computed_phases_ids``.

    NOTE(review): the calls below look like synchronous database calls inside a
    coroutine; if so they block the event loop while they run - confirm.

    Args:
        phase: Phase the samples belong to (``start_at`` / ``end_at`` / ``id`` are read).
        function_signal_id: Id of the post-processing signal.
        time_vector: Sample timestamps in seconds.
        new_values: Computed values, same length as ``time_vector``.
        new_forced_values: Computed forced values, same length as ``time_vector``.
        forcible: Stored in the signal configuration.
    """
    # Insert data first so if it is requested by another user, it will be computed again
    signal_collection = get_signal_collection(function_signal_id, create=True)
    signal_collection.delete_many(
        {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
    )

    documents = [
        {
            # Timezone-aware UTC: a naive local datetime would be stored shifted
            # whenever the process does not run in UTC.
            "timestamp": datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc),
            "precise_timestamp": timestamp,
            "value": value,
            "forced_value": forced_value,
        }
        for timestamp, value, forced_value in zip(time_vector, new_values, new_forced_values)
    ]
    if documents:  # insert_many rejects an empty list
        signal_collection.insert_many(documents)

    # Mean sampling frequency = number of intervals / covered duration. The
    # fallback of 1 only applies when no duration can be measured, instead of
    # capping every signal faster than 1 Hz to 1.
    sample_count = len(time_vector)
    duration = float(time_vector[-1] - time_vector[0]) if sample_count > 1 else 0.0
    frequency = (sample_count - 1) / duration if duration > 0 else 1.0

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": {
                "description": "",
                "unit": None,
                "type": "sensor",
                "address": None,
                "frequency": frequency,
                "transfer_function": None,
                "precision_digits": None,
                "digitization_function": None,
                "data_type": "float",
                "formula": None,
                "forcible": forcible,
                "commandable": False,
                "broadcastable": True,
                "signal_id": function_signal_id,
                "post_processing": True,
            },
            # $addToSet keeps the list free of duplicates when a phase is saved twice.
            "$addToSet": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )
1681 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []):
1682 if post_processing:
1683 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1684 zip_buffer = io.BytesIO()
1685 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1686 for signal_data in self.signals_data:
1687 file_name = signal_data.signal_id
1688 if post_processing:
1689 phase = phases_by_id.get(
1690 signal_data.phase_id,
1691 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1692 )
1693 file_name = f"{signal_data.signal_id} ({phase.name})"
1694 if file_format == "csv":
1695 export_io = signal_data.csv_export()
1696 zip_file.writestr(f"{file_name}.csv", export_io)
1697 elif file_format == "prestoplot":
1698 export_io = signal_data.prestoplot_export()
1699 zip_file.writestr(f"{file_name}.tab", export_io)
1700 else:
1701 raise ValueError(f"Format not found. Got: {file_format}")
1702 zip_bytes = zip_buffer.getvalue()
1703 return zip_bytes
def hdf5_export(self, post_processing: bool = False, phase_ids: list | None = None):
    """Export every signal to its own group of an in-memory HDF5 file.

    Each group holds one ``data`` dataset of records
    ``(date, timestamp, value, forced_value)``. Values are stored as float64,
    or as fixed-size byte strings when the signal ``data_type`` is ``"str"``.

    Args:
        post_processing: When True, each group name is suffixed with the name of
            the phase of the signal, or "Unknown phase" when that phase is not
            among ``phase_ids`` or cannot be loaded.
        phase_ids: Ids of the phases to resolve names for (only used when
            ``post_processing`` is True).

    Returns:
        The HDF5 file content as bytes.
    """
    phase_names = {}
    if post_processing:
        for phase_id in phase_ids or []:
            phase = Phase.get_from_id(phase_id)
            # A phase that cannot be loaded falls back to "Unknown phase".
            if phase is not None:
                phase_names[phase_id] = phase.name

    hdf5_buffer = io.BytesIO()
    custom_type_float = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    custom_type_string = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                phase_name = phase_names.get(signal_data.phase_id, "Unknown phase")
                group_name = f"{signal_data.signal_id} ({phase_name})"
            signal_group = hdf5_file.create_group(group_name)

            date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
            signal_group["data"] = npy.array(
                list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                ),
                dtype=record_type,
            )
    return hdf5_buffer.getvalue()
# Status record attached to a signal (see the ``status`` field of ``Signal``).
class SignalStatus(TwinPadModel):
    status: str = "down"  # a signal is reported as "down" until stated otherwise
    reason: str = ""
    delay: float | None = None  # None when no delay is known
# Parameters of a digitization: a physical value range and a raw value range.
# NOTE(review): presumably a linear mapping between the two ranges - confirm
# against the code that consumes this model.
class DigitizationFunction(TwinPadModel):
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float
# Payload of a signal command: new value and/or forced value; every field is optional.
class SignalUpdate(TwinPadModel):
    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None
# Closed set of signal kinds; thanks to the ``str`` mixin members compare equal
# to their plain string value.
class SignalType(str, Enum):
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Maps a base data-type name to the SignalData subclass that holds its samples.
# Only "str" is stored as text; every other type is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
# Signal configuration, stored in the "signals" collection.
class Signal(GenericMongo):
    collection_name: ClassVar[str] = "signals"

    # The part before the first "." is the id of the owning device (see ``device``).
    signal_id: str
    frequency: float
    unit: str | None
    description: str
    type: SignalType
    # Either a key of TYPES / SIGNALDATA_TYPES or an "enum(<base type>, <choices>)" string.
    data_type: str
    precision_digits: int | None
    forcible: bool
    commandable: bool
    broadcastable: bool
    status: SignalStatus = SignalStatus()

    # Set for signals produced by a post-processing function, together with the
    # ids of the phases the result has been computed for.
    post_processing: bool = False
    computed_phases_ids: list[str] = []

    digitization_function: DigitizationFunction | None
@property
def device(self) -> Device:
    """Device owning this signal, looked up by the part of ``signal_id`` before the first dot."""
    device_id, _, _ = self.signal_id.partition(".")
    return Device.get_one_by_attribute("device_id", device_id)
@cached_property
def signal_data_class(self):
    """SignalData subclass used to hold the samples of this signal.

    Plain data types are looked up directly in ``SIGNALDATA_TYPES``. For an
    ``enum(<base type>, ...)`` data type the base type is looked up instead.

    Raises:
        ValueError: If the data type is neither a known type nor an enum.
    """
    data_type = self.data_type
    if data_type in SIGNALDATA_TYPES:
        return SIGNALDATA_TYPES[data_type]

    if not data_type.startswith("enum"):
        raise ValueError(f"Unhandled python type: {self.data_type}")

    # "enum(<base>, <choices>)" -> "<base>"
    enum_head = data_type.split(",")[0]
    base_type_name = enum_head.replace("enum(", "").lstrip(" ")
    return SIGNALDATA_TYPES[base_type_name]
1823 @cached_property
1824 def python_type(self):
1825 if self.data_type in TYPES:
1826 return TYPES[self.data_type]
1827 if self.data_type.startswith("enum"):
1828 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1829 return Literal[*choices]
1830 raise ValueError(f"Unhandled python type: {self.data_type}")
1832 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1833 command = Command(
1834 sent_at=time.time(),
1835 command_type="Signal command",
1836 user_id=current_user.id,
1837 )
1839 has_input_error = False
1840 error_message = ""
1842 if self.data_type.startswith("enum"):
1843 enum_options = get_args(self.python_type)
1845 if update_dict.value is not None and update_dict.value not in enum_options:
1846 has_input_error = True
1847 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1848 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1849 has_input_error = True
1850 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1851 else:
1852 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1853 has_input_error = True
1854 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1855 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1856 has_input_error = True
1857 error_message += (
1858 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1859 )
1861 if has_input_error:
1862 command.response_time = 0
1863 command.succeeded = False
1864 command.description = f"Tried to modify signal {self.signal_id}"
1865 response = {"error": True, "status_code": 400, "message": error_message}
1866 else:
1867 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict)
1868 command.receive_response(response)
1870 Command.create(command)
1871 return response
1873 @classmethod
1874 def get_from_signal_id(cls, signal_id) -> Self:
1875 """Could be generic from mongo"""
1876 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1877 if not raw_value:
1878 return None
1879 del raw_value["_id"]
1880 return cls.dict_to_object(raw_value)
1882 @classmethod
1883 def get_all_ids(cls) -> list[str]:
1884 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1886 return [signal["signal_id"] for signal in cursor]
1888 @classmethod
1889 def get_all_statuses(cls) -> list[dict]:
1890 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
1892 return [
1893 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1894 for signal in cursor
1895 ]
1897 async def number_samples(self):
1898 collection = get_signal_collection(signal_id=self.signal_id)
1899 if collection is None:
1900 return 0
1902 number_samples = collection.estimated_document_count()
1904 number_samples_async_collection = await get_async_collection(
1905 systems_async_database, "number_samples", create=True, time_series=True
1906 )
1908 loop = asyncio.get_running_loop()
1909 loop.create_task(
1910 number_samples_async_collection.insert_one(
1911 {
1912 "timestamp": datetime.datetime.now(pytz.UTC),
1913 "signal_id": self.signal_id,
1914 "number_samples": number_samples,
1915 }
1916 )
1917 )
1919 return number_samples
1921 @classmethod
1922 def total_number_samples(cls) -> int:
1923 TwinPadActivity.get_number_samples_timeframe(0, 0, False)
1924 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
1926 if number_samples_collection is None:
1927 return 0
1929 result = number_samples_collection.aggregate(
1930 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}]
1931 )
1933 result = result.to_list()
1934 if len(result) == 0:
1935 return 0
1936 return result[0]["amount"]
1938 def sample_datasize(self):
1939 return signals_database.command("collstats", self.signal_id)["size"]
1941 @classmethod
1942 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1943 result = cls.collection().aggregate(
1944 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1945 )
1947 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ForcedSignal(GenericMongo):
    # Record of a signal being forced; at most one document per signal_id (see `insert`).
    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record keyed by ``signal_id`` and return its document id.

        The id of the stored document is also written to ``self.id``.
        """
        selector = {"signal_id": self.signal_id}
        changes = {"$set": self.to_dict(exclude={"id"})}
        stored_document = self.collection().find_one_and_update(
            selector,
            changes,
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Tell whether the user is allowed to force the given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal is forced by another user and the current
            user is not an admin, True otherwise
        :rtype: bool
        """
        existing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing is None:
            # Nobody is forcing this signal yet.
            return True
        forced_by_other = existing.forcing_user_id != current_user.id and not current_user.is_admin
        return not forced_by_other
class ServicesStatus(TwinPadModel):
    # "up"/"down" state of each service, as produced by `check`.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        The backend itself is always reported as ``"up"``.
        """
        # Insertion order is the order in which the hosts are pinged.
        service_hosts = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        statuses = {service: ping(host) for service, host in service_hosts.items()}
        return cls(backend="up", **statuses)
def ping(host):
    """Return ``"up"`` if ``host`` answers a ping within 0.8 s, ``"down"`` otherwise.

    A ``PermissionError`` raised by the ping call is reported as ``"down"``.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # ping3 returns the delay (a float) on success, None on timeout and False
    # on error. Test for those two explicitly: a plain truthiness check would
    # report a 0.0 s round trip as "down".
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    # One occurrence of an event rule, stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    # Rule referenced by `event_rule_id`; fetched on first access, then cached.
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        rule_id = self.event_rule_id
        return EventRule.get_from_id(rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an event from a stored dict whose ``timestamp`` is a datetime.

        The datetime is replaced in ``dict_`` itself by its epoch timestamp
        before delegating to the parent implementation.
        """
        stored_datetime = dict_["timestamp"]
        dict_["timestamp"] = datetime.datetime.timestamp(stored_datetime)
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    # One per-day counter: `timestamp` identifies the day, `amount` is its count.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight(moment) -> datetime.datetime:
        """Return 00:00 UTC of the calendar day of ``moment`` (a date or datetime)."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _day_bounds(start, end=None) -> dict:
        """Return the ``timestamp`` query bounds ``[start, end)``; open-ended when ``end`` is None."""
        bounds = {"$gte": start}
        if end is not None:
            bounds["$lt"] = end
        return bounds

    @classmethod
    def _daily_amounts(
        cls,
        daily_collection,
        daily_collection_name: str,
        source_collection,
        count_from,
        min_timestamp,
        max_timestamp,
        recompute_amount,
    ) -> list[Self]:
        """Refresh a per-day counter collection and return its entries in a timeframe.

        :param daily_collection: Existing counter collection, or None if absent
        :param daily_collection_name: Name used to (re)create the counter collection
        :param source_collection: Collection whose oldest document gives the first day
        :param count_from: ``count_from(start, end)`` returning the amount in
            ``[start, end)``, or from ``start`` onwards when ``end`` is None
        :param min_timestamp: Start of the returned timeframe (epoch seconds)
        :param max_timestamp: End of the returned timeframe (epoch seconds)
        :param recompute_amount: Rebuild all past days even if the collection exists
        :return: One item per stored day inside the timeframe
        """
        one_day = datetime.timedelta(days=1)
        # Day boundaries are UTC. The local date must not be used here: on a
        # non-UTC host it would shift the window by the UTC offset.
        today = cls._utc_midnight(datetime.datetime.now(pytz.UTC).date())

        if daily_collection is None or recompute_amount:
            daily_collection = get_collection(
                systems_database, daily_collection_name, create=True, time_series=True
            )
            daily_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            # Rebuild every full day from the day of the oldest document.
            current_day = cls._utc_midnight(first_document["timestamp"])
            while current_day < today:
                day_amount = count_from(current_day, current_day + one_day)
                if day_amount > 0:
                    daily_collection.insert_one({"timestamp": current_day, "amount": day_amount})
                current_day += one_day

        # Today is still running: replace its entry on every call.
        amount_today = count_from(today, None)
        if amount_today > 0:
            daily_collection.delete_many({"timestamp": today})
            daily_collection.insert_one({"timestamp": today, "amount": amount_today})

        # Build aware UTC datetimes directly from the epoch values.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            stored_day = day["timestamp"]
            if stored_day.tzinfo is None:
                # pymongo returns naive datetimes expressed in UTC unless the
                # client is tz-aware; .timestamp() would read them as local time.
                stored_day = stored_day.replace(tzinfo=pytz.UTC)
            day["timestamp"] = stored_day.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the per-day number of events between two epoch timestamps.

        The ``number_events`` counters are built from ``events`` when missing
        or when ``recompute_amount`` is true; today's counter is always refreshed.
        """
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)

        def count_events(start, end=None):
            return events_collection.count_documents({"timestamp": cls._day_bounds(start, end)})

        return cls._daily_amounts(
            number_events_collection,
            "number_events",
            events_collection,
            count_events,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the per-day number of received samples between two epoch timestamps.

        Daily amounts are the sum of ``value`` over the documents of
        ``signal_storage._NUMBER_SAMPLES`` in the signals database. The
        ``number_received_samples`` counters are built when missing or when
        ``recompute_amount`` is true; today's counter is always refreshed.
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )

        def sum_samples(start, end=None):
            rows = signals_number_samples_collection.aggregate(
                [
                    {"$match": {"timestamp": cls._day_bounds(start, end)}},
                    {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
                ]
            ).to_list()
            if len(rows) == 0:
                return 0
            return rows[0].get("number_samples", 0)

        return cls._daily_amounts(
            number_samples_collection,
            "number_received_samples",
            signals_number_samples_collection,
            sum_samples,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the per-day number of commands between two epoch timestamps.

        The ``number_commands`` counters are built from ``commands`` when
        missing or when ``recompute_amount`` is true; today's counter is always
        refreshed.
        """
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)

        def count_commands(start, end=None):
            return commands_collection.count_documents({"timestamp": cls._day_bounds(start, end)})

        return cls._daily_amounts(
            number_commands_collection,
            "number_commands",
            commands_collection,
            count_commands,
            min_timestamp,
            max_timestamp,
            recompute_amount,
        )
class EventRule(GenericMongo):
    # Rule definition stored in the "event_rules" collection.
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of stored events referencing this rule; computed once per instance.
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events_collection = get_collection(systems_database, "events")
        # Without `create`, get_collection can return None (other code in this
        # module checks for it); no collection means no events.
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    # Company document stored in the "companies" collection.
    collection_name: ClassVar[str] = "companies"
    name: str
class Campaign(GenericMongo):
    # Campaign document stored in the "campaigns" collection;
    # phases refer to it through `Phase.campaign_id`.
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
class Phase(GenericMongo):
    # Phase of a campaign, stored in the "phases" collection.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: MongoId

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the ``delete_many`` result."""
        phase_filter = {"campaign_id": campaign_id}
        return cls.collection().delete_many(phase_filter)
class CustomViewCreation(GenericMongo):
    # Fields needed to create a custom view; stored in the "custom_views" collection.
    collection_name: ClassVar[str] = "custom_views"

    name: str
    # View layout; the element schema is not visible here — TODO confirm.
    configuration: list
class CustomView(CustomViewCreation):
    # Stored custom view: the creation fields plus an id and a user id.
    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str
# Update model generated from CustomView by create_update_model
# (presumably a partial-update variant — confirm against its definition).
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    # Camera entry stored in the "videos" collection.
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return all videos sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are projected by the query, so
        ``username`` and ``password`` are never read from the database here.
        """
        projection = {"name": 1, "ip_addr": 1}
        documents = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in documents]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with this id, or ``None`` if it does not exist."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    # Log entry for a command sent by a user, stored in the "commands" collection.
    collection_name: ClassVar[str] = "commands"

    # Properties
    # Filled in by `create` from `sent_at`; None until then. The annotation
    # now admits the None default, which it previously contradicted.
    timestamp: datetime.datetime | None = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, created as a time series if missing."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of ``command`` stamped with the UTC datetime of ``sent_at``.

        :param command: Command to persist; it is not modified
        :return: ``{"command_id": <inserted id as str>}``
        """
        stored_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        # insert_one returns a result object or raises; it never returns None,
        # so the former `is None` branch was unreachable.
        insert_result = cls.collection().insert_one(stored_command.model_dump(exclude={"id"}))
        return {"command_id": str(insert_result.inserted_id)}

    def receive_response(self, response: dict):
        """Update timing, success flag and description from a response dict.

        The command succeeded only if ``response["error"]`` is exactly
        ``False``. The response message is used as description only when no
        description was set.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            self.description += response.get("message", "").rstrip()
class SignalsPresetCreation(GenericMongo):
    # Fields needed to create a signals preset: a name and the signal ids it groups.
    name: str
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    # Stored preset: the creation fields plus the owning user id.
    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store a preset for ``user_id`` and return the inserted document id as a string."""
        preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        document = preset.model_dump(exclude={"id"})
        insert_result = cls.collection().insert_one(document)
        return str(insert_result.inserted_id)
# Update model generated from SignalsPreset by create_update_model
# (presumably a partial-update variant — confirm against its definition).
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    """Line style of a plotted curve; members are plain strings because the enum also subclasses ``str``."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    """Plain annotation-only class naming the two colour attributes of a signal.

    It has no base class and assigns no values, so it only declares the
    attribute names and types.
    """

    value_color: str
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    # Fields needed to create a graph theme for one signal;
    # stored in the "graph_themes" collection.
    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    # Colours default to empty strings; the colour format is not visible here — TODO confirm.
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise.
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    # Graph theme as seen by one user: the stored `creator_id`, `in_library`
    # and `active` fields are replaced by three per-user booleans.
    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # User id the pipeline steps are computed for. It is written on the class
    # by the query classmethods below before they delegate to the parent.
    # NOTE(review): being class-level, it is shared by all callers — confirm
    # this is safe if requests can overlap.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Return the aggregation steps keyed by name, for the current user id."""

        def add_fields(**fields) -> list:
            # One-stage pipeline fragment adding (or removing) the given fields.
            return [{"$addFields": fields}]

        user_id = cls._current_user_id
        return {
            "created_by_user": add_fields(created_by_user={"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user is not allowed to see (neither his
            # own nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_fields(in_user_library={"$in": [user_id, "$in_library"]}),
            "active_for_user": add_fields(active_for_user={"$in": [user_id, "$active"]}),
            "in_library": add_fields(in_library="$$REMOVE"),
            "active": add_fields(active="$$REMOVE"),
            "creator_id": add_fields(creator_id="$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the parent query on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Same as ``response_from_query`` after setting ``query.in_user_library`` to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Return the theme with this id as seen by ``user_id``."""
        object_id = ObjectId(item_id)
        return cls.get_one_by_attribute("_id", object_id, user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_by_attribute`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Run the parent ``get_one_by_attribute`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Run the parent ``get_all`` on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return, per signal id, the colours and line styles of one theme active for the user.

        :param signal_ids: Signals to look up
        :param user_id: User whose active themes are considered
        :return: ``{signal_id: {value_color, forced_value_color,
            value_line_style, forced_value_line_style}}``; signals without an
            active theme are absent
        """
        active_for_signals = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        # Keep a single document per signal.
        first_per_signal = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        unwrap = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        appearance_only = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [active_for_signals, first_per_signal, unwrap, appearance_only]

        # The signal id becomes the key and is removed from the value.
        return {document.pop("signal_id"): document for document in cls.collection().aggregate(pipeline)}
# Update model generated from PublicGraphTheme by create_update_model
# (presumably a partial-update variant — confirm against its definition).
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    # Graph theme as stored: includes the creator and the ids of the users
    # who have it in their library or have it active.
    # private
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Insert a new theme and return it with its ``id`` set.

        The creator is put both in the theme's library list and in its active list.
        """
        theme = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        insert_result = cls.collection().insert_one(theme.model_dump(exclude={"id": True}))
        theme.id = str(insert_result.inserted_id)
        return theme

    def update(self, update_dict: dict, user_id: str):
        """Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The per-user flags ``in_user_library`` and ``active_for_user`` are
        translated into membership of ``user_id`` in the stored ``in_library``
        and ``active`` lists; ``created_by_user`` is dropped. ``update_dict``
        is modified in place.

        :return: An empty dict
        """
        # (per-user flag, stored member list, stored field name)
        membership_flags = (
            ("in_user_library", self.in_library, "in_library"),
            ("active_for_user", self.active, "active"),
        )
        for flag_name, members, stored_name in membership_flags:
            wanted = update_dict.get(flag_name)
            if wanted is None:
                continue
            if wanted and user_id not in members:
                members.append(user_id)
            elif not wanted and user_id in members:
                members.remove(user_id)
            update_dict[stored_name] = members
            del update_dict[flag_name]

        # Derived flag, never stored.
        if update_dict.get("created_by_user") is not None:
            del update_dict["created_by_user"]

        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
        )

        return {}
class DeviceStatus(str, Enum):
    """Status values accepted for a device handled by a deployer.

    NOTE(review): the names look like container lifecycle states — confirm
    against the deployer API.
    """

    started = "started"
    running = "running"
    created = "created"
    exited = "exited"
    restarting = "restarting"
class DeviceUpdateFromDeployer(BaseModel):
    # Body sent by `DeviceDeployer.update_device`: only the status can be changed.
    status: DeviceStatus
class DeviceFromDeployerCreation(BaseModel):
    # Fields supplied when asking a deployer to create a device.
    name: str
    description: str
class DeviceFromDeployer(DeviceFromDeployerCreation):
    # Device as reported by a deployer: creation fields plus status, id and logs.
    status: DeviceStatus
    device_id: DeviceId
    # Log text returned by the deployer; empty when none is provided.
    logs: str = ""
class DeviceDeployer(GenericMongo):
    # Remote deployer service reachable over HTTP at `url`;
    # stored in the "device_deployers" collection.
    collection_name: ClassVar[str] = "device_deployers"
    # (connect, read) timeouts in seconds applied to every HTTP call, so an
    # unresponsive deployer cannot block the caller indefinitely.
    REQUEST_TIMEOUT: ClassVar[tuple[float, float]] = (5.0, 60.0)
    url: HttpUrl

    def endpoint_url(self, endpoint):
        """Return the deployer URL joined with ``endpoint`` by a single slash."""
        return f"{str(self.url).rstrip('/')}/{endpoint}"

    @staticmethod
    def _device_from_dict(device_dict: dict) -> DeviceFromDeployer:
        """Build a ``DeviceFromDeployer`` from one device dict returned by the deployer."""
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name=device_dict["container_name"],
            description="desc",
            status=device_dict["status"],
            logs=device_dict["logs"],
        )

    def devices(self) -> list[DeviceFromDeployer] | None:
        """Return the devices known to the deployer.

        :return: The devices, or None if the deployer is unreachable, times
            out, or does not answer with status 200
        """
        try:
            response = requests.get(self.endpoint_url("devices"), timeout=self.REQUEST_TIMEOUT)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            logger.info("connection error")
            return None
        if response.status_code != 200:
            return None
        return [self._device_from_dict(device_dict) for device_dict in response.json()["devices"]]

    def get_device(self, device_id: DeviceId):
        """Return one device from the deployer.

        :return: The device, or None if the deployer is unreachable, times out,
            or does not answer with status 200
        """
        try:
            response = requests.get(self.endpoint_url(f"devices/{device_id}"), timeout=self.REQUEST_TIMEOUT)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return None
        if response.status_code != 200:
            return None
        return self._device_from_dict(response.json())

    def create_device(self, device: DeviceFromDeployer) -> DeviceFromDeployer | None:
        """Ask the deployer to create a device named ``device.name``.

        :return: The created device (with an empty name), or None if the
            deployer is unreachable, times out, or does not answer with status 201
        """
        try:
            response = requests.post(
                self.endpoint_url("devices"), json={"name": device.name}, timeout=self.REQUEST_TIMEOUT
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return None

        if response.status_code != 201:
            return None

        device_dict = response.json()
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            status=device_dict["status"],
        )

    def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
        """Send a status update for a device to the deployer.

        :return: A ``Device`` carrying the returned id and status (other fields
            are placeholders), or None if the deployer is unreachable, times
            out, or does not answer with status 200
        """
        try:
            response = requests.patch(
                self.endpoint_url(f"devices/{device_id}"),
                json=device_update.model_dump(),
                timeout=self.REQUEST_TIMEOUT,
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return None

        if response.status_code != 200:
            return None

        device_dict = response.json()
        return Device(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            pid={},
            petri_network={},
            modes=[],
            status=device_dict["status"],
        )

    def delete_device(self, device_id: DeviceId) -> DeleteInfo:
        """Ask the deployer to delete a device.

        :return: ``DeleteInfo`` with ``is_deleted`` True on status 200, 202 or
            204; otherwise False with the response text, or a connection error
            message, as detail
        """
        try:
            response = requests.delete(self.endpoint_url(f"devices/{device_id}"), timeout=self.REQUEST_TIMEOUT)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
        if response.status_code not in [200, 202, 204]:
            return DeleteInfo(is_deleted=False, detail=response.text)

        return DeleteInfo(is_deleted=True, detail="")
# Update model generated from DeviceDeployer by create_update_model
# (presumably a partial-update variant — confirm against its definition).
DeviceDeployerUpdate = create_update_model(DeviceDeployer)