Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%
1366 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-22 08:16 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-22 08:16 +0000
1from functools import cached_property
2import os
3import re
4import io
5import time
6import csv
7from typing import Self, ClassVar, Any, Literal, get_args, Annotated
8import datetime
9import math
10import bisect
11from enum import Enum
12import logging
13import copy
14import asyncio
15import requests
17import zipfile
18import ping3
19import pytz
20from bson.objectid import ObjectId
21from pymongo import ASCENDING, ReturnDocument
22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator
23import numpy as npy
24import lttb
25import h5py
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 signals_async_database,
36 devices_states_database,
37)
38from twinpad_backend.responses import ListResponse
39from twinpad_backend.messages import RabbitMQClient
40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
# Maps a type name to the Python type used for it ("epoch" values are handled as floats).
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Names of the available post-processing functions.
# NOTE(review): the SINGLE/DOUBLE/MULTIPLE split presumably reflects how many input signals each takes - confirm.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]

# Host names of the other services; each one can be overridden through the environment.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Read from the environment as a float, 5.0 when unset. SignalData.get_from_signal_id compares
# it with the age of the last sample (time.time() based, so presumably seconds - confirm).
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Messages go through the "uvicorn.error" logger.
logger = logging.getLogger("uvicorn.error")
class DeleteInfo(BaseModel):
    # Outcome of a delete request: a success flag plus a human-readable detail message.
    is_deleted: bool
    detail: str
class classproperty:
    """
    Read-only descriptor exposing a method as a class-level property.

    Stand-in for stacking ``classmethod`` and ``property``, which was deprecated in python 3.11
    and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # Stored under the same attribute name the builtin `property` uses for its getter.
        self.fget = func

    def __get__(self, _, owner):
        # The instance (if any) is ignored: the getter always receives the owning class.
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """
    Build the "<Model>Update" companion of a pydantic model.

    Every field of `model` except "id" is copied with its annotation widened to accept None and
    with None as default, so an update payload may carry any subset of the fields.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    update_model_name = model.__name__ + "Update"
    return create_model(update_model_name, **optional_fields)
def get_utc_date_from_timestamp(timestamp: float):
    """
    Convert a POSIX timestamp to an ISO 8601 string expressed in UTC.

    The conversion is pinned to UTC explicitly: `fromtimestamp()` without a `tz` argument uses the
    local timezone of the server, which only matches the function name when the process runs in UTC.
    The timezone info is dropped again before formatting so the output keeps the historical
    offset-less shape (e.g. "1970-01-01T00:00:00").
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a series to roughly `max_number_samples` points.

    Series shorter than `max_number_samples` are returned untouched (same list objects).
    Otherwise None values are set aside, the remaining points go through LTTB, and the bounds of
    every run of consecutive None values are inserted back at their timestamps. If LTTB raises a
    ValueError, the series is linearly interpolated on an evenly spaced time vector instead and
    NaN results are reported as None.

    `time_vector` and `values` are expected to have the same length. The inputs are never modified.

    Returns a `(time_vector, values)` tuple of lists.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # LTTB doesn't handle None values so set them aside in a single pass.
    # For each run of consecutive None values only the bounds are kept ([first] or [first, last])
    # so they can be inserted back after the downsampling.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    previous_was_none = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not previous_was_none:
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
            previous_was_none = True
        else:
            kept_times.append(timestamp)
            kept_values.append(value)
            previous_was_none = False
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        # Leave room for the None bounds that are inserted back below
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)

        new_time_vector = interpolated_values[:, 0].tolist()
        new_values = interpolated_values[:, 1].tolist()
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """Tell whether `value` is an instance of `wanted_type`, with integers also accepted as floats."""
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
145# Models
class TwinPadModel(BaseModel):
    # Common base of the models of this module: adds dict <-> object conversion helpers.

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate `dict_` against the model and return the resulting instance."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model in JSON mode, leaving out the fields listed in `exclude`."""
        return self.model_dump(exclude=exclude, mode="json")
def validate_mongo_id(v):
    """Return `v` as a string when it is a valid ObjectId representation, raise ValueError otherwise."""
    if ObjectId.is_valid(v):
        return str(v)
    raise ValueError("Invalid MongoDB id")


# String type whose raw input is checked (and stringified) by validate_mongo_id before validation.
MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]
def validate_12_hex(v: str) -> str:
    """
    Check that `v` is a string made of exactly 12 hexadecimal characters and return it unchanged.

    Raises:
        ValueError: if `v` is not a string or does not match. The function is used as a
            "before" validator, so it can receive raw, non-string input; the explicit type check
            makes that case fail with a ValueError instead of the TypeError `re.fullmatch` would raise.
    """
    if not isinstance(v, str) or not re.fullmatch(r"[0-9a-fA-F]{12}", v):
        raise ValueError("ID must be a 12-character hexadecimal string")
    return v
# String type whose raw input must pass validate_12_hex (12 hexadecimal characters).
DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]
class GenericMongo(TwinPadModel):
    # Base class of the models persisted in a collection of the systems database.
    # Subclasses declare `collection_name`, which every helper below reads.
    #
    # `custom_pipeline_steps` maps a property name to aggregation stages. Those stages are placed
    # before the $match / $sort stage whenever the property is filtered or sorted on, and appended
    # at the end of the pipeline otherwise (presumably they compute the property - confirm).
    id: MongoId | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection backing this model, asking `get_collection` to create it if needed."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated query and wrap the result in a ListResponse.

        `query` provides `mongodb_filter()`, `sort_by` (comma-separated "field" or "field:order"
        entries), `offset` and `limit` (None or 0 meaning no limit). `total` is the number of
        documents matching the filter, regardless of pagination.
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field = sort
                sort_order = 1
            sort_dict[sort_field] = sort_order

        collection = cls.collection()
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Each property's steps go in at most once, even when the property shows up in several
            # "$and" clauses or in both the filter and the sort.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        # Steps of the filtered properties must come before the $match stage
        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for filter_clause in filter_clauses:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in filter_clause:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        # Steps of the sorted properties must come before the $sort stage
        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom steps only run on the documents of the requested page
        for remaining_property in cls.custom_pipeline_steps:
            add_custom_steps(remaining_property)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose MongoDB `_id` is `item_id`, or None when there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document. `mongo_dict` is modified: "_id" becomes a string "id"."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item whose attribute equals the value, or None when nothing matches."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted in ascending order of `sort_by`."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Count the documents whose "post_processing" field is False or missing (0 without collection)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert the item (without its "id" field), store the generated id on the instance and return it."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply `update_dict` to the instance attributes and $set it on the stored document. Returns self."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document. Returns True when a document was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    # Account stored in the "users" collection.
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude: set | None = None):
        """
        Dump the user in JSON mode, always leaving the password out.

        `exclude` lists additional fields to leave out. It is copied, so neither the caller's set
        nor a shared default is modified.
        """
        excluded_fields = set(exclude) if exclude is not None else set()
        excluded_fields.add("password")
        return GenericMongo.to_dict(self, exclude=excluded_fields)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return {"user_id": <id as string>}.

        When the collection holds no user yet, the new user is made admin whatever `is_admin` says.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        # model_dump is used rather than to_dict: the password must be part of the stored document
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        $set the non-None fields of `user` on the stored user `user_id`.

        Returns the updated User, or None when no user has this id.
        """
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # find_one_and_update returns None when the filter matches nothing
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The connection state is not reported back; tolerate documents stored without the field
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    # One operating mode of a device (see Device.modes).
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    # Payload of a mode change request: the id of the mode to switch to (read by Device.change_mode).
    mode_id: int
class Device(GenericMongo):
    # Device stored in the "devices" collection.
    collection_name: ClassVar[str] = "devices"

    device_id: DeviceId
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to mode `update_dict.mode_id` and record the attempt as a Command.

        A mode id below 0 or above the number of modes is rejected without contacting the device:
        the command is stored as failed and a 400-style error dict is returned. Otherwise the
        request goes through RabbitMQ and the broker response is returned.
        """

        def mode_label(mode_id):
            # Mode names are looked up by position (mode ids start at 1). Ids outside 1..len(modes)
            # are shown as "#<id>" rather than wrapping around to the last mode (id 0) or raising
            # an IndexError.
            if 1 <= mode_id <= len(self.modes):
                return self.modes[mode_id - 1].name
            return f"#{mode_id}"

        requested_mode_id = update_dict.mode_id
        # NOTE(review): mode id 0 passes this check although names are indexed from 1 - confirm
        # whether 0 should be rejected too.
        has_error = requested_mode_id < 0 or requested_mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {mode_label(self.current_mode_id)} to {mode_label(requested_mode_id)}"
        else:
            description = f"Change mode of #{self.device_id} to {mode_label(requested_mode_id)}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
        else:
            response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning the given signals, querying each device only once.

        The device id is the part of a signal id before the first ".". Returns a dict mapping each
        device id to its Device (or None when the device is unknown).
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id
class DeviceSetup(GenericMongo):
    # Setup stored in the "device_setups" collection: a list of device ids, an "active" flag and
    # a string-to-string variable mapping.
    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]


