Skip to content

dp3.database.database

EntityDatabase

EntityDatabase(config: HierarchicalDict, model_spec: ModelSpec, num_processes: int, process_index: int = 0, elog: Optional[EventGroupType] = None)

MongoDB database wrapper responsible for whole communication with database server. Initializes database schema based on database configuration.

Parameters:

Name Type Description Default
config HierarchicalDict

configuration of database connection (content of database.yml)

required
model_spec ModelSpec

ModelSpec object, configuration of data model (entities and attributes)

required
process_index int

index of worker process - used for sharding metadata

0
num_processes int

number of worker processes

required
Source code in dp3/database/database.py
def __init__(
    self,
    config: HierarchicalDict,
    model_spec: ModelSpec,
    num_processes: int,
    process_index: int = 0,
    elog: Optional[EventGroupType] = None,
) -> None:
    """Connect to MongoDB and initialize schema, buffers and flush scheduler.

    Args:
        config: configuration of database connection (content of database.yml)
        model_spec: ModelSpec object, configuration of data model
            (entities and attributes)
        num_processes: number of worker processes
        process_index: index of worker process - used for sharding metadata
        elog: event logging group (a DummyEventGroup is used when None)

    Raises:
        DatabaseError: when the database cannot be reached after all retries.
    """
    self.log = logging.getLogger("EntityDatabase")
    self.elog = elog or DummyEventGroup()

    db_config = MongoConfig.model_validate(config.get("database", {}))

    self.log.info("Connecting to database...")
    for attempt, delay in enumerate(RECONNECT_DELAYS):
        try:
            self._db = self.connect(db_config)
            # Check if connected
            self._db.admin.command("ping")
            # FIX: stop retrying once the connection is verified; previously
            # the loop kept reconnecting for every entry in RECONNECT_DELAYS.
            break
        except pymongo.errors.ConnectionFailure as e:
            if attempt + 1 == len(RECONNECT_DELAYS):
                raise DatabaseError(
                    "Cannot connect to database with specified connection arguments."
                ) from e
            self.log.error(
                "Cannot connect to database (attempt %d, retrying in %ds).",
                attempt + 1,
                delay,
            )
            time.sleep(delay)

    self._db_schema_config = model_spec
    self._num_processes = num_processes
    self._process_index = process_index

    # Init and switch to correct database
    codec_opts = get_codec_options()
    self._db = Database(self._db, db_config.db_name, codec_options=codec_opts)

    self.snapshots = SnapshotCollectionContainer(
        self._db, db_config, model_spec, config.get("snapshots", {})
    )
    self._snapshot_bucket_size = db_config.storage.snapshot_bucket_size

    # Schema initialization is done only by the main worker to avoid races.
    if process_index == 0:
        self._init_database_schema()

    self.schema_cleaner = SchemaCleaner(
        self._db, self.get_module_cache("Schema"), self._db_schema_config, config, self.log
    )

    # Callbacks invoked on forced entity deletion (see register_on_entity_delete).
    self._on_entity_delete_one = []
    self._on_entity_delete_many = []

    # Per-entity-type write buffers, flushed periodically by the scheduler below.
    self._raw_buffer_locks = {etype: threading.Lock() for etype in model_spec.entities}
    self._raw_buffers = defaultdict(list)
    self._master_buffer_locks = {etype: threading.Lock() for etype in model_spec.entities}
    self._master_buffers = defaultdict(dict)

    self._sched = Scheduler()
    # Spread flushes of different workers across the minute (at most 3 phases).
    seconds = ",".join(
        str(i) for i in range(60) if (i - process_index) % min(num_processes, 3) == 0
    )
    self._sched.register(self._push_raw, second=seconds, misfire_grace_time=5)
    self._sched.register(self._push_master, second=seconds, misfire_grace_time=5)

    self.log.info("Database successfully initialized!")

start

start() -> None

Starts the database sync of raw datapoint inserts.

Source code in dp3/database/database.py
def start(self) -> None:
    """Launch the scheduler that periodically syncs buffered raw datapoint inserts."""
    scheduler = self._sched
    scheduler.start()

stop

stop() -> None

Stops the database sync, push remaining datapoints.

Source code in dp3/database/database.py
def stop(self) -> None:
    """Halt the periodic database sync, then flush any remaining buffered datapoints."""
    self._sched.stop()
    # Final flush so nothing buffered is lost on shutdown.
    for flush in (self._push_raw, self._push_master):
        flush()

register_on_entity_delete

register_on_entity_delete(f_one: Callable[[str, AnyEidT], None], f_many: Callable[[str, list[AnyEidT]], None])

Registers function to be called when entity is forcibly deleted.

