Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%
1367 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-17 13:11 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-17 13:11 +0000
1from functools import cached_property
2import os
3import re
4import io
5import time
6import csv
7from typing import Self, ClassVar, Any, Literal, get_args, Annotated
8import datetime
9import math
10import bisect
11from enum import Enum
12import logging
13import copy
14import asyncio
15import requests
17import zipfile
18import ping3
19import pytz
20from bson.objectid import ObjectId
21from pymongo import ASCENDING, ReturnDocument
22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator
23import numpy as npy
24import lttb
25import h5py
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 signals_async_database,
36 devices_states_database,
37)
38from twinpad_backend.responses import ListResponse
39from twinpad_backend.messages import RabbitMQClient
40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
42TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
43SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
44DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
45MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]
48RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
49MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
50SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
51HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
52DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
54DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
55NUMBER_SAMPLES_DATABASE_UPDATE = 120
57logger = logging.getLogger("uvicorn.error")
60class DeleteInfo(BaseModel):
61 is_deleted: bool
62 detail: str
65class classproperty:
66 """
67 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13.
68 Found here: https://stackoverflow.com/a/76301341
69 """
71 def __init__(self, func):
72 self.fget = func
74 def __get__(self, _, owner):
75 return self.fget(owner)
78def create_update_model(model):
79 fields = {}
81 for field_name, field_annotation in model.model_fields.items():
82 if field_name != "id":
83 fields[field_name] = (field_annotation.annotation | None, None)
85 query_name = model.__name__ + "Update"
86 return create_model(query_name, **fields)
89def get_utc_date_from_timestamp(timestamp: float):
90 return datetime.datetime.fromtimestamp(timestamp).isoformat()
93def downsample_list(time_vector: list, values: list, max_number_samples: int):
94 if len(time_vector) < max_number_samples:
95 return time_vector, values
97 time_vector_copy = copy.deepcopy(time_vector)
98 values_copy = copy.deepcopy(values)
100 none_group_bounds = []
101 none_group_index = -1
102 index = -1
103 # LTTB doesn't handle None values so remove them
104 while values_copy.count(None) > 0:
105 # Store bounds of None value groups so we can insert them back after the downsampling
106 if (new_index := values_copy.index(None)) != index:
107 none_group_bounds.append([time_vector_copy.pop(new_index)])
108 none_group_index += 1
109 elif len(none_group_bounds[none_group_index]) < 2:
110 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
111 else:
112 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
113 values_copy.pop(new_index)
114 index = new_index
115 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
117 try:
118 values_array = npy.array([time_vector_copy, values_copy]).T
119 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
121 new_time_vector = interpolated_values[:, 0].tolist()
122 new_values = interpolated_values[:, 1].tolist()
123 except ValueError:
124 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
125 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
126 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
127 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
128 return new_time_vector, new_values_nan_to_none
130 # insert back None values at the correct timestamps
131 for none_group in none_group_bounds:
132 start_index = npy.searchsorted(new_time_vector, none_group[0])
133 new_time_vector[start_index:start_index] = none_group
134 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
136 return new_time_vector, new_values
139def is_of_type(value, wanted_type):
140 if wanted_type is float:
141 return isinstance(value, (int, float))
142 return isinstance(value, wanted_type)
145# Models
146class TwinPadModel(BaseModel):
147 @classmethod
148 def dict_to_object(cls, dict_):
149 return cls.model_validate(dict_)
151 def to_dict(self, exclude=None):
152 dict_ = self.model_dump(exclude=exclude, mode="json")
153 return dict_
156def validate_mongo_id(v):
157 if not ObjectId.is_valid(v):
158 raise ValueError("Invalid MongoDB id")
159 return str(v)
162MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]
165def validate_12_hex(v: str) -> str:
166 if not re.fullmatch(r"[0-9a-fA-F]{12}", v):
167 raise ValueError("ID must be a 12-character hexadecimal string")
168 return v
171DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]
174class GenericMongo(TwinPadModel):
175 id: MongoId | None = None
176 custom_pipeline_steps: ClassVar[dict[str, list]] = {}
178 @classmethod
179 def collection(cls):
180 return get_collection(systems_database, cls.collection_name, create=True)
182 @classmethod
183 def response_from_query(cls, query) -> ListResponse[Self]:
184 request_filters = query.mongodb_filter()
185 items = []
187 # Allows for multi-sort, Python dicts are ordered so no issue while sorting
188 sort_dict = {}
189 for sort in query.sort_by.split(","):
190 if ":" in sort:
191 sort_field, sort_order = sort.split(":")
192 sort_order = int(sort_order)
193 else:
194 sort_field = sort
195 sort_order = 1
196 sort_dict[sort_field] = sort_order
198 collection = get_collection(systems_database, cls.collection_name, create=True)
199 total = collection.count_documents(request_filters)
201 pipeline = []
202 added_properties = []
203 if "$and" in request_filters:
204 for request_filter in request_filters["$and"]:
205 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
206 if filtered_property in request_filter:
207 pipeline.extend(pipeline_steps)
208 added_properties.append(filtered_property)
209 else:
210 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
211 if filtered_property in request_filters:
212 pipeline.extend(pipeline_steps)
213 added_properties.append(filtered_property)
214 pipeline.append({"$match": request_filters})
216 for sort_field in sort_dict.keys():
217 if sort_field in cls.custom_pipeline_steps:
218 pipeline.extend(cls.custom_pipeline_steps[sort_field])
219 added_properties.append(sort_field)
220 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])
222 if (query.limit is not None) and (query.limit != 0):
223 pipeline.append({"$limit": query.limit})
225 for filtered_property, step in cls.custom_pipeline_steps.items():
226 if filtered_property not in added_properties:
227 pipeline.extend(step)
229 cursor = collection.aggregate(pipeline)
231 for item_dict in cursor:
232 items.append(cls.mongo_dict_to_object(item_dict))
234 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
236 @classmethod
237 def get_from_id(cls, item_id) -> Self | None:
238 return cls.get_one_by_attribute("_id", ObjectId(item_id))
240 @classmethod
241 def mongo_dict_to_object(cls, mongo_dict):
242 mongo_dict["id"] = str(mongo_dict["_id"])
243 del mongo_dict["_id"]
244 return cls.dict_to_object(mongo_dict)
246 @classmethod
247 def get_by_attribute(cls, attribute_name: str, attribute_value):
248 """Returns all items that match the attribute with value."""
249 pipeline = []
250 if attribute_name in cls.custom_pipeline_steps:
251 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
252 pipeline.append({"$match": {attribute_name: attribute_value}})
253 for key, step in cls.custom_pipeline_steps.items():
254 if key != attribute_name:
255 pipeline.extend(step)
256 items = cls.collection().aggregate(pipeline)
257 if items is None:
258 return None
259 return [cls.mongo_dict_to_object(d) for d in items]
261 @classmethod
262 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
263 pipeline = []
264 if attribute_name in cls.custom_pipeline_steps:
265 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
266 pipeline.append({"$match": {attribute_name: attribute_value}})
267 pipeline.append({"$limit": 1})
268 for key, step in cls.custom_pipeline_steps.items():
269 if key != attribute_name:
270 pipeline.extend(step)
271 items = cls.collection().aggregate(pipeline).to_list()
272 if len(items) == 0:
273 return None
274 return cls.mongo_dict_to_object(items[0])
276 @classmethod
277 def get_all(cls, sort_by="_id") -> list[Self]:
278 items = []
279 pipeline = []
280 if sort_by in cls.custom_pipeline_steps:
281 pipeline.extend(cls.custom_pipeline_steps[sort_by])
282 pipeline.append({"$sort": {sort_by: ASCENDING}})
283 for key, step in cls.custom_pipeline_steps.items():
284 if key != sort_by:
285 pipeline.extend(step)
286 for dict_ in cls.collection().aggregate(pipeline):
287 items.append(cls.mongo_dict_to_object(dict_))
288 return items
290 @classmethod
291 def get_number_documents(cls):
292 collection = get_collection(systems_database, cls.collection_name)
293 if collection is None:
294 return 0
295 return collection.count_documents(
296 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
297 )
299 def insert(self):
300 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
301 self.id = str(insert_result.inserted_id)
302 return self.id
304 def update(self, update_dict):
305 for key, value in update_dict.items():
306 setattr(self, key, value)
307 self.collection().find_one_and_update(
308 {"_id": ObjectId(self.id)},
309 {"$set": update_dict},
310 return_document=ReturnDocument.AFTER,
311 )
313 return self
315 def delete(self):
316 result = self.collection().delete_one({"_id": ObjectId(self.id)})
317 return result.deleted_count > 0
320class User(GenericMongo):
321 collection_name: ClassVar[str] = "users"
323 firstname: str
324 lastname: str
325 email: str
326 password: str
327 is_active: bool | None = False
328 is_admin: bool | None = False
329 is_connected: bool | None = False
330 company_id: str | None = None
332 def to_dict(self, exclude=None):
333 if exclude is None:
334 exclude = {"password"}
335 return GenericMongo.to_dict(self, exclude=exclude)
337 @classmethod
338 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
339 users = cls.get_all()
340 if not users:
341 is_admin = True
342 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
343 user_collection = get_collection(systems_database, "users", create=True)
344 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
345 if new_user is None:
346 return None
347 return {"user_id": str(new_user.inserted_id)}
349 @classmethod
350 def update_info(cls, user: "UserUpdate", user_id: str):
351 updated_user = cls.collection().find_one_and_update(
352 {"_id": ObjectId(user_id)},
353 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
354 return_document=ReturnDocument.AFTER,
355 )
356 updated_user["id"] = str(updated_user["_id"])
357 del (updated_user["_id"], updated_user["is_connected"])
358 return cls(**updated_user)
361UserUpdate = create_update_model(User)
364class Mode(TwinPadModel):
365 mode_id: int
366 name: str
367 frequency_multiplier: float
368 min_frequency: float
371class DeviceUpdate(TwinPadModel):
372 mode_id: int
375class Device(GenericMongo):
376 collection_name: ClassVar[str] = "devices"
378 device_id: DeviceId
379 name: str
380 description: str = ""
381 modes: list[Mode]
382 current_mode_id: int | None = None
383 last_ping: float | None = None
384 petri_network: Any
385 pid: Any
386 load: float | None = None
387 tokens: list[int] = Field(default_factory=list)
388 status: str
390 async def change_mode(self, update_dict, current_user: User):
391 has_error = False
393 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes):
394 has_error = True
395 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
396 elif self.current_mode_id is not None:
397 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
398 else:
399 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
400 command = Command(
401 sent_at=time.time(),
402 command_type="Mode change",
403 description=description,
404 user_id=current_user.id,
405 )
407 if has_error:
408 command.response_time = 0
409 command.succeeded = False
410 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
411 else:
412 response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id)
413 command.receive_response(response)
415 Command.create(command)
416 return response
418 @classmethod
419 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
420 devices_by_id = {}
421 for signal_id in signal_ids:
422 device_id = signal_id.split(".")[0]
423 if device_id not in devices_by_id:
424 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
425 return devices_by_id
428class DeviceSetup(GenericMongo):
429 collection_name: ClassVar[str] = "device_setups"
431 device_ids: list[str]
432 active: bool = False
433 variable_mapping: dict[str, str]
436DeviceSetupUpdate = create_update_model(DeviceSetup)
439class DeviceState(GenericMongo):
440 collection_name: ClassVar[str] = "devices_states"
442 timestamp: float
443 mode: str | None = None
444 load: float | None = None
445 tokens: list[int] = Field(default_factory=list)
446 modified_properties: list[str] = Field(default_factory=list)
448 @classmethod
449 def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
450 req_filter = query.mongodb_filter()
451 items = []
452 if ":" in query.sort_by:
453 sort_field, sort_order = query.sort_by.split(":")
454 sort_order = int(sort_order)
455 else:
456 sort_field = query.sort_by
457 sort_order = 1
458 collection = get_collection(devices_states_database, device_id)
459 if collection is None:
460 total = 0
461 cursor = []
462 else:
463 total = collection.count_documents(req_filter)
464 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
465 if (query.limit is not None) and (query.limit != 0):
466 cursor = cursor.limit(query.limit)
467 for item_dict in cursor:
468 items.append(
469 cls(
470 timestamp=item_dict.get("precise_timestamp"),
471 mode=item_dict.get("mode", None),
472 load=item_dict.get("load", None),
473 tokens=item_dict.get("tokens", Field(default_factory=list)),
474 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)),
475 )
476 )
477 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
480class SignalSample(TwinPadModel):
481 signal_id: str
482 timestamp: float
483 value: float | int | str | bool | None
484 forced_value: float | int | str | bool | None = None
486 @classmethod
487 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
489 collection = get_signal_collection(signal_id)
490 if collection is None:
491 return None
493 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
494 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
495 bucket = get_signal_collection(f"system.buckets.{signal_id}")
496 first_bucket = None
497 if bucket is not None:
498 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
499 if first_bucket is not None:
500 sample_data = collection.find_one(
501 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
502 )
503 else:
504 sample_data = collection.find_one({}, sort={"precise_timestamp": 1})
506 if sample_data is None:
507 return None
509 timestamp = sample_data["precise_timestamp"]
511 return cls(
512 signal_id=signal_id,
513 timestamp=timestamp,
514 value=sample_data.get("value", None),
515 forced_value=sample_data.get("forced_value", None),
516 )
518 @classmethod
519 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
520 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
522 @classmethod
523 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
524 collection = get_signal_collection(signal_id)
525 if collection is None:
526 return None
528 # Same workaround as above function, very effective to narrow down big sets of data
529 bucket = get_signal_collection(f"system.buckets.{signal_id}")
530 last_bucket = None
531 if bucket is not None:
532 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
533 if last_bucket is not None:
534 sample_data = collection.find_one(
535 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
536 sort={"precise_timestamp": -1},
537 )
538 else:
539 sample_data = collection.find_one({}, sort={"precise_timestamp": -1})
541 if sample_data is None:
542 return None
544 timestamp = sample_data["precise_timestamp"]
546 if device is None:
547 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
548 if device is not None and device.last_ping is not None:
549 if timestamp is None:
550 timestamp = device.last_ping
551 else:
552 timestamp = max(timestamp, device.last_ping)
553 return cls(
554 signal_id=signal_id,
555 timestamp=timestamp,
556 value=sample_data.get("value", None),
557 forced_value=sample_data.get("forced_value", None),
558 )
560 @classmethod
561 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
562 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
563 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
566class SignalData(TwinPadModel):
567 signal_id: str
568 forcible: bool = True
569 time_vector: list[float]
570 values: list[float | int | str | None]
571 forced_values: list[float | int | str | None]
573 data_start: float | None = None
574 data_end: float | None = None
576 number_samples: int = 0
577 number_samples_db: int = 0
579 db_query_time: float = 0.0
580 init_time: float = 0.0
581 data_processing_time: float = 0.0
583 phase_id: str | None = None
585 @classmethod
586 def get_from_signal_id(
587 cls,
588 signal_id: str,
589 min_timestamp: float = None,
590 max_timestamp: float = None,
591 window_min_timestamp: float = None,
592 window_max_timestamp: float = None,
593 interpolate_bounds: bool = True,
594 max_documents: int = None,
595 ) -> Self:
597 now = time.time()
599 req_signal = {}
600 if min_timestamp is not None:
601 req_signal.setdefault("timestamp", {})
602 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
603 if max_timestamp is not None:
604 req_signal.setdefault("timestamp", {})
605 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
607 collection = get_signal_collection(signal_id)
608 if collection is None:
609 return cls(
610 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
611 )
613 db_req_start = time.time()
615 sort_step = {"$sort": {"precise_timestamp": 1}}
616 number_results = collection.count_documents(req_signal)
618 pipeline = []
619 if req_signal:
620 pipeline.append({"$match": req_signal}) # Filter data if needed
622 pipeline.extend(
623 [
624 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
625 sort_step,
626 ]
627 )
629 if max_documents is not None and max_documents < number_results:
630 unsampling_ratio = math.ceil(number_results / max_documents)
631 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
632 pipeline.extend(
633 [
634 {
635 "$setWindowFields": {
636 "sortBy": {"precise_timestamp": 1},
637 "output": {"index": {"$documentNumber": {}}},
638 }
639 },
640 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
641 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
642 {"$replaceRoot": {"newRoot": "$doc"}},
643 {"$unset": ["index", "group_id"]},
644 {"$sort": {"precise_timestamp": 1}},
645 ]
646 )
648 # logger.info(f"pipeline: %s", str(pipeline))
649 cursor = collection.aggregate(pipeline)
650 db_req_time = time.time() - db_req_start
652 init_time = time.time()
654 results = cursor.to_list()
655 time_vector = []
656 values = []
657 forced_values = []
658 for s in results:
659 time_vector.append(s["precise_timestamp"])
660 values.append(s.get("value", None))
661 forced_values.append(s.get("forced_value", None))
663 signal = Signal.get_from_signal_id(signal_id)
664 if signal is None:
665 return cls(
666 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
667 )
668 class_ = signal.signal_data_class
670 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
671 time_vector, values, forced_values = cls.interpolate_bounds(
672 class_,
673 collection,
674 signal_id,
675 time_vector,
676 values,
677 forced_values,
678 window_min_timestamp,
679 window_max_timestamp,
680 )
682 if values:
683 # TODO: check below. a bit strange
684 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
685 # Adding last value as it should be repeated
686 time_vector.append(now)
687 values.append(values[-1])
688 forced_values.append(forced_values[-1])
690 init_time = time.time() - init_time
692 # See line 292 for explanation
693 bucket = get_signal_collection(f"system.buckets.{signal_id}")
694 first_bucket = None
695 if bucket is not None:
696 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
697 if first_bucket is not None:
698 data_start = first_bucket["control"]["min"]["precise_timestamp"]
699 else:
700 data_start = None
702 last_bucket = None
703 if bucket is not None:
704 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
705 if last_bucket is not None:
706 data_end = last_bucket["control"]["max"]["precise_timestamp"]
707 else:
708 data_end = None
710 return class_(
711 signal_id=signal_id,
712 forcible=signal.forcible,
713 time_vector=time_vector,
714 values=values,
715 forced_values=forced_values,
716 data_start=data_start,
717 data_end=data_end,
718 number_samples=len(values),
719 number_samples_db=number_results,
720 db_query_time=db_req_time,
721 init_time=init_time,
722 )
724 @staticmethod
725 def interpolate_bounds(
726 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
727 ):
728 sample_right = None
729 # Fetching right side value & interpolation
730 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
731 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
732 sample_right = collection.find_one(
733 {
734 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
735 "value": {"$exists": True},
736 },
737 sort={"precise_timestamp": -1},
738 )
739 if sample_right:
740 if time_vector:
741 right_sd = class_(
742 signal_id=signal_id,
743 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
744 values=[values[-1], sample_right.get("value", None)],
745 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
746 )
747 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
748 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
749 else:
750 max_ts_value = sample_right.get("value", None)
751 max_ts_forced_value = sample_right.get("forced_value", None)
752 time_vector.append(window_max_timestamp)
753 values.append(max_ts_value)
754 forced_values.append(max_ts_forced_value)
756 # Fetching left side value & interpolation
757 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
758 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
759 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
760 sample_left = sample_right
761 sample_left = collection.find_one(
762 {
763 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
764 "value": {"$exists": True},
765 },
766 sort={"precise_timestamp": -1},
767 )
769 if sample_left:
770 if time_vector:
771 left_sd = class_(
772 signal_id=signal_id,
773 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
774 values=[sample_left["value"], values[0]],
775 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
776 )
777 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
778 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
779 else:
780 min_ts_value = sample_left.get("value", None)
781 min_ts_forced_value = sample_left.get("forced_value", None)
782 time_vector.insert(0, window_min_timestamp)
783 values.insert(0, min_ts_value)
784 forced_values.insert(0, min_ts_forced_value)
786 return time_vector, values, forced_values
788 def interpolate_values(self, new_time_vector: list[float]):
789 return self.interpolate(new_time_vector, self.values)
791 def interpolate_forced_values(self, new_time_vector: list[float]):
792 return self.interpolate(new_time_vector, self.forced_values)
794 def uniform_desampling(self, number_samples_max: int) -> Self:
795 data_processing_time = time.time()
796 if number_samples_max and self.number_samples > number_samples_max:
797 new_time_vector = npy.linspace(
798 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
799 ).tolist()
800 values = self.interpolate_values(new_time_vector)
801 forced_values = self.interpolate_forced_values(new_time_vector)
802 time_vector = new_time_vector
803 number_samples = len(time_vector)
804 else:
805 time_vector = self.time_vector
806 number_samples = len(self.values)
807 values = self.values[:]
808 forced_values = self.forced_values[:]
809 data_processing_time = time.time() - data_processing_time
811 return self.__class__(
812 signal_id=self.signal_id,
813 time_vector=time_vector,
814 values=values,
815 forced_values=forced_values,
816 number_samples=number_samples,
817 number_samples_db=self.number_samples,
818 data_start=self.data_start,
819 data_end=self.data_end,
820 db_query_time=self.db_query_time,
821 init_time=self.init_time,
822 data_processing_time=self.data_processing_time + data_processing_time,
823 phase_id=self.phase_id,
824 )
826 def min_max_downsampling(self, number_samples_max: int) -> Self:
827 return self.uniform_desampling(number_samples_max)
829 def interest_window_desampling(
830 self,
831 window_max_number_samples: int,
832 outside_max_number_samples: int,
833 window_min_timestamp: float | None = None,
834 window_max_timestamp: float | None = None,
835 ) -> Self:
836 """Performs a sampling in a window of interest and outside."""
838 if not self.time_vector:
839 return self
841 if window_min_timestamp is None:
842 window_min_timestamp = self.time_vector[0]
843 if window_max_timestamp is None:
844 window_max_timestamp = self.time_vector[-1]
846 data_processing_time = time.time()
848 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
849 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
851 time_vector_before = self.time_vector[:index_window_start]
852 time_vector_window = self.time_vector[index_window_start:index_window_end]
853 time_vector_after = self.time_vector[index_window_end:]
855 # Resampling window
856 if time_vector_window:
857 # Ensurring window bounds
858 if time_vector_window[0] != window_min_timestamp:
859 time_vector_window.insert(0, window_min_timestamp)
860 if time_vector_window[-1] != window_max_timestamp:
861 time_vector_window.append(window_max_timestamp)
862 else:
863 time_vector_window = [window_min_timestamp, window_max_timestamp]
865 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
866 # Resampling
867 new_window_time_vector = npy.linspace(
868 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
869 ).tolist()
870 time_vector_window = new_window_time_vector
872 # Resampling outside
873 number_samples_before = len(time_vector_before)
874 number_samples_after = len(time_vector_after)
875 if (
876 outside_max_number_samples is not None
877 and (number_samples_before + number_samples_after) > outside_max_number_samples
878 ):
879 new_number_samples_before = min(
880 number_samples_before,
881 math.ceil(
882 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
883 ),
884 )
885 new_number_samples_after = min(
886 number_samples_after,
887 math.ceil(
888 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
889 ),
890 )
891 # Adjusting numbers as math.ceil can do +1 on sum
892 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
893 if new_number_samples_before > new_number_samples_after:
894 new_number_samples_before -= 1
895 else:
896 new_number_samples_after -= 1
898 if new_number_samples_before > 0:
899 new_time_vector_before = npy.linspace(
900 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
901 ).tolist()
902 time_vector_before = new_time_vector_before
904 if new_number_samples_after > 0:
905 new_time_vector_after = npy.linspace(
906 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
907 ).tolist()[::-1]
908 time_vector_after = new_time_vector_after
910 new_time_vector = time_vector_before + time_vector_window + time_vector_after
911 values = self.interpolate_values(new_time_vector)
912 forced_values = self.interpolate_forced_values(new_time_vector)
913 number_samples = len(values)
915 data_processing_time = time.time() - data_processing_time
917 return self.__class__(
918 signal_id=self.signal_id,
919 forcible=self.forcible,
920 time_vector=new_time_vector,
921 values=values,
922 forced_values=forced_values,
923 number_samples=number_samples,
924 number_samples_db=self.number_samples,
925 data_start=self.data_start,
926 data_end=self.data_end,
927 db_query_time=self.db_query_time,
928 init_time=self.init_time,
929 data_processing_time=self.data_processing_time + data_processing_time,
930 )
932 def zero_time_vector(self, data_start: float):
933 data_processing_time = time.time()
934 if len(self.time_vector) == 0:
935 return self
936 time_vector = npy.array(self.time_vector) - data_start
937 data_processing_time = time.time() - data_processing_time
939 return self.__class__(
940 signal_id=self.signal_id,
941 time_vector=time_vector,
942 values=self.values,
943 forced_values=self.forced_values,
944 number_samples=self.number_samples,
945 number_samples_db=self.number_samples_db,
946 data_start=time_vector[0],
947 data_end=time_vector[-1],
948 db_query_time=self.db_query_time,
949 init_time=self.init_time,
950 data_processing_time=self.data_processing_time + data_processing_time,
951 )
953 def csv_export(self):
954 output = io.StringIO()
955 writer = csv.writer(output)
956 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
957 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
958 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
959 return output.getvalue().encode("utf-8")
961 def prestoplot_export(self):
962 clean_signal_id = self.signal_id.replace(".", "_")
963 if clean_signal_id[0].isnumeric():
964 clean_signal_id = "_" + clean_signal_id
966 output = io.StringIO()
967 output.write("# Encoding:\tUTF-8\n")
968 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
969 output.write("ISO8601\tnone\tnone\n")
970 output.write(f"# Description :\t{clean_signal_id}\n")
972 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
973 output.write(
974 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
975 )
976 return output.getvalue().encode("utf-8")
979class NumericSignalData(SignalData):
980 data_type: str = "float"
981 values: list[float | int | None]
982 forced_values: list[float | int | None]
984 def interpolate(self, new_time_vector: list[float], items):
985 items = [npy.nan if s is None else s for s in items]
986 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
988 def uniform_desampling(self, number_samples_max: int) -> Self:
989 data_processing_time = time.time()
990 if number_samples_max and self.number_samples > number_samples_max:
991 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
992 forced_values = self.interpolate_forced_values(time_vector)
993 number_samples = len(time_vector)
994 else:
995 time_vector = self.time_vector
996 number_samples = len(self.values)
997 values = self.values[:]
998 forced_values = self.forced_values[:]
999 data_processing_time = time.time() - data_processing_time
1001 return self.__class__(
1002 signal_id=self.signal_id,
1003 time_vector=time_vector,
1004 values=values,
1005 forced_values=forced_values,
1006 number_samples=number_samples,
1007 number_samples_db=self.number_samples,
1008 data_start=self.data_start,
1009 data_end=self.data_end,
1010 db_query_time=self.db_query_time,
1011 init_time=self.init_time,
1012 data_processing_time=self.data_processing_time + data_processing_time,
1013 )
1015 def min_max_downsampling(self, number_samples_max: int) -> Self:
1016 if self.number_samples < number_samples_max:
1017 return self
1019 data_processing_time = time.time()
1021 number_bins = number_samples_max // 2
1023 time_vector = npy.array(self.time_vector, dtype=npy.float64)
1024 values = npy.array(self.values, dtype=npy.float64)
1025 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1027 points_per_bin = self.number_samples // number_bins
1029 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1030 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1031 if self.number_samples - number_bins * points_per_bin > 1:
1032 points_per_bin += 1
1033 number_bins = self.number_samples // points_per_bin + 1
1034 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1035 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1036 values = npy.concatenate([values, nan_points_to_add])
1037 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1039 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1040 values_matrix = values.reshape(number_bins, points_per_bin)
1041 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1043 indexes_min = npy.zeros(number_bins, dtype="int64")
1044 indexes_max = npy.zeros(number_bins, dtype="int64")
1046 for row in range(number_bins):
1047 min_value = values_matrix[row, 0]
1048 max_value = values_matrix[row, 0]
1049 for column in range(points_per_bin):
1050 if values_matrix[row, column] < min_value:
1051 min_value = values_matrix[row, column]
1052 indexes_min[row] = column
1053 elif values_matrix[row, column] > max_value:
1054 max_value = values_matrix[row, column]
1055 indexes_max[row] = column
1057 row_index = npy.repeat(npy.arange(number_bins), 2)
1058 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1060 data_processing_time = time.time() - data_processing_time
1062 new_time_vector = timestamps_matrix[row_index, column_index]
1063 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1064 new_values = values_matrix[row_index, column_index]
1065 new_values = npy.where(npy.isnan(new_values), None, new_values)
1066 new_forced_values = forced_values_matrix[row_index, column_index]
1067 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1069 # Make sure there are no None values for the time vector
1070 time_vector_filter = new_time_vector != None
1071 new_time_vector = new_time_vector[time_vector_filter]
1072 new_values = new_values[time_vector_filter]
1073 new_forced_values = new_forced_values[time_vector_filter]
1075 return self.__class__(
1076 signal_id=self.signal_id,
1077 time_vector=new_time_vector,
1078 values=new_values,
1079 forced_values=new_forced_values,
1080 number_samples=number_bins * 2,
1081 number_samples_db=self.number_samples_db,
1082 data_start=self.data_start,
1083 data_end=self.data_end,
1084 db_query_time=self.db_query_time,
1085 init_time=self.init_time,
1086 data_processing_time=self.data_processing_time + data_processing_time,
1087 phase_id=self.phase_id,
1088 )
1090 def interest_window_desampling(
1091 self,
1092 window_max_number_samples: int,
1093 outside_max_number_samples: int,
1094 window_min_timestamp: float | None = None,
1095 window_max_timestamp: float | None = None,
1096 ) -> Self:
1097 """Performs a sampling in a window of interest and outside."""
1099 if not self.time_vector:
1100 return self
1102 if window_min_timestamp is None:
1103 window_min_timestamp = self.time_vector[0]
1104 if window_max_timestamp is None:
1105 window_max_timestamp = self.time_vector[-1]
1107 data_processing_time = time.time()
1109 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1110 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1112 time_vector_before = self.time_vector[:index_window_start]
1113 time_vector_window = self.time_vector[index_window_start:index_window_end]
1114 time_vector_after = self.time_vector[index_window_end:]
1116 values_before = self.values[:index_window_start]
1117 values_window = self.values[index_window_start:index_window_end]
1118 values_after = self.values[index_window_end:]
1119 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1120 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1122 # Resampling window
1123 if time_vector_window:
1124 # Ensurring window bounds
1125 if time_vector_window[0] != window_min_timestamp:
1126 time_vector_window.insert(0, window_min_timestamp)
1127 values_window.insert(0, window_min_value)
1128 if time_vector_window[-1] != window_max_timestamp:
1129 time_vector_window.append(window_max_timestamp)
1130 values_window.append(window_max_value)
1131 else:
1132 time_vector_window = [window_min_timestamp, window_max_timestamp]
1133 values_window = [window_min_value, window_max_value]
1135 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1136 # Resampling
1137 time_vector_window, values_window = downsample_list(
1138 time_vector_window, values_window, window_max_number_samples
1139 )
1141 # Resampling outside
1142 number_samples_before = len(time_vector_before)
1143 number_samples_after = len(time_vector_after)
1144 if (
1145 outside_max_number_samples is not None
1146 and (number_samples_before + number_samples_after) > outside_max_number_samples
1147 ):
1148 new_number_samples_before = min(
1149 number_samples_before,
1150 math.ceil(
1151 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1152 ),
1153 )
1154 new_number_samples_after = min(
1155 number_samples_after,
1156 math.ceil(
1157 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1158 ),
1159 )
1160 # Adjusting numbers as math.ceil can do +1 on sum
1161 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1162 if new_number_samples_before > new_number_samples_after:
1163 new_number_samples_before -= 1
1164 else:
1165 new_number_samples_after -= 1
1167 if new_number_samples_before > 0:
1168 time_vector_before, values_before = downsample_list(
1169 time_vector_before, values_before, new_number_samples_before
1170 )
1172 if new_number_samples_after > 0:
1173 time_vector_after, values_after = downsample_list(
1174 time_vector_after, values_after, new_number_samples_after
1175 )
1177 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1178 values = values_before + values_window + values_after
1179 forced_values = self.interpolate_forced_values(new_time_vector)
1180 number_samples = len(values)
1182 data_processing_time = time.time() - data_processing_time
1184 return self.__class__(
1185 signal_id=self.signal_id,
1186 time_vector=new_time_vector,
1187 values=values,
1188 forced_values=forced_values,
1189 number_samples=number_samples,
1190 number_samples_db=self.number_samples,
1191 data_start=self.data_start,
1192 data_end=self.data_end,
1193 db_query_time=self.db_query_time,
1194 init_time=self.init_time,
1195 data_processing_time=self.data_processing_time + data_processing_time,
1196 )
1199class StringSignalData(SignalData):
1200 data_type: str = "str"
1201 values: list[str | None]
1202 forced_values: list[str | None]
1204 def interpolate(self, new_time_vector: list[float], items):
1205 # Find the indices of the values in xp that are just smaller or equal to x
1206 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
1207 indices = npy.clip(indices, 0, len(items) - 1)
1208 # Return the corresponding left string values from fp
1209 return [items[i] for i in indices]
1212class SignalsData(TwinPadModel):
1213 signals_data: list[NumericSignalData | StringSignalData | SignalData]
1214 data_processing_time: float
1215 data_start: float | None
1216 data_end: float | None
1218 @classmethod
1219 def get_from_signal_ids(
1220 cls,
1221 signal_ids: list[str],
1222 min_timestamp: float = None,
1223 max_timestamp: float = None,
1224 window_min_timestamp: float = None,
1225 window_max_timestamp: float = None,
1226 interpolate_bounds: bool = True,
1227 max_documents: int = None,
1228 ) -> Self:
1229 signals_data = []
1230 data_start = None
1231 data_end = None
1232 if max_timestamp is None:
1233 max_timestamp = time.time()
1234 data_processing_time = 0.0
1235 for signal_id in signal_ids:
1236 signal_data = SignalData.get_from_signal_id(
1237 signal_id=signal_id,
1238 min_timestamp=min_timestamp,
1239 max_timestamp=max_timestamp,
1240 window_min_timestamp=window_min_timestamp,
1241 window_max_timestamp=window_max_timestamp,
1242 interpolate_bounds=interpolate_bounds,
1243 max_documents=max_documents,
1244 )
1245 data_processing_time += signal_data.data_processing_time
1246 signals_data.append(signal_data)
1247 if signal_data.data_start is not None:
1248 if data_start is None:
1249 data_start = signal_data.data_start
1250 else:
1251 data_start = min(signal_data.data_start, data_start)
1252 if signal_data.data_end is not None:
1253 if data_end is None:
1254 data_end = signal_data.data_end
1255 else:
1256 data_end = max(signal_data.data_end, data_end)
1258 return cls(
1259 signals_data=signals_data,
1260 data_processing_time=data_processing_time,
1261 data_start=data_start,
1262 data_end=data_end,
1263 )
1265 @classmethod
1266 def get_from_phase_and_signal_ids(
1267 cls,
1268 phases: list,
1269 phase_sync_times: list[float | None],
1270 signal_ids: list[str],
1271 window_min_timestamps: list[float | None],
1272 window_max_timestamps: list[float | None],
1273 zero_time_vector: bool = True,
1274 ):
1275 signals_data: list[SignalData] = []
1276 computation_start = time.time()
1278 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip(
1279 phases, phase_sync_times, window_min_timestamps, window_max_timestamps
1280 ):
1281 min_timestamp = phase.start_at / 1000
1282 max_timestamp = phase.end_at / 1000
1284 if sync_time is None:
1285 sync_time = min_timestamp
1287 if window_max_timestamp is not None and window_min_timestamp is not None:
1288 window_length = window_max_timestamp - window_min_timestamp
1290 if window_min_timestamp != min_timestamp:
1291 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20)
1292 if window_max_timestamp != max_timestamp:
1293 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20)
1295 for signal_id in signal_ids:
1296 signal_data = SignalData.get_from_signal_id(
1297 signal_id,
1298 min_timestamp,
1299 max_timestamp,
1300 window_min_timestamp,
1301 window_max_timestamp,
1302 interpolate_bounds=False,
1303 max_documents=None,
1304 )
1306 if len(signal_data.time_vector) == 0:
1307 continue
1309 if zero_time_vector:
1310 signal_data = signal_data.zero_time_vector(sync_time)
1311 signal_data.phase_id = phase.id
1313 signals_data.append(signal_data)
1315 return cls(
1316 signals_data=signals_data,
1317 data_processing_time=time.time() - computation_start,
1318 data_start=0,
1319 data_end=0,
1320 )
1322 def uniform_desampling(self, number_samples_max: int) -> Self:
1323 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1324 return SignalsData(
1325 signals_data=signals_data,
1326 data_processing_time=sum(s.data_processing_time for s in signals_data),
1327 data_start=self.data_start,
1328 data_end=self.data_end,
1329 )
1331 def min_max_downsampling(self, number_samples_max: int) -> Self:
1332 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data]
1333 return SignalsData(
1334 signals_data=signals_data,
1335 data_processing_time=sum(s.data_processing_time for s in signals_data),
1336 data_start=self.data_start,
1337 data_end=self.data_end,
1338 )
1340 def interest_window_desampling(
1341 self,
1342 window_max_number_samples: int,
1343 outside_max_number_samples: int,
1344 window_min_timestamp: float = None,
1345 window_max_timestamp: float = None,
1346 ) -> Self:
1347 signals_data = [
1348 s.interest_window_desampling(
1349 window_max_number_samples=window_max_number_samples,
1350 outside_max_number_samples=outside_max_number_samples,
1351 window_min_timestamp=window_min_timestamp,
1352 window_max_timestamp=window_max_timestamp,
1353 )
1354 for s in self.signals_data
1355 ]
1357 return SignalsData(
1358 signals_data=signals_data,
1359 data_processing_time=sum(s.data_processing_time for s in signals_data),
1360 data_start=self.data_start,
1361 data_end=self.data_end,
1362 )
1364 def zero_time_vector(self, data_start: float):
1365 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
1366 return SignalsData(
1367 signals_data=signals_data,
1368 data_processing_time=sum(s.data_processing_time for s in signals_data),
1369 data_start=0,
1370 data_end=max([s.data_end for s in signals_data]),
1371 )
1373 @classmethod
1374 async def apply_single_function(
1375 cls,
1376 phase,
1377 base_signal_id: str,
1378 function: SINGLE_POST_PROCESSING_FUNCTION,
1379 window_min_timestamp: float = None,
1380 window_max_timestamp: float = None,
1381 ):
1382 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"
1384 processed_result_signal = Signal.get_from_signal_id(signal_id)
1385 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids:
1386 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)
1388 signals_data = cls.get_from_phase_and_signal_ids(
1389 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
1390 )
1392 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
1393 return None
1395 new_values = None
1396 new_forced_values = None
1397 time_vector = npy.array(signals_data.signals_data[0].time_vector)
1398 values = signals_data.signals_data[0].values
1399 forced_values = signals_data.signals_data[0].forced_values
1401 match (function):
1402 case "Cumul":
1403 new_values = cumul(values)
1404 new_forced_values = cumul(forced_values)
1405 # case "CumulDistrib":
1406 # new_values = cumul_distrib(values)
1407 # new_forced_values = cumul_distrib(forced_values)
1408 case "Delta":
1409 new_values = delta(values)
1410 new_forced_values = delta(forced_values)
1411 case "DeltaT":
1412 new_values = delta(time_vector)
1413 new_forced_values = new_values
1414 case "Derive":
1415 new_values = derive(time_vector, values)
1416 new_forced_values = derive(time_vector, forced_values)
1417 case "Integ":
1418 new_values = integ(time_vector, values)
1419 new_forced_values = integ(time_vector, forced_values)
1421 new_values = npy.where(npy.isnan(new_values), None, new_values)
1422 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1424 loop = asyncio.get_running_loop()
1425 loop.create_task(
1426 cls.save_function_signal(
1427 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible
1428 )
1429 )
1431 if window_max_timestamp is not None:
1432 max_timestamp_mask = time_vector <= window_max_timestamp
1433 time_vector = time_vector[max_timestamp_mask]
1434 new_values = new_values[max_timestamp_mask]
1435 new_forced_values = new_forced_values[max_timestamp_mask]
1436 if window_min_timestamp is not None:
1437 min_timestamp_mask = time_vector >= window_min_timestamp
1438 time_vector = time_vector[min_timestamp_mask]
1439 new_values = new_values[min_timestamp_mask]
1440 new_forced_values = new_forced_values[min_timestamp_mask]
1442 signals_data.signals_data[0].time_vector = time_vector.tolist()
1443 signals_data.signals_data[0].values = new_values.tolist()
1444 signals_data.signals_data[0].forced_values = new_forced_values.tolist()
1445 signals_data.signals_data[0].number_samples = time_vector.size
1447 signals_data.signals_data[0].signal_id = signal_id
1449 return signals_data
1451 @classmethod
1452 async def apply_multiple_function(
1453 cls,
1454 phases: list,
1455 signal_ids: list,
1456 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
1457 window_min_timestamp: float = None,
1458 window_max_timestamp: float = None,
1459 ):
1460 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
1461 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
1462 else:
1463 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"
1465 active_phase = phases[0]
1466 if function in {"Align-X", "Using-X"}:
1467 active_phase = phases[1]
1469 processed_result_signal = Signal.get_from_signal_id(function_signal_id)
1470 if processed_result_signal is not None and (
1471 active_phase.id in processed_result_signal.computed_phases_ids
1472 ): # If signal has been computed for the correct phase
1473 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)
1475 array_length = None
1476 time_vector_list = []
1477 values_list = []
1478 forced_values_list = []
1479 forcible = True
1480 for phase, signal_id in zip(phases, signal_ids):
1481 signals_data = cls.get_from_phase_and_signal_ids(
1482 [phase], [None], [signal_id], [None], [None], zero_time_vector=False
1483 )
1485 if len(signals_data.signals_data) == 0:
1486 return None
1488 signal_data = signals_data.signals_data[0]
1490 if array_length is None:
1491 array_length = signal_data.number_samples
1492 if (
1493 array_length != signal_data.number_samples and function != "Align-X"
1494 ) or signal_data.number_samples == 0:
1495 return None
1497 time_vector_list.append(npy.array(signal_data.time_vector))
1498 values_list.append(npy.array(signal_data.values, dtype=npy.float64))
1499 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
1500 forcible = forcible and signal_data.forcible
1502 time_vector = time_vector_list[0]
1503 new_values = None
1504 new_forced_values = None
1506 match (function):
1507 case "Align-X":
1508 time_vector = time_vector_list[1]
1509 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
1510 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
1511 new_values = align_x(old_time_vector, values_list[0], new_time_vector)
1512 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
1513 # case "Atan2":
1514 # new_values = atan2(values_list[0], values_list[1])
1515 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
1516 case "Using-X":
1517 if len(time_vector_list[0]) != len(time_vector_list[1]):
1518 return None
1519 time_vector = time_vector_list[1]
1520 new_values = values_list[0]
1521 new_forced_values = forced_values_list[0]
1522 case "Mean":
1523 new_values = mean(*values_list)
1524 new_forced_values = mean(*forced_values_list)
1525 case "Norm":
1526 new_values = norm(*values_list)
1527 new_forced_values = norm(*forced_values_list)
1529 new_values = npy.where(npy.isnan(new_values), None, new_values)
1530 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1532 loop = asyncio.get_running_loop()
1533 loop.create_task(
1534 cls.save_function_signal(
1535 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
1536 )
1537 )
1539 total_number_samples = time_vector.size
1541 if window_max_timestamp is not None:
1542 max_timestamp_mask = time_vector <= window_max_timestamp
1543 time_vector = time_vector[max_timestamp_mask]
1544 new_values = new_values[max_timestamp_mask]
1545 new_forced_values = new_forced_values[max_timestamp_mask]
1546 if window_min_timestamp is not None:
1547 min_timestamp_mask = time_vector >= window_min_timestamp
1548 time_vector = time_vector[min_timestamp_mask]
1549 new_values = new_values[min_timestamp_mask]
1550 new_forced_values = new_forced_values[min_timestamp_mask]
1552 signals_data = cls(
1553 signals_data=[
1554 NumericSignalData(
1555 signal_id=function_signal_id,
1556 forcible=forcible,
1557 time_vector=time_vector.tolist(),
1558 values=new_values.tolist(),
1559 forced_values=new_forced_values.tolist(),
1560 number_samples=time_vector.size,
1561 number_samples_db=total_number_samples,
1562 )
1563 ],
1564 data_processing_time=0,
1565 data_start=0,
1566 data_end=0,
1567 )
1569 return signals_data
1571 @classmethod
1572 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
1573 signal_data_collection = get_signal_collection(signal_id, create=True)
1574 pipeline = []
1575 match_filter = {}
1576 if window_min_timestamp is not None or window_max_timestamp is not None:
1577 match_filter["$match"] = {}
1578 match_filter["$match"]["precise_timestamp"] = {}
1579 if window_max_timestamp is not None:
1580 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp
1581 if window_min_timestamp is not None:
1582 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp
1584 total_number_samples = signal_data_collection.count_documents({})
1586 if match_filter:
1587 pipeline.append(match_filter)
1589 fetch_start = time.time()
1591 samples = signal_data_collection.aggregate(pipeline).to_list()
1592 new_time_vector = []
1593 new_values = []
1594 new_forced_values = []
1595 for sample in samples:
1596 new_time_vector.append(sample["precise_timestamp"])
1597 new_values.append(sample["value"])
1598 new_forced_values.append(sample["forced_value"])
1600 return cls(
1601 signals_data=[
1602 NumericSignalData(
1603 signal_id=signal_id,
1604 time_vector=new_time_vector,
1605 values=new_values,
1606 forced_values=new_forced_values,
1607 number_samples=len(new_time_vector),
1608 number_samples_db=total_number_samples,
1609 )
1610 ],
1611 data_processing_time=time.time() - fetch_start,
1612 data_start=0,
1613 data_end=0,
1614 )
1616 @classmethod
1617 async def save_function_signal(
1618 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
1619 ):
1620 # Insert data first so if it is requested by another user, it will be computed again
1621 signal_collection = get_signal_collection(function_signal_id, create=True)
1622 signal_collection.delete_many(
1623 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
1624 )
1625 signal_collection.insert_many(
1626 [
1627 {
1628 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]),
1629 "precise_timestamp": time_vector[i],
1630 "value": new_values[i],
1631 "forced_value": new_forced_values[i],
1632 }
1633 for i in range(len(time_vector))
1634 ]
1635 )
1637 signals_config_collection = get_collection(systems_database, "signals", create=True)
1638 signals_config_collection.find_one_and_update(
1639 {"signal_id": function_signal_id},
1640 {
1641 "$set": {
1642 "description": "",
1643 "unit": None,
1644 "type": "sensor",
1645 "address": None,
1646 "frequency": 1 / max(npy.mean(delta(time_vector)), 1), # avoid division by 0
1647 "transfer_function": None,
1648 "precision_digits": None,
1649 "digitization_function": None,
1650 "data_type": "float",
1651 "formula": None,
1652 "forcible": forcible,
1653 "commandable": False,
1654 "broadcastable": False,
1655 "signal_id": function_signal_id,
1656 "post_processing": True,
1657 },
1658 "$push": {"computed_phases_ids": phase.id},
1659 },
1660 upsert=True,
1661 )
1663 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []):
1664 if post_processing:
1665 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1666 zip_buffer = io.BytesIO()
1667 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1668 for signal_data in self.signals_data:
1669 file_name = signal_data.signal_id
1670 if post_processing:
1671 phase = phases_by_id.get(
1672 signal_data.phase_id,
1673 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1674 )
1675 file_name = f"{signal_data.signal_id} ({phase.name})"
1676 if file_format == "csv":
1677 export_io = signal_data.csv_export()
1678 zip_file.writestr(f"{file_name}.csv", export_io)
1679 elif file_format == "prestoplot":
1680 export_io = signal_data.prestoplot_export()
1681 zip_file.writestr(f"{file_name}.tab", export_io)
1682 else:
1683 raise ValueError(f"Format not found. Got: {file_format}")
1684 zip_bytes = zip_buffer.getvalue()
1685 return zip_bytes
1687 def hdf5_export(self, post_processing: bool = False, phase_ids: list = []):
1688 if post_processing:
1689 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1690 hdf5_buffer = io.BytesIO()
1691 custom_type_float = npy.dtype(
1692 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
1693 )
1694 custom_type_string = npy.dtype(
1695 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
1696 )
1697 with h5py.File(hdf5_buffer, "w") as hdf5_file:
1698 for signal_data in self.signals_data:
1699 if post_processing:
1700 phase = phases_by_id.get(
1701 signal_data.phase_id,
1702 Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000"),
1703 )
1704 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})")
1705 else:
1706 signal_group = hdf5_file.create_group(signal_data.signal_id)
1707 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
1708 if signal_data.data_type == "str":
1709 export_data = npy.array(
1710 list(
1711 zip(
1712 date_vector,
1713 signal_data.time_vector,
1714 signal_data.values,
1715 signal_data.forced_values,
1716 )
1717 ),
1718 dtype=custom_type_string,
1719 )
1720 else:
1721 export_data = npy.array(
1722 list(
1723 zip(
1724 date_vector,
1725 signal_data.time_vector,
1726 signal_data.values,
1727 signal_data.forced_values,
1728 )
1729 ),
1730 dtype=custom_type_float,
1731 )
1732 signal_group["data"] = export_data
1733 return hdf5_buffer.getvalue()
1736class SignalStatus(TwinPadModel):
1737 status: str = "down"
1738 reason: str = ""
1739 delay: float | None = None
1742class DigitizationFunction(TwinPadModel):
1743 bits: int | None = None
1744 min_value: float
1745 max_value: float
1746 min_raw_value: float
1747 max_raw_value: float
1750class SignalUpdate(TwinPadModel):
1751 value: float | str | bool | int | None = None
1752 forced_value: float | str | bool | int | None = None
1753 timestamp: int | None = None
1756class SignalType(str, Enum):
1757 command = "command"
1758 sensor = "sensor"
1759 external_sensor = "external_sensor"
1762SIGNALDATA_TYPES = {
1763 "int": NumericSignalData,
1764 "float": NumericSignalData,
1765 "str": StringSignalData,
1766 "bool": NumericSignalData,
1767 "epoch": NumericSignalData,
1768}
1771class Signal(GenericMongo):
1772 collection_name: ClassVar[str] = "signals"
1774 signal_id: str
1775 frequency: float
1776 unit: str | None
1777 description: str
1778 type: SignalType
1779 data_type: str
1780 precision_digits: int | None
1781 forcible: bool
1782 status: SignalStatus = SignalStatus()
1784 post_processing: bool = False
1785 computed_phases_ids: list[str] = []
1787 digitization_function: DigitizationFunction | None
1789 @property
1790 def device(self) -> Device:
1791 device_id = self.signal_id.split(".")[0]
1792 device = Device.get_one_by_attribute("device_id", device_id)
1793 return device
1795 @cached_property
1796 def signal_data_class(self):
1797 if self.data_type in SIGNALDATA_TYPES:
1798 return SIGNALDATA_TYPES[self.data_type]
1799 if self.data_type.startswith("enum"):
1800 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1801 raise ValueError(f"Unhandled python type: {self.data_type}")
1803 @cached_property
1804 def python_type(self):
1805 if self.data_type in TYPES:
1806 return TYPES[self.data_type]
1807 if self.data_type.startswith("enum"):
1808 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1809 return Literal[*choices]
1810 raise ValueError(f"Unhandled python type: {self.data_type}")
1812 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1813 command = Command(
1814 sent_at=time.time(),
1815 command_type="Signal command",
1816 user_id=current_user.id,
1817 )
1819 has_input_error = False
1820 error_message = ""
1822 if self.data_type.startswith("enum"):
1823 enum_options = get_args(self.python_type)
1825 if update_dict.value is not None and update_dict.value not in enum_options:
1826 has_input_error = True
1827 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1828 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1829 has_input_error = True
1830 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1831 else:
1832 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1833 has_input_error = True
1834 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1835 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1836 has_input_error = True
1837 error_message += (
1838 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1839 )
1841 if has_input_error:
1842 command.response_time = 0
1843 command.succeeded = False
1844 command.description = f"Tried to modify signal {self.signal_id}"
1845 response = {"error": True, "status_code": 400, "message": error_message}
1846 else:
1847 response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict)
1848 command.receive_response(response)
1850 Command.create(command)
1851 return response
1853 @classmethod
1854 def get_from_signal_id(cls, signal_id) -> Self:
1855 """Could be generic from mongo"""
1856 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1857 if not raw_value:
1858 return None
1859 del raw_value["_id"]
1860 return cls.dict_to_object(raw_value)
1862 @classmethod
1863 def get_all_ids(cls) -> list[str]:
1864 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1866 return [signal["signal_id"] for signal in cursor]
1868 @classmethod
1869 def get_all_statuses(cls) -> list[dict]:
1870 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
1872 return [
1873 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1874 for signal in cursor
1875 ]
1877 async def number_samples(self):
1878 collection = get_signal_collection(signal_id=self.signal_id)
1879 if collection is None:
1880 return 0
1882 number_samples = collection.estimated_document_count()
1884 number_samples_async_collection = await get_async_collection(
1885 systems_async_database, "number_samples", create=True, time_series=True
1886 )
1888 loop = asyncio.get_running_loop()
1889 loop.create_task(
1890 number_samples_async_collection.insert_one(
1891 {
1892 "timestamp": datetime.datetime.now(pytz.UTC),
1893 "signal_id": self.signal_id,
1894 "number_samples": number_samples,
1895 }
1896 )
1897 )
1899 return number_samples
1901 @classmethod
1902 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]:
1903 number_samples_by_id = {}
1904 collections = get_signal_collections_batch(signal_ids)
1905 number_samples_async_collection = await get_async_collection(
1906 systems_async_database, "number_samples", create=True, time_series=True
1907 )
1909 for signal_id, collection in zip(signal_ids, collections):
1910 if collection is None:
1911 number_samples_by_id[signal_id] = 0
1912 continue
1914 number_samples = collection.estimated_document_count()
1916 number_samples_by_id[signal_id] = number_samples
1918 now = datetime.datetime.now(pytz.UTC)
1919 loop = asyncio.get_running_loop()
1920 loop.create_task(
1921 number_samples_async_collection.insert_many(
1922 [
1923 {
1924 "timestamp": now,
1925 "signal_id": signal_id,
1926 "number_samples": number_samples,
1927 }
1928 for signal_id, number_samples in number_samples_by_id.items()
1929 ]
1930 )
1931 )
1933 return number_samples_by_id
1935 def sample_datasize(self):
1936 return signals_database.command("collstats", self.signal_id)["size"]
1938 @classmethod
1939 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1940 result = cls.collection().aggregate(
1941 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1942 )
1944 return {signal["signal_id"]: signal["forcible"] for signal in result}
1947class ForcedSignal(GenericMongo):
1948 collection_name: ClassVar[str] = "forced_signals"
1950 signal_id: str
1951 forcing_user_id: str
1952 forced_at: float
1953 value: str | float
1955 def insert(self):
1956 insert_result = self.collection().find_one_and_update(
1957 {"signal_id": self.signal_id},
1958 {"$set": self.to_dict(exclude={"id"})},
1959 upsert=True,
1960 return_document=ReturnDocument.AFTER,
1961 )
1962 self.id = str(insert_result["_id"])
1963 return self.id
1965 @classmethod
1966 def can_force(cls, signal_id: str, current_user: User) -> bool:
1967 """Checks whether user can force a given signal.
1969 :param signal_id: Signal ID of the signal to force
1970 :type signal_id: str
1971 :param current_user: Current user
1972 :type current_user: User
1973 :return: False if the signal was forced by someone else than the user, True otherwise
1974 :rtype: bool
1975 """
1976 forced_signal = cls.get_one_by_attribute("signal_id", signal_id)
1977 if forced_signal is not None:
1978 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
1979 return False
1980 return True
1983class ServicesStatus(TwinPadModel):
1984 backend: str
1985 cloud_broker: str
1986 time_series_database: str
1987 signal_storage: str
1988 heartbeat_storage: str
1989 data_analyzer: str
1991 @classmethod
1992 def check(cls) -> Self:
1993 return cls(
1994 cloud_broker=ping(RABBITMQ_HOST),
1995 backend="up",
1996 time_series_database=ping(MONGO_HOST),
1997 signal_storage=ping(SIGNAL_STORAGE_HOST),
1998 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
1999 data_analyzer=ping(DATA_ANALYZER_HOST),
2000 )
2003def ping(host):
2004 try:
2005 if ping3.ping(host, timeout=0.8):
2006 return "up"
2007 except PermissionError:
2008 pass
2009 return "down"
2012class Event(GenericMongo):
2013 collection_name: ClassVar[str] = "events"
2015 name: str
2016 timestamp: float
2017 event_rule_id: str
2019 @computed_field
2020 @cached_property
2021 def event_rule(self) -> "EventRule":
2022 return EventRule.get_from_id(self.event_rule_id)
2024 @classmethod
2025 def dict_to_object(cls, dict_):
2026 """Refine to convert timestamp to datetime for mongodb."""
2027 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
2028 return super().dict_to_object(dict_)
2031class TwinPadActivity(GenericMongo):
2032 timestamp: float
2033 amount: int
2035 @classmethod
2036 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
2037 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2038 number_events_collection = get_collection(systems_database, "number_events")
2039 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
2040 items = []
2041 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2042 if number_events_collection is None or recompute_amount:
2043 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
2044 number_events_collection.delete_many({})
2045 first_event = events_collection.find_one(sort={"timestamp": 1})
2046 if first_event is None:
2047 return items
2048 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
2049 tzinfo=pytz.UTC
2050 )
2051 while last_computed_day < TODAY:
2052 day_nb_events = events_collection.count_documents(
2053 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
2054 )
2055 if day_nb_events > 0:
2056 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
2057 last_computed_day += ONE_DAY_OFFSET
2058 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
2059 if number_events_today > 0:
2060 number_events_collection.delete_many({"timestamp": TODAY})
2061 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
2062 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2063 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2064 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2065 for day in number_events:
2066 day["timestamp"] = day["timestamp"].timestamp()
2067 items.append(cls.mongo_dict_to_object(day))
2068 return items
2070 @classmethod
2071 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
2072 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2073 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
2074 signals_number_samples_collection = get_collection(
2075 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
2076 )
2077 items = []
2078 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2079 if number_samples_collection is None or recompute_amount:
2080 number_samples_collection = get_collection(
2081 systems_database, "number_received_samples", create=True, time_series=True
2082 )
2083 number_samples_collection.delete_many({})
2084 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
2085 if first_sample is None:
2086 return items
2087 # compute from day of first found event
2088 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace(
2089 tzinfo=pytz.UTC
2090 )
2091 while last_computed_day < TODAY:
2092 number_samples_request = signals_number_samples_collection.aggregate(
2093 [
2094 {
2095 "$match": {
2096 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}
2097 }
2098 },
2099 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
2100 ]
2101 ).to_list()
2102 if len(number_samples_request) == 0:
2103 number_samples = 0
2104 else:
2105 number_samples = number_samples_request[0].get("number_samples", 0)
2106 if number_samples > 0:
2107 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
2108 last_computed_day += ONE_DAY_OFFSET
2109 number_samples_request = signals_number_samples_collection.aggregate(
2110 [
2111 {"$match": {"timestamp": {"$gte": TODAY}}},
2112 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
2113 ]
2114 ).to_list()
2115 if len(number_samples_request) == 0:
2116 number_samples_today = 0
2117 else:
2118 number_samples_today = number_samples_request[0].get("number_samples", 0)
2119 if number_samples_today > 0:
2120 number_samples_collection.delete_many({"timestamp": TODAY})
2121 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
2122 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2123 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2124 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2125 for day in number_events:
2126 day["timestamp"] = day["timestamp"].timestamp()
2127 items.append(cls.mongo_dict_to_object(day))
2128 return items
2130 @classmethod
2131 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
2132 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2133 number_commands_collection = get_collection(systems_database, "number_commands")
2134 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
2135 items = []
2136 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2137 if number_commands_collection is None or recompute_amount:
2138 number_commands_collection = get_collection(
2139 systems_database, "number_commands", create=True, time_series=True
2140 )
2141 number_commands_collection.delete_many({})
2142 first_command = commands_collection.find_one(sort={"timestamp": 1})
2143 if first_command is None:
2144 return items
2145 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace(
2146 tzinfo=pytz.UTC
2147 )
2148 while last_computed_day < TODAY:
2149 day_nb_commands = commands_collection.count_documents(
2150 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
2151 )
2152 if day_nb_commands > 0:
2153 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
2154 last_computed_day += ONE_DAY_OFFSET
2155 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
2156 if number_commands_today > 0:
2157 number_commands_collection.delete_many({"timestamp": TODAY})
2158 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
2159 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2160 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2161 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2162 for day in number_commands:
2163 day["timestamp"] = day["timestamp"].timestamp()
2164 items.append(cls.mongo_dict_to_object(day))
2165 return items
2168class EventRule(GenericMongo):
2169 collection_name: ClassVar[str] = "event_rules"
2171 name: str
2172 formula: str
2173 variables: list[str]
2175 @computed_field
2176 @cached_property
2177 def number_events(self) -> int:
2178 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
2181class Company(GenericMongo):
2182 collection_name: ClassVar[str] = "companies"
2183 name: str
2186class Campaign(GenericMongo):
2187 collection_name: ClassVar[str] = "campaigns"
2189 # Properties
2190 id: str | None = None
2191 name: str
2192 description: str | None = None
2195class Phase(GenericMongo):
2196 collection_name: ClassVar[str] = "phases"
2198 # Properties
2199 id: str | None = None
2200 name: str
2201 description: str | None = None
2202 start_at: float
2203 end_at: float
2205 # FK
2206 campaign_id: MongoId
2208 @classmethod
2209 def deleteMany(cls, campaign_id):
2210 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
2211 return delete_phases
2214class CustomViewCreation(GenericMongo):
2215 collection_name: ClassVar[str] = "custom_views"
2217 name: str
2218 configuration: list
2221class CustomView(CustomViewCreation):
2222 # Properties
2223 id: str | None = None
2225 # Foreign Key
2226 user_id: str
2229CustomViewUpdate = create_update_model(CustomView)
2232class Video(GenericMongo):
2233 collection_name: ClassVar[str] = "videos"
2235 # Properties
2236 name: str
2237 ip_addr: str
2238 username: str | None = None
2239 password: str | None = None
2241 # Methods
2242 @classmethod
2243 def get_all(cls, sort_by="_id") -> list[Self]:
2244 items = []
2245 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
2246 items.append(cls.mongo_dict_to_object(dict_))
2247 return items
2249 @classmethod
2250 def get_video(cls, camera_id: ObjectId):
2251 camera = cls.get_from_id(camera_id)
2252 if camera is not None:
2253 return camera.name
2254 return None
2257class Command(GenericMongo):
2258 collection_name: ClassVar[str] = "commands"
2260 # Properties
2261 timestamp: datetime.datetime = None
2262 sent_at: float
2263 response_time: float = 0.0
2264 command_type: str
2265 description: str = ""
2266 succeeded: bool = False
2268 # Foreign key
2269 user_id: str
2271 @classmethod
2272 def collection(cls):
2273 return get_collection(systems_database, cls.collection_name, create=True, time_series=True)
2275 @classmethod
2276 def create(cls, command: Self):
2277 command = cls(
2278 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
2279 sent_at=command.sent_at,
2280 response_time=command.response_time,
2281 command_type=command.command_type,
2282 description=command.description,
2283 succeeded=command.succeeded,
2284 user_id=command.user_id,
2285 )
2286 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"}))
2287 if new_command is None:
2288 return None
2289 return {"command_id": str(new_command.inserted_id)}
2291 def receive_response(self, response: dict):
2292 self.response_time = time.time() - self.sent_at
2293 self.succeeded = response.get("error", True) is False
2294 if self.description == "":
2295 self.description += response.get("message", "").rstrip()
2298class SignalsPresetCreation(GenericMongo):
2299 name: str
2300 signal_ids: list[str]
2303class SignalsPreset(SignalsPresetCreation):
2304 collection_name: ClassVar[str] = "signals_presets"
2306 user_id: str
2308 @classmethod
2309 def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
2310 signals_preset = cls(
2311 user_id=user_id,
2312 name=signals_preset.name,
2313 signal_ids=signals_preset.signal_ids,
2314 )
2316 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"}))
2318 return str(new_signal_preset.inserted_id)
2321SignalsPresetUpdate = create_update_model(SignalsPreset)
2324class LineStyle(str, Enum):
2325 solid = "solid"
2326 dotted = "dotted"
2327 dashed = "dashed"
2330class SignalAppearance:
2331 value_color: str
2332 forced_value_color: str
2335class GraphThemeCreation(GenericMongo):
2336 collection_name: ClassVar[str] = "graph_themes"
2338 name: str
2339 signal_id: str
2340 value_color: str = ""
2341 forced_value_color: str = ""
2342 value_line_style: LineStyle = LineStyle.solid
2343 forced_value_line_style: LineStyle = LineStyle.solid
2344 private: bool = True
2347class PublicGraphTheme(GraphThemeCreation):
2348 created_by_user: bool
2349 in_user_library: bool
2350 active_for_user: bool
2352 _current_user_id: str = ""
2354 @classproperty
2355 def custom_pipeline_steps(cls) -> dict[str, list]:
2356 return {
2357 "created_by_user": [
2358 {
2359 "$addFields": {
2360 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]},
2361 }
2362 }
2363 ],
2364 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible
2365 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}
2366 ],
2367 "in_user_library": [
2368 {
2369 "$addFields": {
2370 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]},
2371 }
2372 }
2373 ],
2374 "active_for_user": [
2375 {
2376 "$addFields": {
2377 "active_for_user": {"$in": [cls._current_user_id, "$active"]},
2378 }
2379 }
2380 ],
2381 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}],
2382 "active": [
2383 {
2384 "$addFields": {
2385 "active": "$$REMOVE",
2386 }
2387 }
2388 ],
2389 "creator_id": [
2390 {
2391 "$addFields": {
2392 "creator_id": "$$REMOVE",
2393 }
2394 }
2395 ],
2396 }
2398 @classmethod
2399 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
2400 cls._current_user_id = user_id
2401 return super().response_from_query(query)
2403 @classmethod
2404 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
2405 query.in_user_library = "true"
2406 return cls.response_from_query(query, user_id)
2408 @classmethod
2409 def get_from_id(cls, item_id, user_id: str) -> Self | None:
2410 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)
2412 @classmethod
2413 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
2414 cls._current_user_id = user_id
2415 return super().get_by_attribute(attribute_name, attribute_value)
2417 @classmethod
2418 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
2419 cls._current_user_id = user_id
2420 return super().get_one_by_attribute(attribute_name, attribute_value)
2422 @classmethod
2423 def get_all(cls, sort_by: str, user_id: str):
2424 cls._current_user_id = user_id
2425 return super().get_all(sort_by)
2427 @classmethod
2428 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
2429 pipeline = [
2430 {
2431 "$match": {
2432 "active": {"$eq": user_id},
2433 "signal_id": {"$in": signal_ids},
2434 }
2435 },
2436 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
2437 {"$replaceRoot": {"newRoot": "$firstDocument"}},
2438 {
2439 "$project": {
2440 "_id": 0,
2441 "signal_id": 1,
2442 "value_color": 1,
2443 "forced_value_color": 1,
2444 "value_line_style": 1,
2445 "forced_value_line_style": 1,
2446 }
2447 },
2448 ]
2450 result = {}
2452 cursor = cls.collection().aggregate(pipeline)
2453 for document in cursor:
2454 signal_id = document["signal_id"]
2455 del document["signal_id"]
2456 result[signal_id] = document
2458 return result
2461GraphThemeUpdate = create_update_model(PublicGraphTheme)
2464class PrivateGraphTheme(GraphThemeCreation):
2465 # private
2466 creator_id: str
2467 in_library: list[str]
2468 active: list[str]
2470 @classmethod
2471 def create(
2472 cls,
2473 creator_id: str,
2474 name: str,
2475 signal_id: str,
2476 value_color: str,
2477 forced_value_color: str,
2478 value_line_style: LineStyle,
2479 forced_value_line_style: LineStyle,
2480 private: bool,
2481 ):
2482 color_setting = cls(
2483 creator_id=creator_id,
2484 name=name,
2485 signal_id=signal_id,
2486 value_color=value_color,
2487 forced_value_color=forced_value_color,
2488 value_line_style=value_line_style,
2489 forced_value_line_style=forced_value_line_style,
2490 private=private,
2491 in_library=[creator_id],
2492 active=[creator_id],
2493 )
2495 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
2496 color_setting.id = str(new_color_setting.inserted_id)
2497 return color_setting
2499 def update(self, update_dict: dict, user_id: str):
2500 if (in_user_lib := update_dict.get("in_user_library")) is not None:
2501 if in_user_lib and user_id not in self.in_library:
2502 self.in_library.append(user_id)
2503 elif not in_user_lib and user_id in self.in_library:
2504 self.in_library.remove(user_id)
2505 update_dict["in_library"] = self.in_library
2506 del update_dict["in_user_library"]
2508 if (active_for_user := update_dict.get("active_for_user")) is not None:
2509 if active_for_user and user_id not in self.active:
2510 self.active.append(user_id)
2511 elif not active_for_user and user_id in self.active:
2512 self.active.remove(user_id)
2513 update_dict["active"] = self.active
2514 del update_dict["active_for_user"]
2516 if update_dict.get("created_by_user") is not None:
2517 del update_dict["created_by_user"]
2519 self.collection().find_one_and_update(
2520 {"_id": ObjectId(self.id)},
2521 {"$set": update_dict},
2522 )
2524 return {}
2527class DeviceStatus(str, Enum):
2528 started = "started"
2529 running = "running"
2530 created = "created"
2531 exited = "exited"
2532 restarting = "restarting"
2535class DeviceUpdateFromDeployer(BaseModel):
2536 status: DeviceStatus
2539class DeviceFromDeployerCreation(BaseModel):
2540 name: str
2541 description: str
2544class DeviceFromDeployer(DeviceFromDeployerCreation):
2545 status: DeviceStatus
2546 device_id: DeviceId
2547 logs: str = ""
2550class DeviceDeployer(GenericMongo):
2551 collection_name: ClassVar[str] = "device_deployers"
2552 url: HttpUrl
2554 def endpoint_url(self, endpoint):
2555 return f"{str(self.url).rstrip('/')}/{endpoint}"
2557 def devices(self) -> list[DeviceFromDeployer]:
2558 devices = []
2559 try:
2560 response = requests.get(self.endpoint_url("devices"))
2561 except requests.exceptions.ConnectionError:
2562 logger.info("connection error")
2563 return None
2564 if response.status_code != 200:
2565 return None
2566 for device_dict in response.json()["devices"]:
2567 devices.append(
2568 DeviceFromDeployer(
2569 device_id=device_dict["device_id"],
2570 name=device_dict["container_name"],
2571 description="desc",
2572 status=device_dict["status"],
2573 logs=device_dict["logs"],
2574 )
2575 )
2576 return devices
2578 def get_device(self, device_id: DeviceId):
2579 try:
2580 response = requests.get(self.endpoint_url(f"devices/{device_id}"))
2581 except requests.exceptions.ConnectionError:
2582 return None
2583 if response.status_code != 200:
2584 return None
2585 device_dict = response.json()
2586 return DeviceFromDeployer(
2587 device_id=device_dict["device_id"],
2588 name=device_dict["container_name"],
2589 description="desc",
2590 status=device_dict["status"],
2591 logs=device_dict["logs"],
2592 )
2594 def create_device(self, device: DeviceFromDeployer) -> Device | None:
2595 try:
2596 response = requests.post(self.endpoint_url("devices"), json={"name": device.name})
2597 except requests.exceptions.ConnectionError:
2598 return None
2600 if response.status_code != 201:
2601 return None
2603 device_dict = response.json()
2604 return DeviceFromDeployer(
2605 device_id=device_dict["device_id"],
2606 name="",
2607 description="desc",
2608 status=device_dict["status"],
2609 )
2611 def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
2612 try:
2613 response = requests.patch(self.endpoint_url(f"devices/{device_id}"), json=device_update.model_dump())
2614 except requests.exceptions.ConnectionError:
2615 return None
2617 if response.status_code != 200:
2618 return None
2620 device_dict = response.json()
2621 return Device(
2622 device_id=device_dict["device_id"],
2623 name="",
2624 description="desc",
2625 pid={},
2626 petri_network={},
2627 modes=[],
2628 status=device_dict["status"],
2629 )
2631 def delete_device(self, device_id: DeviceId) -> DeleteInfo:
2632 try:
2633 response = requests.delete(self.endpoint_url(f"devices/{device_id}"))
2634 except requests.exceptions.ConnectionError:
2635 return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
2636 if response.status_code not in [200, 202, 204]:
2637 return DeleteInfo(is_deleted=False, detail=response.text)
2639 return DeleteInfo(is_deleted=True, detail="")
2642DeviceDeployerUpdate = create_update_model(DeviceDeployer)