Coverage for /usr/local/lib/python3.11/site-packages/twinpad_backend/models.py: 91%
862 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-10 15:38 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-10 15:38 +0000
1from functools import cached_property
2import os
3import io
4import time
5import csv
6from typing import Self, ClassVar, Any, Literal, Union
7import datetime
8import math
9import bisect
10from enum import Enum
11import logging
12import copy
13import asyncio
15import zipfile
16import ping3
17import pytz
18from bson.objectid import ObjectId
19from pymongo import ASCENDING, ReturnDocument
20from pydantic import BaseModel, computed_field, Field, create_model
21import numpy as npy
22import lttb
24# from scipy import signal as signal_scipy
26from twinpad_backend.db import (
27 get_collection,
28 get_async_collection,
29 get_signal_collection,
30 systems_database,
31 systems_async_database,
32 signals_database,
33 devices_states_database,
34)
35from twinpad_backend.responses import ListResponse
36from twinpad_backend.messages import send_mode_change, send_signal_value
38TYPES = ({"int": int, "float": float, "str": str, "bool": bool, "epoch": float},)
41RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "cloud-broker")
42MONGO_HOST = os.environ.get("MONGO_HOST", "time-series-db")
43SIGNAL_STORAGE_HOST = os.environ.get("SIGNAL_STORAGE_HOST", "signal-storage")
44HEARTBEAT_STORAGE_HOST = os.environ.get("HEARTBEAT_STORAGE_HOST", "heartbeat-storage")
45DATA_ANALYZER_HOST = os.environ.get("DATA_ANALYZER_HOST", "data-analyzer")
47DEVICE_TIMEOUT = 5.0
48NUMBER_SAMPLES_DATABASE_UPDATE = 120
50logger = logging.getLogger("uvicorn.error")
53def create_update_model(model):
54 fields = {}
56 for field_name, field_annotation in model.model_fields.items():
57 if field_name != "id":
58 fields[field_name] = (Union[field_annotation.annotation, None], None)
60 query_name = model.__name__ + "Update"
61 return create_model(query_name, **fields)
64def get_utc_date_from_timestamp(timestamp: float):
65 return (
66 datetime.datetime.fromtimestamp(timestamp).replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %-H:%M:%S.%f") + " UTC"
67 )
70def downsample_list(time_vector: list, values: list, max_number_samples: int):
71 if len(time_vector) < max_number_samples:
72 return time_vector, values
74 time_vector_copy = copy.deepcopy(time_vector)
75 values_copy = copy.deepcopy(values)
76 try:
77 none_group_bounds = []
78 none_group_index = -1
79 index = -1
80 # LTTB doesn't handle None values so remove them
81 while values_copy.count(None) > 0:
82 # Store bounds of None value groups so we can insert them back after the downsampling
83 if (new_index := values_copy.index(None)) != index:
84 none_group_bounds.append([time_vector_copy.pop(new_index)])
85 none_group_index += 1
86 elif len(none_group_bounds[none_group_index]) < 2:
87 none_group_bounds[none_group_index].append(time_vector_copy.pop(new_index))
88 else:
89 none_group_bounds[none_group_index][1] = time_vector_copy.pop(new_index)
90 values_copy.pop(new_index)
91 index = new_index
93 number_none_values = sum(len(none_group) for none_group in none_group_bounds)
95 values_array = npy.array([time_vector_copy, values_copy]).T
96 interpolated_values = lttb.downsample(values_array, n_out=max_number_samples - number_none_values)
98 new_time_vector = interpolated_values[:, 0].tolist()
99 new_values = interpolated_values[:, 1].tolist()
100 except ValueError:
101 # If LTTB downsampling doesn't work, do a simple numpy interpolation on a linspace
102 new_time_vector = npy.linspace(
103 time_vector_copy[0], time_vector_copy[-1], max_number_samples, endpoint=True
104 ).tolist()
105 new_values = list(npy.interp(new_time_vector, time_vector_copy, values_copy))
106 return new_time_vector, new_values
108 # insert back None values at the correct timestamps
109 for none_group in none_group_bounds:
110 start_index = npy.searchsorted(new_time_vector, none_group[0])
111 new_time_vector[start_index:start_index] = none_group
112 new_values[start_index:start_index] = [None for _ in range(len(none_group))]
114 return new_time_vector, new_values
117# Models
118class TwinPadModel(BaseModel):
119 @classmethod
120 def dict_to_object(cls, dict_):
121 return cls.model_validate(dict_)
123 def to_dict(self, exclude=None):
124 dict_ = self.model_dump(exclude=exclude)
125 return dict_
128class GenericMongo(TwinPadModel):
129 id: str | None = None
131 @classmethod
132 def collection(cls):
133 return get_collection(systems_database, cls.collection_name, create=True)
135 @classmethod
136 def response_from_query(cls, query) -> ListResponse[Self]:
137 req_filter = query.mongodb_filter()
138 items = []
139 if ":" in query.sort_by:
140 sort_field, sort_order = query.sort_by.split(":")
141 sort_order = int(sort_order)
142 else:
143 sort_field = query.sort_by
144 sort_order = 1
145 collection = get_collection(systems_database, cls.collection_name, create=True)
146 total = collection.count_documents(req_filter)
147 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
148 if (query.limit is not None) and (query.limit != 0):
149 cursor = cursor.limit(query.limit)
150 for item_dict in cursor:
151 items.append(cls.mongo_dict_to_object(item_dict))
152 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
154 @classmethod
155 def get_from_id(cls, item_id) -> Self | None:
156 # collection = get_collection(systems_database, cls.collection_name, create=True)
158 dict_ = cls.collection().find_one({"_id": ObjectId(item_id)})
159 if dict_ is None:
160 return None
161 return cls.mongo_dict_to_object(dict_)
163 @classmethod
164 def mongo_dict_to_object(cls, mongo_dict):
165 mongo_dict["id"] = str(mongo_dict["_id"])
166 del mongo_dict["_id"]
167 return cls.dict_to_object(mongo_dict)
169 @classmethod
170 def get_by_attribute(cls, attribute_name: str, attribute_value):
171 """Returns all items that match the attribute with value."""
172 items = cls.collection().find({attribute_name: attribute_value})
173 if items is None:
174 return None
175 return [cls.mongo_dict_to_object(d) for d in items]
177 @classmethod
178 def get_one_by_attribute(cls, attribute_name: str, attribute_value):
179 item = cls.collection().find_one({attribute_name: attribute_value})
180 if item is None:
181 return None
182 return cls.mongo_dict_to_object(item)
184 @classmethod
185 def get_all(cls, sort_by="_id") -> list[Self]:
186 items = []
187 for dict_ in cls.collection().find({}).sort(sort_by, ASCENDING):
188 items.append(cls.mongo_dict_to_object(dict_))
189 return items
191 @classmethod
192 def get_number_documents(cls):
193 collection = get_collection(systems_database, cls.collection_name)
194 if collection is None:
195 return 0
196 return collection.count_documents({})
198 def insert(self):
199 insert_result = self.collection().insert_one(self.to_dict(exclude={id}))
200 self.id = str(insert_result.inserted_id)
201 return self.id
203 def update(self, update_dict):
204 for key, value in update_dict.items():
205 setattr(self, key, value)
206 self.collection().find_one_and_update(
207 {"_id": ObjectId(self.id)},
208 {"$set": update_dict},
209 return_document=ReturnDocument.AFTER,
210 )
212 return self
214 def delete(self):
215 result = self.collection().delete_one({"_id": ObjectId(self.id)})
216 return result.deleted_count > 0
219class Mode(TwinPadModel):
220 mode_id: int
221 name: str
222 frequency_multiplier: float
223 min_frequency: float
226class DeviceUpdate(TwinPadModel):
227 mode_id: int
230class Device(GenericMongo):
231 collection_name: ClassVar[str] = "devices"
233 device_id: str
234 name: str
235 description: str = ""
236 modes: list[Mode]
237 current_mode_id: int | None = None
238 last_ping: float | None = None
239 petri_network: Any
240 pid: Any
241 load: float | None = None
242 tokens: list[int] = Field(default_factory=list)
244 def update(self, update_dict):
245 send_mode_change(self.device_id, update_dict.mode_id)
247 @computed_field
248 @property
249 def status(self) -> str:
250 now = time.time()
251 if self.last_ping is None:
252 return "down"
253 if (now - self.last_ping) > DEVICE_TIMEOUT:
254 return "down"
255 return "up"
257 @classmethod
258 def get_multiple_from_signal_ids(cls, signal_ids: list[str]) -> dict:
259 devices_by_id = {}
260 for signal_id in signal_ids:
261 device_id = signal_id.split(".")[0]
262 if device_id not in devices_by_id:
263 devices_by_id[device_id] = cls.get_one_by_attribute("device_id", device_id)
264 return devices_by_id
267class DeviceSetup(GenericMongo):
268 collection_name: ClassVar[str] = "device_setups"
270 device_ids: list[str]
271 active: bool = False
272 variable_mapping: dict[str, str]
275DeviceSetupUpdate = create_update_model(DeviceSetup)
278class DeviceState(GenericMongo):
279 collection_name: ClassVar[str] = "devices_states"
281 timestamp: float
282 mode: str | None = None
283 load: float | None = None
284 tokens: list[int] = Field(default_factory=list)
285 modified_properties: list[str] = Field(default_factory=list)
287 @classmethod
288 def get_from_id_and_query(cls, device_id: str, query) -> ListResponse[Self]:
289 req_filter = query.mongodb_filter()
290 items = []
291 if ":" in query.sort_by:
292 sort_field, sort_order = query.sort_by.split(":")
293 sort_order = int(sort_order)
294 else:
295 sort_field = query.sort_by
296 sort_order = 1
297 collection = get_collection(devices_states_database, device_id)
298 if collection is None:
299 total = 0
300 cursor = []
301 else:
302 total = collection.count_documents(req_filter)
303 cursor = collection.find(req_filter).sort(sort_field, sort_order).skip(query.offset)
304 if (query.limit is not None) and (query.limit != 0):
305 cursor = cursor.limit(query.limit)
306 for item_dict in cursor:
307 items.append(
308 cls(
309 timestamp=item_dict.get("precise_timestamp"),
310 mode=item_dict.get("mode", None),
311 load=item_dict.get("load", None),
312 tokens=item_dict.get("tokens", Field(default_factory=list)),
313 modified_properties=item_dict.get("modified_properties", Field(default_factory=list)),
314 )
315 )
316 return ListResponse(items=items, limit=query.limit, offset=query.offset, sort_by=query.sort_by, total=total)
319class SignalSample(TwinPadModel):
320 signal_id: str
321 timestamp: float
322 value: float | int | str | bool | None
323 forced_value: float | int | str | bool | None = None
325 @classmethod
326 def get_first_from_signal_id(cls, signal_id: str) -> Self | None:
328 collection = get_signal_collection(signal_id)
329 if collection is None:
330 return None
332 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
333 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
334 bucket = get_signal_collection(f"system.buckets.{signal_id}")
335 first_bucket = None
336 if bucket is not None:
337 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
338 if first_bucket is not None:
339 sample_data = collection.find_one(
340 {"precise_timestamp": first_bucket["control"]["min"]["precise_timestamp"]}
341 )
342 else:
343 sample_data = collection.find_one({}, sort=[("precise_timestamp", 1)])
345 if sample_data is None:
346 return None
348 timestamp = sample_data["precise_timestamp"]
350 return cls(
351 signal_id=signal_id,
352 timestamp=timestamp,
353 value=sample_data.get("value", None),
354 forced_value=sample_data.get("forced_value", None),
355 )
357 @classmethod
358 def get_first_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
359 return [cls.get_first_from_signal_id(sid) for sid in signal_ids]
361 @classmethod
362 def get_last_from_signal_id(cls, signal_id: str, device: Device = None) -> Self | None:
364 collection = get_signal_collection(signal_id)
365 if collection is None:
366 return None
368 # This is a workaround using the internal collection associated to a timeseries to find the most/least recent document
369 # found here: https://www.mongodb.com/community/forums/t/time-series-how-to-get-the-most-recent-document/149455/4
370 bucket = get_signal_collection(f"system.buckets.{signal_id}")
371 last_bucket = None
372 if bucket is not None:
373 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
374 if last_bucket is not None:
375 sample_data = collection.find_one({"precise_timestamp": last_bucket["control"]["max"]["precise_timestamp"]})
376 else:
377 sample_data = collection.find_one({}, sort=[("precise_timestamp", -1)])
379 if sample_data is None:
380 return None
382 timestamp = sample_data["precise_timestamp"]
384 if device is None:
385 device = Device.get_one_by_attribute("device_id", signal_id.split(".")[0])
386 if device is not None and device.last_ping is not None:
387 if timestamp is None:
388 timestamp = device.last_ping
389 else:
390 timestamp = max(timestamp, device.last_ping)
391 return cls(
392 signal_id=signal_id,
393 timestamp=timestamp,
394 value=sample_data.get("value", None),
395 forced_value=sample_data.get("forced_value", None),
396 )
398 @classmethod
399 def get_last_from_signal_ids(cls, signal_ids: list[str]) -> Self | None:
400 devices_by_ids = Device.get_multiple_from_signal_ids(signal_ids)
401 return [cls.get_last_from_signal_id(sid, devices_by_ids[sid.split(".")[0]]) for sid in signal_ids]
404class SignalData(TwinPadModel):
405 signal_id: str
406 forcible: bool = True
407 time_vector: list[float]
408 values: list[float | int | str | None]
409 forced_values: list[float | int | str | None]
411 data_start: float | None = None
412 data_end: float | None = None
414 number_samples: int = 0
415 number_samples_db: int = 0
417 db_query_time: float = 0.0
418 init_time: float = 0.0
419 data_processing_time: float = 0.0
421 @classmethod
422 def get_from_signal_id(
423 cls,
424 signal_id: str,
425 min_timestamp: float = None,
426 max_timestamp: float = None,
427 window_min_timestamp: float = None,
428 window_max_timestamp: float = None,
429 interpolate_bounds: bool = True,
430 max_documents: int = None,
431 ) -> Self:
433 now = time.time()
435 req_signal = {}
436 if min_timestamp is not None:
437 req_signal.setdefault("timestamp", {})
438 req_signal["timestamp"]["$gte"] = datetime.datetime.fromtimestamp(min_timestamp)
439 if max_timestamp is not None:
440 req_signal.setdefault("timestamp", {})
441 req_signal["timestamp"]["$lte"] = datetime.datetime.fromtimestamp(max_timestamp)
443 collection = get_signal_collection(signal_id)
444 if collection is None:
445 return cls(
446 signal_id=signal_id, time_vector=[], values=[], forced_values=[], number_samples=0, number_samples_db=0
447 )
449 db_req_start = time.time()
451 sort_step = {"$sort": {"precise_timestamp": 1}}
452 number_results = collection.count_documents(req_signal)
454 pipeline = []
455 if req_signal:
456 pipeline.append({"$match": req_signal}) # Filter data if needed
458 pipeline.extend(
459 [
460 {"$project": {"precise_timestamp": 1, "value": 1, "forced_value": 1, "_id": 0}},
461 sort_step,
462 ]
463 )
465 if max_documents is not None and max_documents < number_results:
466 unsampling_ratio = math.ceil(number_results / max_documents)
467 logger.info("Unsampling ratio: %d (number_results: %d)", unsampling_ratio, number_results)
468 pipeline.extend(
469 [
470 {
471 "$setWindowFields": {
472 "sortBy": {"precise_timestamp": 1},
473 "output": {"index": {"$documentNumber": {}}},
474 }
475 },
476 {"$set": {"group_id": {"$floor": {"$divide": ["$index", unsampling_ratio]}}}},
477 {"$group": {"_id": "$group_id", "doc": {"$first": "$$ROOT"}}},
478 {"$replaceRoot": {"newRoot": "$doc"}},
479 {"$unset": ["index", "group_id"]},
480 {"$sort": {"precise_timestamp": 1}},
481 ]
482 )
484 # logger.info(f"pipeline: %s", str(pipeline))
485 cursor = collection.aggregate(pipeline)
486 db_req_time = time.time() - db_req_start
488 init_time = time.time()
490 results = cursor.to_list()
491 time_vector = []
492 values = []
493 forced_values = []
494 for s in results:
495 time_vector.append(s["precise_timestamp"])
496 values.append(s.get("value", None))
497 forced_values.append(s.get("forced_value", None))
499 signal = Signal.get_from_signal_id(signal_id)
500 class_ = signal.signal_data_class
502 if interpolate_bounds and window_max_timestamp is not None and window_min_timestamp is not None:
503 time_vector, values, forced_values = cls.interpolate_bounds(
504 class_,
505 collection,
506 signal_id,
507 time_vector,
508 values,
509 forced_values,
510 window_min_timestamp,
511 window_max_timestamp,
512 )
514 if values:
515 # TODO: check below. a bit strange
516 if ((max_timestamp is None) or (max_timestamp >= now)) and (now - time_vector[-1]) > DEVICE_TIMEOUT:
517 # Adding last value as it should be repeated
518 time_vector.append(now)
519 values.append(values[-1])
520 forced_values.append(forced_values[-1])
522 init_time = time.time() - init_time
524 # See line 292 for explanation
525 bucket = get_signal_collection(f"system.buckets.{signal_id}")
526 first_bucket = None
527 if bucket is not None:
528 first_bucket = bucket.find_one({}, sort={"control.min.timestamp": 1})
529 if first_bucket is not None:
530 data_start = first_bucket["control"]["min"]["precise_timestamp"]
531 else:
532 data_start = None
534 last_bucket = None
535 if bucket is not None:
536 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
537 if last_bucket is not None:
538 data_end = last_bucket["control"]["max"]["precise_timestamp"]
539 else:
540 data_end = None
542 return class_(
543 signal_id=signal_id,
544 forcible=signal.forcible,
545 time_vector=time_vector,
546 values=values,
547 forced_values=forced_values,
548 data_start=data_start,
549 data_end=data_end,
550 number_samples=len(values),
551 number_samples_db=number_results,
552 db_query_time=db_req_time,
553 init_time=init_time,
554 )
556 @staticmethod
557 def interpolate_bounds(
558 class_, collection, signal_id, time_vector, values, forced_values, window_min_timestamp, window_max_timestamp
559 ):
560 sample_right = None
561 # Fetching right side value & interpolation
562 window_right_limit = window_max_timestamp - (window_max_timestamp - window_min_timestamp) / 5
563 if window_max_timestamp is not None and (not time_vector or time_vector[-1] < window_right_limit):
564 sample_right = collection.find_one(
565 {
566 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_max_timestamp)},
567 "value": {"$exists": True},
568 },
569 sort=[("precise_timestamp", -1)],
570 )
571 if sample_right:
572 if time_vector:
573 right_sd = class_(
574 signal_id=signal_id,
575 time_vector=[time_vector[-1], sample_right["precise_timestamp"]],
576 values=[values[-1], sample_right.get("value", None)],
577 forced_values=[forced_values[-1], sample_right.get("forced_value", None)],
578 )
579 max_ts_value = right_sd.interpolate_values([window_max_timestamp])[0]
580 max_ts_forced_value = right_sd.interpolate_forced_values([window_max_timestamp])[0]
581 else:
582 max_ts_value = sample_right.get("value", None)
583 max_ts_forced_value = sample_right.get("forced_value", None)
584 time_vector.append(window_max_timestamp)
585 values.append(max_ts_value)
586 forced_values.append(max_ts_forced_value)
588 # Fetching left side value & interpolation
589 window_left_limit = window_min_timestamp + (window_max_timestamp - window_min_timestamp) / 5
590 if window_min_timestamp and (not time_vector or time_vector[0] > window_left_limit):
591 if sample_right is not None and sample_right["precise_timestamp"] < window_min_timestamp:
592 sample_left = sample_right
593 sample_left = collection.find_one(
594 {
595 "timestamp": {"$lte": datetime.datetime.fromtimestamp(window_min_timestamp)},
596 "value": {"$exists": True},
597 },
598 sort=[("precise_timestamp", -1)],
599 )
601 if sample_left:
602 if time_vector:
603 left_sd = class_(
604 signal_id=signal_id,
605 time_vector=[sample_left["precise_timestamp"], time_vector[0]],
606 values=[sample_left["value"], values[0]],
607 forced_values=[sample_left.get("forced_value", None), forced_values[0]],
608 )
609 min_ts_value = left_sd.interpolate_values([window_min_timestamp])[0]
610 min_ts_forced_value = left_sd.interpolate_forced_values([window_min_timestamp])[0]
611 else:
612 min_ts_value = sample_left.get("value", None)
613 min_ts_forced_value = sample_left.get("forced_value", None)
614 time_vector.insert(0, window_min_timestamp)
615 values.insert(0, min_ts_value)
616 forced_values.insert(0, min_ts_forced_value)
618 return time_vector, values, forced_values
620 def interpolate_values(self, new_time_vector: list[float]):
621 return self.interpolate(new_time_vector, self.values)
623 def interpolate_forced_values(self, new_time_vector: list[float]):
624 return self.interpolate(new_time_vector, self.forced_values)
626 def uniform_desampling(self, number_samples_max: int) -> Self:
627 data_processing_time = time.time()
628 if number_samples_max and self.number_samples > number_samples_max:
629 new_time_vector = npy.linspace(
630 self.time_vector[0], self.time_vector[-1], number_samples_max, endpoint=False
631 ).tolist()
632 values = self.interpolate_values(new_time_vector)
633 forced_values = self.interpolate_forced_values(new_time_vector)
634 time_vector = new_time_vector
635 number_samples = len(time_vector)
636 else:
637 time_vector = self.time_vector
638 number_samples = len(self.values)
639 values = self.values[:]
640 forced_values = self.forced_values[:]
641 data_processing_time = time.time() - data_processing_time
643 return self.__class__(
644 signal_id=self.signal_id,
645 time_vector=time_vector,
646 values=values,
647 forced_values=forced_values,
648 number_samples=number_samples,
649 number_samples_db=self.number_samples,
650 data_start=self.data_start,
651 data_end=self.data_end,
652 db_query_time=self.db_query_time,
653 init_time=self.init_time,
654 data_processing_time=self.data_processing_time + data_processing_time,
655 )
657 def interest_window_desampling(
658 self,
659 window_max_number_samples: int,
660 outside_max_number_samples: int,
661 window_min_timestamp: float | None = None,
662 window_max_timestamp: float | None = None,
663 ) -> Self:
664 """Performs a sampling in a window of interest and outside."""
666 if not self.time_vector:
667 return self
669 if window_min_timestamp is None:
670 window_min_timestamp = self.time_vector[0]
671 if window_max_timestamp is None:
672 window_max_timestamp = self.time_vector[-1]
674 data_processing_time = time.time()
676 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
677 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
679 time_vector_before = self.time_vector[:index_window_start]
680 time_vector_window = self.time_vector[index_window_start:index_window_end]
681 time_vector_after = self.time_vector[index_window_end:]
683 # Resampling window
684 if time_vector_window:
685 # Ensurring window bounds
686 if time_vector_window[0] != window_min_timestamp:
687 time_vector_window.insert(0, window_min_timestamp)
688 if time_vector_window[-1] != window_max_timestamp:
689 time_vector_window.append(window_max_timestamp)
690 else:
691 time_vector_window = [window_min_timestamp, window_max_timestamp]
693 if len(time_vector_window) > window_max_number_samples:
694 # Resampling
695 new_window_time_vector = npy.linspace(
696 time_vector_window[0], time_vector_window[-1], window_max_number_samples, endpoint=True
697 ).tolist()
698 time_vector_window = new_window_time_vector
700 # Resampling outside
701 number_samples_before = len(time_vector_before)
702 number_samples_after = len(time_vector_after)
703 if (number_samples_before + number_samples_after) > outside_max_number_samples:
704 new_number_samples_before = math.ceil(
705 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
706 )
707 new_number_samples_after = math.ceil(
708 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
709 )
710 # Adjusting numbers as math.ceil can do +1 on sum
711 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
712 if new_number_samples_before > new_number_samples_after:
713 new_number_samples_before -= 1
714 else:
715 new_number_samples_after -= 1
717 if new_number_samples_before:
718 new_time_vector_before = npy.linspace(
719 time_vector_before[0], time_vector_before[-1], new_number_samples_before, endpoint=False
720 ).tolist()
721 time_vector_before = new_time_vector_before
723 if number_samples_after:
724 new_time_vector_after = npy.linspace(
725 time_vector_after[-1], time_vector_after[0], new_number_samples_after, endpoint=False
726 ).tolist()[::-1]
727 time_vector_after = new_time_vector_after
729 new_time_vector = time_vector_before + time_vector_window + time_vector_after
730 values = self.interpolate_values(new_time_vector)
731 forced_values = self.interpolate_forced_values(new_time_vector)
732 number_samples = len(values)
734 data_processing_time = time.time() - data_processing_time
736 return self.__class__(
737 signal_id=self.signal_id,
738 forcible=self.forcible,
739 time_vector=new_time_vector,
740 values=values,
741 forced_values=forced_values,
742 number_samples=number_samples,
743 number_samples_db=self.number_samples,
744 data_start=self.data_start,
745 data_end=self.data_end,
746 db_query_time=self.db_query_time,
747 init_time=self.init_time,
748 data_processing_time=self.data_processing_time + data_processing_time,
749 )
751 def csv_export(self):
752 output = io.StringIO()
753 writer = csv.writer(output)
754 writer.writerow(["timestamp", "value", "forced_value"]) # Write header
755 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
756 writer.writerow([get_utc_date_from_timestamp(ts), value, forced_value])
757 return output.getvalue().encode("utf-8")
759 def prestoplot_export(self):
760 clean_signal_id = self.signal_id.replace(".", "_")
761 if clean_signal_id[0].isnumeric():
762 clean_signal_id = "_" + clean_signal_id
764 output = io.StringIO()
765 output.write("# Encoding:\tUTF-8\n")
766 output.write(f"t\t{clean_signal_id}\t{clean_signal_id}_forced_value\n")
767 output.write("ISO8601\tnone\tnone\n")
768 output.write(f"# Description :\t{clean_signal_id}\n")
770 for ts, value, forced_value in zip(self.time_vector, self.values, self.forced_values):
771 output.write(
772 f"{get_utc_date_from_timestamp(ts)}\t{value if value is not None else 'none'}\t{forced_value if forced_value is not None else 'none'}\n"
773 )
774 return output.getvalue().encode("utf-8")
777class NumericSignalData(SignalData):
778 data_type: str = "float"
779 values: list[float | int | None]
780 forced_values: list[float | int | None]
782 def interpolate(self, new_time_vector: list[float], items):
783 items = [npy.nan if s is None else s for s in items]
784 return [None if npy.isnan(s) else s for s in npy.interp(new_time_vector, self.time_vector, items)]
786 def uniform_desampling(self, number_samples_max: int) -> Self:
787 data_processing_time = time.time()
788 if number_samples_max and self.number_samples > number_samples_max:
789 time_vector, values = downsample_list(self.time_vector, self.values, number_samples_max)
790 forced_values = self.interpolate_forced_values(time_vector)
791 number_samples = len(time_vector)
792 else:
793 time_vector = self.time_vector
794 number_samples = len(self.values)
795 values = self.values[:]
796 forced_values = self.forced_values[:]
797 data_processing_time = time.time() - data_processing_time
799 return self.__class__(
800 signal_id=self.signal_id,
801 time_vector=time_vector,
802 values=values,
803 forced_values=forced_values,
804 number_samples=number_samples,
805 number_samples_db=self.number_samples,
806 data_start=self.data_start,
807 data_end=self.data_end,
808 db_query_time=self.db_query_time,
809 init_time=self.init_time,
810 data_processing_time=self.data_processing_time + data_processing_time,
811 )
813 def interest_window_desampling(
814 self,
815 window_max_number_samples: int,
816 outside_max_number_samples: int,
817 window_min_timestamp: float | None = None,
818 window_max_timestamp: float | None = None,
819 ) -> Self:
820 """Performs a sampling in a window of interest and outside."""
822 if not self.time_vector:
823 return self
825 if window_min_timestamp is None:
826 window_min_timestamp = self.time_vector[0]
827 if window_max_timestamp is None:
828 window_max_timestamp = self.time_vector[-1]
830 data_processing_time = time.time()
832 index_window_start = bisect.bisect_left(self.time_vector, window_min_timestamp)
833 index_window_end = bisect.bisect_right(self.time_vector, window_max_timestamp)
835 time_vector_before = self.time_vector[:index_window_start]
836 time_vector_window = self.time_vector[index_window_start:index_window_end]
837 time_vector_after = self.time_vector[index_window_end:]
839 values_before = self.values[:index_window_start]
840 values_window = self.values[index_window_start:index_window_end]
841 values_after = self.values[index_window_end:]
842 window_min_value = self.interpolate_values([window_min_timestamp])[0]
843 window_max_value = self.interpolate_values([window_max_timestamp])[0]
845 # Resampling window
846 if time_vector_window:
847 # Ensurring window bounds
848 if time_vector_window[0] != window_min_timestamp:
849 time_vector_window.insert(0, window_min_timestamp)
850 values_window.insert(0, window_min_value)
851 if time_vector_window[-1] != window_max_timestamp:
852 time_vector_window.append(window_max_timestamp)
853 values_window.append(window_max_value)
854 else:
855 time_vector_window = [window_min_timestamp, window_max_timestamp]
856 values_window = [window_min_value, window_max_value]
858 if len(time_vector_window) > window_max_number_samples:
859 # Resampling
860 time_vector_window, values_window = downsample_list(
861 time_vector_window, values_window, window_max_number_samples
862 )
864 # Resampling outside
865 number_samples_before = len(time_vector_before)
866 number_samples_after = len(time_vector_after)
867 if (number_samples_before + number_samples_after) > outside_max_number_samples:
868 new_number_samples_before = math.ceil(
869 outside_max_number_samples * number_samples_before / (number_samples_before + number_samples_after)
870 )
871 new_number_samples_after = math.ceil(
872 outside_max_number_samples * number_samples_after / (number_samples_before + number_samples_after)
873 )
874 # Adjusting numbers as math.ceil can do +1 on sum
875 if (new_number_samples_before + new_number_samples_after) > outside_max_number_samples:
876 if new_number_samples_before > new_number_samples_after:
877 new_number_samples_before -= 1
878 else:
879 new_number_samples_after -= 1
881 if new_number_samples_before:
882 time_vector_before, values_before = downsample_list(
883 time_vector_before, values_before, new_number_samples_before
884 )
886 if number_samples_after:
887 time_vector_after, values_after = downsample_list(
888 time_vector_after, values_after, new_number_samples_after
889 )
891 new_time_vector = time_vector_before + time_vector_window + time_vector_after
892 values = values_before + values_window + values_after
893 forced_values = self.interpolate_forced_values(new_time_vector)
894 number_samples = len(values)
896 data_processing_time = time.time() - data_processing_time
898 return self.__class__(
899 signal_id=self.signal_id,
900 time_vector=new_time_vector,
901 values=values,
902 forced_values=forced_values,
903 number_samples=number_samples,
904 number_samples_db=self.number_samples,
905 data_start=self.data_start,
906 data_end=self.data_end,
907 db_query_time=self.db_query_time,
908 init_time=self.init_time,
909 data_processing_time=self.data_processing_time + data_processing_time,
910 )
913class StringSignalData(SignalData):
914 data_type: str = "str"
915 values: list[str | None]
916 forced_values: list[str | None]
918 def interpolate(self, new_time_vector: list[float], items):
919 # Find the indices of the values in xp that are just smaller or equal to x
920 indices = npy.searchsorted(self.time_vector, new_time_vector, side="right") - 1
921 indices = npy.clip(indices, 0, len(items) - 1)
922 # Return the corresponding left string values from fp
923 return [items[i] for i in indices]
926class SignalsData(TwinPadModel):
927 signals_data: list[NumericSignalData | StringSignalData | SignalData]
928 data_processing_time: float
929 data_start: float | None
930 data_end: float | None
932 @classmethod
933 def get_from_signal_ids(
934 cls,
935 signal_ids: list[str],
936 min_timestamp: float = None,
937 max_timestamp: float = None,
938 window_min_timestamp: float = None,
939 window_max_timestamp: float = None,
940 interpolate_bounds: bool = True,
941 max_documents: int = None,
942 ) -> Self:
943 signals_data = []
944 data_start = None
945 data_end = None
946 if max_timestamp is None:
947 max_timestamp = time.time()
948 data_processing_time = 0.0
949 for signal_id in signal_ids:
950 signal_data = SignalData.get_from_signal_id(
951 signal_id=signal_id,
952 min_timestamp=min_timestamp,
953 max_timestamp=max_timestamp,
954 window_min_timestamp=window_min_timestamp,
955 window_max_timestamp=window_max_timestamp,
956 interpolate_bounds=interpolate_bounds,
957 max_documents=max_documents,
958 )
959 data_processing_time += signal_data.data_processing_time
960 signals_data.append(signal_data)
961 if signal_data.data_start is not None:
962 if data_start is None:
963 data_start = signal_data.data_start
964 else:
965 data_start = min(signal_data.data_start, data_start)
966 if signal_data.data_end is not None:
967 if data_end is None:
968 data_end = signal_data.data_end
969 else:
970 data_end = max(signal_data.data_end, data_end)
972 return cls(
973 signals_data=signals_data,
974 data_processing_time=data_processing_time,
975 data_start=data_start,
976 data_end=data_end,
977 )
979 def uniform_desampling(self, number_samples_max: int) -> Self:
980 signals_data = [s.uniform_desampling(number_samples_max=number_samples_max) for s in self.signals_data]
981 return SignalsData(
982 signals_data=signals_data,
983 data_processing_time=sum(s.data_processing_time for s in signals_data),
984 data_start=self.data_start,
985 data_end=self.data_end,
986 )
988 def interest_window_desampling(
989 self,
990 window_max_number_samples: int,
991 outside_max_number_samples: int,
992 window_min_timestamp: float = None,
993 window_max_timestamp: float = None,
994 ) -> Self:
995 signals_data = [
996 s.interest_window_desampling(
997 window_max_number_samples=window_max_number_samples,
998 outside_max_number_samples=outside_max_number_samples,
999 window_min_timestamp=window_min_timestamp,
1000 window_max_timestamp=window_max_timestamp,
1001 )
1002 for s in self.signals_data
1003 ]
1005 return SignalsData(
1006 signals_data=signals_data,
1007 data_processing_time=sum(s.data_processing_time for s in signals_data),
1008 data_start=self.data_start,
1009 data_end=self.data_end,
1010 )
1012 def zip_export(self, file_format: str = "csv"):
1013 # return self.signals_data[0].csv_export()
1014 zip_buffer = io.BytesIO()
1015 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
1016 for signal_data in self.signals_data:
1017 if file_format == "csv":
1018 export_io = signal_data.csv_export()
1019 zip_file.writestr(f"{signal_data.signal_id}.csv", export_io)
1020 elif file_format == "prestoplot":
1021 export_io = signal_data.prestoplot_export()
1022 zip_file.writestr(f"{signal_data.signal_id}.tab", export_io)
1023 else:
1024 raise ValueError(f"Format not found. Got: {file_format}")
1025 zip_bytes = zip_buffer.getvalue()
1026 # zip_bytes.seek(0)
1027 return zip_bytes
1030class SignalStatus(TwinPadModel):
1031 status: str
1032 reason: str
1033 delay: float | None
1036class DigitizationFunction(TwinPadModel):
1037 bits: int | None = None
1038 min_value: float
1039 max_value: float
1040 min_raw_value: float
1041 max_raw_value: float
1044class SignalUpdate(TwinPadModel):
1045 value: float | str | bool | int | None = None
1046 forced_value: float | str | bool | int | None = None
1047 timestamp: int | None = None
1050class SignalType(str, Enum):
1051 command = "command"
1052 sensor = "sensor"
1053 external_sensor = "external_sensor"
1056SIGNALDATA_TYPES = {
1057 "int": NumericSignalData,
1058 "float": NumericSignalData,
1059 "str": StringSignalData,
1060 "bool": NumericSignalData,
1061 "epoch": NumericSignalData,
1062}
1065class Signal(GenericMongo):
1066 collection_name: ClassVar[str] = "signals"
1068 signal_id: str
1069 frequency: float
1070 unit: str | None
1071 description: str
1072 type: SignalType
1073 data_type: str
1074 precision_digits: int | None
1075 forcible: bool
1077 digitization_function: DigitizationFunction | None
1079 @property
1080 def device(self) -> Device:
1081 device_id = self.signal_id.split(".")[0]
1082 device = Device.get_one_by_attribute("device_id", device_id)
1083 return device
1085 @cached_property
1086 def signal_data_class(self):
1087 if self.data_type in SIGNALDATA_TYPES:
1088 return SIGNALDATA_TYPES[self.data_type]
1089 if self.data_type.startswith("enum"):
1090 return SIGNALDATA_TYPES[self.data_type.split(",")[0].replace("enum(", "").lstrip(" ")]
1091 raise ValueError(f"Unhandled python type: {self.data_type}")
1093 @cached_property
1094 def python_type(self):
1095 if self.data_type in TYPES:
1096 return TYPES[self.data_type]
1097 if self.data_type.startswith("enum"):
1098 choices = eval(",".join(self.data_type.split(",")[1:]).rstrip(")"))
1099 return Literal[*choices]
1100 raise ValueError(f"Unhandled python type: {self.data_type}")
1102 @computed_field
1103 @property
1104 def status(self) -> SignalStatus:
1105 now = time.time()
1106 status = "up"
1107 reason = ""
1109 # See line 292 for explanation
1110 bucket = get_signal_collection(f"system.buckets.{self.signal_id}")
1111 last_bucket = None
1112 if bucket is not None:
1113 last_bucket = bucket.find_one({}, sort={"control.max.timestamp": -1})
1114 if last_bucket is None:
1115 status = "no data"
1116 reason = "signal does not exist"
1117 return SignalStatus(status=status, reason=reason, delay=None)
1119 try:
1120 last_date = last_bucket["control"]["max"]["timestamp"]
1121 last_date = last_date.replace(tzinfo=pytz.UTC)
1122 last_value_ts = last_date.timestamp()
1123 except IndexError:
1124 last_value_ts = None
1126 if last_value_ts is None:
1127 delay = None
1128 reason = "No data from signal"
1129 else:
1130 # Since device is a computed property, only compute it once
1131 device = self.device
1132 if device is not None and device.last_ping is not None:
1133 last_value_ts = max(last_value_ts, device.last_ping)
1134 delay = now - last_value_ts
1135 if delay > DEVICE_TIMEOUT:
1136 status = "down"
1137 reason = f"delay of {delay:.{3}f} seconds (timeout of {DEVICE_TIMEOUT} seconds) "
1138 return SignalStatus(status=status, reason=reason, delay=delay)
1140 def update(self, update_dict: SignalUpdate):
1141 send_signal_value(self.signal_id, update_dict)
1143 @classmethod
1144 def get_from_signal_id(cls, signal_id) -> Self:
1145 """Could be generic from mongo"""
1146 raw_value = get_collection(systems_database, "signals", create=True).find_one({"signal_id": signal_id})
1147 if not raw_value:
1148 return None
1149 del raw_value["_id"]
1150 return cls.dict_to_object(raw_value)
1152 @classmethod
1153 def get_all_ids(cls) -> list[str]:
1154 cursor = cls.collection().aggregate([{"$project": {"signal_id": 1, "_id": 0}}, {"$sort": {"signal_id": 1}}])
1156 return [signal["signal_id"] for signal in cursor]
1158 async def number_samples(self):
1159 collection = get_signal_collection(signal_id=self.signal_id)
1160 if collection is None:
1161 return 0
1163 number_samples = collection.estimated_document_count()
1165 number_samples_async_collection = await get_async_collection(
1166 systems_async_database, "number_samples", create=True, time_series=True
1167 )
1169 loop = asyncio.get_running_loop()
1170 loop.create_task(
1171 number_samples_async_collection.insert_one(
1172 {
1173 "timestamp": datetime.datetime.now(pytz.UTC),
1174 "signal_id": self.signal_id,
1175 "number_samples": number_samples,
1176 }
1177 )
1178 )
1180 return number_samples
1182 def sample_datasize(self):
1183 return signals_database.command("collstats", self.signal_id)["size"]
1185 @classmethod
1186 def get_forcibility(cls, signal_ids: list[str]) -> dict[str, bool]:
1187 result = cls.collection().aggregate(
1188 [{"$match": {"signal_id": {"$in": signal_ids}}}, {"$project": {"_id": 0, "signal_id": 1, "forcible": 1}}]
1189 )
1191 return {signal["signal_id"]: signal["forcible"] for signal in result}
1194class ServicesStatus(TwinPadModel):
1195 backend: str
1196 cloud_broker: str
1197 time_series_database: str
1198 signal_storage: str
1199 heartbeat_storage: str
1200 data_analyzer: str
1202 @classmethod
1203 def check(cls) -> Self:
1204 return cls(
1205 cloud_broker=ping(RABBITMQ_HOST),
1206 backend="up",
1207 time_series_database=ping(MONGO_HOST),
1208 signal_storage=ping(SIGNAL_STORAGE_HOST),
1209 heartbeat_storage=ping(HEARTBEAT_STORAGE_HOST),
1210 data_analyzer=ping(DATA_ANALYZER_HOST),
1211 )
1214def ping(host):
1215 try:
1216 if ping3.ping(host, timeout=0.8):
1217 return "up"
1218 except PermissionError:
1219 pass
1220 return "down"
1223class Event(GenericMongo):
1224 collection_name: ClassVar[str] = "events"
1226 name: str
1227 timestamp: float
1228 event_rule_id: str
1230 @computed_field
1231 @cached_property
1232 def event_rule(self) -> "EventRule":
1233 return EventRule.get_from_id(self.event_rule_id)
1235 @classmethod
1236 def dict_to_object(cls, dict_):
1237 """Refine to convert timestamp to datetime for mongodb."""
1238 dict_["timestamp"] = datetime.datetime.timestamp(dict_["timestamp"])
1239 return super().dict_to_object(dict_)
1242class EventDay(GenericMongo):
1243 collection_name: ClassVar[str] = "number_events"
1245 timestamp: float
1246 number_events: int
1248 @classmethod
1249 def get_number_events_timeframe(cls, min_timestamp, max_timestamp, recompute_events) -> list[Self]:
1250 ONE_DAY_OFFSET = datetime.timedelta(days=1)
1251 number_events_collection = get_collection(systems_database, "number_events")
1252 events_collection = get_collection(systems_database, "events", create=True, time_series=True)
1253 items = []
1254 TODAY = datetime.datetime.combine(datetime.datetime.today(), datetime.time.min).replace(tzinfo=pytz.UTC)
1255 if number_events_collection is None or recompute_events:
1256 number_events_collection = get_collection(systems_database, "number_events", create=True, time_series=True)
1257 first_event = events_collection.find_one(sort={"timestamp": 1})
1258 if first_event is None:
1259 return items
1260 # compute from day of first found event
1261 last_computed_day = datetime.datetime.combine(first_event["timestamp"], datetime.time.min).replace(
1262 tzinfo=pytz.UTC
1263 )
1264 while last_computed_day < TODAY:
1265 day_nb_events = events_collection.count_documents(
1266 {"timestamp": {"$gte": last_computed_day, "$lt": last_computed_day + ONE_DAY_OFFSET}}
1267 )
1268 if day_nb_events > 0:
1269 number_events_collection.insert_one(
1270 {"timestamp": last_computed_day, "number_events": day_nb_events}
1271 )
1272 last_computed_day += ONE_DAY_OFFSET
1273 number_events_today = events_collection.count_documents({"timestamp": {"$gte": TODAY}})
1274 if number_events_today > 0:
1275 number_events_collection.delete_one({"timestamp": TODAY})
1276 number_events_collection.insert_one({"timestamp": TODAY, "number_events": number_events_today})
1277 data_start = datetime.datetime.fromtimestamp(min_timestamp).replace(tzinfo=pytz.UTC)
1278 data_end = datetime.datetime.fromtimestamp(max_timestamp).replace(tzinfo=pytz.UTC)
1279 number_events = number_events_collection.find({"timestamp": {"$gte": data_start, "$lte": data_end}})
1280 for day in number_events:
1281 day["timestamp"] = day["timestamp"].timestamp()
1282 items.append(cls.mongo_dict_to_object(day))
1283 return items
1286class EventRule(GenericMongo):
1287 collection_name: ClassVar[str] = "event_rules"
1289 name: str
1290 formula: str
1291 variables: list[str]
1293 @computed_field
1294 @cached_property
1295 def number_events(self) -> int:
1296 return get_collection(systems_database, "events").count_documents({"event_rule_id": self.id})
1299class Company(GenericMongo):
1300 collection_name: ClassVar[str] = "companies"
1301 name: str
1304class User(GenericMongo):
1305 collection_name: ClassVar[str] = "users"
1307 firstname: str
1308 lastname: str
1309 email: str
1310 password: str
1311 is_active: bool | None = False
1312 is_admin: bool | None = False
1313 is_connected: bool | None = False
1314 company_id: str | None = None
1316 def to_dict(self, exclude=None):
1317 if exclude is None:
1318 exclude = {"password"}
1319 return GenericMongo.to_dict(self, exclude=exclude)
1321 @classmethod
1322 def create(cls, firstname: str, lastname: str, email: str, password: str, is_admin: bool | None = False):
1323 users = cls.get_all()
1324 if not users:
1325 is_admin = True
1326 user = User(firstname=firstname, lastname=lastname, email=email, password=password, is_admin=is_admin)
1327 user_collection = get_collection(systems_database, "users", create=True)
1328 new_user = user_collection.insert_one(user.model_dump(exclude={"id"}))
1329 if new_user is None:
1330 return None
1331 return {"user_id": str(new_user.inserted_id)}
1333 @classmethod
1334 def update(cls, user: "UserUpdate", user_id: str):
1335 updated_user = cls.collection().find_one_and_update(
1336 {"_id": ObjectId(user_id)},
1337 {"$set": {k: v for k, v in user.model_dump(exclude={"id"}).items() if v is not None}},
1338 return_document=ReturnDocument.AFTER,
1339 )
1340 updated_user["id"] = str(updated_user["_id"])
1341 del (updated_user["_id"], updated_user["is_connected"])
1342 return cls(**updated_user)
1345UserUpdate = create_update_model(User)
1348class Campaign(GenericMongo):
1349 collection_name: ClassVar[str] = "campaigns"
1351 # Properties
1352 id: str | None = None
1353 name: str
1354 description: str | None = None
1356 @classmethod
1357 def create(cls, campaign: Self):
1358 new_campaign = cls.collection().insert_one(campaign.model_dump(exclude={"id"}))
1359 if new_campaign is None:
1360 return None
1361 return {"campaign_id": str(new_campaign.inserted_id)}
1363 @classmethod
1364 def update(cls, campaign: Self):
1365 updated_campaign = cls.collection().find_one_and_update(
1366 {"_id": ObjectId(campaign.id)},
1367 {"$set": {"name": campaign.name, "description": campaign.description}},
1368 return_document=ReturnDocument.AFTER,
1369 )
1370 return updated_campaign
1372 @classmethod
1373 def delete(cls, campaign_id):
1374 deleted_user = cls.collection().delete_one({"_id": ObjectId(campaign_id)})
1375 return deleted_user
1378class Phase(GenericMongo):
1379 collection_name: ClassVar[str] = "phases"
1381 # Properties
1382 id: str | None = None
1383 name: str
1384 description: str | None = None
1385 start_at: float
1386 end_at: float
1388 # FK
1389 campaign_id: str
1391 # @classmethod
1392 # def get_by_date(cls, datetime: float):
1393 # phases = []
1394 # for dict_ in cls.collection.find({"start_at": datetime}).sort("name", ASCENDING):
1395 # phases.append(cls.dict_to_object(dict_).model_dump())
1396 # for dict_ in cls.collection.find({"end_at": datetime}).sort("name", ASCENDING):
1397 # phases.append(cls.dict_to_object(dict_).model_dump())
1398 # if phases is None:
1399 # return None
1400 # return phases
1402 @classmethod
1403 def create(cls, phase: Self):
1404 phase = Phase(
1405 name=phase.name,
1406 description=phase.description,
1407 start_at=phase.start_at,
1408 end_at=phase.end_at,
1409 campaign_id=phase.campaign_id,
1410 )
1411 phase_collection = get_collection(systems_database, "phases", create=True)
1412 new_phase = phase_collection.insert_one(phase.model_dump(exclude={"id"}))
1413 if new_phase is None:
1414 return None
1415 return {"phase_id": str(new_phase.inserted_id)}
1417 @classmethod
1418 def update(cls, phase: Self):
1419 updated_phase = cls.collection().find_one_and_update(
1420 {"_id": ObjectId(phase.id)},
1421 {
1422 "$set": {
1423 "name": phase.name,
1424 "description": phase.description,
1425 "start_at": phase.start_at,
1426 "end_at": phase.end_at,
1427 }
1428 },
1429 return_document=ReturnDocument.AFTER,
1430 )
1431 return updated_phase
1433 @classmethod
1434 def delete(cls, phase_id):
1435 delete_phase = cls.collection().delete_one({"_id": ObjectId(phase_id)})
1436 return delete_phase
1438 @classmethod
1439 def deleteMany(cls, campaign_id):
1440 delete_phases = cls.collection().delete_many({"campaign_id": campaign_id})
1441 return delete_phases
1444class CustomViewCreation(GenericMongo):
1445 collection_name: ClassVar[str] = "custom_views"
1447 name: str
1448 configuration: list
1451class CustomView(CustomViewCreation):
1452 # Properties
1453 id: str | None = None
1455 # Foreign Key
1456 user_id: str
1458 # # Methods
1459 # @classmethod
1460 # def create(cls, form_custom_view: Self, user_id) -> list:
1461 # custom_view = CustomView(
1462 # name=form_custom_view.name, configuration=form_custom_view.configuration, user_id=user_id
1463 # )
1464 # new_custom_view = cls.collection().insert_one(custom_view.model_dump(exclude={"id"}))
1465 # return new_custom_view
1467 # @classmethod
1468 # def update(cls, custom_view: Self, custom_view_id: str) -> Self:
1469 # updated_custom_view = cls.collection().find_one_and_update(
1470 # {"_id": ObjectId(custom_view_id)},
1471 # {"$set": {k: v for k, v in custom_view.model_dump(exclude={"id"}).items() if v is not None}},
1472 # return_document=ReturnDocument.AFTER,
1473 # )
1474 # updated_custom_view["id"] = str(updated_custom_view["_id"])
1475 # del updated_custom_view["_id"]
1476 # return cls(**updated_custom_view)
1478 # @classmethod
1479 # def delete(cls, custom_view_id) -> bool:
1480 # deleted_custom_view = cls.collection().delete_one({"_id": ObjectId(custom_view_id)})
1481 # return deleted_custom_view.acknowledged
1484CustomViewUpdate = create_update_model(CustomView)
1487class Video(GenericMongo):
1488 collection_name: ClassVar[str] = "videos"
1490 # Properties
1491 name: str
1492 ip_addr: str
1493 username: str | None = None
1494 password: str | None = None
1496 # Methods
1497 @classmethod
1498 def get_all(cls, sort_by="_id") -> list[Self]:
1499 items = []
1500 for dict_ in cls.collection().find({}, {"name": 1, "ip_addr": 1}).sort(sort_by, ASCENDING):
1501 items.append(cls.mongo_dict_to_object(dict_))
1502 return items
1504 @classmethod
1505 def get_video(cls, camera_id: ObjectId):
1506 camera = cls.get_from_id(camera_id)
1507 return camera.name