Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 91%
944 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 14:27 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 14:27 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, Union
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
24# from scipy import signal as signal_scipy
26from twinpad_backend.db import (
27 get_collection,
28 get_async_collection,
29 get_signal_collection,
30 systems_database,
31 systems_async_database,
32 signals_database,
33 devices_states_database,
34)
35from twinpad_backend.responses import ListResponse
36from twinpad_backend.messages import send_mode_change, send_signal_value
# Mapping from type names to the Python callables used for them.
# NOTE(review): the trailing comma makes TYPES a 1-tuple that wraps the
# mapping (TYPES[0] is the dict) — confirm this is intended.
TYPES = ({"int": int, "float": float, "str": str, "bool": bool, "epoch": float},)


# Host names of the surrounding services; each one can be overridden through
# the environment variable of the same name.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")

# Delay in seconds: it is compared against ``time.time()`` differences and
# multiplied by 1000 where milliseconds are needed.
DEVICE_TIMEOUT = 5.0
NUMBER_SAMPLES_DATABASE_UPDATE = 120

# Reuses the "uvicorn.error" logger rather than a module-specific one.
logger = logging.getLogger("uvicorn.error")
def create_update_model(model):
    """Derive an all-optional "<Model>Update" pydantic model from ``model``.

    Every field of ``model`` except ``id`` is copied with its annotation
    widened to accept ``None`` and with ``None`` as its default value.
    """
    optional_fields = {
        name: (Union[info.annotation, None], None)
        for name, info in model.model_fields.items()
        if name != "id"
    }
    return create_model(model.__name__ + "Update", **optional_fields)
def get_utc_date_from_timestamp(timestamp: float):
    """Format a POSIX timestamp as ``YYYY-MM-DD H:MM:SS.ffffff UTC``.

    The timestamp is converted to a real UTC datetime, so the result does not
    depend on the time zone of the host. The hour is deliberately not
    zero-padded; it is formatted by hand because the ``%-H`` strftime
    directive is a platform-specific extension.

    Args:
        timestamp: Seconds since the Unix epoch.

    Returns:
        The formatted date string, suffixed with " UTC".
    """
    moment = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return f"{moment:%Y-%m-%d} {moment.hour}:{moment:%M:%S.%f} UTC"
def downsample_list(time_vector: list, values: list, max_number_samples: int):
    """Downsample a series with LTTB while keeping track of ``None`` gaps.

    ``time_vector`` and ``values`` are expected to have the same length. LTTB
    cannot handle ``None`` values, so every run of consecutive ``None`` is
    removed before downsampling and re-inserted afterwards as one ``None``
    sample at the first timestamp of the run plus, for runs longer than one,
    one at its last timestamp.

    Args:
        time_vector: Timestamps of the samples.
        values: Sample values, possibly containing ``None``.
        max_number_samples: Target size; shorter inputs are returned as is.

    Returns:
        A ``(time_vector, values)`` tuple. When LTTB rejects the input with a
        ``ValueError`` the non-None samples are linearly interpolated on a
        regular grid instead and the ``None`` gaps are not re-inserted.
    """
    if len(time_vector) < max_number_samples:
        return time_vector, values

    # Single pass: split the non-None samples from the bounds of None runs.
    kept_times = []
    kept_values = []
    none_group_bounds = []
    previous_was_none = False
    for timestamp, value in zip(time_vector, values):
        if value is None:
            if not previous_was_none:
                none_group_bounds.append([timestamp])
            elif len(none_group_bounds[-1]) < 2:
                none_group_bounds[-1].append(timestamp)
            else:
                none_group_bounds[-1][1] = timestamp
        else:
            kept_times.append(timestamp)
            kept_values.append(value)
        previous_was_none = value is None

    if not kept_times:
        # Nothing but None values: there is nothing to downsample.
        new_time_vector = []
        new_values = []
    else:
        number_none_values = sum(len(none_group) for none_group in none_group_bounds)
        try:
            values_array = npy.array([kept_times, kept_values]).T
            downsampled = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
        except ValueError:
            # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
            new_time_vector = npy.linspace(kept_times[0], kept_times[-1], max_number_samples, endpoint=True).tolist()
            new_values = list(npy.interp(new_time_vector, kept_times, kept_values))
            return new_time_vector, new_values
        new_time_vector = downsampled[:, 0].tolist()
        new_values = downsampled[:, 1].tolist()

    # Insert the None values back at the correct timestamps.
    for none_group in none_group_bounds:
        start_index = npy.searchsorted(new_time_vector, none_group[0])
        new_time_vector[start_index:start_index] = none_group
        new_values[start_index:start_index] = [None] * len(none_group)

    return new_time_vector, new_values
117# Models
class TwinPadModel(BaseModel):
    # Common base of the models of this module: thin, explicitly named
    # wrappers around pydantic validation and serialization.

    @classmethod
    def dict_to_object(cls, dict_):
        """Validate ``dict_`` and return the resulting instance of ``cls``."""
        instance = cls.model_validate(dict_)
        return instance

    def to_dict(self, exclude=None):
        """Dump the model to a dict, leaving out the fields in ``exclude``."""
        return self.model_dump(exclude=exclude)
