Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%
1701 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 15:40 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 15:40 +0000
1from functools import cached_property
2import os
3import re
4import io
5import time
6import csv
7from typing import IO, Self, ClassVar, Any, Literal, get_args, Annotated
8import datetime
9import math
10import bisect
11from enum import Enum
12import logging
13import copy
14import asyncio
15import json
16import hashlib
17from ast import literal_eval
18import requests
20import zipfile
21import ping3
22import pytz
23from bson.objectid import ObjectId
24from pymongo import ASCENDING, ReturnDocument
25from pydantic import BaseModel, ConfigDict, HttpUrl, computed_field, Field, create_model, BeforeValidator
26import numpy as npy
27import lttb
28import h5py
29from openpyxl import Workbook, load_workbook
30from openpyxl.worksheet.worksheet import Worksheet
32from twinpad_backend.db import (
33 get_collection,
34 get_async_collection,
35 get_signal_collection,
36 get_signal_collections_batch,
37 systems_database,
38 systems_async_database,
39 signals_database,
40 signals_async_database,
41 devices_states_database,
42)
43from twinpad_backend.responses import ListResponse
44from twinpad_backend.messages import RabbitMQClient
45from twinpad_backend.config_manager.utils import is_line_empty, read_boolean_cell
46from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
# Maps the type names (as strings) to the Python type used for them; "epoch" values are handled as floats.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Names of the supported post-processing functions, grouped in three families
# (presumably by number of input signals: one, two, or many — TODO confirm).
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]


# Host names of the other services, overridable through environment variables.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Compared against "now - last sample timestamp" (epoch seconds) when signal data is read.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Reuses the uvicorn error logger rather than a module-specific one.
logger = logging.getLogger("uvicorn.error")


# Regex capturing the number of a "Channel <n>" label.
CHANNEL_PATTERN = r"Channel\s+(\d+)"
# Column titles of the signal sheet in the Excel configuration file (presumably in column order — TODO confirm).
XLSX_HEADER = [
    "Ticker",
    "Component id",
    "Signal description",
    "Unit",
    "Type",
    "Frequency (Hz)",
    "Sensor transfer function",
    "Precision digits",
    "Data type",
    "Formula",
    "Forcible",
    "Commandable",
    "Broadcastable",
]

# Anything that can be handed over as a file to read: a path, an open binary stream or raw bytes.
ReadableFile = str | os.PathLike[str] | IO[bytes] | bytes
86class DeleteInfo(BaseModel):
87 is_deleted: bool
88 detail: str
class classproperty:
    """
    Minimal descriptor exposing a function as a read-only property computed from the owner class.

    Stands in for stacking ``@classmethod`` on ``@property``, which was deprecated in python 3.11
    and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped function; it receives the owner class as its only argument.
        self.fget = func

    def __get__(self, _, owner):
        # The instance (second argument) is ignored: the value only depends on the class.
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """Build the partial-update counterpart of a pydantic model.

    The generated model is named ``<ModelName>Update``. It has every field of ``model`` except ``id``,
    each one accepting ``None`` and defaulting to ``None``.

    :param model: pydantic model class to derive the update model from.
    :return: The newly created model class.
    """
    optional_fields = {
        name: (field_info.annotation | None, None)
        for name, field_info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float):
    """Return the UTC date of an epoch timestamp as an ISO 8601 string without offset.

    The conversion is done explicitly in UTC so the result does not depend on the time zone of the
    server (``fromtimestamp`` without ``tz`` returns local time). The ``tzinfo`` is then dropped to
    keep the historical output format (no ``+00:00`` suffix).

    :param timestamp: Seconds since the epoch.
    :return: ISO formatted UTC date, e.g. ``"1970-01-01T00:00:00"``.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """Downsample a (time, value) series with LTTB, keeping track of the gaps made of ``None`` values.

    The inputs are returned untouched when they hold fewer than ``max_number_samples`` points.
    Otherwise the ``None`` values are removed (LTTB cannot handle them), the remaining points are
    downsampled, and the first/last timestamps of each run of ``None`` are inserted back with a
    ``None`` value. When LTTB raises ``ValueError``, the series is instead linearly interpolated
    on ``max_number_samples`` evenly spaced timestamps and NaN results are turned back into ``None``.

    :param time_vector: Timestamps of the samples.
    :param values: Values of the samples, may contain ``None``.
    :param max_number_samples: Target number of samples.
    :return: Tuple ``(new_time_vector, new_values)``.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Work on copies: the inputs are still needed as-is by the fallback interpolation.
    time_vector_copy = copy.deepcopy(time_vector)
    values_copy = copy.deepcopy(values)

    none_group_bounds = []
    none_group_index = -1
    index = -1
    # LTTB doesn't handle None values so remove them
    while values_copy.count(None) > 0:
        # Store bounds of None value groups so we can insert them back after the downsampling
        # After a pop, the next None of the same run sits at the same index; a different index starts a new group.
        if (new_index := values_copy.index(None)) != index:
            none_group_bounds.append([time_vector_copy.pop(new_index)])
            none_group_index += 1
        elif len(none_group_bounds[none_group_index]) < 2:
            none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
        else:
            # Group already has two bounds: only move the upper one.
            none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
        values_copy.pop(new_index)
        index = new_index
    # Number of timestamps that will be inserted back, deducted from the LTTB output size.
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([time_vector_copy, values_copy]).T
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        # The float64 conversion turns the None values into NaN
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """Tell whether ``value`` is an instance of ``wanted_type``, accepting integers where a float is wanted.

    :param value: Value to check.
    :param wanted_type: Expected type (or tuple of types).
    :return: True if the value matches, False otherwise.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
def is_valid_excel_file(excel_file: ReadableFile) -> bool:
    """Checks if file can be opened as an Excel file.

    The workbook is opened in read-only mode and closed right away; only ``zipfile.BadZipFile``
    is interpreted as "not an Excel file", any other error is propagated.

    :param excel_file: File to check (path, binary stream or raw bytes).
    :type excel_file: str | os.PathLike[str] | IO[bytes] | bytes
    :return: True if it can be opened as an Excel file, False otherwise
    :rtype: bool
    """
    try:
        workbook = load_workbook(excel_file, read_only=True)
        workbook.close()
    except zipfile.BadZipFile:
        return False
    else:
        return True
188# Models
class TwinPadModel(BaseModel):
    """Common pydantic base adding dict conversion helpers."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` and build an instance of the class from it."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None, by_alias: bool | None = None):
        """Dump the model as a JSON-compatible dict.

        :param exclude: Fields to leave out of the dump.
        :param by_alias: Forwarded to ``model_dump``.
        """
        return self.model_dump(mode="json", exclude=exclude, by_alias=by_alias)
def validate_mongo_id(v):
    """Return ``v`` as a string if it is a valid MongoDB ObjectId, raise ``ValueError`` otherwise."""
    if ObjectId.is_valid(v):
        return str(v)
    raise ValueError("Invalid MongoDB id")
205MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]
def validate_12_hex(v: str) -> str:
    """Check that ``v`` is a string made of exactly 12 hexadecimal characters.

    Non-string inputs are rejected with the same ``ValueError`` as malformed strings. Without that
    guard ``re.fullmatch`` raises ``TypeError``, which pydantic does not report as a validation error.

    :param v: Candidate identifier.
    :return: ``v`` unchanged.
    :raises ValueError: If ``v`` is not a 12-character hexadecimal string.
    """
    if not isinstance(v, str) or not re.fullmatch(r"[0-9a-fA-F]{12}", v):
        raise ValueError("ID must be a 12-character hexadecimal string")
    return v
214DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]
class GenericMongo(TwinPadModel):
    """Base class of the models stored in a collection of the systems database.

    Subclasses must declare a ``collection_name`` class attribute (read by :meth:`collection`).
    They may also fill ``custom_pipeline_steps`` with aggregation stages keyed by property name.
    The stages of a property are run *before* a ``$match``/``$sort`` on that property, the stages
    of the other properties are run after it.
    """

    id: MongoId | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of the model (``create=True`` is passed to ``get_collection``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def _pipeline_with_custom_steps(cls, attribute_name: str, stages: list) -> list:
        """Build a pipeline: custom steps of ``attribute_name``, then ``stages``, then all other custom steps."""
        pipeline = list(cls.custom_pipeline_steps.get(attribute_name, []))
        pipeline.extend(stages)
        for key, steps in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(steps)
        return pipeline

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query and wrap the result in a ``ListResponse``.

        :param query: Object providing ``mongodb_filter()``, ``sort_by`` (comma separated
            ``field[:order]`` items), ``offset`` and ``limit``.
        :return: The matching items with the pagination information and the total count.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field, sort_order = sort, 1
            sort_dict[sort_field] = sort_order

        collection = cls.collection()
        # NOTE(review): the count runs on the raw collection, so filters on properties produced by
        # custom pipeline steps are presumably not reflected in the total — confirm.
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Each set of custom steps must appear only once in the pipeline, even when the property
            # is used by several filter clauses or by both a filter and a sort.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for filter_clause in filter_clauses:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in filter_clause:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom steps only run on the documents of the requested page
        for filtered_property, steps in cls.custom_pipeline_steps.items():
            if filtered_property not in added_properties:
                pipeline.extend(steps)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose MongoDB ``_id`` is ``item_id``, or None if there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Convert a MongoDB document to an object; ``_id`` is moved to ``id`` (the dict is modified in place)."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = cls._pipeline_with_custom_steps(attribute_name, [{"$match": {attribute_name: attribute_value}}])
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns the first item that matches the attribute with value, or None."""
        pipeline = cls._pipeline_with_custom_steps(
            attribute_name, [{"$match": {attribute_name: attribute_value}}, {"$limit": 1}]
        )
        items = cls.collection().aggregate(pipeline).to_list()
        if len(items) == 0:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted by ``sort_by`` in ascending order."""
        pipeline = cls._pipeline_with_custom_steps(sort_by, [{"$sort": {sort_by: ASCENDING}}])
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Count the documents whose ``post_processing`` is False or absent (0 if the collection is missing)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert the object (without its ``id``) in the collection and store the generated id on it."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` on the object, then ``$set`` the same values on its document."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the document of the object; return True if a document was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    """User account stored in the ``users`` collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude: set | None = None, by_alias: bool | None = None):
        """Dump the user as a dict, always leaving the password out.

        A new set is built on every call: neither a shared default nor the caller's set is mutated.

        :param exclude: Additional fields to leave out.
        :param by_alias: Forwarded to ``GenericMongo.to_dict``.
        """
        excluded_fields = {"password", *(exclude or ())}
        return GenericMongo.to_dict(self, exclude=excluded_fields, by_alias=by_alias)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Insert a new user; the very first user created is always an administrator.

        :return: ``{"user_id": <inserted id>}``, or None if the insertion returned nothing.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used instead of to_dict because the password has to be stored
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """Update the fields of ``user`` that are not None on the stored user ``user_id``.

        :return: The updated user (``is_connected`` left to its default), or None if ``user_id`` does not exist.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # No document matched: nothing to convert
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        updated_user.pop("is_connected", None)
        return cls(**updated_user)
403UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    """Operating mode of a device."""

    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: int

    @classmethod
    def from_excel(cls, modes_sheet: Worksheet) -> list[Self]:
        """Read the modes from a worksheet.

        The first row is skipped; each following row gives the name, frequency multiplier and
        minimum frequency of a mode in its first three columns. Mode ids are the 1-based row
        positions.
        """
        data_rows = modes_sheet.iter_rows(min_row=2, max_col=3)
        return [
            Mode(
                mode_id=position,
                name=name_cell.value,
                frequency_multiplier=multiplier_cell.value,
                min_frequency=min_frequency_cell.value,
            )
            for position, (name_cell, multiplier_cell, min_frequency_cell) in enumerate(data_rows, start=1)
        ]
class DeviceUpdate(TwinPadModel):
    """Model holding a single ``mode_id``.

    Presumably the payload handed to ``Device.change_mode``, which reads ``update_dict.mode_id`` — TODO confirm.
    """

    mode_id: int
class Device(GenericMongo):
    """Device stored in the ``devices`` collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: DeviceId
    config_id: str | None = None
    config_name: str | None = None
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """Ask the device to switch mode and record the attempt as a ``Command``.

        A mode id below 0 or above the number of modes is refused without contacting the device.

        :param update_dict: Object exposing the requested ``mode_id``.
        :param current_user: User issuing the command.
        :return: The response of the broker, or an error dict (status code 400) for an invalid mode id.
        """
        requested_mode_id = update_dict.mode_id
        # NOTE(review): 0 passes this check although mode ids built by Mode.from_excel start at 1;
        # the description below would then use the name of the last mode — confirm 0 is meant to be valid.
        is_invalid_mode = requested_mode_id < 0 or requested_mode_id > len(self.modes)

        if is_invalid_mode:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is None:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"

        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if not is_invalid_mode:
            response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)
        else:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}

        Command.create(command)
        return response

    @classmethod
    def get_from_device_or_config_id(cls, device_or_config_id: str):
        """Return the first device whose ``device_id`` or ``config_id`` equals the given id, or None."""
        id_filter = {"$or": [{"device_id": device_or_config_id}, {"config_id": device_or_config_id}]}
        matches = cls.collection().aggregate([{"$match": id_filter}, {"$limit": 1}]).to_list()
        if len(matches) == 0:
            return None
        return cls.mongo_dict_to_object(matches[0])

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """Fetch the device of each signal id, querying each distinct device only once.

        :param signal_ids: Signal ids whose first dot-separated part is a device or config id.
        :return: Devices (or None when not found) keyed by that first part.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            id_prefix = signal_id.partition(".")[0]
            if id_prefix in devices_by_id:
                continue
            devices_by_id[id_prefix] = cls.get_from_device_or_config_id(id_prefix)
        return devices_by_id
class DeviceSetup(GenericMongo):
    """Document of the ``device_setups`` collection: a list of device ids, an ``active`` flag and a variable mapping."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    # String-to-string mapping; the meaning of keys and values is not visible here — TODO document.
    variable_mapping: dict[str, str]
513DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    """State of a device at a given time, read from the per-device collection of the devices states database."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    config_id: str | None = None
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
        """Return the states of a device matching a filtered, sorted and paginated query.

        :param device_id: Id of the device, also the name of the collection holding its states.
        :param query: Object providing ``mongodb_filter()``, ``sort_by`` (``field[:order]``),
            ``offset`` and ``limit``.
        :return: The matching states; empty with a total of 0 when the device has no collection.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        # Missing (or null) lists fall back to a plain empty list: ``Field(...)`` is only meaningful
        # in a class body and would be passed as the value itself here, failing the validation.
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode"),
                load=item_dict.get("load"),
                tokens=item_dict.get("tokens") or [],
                config_id=item_dict.get("config_id"),
                modified_properties=item_dict.get("modified_properties") or [],
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    """Single sample (value and optional forced value) of a signal at a given timestamp."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of a signal, or None if the signal has no collection or no data.

        When no collection exists for ``signal_id``, its first dot-separated part is looked up as a
        device or config id and the id is rebuilt as ``<device_id>.<config_id>.<last part>``.
        """
        collection = get_signal_collection(signal_id)
        real_signal_id = signal_id

        if collection is None:
            device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
            if device is not None:
                real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
                collection = get_signal_collection(real_signal_id)

        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=real_signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample (or None) of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """Return the most recent sample of a signal, or None if nothing can be found.

        The ``last_values`` collection is read first. If it holds nothing for the signal and the
        signal is registered with an "up" status, the time series of the signal is read instead.
        The timestamp of the returned sample is raised to the last ping of the device when it is
        more recent.

        :param signal_id: Id of the signal; its first dot-separated part may be a device or a config id.
        :param device: Device of the signal if already known, to spare a database query.
        """
        last_value_collection = get_signal_collection("last_values", True)
        signals_collection = get_collection(systems_database, "signals", create=True)
        signal_collection = get_signal_collection(signal_id)
        real_signal_id = signal_id

        if signal_collection is None:
            if device is None:
                device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
            if device is not None:
                real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"

        sample_data = last_value_collection.find_one({"signal_id": real_signal_id}, sort={"precise_timestamp": -1})

        # If there is no data, check if the signal's type is anything other than float, as they don't send duplicate values
        if sample_data is None:
            if signals_collection.count_documents({"status.status": "up", "signal_id": real_signal_id}) < 1:
                return None

            # Same workaround as above function, very effective to narrow down big sets of data
            bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
            last_bucket = None
            if bucket is not None:
                last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
            signal_collection = get_signal_collection(real_signal_id)
            if signal_collection is None:
                # No time series to read from: there is no sample to return
                return None
            if last_bucket is not None:
                sample_data = signal_collection.find_one(
                    {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                    sort={"precise_timestamp": -1},
                )
            else:
                sample_data = signal_collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data.get("precise_timestamp")
        # Align the timestamp with the device's last ping, cannot align with current time to avoid false reports if device is down
        if device is None:
            device = Device.get_from_device_or_config_id(real_signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            timestamp = device.last_ping if timestamp is None else max(timestamp, device.last_ping)

        return cls(
            signal_id=real_signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample (or None) of each signal, fetching each device only once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
670class SignalData(TwinPadModel):
671 signal_id: str
672 forcible: bool = True
673 time_vector: list[float]
674 values: list[float | int | str | None]
675 forced_values: list[float | int | str | None]
677 data_start: float | None = None
678 data_end: float | None = None
680 number_samples: int = 0
681 number_samples_db: int = 0
683 db_query_time: float = 0.0
684 init_time: float = 0.0
685 data_processing_time: float = 0.0
687 phase_id: str | None = None
    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
        collection=None,
    ) -> Self:
        """Read the samples of a signal from its time series collection.

        :param signal_id: Id of the signal; if no collection exists for it, its first dot-separated
            part is looked up as a device or config id to rebuild the real signal id.
        :param min_timestamp: Lower bound (epoch seconds) of the samples to fetch, unbounded if None.
        :param max_timestamp: Upper bound (epoch seconds) of the samples to fetch, unbounded if None.
        :param window_min_timestamp: Start of the window used to interpolate values at the bounds.
        :param window_max_timestamp: End of the window used to interpolate values at the bounds.
        :param interpolate_bounds: Whether to add values at the window bounds (both bounds required).
        :param max_documents: If set and exceeded by the number of matching documents, only the first
            document of each group of ``ceil(matching / max_documents)`` consecutive documents is kept.
        :param collection: Collection to read from; looked up from ``signal_id`` when None.
        :return: The signal data, built with the ``signal_data_class`` of the signal; an empty
            instance of ``cls`` when the collection or the signal cannot be found.
        """

        now = time.time()

        # NOTE(review): fromtimestamp() without tz builds naive local datetimes — confirm this matches
        # how the "timestamp" field is stored.
        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)

        if collection is None:
            collection = get_signal_collection(signal_id)

        real_signal_id = signal_id

        # Second chance: the id may start with a config id instead of a device id (or the reverse)
        if collection is None:
            device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
            if device is not None:
                real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
                collection = get_signal_collection(real_signal_id)

        if collection is None:
            return cls(
                signal_id=real_signal_id,
                time_vector=[],
                values=[],
                forced_values=[],
                number_samples=0,
                number_samples_db=0,
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        if max_documents is not None and max_documents < number_results:
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
            # Number the sorted documents, group them by floor(index / ratio) and keep the first of each group
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        # logger.info(f"pipeline: %s", str(pipeline))
        cursor = collection.aggregate(pipeline)
        # Only covers count_documents and the aggregate() call; the results are fetched below
        db_req_time = time.time() - db_req_start

        init_time = time.time()

        results = cursor.to_list()
        time_vector = []
        values = []
        forced_values = []
        for s in results:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        signal = Signal.get_from_signal_id(real_signal_id)
        if signal is None:
            return cls(
                signal_id=real_signal_id,
                time_vector=[],
                values=[],
                forced_values=[],
                number_samples=0,
                number_samples_db=0,
            )
        # Class used to build the result and the temporary objects of the bounds interpolation
        class_ = signal.signal_data_class

        if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
            time_vector, values, forced_values = cls.interpolate_bounds(
                class_,
                collection,
                real_signal_id,
                time_vector,
                values,
                forced_values,
                window_min_timestamp,
                window_max_timestamp,
            )

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # Workaround using the internal "system.buckets.*" collection associated to a timeseries to find
        # the least/most recent timestamp, found here:
        # https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["precise_timestamp"]
        else:
            data_start = None

        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            data_end = last_bucket["control"]["max"]["precise_timestamp"]
        else:
            data_end = None

        return class_(
            signal_id=real_signal_id,
            forcible=signal.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_results,
            db_query_time=db_req_time,
            init_time=init_time,
        )
849 @staticmethod
850 def interpolate_bounds(
851 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
852 ):
853 sample_right = None
854 # Fetching right side value & interpolation
855 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
856 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
857 sample_right = collection.find_one(
858 {
859 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
860 "value": {"$exists": True},
861 },
862 sort={"precise_timestamp": -1},
863 )
864 if sample_right:
865 if time_vector:
866 right_sd = class_(
867 signal_id=signal_id,
868 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
869 values=[values[-1], sample_right.get("value", None)],
870 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
871 )
872 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
873 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
874 else:
875 max_ts_value = sample_right.get("value", None)
876 max_ts_forced_value = sample_right.get("forced_value", None)
877 time_vector.append(window_max_timestamp)
878 values.append(max_ts_value)
879 forced_values.append(max_ts_forced_value)
881 # Fetching left side value & interpolation
882 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
883 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
884 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
885 sample_left = sample_right
886 sample_left = collection.find_one(
887 {
888 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
889 "value": {"$exists": True},
890 },
891 sort={"precise_timestamp": -1},
892 )
894 if sample_left:
895 if time_vector:
896 left_sd = class_(
897 signal_id=signal_id,
898 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
899 values=[sample_left["value"], values[0]],
900 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
901 )
902 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
903 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
904 else:
905 min_ts_value = sample_left.get("value", None)
906 min_ts_forced_value = sample_left.get("forced_value", None)
907 time_vector.insert(0, window_min_timestamp)
908 values.insert(0, min_ts_value)
909 forced_values.insert(0, min_ts_forced_value)
911 return time_vector, values, forced_values
    def interpolate_values(self, new_time_vector: list[float]):
        """Return ``self.values`` interpolated on ``new_time_vector`` (delegates to ``self.interpolate``)."""
        return self.interpolate(new_time_vector, self.values)
    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return ``self.forced_values`` interpolated on ``new_time_vector`` (delegates to ``self.interpolate``)."""
        return self.interpolate(new_time_vector, self.forced_values)
919 def uniform_desampling(self, number_samples_max: int) -> Self:
920 data_processing_time = time.time()
921 if number_samples_max and self.number_samples > number_samples_max:
922 new_time_vector = npy.linspace(
923 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
924 ).tolist()
925 values = self.interpolate_values(new_time_vector)
926 forced_values = self.interpolate_forced_values(new_time_vector)
927 time_vector = new_time_vector
928 number_samples = len(time_vector)
929 else:
930 time_vector = self.time_vector
931 number_samples = len(self.values)
932 values = self.values[:]
933 forced_values = self.forced_values[:]
934 data_processing_time = time.time() - data_processing_time
936 return self.__class__(
937 signal_id=self.signal_id,
938 time_vector=time_vector,
939 values=values,
940 forced_values=forced_values,
941 number_samples=number_samples,
942 number_samples_db=self.number_samples,
943 data_start=self.data_start,
944 data_end=self.data_end,
945 db_query_time=self.db_query_time,
946 init_time=self.init_time,
947 data_processing_time=self.data_processing_time + data_processing_time,
948 phase_id=self.phase_id,
949 )
    def min_max_downsampling(self, number_samples_max: int) -> Self:
        """Downsample to at most ``number_samples_max`` samples; here it simply delegates to ``uniform_desampling``."""
        return self.uniform_desampling(number_samples_max)
954 def interest_window_desampling(
955 self,
956 window_max_number_samples: int,
957 outside_max_number_samples: int,
958 window_min_timestamp: float | None = None,
959 window_max_timestamp: float | None = None,
960 ) -> Self:
961 """Performs a sampling in a window of interest and outside."""
963 if not self.time_vector:
964 return self
966 if window_min_timestamp is None:
967 window_min_timestamp = self.time_vector[0]
968 if window_max_timestamp is None:
969 window_max_timestamp = self.time_vector[-1]
971 data_processing_time = time.time()
973 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
974 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
976 time_vector_before = self.time_vector[:index_window_start]
977 time_vector_window = self.time_vector[index_window_start:index_window_end]
978 time_vector_after = self.time_vector[index_window_end:]
980 # Resampling window
981 if time_vector_window:
982 # Ensurring window bounds
983 if time_vector_window[0] != window_min_timestamp:
984 time_vector_window.insert(0, window_min_timestamp)
985 if time_vector_window[-1] != window_max_timestamp:
986 time_vector_window.append(window_max_timestamp)
987 else:
988 time_vector_window = [window_min_timestamp, window_max_timestamp]
990 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
991 # Resampling
992 new_window_time_vector = npy.linspace(
993 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
994 ).tolist()
995 time_vector_window = new_window_time_vector
997 # Resampling outside
998 number_samples_before = len(time_vector_before)
999 number_samples_after = len(time_vector_after)
1000 if (
1001 outside_max_number_samples is not None
1002 and (number_samples_before + number_samples_after) > outside_max_number_samples
1003 ):
1004 new_number_samples_before = min(
1005 number_samples_before,
1006 math.ceil(
1007 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1008 ),
1009 )
1010 new_number_samples_after = min(
1011 number_samples_after,
1012 math.ceil(
1013 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1014 ),
1015 )
1016 # Adjusting numbers as math.ceil can do +1 on sum
1017 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1018 if new_number_samples_before > new_number_samples_after:
1019 new_number_samples_before -= 1
1020 else:
1021 new_number_samples_after -= 1
1023 if new_number_samples_before > 0:
1024 new_time_vector_before = npy.linspace(
1025 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
1026 ).tolist()
1027 time_vector_before = new_time_vector_before
1029 if new_number_samples_after > 0:
1030 new_time_vector_after = npy.linspace(
1031 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
1032 ).tolist()[::-1]
1033 time_vector_after = new_time_vector_after
1035 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1036 values = self.interpolate_values(new_time_vector)
1037 forced_values = self.interpolate_forced_values(new_time_vector)
1038 number_samples = len(values)
1040 data_processing_time = time.time() - data_processing_time
1042 return self.__class__(
1043 signal_id=self.signal_id,
1044 forcible=self.forcible,
1045 time_vector=new_time_vector,
1046 values=values,
1047 forced_values=forced_values,
1048 number_samples=number_samples,
1049 number_samples_db=self.number_samples,
1050 data_start=self.data_start,
1051 data_end=self.data_end,
1052 db_query_time=self.db_query_time,
1053 init_time=self.init_time,
1054 data_processing_time=self.data_processing_time + data_processing_time,
1055 )
def zero_time_vector(self, data_start: float):
    """Return a copy whose timestamps are expressed relative to ``data_start``.

    ``data_start`` is subtracted from every timestamp. The new ``data_start`` /
    ``data_end`` of the result are the first and last shifted timestamps.
    The shifted time vector is returned as a plain list of Python floats (not a
    numpy array) so that later list operations on it (truthiness test, slicing,
    concatenation) keep working. ``forcible`` and ``phase_id`` are carried over.

    Returns:
        ``self`` when the time vector is empty, otherwise a new instance.
    """
    started_at = time.time()
    if len(self.time_vector) == 0:
        return self
    shifted_time_vector = (npy.array(self.time_vector) - data_start).tolist()
    elapsed = time.time() - started_at

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=shifted_time_vector,
        values=self.values,
        forced_values=self.forced_values,
        number_samples=self.number_samples,
        number_samples_db=self.number_samples_db,
        data_start=shifted_time_vector[0],
        data_end=shifted_time_vector[-1],
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + elapsed,
        phase_id=self.phase_id,
    )
def csv_export(self):
    """Serialise the signal as UTF-8 encoded CSV bytes.

    The first row is the header ``timestamp,value,forced_value``; each following
    row holds one sample, with the timestamp converted by
    ``get_utc_date_from_timestamp``.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), value, forced_value]
        for timestamp, value, forced_value in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")
