Skip to content

dp3.database.database

MongoHostConfig

Bases: BaseModel

MongoDB host.

MongoStandaloneConfig

Bases: BaseModel

MongoDB standalone configuration.

MongoReplicaConfig

Bases: BaseModel

MongoDB replica set configuration.

MongoConfig

Bases: BaseModel

Database configuration.

EntityDatabase

EntityDatabase(db_conf: HierarchicalDict, model_spec: ModelSpec, num_processes: int, elog: Optional[EventGroupType] = None)

MongoDB database wrapper responsible for whole communication with database server. Initializes database schema based on database configuration.

Parameters:

Name Type Description Default
db_conf HierarchicalDict

configuration of database connection (content of database.yml)

required
model_spec ModelSpec

ModelSpec object, configuration of data model (entities and attributes)

required
num_processes int

number of worker processes

required
Source code in dp3/database/database.py
def __init__(
    self,
    db_conf: HierarchicalDict,
    model_spec: ModelSpec,
    num_processes: int,
    elog: Optional[EventGroupType] = None,
) -> None:
    """Connects to the database server and initializes the database schema.

    Args:
        db_conf: configuration of database connection (content of database.yml)
        model_spec: ModelSpec object, configuration of data model (entities and attributes)
        num_processes: number of worker processes
        elog: event group used for event logging; a `DummyEventGroup` is used when omitted

    Raises:
        DatabaseError: when the last attempt given by `RECONNECT_DELAYS` fails
            with a connection failure.
    """
    self.log = logging.getLogger("EntityDatabase")
    self.elog = elog or DummyEventGroup()

    config = MongoConfig.model_validate(db_conf)

    self.log.info("Connecting to database...")
    for attempt, delay in enumerate(RECONNECT_DELAYS):
        try:
            self._db = self.connect(config)
            # Check if connected
            self._db.admin.command("ping")
        except pymongo.errors.ConnectionFailure as e:
            if attempt + 1 == len(RECONNECT_DELAYS):
                raise DatabaseError(
                    "Cannot connect to database with specified connection arguments."
                ) from e
            else:
                self.log.error(
                    "Cannot connect to database (attempt %d, retrying in %ds).",
                    attempt + 1,
                    delay,
                )
                time.sleep(delay)
        else:
            # Connected and pinged successfully - stop retrying. Without this the loop
            # would open a new connection for every remaining entry of RECONNECT_DELAYS
            # and could even fail after a connection had already been established.
            break

    self._db_schema_config = model_spec
    self._num_processes = num_processes

    # Init and switch to correct database
    self._db = self._db[config.db_name]
    self._init_database_schema(config.db_name)

    self.schema_cleaner = SchemaCleaner(
        self._db, self.get_module_cache("Schema"), self._db_schema_config, self.log
    )

    # Callbacks registered via `register_on_entity_delete`
    self._on_entity_delete_one = []
    self._on_entity_delete_many = []

    self.log.info("Database successfully initialized!")

register_on_entity_delete

register_on_entity_delete(f_one: Callable[[str, str], None], f_many: Callable[[str, list[str]], None])

Registers function to be called when entity is forcibly deleted.

Source code in dp3/database/database.py
def register_on_entity_delete(
    self, f_one: Callable[[str, str], None], f_many: Callable[[str, list[str]], None]
):
    """Adds a pair of callbacks to be run when entities are forcibly deleted.

    Args:
        f_one: callback appended to the single-entity list; `delete_eid` calls
            these as `f(etype, eid)`
        f_many: callback appended to the multi-entity list; `delete_eids` calls
            these as `f(etype, eids)`
    """
    single_hooks = self._on_entity_delete_one
    single_hooks.append(f_one)

    bulk_hooks = self._on_entity_delete_many
    bulk_hooks.append(f_many)

update_schema

update_schema()

Checks whether schema saved in database is up-to-date and updates it if necessary.

Will NOT perform any changes in master collections on conflicting model changes. Any such changes must be performed via the CLI.

As this method modifies the schema collection, it should be called only by the main worker.

Source code in dp3/database/database.py
def update_schema(self):
    """
    Brings the schema stored in database up-to-date, if it is outdated.

    Conflicting model changes are NOT applied to master collections here;
    those have to be performed via the CLI.

    The schema collection is modified by this method, so only the main worker
    should call it.

    Raises DatabaseError when the safe schema update is refused with a ValueError.
    """
    cleaner = self.schema_cleaner
    try:
        cleaner.safe_update_schema()
    except ValueError as err:
        raise DatabaseError("Schema update failed. Please run `dp3 schema-update`.") from err

await_updated_schema

await_updated_schema()

Checks whether schema saved in database is up-to-date and awaits its update by the main worker on mismatch.

Source code in dp3/database/database.py
def await_updated_schema(self):
    """
    Waits until the schema saved in database is up-to-date.

    The check and the waiting are delegated to the schema cleaner; the update
    itself is expected to be done by the main worker.
    """
    cleaner = self.schema_cleaner
    cleaner.await_updated()

insert_datapoints

insert_datapoints(etype: str, eid: str, dps: list[DataPointBase], new_entity: bool = False) -> None

Inserts datapoint to raw data collection and updates master record.

Raises DatabaseError when insert or update fails.

