Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 89%
714 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 07:30 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 07:30 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Optional
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12from typing import Literal, Dict
14import zipfile
15import ping3
16import pytz
17from bson.objectid import ObjectId
18from pymongo import ASCENDING, ReturnDocument
19from pydantic import BaseModel, computed_field, Field, create_model
20import numpy as npy
22# from scipy import signal as signal_scipy
24from twinpad_backend.db import (
25 get_collection,
26 get_signal_collection,
27 systems_database,
28 signals_database,
29 devices_states_database,
30)
31from twinpad_backend.responses import ListResponse
32from twinpad_backend.messages import send_mode_change, send_signal_value
# Maps the type names accepted in ``Signal.data_type`` to the Python type of
# the corresponding values ("epoch" values are plain floats).
# NOTE: this must be a dict, not a tuple wrapping a dict, so that
# ``name in TYPES`` and ``TYPES[name]`` behave as lookups by type name.
TYPES = {"int": int, "float": float, "str": str, "bool": bool, "epoch": float}
# Host names of the companion services. Each one can be overridden with the
# environment variable of the same name.
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.getenv("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.getenv("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.getenv("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.getenv("DATA_ANALYZER_HOST", "data-analyzer")

# Maximum age, in seconds, of the last ping/sample before something is
# reported as "down".
DEVICE_TIMEOUT = 5.0
NUMBER_SAMPLES_DATABASE_UPDATE = 120

logger = logging.getLogger("uvicorn.error")

# Time-series collection receiving one document per Signal.number_samples() call.
number_samples_collection = get_collection(
    systems_database,
    "number_samples",
    create=True,
    time_series=True,
)
def create_update_model(model):
    """Build the ``<ModelName>Update`` pydantic model used for partial updates.

    Every field of *model* except ``id`` is copied with its annotation wrapped
    in ``Optional`` and with ``None`` as default value.
    """
    optional_fields = {
        name: (Optional[info.annotation], None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(f"{model.__name__}Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float):
    """Format a POSIX timestamp as ``YYYY-MM-DD H:MM:SS.mmm UTC``.

    The timestamp is converted to UTC explicitly, so the result does not depend
    on the time zone of the machine running the code. The hour is not
    zero-padded and the fractional part is truncated to milliseconds.

    Args:
        timestamp: Seconds since the Unix epoch.

    Returns:
        The formatted date, e.g. ``"2023-11-14 22:13:20.250 UTC"``.
    """
    moment = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    # Built by hand rather than with "%-H" (a glibc-only strftime extension).
    milliseconds = moment.microsecond // 1000
    return f"{moment:%Y-%m-%d} {moment.hour}:{moment:%M:%S}.{milliseconds:03d} UTC"
69# Models
class TwinPadModel(BaseModel):
    """Common base of the models of this module: dict <-> object helpers."""

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate *dict_* and return the corresponding instance of *cls*."""
        return cls.model_validate(dict_)

    def to_dict(self, exclude=None):
        """Dump the model to a dict, leaving out the fields listed in *exclude*."""
        return self.model_dump(exclude=exclude)
class GenericMongo(TwinPadModel):
    """Base class of the models stored as documents of a MongoDB collection.

    Subclasses provide a ``collection_name`` class attribute naming the
    collection of ``systems_database`` that holds their documents. The MongoDB
    ``_id`` is exposed as the string field ``id``.
    """

    id: str | None = None

    @classmethod
    def collection(cls):
        """Return the collection of this model (requested with ``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query.

        *query* provides ``mongodb_filter()``, ``sort_by`` (``"field"`` or
        ``"field:order"`` with an integer order), ``offset`` and ``limit``
        (``None`` or ``0`` meaning no limit). ``total`` in the response counts
        all the matching documents, not only the returned page.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = cls.collection()
        total = collection.count_documents(req_filter)
        cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
        if (query.limit is not None) and (query.limit != 0):
            cursor = cursor.limit(query.limit)
        items = [cls.mongo_dict_to_object(item_dict) for item_dict in cursor]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the object whose ``_id`` is *item_id*, or ``None`` if absent."""
        dict_ = cls.collection().find_one({"_id": ObjectId(item_id)})
        if dict_ is None:
            return None
        return cls.mongo_dict_to_object(dict_)

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Convert a raw document to an object (``_id`` becomes the str ``id``).

        Note that *mongo_dict* is modified in place.
        """
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        # find() always returns a cursor (possibly empty), never None.
        items = cls.collection().find({attribute_name: attribute_value})
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return one item matching the attribute value, or ``None``."""
        item = cls.collection().find_one({attribute_name: attribute_value})
        if item is None:
            return None
        return cls.mongo_dict_to_object(item)

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every object of the collection, sorted ascending on *sort_by*."""
        cursor = cls.collection().find({}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cursor]

    def insert(self):
        """Insert the object as a new document and return its new id.

        The ``id`` field is left out of the stored document: MongoDB generates
        ``_id``, which is then copied back into ``self.id``.
        """
        # The field name must be the string "id" (not the builtin ``id``),
        # otherwise nothing is excluded and "id": None gets persisted.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply *update_dict* to this object and to its stored document."""
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )
        return self

    def delete(self):
        """Delete the stored document; return True if one was removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class Mode(TwinPadModel):
    """One of the modes declared by a device (see ``Device.modes``)."""

    # Identifier referenced by Device.current_mode_id / DeviceUpdate.mode_id.
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    """Payload of a device update: the mode to switch the device to."""

    mode_id: int
class Device(GenericMongo):
    """A device, stored in the "devices" collection."""

    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    # Timestamp (seconds) of the last ping; None when none is recorded.
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)

    def update(self, update_device):
        """Request a mode change by sending a message for this device.

        Unlike ``GenericMongo.update`` this does not write to the database
        and returns nothing.
        """
        send_mode_change(self.device_id, update_device.mode_id)

    @computed_field
    @property
    def status(self) -> str:
        """"up" if the last ping is at most DEVICE_TIMEOUT seconds old, else "down"."""
        last_ping = self.last_ping
        if last_ping is None or (time.time() - last_ping) > DEVICE_TIMEOUT:
            return "down"
        return "up"
class DeviceSetup(GenericMongo):
    """A set of devices with a variable mapping, stored in "device_setups"."""

    collection_name: ClassVar[str] = "device_setups"

    device_ids: list[str]
    active: bool = False
    variable_mapping: Dict[str, str]


# Same fields as DeviceSetup (without ``id``), all optional, for partial updates.
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    """State of a device (mode, load, tokens) at a given timestamp."""

    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode_id: int | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """Return the states of one device matching a paginated query.

        States are read from the collection named *device_id* in
        ``devices_states_database``. *query* provides ``mongodb_filter()``,
        ``sort_by`` (``"field"`` or ``"field:order"``), ``offset`` and
        ``limit`` (``None`` or ``0`` meaning no limit).
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id, create=True)
        total = collection.count_documents(req_filter)
        cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
        if (query.limit is not None) and (query.limit != 0):
            cursor = cursor.limit(query.limit)

        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode_id=item_dict.get("mode_id"),
                load=item_dict.get("load"),
                # Fall back to a real list: a pydantic ``Field(...)`` object is
                # not a valid value for ``tokens``.
                tokens=item_dict.get("tokens") or [],
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    """A single sample of a signal: value and forced value at a timestamp."""

    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def _find_edge_sample(cls, signal_id: str, latest: bool) -> dict | None:
        """Return the raw document of the oldest (or latest) sample, if any.

        This is a workaround using the internal ``system.buckets.<name>``
        collection associated to a time series to find the most/least recent
        document, found here:
        https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        When no bucket is available, a sorted query on the signal collection
        is used instead.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        bound, direction = ("max", -1) if latest else ("min", 1)
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        edge_bucket = None
        if bucket is not None:
            edge_bucket = bucket.find_one({}, sort={f"control.{bound}.timestamp": direction})
        if edge_bucket is not None:
            return collection.find_one({"timestamp": edge_bucket["control"][bound]["timestamp"]})
        return collection.find_one({}, sort=[("timestamp", direction)])

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest sample of the signal, or ``None`` if there is none."""
        sample_data = cls._find_edge_sample(signal_id, latest=False)
        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of *signal_ids*."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the latest sample of the signal, or ``None`` if there is none.

        The device id is the part of *signal_id* before the first ".". When
        that device has a ``last_ping``, the returned timestamp is the most
        recent of the sample timestamp and the ping.
        """
        sample_data = cls._find_edge_sample(signal_id, latest=True)
        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]
        device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the latest sample of each signal, in the order of *signal_ids*."""
        return [cls.get_last_from_signal_id(sid) for sid in signal_ids]
class SignalData(TwinPadModel):
    """Samples of one signal over a time range, with helpers to resample/export.

    ``time_vector``, ``values`` and ``forced_values`` are parallel lists.
    Subclasses implement ``interpolate`` for their kind of values.
    """

    signal_id: str
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    # Timestamps of the oldest / most recent data known for the signal.
    data_start: float | None = None
    data_end: float | None = None

    # Samples held by this object / samples matching the request in database.
    number_samples: int = 0
    number_samples_db: int = 0

    # Timings in seconds, for diagnostics.
    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the samples of a signal from the database.

        Args:
            signal_id: Identifier of the signal (also its collection name).
            min_timestamp: Lower bound of the range in seconds, or None.
            max_timestamp: Upper bound of the range in seconds, or None.
            interpolate_bounds: When True, add a sample exactly at each given
                bound, interpolated with the nearest sample outside the range.
            max_documents: When set (and not 0) and more documents match, only
                the first document of each group of consecutive documents is
                kept so that at most ``max_documents`` are returned.

        Returns:
            An instance of the data class of the signal (see
            ``Signal.signal_data_class``), or an empty instance of *cls* when
            the signal has no collection.
        """
        now = time.time()

        def as_utc_datetime(timestamp: float) -> datetime.datetime:
            # Timezone-aware on purpose: PyMongo interprets naive datetimes as
            # UTC, so a naive *local* datetime would shift the bound.
            return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)

        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})["$gte"] = as_utc_datetime(min_timestamp)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})["$lte"] = as_utc_datetime(max_timestamp)

        collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        # Counted once; used both for downsampling and for number_samples_db.
        number_samples_db = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        # 0 means "no limit" (it would otherwise divide by zero).
        if max_documents is not None and 0 < max_documents < number_samples_db:
            unsampling_ratio = math.ceil(number_samples_db / max_documents)
            logger.info(f"unsampling ratio: {unsampling_ratio} (number_results:{number_samples_db})")
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    # $documentNumber starts at 1: shift to 0-based so that each
                    # group holds `unsampling_ratio` documents and the number
                    # of groups never exceeds max_documents.
                    {
                        "$set": {
                            "group_id": {
                                "$floor": {"$divide": [{"$subtract": ["$index", 1]}, unsampling_ratio]}
                            }
                        }
                    },
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        init_start = time.time()

        time_vector = []
        values = []
        forced_values = []
        for s in cursor:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        signal = Signal.get_from_signal_id(signal_id)
        class_ = signal.signal_data_class

        if interpolate_bounds:
            # Fetching left side value & interpolation
            if min_timestamp is not None and (not time_vector or time_vector[0] != min_timestamp):
                sample_left = collection.find_one(
                    {
                        "timestamp": {"$lte": as_utc_datetime(min_timestamp)},
                        "value": {"$exists": True},
                    },
                    sort=[("timestamp", -1)],
                )

                if sample_left:
                    if time_vector:
                        left_sd = class_(
                            signal_id=signal_id,
                            time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                            values=[sample_left["value"], values[0]],
                            forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                        )
                        min_ts_value = left_sd.interpolate_values([min_timestamp])[0]
                        min_ts_forced_value = left_sd.interpolate_forced_values([min_timestamp])[0]
                    else:
                        min_ts_value = sample_left.get("value", None)
                        min_ts_forced_value = sample_left.get("forced_value", None)
                    time_vector.insert(0, min_timestamp)
                    values.insert(0, min_ts_value)
                    forced_values.insert(0, min_ts_forced_value)

            # Fetching right side value & interpolation
            if max_timestamp is not None and (not time_vector or time_vector[-1] != max_timestamp):
                sample_right = collection.find_one(
                    {
                        "timestamp": {"$gte": as_utc_datetime(max_timestamp)},
                        "value": {"$exists": True},
                    },
                    sort=[("timestamp", 1)],
                )
                if sample_right:
                    if time_vector:
                        right_sd = class_(
                            signal_id=signal_id,
                            time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                            values=[values[-1], sample_right.get("value", None)],
                            forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                        )
                        max_ts_value = right_sd.interpolate_values([max_timestamp])[0]
                        max_ts_forced_value = right_sd.interpolate_forced_values([max_timestamp])[0]
                    else:
                        max_ts_value = sample_right.get("value", None)
                        max_ts_forced_value = sample_right.get("forced_value", None)
                    time_vector.append(max_timestamp)
                    values.append(max_ts_value)
                    forced_values.append(max_ts_forced_value)

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_start

        # The internal "system.buckets.<name>" collection of a time series gives
        # direct access to the min/max timestamps of the stored data.
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["timestamp"].replace(tzinfo=pytz.UTC).timestamp()
        else:
            data_start = None

        if signal.repeated_sample():
            data_end = time.time()
        else:
            last_bucket = None
            if bucket is not None:
                last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
            if last_bucket is not None:
                data_end = last_bucket["control"]["max"]["timestamp"].replace(tzinfo=pytz.UTC).timestamp()
            else:
                data_end = time.time()

        return class_(
            signal_id=signal_id,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_samples_db,
            db_query_time=db_req_time,
            init_time=init_time,
        )

    def interpolate(self, new_time_vector: list[float], items):
        """Return *items* resampled at *new_time_vector* (subclass hook)."""
        raise NotImplementedError(f"{type(self).__name__} does not implement interpolate()")

    def interpolate_values(self, new_time_vector: list[float]):
        """Return ``values`` interpolated at the times of *new_time_vector*."""
        return self.interpolate(new_time_vector, self.values)

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return ``forced_values`` interpolated at the times of *new_time_vector*."""
        return self.interpolate(new_time_vector, self.forced_values)

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy holding at most *number_samples_max* samples.

        When there are too many samples, they are replaced by values
        interpolated on evenly spaced times starting at the first timestamp
        (last timestamp excluded). A falsy *number_samples_max* disables it.
        """
        processing_start = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector = npy.linspace(
                self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
            ).tolist()
            values = self.interpolate_values(time_vector)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - processing_start

        return self.__class__(
            signal_id=self.signal_id,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            # Keep the database count when it is known; fall back to the
            # in-memory count for objects built without it.
            number_samples_db=self.number_samples_db or self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        The window keeps at most *window_max_number_samples* samples and always
        contains its two bounds; the samples before and after the window share
        *outside_max_number_samples*, proportionally to their original counts.
        Missing window bounds default to the first/last timestamp. Returns
        ``self`` unchanged when there is no sample.
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        processing_start = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]

        if len(time_vector_window) > window_max_number_samples:
            time_vector_window = npy.linspace(
                time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
            ).tolist()

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = math.ceil(
                outside_max_number_samples * number_samples_before / number_samples_outside
            )
            new_number_samples_after = math.ceil(
                outside_max_number_samples * number_samples_after / number_samples_outside
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            # Both sides are guarded the same way: resample whenever the side
            # has samples (the guard only protects the [0]/[-1] accesses).
            if number_samples_before:
                time_vector_before = npy.linspace(
                    time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
                ).tolist()

            if number_samples_after:
                # Built backwards so that the last timestamp is kept.
                time_vector_after = npy.linspace(
                    time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
                ).tolist()[::-1]

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - processing_start

        return self.__class__(
            signal_id=self.signal_id,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            # Keep the database count when it is known; fall back to the
            # in-memory count for objects built without it.
            number_samples_db=self.number_samples_db or self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def csv_export(self):
        """Return the samples as UTF-8 encoded CSV bytes (one row per sample)."""
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["timestamp", "value", "forced_value"])  # Write header
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
        return output.getvalue().encode("utf-8")

    def prestoplot_export(self):
        """Return the samples as UTF-8 encoded, tab-separated PrestoPlot bytes.

        Dots of the signal id are replaced by "_" and a "_" is prepended when
        the id starts with a digit; missing values are written as "none".
        """
        clean_signal_id = self.signal_id.replace(".", "_")
        # Slicing (rather than [0]) keeps an empty signal id from raising.
        if clean_signal_id[:1].isnumeric():
            clean_signal_id = "_" + clean_signal_id

        output = io.StringIO()
        output.write("# Encoding:\tUTF-8\n")
        output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
        output.write("ISO8601\tnone\tnone\n")
        output.write(f"# Description :\t{clean_signal_id}\n")

        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            output.write(
                f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
            )
        return output.getvalue().encode("utf-8")
class NumericSignalData(SignalData):
    """Signal data with numeric values, linearly interpolated."""

    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate *items* (sampled at ``time_vector``).

        Missing samples (None) are handled as NaN during the interpolation;
        every NaN of the result is returned as None.
        """
        samples = []
        for item in items:
            samples.append(npy.nan if item is None else item)

        interpolated = npy.interp(new_time_vector, self.time_vector, samples)

        result = []
        for sample in interpolated:
            result.append(None if npy.isnan(sample) else sample)
        return result
class StringSignalData(SignalData):
    """Signal data with string values, interpolated by holding the last value."""

    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Return, for each new time, the item of the last sample at or before it.

        Times located before the first sample get the first item.
        """
        last_index = len(items) - 1
        # Position of the last timestamp smaller than or equal to each new time.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        positions = npy.clip(positions, 0, last_index)
        return [items[position] for position in positions]
class SignalsData(TwinPadModel):
    """Data of several signals, with the overall bounds of the available data."""

    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls, signal_ids: list[str], min_timestamp: float = None, max_timestamp: float = None, max_documents: int = None
    ) -> Self:
        """Load the data of each signal of *signal_ids* over the same range.

        *max_timestamp* defaults to the current time. ``data_start`` /
        ``data_end`` are the earliest start and the latest end over the
        signals (None when no signal provides one).
        """
        if max_timestamp is None:
            max_timestamp = time.time()

        signals_data = [
            SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                max_documents=max_documents,
            )
            for signal_id in signal_ids
        ]

        data_processing_time = 0.0
        for signal_data in signals_data:
            data_processing_time += signal_data.data_processing_time

        starts = [s.data_start for s in signals_data if s.data_start is not None]
        ends = [s.data_end for s in signals_data if s.data_end is not None]

        return cls(
            signals_data=signals_data,
            data_processing_time=data_processing_time,
            data_start=min(starts) if starts else None,
            data_end=max(ends) if ends else None,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal; return the new set."""
        resampled = []
        for signal_data in self.signals_data:
            resampled.append(signal_data.uniform_desampling(number_samples_max=number_samples_max))
        total_time = sum(s.data_processing_time for s in resampled)
        return SignalsData(
            signals_data=resampled,
            data_processing_time=total_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float = None,
        window_max_timestamp: float = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal; return the new set."""
        options = {
            "window_max_number_samples": window_max_number_samples,
            "outside_max_number_samples": outside_max_number_samples,
            "window_min_timestamp": window_min_timestamp,
            "window_max_timestamp": window_max_timestamp,
        }
        resampled = [s.interest_window_desampling(**options) for s in self.signals_data]
        total_time = sum(s.data_processing_time for s in resampled)
        return SignalsData(
            signals_data=resampled,
            data_processing_time=total_time,
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, format: str = "csv"):
        """Return the bytes of a zip archive holding one export file per signal.

        *format* is "csv" (``<signal_id>.csv``) or "prestoplot"
        (``<signal_id>.tab``).

        Raises:
            ValueError: for any other format, when the first signal is exported.
        """
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                if format == "csv":
                    zip_file.writestr(f"{signal_data.signal_id}.csv", signal_data.csv_export())
                    continue
                if format == "prestoplot":
                    zip_file.writestr(f"{signal_data.signal_id}.tab", signal_data.prestoplot_export())
                    continue
                raise ValueError(f"Format not found. Got: {format}")
        return archive.getvalue()
class SignalStatus(TwinPadModel):
    """Status of a signal as computed by ``Signal.status``."""

    # "up", "down" or "no data".
    status: str
    # Human-readable explanation (empty when there is nothing to report).
    reason: str
    # Seconds since the last known activity, None when unknown.
    delay: float | None
class DigitizationFunction(TwinPadModel):
    """Parameters relating a raw value range to a value range for a signal."""

    bits: int | None = None
    # Bounds of the value range.
    min_value: float
    max_value: float
    # Bounds of the raw value range.
    min_raw_value: float
    max_raw_value: float
class SignalUpdate(TwinPadModel):
    """Payload passed to ``Signal.update``; every field is optional."""

    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None
class SignalType(str, Enum):
    """String-valued enumeration of the kinds of signal."""

    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Data class used to hold the samples of a signal, by data type name.
SIGNALDATA_TYPES = {
    # Numeric kinds.
    "int": NumericSignalData,
    "float": NumericSignalData,
    # Textual kind.
    "str": StringSignalData,
    # Booleans and epochs are handled as numbers.
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
833class Signal(GenericMongo):
834 collection_name: ClassVar[str] = "signals"
836 signal_id: str
837 frequency: float
838 unit: str | None
839 description: str
840 type: SignalType
841 data_type: str
842 precision_digits: int | None
844 digitization_function: DigitizationFunction | None
846 @property
847 def device(self) -> Device:
848 device_id = self.signal_id.split(".")[0]
849 device = Device.get_one_by_attribute("device_id", device_id)
850 return device
852 @cached_property
853 def signal_data_class(self):
854 if self.data_type in SIGNALDATA_TYPES:
855 return SIGNALDATA_TYPES[self.data_type]
856 if self.data_type.startswith("enum"):
857 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
858 raise ValueError(f"Unhandled python type: {self.data_type}")
860 @cached_property
861 def python_type(self):
862 if self.data_type in TYPES:
863 return TYPES[self.data_type]
864 if self.data_type.startswith("enum"):
865 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
866 return Literal[*choices]
867 raise ValueError(f"Unhandled python type: {self.data_type}")
869 @computed_field
870 @property
871 def status(self) -> SignalStatus:
872 now = time.time()
873 status = "up"
874 reason = ""
876 # See line 292 for explanation
877 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
878 last_bucket = None
879 if bucket is not None:
880 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
881 if last_bucket is None:
882 status = "no data"
883 reason = "signal does not exist"
884 return SignalStatus(status=status, reason=reason, delay=None)
886 try:
887 last_date = last_bucket["control"]["max"]["timestamp"]
888 last_date = last_date.replace(tzinfo=pytz.UTC)
889 last_value_ts = last_date.timestamp()
890 except IndexError:
891 last_value_ts = None
893 if last_value_ts is None:
894 delay = None
895 reason = "No data from signal"
896 else:
897 # Since device is a computed property, only compute it once
898 device = self.device
899 if device.last_ping is not None:
900 last_value_ts = max(last_value_ts, device.last_ping)
901 delay = now - last_value_ts
902 if delay > DEVICE_TIMEOUT:
903 status = "down"
904 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
905 return SignalStatus(status=status, reason=reason, delay=delay)
907 def update(self, update: SignalUpdate):
908 send_signal_value(self.signal_id, update)
910 @classmethod
911 def get_from_signal_id(cls, signal_id) -> Self:
912 """Could be generic from mongo"""
913 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
914 if not raw_value:
915 return None
916 del raw_value["_id"]
917 return cls.dict_to_object(raw_value)
919 def number_samples(self):
920 collection = get_signal_collection(signal_id=self.signal_id)
921 if collection is None:
922 return 0
924 number_samples = collection.estimated_document_count()
926 number_samples_collection.insert_one(
927 {
928 "timestamp": datetime.datetime.now(pytz.UTC),
929 "signal_id": self.signal_id,
930 "number_samples": number_samples,
931 }
932 )
934 return number_samples
936 def repeated_sample(self, now: float = None):
937 if now is None:
938 now = time.time()
940 sample = SignalSample.get_last_from_signal_id(self.signal_id)
941 if sample is not None:
942 if self.device.status == "up":
943 if now - sample.timestamp < DEVICE_TIMEOUT:
944 return True
945 return False
947 def sample_datasize(self):
948 return signals_database.command("collstats", self.signal_id)["size"]
class ServicesStatus(TwinPadModel):
    """"up"/"down" status of the backend and of the services it relies on."""

    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every service host and return the resulting statuses.

        The backend itself is always reported "up".
        """
        statuses = {"cloud_broker": ping(RABBITMQ_HOST), "backend": "up"}
        statuses["time_series_database"] = ping(MONGO_HOST)
        statuses["signal_storage"] = ping(SIGNAL_STORAGE_HOST)
        statuses["heartbeat_storage"] = ping(HEARTBEAT_STORAGE_HOST)
        statuses["data_analyzer"] = ping(DATA_ANALYZER_HOST)
        return cls(**statuses)
def ping(host):
    """Return "up" if *host* answers a ping within 0.8 second, else "down".

    A PermissionError raised while pinging is reported as "down".
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        return "down"
    # ping3 returns None on timeout and False on error; anything else is a
    # delay. Compare explicitly: a delay of 0.0 is falsy but means "up".
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    """An occurrence of an event rule, stored in the "events" collection."""

    collection_name: ClassVar[str] = "events"

    name: str
    # Seconds since the Unix epoch.
    timestamp: float
    event_rule_id: str

    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        """Rule that produced the event (looked up once, then cached)."""
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Build an Event, converting a datetime ``timestamp`` to epoch seconds.

        A naive datetime is taken as UTC, as elsewhere in this module for
        datetimes read from MongoDB. A timestamp that is already a number is
        left untouched. *dict_* itself is not modified.
        """
        timestamp = dict_["timestamp"]
        if isinstance(timestamp, datetime.datetime):
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=datetime.timezone.utc)
            dict_ = {**dict_, "timestamp": timestamp.timestamp()}
        return super().dict_to_object(dict_)
class EventRule(GenericMongo):
    # Rule document of the "event_rules" collection: a name, a formula string
    # and a list of variable strings.
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    @computed_field
    @cached_property
    def number_events(self) -> int:
        # Count of "events" documents whose ``event_rule_id`` equals this
        # rule's id; computed on first access, then cached on the instance.
        events = get_collection(systems_database, "events")
        rule_filter = {"event_rule_id": self.id}
        return events.count_documents(rule_filter)
class Company(GenericMongo):
    # Company document of the "companies" collection; only a name is stored.
    collection_name: ClassVar[str] = "companies"

    name: str
class User(GenericMongo):
    # User account document of the "users" collection.
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude={"password"}):
        """Serialize through ``GenericMongo.to_dict``, leaving out the password by default.

        Args:
            exclude: Field names forwarded as-is to ``GenericMongo.to_dict``.
        """
        # NOTE(review): the default is a shared mutable set. It is only
        # forwarded here, never modified; the signature is kept for callers.
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Insert a new user and return ``{"user_id": <inserted id as str>}``.

        When ``get_all()`` reports no existing user, the new user is forced
        to be an administrator regardless of ``is_admin``.

        Returns:
            A dict with the new id, or ``None`` if the insert call returned
            ``None``.
        """
        # NOTE(review): the password is stored exactly as received;
        # presumably it is hashed by the caller - confirm.
        users = cls.get_all()
        if not users:
            is_admin = True
        user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        user_collection = get_collection(systems_database, "users", create=True)
        new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update(cls, user: "UserUpdate", user_id: str):
        """Apply the non-``None`` fields of ``user`` to the stored user ``user_id``.

        Args:
            user: Partial update; fields left at ``None`` are not written.
            user_id: String form of the document's ObjectId.

        Returns:
            The updated user rebuilt from the stored document, or ``None``
            when no document has this id. ``is_connected`` is dropped from
            the stored document, so the returned object carries the field's
            default value.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            # No document matched: report it instead of failing on None below.
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # Tolerate documents stored without the field.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)


# Variant of User with every field except ``id`` optional (default None),
# used as the payload type of User.update.
UserUpdate = create_update_model(User)
class Campaign(GenericMongo):
    # Campaign document of the "campaigns" collection.
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` without its id and return ``{"campaign_id": <inserted id as str>}``.

        Returns ``None`` if the insert call returned ``None``.
        """
        campaigns = cls.collection()
        result = campaigns.insert_one(campaign.model_dump(exclude={"id"}))
        return None if result is None else {"campaign_id": str(result.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the stored campaign ``campaign.id``.

        Returns the value of ``find_one_and_update`` requested with
        ``ReturnDocument.AFTER`` (the raw stored document, not a Campaign).
        """
        campaigns = cls.collection()
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return campaigns.find_one_and_update(selector, changes, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with this id and return the ``delete_one`` result."""
        campaigns = cls.collection()
        return campaigns.delete_one({"_id": ObjectId(campaign_id)})
class Phase(GenericMongo):
    # Phase document of the "phases" collection, attached to a campaign
    # through ``campaign_id``.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    # NOTE(review): start/end are floats, presumably epoch seconds - confirm.
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    @classmethod
    def get_by_date(cls, datetime: float):
        """Return the dumped phases that start at, then those that end at, ``datetime``.

        Args:
            datetime: Value matched exactly against ``start_at`` and then
                ``end_at``. The name shadows the ``datetime`` module inside
                this method; it is kept because callers may pass it by
                keyword.

        Returns:
            A list of ``model_dump()`` dicts: phases whose ``start_at``
            matches (sorted by name) followed by phases whose ``end_at``
            matches (sorted by name). Empty when nothing matches.
        """
        # ``collection`` is a method: it has to be called, as everywhere else
        # in this class, before ``find`` can be used on the result.
        collection = cls.collection()
        phases = []
        for field in ("start_at", "end_at"):
            cursor = collection.find({field: datetime}).sort("name", ASCENDING)
            phases.extend(cls.dict_to_object(dict_).model_dump() for dict_ in cursor)
        return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` without id and return ``{"phase_id": <inserted id as str>}``.

        Returns ``None`` if the insert call returned ``None``.
        """
        # Rebuilt from the individual fields so any incoming id is ignored.
        phase = Phase(
            name=phase.name,
            description=phase.description,
            start_at=phase.start_at,
            end_at=phase.end_at,
            campaign_id=phase.campaign_id,
        )
        phase_collection = get_collection(systems_database, "phases", create=True)
        new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
        if new_phase is None:
            return None
        return {"phase_id": str(new_phase.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description, start and end of the stored phase ``phase.id``.

        ``campaign_id`` is not written. Returns the value of
        ``find_one_and_update`` requested with ``ReturnDocument.AFTER`` (the
        raw stored document, not a Phase).
        """
        updated_phase = cls.collection().find_one_and_update(
            {"_id": ObjectId(phase.id)},
            {
                "$set": {
                    "name": phase.name,
                    "description": phase.description,
                    "start_at": phase.start_at,
                    "end_at": phase.end_at,
                }
            },
            return_document=ReturnDocument.AFTER,
        )
        return updated_phase

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with this id and return the ``delete_one`` result."""
        delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
        return delete_phase

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase whose ``campaign_id`` matches and return the ``delete_many`` result."""
        delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
        return delete_phases
class CustomView(GenericMongo):
    # Custom view document of the "custom_views" collection, owned by the
    # user ``user_id``.
    collection_name: ClassVar[str] = "custom_views"

    # Properties
    id: str | None = None
    name: str
    configuration: list

    # Foreign Key
    user_id: str

    # Methods
    @classmethod
    def create(cls, form_custom_view: Self) -> dict | None:
        """Insert a copy of ``form_custom_view`` without id.

        Returns:
            ``{"custom_view_id": <inserted id as str>}``, or ``None`` if the
            insert call returned ``None``.
        """
        # Rebuilt from the individual fields so any incoming id is ignored.
        custom_view = CustomView(
            name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=form_custom_view.user_id
        )
        new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
        if new_custom_view is None:
            return None
        return {"custom_view_id": str(new_custom_view.inserted_id)}

    @classmethod
    def update(cls, custom_view: Self, custom_view_id: str) -> Self | None:
        """Apply the non-``None`` fields of ``custom_view`` to the stored view ``custom_view_id``.

        Args:
            custom_view: Source of the new values; its id is ignored.
            custom_view_id: String form of the document's ObjectId.

        Returns:
            The updated view rebuilt from the stored document, or ``None``
            when no document has this id.
        """
        changes = {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}
        updated_custom_view = cls.collection().find_one_and_update(
            {"_id": ObjectId(custom_view_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_custom_view is None:
            # No document matched: report it instead of failing on None below.
            return None
        updated_custom_view["id"] = str(updated_custom_view.pop("_id"))
        return cls(**updated_custom_view)

    @classmethod
    def delete(cls, custom_view_id) -> bool:
        """Delete the view with this id.

        Returns the ``acknowledged`` flag of the ``delete_one`` result, not
        the number of deleted documents.
        """
        deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
        return deleted_custom_view.acknowledged