Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 90%
858 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-04 13:35 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-04 13:35 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, Union
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
24# from scipy import signal as signal_scipy
26from twinpad_backend.db import (
27 get_collection,
28 get_async_collection,
29 get_signal_collection,
30 systems_database,
31 systems_async_database,
32 signals_database,
33 devices_states_database,
34)
35from twinpad_backend.responses import ListResponse
36from twinpad_backend.messages import send_mode_change, send_signal_value
38TYPES = ({"int": int, "float": float, "str": str, "bool": bool, "epoch": float},)
41RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
42MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
43SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
44HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
45DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
47DEVICE_TIMEOUT = 5.0
48NUMBER_SAMPLES_DATABASE_UPDATE = 120
50logger = logging.getLogger("uvicorn.error")
53def create_update_model(model):
54 fields = {}
56 for field_name, field_annotation in model.model_fields.items():
57 if field_name != "id":
58 fields[field_name] = (Union[field_annotation.annotation, None], None)
60 query_name = model.__name__ + "Update"
61 return create_model(query_name, **fields)
64def get_utc_date_from_timestamp(timestamp: float):
65 return (
66 datetime.datetime.fromtimestamp(timestamp).replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %-H:%M:%S.%f") + " UTC"
67 )
70def downsample_list(time_vector: list, values: list, max_number_samples: int):
71 if len(time_vector) < max_number_samples:
72 return time_vector, values
74 time_vector_copy = copy.deepcopy(time_vector)
75 values_copy = copy.deepcopy(values)
76 try:
77 none_group_bounds = []
78 none_group_index = -1
79 index = -1
80 # LTTB doesn't handle None values so remove them
81 while values_copy.count(None) > 0:
82 # Store bounds of None value groups so we can insert them back after the downsampling
83 if (new_index := values_copy.index(None)) != index:
84 none_group_bounds.append([time_vector_copy.pop(new_index)])
85 none_group_index += 1
86 elif len(none_group_bounds[none_group_index]) < 2:
87 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
88 else:
89 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
90 values_copy.pop(new_index)
91 index = new_index
93 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
95 values_array = npy.array([time_vector_copy, values_copy]).T
96 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
98 new_time_vector = interpolated_values[:, 0].tolist()
99 new_values = interpolated_values[:, 1].tolist()
100 except ValueError:
101 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
102 new_time_vector = npy.linspace(
103 time_vector_copy[0], time_vector_copy[-1], max_number_samples, endpoint=True
104 ).tolist()
105 new_values = list(npy.interp(new_time_vector, time_vector_copy, values_copy))
106 return new_time_vector, new_values
108 # insert back None values at the correct timestamps
109 for none_group in none_group_bounds:
110 start_index = npy.searchsorted(new_time_vector, none_group[0])
111 new_time_vector[start_index:start_index] = none_group
112 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
114 return new_time_vector, new_values
117# Models
118class TwinPadModel(BaseModel):
119 @classmethod
120 def dict_to_object(cls, dict_):
121 return cls.model_validate(dict_)
123 def to_dict(self, exclude=None):
124 dict_ = self.model_dump(exclude=exclude)
125 return dict_
128class GenericMongo(TwinPadModel):
129 id: str | None = None
131 @classmethod
132 def collection(cls):
133 return get_collection(systems_database, cls.collection_name, create=True)
135 @classmethod
136 def response_from_query(cls, query) -> ListResponse[Self]:
137 req_filter = query.mongodb_filter()
138 items = []
139 if ":" in query.sort_by:
140 sort_field, sort_order = query.sort_by.split(":")
141 sort_order = int(sort_order)
142 else:
143 sort_field = query.sort_by
144 sort_order = 1
145 collection = get_collection(systems_database, cls.collection_name, create=True)
146 total = collection.count_documents(req_filter)
147 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
148 if (query.limit is not None) and (query.limit != 0):
149 cursor = cursor.limit(query.limit)
150 for item_dict in cursor:
151 items.append(cls.mongo_dict_to_object(item_dict))
152 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
154 @classmethod
155 def get_from_id(cls, item_id) -> Self | None:
156 # collection = get_collection(systems_database, cls.collection_name, create=True)
158 dict_ = cls.collection().find_one({"_id": ObjectId(item_id)})
159 if dict_ is None:
160 return None
161 return cls.mongo_dict_to_object(dict_)
163 @classmethod
164 def mongo_dict_to_object(cls, mongo_dict):
165 mongo_dict["id"] = str(mongo_dict["_id"])
166 del mongo_dict["_id"]
167 return cls.dict_to_object(mongo_dict)
169 @classmethod
170 def get_by_attribute(cls, attribute_name: str, attribute_value):
171 """Returns all items that match the attribute with value."""
172 items = cls.collection().find({attribute_name: attribute_value})
173 if items is None:
174 return None
175 return [cls.mongo_dict_to_object(d) for d in items]
177 @classmethod
178 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
179 item = cls.collection().find_one({attribute_name: attribute_value})
180 if item is None:
181 return None
182 return cls.mongo_dict_to_object(item)
184 @classmethod
185 def get_all(cls, sort_by="_id") -> list[Self]:
186 items = []
187 for dict_ in cls.collection().find({}).sort(sort_by, ASCENDING):
188 items.append(cls.mongo_dict_to_object(dict_))
189 return items
191 @classmethod
192 def get_number_documents(cls):
193 collection = get_collection(systems_database, cls.collection_name)
194 if collection is None:
195 return 0
196 return collection.count_documents({})
198 def insert(self):
199 insert_result = self.collection().insert_one(self.to_dict(exclude={id}))
200 self.id = str(insert_result.inserted_id)
201 return self.id
203 def update(self, update_dict):
204 for key, value in update_dict.items():
205 setattr(self, key, value)
206 self.collection().find_one_and_update(
207 {"_id": ObjectId(self.id)},
208 {"$set": update_dict},
209 return_document=ReturnDocument.AFTER,
210 )
212 return self
214 def delete(self):
215 result = self.collection().delete_one({"_id": ObjectId(self.id)})
216 return result.deleted_count > 0
219class Mode(TwinPadModel):
220 mode_id: int
221 name: str
222 frequency_multiplier: float
223 min_frequency: float
226class DeviceUpdate(TwinPadModel):
227 mode_id: int
230class Device(GenericMongo):
231 collection_name: ClassVar[str] = "devices"
233 device_id: str
234 name: str
235 description: str = ""
236 modes: list[Mode]
237 current_mode_id: int | None = None
238 last_ping: float | None = None
239 petri_network: Any
240 pid: Any
241 load: float | None = None
242 tokens: list[int] = Field(default_factory=list)
244 def update(self, update_dict):
245 send_mode_change(self.device_id, update_dict.mode_id)
247 @computed_field
248 @property
249 def status(self) -> str:
250 now = time.time()
251 if self.last_ping is None:
252 return "down"
253 if (now - self.last_ping) > DEVICE_TIMEOUT:
254 return "down"
255 return "up"
257 @classmethod
258 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
259 devices_by_id = {}
260 for signal_id in signal_ids:
261 device_id = signal_id.split(".")[0]
262 if device_id not in devices_by_id:
263 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
264 return devices_by_id
267class DeviceSetup(GenericMongo):
268 collection_name: ClassVar[str] = "device_setups"
270 device_ids: list[str]
271 active: bool = False
272 variable_mapping: dict[str, str]
275DeviceSetupUpdate = create_update_model(DeviceSetup)
278class DeviceState(GenericMongo):
279 collection_name: ClassVar[str] = "devices_states"
281 timestamp: float
282 mode: str | None = None
283 load: float | None = None
284 tokens: list[int] = Field(default_factory=list)
285 modified_properties: list[str] = Field(default_factory=list)
287 @classmethod
288 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
289 req_filter = query.mongodb_filter()
290 items = []
291 if ":" in query.sort_by:
292 sort_field, sort_order = query.sort_by.split(":")
293 sort_order = int(sort_order)
294 else:
295 sort_field = query.sort_by
296 sort_order = 1
297 collection = get_collection(devices_states_database, device_id)
298 if collection is None:
299 total = 0
300 cursor = []
301 else:
302 total = collection.count_documents(req_filter)
303 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
304 if (query.limit is not None) and (query.limit != 0):
305 cursor = cursor.limit(query.limit)
306 for item_dict in cursor:
307 items.append(
308 cls(
309 timestamp=item_dict.get("precise_timestamp"),
310 mode=item_dict.get("mode", None),
311 load=item_dict.get("load", None),
312 tokens=item_dict.get("tokens", Field(default_factory=list)),
313 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)),
314 )
315 )
316 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
319class SignalSample(TwinPadModel):
320 signal_id: str
321 timestamp: float
322 value: float | int | str | bool | None
323 forced_value: float | int | str | bool | None = None
325 @classmethod
326 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
328 collection = get_signal_collection(signal_id)
329 if collection is None:
330 return None
332 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
333 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
334 bucket = get_signal_collection(f"system.buckets.{signal_id}")
335 first_bucket = None
336 if bucket is not None:
337 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
338 if first_bucket is not None:
339 sample_data = collection.find_one(
340 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
341 )
342 else:
343 sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)])
345 if sample_data is None:
346 return None
348 timestamp = sample_data["precise_timestamp"]
350 return cls(
351 signal_id=signal_id,
352 timestamp=timestamp,
353 value=sample_data.get("value", None),
354 forced_value=sample_data.get("forced_value", None),
355 )
357 @classmethod
358 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
359 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
361 @classmethod
362 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
364 collection = get_signal_collection(signal_id)
365 if collection is None:
366 return None
368 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
369 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
370 bucket = get_signal_collection(f"system.buckets.{signal_id}")
371 last_bucket = None
372 if bucket is not None:
373 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
374 if last_bucket is not None:
375 sample_data = collection.find_one({"precise_timestamp": last_bucket["control"]["max"]["precise_timestamp"]})
376 else:
377 sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)])
379 if sample_data is None:
380 return None
382 timestamp = sample_data["precise_timestamp"]
384 if device is None:
385 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
386 if device is not None and device.last_ping is not None:
387 if timestamp is None:
388 timestamp = device.last_ping
389 else:
390 timestamp = max(timestamp, device.last_ping)
391 return cls(
392 signal_id=signal_id,
393 timestamp=timestamp,
394 value=sample_data.get("value", None),
395 forced_value=sample_data.get("forced_value", None),
396 )
398 @classmethod
399 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
400 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
401 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
404class SignalData(TwinPadModel):
405 signal_id: str
406 forcible: bool = True
407 time_vector: list[float]
408 values: list[float | int | str | None]
409 forced_values: list[float | int | str | None]
411 data_start: float | None = None
412 data_end: float | None = None
414 number_samples: int = 0
415 number_samples_db: int = 0
417 db_query_time: float = 0.0
418 init_time: float = 0.0
419 data_processing_time: float = 0.0
421 @classmethod
422 def get_from_signal_id(
423 cls,
424 signal_id: str,
425 min_timestamp: float = None,
426 max_timestamp: float = None,
427 window_min_timestamp: float = None,
428 window_max_timestamp: float = None,
429 interpolate_bounds: bool = True,
430 max_documents: int = None,
431 ) -> Self:
433 now = time.time()
435 req_signal = {}
436 if min_timestamp is not None:
437 req_signal.setdefault("timestamp", {})
438 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
439 if max_timestamp is not None:
440 req_signal.setdefault("timestamp", {})
441 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
443 collection = get_signal_collection(signal_id)
444 if collection is None:
445 return cls(
446 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
447 )
449 db_req_start = time.time()
451 sort_step = {"$sort": {"precise_timestamp": 1}}
452 number_results = collection.count_documents(req_signal)
454 pipeline = []
455 if req_signal:
456 pipeline.append({"$match": req_signal}) # Filter data if needed
458 pipeline.extend(
459 [
460 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
461 sort_step,
462 ]
463 )
465 if max_documents is not None and max_documents < number_results:
466 unsampling_ratio = math.ceil(number_results / max_documents)
467 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
468 pipeline.extend(
469 [
470 {
471 "$setWindowFields": {
472 "sortBy": {"precise_timestamp": 1},
473 "output": {"index": {"$documentNumber": {}}},
474 }
475 },
476 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
477 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
478 {"$replaceRoot": {"newRoot": "$doc"}},
479 {"$unset": ["index", "group_id"]},
480 {"$sort": {"precise_timestamp": 1}},
481 ]
482 )
484 # logger.info(f"pipeline: %s", str(pipeline))
485 cursor = collection.aggregate(pipeline)
486 db_req_time = time.time() - db_req_start
488 init_time = time.time()
490 results = cursor.to_list()
491 time_vector = []
492 values = []
493 forced_values = []
494 for s in results:
495 time_vector.append(s["precise_timestamp"])
496 values.append(s.get("value", None))
497 forced_values.append(s.get("forced_value", None))
499 signal = Signal.get_from_signal_id(signal_id)
500 class_ = signal.signal_data_class
502 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
503 time_vector, values, forced_values = cls.interpolate_bounds(
504 class_,
505 collection,
506 signal_id,
507 time_vector,
508 values,
509 forced_values,
510 window_min_timestamp,
511 window_max_timestamp,
512 )
514 if values:
515 # TODO: check below. a bit strange
516 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
517 # Adding last value as it should be repeated
518 time_vector.append(now)
519 values.append(values[-1])
520 forced_values.append(forced_values[-1])
522 init_time = time.time() - init_time
524 # See line 292 for explanation
525 bucket = get_signal_collection(f"system.buckets.{signal_id}")
526 first_bucket = None
527 if bucket is not None:
528 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
529 if first_bucket is not None:
530 data_start = first_bucket["control"]["min"]["precise_timestamp"]
531 else:
532 data_start = None
534 last_bucket = None
535 if bucket is not None:
536 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
537 if last_bucket is not None:
538 data_end = last_bucket["control"]["max"]["precise_timestamp"]
539 else:
540 data_end = None
542 return class_(
543 signal_id=signal_id,
544 forcible=signal.forcible,
545 time_vector=time_vector,
546 values=values,
547 forced_values=forced_values,
548 data_start=data_start,
549 data_end=data_end,
550 number_samples=len(values),
551 number_samples_db=number_results,
552 db_query_time=db_req_time,
553 init_time=init_time,
554 )
556 @staticmethod
557 def interpolate_bounds(
558 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
559 ):
560 sample_right = None
561 # Fetching right side value & interpolation
562 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
563 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
564 sample_right = collection.find_one(
565 {
566 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
567 "value": {"$exists": True},
568 },
569 sort=[("precise_timestamp", -1)],
570 )
571 if sample_right:
572 if time_vector:
573 right_sd = class_(
574 signal_id=signal_id,
575 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
576 values=[values[-1], sample_right.get("value", None)],
577 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
578 )
579 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
580 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
581 else:
582 max_ts_value = sample_right.get("value", None)
583 max_ts_forced_value = sample_right.get("forced_value", None)
584 time_vector.append(window_max_timestamp)
585 values.append(max_ts_value)
586 forced_values.append(max_ts_forced_value)
588 # Fetching left side value & interpolation
589 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
590 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
591 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
592 sample_left = sample_right
593 sample_left = collection.find_one(
594 {
595 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
596 "value": {"$exists": True},
597 },
598 sort=[("precise_timestamp", -1)],
599 )
601 if sample_left:
602 if time_vector:
603 left_sd = class_(
604 signal_id=signal_id,
605 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
606 values=[sample_left["value"], values[0]],
607 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
608 )
609 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
610 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
611 else:
612 min_ts_value = sample_left.get("value", None)
613 min_ts_forced_value = sample_left.get("forced_value", None)
614 time_vector.insert(0, window_min_timestamp)
615 values.insert(0, min_ts_value)
616 forced_values.insert(0, min_ts_forced_value)
618 return time_vector, values, forced_values
620 def interpolate_values(self, new_time_vector: list[float]):
621 return self.interpolate(new_time_vector, self.values)
623 def interpolate_forced_values(self, new_time_vector: list[float]):
624 return self.interpolate(new_time_vector, self.forced_values)
626 def uniform_desampling(self, number_samples_max: int) -> Self:
627 data_processing_time = time.time()
628 if number_samples_max and self.number_samples > number_samples_max:
629 new_time_vector = npy.linspace(
630 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
631 ).tolist()
632 values = self.interpolate_values(new_time_vector)
633 forced_values = self.interpolate_forced_values(new_time_vector)
634 time_vector = new_time_vector
635 number_samples = len(time_vector)
636 else:
637 time_vector = self.time_vector
638 number_samples = len(self.values)
639 values = self.values[:]
640 forced_values = self.forced_values[:]
641 data_processing_time = time.time() - data_processing_time
643 return self.__class__(
644 signal_id=self.signal_id,
645 time_vector=time_vector,
646 values=values,
647 forced_values=forced_values,
648 number_samples=number_samples,
649 number_samples_db=self.number_samples,
650 data_start=self.data_start,
651 data_end=self.data_end,
652 db_query_time=self.db_query_time,
653 init_time=self.init_time,
654 data_processing_time=self.data_processing_time + data_processing_time,
655 )
657 def interest_window_desampling(
658 self,
659 window_max_number_samples: int,
660 outside_max_number_samples: int,
661 window_min_timestamp: float | None = None,
662 window_max_timestamp: float | None = None,
663 ) -> Self:
664 """Performs a sampling in a window of interest and outside."""
666 if not self.time_vector:
667 return self
669 if window_min_timestamp is None:
670 window_min_timestamp = self.time_vector[0]
671 if window_max_timestamp is None:
672 window_max_timestamp = self.time_vector[-1]
674 data_processing_time = time.time()
676 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
677 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
679 time_vector_before = self.time_vector[:index_window_start]
680 time_vector_window = self.time_vector[index_window_start:index_window_end]
681 time_vector_after = self.time_vector[index_window_end:]
683 # Resampling window
684 if time_vector_window:
685 # Ensurring window bounds
686 if time_vector_window[0] != window_min_timestamp:
687 time_vector_window.insert(0, window_min_timestamp)
688 if time_vector_window[-1] != window_max_timestamp:
689 time_vector_window.append(window_max_timestamp)
690 else:
691 time_vector_window = [window_min_timestamp, window_max_timestamp]
693 if len(time_vector_window) > window_max_number_samples:
694 # Resampling
695 new_window_time_vector = npy.linspace(
696 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
697 ).tolist()
698 time_vector_window = new_window_time_vector
700 # Resampling outside
701 number_samples_before = len(time_vector_before)
702 number_samples_after = len(time_vector_after)
703 if (number_samples_before + number_samples_after) > outside_max_number_samples:
704 new_number_samples_before = math.ceil(
705 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
706 )
707 new_number_samples_after = math.ceil(
708 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
709 )
710 # Adjusting numbers as math.ceil can do +1 on sum
711 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
712 if new_number_samples_before > new_number_samples_after:
713 new_number_samples_before -= 1
714 else:
715 new_number_samples_after -= 1
717 if new_number_samples_before:
718 new_time_vector_before = npy.linspace(
719 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
720 ).tolist()
721 time_vector_before = new_time_vector_before
723 if number_samples_after:
724 new_time_vector_after = npy.linspace(
725 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
726 ).tolist()[::-1]
727 time_vector_after = new_time_vector_after
729 new_time_vector = time_vector_before + time_vector_window + time_vector_after
730 values = self.interpolate_values(new_time_vector)
731 forced_values = self.interpolate_forced_values(new_time_vector)
732 number_samples = len(values)
734 data_processing_time = time.time() - data_processing_time
736 return self.__class__(
737 signal_id=self.signal_id,
738 forcible=self.forcible,
739 time_vector=new_time_vector,
740 values=values,
741 forced_values=forced_values,
742 number_samples=number_samples,
743 number_samples_db=self.number_samples,
744 data_start=self.data_start,
745 data_end=self.data_end,
746 db_query_time=self.db_query_time,
747 init_time=self.init_time,
748 data_processing_time=self.data_processing_time + data_processing_time,
749 )
751 def csv_export(self):
752 output = io.StringIO()
753 writer = csv.writer(output)
754 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
755 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
756 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
757 return output.getvalue().encode("utf-8")
759 def prestoplot_export(self):
760 clean_signal_id = self.signal_id.replace(".", "_")
761 if clean_signal_id[0].isnumeric():
762 clean_signal_id = "_" + clean_signal_id
764 output = io.StringIO()
765 output.write("# Encoding:\tUTF-8\n")
766 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
767 output.write("ISO8601\tnone\tnone\n")
768 output.write(f"# Description :\t{clean_signal_id}\n")
770 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
771 output.write(
772 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
773 )
774 return output.getvalue().encode("utf-8")
777class NumericSignalData(SignalData):
778 data_type: str = "float"
779 values: list[float | int | None]
780 forced_values: list[float | int | None]
782 def interpolate(self, new_time_vector: list[float], items):
783 items = [npy.nan if s is None else s for s in items]
784 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
786 def uniform_desampling(self, number_samples_max: int) -> Self:
787 data_processing_time = time.time()
788 if number_samples_max and self.number_samples > number_samples_max:
789 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
790 forced_values = self.interpolate_forced_values(time_vector)
791 number_samples = len(time_vector)
792 else:
793 time_vector = self.time_vector
794 number_samples = len(self.values)
795 values = self.values[:]
796 forced_values = self.forced_values[:]
797 data_processing_time = time.time() - data_processing_time
799 return self.__class__(
800 signal_id=self.signal_id,
801 time_vector=time_vector,
802 values=values,
803 forced_values=forced_values,
804 number_samples=number_samples,
805 number_samples_db=self.number_samples,
806 data_start=self.data_start,
807 data_end=self.data_end,
808 db_query_time=self.db_query_time,
809 init_time=self.init_time,
810 data_processing_time=self.data_processing_time + data_processing_time,
811 )
813 def interest_window_desampling(
814 self,
815 window_max_number_samples: int,
816 outside_max_number_samples: int,
817 window_min_timestamp: float | None = None,
818 window_max_timestamp: float | None = None,
819 ) -> Self:
820 """Performs a sampling in a window of interest and outside."""
822 if not self.time_vector:
823 return self
825 if window_min_timestamp is None:
826 window_min_timestamp = self.time_vector[0]
827 if window_max_timestamp is None:
828 window_max_timestamp = self.time_vector[-1]
830 data_processing_time = time.time()
832 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
833 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
835 time_vector_before = self.time_vector[:index_window_start]
836 time_vector_window = self.time_vector[index_window_start:index_window_end]
837 time_vector_after = self.time_vector[index_window_end:]
839 values_before = self.values[:index_window_start]
840 values_window = self.values[index_window_start:index_window_end]
841 values_after = self.values[index_window_end:]
842 window_min_value = self.interpolate_values([window_min_timestamp])[0]
843 window_max_value = self.interpolate_values([window_max_timestamp])[0]
845 # Resampling window
846 if time_vector_window:
847 # Ensurring window bounds
848 if time_vector_window[0] != window_min_timestamp:
849 time_vector_window.insert(0, window_min_timestamp)
850 values_window.insert(0, window_min_value)
851 if time_vector_window[-1] != window_max_timestamp:
852 time_vector_window.append(window_max_timestamp)
853 values_window.append(window_max_value)
854 else:
855 time_vector_window = [window_min_timestamp, window_max_timestamp]
856 values_window = [window_min_value, window_max_value]
858 if len(time_vector_window) > window_max_number_samples:
859 # Resampling
860 time_vector_window, values_window = downsample_list(
861 time_vector_window, values_window, window_max_number_samples
862 )
864 # Resampling outside
865 number_samples_before = len(time_vector_before)
866 number_samples_after = len(time_vector_after)
867 if (number_samples_before + number_samples_after) > outside_max_number_samples:
868 new_number_samples_before = math.ceil(
869 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
870 )
871 new_number_samples_after = math.ceil(
872 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
873 )
874 # Adjusting numbers as math.ceil can do +1 on sum
875 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
876 if new_number_samples_before > new_number_samples_after:
877 new_number_samples_before -= 1
878 else:
879 new_number_samples_after -= 1
881 if new_number_samples_before:
882 time_vector_before, values_before = downsample_list(
883 time_vector_before, values_before, new_number_samples_before
884 )
886 if number_samples_after:
887 time_vector_after, values_after = downsample_list(
888 time_vector_after, values_after, new_number_samples_after
889 )
891 new_time_vector = time_vector_before + time_vector_window + time_vector_after
892 values = values_before + values_window + values_after
893 forced_values = self.interpolate_forced_values(new_time_vector)
894 number_samples = len(values)
896 data_processing_time = time.time() - data_processing_time
898 return self.__class__(
899 signal_id=self.signal_id,
900 time_vector=new_time_vector,
901 values=values,
902 forced_values=forced_values,
903 number_samples=number_samples,
904 number_samples_db=self.number_samples,
905 data_start=self.data_start,
906 data_end=self.data_end,
907 db_query_time=self.db_query_time,
908 init_time=self.init_time,
909 data_processing_time=self.data_processing_time + data_processing_time,
910 )
913class StringSignalData(SignalData):
914 data_type: str = "str"
915 values: list[str | None]
916 forced_values: list[str | None]
918 def interpolate(self, new_time_vector: list[float], items):
919 # Find the indices of the values in xp that are just smaller or equal to x
920 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
921 indices = npy.clip(indices, 0, len(items) - 1)
922 # Return the corresponding left string values from fp
923 return [items[i] for i in indices]
926class SignalsData(TwinPadModel):
927 signals_data: list[NumericSignalData | StringSignalData | SignalData]
928 data_processing_time: float
929 data_start: float | None
930 data_end: float | None
932 @classmethod
933 def get_from_signal_ids(
934 cls,
935 signal_ids: list[str],
936 min_timestamp: float = None,
937 max_timestamp: float = None,
938 window_min_timestamp: float = None,
939 window_max_timestamp: float = None,
940 interpolate_bounds: bool = True,
941 max_documents: int = None,
942 ) -> Self:
943 signals_data = []
944 data_start = None
945 data_end = None
946 if max_timestamp is None:
947 max_timestamp = time.time()
948 data_processing_time = 0.0
949 for signal_id in signal_ids:
950 signal_data = SignalData.get_from_signal_id(
951 signal_id=signal_id,
952 min_timestamp=min_timestamp,
953 max_timestamp=max_timestamp,
954 window_min_timestamp=window_min_timestamp,
955 window_max_timestamp=window_max_timestamp,
956 interpolate_bounds=interpolate_bounds,
957 max_documents=max_documents,
958 )
959 data_processing_time += signal_data.data_processing_time
960 signals_data.append(signal_data)
961 if signal_data.data_start is not None:
962 if data_start is None:
963 data_start = signal_data.data_start
964 else:
965 data_start = min(signal_data.data_start, data_start)
966 if signal_data.data_end is not None:
967 if data_end is None:
968 data_end = signal_data.data_end
969 else:
970 data_end = max(signal_data.data_end, data_end)
972 return cls(
973 signals_data=signals_data,
974 data_processing_time=data_processing_time,
975 data_start=data_start,
976 data_end=data_end,
977 )
979 def uniform_desampling(self, number_samples_max: int) -> Self:
980 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
981 return SignalsData(
982 signals_data=signals_data,
983 data_processing_time=sum(s.data_processing_time for s in signals_data),
984 data_start=self.data_start,
985 data_end=self.data_end,
986 )
988 def interest_window_desampling(
989 self,
990 window_max_number_samples: int,
991 outside_max_number_samples: int,
992 window_min_timestamp: float = None,
993 window_max_timestamp: float = None,
994 ) -> Self:
995 signals_data = [
996 s.interest_window_desampling(
997 window_max_number_samples=window_max_number_samples,
998 outside_max_number_samples=outside_max_number_samples,
999 window_min_timestamp=window_min_timestamp,
1000 window_max_timestamp=window_max_timestamp,
1001 )
1002 for s in self.signals_data
1003 ]
1005 return SignalsData(
1006 signals_data=signals_data,
1007 data_processing_time=sum(s.data_processing_time for s in signals_data),
1008 data_start=self.data_start,
1009 data_end=self.data_end,
1010 )
1012 def zip_export(self, file_format: str = "csv"):
1013 # return self.signals_data[0].csv_export()
1014 zip_buffer = io.BytesIO()
1015 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1016 for signal_data in self.signals_data:
1017 if file_format == "csv":
1018 export_io = signal_data.csv_export()
1019 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io)
1020 elif file_format == "prestoplot":
1021 export_io = signal_data.prestoplot_export()
1022 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io)
1023 else:
1024 raise ValueError(f"Format not found. Got: {file_format}")
1025 zip_bytes = zip_buffer.getvalue()
1026 # zip_bytes.seek(0)
1027 return zip_bytes
1030class SignalStatus(TwinPadModel):
1031 status: str
1032 reason: str
1033 delay: float | None
1036class DigitizationFunction(TwinPadModel):
1037 bits: int | None = None
1038 min_value: float
1039 max_value: float
1040 min_raw_value: float
1041 max_raw_value: float
1044class SignalUpdate(TwinPadModel):
1045 value: float | str | bool | int | None = None
1046 forced_value: float | str | bool | int | None = None
1047 timestamp: int | None = None
1050class SignalType(str, Enum):
1051 command = "command"
1052 sensor = "sensor"
1053 external_sensor = "external_sensor"
1056SIGNALDATA_TYPES = {
1057 "int": NumericSignalData,
1058 "float": NumericSignalData,
1059 "str": StringSignalData,
1060 "bool": NumericSignalData,
1061 "epoch": NumericSignalData,
1062}
1065class Signal(GenericMongo):
1066 collection_name: ClassVar[str] = "signals"
1068 signal_id: str
1069 frequency: float
1070 unit: str | None
1071 description: str
1072 type: SignalType
1073 data_type: str
1074 precision_digits: int | None
1075 forcible: bool
1077 digitization_function: DigitizationFunction | None
1079 @property
1080 def device(self) -> Device:
1081 device_id = self.signal_id.split(".")[0]
1082 device = Device.get_one_by_attribute("device_id", device_id)
1083 return device
1085 @cached_property
1086 def signal_data_class(self):
1087 if self.data_type in SIGNALDATA_TYPES:
1088 return SIGNALDATA_TYPES[self.data_type]
1089 if self.data_type.startswith("enum"):
1090 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1091 raise ValueError(f"Unhandled python type: {self.data_type}")
1093 @cached_property
1094 def python_type(self):
1095 if self.data_type in TYPES:
1096 return TYPES[self.data_type]
1097 if self.data_type.startswith("enum"):
1098 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1099 return Literal[*choices]
1100 raise ValueError(f"Unhandled python type: {self.data_type}")
1102 @computed_field
1103 @property
1104 def status(self) -> SignalStatus:
1105 now = time.time()
1106 status = "up"
1107 reason = ""
1109 # See line 292 for explanation
1110 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1111 last_bucket = None
1112 if bucket is not None:
1113 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1114 if last_bucket is None:
1115 status = "no data"
1116 reason = "signal does not exist"
1117 return SignalStatus(status=status, reason=reason, delay=None)
1119 try:
1120 last_date = last_bucket["control"]["max"]["timestamp"]
1121 last_date = last_date.replace(tzinfo=pytz.UTC)
1122 last_value_ts = last_date.timestamp()
1123 except IndexError:
1124 last_value_ts = None
1126 if last_value_ts is None:
1127 delay = None
1128 reason = "No data from signal"
1129 else:
1130 # Since device is a computed property, only compute it once
1131 device = self.device
1132 if device is not None and device.last_ping is not None:
1133 last_value_ts = max(last_value_ts, device.last_ping)
1134 delay = now - last_value_ts
1135 if delay > DEVICE_TIMEOUT:
1136 status = "down"
1137 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1138 return SignalStatus(status=status, reason=reason, delay=delay)
1140 def update(self, update_dict: SignalUpdate):
1141 send_signal_value(self.signal_id, update_dict)
1143 @classmethod
1144 def get_from_signal_id(cls, signal_id) -> Self:
1145 """Could be generic from mongo"""
1146 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1147 if not raw_value:
1148 return None
1149 del raw_value["_id"]
1150 return cls.dict_to_object(raw_value)
1152 async def number_samples(self):
1153 collection = get_signal_collection(signal_id=self.signal_id)
1154 if collection is None:
1155 return 0
1157 number_samples = collection.estimated_document_count()
1159 number_samples_async_collection = await get_async_collection(
1160 systems_async_database, "number_samples", create=True, time_series=True
1161 )
1163 loop = asyncio.get_running_loop()
1164 loop.create_task(
1165 number_samples_async_collection.insert_one(
1166 {
1167 "timestamp": datetime.datetime.now(pytz.UTC),
1168 "signal_id": self.signal_id,
1169 "number_samples": number_samples,
1170 }
1171 )
1172 )
1174 return number_samples
1176 def sample_datasize(self):
1177 return signals_database.command("collstats", self.signal_id)["size"]
1179 @classmethod
1180 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1181 result = cls.collection().aggregate(
1182 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1183 )
1185 return {signal["signal_id"]: signal["forcible"] for signal in result}
1188class ServicesStatus(TwinPadModel):
1189 backend: str
1190 cloud_broker: str
1191 time_series_database: str
1192 signal_storage: str
1193 heartbeat_storage: str
1194 data_analyzer: str
1196 @classmethod
1197 def check(cls) -> Self:
1198 return cls(
1199 cloud_broker=ping(RABBITMQ_HOST),
1200 backend="up",
1201 time_series_database=ping(MONGO_HOST),
1202 signal_storage=ping(SIGNAL_STORAGE_HOST),
1203 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
1204 data_analyzer=ping(DATA_ANALYZER_HOST),
1205 )
1208def ping(host):
1209 try:
1210 if ping3.ping(host, timeout=0.8):
1211 return "up"
1212 except PermissionError:
1213 pass
1214 return "down"
1217class Event(GenericMongo):
1218 collection_name: ClassVar[str] = "events"
1220 name: str
1221 timestamp: float
1222 event_rule_id: str
1224 @computed_field
1225 @cached_property
1226 def event_rule(self) -> "EventRule":
1227 return EventRule.get_from_id(self.event_rule_id)
1229 @classmethod
1230 def dict_to_object(cls, dict_):
1231 """Refine to convert timestamp to datetime for mongodb."""
1232 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
1233 return super().dict_to_object(dict_)
1236class EventDay(GenericMongo):
1237 collection_name: ClassVar[str] = "number_events"
1239 timestamp: float
1240 number_events: int
1242 @classmethod
1243 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_events) -> list[Self]:
1244 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1245 number_events_collection = get_collection(systems_database, "number_events")
1246 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
1247 items = []
1248 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1249 if number_events_collection is None or recompute_events:
1250 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
1251 first_event = events_collection.find_one(sort={"timestamp": 1})
1252 if first_event is None:
1253 return items
1254 # compute from day of first found event
1255 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
1256 tzinfo=pytz.UTC
1257 )
1258 while last_computed_day < TODAY:
1259 day_nb_events = events_collection.count_documents(
1260 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1261 )
1262 if day_nb_events > 0:
1263 number_events_collection.insert_one(
1264 {"timestamp": last_computed_day, "number_events": day_nb_events}
1265 )
1266 last_computed_day += ONE_DAY_OFFSET
1267 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
1268 if number_events_today > 0:
1269 number_events_collection.delete_one({"timestamp": TODAY})
1270 number_events_collection.insert_one({"timestamp": TODAY, "number_events": number_events_today})
1271 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1272 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1273 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1274 for day in number_events:
1275 day["timestamp"] = day["timestamp"].timestamp()
1276 items.append(cls.mongo_dict_to_object(day))
1277 return items
1280class EventRule(GenericMongo):
1281 collection_name: ClassVar[str] = "event_rules"
1283 name: str
1284 formula: str
1285 variables: list[str]
1287 @computed_field
1288 @cached_property
1289 def number_events(self) -> int:
1290 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
1293class Company(GenericMongo):
1294 collection_name: ClassVar[str] = "companies"
1295 name: str
1298class User(GenericMongo):
1299 collection_name: ClassVar[str] = "users"
1301 firstname: str
1302 lastname: str
1303 email: str
1304 password: str
1305 is_active: bool | None = False
1306 is_admin: bool | None = False
1307 is_connected: bool | None = False
1308 company_id: str | None = None
1310 def to_dict(self, exclude=None):
1311 if exclude is None:
1312 exclude = {"password"}
1313 return GenericMongo.to_dict(self, exclude=exclude)
1315 @classmethod
1316 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
1317 users = cls.get_all()
1318 if not users:
1319 is_admin = True
1320 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
1321 user_collection = get_collection(systems_database, "users", create=True)
1322 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
1323 if new_user is None:
1324 return None
1325 return {"user_id": str(new_user.inserted_id)}
1327 @classmethod
1328 def update(cls, user: "UserUpdate", user_id: str):
1329 updated_user = cls.collection().find_one_and_update(
1330 {"_id": ObjectId(user_id)},
1331 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
1332 return_document=ReturnDocument.AFTER,
1333 )
1334 updated_user["id"] = str(updated_user["_id"])
1335 del (updated_user["_id"], updated_user["is_connected"])
1336 return cls(**updated_user)
1339UserUpdate = create_update_model(User)
1342class Campaign(GenericMongo):
1343 collection_name: ClassVar[str] = "campaigns"
1345 # Properties
1346 id: str | None = None
1347 name: str
1348 description: str | None = None
1350 @classmethod
1351 def create(cls, campaign: Self):
1352 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
1353 if new_campaign is None:
1354 return None
1355 return {"campaign_id": str(new_campaign.inserted_id)}
1357 @classmethod
1358 def update(cls, campaign: Self):
1359 updated_campaign = cls.collection().find_one_and_update(
1360 {"_id": ObjectId(campaign.id)},
1361 {"$set": {"name": campaign.name, "description": campaign.description}},
1362 return_document=ReturnDocument.AFTER,
1363 )
1364 return updated_campaign
1366 @classmethod
1367 def delete(cls, campaign_id):
1368 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)})
1369 return deleted_user
1372class Phase(GenericMongo):
1373 collection_name: ClassVar[str] = "phases"
1375 # Properties
1376 id: str | None = None
1377 name: str
1378 description: str | None = None
1379 start_at: float
1380 end_at: float
1382 # FK
1383 campaign_id: str
1385 # @classmethod
1386 # def get_by_date(cls, datetime: float):
1387 # phases = []
1388 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
1389 # phases.append(cls.dict_to_object(dict_).model_dump())
1390 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
1391 # phases.append(cls.dict_to_object(dict_).model_dump())
1392 # if phases is None:
1393 # return None
1394 # return phases
1396 @classmethod
1397 def create(cls, phase: Self):
1398 phase = Phase(
1399 name=phase.name,
1400 description=phase.description,
1401 start_at=phase.start_at,
1402 end_at=phase.end_at,
1403 campaign_id=phase.campaign_id,
1404 )
1405 phase_collection = get_collection(systems_database, "phases", create=True)
1406 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
1407 if new_phase is None:
1408 return None
1409 return {"phase_id": str(new_phase.inserted_id)}
1411 @classmethod
1412 def update(cls, phase: Self):
1413 updated_phase = cls.collection().find_one_and_update(
1414 {"_id": ObjectId(phase.id)},
1415 {
1416 "$set": {
1417 "name": phase.name,
1418 "description": phase.description,
1419 "start_at": phase.start_at,
1420 "end_at": phase.end_at,
1421 }
1422 },
1423 return_document=ReturnDocument.AFTER,
1424 )
1425 return updated_phase
1427 @classmethod
1428 def delete(cls, phase_id):
1429 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
1430 return delete_phase
1432 @classmethod
1433 def deleteMany(cls, campaign_id):
1434 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
1435 return delete_phases
1438class CustomViewCreation(GenericMongo):
1439 collection_name: ClassVar[str] = "custom_views"
1441 name: str
1442 configuration: list
1445class CustomView(CustomViewCreation):
1446 # Properties
1447 id: str | None = None
1449 # Foreign Key
1450 user_id: str
1452 # # Methods
1453 # @classmethod
1454 # def create(cls, form_custom_view: Self, user_id) -> list:
1455 # custom_view = CustomView(
1456 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
1457 # )
1458 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
1459 # return new_custom_view
1461 # @classmethod
1462 # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
1463 # updated_custom_view = cls.collection().find_one_and_update(
1464 # {"_id": ObjectId(custom_view_id)},
1465 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
1466 # return_document=ReturnDocument.AFTER,
1467 # )
1468 # updated_custom_view["id"] = str(updated_custom_view["_id"])
1469 # del updated_custom_view["_id"]
1470 # return cls(**updated_custom_view)
1472 # @classmethod
1473 # def delete(cls, custom_view_id) -> bool:
1474 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
1475 # return deleted_custom_view.acknowledged
1478CustomViewUpdate = create_update_model(CustomView)
1481class Video(GenericMongo):
1482 collection_name: ClassVar[str] = "videos"
1484 # Properties
1485 name: str
1486 ip_addr: str
1487 username: str | None = None
1488 password: str | None = None
1490 # Methods
1491 @classmethod
1492 def get_all(cls, sort_by="_id") -> list[Self]:
1493 items = []
1494 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
1495 items.append(cls.mongo_dict_to_object(dict_))
1496 return items
1498 @classmethod
1499 def get_video(cls, camera_id: ObjectId):
1500 camera = cls.get_from_id(camera_id)
1501 return camera.name