Source code in dp3/database/database.py
def insert_datapoints(
    self, etype: str, eid: str, dps: list[DataPointBase], new_entity: bool = False
) -> None:
    """Stores datapoints in the raw data collection and applies them to the master record.

    Does nothing for an empty `dps` list. Note that the `etype` argument is
    replaced by the entity type of the first datapoint.

    Raises DatabaseError when insert or update fails.
    """
    if not dps:
        return

    etype = dps[0].etype

    # Check `etype`
    self._assert_etype_exists(etype)

    def as_plain(value):
        # Pydantic models are dumped to dicts, anything else is stored as is
        return value.model_dump() if isinstance(value, BaseModel) else value

    # Insert raw datapoints
    raw_col = self._raw_col_name(etype)
    raw_documents = [dp.model_dump(exclude={"attr_type"}) for dp in dps]
    try:
        self._db[raw_col].insert_many(raw_documents)
        self.log.debug(f"Inserted datapoints to raw collection:\n{dps}")
    except Exception as e:
        raise DatabaseError(f"Insert of datapoints failed: {e}\n{dps}") from e

    # Update master document
    pushes, sets = {}, {}
    master_changes = {"$push": pushes, "$set": sets}

    def queue_push(attr, entry):
        # A single datapoint is pushed directly; as soon as a second one arrives
        # for the same attribute, both are wrapped into an `$each` list.
        queued = pushes.get(attr)
        if queued is None:
            pushes[attr] = entry
        elif "$each" in queued:
            queued["$each"].append(entry)
        else:
            pushes[attr] = {"$each": [queued, entry]}

    for dp in dps:
        spec = self._db_schema_config.attr(etype, dp.attr)

        if spec.t in AttrType.PLAIN | AttrType.OBSERVATIONS and spec.is_iterable:
            v = [as_plain(elem) for elem in dp.v]
        else:
            v = as_plain(dp.v)

        if spec.t == AttrType.PLAIN:
            # Rewrite value of plain attribute
            sets[dp.attr] = {"v": v, "ts_last_update": datetime.now()}
        elif spec.t == AttrType.OBSERVATIONS:
            # Push new data of observation
            queue_push(dp.attr, {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c})
        elif spec.t == AttrType.TIMESERIES:
            # Push new data of timeseries
            queue_push(dp.attr, {"t1": dp.t1, "t2": dp.t2, "v": v})

    if new_entity:
        sets["#hash"] = HASH(f"{etype}:{eid}")
        sets["#time_created"] = datetime.now()

    master_col = self._master_col_name(etype)
    try:
        self._db[master_col].update_one({"_id": eid}, master_changes, upsert=True)
        self.log.debug(f"Updated master record of {etype} {eid}: {master_changes}")
    except Exception as e:
        raise DatabaseError(f"Update of master record failed: {e}\n{dps}") from e

update_master_records

update_master_records(etype: str, eids: list[str], records: list[dict]) -> None

Replace master records of etype:eid with the provided records.

Raises DatabaseError when update fails.

Source code in dp3/database/database.py
def update_master_records(self, etype: str, eids: list[str], records: list[dict]) -> None:
    """Replace master records of `etype`:`eid` with the provided `records`.

    `eids` and `records` are paired by position; missing records are created (upsert).
    Nothing is sent to the database when there is nothing to replace.

    Raises DatabaseError when update fails.
    """
    master_col = self._master_col_name(etype)
    try:
        operations = [
            ReplaceOne({"_id": eid}, record, upsert=True) for eid, record in zip(eids, records)
        ]
        if not operations:
            # pymongo refuses an empty list of bulk operations,
            # an empty request is simply a no-op
            return
        self._db[master_col].bulk_write(operations)
        self.log.debug("Updated master records of %s (%s).", etype, len(eids))
    except Exception as e:
        raise DatabaseError(f"Update of master records failed: {e}\n{records}") from e

extend_ttl

extend_ttl(etype: str, eid: str, ttl_tokens: dict[str, datetime])

Extends TTL of given etype:eid by ttl_tokens.

Source code in dp3/database/database.py
def extend_ttl(self, etype: str, eid: str, ttl_tokens: dict[str, datetime]):
    """Extends TTL of given `etype`:`eid` by `ttl_tokens`.

    Each token is written to `#ttl.<token name>` of the master record using `$max`,
    i.e. a stored value is only replaced by a greater one.

    Raises DatabaseError when the update fails.
    """
    collection_name = self._master_col_name(etype)
    try:
        raised_tokens = {f"#ttl.{name}": expiration for name, expiration in ttl_tokens.items()}
        self._db[collection_name].update_one({"_id": eid}, {"$max": raised_tokens})
        self.log.debug("Updated TTL of %s: %s.", etype, eid)
    except Exception as e:
        raise DatabaseError(f"TTL update failed: {e} ({ttl_tokens})") from e

remove_expired_ttls

remove_expired_ttls(etype: str, expired_eid_ttls: dict[str, list[str]])

Removes expired TTL of given etype:eid.

Source code in dp3/database/database.py
def remove_expired_ttls(self, etype: str, expired_eid_ttls: dict[str, list[str]]):
    """Removes expired TTL tokens of given `etype` entities.

    Args:
        etype: entity type
        expired_eid_ttls: mapping of `eid` to the names of its expired TTL tokens;
            each named token is unset from `#ttl` of the master record

    Entities with no token names are skipped, and nothing is sent to the database
    when there is nothing to unset.

    Raises DatabaseError when the update fails.
    """
    master_col = self._master_col_name(etype)
    try:
        operations = [
            UpdateOne(
                {"_id": eid},
                {"$unset": {f"#ttl.{token_name}": "" for token_name in expired_ttls}},
            )
            for eid, expired_ttls in expired_eid_ttls.items()
            if expired_ttls
        ]
        if not operations:
            # pymongo refuses an empty list of bulk operations
            return
        res = self._db[master_col].bulk_write(operations)
        self.log.debug(
            "Removed expired TTL of %s: (%s, modified %s).",
            etype,
            len(expired_eid_ttls),
            res.modified_count,
        )
    except Exception as e:
        raise DatabaseError(f"TTL update failed: {e}") from e

