Coverage for / usr / local / lib / python3.14 / site-packages / twinpad_backend / models.py: 96%
1365 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-28 09:12 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-28 09:12 +0000
1from functools import cached_property
2import os
3import re
4import io
5import time
6import csv
7from typing import Self, ClassVar, Any, Literal, get_args, Annotated
8import datetime
9import math
10import bisect
11from enum import Enum
12import logging
13import copy
14import asyncio
15import requests
17import zipfile
18import ping3
19import pytz
20from bson.objectid import ObjectId
21from pymongo import ASCENDING, ReturnDocument
22from pydantic import BaseModel, HttpUrl, computed_field, Field, create_model, BeforeValidator
23import numpy as npy
24import lttb
25import h5py
27from twinpad_backend.db import (
28 get_collection,
29 get_async_collection,
30 get_signal_collection,
31 get_signal_collections_batch,
32 systems_database,
33 systems_async_database,
34 signals_database,
35 signals_async_database,
36 devices_states_database,
37)
38from twinpad_backend.responses import ListResponse
39from twinpad_backend.messages import RabbitMQClient
40from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
# Maps type-name strings to the Python type used for them ("epoch" values are handled as floats).
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Names of the supported post-processing functions, grouped into three families.
# NOTE(review): the SINGLE/DOUBLE/MULTIPLE prefixes presumably give the number of input signals — confirm.
SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]

# Host names of the services, read from the environment with a fallback default.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Device timeout, overridable through the DEVICE_TIMEOUT environment variable.
# NOTE(review): compared against differences of time.time() values in this file, so presumably seconds.
DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
# NOTE(review): usage is not visible in this part of the file.
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Messages are emitted through the "uvicorn.error" logger.
logger = logging.getLogger("uvicorn.error")
class DeleteInfo(BaseModel):
    """Payload with a deletion flag and a detail message."""

    # NOTE(review): presumably True when the item was actually removed — confirm against callers.
    is_deleted: bool
    # Free-form text accompanying the flag.
    detail: str
class classproperty:
    """
    Read-only descriptor whose getter receives the owning class instead of an instance.

    Replacement for stacking ``@classmethod`` on ``@property``, which was deprecated in python 3.11
    and removed in python 3.13.
    Found here: https://stackoverflow.com/a/76301341
    """

    def __init__(self, func):
        # The wrapped getter is invoked with the owner class on every attribute access.
        self.fget = func

    def __get__(self, _, owner):
        getter = self.fget
        return getter(owner)