# Companion model in which every DeviceSetup field except "id" is optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    # State of a device at a given timestamp, read from the per-device collections of the
    # devices states database.
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
        """
        Return the states of device `device_id` matching `query`, sorted and paginated.

        `query` provides `mongodb_filter()`, `sort_by` ("field" or "field:order"), `offset` and
        `limit` (None or 0 meaning no limit). An empty response is returned when the device has no
        states collection.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        # Missing (or null) list fields fall back to a fresh empty list. Field(...) only has a
        # meaning in a class body: used as a dict.get default it would hand a FieldInfo object to
        # the model, which fails validation.
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                tokens=item_dict.get("tokens") or [],
                modified_properties=item_dict.get("modified_properties") or [],
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    # A single sample of a signal: its timestamp, its value and the forced value, if any.
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or None when it has no collection or no sample."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information: sort the whole collection instead
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of `signal_ids` (None for signals without one)."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent sample of the signal, or None when it has no collection or no sample.

        The reported timestamp is the latest of the sample timestamp and the last ping of the
        owning device. `device` may be passed to avoid a lookup; otherwise the device is fetched
        from the part of `signal_id` before the first ".".
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same workaround as above function, very effective to narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, in the order of `signal_ids`, fetching each device once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
565class SignalData(TwinPadModel):
566 signal_id: str
567 forcible: bool = True
568 time_vector: list[float]
569 values: list[float | int | str | None]
570 forced_values: list[float | int | str | None]
572 data_start: float | None = None
573 data_end: float | None = None
575 number_samples: int = 0
576 number_samples_db: int = 0
578 db_query_time: float = 0.0
579 init_time: float = 0.0
580 data_processing_time: float = 0.0
582 phase_id: str | None = None
584 @classmethod
585 def get_from_signal_id(
586 cls,
587 signal_id: str,
588 min_timestamp: float = None,
589 max_timestamp: float = None,
590 window_min_timestamp: float = None,
591 window_max_timestamp: float = None,
592 interpolate_bounds: bool = True,
593 max_documents: int = None,
594 ) -> Self:
596 now = time.time()
598 req_signal = {}
599 if min_timestamp is not None:
600 req_signal.setdefault("timestamp", {})
601 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
602 if max_timestamp is not None:
603 req_signal.setdefault("timestamp", {})
604 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
606 collection = get_signal_collection(signal_id)
607 if collection is None:
608 return cls(
609 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
610 )
612 db_req_start = time.time()
614 sort_step = {"$sort": {"precise_timestamp": 1}}
615 number_results = collection.count_documents(req_signal)
617 pipeline = []
618 if req_signal:
619 pipeline.append({"$match": req_signal}) # Filter data if needed
621 pipeline.extend(
622 [
623 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
624 sort_step,
625 ]
626 )
628 if max_documents is not None and max_documents < number_results:
629 unsampling_ratio = math.ceil(number_results / max_documents)
630 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
631 pipeline.extend(
632 [
633 {
634 "$setWindowFields": {
635 "sortBy": {"precise_timestamp": 1},
636 "output": {"index": {"$documentNumber": {}}},
637 }
638 },
639 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
640 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
641 {"$replaceRoot": {"newRoot": "$doc"}},
642 {"$unset": ["index", "group_id"]},
643 {"$sort": {"precise_timestamp": 1}},
644 ]
645 )
647 # logger.info(f"pipeline: %s", str(pipeline))
648 cursor = collection.aggregate(pipeline)
649 db_req_time = time.time() - db_req_start
651 init_time = time.time()
653 results = cursor.to_list()
654 time_vector = []
655 values = []
656 forced_values = []
657 for s in results:
658 time_vector.append(s["precise_timestamp"])
659 values.append(s.get("value", None))
660 forced_values.append(s.get("forced_value", None))
662 signal = Signal.get_from_signal_id(signal_id)
663 if signal is None:
664 return cls(
665 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
666 )
667 class_ = signal.signal_data_class
669 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
670 time_vector, values, forced_values = cls.interpolate_bounds(
671 class_,
672 collection,
673 signal_id,
674 time_vector,
675 values,
676 forced_values,
677 window_min_timestamp,
678 window_max_timestamp,
679 )
681 if values:
682 # TODO: check below. a bit strange
683 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
684 # Adding last value as it should be repeated
685 time_vector.append(now)
686 values.append(values[-1])
687 forced_values.append(forced_values[-1])
689 init_time = time.time() - init_time
691 # See line 292 for explanation
692 bucket = get_signal_collection(f"system.buckets.{signal_id}")
693 first_bucket = None
694 if bucket is not None:
695 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
696 if first_bucket is not None:
697 data_start = first_bucket["control"]["min"]["precise_timestamp"]
698 else:
699 data_start = None
701 last_bucket = None
702 if bucket is not None:
703 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
704 if last_bucket is not None:
705 data_end = last_bucket["control"]["max"]["precise_timestamp"]
706 else:
707 data_end = None
709 return class_(
710 signal_id=signal_id,
711 forcible=signal.forcible,
712 time_vector=time_vector,
713 values=values,
714 forced_values=forced_values,
715 data_start=data_start,
716 data_end=data_end,
717 number_samples=len(values),
718 number_samples_db=number_results,
719 db_query_time=db_req_time,
720 init_time=init_time,
721 )
723 @staticmethod
724 def interpolate_bounds(
725 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
726 ):
727 sample_right = None
728 # Fetching right side value & interpolation
729 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
730 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
731 sample_right = collection.find_one(
732 {
733 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
734 "value": {"$exists": True},
735 },
736 sort={"precise_timestamp": -1},
737 )
738 if sample_right:
739 if time_vector:
740 right_sd = class_(
741 signal_id=signal_id,
742 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
743 values=[values[-1], sample_right.get("value", None)],
744 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
745 )
746 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
747 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
748 else:
749 max_ts_value = sample_right.get("value", None)
750 max_ts_forced_value = sample_right.get("forced_value", None)
751 time_vector.append(window_max_timestamp)
752 values.append(max_ts_value)
753 forced_values.append(max_ts_forced_value)
755 # Fetching left side value & interpolation
756 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
757 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
758 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
759 sample_left = sample_right
760 sample_left = collection.find_one(
761 {
762 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
763 "value": {"$exists": True},
764 },
765 sort={"precise_timestamp": -1},
766 )
768 if sample_left:
769 if time_vector:
770 left_sd = class_(
771 signal_id=signal_id,
772 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
773 values=[sample_left["value"], values[0]],
774 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
775 )
776 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
777 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
778 else:
779 min_ts_value = sample_left.get("value", None)
780 min_ts_forced_value = sample_left.get("forced_value", None)
781 time_vector.insert(0, window_min_timestamp)
782 values.insert(0, min_ts_value)
783 forced_values.insert(0, min_ts_forced_value)
785 return time_vector, values, forced_values
787 def interpolate_values(self, new_time_vector: list[float]):
788 return self.interpolate(new_time_vector, self.values)
790 def interpolate_forced_values(self, new_time_vector: list[float]):
791 return self.interpolate(new_time_vector, self.forced_values)
793 def uniform_desampling(self, number_samples_max: int) -> Self:
794 data_processing_time = time.time()
795 if number_samples_max and self.number_samples > number_samples_max:
796 new_time_vector = npy.linspace(
797 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
798 ).tolist()
799 values = self.interpolate_values(new_time_vector)
800 forced_values = self.interpolate_forced_values(new_time_vector)
801 time_vector = new_time_vector
802 number_samples = len(time_vector)
803 else:
804 time_vector = self.time_vector
805 number_samples = len(self.values)
806 values = self.values[:]
807 forced_values = self.forced_values[:]
808 data_processing_time = time.time() - data_processing_time
810 return self.__class__(
811 signal_id=self.signal_id,
812 time_vector=time_vector,
813 values=values,
814 forced_values=forced_values,
815 number_samples=number_samples,
816 number_samples_db=self.number_samples,
817 data_start=self.data_start,
818 data_end=self.data_end,
819 db_query_time=self.db_query_time,
820 init_time=self.init_time,
821 data_processing_time=self.data_processing_time + data_processing_time,
822 phase_id=self.phase_id,
823 )
825 def min_max_downsampling(self, number_samples_max: int) -> Self:
826 return self.uniform_desampling(number_samples_max)
828 def interest_window_desampling(
829 self,
830 window_max_number_samples: int,
831 outside_max_number_samples: int,
832 window_min_timestamp: float | None = None,
833 window_max_timestamp: float | None = None,
834 ) -> Self:
835 """Performs a sampling in a window of interest and outside."""
837 if not self.time_vector:
838 return self
840 if window_min_timestamp is None:
841 window_min_timestamp = self.time_vector[0]
842 if window_max_timestamp is None:
843 window_max_timestamp = self.time_vector[-1]
845 data_processing_time = time.time()
847 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
848 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
850 time_vector_before = self.time_vector[:index_window_start]
851 time_vector_window = self.time_vector[index_window_start:index_window_end]
852 time_vector_after = self.time_vector[index_window_end:]
854 # Resampling window
855 if time_vector_window:
856 # Ensurring window bounds
857 if time_vector_window[0] != window_min_timestamp:
858 time_vector_window.insert(0, window_min_timestamp)
859 if time_vector_window[-1] != window_max_timestamp:
860 time_vector_window.append(window_max_timestamp)
861 else:
862 time_vector_window = [window_min_timestamp, window_max_timestamp]
864 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
865 # Resampling
866 new_window_time_vector = npy.linspace(
867 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
868 ).tolist()
869 time_vector_window = new_window_time_vector
871 # Resampling outside
872 number_samples_before = len(time_vector_before)
873 number_samples_after = len(time_vector_after)
874 if (
875 outside_max_number_samples is not None
876 and (number_samples_before + number_samples_after) > outside_max_number_samples
877 ):
878 new_number_samples_before = min(
879 number_samples_before,
880 math.ceil(
881 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
882 ),
883 )
884 new_number_samples_after = min(
885 number_samples_after,
886 math.ceil(
887 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
888 ),
889 )
890 # Adjusting numbers as math.ceil can do +1 on sum
891 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
892 if new_number_samples_before > new_number_samples_after:
893 new_number_samples_before -= 1
894 else:
895 new_number_samples_after -= 1
897 if new_number_samples_before > 0:
898 new_time_vector_before = npy.linspace(
899 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
900 ).tolist()
901 time_vector_before = new_time_vector_before
903 if new_number_samples_after > 0:
904 new_time_vector_after = npy.linspace(
905 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
906 ).tolist()[::-1]
907 time_vector_after = new_time_vector_after
909 new_time_vector = time_vector_before + time_vector_window + time_vector_after
910 values = self.interpolate_values(new_time_vector)
911 forced_values = self.interpolate_forced_values(new_time_vector)
912 number_samples = len(values)
914 data_processing_time = time.time() - data_processing_time
916 return self.__class__(
917 signal_id=self.signal_id,
918 forcible=self.forcible,
919 time_vector=new_time_vector,
920 values=values,
921 forced_values=forced_values,
922 number_samples=number_samples,
923 number_samples_db=self.number_samples,
924 data_start=self.data_start,
925 data_end=self.data_end,
926 db_query_time=self.db_query_time,
927 init_time=self.init_time,
928 data_processing_time=self.data_processing_time + data_processing_time,
929 )
def zero_time_vector(self, data_start: float):
    """Return a copy of this signal data with `data_start` subtracted from every timestamp.

    An empty time vector has nothing to shift, so `self` is returned as is.
    Otherwise a new instance of the same class is built whose `data_start`
    and `data_end` are the first and last shifted timestamps; the time spent
    here is added to `data_processing_time`.
    """
    started_at = time.time()
    if len(self.time_vector) == 0:
        return self

    shifted_time_vector = npy.array(self.time_vector) - data_start
    elapsed = time.time() - started_at

    shifted_fields = {
        "signal_id": self.signal_id,
        "time_vector": shifted_time_vector,
        "values": self.values,
        "forced_values": self.forced_values,
        "number_samples": self.number_samples,
        "number_samples_db": self.number_samples_db,
        "data_start": shifted_time_vector[0],
        "data_end": shifted_time_vector[-1],
        "db_query_time": self.db_query_time,
        "init_time": self.init_time,
        "data_processing_time": self.data_processing_time + elapsed,
    }
    return self.__class__(**shifted_fields)
def csv_export(self):
    """Serialise the samples as UTF-8 encoded CSV bytes.

    The first row is the header `timestamp,value,forced_value`; each
    following row holds the UTC date of the sample, its value and its
    forced value.
    """
    buffer = io.StringIO()
    rows = csv.writer(buffer)
    rows.writerow(["timestamp", "value", "forced_value"])
    rows.writerows(
        [get_utc_date_from_timestamp(ts), value, forced_value]
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")
def prestoplot_export(self):
    """Serialise the samples as UTF-8 encoded, tab-separated PrestoPlot text.

    Dots in the signal id are replaced by underscores and an underscore is
    prepended when the id starts with a numeric character. Missing values
    are written as the literal `none`.
    """
    column = self.signal_id.replace(".", "_")
    if column[0].isnumeric():
        column = "_" + column

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column}\t{column}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column}\n",
    ]
    for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
        value_cell = "none" if value is None else value
        forced_cell = "none" if forced_value is None else forced_value
        lines.append(f"{get_utc_date_from_timestamp(ts)}\t{value_cell}\t{forced_cell}\n")

    return "".join(lines).encode("utf-8")
class NumericSignalData(SignalData):
    """Signal data whose values are numbers; individual entries may be None."""

    # Type tag of this specialisation.
    data_type: str = "float"
    # Values and forced values, one entry per timestamp of the time vector.
    values: list[float | int | None]
    forced_values: list[float | int | None]
def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate `items` (sampled on `self.time_vector`) at `new_time_vector`.

    None entries are treated as NaN for the interpolation, and any NaN in
    the result is turned back into None.
    """
    filled = [npy.nan if item is None else item for item in items]
    resampled = npy.interp(new_time_vector, self.time_vector, filled)
    return [None if npy.isnan(sample) else sample for sample in resampled]