delete_eids

delete_eids(etype: str, eids: list[str])

Delete master record and all snapshots of etype:eids.

Source code in dp3/database/database.py
def delete_eids(self, etype: str, eids: list[str]):
    """Removes master records and all snapshots of the given `etype`:`eids`.

    The number of deleted master records is reported to the event log as
    `record_removed`. Afterwards every registered "delete many" callback is
    called with `(etype, eids)`; a failing callback is logged and does not stop
    the remaining ones.

    Raises DatabaseError when either of the deletes fails.
    """
    masters = self._master_col_name(etype)
    snapshots = self._snapshots_col_name(etype)

    # Master records first
    try:
        master_result = self._db[masters].delete_many({"_id": {"$in": eids}})
        self.log.debug(
            "Deleted %s master records of %s (%s).",
            master_result.deleted_count,
            etype,
            len(eids),
        )
        self.elog.log("record_removed", count=master_result.deleted_count)
    except Exception as e:
        raise DatabaseError(f"Delete of master record failed: {e}\n{eids}") from e

    # Then the snapshots of the same entities
    try:
        snapshot_result = self._db[snapshots].delete_many({"eid": {"$in": eids}})
        self.log.debug(
            "Deleted %s snapshots of %s (%s).", snapshot_result.deleted_count, etype, len(eids)
        )
    except Exception as e:
        raise DatabaseError(f"Delete of snapshots failed: {e}\n{eids}") from e

    # Finally notify the registered listeners
    for callback in self._on_entity_delete_many:
        try:
            callback(etype, eids)
        except Exception as e:
            self.log.exception("Error in on_entity_delete_many callback %s: %s", callback, e)

delete_eid

delete_eid(etype: str, eid: str)

Delete master record and all snapshots of etype:eid.

Source code in dp3/database/database.py
def delete_eid(self, etype: str, eid: str):
    """Removes the master record and all snapshots of a single `etype`:`eid`.

    A `record_removed` event is logged after the master record delete. Afterwards
    every registered "delete one" callback is called with `(etype, eid)`; a failing
    callback is logged and does not stop the remaining ones.

    Raises DatabaseError when either of the deletes fails.
    """
    masters = self._master_col_name(etype)
    snapshots = self._snapshots_col_name(etype)

    # Master record first
    try:
        self._db[masters].delete_one({"_id": eid})
        self.log.debug("Deleted master record of %s/%s.", etype, eid)
        self.elog.log("record_removed")
    except Exception as e:
        raise DatabaseError(f"Delete of master record failed: {e}\n{eid}") from e

    # Then all snapshots of the entity
    try:
        snapshot_result = self._db[snapshots].delete_many({"eid": eid})
        self.log.debug(
            "deleted %s snapshots of %s/%s.", snapshot_result.deleted_count, etype, eid
        )
    except Exception as e:
        raise DatabaseError(f"Delete of snapshots failed: {e}\n{eid}") from e

    # Finally notify the registered listeners
    for callback in self._on_entity_delete_one:
        try:
            callback(etype, eid)
        except Exception as e:
            self.log.exception("Error in on_entity_delete_one callback %s: %s", callback, e)

delete_old_dps

delete_old_dps(etype: str, attr_name: str, t_old: datetime) -> None

Delete old datapoints from master collection.

Periodically called for all etypes from HistoryManager.

Source code in dp3/database/database.py
def delete_old_dps(self, etype: str, attr_name: str, t_old: datetime) -> None:
    """Drops outdated datapoints of one attribute from all master records of `etype`.

    A datapoint is pulled from the `attr_name` array when its `t2` is lower than `t_old`.

    Periodically called for all `etype`s from HistoryManager.

    Raises DatabaseError when the update fails.
    """
    collection_name = self._master_col_name(etype)
    try:
        expired = {"t2": {"$lt": t_old}}
        self._db[collection_name].update_many({}, {"$pull": {attr_name: expired}})
    except Exception as e:
        raise DatabaseError(f"Delete of old datapoints failed: {e}") from e
delete_link_dps(etype: str, affected_eids: list[str], attr_name: str, eid_to: str) -> None

Delete link datapoints from master collection.

Called from LinkManager for deleted entities.

Source code in dp3/database/database.py
def delete_link_dps(
    self, etype: str, affected_eids: list[str], attr_name: str, eid_to: str
) -> None:
    """Removes datapoints linking to `eid_to` from master records of `affected_eids`.

    For an observations attribute the datapoints whose `v.eid` equals `eid_to` are
    pulled from the history; a plain attribute is unset as a whole. Any other
    attribute type is rejected.

    Called from LinkManager for deleted entities.

    Raises DatabaseError when the attribute type is unsupported or the update fails.
    """
    collection_name = self._master_col_name(etype)
    kind = self._db_schema_config.attr(etype, attr_name).t
    selector = {"_id": {"$in": affected_eids}}
    try:
        if kind == AttrType.OBSERVATIONS:
            change = {"$pull": {attr_name: {"v.eid": eid_to}}}
        elif kind == AttrType.PLAIN:
            change = {"$unset": {attr_name: ""}}
        else:
            raise ValueError(f"Unsupported attribute type: {kind}")
        self._db[collection_name].update_many(selector, change)
    except Exception as e:
        raise DatabaseError(f"Delete of link datapoints failed: {e}") from e
delete_many_link_dps(etypes: list[str], affected_eids: list[list[str]], attr_names: list[str], eids_to: list[list[str]]) -> None

Delete link datapoints from master collection.

