Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 89%
1401 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 11:44 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
14import re
16import zipfile
17import ping3
18import pytz
19from bson.objectid import ObjectId
20from pymongo import ASCENDING, ReturnDocument
21from pymongo.collation import Collation
22from pydantic import BaseModel, computed_field, Field, create_model
23import numpy as npy
24import lttb
25import h5py
26from PIL import Image
27from openpyxl import Workbook
28from openpyxl.styles import Border, Side
30# from scipy import signal as signal_scipy
32from twinpad_backend.db import (
33 get_collection,
34 get_async_collection,
35 get_signal_collection,
36 systems_database,
37 systems_async_database,
38 signals_database,
39 devices_states_database,
40)
41from twinpad_backend.responses import ListResponse
42from twinpad_backend.messages import send_mode_change, send_signal_value
43from twinpad_backend.xml_parsing import extract_subname_ref, xml_tags
45TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
48RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
49MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
50SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
51HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
52DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
54DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
55NUMBER_SAMPLES_DATABASE_UPDATE = 120
57logger = logging.getLogger("uvicorn.error")
60CHANNEL_PATTERN = r"Channel\s+(\d+)"
61XLSX_HEADER = [
62 "Ticker",
63 "Component id",
64 "Signal description",
65 "Unit",
66 "Type",
67 "Frequency (Hz)",
68 "Sensor transfer function",
69 "Precision digits",
70 "Data type",
71 "Formula",
72 "Forcible",
73 "Commandable",
74 "Broadcastable",
75]
78class classproperty:
79 """
80 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13.
81 Found here: https://stackoverflow.com/a/76301341
82 """
84 def __init__(self, func):
85 self.fget = func
87 def __get__(self, _, owner):
88 return self.fget(owner)
91def create_update_model(model):
92 fields = {}
94 for field_name, field_annotation in model.model_fields.items():
95 if field_name != "id":
96 fields[field_name] = (field_annotation.annotation | None, None)
98 query_name = model.__name__ + "Update"
99 return create_model(query_name, **fields)
102def get_utc_date_from_timestamp(timestamp: float):
103 return datetime.datetime.fromtimestamp(timestamp).isoformat()
106def downsample_list(time_vector: list, values: list, max_number_samples: int):
107 if len(time_vector) < max_number_samples:
108 return time_vector, values
110 time_vector_copy = copy.deepcopy(time_vector)
111 values_copy = copy.deepcopy(values)
113 none_group_bounds = []
114 none_group_index = -1
115 index = -1
116 # LTTB doesn't handle None values so remove them
117 while values_copy.count(None) > 0:
118 # Store bounds of None value groups so we can insert them back after the downsampling
119 if (new_index := values_copy.index(None)) != index:
120 none_group_bounds.append([time_vector_copy.pop(new_index)])
121 none_group_index += 1
122 elif len(none_group_bounds[none_group_index]) < 2:
123 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
124 else:
125 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
126 values_copy.pop(new_index)
127 index = new_index
128 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
130 try:
131 values_array = npy.array([time_vector_copy, values_copy]).T
132 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
134 new_time_vector = interpolated_values[:, 0].tolist()
135 new_values = interpolated_values[:, 1].tolist()
136 except ValueError:
137 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
138 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
139 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
140 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
141 return new_time_vector, new_values_nan_to_none
143 # insert back None values at the correct timestamps
144 for none_group in none_group_bounds:
145 start_index = npy.searchsorted(new_time_vector, none_group[0])
146 new_time_vector[start_index:start_index] = none_group
147 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
149 return new_time_vector, new_values
152def is_of_type(value, wanted_type):
153 if wanted_type is float:
154 return isinstance(value, (int, float))
155 return isinstance(value, wanted_type)
158# Models
159class TwinPadModel(BaseModel):
160 @classmethod
161 def dict_to_object(cls, dict_):
162 return cls.model_validate(dict_)
164 def to_dict(self, exclude=None):
165 dict_ = self.model_dump(exclude=exclude)
166 return dict_
169class GenericMongo(TwinPadModel):
170 id: str | None = None
171 custom_pipeline_steps: ClassVar[dict[str, list]] = {}
173 @classmethod
174 def collection(cls):
175 return get_collection(systems_database, cls.collection_name, create=True)
177 @classmethod
178 def response_from_query(cls, query) -> ListResponse[Self]:
179 request_filters = query.mongodb_filter()
180 items = []
181 if ":" in query.sort_by:
182 sort_field, sort_order = query.sort_by.split(":")
183 sort_order = int(sort_order)
184 else:
185 sort_field = query.sort_by
186 sort_order = 1
187 collection = get_collection(systems_database, cls.collection_name, create=True)
188 total = collection.count_documents(request_filters)
190 pipeline = []
191 added_properties = []
192 if "$and" in request_filters:
193 for request_filter in request_filters["$and"]:
194 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
195 if filtered_property in request_filter:
196 pipeline.extend(pipeline_steps)
197 added_properties.append(filtered_property)
198 else:
199 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
200 if filtered_property in request_filters:
201 pipeline.extend(pipeline_steps)
202 added_properties.append(filtered_property)
203 pipeline.append({"$match": request_filters})
204 if sort_field in cls.custom_pipeline_steps:
205 pipeline.extend(cls.custom_pipeline_steps[sort_field])
206 added_properties.append(sort_field)
207 pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])
209 if (query.limit is not None) and (query.limit != 0):
210 pipeline.append({"$limit": query.limit})
212 for filtered_property, step in cls.custom_pipeline_steps.items():
213 if filtered_property not in added_properties:
214 pipeline.extend(step)
216 cursor = collection.aggregate(pipeline)
218 for item_dict in cursor:
219 items.append(cls.mongo_dict_to_object(item_dict))
221 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
223 @classmethod
224 def get_from_id(cls, item_id) -> Self | None:
225 return cls.get_one_by_attribute("_id", ObjectId(item_id))
227 @classmethod
228 def mongo_dict_to_object(cls, mongo_dict):
229 mongo_dict["id"] = str(mongo_dict["_id"])
230 del mongo_dict["_id"]
231 return cls.dict_to_object(mongo_dict)
233 @classmethod
234 def get_by_attribute(cls, attribute_name: str, attribute_value):
235 """Returns all items that match the attribute with value."""
236 pipeline = []
237 if attribute_name in cls.custom_pipeline_steps:
238 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
239 pipeline.append({"$match": {attribute_name: attribute_value}})
240 for key, step in cls.custom_pipeline_steps.items():
241 if key != attribute_name:
242 pipeline.extend(step)
243 items = cls.collection().aggregate(pipeline)
244 if items is None:
245 return None
246 return [cls.mongo_dict_to_object(d) for d in items]
248 @classmethod
249 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
250 pipeline = []
251 if attribute_name in cls.custom_pipeline_steps:
252 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
253 pipeline.append({"$match": {attribute_name: attribute_value}})
254 pipeline.append({"$limit": 1})
255 for key, step in cls.custom_pipeline_steps.items():
256 if key != attribute_name:
257 pipeline.extend(step)
258 items = cls.collection().aggregate(pipeline).to_list()
259 if len(items) == 0:
260 return None
261 return cls.mongo_dict_to_object(items[0])
263 @classmethod
264 def get_all(cls, sort_by="_id") -> list[Self]:
265 items = []
266 pipeline = []
267 if sort_by in cls.custom_pipeline_steps:
268 pipeline.extend(cls.custom_pipeline_steps[sort_by])
269 pipeline.append({"$sort": {sort_by: ASCENDING}})
270 for key, step in cls.custom_pipeline_steps.items():
271 if key != sort_by:
272 pipeline.extend(step)
273 for dict_ in cls.collection().aggregate(pipeline):
274 items.append(cls.mongo_dict_to_object(dict_))
275 return items
277 @classmethod
278 def get_number_documents(cls):
279 collection = get_collection(systems_database, cls.collection_name)
280 if collection is None:
281 return 0
282 return collection.count_documents({})
284 def insert(self):
285 insert_result = self.collection().insert_one(self.to_dict(exclude={id}))
286 self.id = str(insert_result.inserted_id)
287 return self.id
289 def update(self, update_dict):
290 for key, value in update_dict.items():
291 setattr(self, key, value)
292 self.collection().find_one_and_update(
293 {"_id": ObjectId(self.id)},
294 {"$set": update_dict},
295 return_document=ReturnDocument.AFTER,
296 )
298 return self
300 def delete(self):
301 result = self.collection().delete_one({"_id": ObjectId(self.id)})
302 return result.deleted_count > 0
305class User(GenericMongo):
306 collection_name: ClassVar[str] = "users"
308 firstname: str
309 lastname: str
310 email: str
311 password: str
312 is_active: bool | None = False
313 is_admin: bool | None = False
314 is_connected: bool | None = False
315 company_id: str | None = None
317 def to_dict(self, exclude=None):
318 if exclude is None:
319 exclude = {"password"}
320 return GenericMongo.to_dict(self, exclude=exclude)
322 @classmethod
323 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
324 users = cls.get_all()
325 if not users:
326 is_admin = True
327 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
328 user_collection = get_collection(systems_database, "users", create=True)
329 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
330 if new_user is None:
331 return None
332 return {"user_id": str(new_user.inserted_id)}
334 @classmethod
335 def update_info(cls, user: "UserUpdate", user_id: str):
336 updated_user = cls.collection().find_one_and_update(
337 {"_id": ObjectId(user_id)},
338 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
339 return_document=ReturnDocument.AFTER,
340 )
341 updated_user["id"] = str(updated_user["_id"])
342 del (updated_user["_id"], updated_user["is_connected"])
343 return cls(**updated_user)
346UserUpdate = create_update_model(User)
349class Mode(TwinPadModel):
350 mode_id: int
351 name: str
352 frequency_multiplier: float
353 min_frequency: float
356class DeviceUpdate(TwinPadModel):
357 mode_id: int
360class Device(GenericMongo):
361 collection_name: ClassVar[str] = "devices"
363 device_id: str
364 config_id: str | None = None
365 config_name: str | None = None
366 name: str
367 description: str = ""
368 modes: list[Mode]
369 current_mode_id: int | None = None
370 last_ping: float | None = None
371 petri_network: Any
372 pid: Any
373 load: float | None = None
374 tokens: list[int] = Field(default_factory=list)
375 status: str
377 async def change_mode(self, update_dict, current_user: User):
378 has_error = False
380 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes):
381 has_error = True
382 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
383 elif self.current_mode_id is not None:
384 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
385 else:
386 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
387 command = Command(
388 sent_at=time.time(),
389 command_type="Mode change",
390 description=description,
391 user_id=current_user.id,
392 )
394 if has_error:
395 command.response_time = 0
396 command.succeeded = False
397 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
398 else:
399 response = await send_mode_change(self.device_id, update_dict.mode_id)
400 command.receive_response(response)
402 Command.create(command)
403 return response
405 @classmethod
406 def get_from_device_or_config_id(cls, device_or_config_id: str):
407 items = (
408 cls.collection()
409 .aggregate(
410 [
411 {"$match": {"$or": [{"device_id": device_or_config_id}, {"config_id": device_or_config_id}]}},
412 {"$limit": 1},
413 ]
414 )
415 .to_list()
416 )
417 if len(items) == 0:
418 return None
419 return cls.mongo_dict_to_object(items[0])
421 @classmethod
422 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
423 devices_by_id = {}
424 for signal_id in signal_ids:
425 device_or_config_id = signal_id.split(".")[0]
426 if device_or_config_id not in devices_by_id:
427 devices_by_id[device_or_config_id] = cls.get_from_device_or_config_id(device_or_config_id)
428 return devices_by_id
431class DeviceSetup(GenericMongo):
432 collection_name: ClassVar[str] = "device_setups"
434 device_ids: list[str]
435 active: bool = False
436 variable_mapping: dict[str, str]
439DeviceSetupUpdate = create_update_model(DeviceSetup)
442class DeviceState(GenericMongo):
443 collection_name: ClassVar[str] = "devices_states"
445 timestamp: float
446 mode: str | None = None
447 load: float | None = None
448 tokens: list[int] = Field(default_factory=list)
449 config_id: str | None = None
450 modified_properties: list[str] = Field(default_factory=list)
452 @classmethod
453 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
454 req_filter = query.mongodb_filter()
455 items = []
456 if ":" in query.sort_by:
457 sort_field, sort_order = query.sort_by.split(":")
458 sort_order = int(sort_order)
459 else:
460 sort_field = query.sort_by
461 sort_order = 1
462 collection = get_collection(devices_states_database, device_id)
463 if collection is None:
464 total = 0
465 cursor = []
466 else:
467 total = collection.count_documents(req_filter)
468 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
469 if (query.limit is not None) and (query.limit != 0):
470 cursor = cursor.limit(query.limit)
471 for item_dict in cursor:
472 items.append(
473 cls(
474 timestamp=item_dict.get("precise_timestamp"),
475 mode=item_dict.get("mode"),
476 load=item_dict.get("load"),
477 tokens=item_dict.get("tokens", Field(default_factory=list)),
478 config_id=item_dict.get("config_id"),
479 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)),
480 )
481 )
482 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
485class SignalSample(TwinPadModel):
486 signal_id: str
487 timestamp: float
488 value: float | int | str | bool | None
489 forced_value: float | int | str | bool | None = None
491 @classmethod
492 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
493 collection = get_signal_collection(signal_id)
494 real_signal_id = signal_id
496 if collection is None:
497 device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
498 if device is not None:
499 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
500 collection = get_signal_collection(real_signal_id)
502 if collection is None:
503 return None
505 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
506 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
507 bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
508 first_bucket = None
509 if bucket is not None:
510 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
511 if first_bucket is not None:
512 sample_data = collection.find_one(
513 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
514 )
515 else:
516 sample_data = collection.find_one({}, sort={"precise_timestamp": 1})
518 if sample_data is None:
519 return None
521 timestamp = sample_data["precise_timestamp"]
523 return cls(
524 signal_id=real_signal_id,
525 timestamp=timestamp,
526 value=sample_data.get("value", None),
527 forced_value=sample_data.get("forced_value", None),
528 )
530 @classmethod
531 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
532 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
534 @classmethod
535 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
536 collection = get_signal_collection(signal_id)
537 real_signal_id = signal_id
539 if collection is None:
540 if device is None:
541 device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
542 if device is not None:
543 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
544 collection = get_signal_collection(real_signal_id)
546 if collection is None:
547 return None
549 # Same workaround as above function, very effective to narrow down big sets of data
550 bucket = get_signal_collection(f"system.buckets.{signal_id}")
551 last_bucket = None
552 if bucket is not None:
553 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
554 if last_bucket is not None:
555 sample_data = collection.find_one(
556 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
557 sort={"precise_timestamp": -1},
558 )
559 else:
560 sample_data = collection.find_one({}, sort={"precise_timestamp": -1})
562 if sample_data is None:
563 return None
565 timestamp = sample_data["precise_timestamp"]
567 if device is None:
568 device = Device.get_from_device_or_config_id(real_signal_id.split(".")[0])
569 if device is not None and device.last_ping is not None:
570 if timestamp is None:
571 timestamp = device.last_ping
572 else:
573 timestamp = max(timestamp, device.last_ping)
574 return cls(
575 signal_id=real_signal_id,
576 timestamp=timestamp,
577 value=sample_data.get("value", None),
578 forced_value=sample_data.get("forced_value", None),
579 )
581 @classmethod
582 def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
583 collection = get_signal_collection(signal_id)
584 real_signal_id = signal_id
586 if collection is None:
587 device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
588 if device is not None:
589 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
590 collection = get_signal_collection(real_signal_id)
592 if collection is None:
593 return None
595 sample_data = collection.find_one(
596 {"precise_timestamp": {"$gte": min_timestamp}}, sort={"precise_timestamp": -1}
597 )
598 if sample_data is None:
599 return None
601 return cls(
602 signal_id=real_signal_id,
603 timestamp=sample_data.get("precise_timestamp"),
604 value=sample_data.get("value"),
605 forced_value=sample_data.get("forced_value"),
606 )
608 @classmethod
609 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
610 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
611 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
613 @classmethod
614 def get_last_from_signal_ids_interest_window(cls, signal_ids: list[str], min_timestamp: float) -> Self | None:
615 return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
618class SignalData(TwinPadModel):
619 signal_id: str
620 forcible: bool = True
621 time_vector: list[float]
622 values: list[float | int | str | None]
623 forced_values: list[float | int | str | None]
625 data_start: float | None = None
626 data_end: float | None = None
628 number_samples: int = 0
629 number_samples_db: int = 0
631 db_query_time: float = 0.0
632 init_time: float = 0.0
633 data_processing_time: float = 0.0
635 @classmethod
636 def get_from_signal_id(
637 cls,
638 signal_id: str,
639 min_timestamp: float = None,
640 max_timestamp: float = None,
641 window_min_timestamp: float = None,
642 window_max_timestamp: float = None,
643 interpolate_bounds: bool = True,
644 max_documents: int = None,
645 ) -> Self:
647 now = time.time()
649 req_signal = {}
650 if min_timestamp is not None:
651 req_signal.setdefault("timestamp", {})
652 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
653 if max_timestamp is not None:
654 req_signal.setdefault("timestamp", {})
655 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
657 collection = get_signal_collection(signal_id)
659 real_signal_id = signal_id
661 if collection is None:
662 device = Device.get_from_device_or_config_id(signal_id.split(".")[0])
663 if device is not None:
664 real_signal_id = f"{device.device_id}.{device.config_id}.{signal_id.split('.')[-1]}"
665 collection = get_signal_collection(real_signal_id)
667 if collection is None:
668 return cls(
669 signal_id=real_signal_id,
670 time_vector=[],
671 values=[],
672 forced_values=[],
673 number_samples=0,
674 number_samples_db=0,
675 )
677 db_req_start = time.time()
679 sort_step = {"$sort": {"precise_timestamp": 1}}
680 number_results = collection.count_documents(req_signal)
682 pipeline = []
683 if req_signal:
684 pipeline.append({"$match": req_signal}) # Filter data if needed
686 pipeline.extend(
687 [
688 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
689 sort_step,
690 ]
691 )
693 if max_documents is not None and max_documents < number_results:
694 unsampling_ratio = math.ceil(number_results / max_documents)
695 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
696 pipeline.extend(
697 [
698 {
699 "$setWindowFields": {
700 "sortBy": {"precise_timestamp": 1},
701 "output": {"index": {"$documentNumber": {}}},
702 }
703 },
704 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
705 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
706 {"$replaceRoot": {"newRoot": "$doc"}},
707 {"$unset": ["index", "group_id"]},
708 {"$sort": {"precise_timestamp": 1}},
709 ]
710 )
712 # logger.info(f"pipeline: %s", str(pipeline))
713 cursor = collection.aggregate(pipeline)
714 db_req_time = time.time() - db_req_start
716 init_time = time.time()
718 results = cursor.to_list()
719 time_vector = []
720 values = []
721 forced_values = []
722 for s in results:
723 time_vector.append(s["precise_timestamp"])
724 values.append(s.get("value", None))
725 forced_values.append(s.get("forced_value", None))
727 signal = Signal.get_from_signal_id(real_signal_id)
728 class_ = signal.signal_data_class
730 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
731 time_vector, values, forced_values = cls.interpolate_bounds(
732 class_,
733 collection,
734 real_signal_id,
735 time_vector,
736 values,
737 forced_values,
738 window_min_timestamp,
739 window_max_timestamp,
740 )
742 if values:
743 # TODO: check below. a bit strange
744 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
745 # Adding last value as it should be repeated
746 time_vector.append(now)
747 values.append(values[-1])
748 forced_values.append(forced_values[-1])
750 init_time = time.time() - init_time
752 # See line 292 for explanation
753 bucket = get_signal_collection(f"system.buckets.{real_signal_id}")
754 first_bucket = None
755 if bucket is not None:
756 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
757 if first_bucket is not None:
758 data_start = first_bucket["control"]["min"]["precise_timestamp"]
759 else:
760 data_start = None
762 last_bucket = None
763 if bucket is not None:
764 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
765 if last_bucket is not None:
766 data_end = last_bucket["control"]["max"]["precise_timestamp"]
767 else:
768 data_end = None
770 return class_(
771 signal_id=real_signal_id,
772 forcible=signal.forcible,
773 time_vector=time_vector,
774 values=values,
775 forced_values=forced_values,
776 data_start=data_start,
777 data_end=data_end,
778 number_samples=len(values),
779 number_samples_db=number_results,
780 db_query_time=db_req_time,
781 init_time=init_time,
782 )
784 @staticmethod
785 def interpolate_bounds(
786 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
787 ):
788 sample_right = None
789 # Fetching right side value & interpolation
790 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
791 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
792 sample_right = collection.find_one(
793 {
794 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
795 "value": {"$exists": True},
796 },
797 sort={"precise_timestamp": -1},
798 )
799 if sample_right:
800 if time_vector:
801 right_sd = class_(
802 signal_id=signal_id,
803 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
804 values=[values[-1], sample_right.get("value", None)],
805 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
806 )
807 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
808 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
809 else:
810 max_ts_value = sample_right.get("value", None)
811 max_ts_forced_value = sample_right.get("forced_value", None)
812 time_vector.append(window_max_timestamp)
813 values.append(max_ts_value)
814 forced_values.append(max_ts_forced_value)
816 # Fetching left side value & interpolation
817 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
818 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
819 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
820 sample_left = sample_right
821 sample_left = collection.find_one(
822 {
823 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
824 "value": {"$exists": True},
825 },
826 sort={"precise_timestamp": -1},
827 )
829 if sample_left:
830 if time_vector:
831 left_sd = class_(
832 signal_id=signal_id,
833 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
834 values=[sample_left["value"], values[0]],
835 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
836 )
837 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
838 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
839 else:
840 min_ts_value = sample_left.get("value", None)
841 min_ts_forced_value = sample_left.get("forced_value", None)
842 time_vector.insert(0, window_min_timestamp)
843 values.insert(0, min_ts_value)
844 forced_values.insert(0, min_ts_forced_value)
846 return time_vector, values, forced_values
848 def interpolate_values(self, new_time_vector: list[float]):
849 return self.interpolate(new_time_vector, self.values)
851 def interpolate_forced_values(self, new_time_vector: list[float]):
852 return self.interpolate(new_time_vector, self.forced_values)
854 def uniform_desampling(self, number_samples_max: int) -> Self:
855 data_processing_time = time.time()
856 if number_samples_max and self.number_samples > number_samples_max:
857 new_time_vector = npy.linspace(
858 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
859 ).tolist()
860 values = self.interpolate_values(new_time_vector)
861 forced_values = self.interpolate_forced_values(new_time_vector)
862 time_vector = new_time_vector
863 number_samples = len(time_vector)
864 else:
865 time_vector = self.time_vector
866 number_samples = len(self.values)
867 values = self.values[:]
868 forced_values = self.forced_values[:]
869 data_processing_time = time.time() - data_processing_time
871 return self.__class__(
872 signal_id=self.signal_id,
873 time_vector=time_vector,
874 values=values,
875 forced_values=forced_values,
876 number_samples=number_samples,
877 number_samples_db=self.number_samples,
878 data_start=self.data_start,
879 data_end=self.data_end,
880 db_query_time=self.db_query_time,
881 init_time=self.init_time,
882 data_processing_time=self.data_processing_time + data_processing_time,
883 )
885 def interest_window_desampling(
886 self,
887 window_max_number_samples: int,
888 outside_max_number_samples: int,
889 window_min_timestamp: float | None = None,
890 window_max_timestamp: float | None = None,
891 ) -> Self:
892 """Performs a sampling in a window of interest and outside."""
894 if not self.time_vector:
895 return self
897 if window_min_timestamp is None:
898 window_min_timestamp = self.time_vector[0]
899 if window_max_timestamp is None:
900 window_max_timestamp = self.time_vector[-1]
902 data_processing_time = time.time()
904 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
905 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
907 time_vector_before = self.time_vector[:index_window_start]
908 time_vector_window = self.time_vector[index_window_start:index_window_end]
909 time_vector_after = self.time_vector[index_window_end:]
911 # Resampling window
912 if time_vector_window:
913 # Ensurring window bounds
914 if time_vector_window[0] != window_min_timestamp:
915 time_vector_window.insert(0, window_min_timestamp)
916 if time_vector_window[-1] != window_max_timestamp:
917 time_vector_window.append(window_max_timestamp)
918 else:
919 time_vector_window = [window_min_timestamp, window_max_timestamp]
921 if len(time_vector_window) > window_max_number_samples:
922 # Resampling
923 new_window_time_vector = npy.linspace(
924 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
925 ).tolist()
926 time_vector_window = new_window_time_vector
928 # Resampling outside
929 number_samples_before = len(time_vector_before)
930 number_samples_after = len(time_vector_after)
931 if (number_samples_before + number_samples_after) > outside_max_number_samples:
932 new_number_samples_before = min(
933 number_samples_before,
934 math.ceil(
935 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
936 ),
937 )
938 new_number_samples_after = min(
939 number_samples_after,
940 math.ceil(
941 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
942 ),
943 )
944 # Adjusting numbers as math.ceil can do +1 on sum
945 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
946 if new_number_samples_before > new_number_samples_after:
947 new_number_samples_before -= 1
948 else:
949 new_number_samples_after -= 1
951 if new_number_samples_before > 0:
952 new_time_vector_before = npy.linspace(
953 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
954 ).tolist()
955 time_vector_before = new_time_vector_before
957 if new_number_samples_after > 0:
958 new_time_vector_after = npy.linspace(
959 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
960 ).tolist()[::-1]
961 time_vector_after = new_time_vector_after
963 new_time_vector = time_vector_before + time_vector_window + time_vector_after
964 values = self.interpolate_values(new_time_vector)
965 forced_values = self.interpolate_forced_values(new_time_vector)
966 number_samples = len(values)
968 data_processing_time = time.time() - data_processing_time
970 return self.__class__(
971 signal_id=self.signal_id,
972 forcible=self.forcible,
973 time_vector=new_time_vector,
974 values=values,
975 forced_values=forced_values,
976 number_samples=number_samples,
977 number_samples_db=self.number_samples,
978 data_start=self.data_start,
979 data_end=self.data_end,
980 db_query_time=self.db_query_time,
981 init_time=self.init_time,
982 data_processing_time=self.data_processing_time + data_processing_time,
983 )
985 def csv_export(self):
986 output = io.StringIO()
987 writer = csv.writer(output)
988 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
989 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
990 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
991 return output.getvalue().encode("utf-8")
993 def prestoplot_export(self):
994 clean_signal_id = self.signal_id.replace(".", "_")
995 if clean_signal_id[0].isnumeric():
996 clean_signal_id = "_" + clean_signal_id
998 output = io.StringIO()
999 output.write("# Encoding:\tUTF-8\n")
1000 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
1001 output.write("ISO8601\tnone\tnone\n")
1002 output.write(f"# Description :\t{clean_signal_id}\n")
1004 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
1005 output.write(
1006 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
1007 )
1008 return output.getvalue().encode("utf-8")
1011class NumericSignalData(SignalData):
1012 data_type: str = "float"
1013 values: list[float | int | None]
1014 forced_values: list[float | int | None]
1016 def interpolate(self, new_time_vector: list[float], items):
1017 items = [npy.nan if s is None else s for s in items]
1018 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
1020 def uniform_desampling(self, number_samples_max: int) -> Self:
1021 data_processing_time = time.time()
1022 if number_samples_max and self.number_samples > number_samples_max:
1023 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
1024 forced_values = self.interpolate_forced_values(time_vector)
1025 number_samples = len(time_vector)
1026 else:
1027 time_vector = self.time_vector
1028 number_samples = len(self.values)
1029 values = self.values[:]
1030 forced_values = self.forced_values[:]
1031 data_processing_time = time.time() - data_processing_time
1033 return self.__class__(
1034 signal_id=self.signal_id,
1035 time_vector=time_vector,
1036 values=values,
1037 forced_values=forced_values,
1038 number_samples=number_samples,
1039 number_samples_db=self.number_samples,
1040 data_start=self.data_start,
1041 data_end=self.data_end,
1042 db_query_time=self.db_query_time,
1043 init_time=self.init_time,
1044 data_processing_time=self.data_processing_time + data_processing_time,
1045 )
1047 def interest_window_desampling(
1048 self,
1049 window_max_number_samples: int,
1050 outside_max_number_samples: int,
1051 window_min_timestamp: float | None = None,
1052 window_max_timestamp: float | None = None,
1053 ) -> Self:
1054 """Performs a sampling in a window of interest and outside."""
1056 if not self.time_vector:
1057 return self
1059 if window_min_timestamp is None:
1060 window_min_timestamp = self.time_vector[0]
1061 if window_max_timestamp is None:
1062 window_max_timestamp = self.time_vector[-1]
1064 data_processing_time = time.time()
1066 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1067 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1069 time_vector_before = self.time_vector[:index_window_start]
1070 time_vector_window = self.time_vector[index_window_start:index_window_end]
1071 time_vector_after = self.time_vector[index_window_end:]
1073 values_before = self.values[:index_window_start]
1074 values_window = self.values[index_window_start:index_window_end]
1075 values_after = self.values[index_window_end:]
1076 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1077 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1079 # Resampling window
1080 if time_vector_window:
1081 # Ensurring window bounds
1082 if time_vector_window[0] != window_min_timestamp:
1083 time_vector_window.insert(0, window_min_timestamp)
1084 values_window.insert(0, window_min_value)
1085 if time_vector_window[-1] != window_max_timestamp:
1086 time_vector_window.append(window_max_timestamp)
1087 values_window.append(window_max_value)
1088 else:
1089 time_vector_window = [window_min_timestamp, window_max_timestamp]
1090 values_window = [window_min_value, window_max_value]
1092 if len(time_vector_window) > window_max_number_samples:
1093 # Resampling
1094 time_vector_window, values_window = downsample_list(
1095 time_vector_window, values_window, window_max_number_samples
1096 )
1098 # Resampling outside
1099 number_samples_before = len(time_vector_before)
1100 number_samples_after = len(time_vector_after)
1101 if (number_samples_before + number_samples_after) > outside_max_number_samples:
1102 new_number_samples_before = min(
1103 number_samples_before,
1104 math.ceil(
1105 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1106 ),
1107 )
1108 new_number_samples_after = min(
1109 number_samples_after,
1110 math.ceil(
1111 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1112 ),
1113 )
1114 # Adjusting numbers as math.ceil can do +1 on sum
1115 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1116 if new_number_samples_before > new_number_samples_after:
1117 new_number_samples_before -= 1
1118 else:
1119 new_number_samples_after -= 1
1121 if new_number_samples_before > 0:
1122 time_vector_before, values_before = downsample_list(
1123 time_vector_before, values_before, new_number_samples_before
1124 )
1126 if new_number_samples_after > 0:
1127 time_vector_after, values_after = downsample_list(
1128 time_vector_after, values_after, new_number_samples_after
1129 )
1131 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1132 values = values_before + values_window + values_after
1133 forced_values = self.interpolate_forced_values(new_time_vector)
1134 number_samples = len(values)
1136 data_processing_time = time.time() - data_processing_time
1138 return self.__class__(
1139 signal_id=self.signal_id,
1140 time_vector=new_time_vector,
1141 values=values,
1142 forced_values=forced_values,
1143 number_samples=number_samples,
1144 number_samples_db=self.number_samples,
1145 data_start=self.data_start,
1146 data_end=self.data_end,
1147 db_query_time=self.db_query_time,
1148 init_time=self.init_time,
1149 data_processing_time=self.data_processing_time + data_processing_time,
1150 )
1153class StringSignalData(SignalData):
1154 data_type: str = "str"
1155 values: list[str | None]
1156 forced_values: list[str | None]
1158 def interpolate(self, new_time_vector: list[float], items):
1159 # Find the indices of the values in xp that are just smaller or equal to x
1160 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
1161 indices = npy.clip(indices, 0, len(items) - 1)
1162 # Return the corresponding left string values from fp
1163 return [items[i] for i in indices]
1166class SignalsData(TwinPadModel):
1167 signals_data: list[NumericSignalData | StringSignalData | SignalData]
1168 data_processing_time: float
1169 data_start: float | None
1170 data_end: float | None
1172 @classmethod
1173 def get_from_signal_ids(
1174 cls,
1175 signal_ids: list[str],
1176 min_timestamp: float = None,
1177 max_timestamp: float = None,
1178 window_min_timestamp: float = None,
1179 window_max_timestamp: float = None,
1180 interpolate_bounds: bool = True,
1181 max_documents: int = None,
1182 ) -> Self:
1183 signals_data = []
1184 data_start = None
1185 data_end = None
1186 if max_timestamp is None:
1187 max_timestamp = time.time()
1188 data_processing_time = 0.0
1189 for signal_id in signal_ids:
1190 signal_data = SignalData.get_from_signal_id(
1191 signal_id=signal_id,
1192 min_timestamp=min_timestamp,
1193 max_timestamp=max_timestamp,
1194 window_min_timestamp=window_min_timestamp,
1195 window_max_timestamp=window_max_timestamp,
1196 interpolate_bounds=interpolate_bounds,
1197 max_documents=max_documents,
1198 )
1199 data_processing_time += signal_data.data_processing_time
1200 signals_data.append(signal_data)
1201 if signal_data.data_start is not None:
1202 if data_start is None:
1203 data_start = signal_data.data_start
1204 else:
1205 data_start = min(signal_data.data_start, data_start)
1206 if signal_data.data_end is not None:
1207 if data_end is None:
1208 data_end = signal_data.data_end
1209 else:
1210 data_end = max(signal_data.data_end, data_end)
1212 return cls(
1213 signals_data=signals_data,
1214 data_processing_time=data_processing_time,
1215 data_start=data_start,
1216 data_end=data_end,
1217 )
1219 def uniform_desampling(self, number_samples_max: int) -> Self:
1220 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1221 return SignalsData(
1222 signals_data=signals_data,
1223 data_processing_time=sum(s.data_processing_time for s in signals_data),
1224 data_start=self.data_start,
1225 data_end=self.data_end,
1226 )
1228 def interest_window_desampling(
1229 self,
1230 window_max_number_samples: int,
1231 outside_max_number_samples: int,
1232 window_min_timestamp: float = None,
1233 window_max_timestamp: float = None,
1234 ) -> Self:
1235 signals_data = [
1236 s.interest_window_desampling(
1237 window_max_number_samples=window_max_number_samples,
1238 outside_max_number_samples=outside_max_number_samples,
1239 window_min_timestamp=window_min_timestamp,
1240 window_max_timestamp=window_max_timestamp,
1241 )
1242 for s in self.signals_data
1243 ]
1245 return SignalsData(
1246 signals_data=signals_data,
1247 data_processing_time=sum(s.data_processing_time for s in signals_data),
1248 data_start=self.data_start,
1249 data_end=self.data_end,
1250 )
1252 def zip_export(self, file_format: str = "csv"):
1253 # return self.signals_data[0].csv_export()
1254 zip_buffer = io.BytesIO()
1255 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1256 for signal_data in self.signals_data:
1257 if file_format == "csv":
1258 export_io = signal_data.csv_export()
1259 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io)
1260 elif file_format == "prestoplot":
1261 export_io = signal_data.prestoplot_export()
1262 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io)
1263 else:
1264 raise ValueError(f"Format not found. Got: {file_format}")
1265 zip_bytes = zip_buffer.getvalue()
1266 # zip_bytes.seek(0)
1267 return zip_bytes
1269 def hdf5_export(self):
1270 hdf5_buffer = io.BytesIO()
1271 custom_type_float = npy.dtype(
1272 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
1273 )
1274 custom_type_string = npy.dtype(
1275 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
1276 )
1277 with h5py.File(hdf5_buffer, "w") as hdf5_file:
1278 for signal_data in self.signals_data:
1279 signal_group = hdf5_file.create_group(signal_data.signal_id)
1280 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
1281 if signal_data.data_type == "str":
1282 export_data = npy.array(
1283 list(
1284 zip(
1285 date_vector,
1286 signal_data.time_vector,
1287 signal_data.values,
1288 signal_data.forced_values,
1289 )
1290 ),
1291 dtype=custom_type_string,
1292 )
1293 else:
1294 export_data = npy.array(
1295 list(
1296 zip(
1297 date_vector,
1298 signal_data.time_vector,
1299 signal_data.values,
1300 signal_data.forced_values,
1301 )
1302 ),
1303 dtype=custom_type_float,
1304 )
1305 signal_group["data"] = export_data
1306 return hdf5_buffer.getvalue()
1309class SignalStatus(TwinPadModel):
1310 status: str
1311 reason: str
1312 delay: float | None
1315class DigitizationFunction(TwinPadModel):
1316 bits: int | None = None
1317 min_value: float
1318 max_value: float
1319 min_raw_value: float
1320 max_raw_value: float
1322 @classmethod
1323 def from_bits(cls, bits: int, min_value: float, max_value: float):
1324 return cls(bits=bits, min_raw_value=0, max_raw_value=2**bits - 1, min_value=min_value, max_value=max_value)
1326 @classmethod
1327 def from_values(cls, min_raw_value: float, max_raw_value: float, min_value: float, max_value: float):
1328 return cls(
1329 bits=None,
1330 min_raw_value=min_raw_value,
1331 max_raw_value=max_raw_value,
1332 min_value=min_value,
1333 max_value=max_value,
1334 )
1336 def to_transfer_function(self):
1337 return TransferFunction(intervals=[(self.min_raw_value, self.min_value), (self.max_raw_value, self.max_value)])
1340class SignalUpdate(TwinPadModel):
1341 value: float | str | bool | int | None = None
1342 forced_value: float | str | bool | int | None = None
1343 timestamp: int | None = None
1346class SignalType(str, Enum):
1347 command = "command"
1348 sensor = "sensor"
1349 external_sensor = "external_sensor"
1350 interface = "interface"
1353SIGNALDATA_TYPES = {
1354 "int": NumericSignalData,
1355 "float": NumericSignalData,
1356 "str": StringSignalData,
1357 "bool": NumericSignalData,
1358 "epoch": NumericSignalData,
1359}
1362class LoopAddress(TwinPadModel):
1363 card_number: int
1364 channel: int
1367class Address(LoopAddress):
1368 loop_number: int
1371class TransferFunction(TwinPadModel):
1372 """
1373 A piecewise monotone linear function.
1374 """
1376 intervals: list[tuple[float, float]]
1378 def evaluate(self, x):
1379 for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]):
1380 if x1 <= x and x <= x2:
1381 return (y2 - y1) / (x2 - x1) * (x - x1) + y1
1382 raise ValueError("Out of bounds")
1384 def reverse(self, y):
1385 for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]):
1386 if min(y1, y2) <= y <= max(y1, y2):
1387 return (x2 - x1) / (y2 - y1) * (y - y1) + x1
1388 raise ValueError(f"Out of bounds: {y} is not in {self.intervals[0][1]}, {self.intervals[0][1]}")
1390 def compose(self, other_function):
1391 if other_function is None:
1392 return self
1393 # for (x1, y1), (x2, y2) in zip(self.intervals[:-1], self.intervals[1:]):
1394 # other_function.reverse()
1395 # Reversing other function x
1396 new_x = {self.reverse(x) for (x, _) in other_function.intervals}
1397 new_x.union(x for (x, _) in self.intervals)
1398 new_intervals = [(x, other_function.evaluate(self.evaluate(x))) for x in sorted(new_x)]
1399 return TransferFunction(new_intervals)
1401 def inverse(self):
1402 """
1403 Calculate the inverse function of this transfer function.
1405 The inverse of a piecewise monotone linear function can be calculated by inverting each interval.
1406 This means swapping x and y, then solving for y.
1408 Returns:
1409 TransferFunction: The inverse function of this transfer function.
1410 """
1412 return TransferFunction([(x, y) for (y, x) in self.intervals])
1415class Signal(GenericMongo):
1416 collection_name: ClassVar[str] = "signals"
1418 signal_id: str
1419 frequency: float
1420 unit: str | None
1421 description: str
1422 type: SignalType
1423 address: Address | LoopAddress | None = None
1424 data_type: str
1425 transfer_function: TransferFunction | None = None
1426 precision_digits: int | None
1427 digitization_function: DigitizationFunction | None = None
1428 forcible: bool
1430 @property
1431 def device(self) -> Device:
1432 device_or_config_id = self.signal_id.split(".")[0]
1433 return Device.get_from_device_or_config_id(device_or_config_id)
1435 @cached_property
1436 def ticker(self) -> str:
1437 return self.signal_id.split(".")[-1]
1439 @cached_property
1440 def signal_data_class(self):
1441 if self.data_type in SIGNALDATA_TYPES:
1442 return SIGNALDATA_TYPES[self.data_type]
1443 if self.data_type.startswith("enum"):
1444 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1445 raise ValueError(f"Unhandled python type: {self.data_type}")
1447 @cached_property
1448 def python_type(self):
1449 if self.data_type in TYPES:
1450 return TYPES[self.data_type]
1451 if self.data_type.startswith("enum"):
1452 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1453 return Literal[*choices]
1454 raise ValueError(f"Unhandled python type: {self.data_type}")
1456 @computed_field
1457 @property
1458 def status(self) -> SignalStatus:
1459 now = time.time()
1460 status = "up"
1461 reason = ""
1463 # See line 285 for explanation
1464 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1465 last_bucket = None
1466 if bucket is not None:
1467 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1468 if last_bucket is None:
1469 status = "no data"
1470 reason = "signal does not exist"
1471 return SignalStatus(status=status, reason=reason, delay=None)
1473 try:
1474 last_date = last_bucket["control"]["max"]["timestamp"]
1475 last_date = last_date.replace(tzinfo=pytz.UTC)
1476 last_value_ts = last_date.timestamp()
1477 except IndexError:
1478 last_value_ts = None
1480 if last_value_ts is None:
1481 delay = None
1482 reason = "No data from signal"
1483 else:
1484 # Since device is a computed property, only compute it once
1485 device = self.device
1486 if device is not None and device.last_ping is not None:
1487 last_value_ts = max(last_value_ts, device.last_ping)
1488 delay = now - last_value_ts
1489 if delay > DEVICE_TIMEOUT:
1490 status = "down"
1491 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1492 return SignalStatus(status=status, reason=reason, delay=delay)
1494 async def send_command(self, device_id: str, update_dict: SignalUpdate, current_user: User) -> dict:
1495 command = Command(
1496 sent_at=time.time(),
1497 command_type="Signal command",
1498 user_id=current_user.id,
1499 )
1501 has_input_error = False
1502 error_message = ""
1504 if self.data_type.startswith("enum"):
1505 enum_options = get_args(self.python_type)
1507 if update_dict.value is not None and update_dict.value not in enum_options:
1508 has_input_error = True
1509 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1510 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1511 has_input_error = True
1512 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1513 else:
1514 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1515 has_input_error = True
1516 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1517 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1518 has_input_error = True
1519 error_message += (
1520 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1521 )
1523 if has_input_error:
1524 command.response_time = 0
1525 command.succeeded = False
1526 command.description = f"Tried to modify signal {self.signal_id}"
1527 response = {"error": True, "status_code": 400, "message": error_message}
1528 else:
1529 response = await send_signal_value(device_id, self.signal_id, update_dict)
1530 command.receive_response(response)
1532 Command.create(command)
1533 return response
1535 @classmethod
1536 def get_from_signal_id(cls, signal_id: str) -> Self:
1537 """Could be generic from mongo"""
1538 signal = Signal.get_one_by_attribute("signal_id", signal_id)
1539 if signal is None:
1540 split_signal_id = signal_id.split(".")
1541 device_or_config_id = split_signal_id[0]
1542 ticker = split_signal_id[-1]
1543 possible_device = Device.get_from_device_or_config_id(device_or_config_id)
1544 if possible_device is not None:
1545 signal = Signal.get_one_by_attribute(
1546 "signal_id", f"{possible_device.device_id}.{possible_device.config_id}.{ticker}"
1547 )
1548 if not signal:
1549 return None
1550 return cls.dict_to_object(signal)
1552 @classmethod
1553 def get_all_ids(cls) -> list[str]:
1554 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1556 return [signal["signal_id"] for signal in cursor]
1558 async def number_samples(self):
1559 collection = get_signal_collection(signal_id=self.signal_id)
1560 if collection is None:
1561 return 0
1563 number_samples = collection.estimated_document_count()
1565 number_samples_async_collection = await get_async_collection(
1566 systems_async_database, "number_samples", create=True, time_series=True
1567 )
1569 loop = asyncio.get_running_loop()
1570 loop.create_task(
1571 number_samples_async_collection.insert_one(
1572 {
1573 "timestamp": datetime.datetime.now(pytz.UTC),
1574 "signal_id": self.signal_id,
1575 "number_samples": number_samples,
1576 }
1577 )
1578 )
1580 return number_samples
1582 def sample_datasize(self):
1583 return signals_database.command("collstats", self.signal_id)["size"]
1585 @classmethod
1586 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1587 result = cls.collection().aggregate(
1588 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1589 )
1591 return {signal["signal_id"]: signal["forcible"] for signal in result}
1594class Parameter(TwinPadModel):
1595 name: str
1596 value: str | float | bool | int
1599class Component(GenericMongo):
1600 collection_name: ClassVar[str] = "components"
1602 id: int
1603 name: str
1604 signals: list[Signal]
1605 parameters: list[Parameter] = []
1606 reference: str = None
1609class ServicesStatus(TwinPadModel):
1610 backend: str
1611 cloud_broker: str
1612 time_series_database: str
1613 signal_storage: str
1614 heartbeat_storage: str
1615 data_analyzer: str
1617 @classmethod
1618 def check(cls) -> Self:
1619 return cls(
1620 cloud_broker=ping(RABBITMQ_HOST),
1621 backend="up",
1622 time_series_database=ping(MONGO_HOST),
1623 signal_storage=ping(SIGNAL_STORAGE_HOST),
1624 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
1625 data_analyzer=ping(DATA_ANALYZER_HOST),
1626 )
1629def ping(host):
1630 try:
1631 if ping3.ping(host, timeout=0.8):
1632 return "up"
1633 except PermissionError:
1634 pass
1635 return "down"
1638class Event(GenericMongo):
1639 collection_name: ClassVar[str] = "events"
1641 name: str
1642 timestamp: float
1643 event_rule_id: str
1645 @computed_field
1646 @cached_property
1647 def event_rule(self) -> "EventRule":
1648 return EventRule.get_from_id(self.event_rule_id)
1650 @classmethod
1651 def dict_to_object(cls, dict_):
1652 """Refine to convert timestamp to datetime for mongodb."""
1653 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
1654 return super().dict_to_object(dict_)
1657class TwinPadActivity(GenericMongo):
1658 timestamp: float
1659 amount: int
1661 @classmethod
1662 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
1663 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1664 number_events_collection = get_collection(systems_database, "number_events")
1665 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
1666 items = []
1667 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1668 if number_events_collection is None or recompute_amount:
1669 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
1670 number_events_collection.delete_many({})
1671 first_event = events_collection.find_one(sort={"timestamp": 1})
1672 if first_event is None:
1673 return items
1674 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
1675 tzinfo=pytz.UTC
1676 )
1677 while last_computed_day < TODAY:
1678 day_nb_events = events_collection.count_documents(
1679 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1680 )
1681 if day_nb_events > 0:
1682 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
1683 last_computed_day += ONE_DAY_OFFSET
1684 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
1685 if number_events_today > 0:
1686 number_events_collection.delete_many({"timestamp": TODAY})
1687 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
1688 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1689 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1690 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1691 for day in number_events:
1692 day["timestamp"] = day["timestamp"].timestamp()
1693 items.append(cls.mongo_dict_to_object(day))
1694 return items
1696 @classmethod
1697 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
1698 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1699 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
1700 signals_number_samples_collection = get_collection(
1701 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
1702 )
1703 items = []
1704 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1705 if number_samples_collection is None or recompute_amount:
1706 number_samples_collection = get_collection(
1707 systems_database, "number_received_samples", create=True, time_series=True
1708 )
1709 number_samples_collection.delete_many({})
1710 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
1711 if first_sample is None:
1712 return items
1713 # compute from day of first found event
1714 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace(
1715 tzinfo=pytz.UTC
1716 )
1717 while last_computed_day < TODAY:
1718 number_samples_request = signals_number_samples_collection.aggregate(
1719 [
1720 {
1721 "$match": {
1722 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}
1723 }
1724 },
1725 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
1726 ]
1727 ).to_list()
1728 if len(number_samples_request) == 0:
1729 number_samples = 0
1730 else:
1731 number_samples = number_samples_request[0].get("number_samples", 0)
1732 if number_samples > 0:
1733 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
1734 last_computed_day += ONE_DAY_OFFSET
1735 number_samples_request = signals_number_samples_collection.aggregate(
1736 [
1737 {"$match": {"timestamp": {"$gte": TODAY}}},
1738 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
1739 ]
1740 ).to_list()
1741 if len(number_samples_request) == 0:
1742 number_samples_today = 0
1743 else:
1744 number_samples_today = number_samples_request[0].get("number_samples", 0)
1745 if number_samples_today > 0:
1746 number_samples_collection.delete_many({"timestamp": TODAY})
1747 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
1748 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1749 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1750 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1751 for day in number_events:
1752 day["timestamp"] = day["timestamp"].timestamp()
1753 items.append(cls.mongo_dict_to_object(day))
1754 return items
1756 @classmethod
1757 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
1758 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1759 number_commands_collection = get_collection(systems_database, "number_commands")
1760 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
1761 items = []
1762 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1763 if number_commands_collection is None or recompute_amount:
1764 number_commands_collection = get_collection(
1765 systems_database, "number_commands", create=True, time_series=True
1766 )
1767 number_commands_collection.delete_many({})
1768 first_command = commands_collection.find_one(sort={"timestamp": 1})
1769 if first_command is None:
1770 return items
1771 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace(
1772 tzinfo=pytz.UTC
1773 )
1774 while last_computed_day < TODAY:
1775 day_nb_commands = commands_collection.count_documents(
1776 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1777 )
1778 if day_nb_commands > 0:
1779 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
1780 last_computed_day += ONE_DAY_OFFSET
1781 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
1782 if number_commands_today > 0:
1783 number_commands_collection.delete_many({"timestamp": TODAY})
1784 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
1785 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1786 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1787 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1788 for day in number_commands:
1789 day["timestamp"] = day["timestamp"].timestamp()
1790 items.append(cls.mongo_dict_to_object(day))
1791 return items
1794class EventRule(GenericMongo):
1795 collection_name: ClassVar[str] = "event_rules"
1797 name: str
1798 formula: str
1799 variables: list[str]
1801 @computed_field
1802 @cached_property
1803 def number_events(self) -> int:
1804 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
1807class Company(GenericMongo):
1808 collection_name: ClassVar[str] = "companies"
1809 name: str
1812class Campaign(GenericMongo):
1813 collection_name: ClassVar[str] = "campaigns"
1815 # Properties
1816 id: str | None = None
1817 name: str
1818 description: str | None = None
1820 @classmethod
1821 def create(cls, campaign: Self):
1822 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
1823 if new_campaign is None:
1824 return None
1825 return {"campaign_id": str(new_campaign.inserted_id)}
1827 @classmethod
1828 def update(cls, campaign: Self):
1829 updated_campaign = cls.collection().find_one_and_update(
1830 {"_id": ObjectId(campaign.id)},
1831 {"$set": {"name": campaign.name, "description": campaign.description}},
1832 return_document=ReturnDocument.AFTER,
1833 )
1834 return updated_campaign
1836 @classmethod
1837 def delete(cls, campaign_id):
1838 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)})
1839 return deleted_user
1842class Phase(GenericMongo):
1843 collection_name: ClassVar[str] = "phases"
1845 # Properties
1846 id: str | None = None
1847 name: str
1848 description: str | None = None
1849 start_at: float
1850 end_at: float
1852 # FK
1853 campaign_id: str
1855 # @classmethod
1856 # def get_by_date(cls, datetime: float):
1857 # phases = []
1858 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
1859 # phases.append(cls.dict_to_object(dict_).model_dump())
1860 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
1861 # phases.append(cls.dict_to_object(dict_).model_dump())
1862 # if phases is None:
1863 # return None
1864 # return phases
1866 @classmethod
1867 def create(cls, phase: Self):
1868 phase = Phase(
1869 name=phase.name,
1870 description=phase.description,
1871 start_at=phase.start_at,
1872 end_at=phase.end_at,
1873 campaign_id=phase.campaign_id,
1874 )
1875 phase_collection = get_collection(systems_database, "phases", create=True)
1876 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
1877 if new_phase is None:
1878 return None
1879 return {"phase_id": str(new_phase.inserted_id)}
1881 @classmethod
1882 def update(cls, phase: Self):
1883 updated_phase = cls.collection().find_one_and_update(
1884 {"_id": ObjectId(phase.id)},
1885 {
1886 "$set": {
1887 "name": phase.name,
1888 "description": phase.description,
1889 "start_at": phase.start_at,
1890 "end_at": phase.end_at,
1891 }
1892 },
1893 return_document=ReturnDocument.AFTER,
1894 )
1895 return updated_phase
1897 @classmethod
1898 def delete(cls, phase_id):
1899 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
1900 return delete_phase
1902 @classmethod
1903 def deleteMany(cls, campaign_id):
1904 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
1905 return delete_phases
1908class CustomViewCreation(GenericMongo):
1909 collection_name: ClassVar[str] = "custom_views"
1911 name: str
1912 configuration: list
1915class CustomView(CustomViewCreation):
1916 # Properties
1917 id: str | None = None
1919 # Foreign Key
1920 user_id: str
1922 # # Methods
1923 # @classmethod
1924 # def create(cls, form_custom_view: Self, user_id) -> list:
1925 # custom_view = CustomView(
1926 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
1927 # )
1928 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
1929 # return new_custom_view
1931 # @classmethod
1932 # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
1933 # updated_custom_view = cls.collection().find_one_and_update(
1934 # {"_id": ObjectId(custom_view_id)},
1935 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
1936 # return_document=ReturnDocument.AFTER,
1937 # )
1938 # updated_custom_view["id"] = str(updated_custom_view["_id"])
1939 # del updated_custom_view["_id"]
1940 # return cls(**updated_custom_view)
1942 # @classmethod
1943 # def delete(cls, custom_view_id) -> bool:
1944 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
1945 # return deleted_custom_view.acknowledged
1948CustomViewUpdate = create_update_model(CustomView)
1951class Video(GenericMongo):
1952 collection_name: ClassVar[str] = "videos"
1954 # Properties
1955 name: str
1956 ip_addr: str
1957 username: str | None = None
1958 password: str | None = None
1960 # Methods
1961 @classmethod
1962 def get_all(cls, sort_by="_id") -> list[Self]:
1963 items = []
1964 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
1965 items.append(cls.mongo_dict_to_object(dict_))
1966 return items
1968 @classmethod
1969 def get_video(cls, camera_id: ObjectId):
1970 camera = cls.get_from_id(camera_id)
1971 if camera is not None:
1972 return camera.name
1973 return None
1976class Command(GenericMongo):
1977 collection_name: ClassVar[str] = "commands"
1979 # Properties
1980 timestamp: datetime.datetime = None
1981 sent_at: float
1982 response_time: float = 0.0
1983 command_type: str
1984 description: str = ""
1985 succeeded: bool = False
1987 # Foreign key
1988 user_id: str
1990 @classmethod
1991 def collection(cls):
1992 return get_collection(systems_database, cls.collection_name, create=True, time_series=True)
1994 @classmethod
1995 def create(cls, command: Self):
1996 command = cls(
1997 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
1998 sent_at=command.sent_at,
1999 response_time=command.response_time,
2000 command_type=command.command_type,
2001 description=command.description,
2002 succeeded=command.succeeded,
2003 user_id=command.user_id,
2004 )
2005 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"}))
2006 if new_command is None:
2007 return None
2008 return {"command_id": str(new_command.inserted_id)}
2010 def receive_response(self, response: dict):
2011 self.response_time = time.time() - self.sent_at
2012 self.succeeded = response.get("error", True) is False
2013 if self.description == "":
2014 self.description += response.get("message", "").rstrip()
2017class SignalsPresetCreation(GenericMongo):
2018 name: str
2019 signal_ids: list[str]
2022class SignalsPreset(SignalsPresetCreation):
2023 collection_name: ClassVar[str] = "signals_presets"
2025 user_id: str
2027 @classmethod
2028 def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
2029 signals_preset = cls(
2030 user_id=user_id,
2031 name=signals_preset.name,
2032 signal_ids=signals_preset.signal_ids,
2033 )
2035 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"}))
2037 return str(new_signal_preset.inserted_id)
2040SignalsPresetUpdate = create_update_model(SignalsPreset)
2043class LineStyle(str, Enum):
2044 solid = "solid"
2045 dotted = "dotted"
2046 dashed = "dashed"
2049class SignalAppearance:
2050 value_color: str
2051 forced_value_color: str
2054class GraphThemeCreation(GenericMongo):
2055 collection_name: ClassVar[str] = "graph_themes"
2057 name: str
2058 signal_id: str
2059 value_color: str = ""
2060 forced_value_color: str = ""
2061 value_line_style: LineStyle = LineStyle.solid
2062 forced_value_line_style: LineStyle = LineStyle.solid
2063 private: bool = True
2066class PublicGraphTheme(GraphThemeCreation):
2067 created_by_user: bool
2068 in_user_library: bool
2069 active_for_user: bool
2071 _current_user_id: str = ""
2073 @classproperty
2074 def custom_pipeline_steps(cls) -> dict[str, list]:
2075 return {
2076 "created_by_user": [
2077 {
2078 "$addFields": {
2079 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]},
2080 }
2081 }
2082 ],
2083 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible
2084 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}
2085 ],
2086 "in_user_library": [
2087 {
2088 "$addFields": {
2089 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]},
2090 }
2091 }
2092 ],
2093 "active_for_user": [
2094 {
2095 "$addFields": {
2096 "active_for_user": {"$in": [cls._current_user_id, "$active"]},
2097 }
2098 }
2099 ],
2100 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}],
2101 "active": [
2102 {
2103 "$addFields": {
2104 "active": "$$REMOVE",
2105 }
2106 }
2107 ],
2108 "creator_id": [
2109 {
2110 "$addFields": {
2111 "creator_id": "$$REMOVE",
2112 }
2113 }
2114 ],
2115 }
2117 @classmethod
2118 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
2119 cls._current_user_id = user_id
2120 return super().response_from_query(query)
2122 @classmethod
2123 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
2124 query.in_user_library = "true"
2125 return cls.response_from_query(query, user_id)
2127 @classmethod
2128 def get_from_id(cls, item_id, user_id: str) -> Self | None:
2129 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)
2131 @classmethod
2132 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
2133 cls._current_user_id = user_id
2134 return super().get_by_attribute(attribute_name, attribute_value)
2136 @classmethod
2137 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
2138 cls._current_user_id = user_id
2139 return super().get_one_by_attribute(attribute_name, attribute_value)
2141 @classmethod
2142 def get_all(cls, sort_by: str, user_id: str):
2143 cls._current_user_id = user_id
2144 return super().get_all(sort_by)
2146 @classmethod
2147 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
2148 pipeline = [
2149 {
2150 "$match": {
2151 "active": {"$eq": user_id},
2152 "signal_id": {"$in": signal_ids},
2153 }
2154 },
2155 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
2156 {"$replaceRoot": {"newRoot": "$firstDocument"}},
2157 {
2158 "$project": {
2159 "_id": 0,
2160 "signal_id": 1,
2161 "value_color": 1,
2162 "forced_value_color": 1,
2163 "value_line_style": 1,
2164 "forced_value_line_style": 1,
2165 }
2166 },
2167 ]
2169 result = {}
2171 cursor = cls.collection().aggregate(pipeline)
2172 for document in cursor:
2173 signal_id = document["signal_id"]
2174 del document["signal_id"]
2175 result[signal_id] = document
2177 return result
2180GraphThemeUpdate = create_update_model(PublicGraphTheme)
2183class PrivateGraphTheme(GraphThemeCreation):
2184 # private
2185 creator_id: str
2186 in_library: list[str]
2187 active: list[str]
2189 @classmethod
2190 def create(
2191 cls,
2192 creator_id: str,
2193 name: str,
2194 signal_id: str,
2195 value_color: str,
2196 forced_value_color: str,
2197 value_line_style: LineStyle,
2198 forced_value_line_style: LineStyle,
2199 private: bool,
2200 ):
2201 color_setting = cls(
2202 creator_id=creator_id,
2203 name=name,
2204 signal_id=signal_id,
2205 value_color=value_color,
2206 forced_value_color=forced_value_color,
2207 value_line_style=value_line_style,
2208 forced_value_line_style=forced_value_line_style,
2209 private=private,
2210 in_library=[creator_id],
2211 active=[creator_id],
2212 )
2214 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
2215 color_setting.id = str(new_color_setting.inserted_id)
2216 return color_setting
2218 def update(self, update_dict: dict, user_id: str):
2219 if (in_user_lib := update_dict.get("in_user_library")) is not None:
2220 if in_user_lib and user_id not in self.in_library:
2221 self.in_library.append(user_id)
2222 elif not in_user_lib and user_id in self.in_library:
2223 self.in_library.remove(user_id)
2224 update_dict["in_library"] = self.in_library
2225 del update_dict["in_user_library"]
2227 if (active_for_user := update_dict.get("active_for_user")) is not None:
2228 if active_for_user and user_id not in self.active:
2229 self.active.append(user_id)
2230 elif not active_for_user and user_id in self.active:
2231 self.active.remove(user_id)
2232 update_dict["active"] = self.active
2233 del update_dict["active_for_user"]
2235 if update_dict.get("created_by_user") is not None:
2236 del update_dict["created_by_user"]
2238 self.collection().find_one_and_update(
2239 {"_id": ObjectId(self.id)},
2240 {"$set": update_dict},
2241 )
2243 return {}
2246class Configuration(GenericMongo):
2247 collection_name: ClassVar[str] = "configs"
2249 # Properties
2250 config_id: str | None = None
2251 generated_at: float
2252 config: dict
2253 components: list
2254 hardware_topology: dict
2255 received_at: float
2256 in_use_by_devices: list[str] = []
2257 is_in_use: bool = False
2259 custom_pipeline_steps = {
2260 "is_in_use": [
2261 {
2262 "$addFields": {
2263 "is_in_use": {
2264 "$cond": [
2265 {"$gt": [{"$size": {"$ifNull": ["$in_use_by_devices", []]}}, 0]},
2266 True,
2267 False,
2268 ]
2269 },
2270 }
2271 }
2272 ],
2273 }
2275 @classmethod
2276 def get_from_config_id(cls, config_id: str) -> Self:
2277 items = (
2278 cls.collection()
2279 .aggregate(
2280 [
2281 {"$match": {"config_id": config_id}},
2282 {"$limit": 1},
2283 ]
2284 )
2285 .to_list()
2286 )
2287 if len(items) == 0:
2288 return None
2289 dict_ = items[0]
2290 # There is some protected information in the config dict, so keep only specific keys
2291 allowed_config_keys = ["description", "broker_host", "target_device_id", "device_name"]
2292 config_dict = dict_.get("config")
2293 dict_["config"] = {k: config_dict[k] for k in allowed_config_keys}
2294 return cls.mongo_dict_to_object(dict_)
2297DIGITIZATION_FUNCTIONS_FROM_REFERENCE = {
2298 "EL1809": DigitizationFunction.from_bits(1, 0, 1),
2299 "EL1819": DigitizationFunction.from_bits(1, 0, 1),
2300 "EL3124": DigitizationFunction.from_bits(15, 0.004, 0.020),
2301 "EL3062-0030": DigitizationFunction.from_bits(15, 0.0, 30.0),
2302 "EL3356-0020": DigitizationFunction.from_bits(24, -12.0, 12.0),
2303 "EL2004": DigitizationFunction.from_bits(1, 0, 1),
2304 "EL2042": DigitizationFunction.from_bits(1, 0, 1),
2305 "EL1004": DigitizationFunction.from_bits(1, 0, 1),
2306 "EL3054": DigitizationFunction.from_bits(15, 0.004, 0.020),
2307 "EL3202": DigitizationFunction.from_values(-2000, 8500, -200, 850),
2308 "EL4022": DigitizationFunction.from_bits(15, 0.004, 0.020),
2309 "EL3064": DigitizationFunction.from_bits(15, 0.0, 10.0),
2310 "EL2022": DigitizationFunction.from_bits(1, 0, 1),
2311 "ELX2008": DigitizationFunction.from_bits(1, 0, 1),
2312 "EXP3158": DigitizationFunction.from_bits(15, 0.004, 0.020),
2313 "EPX1058": DigitizationFunction.from_bits(1, 0, 1),
2314 "ELX4154": DigitizationFunction.from_bits(15, 0.004, 0.020),
2315 "ELM3704": DigitizationFunction.from_bits(31, 0.004, 0.020),
2316 "EL3062": DigitizationFunction.from_bits(15, 0, 10),
2317 "EL3351": DigitizationFunction.from_bits(15, -0.020, 0.020),
2318 "EL9410": DigitizationFunction.from_bits(1, 0, 1),
2319 "EL2911": DigitizationFunction.from_bits(1, 0, 1),
2320}
2323class EtherCatModule:
2324 name: str
2325 reference: str
2326 # ports: list[Port]
2327 signals: list[Signal] = []
2330class EtherCatLoop:
2331 terminals: list[EtherCatModule]
2333 @cached_property
2334 def signals(self):
2335 signals = []
2336 for terminal in self.terminals:
2337 signals.extend(terminal.signals)
2338 return signals
2340 def blueprint(self, filename: str):
2341 width = 0
2342 height = 0
2344 images = []
2345 missing_images = set()
2346 for terminal in self.terminals:
2347 # Load terminal image
2348 try:
2349 terminal_image = Image.open(f"beckhoff/{terminal.reference}.png")
2350 except FileNotFoundError:
2351 terminal_image = Image.open("beckhoff/undefined.png")
2352 missing_images.add(terminal.reference)
2353 images.append(terminal_image)
2355 height = max(height, terminal_image.height)
2356 width += terminal_image.width
2357 if missing_images:
2358 print(f"Missing images: {missing_images}")
2359 im = Image.new("RGBA", (width, height))
2360 x = 0
2361 for image in images:
2363 # box =(x, 0, x, image.width)
2364 im.paste(image, (x, 0))
2365 x += image.width
2366 im.save(filename)
2367 return im
2369 @classmethod
2370 def from_eni(cls, filename: str):
2372 tags = xml_tags(filename)
2374 modules = []
2375 for slave in tags["Slave"]:
2376 # for child in slave:
2377 info = slave.find("Info")
2378 name = info.find("Name").text
2380 subname, ref = extract_subname_ref(name)
2382 modules.append(EtherCatModule(subname, ref))
2383 # print(attr)
2384 return cls(modules)
2386 @classmethod
2387 def from_syvar_xml(cls, filename):
2388 tags = xml_tags(filename)
2390 modules = []
2391 card_number = 0
2392 for slave in tags["Interface"]:
2393 cards_signals = []
2395 # for child in slave:
2396 name = slave.find("name").text
2397 extracted_subname_ref = extract_subname_ref(name)
2398 if extracted_subname_ref is not None:
2399 card_number += 1
2400 subname, ref = extracted_subname_ref
2402 for variable in slave.find("Variables"):
2403 ticker = variable.find("mnemonic").text
2404 description = variable.find("information").text
2405 # Unit
2406 unit_tag = variable.find("datatype").find("unit")
2407 if unit_tag is not None:
2408 unit = unit_tag.text
2409 else:
2410 unit = None
2411 if unit == "NoUnit":
2412 unit = None
2414 address_tag = variable.find("behavior").find("address")
2415 if address_tag is not None:
2416 address = address_tag.text
2417 match = re.search(CHANNEL_PATTERN, address)
2418 if match:
2420 channel_number = int(match.group(1))
2422 digitization_function = DIGITIZATION_FUNCTIONS_FROM_REFERENCE[ref]
2423 transfer_function = None
2425 signal = Signal(
2426 signal_id=ticker,
2427 description=description,
2428 unit=unit,
2429 type="sensor",
2430 address=LoopAddress(card_number=card_number, channel=channel_number),
2431 frequency=None,
2432 transfer_function=transfer_function,
2433 precision_digits=None,
2434 data_type="float",
2435 digitization_function=digitization_function,
2436 )
2437 cards_signals.append(signal)
2439 modules.append(EtherCatModule(subname, ref, signals=cards_signals))
2441 return cls(modules)
2444class EtherCatTopology:
2445 loops: list[EtherCatLoop]
2447 @cached_property
2448 def signals(self) -> list[Signal]:
2449 signals = []
2450 for loop in self.loops:
2451 signals.extend(loop.signals)
2452 return signals
2454 def wiring_xlsx(self, filename: str):
2455 wb = Workbook()
2456 signals_sheet = wb.active
2457 signals_sheet.title = "signals"
2459 for iloop, loop in enumerate(self.loops):
2460 loop_sheet = wb.create_sheet(f"loop_{iloop+1}")
2461 # Creating card header
2462 loop_sheet["B1"] = "Card name"
2463 loop_sheet["B2"] = "Card reference"
2464 loop_sheet["B3"] = "Position"
2465 loop_sheet["A4"] = "Channel"
2466 for i in range(16):
2467 loop_sheet.cell(row=4 + i, column=2, value=i + 1)
2469 for icard, card in enumerate(loop.terminals):
2470 loop_sheet.cell(row=1, column=3 + icard, value=card.name)
2471 loop_sheet.cell(row=2, column=3 + icard, value=card.reference)
2472 loop_sheet.cell(row=3, column=3 + icard, value=icard + 1)
2474 for signal in card.signals:
2475 loop_sheet.cell(row=3 + signal.address.channel, column=3 + icard, value=signal.ticker)
2477 thin = Side(border_style="thin", color="000000")
2478 for i in range(19):
2479 for j in range(len(loop.terminals) + 2):
2480 cell = loop_sheet.cell(row=i + 1, column=j + 1)
2481 cell.border = Border(top=thin, left=thin, right=thin, bottom=thin)
2482 loop_sheet.merge_cells("A4:A19")
2484 # signals_sheet = wb.create_sheet('signals')
2485 for i, header in enumerate(XLSX_HEADER):
2486 signals_sheet.cell(row=1, column=1 + i, value=header)
2488 for isignal, signal in enumerate(self.signals):
2489 signals_sheet.cell(row=isignal + 2, column=1, value=signal.ticker)
2490 signals_sheet.cell(row=isignal + 2, column=3, value=signal.description)
2491 signals_sheet.cell(row=isignal + 2, column=4, value=signal.unit)
2492 signals_sheet.cell(row=isignal + 2, column=5, value=signal.type)
2493 signals_sheet.cell(row=isignal + 2, column=6, value=signal.frequency)
2494 signals_sheet.cell(row=isignal + 2, column=7, value=signal.transfer_function)
2495 signals_sheet.cell(row=isignal + 2, column=8, value=signal.precision_digits)
2497 wb.save(filename)