987 def uniform_desampling(self, number_samples_max: int) -> Self:
988 data_processing_time = time.time()
989 if number_samples_max and self.number_samples > number_samples_max:
990 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
991 forced_values = self.interpolate_forced_values(time_vector)
992 number_samples = len(time_vector)
993 else:
994 time_vector = self.time_vector
995 number_samples = len(self.values)
996 values = self.values[:]
997 forced_values = self.forced_values[:]
998 data_processing_time = time.time() - data_processing_time
1000 return self.__class__(
1001 signal_id=self.signal_id,
1002 time_vector=time_vector,
1003 values=values,
1004 forced_values=forced_values,
1005 number_samples=number_samples,
1006 number_samples_db=self.number_samples,
1007 data_start=self.data_start,
1008 data_end=self.data_end,
1009 db_query_time=self.db_query_time,
1010 init_time=self.init_time,
1011 data_processing_time=self.data_processing_time + data_processing_time,
1012 )
1014 def min_max_downsampling(self, number_samples_max: int) -> Self:
1015 if self.number_samples < number_samples_max:
1016 return self
1018 data_processing_time = time.time()
1020 number_bins = number_samples_max // 2
1022 time_vector = npy.array(self.time_vector, dtype=npy.float64)
1023 values = npy.array(self.values, dtype=npy.float64)
1024 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1026 points_per_bin = self.number_samples // number_bins
1028 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1029 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1030 if self.number_samples - number_bins * points_per_bin > 1:
1031 points_per_bin += 1
1032 number_bins = self.number_samples // points_per_bin + 1
1033 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1034 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1035 values = npy.concatenate([values, nan_points_to_add])
1036 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1038 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1039 values_matrix = values.reshape(number_bins, points_per_bin)
1040 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1042 indexes_min = npy.zeros(number_bins, dtype="int64")
1043 indexes_max = npy.zeros(number_bins, dtype="int64")
1045 for row in range(number_bins):
1046 min_value = values_matrix[row, 0]
1047 max_value = values_matrix[row, 0]
1048 for column in range(points_per_bin):
1049 if values_matrix[row, column] < min_value:
1050 min_value = values_matrix[row, column]
1051 indexes_min[row] = column
1052 elif values_matrix[row, column] > max_value:
1053 max_value = values_matrix[row, column]
1054 indexes_max[row] = column
1056 row_index = npy.repeat(npy.arange(number_bins), 2)
1057 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1059 data_processing_time = time.time() - data_processing_time
1061 new_time_vector = timestamps_matrix[row_index, column_index]
1062 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1063 new_values = values_matrix[row_index, column_index]
1064 new_values = npy.where(npy.isnan(new_values), None, new_values)
1065 new_forced_values = forced_values_matrix[row_index, column_index]
1066 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1068 # Make sure there are no None values for the time vector
1069 time_vector_filter = new_time_vector != None
1070 new_time_vector = new_time_vector[time_vector_filter]
1071 new_values = new_values[time_vector_filter]
1072 new_forced_values = new_forced_values[time_vector_filter]
1074 return self.__class__(
1075 signal_id=self.signal_id,
1076 time_vector=new_time_vector,
1077 values=new_values,
1078 forced_values=new_forced_values,
1079 number_samples=number_bins * 2,
1080 number_samples_db=self.number_samples_db,
1081 data_start=self.data_start,
1082 data_end=self.data_end,
1083 db_query_time=self.db_query_time,
1084 init_time=self.init_time,
1085 data_processing_time=self.data_processing_time + data_processing_time,
1086 phase_id=self.phase_id,
1087 )
1089 def interest_window_desampling(
1090 self,
1091 window_max_number_samples: int,
1092 outside_max_number_samples: int,
1093 window_min_timestamp: float | None = None,
1094 window_max_timestamp: float | None = None,
1095 ) -> Self:
1096 """Performs a sampling in a window of interest and outside."""
1098 if not self.time_vector:
1099 return self
1101 if window_min_timestamp is None:
1102 window_min_timestamp = self.time_vector[0]
1103 if window_max_timestamp is None:
1104 window_max_timestamp = self.time_vector[-1]
1106 data_processing_time = time.time()
1108 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1109 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1111 time_vector_before = self.time_vector[:index_window_start]
1112 time_vector_window = self.time_vector[index_window_start:index_window_end]
1113 time_vector_after = self.time_vector[index_window_end:]
1115 values_before = self.values[:index_window_start]
1116 values_window = self.values[index_window_start:index_window_end]
1117 values_after = self.values[index_window_end:]
1118 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1119 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1121 # Resampling window
1122 if time_vector_window:
1123 # Ensurring window bounds
1124 if time_vector_window[0] != window_min_timestamp:
1125 time_vector_window.insert(0, window_min_timestamp)
1126 values_window.insert(0, window_min_value)
1127 if time_vector_window[-1] != window_max_timestamp:
1128 time_vector_window.append(window_max_timestamp)
1129 values_window.append(window_max_value)
1130 else:
1131 time_vector_window = [window_min_timestamp, window_max_timestamp]
1132 values_window = [window_min_value, window_max_value]
1134 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1135 # Resampling
1136 time_vector_window, values_window = downsample_list(
1137 time_vector_window, values_window, window_max_number_samples
1138 )
1140 # Resampling outside
1141 number_samples_before = len(time_vector_before)
1142 number_samples_after = len(time_vector_after)
1143 if (
1144 outside_max_number_samples is not None
1145 and (number_samples_before + number_samples_after) > outside_max_number_samples
1146 ):
1147 new_number_samples_before = min(
1148 number_samples_before,
1149 math.ceil(
1150 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1151 ),
1152 )
1153 new_number_samples_after = min(
1154 number_samples_after,
1155 math.ceil(
1156 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1157 ),
1158 )
1159 # Adjusting numbers as math.ceil can do +1 on sum
1160 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1161 if new_number_samples_before > new_number_samples_after:
1162 new_number_samples_before -= 1
1163 else:
1164 new_number_samples_after -= 1
1166 if new_number_samples_before > 0:
1167 time_vector_before, values_before = downsample_list(
1168 time_vector_before, values_before, new_number_samples_before
1169 )
1171 if new_number_samples_after > 0:
1172 time_vector_after, values_after = downsample_list(
1173 time_vector_after, values_after, new_number_samples_after
1174 )
1176 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1177 values = values_before + values_window + values_after
1178 forced_values = self.interpolate_forced_values(new_time_vector)
1179 number_samples = len(values)
1181 data_processing_time = time.time() - data_processing_time
1183 return self.__class__(
1184 signal_id=self.signal_id,
1185 time_vector=new_time_vector,
1186 values=values,
1187 forced_values=forced_values,
1188 number_samples=number_samples,
1189 number_samples_db=self.number_samples,
1190 data_start=self.data_start,
1191 data_end=self.data_end,
1192 db_query_time=self.db_query_time,
1193 init_time=self.init_time,
1194 data_processing_time=self.data_processing_time + data_processing_time,
1195 )
class StringSignalData(SignalData):
    """Signal data whose values are strings; individual entries may be None."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Resample `items` by holding the last known entry.

        Each requested timestamp receives the item whose timestamp is the
        latest one not after it; positions are clamped to the valid range.
        """
        last_position = len(items) - 1
        # Position of the sample at or just before each requested timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        positions = npy.clip(positions, 0, last_position)
        return [items[position] for position in positions]