Called from LinkManager for deleted entities, when deleting multiple entities.

Source code in dp3/database/database.py
def delete_many_link_dps(
    self,
    etypes: list[str],
    affected_eids: list[list[str]],
    attr_names: list[str],
    eids_to: list[list[str]],
) -> None:
    """Delete link datapoints from master collection.

    Called from LinkManager for deleted entities, when deleting multiple entities.

    The four arguments are parallel lists; position `i` describes one update:
    in master records `affected_eids[i]` of `etypes[i]`, remove links of attribute
    `attr_names[i]` (pointing to any of `eids_to[i]` for observations, the whole
    value for plain attributes).

    All updates are validated first and then written once per master collection,
    so every collection receives only the updates built for its own entity type.

    Raises DatabaseError when an attribute type is unsupported or a write fails.
    """
    try:
        # Updates grouped by the master collection they belong to
        updates_by_col: dict[str, list] = {}
        for etype, affected_eid_list, attr_name, eid_to_list in zip(
            etypes, affected_eids, attr_names, eids_to
        ):
            master_col = self._master_col_name(etype)
            attr_type = self._db_schema_config.attr(etype, attr_name).t
            filter_cond = {"_id": {"$in": affected_eid_list}}
            if attr_type == AttrType.OBSERVATIONS:
                update = {"$pull": {attr_name: {"v.eid": {"$in": eid_to_list}}}}
            elif attr_type == AttrType.PLAIN:
                update = {"$unset": {attr_name: ""}}
            else:
                raise ValueError(f"Unsupported attribute type: {attr_type}")
            updates_by_col.setdefault(master_col, []).append(UpdateMany(filter_cond, update))

        # Only collections with at least one update are present here,
        # so `bulk_write` never gets an empty list.
        for master_col, updates in updates_by_col.items():
            self._db[master_col].bulk_write(updates)
    except Exception as e:
        raise DatabaseError(f"Delete of link datapoints failed: {e}") from e

get_master_record

get_master_record(etype: str, eid: str, **kwargs) -> dict

Get current master record for etype/eid.

If doesn't exist, returns {}.

Source code in dp3/database/database.py
def get_master_record(self, etype: str, eid: str, **kwargs) -> dict:
    """Fetches the current master record of `etype`/`eid`.

    Extra keyword arguments are passed on to the collection's `find_one`.

    Returns {} when no such record exists.
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    collection = self._db[self._master_col_name(etype)]
    record = collection.find_one({"_id": eid}, **kwargs)
    return record or {}

ekey_exists

ekey_exists(etype: str, eid: str) -> bool

Checks whether master record for etype/eid exists

Source code in dp3/database/database.py
def ekey_exists(self, etype: str, eid: str) -> bool:
    """Tells whether a (non-empty) master record of `etype`/`eid` is stored."""
    record = self.get_master_record(etype, eid)
    return bool(record)

get_master_records

get_master_records(etype: str, **kwargs) -> pymongo.cursor.Cursor

Get cursor to current master records of etype.

Source code in dp3/database/database.py
def get_master_records(self, etype: str, **kwargs) -> pymongo.cursor.Cursor:
    """Returns a cursor over all current master records of `etype`.

    Extra keyword arguments are passed on to the collection's `find`.
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    collection = self._db[self._master_col_name(etype)]
    return collection.find({}, **kwargs)

get_worker_master_records

get_worker_master_records(worker_index: int, worker_cnt: int, etype: str, query_filter: dict = None, **kwargs) -> pymongo.cursor.Cursor

Get cursor to current master records of etype.

Source code in dp3/database/database.py
def get_worker_master_records(
    self, worker_index: int, worker_cnt: int, etype: str, query_filter: dict = None, **kwargs
) -> pymongo.cursor.Cursor:
    """Returns a cursor over the master records of `etype` assigned to one worker.

    A record is selected when its `#hash` modulo `worker_cnt` equals `worker_index`.

    Args:
        worker_index: index of the worker the records are selected for
        worker_cnt: total number of workers
        etype: entity type
        query_filter: additional query conditions, merged into the hash condition
        **kwargs: passed on to the collection's `find`

    Raises DatabaseError when `etype` is not among the configured entities.
    """
    known_entities = self._db_schema_config.entities
    if etype not in known_entities:
        raise DatabaseError(f"Entity '{etype}' does not exist")

    selector = {"#hash": {"$mod": [worker_cnt, worker_index]}}
    if query_filter is not None:
        selector.update(query_filter)

    collection = self._db[self._master_col_name(etype)]
    return collection.find(selector, **kwargs)

get_latest_snapshot

get_latest_snapshot(etype: str, eid: str) -> dict

Get latest snapshot of given etype/eid.

If doesn't exist, returns {}.