Source code in dp3/database/database.py
def register_on_entity_delete(
    self, f_one: Callable[[str, AnyEidT], None], f_many: Callable[[str, list[AnyEidT]], None]
):
    """Register callbacks invoked when entities are forcibly deleted.

    `f_one` handles a single deleted entity, `f_many` a batch of them.
    """
    self._on_entity_delete_one += [f_one]
    self._on_entity_delete_many += [f_many]

update_schema

update_schema()

Checks whether schema saved in database is up-to-date and updates it if necessary.

Will NOT perform any changes in master collections on conflicting model changes. Any such changes must be performed via the CLI.

As this method modifies the schema collection, it should be called only by the main worker.

Source code in dp3/database/database.py
def update_schema(self):
    """Check that the schema stored in the database is up-to-date; update it if not.

    Will NOT perform any changes in master collections on conflicting model
    changes — such changes must be performed via the CLI.

    Modifies the schema collection, so only the main worker should call this.
    """
    cleaner = self.schema_cleaner
    try:
        cleaner.safe_update_schema()
    except ValueError as err:
        message = "Schema update failed. Please run `dp3 schema-update`."
        raise DatabaseError(message) from err

await_updated_schema

await_updated_schema()

Checks whether schema saved in database is up-to-date and awaits its update by the main worker on mismatch.

Source code in dp3/database/database.py
def await_updated_schema(self):
    """Check whether the stored schema matches the configured model.

    On mismatch, blocks until the main worker performs the update.
    """
    self.schema_cleaner.await_updated()

insert_datapoints

insert_datapoints(eid: AnyEidT, dps: list[DataPointBase], new_entity: bool = False) -> None

Inserts datapoint to raw data collection and updates master record.

Raises DatabaseError when insert or update fails.

Source code in dp3/database/database.py
def insert_datapoints(
    self, eid: AnyEidT, dps: list[DataPointBase], new_entity: bool = False
) -> None:
    """Inserts datapoint to raw data collection and updates master record.

    All datapoints must belong to one entity type (taken from `dps[0].etype`).
    Writes are buffered per entity type and flushed periodically
    (see the `_push_raw` / `_push_master` jobs registered in `__init__`).

    Args:
        eid: id of the entity the datapoints belong to
        dps: datapoints to insert
        new_entity: if True, also initializes `#hash` and `#time_created`
            on the master record

    Raises DatabaseError when insert or update fails.
    """
    if len(dps) == 0:
        return

    etype = dps[0].etype

    # Check `etype`
    self._assert_etype_exists(etype)
    self._assert_eid_correct_dtype(etype, eid)

    # Insert raw datapoints
    dps_dicts = [dp.model_dump(exclude={"attr_type"}) for dp in dps]
    with self._raw_buffer_locks[etype]:
        self._raw_buffers[etype].extend(dps_dicts)

    # Update master document:
    # "pushes" collects per-attribute history entries, "$set" plain overwrites.
    master_changes = {"pushes": defaultdict(list), "$set": {}}
    for dp in dps:
        attr_spec = self._db_schema_config.attr(etype, dp.attr)

        # Serialize pydantic model values to plain dicts before storing.
        if attr_spec.t in AttrType.PLAIN | AttrType.OBSERVATIONS and attr_spec.is_iterable:
            v = [elem.model_dump() if isinstance(elem, BaseModel) else elem for elem in dp.v]
        else:
            v = dp.v.model_dump() if isinstance(dp.v, BaseModel) else dp.v

        # Rewrite value of plain attribute
        if attr_spec.t == AttrType.PLAIN:
            master_changes["$set"][dp.attr] = {"v": v, "ts_last_update": datetime.now()}

        # Push new data of observation
        if attr_spec.t == AttrType.OBSERVATIONS:
            master_changes["pushes"][dp.attr].append(
                {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c}
            )

        # Push new data of timeseries
        if attr_spec.t == AttrType.TIMESERIES:
            master_changes["pushes"][dp.attr].append({"t1": dp.t1, "t2": dp.t2, "v": v})

    if new_entity:
        master_changes["$set"]["#hash"] = HASH(f"{etype}:{eid}")
        master_changes["$set"]["#time_created"] = datetime.now()

    # Merge the changes into the per-entity master buffer under its lock.
    with self._master_buffer_locks[etype]:
        if eid in self._master_buffers[etype]:
            entry = self._master_buffers[etype][eid]
            # FIX: the buffered entry may have been created by `extend_ttl()`
            # and contain only "$max" -- create "pushes"/"$set" on demand
            # instead of raising KeyError.
            entry_pushes = entry.setdefault("pushes", defaultdict(list))
            for attr, push_dps in master_changes["pushes"].items():
                if attr in entry_pushes:
                    entry_pushes[attr].extend(push_dps)
                else:
                    entry_pushes[attr] = push_dps
            entry.setdefault("$set", {}).update(master_changes["$set"])
        else:
            self._master_buffers[etype][eid] = master_changes

update_master_records