class SignalsData(TwinPadModel):
    """A collection of signal data sets with aggregated timing and time bounds."""

    # One entry per loaded signal (and per phase when loaded by phase).
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    # Processing time reported for the whole collection.
    data_processing_time: float
    # Overall time bounds of the collection; None when unknown.
    data_start: float | None
    data_end: float | None
@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Load the data of several signals with the same query parameters.

    `max_timestamp` defaults to the current time. The returned collection
    carries the sum of the per-signal processing times and the earliest
    `data_start` / latest `data_end` among the signals that define them.
    """
    if max_timestamp is None:
        max_timestamp = time.time()

    query = {
        "min_timestamp": min_timestamp,
        "max_timestamp": max_timestamp,
        "window_min_timestamp": window_min_timestamp,
        "window_max_timestamp": window_max_timestamp,
        "interpolate_bounds": interpolate_bounds,
        "max_documents": max_documents,
    }

    collected = []
    earliest = None
    latest = None
    total_processing_time = 0.0
    for signal_id in signal_ids:
        signal_data = SignalData.get_from_signal_id(signal_id=signal_id, **query)
        total_processing_time += signal_data.data_processing_time
        collected.append(signal_data)

        start, end = signal_data.data_start, signal_data.data_end
        if start is not None:
            earliest = start if earliest is None else min(start, earliest)
        if end is not None:
            latest = end if latest is None else max(end, latest)

    return cls(
        signals_data=collected,
        data_processing_time=total_processing_time,
        data_start=earliest,
        data_end=latest,
    )
@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Load every signal of `signal_ids` for every phase.

    Phase bounds (`start_at` / `end_at`, divided by 1000) delimit the query.
    When both window bounds are given for a phase, the query is narrowed to
    the window plus a margin of 1/20 of its length on each side, without
    leaving the phase. Signals without samples are skipped. With
    `zero_time_vector`, timestamps are shifted by the phase sync time
    (phase start when None). Each kept signal is tagged with the phase id.
    """
    started_at = time.time()
    collected: list[SignalData] = []

    per_phase = zip(phases, phase_sync_times, window_min_timestamps, window_max_timestamps)
    for phase, sync_time, window_min_timestamp, window_max_timestamp in per_phase:
        phase_start = phase.start_at / 1000
        phase_end = phase.end_at / 1000
        query_start, query_end = phase_start, phase_end

        if sync_time is None:
            sync_time = phase_start

        if window_min_timestamp is not None and window_max_timestamp is not None:
            window_length = window_max_timestamp - window_min_timestamp
            if window_min_timestamp != phase_start:
                query_start = max(phase_start, window_min_timestamp - window_length / 20)
            if window_max_timestamp != phase_end:
                query_end = min(phase_end, window_max_timestamp + window_length / 20)

        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id,
                query_start,
                query_end,
                window_min_timestamp,
                window_max_timestamp,
                interpolate_bounds=False,
                max_documents=None,
            )
            if len(signal_data.time_vector) == 0:
                continue

            if zero_time_vector:
                signal_data = signal_data.zero_time_vector(sync_time)
            # NOTE(review): assumed to apply whether or not the time vector was
            # zeroed; the indentation was not recoverable from the listing.
            signal_data.phase_id = phase.id

            collected.append(signal_data)

    return cls(
        signals_data=collected,
        data_processing_time=time.time() - started_at,
        data_start=0,
        data_end=0,
    )
def uniform_desampling(self, number_samples_max: int) -> Self:
    """Apply `uniform_desampling` to every signal and return the new collection.

    The processing time of the result is the sum of the per-signal times;
    the time bounds are kept.
    """
    reduced = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        smaller = signal_data.uniform_desampling(number_samples_max=number_samples_max)
        reduced.append(smaller)
        total_processing_time += smaller.data_processing_time

    return SignalsData(
        signals_data=reduced,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Apply `min_max_downsampling` to every signal and return the new collection.

    The processing time of the result is the sum of the per-signal times;
    the time bounds are kept.
    """
    reduced = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        smaller = signal_data.min_max_downsampling(number_samples_max=number_samples_max)
        reduced.append(smaller)
        total_processing_time += smaller.data_processing_time

    return SignalsData(
        signals_data=reduced,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
) -> Self:
    """Apply `interest_window_desampling` to every signal and return the new collection.

    All signals receive the same window and limits. The processing time of
    the result is the sum of the per-signal times; the time bounds are kept.
    """
    options = {
        "window_max_number_samples": window_max_number_samples,
        "outside_max_number_samples": outside_max_number_samples,
        "window_min_timestamp": window_min_timestamp,
        "window_max_timestamp": window_max_timestamp,
    }

    reduced = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        smaller = signal_data.interest_window_desampling(**options)
        reduced.append(smaller)
        total_processing_time += smaller.data_processing_time

    return SignalsData(
        signals_data=reduced,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def zero_time_vector(self, data_start: float):
    """Shift the time vector of every signal by `data_start`.

    Args:
        data_start: Timestamp that becomes the origin of every time vector.

    Returns:
        A new collection starting at 0 whose `data_end` is the latest end
        among the shifted signals, or None when no signal defines one (empty
        collection, or only signals without a `data_end`).
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
    # Empty signals are returned unchanged by their own zero_time_vector and may
    # have no data_end; they must not take part in the max().
    known_ends = [s.data_end for s in signals_data if s.data_end is not None]
    return SignalsData(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(known_ends, default=None),
    )
@classmethod
async def apply_single_function(
    cls,
    phase,
    base_signal_id: str,
    function: SINGLE_POST_PROCESSING_FUNCTION,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
):
    """Compute a single-input post-processing signal for one phase.

    If the resulting signal was already computed for `phase`, the stored
    data is returned. Otherwise the base signal is loaded over the phase,
    the function is applied to values and forced values, saving is
    scheduled as a background task, and the result restricted to the
    optional window is returned. Returns None when the base signal has no
    samples.
    """
    signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"

    stored_signal = Signal.get_from_signal_id(signal_id)
    if stored_signal is not None and phase.id in stored_signal.computed_phases_ids:
        return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)

    signals_data = cls.get_from_phase_and_signal_ids(
        [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
    )
    if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
        return None

    source = signals_data.signals_data[0]
    time_vector = npy.array(source.time_vector)
    values = source.values
    forced_values = source.forced_values

    new_values = None
    new_forced_values = None
    if function == "Cumul":
        new_values = cumul(values)
        new_forced_values = cumul(forced_values)
    elif function == "Delta":
        new_values = delta(values)
        new_forced_values = delta(forced_values)
    elif function == "DeltaT":
        # Time deltas do not depend on the values: both outputs are the same.
        new_values = delta(time_vector)
        new_forced_values = new_values
    elif function == "Derive":
        new_values = derive(time_vector, values)
        new_forced_values = derive(time_vector, forced_values)
    elif function == "Integ":
        new_values = integ(time_vector, values)
        new_forced_values = integ(time_vector, forced_values)

    # NaN results are exposed as None.
    new_values = npy.where(npy.isnan(new_values), None, new_values)
    new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)

    # The full (unwindowed) result is saved in the background.
    asyncio.get_running_loop().create_task(
        cls.save_function_signal(phase, signal_id, time_vector, new_values, new_forced_values, source.forcible)
    )

    # Restrict the returned data to the requested window.
    inside_window = npy.ones(time_vector.shape, dtype=bool)
    if window_max_timestamp is not None:
        inside_window &= time_vector <= window_max_timestamp
    if window_min_timestamp is not None:
        inside_window &= time_vector >= window_min_timestamp
    time_vector = time_vector[inside_window]
    new_values = new_values[inside_window]
    new_forced_values = new_forced_values[inside_window]

    source.time_vector = time_vector.tolist()
    source.values = new_values.tolist()
    source.forced_values = new_forced_values.tolist()
    source.number_samples = time_vector.size
    source.signal_id = signal_id

    return signals_data