class GenericMongo(TwinPadModel):
    # Base class of the models stored as documents of a collection of
    # ``systems_database``. The helpers below read ``cls.collection_name``,
    # which is declared by the subclasses.

    # String form of the Mongo ``_id``; None until the document is inserted.
    id: str | None = None
    # Aggregation stages keyed by attribute name. The stages registered for an
    # attribute are placed before the stage that filters or sorts on it; all
    # the other ones are appended at the end of the pipeline.
    custom_pipeline_steps: ClassVar[dict[str, list]] = {}

    @classmethod
    def collection(cls):
        """Return the collection of this model (requested with ``create=True``)."""
        return get_collection(systems_database, cls.collection_name, create=True)

    @classmethod
    def response_from_query(cls, query) -> ListResponse[Self]:
        """Run a filtered, sorted and paginated query.

        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (either
        "field" or "field:order" with an integer order), ``offset`` and
        ``limit`` (``None`` or 0 meaning no limit).

        Returns:
            A ``ListResponse`` holding the requested page and ``total``, the
            number of documents matching the filter.
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1
        collection = get_collection(systems_database, cls.collection_name, create=True)
        total = collection.count_documents(req_filter)

        pipeline = []
        added_properties = []

        # Custom stages of the attributes used by the filter must run before
        # the $match stage. A "$and" filter is inspected clause by clause.
        clauses = req_filter["$and"] if "$and" in req_filter else [req_filter]
        for clause in clauses:
            for attribute in cls.custom_pipeline_steps:
                if attribute in clause:
                    pipeline.extend(cls.custom_pipeline_steps[attribute])
                    added_properties.append(attribute)

        pipeline.append({"$match": req_filter})

        # Same for the attribute used to sort.
        if sort_field in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_field])
            added_properties.append(sort_field)

        pipeline.extend([{"$sort": {sort_field: sort_order}}, {"$skip": query.offset}])

        if (query.limit is not None) and (query.limit != 0):
            pipeline.append({"$limit": query.limit})

        # The remaining custom stages only have to run on the returned page.
        for attribute, step in cls.custom_pipeline_steps.items():
            if attribute not in added_properties:
                pipeline.extend(step)

        items = [cls.mongo_dict_to_object(item_dict) for item_dict in collection.aggregate(pipeline)]

        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)

    @classmethod
    def get_from_id(cls, item_id) -> Self | None:
        """Return the item whose ``_id`` is ``ObjectId(item_id)``, or None."""
        return cls.get_one_by_attribute("_id", ObjectId(item_id))

    @classmethod
    def mongo_dict_to_object(cls, mongo_dict):
        """Build an instance from a Mongo document.

        ``mongo_dict`` is modified in place: its ``_id`` entry is replaced by
        an ``id`` entry holding the string form of the identifier.
        """
        mongo_dict["id"] = str(mongo_dict["_id"])
        del mongo_dict["_id"]
        return cls.dict_to_object(mongo_dict)

    @classmethod
    def get_by_attribute(cls, attribute_name: str, attribute_value):
        """Returns all items that match the attribute with value."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline)
        if items is None:
            return None
        return [cls.mongo_dict_to_object(d) for d in items]

    @classmethod
    def get_one_by_attribute(cls, attribute_name: str, attribute_value):
        """Return the first item matching the attribute value, or None."""
        pipeline = []
        if attribute_name in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[attribute_name])
        pipeline.append({"$match": {attribute_name: attribute_value}})
        pipeline.append({"$limit": 1})
        for key, step in cls.custom_pipeline_steps.items():
            if key != attribute_name:
                pipeline.extend(step)
        items = cls.collection().aggregate(pipeline).to_list()
        if not items:
            return None
        return cls.mongo_dict_to_object(items[0])

    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every item of the collection in ascending ``sort_by`` order."""
        pipeline = []
        if sort_by in cls.custom_pipeline_steps:
            pipeline.extend(cls.custom_pipeline_steps[sort_by])
        pipeline.append({"$sort": {sort_by: ASCENDING}})
        for key, step in cls.custom_pipeline_steps.items():
            if key != sort_by:
                pipeline.extend(step)
        return [cls.mongo_dict_to_object(dict_) for dict_ in cls.collection().aggregate(pipeline)]

    @classmethod
    def get_number_documents(cls):
        """Return the number of stored documents (0 if there is no collection)."""
        collection = get_collection(systems_database, cls.collection_name)
        if collection is None:
            return 0
        return collection.count_documents({})

    def insert(self):
        """Insert this item as a new document and return its new ``id``.

        The ``id`` field is left out of the stored document since Mongo keeps
        the identifier in ``_id``.
        """
        # The key to exclude is the string "id", not the ``id`` builtin.
        insert_result = self.collection().insert_one(self.to_dict(exclude={"id"}))
        self.id = str(insert_result.inserted_id)
        return self.id

    def update(self, update_dict):
        """Apply ``update_dict`` to this instance and to its stored document.

        Returns:
            This instance, to allow chaining.
        """
        for key, value in update_dict.items():
            setattr(self, key, value)
        self.collection().find_one_and_update(
            {"_id": ObjectId(self.id)},
            {"$set": update_dict},
            return_document=ReturnDocument.AFTER,
        )

        return self

    def delete(self):
        """Delete the stored document; return True if one was removed."""
        result = self.collection().delete_one({"_id": ObjectId(self.id)})
        return result.deleted_count > 0
class User(GenericMongo):
    # User account stored in the "users" collection.
    collection_name: ClassVar[str] = "users"

    firstname: str
    lastname: str
    email: str
    password: str
    is_active: bool | None = False
    is_admin: bool | None = False
    is_connected: bool | None = False
    company_id: str | None = None

    def to_dict(self, exclude=None):
        """Dump the user, hiding ``password`` unless ``exclude`` is given."""
        if exclude is None:
            exclude = {"password"}
        return GenericMongo.to_dict(self, exclude=exclude)

    @classmethod
    def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
        """Store a new user and return ``{"user_id": <new id>}``.

        The very first user of the collection is always made an admin.
        """
        # Counting is enough to know whether the collection is empty; there
        # is no need to load and validate every stored user.
        if cls.get_number_documents() == 0:
            is_admin = True
        user = cls(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
        new_user = cls.collection().insert_one(user.model_dump(exclude={"id"}))
        if new_user is None:
            return None
        return {"user_id": str(new_user.inserted_id)}

    @classmethod
    def update(cls, user: "UserUpdate", user_id: str):
        """Update the stored user ``user_id`` with the set fields of ``user``.

        Fields of ``user`` that are None are left untouched in the database.

        Returns:
            The updated user, or None when no user has this identifier.
        """
        changes = {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}
        updated_user = cls.collection().find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": changes},
            return_document=ReturnDocument.AFTER,
        )
        if updated_user is None:
            return None
        updated_user["id"] = str(updated_user.pop("_id"))
        # The stored connection flag is not propagated to the returned object;
        # tolerate documents that do not carry it.
        updated_user.pop("is_connected", None)
        return cls(**updated_user)
# Partial-update model for User: same fields minus ``id``, all optional.
UserUpdate = create_update_model(User)
class Mode(TwinPadModel):
    # One entry of ``Device.modes``.
    # NOTE(review): Device.update looks modes up with ``modes[mode_id - 1]``,
    # so mode_id is presumably the 1-based position in that list — confirm.
    mode_id: int
    name: str
    frequency_multiplier: float
    min_frequency: float
class DeviceUpdate(TwinPadModel):
    # Payload holding the identifier of the mode to switch to.
    # NOTE(review): presumably what Device.update receives as ``update_dict``
    # (it reads ``update_dict.mode_id``) — verify against the caller.
    mode_id: int
class Device(GenericMongo):
    # Device stored in the "devices" collection.
    collection_name: ClassVar[str] = "devices"

    device_id: str
    name: str
    description: str = ""
    modes: list[Mode]
    current_mode_id: int | None = None
    last_ping: float | None = None
    petri_network: Any
    pid: Any
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    status: str

    # ``status`` is computed by the database on every read: "down" when the
    # device has no ``last_ping`` or when that ping (in seconds, converted to
    # milliseconds) is older than DEVICE_TIMEOUT, "up" otherwise.
    custom_pipeline_steps: ClassVar[dict[str, list]] = {
        "status": [
            {
                "$addFields": {
                    "status": {
                        "$cond": {
                            "if": {
                                "$or": [
                                    {"$eq": [{"$ifNull": ["$last_ping", None]}, None]},
                                    {
                                        "$gt": [
                                            {"$subtract": ["$$NOW", {"$multiply": ["$last_ping", 1000]}]},
                                            {"$toDate": float(DEVICE_TIMEOUT * 1000)},
                                        ]
                                    },
                                ]
                            },
                            "then": "down",
                            "else": "up",
                        }
                    }
                }
            }
        ]
    }

    async def update(self, update_dict, current_user: User):
        """Ask the device to switch to mode ``update_dict.mode_id``.

        The request is recorded as a "Mode change" ``Command`` issued by
        ``current_user``, completed with the response of the device.

        Returns:
            The response of ``send_mode_change``.
        """
        if self.current_mode_id is None:
            description = f"Change mode of #{self.device_id} to {self.modes[update_dict.mode_id - 1].name}"
        else:
            description = f"Change mode of #{self.device_id} from {self.modes[self.current_mode_id - 1].name} to {self.modes[update_dict.mode_id - 1].name}"

        # The command is timestamped before the message is actually sent.
        command = Command(
            sent_at=time.time(),
            command_type="Mode change",
            description=description,
            user_id=current_user.id,
        )
        response = await send_mode_change(self.device_id, update_dict.mode_id)
        command.receive_response(response)
        Command.create(command)

        return response

    @classmethod
    def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
        """Map the device id of each signal id to its ``Device`` (or None).

        The device id is the part of the signal id located before the first
        dot. Each distinct device is fetched only once.
        """
        distinct_device_ids = dict.fromkeys(signal_id.split(".")[0] for signal_id in signal_ids)
        return {
            device_id: cls.get_one_by_attribute("device_id", device_id)
            for device_id in distinct_device_ids
        }
class DeviceSetup(GenericMongo):
    # Device setup stored in the "device_setups" collection.
    collection_name: ClassVar[str] = "device_setups"

    # NOTE(review): presumably the ``device_id`` values of the devices taking
    # part in the setup — confirm.
    device_ids: list[str]
    active: bool = False
    variable_mapping: dict[str, str]
# Partial-update model for DeviceSetup: same fields minus ``id``, all optional.
DeviceSetupUpdate = create_update_model(DeviceSetup)
class DeviceState(GenericMongo):
    # Recorded state of a device at a given time.
    collection_name: ClassVar[str] = "devices_states"

    timestamp: float
    mode: str | None = None
    load: float | None = None
    tokens: list[int] = Field(default_factory=list)
    modified_properties: list[str] = Field(default_factory=list)

    @classmethod
    def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
        """Return the recorded states of ``device_id`` that match ``query``.

        The states are read from the collection named after the device in
        ``devices_states_database``; a missing collection gives an empty page.
        ``query`` must provide ``mongodb_filter()``, ``sort_by`` (either
        "field" or "field:order" with an integer order), ``offset`` and
        ``limit`` (``None`` or 0 meaning no limit).
        """
        req_filter = query.mongodb_filter()
        if ":" in query.sort_by:
            sort_field, sort_order = query.sort_by.split(":")
            sort_order = int(sort_order)
        else:
            sort_field = query.sort_by
            sort_order = 1

        collection = get_collection(devices_states_database, device_id)
        if collection is None:
            total = 0
            cursor = []
        else:
            total = collection.count_documents(req_filter)
            cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
            if (query.limit is not None) and (query.limit != 0):
                cursor = cursor.limit(query.limit)

        # Missing list entries fall back to an empty list: ``Field(...)`` is a
        # field declaration, not a value that can be given to the model.
        items = [
            cls(
                timestamp=item_dict.get("precise_timestamp"),
                mode=item_dict.get("mode", None),
                load=item_dict.get("load", None),
                tokens=item_dict.get("tokens", []),
                modified_properties=item_dict.get("modified_properties", []),
            )
            for item_dict in cursor
        ]
        return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
class SignalSample(TwinPadModel):
    # Single sample of a signal.
    signal_id: str
    timestamp: float
    value: float | int | str | bool | None
    forced_value: float | int | str | bool | None = None

    @classmethod
    def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
        """Return the oldest stored sample of ``signal_id``.

        Returns:
            The sample, or None when the signal has no collection or no data.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            sample_data = collection.find_one(
                {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
            )
        else:
            # No bucket information: fall back to a sorted lookup.
            sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)])

        if sample_data is None:
            return None

        return cls(
            signal_id=signal_id,
            timestamp=sample_data["precise_timestamp"],
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_first_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the oldest sample of each signal, in the order of ``signal_ids``."""
        return [cls.get_first_from_signal_id(sid) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_id(cls, signal_id: str, device: Device | None = None) -> Self | None:
        """Return the most recent stored sample of ``signal_id``.

        The returned timestamp is moved forward to the ``last_ping`` of the
        device when that ping is more recent than the sample.

        Args:
            signal_id: Identifier of the signal; the part located before the
                first dot is the identifier of its device.
            device: Device of the signal, fetched when not provided.

        Returns:
            The sample, or None when the signal has no collection or no data.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
        # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            sample_data = collection.find_one({"precise_timestamp": last_bucket["control"]["max"]["precise_timestamp"]})
        else:
            # No bucket information: fall back to a sorted lookup.
            sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)])

        if sample_data is None:
            return None

        timestamp = sample_data["precise_timestamp"]

        if device is None:
            device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
        if device is not None and device.last_ping is not None:
            if timestamp is None:
                timestamp = device.last_ping
            else:
                timestamp = max(timestamp, device.last_ping)
        return cls(
            signal_id=signal_id,
            timestamp=timestamp,
            value=sample_data.get("value", None),
            forced_value=sample_data.get("forced_value", None),
        )

    @classmethod
    def get_last_from_signal_id_interest_window(cls, signal_id: str, min_timestamp: float) -> Self | None:
        """Return the most recent sample of ``signal_id`` not older than ``min_timestamp``.

        Returns:
            The sample, or None when the signal has no collection or no sample
            in that time range.
        """
        collection = get_signal_collection(signal_id)
        if collection is None:
            return None

        cursor = collection.aggregate(
            [
                {"$match": {"timestamp": {"$gte": datetime.datetime.fromtimestamp(min_timestamp, pytz.UTC)}}},
                {"$sort": {"timestamp": -1}},
                {"$limit": 1},
            ]
        )
        cursor_data = cursor.to_list()

        if not cursor_data:
            return None

        sample_data = cursor_data[0]
        return cls(
            signal_id=signal_id,
            timestamp=sample_data.get("precise_timestamp"),
            value=sample_data.get("value"),
            forced_value=sample_data.get("forced_value"),
        )

    @classmethod
    def get_last_from_signal_ids(cls, signal_ids: list[str]) -> list[Self | None]:
        """Return the most recent sample of each signal, in the order of ``signal_ids``."""
        # Devices are fetched once each and shared between their signals.
        devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
        return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]

    @classmethod
    def get_last_from_signal_ids_interest_window(
        cls, signal_ids: list[str], min_timestamp: float
    ) -> list[Self | None]:
        """Return, for each signal, its most recent sample not older than ``min_timestamp``."""
        return [cls.get_last_from_signal_id_interest_window(sid, min_timestamp) for sid in signal_ids]