update_master_records(etype: str, eids: list[AnyEidT], records: list[dict]) -> None

Replace master records of etype:eid with the provided records.

Raises DatabaseError when update fails.

Source code in dp3/database/database.py
def update_master_records(self, etype: str, eids: list[AnyEidT], records: list[dict]) -> None:
    """Replace master records of `etype`:`eid` with the provided `records`.

    `eids` and `records` are parallel lists.

    Raises DatabaseError when update fails.
    """
    # Guard: PyMongo's bulk_write() raises InvalidOperation on an empty
    # request list, so an empty input is a no-op instead of an error.
    if not eids:
        return

    master_col = self._master_col(etype)
    try:
        res = master_col.bulk_write(
            [
                ReplaceOne({"_id": eid}, record, upsert=True)
                for eid, record in zip(eids, records)
            ],
            ordered=False,
        )
        self.log.debug("Updated master records of %s (%s).", etype, len(eids))
        for error in res.bulk_api_result.get("writeErrors", []):
            self.log.error("Error in bulk write: %s", error)
    except Exception as e:
        raise DatabaseError(f"Update of master records failed: {e}\n{records}") from e

extend_ttl

extend_ttl(etype: str, eid: AnyEidT, ttl_tokens: dict[str, datetime])

Extends TTL of given etype:eid by ttl_tokens.

Source code in dp3/database/database.py
def extend_ttl(self, etype: str, eid: AnyEidT, ttl_tokens: dict[str, datetime]):
    """Extends TTL of given `etype`:`eid` by `ttl_tokens`.

    Buffered as a `$max` update, so only later expiration times win.
    """
    new_values = {f"#ttl.{name}": value for name, value in ttl_tokens.items()}
    with self._master_buffer_locks[etype]:
        entity_buffer = self._master_buffers[etype]
        entry = entity_buffer.get(eid)
        if entry is None:
            entity_buffer[eid] = {"$max": new_values}
        elif "$max" not in entry:
            entry["$max"] = new_values
        else:
            maxima = entry["$max"]
            for name, value in new_values.items():
                maxima[name] = max(maxima.get(name, datetime.min), value)

remove_expired_ttls

remove_expired_ttls(etype: str, expired_eid_ttls: dict[AnyEidT, list[str]])

Removes expired TTL of given etype:eid.

Source code in dp3/database/database.py
def remove_expired_ttls(self, etype: str, expired_eid_ttls: dict[AnyEidT, list[str]]):
    """Removes expired TTL tokens of the given `etype`:`eid`s.

    `expired_eid_ttls` maps each eid to the list of its expired token names.
    """
    master_col = self._master_col(etype)
    try:
        requests = [
            UpdateOne(
                {"_id": eid},
                {"$unset": {f"#ttl.{token_name}": "" for token_name in token_names}},
            )
            for eid, token_names in expired_eid_ttls.items()
        ]
        result = master_col.bulk_write(requests)
        self.log.debug(
            "Removed expired TTL of %s: (%s, modified %s).",
            etype,
            len(expired_eid_ttls),
            result.modified_count,
        )
    except Exception as e:
        raise DatabaseError(f"TTL update failed: {e}") from e

delete_eids

delete_eids(etype: str, eids: list[AnyEidT])

Delete master record and all snapshots of etype:eids.

Source code in dp3/database/database.py
def delete_eids(self, etype: str, eids: list[AnyEidT]):
    """Delete master record and all snapshots of `etype`:`eids`."""
    master_col = self._master_col(etype)
    # Drop any pending buffered changes for the deleted entities first.
    with self._master_buffer_locks[etype]:
        buffer = self._master_buffers[etype]
        for eid in eids:
            buffer.pop(eid, None)
    try:
        result = master_col.delete_many({"_id": {"$in": eids}})
        self.log.debug(
            "Deleted %s master records of %s (%s).", result.deleted_count, etype, len(eids)
        )
        self.elog.log("record_removed", count=result.deleted_count)
    except Exception as e:
        raise DatabaseError(f"Delete of master record failed: {e}\n{eids}") from e
    self.snapshots.delete_eids(etype, eids)

    # Notify registered listeners; their failures are logged, not propagated.
    for callback in self._on_entity_delete_many:
        try:
            callback(etype, eids)
        except Exception as e:
            self.log.exception("Error in on_entity_delete_many callback %s: %s", callback, e)

delete_eid

delete_eid(etype: str, eid: AnyEidT)

Delete master record and all snapshots of etype:eid.