def prestoplot_export(self):
    """Serialise the signal as a tab-separated PrestoPlot file (UTF-8 bytes).

    The signal id is turned into a column name by replacing dots with
    underscores and prefixing an underscore when it starts with a numeric
    character. Four header lines are followed by one line per sample;
    ``None`` values are written as ``none``.
    """
    clean_signal_id = self.signal_id.replace(".", "_")
    # Slicing instead of indexing keeps an empty signal id from raising IndexError.
    if clean_signal_id[:1].isnumeric():
        clean_signal_id = "_" + clean_signal_id

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{clean_signal_id}\n",
    ]
    lines.extend(
        f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values)
    )
    return "".join(lines).encode("utf-8")
1104class NumericSignalData(SignalData):
1105 data_type: str = "float"
1106 values: list[float | int | None]
1107 forced_values: list[float | int | None]
def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` (sampled on ``self.time_vector``) at ``new_time_vector``.

    ``None`` entries are turned into NaN before calling ``numpy.interp`` and any
    NaN in the interpolated result is turned back into ``None``.
    """
    numeric_items = [npy.nan if item is None else item for item in items]
    interpolated = npy.interp(new_time_vector, self.time_vector, numeric_items)
    return [None if npy.isnan(sample) else sample for sample in interpolated]
1113 def uniform_desampling(self, number_samples_max: int) -> Self:
1114 data_processing_time = time.time()
1115 if number_samples_max and self.number_samples > number_samples_max:
1116 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
1117 forced_values = self.interpolate_forced_values(time_vector)
1118 number_samples = len(time_vector)
1119 else:
1120 time_vector = self.time_vector
1121 number_samples = len(self.values)
1122 values = self.values[:]
1123 forced_values = self.forced_values[:]
1124 data_processing_time = time.time() - data_processing_time
1126 return self.__class__(
1127 signal_id=self.signal_id,
1128 time_vector=time_vector,
1129 values=values,
1130 forced_values=forced_values,
1131 number_samples=number_samples,
1132 number_samples_db=self.number_samples,
1133 data_start=self.data_start,
1134 data_end=self.data_end,
1135 db_query_time=self.db_query_time,
1136 init_time=self.init_time,
1137 data_processing_time=self.data_processing_time + data_processing_time,
1138 )
1140 def min_max_downsampling(self, number_samples_max: int) -> Self:
1141 if self.number_samples < number_samples_max:
1142 return self
1144 data_processing_time = time.time()
1146 number_bins = number_samples_max // 2
1148 time_vector = npy.array(self.time_vector, dtype=npy.float64)
1149 values = npy.array(self.values, dtype=npy.float64)
1150 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1152 points_per_bin = self.number_samples // number_bins
1154 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1155 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1156 if self.number_samples - number_bins * points_per_bin > 1:
1157 points_per_bin += 1
1158 number_bins = self.number_samples // points_per_bin + 1
1159 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1160 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1161 values = npy.concatenate([values, nan_points_to_add])
1162 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1164 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1165 values_matrix = values.reshape(number_bins, points_per_bin)
1166 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1168 indexes_min = npy.zeros(number_bins, dtype="int64")
1169 indexes_max = npy.zeros(number_bins, dtype="int64")
1171 for row in range(number_bins):
1172 min_value = values_matrix[row, 0]
1173 max_value = values_matrix[row, 0]
1174 for column in range(points_per_bin):
1175 if values_matrix[row, column] < min_value:
1176 min_value = values_matrix[row, column]
1177 indexes_min[row] = column
1178 elif values_matrix[row, column] > max_value:
1179 max_value = values_matrix[row, column]
1180 indexes_max[row] = column
1182 row_index = npy.repeat(npy.arange(number_bins), 2)
1183 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1185 data_processing_time = time.time() - data_processing_time
1187 new_time_vector = timestamps_matrix[row_index, column_index]
1188 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1189 new_values = values_matrix[row_index, column_index]
1190 new_values = npy.where(npy.isnan(new_values), None, new_values)
1191 new_forced_values = forced_values_matrix[row_index, column_index]
1192 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1194 # Make sure there are no None values for the time vector
1195 time_vector_filter = new_time_vector != None
1196 new_time_vector = new_time_vector[time_vector_filter]
1197 new_values = new_values[time_vector_filter]
1198 new_forced_values = new_forced_values[time_vector_filter]
1200 return self.__class__(
1201 signal_id=self.signal_id,
1202 time_vector=new_time_vector,
1203 values=new_values,
1204 forced_values=new_forced_values,
1205 number_samples=number_bins * 2,
1206 number_samples_db=self.number_samples_db,
1207 data_start=self.data_start,
1208 data_end=self.data_end,
1209 db_query_time=self.db_query_time,
1210 init_time=self.init_time,
1211 data_processing_time=self.data_processing_time + data_processing_time,
1212 phase_id=self.phase_id,
1213 )
1215 def interest_window_desampling(
1216 self,
1217 window_max_number_samples: int,
1218 outside_max_number_samples: int,
1219 window_min_timestamp: float | None = None,
1220 window_max_timestamp: float | None = None,
1221 ) -> Self:
1222 """Performs a sampling in a window of interest and outside."""
1224 if not self.time_vector:
1225 return self
1227 if window_min_timestamp is None:
1228 window_min_timestamp = self.time_vector[0]
1229 if window_max_timestamp is None:
1230 window_max_timestamp = self.time_vector[-1]
1232 data_processing_time = time.time()
1234 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1235 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1237 time_vector_before = self.time_vector[:index_window_start]
1238 time_vector_window = self.time_vector[index_window_start:index_window_end]
1239 time_vector_after = self.time_vector[index_window_end:]
1241 values_before = self.values[:index_window_start]
1242 values_window = self.values[index_window_start:index_window_end]
1243 values_after = self.values[index_window_end:]
1244 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1245 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1247 # Resampling window
1248 if time_vector_window:
1249 # Ensurring window bounds
1250 if time_vector_window[0] != window_min_timestamp:
1251 time_vector_window.insert(0, window_min_timestamp)
1252 values_window.insert(0, window_min_value)
1253 if time_vector_window[-1] != window_max_timestamp:
1254 time_vector_window.append(window_max_timestamp)
1255 values_window.append(window_max_value)
1256 else:
1257 time_vector_window = [window_min_timestamp, window_max_timestamp]
1258 values_window = [window_min_value, window_max_value]
1260 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1261 # Resampling
1262 time_vector_window, values_window = downsample_list(
1263 time_vector_window, values_window, window_max_number_samples
1264 )
1266 # Resampling outside
1267 number_samples_before = len(time_vector_before)
1268 number_samples_after = len(time_vector_after)
1269 if (
1270 outside_max_number_samples is not None
1271 and (number_samples_before + number_samples_after) > outside_max_number_samples
1272 ):
1273 new_number_samples_before = min(
1274 number_samples_before,
1275 math.ceil(
1276 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1277 ),
1278 )
1279 new_number_samples_after = min(
1280 number_samples_after,
1281 math.ceil(
1282 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1283 ),
1284 )
1285 # Adjusting numbers as math.ceil can do +1 on sum
1286 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1287 if new_number_samples_before > new_number_samples_after:
1288 new_number_samples_before -= 1
1289 else:
1290 new_number_samples_after -= 1
1292 if new_number_samples_before > 0:
1293 time_vector_before, values_before = downsample_list(
1294 time_vector_before, values_before, new_number_samples_before
1295 )
1297 if new_number_samples_after > 0:
1298 time_vector_after, values_after = downsample_list(
1299 time_vector_after, values_after, new_number_samples_after
1300 )
1302 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1303 values = values_before + values_window + values_after
1304 forced_values = self.interpolate_forced_values(new_time_vector)
1305 number_samples = len(values)
1307 data_processing_time = time.time() - data_processing_time
1309 return self.__class__(
1310 signal_id=self.signal_id,
1311 time_vector=new_time_vector,
1312 values=values,
1313 forced_values=forced_values,
1314 number_samples=number_samples,
1315 number_samples_db=self.number_samples,
1316 data_start=self.data_start,
1317 data_end=self.data_end,
1318 db_query_time=self.db_query_time,
1319 init_time=self.init_time,
1320 data_processing_time=self.data_processing_time + data_processing_time,
1321 )
# Signal samples whose values are strings; lookups hold the last known value.
class StringSignalData(SignalData):
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each new timestamp, the item of the latest sample at or before it.

        Timestamps that fall before the first sample get the first item: the
        looked-up positions are clipped to the valid index range of ``items``.
        """
        last_position = len(items) - 1
        # side="right" minus one gives the index of the last timestamp <= the requested one.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_position)]
1337class SignalsData(TwinPadModel):
1338 signals_data: list[NumericSignalData | StringSignalData | SignalData]
1339 data_processing_time: float
1340 data_start: float | None
1341 data_end: float | None
@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Load the data of several signals and aggregate their time span.

    Every signal is loaded with ``SignalData.get_from_signal_id`` using the
    collection returned for it by ``get_signal_collections_batch``.
    ``max_timestamp`` defaults to the current time. The resulting
    ``data_start`` / ``data_end`` are the smallest start and the largest end
    among the signals that have one (``None`` when no signal has one), and
    ``data_processing_time`` is the sum of the per-signal processing times.
    """
    if max_timestamp is None:
        max_timestamp = time.time()

    signals_data = [
        SignalData.get_from_signal_id(
            signal_id=signal_id,
            min_timestamp=min_timestamp,
            max_timestamp=max_timestamp,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
            interpolate_bounds=interpolate_bounds,
            max_documents=max_documents,
            collection=collection,
        )
        for signal_id, collection in zip(signal_ids, get_signal_collections_batch(signal_ids))
    ]

    known_starts = [loaded.data_start for loaded in signals_data if loaded.data_start is not None]
    known_ends = [loaded.data_end for loaded in signals_data if loaded.data_end is not None]

    return cls(
        signals_data=signals_data,
        data_processing_time=sum((loaded.data_processing_time for loaded in signals_data), 0.0),
        data_start=min(known_starts, default=None),
        data_end=max(known_ends, default=None),
    )
@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Load every requested signal for every phase.

    For each phase the query range is the phase itself (``start_at`` /
    ``end_at`` divided by 1000). When both window bounds are given, a bound
    that differs from the phase bound narrows the range to the window widened
    by a twentieth of its length, without leaving the phase. Signals without
    samples are dropped. When ``zero_time_vector`` is true, timestamps are
    shifted by the phase sync time (the phase start when none is given) and
    the phase id is recorded on the signal data.

    The returned container always has ``data_start`` and ``data_end`` set to 0.
    """
    computation_start = time.time()
    signals_data: list[SignalData] = []

    for phase, sync_time, window_min_timestamp, window_max_timestamp in zip(
        phases, phase_sync_times, window_min_timestamps, window_max_timestamps
    ):
        query_min = phase.start_at / 1000
        query_max = phase.end_at / 1000

        # The default sync time is the phase start, taken before any narrowing.
        reference_time = query_min if sync_time is None else sync_time

        if window_max_timestamp is not None and window_min_timestamp is not None:
            window_length = window_max_timestamp - window_min_timestamp
            if window_min_timestamp != query_min:
                query_min = max(query_min, window_min_timestamp - window_length / 20)
            if window_max_timestamp != query_max:
                query_max = min(query_max, window_max_timestamp + window_length / 20)

        for signal_id, collection in zip(signal_ids, get_signal_collections_batch(signal_ids)):
            signal_data = SignalData.get_from_signal_id(
                signal_id,
                query_min,
                query_max,
                window_min_timestamp,
                window_max_timestamp,
                interpolate_bounds=False,
                max_documents=None,
                collection=collection,
            )
            if len(signal_data.time_vector) == 0:
                continue

            if zero_time_vector:
                signal_data = signal_data.zero_time_vector(reference_time)
                signal_data.phase_id = phase.id

            signals_data.append(signal_data)

    return cls(
        signals_data=signals_data,
        data_processing_time=time.time() - computation_start,
        data_start=0,
        data_end=0,
    )
def uniform_desampling(self, number_samples_max: int) -> Self:
    """Apply ``uniform_desampling`` to every contained signal.

    The returned container keeps ``data_start`` / ``data_end`` and reports the
    sum of the processing times of the desampled signals.
    """
    desampled = []
    for signal_data in self.signals_data:
        desampled.append(signal_data.uniform_desampling(number_samples_max=number_samples_max))
    total_processing_time = sum(signal_data.data_processing_time for signal_data in desampled)
    return SignalsData(
        signals_data=desampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Apply ``min_max_downsampling`` to every contained signal.

    The returned container keeps ``data_start`` / ``data_end`` and reports the
    sum of the processing times of the downsampled signals.
    """
    downsampled = []
    for signal_data in self.signals_data:
        downsampled.append(signal_data.min_max_downsampling(number_samples_max=number_samples_max))
    total_processing_time = sum(signal_data.data_processing_time for signal_data in downsampled)
    return SignalsData(
        signals_data=downsampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
) -> Self:
    """Apply ``interest_window_desampling`` with the same settings to every contained signal.

    The returned container keeps ``data_start`` / ``data_end`` and reports the
    sum of the processing times of the desampled signals.
    """
    desampled = []
    for signal_data in self.signals_data:
        desampled.append(
            signal_data.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
        )
    total_processing_time = sum(signal_data.data_processing_time for signal_data in desampled)

    return SignalsData(
        signals_data=desampled,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def zero_time_vector(self, data_start: float):
    """Shift the timestamps of every contained signal by ``data_start``.

    The returned container has ``data_start`` set to 0 and ``data_end`` set to
    the largest ``data_end`` among the shifted signals. Signals without a
    ``data_end`` (``None``) are ignored for that maximum, and ``data_end`` is
    ``None`` when no signal provides one, instead of raising.
    The container is rebuilt with the class of ``self``.
    """
    shifted = [signal_data.zero_time_vector(data_start) for signal_data in self.signals_data]
    known_ends = [signal_data.data_end for signal_data in shifted if signal_data.data_end is not None]
    return self.__class__(
        signals_data=shifted,
        data_processing_time=sum(signal_data.data_processing_time for signal_data in shifted),
        data_start=0,
        data_end=max(known_ends, default=None),
    )
@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute a one-signal post-processing function over a phase.

    When the derived signal is already registered as computed for ``phase``,
    the stored samples are returned through ``get_existing_function_signal``.
    Otherwise the base signal is loaded over the whole phase, the function is
    applied to its values and forced values, the full result is handed to
    ``save_function_signal`` as a background task, and the part of the result
    inside the optional window is returned.

    Returns:
        The signals container holding the derived signal, or ``None`` when the
        base signal has no samples in the phase.
    """
    signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"

    stored_signal = Signal.get_from_signal_id(signal_id)
    already_computed = stored_signal is not None and phase.id in stored_signal.computed_phases_ids
    if already_computed:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )
    if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
        return None

    source = signals_data.signals_data[0]
    time_vector = npy.array(source.time_vector)
    values = source.values
    forced_values = source.forced_values

    new_values = None
    new_forced_values = None
    if function == "Cumul":
        new_values = cumul(values)
        new_forced_values = cumul(forced_values)
    elif function == "Delta":
        new_values = delta(values)
        new_forced_values = delta(forced_values)
    elif function == "DeltaT":
        # Time differences do not depend on the values: both outputs are the same.
        new_values = delta(time_vector)
        new_forced_values = new_values
    elif function == "Derive":
        new_values = derive(time_vector, values)
        new_forced_values = derive(time_vector, forced_values)
    elif function == "Integ":
        new_values = integ(time_vector, values)
        new_forced_values = integ(time_vector, forced_values)

    # NaN results become None, which turns both arrays into object arrays.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The complete (unwindowed) result is persisted in the background.
    asyncio.get_running_loop().create_task(
        cls.save_function_signal(phase, signal_id, time_vector, new_values, new_forced_values, source.forcible)
    )

    if window_max_timestamp is not None:
        below_max = time_vector <= window_max_timestamp
        time_vector = time_vector[below_max]
        new_values = new_values[below_max]
        new_forced_values = new_forced_values[below_max]
    if window_min_timestamp is not None:
        above_min = time_vector >= window_min_timestamp
        time_vector = time_vector[above_min]
        new_values = new_values[above_min]
        new_forced_values = new_forced_values[above_min]

    source.time_vector = time_vector.tolist()
    source.values = new_values.tolist()
    source.forced_values = new_forced_values.tolist()
    source.number_samples = time_vector.size

    source.signal_id = signal_id

    return signals_data
@classmethod
async def apply_multiple_function(
    cls,
    phases: list,
    signal_ids: list,
    function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute a post-processing function that combines several signals.

    ``phases`` and ``signal_ids`` are paired element by element. The result is
    attached to the second phase for "Align-X" and "Using-X" and to the first
    phase otherwise. When the derived signal is already registered as computed
    for that phase, the stored samples are returned through
    ``get_existing_function_signal``. Otherwise every input is loaded over its
    whole phase, the function is applied, the full result is handed to
    ``save_function_signal`` as a background task, and the part of the result
    inside the optional window is returned.

    Returns:
        The signals container holding the derived signal, or ``None`` when an
        input has no samples or when the inputs do not have the same number of
        samples (a mismatch is accepted for "Align-X" only).
    """
    if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
        function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
    else:
        function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"

    active_phase = phases[0]
    if function in {"Align-X", "Using-X"}:
        active_phase = phases[1]

    stored_signal = Signal.get_from_signal_id(function_signal_id)
    already_computed = stored_signal is not None and active_phase.id in stored_signal.computed_phases_ids
    if already_computed:
        return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)

    expected_length = None
    time_vector_list = []
    values_list = []
    forced_values_list = []
    forcible = True
    for phase, signal_id in zip(phases, signal_ids):
        fetched = cls.get_from_phase_and_signal_ids(
            [phase], [None], [signal_id], [None], [None], zero_time_vector=False
        )
        if len(fetched.signals_data) == 0:
            return None

        signal_data = fetched.signals_data[0]

        # The first input fixes the expected number of samples.
        if expected_length is None:
            expected_length = signal_data.number_samples
        length_mismatch = expected_length != signal_data.number_samples and function != "Align-X"
        if length_mismatch or signal_data.number_samples == 0:
            return None

        time_vector_list.append(npy.array(signal_data.time_vector))
        values_list.append(npy.array(signal_data.values, dtype=npy.float64))
        forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
        # The result is forcible only if every input is.
        forcible = forcible and signal_data.forcible

    time_vector = time_vector_list[0]
    new_values = None
    new_forced_values = None

    if function == "Align-X":
        time_vector = time_vector_list[1]
        # Both time vectors are compared relative to the start of their own phase.
        old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
        new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
        new_values = align_x(old_time_vector, values_list[0], new_time_vector)
        new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
    elif function == "Using-X":
        if len(time_vector_list[0]) != len(time_vector_list[1]):
            return None
        time_vector = time_vector_list[1]
        new_values = values_list[0]
        new_forced_values = forced_values_list[0]
    elif function == "Mean":
        new_values = mean(*values_list)
        new_forced_values = mean(*forced_values_list)
    elif function == "Norm":
        new_values = norm(*values_list)
        new_forced_values = norm(*forced_values_list)

    # NaN results become None, which turns both arrays into object arrays.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The complete (unwindowed) result is persisted in the background.
    asyncio.get_running_loop().create_task(
        cls.save_function_signal(
            active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
        )
    )

    total_number_samples = time_vector.size

    if window_max_timestamp is not None:
        below_max = time_vector <= window_max_timestamp
        time_vector = time_vector[below_max]
        new_values = new_values[below_max]
        new_forced_values = new_forced_values[below_max]
    if window_min_timestamp is not None:
        above_min = time_vector >= window_min_timestamp
        time_vector = time_vector[above_min]
        new_values = new_values[above_min]
        new_forced_values = new_forced_values[above_min]

    result_signal = NumericSignalData(
        signal_id=function_signal_id,
        forcible=forcible,
        time_vector=time_vector.tolist(),
        values=new_values.tolist(),
        forced_values=new_forced_values.tolist(),
        number_samples=time_vector.size,
        number_samples_db=total_number_samples,
    )
    return cls(
        signals_data=[result_signal],
        data_processing_time=0,
        data_start=0,
        data_end=0,
    )
@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Read the stored samples of an already computed post-processing signal.

    Samples are filtered on ``precise_timestamp`` by the optional window bounds
    (both inclusive) and returned sorted by ``precise_timestamp``. The explicit
    sort matters because each phase is written separately by
    ``save_function_signal``, so the storage order of the collection is not
    necessarily chronological.

    ``number_samples_db`` of the returned signal is the total number of
    documents in the collection, window or not.
    """
    signal_data_collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp

    pipeline = []
    if timestamp_bounds:
        pipeline.append({"$match": {"precise_timestamp": timestamp_bounds}})
    pipeline.append({"$sort": {"precise_timestamp": 1}})

    total_number_samples = signal_data_collection.count_documents({})

    fetch_start = time.time()

    # allowDiskUse lets the server sort result sets larger than its in-memory sort limit.
    samples = signal_data_collection.aggregate(pipeline, allowDiskUse=True).to_list()
    new_time_vector = [sample["precise_timestamp"] for sample in samples]
    new_values = [sample["value"] for sample in samples]
    new_forced_values = [sample["forced_value"] for sample in samples]

    return cls(
        signals_data=[
            NumericSignalData(
                signal_id=signal_id,
                time_vector=new_time_vector,
                values=new_values,
                forced_values=new_forced_values,
                number_samples=len(new_time_vector),
                number_samples_db=total_number_samples,
            )
        ],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )
@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Persist the samples of a computed post-processing signal and register it.

    Samples already stored inside the phase time range are deleted and replaced
    by the new ones, then the signal configuration document is upserted and the
    phase id is recorded in ``computed_phases_ids``.

    Args:
        phase: Phase the signal was computed for; ``start_at`` / ``end_at`` are
            divided by 1000 to get the time range, and ``id`` is recorded.
        function_signal_id: Identifier of the derived signal.
        time_vector: Sample timestamps (epoch seconds).
        new_values: Sample values, aligned with ``time_vector``.
        new_forced_values: Sample forced values, aligned with ``time_vector``.
        forcible: Stored as the ``forcible`` flag of the signal configuration.
    """
    # NOTE(review): the database calls below are synchronous, so they block the
    # event loop while they run - confirm this is acceptable for large signals.

    # Data is written before the configuration: until the phase id is recorded,
    # a concurrent request simply computes the signal again.
    signal_collection = get_signal_collection(function_signal_id, create=True)
    signal_collection.delete_many(
        {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
    )
    signal_collection.insert_many(
        [
            {
                # Timezone-aware UTC: a naive local datetime would be stored as if it were UTC.
                "timestamp": datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc),
                "precise_timestamp": timestamp,
                "value": value,
                "forced_value": forced_value,
            }
            for timestamp, value, forced_value in zip(time_vector, new_values, new_forced_values)
        ]
    )

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": {
                "description": "",
                "unit": None,
                "type": "sensor",
                "address": None,
                # NOTE(review): max(..., 1) avoids a division by zero but also caps the
                # stored frequency at 1 for any mean period below 1 - confirm this is intended.
                "frequency": round(1 / max(npy.mean(delta(time_vector)), 1)),
                "transfer_function": None,
                "precision_digits": None,
                "digitization_function": None,
                "data_type": "float",
                "formula": None,
                "forcible": forcible,
                "commandable": False,
                "broadcastable": True,
                "signal_id": function_signal_id,
                "post_processing": True,
            },
            # $addToSet keeps the list free of duplicates when a phase is computed twice.
            "$addToSet": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )
1795 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []):
1796 if post_processing:
1797 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1798 zip_buffer = io.BytesIO()
1799 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1800 for signal_data in self.signals_data:
1801 file_name = signal_data.signal_id
1802 if post_processing:
1803 phase = phases_by_id.get(
1804 signal_data.phase_id,
1805 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1806 )
1807 file_name = f"{signal_data.signal_id} ({phase.name})"
1808 if file_format == "csv":
1809 export_io = signal_data.csv_export()
1810 zip_file.writestr(f"{file_name}.csv", export_io)
1811 elif file_format == "prestoplot":
1812 export_io = signal_data.prestoplot_export()
1813 zip_file.writestr(f"{file_name}.tab", export_io)
1814 else:
1815 raise ValueError(f"Format not found. Got: {file_format}")
1816 zip_bytes = zip_buffer.getvalue()
1817 return zip_bytes
def hdf5_export(self, post_processing: bool = False, phase_ids: list | None = None):
    """Export every entry of ``self.signals_data`` into an in-memory HDF5 file.

    One group is created per signal data, named after its ``signal_id`` (suffixed
    with the phase name when ``post_processing`` is true, ``"Unknown phase"`` when
    the phase cannot be resolved). Each group holds a ``data`` dataset of records
    ``(date, timestamp, value, forced_value)``.

    :param post_processing: whether group names must include the phase name
    :param phase_ids: ids of the phases to resolve; only read when ``post_processing`` is true
    :return: the raw bytes of the HDF5 file
    """
    phases_by_id = {}
    if post_processing:
        # ``phase_ids`` defaults to None instead of a shared mutable list.
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in (phase_ids or [])}

    hdf5_buffer = io.BytesIO()
    # Record layouts: values are stored as fixed-size byte strings for "str" signals, as float64 otherwise.
    custom_type_float = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    custom_type_string = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                # A phase id that could not be resolved is stored as None in the map:
                # treat it exactly like a phase id that was never requested.
                phase = phases_by_id.get(signal_data.phase_id)
                phase_name = phase.name if phase is not None else "Unknown phase"
                group_name = f"{signal_data.signal_id} ({phase_name})"
            signal_group = hdf5_file.create_group(group_name)

            date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
            signal_group["data"] = npy.array(
                list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                ),
                dtype=record_type,
            )
    return hdf5_buffer.getvalue()
class SignalStatus(TwinPadModel):
    """Status information attached to a signal."""

    # Status label; defaults to "down".
    status: str = "down"
    # Free-text explanation of the status; empty by default.
    reason: str = ""
    # Optional delay value (unit not visible here -- TODO confirm).
    delay: float | None = None
class DigitizationFunction(TwinPadModel):
    """Bounds of a raw range and of the matching value range.

    Can be converted into a two-point :class:`TransferFunction` mapping
    ``min_raw_value -> min_value`` and ``max_raw_value -> max_value``.
    """

    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float

    @classmethod
    def from_bits(cls, bits: int, min_value: float, max_value: float):
        """Build from a bit count: the raw range is ``0 .. 2**bits - 1``."""
        highest_raw_value = 2**bits - 1
        return cls(
            bits=bits,
            min_raw_value=0,
            max_raw_value=highest_raw_value,
            min_value=min_value,
            max_value=max_value,
        )

    @classmethod
    def from_values(cls, min_raw_value: float, max_raw_value: float, min_value: float, max_value: float):
        """Build from explicit raw bounds; ``bits`` is left unset."""
        bounds = {
            "min_raw_value": min_raw_value,
            "max_raw_value": max_raw_value,
            "min_value": min_value,
            "max_value": max_value,
        }
        return cls(bits=None, **bounds)

    def to_transfer_function(self):
        """Return the two-point transfer function going through both bounds."""
        lower_point = (self.min_raw_value, self.min_value)
        upper_point = (self.max_raw_value, self.max_value)
        return TransferFunction(intervals=[lower_point, upper_point])
class SignalUpdate(TwinPadModel):
    """Payload describing a requested change of a signal's value and/or forced value."""

    # New value; None means "not provided" (Signal.send_command skips its validation in that case).
    value: float | str | bool | int | None = None
    # New forced value; None means "not provided".
    forced_value: float | str | bool | int | None = None
    # Optional integer timestamp (unit not visible here -- TODO confirm).
    timestamp: int | None = None
class SignalType(str, Enum):
    """Closed set of signal kinds; members are plain strings equal to their name."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
    interface = "interface"
# Maps a data type name to the signal-data class handling it:
# only "str" uses StringSignalData, every other listed type is numeric.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
class LoopAddress(TwinPadModel):
    """Address made of a card number and a channel (no loop number)."""

    card_number: int
    channel: int
class Address(LoopAddress):
    """A :class:`LoopAddress` extended with the number of the loop it belongs to."""

    loop_number: int
class TransferFunction(TwinPadModel):
    """
    A piecewise monotone linear function.

    ``intervals`` lists the breakpoints as ``(x, y)`` pairs; each pair of
    consecutive breakpoints delimits one linear segment.
    """

    intervals: list[tuple[float, float]]

    def evaluate(self, x):
        """Return the image of ``x`` using the first segment whose x-range contains it.

        :raises ValueError: if ``x`` lies in no segment
        """
        for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]):
            if x1 <= x <= x2:
                if x2 == x1:
                    # Degenerate (vertical) segment: avoid dividing by zero.
                    return y1
                return (y2 - y1) / (x2 - x1) * (x - x1) + y1
        raise ValueError("Out of bounds")

    def reverse(self, y):
        """Return an antecedent of ``y`` using the first segment whose y-range contains it.

        :raises ValueError: if ``y`` lies in no segment
        """
        for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]):
            if min(y1, y2) <= y <= max(y1, y2):
                if y2 == y1:
                    # Degenerate (flat) segment: avoid dividing by zero.
                    return x1
                return (x2 - x1) / (y2 - y1) * (y - y1) + x1
        if self.intervals:
            # Report the y-values of the first and last breakpoints.
            bounds = f"{self.intervals[0][1]}, {self.intervals[-1][1]}"
        else:
            bounds = "an empty function"
        raise ValueError(f"Out of bounds: {y} is not in {bounds}")

    def compose(self, other_function):
        """Return ``x -> other_function(self(x))`` as a new transfer function.

        The breakpoints of the result are the antecedents (through ``self``) of the
        breakpoints of ``other_function``, plus the breakpoints of ``self`` whose
        image lies inside the domain of ``other_function``.

        :param other_function: function applied after this one; ``None`` returns ``self`` unchanged
        :raises ValueError: if a breakpoint of ``other_function`` has no antecedent through ``self``
        """
        if other_function is None:
            return self
        new_x = {self.reverse(x) for (x, _) in other_function.intervals}
        for x, _ in self.intervals:
            try:
                other_function.evaluate(self.evaluate(x))
            except ValueError:
                # This breakpoint maps outside other_function's domain: the composition
                # is not defined there, so it cannot be a breakpoint of the result.
                continue
            new_x.add(x)
        new_intervals = [(x, other_function.evaluate(self.evaluate(x))) for x in sorted(new_x)]
        return TransferFunction(intervals=new_intervals)

    def inverse(self):
        """
        Calculate the inverse function of this transfer function.

        The inverse of a piecewise monotone linear function can be calculated by inverting each interval.
        This means swapping x and y in every breakpoint.

        Returns:
            TransferFunction: The inverse function of this transfer function.
        """
        # Passed by keyword, like every other TransferFunction construction in this file.
        return TransferFunction(intervals=[(y, x) for (x, y) in self.intervals])
1974class Signal(GenericMongo):
1975 collection_name: ClassVar[str] = "signals"
1977 signal_id: str | None = None
1978 ticker: str
1979 frequency: int
1980 unit: str | None
1981 description: str
1982 type: SignalType
1983 address: Address | LoopAddress | None = None
1984 data_type: str
1985 formula: str | None = None
1986 transfer_function: TransferFunction | None = None
1987 precision_digits: int | None
1988 digitization_function: DigitizationFunction | None = None
1989 forcible: bool
1990 commandable: bool
1991 broadcastable: bool
1992 status: SignalStatus = SignalStatus()
1994 post_processing: bool = False
1995 computed_phases_ids: list[str] = []
1997 custom_pipeline_steps: ClassVar[dict] = {
1998 "ticker": [
1999 {
2000 "$addFields": {
2001 "ticker": {"$ifNull": ["$ticker", {"$arrayElemAt": [{"$split": ["$signal_id", "."]}, -1]}]}
2002 }
2003 }
2004 ]
2005 }
2007 @property
2008 def device(self) -> Device:
2009 device_or_config_id = self.signal_id.split(".")[0]
2010 return Device.get_from_device_or_config_id(device_or_config_id)
2012 @cached_property
2013 def signal_data_class(self):
2014 if self.data_type in SIGNALDATA_TYPES:
2015 return SIGNALDATA_TYPES[self.data_type]
2016 if self.data_type.startswith("enum"):
2017 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
2018 raise ValueError(f"Unhandled python type: {self.data_type}")
2020 @cached_property
2021 def python_type(self):
2022 if self.data_type in TYPES:
2023 return TYPES[self.data_type]
2024 if self.data_type.startswith("enum"):
2025 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
2026 return Literal[*choices]
2027 raise ValueError(f"Unhandled python type: {self.data_type}")
2029 def to_config_dict(self):
2030 return self.to_dict(exclude={"id", "signal_id", "status", "post_processing", "computed_phases_ids"})
2032 async def send_command(self, device_id: str, update_dict: SignalUpdate, current_user: User) -> dict:
2033 command = Command(
2034 sent_at=time.time(),
2035 command_type="Signal command",
2036 user_id=current_user.id,
2037 )
2039 has_input_error = False
2040 error_message = ""
2042 if self.data_type.startswith("enum"):
2043 enum_options = get_args(self.python_type)
2045 if update_dict.value is not None and update_dict.value not in enum_options:
2046 has_input_error = True
2047 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
2048 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
2049 has_input_error = True
2050 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
2051 else:
2052 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
2053 has_input_error = True
2054 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
2055 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
2056 has_input_error = True
2057 error_message += (
2058 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
2059 )
2061 if has_input_error:
2062 command.response_time = 0
2063 command.succeeded = False
2064 command.description = f"Tried to modify signal {self.signal_id}"
2065 response = {"error": True, "status_code": 400, "message": error_message}
2066 else:
2067 response = await RabbitMQClient().send_signal_value(device_id, self.signal_id, update_dict)
2068 command.receive_response(response)
2070 Command.create(command)
2071 return response
2073 @classmethod
2074 def from_excel(
2075 cls, signals_sheet: Worksheet, card_indexes_to_refs: dict[str, str], tickers_to_addresses: dict[str, Address]
2076 ) -> dict[int, Self]:
2077 first_row = next(signals_sheet.iter_rows(max_row=1))
2078 header = list(cell.value for cell in first_row if cell.value is not None)
2079 if header != XLSX_HEADER:
2080 raise ValueError(f"Header of XLSX is not valid. got: {header}, expected: {XLSX_HEADER}")
2082 tickers = []
2083 signals_by_component_id: dict[int, list] = {}
2085 for row in signals_sheet.iter_rows(min_row=2):
2086 if is_line_empty(row):
2087 break
2089 ticker = row[0].value
2090 if not ticker:
2091 raise ValueError("Ticker should not be empty.")
2092 tickers.append(ticker)
2094 signal_type = row[4].value.lower()
2095 if signal_type not in ["sensor", "command", "external_sensor"]:
2096 raise ValueError(f"Should be either sensor, command or external_sensor, got {signal_type}")
2098 address = tickers_to_addresses.get(ticker, None)
2099 intervals = literal_eval(row[6].value) if row[6].value is not None else None
2100 sensor_transfer_function = TransferFunction(intervals=intervals) if intervals else None
2101 if address is not None:
2102 card_reference = card_indexes_to_refs.get((address.loop_number, address.card_number))
2103 digitization_function = DIGITIZATION_FUNCTIONS_FROM_REFERENCE.get(card_reference)
2104 try:
2105 transfer_function = digitization_function.to_transfer_function().compose(sensor_transfer_function)
2106 except ValueError as error:
2107 raise error
2108 else:
2109 digitization_function = None
2110 transfer_function = None
2112 type = row[8].value
2113 if type:
2114 type = type.replace('"', "'")
2116 formula = row[9].value
2117 if formula is not None:
2118 formula = formula.lstrip(" ")
2119 if formula.startswith("{"):
2120 # Test formula mapping is parsable
2121 literal_eval(formula)
2123 if formula is None and address is None and signal_type not in ["external_sensor", "command"]:
2124 raise ValueError(
2125 f"{ticker} should either be on an I/O card or have a formula or be an interface. Current signal type: {signal_type}"
2126 )
2128 if formula and address and signal_type != "command":
2129 raise ValueError(
2130 f"{ticker} is both on an I/O card and has a formula. This is forbidden for signal type {signal_type}."
2131 )
2133 signal = cls(
2134 ticker=ticker,
2135 description=row[2].value if row[2].value else "",
2136 unit=row[3].value,
2137 type=signal_type,
2138 address=address,
2139 frequency=row[5].value,
2140 transfer_function=transfer_function,
2141 precision_digits=row[7].value,
2142 digitization_function=digitization_function,
2143 data_type=type,
2144 formula=formula,
2145 forcible=read_boolean_cell(row[10], True),
2146 commandable=read_boolean_cell(row[11], True),
2147 broadcastable=read_boolean_cell(row[12], True),
2148 )
2150 component_id = row[1].value
2151 signals_by_component_id.setdefault(component_id, [])
2152 signals_by_component_id[component_id].append(signal)
2154 distinct_tickers = set()
2155 duplicated_tickers = {
2156 ticker for ticker in tickers if ticker in distinct_tickers or distinct_tickers.add(ticker)
2157 }
2158 if len(duplicated_tickers) > 0:
2159 raise ValueError(f"Duplicated ticker(s): {", ".join(duplicated_tickers)}")
2161 return signals_by_component_id
2163 @classmethod
2164 def get_from_signal_id(cls, signal_id: str) -> Self:
2165 """Could be generic from mongo"""
2166 signal = Signal.get_one_by_attribute("signal_id", signal_id)
2167 if signal is None:
2168 split_signal_id = signal_id.split(".")
2169 device_or_config_id = split_signal_id[0]
2170 ticker = split_signal_id[-1]
2171 possible_device = Device.get_from_device_or_config_id(device_or_config_id)
2172 if possible_device is not None:
2173 signal = Signal.get_one_by_attribute(
2174 "signal_id", f"{possible_device.device_id}.{possible_device.config_id}.{ticker}"
2175 )
2176 if not signal:
2177 return None
2178 return cls.dict_to_object(signal)
2180 @classmethod
2181 def get_all_ids(cls) -> list[str]:
2182 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
2184 return [signal["signal_id"] for signal in cursor]
2186 @classmethod
2187 def get_all_statuses(cls) -> list[dict]:
2188 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
2190 return [
2191 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
2192 for signal in cursor
2193 ]
2195 async def number_samples(self):
2196 collection = get_signal_collection(signal_id=self.signal_id)
2197 if collection is None:
2198 return 0
2200 number_samples = collection.estimated_document_count()
2202 number_samples_async_collection = await get_async_collection(
2203 systems_async_database, "number_samples", create=True, time_series=True
2204 )
2206 loop = asyncio.get_running_loop()
2207 loop.create_task(
2208 number_samples_async_collection.insert_one(
2209 {
2210 "timestamp": datetime.datetime.now(pytz.UTC),
2211 "signal_id": self.signal_id,
2212 "number_samples": number_samples,
2213 }
2214 )
2215 )
2217 return number_samples
2219 @classmethod
2220 def total_number_samples(cls) -> int:
2221 TwinPadActivity.get_number_samples_timeframe(0, 0, False)
2222 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
2224 if number_samples_collection is None:
2225 return 0
2227 result = number_samples_collection.aggregate(
2228 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}]
2229 )
2231 result = result.to_list()
2232 if len(result) == 0:
2233 return 0
2234 return result[0]["amount"]
2236 def sample_datasize(self):
2237 return signals_database.command("collstats", self.signal_id)["size"]
2239 @classmethod
2240 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
2241 result = cls.collection().aggregate(
2242 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
2243 )
2245 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ForcedSignal(GenericMongo):
    """Record of a signal currently forced by a user (``forced_signals`` collection)."""

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record, keyed by ``signal_id``, and store the resulting document id.

        :return: the id of the stored document, as a string
        """
        selector = {"signal_id": self.signal_id}
        changes = {"$set": self.to_dict(exclude={"id"})}
        stored_document = self.collection().find_one_and_update(
            selector,
            changes,
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Checks whether user can force a given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal was forced by someone else than the user, True otherwise
        :rtype: bool
        """
        existing_forcing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing_forcing is None:
            return True
        # Only a non-admin user facing someone else's forcing is refused.
        if existing_forcing.forcing_user_id != current_user.id and not current_user.is_admin:
            return False
        return True
class Parameter(TwinPadModel):
    """Named value attached to a configuration component."""

    name: str
    value: str | float | bool
class ConfigurationComponent(GenericMongo):
    """Component of a configuration: an id, a name, its signals and free-form parameters."""

    id: int
    name: str
    signals: list[Signal]
    parameters: list[Parameter] = []
    reference: str = None

    @classmethod
    def from_excel(cls, components_sheet: Worksheet) -> list[Self]:
        """Read the components sheet of a configuration workbook.

        The first three columns hold id, name and reference; every further column
        with a header is a parameter named after that header. Reading stops at the
        first row without an id. Components are returned without signals.
        """
        header_cells = next(components_sheet.iter_rows(max_row=1, min_col=4))
        parameter_name_by_column = {cell.column: cell.value for cell in header_cells if cell.value is not None}

        built_components: list[ConfigurationComponent] = []
        for id_cell, name_cell, reference_cell, *parameter_cells in components_sheet.iter_rows(min_row=2):
            if id_cell.value is None:
                break

            component_parameters = []
            for cell in parameter_cells:
                if cell.value is None:
                    continue
                component_parameters.append(Parameter(name=parameter_name_by_column[cell.column], value=cell.value))

            built_components.append(
                cls(
                    id=id_cell.value,
                    name=name_cell.value,
                    reference="" if reference_cell.value is None else reference_cell.value,
                    parameters=component_parameters,
                    signals=[],
                )
            )

        return built_components

    def to_config_dict(self):
        """Dump the component, its signals being dumped with ``Signal.to_config_dict``."""
        dumped = self.to_dict(exclude={})
        dumped["signals"] = [component_signal.to_config_dict() for component_signal in self.signals]
        return dumped
class ServicesStatus(TwinPadModel):
    """Status label of each service, as produced by :meth:`check`."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        The backend is always reported as ``"up"`` since it is the one answering.
        """
        return cls(
            cloud_broker=ping(RABBITMQ_HOST),
            backend="up",
            time_series_database=ping(MONGO_HOST),
            signal_storage=ping(SIGNAL_STORAGE_HOST),
            heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
            data_analyzer=ping(DATA_ANALYZER_HOST),
        )
def ping(host):
    """Ping ``host`` once (0.8 s timeout) and return ``"up"`` or ``"down"``.

    Socket-level failures (including missing permissions) are reported as ``"down"``
    rather than raised, so that a status check never fails because of one host.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except OSError:  # PermissionError is a subclass of OSError
        return "down"
    # ping3 returns None on timeout and False on error; any number -- including a
    # measured delay of 0.0 -- means the host answered.
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    """Occurrence of an event rule, stored in the ``events`` collection."""

    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        # Looked up from ``event_rule_id`` on first access, then cached on the instance.
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Convert the stored ``datetime`` of ``timestamp`` into a float epoch before building the object.

        The given dictionary is modified in place.
        """
        # NOTE(review): a naive datetime is interpreted as local time by ``timestamp()`` --
        # confirm the stored values are timezone-aware or that the host runs in UTC.
        dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    """Amount of activity (events, received samples or commands) for one UTC day.

    The per-day amounts are kept in dedicated collections. They are recomputed from
    the source data when the collection is missing or on request; the amount of the
    current day is refreshed on every call.
    """

    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return the start of the current UTC day as an aware datetime."""
        # Taken from the UTC clock: relabelling local midnight as UTC shifts the day
        # boundary on hosts that do not run in UTC.
        return datetime.datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)

    @staticmethod
    def _utc_midnight_of(moment: datetime.datetime) -> datetime.datetime:
        """Return midnight of the calendar day of ``moment``, labelled as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @classmethod
    def _items_in_timeframe(cls, daily_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Return the daily amounts whose day lies between the two epoch timestamps (inclusive)."""
        # ``tz=`` converts the epoch to UTC; ``fromtimestamp(ts).replace(tzinfo=UTC)``
        # would relabel the host's local time as UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            stored_at = day["timestamp"]
            if stored_at.tzinfo is None:
                # Naive datetimes read from MongoDB hold UTC values.
                stored_at = stored_at.replace(tzinfo=pytz.UTC)
            day["timestamp"] = stored_at.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @classmethod
    def _document_counts_timeframe(
        cls, counts_collection_name, source_collection_name, min_timestamp, max_timestamp, recompute_amount
    ) -> list[Self]:
        """Maintain per-day document counts of a source collection and return those of the timeframe."""
        one_day = datetime.timedelta(days=1)
        counts_collection = get_collection(systems_database, counts_collection_name)
        source_collection = get_collection(systems_database, source_collection_name, create=True, time_series=True)
        today = cls._utc_midnight_today()
        if counts_collection is None or recompute_amount:
            counts_collection = get_collection(systems_database, counts_collection_name, create=True, time_series=True)
            counts_collection.delete_many({})
            first_document = source_collection.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            # Recompute every past day, starting at the day of the oldest document.
            day = cls._utc_midnight_of(first_document["timestamp"])
            while day < today:
                day_count = source_collection.count_documents({"timestamp": {"$gte": day, "$lt": day + one_day}})
                if day_count > 0:
                    counts_collection.insert_one({"timestamp": day, "amount": day_count})
                day += one_day
        # The current day is still changing: replace its entry on every call.
        today_count = source_collection.count_documents({"timestamp": {"$gte": today}})
        if today_count > 0:
            counts_collection.delete_many({"timestamp": today})
            counts_collection.insert_one({"timestamp": today, "amount": today_count})
        return cls._items_in_timeframe(counts_collection, min_timestamp, max_timestamp)

    @staticmethod
    def _sum_of_values(collection, timestamp_filter) -> int:
        """Return the sum of ``value`` over the documents whose timestamp matches the filter (0 if none)."""
        totals = collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(totals) == 0:
            return 0
        return totals[0].get("number_samples", 0)

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the number of events per day between two epoch timestamps.

        :param min_timestamp: start of the timeframe (epoch seconds)
        :param max_timestamp: end of the timeframe (epoch seconds)
        :param recompute_amount: whether every daily count must be rebuilt from the events
        """
        return cls._document_counts_timeframe(
            "number_events", "events", min_timestamp, max_timestamp, recompute_amount
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of received samples per day between two epoch timestamps.

        Daily amounts are the sums of ``value`` found in ``signal_storage._NUMBER_SAMPLES``.

        :param min_timestamp: start of the timeframe (epoch seconds)
        :param max_timestamp: end of the timeframe (epoch seconds)
        :param recompute_amount: whether every daily amount must be rebuilt from the source
        """
        one_day = datetime.timedelta(days=1)
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        today = cls._utc_midnight_today()
        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # Recompute every past day, starting at the day of the oldest entry.
            day = cls._utc_midnight_of(first_sample["timestamp"])
            while day < today:
                number_samples = cls._sum_of_values(
                    signals_number_samples_collection, {"$gte": day, "$lt": day + one_day}
                )
                if number_samples > 0:
                    number_samples_collection.insert_one({"timestamp": day, "amount": number_samples})
                day += one_day
        # The current day is still changing: replace its entry on every call.
        number_samples_today = cls._sum_of_values(signals_number_samples_collection, {"$gte": today})
        if number_samples_today > 0:
            number_samples_collection.delete_many({"timestamp": today})
            number_samples_collection.insert_one({"timestamp": today, "amount": number_samples_today})
        return cls._items_in_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the number of commands per day between two epoch timestamps.

        :param min_timestamp: start of the timeframe (epoch seconds)
        :param max_timestamp: end of the timeframe (epoch seconds)
        :param recompute_amount: whether every daily count must be rebuilt from the commands
        """
        return cls._document_counts_timeframe(
            "number_commands", "commands", min_timestamp, max_timestamp, recompute_amount
        )
class EventRule(GenericMongo):
    """Named formula whose occurrences are stored as events (``event_rules`` collection)."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str] = []

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of documents of the ``events`` collection referencing this rule."""
        events_collection = get_collection(systems_database, "events")
        return events_collection.count_documents({"event_rule_id": self.id})

    @classmethod
    def from_excel(cls, event_rules_sheet: Worksheet) -> list[Self]:
        """Read (name, formula) pairs from columns 2-3, from row 2 until the first incomplete row."""
        parsed_rules = []
        rows = event_rules_sheet.iter_rows(min_row=2, min_col=2, max_col=3)
        for name_cell, formula_cell in rows:
            row_is_complete = name_cell.value is not None and formula_cell.value is not None
            if not row_is_complete:
                break
            parsed_rules.append(EventRule(name=name_cell.value, formula=formula_cell.value))

        return parsed_rules

    def to_config_dict(self):
        """Dump the rule without its id, variables and event count."""
        excluded_keys = {"id", "variables", "number_events"}
        return self.to_dict(exclude=excluded_keys)
class Company(GenericMongo):
    """Company, stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"
    name: str
class Campaign(GenericMongo):
    """Campaign, stored in the ``campaigns`` collection; phases reference it through ``campaign_id``."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
class Phase(GenericMongo):
    """Time span (``start_at`` .. ``end_at``) of a campaign, stored in the ``phases`` collection."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: MongoId

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the result of the deletion."""
        campaign_filter = {"campaign_id": campaign_id}
        return cls.collection().delete_many(campaign_filter)
class CustomViewCreation(GenericMongo):
    """Fields required to create a custom view (``custom_views`` collection)."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    # Content of the view; item structure is not visible here.
    configuration: list
class CustomView(CustomViewCreation):
    """Stored custom view: the creation fields plus its id and the id of a user."""

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str
# Update model generated from CustomView by create_update_model (defined elsewhere in this file).
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    """Video source, stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted in ascending order of ``sort_by``.

        Only ``name`` and ``ip_addr`` are requested from the database.
        """
        requested_fields = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, requested_fields).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the video with the given id, or ``None`` when it does not exist."""
        found_camera = cls.get_from_id(camera_id)
        return None if found_camera is None else found_camera.name
class Command(GenericMongo):
    """Trace of a command sent by a user, stored in the ``commands`` time-series collection."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, created as a time series when missing."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of ``command`` whose ``timestamp`` is ``sent_at`` converted to a UTC datetime.

        :return: ``{"command_id": <inserted id>}``, or ``None`` when the insertion returns nothing
        """
        copied_fields = ("sent_at", "response_time", "command_type", "description", "succeeded", "user_id")
        stamped_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            **{field_name: getattr(command, field_name) for field_name in copied_fields},
        )
        insertion = cls.collection().insert_one(stamped_command.model_dump(exclude={"id"}))
        if insertion is None:
            return None
        return {"command_id": str(insertion.inserted_id)}

    def receive_response(self, response: dict):
        """Update the command from a response: elapsed time, success flag and, when unset, description.

        The command succeeded only when the response carries ``"error": False``.
        """
        self.response_time = time.time() - self.sent_at
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            # The description being empty, appending is the same as assigning.
            self.description = self.description + response.get("message", "").rstrip()
class SignalsPresetCreation(GenericMongo):
    """Fields required to create a signals preset: a name and a list of signal ids."""

    name: str
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    """Signals preset owned by a user, stored in the ``signals_presets`` collection."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store ``signals_preset`` for the given user.

        :return: the id of the inserted document, as a string
        """
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        document = owned_preset.model_dump(exclude={"id"})
        insertion = cls.collection().insert_one(document)
        return str(insertion.inserted_id)
# Update model generated from SignalsPreset by create_update_model (defined elsewhere in this file).
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    """Line styles available for graph themes; members are plain strings equal to their name."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    """Colors of a signal's value and forced value (plain class: annotations only, no defaults)."""

    value_color: str
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    """Fields required to create a graph theme for one signal (``graph_themes`` collection)."""

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    # Colors default to an empty string.
    value_color: str = ""
    forced_value_color: str = ""
    # Both lines are solid by default.
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise.
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    """Graph theme as seen by one user.

    The per-user lists stored in the database (``creator_id``, ``in_library``,
    ``active``) are replaced by three booleans computed for the current user, and
    removed from the output by the aggregation steps below.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # Id of the user the pipeline steps are computed for. It is written on the class
    # by every query method below right before delegating to the parent class.
    # NOTE(review): being class-level state, it is shared by all callers -- confirm
    # queries for different users can never interleave (threads, concurrent requests).
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Aggregation steps by name, built for ``cls._current_user_id``."""
        return {
            "created_by_user": [
                {
                    "$addFields": {
                        "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]},
                    }
                }
            ],
            "default": [  # never return a theme the user is not allowed to see (i.e. neither theirs nor shared); done as early as possible
                {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}
            ],
            "in_user_library": [
                {
                    "$addFields": {
                        "in_user_library": {"$in": [cls._current_user_id, "$in_library"]},
                    }
                }
            ],
            "active_for_user": [
                {
                    "$addFields": {
                        "active_for_user": {"$in": [cls._current_user_id, "$active"]},
                    }
                }
            ],
            # The three steps below strip the raw per-user fields from the documents.
            "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}],
            "active": [
                {
                    "$addFields": {
                        "active": "$$REMOVE",
                    }
                }
            ],
            "creator_id": [
                {
                    "$addFields": {
                        "creator_id": "$$REMOVE",
                    }
                }
            ],
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run ``query`` for the given user."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Run ``query`` for the given user after setting ``query.in_user_library`` to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Fetch one theme by id, for the given user."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Delegate to the parent ``get_by_attribute`` for the given user."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Delegate to the parent ``get_one_by_attribute`` for the given user."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Delegate to the parent ``get_all`` for the given user."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return the colors and line styles of the themes active for the user, by signal id.

        When several active themes target the same signal, the first one met by the
        ``$group`` stage is kept.
        """
        pipeline = [
            {
                "$match": {
                    "active": {"$eq": user_id},
                    "signal_id": {"$in": signal_ids},
                }
            },
            # Keep a single document per signal.
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
            {
                "$project": {
                    "_id": 0,
                    "signal_id": 1,
                    "value_color": 1,
                    "forced_value_color": 1,
                    "value_line_style": 1,
                    "forced_value_line_style": 1,
                }
            },
        ]

        result = {}

        cursor = cls.collection().aggregate(pipeline)
        for document in cursor:
            # The signal id becomes the key and is removed from the value.
            signal_id = document["signal_id"]
            del document["signal_id"]
            result[signal_id] = document

        return result
# Update schema generated from PublicGraphTheme.
# NOTE(review): presumably every field becomes optional so that partial updates
# validate; confirm against create_update_model.
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    # Graph theme with its ownership and membership data (creator, users having it
    # in their library, users having it active), as written by `create`.
    # private
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Insert a new theme and return it with its generated id.

        The creator is added to both the library and the active lists of the theme.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """Apply `update_dict` to the stored theme on behalf of `user_id`.

        The per-user flags "in_user_library" and "active_for_user" are translated into
        membership of `user_id` in the stored "in_library" / "active" lists.
        "created_by_user" is ignored. These three view-only keys are always removed
        from `update_dict`, even when their value is None, so they are never written
        to the database.

        :param update_dict: Fields to set; modified in place as described above.
        :param user_id: User the per-user flags apply to.
        :return: An empty dict.
        """
        for flag_key, stored_key, members in (
            ("in_user_library", "in_library", self.in_library),
            ("active_for_user", "active", self.active),
        ):
            wanted = update_dict.pop(flag_key, None)
            if wanted is None:
                # Flag absent (or explicitly null): leave the membership untouched.
                continue
            if wanted and user_id not in members:
                members.append(user_id)
            elif not wanted and user_id in members:
                members.remove(user_id)
            update_dict[stored_key] = members

        # Derived from creator_id on read; never stored.
        update_dict.pop("created_by_user", None)

        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
        )

        return {}
class Configuration(GenericMongo):
    # Device configuration document stored in the "configs" collection.
    model_config = ConfigDict(serialize_by_alias=True)

    collection_name: ClassVar[str] = "configs"

    # Properties
    json_schema: str | None = Field(default=None, serialization_alias="$schema")
    config_name: str | None = None
    config_id: str | None = None
    generated_at: float
    config: dict
    modes: list[Mode]
    components: list[ConfigurationComponent]
    hardware_topology: EtherCatTopology
    petri_network: dict | None = None
    pid: dict | None = None
    event_rules: list[EventRule]

    received_at: float | None = None
    in_use_by_devices: list[str] = []
    is_in_use: bool = False

    # "is_in_use" is computed on read: True when "in_use_by_devices" is a non-empty list.
    custom_pipeline_steps = {
        "is_in_use": [
            {
                "$addFields": {
                    "is_in_use": {
                        "$cond": [
                            {"$gt": [{"$size": {"$ifNull": ["$in_use_by_devices", []]}}, 0]},
                            True,
                            False,
                        ]
                    },
                }
            }
        ],
    }

    @classmethod
    def get_all_names_devices(cls) -> list[dict[str, str]]:
        """Return config id/name and target device id/name for every configuration.

        The nested "config" sub-document is merged into the root first, so that the
        device fields it holds can be projected next to the configuration fields.
        """
        cursor = cls.collection().aggregate(
            [
                {"$replaceRoot": {"newRoot": {"$mergeObjects": ["$config", "$$ROOT"]}}},
                {"$project": {"config_id": 1, "config_name": 1, "target_device_id": 1, "device_name": 1, "_id": 0}},
                # NOTE(review): "signal_id" is not among the projected fields, so this
                # sort has nothing to order on; confirm the intended sort key.
                {"$sort": {"signal_id": 1}},
            ]
        )

        return cursor.to_list()

    def to_config_dict(self, exclude=None):
        """Return the configuration as a dict, without the backend bookkeeping fields.

        :param exclude: Optional iterable of additional field names to leave out. It is
            not modified.
        :return: Dict in which components and event rules are rendered through their
            own `to_config_dict`.
        """
        # Build a fresh set on every call: the argument used to be a shared mutable
        # default that was mutated in place (as was any set passed by the caller).
        excluded = {"id", "received_at", "in_use_by_devices", "is_in_use"}
        if exclude:
            excluded.update(exclude)
        config = self.to_dict(exclude=excluded)
        config["components"] = [component.to_config_dict() for component in self.components]
        config["event_rules"] = [event_rule.to_config_dict() for event_rule in self.event_rules]
        return config

    def to_json(self):
        """Return `to_config_dict()` serialised as indented JSON."""
        return json.dumps(self.to_config_dict(), indent=2)

    @classmethod
    def get_from_config_id(cls, config_id: str, exclude_sensitive_info: bool = True) -> Self | None:
        """Load a configuration from its document id or from its config id.

        :param config_id: A valid ObjectId string selects by "_id"; anything else is
            matched against the "config_id" field.
        :param exclude_sensitive_info: When True, only a fixed list of keys is kept in
            the "config" sub-document.
        :return: The configuration, or None when nothing matches.
        """
        # ObjectId.is_valid avoids raising InvalidId on 24-character strings that are
        # not hexadecimal; those are simply treated as a config id.
        if ObjectId.is_valid(config_id):
            selector = {"_id": ObjectId(config_id)}
        else:
            selector = {"config_id": config_id}

        pipeline = [{"$match": selector}, {"$limit": 1}]
        for step in cls.custom_pipeline_steps.values():
            pipeline.extend(step)

        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        dict_ = items[0]
        # There is some protected information in the config dict, so keep only specific keys
        if exclude_sensitive_info:
            allowed_config_keys = ("description", "broker_host", "target_device_id", "device_name")
            config_dict = dict_.get("config") or {}
            # Keys absent from the stored document are skipped instead of raising KeyError.
            dict_["config"] = {k: config_dict[k] for k in allowed_config_keys if k in config_dict}
        return cls.mongo_dict_to_object(dict_)

    @classmethod
    def get_from_excel(
        cls,
        config_name: str,
        excel_file: ReadableFile,
        pid_json: ReadableFile | None,
        petri_json: ReadableFile | None,
    ) -> str:
        """
        Converts a device's configuration from an excel file to a JSON configuration and upserts it.

        :param config_name: The configuration's name.
        :type config_name: str
        :param excel_file: The excel file, passed as `filename` to openpyxl's `load_workbook`.
        :param pid_json: Readable object holding the PID's JSON (read with `json.load`), or None.
        :param petri_json: Readable object holding the Petri Network's JSON (read with `json.load`), or None.

        :raises KeyError: When one of the "signals", "components", "config", "modes" or
            "event_rules" sheets is missing.
        :raises ValueError: When a signal refers to an unknown component, when the target
            device id is missing, or when a loop sheet is malformed.
        :raises NotImplementedError: When a loop sheet uses a card reference that is not
            in REFERENCED_CARDS.

        :return: Id of the stored configuration document.
        :rtype: str
        """
        workbook: Workbook = load_workbook(filename=excel_file)
        try:
            sheets_by_name = {s.title: s for s in workbook.worksheets}
            # These can generate errors if the sheets don't exist, they are meant to be caught
            signals_sheet = sheets_by_name["signals"]
            components_sheet = sheets_by_name["components"]
            config_sheet = sheets_by_name["config"]
            modes_sheet = sheets_by_name["modes"]
            event_rules_sheet = sheets_by_name["event_rules"]

            topology, card_indexes_to_refs, tickers_to_addresses = EtherCatTopology.from_excel(workbook)
        finally:
            # Close the workbook on the error paths as well.
            workbook.close()

        components = ConfigurationComponent.from_excel(components_sheet)
        components_by_id = {component.id: component for component in components}

        signals_by_component_id = Signal.from_excel(signals_sheet, card_indexes_to_refs, tickers_to_addresses)

        for component_id, signals in signals_by_component_id.items():
            component = components_by_id.get(component_id)
            if component is None:
                raise ValueError(
                    f"Component with id #{component_id} does not exist. Mnemonic(s) {[signal.ticker for signal in signals]} is/are orphaned."
                )
            component.signals = signals

        device_config = {"description": ""}  # Backwards compatibility
        for key, value in config_sheet.iter_rows(max_col=2):
            if key.value is None:
                # Blank row: a None key cannot be stored in a BSON document.
                continue
            device_config[key.value] = value.value

        if device_config.get("target_device_id") is None:
            # Older sheets name the key "device_id".
            if (target_device_id := device_config.get("device_id", None)) is not None:
                device_config["target_device_id"] = target_device_id
                del device_config["device_id"]
            else:
                raise ValueError("Missing target device id.")

        modes = Mode.from_excel(modes_sheet)
        event_rules = EventRule.from_excel(event_rules_sheet)

        hwconfig = {
            "config_name": config_name,
            "components": [c.to_config_dict() for c in components],
            "modes": [m.to_dict() for m in modes],
            "hardware_topology": topology.to_dict(),
            "event_rules": [e.to_config_dict() for e in event_rules],
            # Adding petri
            "petri_network": json.load(petri_json) if petri_json is not None else None,
            # Adding PID
            "pid": json.load(pid_json) if pid_json is not None else None,
        }

        # The config id is a fingerprint of the content gathered so far; the fields
        # added below are deliberately left out of it.
        json_config = json.dumps(hwconfig, sort_keys=True, indent=2).encode()
        # MD5 is only used as a content fingerprint here, not for security.
        config_hash = hashlib.md5(json_config, usedforsecurity=False).hexdigest()[:12]

        schema = (
            "https://gitea.spacedreams.com/SpaceDreams/Twinpad/src/branch/master/data-storage/device-config_v2.json"
        )

        hwconfig["json_schema"] = schema
        hwconfig["generated_at"] = round(time.time())
        hwconfig["config_id"] = config_hash
        hwconfig["config"] = device_config
        # Since some properties in signals are ignored, add back full signal objects after hash computing
        hwconfig["components"] = [c.to_dict() for c in components]

        insert_result = cls.collection().find_one_and_update(
            {"config_id": config_hash, "config": device_config},
            {"$set": hwconfig},
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )

        return str(insert_result["_id"])

    def compute_config_id(self):
        """Return the 12-character fingerprint of this configuration's content.

        Identification, timestamp, usage and device "config" fields are excluded
        from the hashed content.
        """
        config_dict = self.to_config_dict(
            exclude={
                "id",
                "json_schema",
                "generated_at",
                "config_id",
                "config",
                "received_at",
                "in_use_by_devices",
                "is_in_use",
            }
        )

        json_config = json.dumps(config_dict, sort_keys=True, indent=2).encode()
        # MD5 is only used as a content fingerprint here, not for security.
        return hashlib.md5(json_config, usedforsecurity=False).hexdigest()[:12]

    def update(
        self,
        config_name: str | None,
        petri_file: ReadableFile | None,
        pid_file: ReadableFile | None,
        save_as_new: bool,
    ):
        """Change the name, Petri network and/or PID, then persist the configuration.

        :param config_name: New name, or None to keep the current one.
        :param petri_file: Readable object holding the new Petri network JSON, or None.
        :param pid_file: Readable object holding the new PID JSON, or None.
        :param save_as_new: When True, upsert a document keyed by the recomputed config id
            and the device config; otherwise overwrite the document this object was loaded from.
        :return: The recomputed config id.
        """
        if config_name is not None:
            self.config_name = config_name
        if petri_file is not None:
            self.petri_network = json.load(petri_file)
        if pid_file is not None:
            self.pid = json.load(pid_file)

        self.config_id = self.compute_config_id()
        self.generated_at = round(time.time())

        # NOTE(review): "is_in_use" is not excluded here although it is recomputed on
        # read by custom_pipeline_steps; confirm whether it should be persisted.
        insert_dict = self.to_dict(exclude={"id", "received_at", "in_use_by_devices"}, by_alias=False)
        if save_as_new:
            self.collection().find_one_and_update(
                {"config_id": self.config_id, "config": self.config},
                {"$set": insert_dict},
                upsert=True,
            )
        else:
            self.collection().find_one_and_update({"_id": ObjectId(self.id)}, {"$set": insert_dict})

        return self.config_id
# Digitization function associated with each known card reference.
# NOTE(review): the from_bits arguments look like (bit count, lower bound, upper bound)
# and the from_values ones like (raw min, raw max, value min, value max); confirm
# against DigitizationFunction before relying on this.
DIGITIZATION_FUNCTIONS_FROM_REFERENCE = {
    "EL1809": DigitizationFunction.from_bits(1, 0, 1),
    "EL1819": DigitizationFunction.from_bits(1, 0, 1),
    "EL1918": DigitizationFunction.from_bits(1, 0, 1),
    "EL3124": DigitizationFunction.from_bits(15, 0.004, 0.020),
    "EL3062-0030": DigitizationFunction.from_bits(15, 0.0, 30.0),
    "EL3356-0020": DigitizationFunction.from_bits(24, -12.0, 12.0),
    "EL2004": DigitizationFunction.from_bits(1, 0, 1),
    "EL2042": DigitizationFunction.from_bits(1, 0, 1),
    "EL1004": DigitizationFunction.from_bits(1, 0, 1),
    "EL3054": DigitizationFunction.from_bits(15, 0.004, 0.020),
    "EL3202": DigitizationFunction.from_values(-2000, 8500, -200, 850),
    "EL4022": DigitizationFunction.from_bits(15, 0.004, 0.020),
    "EL3064": DigitizationFunction.from_bits(15, 0.0, 10.0),
    "EL2022": DigitizationFunction.from_bits(1, 0, 1),
    "ELX2008": DigitizationFunction.from_bits(1, 0, 1),
    "EPX3158": DigitizationFunction.from_bits(15, 0.004, 0.020),
    "EPX1058": DigitizationFunction.from_bits(1, 0, 1),
    "ELX4154": DigitizationFunction.from_bits(15, 0.004, 0.020),
    "ELM3704": DigitizationFunction.from_bits(31, 0.004, 0.020),
    "EL3062": DigitizationFunction.from_bits(15, 0, 10),
    "EL3351": DigitizationFunction.from_bits(15, -0.020, 0.020),
    "EL9410": DigitizationFunction.from_bits(1, 0, 1),
    "EL2904": DigitizationFunction.from_bits(1, 0, 1),
    "EL2911": DigitizationFunction.from_bits(1, 0, 1),
}

# Every card reference accepted in a topology sheet: the cards listed above plus
# "EK1100" and "EL9011", which have no digitization function in that table.
REFERENCED_CARDS = list(DIGITIZATION_FUNCTIONS_FROM_REFERENCE.keys()) + ["EK1100", "EL9011"]
# One terminal (card) of an EtherCAT loop.
class EtherCatModule(TwinPadModel):
    name: str
    # Card reference, e.g. one of REFERENCED_CARDS.
    reference: str
    # Tickers of the signals wired to this terminal.
    signals: list[str] = []
# An ordered list of terminals forming one EtherCAT loop.
class EtherCatLoop(TwinPadModel):
    terminals: list[EtherCatModule]

    @cached_property
    def signals(self):
        """Signals of every terminal of the loop, in terminal order (computed once)."""
        return [signal for terminal in self.terminals for signal in terminal.signals]
# The full hardware topology: every EtherCAT loop of a device.
class EtherCatTopology(TwinPadModel):
    loops: list[EtherCatLoop]

    @cached_property
    def signals(self) -> list[str]:
        """Signals of every loop, in loop order (computed once)."""
        return [signal for loop in self.loops for signal in loop.signals]

    @classmethod
    def from_excel(cls, excel_workbook: Workbook):
        """Build the topology from the "loop_*" sheets of a workbook.

        Loops are numbered from 1 in the order their sheets appear in the workbook.
        In each loop sheet, cards are laid out in columns starting at column C (name
        in row 1, reference in row 2) and tickers start at row 4.

        :raises ValueError: When cell B3 of a loop sheet is not "Position".
        :raises NotImplementedError: When a card reference is not in REFERENCED_CARDS.
        :return: Tuple of (topology, card references keyed by (loop number, card
            number), addresses keyed by ticker).
        """
        parsed_loops = []
        addresses_by_ticker = {}
        refs_by_card_index = {}
        loop_number = 0

        for sheet in excel_workbook.worksheets:
            if not sheet.title.startswith("loop_"):
                continue
            if sheet["B3"].value != "Position":
                raise ValueError(
                    f'Invalid channel position.\nCell B3: (currently "{sheet["B3"].value}") should be "Position"'
                )
            loop_number += 1

            # Acquisition cards: one row per channel, one column per card; reading
            # stops at the first empty row.
            for channel, row in enumerate(sheet.iter_rows(min_row=4, min_col=3), start=1):
                if is_line_empty(row):
                    break
                for card_number, cell in enumerate(row, start=1):
                    if not cell.value:
                        continue
                    addresses_by_ticker[cell.value] = Address(
                        card_number=card_number, channel=channel, loop_number=loop_number
                    )

            # Terminals: one column per card; reading stops at the first empty column.
            # The third row of each column is ignored.
            terminals = []
            for card_number, (name, reference, _, *tickers) in enumerate(
                sheet.iter_cols(min_col=3, min_row=1), start=1
            ):
                if is_line_empty((name, reference, *tickers)):
                    break
                card_reference = reference.value
                if card_reference not in REFERENCED_CARDS:
                    raise NotImplementedError(
                        f"Card '{card_reference}' hasn't been referenced yet. Report this to system administrators."
                    )
                refs_by_card_index[loop_number, card_number] = card_reference
                wired_signals = [str(ticker_cell.value) for ticker_cell in tickers if ticker_cell.value is not None]
                terminals.append(
                    EtherCatModule(
                        name=name.value,
                        reference=card_reference if card_reference is not None else "",
                        signals=wired_signals,
                    )
                )

            parsed_loops.append(EtherCatLoop(terminals=terminals))

        return cls(loops=parsed_loops), refs_by_card_index, addresses_by_ticker
# Status of a device as exchanged with a device deployer.
# NOTE(review): the values look like container lifecycle states; confirm against
# the deployer API.
class DeviceStatus(str, Enum):
    started = "started"
    running = "running"
    created = "created"
    exited = "exited"
    restarting = "restarting"
# Body sent to a deployer to update a device; only the status can be changed.
class DeviceUpdateFromDeployer(BaseModel):
    status: DeviceStatus
# Fields needed to describe a device before it exists on a deployer.
class DeviceFromDeployerCreation(BaseModel):
    name: str
    description: str
# A device as reported by a deployer: creation fields plus status, id and logs.
class DeviceFromDeployer(DeviceFromDeployerCreation):
    status: DeviceStatus
    device_id: DeviceId
    # Defaults to an empty string when the deployer response carries no logs.
    logs: str = ""
# HTTP client for a remote device deployer, stored in the "device_deployers" collection.
class DeviceDeployer(GenericMongo):
    collection_name: ClassVar[str] = "device_deployers"
    # (connect, read) timeouts in seconds applied to every request to the deployer,
    # so that an unresponsive deployer cannot block the caller forever.
    request_timeout: ClassVar[tuple[float, float]] = (5.0, 60.0)
    url: HttpUrl

    def endpoint_url(self, endpoint):
        """Return the absolute URL of `endpoint` on this deployer."""
        return f"{str(self.url).rstrip('/')}/{endpoint}"

    @staticmethod
    def _device_from_payload(device_dict: dict) -> DeviceFromDeployer:
        """Convert one device entry of a deployer response into a DeviceFromDeployer."""
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name=device_dict["container_name"],
            description="desc",
            status=device_dict["status"],
            logs=device_dict["logs"],
        )

    def devices(self) -> list[DeviceFromDeployer] | None:
        """List the devices known to the deployer.

        :return: The devices, or None when the deployer cannot be reached, times out
            or does not answer with status 200.
        """
        try:
            response = requests.get(self.endpoint_url("devices"), timeout=self.request_timeout)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            logger.info("connection error")
            return None
        if response.status_code != 200:
            return None
        return [self._device_from_payload(device_dict) for device_dict in response.json()["devices"]]

    def get_device(self, device_id: DeviceId) -> DeviceFromDeployer | None:
        """Fetch one device from the deployer.

        :return: The device, or None when the deployer cannot be reached, times out
            or does not answer with status 200.
        """
        try:
            response = requests.get(self.endpoint_url(f"devices/{device_id}"), timeout=self.request_timeout)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return None
        if response.status_code != 200:
            return None
        return self._device_from_payload(response.json())

    def create_device(self, device: DeviceFromDeployerCreation) -> DeviceFromDeployer | None:
        """Ask the deployer to create a device named `device.name`.

        :return: The created device (its name is left empty), or None when the
            deployer cannot be reached, times out or does not answer with status 201.
        """
        try:
            response = requests.post(
                self.endpoint_url("devices"), json={"name": device.name}, timeout=self.request_timeout
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return None

        if response.status_code != 201:
            return None

        device_dict = response.json()
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            status=device_dict["status"],
        )

    def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
        """Send `device_update` to the deployer for the given device.

        :return: A Device carrying the returned id and status (other fields are
            placeholders), or None when the deployer cannot be reached, times out
            or does not answer with status 200.
        """
        try:
            response = requests.patch(
                self.endpoint_url(f"devices/{device_id}"),
                json=device_update.model_dump(),
                timeout=self.request_timeout,
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return None

        if response.status_code != 200:
            return None

        device_dict = response.json()
        return Device(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            pid={},
            petri_network={},
            modes=[],
            status=device_dict["status"],
        )

    def delete_device(self, device_id: DeviceId) -> DeleteInfo:
        """Ask the deployer to delete a device.

        :return: DeleteInfo with `is_deleted` True when the deployer answers 200, 202
            or 204; otherwise False, with the response text or a connection error
            message as detail.
        """
        try:
            response = requests.delete(self.endpoint_url(f"devices/{device_id}"), timeout=self.request_timeout)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
        if response.status_code not in [200, 202, 204]:
            return DeleteInfo(is_deleted=False, detail=response.text)

        return DeleteInfo(is_deleted=True, detail="")
# Update schema generated from DeviceDeployer.
# NOTE(review): presumably every field becomes optional so that partial updates
# validate; confirm against create_update_model.
DeviceDeployerUpdate = create_update_model(DeviceDeployer)