class SignalData(TwinPadModel):
    # Time series of a signal: ``time_vector``, ``values`` and
    # ``forced_values`` are parallel lists. The resampling methods rely on
    # ``self.interpolate(new_time_vector, items)``, which is not defined in
    # this class and has to be provided by the subclasses.
    signal_id: str
    forcible: bool = True
    time_vector: list[float]
    values: list[float | int | str | None]
    forced_values: list[float | int | str | None]

    # Timestamps of the oldest and most recent samples stored for the signal.
    data_start: float | None = None
    data_end: float | None = None

    number_samples: int = 0
    number_samples_db: int = 0

    # Timings in seconds, kept for diagnostics.
    db_query_time: float = 0.0
    init_time: float = 0.0
    data_processing_time: float = 0.0

    @classmethod
    def get_from_signal_id(
        cls,
        signal_id: str,
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the samples of a signal from the database.

        Args:
            signal_id: Identifier of the signal.
            min_timestamp: Lower bound of the query (POSIX seconds), if any.
            max_timestamp: Upper bound of the query (POSIX seconds), if any.
            window_min_timestamp: Start of the window of interest.
            window_max_timestamp: End of the window of interest.
            interpolate_bounds: Whether samples are added on the bounds of
                the window of interest (only when both bounds are given).
            max_documents: When the query matches more documents than this,
                only the first document of every group of
                ``ceil(matches / max_documents)`` consecutive ones is kept.

        Returns:
            An instance of the data class of the signal, or an empty instance
            of ``cls`` when the signal has no collection.
        """
        now = time.time()

        # The stored dates are compared with timezone-aware UTC datetimes so
        # that the bounds do not depend on the time zone of the host.
        req_signal = {}
        if min_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp, datetime.timezone.utc)
        if max_timestamp is not None:
            req_signal.setdefault("timestamp", {})
            req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp, datetime.timezone.utc)

        collection = get_signal_collection(signal_id)
        if collection is None:
            return cls(
                signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
            )

        db_req_start = time.time()

        sort_step = {"$sort": {"precise_timestamp": 1}}
        number_results = collection.count_documents(req_signal)

        pipeline = []
        if req_signal:
            pipeline.append({"$match": req_signal})  # Filter data if needed

        pipeline.extend(
            [
                {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
                sort_step,
            ]
        )

        if max_documents is not None and max_documents < number_results:
            unsampling_ratio = math.ceil(number_results / max_documents)
            logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
            # Number the documents, group them by blocks of ``unsampling_ratio``
            # and keep the first document of each block.
            pipeline.extend(
                [
                    {
                        "$setWindowFields": {
                            "sortBy": {"precise_timestamp": 1},
                            "output": {"index": {"$documentNumber": {}}},
                        }
                    },
                    {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
                    {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
                    {"$replaceRoot": {"newRoot": "$doc"}},
                    {"$unset": ["index", "group_id"]},
                    {"$sort": {"precise_timestamp": 1}},
                ]
            )

        cursor = collection.aggregate(pipeline)
        db_req_time = time.time() - db_req_start

        init_time = time.time()

        results = cursor.to_list()
        time_vector = []
        values = []
        forced_values = []
        for s in results:
            time_vector.append(s["precise_timestamp"])
            values.append(s.get("value", None))
            forced_values.append(s.get("forced_value", None))

        signal = Signal.get_from_signal_id(signal_id)
        class_ = signal.signal_data_class

        if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
            time_vector, values, forced_values = cls.interpolate_bounds(
                class_,
                collection,
                signal_id,
                time_vector,
                values,
                forced_values,
                window_min_timestamp,
                window_max_timestamp,
            )

        if values:
            # TODO: check below. a bit strange
            if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
                # Adding last value as it should be repeated
                time_vector.append(now)
                values.append(values[-1])
                forced_values.append(forced_values[-1])

        init_time = time.time() - init_time

        # The bounds of the stored data are read from the internal bucket
        # collection associated to the timeseries, workaround found here:
        # https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
        bucket = get_signal_collection(f"system.buckets.{signal_id}")
        first_bucket = None
        if bucket is not None:
            first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
        if first_bucket is not None:
            data_start = first_bucket["control"]["min"]["precise_timestamp"]
        else:
            data_start = None

        last_bucket = None
        if bucket is not None:
            last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
        if last_bucket is not None:
            data_end = last_bucket["control"]["max"]["precise_timestamp"]
        else:
            data_end = None

        return class_(
            signal_id=signal_id,
            forcible=signal.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            data_start=data_start,
            data_end=data_end,
            number_samples=len(values),
            number_samples_db=number_results,
            db_query_time=db_req_time,
            init_time=init_time,
        )

    @staticmethod
    def interpolate_bounds(
        class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
    ):
        """Add samples on the bounds of the window of interest when needed.

        A bound sample is added on a side when the data does not reach the
        outer fifth of the window on that side. Its value comes from the last
        stored sample having a value at or before the bound, interpolated
        with the nearest loaded sample when there is one.

        ``time_vector``, ``values`` and ``forced_values`` are modified in
        place and also returned.
        """
        window_span = window_max_timestamp - window_min_timestamp
        sample_right = None

        # Fetching right side value & interpolation
        window_right_limit = window_max_timestamp - window_span / 5
        if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
            sample_right = collection.find_one(
                {
                    "timestamp": {
                        "$lte": datetime.datetime.fromtimestamp(window_max_timestamp, datetime.timezone.utc)
                    },
                    "value": {"$exists": True},
                },
                sort=[("precise_timestamp", -1)],
            )
            if sample_right:
                if time_vector:
                    right_sd = class_(
                        signal_id=signal_id,
                        time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
                        values=[values[-1], sample_right.get("value", None)],
                        forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
                    )
                    max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
                    max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
                else:
                    max_ts_value = sample_right.get("value", None)
                    max_ts_forced_value = sample_right.get("forced_value", None)
                time_vector.append(window_max_timestamp)
                values.append(max_ts_value)
                forced_values.append(max_ts_forced_value)

        # Fetching left side value & interpolation
        window_left_limit = window_min_timestamp + window_span / 5
        if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
            if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
                # The sample found for the right bound is already older than
                # the window: it is also the one preceding the left bound, so
                # the second query can be skipped.
                sample_left = sample_right
            else:
                sample_left = collection.find_one(
                    {
                        "timestamp": {
                            "$lte": datetime.datetime.fromtimestamp(window_min_timestamp, datetime.timezone.utc)
                        },
                        "value": {"$exists": True},
                    },
                    sort=[("precise_timestamp", -1)],
                )

            if sample_left:
                if time_vector:
                    left_sd = class_(
                        signal_id=signal_id,
                        time_vector=[sample_left["precise_timestamp"], time_vector[0]],
                        values=[sample_left["value"], values[0]],
                        forced_values=[sample_left.get("forced_value", None), forced_values[0]],
                    )
                    min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
                    min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
                else:
                    min_ts_value = sample_left.get("value", None)
                    min_ts_forced_value = sample_left.get("forced_value", None)
                time_vector.insert(0, window_min_timestamp)
                values.insert(0, min_ts_value)
                forced_values.insert(0, min_ts_forced_value)

        return time_vector, values, forced_values

    def interpolate_values(self, new_time_vector: list[float]):
        """Return ``values`` resampled on ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.values)

    def interpolate_forced_values(self, new_time_vector: list[float]):
        """Return ``forced_values`` resampled on ``new_time_vector``."""
        return self.interpolate(new_time_vector, self.forced_values)

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a copy resampled on at most ``number_samples_max`` regular steps.

        Data holding no more samples than ``number_samples_max`` (or any data
        when ``number_samples_max`` is falsy) is copied unchanged.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            new_time_vector = npy.linspace(
                self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
            ).tolist()
            values = self.interpolate_values(new_time_vector)
            forced_values = self.interpolate_forced_values(new_time_vector)
            time_vector = new_time_vector
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            # Carried over so that resampling does not reset the flag.
            forcible=self.forcible,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Performs a sampling in a window of interest and outside.

        Args:
            window_max_number_samples: Maximum number of samples kept inside
                the window of interest.
            outside_max_number_samples: Maximum number of samples kept before
                and after the window, shared proportionally between the two.
            window_min_timestamp: Start of the window (first sample if None).
            window_max_timestamp: End of the window (last sample if None).

        Returns:
            A resampled copy, or ``self`` when there is no data.
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window = npy.linspace(
                time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
            ).tolist()

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        number_samples_outside = number_samples_before + number_samples_after
        if number_samples_outside > outside_max_number_samples:
            new_number_samples_before = math.ceil(
                outside_max_number_samples * number_samples_before / number_samples_outside
            )
            new_number_samples_after = math.ceil(
                outside_max_number_samples * number_samples_after / number_samples_outside
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before:
                time_vector_before = npy.linspace(
                    time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
                ).tolist()

            if number_samples_after:
                # Built backwards so that the excluded endpoint is the one
                # next to the window, then put back in ascending order.
                time_vector_after = npy.linspace(
                    time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
                ).tolist()[::-1]

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = self.interpolate_values(new_time_vector)
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            forcible=self.forcible,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def csv_export(self):
        """Return the data as UTF-8 encoded CSV with a header row."""
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["timestamp", "value", "forced_value"])  # Write header
        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
        return output.getvalue().encode("utf-8")

    def prestoplot_export(self):
        """Return the data as a UTF-8 encoded, tab-separated PrestoPlot file.

        Dots of the signal id are replaced by underscores and an underscore
        is prepended when the id starts with a digit. Missing values are
        written as "none".
        """
        clean_signal_id = self.signal_id.replace(".", "_")
        if clean_signal_id[0].isnumeric():
            clean_signal_id = "_" + clean_signal_id

        output = io.StringIO()
        output.write("# Encoding:\tUTF-8\n")
        output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
        output.write("ISO8601\tnone\tnone\n")
        output.write(f"# Description :\t{clean_signal_id}\n")

        for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
            output.write(
                f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
            )
        return output.getvalue().encode("utf-8")