def create_update_model(model):
    """
    Build the "<ModelName>Update" companion of a pydantic model.

    Every field of ``model`` except ``id`` is copied with its annotation widened to ``... | None``
    and a default of ``None``.
    """
    optional_fields = {
        name: (info.annotation | None, None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float) -> str:
    """
    Convert an epoch timestamp (seconds) to an ISO 8601 string expressed in UTC.

    The conversion is done explicitly in UTC so the result does not depend on the time zone of the
    host. The offset is stripped before formatting so the output keeps the naive
    ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` shape, without a ``+00:00`` suffix.
    """
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return utc_datetime.replace(tzinfo=None).isoformat()
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """
    Downsample a series to at most ``max_number_samples`` points while keeping its None gaps.

    Series shorter than ``max_number_samples`` are returned untouched. Otherwise None values are
    set aside, the remaining points are downsampled with LTTB, and the bounds of every run of
    None values are inserted back at their timestamps. If LTTB rejects the input (ValueError),
    the series is linearly interpolated on an evenly spaced time vector instead.

    :param time_vector: timestamps of the samples.
    :param values: sample values, may contain None.
    :param max_number_samples: target maximum number of returned samples.
    :return: tuple ``(new_time_vector, new_values)``.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # LTTB doesn't handle None values so set them aside in a single pass.
    # For each run of consecutive None values only the bounds are stored (one timestamp for an
    # isolated None, first and last timestamps otherwise) so they can be inserted back afterwards.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    in_none_group = False
    for timestamp, value in zip(time_vector, values, strict=True):
        if value is None:
            if not in_none_group:
                none_group_bounds.append([timestamp])
                in_none_group = True
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
        else:
            in_none_group = False
            kept_times.append(timestamp)
            kept_values.append(value)
    number_none_values = sum(len(none_group) for none_group in none_group_bounds)

    try:
        values_array = npy.array([kept_times, kept_values]).T
        # Keep room in the sample budget for the None bounds inserted back below
        interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
    except ValueError:
        # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
        new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
        new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
        new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
        return new_time_vector, new_values_nan_to_none

    new_time_vector = interpolated_values[:, 0].tolist()
    new_values = interpolated_values[:, 1].tolist()

    # insert back None values at the correct timestamps
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None for _ in range(len(none_group))]

    return new_time_vector, new_values
def is_of_type(value, wanted_type):
    """
    Tell whether ``value`` is an instance of ``wanted_type``.

    When ``float`` is wanted, integers are accepted as well.
    """
    accepted_types = (int, float) if wanted_type is float else wanted_type
    return isinstance(value, accepted_types)
145# Models
class TwinPadModel(BaseModel):
    """Common base of the models, adding dict conversion helpers on top of pydantic."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` against the model and return the resulting instance."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model in JSON mode, leaving out the fields listed in ``exclude``."""
        return self.model_dump(exclude=exclude, mode="json")
def validate_mongo_id(v):
    """
    Return ``v`` as a string when it is a valid MongoDB ObjectId.

    :raises ValueError: if ``ObjectId.is_valid`` rejects ``v``.
    """
    if ObjectId.is_valid(v):
        return str(v)
    raise ValueError("Invalid MongoDB id")
# String type whose input is first checked (and stringified) by validate_mongo_id.
MongoId = Annotated[str, BeforeValidator(validate_mongo_id)]
def validate_12_hex(v: str) -> str:
    """
    Check that ``v`` is a string of exactly 12 hexadecimal characters and return it unchanged.

    Non-string input is rejected with the same ValueError as a malformed string: this function
    runs as a "before" validator on raw input, and a TypeError from ``re.fullmatch`` would not be
    reported as a validation error.

    :raises ValueError: if ``v`` is not a 12-character hexadecimal string.
    """
    if not isinstance(v, str) or not re.fullmatch(r"[0-9a-fA-F]{12}", v):
        raise ValueError("ID must be a 12-character hexadecimal string")
    return v
# String type whose input is first checked by validate_12_hex (12 hexadecimal characters).
DeviceId = Annotated[str, BeforeValidator(validate_12_hex)]
class GenericMongo(TwinPadModel):
    """
    Base model for documents stored in a collection of the systems database.

    Subclasses are expected to declare a ``collection_name`` class variable (it is read by the
    methods below but not defined here). ``custom_pipeline_steps`` maps a property name to the
    aggregation stages that must run before this property can be matched or sorted on; the stages
    of every entry are always part of the pipelines built by the read methods.
    """

    id: MongoId | None = None
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection backing this model (requested with ``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """
        Run a filtered, sorted and paginated query and wrap the result in a ListResponse.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (comma-separated ``field`` or
        ``field:order`` entries), ``offset`` and ``limit`` (``None`` or ``0`` meaning no limit).
        """
        request_filters = query.mongodb_filter()

        # Allows for multi-sort, Python dicts are ordered so no issue while sorting
        sort_dict = {}
        for sort in query.sort_by.split(","):
            if ":" in sort:
                sort_field, sort_order = sort.split(":")
                sort_order = int(sort_order)
            else:
                sort_field = sort
                sort_order = 1
            sort_dict[sort_field] = sort_order

        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(request_filters)

        pipeline = []
        added_properties = set()

        def add_custom_steps(property_name):
            # Each property's stages are added at most once, even when the property appears in
            # several filter clauses or is both filtered and sorted on.
            if property_name in cls.custom_pipeline_steps and property_name not in added_properties:
                pipeline.extend(cls.custom_pipeline_steps[property_name])
                added_properties.add(property_name)

        # Stages computing a filtered property must run before the $match
        filter_clauses = request_filters["$and"] if "$and" in request_filters else [request_filters]
        for filter_clause in filter_clauses:
            for filtered_property in cls.custom_pipeline_steps:
                if filtered_property in filter_clause:
                    add_custom_steps(filtered_property)
        pipeline.append({"$match": request_filters})

        # Stages computing a sort field must run before the $sort
        for sort_field in sort_dict:
            add_custom_steps(sort_field)
        pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # Remaining custom stages only run on the documents of the requested page
        for remaining_property in cls.custom_pipeline_steps:
            add_custom_steps(remaining_property)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``item_id``, or None when there is none."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a MongoDB document; ``mongo_dict`` is modified in place (``_id`` becomes ``id``)."""
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value) -> list[Self]:
        """Returns all items that match the attribute with value."""
        pipeline = []
        # Stages computing the matched attribute must come before the $match
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(d) for d in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item that matches the attribute with value, or None."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection, sorted in ascending order of ``sort_by``."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """
        Count the documents whose ``post_processing`` field is False or missing.

        Returns 0 when the collection lookup yields None.
        """
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents(
            {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
        )

    def insert(self):
        """Insert the item (without its ``id``), store the generated id on the instance and return it."""
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` both to this instance and to its stored document, then return the instance."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; return True if a document was actually removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    """User account stored in the "users" collection."""

    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude: set | None = None):
        """
        Dump the user without its password.

        ``exclude`` is copied before "password" is added to it, so neither the caller's set nor a
        shared default is mutated; ``None`` is accepted like in the parent method.

        NOTE(review): the inherited ``insert()`` goes through this method, so it stores the
        document without the password; ``create()`` below dumps the model directly instead.
        """
        excluded_fields = set(exclude) if exclude is not None else set()
        excluded_fields.add("password")
        return GenericMongo.to_dict(self, exclude=excluded_fields)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """
        Insert a new user and return ``{"user_id": <id>}``.

        When no user exists yet, the created user is made admin whatever ``is_admin`` says.
        """
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update_info(cls, user: "UserUpdate", user_id: str):
        """
        Update the stored user with the non-None fields of ``user`` and return the updated User.

        Returns None when no user with ``user_id`` exists. The stored ``is_connected`` value is
        not carried over to the returned object, which therefore gets the field default.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # Tolerate documents stored without the field
        updated_user.pop("is_connected", None)
        return cls(**updated_user)
# Variant of User generated by create_update_model: every field except "id" becomes optional (default None).
UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    """Description of one mode: identifier, name and two frequency settings."""

    # NOTE(review): Device.change_mode reads modes as ``modes[mode_id - 1]``, so ids look 1-based — confirm.
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    """Update payload carrying a single mode id."""

    # NOTE(review): presumably the payload handed to Device.change_mode, which reads ``mode_id`` — confirm.
    mode_id: int
class Device(GenericMongo):
    """Device stored in the "devices" collection, with its modes and latest known state."""

    collection_name: ClassVar[str] = "devices"

    device_id: DeviceId
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    async def change_mode(self, update_dict, current_user: User):
        """
        Ask the device to switch to ``update_dict.mode_id`` and record the attempt as a Command.

        Mode ids are used as 1-based positions in ``self.modes`` (``modes[mode_id - 1]``), so
        valid ids range from 1 to ``len(self.modes)``. An invalid id is not sent to the device:
        the command is stored as failed and an error dict with status code 400 is returned.
        Otherwise the response of the mode change request is returned.
        """
        requested_mode_id = update_dict.mode_id
        # 0 is rejected too: modes[0 - 1] would silently designate the last mode
        has_error = requested_mode_id < 1 or requested_mode_id > len(self.modes)

        if has_error:
            description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
        elif self.current_mode_id is not None:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )

        if has_error:
            command.response_time = 0
            command.succeeded = False
            response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
        else:
            response = await RabbitMQClient().send_mode_change(self.device_id, update_dict.mode_id)
            command.receive_response(response)

        Command.create(command)
        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """
        Fetch the devices owning the given signals, each device being queried only once.

        The device id is the part of the signal id before the first ".". The returned dict maps
        each device id to its Device, or to None when no such device is stored.
        """
        devices_by_id = {}
        for signal_id in signal_ids:
            device_id = signal_id.split(".")[0]
            if device_id not in devices_by_id:
                devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
        return devices_by_id
class DeviceSetup(GenericMongo):
    """Device setup stored in the "device_setups" collection."""

    collection_name: ClassVar[str] = "device_setups"

    # Ids of the devices that are part of the setup.
    device_ids: list[str]
    active: bool = False
    # NOTE(review): meaning of keys and values is not visible in this file — confirm with callers.
    variable_mapping: dict[str, str]
# Variant of DeviceSetup generated by create_update_model: every field except "id" becomes optional (default None).
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    """State of a device at a given timestamp."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: DeviceId, query) -> ListResponse[Self]:
        """
        Return the states of a device matching ``query``, sorted and paginated.

        States are read from the collection named ``device_id`` in the devices states database;
        an empty response is returned when that collection does not exist. ``query.sort_by`` is a
        single ``field`` or ``field:order`` entry, and a ``limit`` of ``None`` or ``0`` means no
        limit.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            # Only a real cursor can be limited; the empty-list placeholder above cannot
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        # Missing list properties fall back to plain empty lists (a pydantic Field() object is not
        # a valid value to pass to the constructor)
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    """Single sample of a signal: timestamp, value and optional forced value."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or None if the signal has no collection or no sample."""
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information available: sort the whole collection
            sample_data = collection.find_one({}, sort={"precise_timestamp": 1})

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample (or None) of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """
        Return the most recent sample of the signal, or None if there is none.

        The timestamp of the returned sample is moved forward to the last ping of the owning
        device when that ping is more recent. ``device`` can be given to avoid fetching it; by
        default it is looked up from the part of ``signal_id`` before the first ".".
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # Same time-series bucket workaround as in get_first_from_signal_id, very effective to
        # narrow down big sets of data
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
                sort={"precise_timestamp": -1},
            )
        else:
            sample_data = collection.find_one({}, sort={"precise_timestamp": -1})

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample (or None) of each signal, fetching each owning device only once."""
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
565class SignalData(TwinPadModel):
566 signal_id: str
567 forcible: bool = True
568 time_vector: list[float]
569 values: list[float | int | str | None]
570 forced_values: list[float | int | str | None]
572 data_start: float | None = None
573 data_end: float | None = None
575 number_samples: int = 0
576 number_samples_db: int = 0
578 db_query_time: float = 0.0
579 init_time: float = 0.0
580 data_processing_time: float = 0.0
582 phase_id: str | None = None
584 @classmethod
585 def get_from_signal_id(
586 cls,
587 signal_id: str,
588 min_timestamp: float = None,
589 max_timestamp: float = None,
590 window_min_timestamp: float = None,
591 window_max_timestamp: float = None,
592 interpolate_bounds: bool = True,
593 max_documents: int = None,
594 collection=None,
595 ) -> Self:
597 now = time.time()
599 req_signal = {}
600 if min_timestamp is not None:
601 req_signal.setdefault("timestamp", {})
602 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
603 if max_timestamp is not None:
604 req_signal.setdefault("timestamp", {})
605 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
607 if collection is None:
608 collection = get_signal_collection(signal_id)
609 if collection is None:
610 return cls(
611 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
612 )
614 db_req_start = time.time()
616 sort_step = {"$sort": {"precise_timestamp": 1}}
617 number_results = collection.count_documents(req_signal)
619 pipeline = []
620 if req_signal:
621 pipeline.append({"$match": req_signal}) # Filter data if needed
623 pipeline.extend(
624 [
625 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
626 sort_step,
627 ]
628 )
630 if max_documents is not None and max_documents < number_results:
631 unsampling_ratio = math.ceil(number_results / max_documents)
632 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
633 pipeline.extend(
634 [
635 {
636 "$setWindowFields": {
637 "sortBy": {"precise_timestamp": 1},
638 "output": {"index": {"$documentNumber": {}}},
639 }
640 },
641 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
642 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
643 {"$replaceRoot": {"newRoot": "$doc"}},
644 {"$unset": ["index", "group_id"]},
645 {"$sort": {"precise_timestamp": 1}},
646 ]
647 )
649 # logger.info(f"pipeline: %s", str(pipeline))
650 cursor = collection.aggregate(pipeline)
651 db_req_time = time.time() - db_req_start
653 init_time = time.time()
655 results = cursor.to_list()
656 time_vector = []
657 values = []
658 forced_values = []
659 for s in results:
660 time_vector.append(s["precise_timestamp"])
661 values.append(s.get("value", None))
662 forced_values.append(s.get("forced_value", None))
664 signal = Signal.get_from_signal_id(signal_id)
665 if signal is None:
666 return cls(
667 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
668 )
669 class_ = signal.signal_data_class
671 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
672 time_vector, values, forced_values = cls.interpolate_bounds(
673 class_,
674 collection,
675 signal_id,
676 time_vector,
677 values,
678 forced_values,
679 window_min_timestamp,
680 window_max_timestamp,
681 )
683 if values:
684 # TODO: check below. a bit strange
685 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
686 # Adding last value as it should be repeated
687 time_vector.append(now)
688 values.append(values[-1])
689 forced_values.append(forced_values[-1])
691 init_time = time.time() - init_time
693 # See line 292 for explanation
694 bucket = get_signal_collection(f"system.buckets.{signal_id}")
695 first_bucket = None
696 if bucket is not None:
697 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
698 if first_bucket is not None:
699 data_start = first_bucket["control"]["min"]["precise_timestamp"]
700 else:
701 data_start = None
703 last_bucket = None
704 if bucket is not None:
705 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
706 if last_bucket is not None:
707 data_end = last_bucket["control"]["max"]["precise_timestamp"]
708 else:
709 data_end = None
711 return class_(
712 signal_id=signal_id,
713 forcible=signal.forcible,
714 time_vector=time_vector,
715 values=values,
716 forced_values=forced_values,
717 data_start=data_start,
718 data_end=data_end,
719 number_samples=len(values),
720 number_samples_db=number_results,
721 db_query_time=db_req_time,
722 init_time=init_time,
723 )
725 @staticmethod
726 def interpolate_bounds(
727 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
728 ):
729 sample_right = None
730 # Fetching right side value & interpolation
731 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
732 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
733 sample_right = collection.find_one(
734 {
735 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
736 "value": {"$exists": True},
737 },
738 sort={"precise_timestamp": -1},
739 )
740 if sample_right:
741 if time_vector:
742 right_sd = class_(
743 signal_id=signal_id,
744 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
745 values=[values[-1], sample_right.get("value", None)],
746 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
747 )
748 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
749 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
750 else:
751 max_ts_value = sample_right.get("value", None)
752 max_ts_forced_value = sample_right.get("forced_value", None)
753 time_vector.append(window_max_timestamp)
754 values.append(max_ts_value)
755 forced_values.append(max_ts_forced_value)
757 # Fetching left side value & interpolation
758 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
759 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
760 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
761 sample_left = sample_right
762 sample_left = collection.find_one(
763 {
764 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
765 "value": {"$exists": True},
766 },
767 sort={"precise_timestamp": -1},
768 )
770 if sample_left:
771 if time_vector:
772 left_sd = class_(
773 signal_id=signal_id,
774 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
775 values=[sample_left["value"], values[0]],
776 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
777 )
778 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
779 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
780 else:
781 min_ts_value = sample_left.get("value", None)
782 min_ts_forced_value = sample_left.get("forced_value", None)
783 time_vector.insert(0, window_min_timestamp)
784 values.insert(0, min_ts_value)
785 forced_values.insert(0, min_ts_forced_value)
787 return time_vector, values, forced_values
789 def interpolate_values(self, new_time_vector: list[float]):
790 return self.interpolate(new_time_vector, self.values)
792 def interpolate_forced_values(self, new_time_vector: list[float]):
793 return self.interpolate(new_time_vector, self.forced_values)
795 def uniform_desampling(self, number_samples_max: int) -> Self:
796 data_processing_time = time.time()
797 if number_samples_max and self.number_samples > number_samples_max:
798 new_time_vector = npy.linspace(
799 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
800 ).tolist()
801 values = self.interpolate_values(new_time_vector)
802 forced_values = self.interpolate_forced_values(new_time_vector)
803 time_vector = new_time_vector
804 number_samples = len(time_vector)
805 else:
806 time_vector = self.time_vector
807 number_samples = len(self.values)
808 values = self.values[:]
809 forced_values = self.forced_values[:]
810 data_processing_time = time.time() - data_processing_time
812 return self.__class__(
813 signal_id=self.signal_id,
814 time_vector=time_vector,
815 values=values,
816 forced_values=forced_values,
817 number_samples=number_samples,
818 number_samples_db=self.number_samples,
819 data_start=self.data_start,
820 data_end=self.data_end,
821 db_query_time=self.db_query_time,
822 init_time=self.init_time,
823 data_processing_time=self.data_processing_time + data_processing_time,
824 phase_id=self.phase_id,
825 )
827 def min_max_downsampling(self, number_samples_max: int) -> Self:
828 return self.uniform_desampling(number_samples_max)
830 def interest_window_desampling(
831 self,
832 window_max_number_samples: int,
833 outside_max_number_samples: int,
834 window_min_timestamp: float | None = None,
835 window_max_timestamp: float | None = None,
836 ) -> Self:
837 """Performs a sampling in a window of interest and outside."""
839 if not self.time_vector:
840 return self
842 if window_min_timestamp is None:
843 window_min_timestamp = self.time_vector[0]
844 if window_max_timestamp is None:
845 window_max_timestamp = self.time_vector[-1]
847 data_processing_time = time.time()
849 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
850 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
852 time_vector_before = self.time_vector[:index_window_start]
853 time_vector_window = self.time_vector[index_window_start:index_window_end]
854 time_vector_after = self.time_vector[index_window_end:]
856 # Resampling window
857 if time_vector_window:
858 # Ensurring window bounds
859 if time_vector_window[0] != window_min_timestamp:
860 time_vector_window.insert(0, window_min_timestamp)
861 if time_vector_window[-1] != window_max_timestamp:
862 time_vector_window.append(window_max_timestamp)
863 else:
864 time_vector_window = [window_min_timestamp, window_max_timestamp]
866 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
867 # Resampling
868 new_window_time_vector = npy.linspace(
869 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
870 ).tolist()
871 time_vector_window = new_window_time_vector
873 # Resampling outside
874 number_samples_before = len(time_vector_before)
875 number_samples_after = len(time_vector_after)
876 if (
877 outside_max_number_samples is not None
878 and (number_samples_before + number_samples_after) > outside_max_number_samples
879 ):
880 new_number_samples_before = min(
881 number_samples_before,
882 math.ceil(
883 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
884 ),
885 )
886 new_number_samples_after = min(
887 number_samples_after,
888 math.ceil(
889 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
890 ),
891 )
892 # Adjusting numbers as math.ceil can do +1 on sum
893 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
894 if new_number_samples_before > new_number_samples_after:
895 new_number_samples_before -= 1
896 else:
897 new_number_samples_after -= 1
899 if new_number_samples_before > 0:
900 new_time_vector_before = npy.linspace(
901 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
902 ).tolist()
903 time_vector_before = new_time_vector_before
905 if new_number_samples_after > 0:
906 new_time_vector_after = npy.linspace(
907 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
908 ).tolist()[::-1]
909 time_vector_after = new_time_vector_after
911 new_time_vector = time_vector_before + time_vector_window + time_vector_after
912 values = self.interpolate_values(new_time_vector)
913 forced_values = self.interpolate_forced_values(new_time_vector)
914 number_samples = len(values)
916 data_processing_time = time.time() - data_processing_time
918 return self.__class__(
919 signal_id=self.signal_id,
920 forcible=self.forcible,
921 time_vector=new_time_vector,
922 values=values,
923 forced_values=forced_values,
924 number_samples=number_samples,
925 number_samples_db=self.number_samples,
926 data_start=self.data_start,
927 data_end=self.data_end,
928 db_query_time=self.db_query_time,
929 init_time=self.init_time,
930 data_processing_time=self.data_processing_time + data_processing_time,
931 )
def zero_time_vector(self, data_start: float):
    """Return a copy of this signal whose timestamps are shifted by ``-data_start``.

    Args:
        data_start: Value subtracted from every entry of ``time_vector``.

    Returns:
        ``self`` (the same object) when the time vector is empty; otherwise a new
        instance of the same class. ``data_start``/``data_end`` of the copy are the
        first and last shifted timestamps, and ``forcible``/``phase_id`` are carried
        over so the shifted copy still describes the same signal and phase.
    """
    if len(self.time_vector) == 0:
        return self

    started_at = time.time()
    # Convert back to a plain list: other methods of this file concatenate time vectors
    # with ``+`` and search them with ``bisect``, which expect list semantics.
    shifted_time_vector = (npy.array(self.time_vector) - data_start).tolist()
    elapsed = time.time() - started_at

    return self.__class__(
        signal_id=self.signal_id,
        forcible=self.forcible,
        time_vector=shifted_time_vector,
        values=self.values,
        forced_values=self.forced_values,
        number_samples=self.number_samples,
        number_samples_db=self.number_samples_db,
        data_start=shifted_time_vector[0],
        data_end=shifted_time_vector[-1],
        db_query_time=self.db_query_time,
        init_time=self.init_time,
        data_processing_time=self.data_processing_time + elapsed,
        phase_id=self.phase_id,
    )
def csv_export(self):
    """Render the signal as CSV and return it as UTF-8 encoded bytes.

    The first row is the header ``timestamp,value,forced_value``; each following row
    holds one sample, with the timestamp formatted by ``get_utc_date_from_timestamp``.
    """
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["timestamp", "value", "forced_value"])  # Header row
    csv_writer.writerows(
        [get_utc_date_from_timestamp(timestamp), sample, forced_sample]
        for timestamp, sample, forced_sample in zip(self.time_vector, self.values, self.forced_values)
    )
    return buffer.getvalue().encode("utf-8")
def prestoplot_export(self):
    """Render the signal as a tab-separated text export and return UTF-8 bytes.

    The column name is the signal id with dots replaced by underscores, prefixed with
    an underscore when it starts with a numeric character. Four header lines are
    followed by one line per sample; ``None`` samples are written as ``none``.
    """
    column_name = self.signal_id.replace(".", "_")
    if column_name[0].isnumeric():
        column_name = "_" + column_name

    lines = [
        "# Encoding:\tUTF-8\n",
        f"t\t{column_name}\t{column_name}_forced_value\n",
        "ISO8601\tnone\tnone\n",
        f"# Description :\t{column_name}\n",
    ]
    for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
        value_cell = value if value is not None else "none"
        forced_cell = forced_value if forced_value is not None else "none"
        lines.append(f"{get_utc_date_from_timestamp(ts)}\t{value_cell}\t{forced_cell}\n")
    return "".join(lines).encode("utf-8")
class NumericSignalData(SignalData):
    """Signal data whose samples are numbers; a missing sample is stored as ``None``."""

    # Default type tag of this class. SIGNALDATA_TYPES also maps "int", "bool" and
    # "epoch" data types to this class.
    data_type: str = "float"
    # Sample values, index-aligned with the time vector (they are zipped together in the exports).
    values: list[float | int | None]
    # Forced (overridden) values, index-aligned with ``values``.
    forced_values: list[float | int | None]
def interpolate(self, new_time_vector: list[float], items):
    """Linearly interpolate ``items`` onto ``new_time_vector`` with ``numpy.interp``.

    ``items`` are taken as sampled at ``self.time_vector``. ``None`` entries are fed to
    numpy as NaN, and every NaN in the interpolated result is returned as ``None``.
    """
    filled = [sample if sample is not None else npy.nan for sample in items]
    interpolated = npy.interp(new_time_vector, self.time_vector, filled)
    result = []
    for sample in interpolated:
        result.append(None if npy.isnan(sample) else sample)
    return result
989 def uniform_desampling(self, number_samples_max: int) -> Self:
990 data_processing_time = time.time()
991 if number_samples_max and self.number_samples > number_samples_max:
992 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
993 forced_values = self.interpolate_forced_values(time_vector)
994 number_samples = len(time_vector)
995 else:
996 time_vector = self.time_vector
997 number_samples = len(self.values)
998 values = self.values[:]
999 forced_values = self.forced_values[:]
1000 data_processing_time = time.time() - data_processing_time
1002 return self.__class__(
1003 signal_id=self.signal_id,
1004 time_vector=time_vector,
1005 values=values,
1006 forced_values=forced_values,
1007 number_samples=number_samples,
1008 number_samples_db=self.number_samples,
1009 data_start=self.data_start,
1010 data_end=self.data_end,
1011 db_query_time=self.db_query_time,
1012 init_time=self.init_time,
1013 data_processing_time=self.data_processing_time + data_processing_time,
1014 )
1016 def min_max_downsampling(self, number_samples_max: int) -> Self:
1017 if self.number_samples < number_samples_max:
1018 return self
1020 data_processing_time = time.time()
1022 number_bins = number_samples_max // 2
1024 time_vector = npy.array(self.time_vector, dtype=npy.float64)
1025 values = npy.array(self.values, dtype=npy.float64)
1026 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1028 points_per_bin = self.number_samples // number_bins
1030 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1031 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1032 if self.number_samples - number_bins * points_per_bin > 1:
1033 points_per_bin += 1
1034 number_bins = self.number_samples // points_per_bin + 1
1035 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1036 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1037 values = npy.concatenate([values, nan_points_to_add])
1038 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1040 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1041 values_matrix = values.reshape(number_bins, points_per_bin)
1042 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1044 indexes_min = npy.zeros(number_bins, dtype="int64")
1045 indexes_max = npy.zeros(number_bins, dtype="int64")
1047 for row in range(number_bins):
1048 min_value = values_matrix[row, 0]
1049 max_value = values_matrix[row, 0]
1050 for column in range(points_per_bin):
1051 if values_matrix[row, column] < min_value:
1052 min_value = values_matrix[row, column]
1053 indexes_min[row] = column
1054 elif values_matrix[row, column] > max_value:
1055 max_value = values_matrix[row, column]
1056 indexes_max[row] = column
1058 row_index = npy.repeat(npy.arange(number_bins), 2)
1059 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1061 data_processing_time = time.time() - data_processing_time
1063 new_time_vector = timestamps_matrix[row_index, column_index]
1064 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1065 new_values = values_matrix[row_index, column_index]
1066 new_values = npy.where(npy.isnan(new_values), None, new_values)
1067 new_forced_values = forced_values_matrix[row_index, column_index]
1068 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1070 # Make sure there are no None values for the time vector
1071 time_vector_filter = new_time_vector != None
1072 new_time_vector = new_time_vector[time_vector_filter]
1073 new_values = new_values[time_vector_filter]
1074 new_forced_values = new_forced_values[time_vector_filter]
1076 return self.__class__(
1077 signal_id=self.signal_id,
1078 time_vector=new_time_vector,
1079 values=new_values,
1080 forced_values=new_forced_values,
1081 number_samples=number_bins * 2,
1082 number_samples_db=self.number_samples_db,
1083 data_start=self.data_start,
1084 data_end=self.data_end,
1085 db_query_time=self.db_query_time,
1086 init_time=self.init_time,
1087 data_processing_time=self.data_processing_time + data_processing_time,
1088 phase_id=self.phase_id,
1089 )
1091 def interest_window_desampling(
1092 self,
1093 window_max_number_samples: int,
1094 outside_max_number_samples: int,
1095 window_min_timestamp: float | None = None,
1096 window_max_timestamp: float | None = None,
1097 ) -> Self:
1098 """Performs a sampling in a window of interest and outside."""
1100 if not self.time_vector:
1101 return self
1103 if window_min_timestamp is None:
1104 window_min_timestamp = self.time_vector[0]
1105 if window_max_timestamp is None:
1106 window_max_timestamp = self.time_vector[-1]
1108 data_processing_time = time.time()
1110 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1111 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1113 time_vector_before = self.time_vector[:index_window_start]
1114 time_vector_window = self.time_vector[index_window_start:index_window_end]
1115 time_vector_after = self.time_vector[index_window_end:]
1117 values_before = self.values[:index_window_start]
1118 values_window = self.values[index_window_start:index_window_end]
1119 values_after = self.values[index_window_end:]
1120 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1121 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1123 # Resampling window
1124 if time_vector_window:
1125 # Ensurring window bounds
1126 if time_vector_window[0] != window_min_timestamp:
1127 time_vector_window.insert(0, window_min_timestamp)
1128 values_window.insert(0, window_min_value)
1129 if time_vector_window[-1] != window_max_timestamp:
1130 time_vector_window.append(window_max_timestamp)
1131 values_window.append(window_max_value)
1132 else:
1133 time_vector_window = [window_min_timestamp, window_max_timestamp]
1134 values_window = [window_min_value, window_max_value]
1136 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1137 # Resampling
1138 time_vector_window, values_window = downsample_list(
1139 time_vector_window, values_window, window_max_number_samples
1140 )
1142 # Resampling outside
1143 number_samples_before = len(time_vector_before)
1144 number_samples_after = len(time_vector_after)
1145 if (
1146 outside_max_number_samples is not None
1147 and (number_samples_before + number_samples_after) > outside_max_number_samples
1148 ):
1149 new_number_samples_before = min(
1150 number_samples_before,
1151 math.ceil(
1152 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1153 ),
1154 )
1155 new_number_samples_after = min(
1156 number_samples_after,
1157 math.ceil(
1158 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1159 ),
1160 )
1161 # Adjusting numbers as math.ceil can do +1 on sum
1162 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1163 if new_number_samples_before > new_number_samples_after:
1164 new_number_samples_before -= 1
1165 else:
1166 new_number_samples_after -= 1
1168 if new_number_samples_before > 0:
1169 time_vector_before, values_before = downsample_list(
1170 time_vector_before, values_before, new_number_samples_before
1171 )
1173 if new_number_samples_after > 0:
1174 time_vector_after, values_after = downsample_list(
1175 time_vector_after, values_after, new_number_samples_after
1176 )
1178 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1179 values = values_before + values_window + values_after
1180 forced_values = self.interpolate_forced_values(new_time_vector)
1181 number_samples = len(values)
1183 data_processing_time = time.time() - data_processing_time
1185 return self.__class__(
1186 signal_id=self.signal_id,
1187 time_vector=new_time_vector,
1188 values=values,
1189 forced_values=forced_values,
1190 number_samples=number_samples,
1191 number_samples_db=self.number_samples,
1192 data_start=self.data_start,
1193 data_end=self.data_end,
1194 db_query_time=self.db_query_time,
1195 init_time=self.init_time,
1196 data_processing_time=self.data_processing_time + data_processing_time,
1197 )
class StringSignalData(SignalData):
    """Signal data whose samples are strings; a missing sample is stored as ``None``."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Resample ``items`` onto ``new_time_vector`` by holding the previous sample.

        For each requested timestamp the item of the last known timestamp that is
        smaller or equal is returned; positions are clamped to the valid index range,
        so timestamps before the first sample yield the first item.
        """
        last_position = len(items) - 1
        # Position of the latest known timestamp <= each requested timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_position)]
class SignalsData(TwinPadModel):
    """Container for the data of several signals plus aggregate timing and bounds."""

    # One entry per fetched signal (and per phase when built from phases).
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    # Processing time in seconds (built from time.time() differences or per-signal sums).
    data_processing_time: float
    # Earliest data_start / latest data_end over the contained signals when built by
    # get_from_signal_ids; several other constructors in this class simply pass 0.
    data_start: float | None
    data_end: float | None
@classmethod
def get_from_signal_ids(
    cls,
    signal_ids: list[str],
    min_timestamp: float = None,
    max_timestamp: float = None,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
    interpolate_bounds: bool = True,
    max_documents: int = None,
) -> Self:
    """Fetch the data of every signal id and aggregate it into one container.

    ``max_timestamp`` defaults to the current time. The resulting ``data_start`` /
    ``data_end`` are the smallest start and largest end among signals that report
    one (``None`` when no signal does), and ``data_processing_time`` is the sum of
    the per-signal processing times.
    """
    if max_timestamp is None:
        max_timestamp = time.time()

    signal_collections = get_signal_collections_batch(signal_ids)

    fetched = [
        SignalData.get_from_signal_id(
            signal_id=signal_id,
            min_timestamp=min_timestamp,
            max_timestamp=max_timestamp,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
            interpolate_bounds=interpolate_bounds,
            max_documents=max_documents,
            collection=collection,
        )
        for signal_id, collection in zip(signal_ids, signal_collections)
    ]

    known_starts = [signal_data.data_start for signal_data in fetched if signal_data.data_start is not None]
    known_ends = [signal_data.data_end for signal_data in fetched if signal_data.data_end is not None]

    return cls(
        signals_data=fetched,
        data_processing_time=sum((signal_data.data_processing_time for signal_data in fetched), 0.0),
        data_start=min(known_starts) if known_starts else None,
        data_end=max(known_ends) if known_ends else None,
    )
@classmethod
def get_from_phase_and_signal_ids(
    cls,
    phases: list,
    phase_sync_times: list[float | None],
    signal_ids: list[str],
    window_min_timestamps: list[float | None],
    window_max_timestamps: list[float | None],
    zero_time_vector: bool = True,
):
    """Fetch every signal for every phase and collect the non-empty results.

    The four per-phase lists are consumed in lockstep with ``zip``. For each phase the
    fetch range is the phase's ``start_at``/``end_at`` divided by 1000 (presumably
    milliseconds to seconds — confirm); when both window bounds are given, the range
    is narrowed to the window plus a 5 % margin on each side, never leaving the phase.

    Args:
        phases: Phase objects exposing ``start_at``, ``end_at`` and ``id``.
        phase_sync_times: Per-phase time origin used when zeroing; ``None`` means the
            phase start.
        signal_ids: Signals to fetch for each phase.
        window_min_timestamps: Per-phase window start (or ``None``).
        window_max_timestamps: Per-phase window end (or ``None``).
        zero_time_vector: Shift each time vector by the phase sync time.

    Returns:
        A container whose entries are tagged with their ``phase_id``; signals with no
        samples in a phase are skipped. ``data_start`` and ``data_end`` are set to 0.
    """
    signals_data: list[SignalData] = []
    computation_start = time.time()
    # Resolved lazily on the first phase and reused afterwards: the lookup only
    # depends on signal_ids, so repeating it for every phase is wasted work.
    signal_collections = None

    for phase, sync_time, window_min_timestamp, window_max_timestamp in zip(
        phases, phase_sync_times, window_min_timestamps, window_max_timestamps
    ):
        min_timestamp = phase.start_at / 1000
        max_timestamp = phase.end_at / 1000

        # The default sync time is the phase start, taken before any window narrowing.
        if sync_time is None:
            sync_time = min_timestamp

        if window_max_timestamp is not None and window_min_timestamp is not None:
            window_length = window_max_timestamp - window_min_timestamp

            if window_min_timestamp != min_timestamp:
                min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20)
            if window_max_timestamp != max_timestamp:
                max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20)

        if signal_collections is None:
            # Materialised so the result can be iterated once per phase.
            signal_collections = list(get_signal_collections_batch(signal_ids))

        for signal_id, collection in zip(signal_ids, signal_collections):
            signal_data = SignalData.get_from_signal_id(
                signal_id,
                min_timestamp,
                max_timestamp,
                window_min_timestamp,
                window_max_timestamp,
                interpolate_bounds=False,
                max_documents=None,
                collection=collection,
            )

            if len(signal_data.time_vector) == 0:
                continue

            if zero_time_vector:
                signal_data = signal_data.zero_time_vector(sync_time)
            signal_data.phase_id = phase.id

            signals_data.append(signal_data)

    return cls(
        signals_data=signals_data,
        data_processing_time=time.time() - computation_start,
        data_start=0,
        data_end=0,
    )
def uniform_desampling(self, number_samples_max: int) -> Self:
    """Apply ``uniform_desampling`` to every signal and wrap the results.

    The new container keeps this container's ``data_start``/``data_end`` and reports
    the sum of the per-signal processing times.
    """
    reduced_signals = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        reduced = signal_data.uniform_desampling(number_samples_max=number_samples_max)
        reduced_signals.append(reduced)
    for reduced in reduced_signals:
        total_processing_time += reduced.data_processing_time
    return SignalsData(
        signals_data=reduced_signals,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def min_max_downsampling(self, number_samples_max: int) -> Self:
    """Apply ``min_max_downsampling`` to every signal and wrap the results.

    The new container keeps this container's ``data_start``/``data_end`` and reports
    the sum of the per-signal processing times.
    """
    reduced_signals = []
    total_processing_time = 0
    for signal_data in self.signals_data:
        reduced = signal_data.min_max_downsampling(number_samples_max=number_samples_max)
        reduced_signals.append(reduced)
    for reduced in reduced_signals:
        total_processing_time += reduced.data_processing_time
    return SignalsData(
        signals_data=reduced_signals,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def interest_window_desampling(
    self,
    window_max_number_samples: int,
    outside_max_number_samples: int,
    window_min_timestamp: float = None,
    window_max_timestamp: float = None,
) -> Self:
    """Apply ``interest_window_desampling`` to every signal and wrap the results.

    All arguments are forwarded unchanged to each signal. The new container keeps
    this container's ``data_start``/``data_end`` and reports the sum of the per-signal
    processing times.
    """
    reduced_signals = []
    for signal_data in self.signals_data:
        reduced = signal_data.interest_window_desampling(
            window_max_number_samples=window_max_number_samples,
            outside_max_number_samples=outside_max_number_samples,
            window_min_timestamp=window_min_timestamp,
            window_max_timestamp=window_max_timestamp,
        )
        reduced_signals.append(reduced)

    total_processing_time = 0
    for reduced in reduced_signals:
        total_processing_time += reduced.data_processing_time

    return SignalsData(
        signals_data=reduced_signals,
        data_processing_time=total_processing_time,
        data_start=self.data_start,
        data_end=self.data_end,
    )
def zero_time_vector(self, data_start: float):
    """Shift every signal's time vector by ``-data_start`` and wrap the results.

    Returns:
        A new container with ``data_start`` 0 and ``data_end`` equal to the largest
        ``data_end`` among the shifted signals. Signals without a ``data_end`` are
        ignored for that maximum, and ``data_end`` is ``None`` when no signal has one
        (including an empty container), instead of raising.
    """
    signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
    known_ends = [s.data_end for s in signals_data if s.data_end is not None]
    return SignalsData(
        signals_data=signals_data,
        data_processing_time=sum(s.data_processing_time for s in signals_data),
        data_start=0,
        data_end=max(known_ends, default=None),
    )
1381 @classmethod
1382 async def apply_single_function(
1383 cls,
1384 phase,
1385 base_signal_id: str,
1386 function: SINGLE_POST_PROCESSING_FUNCTION,
1387 window_min_timestamp: float = None,
1388 window_max_timestamp: float = None,
1389 ):
1390 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"
1392 processed_result_signal = Signal.get_from_signal_id(signal_id)
1393 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids:
1394 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)
1396 signals_data = cls.get_from_phase_and_signal_ids(
1397 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
1398 )
1400 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
1401 return None
1403 new_values = None
1404 new_forced_values = None
1405 time_vector = npy.array(signals_data.signals_data[0].time_vector)
1406 values = signals_data.signals_data[0].values
1407 forced_values = signals_data.signals_data[0].forced_values
1409 match (function):
1410 case "Cumul":
1411 new_values = cumul(values)
1412 new_forced_values = cumul(forced_values)
1413 # case "CumulDistrib":
1414 # new_values = cumul_distrib(values)
1415 # new_forced_values = cumul_distrib(forced_values)
1416 case "Delta":
1417 new_values = delta(values)
1418 new_forced_values = delta(forced_values)
1419 case "DeltaT":
1420 new_values = delta(time_vector)
1421 new_forced_values = new_values
1422 case "Derive":
1423 new_values = derive(time_vector, values)
1424 new_forced_values = derive(time_vector, forced_values)
1425 case "Integ":
1426 new_values = integ(time_vector, values)
1427 new_forced_values = integ(time_vector, forced_values)
1429 new_values = npy.where(npy.isnan(new_values), None, new_values)
1430 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1432 loop = asyncio.get_running_loop()
1433 loop.create_task(
1434 cls.save_function_signal(
1435 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible
1436 )
1437 )
1439 if window_max_timestamp is not None:
1440 max_timestamp_mask = time_vector <= window_max_timestamp
1441 time_vector = time_vector[max_timestamp_mask]
1442 new_values = new_values[max_timestamp_mask]
1443 new_forced_values = new_forced_values[max_timestamp_mask]
1444 if window_min_timestamp is not None:
1445 min_timestamp_mask = time_vector >= window_min_timestamp
1446 time_vector = time_vector[min_timestamp_mask]
1447 new_values = new_values[min_timestamp_mask]
1448 new_forced_values = new_forced_values[min_timestamp_mask]
1450 signals_data.signals_data[0].time_vector = time_vector.tolist()
1451 signals_data.signals_data[0].values = new_values.tolist()
1452 signals_data.signals_data[0].forced_values = new_forced_values.tolist()
1453 signals_data.signals_data[0].number_samples = time_vector.size
1455 signals_data.signals_data[0].signal_id = signal_id
1457 return signals_data
1459 @classmethod
1460 async def apply_multiple_function(
1461 cls,
1462 phases: list,
1463 signal_ids: list,
1464 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
1465 window_min_timestamp: float = None,
1466 window_max_timestamp: float = None,
1467 ):
1468 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
1469 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
1470 else:
1471 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"
1473 active_phase = phases[0]
1474 if function in {"Align-X", "Using-X"}:
1475 active_phase = phases[1]
1477 processed_result_signal = Signal.get_from_signal_id(function_signal_id)
1478 if processed_result_signal is not None and (
1479 active_phase.id in processed_result_signal.computed_phases_ids
1480 ): # If signal has been computed for the correct phase
1481 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)
1483 array_length = None
1484 time_vector_list = []
1485 values_list = []
1486 forced_values_list = []
1487 forcible = True
1488 for phase, signal_id in zip(phases, signal_ids):
1489 signals_data = cls.get_from_phase_and_signal_ids(
1490 [phase], [None], [signal_id], [None], [None], zero_time_vector=False
1491 )
1493 if len(signals_data.signals_data) == 0:
1494 return None
1496 signal_data = signals_data.signals_data[0]
1498 if array_length is None:
1499 array_length = signal_data.number_samples
1500 if (
1501 array_length != signal_data.number_samples and function != "Align-X"
1502 ) or signal_data.number_samples == 0:
1503 return None
1505 time_vector_list.append(npy.array(signal_data.time_vector))
1506 values_list.append(npy.array(signal_data.values, dtype=npy.float64))
1507 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
1508 forcible = forcible and signal_data.forcible
1510 time_vector = time_vector_list[0]
1511 new_values = None
1512 new_forced_values = None
1514 match (function):
1515 case "Align-X":
1516 time_vector = time_vector_list[1]
1517 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
1518 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
1519 new_values = align_x(old_time_vector, values_list[0], new_time_vector)
1520 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
1521 # case "Atan2":
1522 # new_values = atan2(values_list[0], values_list[1])
1523 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
1524 case "Using-X":
1525 if len(time_vector_list[0]) != len(time_vector_list[1]):
1526 return None
1527 time_vector = time_vector_list[1]
1528 new_values = values_list[0]
1529 new_forced_values = forced_values_list[0]
1530 case "Mean":
1531 new_values = mean(*values_list)
1532 new_forced_values = mean(*forced_values_list)
1533 case "Norm":
1534 new_values = norm(*values_list)
1535 new_forced_values = norm(*forced_values_list)
1537 new_values = npy.where(npy.isnan(new_values), None, new_values)
1538 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1540 loop = asyncio.get_running_loop()
1541 loop.create_task(
1542 cls.save_function_signal(
1543 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
1544 )
1545 )
1547 total_number_samples = time_vector.size
1549 if window_max_timestamp is not None:
1550 max_timestamp_mask = time_vector <= window_max_timestamp
1551 time_vector = time_vector[max_timestamp_mask]
1552 new_values = new_values[max_timestamp_mask]
1553 new_forced_values = new_forced_values[max_timestamp_mask]
1554 if window_min_timestamp is not None:
1555 min_timestamp_mask = time_vector >= window_min_timestamp
1556 time_vector = time_vector[min_timestamp_mask]
1557 new_values = new_values[min_timestamp_mask]
1558 new_forced_values = new_forced_values[min_timestamp_mask]
1560 signals_data = cls(
1561 signals_data=[
1562 NumericSignalData(
1563 signal_id=function_signal_id,
1564 forcible=forcible,
1565 time_vector=time_vector.tolist(),
1566 values=new_values.tolist(),
1567 forced_values=new_forced_values.tolist(),
1568 number_samples=time_vector.size,
1569 number_samples_db=total_number_samples,
1570 )
1571 ],
1572 data_processing_time=0,
1573 data_start=0,
1574 data_end=0,
1575 )
1577 return signals_data
@classmethod
def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
    """Load an already computed post-processing signal from its collection.

    Args:
        signal_id: Id of the stored post-processing signal.
        window_min_timestamp: Optional inclusive lower bound on ``precise_timestamp``.
        window_max_timestamp: Optional inclusive upper bound on ``precise_timestamp``.

    Returns:
        A container with one ``NumericSignalData`` whose samples are ordered by
        ascending ``precise_timestamp``. ``number_samples_db`` is the total document
        count of the collection, regardless of the window.
    """
    signal_data_collection = get_signal_collection(signal_id, create=True)

    timestamp_bounds = {}
    if window_max_timestamp is not None:
        timestamp_bounds["$lte"] = window_max_timestamp
    if window_min_timestamp is not None:
        timestamp_bounds["$gte"] = window_min_timestamp
    pipeline = [{"$match": {"precise_timestamp": timestamp_bounds}}] if timestamp_bounds else []

    total_number_samples = signal_data_collection.count_documents({})

    fetch_start = time.time()

    # The pipeline imposes no order and phases may have been stored in any order, while
    # consumers of time vectors in this file (bisect, numpy.interp) need ascending
    # timestamps. Sorting here is stable and cheap when data is already ordered.
    samples = sorted(
        signal_data_collection.aggregate(pipeline).to_list(),
        key=lambda sample: sample["precise_timestamp"],
    )
    new_time_vector = [sample["precise_timestamp"] for sample in samples]
    new_values = [sample["value"] for sample in samples]
    new_forced_values = [sample["forced_value"] for sample in samples]

    return cls(
        signals_data=[
            NumericSignalData(
                signal_id=signal_id,
                time_vector=new_time_vector,
                values=new_values,
                forced_values=new_forced_values,
                number_samples=len(new_time_vector),
                number_samples_db=total_number_samples,
            )
        ],
        data_processing_time=time.time() - fetch_start,
        data_start=0,
        data_end=0,
    )
@classmethod
async def save_function_signal(
    cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
):
    """Store a computed post-processing signal and register it for ``phase``.

    Existing samples of the signal inside the phase's time range are deleted, the new
    samples are inserted, and the signal's configuration document is upserted with
    ``phase.id`` recorded in ``computed_phases_ids``.

    Args:
        phase: Phase object exposing ``id``, ``start_at`` and ``end_at``.
        function_signal_id: Id of the post-processing signal to store.
        time_vector: Sample timestamps.
        new_values: Sample values, index-aligned with ``time_vector``.
        new_forced_values: Forced values, index-aligned with ``time_vector``.
        forcible: Stored as the signal's ``forcible`` flag.
    """
    # Insert data first so if it is requested by another user, it will be computed again
    signal_collection = get_signal_collection(function_signal_id, create=True)
    signal_collection.delete_many(
        {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
    )
    signal_collection.insert_many(
        [
            {
                # NOTE(review): fromtimestamp() without tz yields a naive local-time
                # datetime — confirm this matches how other samples are stored.
                "timestamp": datetime.datetime.fromtimestamp(timestamp),
                "precise_timestamp": timestamp,
                "value": value,
                "forced_value": forced_value,
            }
            for timestamp, value, forced_value in zip(time_vector, new_values, new_forced_values)
        ]
    )

    signals_config_collection = get_collection(systems_database, "signals", create=True)
    signals_config_collection.find_one_and_update(
        {"signal_id": function_signal_id},
        {
            "$set": {
                "description": "",
                "unit": None,
                "type": "sensor",
                "address": None,
                # NOTE(review): max(..., 1) avoids a division by 0 but also caps the
                # stored frequency at 1 for any mean step below 1 — confirm intent.
                "frequency": 1 / max(npy.mean(delta(time_vector)), 1),  # avoid division by 0
                "transfer_function": None,
                "precision_digits": None,
                "digitization_function": None,
                "data_type": "float",
                "formula": None,
                "forcible": forcible,
                "commandable": False,
                "broadcastable": True,
                "signal_id": function_signal_id,
                "post_processing": True,
            },
            # $addToSet keeps the list free of duplicates when the same phase is saved
            # more than once (e.g. two requests computing it concurrently).
            "$addToSet": {"computed_phases_ids": phase.id},
        },
        upsert=True,
    )
def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []):
    """Export every signal to its own file inside a deflated ZIP archive.

    Args:
        file_format: ``"csv"`` (files end in ``.csv``) or ``"prestoplot"`` (files end
            in ``.tab``).
        post_processing: When true, each file name is suffixed with the name of the
            signal's phase, e.g. ``"<signal_id> (<phase name>)"``.
        phase_ids: Ids of the phases to resolve names for; only read, never mutated.

    Returns:
        The archive content as bytes.

    Raises:
        ValueError: If a signal is exported with an unknown ``file_format``.
    """
    phases_by_id = {}
    unknown_phase = None
    if post_processing:
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
        # Built once and shared by every signal whose phase cannot be resolved.
        unknown_phase = Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000")

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
        for signal_data in self.signals_data:
            file_name = signal_data.signal_id
            if post_processing:
                phase = phases_by_id.get(signal_data.phase_id)
                if phase is None:
                    # Covers both an id missing from phase_ids and a lookup that
                    # produced no phase.
                    phase = unknown_phase
                file_name = f"{signal_data.signal_id} ({phase.name})"
            if file_format == "csv":
                zip_file.writestr(f"{file_name}.csv", signal_data.csv_export())
            elif file_format == "prestoplot":
                zip_file.writestr(f"{file_name}.tab", signal_data.prestoplot_export())
            else:
                raise ValueError(f"Format not found. Got: {file_format}")
    return zip_buffer.getvalue()
def hdf5_export(self, post_processing: bool = False, phase_ids: list = []):
    """Export every signal as a group of an in-memory HDF5 file.

    Each group is named after the signal id (suffixed with ``" (<phase name>)"`` when
    ``post_processing`` is true) and holds one ``data`` dataset of records with the
    fields ``date``, ``timestamp``, ``value`` and ``forced_value``. Values are stored
    as fixed-size byte strings (``S30``) for signals whose ``data_type`` is ``"str"``
    and as float64 otherwise.

    Args:
        post_processing: Suffix group names with the name of the signal's phase.
        phase_ids: Ids of the phases to resolve names for; only read, never mutated.

    Returns:
        The HDF5 file content as bytes.
    """
    phases_by_id = {}
    unknown_phase = None
    if post_processing:
        phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
        # Built once and shared by every signal whose phase cannot be resolved.
        unknown_phase = Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="000000000000000000000000")

    hdf5_buffer = io.BytesIO()
    custom_type_float = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
    )
    custom_type_string = npy.dtype(
        [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
    )
    with h5py.File(hdf5_buffer, "w") as hdf5_file:
        for signal_data in self.signals_data:
            group_name = signal_data.signal_id
            if post_processing:
                phase = phases_by_id.get(signal_data.phase_id)
                if phase is None:
                    # Covers both an id missing from phase_ids and a lookup that
                    # produced no phase.
                    phase = unknown_phase
                group_name = f"{signal_data.signal_id} ({phase.name})"
            signal_group = hdf5_file.create_group(group_name)

            date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
            # Only the record dtype depends on the signal's data type.
            record_type = custom_type_string if signal_data.data_type == "str" else custom_type_float
            signal_group["data"] = npy.array(
                list(
                    zip(
                        date_vector,
                        signal_data.time_vector,
                        signal_data.values,
                        signal_data.forced_values,
                    )
                ),
                dtype=record_type,
            )
    return hdf5_buffer.getvalue()
class SignalStatus(TwinPadModel):
    """Status record of a signal; used as the default of ``Signal.status``."""

    # Status label; a freshly created record reports "down".
    status: str = "down"
    # Free-text explanation of the status; empty by default.
    reason: str = ""
    # NOTE(review): meaning and unit are not visible here — presumably a delay in seconds; confirm.
    delay: float | None = None
class DigitizationFunction(TwinPadModel):
    """Parameters of a signal's digitization (see ``Signal.digitization_function``)."""

    # NOTE(review): presumably the resolution in bits of the raw value — confirm.
    bits: int | None = None
    # NOTE(review): presumably the value range that the raw range below maps to — confirm.
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float
class SignalUpdate(TwinPadModel):
    """Requested change of a signal, as validated and sent by ``Signal.send_command``."""

    # New value; None means "no value supplied" (send_command skips its validation).
    value: float | str | bool | int | None = None
    # New forced value; None means "no forced value supplied".
    forced_value: float | str | bool | int | None = None
    # NOTE(review): unit not visible here — confirm (seconds vs milliseconds).
    timestamp: int | None = None
class SignalType(str, Enum):
    """Kinds of signal; members are also plain strings equal to their value."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Maps a signal's declared data type name to the SignalData subclass used to hold its
# samples. Every type except "str" is handled as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
class Signal(GenericMongo):
    """Configuration document of a signal, stored in the ``signals`` collection."""

    collection_name: ClassVar[str] = "signals"

    # Identifier of the signal; the part before the first "." is used as the device id.
    signal_id: str
    frequency: float
    unit: str | None
    description: str
    type: SignalType
    # Either a key of TYPES / SIGNALDATA_TYPES or a string starting with "enum".
    data_type: str
    precision_digits: int | None
    forcible: bool
    commandable: bool
    broadcastable: bool
    status: SignalStatus = SignalStatus()

    # True for signals produced by the post-processing functions.
    post_processing: bool = False
    # Ids of the phases for which a post-processing signal has already been computed.
    computed_phases_ids: list[str] = []

    digitization_function: DigitizationFunction | None
@property
def device(self) -> Device:
    """Device owning this signal, looked up by the signal id's part before the first dot."""
    owner_device_id, _, _ = self.signal_id.partition(".")
    return Device.get_one_by_attribute("device_id", owner_device_id)
@cached_property
def signal_data_class(self):
    """SignalData subclass used to hold this signal's samples.

    Plain data types are looked up directly in ``SIGNALDATA_TYPES``. For a data type
    starting with ``enum``, the text between ``enum(`` and the first comma names the
    underlying type that is looked up instead.

    Raises:
        ValueError: If the data type is neither a known type nor an enum.
    """
    declared_type = self.data_type
    if declared_type in SIGNALDATA_TYPES:
        return SIGNALDATA_TYPES[declared_type]
    if not declared_type.startswith("enum"):
        raise ValueError(f"Unhandled python type: {self.data_type}")
    head = declared_type.split(",")[0]
    underlying_type = head.replace("enum(", "").lstrip(" ")
    return SIGNALDATA_TYPES[underlying_type]
@cached_property
def python_type(self):
    """Python type (or ``Literal`` of allowed choices) matching ``data_type``.

    Raises:
        ValueError: If the data type is neither a key of ``TYPES`` nor an enum.
    """
    if self.data_type in TYPES:
        return TYPES[self.data_type]
    if self.data_type.startswith("enum"):
        # Everything after the first comma, with trailing ")" characters removed, is
        # evaluated as a Python expression and unpacked into a Literal.
        # SECURITY NOTE(review): eval() executes arbitrary code. data_type presumably
        # comes from stored signal configuration — confirm it is trusted, or parse the
        # choices with ast.literal_eval instead.
        choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
        return Literal[*choices]
    raise ValueError(f"Unhandled python type: {self.data_type}")
async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
    """Validate a requested update, send it, and record the attempt as a Command.

    For enum signals the supplied value and forced value must be among the enum
    choices; for other signals they must pass ``is_of_type`` against ``python_type``.
    Fields left at ``None`` are not validated.

    Returns:
        The response of ``RabbitMQClient().send_signal_value`` when validation passes;
        otherwise an error dict with status code 400 and the collected messages. In
        both cases a Command is created.
    """
    command = Command(
        sent_at=time.time(),
        command_type="Signal command",
        user_id=current_user.id,
    )

    problems = []
    new_value = update_dict.value
    new_forced_value = update_dict.forced_value

    if self.data_type.startswith("enum"):
        enum_options = get_args(self.python_type)

        if new_value is not None and new_value not in enum_options:
            problems.append(f"Invalid value: {new_value} not in {enum_options}\n")
        if new_forced_value is not None and new_forced_value not in enum_options:
            problems.append(f"Invalid type for forced value: {new_forced_value} not in {enum_options}\n")
    else:
        # python_type is only read when there is something to validate.
        if new_value is not None and not is_of_type(new_value, self.python_type):
            problems.append(f"Invalid type for value: {new_value.__class__} is not {self.python_type}\n")
        if new_forced_value is not None and not is_of_type(new_forced_value, self.python_type):
            problems.append(
                f"Invalid type for forced value: {new_forced_value.__class__} is not {self.python_type}\n"
            )

    if not problems:
        response = await RabbitMQClient().send_signal_value(self.signal_id, update_dict)
        command.receive_response(response)
    else:
        # Rejected before anything is sent: log the attempt as a failed command.
        command.response_time = 0
        command.succeeded = False
        command.description = f"Tried to modify signal {self.signal_id}"
        response = {"error": True, "status_code": 400, "message": "".join(problems)}

    Command.create(command)
    return response
1863 @classmethod
1864 def get_from_signal_id(cls, signal_id) -> Self:
1865 """Could be generic from mongo"""
1866 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1867 if not raw_value:
1868 return None
1869 del raw_value["_id"]
1870 return cls.dict_to_object(raw_value)
1872 @classmethod
1873 def get_all_ids(cls) -> list[str]:
1874 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1876 return [signal["signal_id"] for signal in cursor]
1878 @classmethod
1879 def get_all_statuses(cls) -> list[dict]:
1880 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
1882 return [
1883 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1884 for signal in cursor
1885 ]
1887 async def number_samples(self):
1888 collection = get_signal_collection(signal_id=self.signal_id)
1889 if collection is None:
1890 return 0
1892 number_samples = collection.estimated_document_count()
1894 number_samples_async_collection = await get_async_collection(
1895 systems_async_database, "number_samples", create=True, time_series=True
1896 )
1898 loop = asyncio.get_running_loop()
1899 loop.create_task(
1900 number_samples_async_collection.insert_one(
1901 {
1902 "timestamp": datetime.datetime.now(pytz.UTC),
1903 "signal_id": self.signal_id,
1904 "number_samples": number_samples,
1905 }
1906 )
1907 )
1909 return number_samples
1911 @classmethod
1912 def total_number_samples(cls) -> int:
1913 TwinPadActivity.get_number_samples_timeframe(0, 0, False)
1914 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
1916 if number_samples_collection is None:
1917 return 0
1919 result = number_samples_collection.aggregate(
1920 [{"$group": {"_id": "", "amount": {"$sum": "$amount"}}}, {"$project": {"_id": 0, "amount": 1}}]
1921 )
1923 result = result.to_list()
1924 if len(result) == 0:
1925 return 0
1926 return result[0]["amount"]
1928 def sample_datasize(self):
1929 return signals_database.command("collstats", self.signal_id)["size"]
1931 @classmethod
1932 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1933 result = cls.collection().aggregate(
1934 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1935 )
1937 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ForcedSignal(GenericMongo):
    # Record of a signal currently forced by a user ("forced_signals" collection).
    collection_name: ClassVar[str] = "forced_signals"

    signal_id: str
    forcing_user_id: str
    forced_at: float
    value: str | float

    def insert(self):
        """Upsert this record keyed by ``signal_id`` and return its ID as a string.

        The instance's ``id`` is set to the ID of the stored document.
        """
        selector = {"signal_id": self.signal_id}
        changes = {"$set": self.to_dict(exclude={"id"})}
        stored_document = self.collection().find_one_and_update(
            selector,
            changes,
            upsert=True,
            return_document=ReturnDocument.AFTER,
        )
        self.id = str(stored_document["_id"])
        return self.id

    @classmethod
    def can_force(cls, signal_id: str, current_user: User) -> bool:
        """Tell whether ``current_user`` may force the given signal.

        :param signal_id: Signal ID of the signal to force
        :type signal_id: str
        :param current_user: Current user
        :type current_user: User
        :return: False if the signal is forced by another user and the current
            user is not an admin, True otherwise
        :rtype: bool
        """
        existing = cls.get_one_by_attribute("signal_id", signal_id)
        if existing is None:
            # Nobody is forcing this signal yet.
            return True
        return bool(existing.forcing_user_id == current_user.id or current_user.is_admin)
class ServicesStatus(TwinPadModel):
    # "up"/"down" state of each service, as produced by `check`.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        The backend is always reported as "up".
        """
        hosts_by_service = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        statuses = {service: ping(host) for service, host in hosts_by_service.items()}
        return cls(backend="up", **statuses)
def ping(host):
    """Return "up" if ``host`` answers an ICMP ping within 0.8 s, else "down".

    A ``PermissionError`` raised by the ping attempt is treated as "down".
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # Per the ping3 README, ping returns the round-trip delay on success, None
    # on timeout and False on error. A delay of 0.0 is a valid success, so test
    # against None/False explicitly rather than by truthiness.
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    # Event stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    # Rule referenced by `event_rule_id`, loaded on first access and cached.
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        rule_id = self.event_rule_id
        return EventRule.get_from_id(rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an Event from a stored document.

        The document's datetime ``timestamp`` is replaced, in place, by its
        epoch value before the generic conversion runs.
        """
        stored_moment = dict_["timestamp"]
        dict_["timestamp"] = datetime.datetime.timestamp(stored_moment)
        return super().dict_to_object(dict_)
class TwinPadActivity(GenericMongo):
    # One daily counter: `timestamp` is the day as epoch seconds and `amount`
    # the number of items (events, samples or commands) counted for that day.
    timestamp: float
    amount: int

    @staticmethod
    def _utc_today() -> datetime.datetime:
        """Return the start of the current UTC day as an aware datetime."""
        # Computed from the UTC clock: relabelling the local date as UTC gives
        # the wrong day whenever the server timezone is not UTC.
        return datetime.datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)

    @staticmethod
    def _day_start(moment: datetime.datetime) -> datetime.datetime:
        """Return midnight of ``moment``'s date, labelled as UTC."""
        return datetime.datetime.combine(moment, datetime.time.min).replace(tzinfo=pytz.UTC)

    @classmethod
    def _items_in_timeframe(cls, counters_collection, min_timestamp, max_timestamp) -> list[Self]:
        """Return the daily counters whose day lies in ``[min_timestamp, max_timestamp]`` (epoch seconds)."""
        # fromtimestamp(..., tz=UTC) converts the epoch to UTC directly instead
        # of building a local time and relabelling it.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        items = []
        for day in counters_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}}):
            # NOTE(review): .timestamp() on a naive datetime assumes local time;
            # confirm the client returns UTC-aware datetimes or the server runs in UTC.
            day["timestamp"] = day["timestamp"].timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items

    @staticmethod
    def _sum_number_samples(samples_collection, timestamp_filter: dict) -> int:
        """Return the sum of ``value`` over documents whose timestamp matches the filter (0 if none)."""
        rows = samples_collection.aggregate(
            [
                {"$match": {"timestamp": timestamp_filter}},
                {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
            ]
        ).to_list()
        if not rows:
            return 0
        return rows[0].get("number_samples", 0)

    @classmethod
    def _count_documents_timeframe(
        cls, counters_name: str, source_name: str, min_timestamp, max_timestamp, recompute_amount
    ) -> list[Self]:
        """Maintain per-day document counts of ``source_name`` in ``counters_name`` and return a timeframe.

        When the counters collection is missing or ``recompute_amount`` is set,
        every past day is recounted starting from the first source document.
        Today's counter is refreshed on every call.
        """
        one_day = datetime.timedelta(days=1)
        counters = get_collection(systems_database, counters_name)
        source = get_collection(systems_database, source_name, create=True, time_series=True)
        today = cls._utc_today()
        if counters is None or recompute_amount:
            counters = get_collection(systems_database, counters_name, create=True, time_series=True)
            counters.delete_many({})
            first_document = source.find_one(sort={"timestamp": 1})
            if first_document is None:
                return []
            day = cls._day_start(first_document["timestamp"])
            while day < today:
                next_day = day + one_day
                amount = source.count_documents({"timestamp": {"$gte": day, "$lt": next_day}})
                if amount > 0:
                    counters.insert_one({"timestamp": day, "amount": amount})
                day = next_day
        amount_today = source.count_documents({"timestamp": {"$gte": today}})
        if amount_today > 0:
            # Replace rather than update today's counter.
            counters.delete_many({"timestamp": today})
            counters.insert_one({"timestamp": today, "amount": amount_today})
        return cls._items_in_timeframe(counters, min_timestamp, max_timestamp)

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
        """Return the daily event counts between two epoch timestamps.

        :param min_timestamp: start of the timeframe, epoch seconds
        :param max_timestamp: end of the timeframe, epoch seconds
        :param recompute_amount: rebuild all past daily counters when true
        """
        return cls._count_documents_timeframe(
            "number_events", "events", min_timestamp, max_timestamp, recompute_amount
        )

    @classmethod
    def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily received-sample counts between two epoch timestamps.

        Daily amounts are sums of ``value`` from the
        ``signal_storage._NUMBER_SAMPLES`` collection of the signals database.

        :param min_timestamp: start of the timeframe, epoch seconds
        :param max_timestamp: end of the timeframe, epoch seconds
        :param recompute_amount: rebuild all past daily counters when true
        """
        one_day = datetime.timedelta(days=1)
        counters = get_collection(systems_database, "number_received_samples", create=False)
        samples = get_collection(signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True)
        today = cls._utc_today()
        if counters is None or recompute_amount:
            counters = get_collection(systems_database, "number_received_samples", create=True, time_series=True)
            counters.delete_many({})
            first_sample = samples.find_one(sort={"timestamp": 1})
            if first_sample is None:
                return []
            # Recount from the day of the first recorded sample.
            day = cls._day_start(first_sample["timestamp"])
            while day < today:
                next_day = day + one_day
                amount = cls._sum_number_samples(samples, {"$gte": day, "$lt": next_day})
                if amount > 0:
                    counters.insert_one({"timestamp": day, "amount": amount})
                day = next_day
        amount_today = cls._sum_number_samples(samples, {"$gte": today})
        if amount_today > 0:
            counters.delete_many({"timestamp": today})
            counters.insert_one({"timestamp": today, "amount": amount_today})
        return cls._items_in_timeframe(counters, min_timestamp, max_timestamp)

    @classmethod
    def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
        """Return the daily command counts between two epoch timestamps.

        :param min_timestamp: start of the timeframe, epoch seconds
        :param max_timestamp: end of the timeframe, epoch seconds
        :param recompute_amount: rebuild all past daily counters when true
        """
        return cls._count_documents_timeframe(
            "number_commands", "commands", min_timestamp, max_timestamp, recompute_amount
        )
class EventRule(GenericMongo):
    # Rule stored in the "event_rules" collection.
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of stored events referencing this rule, counted once and cached.
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events_collection = get_collection(systems_database, "events")
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    # Company stored in the "companies" collection; only a name is kept.
    collection_name: ClassVar[str] = "companies"

    name: str
class Campaign(GenericMongo):
    # Campaign stored in the "campaigns" collection.
    collection_name: ClassVar[str] = "campaigns"

    # Properties (`id` stays None until one is assigned)
    id: str | None = None
    name: str
    description: str | None = None
class Phase(GenericMongo):
    # Phase of a campaign, stored in the "phases" collection.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # Foreign key to the owning campaign
    campaign_id: MongoId

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of ``campaign_id`` and return the driver's delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
class CustomViewCreation(GenericMongo):
    # Fields supplied when creating a custom view ("custom_views" collection).
    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list
class CustomView(CustomViewCreation):
    # Custom view: the creation fields plus an identifier and an owner.

    # Properties
    id: str | None = None

    # Foreign key to the owning user
    user_id: str
# Update model derived from CustomView by `create_update_model`.
# NOTE(review): presumably makes the fields optional for partial updates; confirm in the helper.
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    # Video source stored in the "videos" collection.
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return all videos sorted ascending by ``sort_by``.

        Only ``name`` and ``ip_addr`` are requested from the database, so the
        credentials keep their default value on the returned objects.
        """
        projection = {"name": 1, "ip_addr": 1}
        documents = cls.collection().find({}, projection).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in documents]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the video with the given ID, or ``None`` if it does not exist."""
        camera = cls.get_from_id(camera_id)
        return None if camera is None else camera.name
class Command(GenericMongo):
    # Log entry for a command sent by a user ("commands" time-series collection).
    collection_name: ClassVar[str] = "commands"

    # Properties
    timestamp: datetime.datetime = None
    sent_at: float
    response_time: float = 0.0
    command_type: str
    description: str = ""
    succeeded: bool = False

    # Foreign key
    user_id: str

    @classmethod
    def collection(cls):
        """Return the commands collection, creating it as a time series if missing."""
        return get_collection(systems_database, cls.collection_name, create=True, time_series=True)

    @classmethod
    def create(cls, command: Self):
        """Store ``command`` and return ``{"command_id": <inserted id>}``.

        The stored ``timestamp`` is derived from ``command.sent_at`` as a UTC
        datetime.
        """
        sent_moment = datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC)
        stored = cls(
            timestamp=sent_moment,
            sent_at=command.sent_at,
            response_time=command.response_time,
            command_type=command.command_type,
            description=command.description,
            succeeded=command.succeeded,
            user_id=command.user_id,
        )
        insert_result = cls.collection().insert_one(stored.model_dump(exclude={"id"}))
        if insert_result is None:
            return None
        return {"command_id": str(insert_result.inserted_id)}

    def receive_response(self, response: dict):
        """Record the outcome of the command from the service response.

        The command succeeded only when ``response["error"]`` is exactly
        ``False``. The response message becomes the description when no
        description was set.
        """
        elapsed = time.time() - self.sent_at
        self.response_time = elapsed
        self.succeeded = response.get("error", True) is False
        if self.description == "":
            self.description = response.get("message", "").rstrip()
class SignalsPresetCreation(GenericMongo):
    # Fields supplied when creating a preset: a name and the signal IDs it groups.
    name: str
    signal_ids: list[str]
class SignalsPreset(SignalsPresetCreation):
    # Preset owned by a user, stored in the "signals_presets" collection.
    collection_name: ClassVar[str] = "signals_presets"

    user_id: str

    @classmethod
    def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
        """Store a preset for ``user_id`` and return the inserted ID as a string."""
        preset = cls(
            user_id=user_id,
            name=signals_preset.name,
            signal_ids=signals_preset.signal_ids,
        )
        document = preset.model_dump(exclude={"id"})
        insert_result = cls.collection().insert_one(document)
        return str(insert_result.inserted_id)
# Update model derived from SignalsPreset by `create_update_model`.
# NOTE(review): presumably makes the fields optional for partial updates; confirm in the helper.
SignalsPresetUpdate = create_update_model(SignalsPreset)
class LineStyle(str, Enum):
    # Line styles accepted by the graph-theme `*_line_style` fields. The `str`
    # mixin makes each member compare equal to its plain string value.
    solid = "solid"
    dotted = "dotted"
    dashed = "dashed"
class SignalAppearance:
    # Annotation-only holder for the two colour strings of a signal.
    # NOTE(review): not a pydantic model and declares no defaults; confirm how it is used.
    value_color: str
    forced_value_color: str
class GraphThemeCreation(GenericMongo):
    # Fields supplied when creating a graph theme ("graph_themes" collection).
    collection_name: ClassVar[str] = "graph_themes"

    name: str
    signal_id: str
    # Colours default to an empty string and line styles to solid.
    value_color: str = ""
    forced_value_color: str = ""
    value_line_style: LineStyle = LineStyle.solid
    forced_value_line_style: LineStyle = LineStyle.solid
    # Themes are private unless stated otherwise.
    private: bool = True
class PublicGraphTheme(GraphThemeCreation):
    # Graph theme as seen by one user: the stored per-user lists are replaced by
    # booleans computed for the user ID held in `_current_user_id`.
    created_by_user: bool
    in_user_library: bool
    active_for_user: bool

    # Set by the query classmethods below before they delegate to the parent.
    _current_user_id: str = ""

    @classproperty
    def custom_pipeline_steps(cls) -> dict[str, list]:
        """Return the aggregation steps, keyed by name, that build the per-user view."""

        def add_field(field_name: str, expression) -> list:
            # Single-stage pipeline that sets (or removes) one field.
            return [{"$addFields": {field_name: expression}}]

        user_id = cls._current_user_id
        return {
            "created_by_user": add_field("created_by_user", {"$eq": ["$creator_id", user_id]}),
            # Never return a theme the user is not allowed to see (i.e. neither
            # theirs nor shared); done as early as possible.
            "default": [{"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}],
            "in_user_library": add_field("in_user_library", {"$in": [user_id, "$in_library"]}),
            "active_for_user": add_field("active_for_user", {"$in": [user_id, "$active"]}),
            "in_library": add_field("in_library", "$$REMOVE"),
            "active": add_field("active", "$$REMOVE"),
            "creator_id": add_field("creator_id", "$$REMOVE"),
        }

    @classmethod
    def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the generic query on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().response_from_query(query)

    @classmethod
    def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
        """Run the generic query restricted to themes in the user's library."""
        query.in_user_library = "true"
        return cls.response_from_query(query, user_id)

    @classmethod
    def get_from_id(cls, item_id, user_id: str) -> Self | None:
        """Return the theme with the given ID as seen by ``user_id``."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Delegate to the generic lookup on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
        """Delegate to the generic single-item lookup on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_one_by_attribute(attribute_name, attribute_value)

    @classmethod
    def get_all(cls, sort_by: str, user_id: str):
        """Delegate to the generic listing on behalf of ``user_id``."""
        cls._current_user_id = user_id
        return super().get_all(sort_by)

    @classmethod
    def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
        """Map each signal ID to the colours and line styles of one theme active for the user.

        Signals without an active theme are absent from the result. When
        several themes are active for a signal, the first one returned by the
        grouping stage is used.
        """
        active_for_signals = {
            "$match": {
                "active": {"$eq": user_id},
                "signal_id": {"$in": signal_ids},
            }
        }
        appearance_fields = {
            "$project": {
                "_id": 0,
                "signal_id": 1,
                "value_color": 1,
                "forced_value_color": 1,
                "value_line_style": 1,
                "forced_value_line_style": 1,
            }
        }
        pipeline = [
            active_for_signals,
            {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$firstDocument"}},
            appearance_fields,
        ]

        appearances = {}
        for document in cls.collection().aggregate(pipeline):
            # The signal ID becomes the key and is dropped from the value.
            appearances[document.pop("signal_id")] = document
        return appearances
# Update model derived from PublicGraphTheme by `create_update_model`.
# NOTE(review): presumably makes the fields optional for partial updates; confirm in the helper.
GraphThemeUpdate = create_update_model(PublicGraphTheme)
class PrivateGraphTheme(GraphThemeCreation):
    # Graph theme as stored: the creator plus the lists of user IDs that have
    # the theme in their library or active.
    creator_id: str
    in_library: list[str]
    active: list[str]

    @classmethod
    def create(
        cls,
        creator_id: str,
        name: str,
        signal_id: str,
        value_color: str,
        forced_value_color: str,
        value_line_style: LineStyle,
        forced_value_line_style: LineStyle,
        private: bool,
    ):
        """Store a new theme and return it with its ``id`` set.

        The creator is put in both the ``in_library`` and ``active`` lists.
        """
        color_setting = cls(
            creator_id=creator_id,
            name=name,
            signal_id=signal_id,
            value_color=value_color,
            forced_value_color=forced_value_color,
            value_line_style=value_line_style,
            forced_value_line_style=forced_value_line_style,
            private=private,
            in_library=[creator_id],
            active=[creator_id],
        )

        new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
        color_setting.id = str(new_color_setting.inserted_id)
        return color_setting

    def update(self, update_dict: dict, user_id: str):
        """Apply ``update_dict`` to the stored theme on behalf of ``user_id``.

        The derived keys ``in_user_library``, ``active_for_user`` and
        ``created_by_user`` are never written to the database. The first two
        add or remove ``user_id`` in ``in_library`` / ``active``; the last is
        dropped. ``update_dict`` is modified in place.

        :return: an empty dict
        """
        # pop() removes the derived keys even when their value is None, so they
        # can no longer reach the $set payload.
        in_user_lib = update_dict.pop("in_user_library", None)
        if in_user_lib is not None:
            if in_user_lib and user_id not in self.in_library:
                self.in_library.append(user_id)
            elif not in_user_lib and user_id in self.in_library:
                self.in_library.remove(user_id)
            update_dict["in_library"] = self.in_library

        active_for_user = update_dict.pop("active_for_user", None)
        if active_for_user is not None:
            if active_for_user and user_id not in self.active:
                self.active.append(user_id)
            elif not active_for_user and user_id in self.active:
                self.active.remove(user_id)
            update_dict["active"] = self.active

        update_dict.pop("created_by_user", None)

        # Skip the write when only derived keys were supplied.
        if update_dict:
            self.collection().find_one_and_update(
                {"_id": ObjectId(self.id)},
                {"$set": update_dict},
            )

        return {}
class DeviceStatus(str, Enum):
    # Status values accepted for a device managed by a deployer. The `str`
    # mixin makes each member compare equal to its plain string value.
    started = "started"
    running = "running"
    created = "created"
    exited = "exited"
    restarting = "restarting"
class DeviceUpdateFromDeployer(BaseModel):
    # Payload sent to a deployer to change a device; only the status can be set.
    status: DeviceStatus
class DeviceFromDeployerCreation(BaseModel):
    # Fields supplied when creating a device through a deployer.
    name: str
    description: str
class DeviceFromDeployer(DeviceFromDeployerCreation):
    # Device as reported by a deployer: creation fields plus status, ID and logs.
    status: DeviceStatus
    device_id: DeviceId
    # Empty when the deployer response carries no logs.
    logs: str = ""
class DeviceDeployer(GenericMongo):
    # Remote deployer service, reached over HTTP at `url` ("device_deployers" collection).
    collection_name: ClassVar[str] = "device_deployers"
    # Seconds allowed for each HTTP call; without a timeout an unresponsive
    # deployer would block the caller indefinitely.
    request_timeout: ClassVar[float] = 30.0
    url: HttpUrl

    def endpoint_url(self, endpoint):
        """Return the deployer URL joined with ``endpoint`` by exactly one slash."""
        return f"{str(self.url).rstrip('/')}/{endpoint}"

    def _request(self, method: str, endpoint: str, **kwargs):
        """Send an HTTP request to the deployer.

        :return: the response, or ``None`` when the connection fails or times out
        """
        try:
            return requests.request(method, self.endpoint_url(endpoint), timeout=self.request_timeout, **kwargs)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            return None

    @staticmethod
    def _device_from_payload(device_dict: dict) -> DeviceFromDeployer:
        """Build a ``DeviceFromDeployer`` from one device entry of a deployer response."""
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name=device_dict["container_name"],
            description="desc",
            status=device_dict["status"],
            logs=device_dict["logs"],
        )

    def devices(self) -> list[DeviceFromDeployer] | None:
        """Return the devices known to the deployer, or ``None`` on failure."""
        response = self._request("GET", "devices")
        if response is None:
            logger.info("connection error")
            return None
        if response.status_code != 200:
            return None
        return [self._device_from_payload(device_dict) for device_dict in response.json()["devices"]]

    def get_device(self, device_id: DeviceId) -> DeviceFromDeployer | None:
        """Return one device from the deployer, or ``None`` on failure."""
        response = self._request("GET", f"devices/{device_id}")
        if response is None or response.status_code != 200:
            return None
        return self._device_from_payload(response.json())

    def create_device(self, device: DeviceFromDeployer) -> DeviceFromDeployer | None:
        """Ask the deployer to create a device named ``device.name``.

        :return: the created device (with an empty name), or ``None`` unless
            the deployer answers 201
        """
        response = self._request("POST", "devices", json={"name": device.name})
        if response is None or response.status_code != 201:
            return None

        device_dict = response.json()
        return DeviceFromDeployer(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            status=device_dict["status"],
        )

    def update_device(self, device_id, device_update: DeviceUpdateFromDeployer) -> Device | None:
        """Send a device update to the deployer.

        :return: a ``Device`` carrying the returned ID and status, or ``None``
            on failure
        """
        response = self._request("PATCH", f"devices/{device_id}", json=device_update.model_dump())
        if response is None or response.status_code != 200:
            return None

        device_dict = response.json()
        return Device(
            device_id=device_dict["device_id"],
            name="",
            description="desc",
            pid={},
            petri_network={},
            modes=[],
            status=device_dict["status"],
        )

    def delete_device(self, device_id: DeviceId) -> DeleteInfo:
        """Ask the deployer to delete a device and report whether it succeeded."""
        response = self._request("DELETE", f"devices/{device_id}")
        if response is None:
            return DeleteInfo(is_deleted=False, detail="Connection to deployer error")
        if response.status_code not in [200, 202, 204]:
            return DeleteInfo(is_deleted=False, detail=response.text)

        return DeleteInfo(is_deleted=True, detail="")
# Update model derived from DeviceDeployer by `create_update_model`.
# NOTE(review): presumably makes the fields optional for partial updates; confirm in the helper.
DeviceDeployerUpdate = create_update_model(DeviceDeployer)