Skip to content



TypedSnapshotCollection(db: Database, entity_type: str, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict)

Bases: ABC

Snapshot collection handler with eid type awareness.

Source code in dp3/database/
def __init__(
    db: Database,
    entity_type: str,
    db_config: MongoConfig,
    model_spec: ModelSpec,
    snapshots_config: dict,
    self._db = db.with_options(codec_options=get_codec_options())

    if entity_type not in model_spec.entities:
        raise ValueError(f"Entity type '{entity_type}' not found in model spec")
    self.entity_type = entity_type
    self._col_name = f"{entity_type}#snapshots"
    self._os_col_name = f"{entity_type}#oversized_snapshots"

    self.attr_specs: dict[str, AttrSpecType] = model_spec.entity_attributes[entity_type]

    self.log = logging.getLogger(f"EntityDatabase.SnapshotCollection[{entity_type}]")

    self._normal_snapshot_eids = set()
    self._oversized_snapshot_eids = set()
    self._snapshot_bucket_size =
    self._bucket_delta = self._get_snapshot_bucket_delta(snapshots_config)


get_latest_one(eid: AnyEidT) -> dict

Get latest snapshot of given eid.

If doesn't exist, returns {}.

Source code in dp3/database/
def get_latest_one(self, eid: AnyEidT) -> dict:
    """Get latest snapshot of given eid.

    If doesn't exist, returns {}.
    return (
        self._col().find_one(self._filter_from_eid(eid), {"last": 1}, sort=[("_id", -1)]) or {}
    ).get("last", {})


get_latest(fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> tuple[Cursor, int]

Get latest snapshots of given etype.

This method is useful for displaying data on web.

Returns only documents matching generic_filter and fulltext_filters (dictionary attribute - fulltext filter). Fulltext filters are interpreted as regular expressions. Only string values may be filtered this way. There's no validation that queried attribute can be fulltext filtered. Only plain and observation attributes with string-based data types can be queried. Array and set data types are supported as well as long as they are not multi value at the same time. If you need to filter EIDs, ensure the EID is string, then use attribute eid. Otherwise, use generic filter.

Generic filter allows filtering using generic MongoDB query (including $and, $or, $lt, etc.). For querying non-JSON-native types, you can use magic strings, such as "$$IPv4{<ip address>}" for IPv4 addresses. The full spec with examples is in the magic strings module.

Generic and fulltext filters are merged - fulltext overrides conflicting keys.

Also returns total document count (after filtering).

May raise SnapshotCollectionError if query is invalid.

Source code in dp3/database/
def get_latest(
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> tuple[Cursor, int]:
    """Get latest snapshots of given `etype`.

    This method is useful for displaying data on web.

    Returns only documents matching `generic_filter` and `fulltext_filters`
    (dictionary attribute - fulltext filter).
    Fulltext filters are interpreted as regular expressions.
    Only string values may be filtered this way. There's no validation that queried attribute
    can be fulltext filtered.
    Only plain and observation attributes with string-based data types can be queried.
    Array and set data types are supported as well as long as they are not multi value
    at the same time.
    If you need to filter EIDs, ensure the EID is string, then use attribute `eid`.
    Otherwise, use generic filter.

    Generic filter allows filtering using generic MongoDB query (including `$and`, `$or`,
    `$lt`, etc.).
    For querying non-JSON-native types, you can use magic strings, such as
    `"$$IPv4{<ip address>}"` for IPv4 addresses. The full spec with examples is in the
    [magic strings module][dp3.database.magic].

    Generic and fulltext filters are merged - fulltext overrides conflicting keys.

    Also returns total document count (after filtering).

    May raise `SnapshotCollectionError` if query is invalid.
    snapshot_col = self._col()
    query = self._prepare_latest_query(fulltext_filters or {}, generic_filter or {})

        return snapshot_col.find(query, {"last": 1}).sort(
            [("_id", pymongo.ASCENDING)]
        ), snapshot_col.count_documents(query)
    except OperationFailure as e:
        raise SnapshotCollectionError(f"Query is invalid: {e}") from e


find_latest(fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> Cursor

Find latest snapshots of given etype.

See get_latest for more information.

Returns only documents matching generic_filter and fulltext_filters, does not count them.

Source code in dp3/database/
def find_latest(
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> Cursor:
    """Find latest snapshots of given `etype`.

    See [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for more information.

    Returns only documents matching `generic_filter` and `fulltext_filters`,
    does not count them.
    query = self._prepare_latest_query(fulltext_filters or {}, generic_filter or {})
        return self._col().find(query, {"last": 1}).sort([("_id", pymongo.ASCENDING)])
    except OperationFailure as e:
        raise SnapshotCollectionError(f"Query is invalid: {e}") from e


count_latest(fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> int

Count latest snapshots of given etype.

See get_latest for more information.

Returns only count of documents matching generic_filter and fulltext_filters.

Note that this method may take much longer than get_latest on larger databases, as it does count all documents, not just return the first few.

Source code in dp3/database/
def count_latest(
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> int:
    """Count latest snapshots of given `etype`.

    See [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for more information.

    Returns only count of documents matching `generic_filter` and `fulltext_filters`.

    Note that this method may take much longer than `get_latest` on larger databases,
    as it does count all documents, not just return the first few.
    query = self._prepare_latest_query(fulltext_filters or {}, generic_filter or {})
        return self._col().count_documents(query)
    except OperationFailure as e:
        raise SnapshotCollectionError(f"Query is invalid: {e}") from e


get_by_eid(eid: AnyEidT, t1: Optional[datetime] = None, t2: Optional[datetime] = None) -> Union[Cursor, CommandCursor]

Get all (or filtered) snapshots of given eid.

This method is useful for displaying eid's history on web.


Name Type Description Default
eid AnyEidT

id of entity, to which data-points correspond

t1 Optional[datetime]

left value of time interval (inclusive)

t2 Optional[datetime]

right value of time interval (inclusive)

Source code in dp3/database/
def get_by_eid(
    self, eid: AnyEidT, t1: Optional[datetime] = None, t2: Optional[datetime] = None
) -> Union[Cursor, CommandCursor]:
    """Get all (or filtered) snapshots of given `eid`.

    This method is useful for displaying `eid`'s history on web.

        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive)
        t2: right value of time interval (inclusive)
    snapshot_col = self._col()

    # Find out if the snapshot is oversized
    doc = (
        snapshot_col.find(self._filter_from_eid(eid), {"oversized": 1})
        .sort([("_id", -1)])
    doc = next(doc, None)
    if doc and doc.get("oversized", False):
        return self._get_oversized(eid, t1, t2)

    query = {"_time_created": {}}
    pipeline = [
        {"$match": self._filter_from_eid(eid)},
        {"$unwind": "$history"},
        {"$replaceRoot": {"newRoot": "$history"}},

    # Filter by date
    if t1:
        query["_time_created"]["$gte"] = t1
    if t2:
        query["_time_created"]["$lte"] = t2

    # Unset if empty
    if query["_time_created"]:
        pipeline.append({"$match": query})
    pipeline.append({"$sort": {"_time_created": pymongo.ASCENDING}})
    return snapshot_col.aggregate(pipeline)


get_distinct_val_count(attr: str) -> dict[Any, int]

Counts occurrences of distinct values of given attribute in snapshots.

Returns dictionary mapping value -> count.

Works for all plain and observation data types except dict and json.

Source code in dp3/database/
def get_distinct_val_count(self, attr: str) -> dict[Any, int]:
    """Counts occurrences of distinct values of given attribute in snapshots.

    Returns dictionary mapping value -> count.

    Works for all plain and observation data types except `dict` and `json`.
    # Get attribute specification
        attr_spec = self.attr_specs[attr]
    except KeyError as e:
        raise SnapshotCollectionError(f"Attribute '{attr}' does not exist") from e

    if attr_spec.t not in AttrType.PLAIN | AttrType.OBSERVATIONS:
        raise SnapshotCollectionError(f"Attribute '{attr}' isn't plain or observations")

    # Attribute data type must be primitive, array<T> or set<T>
    if any(needle in attr_spec.data_type.root for needle in ("dict", "json")):
        raise SnapshotCollectionError(
            f"Data type '{attr_spec.data_type}' of attribute '{attr}' is not processable"

    # Build aggregation query
    attr_path = "$last." + attr
    unwinding = []

    # Unwind array-like and multi value attributes
    # If attribute is multi value array, unwind twice
    if "array" in attr_spec.data_type.root or "set" in attr_spec.data_type.root:
        unwinding.append({"$unwind": attr_path})
    if attr_spec.t == AttrType.OBSERVATIONS and attr_spec.multi_value:
        unwinding.append({"$unwind": attr_path})

    # Group
    agg_query_group_id = attr_path
    if "link" in attr_spec.data_type.root:
        agg_query_group_id += ".eid"

    agg_query = [
        {"$match": {"latest": True}},
        {"$group": {"_id": agg_query_group_id, "count": {"$sum": 1}}},
        {"$sort": {"_id": 1, "count": -1}},
    # Run aggregation
    distinct_counts_cur = self._col().aggregate(agg_query)

    distinct_counts = {x["_id"]: x["count"] for x in distinct_counts_cur}

    if None in distinct_counts:
        del distinct_counts[None]

    return distinct_counts


save_one(snapshot: dict, ctime: datetime)

Saves snapshot to specified entity of current master document.

Will move snapshot to oversized snapshots if the maintained bucket is too large.

Source code in dp3/database/
def save_one(self, snapshot: dict, ctime: datetime):
    """Saves snapshot to specified entity of current master document.

    Will move snapshot to oversized snapshots if the maintained bucket is too large.
    if "eid" not in snapshot:
        self.log.error("Snapshot is missing 'eid' field: %s", snapshot)
    eid = snapshot["eid"]
    snapshot["_time_created"] = ctime

    snapshot_col = self._col()
    os_col = self._os_col()

    # Find out if the snapshot is oversized
    normal, oversized = self._get_state({eid})
    if normal:
            res = snapshot_col.update_one(
                self._filter_from_eid(eid) | {"count": {"$lt": self._snapshot_bucket_size}},
                    "$set": {"last": snapshot},
                    "$push": {"history": {"$each": [snapshot], "$position": 0}},
                    "$inc": {"count": 1},
                    "$setOnInsert": {
                        "_id": self._bucket_id(eid, ctime),
                        "_time_created": ctime,
                        "oversized": False,
                        "latest": True,

            if res.upserted_id is not None:
                    | {"latest": True, "count": self._snapshot_bucket_size},
                    {"$unset": {"latest": 1}},
        except (WriteError, OperationFailure, DocumentTooLarge) as e:
            if e.code != BSON_OBJECT_TOO_LARGE:
                raise e
            # The snapshot is too large, move it to oversized snapshots
  "Snapshot of {eid} is too large: {e}, marking as oversized.")
            self._migrate_to_oversized(eid, snapshot)
            self._cache_snapshot_state(set(), normal)
        except Exception as e:
            raise SnapshotCollectionError(
                f"Insert of snapshot {eid} failed: {e}, {snapshot}"
            ) from e
    elif oversized:
        # Snapshot is already marked as oversized
        snapshot_col.update_one(self._filter_from_eid(eid), {"$set": {"last": snapshot}})


save_many(snapshots: list[dict], ctime: datetime)

Saves a list of snapshots of current master documents.

All snapshots must belong to same entity type.

Will move snapshots to oversized snapshots if the maintained bucket is too large. For better understanding, see save().

Source code in dp3/database/
def save_many(self, snapshots: list[dict], ctime: datetime):
    Saves a list of snapshots of current master documents.

    All snapshots must belong to same entity type.

    Will move snapshots to oversized snapshots if the maintained bucket is too large.
    For better understanding, see `save()`.

    for snapshot in snapshots:
        snapshot["_time_created"] = ctime

    snapshot_col = self._col()
    os_col = self._os_col()

    snapshots_by_eid = defaultdict(list)
    for snapshot in snapshots:
        if "eid" not in snapshot:
        if "_id" in snapshot:
            del snapshot["_id"]

    # Find out if any of the snapshots are oversized
    normal, oversized = self._get_state(set(snapshots_by_eid.keys()))

    upserts = []
    update_originals: list[list[dict]] = []
    oversized_inserts = []
    oversized_updates = []

    # A normal snapshot, shift the last snapshot to history and update last
    for eid in normal:
                self._filter_from_eid(eid) | {"count": {"$lt": self._snapshot_bucket_size}},
                    "$set": {"last": snapshots_by_eid[eid][-1]},
                    "$push": {"history": {"$each": snapshots_by_eid[eid], "$position": 0}},
                    "$inc": {"count": len(snapshots_by_eid[eid])},
                    "$setOnInsert": {
                        "_id": self._bucket_id(eid, ctime),
                        "_time_created": ctime,
                        "oversized": False,
                        "latest": True,

    # Snapshot is already marked as oversized
    for eid in oversized:
                {"$set": {"last": snapshots_by_eid[eid][-1]}},

    new_oversized = set()

    if upserts:
            res = snapshot_col.bulk_write(upserts, ordered=False)

            # Unset latest snapshots if new snapshots were inserted
            if res.upserted_count > 0:
                unset_latest_updates = []
                for upsert_id in res.upserted_ids.values():
                            | {"latest": True, "count": self._snapshot_bucket_size},
                            {"$unset": {"latest": 1}},
                up_res = snapshot_col.bulk_write(unset_latest_updates)
                if up_res.modified_count != res.upserted_count:
                        "Upserted the first snapshot for %d entities.",
                        res.upserted_count - up_res.modified_count,

            if res.modified_count + res.upserted_count != len(upserts):
                    "Some snapshots were not updated, %s != %s",
                    res.modified_count + res.upserted_count,
        except (BulkWriteError, OperationFailure) as e:
  "Update of snapshots failed, will retry with oversize.")
            failed_indexes = [
                for err in e.details["writeErrors"]
                if err["code"] == BSON_OBJECT_TOO_LARGE
            failed_snapshots = (update_originals[i] for i in failed_indexes)
            for eid_snapshots in failed_snapshots:
                eid = eid_snapshots[0]["eid"]
                failed_snapshots = sorted(
                    eid_snapshots, key=lambda s: s["_time_created"], reverse=True
                self._migrate_to_oversized(eid, failed_snapshots[0])

            if any(err["code"] != BSON_OBJECT_TOO_LARGE for err in e.details["writeErrors"]):
                # Some other error occurred
                raise e
        except Exception as e:
            raise SnapshotCollectionError(f"Upsert of snapshots failed: {str(e)[:2048]}") from e

    # Update the oversized snapshots
    if oversized_inserts:
            if oversized_updates:
        except Exception as e:
            raise SnapshotCollectionError(f"Insert of snapshots failed: {str(e)[:2048]}") from e

    # Cache the new state
    self._cache_snapshot_state(set(), new_oversized)


delete_old(t_old: datetime) -> int

Delete old snapshots.

Periodically called from HistoryManager.

Source code in dp3/database/
def delete_old(self, t_old: datetime) -> int:
    """Delete old snapshots.

    Periodically called from HistoryManager.
    deleted = 0
        res = self._col().delete_many(
            {"_time_created": {"$lte": t_old - self._bucket_delta}},
        deleted += res.deleted_count * self._snapshot_bucket_size
        res = self._os_col().delete_many({"_time_created": {"$lt": t_old}})
        deleted += res.deleted_count
    except Exception as e:
        raise SnapshotCollectionError(f"Delete of olds snapshots failed: {e}") from e
    return deleted


delete_eid(eid: AnyEidT)

Delete all snapshots of eid.

Source code in dp3/database/
def delete_eid(self, eid: AnyEidT):
    """Delete all snapshots of `eid`."""
        res = self._col().delete_many(self._filter_from_eid(eid))
        del_cnt = res.deleted_count * self._snapshot_bucket_size
        self.log.debug("deleted %s snapshots of %s/%s.", del_cnt, self.entity_type, eid)

        res = self._os_col().delete_many({"eid": eid})
            "Deleted %s oversized snapshots of %s/%s.", res.deleted_count, self.entity_type, eid
    except Exception as e:
        raise SnapshotCollectionError(f"Delete of failed: {e}\n{eid}") from e


delete_eids(eids: list[Any])

Delete all snapshots of eids.

Source code in dp3/database/
def delete_eids(self, eids: list[Any]):
    """Delete all snapshots of `eids`."""
        res = self._col().delete_many(self._filter_from_eids(eids))
        del_cnt = res.deleted_count * self._snapshot_bucket_size
        self.log.debug("Deleted %s snapshots of %s (%s).", del_cnt, self.entity_type, len(eids))
        res = self._os_col().delete_many({"eid": {"$in": eids}})
            "Deleted %s oversized snapshots of %s (%s).",
    except Exception as e:
        raise SnapshotCollectionError(f"Delete of snapshots failed: {e}\n{eids}") from e


BinaryEidSnapshots(db: Database, entity_type: str, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict)

Bases: TypedSnapshotCollection, ABC

Source code in dp3/database/
def __init__(
    db: Database,
    entity_type: str,
    db_config: MongoConfig,
    model_spec: ModelSpec,
    snapshots_config: dict,
    self._db = db.with_options(codec_options=get_codec_options())

    if entity_type not in model_spec.entities:
        raise ValueError(f"Entity type '{entity_type}' not found in model spec")
    self.entity_type = entity_type
    self._col_name = f"{entity_type}#snapshots"
    self._os_col_name = f"{entity_type}#oversized_snapshots"

    self.attr_specs: dict[str, AttrSpecType] = model_spec.entity_attributes[entity_type]

    self.log = logging.getLogger(f"EntityDatabase.SnapshotCollection[{entity_type}]")

    self._normal_snapshot_eids = set()
    self._oversized_snapshot_eids = set()
    self._snapshot_bucket_size =
    self._bucket_delta = self._get_snapshot_bucket_delta(snapshots_config)


SnapshotCollectionContainer(db: Database, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict)

Container for all required snapshot collections, exposing the public interface.

Source code in dp3/database/
def __init__(
    self, db: Database, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict
    self._snapshot_collections = {}
    for entity_type, entity_spec in model_spec.entities.items():
        eid_type_name = entity_spec.id_data_type.root
        typed_collection: type[TypedSnapshotCollection] = entity_type2collection[eid_type_name]
        self._snapshot_collections[entity_type] = typed_collection(
            db, entity_type, db_config, model_spec, snapshots_config

    self.log = logging.getLogger("EntityDatabase.SnapshotCollections")


save_one(entity_type: str, snapshot: dict, ctime: datetime) -> None

Save snapshot to specified entity of current master document.

Source code in dp3/database/
def save_one(self, entity_type: str, snapshot: dict, ctime: datetime) -> None:
    """Save snapshot to specified entity of current master document."""
    return self[entity_type].save_one(snapshot, ctime)


save_many(entity_type: str, snapshots: list[dict], ctime: datetime) -> None

Saves a list of snapshots of current master documents.

All snapshots must belong to same entity type.

Source code in dp3/database/
def save_many(self, entity_type: str, snapshots: list[dict], ctime: datetime) -> None:
    Saves a list of snapshots of current master documents.

    All snapshots must belong to same entity type.
    return self[entity_type].save_many(snapshots, ctime)


get_latest_one(entity_type: str, eid: AnyEidT) -> dict

Get latest snapshot of given eid.

If doesn't exist, returns {}.

Source code in dp3/database/
def get_latest_one(self, entity_type: str, eid: AnyEidT) -> dict:
    """Get latest snapshot of given eid.

    If doesn't exist, returns {}.
    return self[entity_type].get_latest_one(eid)


get_latest(entity_type: str, fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> tuple[Cursor, int]

Get latest snapshots of given etype.

This method is useful for displaying data on web.

Returns only documents matching generic_filter and fulltext_filters (dictionary attribute - fulltext filter). Fulltext filters are interpreted as regular expressions. Only string values may be filtered this way. There's no validation that queried attribute can be fulltext filtered. Only plain and observation attributes with string-based data types can be queried. Array and set data types are supported as well as long as they are not multi value at the same time. If you need to filter EIDs, ensure the EID is string, then use attribute eid. Otherwise, use generic filter.

Generic filter allows filtering using generic MongoDB query (including $and, $or, $lt, etc.). For querying non-JSON-native types, you can use magic strings, such as "$$IPv4{<ip address>}" for IPv4 addresses. The full spec with examples is in the magic strings module.

Generic and fulltext filters are merged - fulltext overrides conflicting keys.

Also returns total document count (after filtering).

May raise SnapshotCollectionError if query is invalid.

Source code in dp3/database/
def get_latest(
    entity_type: str,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> tuple[Cursor, int]:
    """Get latest snapshots of given `etype`.

    This method is useful for displaying data on web.

    Returns only documents matching `generic_filter` and `fulltext_filters`
    (dictionary attribute - fulltext filter).
    Fulltext filters are interpreted as regular expressions.
    Only string values may be filtered this way. There's no validation that queried attribute
    can be fulltext filtered.
    Only plain and observation attributes with string-based data types can be queried.
    Array and set data types are supported as well as long as they are not multi value
    at the same time.
    If you need to filter EIDs, ensure the EID is string, then use attribute `eid`.
    Otherwise, use generic filter.

    Generic filter allows filtering using generic MongoDB query (including `$and`, `$or`,
    `$lt`, etc.).
    For querying non-JSON-native types, you can use magic strings, such as
    `"$$IPv4{<ip address>}"` for IPv4 addresses. The full spec with examples is in the
    [magic strings module][dp3.database.magic].

    Generic and fulltext filters are merged - fulltext overrides conflicting keys.

    Also returns total document count (after filtering).

    May raise `SnapshotCollectionError` if query is invalid.
    return self[entity_type].get_latest(fulltext_filters, generic_filter)


find_latest(entity_type: str, fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> Cursor

Find latest snapshots of given etype.

see get_latest for more information.

Returns only documents matching generic_filter and fulltext_filters, does not count them.

Source code in dp3/database/
def find_latest(
    entity_type: str,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> Cursor:
    """Find latest snapshots of given `etype`.

    see [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for more information.

    Returns only documents matching `generic_filter` and `fulltext_filters`,
    does not count them.
    return self[entity_type].find_latest(fulltext_filters, generic_filter)


count_latest(entity_type: str, fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> int

Count latest snapshots of given etype.

see get_latest for more information.

Returns only count of documents matching generic_filter and fulltext_filters.

Note that this method may take much longer than get_latest on larger databases, as it does count all documents, not just return the first few.

Source code in dp3/database/
def count_latest(
    entity_type: str,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> int:
    """Count latest snapshots of given `etype`.

    see [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for more information.

    Returns only count of documents matching `generic_filter` and `fulltext_filters`.

    Note that this method may take much longer than `get_latest` on larger databases,
    as it does count all documents, not just return the first few.
    return self[entity_type].count_latest(fulltext_filters, generic_filter)


get_by_eid(entity_type: str, eid: AnyEidT, t1: Optional[datetime] = None, t2: Optional[datetime] = None) -> Union[Cursor, CommandCursor]

Get all (or filtered) snapshots of given eid.

This method is useful for displaying eid's history on web.


Name Type Description Default
entity_type str

name of entity type

eid AnyEidT

id of entity, to which data-points correspond

t1 Optional[datetime]

left value of time interval (inclusive)

t2 Optional[datetime]

right value of time interval (inclusive)

Source code in dp3/database/
def get_by_eid(
    entity_type: str,
    eid: AnyEidT,
    t1: Optional[datetime] = None,
    t2: Optional[datetime] = None,
) -> Union[Cursor, CommandCursor]:
    """Get all (or filtered) snapshots of given `eid`.

    This method is useful for displaying `eid`'s history on web.

        entity_type: name of entity type
        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive)
        t2: right value of time interval (inclusive)
    return self[entity_type].get_by_eid(eid, t1, t2)


get_distinct_val_count(entity_type: str, attr: str) -> dict[Any, int]

Counts occurrences of distinct values of given attribute in snapshots.

Returns dictionary mapping value -> count.

Works for all plain and observation data types except dict and json.

Source code in dp3/database/
def get_distinct_val_count(self, entity_type: str, attr: str) -> dict[Any, int]:
    """Counts occurrences of distinct values of given attribute in snapshots.

    Returns dictionary mapping value -> count.

    Works for all plain and observation data types except `dict` and `json`.
    return self[entity_type].get_distinct_val_count(attr)


delete_old(t_old: datetime) -> int

Delete old snapshots, may raise SnapshotCollectionError.

Periodically called from HistoryManager. Returns: number of deleted snapshots.

Source code in dp3/database/
def delete_old(self, t_old: datetime) -> int:
    """Delete old snapshots, may raise `SnapshotCollectionError`.

    Periodically called from HistoryManager.
         number of deleted snapshots.
    deleted_total = 0
    for collection in self._snapshot_collections.values():
            deleted_total += collection.delete_old(t_old)
        except Exception as e:
            raise SnapshotCollectionError(f"Delete of old snapshots failed: {e}") from e
    return deleted_total


delete_eid(entity_type: str, eid: AnyEidT) -> int

Delete snapshots of given eids.

Source code in dp3/database/
def delete_eid(self, entity_type: str, eid: AnyEidT) -> int:
    """Delete snapshots of given `eids`."""
    return self[entity_type].delete_eid(eid)


delete_eids(entity_type: str, eids: list[Any]) -> int

Delete snapshots of given eids.

Source code in dp3/database/
def delete_eids(self, entity_type: str, eids: list[Any]) -> int:
    """Delete snapshots of given `eids`."""
    return self[entity_type].delete_eids(eids)