Source code in dp3/database/database.py
def get_latest_snapshot(self, etype: str, eid: str) -> dict:
    """Fetches the most recent snapshot of `etype`/`eid`.

    "Most recent" is the snapshot with the highest `_id` among those of the entity.

    Returns {} when the entity has no snapshot.
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    collection = self._db[self._snapshots_col_name(etype)]
    newest = collection.find_one({"eid": eid}, sort=[("_id", -1)])
    return newest or {}

get_latest_snapshots

get_latest_snapshots(etype: str, fulltext_filters: Optional[dict[str, str]] = None) -> tuple[pymongo.cursor.Cursor, int]

Get latest snapshots of given etype.

This method is useful for displaying data on web.

Returns only documents matching fulltext_filters (dictionary attribute - fulltext filter). These filters are interpreted as regular expressions. Only string values may be filtered this way. There's no validation that queried attribute can be fulltext filtered. Only plain and observation attributes with string-based data types can be queried. Array and set data types are supported as well as long as they are not multi value at the same time. If you need to filter EIDs, use attribute eid.

Also returns total document count (after filtering).

May raise DatabaseError if query is invalid.

Source code in dp3/database/database.py
def get_latest_snapshots(
    self, etype: str, fulltext_filters: Optional[dict[str, str]] = None
) -> tuple[pymongo.cursor.Cursor, int]:
    """Get latest snapshots of given `etype`, sorted by `eid`.

    This method is useful for displaying data on web.

    Only snapshots created at the newest fully completed snapshot date are
    returned. When no such date is known, all existing snapshots are returned
    and `fulltext_filters` are not applied.

    `fulltext_filters` maps attribute name to a fulltext filter. The filters are
    interpreted as case-insensitive regular expressions.
    Only string values may be filtered this way. There's no validation that queried
    attribute can be fulltext filtered.
    Only plain and observation attributes with string-based data types can be queried.
    Array and set data types are supported as well as long as they are not multi value
    at the same time.
    If you need to filter EIDs, use attribute `eid`.

    Returns the cursor together with the total document count (after filtering).

    May raise `DatabaseError` if query is invalid or a filtered attribute doesn't exist.
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    snapshot_col = self._snapshots_col_name(etype)
    by_eid = [("eid", pymongo.ASCENDING)]

    # Find newest fully completed snapshot date
    latest_snapshot_date = self._get_latest_snapshots_date()

    # There are no fully completed snapshots sets - return all currently existing snapshots
    if latest_snapshot_date is None:
        everything = self._db[snapshot_col].find().sort(by_eid)
        return everything, self._db[snapshot_col].count_documents({})

    # Create base of query
    query = {"_time_created": latest_snapshot_date}

    # Process fulltext filters
    for attr, pattern in (fulltext_filters or {}).items():
        condition = {"$regex": pattern, "$options": "i"}

        if attr != "eid":
            # Check if attribute exists
            try:
                attr_spec = self._db_schema_config.attr(etype, attr)
            except KeyError as e:
                raise DatabaseError(
                    f"Attribute '{attr}' in fulltext filter doesn't exist"
                ) from e

            # Correctly handle link<...> data type
            if attr_spec.t in AttrType.PLAIN | AttrType.OBSERVATIONS and attr_spec.is_relation:
                attr = attr + ".eid"

        query[attr] = condition

    try:
        matching = self._db[snapshot_col].find(query).sort(by_eid)
        return matching, self._db[snapshot_col].count_documents(query)
    except OperationFailure as e:
        raise DatabaseError("Invalid query") from e

get_snapshots

get_snapshots(etype: str, eid: str, t1: Optional[datetime] = None, t2: Optional[datetime] = None) -> pymongo.cursor.Cursor

Get all (or filtered) snapshots of given eid.

This method is useful for displaying eid's history on web.

Parameters:

Name Type Description Default
etype str

entity type

required
eid str

id of entity, to which data-points correspond

required
t1 Optional[datetime]

left value of time interval (inclusive)

None
t2 Optional[datetime]

right value of time interval (inclusive)

None
Source code in dp3/database/database.py
def get_snapshots(
    self, etype: str, eid: str, t1: Optional[datetime] = None, t2: Optional[datetime] = None
) -> pymongo.cursor.Cursor:
    """Returns a cursor over snapshots of given `eid`, oldest first.

    This method is useful for displaying `eid`'s history on web.

    Args:
        etype: entity type
        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive); not limited when omitted
        t2: right value of time interval (inclusive); not limited when omitted
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    snapshot_col = self._snapshots_col_name(etype)

    # Collect only the time bounds that were actually given
    created_between = {}
    if t1:
        created_between["$gte"] = t1
    if t2:
        created_between["$lte"] = t2

    query = {"eid": eid}
    if created_between:
        query["_time_created"] = created_between

    cursor = self._db[snapshot_col].find(query)
    return cursor.sort([("_time_created", pymongo.ASCENDING)])

get_value_or_history

get_value_or_history(etype: str, attr_name: str, eid: str, t1: Optional[datetime] = None, t2: Optional[datetime] = None) -> dict

Gets current value and/or history of attribute for given eid.

Depends on attribute type: - plain: just (current) value - observations: (current) value and history stored in master record (optionally filtered) - timeseries: just history stored in master record (optionally filtered)

Returns dict with two keys: current_value and history (list of values).

Source code in dp3/database/database.py
def get_value_or_history(
    self,
    etype: str,
    attr_name: str,
    eid: str,
    t1: Optional[datetime] = None,
    t2: Optional[datetime] = None,
) -> dict:
    """Gets current value and/or history of attribute for given `eid`.

    What is filled in depends on the attribute type:
    - plain: just the current value (`v` stored in the master record)
    - observations: current value (taken from the latest snapshot) and history
      stored in master record (optionally filtered by `t1`, `t2`)
    - timeseries: just history stored in master record (optionally filtered)

    Returns dict with two keys: `current_value` (`None` when not available)
    and `history` (list of values, empty when not available).
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    kind = self._db_schema_config.attr(etype, attr_name).t

    current_value = None
    history = []

    if kind == AttrType.PLAIN:
        stored = self.get_master_record(etype, eid).get(attr_name, {})
        current_value = stored.get("v", None)
    elif kind == AttrType.OBSERVATIONS:
        current_value = self.get_latest_snapshot(etype, eid).get(attr_name, None)
        history = self.get_observation_history(etype, attr_name, eid, t1, t2)
    elif kind == AttrType.TIMESERIES:
        history = self.get_timeseries_history(etype, attr_name, eid, t1, t2)

    return {"current_value": current_value, "history": history}