1450 @classmethod
1451 async def apply_multiple_function(
1452 cls,
1453 phases: list,
1454 signal_ids: list,
1455 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
1456 window_min_timestamp: float = None,
1457 window_max_timestamp: float = None,
1458 ):
1459 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
1460 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
1461 else:
1462 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"
1464 active_phase = phases[0]
1465 if function in {"Align-X", "Using-X"}:
1466 active_phase = phases[1]
1468 processed_result_signal = Signal.get_from_signal_id(function_signal_id)
1469 if processed_result_signal is not None and (
1470 active_phase.id in processed_result_signal.computed_phases_ids
1471 ): # If signal has been computed for the correct phase
1472 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)
1474 array_length = None
1475 time_vector_list = []
1476 values_list = []
1477 forced_values_list = []
1478 forcible = True
1479 for phase, signal_id in zip(phases, signal_ids):
1480 signals_data = cls.get_from_phase_and_signal_ids(
1481 [phase], [None], [signal_id], [None], [None], zero_time_vector=False
1482 )
1484 if len(signals_data.signals_data) == 0:
1485 return None
1487 signal_data = signals_data.signals_data[0]
1489 if array_length is None:
1490 array_length = signal_data.number_samples
1491 if (
1492 array_length != signal_data.number_samples and function != "Align-X"
1493 ) or signal_data.number_samples == 0:
1494 return None
1496 time_vector_list.append(npy.array(signal_data.time_vector))
1497 values_list.append(npy.array(signal_data.values, dtype=npy.float64))
1498 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
1499 forcible = forcible and signal_data.forcible
1501 time_vector = time_vector_list[0]
1502 new_values = None
1503 new_forced_values = None
1505 match (function):
1506 case "Align-X":
1507 time_vector = time_vector_list[1]
1508 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
1509 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
1510 new_values = align_x(old_time_vector, values_list[0], new_time_vector)
1511 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
1512 # case "Atan2":
1513 # new_values = atan2(values_list[0], values_list[1])
1514 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
1515 case "Using-X":
1516 if len(time_vector_list[0]) != len(time_vector_list[1]):
1517 return None
1518 time_vector = time_vector_list[1]
1519 new_values = values_list[0]
1520 new_forced_values = forced_values_list[0]
1521 case "Mean":
1522 new_values = mean(*values_list)
1523 new_forced_values = mean(*forced_values_list)
1524 case "Norm":
1525 new_values = norm(*values_list)
1526 new_forced_values = norm(*forced_values_list)
1528 new_values = npy.where(npy.isnan(new_values), None, new_values)
1529 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1531 loop = asyncio.get_running_loop()
1532 loop.create_task(
1533 cls.save_function_signal(
1534 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
1535 )
1536 )
1538 total_number_samples = time_vector.size
1540 if window_max_timestamp is not None:
1541 max_timestamp_mask = time_vector <= window_max_timestamp
1542 time_vector = time_vector[max_timestamp_mask]
1543 new_values = new_values[max_timestamp_mask]
1544 new_forced_values = new_forced_values[max_timestamp_mask]
1545 if window_min_timestamp is not None:
1546 min_timestamp_mask = time_vector >= window_min_timestamp
1547 time_vector = time_vector[min_timestamp_mask]
1548 new_values = new_values[min_timestamp_mask]
1549 new_forced_values = new_forced_values[min_timestamp_mask]
1551 signals_data = cls(
1552 signals_data=[
1553 NumericSignalData(
1554 signal_id=function_signal_id,
1555 forcible=forcible,
1556 time_vector=time_vector.tolist(),
1557 values=new_values.tolist(),
1558 forced_values=new_forced_values.tolist(),
1559 number_samples=time_vector.size,
1560 number_samples_db=total_number_samples,
1561 )
1562 ],
1563 data_processing_time=0,
1564 data_start=0,
1565 data_end=0,
1566 )
1568 return signals_data
@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Load an already computed post-processing signal from its collection.

    Samples are filtered on `precise_timestamp` with the given bounds (a
    bound of None is ignored). `number_samples_db` is the total number of
    documents in the collection, whatever the window.
    """
    signal_data_collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp

    total_number_samples = signal_data_collection.count_documents({})

    # Without bounds the pipeline is empty and every sample is returned.
    pipeline = [{"$match": {"precise_timestamp": timestamp_bounds}}] if timestamp_bounds else []

    fetch_start = time.time()

    samples = signal_data_collection.aggregate(pipeline).to_list()
    new_time_vector = [sample["precise_timestamp"] for sample in samples]
    new_values = [sample["value"] for sample in samples]
    new_forced_values = [sample["forced_value"] for sample in samples]

    existing_signal = NumericSignalData(
        signal_id=signal_id,
        time_vector=new_time_vector,
        values=new_values,
        forced_values=new_forced_values,
        number_samples=len(new_time_vector),
        number_samples_db=total_number_samples,
    )
    return cls(
        signals_data=[existing_signal],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )
@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Store a computed post-processing signal and register it for `phase`.

    Existing samples of the signal within the phase time range are replaced
    by the new ones, then the signal configuration is upserted and the phase
    id is recorded in `computed_phases_ids`.

    Args:
        phase: Phase the signal was computed for (`start_at` / `end_at` are
            divided by 1000 to get the time range to replace).
        function_signal_id: Id of the post-processing signal.
        time_vector: Timestamps of the samples.
        new_values: Values, one per timestamp.
        new_forced_values: Forced values, one per timestamp.
        forcible: Stored as the `forcible` flag of the signal configuration.

    Raises:
        ValueError: If the three sequences do not have the same length.
    """
    # NOTE(review): the database calls below look synchronous, so this coroutine
    # presumably blocks the event loop while it runs -- confirm.
    # Insert data first so if it is requested by another user, it will be computed again
    signal_collection = get_signal_collection(function_signal_id, create=True)
    signal_collection.delete_many(
        {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
    )
    signal_collection.insert_many(
        [
            {
                # NOTE(review): fromtimestamp() without tz yields naive local time -- confirm this is intended.
                "timestamp": datetime.datetime.fromtimestamp(timestamp),
                "precise_timestamp": timestamp,
                "value": value,
                "forced_value": forced_value,
            }
            for timestamp, value, forced_value in zip(time_vector, new_values, new_forced_values, strict=True)
        ]
    )

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": {
                "description": "",
                "unit": None,
                "type": "sensor",
                "address": None,
                # NOTE(review): max(..., 1) also caps the stored frequency at 1 -- confirm.
                "frequency": 1 / max(npy.mean(delta(time_vector)), 1),  # avoid division by 0
                "transfer_function": None,
                "precision_digits": None,
                "digitization_function": None,
                "data_type": "float",
                "formula": None,
                "forcible": forcible,
                "commandable": False,
                "broadcastable": False,
                "signal_id": function_signal_id,
                "post_processing": True,
            },
            # $addToSet keeps the list free of duplicates when the same phase is
            # computed more than once (e.g. by concurrent requests).
            "$addToSet": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )
1662 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []):
1663 if post_processing:
1664 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1665 zip_buffer = io.BytesIO()
1666 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1667 for signal_data in self.signals_data:
1668 file_name = signal_data.signal_id
1669 if post_processing:
1670 phase = phases_by_id.get(
1671 signal_data.phase_id,
1672 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1673 )
1674 file_name = f"{signal_data.signal_id} ({phase.name})"
1675 if file_format == "csv":
1676 export_io = signal_data.csv_export()
1677 zip_file.writestr(f"{file_name}.csv", export_io)
1678 elif file_format == "prestoplot":
1679 export_io = signal_data.prestoplot_export()
1680 zip_file.writestr(f"{file_name}.tab", export_io)
1681 else:
1682 raise ValueError(f"Format not found. Got: {file_format}")
1683 zip_bytes = zip_buffer.getvalue()
1684 return zip_bytes
def hdf5_export(self, post_processing: bool = False, phase_ids: list = []):
    """Export every signal to its own group of an in-memory HDF5 file.

    Each group is named after the signal id (followed by the phase name in
    parentheses when `post_processing` is set) and holds one `data` dataset
    with the fields `date`, `timestamp`, `value` and `forced_value`. Values
    are stored as byte strings for `str` signals and as float64 otherwise.

    Returns:
        The bytes of the HDF5 file.
    """
    if post_processing:
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}

    numeric_row = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    string_row = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )

    hdf5_buffer = io.BytesIO()
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                phase = phases_by_id.get(
                    signal_data.phase_id,
                    Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
                )
                group_name = f"{signal_data.signal_id} ({phase.name})"
            signal_group = hdf5_file.create_group(group_name)

            dates = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            rows = list(zip(dates, signal_data.time_vector, signal_data.values, signal_data.forced_values))
            # Only the storage type of the two value columns depends on the signal type.
            row_dtype = string_row if signal_data.data_type == "str" else numeric_row
            signal_group["data"] = npy.array(rows, dtype=row_dtype)

    return hdf5_buffer.getvalue()
class SignalStatus(TwinPadModel):
    """Status of a signal; a signal is reported as "down" by default."""

    status: str = "down"
    # Free-text explanation of the status; empty by default.
    reason: str = ""
    # NOTE(review): unit and meaning of the delay are not visible here -- confirm.
    delay: float | None = None
class DigitizationFunction(TwinPadModel):
    """Parameters of a digitization: a value range and a raw value range."""

    # Optional number of bits.
    bits: int | None = None
    # Bounds of the value range.
    min_value: float
    max_value: float
    # Bounds of the raw value range.
    min_raw_value: float
    max_raw_value: float
class SignalUpdate(TwinPadModel):
    """Requested update of a signal; every field is optional."""

    # New value and new forced value; None means "not provided".
    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    # NOTE(review): unit of the timestamp is not visible here -- confirm.
    timestamp: int | None = None