Source code in dp3/database/database.py
def delete_eid(self, etype: str, eid: AnyEidT):
    """Delete master record and all snapshots of `etype`:`eid`."""
    master_col = self._master_col(etype)
    # Drop any pending buffered changes for the deleted entity first.
    with self._master_buffer_locks[etype]:
        self._master_buffers[etype].pop(eid, None)
    try:
        master_col.delete_one({"_id": eid})
        self.log.debug("Deleted master record of %s/%s.", etype, eid)
        self.elog.log("record_removed")
    except Exception as e:
        raise DatabaseError(f"Delete of master record failed: {e}\n{eid}") from e
    self.snapshots.delete_eid(etype, eid)

    # Notify registered listeners; their failures are logged, not propagated.
    for callback in self._on_entity_delete_one:
        try:
            callback(etype, eid)
        except Exception as e:
            self.log.exception("Error in on_entity_delete_one callback %s: %s", callback, e)

mark_all_entity_dps_t2

mark_all_entity_dps_t2(etype: str, attrs: list[str]) -> UpdateResult

Updates the min_t2s of the master records of etype for all records.

Periodically called for all etypes from HistoryManager.

Source code in dp3/database/database.py
def mark_all_entity_dps_t2(self, etype: str, attrs: list[str]) -> UpdateResult:
    """
    Updates the `min_t2s` of the master records of `etype` for all records.

    For each attribute in `attrs`, recomputes `#min_t2s.<attr>` as the minimum
    `t2` found in the attribute's history array; attributes whose array is
    empty or missing are removed entirely, together with their cached minimum.

    Periodically called for all `etype`s from HistoryManager.

    Raises DatabaseError when the update fails.
    """
    master_col = self._master_col(etype)
    try:
        # Single aggregation-pipeline update; the two halves of the merged
        # dict handle the attribute arrays and their cached t2 minima.
        return master_col.update_many(
            {},
            [
                {
                    "$set": {
                        # Remove the attribute field when its array is empty/missing.
                        attr_name: {
                            "$cond": {
                                "if": {
                                    "$eq": [
                                        {"$size": {"$ifNull": [f"${attr_name}", []]}},
                                        0,
                                    ]
                                },
                                "then": "$$REMOVE",
                                "else": f"${attr_name}",
                            }
                        }
                        for attr_name in attrs
                    }
                    | {
                        # Cache the minimum t2 of each non-empty history array.
                        f"#min_t2s.{attr_name}": {
                            "$cond": {
                                "if": {
                                    "$eq": [
                                        {"$size": {"$ifNull": [f"${attr_name}", []]}},
                                        0,
                                    ]
                                },
                                "then": "$$REMOVE",
                                "else": {"$min": f"${attr_name}.t2"},
                            }
                        }
                        for attr_name in attrs
                    }
                }
            ],
        )
    except Exception as e:
        raise DatabaseError(f"Update of min_t2s failed: {e}") from e

delete_old_dps

delete_old_dps(etype: str, attr_name: str, t_old: datetime) -> UpdateResult

Delete old datapoints from master collection.

Periodically called for all etypes from HistoryManager.

Source code in dp3/database/database.py
def delete_old_dps(self, etype: str, attr_name: str, t_old: datetime) -> UpdateResult:
    """Delete old datapoints from master collection.

    Removes history entries of `attr_name` whose `t2` is older than `t_old`,
    then refreshes (or removes, when the array becomes empty) the cached
    `#min_t2s.<attr_name>` value. Only records whose cached minimum indicates
    they contain old entries are touched.

    Periodically called for all `etype`s from HistoryManager.

    Raises DatabaseError when the update fails.
    """
    # Write concern w=1: acknowledged by the primary only.
    master_col = self._master_col(etype, write_concern=WriteConcern(w=1))
    try:
        return master_col.update_many(
            # Pre-filter: only records that can actually contain old datapoints.
            {f"#min_t2s.{attr_name}": {"$lt": t_old}},
            [
                {
                    # Keep only history entries that are still fresh enough.
                    "$set": {
                        attr_name: {
                            "$filter": {
                                "input": f"${attr_name}",
                                "cond": {"$gte": ["$$this.t2", t_old]},
                            }
                        }
                    }
                },
                {
                    # Recompute the cached minimum t2, or drop it if array is empty.
                    "$set": {
                        f"#min_t2s.{attr_name}": {
                            "$cond": {
                                "if": {
                                    "$eq": [
                                        {"$size": {"$ifNull": [f"${attr_name}", []]}},
                                        0,
                                    ]
                                },
                                "then": "$$REMOVE",
                                "else": {"$min": f"${attr_name}.t2"},
                            }
                        }
                    },
                },
            ],
        )
    except Exception as e:
        raise DatabaseError(f"Delete of old datapoints failed: {e}") from e
delete_link_dps(etype: str, affected_eids: list[AnyEidT], attr_name: str, eid_to: AnyEidT) -> None

Delete link datapoints from master collection.

Called from LinkManager for deleted entities.