estimate_count_eids

estimate_count_eids(etype: str) -> int

Estimates count of eids in given etype

Source code in dp3/database/database.py
def estimate_count_eids(self, etype: str) -> int:
    """Estimates count of `eid`s in given `etype`.

    The estimate is the estimated document count of the master collection.
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    master_col = self._master_col_name(etype)
    # `estimated_document_count` takes no query filter - a positional `{}` would
    # be interpreted as the `comment` argument by pymongo 4.
    return self._db[master_col].estimated_document_count()

save_snapshot

save_snapshot(etype: str, snapshot: dict, time: datetime)

Saves snapshot to specified entity of current master document.

Source code in dp3/database/database.py
def save_snapshot(self, etype: str, snapshot: dict, time: datetime):
    """Stores one snapshot in the snapshot collection of `etype`.

    The passed `snapshot` dict is modified: its `_time_created` is set to `time`
    before the insert.

    Raises DatabaseError when the insert fails.
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    snapshot.update({"_time_created": time})

    collection_name = self._snapshots_col_name(etype)
    try:
        self._db[collection_name].insert_one(snapshot)
        self.log.debug(f"Inserted snapshot: {snapshot}")
    except Exception as e:
        raise DatabaseError(f"Insert of snapshot failed: {e}\n{snapshot}") from e

save_snapshots

save_snapshots(etype: str, snapshots: list[dict], time: datetime)

Saves a list of snapshots of current master documents.

All snapshots must belong to same entity type.

Source code in dp3/database/database.py
def save_snapshots(self, etype: str, snapshots: list[dict], time: datetime):
    """
    Saves a list of snapshots of current master documents.

    All snapshots must belong to same entity type.

    The passed snapshot dicts are modified: `_time_created` of each is set to `time`
    before the insert. An empty list is a no-op (after `etype` has been checked).

    Raises DatabaseError when the insert fails.
    """
    # Check `etype`
    self._assert_etype_exists(etype)

    if not snapshots:
        # pymongo refuses to insert an empty list of documents
        return

    for snapshot in snapshots:
        snapshot["_time_created"] = time

    snapshot_col = self._snapshots_col_name(etype)
    try:
        self._db[snapshot_col].insert_many(snapshots)
        self.log.debug(f"Inserted snapshots: {snapshots}")
    except Exception as e:
        raise DatabaseError(f"Insert of snapshots failed: {e}\n{snapshots}") from e

save_metadata

save_metadata(time: datetime, metadata: dict)

Saves a metadata document (identified by the calling module and the given time) to the #metadata collection.

Source code in dp3/database/database.py
def save_metadata(self, time: datetime, metadata: dict):
    """Inserts a metadata document into the `#metadata` collection.

    The passed `metadata` dict is modified in place before the insert:
    `_id` (caller id followed by the formatted `time`), `#module` (caller id),
    `#time_created` (`time`) and `#last_update` (current time) are set.

    Raises DatabaseError when the insert fails.
    """
    # Identifier returned by `get_caller_id()`; presumably identifies the module
    # calling this method - TODO confirm against `get_caller_id` implementation
    module = get_caller_id()
    metadata["_id"] = module + time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    metadata["#module"] = module
    metadata["#time_created"] = time
    metadata["#last_update"] = datetime.now()
    try:
        self._db["#metadata"].insert_one(metadata)
        self.log.debug("Inserted metadata %s: %s", metadata["_id"], metadata)
    except Exception as e:
        raise DatabaseError(f"Insert of metadata failed: {e}\n{metadata}") from e

get_observation_history

get_observation_history(etype: str, attr_name: str, eid: str, t1: datetime = None, t2: datetime = None, sort: int = None) -> list[dict]

Get full (or filtered) history of observation attribute.

This method is useful for displaying eid's history on web. Also used to feed data into get_timeseries_history().

Parameters:

Name Type Description Default
etype str

entity type

required
attr_name str

name of attribute

required
eid str

id of entity, to which data-points correspond

required
t1 datetime

left value of time interval (inclusive)

None
t2 datetime

right value of time interval (inclusive)

None
sort int

sort by timestamps - 1: ascending order by t1, 2: descending order by t2, any other value (including None): don't sort

None

Returns: list of dicts (reduced datapoints)

Source code in dp3/database/database.py
def get_observation_history(
    self,
    etype: str,
    attr_name: str,
    eid: str,
    t1: datetime = None,
    t2: datetime = None,
    sort: int = None,
) -> list[dict]:
    """Get full (or filtered) history of observation attribute.

    This method is useful for displaying `eid`'s history on web.
    Also used to feed data into `get_timeseries_history()`.

    A datapoint is returned when its `[t1, t2]` validity interval overlaps the
    requested interval. Timezone info of the given `t1`/`t2` is dropped.

    Args:
        etype: entity type
        attr_name: name of attribute
        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive); timestamp 0 when omitted
        t2: right value of time interval (inclusive); current time when omitted
        sort: sort by timestamps - `1`: ascending order by `t1`,
            `2`: descending order by `t2`, any other value (e.g. `None`): don't sort
    Returns:
        list of dicts (reduced datapoints)
    """
    lower = t1.replace(tzinfo=None) if t1 is not None else datetime.fromtimestamp(0)
    upper = t2.replace(tzinfo=None) if t2 is not None else datetime.now()

    # History is stored directly in the master record under the attribute name
    stored_history = self.get_master_record(etype, eid).get(attr_name, [])

    # Keep datapoints overlapping [lower, upper]
    overlapping = [
        dp for dp in stored_history if not (dp["t1"] > upper or dp["t2"] < lower)
    ]

    if sort == 1:
        return sorted(overlapping, key=lambda dp: dp["t1"])
    if sort == 2:
        return sorted(overlapping, key=lambda dp: dp["t2"], reverse=True)
    return overlapping