class SignalType(str, Enum):
    """Kinds of signals; members are also plain strings."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Signal data class to use for each base data type name: only "str" maps to
# StringSignalData, every other listed type is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
class Signal(GenericMongo):
    """Configuration of a signal, stored in the "signals" collection."""

    collection_name: ClassVar[str] = "signals"

    # Dotted identifier; the part before the first dot is the device id.
    signal_id: str
    frequency: float
    unit: str | None
    description: str
    type: SignalType
    # Either a key of TYPES / SIGNALDATA_TYPES or an "enum(...)" declaration.
    data_type: str
    precision_digits: int | None
    forcible: bool
    status: SignalStatus = SignalStatus()

    # True for signals produced by post-processing functions.
    post_processing: bool = False
    # Ids of the phases for which the post-processing signal has been computed.
    computed_phases_ids: list[str] = []

    digitization_function: DigitizationFunction | None
@property
def device(self) -> Device:
    """Device owning this signal, looked up by the part of the signal id before the first dot."""
    owner_id, _, _ = self.signal_id.partition(".")
    return Device.get_one_by_attribute("device_id", owner_id)
@cached_property
def signal_data_class(self):
    """Signal data class matching `data_type`.

    Plain types are looked up directly; for an `enum(<base type>, ...)`
    declaration the class of the base type is used.

    Raises:
        ValueError: If `data_type` is neither a known type nor an enum.
    """
    declared_type = self.data_type

    data_class = SIGNALDATA_TYPES.get(declared_type)
    if data_class is not None:
        return data_class

    if declared_type.startswith("enum"):
        # "enum(str, ...)" -> "str"
        head = declared_type.split(",")[0]
        base_type = head.replace("enum(", "").lstrip(" ")
        return SIGNALDATA_TYPES[base_type]

    raise ValueError(f"Unhandled python type: {self.data_type}")
1802 @cached_property
1803 def python_type(self):
1804 if self.data_type in TYPES:
1805 return TYPES[self.data_type]
1806 if self.data_type.startswith("enum"):
1807 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1808 return Literal[*choices]
1809 raise ValueError(f"Unhandled python type: {self.data_type}")
async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
    """Validate a signal update, send it, and record the command.

    For enum signals the value and forced value must be among the enum
    choices; for other signals they must be of the signal's Python type.
    Invalid input is not sent: the command is recorded as failed and an
    error response with status code 400 is returned. Otherwise the update
    is sent through RabbitMQ and its response is returned.
    """
    command = Command(
        sent_at=time.time(),
        command_type="Signal command",
        user_id=current_user.id,
    )

    value = update_dict.value
    forced_value = update_dict.forced_value
    problems = []

    if self.data_type.startswith("enum"):
        enum_options = get_args(self.python_type)
        if value is not None and value not in enum_options:
            problems.append(f"Invalid value: {value} not in {enum_options}\n")
        if forced_value is not None and forced_value not in enum_options:
            problems.append(f"Invalid type for forced value: {forced_value} not in {enum_options}\n")
    else:
        if value is not None and not is_of_type(value, self.python_type):
            problems.append(f"Invalid type for value: {value.__class__} is not {self.python_type}\n")
        if forced_value is not None and not is_of_type(forced_value, self.python_type):
            problems.append(
                f"Invalid type for forced value: {forced_value.__class__} is not {self.python_type}\n"
            )

    if problems:
        # Rejected locally: nothing is sent to the broker.
        command.response_time = 0
        command.succeeded = False
        command.description = f"Tried to modify signal {self.signal_id}"
        response = {"error": True, "status_code": 400, "message": "".join(problems)}
    else:
        response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict)
        command.receive_response(response)

    Command.create(command)
    return response
1852 @classmethod
1853 def get_from_signal_id(cls, signal_id) -> Self:
1854 """Could be generic from mongo"""
1855 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1856 if not raw_value:
1857 return None
1858 del raw_value["_id"]
1859 return cls.dict_to_object(raw_value)
1861 @classmethod
1862 def get_all_ids(cls) -> list[str]:
1863 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1865 return [signal["signal_id"] for signal in cursor]
1867 @classmethod
1868 def get_all_statuses(cls) -> list[dict]:
1869 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
1871 return [
1872 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1873 for signal in cursor
1874 ]
1876 async def number_samples(self):
1877 collection = get_signal_collection(signal_id=self.signal_id)
1878 if collection is None:
1879 return 0
1881 number_samples = collection.estimated_document_count()
1883 number_samples_async_collection = await get_async_collection(
1884 systems_async_database, "number_samples", create=True, time_series=True
1885 )
1887 loop = asyncio.get_running_loop()
1888 loop.create_task(
1889 number_samples_async_collection.insert_one(
1890 {
1891 "timestamp": datetime.datetime.now(pytz.UTC),
1892 "signal_id": self.signal_id,
1893 "number_samples": number_samples,
1894 }
1895 )
1896 )
1898 return number_samples
1900 @classmethod
1901 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]:
1902 number_samples_by_id = {}
1903 collections = get_signal_collections_batch(signal_ids)
1904 number_samples_async_collection = await get_async_collection(
1905 systems_async_database, "number_samples", create=True, time_series=True
1906 )
1908 for signal_id, collection in zip(signal_ids, collections):
1909 if collection is None:
1910 number_samples_by_id[signal_id] = 0
1911 continue
1913 number_samples = collection.estimated_document_count()
1915 number_samples_by_id[signal_id] = number_samples
1917 now = datetime.datetime.now(pytz.UTC)
1918 loop = asyncio.get_running_loop()
1919 loop.create_task(
1920 number_samples_async_collection.insert_many(
1921 [
1922 {
1923 "timestamp": now,
1924 "signal_id": signal_id,
1925 "number_samples": number_samples,
1926 }
1927 for signal_id, number_samples in number_samples_by_id.items()
1928 ]
1929 )
1930 )
1932 return number_samples_by_id
1934 def sample_datasize(self):
1935 return signals_database.command("collstats", self.signal_id)["size"]
1937 @classmethod
1938 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1939 result = cls.collection().aggregate(
1940 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1941 )
1943 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ForcedSignal(GenericMongo):
    """Forcing record of a signal, stored in the ``forced_signals`` collection."""

    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record, keyed by ``signal_id``, and return its id.

        An existing record for the same signal is overwritten; ``self.id`` is
        set to the id of the stored document.
        """
        target_collection = self.collection()
        fields_to_store = self.to_dict(exclude={"id"})
        stored_document = target_collection.find_one_and_update(
            {"signal_id": self.signal_id},
            {"$set": fields_to_store},
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Tell whether ``current_user`` may force the given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal is forced by another user and the current
            user is not an admin, True otherwise
        :rtype: bool
        """
        existing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing is None:
            # Nobody forces this signal yet.
            return True
        if existing.forcing_user_id == current_user.id:
            return True
        return bool(current_user.is_admin)
class ServicesStatus(TwinPadModel):
    """Status string of the backend and of each service it depends on."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        The backend itself is always reported as ``"up"``.
        """
        cloud_broker_status = ping(RABBITMQ_HOST)
        database_status = ping(MONGO_HOST)
        signal_storage_status = ping(SIGNAL_STORAGE_HOST)
        heartbeat_storage_status = ping(HEARTBEAT_STORAGE_HOST)
        data_analyzer_status = ping(DATA_ANALYZER_HOST)
        return cls(
            backend="up",
            cloud_broker=cloud_broker_status,
            time_series_database=database_status,
            signal_storage=signal_storage_status,
            heartbeat_storage=heartbeat_storage_status,
            data_analyzer=data_analyzer_status,
        )
def ping(host) -> str:
    """Return ``"up"`` if ``host`` answers an ICMP ping within 0.8 s, else ``"down"``.

    :param host: host name or address to probe
    :return: ``"up"`` or ``"down"``
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # Best effort: presumably raised when the process is not allowed to
        # open an ICMP socket; report the host as down rather than failing.
        return "down"
    # ping3 signals a timeout with None and an error with False. Any numeric
    # delay is a reply, including 0.0, which a plain truthiness test would
    # misreport as "down".
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    """Event produced by an event rule, stored in the ``events`` collection."""

    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Event rule referenced by ``event_rule_id``."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an event from a stored document.

        The document's ``timestamp`` datetime is replaced, in place, by its
        epoch value before the generic conversion runs.
        """
        stored_datetime = dict_["timestamp"]
        dict_["timestamp"] = datetime.datetime.timestamp(stored_datetime)
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    """Amount of activity (events, samples or commands) recorded for one day.

    Each ``get_number_*_timeframe`` method maintains a per-day cache
    collection. The cache is rebuilt from the first recorded item when it does
    not exist or when ``recompute_amount`` is true; otherwise only the row of
    the current day is refreshed. Day boundaries are computed in UTC.
    """

    timestamp: float
    amount: int

    @staticmethod
    def _utc_midnight_today() -> datetime.datetime:
        """Return the start of the current UTC day as an aware datetime."""
        now = datetime.datetime.now(pytz.UTC)
        return now.replace(hour=0, minute=0, second=0, microsecond=0)

    @staticmethod
    def _utc_day_start(moment: datetime.datetime) -> datetime.datetime:
        """Return midnight of ``moment``'s calendar day, labelled as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @staticmethod
    def _backfill_daily_amounts(daily_collection, first_day, today, amount_for) -> None:
        """Insert one row per non-empty day from ``first_day`` up to, excluding, ``today``.

        ``amount_for`` receives the Mongo range filter of a day's timestamps
        and returns the amount recorded in that range.
        """
        one_day = datetime.timedelta(days=1)
        day = first_day
        while day < today:
            amount = amount_for({"$gte": day, "$lt": day + one_day})
            if amount > 0:
                daily_collection.insert_one({"timestamp": day, "amount": amount})
            day += one_day

    @staticmethod
    def _store_today_amount(daily_collection, today, amount_today) -> None:
        """Replace today's row with ``amount_today``; a zero amount leaves the cache untouched."""
        if amount_today > 0:
            daily_collection.delete_many({"timestamp": today})
            daily_collection.insert_one({"timestamp": today, "amount": amount_today})

    @classmethod
    def _items_in_timeframe(cls, daily_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Return the cached rows whose day lies within the given epoch bounds (inclusive)."""
        # Pass tz explicitly: fromtimestamp() without tz yields local wall-clock
        # time, which must not be relabelled as UTC.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in daily_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            day["timestamp"] = day["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @staticmethod
    def _sum_sample_values(samples_collection, timestamp_filter: dict):
        """Return the sum of ``value`` over the documents matching ``timestamp_filter`` (0 if none)."""
        grouped = samples_collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if len(grouped) == 0:
            return 0
        return grouped[0].get("number_samples", 0)

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the daily number of events between two epoch timestamps.

        :param min_timestamp: lower bound (epoch seconds, inclusive)
        :param max_timestamp: upper bound (epoch seconds, inclusive)
        :param recompute_amount: rebuild the ``number_events`` cache from scratch
        :return: one item per cached day in the timeframe; empty if the cache
            had to be rebuilt and no event exists
        """
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        today = cls._utc_midnight_today()

        def count_events(timestamp_filter):
            return events_collection.count_documents({"timestamp": timestamp_filter})

        if number_events_collection is None or recompute_amount:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            number_events_collection.delete_many({})
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return []
            cls._backfill_daily_amounts(
                number_events_collection, cls._utc_day_start(first_event["timestamp"]), today, count_events
            )
        cls._store_today_amount(number_events_collection, today, count_events({"$gte": today}))
        return cls._items_in_timeframe(number_events_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of received samples between two epoch timestamps.

        Amounts are the sums of ``value`` in the ``signal_storage._NUMBER_SAMPLES``
        collection of the signals database.

        :param min_timestamp: lower bound (epoch seconds, inclusive)
        :param max_timestamp: upper bound (epoch seconds, inclusive)
        :param recompute_amount: rebuild the ``number_received_samples`` cache from scratch
        :return: one item per cached day in the timeframe; empty if the cache
            had to be rebuilt and no sample count exists
        """
        number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
        signals_number_samples_collection = get_collection(
            signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
        )
        today = cls._utc_midnight_today()

        def sum_samples(timestamp_filter):
            return cls._sum_sample_values(signals_number_samples_collection, timestamp_filter)

        if number_samples_collection is None or recompute_amount:
            number_samples_collection = get_collection(
                systems_database, "number_received_samples", create=True, time_series=True
            )
            number_samples_collection.delete_many({})
            first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # Compute from the day of the first sample count found.
            cls._backfill_daily_amounts(
                number_samples_collection, cls._utc_day_start(first_sample["timestamp"]), today, sum_samples
            )
        cls._store_today_amount(number_samples_collection, today, sum_samples({"$gte": today}))
        return cls._items_in_timeframe(number_samples_collection, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily number of commands between two epoch timestamps.

        :param min_timestamp: lower bound (epoch seconds, inclusive)
        :param max_timestamp: upper bound (epoch seconds, inclusive)
        :param recompute_amount: rebuild the ``number_commands`` cache from scratch
        :return: one item per cached day in the timeframe; empty if the cache
            had to be rebuilt and no command exists
        """
        number_commands_collection = get_collection(systems_database, "number_commands")
        commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
        today = cls._utc_midnight_today()

        def count_commands(timestamp_filter):
            return commands_collection.count_documents({"timestamp": timestamp_filter})

        if number_commands_collection is None or recompute_amount:
            number_commands_collection = get_collection(
                systems_database, "number_commands", create=True, time_series=True
            )
            number_commands_collection.delete_many({})
            first_command = commands_collection.find_one(sort={"timestamp": 1})
            if first_command is None:
                return []
            cls._backfill_daily_amounts(
                number_commands_collection, cls._utc_day_start(first_command["timestamp"]), today, count_commands
            )
        cls._store_today_amount(number_commands_collection, today, count_commands({"$gte": today}))
        return cls._items_in_timeframe(number_commands_collection, min_timestamp, max_timestamp)
class EventRule(GenericMongo):
    """Rule producing events, stored in the ``event_rules`` collection."""

    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        """Number of stored events that reference this rule.

        Returns 0 when the ``events`` collection is not available: the lookup
        is done without ``create`` and may then yield ``None``.
        """
        events_collection = get_collection(systems_database, "events")
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    """Company, stored in the ``companies`` collection."""

    collection_name: ClassVar[str] = "companies"

    name: str
class Campaign(GenericMongo):
    """Campaign, stored in the ``campaigns`` collection."""

    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
class Phase(GenericMongo):
    """Time-bounded phase of a campaign, stored in the ``phases`` collection."""

    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # Foreign key
    campaign_id: MongoId

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
class CustomViewCreation(GenericMongo):
    """Fields supplied when creating a custom view (``custom_views`` collection)."""

    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list
class CustomView(CustomViewCreation):
    """Custom view together with its id and the id of the user it belongs to."""

    # Properties
    id: str | None = None

    # Foreign key
    user_id: str
# Update model derived from CustomView (presumably with every field optional;
# see create_update_model).
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    """Video source (camera), stored in the ``videos`` collection."""

    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted by ``sort_by`` in ascending order.

        Only ``name`` and ``ip_addr`` are requested from the database, so
        ``username`` and ``password`` keep their ``None`` default.
        """
        requested_fields = {"name": 1, "ip_addr": 1}
        cursor = cls.collection().find({}, requested_fields).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with the given id, or ``None`` if it is unknown."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    """Command issued by a user, stored in the ``commands`` time-series collection."""

    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, creating it as a time series if needed."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store a copy of ``command`` whose ``timestamp`` is derived from ``sent_at`` (UTC).

        :return: ``{"command_id": <inserted id>}``, or ``None`` if the insert
            yields no result
        """
        copied_fields = ("sent_at", "response_time", "command_type", "description", "succeeded", "user_id")
        stamped_command = cls(
            timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
            **{field_name: getattr(command, field_name) for field_name in copied_fields},
        )
        insert_result = cls.collection().insert_one(stamped_command.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"command_id": str(insert_result.inserted_id)}

    def receive_response(self, response: dict):
        """Record the outcome of the command from the received ``response``.

        The response time is measured from ``sent_at``. The command succeeded
        only if the response's ``error`` entry is exactly ``False``. The
        response message becomes the description unless one is already set.
        """
        elapsed = time.time() - self.sent_at
        self.response_time = elapsed
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            self.description = response.get("message", "").rstrip()
class SignalsPresetCreation(GenericMongo):
    """Fields supplied when creating a signals preset: a name and a list of signal ids."""

    name: str
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    """Signals preset owned by a user, stored in the ``signals_presets`` collection."""

    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store ``signals_preset`` for ``user_id`` and return the inserted id as a string."""
        owned_preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        document = owned_preset.model_dump(exclude={"id"})
        insert_result = cls.collection().insert_one(document)
        return str(insert_result.inserted_id)
# Update model derived from SignalsPreset (presumably with every field
# optional; see create_update_model).
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    """Line styles available for a graph theme; members are plain strings."""

    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    """Annotation-only holder for the two colors of a signal's appearance."""

    # Color for the regular value.
    value_color: str
    # Color for the forced value.
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    """Fields supplied when creating a graph theme (``graph_themes`` collection).

    A theme gives one signal its value and forced-value colors and line
    styles; themes are private unless stated otherwise.
    """

    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    """Graph theme as seen by one user.

    The stored ``creator_id``, ``in_library`` and ``active`` fields are
    replaced by booleans relative to the requesting user. Every query method
    stores the requesting user's id on the class before delegating to the
    generic implementation.
    """

    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Aggregation steps, keyed by name, that build the per-user view of a theme."""
        user_id = cls._current_user_id

        def add_field(field_name, expression):
            return [{"$addFields": {field_name: expression}}]

        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user is not allowed to see (i.e. one
            # that is neither theirs nor shared); meant to be applied as
            # early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            # Strip the stored per-user bookkeeping fields from the result.
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the generic query on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the query with ``query.in_user_library`` set to ``"true"``."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Return the theme with the given id as seen by ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Generic ``get_by_attribute`` evaluated on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Generic ``get_one_by_attribute`` evaluated on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Generic ``get_all`` evaluated on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Return colors and line styles of the themes active for ``user_id``.

        :param signal_ids: signals to look up
        :param user_id: user whose active themes are considered
        :return: mapping from signal id to its ``value_color``,
            ``forced_value_color``, ``value_line_style`` and
            ``forced_value_line_style``; one theme is kept per signal
        """
        active_for_requested_signals = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        keep_first_per_signal = {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}}
        unwrap_theme = {"$replaceRoot": {"newRoot": "$firstDocument"}}
        appearance_fields_only = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [active_for_requested_signals, keep_first_per_signal, unwrap_theme, appearance_fields_only]

        appearances = {}
        for theme in cls.collection().aggregate(pipeline):
            signal_id = theme.pop("signal_id")
            appearances[signal_id] = theme
        return appearances
# Update model derived from PublicGraphTheme (presumably with every field
# optional; see create_update_model).
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    """Graph theme as stored, including the per-user bookkeeping lists."""

    # Private fields: creator, users having the theme in their library and
    # users for whom the theme is active.
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Store a new theme and return it with its id set.

        The creator is automatically added to the theme's library and active
        lists.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    @staticmethod
    def _set_membership(members: list[str], user_id: str, wanted) -> None:
        """Add ``user_id`` to ``members`` or remove it so that its presence matches ``wanted``."""
        if wanted and user_id not in members:
            members.append(user_id)
        elif not wanted and user_id in members:
            members.remove(user_id)

    def update(self, update_dict: dict, user_id: str):
        """Apply an update expressed in the per-user (public) vocabulary.

        ``in_user_library`` and ``active_for_user`` are translated into
        changes of the stored ``in_library`` and ``active`` lists for
        ``user_id``; ``created_by_user`` is ignored. These view-only keys are
        always removed from ``update_dict`` (which is modified in place), even
        when their value is ``None``, so they are never written to the
        database.

        :param update_dict: fields to update
        :param user_id: user requesting the update
        :return: an empty dictionary
        """
        in_user_library = update_dict.pop("in_user_library", None)
        if in_user_library is not None:
            self._set_membership(self.in_library, user_id, in_user_library)
            update_dict["in_library"] = self.in_library

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            self._set_membership(self.active, user_id, active_for_user)
            update_dict["active"] = self.active

        update_dict.pop("created_by_user", None)

        # Nothing left to write: skip the round trip (and the empty "$set").
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}
class DeviceStatus(str, Enum):
    """Statuses a device can be in; members are plain strings."""

    started = "started"
    running = "running"
    created = "created"
    exited = "exited"
    restarting = "restarting"
class DeviceUpdateFromDeployer(BaseModel):
    """Update payload for a deployed device: only its status."""

    status: DeviceStatus
class DeviceFromDeployerCreation(BaseModel):
    """Fields supplied when creating a device through a deployer."""

    name: str
    description: str
class DeviceFromDeployer(DeviceFromDeployerCreation):
    """Device as reported by a deployer: status, id and logs on top of the creation fields."""

    status: DeviceStatus
    device_id: DeviceId
    logs: str = ""
class DeviceDeployer(GenericMongo):
    """Remote device deployer reachable over HTTP (``device_deployers`` collection).

    NOTE(review): none of the HTTP calls below sets a timeout, so an
    unresponsive deployer blocks the caller. Choosing a value needs knowledge
    of the deployer's latency.
    """

    collection_name: ClassVar[str] = "device_deployers"
    url: HttpUrl

    def endpoint_url(self, endpoint):
        """Return the deployer URL for ``endpoint``, joined with exactly one slash."""
        return f"{str(self.url).rstrip('/')}/{endpoint}"

    @staticmethod
    def _device_from_payload(device_dict: dict) -> DeviceFromDeployer:
        """Build a ``DeviceFromDeployer`` from one device entry of a deployer response."""
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name=device_dict["container_name"],
            description="desc",
            status=device_dict["status"],
            logs=device_dict["logs"],
        )

    def devices(self) -> list[DeviceFromDeployer] | None:
        """Return the devices known to the deployer.

        :return: the devices, or ``None`` on a connection error or a non-200 answer
        """
        try:
            response = requests.get(self.endpoint_url("devices"))
        except requests.exceptions.ConnectionError:
            logger.info("connection error")
            return None
        if response.status_code != 200:
            return None
        return [self._device_from_payload(device_dict) for device_dict in response.json()["devices"]]

    def get_device(self, device_id: DeviceId) -> DeviceFromDeployer | None:
        """Return one device of the deployer.

        :return: the device, or ``None`` on a connection error or a non-200 answer
        """
        try:
            response = requests.get(self.endpoint_url(f"devices/{device_id}"))
        except requests.exceptions.ConnectionError:
            return None
        if response.status_code != 200:
            return None
        return self._device_from_payload(response.json())

    def create_device(self, device: DeviceFromDeployer) -> DeviceFromDeployer | None:
        """Ask the deployer to create a device named ``device.name``.

        :return: the created device (with an empty name), or ``None`` on a
            connection error or an answer other than 201
        """
        try:
            response = requests.post(self.endpoint_url("devices"), json={"name": device.name})
        except requests.exceptions.ConnectionError:
            return None

        if response.status_code != 201:
            return None

        device_dict = response.json()
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            status=device_dict["status"],
        )

    def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
        """Send ``device_update`` to the deployer for the given device.

        :return: a ``Device`` carrying the id and status answered by the
            deployer (other fields are placeholders), or ``None`` on a
            connection error or a non-200 answer
        """
        try:
            response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump())
        except requests.exceptions.ConnectionError:
            return None

        if response.status_code != 200:
            return None

        device_dict = response.json()
        return Device(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            pid={},
            petri_network={},
            modes=[],
            status=device_dict["status"],
        )

    def delete_device(self, device_id: DeviceId) -> DeleteInfo:
        """Ask the deployer to delete a device.

        :return: deletion outcome; on failure ``detail`` holds either a
            connection error notice or the deployer's response text
        """
        try:
            response = requests.delete(self.endpoint_url(f"devices/{device_id}"))
        except requests.exceptions.ConnectionError:
            return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
        if response.status_code not in [200, 202, 204]:
            return DeleteInfo(is_deleted=False, detail=response.text)

        return DeleteInfo(is_deleted=True, detail="")
# Update model derived from DeviceDeployer (presumably with every field
# optional; see create_update_model).
DeviceDeployerUpdate = create_update_model(DeviceDeployer)