Source code in dp3/database/database.py
def delete_link_dps(
    self, etype: str, affected_eids: list[AnyEidT], attr_name: str, eid_to: AnyEidT
) -> None:
    """Delete link datapoints from master collection.

    Called from LinkManager for deleted entities.
    """
    master_col = self._master_col(etype)
    attr_type = self._db_schema_config.attr(etype, attr_name).t
    filter_cond = {"_id": {"$in": affected_eids}}
    try:
        if attr_type == AttrType.OBSERVATIONS:
            # Remove only the history entries pointing at the deleted entity.
            master_col.update_many(filter_cond, {"$pull": {attr_name: {"v.eid": eid_to}}})
        elif attr_type == AttrType.PLAIN:
            # Plain link attributes are dropped entirely.
            master_col.update_many(filter_cond, {"$unset": {attr_name: ""}})
        else:
            raise ValueError(f"Unsupported attribute type: {attr_type}")
    except Exception as e:
        raise DatabaseError(f"Delete of link datapoints failed: {e}") from e
delete_many_link_dps(etypes: list[str], affected_eids: list[list[AnyEidT]], attr_names: list[str], eids_to: list[list[AnyEidT]]) -> None

Delete link datapoints from master collection.

Called from LinkManager for deleted entities, when deleting multiple entities.

Source code in dp3/database/database.py
def delete_many_link_dps(
    self,
    etypes: list[str],
    affected_eids: list[list[AnyEidT]],
    attr_names: list[str],
    eids_to: list[list[AnyEidT]],
) -> None:
    """Delete link datapoints from master collection.

    Called from LinkManager for deleted entities, when deleting multiple entities.

    The four argument lists are parallel: item `i` describes one link attribute
    of entity type `etypes[i]` to clean up.

    Raises DatabaseError when the update fails.
    """
    try:
        # FIX: group update operations per entity type. Previously the growing
        # `updates` list was re-submitted via bulk_write() on every loop
        # iteration, re-applying earlier etypes' updates to other etypes'
        # collections.
        updates_per_etype = defaultdict(list)
        for etype, affected_eid_list, attr_name, eid_to_list in zip(
            etypes, affected_eids, attr_names, eids_to
        ):
            attr_type = self._db_schema_config.attr(etype, attr_name).t
            filter_cond = {"_id": {"$in": affected_eid_list}}
            if attr_type == AttrType.OBSERVATIONS:
                update_pull = {"$pull": {attr_name: {"v.eid": {"$in": eid_to_list}}}}
                updates_per_etype[etype].append(UpdateMany(filter_cond, update_pull))
            elif attr_type == AttrType.PLAIN:
                update_unset = {"$unset": {attr_name: ""}}
                updates_per_etype[etype].append(UpdateMany(filter_cond, update_unset))
            else:
                raise ValueError(f"Unsupported attribute type: {attr_type}")

        # Execute each collection's updates exactly once.
        for etype, updates in updates_per_etype.items():
            self._master_col(etype).bulk_write(updates)
    except Exception as e:
        raise DatabaseError(f"Delete of link datapoints failed: {e}") from e

get_master_record

get_master_record(etype: str, eid: AnyEidT, **kwargs) -> dict

Get current master record for etype/eid.

If it doesn't exist, returns {}.

Source code in dp3/database/database.py
def get_master_record(self, etype: str, eid: AnyEidT, **kwargs) -> dict:
    """Get current master record for etype/eid.

    If it doesn't exist, returns {}. Extra kwargs are passed to `find_one`.
    """
    self._assert_etype_exists(etype)
    self._assert_eid_correct_dtype(etype, eid)

    record = self._master_col(etype).find_one({"_id": eid}, **kwargs)
    return record or {}

ekey_exists

ekey_exists(etype: str, eid: AnyEidT) -> bool

Checks whether master record for etype/eid exists

Source code in dp3/database/database.py
def ekey_exists(self, etype: str, eid: AnyEidT) -> bool:
    """Checks whether master record for etype/eid exists (buffered or stored)."""
    with self._master_buffer_locks[etype]:
        buffered = eid in self._master_buffers[etype]
    if buffered:
        return True
    return bool(self.get_master_record(etype, eid, projection={"_id": 1}))

get_master_records

get_master_records(etype: str, **kwargs) -> Cursor

Get cursor to current master records of etype.

Source code in dp3/database/database.py
def get_master_records(self, etype: str, **kwargs) -> Cursor:
    """Get cursor to current master records of etype.

    Extra kwargs are passed to `find`.
    """
    self._assert_etype_exists(etype)
    return self._master_col(etype).find({}, **kwargs)

get_worker_master_records

get_worker_master_records(worker_index: int, worker_cnt: int, etype: str, query_filter: dict = None, **kwargs) -> Cursor

Get cursor to current master records of etype.

