Coverage for / usr / local / lib / python3.11 / site-packages / twinpad_backend / models.py: 97%
1413 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-25 16:48 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-25 16:48 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, get_args
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
23import h5py
25from twinpad_backend.db import (
26 get_collection,
27 get_async_collection,
28 get_signal_collection,
29 get_signal_collections_batch,
30 systems_database,
31 systems_async_database,
32 signals_database,
33 signals_async_database,
34 devices_states_database,
35)
36from twinpad_backend.responses import ListResponse
37from twinpad_backend.messages import send_mode_change, send_signal_value
38from twinpad_backend.post_processing import cumul, delta, derive, integ, align_x, mean, norm
40TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
41SINGLE_POST_PROCESSING_FUNCTION = Literal["Cumul", "Delta", "DeltaT", "Derive", "Integ"]
42DOUBLE_POST_PROCESSING_FUNCTION = Literal["Align-X", "Atan2", "Using-X"]
43MULTIPLE_POST_PROCESSING_FUNCTION = Literal["Mean", "Merge", "Norm"]
46RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
47MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
48SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
49HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
50DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
52DEVICE_TIMEOUT = float(os.environ.get("DEVICE_TIMEOUT", 5.0))
53NUMBER_SAMPLES_DATABASE_UPDATE = 120
55logger = logging.getLogger("uvicorn.error")
58class classproperty:
59 """
60 Custom classproperty wrapper since it was deprecated in python 3.11 and will be removed in python 3.13.
61 Found here: https://stackoverflow.com/a/76301341
62 """
64 def __init__(self, func):
65 self.fget = func
67 def __get__(self, _, owner):
68 return self.fget(owner)
71def create_update_model(model):
72 fields = {}
74 for field_name, field_annotation in model.model_fields.items():
75 if field_name != "id":
76 fields[field_name] = (field_annotation.annotation | None, None)
78 query_name = model.__name__ + "Update"
79 return create_model(query_name, **fields)
82def get_utc_date_from_timestamp(timestamp: float):
83 return datetime.datetime.fromtimestamp(timestamp).isoformat()
86def downsample_list(time_vector: list, values: list, max_number_samples: int):
87 if len(time_vector) < max_number_samples:
88 return time_vector, values
90 time_vector_copy = copy.deepcopy(time_vector)
91 values_copy = copy.deepcopy(values)
93 none_group_bounds = []
94 none_group_index = -1
95 index = -1
96 # LTTB doesn't handle None values so remove them
97 while values_copy.count(None) > 0:
98 # Store bounds of None value groups so we can insert them back after the downsampling
99 if (new_index := values_copy.index(None)) != index:
100 none_group_bounds.append([time_vector_copy.pop(new_index)])
101 none_group_index += 1
102 elif len(none_group_bounds[none_group_index]) < 2:
103 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
104 else:
105 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
106 values_copy.pop(new_index)
107 index = new_index
108 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
110 try:
111 values_array = npy.array([time_vector_copy, values_copy]).T
112 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
114 new_time_vector = interpolated_values[:, 0].tolist()
115 new_values = interpolated_values[:, 1].tolist()
116 except ValueError:
117 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
118 new_time_vector = npy.linspace(time_vector[0], time_vector[-1], max_number_samples, endpoint=True).tolist()
119 new_values = list(npy.interp(new_time_vector, time_vector, npy.array(values, dtype="float64")))
120 new_values_nan_to_none = npy.where(npy.isnan(new_values), None, new_values).tolist()
121 return new_time_vector, new_values_nan_to_none
123 # insert back None values at the correct timestamps
124 for none_group in none_group_bounds:
125 start_index = npy.searchsorted(new_time_vector, none_group[0])
126 new_time_vector[start_index:start_index] = none_group
127 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
129 return new_time_vector, new_values
132def is_of_type(value, wanted_type):
133 if wanted_type is float:
134 return isinstance(value, (int, float))
135 return isinstance(value, wanted_type)
138# Models
139class TwinPadModel(BaseModel):
140 @classmethod
141 def dict_to_object(cls, dict_):
142 return cls.model_validate(dict_)
144 def to_dict(self, exclude=None):
145 dict_ = self.model_dump(exclude=exclude)
146 return dict_
149class GenericMongo(TwinPadModel):
150 id: str | None = None
151 custom_pipeline_steps: ClassVar[dict[str, list]] = {}
153 @classmethod
154 def collection(cls):
155 return get_collection(systems_database, cls.collection_name, create=True)
157 @classmethod
158 def response_from_query(cls, query) -> ListResponse[Self]:
159 request_filters = query.mongodb_filter()
160 items = []
162 # Allows for multi-sort, Python dicts are ordered so no issue while sorting
163 sort_dict = {}
164 for sort in query.sort_by.split(","):
165 if ":" in sort:
166 sort_field, sort_order = sort.split(":")
167 sort_order = int(sort_order)
168 else:
169 sort_field = sort
170 sort_order = 1
171 sort_dict[sort_field] = sort_order
173 collection = get_collection(systems_database, cls.collection_name, create=True)
174 total = collection.count_documents(request_filters)
176 pipeline = []
177 added_properties = []
178 if "$and" in request_filters:
179 for request_filter in request_filters["$and"]:
180 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
181 if filtered_property in request_filter:
182 pipeline.extend(pipeline_steps)
183 added_properties.append(filtered_property)
184 else:
185 for filtered_property, pipeline_steps in cls.custom_pipeline_steps.items():
186 if filtered_property in request_filters:
187 pipeline.extend(pipeline_steps)
188 added_properties.append(filtered_property)
189 pipeline.append({"$match": request_filters})
191 for sort_field in sort_dict.keys():
192 if sort_field in cls.custom_pipeline_steps:
193 pipeline.extend(cls.custom_pipeline_steps[sort_field])
194 added_properties.append(sort_field)
195 pipeline.extend([{"$sort": sort_dict}, {"$skip": query.offset}])
197 if (query.limit is not None) and (query.limit != 0):
198 pipeline.append({"$limit": query.limit})
200 for filtered_property, step in cls.custom_pipeline_steps.items():
201 if filtered_property not in added_properties:
202 pipeline.extend(step)
204 cursor = collection.aggregate(pipeline)
206 for item_dict in cursor:
207 items.append(cls.mongo_dict_to_object(item_dict))
209 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
211 @classmethod
212 def get_from_id(cls, item_id) -> Self | None:
213 return cls.get_one_by_attribute("_id", ObjectId(item_id))
215 @classmethod
216 def mongo_dict_to_object(cls, mongo_dict):
217 mongo_dict["id"] = str(mongo_dict["_id"])
218 del mongo_dict["_id"]
219 return cls.dict_to_object(mongo_dict)
221 @classmethod
222 def get_by_attribute(cls, attribute_name: str, attribute_value):
223 """Returns all items that match the attribute with value."""
224 pipeline = []
225 if attribute_name in cls.custom_pipeline_steps:
226 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
227 pipeline.append({"$match": {attribute_name: attribute_value}})
228 for key, step in cls.custom_pipeline_steps.items():
229 if key != attribute_name:
230 pipeline.extend(step)
231 items = cls.collection().aggregate(pipeline)
232 if items is None:
233 return None
234 return [cls.mongo_dict_to_object(d) for d in items]
236 @classmethod
237 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
238 pipeline = []
239 if attribute_name in cls.custom_pipeline_steps:
240 pipeline.extend(cls.custom_pipeline_steps[attribute_name])
241 pipeline.append({"$match": {attribute_name: attribute_value}})
242 pipeline.append({"$limit": 1})
243 for key, step in cls.custom_pipeline_steps.items():
244 if key != attribute_name:
245 pipeline.extend(step)
246 items = cls.collection().aggregate(pipeline).to_list()
247 if len(items) == 0:
248 return None
249 return cls.mongo_dict_to_object(items[0])
251 @classmethod
252 def get_all(cls, sort_by="_id") -> list[Self]:
253 items = []
254 pipeline = []
255 if sort_by in cls.custom_pipeline_steps:
256 pipeline.extend(cls.custom_pipeline_steps[sort_by])
257 pipeline.append({"$sort": {sort_by: ASCENDING}})
258 for key, step in cls.custom_pipeline_steps.items():
259 if key != sort_by:
260 pipeline.extend(step)
261 for dict_ in cls.collection().aggregate(pipeline):
262 items.append(cls.mongo_dict_to_object(dict_))
263 return items
265 @classmethod
266 def get_number_documents(cls):
267 collection = get_collection(systems_database, cls.collection_name)
268 if collection is None:
269 return 0
270 return collection.count_documents(
271 {"$or": [{"post_processing": False}, {"post_processing": {"$exists": False}}]}
272 )
274 def insert(self):
275 insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
276 self.id = str(insert_result.inserted_id)
277 return self.id
279 def update(self, update_dict):
280 for key, value in update_dict.items():
281 setattr(self, key, value)
282 self.collection().find_one_and_update(
283 {"_id": ObjectId(self.id)},
284 {"$set": update_dict},
285 return_document=ReturnDocument.AFTER,
286 )
288 return self
290 def delete(self):
291 result = self.collection().delete_one({"_id": ObjectId(self.id)})
292 return result.deleted_count > 0
295class User(GenericMongo):
296 collection_name: ClassVar[str] = "users"
298 firstname: str
299 lastname: str
300 email: str
301 password: str
302 is_active: bool | None = False
303 is_admin: bool | None = False
304 is_connected: bool | None = False
305 company_id: str | None = None
307 def to_dict(self, exclude=None):
308 if exclude is None:
309 exclude = {"password"}
310 return GenericMongo.to_dict(self, exclude=exclude)
312 @classmethod
313 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
314 users = cls.get_all()
315 if not users:
316 is_admin = True
317 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
318 user_collection = get_collection(systems_database, "users", create=True)
319 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
320 if new_user is None:
321 return None
322 return {"user_id": str(new_user.inserted_id)}
324 @classmethod
325 def update_info(cls, user: "UserUpdate", user_id: str):
326 updated_user = cls.collection().find_one_and_update(
327 {"_id": ObjectId(user_id)},
328 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
329 return_document=ReturnDocument.AFTER,
330 )
331 updated_user["id"] = str(updated_user["_id"])
332 del (updated_user["_id"], updated_user["is_connected"])
333 return cls(**updated_user)
336UserUpdate = create_update_model(User)
339class Mode(TwinPadModel):
340 mode_id: int
341 name: str
342 frequency_multiplier: float
343 min_frequency: float
346class DeviceUpdate(TwinPadModel):
347 mode_id: int
350class Device(GenericMongo):
351 collection_name: ClassVar[str] = "devices"
353 device_id: str
354 name: str
355 description: str = ""
356 modes: list[Mode]
357 current_mode_id: int | None = None
358 last_ping: float | None = None
359 petri_network: Any
360 pid: Any
361 load: float | None = None
362 tokens: list[int] = Field(default_factory=list)
363 status: str
365 async def change_mode(self, update_dict, current_user: User):
366 has_error = False
368 if update_dict.mode_id < 0 or update_dict.mode_id > len(self.modes):
369 has_error = True
370 description = f"Change mode of #{self.device_id} to inexistent mode #{update_dict.mode_id}"
371 elif self.current_mode_id is not None:
372 description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"
373 else:
374 description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
375 command = Command(
376 sent_at=time.time(),
377 command_type="Mode change",
378 description=description,
379 user_id=current_user.id,
380 )
382 if has_error:
383 command.response_time = 0
384 command.succeeded = False
385 response = {"error": True, "status_code": 400, "message": f"Invalid mode id: {update_dict.mode_id}"}
386 else:
387 response = await send_mode_change(self.device_id, update_dict.mode_id)
388 command.receive_response(response)
390 Command.create(command)
391 return response
393 @classmethod
394 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
395 devices_by_id = {}
396 for signal_id in signal_ids:
397 device_id = signal_id.split(".")[0]
398 if device_id not in devices_by_id:
399 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
400 return devices_by_id
403class DeviceSetup(GenericMongo):
404 collection_name: ClassVar[str] = "device_setups"
406 device_ids: list[str]
407 active: bool = False
408 variable_mapping: dict[str, str]
411DeviceSetupUpdate = create_update_model(DeviceSetup)
414class DeviceState(GenericMongo):
415 collection_name: ClassVar[str] = "devices_states"
417 timestamp: float
418 mode: str | None = None
419 load: float | None = None
420 tokens: list[int] = Field(default_factory=list)
421 modified_properties: list[str] = Field(default_factory=list)
423 @classmethod
424 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
425 req_filter = query.mongodb_filter()
426 items = []
427 if ":" in query.sort_by:
428 sort_field, sort_order = query.sort_by.split(":")
429 sort_order = int(sort_order)
430 else:
431 sort_field = query.sort_by
432 sort_order = 1
433 collection = get_collection(devices_states_database, device_id)
434 if collection is None:
435 total = 0
436 cursor = []
437 else:
438 total = collection.count_documents(req_filter)
439 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
440 if (query.limit is not None) and (query.limit != 0):
441 cursor = cursor.limit(query.limit)
442 for item_dict in cursor:
443 items.append(
444 cls(
445 timestamp=item_dict.get("precise_timestamp"),
446 mode=item_dict.get("mode", None),
447 load=item_dict.get("load", None),
448 tokens=item_dict.get("tokens", Field(default_factory=list)),
449 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)),
450 )
451 )
452 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
455class SignalSample(TwinPadModel):
456 signal_id: str
457 timestamp: float
458 value: float | int | str | bool | None
459 forced_value: float | int | str | bool | None = None
461 @classmethod
462 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
464 collection = get_signal_collection(signal_id)
465 if collection is None:
466 return None
468 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
469 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
470 bucket = get_signal_collection(f"system.buckets.{signal_id}")
471 first_bucket = None
472 if bucket is not None:
473 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
474 if first_bucket is not None:
475 sample_data = collection.find_one(
476 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
477 )
478 else:
479 sample_data = collection.find_one({}, sort={"precise_timestamp": 1})
481 if sample_data is None:
482 return None
484 timestamp = sample_data["precise_timestamp"]
486 return cls(
487 signal_id=signal_id,
488 timestamp=timestamp,
489 value=sample_data.get("value", None),
490 forced_value=sample_data.get("forced_value", None),
491 )
493 @classmethod
494 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
495 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
497 @classmethod
498 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
499 collection = get_signal_collection(signal_id)
500 if collection is None:
501 return None
503 # Same workaround as above function, very effective to narrow down big sets of data
504 bucket = get_signal_collection(f"system.buckets.{signal_id}")
505 last_bucket = None
506 if bucket is not None:
507 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
508 if last_bucket is not None:
509 sample_data = collection.find_one(
510 {"precise_timestamp": {"$gte": last_bucket["control"]["max"]["precise_timestamp"]}},
511 sort={"precise_timestamp": -1},
512 )
513 else:
514 sample_data = collection.find_one({}, sort={"precise_timestamp": -1})
516 if sample_data is None:
517 return None
519 timestamp = sample_data["precise_timestamp"]
521 if device is None:
522 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
523 if device is not None and device.last_ping is not None:
524 if timestamp is None:
525 timestamp = device.last_ping
526 else:
527 timestamp = max(timestamp, device.last_ping)
528 return cls(
529 signal_id=signal_id,
530 timestamp=timestamp,
531 value=sample_data.get("value", None),
532 forced_value=sample_data.get("forced_value", None),
533 )
535 @classmethod
536 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
537 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
538 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
541class SignalData(TwinPadModel):
542 signal_id: str
543 forcible: bool = True
544 time_vector: list[float]
545 values: list[float | int | str | None]
546 forced_values: list[float | int | str | None]
548 data_start: float | None = None
549 data_end: float | None = None
551 number_samples: int = 0
552 number_samples_db: int = 0
554 db_query_time: float = 0.0
555 init_time: float = 0.0
556 data_processing_time: float = 0.0
558 phase_id: str | None = None
560 @classmethod
561 def get_from_signal_id(
562 cls,
563 signal_id: str,
564 min_timestamp: float = None,
565 max_timestamp: float = None,
566 window_min_timestamp: float = None,
567 window_max_timestamp: float = None,
568 interpolate_bounds: bool = True,
569 max_documents: int = None,
570 ) -> Self:
572 now = time.time()
574 req_signal = {}
575 if min_timestamp is not None:
576 req_signal.setdefault("timestamp", {})
577 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
578 if max_timestamp is not None:
579 req_signal.setdefault("timestamp", {})
580 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
582 collection = get_signal_collection(signal_id)
583 if collection is None:
584 return cls(
585 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
586 )
588 db_req_start = time.time()
590 sort_step = {"$sort": {"precise_timestamp": 1}}
591 number_results = collection.count_documents(req_signal)
593 pipeline = []
594 if req_signal:
595 pipeline.append({"$match": req_signal}) # Filter data if needed
597 pipeline.extend(
598 [
599 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
600 sort_step,
601 ]
602 )
604 if max_documents is not None and max_documents < number_results:
605 unsampling_ratio = math.ceil(number_results / max_documents)
606 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
607 pipeline.extend(
608 [
609 {
610 "$setWindowFields": {
611 "sortBy": {"precise_timestamp": 1},
612 "output": {"index": {"$documentNumber": {}}},
613 }
614 },
615 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
616 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
617 {"$replaceRoot": {"newRoot": "$doc"}},
618 {"$unset": ["index", "group_id"]},
619 {"$sort": {"precise_timestamp": 1}},
620 ]
621 )
623 # logger.info(f"pipeline: %s", str(pipeline))
624 cursor = collection.aggregate(pipeline)
625 db_req_time = time.time() - db_req_start
627 init_time = time.time()
629 results = cursor.to_list()
630 time_vector = []
631 values = []
632 forced_values = []
633 for s in results:
634 time_vector.append(s["precise_timestamp"])
635 values.append(s.get("value", None))
636 forced_values.append(s.get("forced_value", None))
638 signal = Signal.get_from_signal_id(signal_id)
639 if signal is None:
640 return cls(
641 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
642 )
643 class_ = signal.signal_data_class
645 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
646 time_vector, values, forced_values = cls.interpolate_bounds(
647 class_,
648 collection,
649 signal_id,
650 time_vector,
651 values,
652 forced_values,
653 window_min_timestamp,
654 window_max_timestamp,
655 )
657 if values:
658 # TODO: check below. a bit strange
659 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
660 # Adding last value as it should be repeated
661 time_vector.append(now)
662 values.append(values[-1])
663 forced_values.append(forced_values[-1])
665 init_time = time.time() - init_time
667 # See line 292 for explanation
668 bucket = get_signal_collection(f"system.buckets.{signal_id}")
669 first_bucket = None
670 if bucket is not None:
671 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
672 if first_bucket is not None:
673 data_start = first_bucket["control"]["min"]["precise_timestamp"]
674 else:
675 data_start = None
677 last_bucket = None
678 if bucket is not None:
679 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
680 if last_bucket is not None:
681 data_end = last_bucket["control"]["max"]["precise_timestamp"]
682 else:
683 data_end = None
685 return class_(
686 signal_id=signal_id,
687 forcible=signal.forcible,
688 time_vector=time_vector,
689 values=values,
690 forced_values=forced_values,
691 data_start=data_start,
692 data_end=data_end,
693 number_samples=len(values),
694 number_samples_db=number_results,
695 db_query_time=db_req_time,
696 init_time=init_time,
697 )
699 @staticmethod
700 def interpolate_bounds(
701 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
702 ):
703 sample_right = None
704 # Fetching right side value & interpolation
705 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
706 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
707 sample_right = collection.find_one(
708 {
709 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
710 "value": {"$exists": True},
711 },
712 sort={"precise_timestamp": -1},
713 )
714 if sample_right:
715 if time_vector:
716 right_sd = class_(
717 signal_id=signal_id,
718 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
719 values=[values[-1], sample_right.get("value", None)],
720 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
721 )
722 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
723 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
724 else:
725 max_ts_value = sample_right.get("value", None)
726 max_ts_forced_value = sample_right.get("forced_value", None)
727 time_vector.append(window_max_timestamp)
728 values.append(max_ts_value)
729 forced_values.append(max_ts_forced_value)
731 # Fetching left side value & interpolation
732 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
733 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
734 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
735 sample_left = sample_right
736 sample_left = collection.find_one(
737 {
738 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
739 "value": {"$exists": True},
740 },
741 sort={"precise_timestamp": -1},
742 )
744 if sample_left:
745 if time_vector:
746 left_sd = class_(
747 signal_id=signal_id,
748 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
749 values=[sample_left["value"], values[0]],
750 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
751 )
752 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
753 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
754 else:
755 min_ts_value = sample_left.get("value", None)
756 min_ts_forced_value = sample_left.get("forced_value", None)
757 time_vector.insert(0, window_min_timestamp)
758 values.insert(0, min_ts_value)
759 forced_values.insert(0, min_ts_forced_value)
761 return time_vector, values, forced_values
763 def interpolate_values(self, new_time_vector: list[float]):
764 return self.interpolate(new_time_vector, self.values)
766 def interpolate_forced_values(self, new_time_vector: list[float]):
767 return self.interpolate(new_time_vector, self.forced_values)
769 def uniform_desampling(self, number_samples_max: int) -> Self:
770 data_processing_time = time.time()
771 if number_samples_max and self.number_samples > number_samples_max:
772 new_time_vector = npy.linspace(
773 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
774 ).tolist()
775 values = self.interpolate_values(new_time_vector)
776 forced_values = self.interpolate_forced_values(new_time_vector)
777 time_vector = new_time_vector
778 number_samples = len(time_vector)
779 else:
780 time_vector = self.time_vector
781 number_samples = len(self.values)
782 values = self.values[:]
783 forced_values = self.forced_values[:]
784 data_processing_time = time.time() - data_processing_time
786 return self.__class__(
787 signal_id=self.signal_id,
788 time_vector=time_vector,
789 values=values,
790 forced_values=forced_values,
791 number_samples=number_samples,
792 number_samples_db=self.number_samples,
793 data_start=self.data_start,
794 data_end=self.data_end,
795 db_query_time=self.db_query_time,
796 init_time=self.init_time,
797 data_processing_time=self.data_processing_time + data_processing_time,
798 phase_id=self.phase_id,
799 )
801 def min_max_downsampling(self, number_samples_max: int) -> Self:
802 return self.uniform_desampling(number_samples_max)
804 def interest_window_desampling(
805 self,
806 window_max_number_samples: int,
807 outside_max_number_samples: int,
808 window_min_timestamp: float | None = None,
809 window_max_timestamp: float | None = None,
810 ) -> Self:
811 """Performs a sampling in a window of interest and outside."""
813 if not self.time_vector:
814 return self
816 if window_min_timestamp is None:
817 window_min_timestamp = self.time_vector[0]
818 if window_max_timestamp is None:
819 window_max_timestamp = self.time_vector[-1]
821 data_processing_time = time.time()
823 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
824 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
826 time_vector_before = self.time_vector[:index_window_start]
827 time_vector_window = self.time_vector[index_window_start:index_window_end]
828 time_vector_after = self.time_vector[index_window_end:]
830 # Resampling window
831 if time_vector_window:
832 # Ensurring window bounds
833 if time_vector_window[0] != window_min_timestamp:
834 time_vector_window.insert(0, window_min_timestamp)
835 if time_vector_window[-1] != window_max_timestamp:
836 time_vector_window.append(window_max_timestamp)
837 else:
838 time_vector_window = [window_min_timestamp, window_max_timestamp]
840 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
841 # Resampling
842 new_window_time_vector = npy.linspace(
843 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
844 ).tolist()
845 time_vector_window = new_window_time_vector
847 # Resampling outside
848 number_samples_before = len(time_vector_before)
849 number_samples_after = len(time_vector_after)
850 if (
851 outside_max_number_samples is not None
852 and (number_samples_before + number_samples_after) > outside_max_number_samples
853 ):
854 new_number_samples_before = min(
855 number_samples_before,
856 math.ceil(
857 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
858 ),
859 )
860 new_number_samples_after = min(
861 number_samples_after,
862 math.ceil(
863 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
864 ),
865 )
866 # Adjusting numbers as math.ceil can do +1 on sum
867 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
868 if new_number_samples_before > new_number_samples_after:
869 new_number_samples_before -= 1
870 else:
871 new_number_samples_after -= 1
873 if new_number_samples_before > 0:
874 new_time_vector_before = npy.linspace(
875 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
876 ).tolist()
877 time_vector_before = new_time_vector_before
879 if new_number_samples_after > 0:
880 new_time_vector_after = npy.linspace(
881 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
882 ).tolist()[::-1]
883 time_vector_after = new_time_vector_after
885 new_time_vector = time_vector_before + time_vector_window + time_vector_after
886 values = self.interpolate_values(new_time_vector)
887 forced_values = self.interpolate_forced_values(new_time_vector)
888 number_samples = len(values)
890 data_processing_time = time.time() - data_processing_time
892 return self.__class__(
893 signal_id=self.signal_id,
894 forcible=self.forcible,
895 time_vector=new_time_vector,
896 values=values,
897 forced_values=forced_values,
898 number_samples=number_samples,
899 number_samples_db=self.number_samples,
900 data_start=self.data_start,
901 data_end=self.data_end,
902 db_query_time=self.db_query_time,
903 init_time=self.init_time,
904 data_processing_time=self.data_processing_time + data_processing_time,
905 )
907 def zero_time_vector(self, data_start: float):
908 data_processing_time = time.time()
909 if len(self.time_vector) == 0:
910 return self
911 time_vector = npy.array(self.time_vector) - data_start
912 data_processing_time = time.time() - data_processing_time
914 return self.__class__(
915 signal_id=self.signal_id,
916 time_vector=time_vector,
917 values=self.values,
918 forced_values=self.forced_values,
919 number_samples=self.number_samples,
920 number_samples_db=self.number_samples_db,
921 data_start=time_vector[0],
922 data_end=time_vector[-1],
923 db_query_time=self.db_query_time,
924 init_time=self.init_time,
925 data_processing_time=self.data_processing_time + data_processing_time,
926 )
928 def csv_export(self):
929 output = io.StringIO()
930 writer = csv.writer(output)
931 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
932 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
933 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
934 return output.getvalue().encode("utf-8")
936 def prestoplot_export(self):
937 clean_signal_id = self.signal_id.replace(".", "_")
938 if clean_signal_id[0].isnumeric():
939 clean_signal_id = "_" + clean_signal_id
941 output = io.StringIO()
942 output.write("# Encoding:\tUTF-8\n")
943 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
944 output.write("ISO8601\tnone\tnone\n")
945 output.write(f"# Description :\t{clean_signal_id}\n")
947 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
948 output.write(
949 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
950 )
951 return output.getvalue().encode("utf-8")
954class NumericSignalData(SignalData):
955 data_type: str = "float"
956 values: list[float | int | None]
957 forced_values: list[float | int | None]
959 def interpolate(self, new_time_vector: list[float], items):
960 items = [npy.nan if s is None else s for s in items]
961 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
963 def uniform_desampling(self, number_samples_max: int) -> Self:
964 data_processing_time = time.time()
965 if number_samples_max and self.number_samples > number_samples_max:
966 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
967 forced_values = self.interpolate_forced_values(time_vector)
968 number_samples = len(time_vector)
969 else:
970 time_vector = self.time_vector
971 number_samples = len(self.values)
972 values = self.values[:]
973 forced_values = self.forced_values[:]
974 data_processing_time = time.time() - data_processing_time
976 return self.__class__(
977 signal_id=self.signal_id,
978 time_vector=time_vector,
979 values=values,
980 forced_values=forced_values,
981 number_samples=number_samples,
982 number_samples_db=self.number_samples,
983 data_start=self.data_start,
984 data_end=self.data_end,
985 db_query_time=self.db_query_time,
986 init_time=self.init_time,
987 data_processing_time=self.data_processing_time + data_processing_time,
988 )
990 def min_max_downsampling(self, number_samples_max: int) -> Self:
991 if self.number_samples < number_samples_max:
992 return self
994 data_processing_time = time.time()
996 number_bins = number_samples_max // 2
998 time_vector = npy.array(self.time_vector, dtype=npy.float64)
999 values = npy.array(self.values, dtype=npy.float64)
1000 forced_values = npy.array(self.forced_values, dtype=npy.float64)
1002 points_per_bin = self.number_samples // number_bins
1004 # When not done like this, the end of the data will be cut off because of the remainder of the two divisions above
1005 # This increases the number of points per bin and reduces the number of bins while filling the last bin with NaNs to ensure that every point is accounted for
1006 if self.number_samples - number_bins * points_per_bin > 1:
1007 points_per_bin += 1
1008 number_bins = self.number_samples // points_per_bin + 1
1009 nan_points_to_add = npy.full(points_per_bin * number_bins - self.number_samples, npy.nan)
1010 time_vector = npy.concatenate([time_vector, nan_points_to_add])
1011 values = npy.concatenate([values, nan_points_to_add])
1012 forced_values = npy.concatenate([forced_values, nan_points_to_add])
1014 timestamps_matrix = time_vector.reshape(number_bins, points_per_bin)
1015 values_matrix = values.reshape(number_bins, points_per_bin)
1016 forced_values_matrix = forced_values.reshape(number_bins, points_per_bin)
1018 indexes_min = npy.zeros(number_bins, dtype="int64")
1019 indexes_max = npy.zeros(number_bins, dtype="int64")
1021 for row in range(number_bins):
1022 min_value = values_matrix[row, 0]
1023 max_value = values_matrix[row, 0]
1024 for column in range(points_per_bin):
1025 if values_matrix[row, column] < min_value:
1026 min_value = values_matrix[row, column]
1027 indexes_min[row] = column
1028 elif values_matrix[row, column] > max_value:
1029 max_value = values_matrix[row, column]
1030 indexes_max[row] = column
1032 row_index = npy.repeat(npy.arange(number_bins), 2)
1033 column_index = npy.sort(npy.stack((indexes_min, indexes_max), axis=1)).ravel()
1035 data_processing_time = time.time() - data_processing_time
1037 new_time_vector = timestamps_matrix[row_index, column_index]
1038 new_time_vector = npy.where(npy.isnan(new_time_vector), None, new_time_vector)
1039 new_values = values_matrix[row_index, column_index]
1040 new_values = npy.where(npy.isnan(new_values), None, new_values)
1041 new_forced_values = forced_values_matrix[row_index, column_index]
1042 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1044 # Make sure there are no None values for the time vector
1045 time_vector_filter = new_time_vector != None
1046 new_time_vector = new_time_vector[time_vector_filter]
1047 new_values = new_values[time_vector_filter]
1048 new_forced_values = new_forced_values[time_vector_filter]
1050 return self.__class__(
1051 signal_id=self.signal_id,
1052 time_vector=new_time_vector,
1053 values=new_values,
1054 forced_values=new_forced_values,
1055 number_samples=number_bins * 2,
1056 number_samples_db=self.number_samples_db,
1057 data_start=self.data_start,
1058 data_end=self.data_end,
1059 db_query_time=self.db_query_time,
1060 init_time=self.init_time,
1061 data_processing_time=self.data_processing_time + data_processing_time,
1062 phase_id=self.phase_id,
1063 )
1065 def interest_window_desampling(
1066 self,
1067 window_max_number_samples: int,
1068 outside_max_number_samples: int,
1069 window_min_timestamp: float | None = None,
1070 window_max_timestamp: float | None = None,
1071 ) -> Self:
1072 """Performs a sampling in a window of interest and outside."""
1074 if not self.time_vector:
1075 return self
1077 if window_min_timestamp is None:
1078 window_min_timestamp = self.time_vector[0]
1079 if window_max_timestamp is None:
1080 window_max_timestamp = self.time_vector[-1]
1082 data_processing_time = time.time()
1084 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
1085 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
1087 time_vector_before = self.time_vector[:index_window_start]
1088 time_vector_window = self.time_vector[index_window_start:index_window_end]
1089 time_vector_after = self.time_vector[index_window_end:]
1091 values_before = self.values[:index_window_start]
1092 values_window = self.values[index_window_start:index_window_end]
1093 values_after = self.values[index_window_end:]
1094 window_min_value = self.interpolate_values([window_min_timestamp])[0]
1095 window_max_value = self.interpolate_values([window_max_timestamp])[0]
1097 # Resampling window
1098 if time_vector_window:
1099 # Ensurring window bounds
1100 if time_vector_window[0] != window_min_timestamp:
1101 time_vector_window.insert(0, window_min_timestamp)
1102 values_window.insert(0, window_min_value)
1103 if time_vector_window[-1] != window_max_timestamp:
1104 time_vector_window.append(window_max_timestamp)
1105 values_window.append(window_max_value)
1106 else:
1107 time_vector_window = [window_min_timestamp, window_max_timestamp]
1108 values_window = [window_min_value, window_max_value]
1110 if window_max_number_samples is not None and len(time_vector_window) > window_max_number_samples:
1111 # Resampling
1112 time_vector_window, values_window = downsample_list(
1113 time_vector_window, values_window, window_max_number_samples
1114 )
1116 # Resampling outside
1117 number_samples_before = len(time_vector_before)
1118 number_samples_after = len(time_vector_after)
1119 if (
1120 outside_max_number_samples is not None
1121 and (number_samples_before + number_samples_after) > outside_max_number_samples
1122 ):
1123 new_number_samples_before = min(
1124 number_samples_before,
1125 math.ceil(
1126 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
1127 ),
1128 )
1129 new_number_samples_after = min(
1130 number_samples_after,
1131 math.ceil(
1132 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
1133 ),
1134 )
1135 # Adjusting numbers as math.ceil can do +1 on sum
1136 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
1137 if new_number_samples_before > new_number_samples_after:
1138 new_number_samples_before -= 1
1139 else:
1140 new_number_samples_after -= 1
1142 if new_number_samples_before > 0:
1143 time_vector_before, values_before = downsample_list(
1144 time_vector_before, values_before, new_number_samples_before
1145 )
1147 if new_number_samples_after > 0:
1148 time_vector_after, values_after = downsample_list(
1149 time_vector_after, values_after, new_number_samples_after
1150 )
1152 new_time_vector = time_vector_before + time_vector_window + time_vector_after
1153 values = values_before + values_window + values_after
1154 forced_values = self.interpolate_forced_values(new_time_vector)
1155 number_samples = len(values)
1157 data_processing_time = time.time() - data_processing_time
1159 return self.__class__(
1160 signal_id=self.signal_id,
1161 time_vector=new_time_vector,
1162 values=values,
1163 forced_values=forced_values,
1164 number_samples=number_samples,
1165 number_samples_db=self.number_samples,
1166 data_start=self.data_start,
1167 data_end=self.data_end,
1168 db_query_time=self.db_query_time,
1169 init_time=self.init_time,
1170 data_processing_time=self.data_processing_time + data_processing_time,
1171 )
1174class StringSignalData(SignalData):
1175 data_type: str = "str"
1176 values: list[str | None]
1177 forced_values: list[str | None]
1179 def interpolate(self, new_time_vector: list[float], items):
1180 # Find the indices of the values in xp that are just smaller or equal to x
1181 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
1182 indices = npy.clip(indices, 0, len(items) - 1)
1183 # Return the corresponding left string values from fp
1184 return [items[i] for i in indices]
1187class SignalsData(TwinPadModel):
1188 signals_data: list[NumericSignalData | StringSignalData | SignalData]
1189 data_processing_time: float
1190 data_start: float | None
1191 data_end: float | None
1193 @classmethod
1194 def get_from_signal_ids(
1195 cls,
1196 signal_ids: list[str],
1197 min_timestamp: float = None,
1198 max_timestamp: float = None,
1199 window_min_timestamp: float = None,
1200 window_max_timestamp: float = None,
1201 interpolate_bounds: bool = True,
1202 max_documents: int = None,
1203 ) -> Self:
1204 signals_data = []
1205 data_start = None
1206 data_end = None
1207 if max_timestamp is None:
1208 max_timestamp = time.time()
1209 data_processing_time = 0.0
1210 for signal_id in signal_ids:
1211 signal_data = SignalData.get_from_signal_id(
1212 signal_id=signal_id,
1213 min_timestamp=min_timestamp,
1214 max_timestamp=max_timestamp,
1215 window_min_timestamp=window_min_timestamp,
1216 window_max_timestamp=window_max_timestamp,
1217 interpolate_bounds=interpolate_bounds,
1218 max_documents=max_documents,
1219 )
1220 data_processing_time += signal_data.data_processing_time
1221 signals_data.append(signal_data)
1222 if signal_data.data_start is not None:
1223 if data_start is None:
1224 data_start = signal_data.data_start
1225 else:
1226 data_start = min(signal_data.data_start, data_start)
1227 if signal_data.data_end is not None:
1228 if data_end is None:
1229 data_end = signal_data.data_end
1230 else:
1231 data_end = max(signal_data.data_end, data_end)
1233 return cls(
1234 signals_data=signals_data,
1235 data_processing_time=data_processing_time,
1236 data_start=data_start,
1237 data_end=data_end,
1238 )
1240 @classmethod
1241 def get_from_phase_and_signal_ids(
1242 cls,
1243 phases: list,
1244 phase_sync_times: list[float | None],
1245 signal_ids: list[str],
1246 window_min_timestamps: list[float | None],
1247 window_max_timestamps: list[float | None],
1248 zero_time_vector: bool = True,
1249 ):
1250 signals_data: list[SignalData] = []
1251 computation_start = time.time()
1253 for phase, sync_time, window_min_timestamp, window_max_timestamp in zip(
1254 phases, phase_sync_times, window_min_timestamps, window_max_timestamps
1255 ):
1256 min_timestamp = phase.start_at / 1000
1257 max_timestamp = phase.end_at / 1000
1259 if sync_time is None:
1260 sync_time = min_timestamp
1262 if window_max_timestamp is not None and window_min_timestamp is not None:
1263 window_length = window_max_timestamp - window_min_timestamp
1265 if window_min_timestamp != min_timestamp:
1266 min_timestamp = max(min_timestamp, window_min_timestamp - window_length / 20)
1267 if window_max_timestamp != max_timestamp:
1268 max_timestamp = min(max_timestamp, window_max_timestamp + window_length / 20)
1270 for signal_id in signal_ids:
1271 signal_data = SignalData.get_from_signal_id(
1272 signal_id,
1273 min_timestamp,
1274 max_timestamp,
1275 window_min_timestamp,
1276 window_max_timestamp,
1277 interpolate_bounds=False,
1278 max_documents=None,
1279 )
1281 if len(signal_data.time_vector) == 0:
1282 continue
1284 if zero_time_vector:
1285 signal_data = signal_data.zero_time_vector(sync_time)
1286 signal_data.phase_id = phase.id
1288 signals_data.append(signal_data)
1290 return cls(
1291 signals_data=signals_data,
1292 data_processing_time=time.time() - computation_start,
1293 data_start=0,
1294 data_end=0,
1295 )
1297 def uniform_desampling(self, number_samples_max: int) -> Self:
1298 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
1299 return SignalsData(
1300 signals_data=signals_data,
1301 data_processing_time=sum(s.data_processing_time for s in signals_data),
1302 data_start=self.data_start,
1303 data_end=self.data_end,
1304 )
1306 def min_max_downsampling(self, number_samples_max: int) -> Self:
1307 signals_data = [s.min_max_downsampling(number_samples_max=number_samples_max) for s in self.signals_data]
1308 return SignalsData(
1309 signals_data=signals_data,
1310 data_processing_time=sum(s.data_processing_time for s in signals_data),
1311 data_start=self.data_start,
1312 data_end=self.data_end,
1313 )
1315 def interest_window_desampling(
1316 self,
1317 window_max_number_samples: int,
1318 outside_max_number_samples: int,
1319 window_min_timestamp: float = None,
1320 window_max_timestamp: float = None,
1321 ) -> Self:
1322 signals_data = [
1323 s.interest_window_desampling(
1324 window_max_number_samples=window_max_number_samples,
1325 outside_max_number_samples=outside_max_number_samples,
1326 window_min_timestamp=window_min_timestamp,
1327 window_max_timestamp=window_max_timestamp,
1328 )
1329 for s in self.signals_data
1330 ]
1332 return SignalsData(
1333 signals_data=signals_data,
1334 data_processing_time=sum(s.data_processing_time for s in signals_data),
1335 data_start=self.data_start,
1336 data_end=self.data_end,
1337 )
1339 def zero_time_vector(self, data_start: float):
1340 signals_data = [s.zero_time_vector(data_start) for s in self.signals_data]
1341 return SignalsData(
1342 signals_data=signals_data,
1343 data_processing_time=sum(s.data_processing_time for s in signals_data),
1344 data_start=0,
1345 data_end=max([s.data_end for s in signals_data]),
1346 )
1348 @classmethod
1349 async def apply_single_function(
1350 cls,
1351 phase,
1352 base_signal_id: str,
1353 function: SINGLE_POST_PROCESSING_FUNCTION,
1354 window_min_timestamp: float = None,
1355 window_max_timestamp: float = None,
1356 ):
1357 signal_id = f"post_processing.{base_signal_id.removeprefix('post_processing.')}.{function}"
1359 processed_result_signal = Signal.get_from_signal_id(signal_id)
1360 if processed_result_signal is not None and phase.id in processed_result_signal.computed_phases_ids:
1361 return cls.get_existing_function_signal(signal_id, window_min_timestamp, window_max_timestamp)
1363 signals_data = cls.get_from_phase_and_signal_ids(
1364 [phase], [None], [base_signal_id], [None], [None], zero_time_vector=False
1365 )
1367 if len(signals_data.signals_data) == 0 or signals_data.signals_data[0].number_samples == 0:
1368 return None
1370 new_values = None
1371 new_forced_values = None
1372 time_vector = npy.array(signals_data.signals_data[0].time_vector)
1373 values = signals_data.signals_data[0].values
1374 forced_values = signals_data.signals_data[0].forced_values
1376 match (function):
1377 case "Cumul":
1378 new_values = cumul(values)
1379 new_forced_values = cumul(forced_values)
1380 # case "CumulDistrib":
1381 # new_values = cumul_distrib(values)
1382 # new_forced_values = cumul_distrib(forced_values)
1383 case "Delta":
1384 new_values = delta(values)
1385 new_forced_values = delta(forced_values)
1386 case "DeltaT":
1387 new_values = delta(time_vector)
1388 new_forced_values = new_values
1389 case "Derive":
1390 new_values = derive(time_vector, values)
1391 new_forced_values = derive(time_vector, forced_values)
1392 case "Integ":
1393 new_values = integ(time_vector, values)
1394 new_forced_values = integ(time_vector, forced_values)
1396 new_values = npy.where(npy.isnan(new_values), None, new_values)
1397 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1399 loop = asyncio.get_running_loop()
1400 loop.create_task(
1401 cls.save_function_signal(
1402 phase, signal_id, time_vector, new_values, new_forced_values, signals_data.signals_data[0].forcible
1403 )
1404 )
1406 if window_max_timestamp is not None:
1407 max_timestamp_mask = time_vector <= window_max_timestamp
1408 time_vector = time_vector[max_timestamp_mask]
1409 new_values = new_values[max_timestamp_mask]
1410 new_forced_values = new_forced_values[max_timestamp_mask]
1411 if window_min_timestamp is not None:
1412 min_timestamp_mask = time_vector >= window_min_timestamp
1413 time_vector = time_vector[min_timestamp_mask]
1414 new_values = new_values[min_timestamp_mask]
1415 new_forced_values = new_forced_values[min_timestamp_mask]
1417 signals_data.signals_data[0].time_vector = time_vector.tolist()
1418 signals_data.signals_data[0].values = new_values.tolist()
1419 signals_data.signals_data[0].forced_values = new_forced_values.tolist()
1420 signals_data.signals_data[0].number_samples = time_vector.size
1422 signals_data.signals_data[0].signal_id = signal_id
1424 return signals_data
1426 @classmethod
1427 async def apply_multiple_function(
1428 cls,
1429 phases: list,
1430 signal_ids: list,
1431 function: DOUBLE_POST_PROCESSING_FUNCTION | MULTIPLE_POST_PROCESSING_FUNCTION,
1432 window_min_timestamp: float = None,
1433 window_max_timestamp: float = None,
1434 ):
1435 if function in get_args(DOUBLE_POST_PROCESSING_FUNCTION):
1436 function_signal_id = f"post_processing.{signal_ids[0].removeprefix('post_processing.')}.{function}.{signal_ids[1].removeprefix('post_processing.')}"
1437 else:
1438 function_signal_id = f"post_processing.{'.'.join([signal_id.removeprefix('post_processing.') for signal_id in signal_ids])}.{function}"
1440 active_phase = phases[0]
1441 if function in {"Align-X", "Using-X"}:
1442 active_phase = phases[1]
1444 processed_result_signal = Signal.get_from_signal_id(function_signal_id)
1445 if processed_result_signal is not None and (
1446 active_phase.id in processed_result_signal.computed_phases_ids
1447 ): # If signal has been computed for the correct phase
1448 return cls.get_existing_function_signal(function_signal_id, window_min_timestamp, window_max_timestamp)
1450 array_length = None
1451 time_vector_list = []
1452 values_list = []
1453 forced_values_list = []
1454 forcible = True
1455 for phase, signal_id in zip(phases, signal_ids):
1456 signals_data = cls.get_from_phase_and_signal_ids(
1457 [phase], [None], [signal_id], [None], [None], zero_time_vector=False
1458 )
1460 if len(signals_data.signals_data) == 0:
1461 return None
1463 signal_data = signals_data.signals_data[0]
1465 if array_length is None:
1466 array_length = signal_data.number_samples
1467 if (
1468 array_length != signal_data.number_samples and function != "Align-X"
1469 ) or signal_data.number_samples == 0:
1470 return None
1472 time_vector_list.append(npy.array(signal_data.time_vector))
1473 values_list.append(npy.array(signal_data.values, dtype=npy.float64))
1474 forced_values_list.append(npy.array(signal_data.forced_values, dtype=npy.float64))
1475 forcible = forcible and signal_data.forcible
1477 time_vector = time_vector_list[0]
1478 new_values = None
1479 new_forced_values = None
1481 match (function):
1482 case "Align-X":
1483 time_vector = time_vector_list[1]
1484 old_time_vector = time_vector_list[0] - phases[0].start_at / 1000
1485 new_time_vector = time_vector_list[1] - phases[1].start_at / 1000
1486 new_values = align_x(old_time_vector, values_list[0], new_time_vector)
1487 new_forced_values = align_x(old_time_vector, forced_values_list[0], new_time_vector)
1488 # case "Atan2":
1489 # new_values = atan2(values_list[0], values_list[1])
1490 # new_forced_values = atan2(forced_values_list[0], forced_values_list[1])
1491 case "Using-X":
1492 if len(time_vector_list[0]) != len(time_vector_list[1]):
1493 return None
1494 time_vector = time_vector_list[1]
1495 new_values = values_list[0]
1496 new_forced_values = forced_values_list[0]
1497 case "Mean":
1498 new_values = mean(*values_list)
1499 new_forced_values = mean(*forced_values_list)
1500 case "Norm":
1501 new_values = norm(*values_list)
1502 new_forced_values = norm(*forced_values_list)
1504 new_values = npy.where(npy.isnan(new_values), None, new_values)
1505 new_forced_values = npy.where(npy.isnan(new_forced_values), None, new_forced_values)
1507 loop = asyncio.get_running_loop()
1508 loop.create_task(
1509 cls.save_function_signal(
1510 active_phase, function_signal_id, time_vector, new_values, new_forced_values, forcible
1511 )
1512 )
1514 total_number_samples = time_vector.size
1516 if window_max_timestamp is not None:
1517 max_timestamp_mask = time_vector <= window_max_timestamp
1518 time_vector = time_vector[max_timestamp_mask]
1519 new_values = new_values[max_timestamp_mask]
1520 new_forced_values = new_forced_values[max_timestamp_mask]
1521 if window_min_timestamp is not None:
1522 min_timestamp_mask = time_vector >= window_min_timestamp
1523 time_vector = time_vector[min_timestamp_mask]
1524 new_values = new_values[min_timestamp_mask]
1525 new_forced_values = new_forced_values[min_timestamp_mask]
1527 signals_data = cls(
1528 signals_data=[
1529 NumericSignalData(
1530 signal_id=function_signal_id,
1531 forcible=forcible,
1532 time_vector=time_vector.tolist(),
1533 values=new_values.tolist(),
1534 forced_values=new_forced_values.tolist(),
1535 number_samples=time_vector.size,
1536 number_samples_db=total_number_samples,
1537 )
1538 ],
1539 data_processing_time=0,
1540 data_start=0,
1541 data_end=0,
1542 )
1544 return signals_data
1546 @classmethod
1547 def get_existing_function_signal(cls, signal_id: str, window_min_timestamp: float, window_max_timestamp: float):
1548 signal_data_collection = get_signal_collection(signal_id, create=True)
1549 pipeline = []
1550 match_filter = {}
1551 if window_min_timestamp is not None or window_max_timestamp is not None:
1552 match_filter["$match"] = {}
1553 match_filter["$match"]["precise_timestamp"] = {}
1554 if window_max_timestamp is not None:
1555 match_filter["$match"]["precise_timestamp"]["$lte"] = window_max_timestamp
1556 if window_min_timestamp is not None:
1557 match_filter["$match"]["precise_timestamp"]["$gte"] = window_min_timestamp
1559 total_number_samples = signal_data_collection.count_documents({})
1561 if match_filter:
1562 pipeline.append(match_filter)
1564 fetch_start = time.time()
1566 samples = signal_data_collection.aggregate(pipeline).to_list()
1567 new_time_vector = []
1568 new_values = []
1569 new_forced_values = []
1570 for sample in samples:
1571 new_time_vector.append(sample["precise_timestamp"])
1572 new_values.append(sample["value"])
1573 new_forced_values.append(sample["forced_value"])
1575 return cls(
1576 signals_data=[
1577 NumericSignalData(
1578 signal_id=signal_id,
1579 time_vector=new_time_vector,
1580 values=new_values,
1581 forced_values=new_forced_values,
1582 number_samples=len(new_time_vector),
1583 number_samples_db=total_number_samples,
1584 )
1585 ],
1586 data_processing_time=time.time() - fetch_start,
1587 data_start=0,
1588 data_end=0,
1589 )
1591 @classmethod
1592 async def save_function_signal(
1593 cls, phase, function_signal_id: str, time_vector, new_values, new_forced_values, forcible: bool
1594 ):
1595 # Insert data first so if it is requested by another user, it will be computed again
1596 signal_collection = get_signal_collection(function_signal_id, create=True)
1597 signal_collection.delete_many(
1598 {"precise_timestamp": {"$gte": phase.start_at / 1000, "$lte": phase.end_at / 1000}}
1599 )
1600 signal_collection.insert_many(
1601 [
1602 {
1603 "timestamp": datetime.datetime.fromtimestamp(time_vector[i]),
1604 "precise_timestamp": time_vector[i],
1605 "value": new_values[i],
1606 "forced_value": new_forced_values[i],
1607 }
1608 for i in range(len(time_vector))
1609 ]
1610 )
1612 signals_config_collection = get_collection(systems_database, "signals", create=True)
1613 signals_config_collection.find_one_and_update(
1614 {"signal_id": function_signal_id},
1615 {
1616 "$set": {
1617 "description": "",
1618 "unit": None,
1619 "type": "sensor",
1620 "address": None,
1621 "frequency": 1 / max(npy.mean(delta(time_vector)), 1), # avoid division by 0
1622 "transfer_function": None,
1623 "precision_digits": None,
1624 "digitization_function": None,
1625 "data_type": "float",
1626 "formula": None,
1627 "forcible": forcible,
1628 "commandable": False,
1629 "broadcastable": False,
1630 "signal_id": function_signal_id,
1631 "post_processing": True,
1632 },
1633 "$push": {"computed_phases_ids": phase.id},
1634 },
1635 upsert=True,
1636 )
1638 def zip_export(self, file_format: str = "csv", post_processing: bool = False, phase_ids: list = []):
1639 if post_processing:
1640 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1641 zip_buffer = io.BytesIO()
1642 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1643 for signal_data in self.signals_data:
1644 file_name = signal_data.signal_id
1645 if post_processing:
1646 phase = phases_by_id.get(
1647 signal_data.phase_id, Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="0")
1648 )
1649 file_name = f"{signal_data.signal_id} ({phase.name})"
1650 if file_format == "csv":
1651 export_io = signal_data.csv_export()
1652 zip_file.writestr(f"{file_name}.csv", export_io)
1653 elif file_format == "prestoplot":
1654 export_io = signal_data.prestoplot_export()
1655 zip_file.writestr(f"{file_name}.tab", export_io)
1656 else:
1657 raise ValueError(f"Format not found. Got: {file_format}")
1658 zip_bytes = zip_buffer.getvalue()
1659 return zip_bytes
1661 def hdf5_export(self, post_processing: bool = False, phase_ids: list = []):
1662 if post_processing:
1663 phases_by_id = {phase_id: Phase.get_from_id(phase_id) for phase_id in phase_ids}
1664 hdf5_buffer = io.BytesIO()
1665 custom_type_float = npy.dtype(
1666 [("date", "S31"), ("timestamp", npy.float64), ("value", npy.float64), ("forced_value", npy.float64)]
1667 )
1668 custom_type_string = npy.dtype(
1669 [("date", "S31"), ("timestamp", npy.float64), ("value", "S30"), ("forced_value", "S30")]
1670 )
1671 with h5py.File(hdf5_buffer, "w") as hdf5_file:
1672 for signal_data in self.signals_data:
1673 if post_processing:
1674 phase = phases_by_id.get(
1675 signal_data.phase_id, Phase(name="Unknown phase", start_at=0, end_at=0, campaign_id="0")
1676 )
1677 signal_group = hdf5_file.create_group(f"{signal_data.signal_id} ({phase.name})")
1678 else:
1679 signal_group = hdf5_file.create_group(signal_data.signal_id)
1680 date_vector = [get_utc_date_from_timestamp(ts) for ts in signal_data.time_vector]
1681 if signal_data.data_type == "str":
1682 export_data = npy.array(
1683 list(
1684 zip(
1685 date_vector,
1686 signal_data.time_vector,
1687 signal_data.values,
1688 signal_data.forced_values,
1689 )
1690 ),
1691 dtype=custom_type_string,
1692 )
1693 else:
1694 export_data = npy.array(
1695 list(
1696 zip(
1697 date_vector,
1698 signal_data.time_vector,
1699 signal_data.values,
1700 signal_data.forced_values,
1701 )
1702 ),
1703 dtype=custom_type_float,
1704 )
1705 signal_group["data"] = export_data
1706 return hdf5_buffer.getvalue()
1709class SignalStatus(TwinPadModel):
1710 status: str = "down"
1711 reason: str = ""
1712 delay: float | None = None
1715class DigitizationFunction(TwinPadModel):
1716 bits: int | None = None
1717 min_value: float
1718 max_value: float
1719 min_raw_value: float
1720 max_raw_value: float
1723class SignalUpdate(TwinPadModel):
1724 value: float | str | bool | int | None = None
1725 forced_value: float | str | bool | int | None = None
1726 timestamp: int | None = None
1729class SignalType(str, Enum):
1730 command = "command"
1731 sensor = "sensor"
1732 external_sensor = "external_sensor"
1735SIGNALDATA_TYPES = {
1736 "int": NumericSignalData,
1737 "float": NumericSignalData,
1738 "str": StringSignalData,
1739 "bool": NumericSignalData,
1740 "epoch": NumericSignalData,
1741}
1744class Signal(GenericMongo):
1745 collection_name: ClassVar[str] = "signals"
1747 signal_id: str
1748 frequency: float
1749 unit: str | None
1750 description: str
1751 type: SignalType
1752 data_type: str
1753 precision_digits: int | None
1754 forcible: bool
1755 status: SignalStatus = SignalStatus()
1757 post_processing: bool = False
1758 computed_phases_ids: list[str] = []
1760 digitization_function: DigitizationFunction | None
1762 @property
1763 def device(self) -> Device:
1764 device_id = self.signal_id.split(".")[0]
1765 device = Device.get_one_by_attribute("device_id", device_id)
1766 return device
1768 @cached_property
1769 def signal_data_class(self):
1770 if self.data_type in SIGNALDATA_TYPES:
1771 return SIGNALDATA_TYPES[self.data_type]
1772 if self.data_type.startswith("enum"):
1773 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1774 raise ValueError(f"Unhandled python type: {self.data_type}")
1776 @cached_property
1777 def python_type(self):
1778 if self.data_type in TYPES:
1779 return TYPES[self.data_type]
1780 if self.data_type.startswith("enum"):
1781 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1782 return Literal[*choices]
1783 raise ValueError(f"Unhandled python type: {self.data_type}")
1785 async def send_command(self, update_dict: SignalUpdate, current_user: User) -> dict:
1786 command = Command(
1787 sent_at=time.time(),
1788 command_type="Signal command",
1789 user_id=current_user.id,
1790 )
1792 has_input_error = False
1793 error_message = ""
1795 if self.data_type.startswith("enum"):
1796 enum_options = get_args(self.python_type)
1798 if update_dict.value is not None and update_dict.value not in enum_options:
1799 has_input_error = True
1800 error_message += f"Invalid value: {update_dict.value} not in {enum_options}\n"
1801 if update_dict.forced_value is not None and update_dict.forced_value not in enum_options:
1802 has_input_error = True
1803 error_message += f"Invalid type for forced value: {update_dict.forced_value} not in {enum_options}\n"
1804 else:
1805 if update_dict.value is not None and not is_of_type(update_dict.value, self.python_type):
1806 has_input_error = True
1807 error_message += f"Invalid type for value: {update_dict.value.__class__} is not {self.python_type}\n"
1808 if update_dict.forced_value is not None and not is_of_type(update_dict.forced_value, self.python_type):
1809 has_input_error = True
1810 error_message += (
1811 f"Invalid type for forced value: {update_dict.forced_value.__class__} is not {self.python_type}\n"
1812 )
1814 if has_input_error:
1815 command.response_time = 0
1816 command.succeeded = False
1817 command.description = f"Tried to modify signal {self.signal_id}"
1818 response = {"error": True, "status_code": 400, "message": error_message}
1819 else:
1820 response = await send_signal_value(self.signal_id, update_dict)
1821 command.receive_response(response)
1823 Command.create(command)
1824 return response
1826 @classmethod
1827 def get_from_signal_id(cls, signal_id) -> Self:
1828 """Could be generic from mongo"""
1829 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1830 if not raw_value:
1831 return None
1832 del raw_value["_id"]
1833 return cls.dict_to_object(raw_value)
1835 @classmethod
1836 def get_all_ids(cls) -> list[str]:
1837 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1839 return [signal["signal_id"] for signal in cursor]
1841 @classmethod
1842 def get_all_statuses(cls) -> list[dict]:
1843 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "status": 1, "_id": 0}}])
1845 return [
1846 {"signal_id": signal["signal_id"], "status": signal.get("status", {"status": "down"})["status"]}
1847 for signal in cursor
1848 ]
1850 async def number_samples(self):
1851 collection = get_signal_collection(signal_id=self.signal_id)
1852 if collection is None:
1853 return 0
1855 number_samples = collection.estimated_document_count()
1857 number_samples_async_collection = await get_async_collection(
1858 systems_async_database, "number_samples", create=True, time_series=True
1859 )
1861 loop = asyncio.get_running_loop()
1862 loop.create_task(
1863 number_samples_async_collection.insert_one(
1864 {
1865 "timestamp": datetime.datetime.now(pytz.UTC),
1866 "signal_id": self.signal_id,
1867 "number_samples": number_samples,
1868 }
1869 )
1870 )
1872 return number_samples
1874 @classmethod
1875 async def number_samples_batch(cls, signal_ids: list[str]) -> dict[str, int]:
1876 number_samples_by_id = {}
1877 collections = get_signal_collections_batch(signal_ids)
1878 number_samples_async_collection = await get_async_collection(
1879 systems_async_database, "number_samples", create=True, time_series=True
1880 )
1882 for signal_id, collection in zip(signal_ids, collections):
1883 if collection is None:
1884 number_samples_by_id[signal_id] = 0
1885 continue
1887 number_samples = collection.estimated_document_count()
1889 number_samples_by_id[signal_id] = number_samples
1891 now = datetime.datetime.now(pytz.UTC)
1892 loop = asyncio.get_running_loop()
1893 loop.create_task(
1894 number_samples_async_collection.insert_many(
1895 [
1896 {
1897 "timestamp": now,
1898 "signal_id": signal_id,
1899 "number_samples": number_samples,
1900 }
1901 for signal_id, number_samples in number_samples_by_id.items()
1902 ]
1903 )
1904 )
1906 return number_samples_by_id
1908 def sample_datasize(self):
1909 return signals_database.command("collstats", self.signal_id)["size"]
1911 @classmethod
1912 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1913 result = cls.collection().aggregate(
1914 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1915 )
1917 return {signal["signal_id"]: signal["forcible"] for signal in result}
1920class ForcedSignal(GenericMongo):
1921 collection_name: ClassVar[str] = "forced_signals"
1923 signal_id: str
1924 forcing_user_id: str
1925 forced_at: float
1926 value: str | float
1928 def insert(self):
1929 insert_result = self.collection().find_one_and_update(
1930 {"signal_id": self.signal_id},
1931 {"$set": self.to_dict(exclude={"id"})},
1932 upsert=True,
1933 return_document=ReturnDocument.AFTER,
1934 )
1935 self.id = str(insert_result["_id"])
1936 return self.id
1938 @classmethod
1939 def can_force(cls, signal_id: str, current_user: User) -> bool:
1940 """Checks whether user can force a given signal.
1942 :param signal_id: Signal ID of the signal to force
1943 :type signal_id: str
1944 :param current_user: Current user
1945 :type current_user: User
1946 :return: False if the signal was forced by someone else than the user, True otherwise
1947 :rtype: bool
1948 """
1949 forced_signal = cls.get_one_by_attribute("signal_id", signal_id)
1950 if forced_signal is not None:
1951 if forced_signal.forcing_user_id != current_user.id and not current_user.is_admin:
1952 return False
1953 return True
1956class ServicesStatus(TwinPadModel):
1957 backend: str
1958 cloud_broker: str
1959 time_series_database: str
1960 signal_storage: str
1961 heartbeat_storage: str
1962 data_analyzer: str
1964 @classmethod
1965 def check(cls) -> Self:
1966 return cls(
1967 cloud_broker=ping(RABBITMQ_HOST),
1968 backend="up",
1969 time_series_database=ping(MONGO_HOST),
1970 signal_storage=ping(SIGNAL_STORAGE_HOST),
1971 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
1972 data_analyzer=ping(DATA_ANALYZER_HOST),
1973 )
1976def ping(host):
1977 try:
1978 if ping3.ping(host, timeout=0.8):
1979 return "up"
1980 except PermissionError:
1981 pass
1982 return "down"
1985class Event(GenericMongo):
1986 collection_name: ClassVar[str] = "events"
1988 name: str
1989 timestamp: float
1990 event_rule_id: str
1992 @computed_field
1993 @cached_property
1994 def event_rule(self) -> "EventRule":
1995 return EventRule.get_from_id(self.event_rule_id)
1997 @classmethod
1998 def dict_to_object(cls, dict_):
1999 """Refine to convert timestamp to datetime for mongodb."""
2000 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
2001 return super().dict_to_object(dict_)
2004class TwinPadActivity(GenericMongo):
2005 timestamp: float
2006 amount: int
2008 @classmethod
2009 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_amount) -> list[Self]:
2010 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2011 number_events_collection = get_collection(systems_database, "number_events")
2012 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
2013 items = []
2014 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2015 if number_events_collection is None or recompute_amount:
2016 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
2017 number_events_collection.delete_many({})
2018 first_event = events_collection.find_one(sort={"timestamp": 1})
2019 if first_event is None:
2020 return items
2021 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
2022 tzinfo=pytz.UTC
2023 )
2024 while last_computed_day < TODAY:
2025 day_nb_events = events_collection.count_documents(
2026 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
2027 )
2028 if day_nb_events > 0:
2029 number_events_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_events})
2030 last_computed_day += ONE_DAY_OFFSET
2031 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
2032 if number_events_today > 0:
2033 number_events_collection.delete_many({"timestamp": TODAY})
2034 number_events_collection.insert_one({"timestamp": TODAY, "amount": number_events_today})
2035 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2036 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2037 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2038 for day in number_events:
2039 day["timestamp"] = day["timestamp"].timestamp()
2040 items.append(cls.mongo_dict_to_object(day))
2041 return items
2043 @classmethod
2044 def get_number_samples_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
2045 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2046 number_samples_collection = get_collection(systems_database, "number_received_samples", create=False)
2047 signals_number_samples_collection = get_collection(
2048 signals_database, "signal_storage._NUMBER_SAMPLES", create=True, time_series=True
2049 )
2050 items = []
2051 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2052 if number_samples_collection is None or recompute_amount:
2053 number_samples_collection = get_collection(
2054 systems_database, "number_received_samples", create=True, time_series=True
2055 )
2056 number_samples_collection.delete_many({})
2057 first_sample = signals_number_samples_collection.find_one(sort={"timestamp": 1})
2058 if first_sample is None:
2059 return items
2060 # compute from day of first found event
2061 last_computed_day = datetime.datetime.combine(first_sample["timestamp"], datetime.time.min).replace(
2062 tzinfo=pytz.UTC
2063 )
2064 while last_computed_day < TODAY:
2065 number_samples_request = signals_number_samples_collection.aggregate(
2066 [
2067 {
2068 "$match": {
2069 "timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}
2070 }
2071 },
2072 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
2073 ]
2074 ).to_list()
2075 if len(number_samples_request) == 0:
2076 number_samples = 0
2077 else:
2078 number_samples = number_samples_request[0].get("number_samples", 0)
2079 if number_samples > 0:
2080 number_samples_collection.insert_one({"timestamp": last_computed_day, "amount": number_samples})
2081 last_computed_day += ONE_DAY_OFFSET
2082 number_samples_request = signals_number_samples_collection.aggregate(
2083 [
2084 {"$match": {"timestamp": {"$gte": TODAY}}},
2085 {"$group": {"_id": "temp", "number_samples": {"$sum": "$value"}}},
2086 ]
2087 ).to_list()
2088 if len(number_samples_request) == 0:
2089 number_samples_today = 0
2090 else:
2091 number_samples_today = number_samples_request[0].get("number_samples", 0)
2092 if number_samples_today > 0:
2093 number_samples_collection.delete_many({"timestamp": TODAY})
2094 number_samples_collection.insert_one({"timestamp": TODAY, "amount": number_samples_today})
2095 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2096 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2097 number_events = number_samples_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2098 for day in number_events:
2099 day["timestamp"] = day["timestamp"].timestamp()
2100 items.append(cls.mongo_dict_to_object(day))
2101 return items
2103 @classmethod
2104 def get_number_commands_timeframe(cls, min_timestamp, max_timestamp, recompute_amount: bool = False) -> list[Self]:
2105 ONE_DAY_OFFSET = datetime.timedelta(days=1)
2106 number_commands_collection = get_collection(systems_database, "number_commands")
2107 commands_collection = get_collection(systems_database, "commands", create=True, time_series=True)
2108 items = []
2109 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
2110 if number_commands_collection is None or recompute_amount:
2111 number_commands_collection = get_collection(
2112 systems_database, "number_commands", create=True, time_series=True
2113 )
2114 number_commands_collection.delete_many({})
2115 first_command = commands_collection.find_one(sort={"timestamp": 1})
2116 if first_command is None:
2117 return items
2118 last_computed_day = datetime.datetime.combine(first_command["timestamp"], datetime.time.min).replace(
2119 tzinfo=pytz.UTC
2120 )
2121 while last_computed_day < TODAY:
2122 day_nb_commands = commands_collection.count_documents(
2123 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
2124 )
2125 if day_nb_commands > 0:
2126 number_commands_collection.insert_one({"timestamp": last_computed_day, "amount": day_nb_commands})
2127 last_computed_day += ONE_DAY_OFFSET
2128 number_commands_today = commands_collection.count_documents({"timestamp": {"$gte": TODAY}})
2129 if number_commands_today > 0:
2130 number_commands_collection.delete_many({"timestamp": TODAY})
2131 number_commands_collection.insert_one({"timestamp": TODAY, "amount": number_commands_today})
2132 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
2133 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
2134 number_commands = number_commands_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
2135 for day in number_commands:
2136 day["timestamp"] = day["timestamp"].timestamp()
2137 items.append(cls.mongo_dict_to_object(day))
2138 return items
2141class EventRule(GenericMongo):
2142 collection_name: ClassVar[str] = "event_rules"
2144 name: str
2145 formula: str
2146 variables: list[str]
2148 @computed_field
2149 @cached_property
2150 def number_events(self) -> int:
2151 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
2154class Company(GenericMongo):
2155 collection_name: ClassVar[str] = "companies"
2156 name: str
2159class Campaign(GenericMongo):
2160 collection_name: ClassVar[str] = "campaigns"
2162 # Properties
2163 id: str | None = None
2164 name: str
2165 description: str | None = None
2167 @classmethod
2168 def create(cls, campaign: Self):
2169 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
2170 if new_campaign is None:
2171 return None
2172 return {"campaign_id": str(new_campaign.inserted_id)}
2174 @classmethod
2175 def update(cls, campaign: Self):
2176 updated_campaign = cls.collection().find_one_and_update(
2177 {"_id": ObjectId(campaign.id)},
2178 {"$set": {"name": campaign.name, "description": campaign.description}},
2179 return_document=ReturnDocument.AFTER,
2180 )
2181 return updated_campaign
2183 @classmethod
2184 def delete(cls, campaign_id):
2185 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)})
2186 return deleted_user
2189class Phase(GenericMongo):
2190 collection_name: ClassVar[str] = "phases"
2192 # Properties
2193 id: str | None = None
2194 name: str
2195 description: str | None = None
2196 start_at: float
2197 end_at: float
2199 # FK
2200 campaign_id: str
2202 # @classmethod
2203 # def get_by_date(cls, datetime: float):
2204 # phases = []
2205 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
2206 # phases.append(cls.dict_to_object(dict_).model_dump())
2207 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
2208 # phases.append(cls.dict_to_object(dict_).model_dump())
2209 # if phases is None:
2210 # return None
2211 # return phases
2213 @classmethod
2214 def create(cls, phase: Self):
2215 phase = Phase(
2216 name=phase.name,
2217 description=phase.description,
2218 start_at=phase.start_at,
2219 end_at=phase.end_at,
2220 campaign_id=phase.campaign_id,
2221 )
2222 phase_collection = get_collection(systems_database, "phases", create=True)
2223 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
2224 if new_phase is None:
2225 return None
2226 return {"phase_id": str(new_phase.inserted_id)}
2228 @classmethod
2229 def update(cls, phase: Self):
2230 updated_phase = cls.collection().find_one_and_update(
2231 {"_id": ObjectId(phase.id)},
2232 {
2233 "$set": {
2234 "name": phase.name,
2235 "description": phase.description,
2236 "start_at": phase.start_at,
2237 "end_at": phase.end_at,
2238 }
2239 },
2240 return_document=ReturnDocument.AFTER,
2241 )
2242 return updated_phase
2244 @classmethod
2245 def delete(cls, phase_id):
2246 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
2247 return delete_phase
2249 @classmethod
2250 def deleteMany(cls, campaign_id):
2251 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
2252 return delete_phases
2255class CustomViewCreation(GenericMongo):
2256 collection_name: ClassVar[str] = "custom_views"
2258 name: str
2259 configuration: list
2262class CustomView(CustomViewCreation):
2263 # Properties
2264 id: str | None = None
2266 # Foreign Key
2267 user_id: str
2269 # # Methods
2270 # @classmethod
2271 # def create(cls, form_custom_view: Self, user_id) -> list:
2272 # custom_view = CustomView(
2273 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
2274 # )
2275 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
2276 # return new_custom_view
2278 # @classmethod
2279 # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
2280 # updated_custom_view = cls.collection().find_one_and_update(
2281 # {"_id": ObjectId(custom_view_id)},
2282 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
2283 # return_document=ReturnDocument.AFTER,
2284 # )
2285 # updated_custom_view["id"] = str(updated_custom_view["_id"])
2286 # del updated_custom_view["_id"]
2287 # return cls(**updated_custom_view)
2289 # @classmethod
2290 # def delete(cls, custom_view_id) -> bool:
2291 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
2292 # return deleted_custom_view.acknowledged
2295CustomViewUpdate = create_update_model(CustomView)
2298class Video(GenericMongo):
2299 collection_name: ClassVar[str] = "videos"
2301 # Properties
2302 name: str
2303 ip_addr: str
2304 username: str | None = None
2305 password: str | None = None
2307 # Methods
2308 @classmethod
2309 def get_all(cls, sort_by="_id") -> list[Self]:
2310 items = []
2311 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
2312 items.append(cls.mongo_dict_to_object(dict_))
2313 return items
2315 @classmethod
2316 def get_video(cls, camera_id: ObjectId):
2317 camera = cls.get_from_id(camera_id)
2318 if camera is not None:
2319 return camera.name
2320 return None
2323class Command(GenericMongo):
2324 collection_name: ClassVar[str] = "commands"
2326 # Properties
2327 timestamp: datetime.datetime = None
2328 sent_at: float
2329 response_time: float = 0.0
2330 command_type: str
2331 description: str = ""
2332 succeeded: bool = False
2334 # Foreign key
2335 user_id: str
2337 @classmethod
2338 def collection(cls):
2339 return get_collection(systems_database, cls.collection_name, create=True, time_series=True)
2341 @classmethod
2342 def create(cls, command: Self):
2343 command = cls(
2344 timestamp=datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC),
2345 sent_at=command.sent_at,
2346 response_time=command.response_time,
2347 command_type=command.command_type,
2348 description=command.description,
2349 succeeded=command.succeeded,
2350 user_id=command.user_id,
2351 )
2352 new_command = cls.collection().insert_one(command.model_dump(exclude={"id"}))
2353 if new_command is None:
2354 return None
2355 return {"command_id": str(new_command.inserted_id)}
2357 def receive_response(self, response: dict):
2358 self.response_time = time.time() - self.sent_at
2359 self.succeeded = response.get("error", True) is False
2360 if self.description == "":
2361 self.description += response.get("message", "").rstrip()
2364class SignalsPresetCreation(GenericMongo):
2365 name: str
2366 signal_ids: list[str]
2369class SignalsPreset(SignalsPresetCreation):
2370 collection_name: ClassVar[str] = "signals_presets"
2372 user_id: str
2374 @classmethod
2375 def create(cls, signals_preset: SignalsPresetCreation, user_id: str):
2376 signals_preset = cls(
2377 user_id=user_id,
2378 name=signals_preset.name,
2379 signal_ids=signals_preset.signal_ids,
2380 )
2382 new_signal_preset = cls.collection().insert_one(signals_preset.model_dump(exclude={"id"}))
2384 return str(new_signal_preset.inserted_id)
2387SignalsPresetUpdate = create_update_model(SignalsPreset)
2390class LineStyle(str, Enum):
2391 solid = "solid"
2392 dotted = "dotted"
2393 dashed = "dashed"
2396class SignalAppearance:
2397 value_color: str
2398 forced_value_color: str
2401class GraphThemeCreation(GenericMongo):
2402 collection_name: ClassVar[str] = "graph_themes"
2404 name: str
2405 signal_id: str
2406 value_color: str = ""
2407 forced_value_color: str = ""
2408 value_line_style: LineStyle = LineStyle.solid
2409 forced_value_line_style: LineStyle = LineStyle.solid
2410 private: bool = True
2413class PublicGraphTheme(GraphThemeCreation):
2414 created_by_user: bool
2415 in_user_library: bool
2416 active_for_user: bool
2418 _current_user_id: str = ""
2420 @classproperty
2421 def custom_pipeline_steps(cls) -> dict[str, list]:
2422 return {
2423 "created_by_user": [
2424 {
2425 "$addFields": {
2426 "created_by_user": {"$eq": ["$creator_id", cls._current_user_id]},
2427 }
2428 }
2429 ],
2430 "default": [ # never allow to return a theme the user isn't be allowed to see (ie. not his and not shared), done as early as possible
2431 {"$match": {"$or": [{"private": False}, {"created_by_user": True}]}}
2432 ],
2433 "in_user_library": [
2434 {
2435 "$addFields": {
2436 "in_user_library": {"$in": [cls._current_user_id, "$in_library"]},
2437 }
2438 }
2439 ],
2440 "active_for_user": [
2441 {
2442 "$addFields": {
2443 "active_for_user": {"$in": [cls._current_user_id, "$active"]},
2444 }
2445 }
2446 ],
2447 "in_library": [{"$addFields": {"in_library": "$$REMOVE"}}],
2448 "active": [
2449 {
2450 "$addFields": {
2451 "active": "$$REMOVE",
2452 }
2453 }
2454 ],
2455 "creator_id": [
2456 {
2457 "$addFields": {
2458 "creator_id": "$$REMOVE",
2459 }
2460 }
2461 ],
2462 }
2464 @classmethod
2465 def response_from_query(cls, query, user_id: str) -> ListResponse[Self]:
2466 cls._current_user_id = user_id
2467 return super().response_from_query(query)
2469 @classmethod
2470 def response_from_query_in_user_library(cls, query, user_id: str) -> ListResponse[Self]:
2471 query.in_user_library = "true"
2472 return cls.response_from_query(query, user_id)
2474 @classmethod
2475 def get_from_id(cls, item_id, user_id: str) -> Self | None:
2476 return cls.get_one_by_attribute("_id", ObjectId(item_id), user_id)
2478 @classmethod
2479 def get_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
2480 cls._current_user_id = user_id
2481 return super().get_by_attribute(attribute_name, attribute_value)
2483 @classmethod
2484 def get_one_by_attribute(cls, attribute_name: str, attribute_value, user_id: str):
2485 cls._current_user_id = user_id
2486 return super().get_one_by_attribute(attribute_name, attribute_value)
2488 @classmethod
2489 def get_all(cls, sort_by: str, user_id: str):
2490 cls._current_user_id = user_id
2491 return super().get_all(sort_by)
2493 @classmethod
2494 def get_signal_appearances(cls, signal_ids: list[str], user_id: str) -> dict:
2495 pipeline = [
2496 {
2497 "$match": {
2498 "active": {"$eq": user_id},
2499 "signal_id": {"$in": signal_ids},
2500 }
2501 },
2502 {"$group": {"_id": "$signal_id", "firstDocument": {"$first": "$$ROOT"}}},
2503 {"$replaceRoot": {"newRoot": "$firstDocument"}},
2504 {
2505 "$project": {
2506 "_id": 0,
2507 "signal_id": 1,
2508 "value_color": 1,
2509 "forced_value_color": 1,
2510 "value_line_style": 1,
2511 "forced_value_line_style": 1,
2512 }
2513 },
2514 ]
2516 result = {}
2518 cursor = cls.collection().aggregate(pipeline)
2519 for document in cursor:
2520 signal_id = document["signal_id"]
2521 del document["signal_id"]
2522 result[signal_id] = document
2524 return result
2527GraphThemeUpdate = create_update_model(PublicGraphTheme)
2530class PrivateGraphTheme(GraphThemeCreation):
2531 # private
2532 creator_id: str
2533 in_library: list[str]
2534 active: list[str]
2536 @classmethod
2537 def create(
2538 cls,
2539 creator_id: str,
2540 name: str,
2541 signal_id: str,
2542 value_color: str,
2543 forced_value_color: str,
2544 value_line_style: LineStyle,
2545 forced_value_line_style: LineStyle,
2546 private: bool,
2547 ):
2548 color_setting = cls(
2549 creator_id=creator_id,
2550 name=name,
2551 signal_id=signal_id,
2552 value_color=value_color,
2553 forced_value_color=forced_value_color,
2554 value_line_style=value_line_style,
2555 forced_value_line_style=forced_value_line_style,
2556 private=private,
2557 in_library=[creator_id],
2558 active=[creator_id],
2559 )
2561 new_color_setting = cls.collection().insert_one(color_setting.model_dump(exclude={"id": True}))
2562 color_setting.id = str(new_color_setting.inserted_id)
2563 return color_setting
2565 def update(self, update_dict: dict, user_id: str):
2566 if (in_user_lib := update_dict.get("in_user_library")) is not None:
2567 if in_user_lib and user_id not in self.in_library:
2568 self.in_library.append(user_id)
2569 elif not in_user_lib and user_id in self.in_library:
2570 self.in_library.remove(user_id)
2571 update_dict["in_library"] = self.in_library
2572 del update_dict["in_user_library"]
2574 if (active_for_user := update_dict.get("active_for_user")) is not None:
2575 if active_for_user and user_id not in self.active:
2576 self.active.append(user_id)
2577 elif not active_for_user and user_id in self.active:
2578 self.active.remove(user_id)
2579 update_dict["active"] = self.active
2580 del update_dict["active_for_user"]
2582 if update_dict.get("created_by_user") is not None:
2583 del update_dict["created_by_user"]
2585 self.collection().find_one_and_update(
2586 {"_id": ObjectId(self.id)},
2587 {"$set": update_dict},
2588 )
2590 return {}