get_timeseries_history

get_timeseries_history(etype: str, attr_name: str, eid: str, t1: datetime = None, t2: datetime = None, sort: int = None) -> list[dict]

Get full (or filtered) history of timeseries attribute. Outputs them in format:

    [
        {
            "t1": ...,
            "t2": ...,
            "v": {
                "series1": ...,
                "series2": ...
            }
        },
        ...
    ]
This method is useful for displaying eid's history on web.

Parameters:

Name Type Description Default
etype str

entity type

required
attr_name str

name of attribute

required
eid str

id of entity, to which data-points correspond

required
t1 datetime

left value of time interval (inclusive)

None
t2 datetime

right value of time interval (inclusive)

None
sort int

sort by timestamps - 1: ascending order by t1, 2: descending order by t2, any other value (including None): don't sort

None

Returns: list of dicts (reduced datapoints) - each represents just one point at time

Source code in dp3/database/database.py
def get_timeseries_history(
    self,
    etype: str,
    attr_name: str,
    eid: str,
    t1: datetime = None,
    t2: datetime = None,
    sort: int = None,
) -> list[dict]:
    """Return the history of a timeseries attribute as individual points.

    The observation history of the attribute is loaded, split into separate
    rows and limited to the requested time interval. Output format:
    ```
        [
            {
                "t1": ...,
                "t2": ...,
                "v": {
                    "series1": ...,
                    "series2": ...
                }
            },
            ...
        ]
    ```
    Handy for displaying the history of `eid` on the web.

    Args:
        etype: entity type
        attr_name: name of attribute
        eid: id of entity, to which data-points correspond
        t1: left bound of the time interval (inclusive); when `None`,
            `datetime.fromtimestamp(0)` is used. Time zone info is stripped.
        t2: right bound of the time interval (inclusive); when `None`,
            `datetime.now()` is used. Time zone info is stripped.
        sort: ordering flag, forwarded unchanged to `get_observation_history`;
            `None`: don't sort.
            NOTE(review): `get_observation_history` compares this flag with
            `1` and `2`, while `0`/`1` used to be documented - confirm which
            values are intended.
    Returns:
         list of dicts (reduced datapoints) - each represents a single point in time;
         an empty list when there is no observation history
    """
    # Work with naive datetimes only; missing bounds fall back to epoch / now.
    lower = t1.replace(tzinfo=None) if t1 is not None else datetime.fromtimestamp(0)
    upper = t2.replace(tzinfo=None) if t2 is not None else datetime.now()

    observations = self.get_observation_history(etype, attr_name, eid, lower, upper, sort)
    if not observations:
        return []

    points = self._split_timeseries_dps(etype, attr_name, observations)

    # The split rows are checked against [lower, upper] once more,
    # keeping only those that overlap the interval.
    return [point for point in points if point["t1"] <= upper and point["t2"] >= lower]

get_distinct_val_count

get_distinct_val_count(etype: str, attr: str) -> dict[Any, int]

Counts occurrences of distinct values of given attribute in snapshots.

Returns dictionary mapping value -> count.

Works for all plain and observation data types except dict and json.

Source code in dp3/database/database.py
def get_distinct_val_count(self, etype: str, attr: str) -> dict[Any, int]:
    """Count how often each distinct value of an attribute occurs in snapshots.

    Only snapshots whose `_time_created` equals the date returned by
    `_get_latest_snapshots_date` are taken into account.

    Works for all plain and observation data types except `dict` and `json`.

    Args:
        etype: entity type
        attr: name of attribute
    Returns:
        dictionary mapping value -> count, without the `None` value;
        an empty dictionary when no latest snapshot date is available
        (this is returned before the attribute type checks run)
    Raises:
        DatabaseError: when the attribute does not exist, is not a plain or
            observations attribute, or its data type is `dict` or `json`
    """
    self._assert_etype_exists(etype)

    collection_name = self._snapshots_col_name(etype)

    # Look up the attribute specification
    try:
        spec = self._db_schema_config.attr(etype, attr)
    except KeyError as e:
        raise DatabaseError(f"Attribute '{attr}' does not exist") from e

    # Without a latest snapshot date there is nothing to count
    newest_snapshot = self._get_latest_snapshots_date()
    if newest_snapshot is None:
        return {}

    if spec.t not in AttrType.PLAIN | AttrType.OBSERVATIONS:
        raise DatabaseError(f"Attribute '{attr}' isn't plain or observations")

    # Only primitive, array<T> and set<T> data types can be processed
    root_type = spec.data_type.root
    if root_type in ("dict", "json"):
        raise DatabaseError(
            f"Data type '{spec.data_type}' of attribute '{attr}' is not processable"
        )

    field_ref = "$" + attr
    pipeline = [{"$match": {"_time_created": newest_snapshot}}]

    # One unwind for array-like data types, one more for multi value
    # observations - a multi value array is therefore unwound twice.
    if "array" in root_type or "set" in root_type:
        pipeline.append({"$unwind": field_ref})
    if spec.t == AttrType.OBSERVATIONS and spec.multi_value:
        pipeline.append({"$unwind": field_ref})

    # Link data types are grouped by their `eid` field
    group_key = field_ref + ".eid" if "link" in root_type else field_ref
    pipeline.append({"$group": {"_id": group_key, "count": {"$sum": 1}}})
    pipeline.append({"$sort": {"_id": 1, "count": -1}})

    # Run the aggregation and collect value -> count pairs
    counts = {}
    for group in self._db[collection_name].aggregate(pipeline):
        counts[group["_id"]] = group["count"]

    # The `None` group is not reported
    counts.pop(None, None)

    return counts