Source code in dp3/database/database.py
def get_worker_master_records(
    self, worker_index: int, worker_cnt: int, etype: str, query_filter: dict = None, **kwargs
) -> Cursor:
    """Get cursor to current master records of `etype` owned by worker `worker_index`.

    Records are sharded by `#hash` modulo `worker_cnt`; an optional
    `query_filter` further narrows the result.
    """
    # Consistency: use the shared assertion helper (as in get_master_records)
    # instead of duplicating the inline existence check.
    self._assert_etype_exists(etype)

    query_filter = {} if query_filter is None else query_filter
    master_col = self._master_col(etype)
    return master_col.find(
        {"#hash": {"$mod": [worker_cnt, worker_index]}, **query_filter}, **kwargs
    )

get_value_or_history

get_value_or_history(etype: str, attr_name: str, eid: AnyEidT, t1: Optional[datetime] = None, t2: Optional[datetime] = None) -> dict

Gets current value and/or history of attribute for given eid.

Depends on attribute type: - plain: just (current) value - observations: (current) value and history stored in master record (optionally filtered) - timeseries: just history stored in master record (optionally filtered)

Returns dict with two keys: current_value and history (list of values).

Source code in dp3/database/database.py
def get_value_or_history(
    self,
    etype: str,
    attr_name: str,
    eid: AnyEidT,
    t1: Optional[datetime] = None,
    t2: Optional[datetime] = None,
) -> dict:
    """Gets current value and/or history of attribute for given `eid`.

    Depends on attribute type:
    - plain: just (current) value
    - observations: (current) value and history stored in master record (optionally filtered)
    - timeseries: just history stored in master record (optionally filtered)

    Returns dict with two keys: `current_value` and `history` (list of values).
    """
    self._assert_etype_exists(etype)
    self._assert_eid_correct_dtype(etype, eid)

    attr_type = self._db_schema_config.attr(etype, attr_name).t
    result = {"current_value": None, "history": []}

    # Current value: plain attrs live in the master record,
    # observations in the latest snapshot.
    if attr_type == AttrType.PLAIN:
        master_record = self.get_master_record(etype, eid)
        result["current_value"] = master_record.get(attr_name, {}).get("v", None)
    elif attr_type == AttrType.OBSERVATIONS:
        latest_snapshot = self.snapshots.get_latest_one(etype, eid)
        result["current_value"] = latest_snapshot.get(attr_name, None)

    # History: only observations and timeseries keep one.
    if attr_type == AttrType.OBSERVATIONS:
        result["history"] = self.get_observation_history(etype, attr_name, eid, t1, t2)
    elif attr_type == AttrType.TIMESERIES:
        result["history"] = self.get_timeseries_history(etype, attr_name, eid, t1, t2)

    return result

estimate_count_eids

estimate_count_eids(etype: str) -> int

Estimates count of eids in given etype

Source code in dp3/database/database.py
def estimate_count_eids(self, etype: str) -> int:
    """Estimates count of `eid`s in given `etype`.

    Uses collection metadata, so the result is fast but approximate.
    """
    self._assert_etype_exists(etype)

    master_col = self._master_col(etype)
    # FIX: estimated_document_count() takes no filter argument -- the spurious
    # `{}` positional was removed (matches get_estimated_entity_count).
    return master_col.estimated_document_count()

save_metadata

save_metadata(time: datetime, metadata: dict, worker_id: Optional[int] = None)

Saves metadata dict under the caller module and passed timestamp.

Source code in dp3/database/database.py
def save_metadata(self, time: datetime, metadata: dict, worker_id: Optional[int] = None):
    """Saves metadata dict under the caller module and passed timestamp.

    Mutates `metadata` in place by adding the bookkeeping fields.
    """
    module = get_caller_id()
    metadata.update(
        {
            "_id": self._get_metadata_id(module, time, worker_id),
            "#module": module,
            "#time_created": time,
            "#last_update": datetime.now(),
        }
    )
    try:
        self._db["#metadata"].insert_one(metadata)
        self.log.debug("Inserted metadata %s: %s", metadata["_id"], metadata)
    except Exception as e:
        raise DatabaseError(f"Insert of metadata failed: {e}\n{metadata}") from e

update_metadata

update_metadata(time: datetime, metadata: dict, increase: dict = None, worker_id: Optional[int] = None)

Updates existing metadata of caller module and passed timestamp.

Source code in dp3/database/database.py
def update_metadata(
    self, time: datetime, metadata: dict, increase: dict = None, worker_id: Optional[int] = None
):
    """Updates existing metadata of caller module and passed timestamp.

    `increase` fields are applied as a `$inc`; the record is upserted.
    """
    module = get_caller_id()
    metadata_id = self._get_metadata_id(module, time, worker_id)
    metadata["#last_update"] = datetime.now()

    changes = {"$set": metadata}
    if increase is not None:
        changes["$inc"] = increase

    try:
        res = self._db["#metadata"].update_one({"_id": metadata_id}, changes, upsert=True)
        self.log.debug(
            "Updated metadata %s, changes: %s, result: %s", metadata_id, changes, res.raw_result
        )
    except Exception as e:
        raise DatabaseError(f"Update of metadata failed: {e}\n{metadata_id}, {changes}") from e