class NumericSignalData(SignalData):
    # Signal data whose samples are numbers; None marks a missing sample.
    data_type: str = "float"
    values: list[float | int | None]
    forced_values: list[float | int | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Linearly interpolate ``items`` at the timestamps of ``new_time_vector``.

        ``items`` is used as the ordinate matching ``self.time_vector``. None
        entries are handed to numpy as NaN, and NaN results are mapped back to
        None in the returned list.
        """
        items = [npy.nan if s is None else s for s in items]
        return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Return a new instance downsampled over the whole time range.

        When ``number_samples_max`` is truthy and the current sample count
        exceeds it, values are reduced with ``downsample_list`` and forced
        values are re-interpolated on the reduced time vector. Otherwise the
        data is carried over unchanged. The time spent here is added to
        ``data_processing_time`` of the returned instance.
        """
        data_processing_time = time.time()
        if number_samples_max and self.number_samples > number_samples_max:
            time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
            forced_values = self.interpolate_forced_values(time_vector)
            number_samples = len(time_vector)
        else:
            time_vector = self.time_vector
            number_samples = len(self.values)
            values = self.values[:]
            forced_values = self.forced_values[:]
        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            time_vector=time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Downsample separately inside and outside a window of interest.

        Args:
            window_max_number_samples: sample budget for the window itself.
            outside_max_number_samples: sample budget shared by the samples
                before and after the window, split in proportion to how many
                samples each side holds.
            window_min_timestamp: window start; defaults to the first sample.
            window_max_timestamp: window end; defaults to the last sample.

        Returns:
            ``self`` when there is no data, otherwise a new instance whose
            window always contains points at both window bounds (interpolated
            when no sample falls exactly on them).
        """
        if not self.time_vector:
            return self

        if window_min_timestamp is None:
            window_min_timestamp = self.time_vector[0]
        if window_max_timestamp is None:
            window_max_timestamp = self.time_vector[-1]

        data_processing_time = time.time()

        index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
        index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)

        # Slices are copies, so the in-place inserts below never touch self.
        time_vector_before = self.time_vector[:index_window_start]
        time_vector_window = self.time_vector[index_window_start:index_window_end]
        time_vector_after = self.time_vector[index_window_end:]

        values_before = self.values[:index_window_start]
        values_window = self.values[index_window_start:index_window_end]
        values_after = self.values[index_window_end:]
        window_min_value = self.interpolate_values([window_min_timestamp])[0]
        window_max_value = self.interpolate_values([window_max_timestamp])[0]

        # Resampling window
        if time_vector_window:
            # Ensuring window bounds
            if time_vector_window[0] != window_min_timestamp:
                time_vector_window.insert(0, window_min_timestamp)
                values_window.insert(0, window_min_value)
            if time_vector_window[-1] != window_max_timestamp:
                time_vector_window.append(window_max_timestamp)
                values_window.append(window_max_value)
        else:
            time_vector_window = [window_min_timestamp, window_max_timestamp]
            values_window = [window_min_value, window_max_value]

        if len(time_vector_window) > window_max_number_samples:
            # Resampling
            time_vector_window, values_window = downsample_list(
                time_vector_window, values_window, window_max_number_samples
            )

        # Resampling outside
        number_samples_before = len(time_vector_before)
        number_samples_after = len(time_vector_after)
        if (number_samples_before + number_samples_after) > outside_max_number_samples:
            new_number_samples_before = math.ceil(
                outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
            )
            new_number_samples_after = math.ceil(
                outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
            )
            # Adjusting numbers as math.ceil can do +1 on sum
            if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
                if new_number_samples_before > new_number_samples_after:
                    new_number_samples_before -= 1
                else:
                    new_number_samples_after -= 1

            if new_number_samples_before:
                time_vector_before, values_before = downsample_list(
                    time_vector_before, values_before, new_number_samples_before
                )

            # Guard on the allotted budget (not the original count): the
            # adjustment above can bring it down to 0, and downsample_list
            # must not be asked for 0 samples.
            if new_number_samples_after:
                time_vector_after, values_after = downsample_list(
                    time_vector_after, values_after, new_number_samples_after
                )

        new_time_vector = time_vector_before + time_vector_window + time_vector_after
        values = values_before + values_window + values_after
        forced_values = self.interpolate_forced_values(new_time_vector)
        number_samples = len(values)

        data_processing_time = time.time() - data_processing_time

        return self.__class__(
            signal_id=self.signal_id,
            time_vector=new_time_vector,
            values=values,
            forced_values=forced_values,
            number_samples=number_samples,
            number_samples_db=self.number_samples,
            data_start=self.data_start,
            data_end=self.data_end,
            db_query_time=self.db_query_time,
            init_time=self.init_time,
            data_processing_time=self.data_processing_time + data_processing_time,
        )
class StringSignalData(SignalData):
    # Signal data whose samples are strings; None marks a missing sample.
    data_type: str = "str"
    values: list[str | None]
    forced_values: list[str | None]

    def interpolate(self, new_time_vector: list[float], items):
        """Pick, for each requested timestamp, the item of the latest sample at or before it.

        Timestamps earlier than the first sample fall back to the first item,
        because the computed positions are clipped to the valid index range.
        """
        last_index = len(items) - 1
        # side="right" minus one gives the index of the last sample <= timestamp.
        positions = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
        return [items[position] for position in npy.clip(positions, 0, last_index)]
class SignalsData(TwinPadModel):
    # Data of several signals, with the overall data bounds and the summed
    # processing time of the individual signals.
    signals_data: list[NumericSignalData | StringSignalData | SignalData]
    data_processing_time: float
    data_start: float | None
    data_end: float | None

    @classmethod
    def get_from_signal_ids(
        cls,
        signal_ids: list[str],
        min_timestamp: float | None = None,
        max_timestamp: float | None = None,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
        interpolate_bounds: bool = True,
        max_documents: int | None = None,
    ) -> Self:
        """Load the data of every signal in ``signal_ids``.

        All arguments besides ``signal_ids`` are forwarded unchanged to
        ``SignalData.get_from_signal_id``, except that ``max_timestamp``
        defaults to the current time so that every signal shares the same
        upper bound. ``data_start`` / ``data_end`` of the result are the
        earliest start and latest end among signals that report one.
        """
        signals_data = []
        data_start = None
        data_end = None
        if max_timestamp is None:
            max_timestamp = time.time()
        data_processing_time = 0.0
        for signal_id in signal_ids:
            signal_data = SignalData.get_from_signal_id(
                signal_id=signal_id,
                min_timestamp=min_timestamp,
                max_timestamp=max_timestamp,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
                interpolate_bounds=interpolate_bounds,
                max_documents=max_documents,
            )
            data_processing_time += signal_data.data_processing_time
            signals_data.append(signal_data)
            if signal_data.data_start is not None:
                if data_start is None:
                    data_start = signal_data.data_start
                else:
                    data_start = min(signal_data.data_start, data_start)
            if signal_data.data_end is not None:
                if data_end is None:
                    data_end = signal_data.data_end
                else:
                    data_end = max(signal_data.data_end, data_end)

        return cls(
            signals_data=signals_data,
            data_processing_time=data_processing_time,
            data_start=data_start,
            data_end=data_end,
        )

    def uniform_desampling(self, number_samples_max: int) -> Self:
        """Apply ``uniform_desampling`` to every signal and wrap the results."""
        signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def interest_window_desampling(
        self,
        window_max_number_samples: int,
        outside_max_number_samples: int,
        window_min_timestamp: float | None = None,
        window_max_timestamp: float | None = None,
    ) -> Self:
        """Apply ``interest_window_desampling`` to every signal and wrap the results."""
        signals_data = [
            s.interest_window_desampling(
                window_max_number_samples=window_max_number_samples,
                outside_max_number_samples=outside_max_number_samples,
                window_min_timestamp=window_min_timestamp,
                window_max_timestamp=window_max_timestamp,
            )
            for s in self.signals_data
        ]

        return SignalsData(
            signals_data=signals_data,
            data_processing_time=sum(s.data_processing_time for s in signals_data),
            data_start=self.data_start,
            data_end=self.data_end,
        )

    def zip_export(self, file_format: str = "csv"):
        """Return the bytes of a deflated zip archive holding one export per signal.

        Args:
            file_format: ``"csv"`` (entries named ``<signal_id>.csv``) or
                ``"prestoplot"`` (entries named ``<signal_id>.tab``).

        Raises:
            ValueError: if ``file_format`` is neither of the above.
        """
        # Validate up front so an unknown format is rejected even when there
        # is no signal to export (the loop below would never notice it).
        if file_format not in ("csv", "prestoplot"):
            raise ValueError(f"Format not found. Got: {file_format}")

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for signal_data in self.signals_data:
                if file_format == "csv":
                    zip_file.writestr(f"{signal_data.signal_id}.csv", signal_data.csv_export())
                else:
                    zip_file.writestr(f"{signal_data.signal_id}.tab", signal_data.prestoplot_export())
        # Read the buffer only after the archive is closed, so the central
        # directory has been written.
        return zip_buffer.getvalue()
class SignalStatus(TwinPadModel):
    # Health summary of a signal, as produced by Signal.status.
    # status: "up", "down" or "no data".
    status: str
    # reason: human-readable explanation; empty when there is nothing to report.
    reason: str
    # delay: seconds since the last sign of life, or None when unknown.
    delay: float | None
class DigitizationFunction(TwinPadModel):
    # Parameters of a signal's digitization.
    # NOTE(review): presumably maps [min_raw_value, max_raw_value] onto
    # [min_value, max_value] with an optional bit depth; confirm with the
    # code that consumes this model.
    bits: int | None = None
    min_value: float
    max_value: float
    min_raw_value: float
    max_raw_value: float
class SignalUpdate(TwinPadModel):
    # Payload of a signal command (handed to send_signal_value by
    # Signal.update). Every field is optional and defaults to None.
    value: float | str | bool | int | None = None
    forced_value: float | str | bool | int | None = None
    timestamp: int | None = None
class SignalType(str, Enum):
    # Closed set of signal kinds. The str mixin makes each member compare
    # equal to its raw string value.
    command = "command"
    sensor = "sensor"
    external_sensor = "external_sensor"
# Maps a signal's declared data type to the SignalData subclass that handles
# it. Everything except "str" is treated as numeric data.
SIGNALDATA_TYPES = {
    "int": NumericSignalData,
    "float": NumericSignalData,
    "str": StringSignalData,
    "bool": NumericSignalData,
    "epoch": NumericSignalData,
}
1219class Signal(GenericMongo):
1220 collection_name: ClassVar[str] = "signals"
1222 signal_id: str
1223 frequency: float
1224 unit: str | None
1225 description: str
1226 type: SignalType
1227 data_type: str
1228 precision_digits: int | None
1229 forcible: bool
1231 digitization_function: DigitizationFunction | None
1233 @property
1234 def device(self) -> Device:
1235 device_id = self.signal_id.split(".")[0]
1236 device = Device.get_one_by_attribute("device_id", device_id)
1237 return device
1239 @cached_property
1240 def signal_data_class(self):
1241 if self.data_type in SIGNALDATA_TYPES:
1242 return SIGNALDATA_TYPES[self.data_type]
1243 if self.data_type.startswith("enum"):
1244 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1245 raise ValueError(f"Unhandled python type: {self.data_type}")
1247 @cached_property
1248 def python_type(self):
1249 if self.data_type in TYPES:
1250 return TYPES[self.data_type]
1251 if self.data_type.startswith("enum"):
1252 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1253 return Literal[*choices]
1254 raise ValueError(f"Unhandled python type: {self.data_type}")
1256 @computed_field
1257 @property
1258 def status(self) -> SignalStatus:
1259 now = time.time()
1260 status = "up"
1261 reason = ""
1263 # See line 292 for explanation
1264 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1265 last_bucket = None
1266 if bucket is not None:
1267 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1268 if last_bucket is None:
1269 status = "no data"
1270 reason = "signal does not exist"
1271 return SignalStatus(status=status, reason=reason, delay=None)
1273 try:
1274 last_date = last_bucket["control"]["max"]["timestamp"]
1275 last_date = last_date.replace(tzinfo=pytz.UTC)
1276 last_value_ts = last_date.timestamp()
1277 except IndexError:
1278 last_value_ts = None
1280 if last_value_ts is None:
1281 delay = None
1282 reason = "No data from signal"
1283 else:
1284 # Since device is a computed property, only compute it once
1285 device = self.device
1286 if device is not None and device.last_ping is not None:
1287 last_value_ts = max(last_value_ts, device.last_ping)
1288 delay = now - last_value_ts
1289 if delay > DEVICE_TIMEOUT:
1290 status = "down"
1291 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1292 return SignalStatus(status=status, reason=reason, delay=delay)
1294 async def update(self, update_dict: SignalUpdate, current_user: User) -> dict:
1295 command = Command(
1296 sent_at=time.time(),
1297 command_type="Signal command",
1298 user_id=current_user.id,
1299 )
1300 response = await send_signal_value(self.signal_id, update_dict)
1301 command.receive_response(response)
1302 Command.create(command)
1304 return response
1306 @classmethod
1307 def get_from_signal_id(cls, signal_id) -> Self:
1308 """Could be generic from mongo"""
1309 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1310 if not raw_value:
1311 return None
1312 del raw_value["_id"]
1313 return cls.dict_to_object(raw_value)
1315 @classmethod
1316 def get_all_ids(cls) -> list[str]:
1317 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1319 return [signal["signal_id"] for signal in cursor]
1321 async def number_samples(self):
1322 collection = get_signal_collection(signal_id=self.signal_id)
1323 if collection is None:
1324 return 0
1326 number_samples = collection.estimated_document_count()
1328 number_samples_async_collection = await get_async_collection(
1329 systems_async_database, "number_samples", create=True, time_series=True
1330 )
1332 loop = asyncio.get_running_loop()
1333 loop.create_task(
1334 number_samples_async_collection.insert_one(
1335 {
1336 "timestamp": datetime.datetime.now(pytz.UTC),
1337 "signal_id": self.signal_id,
1338 "number_samples": number_samples,
1339 }
1340 )
1341 )
1343 return number_samples
1345 def sample_datasize(self):
1346 return signals_database.command("collstats", self.signal_id)["size"]
1348 @classmethod
1349 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1350 result = cls.collection().aggregate(
1351 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1352 )
1354 return {signal["signal_id"]: signal["forcible"] for signal in result}
class ServicesStatus(TwinPadModel):
    # "up" / "down" state of each service of the platform.
    backend: str
    cloud_broker: str
    time_series_database: str
    signal_storage: str
    heartbeat_storage: str
    data_analyzer: str

    @classmethod
    def check(cls) -> Self:
        """Ping every external service host and report the results.

        The backend itself is always reported as "up", since it is the one
        answering.
        """
        hosts_by_service = {
            "cloud_broker": RABBITMQ_HOST,
            "time_series_database": MONGO_HOST,
            "signal_storage": SIGNAL_STORAGE_HOST,
            "heartbeat_storage": HEARTBEAT_STORAGE_HOST,
            "data_analyzer": DATA_ANALYZER_HOST,
        }
        states = {service: ping(host) for service, host in hosts_by_service.items()}
        return cls(backend="up", **states)
def ping(host):
    """Return "up" when ``host`` answers a ping within 0.8 second, "down" otherwise.

    A PermissionError raised while pinging is reported as "down" instead of
    propagating.
    """
    try:
        delay = ping3.ping(host, timeout=0.8)
    except PermissionError:
        # NOTE(review): presumably raised when the process is not allowed to
        # open the ICMP socket; the host state is then unknown and reported
        # as "down".
        return "down"
    # ping3 documents None for a timeout and False for an error; any numeric
    # delay is a reply. Testing plain truthiness would misreport a 0.0 delay.
    if delay is None or delay is False:
        return "down"
    return "up"
class Event(GenericMongo):
    # An occurrence of an event rule, stored in the "events" collection.
    collection_name: ClassVar[str] = "events"

    name: str
    timestamp: float
    event_rule_id: str

    # Rule that produced this event, fetched once from event_rule_id.
    @computed_field
    @cached_property
    def event_rule(self) -> "EventRule":
        return EventRule.get_from_id(self.event_rule_id)

    @classmethod
    def dict_to_object(cls, dict_):
        """Convert the stored datetime into epoch seconds, then build the object.

        ``dict_`` is modified in place: its ``"timestamp"`` entry (a datetime)
        is replaced by the matching float timestamp.
        """
        stored = dict_["timestamp"]
        # A naive datetime is interpreted as UTC, like everywhere else in this
        # module; calling .timestamp() on it directly would apply the server's
        # local timezone instead.
        if stored.tzinfo is None:
            stored = stored.replace(tzinfo=pytz.UTC)
        dict_["timestamp"] = stored.timestamp()
        return super().dict_to_object(dict_)
class EventDay(GenericMongo):
    # Number of events recorded on one day, stored in the "number_events"
    # collection. timestamp is the start of the day (epoch seconds).
    collection_name: ClassVar[str] = "number_events"

    timestamp: float
    number_events: int

    @classmethod
    def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_events) -> list[Self]:
        """Return the per-day event counts between two epoch timestamps.

        Args:
            min_timestamp: lower bound, epoch seconds (inclusive).
            max_timestamp: upper bound, epoch seconds (inclusive).
            recompute_events: when true (or when the counts collection does
                not exist yet), the counts of all past days are computed from
                the events collection, starting at the day of the first event.

        Returns:
            One item per stored day in the requested range; an empty list when
            a computation is requested but there is no event at all.

        Days are delimited in UTC throughout.
        """
        ONE_DAY_OFFSET = datetime.timedelta(days=1)
        number_events_collection = get_collection(systems_database, "number_events")
        events_collection = get_collection(systems_database, "events", create=True, time_series=True)
        items = []
        # Start of the current UTC day (taking the server-local date and
        # labelling it UTC would shift the boundary on non-UTC servers).
        TODAY = datetime.datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)
        if number_events_collection is None or recompute_events:
            number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
            first_event = events_collection.find_one(sort={"timestamp": 1})
            if first_event is None:
                return items
            # compute from day of first found event
            last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
                tzinfo=pytz.UTC
            )
            # NOTE(review): counts already stored for past days are not
            # removed before being inserted again, so recomputing on an
            # existing collection looks like it duplicates them; confirm.
            while last_computed_day < TODAY:
                day_nb_events = events_collection.count_documents(
                    {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
                )
                if day_nb_events > 0:
                    number_events_collection.insert_one(
                        {"timestamp": last_computed_day, "number_events": day_nb_events}
                    )
                last_computed_day += ONE_DAY_OFFSET
        # The current day is still changing: replace its count on every call.
        number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
        if number_events_today > 0:
            number_events_collection.delete_one({"timestamp": TODAY})
            number_events_collection.insert_one({"timestamp": TODAY, "number_events": number_events_today})
        # Build aware UTC bounds directly from the epoch values.
        data_start = datetime.datetime.fromtimestamp(min_timestamp, tz=pytz.UTC)
        data_end = datetime.datetime.fromtimestamp(max_timestamp, tz=pytz.UTC)
        number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
        for day in number_events:
            day_start = day["timestamp"]
            # Naive datetimes read back from the database are taken as UTC.
            if day_start.tzinfo is None:
                day_start = day_start.replace(tzinfo=pytz.UTC)
            day["timestamp"] = day_start.timestamp()
            items.append(cls.mongo_dict_to_object(day))
        return items
class EventRule(GenericMongo):
    # Rule producing events, stored in the "event_rules" collection.
    collection_name: ClassVar[str] = "event_rules"

    name: str
    formula: str
    variables: list[str]

    # Number of stored events referencing this rule (computed once per object).
    @computed_field
    @cached_property
    def number_events(self) -> int:
        events_collection = get_collection(systems_database, "events")
        # Without create=True the lookup may yield None when the collection
        # does not exist yet: there are no events to count in that case.
        if events_collection is None:
            return 0
        return events_collection.count_documents({"event_rule_id": self.id})
class Company(GenericMongo):
    # A company, stored in the "companies" collection.
    collection_name: ClassVar[str] = "companies"
    name: str
class Campaign(GenericMongo):
    # A campaign, stored in the "campaigns" collection.
    collection_name: ClassVar[str] = "campaigns"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None

    @classmethod
    def create(cls, campaign: Self):
        """Insert ``campaign`` (without its id) and return ``{"campaign_id": <new id>}``."""
        insertion = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
        return None if insertion is None else {"campaign_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, campaign: Self):
        """Overwrite name and description of the stored campaign; return the updated document."""
        campaigns = cls.collection()
        selector = {"_id": ObjectId(campaign.id)}
        changes = {"$set": {"name": campaign.name, "description": campaign.description}}
        return campaigns.find_one_and_update(selector, changes, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, campaign_id):
        """Delete the campaign with this id and return the driver's delete result."""
        campaigns = cls.collection()
        return campaigns.delete_one({"_id": ObjectId(campaign_id)})
class Phase(GenericMongo):
    # A phase of a campaign, stored in the "phases" collection.
    collection_name: ClassVar[str] = "phases"

    # Properties
    id: str | None = None
    name: str
    description: str | None = None
    start_at: float
    end_at: float

    # FK
    campaign_id: str

    # @classmethod
    # def get_by_date(cls, datetime: float):
    #     phases = []
    #     for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
    #         phases.append(cls.dict_to_object(dict_).model_dump())
    #     if phases is None:
    #         return None
    #     return phases

    @classmethod
    def create(cls, phase: Self):
        """Insert a copy of ``phase`` (without its id) and return ``{"phase_id": <new id>}``."""
        copied_fields = ("name", "description", "start_at", "end_at", "campaign_id")
        # Rebuilding the object re-validates the fields and drops any id.
        fresh_phase = Phase(**{field_name: getattr(phase, field_name) for field_name in copied_fields})
        phases = get_collection(systems_database, "phases", create=True)
        insertion = phases.insert_one(fresh_phase.model_dump(exclude={"id"}))
        return None if insertion is None else {"phase_id": str(insertion.inserted_id)}

    @classmethod
    def update(cls, phase: Self):
        """Overwrite name, description and bounds of the stored phase; return the updated document.

        The campaign the phase belongs to is left untouched.
        """
        phases = cls.collection()
        selector = {"_id": ObjectId(phase.id)}
        new_values = {
            "name": phase.name,
            "description": phase.description,
            "start_at": phase.start_at,
            "end_at": phase.end_at,
        }
        return phases.find_one_and_update(selector, {"$set": new_values}, return_document=ReturnDocument.AFTER)

    @classmethod
    def delete(cls, phase_id):
        """Delete the phase with this id and return the driver's delete result."""
        phases = cls.collection()
        return phases.delete_one({"_id": ObjectId(phase_id)})

    @classmethod
    def deleteMany(cls, campaign_id):
        """Delete every phase of the given campaign and return the driver's delete result."""
        return cls.collection().delete_many({"campaign_id": campaign_id})
class CustomViewCreation(GenericMongo):
    # Fields needed to create a custom view (stored in "custom_views").
    collection_name: ClassVar[str] = "custom_views"

    name: str
    configuration: list
class CustomView(CustomViewCreation):
    # A stored custom view: the creation fields plus its id and owner.

    # Properties
    id: str | None = None

    # Foreign Key
    user_id: str

    # # Methods
    # @classmethod
    # def create(cls, form_custom_view: Self, user_id) -> list:
    #     custom_view = CustomView(
    #         name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
    #     )
    #     new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
    #     return new_custom_view

    # @classmethod
    # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
    #     updated_custom_view = cls.collection().find_one_and_update(
    #         {"_id": ObjectId(custom_view_id)},
    #         {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
    #         return_document=ReturnDocument.AFTER,
    #     )
    #     updated_custom_view["id"] = str(updated_custom_view["_id"])
    #     del updated_custom_view["_id"]
    #     return cls(**updated_custom_view)

    # @classmethod
    # def delete(cls, custom_view_id) -> bool:
    #     deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
    #     return deleted_custom_view.acknowledged
# Partial-update model derived from CustomView by create_update_model.
CustomViewUpdate = create_update_model(CustomView)
class Video(GenericMongo):
    # A video source (camera), stored in the "videos" collection.
    collection_name: ClassVar[str] = "videos"

    # Properties
    name: str
    ip_addr: str
    username: str | None = None
    password: str | None = None

    # Methods
    @classmethod
    def get_all(cls, sort_by="_id") -> list[Self]:
        """Return every video sorted by ``sort_by`` in ascending order.

        Only ``name`` and ``ip_addr`` are read from the database, so the
        returned objects keep the default (None) username and password.
        """
        cursor = cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING)
        return [cls.mongo_dict_to_object(document) for document in cursor]

    @classmethod
    def get_video(cls, camera_id: ObjectId):
        """Return the name of the camera with this id."""
        return cls.get_from_id(camera_id).name