get_raw

get_raw(etype: str, after: datetime, before: datetime, plain: bool = True) -> pymongo.cursor

Get raw datapoints where t1 is in the half-open interval [after, before), i.e. after is inclusive and before is exclusive.

If plain is True, then all plain datapoints will be returned (default).

Source code in dp3/database/database.py
def get_raw(
    self, etype: str, after: datetime, before: datetime, plain: bool = True
) -> pymongo.cursor.Cursor:
    """Get raw datapoints where `t1` is in <`after`, `before`).

    If `plain` is `True`, then all plain datapoints will be returned (default).

    Args:
        etype: entity type, used to pick the raw datapoint collection
        after: lower bound for `t1` (inclusive)
        before: upper bound for `t1` (exclusive)
        plain: whether to also return datapoints whose `t1` is `None`
    Returns:
        cursor over the matching raw datapoints
    """
    raw_col_name = self._raw_col_name(etype)

    conditions = [{"t1": {"$gte": after, "$lt": before}}]
    if plain:
        # Plain datapoints are selected by `t1` being `None`,
        # so the time window alone would never match them.
        conditions.append({"t1": None})

    return self._db[raw_col_name].find({"$or": conditions})

delete_old_raw_dps

delete_old_raw_dps(etype: str, before: datetime, plain: bool = True)

Delete raw datapoints older than before.

Deletes all plain datapoints if plain is True (default).

Source code in dp3/database/database.py
def delete_old_raw_dps(self, etype: str, before: datetime, plain: bool = True):
    """Remove raw datapoints with `t1` older than `before`.

    With `plain` set to `True` (default), all plain datapoints
    (those with `t1` of `None`) are deleted as well.

    Args:
        etype: entity type, used to pick the raw datapoint collection
        before: datapoints with `t1` lower than this are deleted
        plain: whether to delete plain datapoints too
    Returns:
        result of the `delete_many` call
    Raises:
        DatabaseError: when the delete fails for any reason
    """
    collection_name = self._raw_col_name(etype)

    conditions = [{"t1": {"$lt": before}}]
    if plain:
        conditions.append({"t1": None})

    try:
        outcome = self._db[collection_name].delete_many({"$or": conditions})
    except Exception as e:
        raise DatabaseError(f"Delete of old datapoints failed: {e}") from e
    return outcome

delete_old_snapshots

delete_old_snapshots(etype: str, t_old: datetime)

Delete old snapshots.

Periodically called for all etypes from HistoryManager.

Source code in dp3/database/database.py
def delete_old_snapshots(self, etype: str, t_old: datetime):
    """Delete old snapshots.

    Periodically called for all `etype`s from HistoryManager.

    Args:
        etype: entity type, used to pick the snapshot collection
        t_old: snapshots with `_time_created` lower than this are deleted
    Returns:
        result of the `delete_many` call
    Raises:
        DatabaseError: when the delete fails for any reason
    """
    snapshot_col_name = self._snapshots_col_name(etype)
    outdated = {"_time_created": {"$lt": t_old}}
    try:
        return self._db[snapshot_col_name].delete_many(outdated)
    except Exception as e:
        raise DatabaseError(f"Delete of old snapshots failed: {e}") from e

get_module_cache

get_module_cache(override_called_id: Optional[str] = None)

Return a persistent cache collection for given module name.

Module name is determined automatically, but you can override it.

Source code in dp3/database/database.py
def get_module_cache(self, override_called_id: Optional[str] = None):
    """Provide the persistent cache collection belonging to a module.

    The module name is detected automatically via `get_caller_id`,
    unless `override_called_id` is given.

    Args:
        override_called_id: module name to use instead of the detected one
    Returns:
        the `#cache#<module name>` collection
    """
    # `get_caller_id` inspects a fixed stack depth, so it has to be called
    # directly from this method and not through another helper.
    owner = override_called_id if override_called_id else get_caller_id()
    self.log.debug("Cache collection access: %s", owner)
    collection_name = f"#cache#{owner}"
    return self._db[collection_name]

get_estimated_entity_count

get_estimated_entity_count(entity_type: str) -> int

Get count of entities of given type.

Source code in dp3/database/database.py
def get_estimated_entity_count(self, entity_type: str) -> int:
    """Return the estimated number of entities of the given type.

    The value is the `estimated_document_count` of the master collection
    of `entity_type`.
    """
    master_collection = self._db[self._master_col_name(entity_type)]
    return master_collection.estimated_document_count()

get_caller_id

get_caller_id()

Returns the name of the caller method's class, or function name if caller is not a method.

Source code in dp3/database/database.py
def get_caller_id():
    """Returns the name of the caller method's class, or function name if caller is not a method.

    The "caller" is the frame two levels above this function, i.e. the code
    which called the function that called `get_caller_id`. That frame is
    treated as a method when it has a local variable named `self` which is
    not `None`.

    Returns:
        `__qualname__` of the class of the caller's `self`, or the name of
        the calling function when there is no such `self`
    """
    # context=0 skips loading source lines for every frame on the stack;
    # only the frame object and the function name are needed here.
    caller = inspect.stack(0)[2]
    # Compare with None rather than relying on truthiness, so that instances
    # which evaluate as falsy (e.g. define `__len__` or `__bool__`) are still
    # identified by their class.
    if (owner := caller.frame.f_locals.get("self")) is not None:
        return owner.__class__.__qualname__
    return caller.function