get_observation_history

get_observation_history(etype: str, attr_name: str, eid: AnyEidT, t1: datetime = None, t2: datetime = None, sort: int = None) -> list[dict]

Get full (or filtered) history of observation attribute.

This method is useful for displaying eid's history on web. Also used to feed data into get_timeseries_history().

Parameters:

Name Type Description Default
etype str

entity type

required
attr_name str

name of attribute

required
eid AnyEidT

id of entity, to which data-points correspond

required
t1 datetime

left value of time interval (inclusive)

None
t2 datetime

right value of time interval (inclusive)

None
sort int

sort by timestamps - 1: ascending order by t1, 2: descending order by t2, None: don't sort

None

Returns: list of dicts (reduced datapoints)

Source code in dp3/database/database.py
def get_observation_history(
    self,
    etype: str,
    attr_name: str,
    eid: AnyEidT,
    t1: datetime = None,
    t2: datetime = None,
    sort: int = None,
) -> list[dict]:
    """Get full (or filtered) history of observation attribute.

    This method is useful for displaying `eid`'s history on web.
    Also used to feed data into `get_timeseries_history()`.

    Args:
        etype: entity type
        attr_name: name of attribute
        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive)
        t2: right value of time interval (inclusive)
        sort: sort by timestamps - `1`: ascending order by `t1`,
            `2`: descending order by `t2`, other values (incl. `None`): don't sort
    Returns:
        list of dicts (reduced datapoints)
    """
    # Default to the widest interval; strip tz info to compare with naive datetimes.
    # NOTE(review): `.replace(tzinfo=None)` drops the offset without converting
    # to a common zone first -- confirm callers pass UTC-based datetimes.
    t1 = datetime.fromtimestamp(0) if t1 is None else t1.replace(tzinfo=None)
    t2 = datetime.now() if t2 is None else t2.replace(tzinfo=None)

    # Get attribute history
    mr = self.get_master_record(etype, eid)
    attr_history = mr.get(attr_name, [])

    # Filter: keep entries whose [t1, t2] span overlaps the requested interval
    attr_history_filtered = [row for row in attr_history if row["t1"] <= t2 and row["t2"] >= t1]

    # Sort (values are 1/2, not 0/1 as older docs claimed)
    if sort == 1:
        attr_history_filtered.sort(key=lambda row: row["t1"])
    elif sort == 2:
        attr_history_filtered.sort(key=lambda row: row["t2"], reverse=True)

    return attr_history_filtered

get_timeseries_history

get_timeseries_history(etype: str, attr_name: str, eid: AnyEidT, t1: datetime = None, t2: datetime = None, sort: int = None) -> list[dict]

Get full (or filtered) history of timeseries attribute. Outputs them in format:

    [
        {
            "t1": ...,
            "t2": ...,
            "v": {
                "series1": ...,
                "series2": ...
            }
        },
        ...
    ]
This method is useful for displaying eid's history on web.

Parameters:

Name Type Description Default
etype str

entity type

required
attr_name str

name of attribute

required
eid AnyEidT

id of entity, to which data-points correspond

required
t1 datetime

left value of time interval (inclusive)

None
t2 datetime

right value of time interval (inclusive)

None
sort int

sort by timestamps - 1: ascending order by t1, 2: descending order by t2, None: don't sort

None

Returns: list of dicts (reduced datapoints) - each represents just one point at time

Source code in dp3/database/database.py
def get_timeseries_history(
    self,
    etype: str,
    attr_name: str,
    eid: AnyEidT,
    t1: datetime = None,
    t2: datetime = None,
    sort: int = None,
) -> list[dict]:
    """Get full (or filtered) history of timeseries attribute.
    Outputs them in format:
    ```
        [
            {
                "t1": ...,
                "t2": ...,
                "v": {
                    "series1": ...,
                    "series2": ...
                }
            },
            ...
        ]
    ```
    This method is useful for displaying `eid`'s history on web.

    Args:
        etype: entity type
        attr_name: name of attribute
        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive)
        t2: right value of time interval (inclusive)
        sort: sort by timestamps - `1`: ascending order by `t1`,
            `2`: descending order by `t2`, other values (incl. `None`): don't sort
    Returns:
         list of dicts (reduced datapoints) - each represents just one point at time
    """
    # Normalize interval bounds (naive datetimes), same as get_observation_history.
    t1 = datetime.fromtimestamp(0) if t1 is None else t1.replace(tzinfo=None)
    t2 = datetime.now() if t2 is None else t2.replace(tzinfo=None)

    # Timeseries rows are stored like observation rows; fetch them first.
    attr_history = self.get_observation_history(etype, attr_name, eid, t1, t2, sort)
    if not attr_history:
        return []

    # Split multi-value rows into per-point-in-time entries.
    attr_history_split = self._split_timeseries_dps(etype, attr_name, attr_history)

    # Filter out rows outside [t1, t2] interval (splitting may produce some)
    attr_history_filtered = [
        row for row in attr_history_split if row["t1"] <= t2 and row["t2"] >= t1
    ]

    return attr_history_filtered