1629class Command(GenericMongo):
1630 collection_name: ClassVar[str] = "commands"
1632 # Properties
1633 timestamp: datetime.datetime = None
1634 sent_at: float
1635 response_time: float = 0.0
1636 command_type: str
1637 description: str = ""
1638 succeeded: bool = False
1640 # Foreign key
1641 user_id: str
@classmethod
def collection(cls):
    """Return the collection of this model, created as a time-series collection if missing."""
    name = cls.collection_name
    return get_collection(systems_database, name, create=True, time_series=True)
@classmethod
def create(cls, command: Self):
    """Store a copy of ``command`` and return ``{"command_id": <new id>}``.

    The stored copy gets its ``timestamp`` from ``sent_at``, expressed as an
    aware UTC datetime.
    """
    copied_fields = ("sent_at", "response_time", "command_type", "description", "succeeded", "user_id")
    payload = {field_name: getattr(command, field_name) for field_name in copied_fields}
    payload["timestamp"] = datetime.datetime.fromtimestamp(command.sent_at, tz=pytz.UTC)
    record = cls(**payload)
    insertion = cls.collection().insert_one(record.model_dump(exclude={"id"}))
    return None if insertion is None else {"command_id": str(insertion.inserted_id)}
1663 def receive_response(self, response: dict):
1664 self.response_time = time.time() - self.sent_at
1665 self.succeeded = response.get("error", True) is False
1666 if self.description == "":
1667 self.description += response.get("message", "").rstrip()