move_raw_to_archive

move_raw_to_archive(etype: str)

Rename the current raw collection to archive collection.

Multiple archive collections can exist for one entity type, though they are exported and dropped over time.

Source code in dp3/database/database.py
def move_raw_to_archive(self, etype: str):
    """Rename the current raw collection to archive collection.

    Multiple archive collections can exist for one entity type,
    though they are exported and dropped over time.

    Returns the new archive collection name, or None if no raw collection exists.
    """
    source = self._raw_col_name(etype)
    target = self._get_new_archive_col_name(etype)
    try:
        if not self._db.list_collection_names(filter={"name": source}):
            return None
        self._db[source].rename(target)
        return target
    except Exception as e:
        raise DatabaseError(f"Move of raw collection failed: {e}") from e

get_archive

get_archive(etype: str, after: datetime, before: datetime) -> Iterator[Cursor]

Get archived raw datapoints where t1 is in <after, before).

Datapoints without a t1 timestamp (plain datapoints) are always returned as well.

Source code in dp3/database/database.py
def get_archive(self, etype: str, after: datetime, before: datetime) -> Iterator[Cursor]:
    """Get archived raw datapoints where `t1` is in <`after`, `before`).

    Datapoints without a `t1` timestamp are always included.
    Yields one cursor per archive collection.
    """
    time_filter = {"$or": [{"t1": {"$gte": after, "$lt": before}}, {"t1": None}]}
    for collection in self._archive_cols(etype):
        try:
            yield collection.find(time_filter)
        except Exception as e:
            raise DatabaseError(f"Archive collection {collection} fetch failed: {e}") from e

delete_old_archived_dps

delete_old_archived_dps(etype: str, before: datetime) -> Iterator[DeleteResult]

Delete archived raw datapoints older than before.

Datapoints without a t1 timestamp (plain datapoints) are deleted as well.

Source code in dp3/database/database.py
def delete_old_archived_dps(self, etype: str, before: datetime) -> Iterator[DeleteResult]:
    """Delete archived raw datapoints with `t1` older than `before`.

    Datapoints without a `t1` timestamp are deleted as well.
    Yields one DeleteResult per archive collection.
    """
    age_filter = {"$or": [{"t1": {"$lt": before}}, {"t1": None}]}
    for collection in self._archive_cols(etype):
        try:
            yield collection.delete_many(age_filter)
        except Exception as e:
            raise DatabaseError(f"Delete of old datapoints failed: {e}") from e

drop_empty_archives

drop_empty_archives(etype: str) -> int

Drop empty archive collections.

Source code in dp3/database/database.py
def drop_empty_archives(self, etype: str) -> int:
    """Drop empty archive collections; return how many were dropped."""
    dropped = 0
    for collection in self._archive_cols(etype):
        try:
            # Cheap metadata estimate first; exact count only when it is small.
            if collection.estimated_document_count() >= 1000:
                continue
            if collection.count_documents({}) == 0:
                collection.drop()
                dropped += 1
        except Exception as e:
            raise DatabaseError(f"Drop of empty archive failed: {e}") from e
    return dropped

get_module_cache

get_module_cache(override_called_id: Optional[str] = None)

Return a persistent cache collection for given module name.

Module name is determined automatically, but you can override it.

Source code in dp3/database/database.py
def get_module_cache(self, override_called_id: Optional[str] = None):
    """Return a persistent cache collection for given module name.

    Module name is determined automatically, but you can override it.
    """
    module = override_called_id if override_called_id else get_caller_id()
    self.log.debug("Cache collection access: %s", module)
    return self._db[f"#cache#{module}"]

get_estimated_entity_count

get_estimated_entity_count(entity_type: str) -> int

Get count of entities of given type.

Source code in dp3/database/database.py
def get_estimated_entity_count(self, entity_type: str) -> int:
    """Get (approximate, metadata-based) count of entities of given type."""
    master_col = self._master_col(entity_type)
    return master_col.estimated_document_count()

get_caller_id

get_caller_id()

Returns the name of the caller method's class, or function name if caller is not a method.

Source code in dp3/database/database.py
def get_caller_id():
    """Returns the name of the caller method's class, or function name if caller is not a method."""
    # stack()[2]: skip this frame and the direct caller -- we want who called them.
    caller = inspect.stack()[2]
    self_obj = caller.frame.f_locals.get("self")
    if self_obj:
        return self_obj.__class__.__qualname__
    return caller.function